xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 6c5fab8fcb40ff5ba426e0c09e69e39e313ab018)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpibaij.c,v 1.84 1997/10/28 14:23:15 bsmith Exp bsmith $";
3 #endif
4 
5 #include "pinclude/pviewer.h"
6 #include "src/mat/impls/baij/mpi/mpibaij.h"
7 #include "src/vec/vecimpl.h"
8 
9 
10 extern int MatSetUpMultiply_MPIBAIJ(Mat);
11 extern int DisAssemble_MPIBAIJ(Mat);
12 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
13 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
14 
15 /*
16      Local utility routine that creates a mapping from the global column
17    number to the local number in the off-diagonal part of the local
18    storage of the matrix.  This is done in a non-scalable way since the
19    length of colmap equals the number of global block columns.
20 */
21 #undef __FUNC__
22 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
23 static int CreateColmap_MPIBAIJ_Private(Mat mat)
24 {
25   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
26   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
27   int         nbs = B->nbs,i,bs=B->bs;
28 
29   PetscFunctionBegin;
30   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
31   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
32   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
33   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
34   PetscFunctionReturn(0);
35 }
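
/*
   Sketch of how the colmap built above is consulted later (the names gcol,
   gbcol and lcol are illustrative only).  colmap is indexed by global block
   column; a zero entry means that block column does not occur in the
   off-diagonal part, which is why the stored values are shifted by +1:

      gbcol = gcol/bs;                              global block column
      if (baij->colmap[gbcol]) {
        lcol = baij->colmap[gbcol] - 1 + gcol%bs;   local point column in B
      } else {
        block column not present locally
      }
*/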
36 
37 #define CHUNKSIZE  10
38 
39 #define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
40 { \
41  \
42     brow = row/bs;  \
43     rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
44     rmax = aimax[brow]; nrow = ailen[brow]; \
45       bcol = col/bs; \
46       ridx = row % bs; cidx = col % bs; \
47       low = 0; high = nrow; \
48       while (high-low > 3) { \
49         t = (low+high)/2; \
50         if (rp[t] > bcol) high = t; \
51         else              low  = t; \
52       } \
53       for ( _i=low; _i<high; _i++ ) { \
54         if (rp[_i] > bcol) break; \
55         if (rp[_i] == bcol) { \
56           bap  = ap +  bs2*_i + bs*cidx + ridx; \
57           if (addv == ADD_VALUES) *bap += value;  \
58           else                    *bap  = value;  \
59           goto a_noinsert; \
60         } \
61       } \
62       if (a->nonew == 1) goto a_noinsert; \
63       else if (a->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
64       if (nrow >= rmax) { \
65         /* there is no extra room in row, therefore enlarge */ \
66         int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
67         Scalar *new_a; \
68  \
69         if (a->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
70  \
71         /* malloc new storage space */ \
72         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
73         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
74         new_j   = (int *) (new_a + bs2*new_nz); \
75         new_i   = new_j + new_nz; \
76  \
77         /* copy over old data into new slots */ \
78         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
79         for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
80         PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
81         len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
82         PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
83                                                            len*sizeof(int)); \
84         PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
85         PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
86         PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
87                     aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
88         /* free up old matrix storage */ \
89         PetscFree(a->a);  \
90         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
91         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
92         a->singlemalloc = 1; \
93  \
94         rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
95         rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
96         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
97         a->maxnz += bs2*CHUNKSIZE; \
98         a->reallocs++; \
99         a->nz++; \
100       } \
101       N = nrow++ - 1;  \
102       /* shift up all the later entries in this row */ \
103       for ( ii=N; ii>=_i; ii-- ) { \
104         rp[ii+1] = rp[ii]; \
105         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
106       } \
107       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
108       rp[_i]                      = bcol;  \
109       ap[bs2*_i + bs*cidx + ridx] = value;  \
110       a_noinsert:; \
111     ailen[brow] = nrow; \
112 }
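
/*
   The macro above (and its twin for the off-diagonal part below) inserts one
   point entry into a block row: bisect the sorted block-column list until the
   window is small, then scan linearly.  A standalone sketch of just the
   search step, under the same assumptions (rp sorted, nrow entries):

      static int FindBlockCol(int *rp,int nrow,int bcol)
      {
        int low = 0, high = nrow, t, i;
        while (high - low > 3) {            bisection on block columns
          t = (low + high)/2;
          if (rp[t] > bcol) high = t;
          else              low  = t;
        }
        for (i=low; i<high; i++) {          short final linear scan
          if (rp[i] >= bcol) break;
        }
        return i;                           match or insertion point
      }

   When the row is full, the macro grows the storage by CHUNKSIZE block
   entries using a single malloc that holds the values, the column indices and
   the row offsets (hence the singlemalloc flag).
*/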
113 
114 #define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
115 { \
116  \
117     brow = row/bs;  \
118     rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
119     rmax = bimax[brow]; nrow = bilen[brow]; \
120       bcol = col/bs; \
121       ridx = row % bs; cidx = col % bs; \
122       low = 0; high = nrow; \
123       while (high-low > 3) { \
124         t = (low+high)/2; \
125         if (rp[t] > bcol) high = t; \
126         else              low  = t; \
127       } \
128       for ( _i=low; _i<high; _i++ ) { \
129         if (rp[_i] > bcol) break; \
130         if (rp[_i] == bcol) { \
131           bap  = ap +  bs2*_i + bs*cidx + ridx; \
132           if (addv == ADD_VALUES) *bap += value;  \
133           else                    *bap  = value;  \
134           goto b_noinsert; \
135         } \
136       } \
137       if (b->nonew == 1) goto b_noinsert; \
138       else if (b->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
139       if (nrow >= rmax) { \
140         /* there is no extra room in row, therefore enlarge */ \
141         int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
142         Scalar *new_a; \
143  \
144         if (b->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
145  \
146         /* malloc new storage space */ \
147         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
148         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
149         new_j   = (int *) (new_a + bs2*new_nz); \
150         new_i   = new_j + new_nz; \
151  \
152         /* copy over old data into new slots */ \
153         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
154         for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
155         PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
156         len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
157         PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
158                                                            len*sizeof(int)); \
159         PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
160         PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
161         PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
162                     ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
163         /* free up old matrix storage */ \
164         PetscFree(b->a);  \
165         if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
166         ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
167         b->singlemalloc = 1; \
168  \
169         rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
170         rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
171         PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
172         b->maxnz += bs2*CHUNKSIZE; \
173         b->reallocs++; \
174         b->nz++; \
175       } \
176       N = nrow++ - 1;  \
177       /* shift up all the later entries in this row */ \
178       for ( ii=N; ii>=_i; ii-- ) { \
179         rp[ii+1] = rp[ii]; \
180         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
181       } \
182       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
183       rp[_i]                      = bcol;  \
184       ap[bs2*_i + bs*cidx + ridx] = value;  \
185       b_noinsert:; \
186     bilen[brow] = nrow; \
187 }
188 
189 #undef __FUNC__
190 #define __FUNC__ "MatSetValues_MPIBAIJ"
191 int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
192 {
193   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
194   Scalar      value;
195   int         ierr,i,j,row,col;
196   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs;
197   int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
198   int         cend_orig=baij->cend_bs,bs=baij->bs;
199 
200   /* Some Variables required in the macro */
201   Mat         A = baij->A;
202   Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
203   int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
204   Scalar      *aa=a->a;
205 
206   Mat         B = baij->B;
207   Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
208   int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
209   Scalar      *ba=b->a;
210 
211   int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
212   int         low,high,t,ridx,cidx,bs2=a->bs2;
213   Scalar      *ap,*bap;
214 
215   PetscFunctionBegin;
216   for ( i=0; i<m; i++ ) {
217 #if defined(USE_PETSC_BOPT_g)
218     if (im[i] < 0) SETERRQ(1,0,"Negative row");
219     if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
220 #endif
221     if (im[i] >= rstart_orig && im[i] < rend_orig) {
222       row = im[i] - rstart_orig;
223       for ( j=0; j<n; j++ ) {
224         if (in[j] >= cstart_orig && in[j] < cend_orig){
225           col = in[j] - cstart_orig;
226           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
227           MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
228           /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
229         }
230 #if defined(USE_PETSC_BOPT_g)
231         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
232         else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
233 #endif
234         else {
235           if (mat->was_assembled) {
236             if (!baij->colmap) {
237               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
238             }
239             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
240             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
241               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
242               col =  in[j];
243               /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
244               B = baij->B;
245               b = (Mat_SeqBAIJ *) (B)->data;
246               bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
247               ba=b->a;
248             }
249           } else col = in[j];
250           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
251           MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
252           /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
253         }
254       }
255     } else {
256       if (roworiented && !baij->donotstash) {
257         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
258       } else {
259         if (!baij->donotstash) {
260           row = im[i];
261           for ( j=0; j<n; j++ ) {
262             ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
263           }
264         }
265       }
266     }
267   }
268   PetscFunctionReturn(0);
269 }
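
/*
   Dispatch rule used by MatSetValues_MPIBAIJ above, in outline, for a point
   entry (r,c):

      if (rstart_bs <= r < rend_bs)          this process owns the row
        if (cstart_bs <= c < cend_bs)        -> diagonal block A
        else                                 -> off-diagonal block B (through
                                                colmap once the matrix has
                                                been assembled)
      else                                   -> stash; the value travels to
                                                its owner during
                                                MatAssemblyBegin/End
*/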
270 
271 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
272 #undef __FUNC__
273 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
274 int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
275 {
276   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
277   Scalar      *value,*barray=baij->barray;
278   int         ierr,i,j,ii,jj,row,col,k,l;
279   int         roworiented = baij->roworiented,rstart=baij->rstart ;
280   int         rend=baij->rend,cstart=baij->cstart,stepval;
281   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
282 
283   PetscFunctionBegin;
  if (!barray) {
284     baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
285   }
286 
287   if (roworiented) {
288     stepval = (n-1)*bs;
289   } else {
290     stepval = (m-1)*bs;
291   }
292   for ( i=0; i<m; i++ ) {
293 #if defined(USE_PETSC_BOPT_g)
294     if (im[i] < 0) SETERRQ(1,0,"Negative row");
295     if (im[i] >= baij->Mbs) SETERRQ(1,0,"Row too large");
296 #endif
297     if (im[i] >= rstart && im[i] < rend) {
298       row = im[i] - rstart;
299       for ( j=0; j<n; j++ ) {
300         /* If there is only one block column (row-oriented) or one block row (column-oriented), no copy is required */
301         if ((roworiented) && (n == 1)) {
302           barray = v + i*bs2;
303         } else if((!roworiented) && (m == 1)) {
304           barray = v + j*bs2;
305         } else { /* Here a copy is required */
306           if (roworiented) {
307             value = v + i*(stepval+bs)*bs + j*bs;
308           } else {
309             value = v + j*(stepval+bs)*bs + i*bs;
310           }
311           for ( ii=0; ii<bs; ii++,value+=stepval ) {
312             for (jj=0; jj<bs; jj++ ) {
313               *barray++  = *value++;
314             }
315           }
316           barray -=bs2;
317         }
318 
319         if (in[j] >= cstart && in[j] < cend){
320           col  = in[j] - cstart;
321           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
322         }
323 #if defined(USE_PETSC_BOPT_g)
324         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
325         else if (in[j] >= baij->Nbs) {SETERRQ(1,0,"Column too large");}
326 #endif
327         else {
328           if (mat->was_assembled) {
329             if (!baij->colmap) {
330               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
331             }
332 
333 #if defined(USE_PETSC_BOPT_g)
334             if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(1,0,"Incorrect colmap");}
335 #endif
336             col = (baij->colmap[in[j]] - 1)/bs;
337             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
338               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
339               col =  in[j];
340             }
341           }
342           else col = in[j];
343           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
344         }
345       }
346     } else {
347       if (!baij->donotstash) {
348         if (roworiented ) {
349           row   = im[i]*bs;
350           value = v + i*(stepval+bs)*bs;
351           for ( j=0; j<bs; j++,row++ ) {
352             for ( k=0; k<n; k++ ) {
353               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
354                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
355               }
356             }
357           }
358         } else {
359           for ( j=0; j<n; j++ ) {
360             value = v + j*(stepval+bs)*bs + i*bs;
361             col   = in[j]*bs;
362             for ( k=0; k<bs; k++,col++,value+=stepval) {
363               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
364                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
365               }
366             }
367           }
368         }
369       }
370     }
371   }
372   PetscFunctionReturn(0);
373 }
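
/*
   The barray copy above linearizes one bs x bs block out of the caller's
   (m*bs) x (n*bs) array of values.  For the row-oriented case it amounts to
   (names as in the routine):

      value = v + i*(stepval+bs)*bs + j*bs;     top-left corner of block (i,j)
      for (ii=0; ii<bs; ii++, value+=stepval) {
        for (jj=0; jj<bs; jj++) *barray++ = *value++;
      }

   with stepval = (n-1)*bs, so after each row of bs entries the pointer skips
   over the remaining n-1 blocks that share the same point row.
*/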
374 
375 #undef __FUNC__
376 #define __FUNC__ "MatGetValues_MPIBAIJ"
377 int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
378 {
379   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
380   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
381   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
382 
383   PetscFunctionBegin;
  for ( i=0; i<m; i++ ) {
384     if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
385     if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
386     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
387       row = idxm[i] - bsrstart;
388       for ( j=0; j<n; j++ ) {
389         if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
390         if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
391         if (idxn[j] >= bscstart && idxn[j] < bscend){
392           col = idxn[j] - bscstart;
393           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
394         } else {
395           if (!baij->colmap) {
396             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
397           }
398           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
399              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
400           else {
401             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
402             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
403           }
404         }
405       }
406     } else {
407       SETERRQ(1,0,"Only local values currently supported");
408     }
409   }
410   PetscFunctionReturn(0);
411 }
412 
413 #undef __FUNC__
414 #define __FUNC__ "MatNorm_MPIBAIJ"
415 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
416 {
417   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
418   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
419   int        ierr, i,bs2=baij->bs2;
420   double     sum = 0.0;
421   Scalar     *v;
422 
423   PetscFunctionBegin;
424   if (baij->size == 1) {
425     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
426   } else {
427     if (type == NORM_FROBENIUS) {
428       v = amat->a;
429       for (i=0; i<amat->nz*bs2; i++ ) {
430 #if defined(USE_PETSC_COMPLEX)
431         sum += real(conj(*v)*(*v)); v++;
432 #else
433         sum += (*v)*(*v); v++;
434 #endif
435       }
436       v = bmat->a;
437       for (i=0; i<bmat->nz*bs2; i++ ) {
438 #if defined(USE_PETSC_COMPLEX)
439         sum += real(conj(*v)*(*v)); v++;
440 #else
441         sum += (*v)*(*v); v++;
442 #endif
443       }
444       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
445       *norm = sqrt(*norm);
446     } else {
447       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
448     }
449   }
450   PetscFunctionReturn(0);
451 }
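
/*
   The Frobenius norm above is computed as

      ||A||_F = sqrt( sum over all local entries of A and B of |a_ij|^2 ),

   with the partial sums combined across processes by MPI_SUM before the
   square root; this is exact because every nonzero is stored on exactly one
   process.
*/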
452 
453 #undef __FUNC__
454 #define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
455 int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
456 {
457   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
458   MPI_Comm    comm = mat->comm;
459   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
460   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
461   MPI_Request *send_waits,*recv_waits;
462   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
463   InsertMode  addv;
464   Scalar      *rvalues,*svalues;
465 
466   PetscFunctionBegin;
467   /* make sure all processors are in either INSERT_VALUES or ADD_VALUES mode */
468   MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
469   if (addv == (ADD_VALUES|INSERT_VALUES)) {
470     SETERRQ(1,0,"Some processors inserted others added");
471   }
472   mat->insertmode = addv; /* in case this processor had no cache */
473 
474   /*  first count number of contributors to each processor */
475   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
476   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
477   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
478   for ( i=0; i<baij->stash.n; i++ ) {
479     idx = baij->stash.idx[i];
480     for ( j=0; j<size; j++ ) {
481       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
482         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
483       }
484     }
485   }
486   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
487 
488   /* inform other processors of number of messages and max length*/
489   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
490   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
491   nreceives = work[rank];
492   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
493   nmax = work[rank];
494   PetscFree(work);
495 
496   /* post receives:
497        1) each message consists of ordered (global row index, global
498      column index, value) triples; we store the indices as Scalars
499      to simplify the message passing.
500        2) since we don't know how long each individual message is we
501      allocate the largest needed buffer for each receive. Potentially
502      this is a lot of wasted space.
503 
504 
505        This could be done better.
506   */
507   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
508   CHKPTRQ(rvalues);
509   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
510   CHKPTRQ(recv_waits);
511   for ( i=0; i<nreceives; i++ ) {
512     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
513               comm,recv_waits+i);
514   }
515 
516   /* do sends:
517       1) starts[i] gives the starting index in svalues for stuff going to
518          the ith processor
519   */
520   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
521   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
522   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
523   starts[0] = 0;
524   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
525   for ( i=0; i<baij->stash.n; i++ ) {
526     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
527     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
528     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
529   }
530   PetscFree(owner);
531   starts[0] = 0;
532   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
533   count = 0;
534   for ( i=0; i<size; i++ ) {
535     if (procs[i]) {
536       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
537                 comm,send_waits+count++);
538     }
539   }
540   PetscFree(starts); PetscFree(nprocs);
541 
542   /* Free cache space */
543   PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
544   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
545 
546   baij->svalues    = svalues;    baij->rvalues    = rvalues;
547   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
548   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
549   baij->rmax       = nmax;
550 
551   PetscFunctionReturn(0);
552 }
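
/*
   The pair of MPI_Allreduce calls in MatAssemblyBegin_MPIBAIJ above is a
   standard pattern for setting up irregular communication.  With
   procs[j] = 1 iff this process sends to process j, and nprocs[j] the number
   of stashed entries destined for j:

      MPI_Allreduce(procs ,work,size,MPI_INT,MPI_SUM,comm);
         work[rank] = number of messages this process will receive
      MPI_Allreduce(nprocs,work,size,MPI_INT,MPI_MAX,comm);
         work[rank] = length of the longest incoming message

   so every receive can be posted up front with one worst-case sized buffer,
   at the cost of the over-allocation noted in the comment above.
*/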
553 #include <math.h>
554 #define HASH_KEY 0.6180339887
555 #define HASH1(size,key) ((int)((size)*fmod(((key)*HASH_KEY),1)))
556 #define HASH2(size,key) ((int)((size)*fmod(((key+0.5)*HASH_KEY),1)))
557 
558 
#undef __FUNC__
#define __FUNC__ "CreateHashTable"
559 int CreateHashTable(Mat mat,double factor)
560 {
561   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
562   Mat         A = baij->A, B=baij->B;
563   Mat_SeqBAIJ *a=(Mat_SeqBAIJ *)A->data, *b=(Mat_SeqBAIJ *)B->data;
564   int         i,j,k,nz=a->nz+b->nz,h1,h2,*ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j;
565   int         size=(int)(factor*nz),ct=0,max1=0,max2=0;
566   /* Scalar      *aa=a->a,*ba=b->a; */
567   double      key;
568   double      *HT;
570   extern int PetscGlobalRank;
571 
572   PetscFunctionBegin;
573   /* allocate memory for the hash table; it is freed again at the end of
574      this routine, so there is nothing to cache between calls */
575   HT = (double*) PetscMalloc(size*sizeof(double)); CHKPTRQ(HT);
579   PetscMemzero(HT,size*sizeof(double));
580 
581   /* Loop Over A */
582   for ( i=0; i<a->mbs; i++ ) {   /* loop over block rows of the diagonal part */
583     for ( j=ai[i]; j<ai[i+1]; j++ ) {
584       key = i*baij->n+aj[j]+1;
585       h1  = HASH1(size,key);
586       h2  = HASH2(size,key); /* computed but not used by the (h1*k)%size probe below */
587 
588       for ( k=1; k<size; k++ ){
589         if (HT[(h1*k)%size] == 0.0) {
590           HT[(h1*k)%size] = key;
591           break;
592         } else ct++;
593       }
594       if (k> max1) max1 =k;
595     }
596   }
597   /* Loop Over B */
598   for ( i=0; i<b->mbs; i++ ) {   /* loop over block rows of the off-diagonal part */
599     for ( j=bi[i]; j<bi[i+1]; j++ ) {
600       key = i*b->n+bj[j]+1;
601       h1  = HASH1(size,key);
602       for ( k=1; k<size; k++ ){
603         if (HT[(h1*k)%size] == 0.0) {
604           HT[(h1*k)%size] = key;
605           break;
606         } else ct++;
607       }
608       if (k> max2) max2 =k;
609     }
610   }
611 
612   /* Print Summary */
613   for ( i=0,key=0.0,j=0; i<size; i++)
614     if (HT[i]) {j++;}
615 
616   printf("[%d] fact = %3.2f max1 = %5d max2 = %5d Size %5d - Searches %5d Avg %5.2f Keys %5d\n",
617          PetscGlobalRank,factor,max1,max2,size,ct+j,((float)ct+j)/j,j);
618   PetscFree(HT);
619   PetscFunctionReturn(0);
620 }
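
/*
   HASH_KEY is the fractional golden ratio (sqrt(5)-1)/2, so HASH1/HASH2
   implement multiplicative hashing: a key is mapped to
   floor(size * frac(key * 0.6180339887)).  Note that the probe sequence
   actually used above is (h1*k) % size; classical double hashing would
   combine both hashes, as in this sketch:

      for (k=0; k<size; k++) {
        idx = (h1 + k*h2) % size;
        if (HT[idx] == 0.0) { HT[idx] = key; break; }
      }
*/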
621 
622 #undef __FUNC__
623 #define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
624 int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
625 {
626   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
627   MPI_Status  *send_status,recv_status;
628   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
629   int         bs=baij->bs,row,col,other_disassembled,flg;
630   Scalar      *values,val;
631   InsertMode  addv = mat->insertmode;
632 
633   PetscFunctionBegin;
634   /*  wait on receives */
635   while (count) {
636     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
637     /* unpack receives into our local space */
638     values = baij->rvalues + 3*imdex*baij->rmax;
639     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
640     n = n/3;
641     for ( i=0; i<n; i++ ) {
642       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
643       col = (int) PetscReal(values[3*i+1]);
644       val = values[3*i+2];
645       if (col >= baij->cstart*bs && col < baij->cend*bs) {
646         col -= baij->cstart*bs;
647         ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
648       } else {
649         if (mat->was_assembled) {
650           if (!baij->colmap) {
651             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
652           }
653           col = (baij->colmap[col/bs]) - 1 + col%bs;
654           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
655             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
656             col = (int) PetscReal(values[3*i+1]);
657           }
658         }
659         ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
660       }
661     }
662     count--;
663   }
664   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
665 
666   /* wait on sends */
667   if (baij->nsends) {
668     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
669     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
670     PetscFree(send_status);
671   }
672   PetscFree(baij->send_waits); PetscFree(baij->svalues);
673 
674   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
675   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
676 
677   /* determine if any processor has disassembled; if so we must
678      also disassemble ourselves, in order that we may reassemble. */
679   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
680   if (mat->was_assembled && !other_disassembled) {
681     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
682   }
683 
684   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
685     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
686   }
687   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
688   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
689 
690   ierr = OptionsHasName(PETSC_NULL,"-use_hash",&flg); CHKERRQ(ierr);
691   if (flg) {
692     double fact;
693     for ( fact=1.2; fact<2.0; fact +=0.05) CreateHashTable(mat,fact);
694   }
695   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
696   PetscFunctionReturn(0);
697 }
698 
699 #undef __FUNC__
700 #define __FUNC__ "MatView_MPIBAIJ_Binary"
701 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
702 {
703   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
704   int          ierr;
705 
706   PetscFunctionBegin;
707   if (baij->size == 1) {
708     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
709   } else SETERRQ(1,0,"Only uniprocessor output supported");
710   PetscFunctionReturn(0);
711 }
712 
713 #undef __FUNC__
714 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
715 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
716 {
717   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
718   int          ierr, format,rank,bs = baij->bs;
719   FILE         *fd;
720   ViewerType   vtype;
721 
722   PetscFunctionBegin;
723   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
724   if (vtype == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
725     ierr = ViewerGetFormat(viewer,&format); CHKERRQ(ierr);
726     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
727       MatInfo info;
728       MPI_Comm_rank(mat->comm,&rank);
729       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
730       ierr = MatGetInfo(mat,MAT_LOCAL,&info); CHKERRQ(ierr);
731       PetscSequentialPhaseBegin(mat->comm,1);
732       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
733               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
734               baij->bs,(int)info.memory);
735       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info); CHKERRQ(ierr);
736       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
737       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info); CHKERRQ(ierr);
738       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
739       fflush(fd);
740       PetscSequentialPhaseEnd(mat->comm,1);
741       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
742       PetscFunctionReturn(0);
743     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
744       PetscPrintf(mat->comm,"  block size is %d\n",bs);
745       PetscFunctionReturn(0);
746     }
747   }
748 
749   if (vtype == DRAW_VIEWER) {
750     Draw       draw;
751     PetscTruth isnull;
752     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
753     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
754   }
755 
756   if (vtype == ASCII_FILE_VIEWER) {
757     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
758     PetscSequentialPhaseBegin(mat->comm,1);
759     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
760            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
761             baij->cstart*bs,baij->cend*bs);
762     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
763     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
764     fflush(fd);
765     PetscSequentialPhaseEnd(mat->comm,1);
766   } else {
767     int size = baij->size;
768     rank = baij->rank;
769     if (size == 1) {
770       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
771     } else {
772       /* assemble the entire matrix onto first processor. */
773       Mat         A;
774       Mat_SeqBAIJ *Aloc;
775       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
776       int         mbs=baij->mbs;
777       Scalar      *a;
778 
779       if (!rank) {
780         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
781         CHKERRQ(ierr);
782       } else {
783         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
784         CHKERRQ(ierr);
785       }
786       PLogObjectParent(mat,A);
787 
788       /* copy over the A part */
789       Aloc = (Mat_SeqBAIJ*) baij->A->data;
790       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
791       row = baij->rstart;
792       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
793 
794       for ( i=0; i<mbs; i++ ) {
795         rvals[0] = bs*(baij->rstart + i);
796         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
797         for ( j=ai[i]; j<ai[i+1]; j++ ) {
798           col = (baij->cstart+aj[j])*bs;
799           for (k=0; k<bs; k++ ) {
800             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
801             col++; a += bs;
802           }
803         }
804       }
805       /* copy over the B part */
806       Aloc = (Mat_SeqBAIJ*) baij->B->data;
807       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
808       row = baij->rstart*bs;
809       for ( i=0; i<mbs; i++ ) {
810         rvals[0] = bs*(baij->rstart + i);
811         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
812         for ( j=ai[i]; j<ai[i+1]; j++ ) {
813           col = baij->garray[aj[j]]*bs;
814           for (k=0; k<bs; k++ ) {
815             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
816             col++; a += bs;
817           }
818         }
819       }
820       PetscFree(rvals);
821       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
822       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
823       if (!rank) {
824         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
825       }
826       ierr = MatDestroy(A); CHKERRQ(ierr);
827     }
828   }
829   PetscFunctionReturn(0);
830 }
831 
832 
833 
834 #undef __FUNC__
835 #define __FUNC__ "MatView_MPIBAIJ"
836 int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
837 {
838   Mat         mat = (Mat) obj;
839   int         ierr;
840   ViewerType  vtype;
841 
842   PetscFunctionBegin;
843   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
844   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
845       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
846     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
847   } else if (vtype == BINARY_FILE_VIEWER) {
848     ierr = MatView_MPIBAIJ_Binary(mat,viewer);CHKERRQ(ierr);
849   }
850   PetscFunctionReturn(0);
851 }
852 
853 #undef __FUNC__
854 #define __FUNC__ "MatDestroy_MPIBAIJ"
855 int MatDestroy_MPIBAIJ(PetscObject obj)
856 {
857   Mat         mat = (Mat) obj;
858   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
859   int         ierr;
860 
861   PetscFunctionBegin;
862 #if defined(USE_PETSC_LOG)
863   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
864 #endif
865 
866   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
867   PetscFree(baij->rowners);
868   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
869   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
870   if (baij->colmap) PetscFree(baij->colmap);
871   if (baij->garray) PetscFree(baij->garray);
872   if (baij->lvec)   VecDestroy(baij->lvec);
873   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
874   if (baij->rowvalues) PetscFree(baij->rowvalues);
875   if (baij->barray) PetscFree(baij->barray);
876   PetscFree(baij);
877   PLogObjectDestroy(mat);
878   PetscHeaderDestroy(mat);
879   PetscFunctionReturn(0);
880 }
881 
882 #undef __FUNC__
883 #define __FUNC__ "MatMult_MPIBAIJ"
884 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
885 {
886   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
887   int         ierr, nt;
888 
889   PetscFunctionBegin;
890   VecGetLocalSize_Fast(xx,nt);
891   if (nt != a->n) {
892     SETERRQ(1,0,"Incompatible partition of A and xx");
893   }
894   VecGetLocalSize_Fast(yy,nt);
895   if (nt != a->m) {
896     SETERRQ(1,0,"Incompatible partition of A and yy");
897   }
898   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
899   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
900   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
901   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
902   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
903   PetscFunctionReturn(0);
904 }
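
/*
   The multiply above overlaps communication with computation:

      VecScatterBegin     start moving the needed off-process entries of xx
      A_diag * xx         local part; uses only owned entries of xx
      VecScatterEnd       wait for the ghost values to arrive in lvec
      B_offdiag * lvec    add the off-diagonal contribution into yy

   VecScatterPostRecvs then pre-posts the receives for the next multiply.
*/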
905 
906 #undef __FUNC__
907 #define __FUNC__ "MatMultAdd_MPIBAIJ"
908 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
909 {
910   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
911   int        ierr;
912 
913   PetscFunctionBegin;
914   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
915   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
916   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
917   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
918   PetscFunctionReturn(0);
919 }
920 
921 #undef __FUNC__
922 #define __FUNC__ "MatMultTrans_MPIBAIJ"
923 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
924 {
925   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
926   int         ierr;
927 
928   PetscFunctionBegin;
929   /* do nondiagonal part */
930   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
931   /* send it on its way */
932   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
933   /* do local part */
934   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
935   /* receive remote parts: note this assumes the values are not actually */
936   /* inserted in yy until the next line, which is true for the current */
937   /* implementation but is not guaranteed in general. */
938   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
939   PetscFunctionReturn(0);
940 }
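
/*
   For the transpose product yy = A'xx the two blocks combine as

      yy_local = (A_diag)' xx_local  +  contributions of (B_offdiag)' xx_local
                                        scatter-added (ADD_VALUES, reverse
                                        scatter) into the owning processes,

   which is why the scatter here runs in the opposite direction to the one in
   MatMult above.
*/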
941 
942 #undef __FUNC__
943 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
944 int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
945 {
946   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
947   int         ierr;
948 
949   PetscFunctionBegin;
950   /* do nondiagonal part */
951   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
952   /* send it on its way */
953   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
954   /* do local part */
955   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
956   /* receive remote parts: note this assumes the values are not actually */
957   /* inserted in zz until the next line, which is true for the current */
958   /* implementation but is not guaranteed in general. */
959   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
960   PetscFunctionReturn(0);
961 }
962 
963 /*
964   This only works correctly for square matrices where the subblock A->A is the
965    diagonal block
966 */
967 #undef __FUNC__
968 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
969 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
970 {
971   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
972   int         ierr;
973 
974   PetscFunctionBegin;
975   if (a->M != a->N) SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
976   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
977   PetscFunctionReturn(0);
978 }
979 
980 #undef __FUNC__
981 #define __FUNC__ "MatScale_MPIBAIJ"
982 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
983 {
984   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
985   int         ierr;
986 
987   PetscFunctionBegin;
988   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
989   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
990   PetscFunctionReturn(0);
991 }
992 
993 #undef __FUNC__
994 #define __FUNC__ "MatGetSize_MPIBAIJ"
995 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
996 {
997   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
998 
999   PetscFunctionBegin;
1000   if (m) *m = mat->M;
1001   if (n) *n = mat->N;
1002   PetscFunctionReturn(0);
1003 }
1004 
1005 #undef __FUNC__
1006 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
1007 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
1008 {
1009   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1010 
1011   PetscFunctionBegin;
1012   *m = mat->m; *n = mat->n;
1013   PetscFunctionReturn(0);
1014 }
1015 
1016 #undef __FUNC__
1017 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
1018 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
1019 {
1020   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1021 
1022   PetscFunctionBegin;
1023   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
1024   PetscFunctionReturn(0);
1025 }
1026 
1027 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1028 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1029 
1030 #undef __FUNC__
1031 #define __FUNC__ "MatGetRow_MPIBAIJ"
1032 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1033 {
1034   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1035   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
1036   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
1037   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
1038   int        *cmap, *idx_p,cstart = mat->cstart;
1039 
1040   PetscFunctionBegin;
1041   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
1042   mat->getrowactive = PETSC_TRUE;
1043 
1044   if (!mat->rowvalues && (idx || v)) {
1045     /*
1046         allocate enough space to hold information from the longest row.
1047     */
1048     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
1049     int     max = 1,mbs = mat->mbs,tmp;
1050     for ( i=0; i<mbs; i++ ) {
1051       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1052       if (max < tmp) { max = tmp; }
1053     }
1054     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
1055     CHKPTRQ(mat->rowvalues);
1056     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
1057   }
1058 
1059   if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows");
1060   lrow = row - brstart;
1061 
1062   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1063   if (!v)   {pvA = 0; pvB = 0;}
1064   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1065   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1066   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1067   nztot = nzA + nzB;
1068 
1069   cmap  = mat->garray;
1070   if (v  || idx) {
1071     if (nztot) {
1072       /* Sort by increasing column numbers, assuming A and B already sorted */
1073       int imark = -1;
1074       if (v) {
1075         *v = v_p = mat->rowvalues;
1076         for ( i=0; i<nzB; i++ ) {
1077           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
1078           else break;
1079         }
1080         imark = i;
1081         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1082         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1083       }
1084       if (idx) {
1085         *idx = idx_p = mat->rowindices;
1086         if (imark > -1) {
1087           for ( i=0; i<imark; i++ ) {
1088             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
1089           }
1090         } else {
1091           for ( i=0; i<nzB; i++ ) {
1092             if (cmap[cworkB[i]/bs] < cstart)
1093               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
1094             else break;
1095           }
1096           imark = i;
1097         }
1098         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
1099         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
1100       }
1101     } else {
1102       if (idx) *idx = 0;
1103       if (v)   *v   = 0;
1104     }
1105   }
1106   *nz = nztot;
1107   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1108   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1109   PetscFunctionReturn(0);
1110 }
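
/*
   The merge above produces a globally sorted row without sorting: the rows of
   A and B are already sorted by column, and the global columns of the
   off-diagonal part split around the diagonal block.  With imark the first B
   entry whose global block column reaches cstart:

      positions 0 .. imark-1            B entries with global column < cstart*bs
      positions imark .. imark+nzA-1    all A entries, offset by cstart*bs
      positions nzA+imark .. nztot-1    remaining B entries (columns >= cend*bs)
*/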
1111 
1112 #undef __FUNC__
1113 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1114 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1115 {
1116   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1117 
1118   PetscFunctionBegin;
1119   if (baij->getrowactive == PETSC_FALSE) {
1120     SETERRQ(1,0,"MatGetRow not called");
1121   }
1122   baij->getrowactive = PETSC_FALSE;
1123   PetscFunctionReturn(0);
1124 }
1125 
1126 #undef __FUNC__
1127 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1128 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1129 {
1130   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1131 
1132   PetscFunctionBegin;
1133   *bs = baij->bs;
1134   PetscFunctionReturn(0);
1135 }
1136 
1137 #undef __FUNC__
1138 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1139 int MatZeroEntries_MPIBAIJ(Mat A)
1140 {
1141   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1142   int         ierr;
1143 
1144   PetscFunctionBegin;
1145   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1146   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1147   PetscFunctionReturn(0);
1148 }
1149 
1150 #undef __FUNC__
1151 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1152 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1153 {
1154   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1155   Mat         A = a->A, B = a->B;
1156   int         ierr;
1157   double      isend[5], irecv[5];
1158 
1159   PetscFunctionBegin;
1160   info->rows_global    = (double)a->M;
1161   info->columns_global = (double)a->N;
1162   info->rows_local     = (double)a->m;
1163   info->columns_local  = (double)a->N;
1164   info->block_size     = (double)a->bs;
1165   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1166   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
       isend[3] = info->memory; isend[4] = info->mallocs;
1167   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1168   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
       isend[3] += info->memory; isend[4] += info->mallocs;
1169   if (flag == MAT_LOCAL) {
1170     info->nz_used      = isend[0];
1171     info->nz_allocated = isend[1];
1172     info->nz_unneeded  = isend[2];
1173     info->memory       = isend[3];
1174     info->mallocs      = isend[4];
1175   } else if (flag == MAT_GLOBAL_MAX) {
1176     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
1177     info->nz_used      = irecv[0];
1178     info->nz_allocated = irecv[1];
1179     info->nz_unneeded  = irecv[2];
1180     info->memory       = irecv[3];
1181     info->mallocs      = irecv[4];
1182   } else if (flag == MAT_GLOBAL_SUM) {
1183     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
1184     info->nz_used      = irecv[0];
1185     info->nz_allocated = irecv[1];
1186     info->nz_unneeded  = irecv[2];
1187     info->memory       = irecv[3];
1188     info->mallocs      = irecv[4];
1189   }
1190   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1191   info->fill_ratio_needed = 0;
1192   info->factor_mallocs    = 0;
1193   PetscFunctionReturn(0);
1194 }
1195 
1196 #undef __FUNC__
1197 #define __FUNC__ "MatSetOption_MPIBAIJ"
1198 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1199 {
1200   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1201 
1202   PetscFunctionBegin;
1203   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1204       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1205       op == MAT_COLUMNS_UNSORTED ||
1206       op == MAT_COLUMNS_SORTED ||
1207       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1208       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1209         MatSetOption(a->A,op);
1210         MatSetOption(a->B,op);
1211   } else if (op == MAT_ROW_ORIENTED) {
1212         a->roworiented = 1;
1213         MatSetOption(a->A,op);
1214         MatSetOption(a->B,op);
1215   } else if (op == MAT_ROWS_SORTED ||
1216              op == MAT_ROWS_UNSORTED ||
1217              op == MAT_SYMMETRIC ||
1218              op == MAT_STRUCTURALLY_SYMMETRIC ||
1219              op == MAT_YES_NEW_DIAGONALS)
1220     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1221   else if (op == MAT_COLUMN_ORIENTED) {
1222     a->roworiented = 0;
1223     MatSetOption(a->A,op);
1224     MatSetOption(a->B,op);
1225   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1226     a->donotstash = 1;
1227   } else if (op == MAT_NO_NEW_DIAGONALS) {
1228     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1229   } else {
1230     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1231   }
1232   PetscFunctionReturn(0);
1233 }
1234 
1235 #undef __FUNC__
1236 #define __FUNC__ "MatTranspose_MPIBAIJ"
1237 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1238 {
1239   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1240   Mat_SeqBAIJ *Aloc;
1241   Mat        B;
1242   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
1243   int        bs=baij->bs,mbs=baij->mbs;
1244   Scalar     *a;
1245 
1246   PetscFunctionBegin;
1247   if (matout == PETSC_NULL && M != N) SETERRQ(1,0,"Square matrix only for in-place");
1248   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1249   CHKERRQ(ierr);
1250 
1251   /* copy over the A part */
1252   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1253   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1254   row = baij->rstart;
1255   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1256 
1257   for ( i=0; i<mbs; i++ ) {
1258     rvals[0] = bs*(baij->rstart + i);
1259     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1260     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1261       col = (baij->cstart+aj[j])*bs;
1262       for (k=0; k<bs; k++ ) {
1263         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1264         col++; a += bs;
1265       }
1266     }
1267   }
1268   /* copy over the B part */
1269   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1270   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1271   row = baij->rstart*bs;
1272   for ( i=0; i<mbs; i++ ) {
1273     rvals[0] = bs*(baij->rstart + i);
1274     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1275     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1276       col = baij->garray[aj[j]]*bs;
1277       for (k=0; k<bs; k++ ) {
1278         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1279         col++; a += bs;
1280       }
1281     }
1282   }
1283   PetscFree(rvals);
1284   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1285   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1286 
1287   if (matout != PETSC_NULL) {
1288     *matout = B;
1289   } else {
1290     /* This isn't really an in-place transpose, but we free the data structures from baij */
1291     PetscFree(baij->rowners);
1292     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1293     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1294     if (baij->colmap) PetscFree(baij->colmap);
1295     if (baij->garray) PetscFree(baij->garray);
1296     if (baij->lvec) VecDestroy(baij->lvec);
1297     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1298     PetscFree(baij);
1299     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1300     PetscHeaderDestroy(B);
1301   }
1302   PetscFunctionReturn(0);
1303 }
1304 
1305 #undef __FUNC__
1306 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1307 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1308 {
1309   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1310   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1311   int ierr,s1,s2,s3;
1312 
1313   PetscFunctionBegin;
1314   if (ll)  {
1315     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1316     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1317     if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
1318     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1319     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1320   }
1321   if (rr) SETERRQ(1,0,"not supported for right vector");
1322   PetscFunctionReturn(0);
1323 }
1324 
1325 /* the code does not do the diagonal entries correctly unless the
1326    matrix is square and the column and row ownerships are identical.
1327    This is a BUG. The only way to fix it seems to be to access
1328    baij->A and baij->B directly and not through the MatZeroRows()
1329    routine.
1330 */
1331 #undef __FUNC__
1332 #define __FUNC__ "MatZeroRows_MPIBAIJ"
1333 int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
1334 {
1335   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
1336   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
1337   int            *procs,*nprocs,j,found,idx,nsends,*work;
1338   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
1339   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
1340   int            *lens,imdex,*lrows,*values,bs=l->bs;
1341   MPI_Comm       comm = A->comm;
1342   MPI_Request    *send_waits,*recv_waits;
1343   MPI_Status     recv_status,*send_status;
1344   IS             istmp;
1345 
1346   PetscFunctionBegin;
1347   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
1348   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
1349 
1350   /*  first count number of contributors to each processor */
1351   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
1352   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
1353   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
1354   for ( i=0; i<N; i++ ) {
1355     idx = rows[i];
1356     found = 0;
1357     for ( j=0; j<size; j++ ) {
1358       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
1359         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
1360       }
1361     }
1362     if (!found) SETERRQ(1,0,"Index out of range");
1363   }
1364   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
1365 
1366   /* inform other processors of number of messages and max length*/
1367   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
1368   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
1369   nrecvs = work[rank];
1370   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
1371   nmax = work[rank];
1372   PetscFree(work);
1373 
1374   /* post receives:   */
1375   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); CHKPTRQ(rvalues);
1376   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
1377   for ( i=0; i<nrecvs; i++ ) {
1378     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
1379   }
1380 
1381   /* do sends:
1382       1) starts[i] gives the starting index in svalues for stuff going to
1383          the ith processor
1384   */
1385   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1386   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
1387   CHKPTRQ(send_waits);
1388   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1389   starts[0] = 0;
1390   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1391   for ( i=0; i<N; i++ ) {
1392     svalues[starts[owner[i]]++] = rows[i];
1393   }
1394   ISRestoreIndices(is,&rows);
1395 
1396   starts[0] = 0;
1397   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1398   count = 0;
1399   for ( i=0; i<size; i++ ) {
1400     if (procs[i]) {
1401       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
1402     }
1403   }
1404   PetscFree(starts);
1405 
1406   base = owners[rank]*bs;
1407 
1408   /*  wait on receives */
1409   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1410   source = lens + nrecvs;
1411   count  = nrecvs; slen = 0;
1412   while (count) {
1413     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
1414     /* unpack receives into our local space */
1415     MPI_Get_count(&recv_status,MPI_INT,&n);
1416     source[imdex]  = recv_status.MPI_SOURCE;
1417     lens[imdex]  = n;
1418     slen += n;
1419     count--;
1420   }
1421   PetscFree(recv_waits);
1422 
1423   /* move the data into the send scatter */
1424   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1425   count = 0;
1426   for ( i=0; i<nrecvs; i++ ) {
1427     values = rvalues + i*nmax;
1428     for ( j=0; j<lens[i]; j++ ) {
1429       lrows[count++] = values[j] - base;
1430     }
1431   }
1432   PetscFree(rvalues); PetscFree(lens);
1433   PetscFree(owner); PetscFree(nprocs);
1434 
1435   /* actually zap the local rows */
1436   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1437   PLogObjectParent(A,istmp);
1438   PetscFree(lrows);
1439   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1440   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1441   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1442 
1443   /* wait on sends */
1444   if (nsends) {
1445     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
1446     MPI_Waitall(nsends,send_waits,send_status);
1447     PetscFree(send_status);
1448   }
1449   PetscFree(send_waits); PetscFree(svalues);
1450 
1451   PetscFunctionReturn(0);
1452 }
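
/*
   Usage sketch for the routine above (illustrative; nlocal and rows are
   whatever global row indices this process wants zeroed, and they need not
   be locally owned):

      IS     is;
      Scalar one = 1.0;
      ierr = ISCreateGeneral(PETSC_COMM_WORLD,nlocal,rows,&is); CHKERRQ(ierr);
      ierr = MatZeroRows(A,is,&one); CHKERRQ(ierr);
      ierr = ISDestroy(is); CHKERRQ(ierr);

   The routine routes each index to its owning process before zeroing; as
   noted in the comment before it, the diagonal entry is only handled
   correctly when the row and column ownerships coincide.
*/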
extern int MatPrintHelp_SeqBAIJ(Mat);
#undef __FUNC__
#define __FUNC__ "MatPrintHelp_MPIBAIJ"
int MatPrintHelp_MPIBAIJ(Mat A)
{
  Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
  int         ierr;

  PetscFunctionBegin;
  if (!a->rank) {
    ierr = MatPrintHelp_SeqBAIJ(a->A);CHKERRQ(ierr);
  }
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatSetUnfactored_MPIBAIJ"
int MatSetUnfactored_MPIBAIJ(Mat A)
{
  Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
  int         ierr;

  PetscFunctionBegin;
  ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);

/* -------------------------------------------------------------------*/
static struct _MatOps MatOps = {
  MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
  MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
  0,0,0,0,
  0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
  0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
  MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
  MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
  0,0,0,MatGetSize_MPIBAIJ,
  MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
  0,0,MatConvertSameType_MPIBAIJ,0,0,
  0,0,0,MatGetSubMatrices_MPIBAIJ,
  MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
  MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
  0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};


#undef __FUNC__
#define __FUNC__ "MatCreateMPIBAIJ"
/*@C
   MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
   (block compressed row).  For good matrix assembly performance
   the user should preallocate the matrix storage by setting the parameters
   d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
   performance can be increased by more than a factor of 50.

   Input Parameters:
.  comm - MPI communicator
.  bs   - size of block
.  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
           This value should be the same as the local size used in creating the
           y vector for the matrix-vector product y = Ax.
.  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
           This value should be the same as the local size used in creating the
           x vector for the matrix-vector product y = Ax.
.  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
.  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
.  d_nz  - number of block nonzeros per block row in diagonal portion of local
           submatrix (same for all local rows)
.  d_nnz - array containing the number of block nonzeros in the various block rows
           in the diagonal portion of the local submatrix (possibly different for
           each block row) or PETSC_NULL.  You must leave room for the diagonal
           entry even if it is zero.
.  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
           submatrix (same for all local rows).
.  o_nnz - array containing the number of block nonzeros in the various block rows of
           the off-diagonal portion of the local submatrix (possibly different for
           each block row) or PETSC_NULL.

   Output Parameter:
.  A - the matrix

   Notes:
   The user MUST specify either the local or global matrix dimensions
   (possibly both).

   Storage Information:
   For a square global matrix we define each processor's diagonal portion
   to be its local rows and the corresponding columns (a square submatrix);
   each processor's off-diagonal portion encompasses the remainder of the
   local matrix (a rectangular submatrix).

   The user can specify preallocated storage for the diagonal part of
   the local submatrix with either d_nz or d_nnz (not both).  Set
   d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
   memory allocation.  Likewise, specify preallocated storage for the
   off-diagonal part of the local submatrix with o_nz or o_nnz (not both).

   Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
   the figure below we depict these three local rows and all columns (0-11).

$          0 1 2 3 4 5 6 7 8 9 10 11
$         --------------------------
$  row 3  |  o o o d d d o o o o o o
$  row 4  |  o o o d d d o o o o o o
$  row 5  |  o o o d d d o o o o o o
$         --------------------------
$

   Thus, any entries in the d locations are stored in the d (diagonal)
   submatrix, and any entries in the o locations are stored in the
   o (off-diagonal) submatrix.  Note that the d and the o submatrices are
   stored simply in the MATSEQBAIJ format for compressed row storage.

   Now d_nz should indicate the number of block nonzeros per block row in the d matrix,
   and o_nz should indicate the number of block nonzeros per block row in the o matrix.
   In general, for PDE problems in which most nonzeros are near the diagonal,
   one expects d_nz >> o_nz.  For large problems you MUST preallocate memory
   or you will get TERRIBLE performance; see the users' manual chapter on
   matrices.
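
   Example:
   A minimal sketch (with hypothetical sizes): each processor contributes 6
   local rows and columns of a matrix with 2x2 blocks, preallocating 3 block
   nonzeros per block row in its diagonal portion and 1 in its off-diagonal
   portion,

$      Mat A;
$      int ierr;
$      ierr = MatCreateMPIBAIJ(PETSC_COMM_WORLD,2,6,6,PETSC_DECIDE,PETSC_DECIDE,
$                              3,PETSC_NULL,1,PETSC_NULL,&A); CHKERRQ(ierr);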

.keywords: matrix, block, aij, compressed row, sparse, parallel

.seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
@*/
int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
                    int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  Mat          B;
  Mat_MPIBAIJ  *b;
  int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;

  PetscFunctionBegin;
  if (bs < 1) SETERRQ(1,0,"Invalid block size specified, must be positive");

  MPI_Comm_size(comm,&size);
  if (size == 1) {
    if (M == PETSC_DECIDE) M = m;
    if (N == PETSC_DECIDE) N = n;
    ierr = MatCreateSeqBAIJ(comm,bs,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
    PetscFunctionReturn(0);
  }

  *A = 0;
  PetscHeaderCreate(B,_p_Mat,MAT_COOKIE,MATMPIBAIJ,comm,MatDestroy,MatView);
  PLogObjectCreate(B);
  B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
  PetscMemzero(b,sizeof(Mat_MPIBAIJ));
  PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));

  B->destroy    = MatDestroy_MPIBAIJ;
  B->view       = MatView_MPIBAIJ;
  B->mapping    = 0;
  B->factor     = 0;
  B->assembled  = PETSC_FALSE;

  B->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&b->rank);
  MPI_Comm_size(comm,&b->size);

  if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
    SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
  }
  if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
  if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,0,"either N or n should be specified");
  /* if both local and global sizes are given, recompute the global sizes from the local ones */
  if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
  if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;

  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    mbs = m/bs; nbs = n/bs;
    MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
    if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
    if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
  }
  if (m == PETSC_DECIDE) {
    Mbs = M/bs;
    if (Mbs*bs != M) SETERRQ(1,0,"Number of global rows must be divisible by blocksize");
    mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
    m   = mbs*bs;
  }
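  /*
     Worked example of the distribution formula above (hypothetical sizes):
     with Mbs=10 block rows over b->size=3 processors, ranks 0, 1 and 2
     receive mbs = 4, 3 and 3 block rows respectively.
  */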
  if (n == PETSC_DECIDE) {
    Nbs = N/bs;
    if (Nbs*bs != N) SETERRQ(1,0,"Number of global cols must be divisible by blocksize");
    nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
    n   = nbs*bs;
  }
  if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"Number of local rows, cols must be divisible by blocksize");

  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;
  b->bs  = bs;
  b->bs2 = bs*bs;
  b->mbs = mbs;
  b->nbs = nbs;
  b->Mbs = Mbs;
  b->Nbs = Nbs;

  /* build local table of row and column ownerships */
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
  b->cowners = b->rowners + b->size + 2;
  MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];
  }
  b->rstart    = b->rowners[b->rank];
  b->rend      = b->rowners[b->rank+1];
  b->rstart_bs = b->rstart * bs;
  b->rend_bs   = b->rend * bs;

  MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart    = b->cowners[b->rank];
  b->cend      = b->cowners[b->rank+1];
  b->cstart_bs = b->cstart * bs;
  b->cend_bs   = b->cend * bs;

  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache for off-processor entries formed during assembly */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->donotstash  = 0;
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used in block assembly */
  b->barray      = 0;

  /* stuff used for matrix vector multiply */
  b->lvec        = 0;
  b->Mvctx       = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  *A = B;
  PetscFunctionReturn(0);
}


#undef __FUNC__
#define __FUNC__ "MatConvertSameType_MPIBAIJ"
static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
{
  Mat         mat;
  Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
  int         ierr, len=0, flg;

  PetscFunctionBegin;
  *newmat       = 0;
  PetscHeaderCreate(mat,_p_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm,MatDestroy,MatView);
  PLogObjectCreate(mat);
  mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy    = MatDestroy_MPIBAIJ;
  mat->view       = MatView_MPIBAIJ;
  mat->factor     = matin->factor;
  mat->assembled  = PETSC_TRUE;

  a->m = mat->m   = oldmat->m;
  a->n = mat->n   = oldmat->n;
  a->M = mat->M   = oldmat->M;
  a->N = mat->N   = oldmat->N;

  a->bs  = oldmat->bs;
  a->bs2 = oldmat->bs2;
  a->mbs = oldmat->mbs;
  a->nbs = oldmat->nbs;
  a->Mbs = oldmat->Mbs;
  a->Nbs = oldmat->Nbs;

  a->rstart       = oldmat->rstart;
  a->rend         = oldmat->rend;
  a->cstart       = oldmat->cstart;
  a->cend         = oldmat->cend;
  a->size         = oldmat->size;
  a->rank         = oldmat->rank;
  mat->insertmode = NOT_SET_VALUES;
  a->rowindices   = 0;  /* PetscNew() does not zero the struct, so clear the MatGetRow() fields */
  a->rowvalues    = 0;
  a->getrowactive = PETSC_FALSE;
  a->barray       = 0;

  a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
  PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
  a->cowners = a->rowners + a->size + 2;
  PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
  ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
  if (oldmat->colmap) {
    a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int)); CHKPTRQ(a->colmap);
    PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
    PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
  } else a->colmap = 0;
  if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
    a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
    PLogObjectMemory(mat,len*sizeof(int));
    PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
  } else a->garray = 0;

  ierr = VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
  PLogObjectParent(mat,a->lvec);
  ierr = VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
  PLogObjectParent(mat,a->Mvctx);
  ierr = MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);
  ierr = MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
  PLogObjectParent(mat,a->B);
  ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
  if (flg) {
    ierr = MatPrintHelp(mat); CHKERRQ(ierr);
  }
  *newmat = mat;
  PetscFunctionReturn(0);
}

#include "sys.h"

#undef __FUNC__
#define __FUNC__ "MatLoad_MPIBAIJ"
int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j, rstart, rend, fd;
  Scalar       *vals,*buf;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
  int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
  int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
  int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
  int          dcount,kmax,k,nzcount,tmp;

  PetscFunctionBegin;
  ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg); CHKERRQ(ierr);
  bs2  = bs*bs;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,0,"Not a matrix object");
    if (header[3] < 0) {
      SETERRQ(1,1,"Matrix stored in special format on disk, cannot load as MPIBAIJ");
    }
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];

  if (M != N) SETERRQ(1,0,"Can only do square matrices");

  /*
     This code adds extra rows to make sure the number of rows is
     divisible by the blocksize
  */
  Mbs        = M/bs;
  extra_rows = bs - M + bs*Mbs;
  if (extra_rows == bs) extra_rows = 0;
  else                  Mbs++;
  if (extra_rows && !rank) {
    PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
  }
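  /*
     Worked example of the padding arithmetic above (hypothetical sizes):
     for M=10 and bs=3, Mbs=3 and extra_rows = 3 - 10 + 9 = 2, so two
     padded rows (each later given a single unit diagonal entry) are added
     and Mbs becomes 4.
  */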

  /* determine ownership of all rows */
  mbs = Mbs/size + ((Mbs % size) > rank);
  m   = mbs * bs;
  rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
  browners = rowners + size + 1;
  MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
  for ( i=0; i<=size; i++ ) browners[i] = rowners[i]*bs;
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
    for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
    MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
    PetscFree(sndcounts);
  } else {
    MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]*bs; j<rowners[i+1]*bs; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices */
    nz = procsnz[0];
    ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    if (size == 1) nz -= extra_rows;
    ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
    if (size == 1) for (i=0; i<extra_rows; i++) { mycols[nz+i] = M+i; }

    /* read in the column indices for all other processors (except the last) and ship them off */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    /* read in the part for the last proc */
    if ( size != 1 ) {
      nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
      MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
    }
    PetscFree(cols);
  } else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += locrowlens[i];
    }
    ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* receive message of column indices */
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,0,"Something is wrong with the file");
  }

  /* loop over local rows, determining number of off-diagonal entries */
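  /*
     The counting below uses a scratch mask over all Mbs block columns:
     mask[] flags block columns already seen in the current block row, while
     masked1[] and masked2[] record which flags were set (diagonal and
     off-diagonal respectively) so that only those entries are cleared
     afterwards, keeping the cost proportional to the number of nonzeros.
  */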
  dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
  odlens = dlens + (rend-rstart);
  mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
  PetscMemzero(mask,3*Mbs*sizeof(int));
  masked1 = mask    + Mbs;
  masked2 = masked1 + Mbs;
  rowcount = 0; nzcount = 0;
  for ( i=0; i<mbs; i++ ) {
    dcount  = 0;
    odcount = 0;
    for ( j=0; j<bs; j++ ) {
      kmax = locrowlens[rowcount];
      for ( k=0; k<kmax; k++ ) {
        tmp = mycols[nzcount++]/bs;
        if (!mask[tmp]) {
          mask[tmp] = 1;
          if (tmp < rstart || tmp >= rend) masked2[odcount++] = tmp;
          else masked1[dcount++] = tmp;
        }
      }
      rowcount++;
    }

    dlens[i]  = dcount;
    odlens[i] = odcount;

    /* zero out the mask elements we set */
    for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
    for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
  }

  /* create our matrix */
  ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,
                          0,dlens,0,odlens,newmat); CHKERRQ(ierr);
  A = *newmat;
  MatSetOption(A,MAT_COLUMNS_SORTED);

  if (!rank) {
    buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
    /* read in my part of the matrix numerical values */
    nz = procsnz[0];
    vals = buf;
    mycols = ibuf;
    if (size == 1) nz -= extra_rows;
    ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
    if (size == 1) for (i=0; i<extra_rows; i++) { vals[nz+i] = 1.0; }

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
    /* read in the values for all other processors (except the last one) and ship them out */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    /* the last proc */
    if ( size != 1 ) {
      nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
      MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
    }
    PetscFree(procsnz);
  } else {
    /* receive numeric values */
    buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);

    /* receive message of values */
    vals = buf;
    mycols = ibuf;
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,0,"Something is wrong with the file");

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
  }
  PetscFree(locrowlens);
  PetscFree(buf);
  PetscFree(ibuf);
  PetscFree(rowners);
  PetscFree(dlens);
  PetscFree(mask);
  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}