xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 44bdcb2d3326b2ce09a36bf9b6840bc8773a5d60)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpibaij.c,v 1.128 1998/05/31 23:32:48 bsmith Exp bsmith $";
3 #endif
4 
5 #include "pinclude/pviewer.h"         /*I "mat.h" I*/
6 #include "src/mat/impls/baij/mpi/mpibaij.h"
7 #include "src/vec/vecimpl.h"
8 
9 
10 extern int MatSetUpMultiply_MPIBAIJ(Mat);
11 extern int DisAssemble_MPIBAIJ(Mat);
12 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
13 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
14 extern int MatGetValues_SeqBAIJ(Mat,int,int *,int,int *,Scalar *);
15 
16 /*
17      Local utility routine that creates a mapping from the global column
18    number to the local number in the off-diagonal part of the local
19    storage of the matrix.  This is done in a non scable way since the
20    length of colmap equals the global matrix length.
21 */
22 #undef __FUNC__
23 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
24 static int CreateColmap_MPIBAIJ_Private(Mat mat)
25 {
26   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
27   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
28   int         nbs = B->nbs,i,bs=B->bs;;
29 
30   PetscFunctionBegin;
31   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
32   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
33   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
34   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
35   PetscFunctionReturn(0);
36 }
37 
/* number of extra block slots added to a row each time its storage is grown */
#define CHUNKSIZE  10

/*
   MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) -

   Expanded in place inside MatSetValues_MPIBAIJ() to insert or add one
   point value into the "diagonal" sequential block-AIJ matrix A.  It uses
   many locals declared by the caller (rp,ap,bap,nrow,rmax,low,high,t,_i,
   ii,N,brow,bcol,ridx,cidx) and the caller's aliases a,aa,ai,aj,aimax,
   ailen of A's data structure; those aliases are updated here when the
   storage is reallocated.

   Visible steps:
     - convert the point (row,col) to block indices plus offsets inside
       the bs x bs block;
     - binary-search the sorted column indices of the block row down to a
       window of 3, then scan linearly;
     - if the block already exists, add or overwrite the entry and jump
       to a_noinsert;
     - otherwise honour a->nonew (1 = silently skip, -1/-2 = error), grow
       the row storage by CHUNKSIZE block slots if full, shift the later
       blocks up, and insert a zeroed block carrying the value.

   NOTE(review): a->nz is incremented only on the reallocation path, not
   for every newly inserted block -- confirm this counter semantics is
   intended.
*/
#define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
    rmax = aimax[brow]; nrow = ailen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto a_noinsert; \
        } \
      } \
      if (a->nonew == 1) goto a_noinsert; \
      else if (a->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (a->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
        for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
        PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
                    aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(a->a);  \
        if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
        aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
        a->singlemalloc = 1; \
 \
        rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
        rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        a->maxnz += bs2*CHUNKSIZE; \
        a->reallocs++; \
        a->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      a_noinsert:; \
    ailen[brow] = nrow; \
}
114 
/*
   MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) -

   Identical in structure to MatSetValues_SeqBAIJ_A_Private above, but it
   operates on the "off-diagonal" sequential matrix B through the caller's
   aliases b,ba,bi,bj,bimax,bilen, and uses the label b_noinsert.  See the
   comment on the A variant for the algorithm; the same NOTE(review) about
   b->nz applies.
*/
#define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
    rmax = bimax[brow]; nrow = bilen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto b_noinsert; \
        } \
      } \
      if (b->nonew == 1) goto b_noinsert; \
      else if (b->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (b->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
        for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
        PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
                    ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(b->a);  \
        if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
        ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
        b->singlemalloc = 1; \
 \
        rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
        rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        b->maxnz += bs2*CHUNKSIZE; \
        b->reallocs++; \
        b->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      b_noinsert:; \
    bilen[brow] = nrow; \
}
189 
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIBAIJ"
/*
   MatSetValues_MPIBAIJ - Inserts or adds individual point-indexed values
   into a parallel block AIJ matrix.

   Input:
     mat  - the MPIBAIJ matrix
     m,im - number of rows and their global point (not block) indices
     n,in - number of columns and their global point indices
     v    - the m*n values; layout depends on baij->roworiented
     addv - INSERT_VALUES or ADD_VALUES

   Locally owned rows are written directly into the diagonal part A or the
   off-diagonal part B via the macros above; rows owned by other
   processors are stashed and communicated during assembly.
*/
int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j,row,col;
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
  int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
  int         cend_orig=baij->cend_bs,bs=baij->bs;

  /* Some Variables required in the macro */
  Mat         A = baij->A;
  Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
  int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
  Scalar      *aa=a->a;

  Mat         B = baij->B;
  Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
  int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
  Scalar      *ba=b->a;

  int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
  int         low,high,t,ridx,cidx,bs2=a->bs2;
  Scalar      *ap,*bap;

  PetscFunctionBegin;
  for ( i=0; i<m; i++ ) {
#if defined(USE_PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
    if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
#endif
    if (im[i] >= rstart_orig && im[i] < rend_orig) {
      /* row is owned by this processor */
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        if (in[j] >= cstart_orig && in[j] < cend_orig){
          /* column falls in the diagonal block A */
          col = in[j] - cstart_orig;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
#if defined(USE_PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");}
        else if (in[j] >= baij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Col too large");}
#endif
        else {
          /* column belongs to the off-diagonal block B */
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }
            /* colmap holds 1 + local column of the block, 0 if absent */
            col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              /* new off-diagonal block column: revert to unassembled form */
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
              /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
              B = baij->B;
              b = (Mat_SeqBAIJ *) (B)->data;
              bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
              ba=b->a;
            }
          } else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
      }
    } else {
      /* row belongs to another processor: stash for assembly-time sends */
      if (roworiented && !baij->donotstash) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      } else {
        if (!baij->donotstash) {
          row = im[i];
          for ( j=0; j<n; j++ ) {
            ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
  PetscFunctionReturn(0);
}
271 
272 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
273 #undef __FUNC__
274 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
275 int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
276 {
277   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
278   Scalar      *value,*barray=baij->barray;
279   int         ierr,i,j,ii,jj,row,col,k,l;
280   int         roworiented = baij->roworiented,rstart=baij->rstart ;
281   int         rend=baij->rend,cstart=baij->cstart,stepval;
282   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
283 
284   if(!barray) {
285     baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
286   }
287 
288   if (roworiented) {
289     stepval = (n-1)*bs;
290   } else {
291     stepval = (m-1)*bs;
292   }
293   for ( i=0; i<m; i++ ) {
294 #if defined(USE_PETSC_BOPT_g)
295     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
296     if (im[i] >= baij->Mbs) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
297 #endif
298     if (im[i] >= rstart && im[i] < rend) {
299       row = im[i] - rstart;
300       for ( j=0; j<n; j++ ) {
301         /* If NumCol = 1 then a copy is not required */
302         if ((roworiented) && (n == 1)) {
303           barray = v + i*bs2;
304         } else if((!roworiented) && (m == 1)) {
305           barray = v + j*bs2;
306         } else { /* Here a copy is required */
307           if (roworiented) {
308             value = v + i*(stepval+bs)*bs + j*bs;
309           } else {
310             value = v + j*(stepval+bs)*bs + i*bs;
311           }
312           for ( ii=0; ii<bs; ii++,value+=stepval ) {
313             for (jj=0; jj<bs; jj++ ) {
314               *barray++  = *value++;
315             }
316           }
317           barray -=bs2;
318         }
319 
320         if (in[j] >= cstart && in[j] < cend){
321           col  = in[j] - cstart;
322           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
323         }
324 #if defined(USE_PETSC_BOPT_g)
325         else if (in[j] < 0) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");}
326         else if (in[j] >= baij->Nbs) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");}
327 #endif
328         else {
329           if (mat->was_assembled) {
330             if (!baij->colmap) {
331               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
332             }
333 
334 #if defined(USE_PETSC_BOPT_g)
335             if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(PETSC_ERR_PLIB,0,"Incorrect colmap");}
336 #endif
337             col = (baij->colmap[in[j]] - 1)/bs;
338             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
339               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
340               col =  in[j];
341             }
342           }
343           else col = in[j];
344           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
345         }
346       }
347     } else {
348       if (!baij->donotstash) {
349         if (roworiented ) {
350           row   = im[i]*bs;
351           value = v + i*(stepval+bs)*bs;
352           for ( j=0; j<bs; j++,row++ ) {
353             for ( k=0; k<n; k++ ) {
354               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
355                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
356               }
357             }
358           }
359         } else {
360           for ( j=0; j<n; j++ ) {
361             value = v + j*(stepval+bs)*bs + i*bs;
362             col   = in[j]*bs;
363             for ( k=0; k<bs; k++,col++,value+=stepval) {
364               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
365                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
366               }
367             }
368           }
369         }
370       }
371     }
372   }
373   PetscFunctionReturn(0);
374 }
#include <math.h>
/* Multiplicative hashing constant: the fractional part of the golden
   ratio, a common choice for spreading integer keys. */
#define HASH_KEY 0.6180339887
/* #define HASH1(size,key) ((int)((size)*fmod(((key)*HASH_KEY),1))) */
/* HASH(size,key,tmp) - multiplicative hash of an integer key into
   [0,size).  tmp must be a double lvalue; the (tmp-(int)tmp) form takes
   the fractional part without a call to fmod() (see the commented-out
   variants above/below). */
#define HASH(size,key,tmp) (tmp = (key)*HASH_KEY,(int)((size)*(tmp-(int)tmp)))
/* #define HASH(size,key,tmp) ((int)((size)*fmod(((key)*HASH_KEY),1))) */
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIBAIJ_HT"
/*
   MatSetValues_MPIBAIJ_HT - Hash-table variant of MatSetValues_MPIBAIJ.

   Requires that the hash table (baij->ht / baij->hd) has already been
   built, i.e. the nonzero pattern is fixed: a (block row, block col) pair
   with no table entry is an error.  Keys are row_block*Nbs+col_block+1;
   collisions are resolved by linear probing with wrap-around.  HD[idx]
   points at the start of the bs x bs block, stored column-major
   (column offset scaled by bs).

   Off-processor rows are stashed exactly as in MatSetValues_MPIBAIJ.
*/
int MatSetValues_MPIBAIJ_HT(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr,i,j,row,col;
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
  int         rend_orig=baij->rend_bs,Nbs=baij->Nbs;
  int         h1,key,size=baij->ht_size,bs=baij->bs,*HT=baij->ht,idx;
  double      tmp;
  Scalar      ** HD = baij->hd,value;
#if defined(USE_PETSC_BOPT_g)
  int         total_ct=baij->ht_total_ct,insert_ct=baij->ht_insert_ct;
#endif

  PetscFunctionBegin;

  for ( i=0; i<m; i++ ) {
#if defined(USE_PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
    if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
#endif
    row = im[i];
    if (row >= rstart_orig && row < rend_orig) {
      for ( j=0; j<n; j++ ) {
        col = in[j];
        if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
        /* Look up into the Hash Table */
        key = (row/bs)*Nbs+(col/bs)+1;
        h1  = HASH(size,key,tmp);


        idx = h1;
        /* The two #if arms differ only in the probe counters kept for
           debugging statistics. */
#if defined(USE_PETSC_BOPT_g)
        insert_ct++;
        total_ct++;
        if (HT[idx] != key) {
          /* linear probe to end of table, then wrap to the start */
          for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++,total_ct++);
          if (idx == size) {
            for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++,total_ct++);
            if (idx == h1) {
              SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
            }
          }
        }
#else
        if (HT[idx] != key) {
          for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++);
          if (idx == size) {
            for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++);
            if (idx == h1) {
              SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
            }
          }
        }
#endif
        /* A HASH table entry is found, so insert the values at the correct address */
        if (addv == ADD_VALUES) *(HD[idx]+ (col % bs)*bs + (row % bs)) += value;
        else                    *(HD[idx]+ (col % bs)*bs + (row % bs))  = value;
      }
    } else {
      /* off-processor row: stash for assembly-time communication */
      if (roworiented && !baij->donotstash) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      } else {
        if (!baij->donotstash) {
          row = im[i];
          for ( j=0; j<n; j++ ) {
            ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
#if defined(USE_PETSC_BOPT_g)
  baij->ht_total_ct = total_ct;
  baij->ht_insert_ct = insert_ct;
#endif
  PetscFunctionReturn(0);
}
459 
#undef __FUNC__
#define __FUNC__ "MatSetValuesBlocked_MPIBAIJ_HT"
/*
   MatSetValuesBlocked_MPIBAIJ_HT - Hash-table variant of
   MatSetValuesBlocked_MPIBAIJ (block row/column indices, one bs x bs
   block at a time).

   As in the point version, the hash table must already contain every
   (block row, block col) pair; keys are row*Nbs+col+1 with linear
   probing and wrap-around.  HD[idx] points at the block's storage and
   the copy loops below transpose row-oriented input into the blocks'
   column-major layout.
*/
int MatSetValuesBlocked_MPIBAIJ_HT(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr,i,j,ii,jj,row,col,k,l;
  int         roworiented = baij->roworiented,rstart=baij->rstart ;
  int         rend=baij->rend,stepval,bs=baij->bs,bs2=baij->bs2;
  int         h1,key,size=baij->ht_size,idx,*HT=baij->ht,Nbs=baij->Nbs;
  double      tmp;
  Scalar      ** HD = baij->hd,*value,*v_t,*baij_a;
#if defined(USE_PETSC_BOPT_g)
  int         total_ct=baij->ht_total_ct,insert_ct=baij->ht_insert_ct;
#endif

  PetscFunctionBegin;

  /* distance (in Scalars) between consecutive rows of one input block */
  if (roworiented) {
    stepval = (n-1)*bs;
  } else {
    stepval = (m-1)*bs;
  }
  for ( i=0; i<m; i++ ) {
#if defined(USE_PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
    if (im[i] >= baij->Mbs) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
#endif
    row   = im[i];
    v_t   = v + i*bs2;
    if (row >= rstart && row < rend) {
      for ( j=0; j<n; j++ ) {
        col = in[j];

        /* Look up into the Hash Table */
        key = row*Nbs+col+1;
        h1  = HASH(size,key,tmp);

        idx = h1;
        /* The two #if arms differ only in the probe counters kept for
           debugging statistics. */
#if defined(USE_PETSC_BOPT_g)
        total_ct++;
        insert_ct++;
        if (HT[idx] != key) {
          /* linear probe to end of table, then wrap to the start */
          for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++,total_ct++);
          if (idx == size) {
            for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++,total_ct++);
            if (idx == h1) {
              SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
            }
          }
        }
#else
        if (HT[idx] != key) {
          for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++);
          if (idx == size) {
            for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++);
            if (idx == h1) {
              SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
            }
          }
        }
#endif
        baij_a = HD[idx];
        if (roworiented) {
          /*value = v + i*(stepval+bs)*bs + j*bs;*/
          /* value = v + (i*(stepval+bs)+j)*bs; */
          value = v_t;
          v_t  += bs;
          /* transpose the row-oriented input into the column-major block */
          if (addv == ADD_VALUES) {
            for ( ii=0; ii<bs; ii++,value+=stepval) {
              for ( jj=ii; jj<bs2; jj+=bs ) {
                baij_a[jj]  += *value++;
              }
            }
          } else {
            for ( ii=0; ii<bs; ii++,value+=stepval) {
              for ( jj=ii; jj<bs2; jj+=bs ) {
                baij_a[jj]  = *value++;
              }
            }
          }
        } else {
          value = v + j*(stepval+bs)*bs + i*bs;
          if (addv == ADD_VALUES) {
            for ( ii=0; ii<bs; ii++,value+=stepval,baij_a+=bs ) {
              for ( jj=0; jj<bs; jj++ ) {
                baij_a[jj]  += *value++;
              }
            }
          } else {
            for ( ii=0; ii<bs; ii++,value+=stepval,baij_a+=bs ) {
              for ( jj=0; jj<bs; jj++ ) {
                baij_a[jj]  = *value++;
              }
            }
          }
        }
      }
    } else {
      /* off-processor block row: stash every point entry individually */
      if (!baij->donotstash) {
        if (roworiented ) {
          row   = im[i]*bs;
          value = v + i*(stepval+bs)*bs;
          for ( j=0; j<bs; j++,row++ ) {
            for ( k=0; k<n; k++ ) {
              for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        } else {
          for ( j=0; j<n; j++ ) {
            value = v + j*(stepval+bs)*bs + i*bs;
            col   = in[j]*bs;
            for ( k=0; k<bs; k++,col++,value+=stepval) {
              for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        }
      }
    }
  }
#if defined(USE_PETSC_BOPT_g)
  baij->ht_total_ct = total_ct;
  baij->ht_insert_ct = insert_ct;
#endif
  PetscFunctionReturn(0);
}
589 
590 #undef __FUNC__
591 #define __FUNC__ "MatGetValues_MPIBAIJ"
592 int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
593 {
594   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
595   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
596   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
597 
598   PetscFunctionBegin;
599   for ( i=0; i<m; i++ ) {
600     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
601     if (idxm[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
602     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
603       row = idxm[i] - bsrstart;
604       for ( j=0; j<n; j++ ) {
605         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
606         if (idxn[j] >= baij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
607         if (idxn[j] >= bscstart && idxn[j] < bscend){
608           col = idxn[j] - bscstart;
609           ierr = MatGetValues_SeqBAIJ(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
610         } else {
611           if (!baij->colmap) {
612             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
613           }
614           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
615              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
616           else {
617             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
618             ierr = MatGetValues_SeqBAIJ(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
619           }
620         }
621       }
622     } else {
623       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
624     }
625   }
626  PetscFunctionReturn(0);
627 }
628 
629 #undef __FUNC__
630 #define __FUNC__ "MatNorm_MPIBAIJ"
631 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
632 {
633   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
634   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
635   int        ierr, i,bs2=baij->bs2;
636   double     sum = 0.0;
637   Scalar     *v;
638 
639   PetscFunctionBegin;
640   if (baij->size == 1) {
641     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
642   } else {
643     if (type == NORM_FROBENIUS) {
644       v = amat->a;
645       for (i=0; i<amat->nz*bs2; i++ ) {
646 #if defined(USE_PETSC_COMPLEX)
647         sum += PetscReal(PetscConj(*v)*(*v)); v++;
648 #else
649         sum += (*v)*(*v); v++;
650 #endif
651       }
652       v = bmat->a;
653       for (i=0; i<bmat->nz*bs2; i++ ) {
654 #if defined(USE_PETSC_COMPLEX)
655         sum += PetscReal(PetscConj(*v)*(*v)); v++;
656 #else
657         sum += (*v)*(*v); v++;
658 #endif
659       }
660       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
661       *norm = sqrt(*norm);
662     } else {
663       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
664     }
665   }
666   PetscFunctionReturn(0);
667 }
668 
#undef __FUNC__
#define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
/*
   MatAssemblyBegin_MPIBAIJ - Starts communicating the stashed
   off-processor values.

   Steps visible below:
     1. verify that all processors agree on INSERT_VALUES vs ADD_VALUES;
     2. count, per destination processor, the stashed entries and exchange
        message counts/lengths with MPI_Allreduce;
     3. post one MPI_Irecv per incoming message (each entry travels as a
        triple of Scalars: row index, column index, value);
     4. pack the stash into svalues and fire one MPI_Isend per destination.

   The requests and buffers are parked in the Mat_MPIBAIJ structure; the
   matching waits happen in assembly-end (not shown in this file chunk).
*/
int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  PetscFunctionBegin;
  if (baij->donotstash) {
    /* nothing was stashed, so no communication state to set up */
    baij->svalues    = 0; baij->rvalues    = 0;
    baij->nsends     = 0; baij->nrecvs     = 0;
    baij->send_waits = 0; baij->recv_waits = 0;
    baij->rmax       = 0;
    PetscFunctionReturn(0);
  }

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
  }
  mat->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    /* rowners holds block-row ownership, hence the *bs scaling */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work      = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  ierr      = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
  nreceives = work[rank];
  ierr      = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
  nmax      = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues    = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues    = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
  starts     = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack each stashed entry as the triple (row, col, value); note starts[]
     is advanced while packing and rebuilt below before the sends */
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(baij->A,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  /* park the communication state for MatAssemblyEnd to wait on */
  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  PetscFunctionReturn(0);
}
773 
774 /*
775   Creates the hash table, and sets the table
776   This table is created only once.
777   If new entries need to be added to the matrix
778   then the hash table has to be destroyed and
779   recreated.
780 */
781 #undef __FUNC__
782 #define __FUNC__ "MatCreateHashTable_MPIBAIJ_Private"
783 int MatCreateHashTable_MPIBAIJ_Private(Mat mat,double factor)
784 {
785   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
786   Mat         A = baij->A, B=baij->B;
787   Mat_SeqBAIJ *a=(Mat_SeqBAIJ *)A->data, *b=(Mat_SeqBAIJ *)B->data;
788   int         i,j,k,nz=a->nz+b->nz,h1,*ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j;
789   int         size,bs2=baij->bs2,rstart=baij->rstart;
790   int         cstart=baij->cstart,*garray=baij->garray,row,col,Nbs=baij->Nbs;
791   int         *HT,key;
792   Scalar      **HD;
793   double      tmp;
794 #if defined(USE_PETSC_BOPT_g)
795   int         ct=0,max=0;
796 #endif
797 
798   PetscFunctionBegin;
799   baij->ht_size=(int)(factor*nz);
800   size = baij->ht_size;
801 
802   if (baij->ht) {
803     PetscFunctionReturn(0);
804   }
805 
806   /* Allocate Memory for Hash Table */
807   baij->hd = (Scalar**)PetscMalloc((size)*(sizeof(int)+sizeof(Scalar*))+1); CHKPTRQ(baij->hd);
808   baij->ht = (int*)(baij->hd + size);
809   HD = baij->hd;
810   HT = baij->ht;
811 
812 
813   PetscMemzero(HD,size*(sizeof(int)+sizeof(Scalar*)));
814 
815 
816   /* Loop Over A */
817   for ( i=0; i<a->mbs; i++ ) {
818     for ( j=ai[i]; j<ai[i+1]; j++ ) {
819       row = i+rstart;
820       col = aj[j]+cstart;
821 
822       key = row*Nbs + col + 1;
823       h1  = HASH(size,key,tmp);
824       for ( k=0; k<size; k++ ){
825         if (HT[(h1+k)%size] == 0.0) {
826           HT[(h1+k)%size] = key;
827           HD[(h1+k)%size] = a->a + j*bs2;
828           break;
829 #if defined(USE_PETSC_BOPT_g)
830         } else {
831           ct++;
832 #endif
833         }
834       }
835 #if defined(USE_PETSC_BOPT_g)
836       if (k> max) max = k;
837 #endif
838     }
839   }
840   /* Loop Over B */
841   for ( i=0; i<b->mbs; i++ ) {
842     for ( j=bi[i]; j<bi[i+1]; j++ ) {
843       row = i+rstart;
844       col = garray[bj[j]];
845       key = row*Nbs + col + 1;
846       h1  = HASH(size,key,tmp);
847       for ( k=0; k<size; k++ ){
848         if (HT[(h1+k)%size] == 0.0) {
849           HT[(h1+k)%size] = key;
850           HD[(h1+k)%size] = b->a + j*bs2;
851           break;
852 #if defined(USE_PETSC_BOPT_g)
853         } else {
854           ct++;
855 #endif
856         }
857       }
858 #if defined(USE_PETSC_BOPT_g)
859       if (k> max) max = k;
860 #endif
861     }
862   }
863 
864   /* Print Summary */
865 #if defined(USE_PETSC_BOPT_g)
866   for ( i=0,j=0; i<size; i++)
867     if (HT[i]) {j++;}
868   PLogInfo(0,"MatCreateHashTable_MPIBAIJ_Private: Average Search = %5.2f,max search = %d\n",
869            (j== 0)? 0.0:((double)(ct+j))/j,max);
870 #endif
871   PetscFunctionReturn(0);
872 }
873 
874 #undef __FUNC__
875 #define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
/*
   MatAssemblyEnd_MPIBAIJ - Completes assembly: receives the stashed
   off-processor (row,col,value) triples sent by MatAssemblyBegin, inserts
   them into the local A (diagonal) or B (off-diagonal) pieces, then
   assembles A and B themselves.
*/
int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
  int         bs=baij->bs,row,col,other_disassembled;
  Scalar      *values,val;
  InsertMode  addv = mat->insertmode;

  PetscFunctionBegin;
  /*  wait on receives */
  while (count) {
    ierr = MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
    /* unpack receives into our local space; each entry is a (row,col,value)
       triple packed as 3 Scalars by MatAssemblyBegin */
    values = baij->rvalues + 3*imdex*baij->rmax;
    ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
    n = n/3;
    for ( i=0; i<n; i++ ) {
      /* convert the global row to a local point-row number */
      row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= baij->cstart*bs && col < baij->cend*bs) {
        /* column lies in the diagonal portion A */
        col -= baij->cstart*bs;
        ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      } else {
        /* column lies in the off-diagonal portion B */
        if (mat->was_assembled) {
          /* after a prior assembly B uses compressed column numbers;
             translate the global column through colmap (1-based entries) */
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
          }
          col = (baij->colmap[col/bs]) - 1 + col%bs;
          /* negative col means a new nonzero block column: fall back to
             global numbering by disassembling, unless new nonzeros are
             forbidden */
          if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
            ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      }
    }
    count--;
  }
  if (baij->recv_waits) PetscFree(baij->recv_waits);
  if (baij->rvalues)    PetscFree(baij->rvalues);

  /* wait on sends */
  if (baij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
    ierr        = MPI_Waitall(baij->nsends,baij->send_waits,send_status);CHKERRQ(ierr);
    PetscFree(send_status);
  }
  if (baij->send_waits) PetscFree(baij->send_waits);
  if (baij->svalues)    PetscFree(baij->svalues);

  ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselves, in order that we may reassemble. */
  /*
     if nonzero structure of submatrix B cannot change then we know that
     no processor disassembled thus we can skip this stuff
  */
  if (!((Mat_SeqBAIJ*) baij->B->data)->nonew)  {
    ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
    if (mat->was_assembled && !other_disassembled) {
      ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
    }
  }

  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);

#if defined(USE_PETSC_BOPT_g)
  if (baij->ht && mode== MAT_FINAL_ASSEMBLY) {
    PLogInfo(0,"MatAssemblyEnd_MPIBAIJ:Average Hash Table Search in MatSetValues = %5.2f\n",
             ((double)baij->ht_total_ct)/baij->ht_insert_ct);
    baij->ht_total_ct  = 0;
    baij->ht_insert_ct = 0;
  }
#endif
  /* on the first final assembly, optionally build the hash table and switch
     to the hash-table-based MatSetValues routines */
  if (baij->ht_flag && !baij->ht && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatCreateHashTable_MPIBAIJ_Private(mat,baij->ht_fact); CHKERRQ(ierr);
    mat->ops->setvalues        = MatSetValues_MPIBAIJ_HT;
    mat->ops->setvaluesblocked = MatSetValuesBlocked_MPIBAIJ_HT;
  }

  /* the MatGetRow() work space is stale after a new assembly */
  if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
  PetscFunctionReturn(0);
}
967 
968 #undef __FUNC__
969 #define __FUNC__ "MatView_MPIBAIJ_Binary"
970 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
971 {
972   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
973   int          ierr;
974 
975   PetscFunctionBegin;
976   if (baij->size == 1) {
977     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
978   } else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
979   PetscFunctionReturn(0);
980 }
981 
982 #undef __FUNC__
983 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
984 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
985 {
986   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
987   int          ierr, format,rank,bs = baij->bs;
988   FILE         *fd;
989   ViewerType   vtype;
990 
991   PetscFunctionBegin;
992   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
993   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
994     ierr = ViewerGetFormat(viewer,&format);
995     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
996       MatInfo info;
997       MPI_Comm_rank(mat->comm,&rank);
998       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
999       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
1000       PetscSequentialPhaseBegin(mat->comm,1);
1001       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
1002               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
1003               baij->bs,(int)info.memory);
1004       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
1005       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
1006       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
1007       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
1008       fflush(fd);
1009       PetscSequentialPhaseEnd(mat->comm,1);
1010       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
1011       PetscFunctionReturn(0);
1012     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
1013       PetscPrintf(mat->comm,"  block size is %d\n",bs);
1014       PetscFunctionReturn(0);
1015     }
1016   }
1017 
1018   if (vtype == DRAW_VIEWER) {
1019     Draw       draw;
1020     PetscTruth isnull;
1021     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
1022     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
1023   }
1024 
1025   if (vtype == ASCII_FILE_VIEWER) {
1026     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
1027     PetscSequentialPhaseBegin(mat->comm,1);
1028     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
1029            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
1030             baij->cstart*bs,baij->cend*bs);
1031     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
1032     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
1033     fflush(fd);
1034     PetscSequentialPhaseEnd(mat->comm,1);
1035   } else {
1036     int size = baij->size;
1037     rank = baij->rank;
1038     if (size == 1) {
1039       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
1040     } else {
1041       /* assemble the entire matrix onto first processor. */
1042       Mat         A;
1043       Mat_SeqBAIJ *Aloc;
1044       int         M = baij->M, N = baij->N,*ai,*aj,col,i,j,k,*rvals;
1045       int         mbs=baij->mbs;
1046       Scalar      *a;
1047 
1048       if (!rank) {
1049         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
1050       } else {
1051         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
1052       }
1053       PLogObjectParent(mat,A);
1054 
1055       /* copy over the A part */
1056       Aloc = (Mat_SeqBAIJ*) baij->A->data;
1057       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1058       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1059 
1060       for ( i=0; i<mbs; i++ ) {
1061         rvals[0] = bs*(baij->rstart + i);
1062         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1063         for ( j=ai[i]; j<ai[i+1]; j++ ) {
1064           col = (baij->cstart+aj[j])*bs;
1065           for (k=0; k<bs; k++ ) {
1066             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
1067             col++; a += bs;
1068           }
1069         }
1070       }
1071       /* copy over the B part */
1072       Aloc = (Mat_SeqBAIJ*) baij->B->data;
1073       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1074       for ( i=0; i<mbs; i++ ) {
1075         rvals[0] = bs*(baij->rstart + i);
1076         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1077         for ( j=ai[i]; j<ai[i+1]; j++ ) {
1078           col = baij->garray[aj[j]]*bs;
1079           for (k=0; k<bs; k++ ) {
1080             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
1081             col++; a += bs;
1082           }
1083         }
1084       }
1085       PetscFree(rvals);
1086       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1087       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1088       /*
1089          Everyone has to call to draw the matrix since the graphics waits are
1090          synchronized across all processors that share the Draw object
1091       */
1092       if (!rank || vtype == DRAW_VIEWER) {
1093         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
1094       }
1095       ierr = MatDestroy(A); CHKERRQ(ierr);
1096     }
1097   }
1098   PetscFunctionReturn(0);
1099 }
1100 
1101 
1102 
1103 #undef __FUNC__
1104 #define __FUNC__ "MatView_MPIBAIJ"
1105 int MatView_MPIBAIJ(Mat mat,Viewer viewer)
1106 {
1107   int         ierr;
1108   ViewerType  vtype;
1109 
1110   PetscFunctionBegin;
1111   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
1112   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
1113       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
1114     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
1115   } else if (vtype == BINARY_FILE_VIEWER) {
1116     ierr = MatView_MPIBAIJ_Binary(mat,viewer);CHKERRQ(ierr);
1117   } else {
1118     SETERRQ(1,1,"Viewer type not supported by PETSc object");
1119   }
1120   PetscFunctionReturn(0);
1121 }
1122 
1123 #undef __FUNC__
1124 #define __FUNC__ "MatDestroy_MPIBAIJ"
/*
   MatDestroy_MPIBAIJ - Frees all storage held by the parallel BAIJ matrix:
   the sequential pieces A and B, the communication scatter, and the various
   work arrays. Does nothing until the last reference is released.
*/
int MatDestroy_MPIBAIJ(Mat mat)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;

  PetscFunctionBegin;
  /* the object may be referenced elsewhere; only the final release destroys */
  if (--mat->refct > 0) PetscFunctionReturn(0);

  if (mat->mapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
  }
  if (mat->bmapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->bmapping); CHKERRQ(ierr);
  }
#if defined(USE_PETSC_LOG)
  PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",baij->M,baij->N);
#endif

  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
  PetscFree(baij->rowners);
  /* destroy the diagonal and off-diagonal sequential matrices */
  ierr = MatDestroy(baij->A); CHKERRQ(ierr);
  ierr = MatDestroy(baij->B); CHKERRQ(ierr);
  /* the optional work arrays exist only if they were ever created */
  if (baij->colmap) PetscFree(baij->colmap);
  if (baij->garray) PetscFree(baij->garray);
  if (baij->lvec)   VecDestroy(baij->lvec);
  if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
  if (baij->rowvalues) PetscFree(baij->rowvalues);
  if (baij->barray) PetscFree(baij->barray);
  /* freeing hd also frees ht: they were allocated as one chunk */
  if (baij->hd) PetscFree(baij->hd);
  PetscFree(baij);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  PetscFunctionReturn(0);
}
1159 
1160 #undef __FUNC__
1161 #define __FUNC__ "MatMult_MPIBAIJ"
1162 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
1163 {
1164   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1165   int         ierr, nt;
1166 
1167   PetscFunctionBegin;
1168   ierr = VecGetLocalSize(xx,&nt);CHKERRQ(ierr);
1169   if (nt != a->n) {
1170     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
1171   }
1172   ierr = VecGetLocalSize(yy,&nt);CHKERRQ(ierr);
1173   if (nt != a->m) {
1174     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible parition of A and yy");
1175   }
1176   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1177   ierr = (*a->A->ops->mult)(a->A,xx,yy); CHKERRQ(ierr);
1178   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1179   ierr = (*a->B->ops->multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
1180   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1181   PetscFunctionReturn(0);
1182 }
1183 
1184 #undef __FUNC__
1185 #define __FUNC__ "MatMultAdd_MPIBAIJ"
1186 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
1187 {
1188   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1189   int        ierr;
1190 
1191   PetscFunctionBegin;
1192   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1193   ierr = (*a->A->ops->multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
1194   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1195   ierr = (*a->B->ops->multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
1196   PetscFunctionReturn(0);
1197 }
1198 
1199 #undef __FUNC__
1200 #define __FUNC__ "MatMultTrans_MPIBAIJ"
1201 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
1202 {
1203   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1204   int         ierr;
1205 
1206   PetscFunctionBegin;
1207   /* do nondiagonal part */
1208   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
1209   /* send it on its way */
1210   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
1211   /* do local part */
1212   ierr = (*a->A->ops->multtrans)(a->A,xx,yy); CHKERRQ(ierr);
1213   /* receive remote parts: note this assumes the values are not actually */
1214   /* inserted in yy until the next line, which is true for my implementation*/
1215   /* but is not perhaps always true. */
1216   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
1217   PetscFunctionReturn(0);
1218 }
1219 
1220 #undef __FUNC__
1221 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
/*
   MatMultTransAdd_MPIBAIJ - Computes zz = yy + A^T * xx, overlapping the
   reverse scatter of off-diagonal contributions with the local product.
*/
int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;

  PetscFunctionBegin;
  /* do nondiagonal part */
  ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops->multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in zz until the next line, which is true for my implementation*/
  /* but is not perhaps always true. */
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
1240 
1241 /*
1242   This only works correctly for square matrices where the subblock A->A is the
1243    diagonal block
1244 */
1245 #undef __FUNC__
1246 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
1247 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
1248 {
1249   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1250   int         ierr;
1251 
1252   PetscFunctionBegin;
1253   if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
1254   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
1255   PetscFunctionReturn(0);
1256 }
1257 
1258 #undef __FUNC__
1259 #define __FUNC__ "MatScale_MPIBAIJ"
1260 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
1261 {
1262   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1263   int         ierr;
1264 
1265   PetscFunctionBegin;
1266   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
1267   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
1268   PetscFunctionReturn(0);
1269 }
1270 
1271 #undef __FUNC__
1272 #define __FUNC__ "MatGetSize_MPIBAIJ"
1273 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
1274 {
1275   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1276 
1277   PetscFunctionBegin;
1278   if (m) *m = mat->M;
1279   if (n) *n = mat->N;
1280   PetscFunctionReturn(0);
1281 }
1282 
1283 #undef __FUNC__
1284 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
1285 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
1286 {
1287   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1288 
1289   PetscFunctionBegin;
1290   *m = mat->m; *n = mat->n;
1291   PetscFunctionReturn(0);
1292 }
1293 
1294 #undef __FUNC__
1295 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
1296 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
1297 {
1298   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1299 
1300   PetscFunctionBegin;
1301   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
1302   PetscFunctionReturn(0);
1303 }
1304 
1305 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1306 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1307 
1308 #undef __FUNC__
1309 #define __FUNC__ "MatGetRow_MPIBAIJ"
/*
   MatGetRow_MPIBAIJ - Returns one locally-owned (point) row, merging the
   diagonal (A) and off-diagonal (B) pieces so that the column indices come
   out globally numbered and in increasing order.
*/
int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
  int        *cmap, *idx_p,cstart = mat->cstart;

  PetscFunctionBegin;
  /* only one row may be obtained at a time */
  if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int     max = 1,mbs = mat->mbs,tmp;
    for ( i=0; i<mbs; i++ ) {
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    /* one allocation holds both the values and the column indices */
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }

  if (row < brstart || row >= brend) SETERRQ(PETSC_ERR_SUP,0,"Only local rows")
  lrow = row - brstart;

  /* request from A and B only the pieces (values/indices) the caller wants */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops->getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops->getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  /* cmap translates B's compressed block columns to global block columns */
  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        /* B entries whose global block column precedes cstart come first... */
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        /* ...then all of A, then the remaining B entries */
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
          }
        } else {
          /* imark was not computed above (v == 0); determine it here */
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]/bs] < cstart)
              idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
            else break;
          }
          imark = i;
        }
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
      }
    } else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops->restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops->restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
1389 
1390 #undef __FUNC__
1391 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1392 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1393 {
1394   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1395 
1396   PetscFunctionBegin;
1397   if (baij->getrowactive == PETSC_FALSE) {
1398     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
1399   }
1400   baij->getrowactive = PETSC_FALSE;
1401   PetscFunctionReturn(0);
1402 }
1403 
1404 #undef __FUNC__
1405 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1406 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1407 {
1408   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1409 
1410   PetscFunctionBegin;
1411   *bs = baij->bs;
1412   PetscFunctionReturn(0);
1413 }
1414 
1415 #undef __FUNC__
1416 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1417 int MatZeroEntries_MPIBAIJ(Mat A)
1418 {
1419   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1420   int         ierr;
1421 
1422   PetscFunctionBegin;
1423   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1424   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1425   PetscFunctionReturn(0);
1426 }
1427 
1428 #undef __FUNC__
1429 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1430 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1431 {
1432   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1433   Mat         A = a->A, B = a->B;
1434   int         ierr;
1435   double      isend[5], irecv[5];
1436 
1437   PetscFunctionBegin;
1438   info->block_size     = (double)a->bs;
1439   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1440   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
1441   isend[3] = info->memory;  isend[4] = info->mallocs;
1442   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1443   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
1444   isend[3] += info->memory;  isend[4] += info->mallocs;
1445   if (flag == MAT_LOCAL) {
1446     info->nz_used      = isend[0];
1447     info->nz_allocated = isend[1];
1448     info->nz_unneeded  = isend[2];
1449     info->memory       = isend[3];
1450     info->mallocs      = isend[4];
1451   } else if (flag == MAT_GLOBAL_MAX) {
1452     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);CHKERRQ(ierr);
1453     info->nz_used      = irecv[0];
1454     info->nz_allocated = irecv[1];
1455     info->nz_unneeded  = irecv[2];
1456     info->memory       = irecv[3];
1457     info->mallocs      = irecv[4];
1458   } else if (flag == MAT_GLOBAL_SUM) {
1459     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);CHKERRQ(ierr);
1460     info->nz_used      = irecv[0];
1461     info->nz_allocated = irecv[1];
1462     info->nz_unneeded  = irecv[2];
1463     info->memory       = irecv[3];
1464     info->mallocs      = irecv[4];
1465   }
1466   info->rows_global       = (double)a->M;
1467   info->columns_global    = (double)a->N;
1468   info->rows_local        = (double)a->m;
1469   info->columns_local     = (double)a->N;
1470   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1471   info->fill_ratio_needed = 0;
1472   info->factor_mallocs    = 0;
1473   PetscFunctionReturn(0);
1474 }
1475 
1476 #undef __FUNC__
1477 #define __FUNC__ "MatSetOption_MPIBAIJ"
1478 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1479 {
1480   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1481 
1482   PetscFunctionBegin;
1483   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1484       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1485       op == MAT_COLUMNS_UNSORTED ||
1486       op == MAT_COLUMNS_SORTED ||
1487       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1488       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1489         MatSetOption(a->A,op);
1490         MatSetOption(a->B,op);
1491   } else if (op == MAT_ROW_ORIENTED) {
1492         a->roworiented = 1;
1493         MatSetOption(a->A,op);
1494         MatSetOption(a->B,op);
1495   } else if (op == MAT_ROWS_SORTED ||
1496              op == MAT_ROWS_UNSORTED ||
1497              op == MAT_SYMMETRIC ||
1498              op == MAT_STRUCTURALLY_SYMMETRIC ||
1499              op == MAT_YES_NEW_DIAGONALS ||
1500              op == MAT_USE_HASH_TABLE)
1501     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1502   else if (op == MAT_COLUMN_ORIENTED) {
1503     a->roworiented = 0;
1504     MatSetOption(a->A,op);
1505     MatSetOption(a->B,op);
1506   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1507     a->donotstash = 1;
1508   } else if (op == MAT_NO_NEW_DIAGONALS) {
1509     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1510   } else if (op == MAT_USE_HASH_TABLE) {
1511     a->ht_flag = 1;
1512   } else {
1513     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1514   }
1515   PetscFunctionReturn(0);
1516 }
1517 
1518 #undef __FUNC__
1519 #define __FUNC__ "MatTranspose_MPIBAIJ("
/*
   MatTranspose_MPIBAIJ - Forms the transpose of A. With matout == PETSC_NULL
   the result replaces A "in place" (square matrices only); this is actually
   implemented by building a new matrix and grafting it into the old header.
*/
int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
  Mat_SeqBAIJ *Aloc;
  Mat        B;
  int        ierr,M=baij->M,N=baij->N,*ai,*aj,i,*rvals,j,k,col;
  int        bs=baij->bs,mbs=baij->mbs;
  Scalar     *a;

  PetscFunctionBegin;
  if (matout == PETSC_NULL && M != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
  /* B gets the transposed global dimensions N x M */
  ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
  CHKERRQ(ierr);

  /* copy over the A part */
  Aloc = (Mat_SeqBAIJ*) baij->A->data;
  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
  rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

  for ( i=0; i<mbs; i++ ) {
    /* rvals holds the bs global point-row numbers of block row i */
    rvals[0] = bs*(baij->rstart + i);
    for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
    for ( j=ai[i]; j<ai[i+1]; j++ ) {
      col = (baij->cstart+aj[j])*bs;
      for (k=0; k<bs; k++ ) {
        /* note the swapped row/column arguments: this inserts transposed */
        ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
        col++; a += bs;
      }
    }
  }
  /* copy over the B part; garray maps local to global block columns */
  Aloc = (Mat_SeqBAIJ*) baij->B->data;
  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
  for ( i=0; i<mbs; i++ ) {
    rvals[0] = bs*(baij->rstart + i);
    for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
    for ( j=ai[i]; j<ai[i+1]; j++ ) {
      col = baij->garray[aj[j]]*bs;
      for (k=0; k<bs; k++ ) {
        ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
        col++; a += bs;
      }
    }
  }
  PetscFree(rvals);
  ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);

  if (matout != PETSC_NULL) {
    *matout = B;
  } else {
    PetscOps       *Abops;
    struct _MatOps *Aops;

    /* This isn't really an in-place transpose .... but free data structures from baij */
    PetscFree(baij->rowners);
    ierr = MatDestroy(baij->A); CHKERRQ(ierr);
    ierr = MatDestroy(baij->B); CHKERRQ(ierr);
    if (baij->colmap) PetscFree(baij->colmap);
    if (baij->garray) PetscFree(baij->garray);
    if (baij->lvec) VecDestroy(baij->lvec);
    if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
    PetscFree(baij);

    /*
       This is horrible, horrible code. We need to keep the
      A pointers for the bops and ops but copy everything
      else from C.
    */
    Abops = A->bops;
    Aops  = A->ops;
    PetscMemcpy(A,B,sizeof(struct _p_Mat));
    A->bops = Abops;
    A->ops  = Aops;

    PetscHeaderDestroy(B);
  }
  PetscFunctionReturn(0);
}
1599 
1600 #undef __FUNC__
1601 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1602 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1603 {
1604   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1605   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1606   int ierr,s1,s2,s3;
1607 
1608   PetscFunctionBegin;
1609   if (ll)  {
1610     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1611     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1612     if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"non-conforming local sizes");
1613     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1614     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1615   }
1616   if (rr) SETERRQ(PETSC_ERR_SUP,0,"not supported for right vector");
1617   PetscFunctionReturn(0);
1618 }
1619 
#undef __FUNC__
#define __FUNC__ "MatZeroRows_MPIBAIJ"
/*
     Zeros all entries of the global rows of A listed in the index set is
  (the indices are point rows, not block rows).  If diag is nonnull the
  diagonal entry of each zeroed row is set to *diag.

     Any processor may list rows owned by any other processor, so the row
  indices are first routed to their owning processors with MPI
  point-to-point messages; each processor then zeros the rows it owns.
  Collective on A.
*/
int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work,row;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values,bs=l->bs,rstart_bs=l->rstart_bs;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;
  PetscTruth     localdiag;

  PetscFunctionBegin;
  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    /* owners[] holds block-row boundaries, hence the scaling by bs */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work   = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  ierr   = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
  nrecvs = work[rank];
  ierr   = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
  nmax   = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
  }

  /* do sends:
     1) starts[i] gives the starting index in svalues for stuff going to
     the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  /* the packing loop above advanced starts[]; rebuild it before sending */
  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
    }
  }
  PetscFree(starts);

  /* first point row owned by this processor */
  base = owners[rank]*bs;

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
    /* unpack receives into our local space */
    ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;   /* convert global to local point row */
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);

  /*
        Zero the required rows. If the "diagonal block" of the matrix
     is square and the user wishes to set the diagonal we use seperate
     code so that MatSetValues() is not called for each diagonal allocating
     new memory, thus calling lots of mallocs and slowing things down.

       Contributed by: Mathew Knepley
  */
  localdiag = PETSC_FALSE;
  if (diag && (l->A->M == l->A->N)) {
    localdiag = PETSC_TRUE;
    ierr      = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  } else {
    ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
  }
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* the diagonal could not be set by the local MatZeroRows(); insert it
     entry by entry and reassemble */
  if (diag && (localdiag == PETSC_FALSE)) {
    for ( i = 0; i < slen; i++ ) {
      row = lrows[i] + rstart_bs;
      ierr = MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES); CHKERRQ(ierr);
    }
    ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
    ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  }
  PetscFree(lrows);

  /* wait on sends so that svalues may be safely freed */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
    ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  PetscFunctionReturn(0);
}
1766 
1767 extern int MatPrintHelp_SeqBAIJ(Mat);
1768 #undef __FUNC__
1769 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1770 int MatPrintHelp_MPIBAIJ(Mat A)
1771 {
1772   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1773   MPI_Comm    comm = A->comm;
1774   static int  called = 0;
1775   int         ierr;
1776 
1777   PetscFunctionBegin;
1778   if (!a->rank) {
1779     ierr = MatPrintHelp_SeqBAIJ(a->A);CHKERRQ(ierr);
1780   }
1781   if (called) {PetscFunctionReturn(0);} else called = 1;
1782   (*PetscHelpPrintf)(comm," Options for MATMPIBAIJ matrix format (the defaults):\n");
1783   (*PetscHelpPrintf)(comm,"  -mat_use_hash_table <factor>: Use hashtable for efficient matrix assembly\n");
1784   PetscFunctionReturn(0);
1785 }
1786 
1787 #undef __FUNC__
1788 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1789 int MatSetUnfactored_MPIBAIJ(Mat A)
1790 {
1791   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1792   int         ierr;
1793 
1794   PetscFunctionBegin;
1795   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1796   PetscFunctionReturn(0);
1797 }
1798 
static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);

/* -------------------------------------------------------------------*/
/*
   Function table for the MATMPIBAIJ matrix type.  The initializers are
   POSITIONAL: each slot corresponds to a fixed member of struct _MatOps
   and a 0 marks an operation this format does not support.  Do not
   reorder entries -- the order must match the struct _MatOps declaration.
*/
static struct _MatOps MatOps = {
  MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
  MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
  0,0,0,0,
  0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
  0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
  MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
  MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
  0,0,0,MatGetSize_MPIBAIJ,
  MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
  0,0,MatConvertSameType_MPIBAIJ,0,0,
  0,0,0,MatGetSubMatrices_MPIBAIJ,
  MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
  MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
  0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1817 
1818 
#undef __FUNC__
#define __FUNC__ "MatCreateMPIBAIJ"
/*@C
   MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
   (block compressed row).  For good matrix assembly performance
   the user should preallocate the matrix storage by setting the parameters
   d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
   performance can be increased by more than a factor of 50.

   Collective on MPI_Comm

   Input Parameters:
+  comm - MPI communicator
.  bs   - size of block
.  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
           This value should be the same as the local size used in creating the
           y vector for the matrix-vector product y = Ax.
.  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
           This value should be the same as the local size used in creating the
           x vector for the matrix-vector product y = Ax.
.  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
.  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
.  d_nz  - number of block nonzeros per block row in diagonal portion of local
           submatrix  (same for all local rows)
.  d_nnz - array containing the number of block nonzeros in the various block rows
           of the diagonal portion of the local submatrix (possibly different for each block
           row) or PETSC_NULL.  You must leave room for the diagonal entry even if it is zero.
.  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
           submatrix (same for all local rows).
-  o_nnz - array containing the number of nonzeros in the various block rows of the
           off-diagonal portion of the local submatrix (possibly different for
           each block row) or PETSC_NULL.

   Output Parameter:
.  A - the matrix

   Options Database Keys:
.   -mat_no_unroll - uses code that does not unroll the loops in the
                     block calculations (much slower)
.   -mat_block_size - size of the blocks to use

   Notes:
   The user MUST specify either the local or global matrix dimensions
   (possibly both).

   Storage Information:
   For a square global matrix we define each processor's diagonal portion
   to be its local rows and the corresponding columns (a square submatrix);
   each processor's off-diagonal portion encompasses the remainder of the
   local matrix (a rectangular submatrix).

   The user can specify preallocated storage for the diagonal part of
   the local submatrix with either d_nz or d_nnz (not both).  Set
   d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
   memory allocation.  Likewise, specify preallocated storage for the
   off-diagonal part of the local submatrix with o_nz or o_nnz (not both).

   Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
   the figure below we depict these three local rows and all columns (0-11).

.vb
           0 1 2 3 4 5 6 7 8 9 10 11
          -------------------
   row 3  |  o o o d d d o o o o o o
   row 4  |  o o o d d d o o o o o o
   row 5  |  o o o d d d o o o o o o
          -------------------
.ve

   Thus, any entries in the d locations are stored in the d (diagonal)
   submatrix, and any entries in the o locations are stored in the
   o (off-diagonal) submatrix.  Note that the d and the o submatrices are
   stored simply in the MATSEQBAIJ format for compressed row storage.

   Now d_nz should indicate the number of block nonzeros per row in the d matrix,
   and o_nz should indicate the number of block nonzeros per row in the o matrix.
   In general, for PDE problems in which most nonzeros are near the diagonal,
   one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
   or you will get TERRIBLE performance; see the users' manual chapter on
   matrices.

.keywords: matrix, block, aij, compressed row, sparse, parallel

.seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues(), MatCreateMPIAIJ()
@*/
int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
                    int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  Mat          B;
  Mat_MPIBAIJ  *b;
  int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size,flg;

  PetscFunctionBegin;
  if (bs < 1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Invalid block size specified, must be positive");

  MPI_Comm_size(comm,&size);
  if (size == 1) {
    /* uniprocessor: create a sequential BAIJ matrix instead.
       NOTE(review): o_nz/o_nnz are ignored on this path -- presumably
       intended since there is no off-diagonal part; confirm with callers. */
    if (M == PETSC_DECIDE) M = m;
    if (N == PETSC_DECIDE) N = n;
    ierr = MatCreateSeqBAIJ(comm,bs,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
    PetscFunctionReturn(0);
  }

  *A = 0;
  PetscHeaderCreate(B,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIBAIJ,comm,MatDestroy,MatView);
  PLogObjectCreate(B);
  B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
  PetscMemzero(b,sizeof(Mat_MPIBAIJ));
  PetscMemcpy(B->ops,&MatOps,sizeof(struct _MatOps));

  B->ops->destroy    = MatDestroy_MPIBAIJ;
  B->ops->view       = MatView_MPIBAIJ;
  B->mapping    = 0;
  B->factor     = 0;
  B->assembled  = PETSC_FALSE;

  B->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&b->rank);
  MPI_Comm_size(comm,&b->size);

  /* validate the size arguments: at least one of each (local,global) pair
     must be specified */
  if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
    SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
  }
  if ( M == PETSC_DECIDE && m == PETSC_DECIDE) {
    SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either M or m should be specified");
  }
  if ( N == PETSC_DECIDE && n == PETSC_DECIDE) {
    SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either N or n should be specified");
  }
  /* when both local and global sizes are given, prefer the local size and
     recompute the global size below */
  if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
  if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;

  /* global sizes from the sum of the local sizes */
  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    mbs = m/bs; nbs = n/bs;
    ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
    if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
    if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
  }
  /* local sizes from an (almost) even split of the global block count */
  if (m == PETSC_DECIDE) {
    Mbs = M/bs;
    if (Mbs*bs != M) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global rows must be divisible by blocksize");
    mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
    m   = mbs*bs;
  }
  if (n == PETSC_DECIDE) {
    Nbs = N/bs;
    if (Nbs*bs != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global cols must be divisible by blocksize");
    nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
    n   = nbs*bs;
  }
  if (mbs*bs != m || nbs*bs != n) {
    SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of local rows, cols must be divisible by blocksize");
  }

  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;
  b->bs  = bs;
  b->bs2 = bs*bs;
  b->mbs = mbs;
  b->nbs = nbs;
  b->Mbs = Mbs;
  b->Nbs = Nbs;

  /* build local table of row and column ownerships */
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
  b->cowners = b->rowners + b->size + 2;
  /* gather every processor's block-row count, then prefix-sum into
     ownership boundaries: rowners[i]..rowners[i+1] are proc i's block rows */
  ierr = MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];
  }
  b->rstart    = b->rowners[b->rank];
  b->rend      = b->rowners[b->rank+1];
  b->rstart_bs = b->rstart * bs;
  b->rend_bs   = b->rend * bs;

  /* same for the column ownership */
  ierr = MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart    = b->cowners[b->rank];
  b->cend      = b->cowners[b->rank+1];
  b->cstart_bs = b->cstart * bs;
  b->cend_bs   = b->cend * bs;


  /* the diagonal block is m x n; the off-diagonal block is m x N
     (all columns), compressed later by MatSetUpMultiply_MPIBAIJ() */
  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->donotstash  = 0;
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used in block assembly */
  b->barray       = 0;

  /* stuff used for matrix vector multiply */
  b->lvec         = 0;
  b->Mvctx        = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  /* hash table stuff */
  b->ht           = 0;
  b->hd           = 0;
  b->ht_size      = 0;
  b->ht_flag      = 0;
  b->ht_fact      = 0;
  b->ht_total_ct  = 0;
  b->ht_insert_ct = 0;

  *A = B;
  ierr = OptionsHasName(PETSC_NULL,"-mat_use_hash_table",&flg); CHKERRQ(ierr);
  if (flg) {
    double fact = 1.39;
    ierr = MatSetOption(B,MAT_USE_HASH_TABLE); CHKERRQ(ierr);
    ierr = OptionsGetDouble(PETSC_NULL,"-mat_use_hash_table",&fact,&flg); CHKERRQ(ierr);
    if (fact <= 1.0) fact = 1.39;   /* fall back to the default factor */
    ierr = MatMPIBAIJSetHashTableFactor(B,fact); CHKERRQ(ierr);
    PLogInfo(0,"MatCreateMPIBAIJ:Hash table Factor used %5.2f\n",fact);
  }
  PetscFunctionReturn(0);
}
2057 
2058 #undef __FUNC__
2059 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
2060 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
2061 {
2062   Mat         mat;
2063   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
2064   int         ierr, len=0, flg;
2065 
2066   PetscFunctionBegin;
2067   *newmat       = 0;
2068   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIBAIJ,matin->comm,MatDestroy,MatView);
2069   PLogObjectCreate(mat);
2070   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
2071   PetscMemcpy(mat->ops,&MatOps,sizeof(struct _MatOps));
2072   mat->ops->destroy    = MatDestroy_MPIBAIJ;
2073   mat->ops->view       = MatView_MPIBAIJ;
2074   mat->factor     = matin->factor;
2075   mat->assembled  = PETSC_TRUE;
2076 
2077   a->m = mat->m   = oldmat->m;
2078   a->n = mat->n   = oldmat->n;
2079   a->M = mat->M   = oldmat->M;
2080   a->N = mat->N   = oldmat->N;
2081 
2082   a->bs  = oldmat->bs;
2083   a->bs2 = oldmat->bs2;
2084   a->mbs = oldmat->mbs;
2085   a->nbs = oldmat->nbs;
2086   a->Mbs = oldmat->Mbs;
2087   a->Nbs = oldmat->Nbs;
2088 
2089   a->rstart       = oldmat->rstart;
2090   a->rend         = oldmat->rend;
2091   a->cstart       = oldmat->cstart;
2092   a->cend         = oldmat->cend;
2093   a->size         = oldmat->size;
2094   a->rank         = oldmat->rank;
2095   mat->insertmode = NOT_SET_VALUES;
2096   a->rowvalues    = 0;
2097   a->getrowactive = PETSC_FALSE;
2098   a->barray       = 0;
2099 
2100   /* hash table stuff */
2101   a->ht           = 0;
2102   a->hd           = 0;
2103   a->ht_size      = 0;
2104   a->ht_flag      = oldmat->ht_flag;
2105   a->ht_fact      = oldmat->ht_fact;
2106   a->ht_total_ct  = 0;
2107   a->ht_insert_ct = 0;
2108 
2109 
2110   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
2111   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
2112   a->cowners = a->rowners + a->size + 2;
2113   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
2114   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
2115   if (oldmat->colmap) {
2116     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
2117     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
2118     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
2119   } else a->colmap = 0;
2120   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
2121     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
2122     PLogObjectMemory(mat,len*sizeof(int));
2123     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
2124   } else a->garray = 0;
2125 
2126   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
2127   PLogObjectParent(mat,a->lvec);
2128   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
2129   PLogObjectParent(mat,a->Mvctx);
2130   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
2131   PLogObjectParent(mat,a->A);
2132   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
2133   PLogObjectParent(mat,a->B);
2134   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
2135   if (flg) {
2136     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
2137   }
2138   *newmat = mat;
2139   PetscFunctionReturn(0);
2140 }
2141 
2142 #include "sys.h"
2143 
#undef __FUNC__
#define __FUNC__ "MatLoad_MPIBAIJ"
/*
     Loads an MPIBAIJ matrix from a PETSc binary viewer.  Only process 0
  reads the file; it scatters the row lengths and ships each processor
  its column indices and numerical values with MPI messages.  If the
  global size is not divisible by the block size (-matload_block_size,
  default 1), extra rows/columns with a unit diagonal are appended so
  that it is.  Only square matrices are supported.
*/
int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*buf;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
  int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
  int          flg,tag = ((PetscObject)viewer)->tag,bs=1,Mbs,mbs,extra_rows;
  int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
  int          dcount,kmax,k,nzcount,tmp;

  PetscFunctionBegin;
  ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    /* only process 0 touches the file; check the header */
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
    if (header[3] < 0) {
      SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix stored in special format, cannot load as MPIBAIJ");
    }
  }

  ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
  M = header[1]; N = header[2];

  if (M != N) SETERRQ(PETSC_ERR_SUP,0,"Can only do square matrices");

  /*
     This code adds extra rows to make sure the number of rows is
     divisible by the blocksize
  */
  Mbs        = M/bs;
  extra_rows = bs - M + bs*(Mbs);
  if (extra_rows == bs) extra_rows = 0;
  else                  Mbs++;
  if (extra_rows &&!rank) {
    PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
  }

  /* determine ownership of all rows */
  mbs = Mbs/size + ((Mbs % size) > rank);
  m   = mbs * bs;
  /* rowners[] holds block-row ownership boundaries; browners[] the
     corresponding point-row boundaries */
  rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
  browners = rowners + size + 1;
  ierr = MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
  for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
    /* the padding rows each hold a single (diagonal) entry */
    for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
    ierr = MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);CHKERRQ(ierr);
    PetscFree(sndcounts);
  } else {
    ierr = MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);CHKERRQ(ierr);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices  */
    nz = procsnz[0];
    ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* uniprocessor: the padding entries are not in the file */
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }

    /* read in every ones (except the last) and ship off */
    for ( i=1; i<size-1; i++ ) {
      nz   = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
      ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
    }
    /* read in the stuff for the last proc */
    if ( size != 1 ) {
      nz   = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
      ierr = MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);CHKERRQ(ierr);
    }
    PetscFree(cols);
  } else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += locrowlens[i];
    }
    ibuf   = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* receive message of column indices*/
    ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
    ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
    if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries
     per block row (mask[] de-duplicates block columns within one block row) */
  dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
  odlens = dlens + (rend-rstart);
  mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
  PetscMemzero(mask,3*Mbs*sizeof(int));
  masked1 = mask    + Mbs;
  masked2 = masked1 + Mbs;
  rowcount = 0; nzcount = 0;
  for ( i=0; i<mbs; i++ ) {
    dcount  = 0;
    odcount = 0;
    for ( j=0; j<bs; j++ ) {
      kmax = locrowlens[rowcount];
      for ( k=0; k<kmax; k++ ) {
        tmp = mycols[nzcount++]/bs;   /* block column of this entry */
        if (!mask[tmp]) {
          mask[tmp] = 1;
          if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
          else masked1[dcount++] = tmp;
        }
      }
      rowcount++;
    }

    dlens[i]  = dcount;
    odlens[i] = odcount;

    /* zero out the mask elements we set */
    for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
    for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
  }

  /* create our matrix */
  ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
         CHKERRQ(ierr);
  A = *newmat;
  MatSetOption(A,MAT_COLUMNS_SORTED);

  if (!rank) {
    buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    vals = buf;
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
    /* padding rows get a unit diagonal */
    if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
    /* read in other processors (except the last one) and ship out */
    for ( i=1; i<size-1; i++ ) {
      nz   = procsnz[i];
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
      ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
    }
    /* the last proc */
    if ( size != 1 ){
      /* note: i == size-1 here, left over from the loop above */
      nz   = procsnz[i] - extra_rows;
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
      ierr = MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);CHKERRQ(ierr);
    }
    PetscFree(procsnz);
  } else {
    /* receive numeric values */
    buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);

    /* receive message of values*/
    vals   = buf;
    mycols = ibuf;
    ierr   = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
    ierr   = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
    if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
  }
  PetscFree(locrowlens);
  PetscFree(buf);
  PetscFree(ibuf);
  PetscFree(rowners);
  PetscFree(dlens);
  PetscFree(mask);
  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
2371 
2372 
2373 
2374 #undef __FUNC__
2375 #define __FUNC__ "MatMPIBAIJSetHashTableFactor"
2376 /*@
2377    MatMPIBAIJSetHashTableFactor - Sets the factor required to compute the size of the HashTable.
2378 
2379    Input Parameters:
2380 .  mat  - the matrix
2381 .  fact - factor
2382 
2383    Collective on Mat
2384 
2385   Notes:
2386    This can also be set by the command line option: -mat_use_hash_table fact
2387 
2388 .keywords: matrix, hashtable, factor, HT
2389 
2390 .seealso: MatSetOption()
2391 @*/
2392 int MatMPIBAIJSetHashTableFactor(Mat mat,double fact)
2393 {
2394   Mat_MPIBAIJ *baij;
2395 
2396   PetscFunctionBegin;
2397   PetscValidHeaderSpecific(mat,MAT_COOKIE);
2398   if (mat->type != MATMPIBAIJ) {
2399       SETERRQ(PETSC_ERR_ARG_WRONG,1,"Incorrect matrix type. Use MPIBAIJ only.");
2400   }
2401   baij = (Mat_MPIBAIJ*) mat->data;
2402   baij->ht_fact = fact;
2403   PetscFunctionReturn(0);
2404 }
2405