xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 98166b15b11922357fb11bb2cf7cf9eb33ae6adf)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpibaij.c,v 1.86 1997/10/29 23:05:26 bsmith Exp bsmith $";
3 #endif
4 
5 #include "pinclude/pviewer.h"
6 #include "src/mat/impls/baij/mpi/mpibaij.h"
7 #include "src/vec/vecimpl.h"
8 
9 
10 extern int MatSetUpMultiply_MPIBAIJ(Mat);
11 extern int DisAssemble_MPIBAIJ(Mat);
12 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
13 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
14 
15 /*
16      Local utility routine that creates a mapping from the global column
17    number to the local number in the off-diagonal part of the local
   storage of the matrix.  This is done in a non-scalable way, since the
19    length of colmap equals the global matrix length.
20 */
21 #undef __FUNC__
22 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
23 static int CreateColmap_MPIBAIJ_Private(Mat mat)
24 {
25   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
26   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
27   int         nbs = B->nbs,i,bs=B->bs;;
28 
29   PetscFunctionBegin;
30   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
31   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
32   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
33   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
34   PetscFunctionReturn(0);
35 }
36 
37 #define CHUNKSIZE  10
38 
/*
   MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) - inserts (or adds, for
   addv == ADD_VALUES) a single scalar entry at local point indices (row,col)
   into the diagonal (A) SeqBAIJ block.

   Expects the caller (MatSetValues_MPIBAIJ) to have the following in scope:
   a (Mat_SeqBAIJ*), A (Mat), bs, bs2, and scratch variables
   rp,ap,bap,brow,bcol,ridx,cidx,rmax,nrow,low,high,t,_i,ii,N, plus the
   cached CSR arrays ai,aj,aa,aimax,ailen (which it may re-point on realloc).
*/
#define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
{ \
 \
    /* locate the block row/column containing the point entry */ \
    brow = row/bs;  \
    rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
    rmax = aimax[brow]; nrow = ailen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      /* binary search narrows [low,high) to <= 3 candidates, then scan */ \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          /* block exists: update the (ridx,cidx) entry, column-major in block */ \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto a_noinsert; \
        } \
      } \
      /* new nonzero block: nonew==1 silently drops, nonew==-1 errors */ \
      if (a->nonew == 1) goto a_noinsert; \
      else if (a->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (a->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space: single allocation holds a, j and i arrays */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots; rows after brow shift by CHUNKSIZE */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
        for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
        PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
                    aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(a->a);  \
        if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
        aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
        a->singlemalloc = 1; \
 \
        rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
        rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        a->maxnz += bs2*CHUNKSIZE; \
        a->reallocs++; \
        /* NOTE(review): a->nz is only bumped on the realloc path, not for every \
           newly inserted block below -- confirm this matches how nz is used */ \
        a->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      /* zero the vacated block before writing the single point entry */ \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      a_noinsert:; \
    ailen[brow] = nrow; \
}
113 
/*
   MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) - identical algorithm to
   MatSetValues_SeqBAIJ_A_Private, but operating on the off-diagonal (B)
   SeqBAIJ block: it uses b (Mat_SeqBAIJ*), B (Mat) and the cached arrays
   bi,bj,ba,bimax,bilen, which it may re-point after a realloc.
*/
#define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
{ \
 \
    /* locate the block row/column containing the point entry */ \
    brow = row/bs;  \
    rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
    rmax = bimax[brow]; nrow = bilen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      /* binary search narrows [low,high) to <= 3 candidates, then scan */ \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          /* block exists: update the (ridx,cidx) entry, column-major in block */ \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto b_noinsert; \
        } \
      } \
      /* new nonzero block: nonew==1 silently drops, nonew==-1 errors */ \
      if (b->nonew == 1) goto b_noinsert; \
      else if (b->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (b->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space: single allocation holds a, j and i arrays */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots; rows after brow shift by CHUNKSIZE */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
        for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
        PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
                    ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(b->a);  \
        if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
        ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
        b->singlemalloc = 1; \
 \
        rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
        rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        b->maxnz += bs2*CHUNKSIZE; \
        b->reallocs++; \
        /* NOTE(review): b->nz is only bumped on the realloc path, not for every \
           newly inserted block below -- confirm this matches how nz is used */ \
        b->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      /* zero the vacated block before writing the single point entry */ \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      b_noinsert:; \
    bilen[brow] = nrow; \
}
188 
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIBAIJ"
/*
   MatSetValues_MPIBAIJ - Inserts or adds an m-by-n logically dense block of
   values (point indices im[]/in[], not block indices) into the parallel BAIJ
   matrix.  Locally-owned entries go directly into the A (diagonal) or B
   (off-diagonal) SeqBAIJ parts via the macros above; off-processor entries
   are stashed for communication in MatAssemblyBegin/End.
*/
int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j,row,col;
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
  int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
  int         cend_orig=baij->cend_bs,bs=baij->bs;

  /* Some Variables required in the macro */
  Mat         A = baij->A;
  Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
  int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
  Scalar      *aa=a->a;

  Mat         B = baij->B;
  Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
  int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
  Scalar      *ba=b->a;

  /* scratch shared with MatSetValues_SeqBAIJ_{A,B}_Private */
  int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
  int         low,high,t,ridx,cidx,bs2=a->bs2;
  Scalar      *ap,*bap;

  PetscFunctionBegin;
  for ( i=0; i<m; i++ ) {
#if defined(USE_PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(1,0,"Negative row");
    if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
#endif
    if (im[i] >= rstart_orig && im[i] < rend_orig) {
      /* row is owned locally */
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        if (in[j] >= cstart_orig && in[j] < cend_orig){
          /* column falls in the diagonal (A) part */
          col = in[j] - cstart_orig;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
#if defined(USE_PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
        else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
#endif
        else {
          /* column falls in the off-diagonal (B) part */
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }
            /* colmap stores local block offset + 1; negative col => not present */
            col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              /* new off-diagonal block column: disassemble so B can grow */
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
              /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
              B = baij->B;
              b = (Mat_SeqBAIJ *) (B)->data;
              bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
              ba=b->a;
            }
          } else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
      }
    } else {
      /* off-processor row: stash for MatAssemblyBegin/End */
      if (roworiented && !baij->donotstash) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      } else {
        if (!baij->donotstash) {
          row = im[i];
          for ( j=0; j<n; j++ ) {
            ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
  PetscFunctionReturn(0);
}
270 
271 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
272 #undef __FUNC__
273 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
274 int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
275 {
276   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
277   Scalar      *value,*barray=baij->barray;
278   int         ierr,i,j,ii,jj,row,col,k,l;
279   int         roworiented = baij->roworiented,rstart=baij->rstart ;
280   int         rend=baij->rend,cstart=baij->cstart,stepval;
281   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
282 
283   if(!barray) {
284     baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
285   }
286 
287   if (roworiented) {
288     stepval = (n-1)*bs;
289   } else {
290     stepval = (m-1)*bs;
291   }
292   for ( i=0; i<m; i++ ) {
293 #if defined(USE_PETSC_BOPT_g)
294     if (im[i] < 0) SETERRQ(1,0,"Negative row");
295     if (im[i] >= baij->Mbs) SETERRQ(1,0,"Row too large");
296 #endif
297     if (im[i] >= rstart && im[i] < rend) {
298       row = im[i] - rstart;
299       for ( j=0; j<n; j++ ) {
300         /* If NumCol = 1 then a copy is not required */
301         if ((roworiented) && (n == 1)) {
302           barray = v + i*bs2;
303         } else if((!roworiented) && (m == 1)) {
304           barray = v + j*bs2;
305         } else { /* Here a copy is required */
306           if (roworiented) {
307             value = v + i*(stepval+bs)*bs + j*bs;
308           } else {
309             value = v + j*(stepval+bs)*bs + i*bs;
310           }
311           for ( ii=0; ii<bs; ii++,value+=stepval ) {
312             for (jj=0; jj<bs; jj++ ) {
313               *barray++  = *value++;
314             }
315           }
316           barray -=bs2;
317         }
318 
319         if (in[j] >= cstart && in[j] < cend){
320           col  = in[j] - cstart;
321           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
322         }
323 #if defined(USE_PETSC_BOPT_g)
324         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
325         else if (in[j] >= baij->Nbs) {SETERRQ(1,0,"Column too large");}
326 #endif
327         else {
328           if (mat->was_assembled) {
329             if (!baij->colmap) {
330               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
331             }
332 
333 #if defined(USE_PETSC_BOPT_g)
334             if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(1,0,"Incorrect colmap");}
335 #endif
336             col = (baij->colmap[in[j]] - 1)/bs;
337             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
338               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
339               col =  in[j];
340             }
341           }
342           else col = in[j];
343           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
344         }
345       }
346     } else {
347       if (!baij->donotstash) {
348         if (roworiented ) {
349           row   = im[i]*bs;
350           value = v + i*(stepval+bs)*bs;
351           for ( j=0; j<bs; j++,row++ ) {
352             for ( k=0; k<n; k++ ) {
353               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
354                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
355               }
356             }
357           }
358         } else {
359           for ( j=0; j<n; j++ ) {
360             value = v + j*(stepval+bs)*bs + i*bs;
361             col   = in[j]*bs;
362             for ( k=0; k<bs; k++,col++,value+=stepval) {
363               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
364                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
365               }
366             }
367           }
368         }
369       }
370     }
371   }
372   PetscFunctionReturn(0);
373 }
374 
375 #undef __FUNC__
376 #define __FUNC__ "MatGetValues_MPIBAIJ"
377 int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
378 {
379   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
380   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
381   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
382 
383   for ( i=0; i<m; i++ ) {
384     if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
385     if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
386     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
387       row = idxm[i] - bsrstart;
388       for ( j=0; j<n; j++ ) {
389         if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
390         if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
391         if (idxn[j] >= bscstart && idxn[j] < bscend){
392           col = idxn[j] - bscstart;
393           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
394         } else {
395           if (!baij->colmap) {
396             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
397           }
398           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
399              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
400           else {
401             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
402             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
403           }
404         }
405       }
406     } else {
407       SETERRQ(1,0,"Only local values currently supported");
408     }
409   }
410   PetscFunctionReturn(0);
411 }
412 
413 #undef __FUNC__
414 #define __FUNC__ "MatNorm_MPIBAIJ"
415 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
416 {
417   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
418   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
419   int        ierr, i,bs2=baij->bs2;
420   double     sum = 0.0;
421   Scalar     *v;
422 
423   PetscFunctionBegin;
424   if (baij->size == 1) {
425     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
426   } else {
427     if (type == NORM_FROBENIUS) {
428       v = amat->a;
429       for (i=0; i<amat->nz*bs2; i++ ) {
430 #if defined(USE_PETSC_COMPLEX)
431         sum += real(conj(*v)*(*v)); v++;
432 #else
433         sum += (*v)*(*v); v++;
434 #endif
435       }
436       v = bmat->a;
437       for (i=0; i<bmat->nz*bs2; i++ ) {
438 #if defined(USE_PETSC_COMPLEX)
439         sum += real(conj(*v)*(*v)); v++;
440 #else
441         sum += (*v)*(*v); v++;
442 #endif
443       }
444       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
445       *norm = sqrt(*norm);
446     } else {
447       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
448     }
449   }
450   PetscFunctionReturn(0);
451 }
452 
#undef __FUNC__
#define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
/*
   MatAssemblyBegin_MPIBAIJ - Starts assembly: routes the stashed
   off-processor values to their owning processes using non-blocking MPI.
   Each communicated entry is a (global row, global col, value) triple,
   with the indices stored as Scalars so one MPIU_SCALAR message suffices.
   The posted requests and buffers are saved in baij and completed in
   MatAssemblyEnd_MPIBAIJ.
*/
int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  PetscFunctionBegin;
  /* make sure all processors are either in INSERTMODE or ADDMODE */
  ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,0,"Some processors inserted others added");
  }
  mat->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  /* owners[] holds block-row ownership ranges; scale by bs for point rows */
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work      = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  ierr      = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
  nreceives = work[rank];
  ierr      = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
  nmax      = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack (row, col, value) triples, bucketed by destination process */
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  /* starts[] was consumed above; rebuild it for the sends */
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  /* stash the communication state for MatAssemblyEnd_MPIBAIJ */
  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  PetscFunctionReturn(0);
}
549 #include <math.h>
550 #define HASH_KEY 0.6180339887
551 #define HASH1(size,key) ((int)((size)*fmod(((key)*HASH_KEY),1)))
552 #define HASH2(size,key) ((int)((size)*fmod(((key+0.5)*HASH_KEY),1)))
553 
554 
555 int CreateHashTable(Mat mat,double factor)
556 {
557   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
558   Mat         A = baij->A, B=baij->B;
559   Mat_SeqBAIJ *a=(Mat_SeqBAIJ *)A->data, *b=(Mat_SeqBAIJ *)B->data;
560   int         i,j,k,nz=a->nz+b->nz,h1,h2,*ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j;
561   int         size=(int)(factor*nz),ct=0,max1=0,max2=0;
562   /* Scalar      *aa=a->a,*ba=b->a; */
563   double      key;
564   static double *HT;
565   static      int flag=1;
566   extern int PetscGlobalRank;
567 
568   PetscFunctionBegin;
569   flag = 1;
570   /* Allocate Memory for Hash Table */
571   if (flag) {
572     HT = (double*)PetscMalloc(size*sizeof(double));
573     flag = 0;
574   }
575   PetscMemzero(HT,size*sizeof(double));
576 
577   /* Loop Over A */
578   for ( i=0; i<a->n; i++ ) {
579     for ( j=ai[i]; j<ai[i+1]; j++ ) {
580       key = i*baij->n+aj[j]+1;
581       h1  = HASH1(size,key);
582       h2  = HASH2(size,key);
583 
584       for ( k=1; k<size; k++ ){
585         if (HT[(h1*k)%size] == 0.0) {
586           HT[(h1*k)%size] = key;
587           break;
588         } else ct++;
589       }
590       if (k> max1) max1 =k;
591     }
592   }
593   /* Loop Over B */
594   for ( i=0; i<b->n; i++ ) {
595     for ( j=bi[i]; j<bi[i+1]; j++ ) {
596       key = i*b->n+bj[j]+1;
597       h1  = HASH1(size,key);
598       for ( k=1; k<size; k++ ){
599         if (HT[(h1*k)%size] == 0.0) {
600           HT[(h1*k)%size] = key;
601           break;
602         } else ct++;
603       }
604       if (k> max2) max2 =k;
605     }
606   }
607 
608   /* Print Summary */
609   for ( i=0,key=0.0,j=0; i<size; i++)
610     if (HT[i]) {j++;}
611 
612   printf("[%d] fact = %3.2f max1 = %5d max2 = %5d Size %5d - Searches %5d Avg %5.2f Keys %5d\n",
613          PetscGlobalRank,factor,max1,max2,size,ct+j,((float)ct+j)/j,j);
614   PetscFree(HT);
615   PetscFunctionReturn(0);
616 }
617 
618 #undef __FUNC__
619 #define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
620 int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
621 {
622   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
623   MPI_Status  *send_status,recv_status;
624   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
625   int         bs=baij->bs,row,col,other_disassembled,flg;
626   Scalar      *values,val;
627   InsertMode  addv = mat->insertmode;
628 
629   PetscFunctionBegin;
630   /*  wait on receives */
631   while (count) {
632     ierr = MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
633     /* unpack receives into our local space */
634     values = baij->rvalues + 3*imdex*baij->rmax;
635     ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
636     n = n/3;
637     for ( i=0; i<n; i++ ) {
638       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
639       col = (int) PetscReal(values[3*i+1]);
640       val = values[3*i+2];
641       if (col >= baij->cstart*bs && col < baij->cend*bs) {
642         col -= baij->cstart*bs;
643         ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
644       } else {
645         if (mat->was_assembled) {
646           if (!baij->colmap) {
647             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
648           }
649           col = (baij->colmap[col/bs]) - 1 + col%bs;
650           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
651             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
652             col = (int) PetscReal(values[3*i+1]);
653           }
654         }
655         ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
656       }
657     }
658     count--;
659   }
660   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
661 
662   /* wait on sends */
663   if (baij->nsends) {
664     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
665     ierr        = MPI_Waitall(baij->nsends,baij->send_waits,send_status);CHKERRQ(ierr);
666     PetscFree(send_status);
667   }
668   PetscFree(baij->send_waits); PetscFree(baij->svalues);
669 
670   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
671   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
672 
673   /* determine if any processor has disassembled, if so we must
674      also disassemble ourselfs, in order that we may reassemble. */
675   ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
676   if (mat->was_assembled && !other_disassembled) {
677     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
678   }
679 
680   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
681     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
682   }
683   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
684   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
685 
686   ierr = OptionsHasName(PETSC_NULL,"-use_hash",&flg); CHKERRQ(ierr);
687   if (flg) {
688     double fact;
689     for ( fact=1.2; fact<2.0; fact +=0.05) CreateHashTable(mat,fact);
690   }
691   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
692   PetscFunctionReturn(0);
693 }
694 
695 #undef __FUNC__
696 #define __FUNC__ "MatView_MPIBAIJ_Binary"
697 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
698 {
699   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
700   int          ierr;
701 
702   PetscFunctionBegin;
703   if (baij->size == 1) {
704     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
705   } else SETERRQ(1,0,"Only uniprocessor output supported");
706   PetscFunctionReturn(0);
707 }
708 
#undef __FUNC__
#define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
/*
   MatView_MPIBAIJ_ASCIIorDraworMatlab - Viewer for ASCII, Draw and Matlab
   viewers.  INFO formats print per-process summaries; otherwise the matrix
   is either printed per-process (ASCII_FILE_VIEWER) or gathered onto
   process 0 as a single sequential matrix and viewed there.
*/
static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr, format,rank,bs = baij->bs;
  FILE         *fd;
  ViewerType   vtype;

  PetscFunctionBegin;
  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    /* NOTE(review): several ierr assignments below are not CHKERRQ-checked */
    ierr = ViewerGetFormat(viewer,&format);
    if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
      /* one-process-at-a-time summary of local sizes and nonzero counts */
      MatInfo info;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&info);
      PetscSequentialPhaseBegin(mat->comm,1);
      fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
              rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
              baij->bs,(int)info.memory);
      ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
      PetscFunctionReturn(0);
    } else if (format == VIEWER_FORMAT_ASCII_INFO) {
      PetscPrintf(mat->comm,"  block size is %d\n",bs);
      PetscFunctionReturn(0);
    }
  }

  if (vtype == DRAW_VIEWER) {
    /* nothing to do for a null draw context */
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
  }

  if (vtype == ASCII_FILE_VIEWER) {
    /* print each process's pieces in rank order */
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
           baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
            baij->cstart*bs,baij->cend*bs);
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  } else {
    int size = baij->size;
    rank = baij->rank;
    if (size == 1) {
      ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    } else {
      /* assemble the entire matrix onto first processor. */
      Mat         A;
      Mat_SeqBAIJ *Aloc;
      int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
      int         mbs=baij->mbs;
      Scalar      *a;

      /* process 0 owns the whole gathered matrix; all others contribute 0 rows */
      if (!rank) {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      } else {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqBAIJ*) baij->A->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart;
      rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

      for ( i=0; i<mbs; i++ ) {
        /* rvals = the bs consecutive global point rows of block row i */
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = (baij->cstart+aj[j])*bs;
          /* insert the bs-by-bs block one point column at a time */
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      /* copy over the B part; garray maps local to global block columns */
      Aloc = (Mat_SeqBAIJ*) baij->B->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart*bs;
      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = baij->garray[aj[j]]*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      PetscFree(rvals);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        /* everything now lives in the local A block of the gathered matrix */
        ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  PetscFunctionReturn(0);
}
827 
828 
829 
830 #undef __FUNC__
831 #define __FUNC__ "MatView_MPIBAIJ"
832 int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
833 {
834   Mat         mat = (Mat) obj;
835   int         ierr;
836   ViewerType  vtype;
837 
838   PetscFunctionBegin;
839   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
840   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
841       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
842     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
843   } else if (vtype == BINARY_FILE_VIEWER) {
844     ierr = MatView_MPIBAIJ_Binary(mat,viewer);CHKERRQ(ierr);
845   }
846   PetscFunctionReturn(0);
847 }
848 
#undef __FUNC__
#define __FUNC__ "MatDestroy_MPIBAIJ"
/*
   Destroys an MPIBAIJ matrix: frees the stash, the two sequential
   sub-matrices (diagonal block A and off-diagonal block B), the
   communication data structures, and finally the header itself.
   Note the ordering: the sub-matrices must be destroyed before the
   containing Mat_MPIBAIJ structure is freed.
*/
int MatDestroy_MPIBAIJ(PetscObject obj)
{
  Mat         mat = (Mat) obj;
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;

  PetscFunctionBegin;
#if defined(USE_PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
#endif

  /* stash holds off-processor values set before assembly */
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
  /* rowners also covers cowners; both live in one allocation */
  PetscFree(baij->rowners);
  ierr = MatDestroy(baij->A); CHKERRQ(ierr);
  ierr = MatDestroy(baij->B); CHKERRQ(ierr);
  /* the following are created lazily, so each may still be unset */
  if (baij->colmap) PetscFree(baij->colmap);
  if (baij->garray) PetscFree(baij->garray);
  if (baij->lvec)   VecDestroy(baij->lvec);
  if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
  if (baij->rowvalues) PetscFree(baij->rowvalues);
  if (baij->barray) PetscFree(baij->barray);
  PetscFree(baij);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  PetscFunctionReturn(0);
}
877 
878 #undef __FUNC__
879 #define __FUNC__ "MatMult_MPIBAIJ"
880 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
881 {
882   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
883   int         ierr, nt;
884 
885   PetscFunctionBegin;
886   VecGetLocalSize_Fast(xx,nt);
887   if (nt != a->n) {
888     SETERRQ(1,0,"Incompatible partition of A and xx");
889   }
890   VecGetLocalSize_Fast(yy,nt);
891   if (nt != a->m) {
892     SETERRQ(1,0,"Incompatible parition of A and yy");
893   }
894   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
895   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
896   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
897   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
898   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
899   PetscFunctionReturn(0);
900 }
901 
902 #undef __FUNC__
903 #define __FUNC__ "MatMultAdd_MPIBAIJ"
904 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
905 {
906   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
907   int        ierr;
908 
909   PetscFunctionBegin;
910   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
911   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
912   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
913   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
914   PetscFunctionReturn(0);
915 }
916 
917 #undef __FUNC__
918 #define __FUNC__ "MatMultTrans_MPIBAIJ"
919 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
920 {
921   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
922   int         ierr;
923 
924   PetscFunctionBegin;
925   /* do nondiagonal part */
926   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
927   /* send it on its way */
928   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
929   /* do local part */
930   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
931   /* receive remote parts: note this assumes the values are not actually */
932   /* inserted in yy until the next line, which is true for my implementation*/
933   /* but is not perhaps always true. */
934   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
935   PetscFunctionReturn(0);
936 }
937 
938 #undef __FUNC__
939 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
940 int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
941 {
942   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
943   int         ierr;
944 
945   PetscFunctionBegin;
946   /* do nondiagonal part */
947   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
948   /* send it on its way */
949   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
950   /* do local part */
951   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
952   /* receive remote parts: note this assumes the values are not actually */
953   /* inserted in yy until the next line, which is true for my implementation*/
954   /* but is not perhaps always true. */
955   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
956   PetscFunctionReturn(0);
957 }
958 
959 /*
960   This only works correctly for square matrices where the subblock A->A is the
961    diagonal block
962 */
963 #undef __FUNC__
964 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
965 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
966 {
967   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
968   int         ierr;
969 
970   PetscFunctionBegin;
971   if (a->M != a->N) SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
972   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
973   PetscFunctionReturn(0);
974 }
975 
976 #undef __FUNC__
977 #define __FUNC__ "MatScale_MPIBAIJ"
978 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
979 {
980   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
981   int         ierr;
982 
983   PetscFunctionBegin;
984   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
985   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
986   PetscFunctionReturn(0);
987 }
988 
989 #undef __FUNC__
990 #define __FUNC__ "MatGetSize_MPIBAIJ"
991 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
992 {
993   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
994 
995   PetscFunctionBegin;
996   if (m) *m = mat->M;
997   if (n) *n = mat->N;
998   PetscFunctionReturn(0);
999 }
1000 
#undef __FUNC__
#define __FUNC__ "MatGetLocalSize_MPIBAIJ"
/*
   Returns the local row count and a column count for this process.
*/
int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;

  PetscFunctionBegin;
  /* NOTE(review): *n is set to the GLOBAL column count (mat->N) even though
     a local count (mat->n) exists and is used elsewhere (e.g. the MatMult
     partition check) -- confirm this asymmetry is intentional */
  *m = mat->m; *n = mat->N;
  PetscFunctionReturn(0);
}
1011 
1012 #undef __FUNC__
1013 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
1014 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
1015 {
1016   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1017 
1018   PetscFunctionBegin;
1019   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
1020   PetscFunctionReturn(0);
1021 }
1022 
1023 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1024 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1025 
#undef __FUNC__
#define __FUNC__ "MatGetRow_MPIBAIJ"
/*
   Returns one (local) row of the parallel matrix in global point-column
   numbering.  The row is assembled by merging the corresponding rows of
   the diagonal block (A) and off-diagonal block (B), whose columns are
   translated back to global indices; the merged copies live in cached
   work arrays (rowvalues/rowindices) until MatRestoreRow is called.
*/
int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
  int        *cmap, *idx_p,cstart = mat->cstart;

  PetscFunctionBegin;
  /* only one row may be "out" at a time */
  if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int     max = 1,mbs = mat->mbs,tmp;
    for ( i=0; i<mbs; i++ ) {
      /* tmp = number of block entries of block row i across A and B */
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    /* one allocation holds both the Scalar values and the int indices;
       rowindices points just past the max*bs2 Scalars */
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }

  if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows")
  lrow = row - brstart;

  /* only request from A/B the pieces the caller actually asked for */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  /* cmap maps local block columns of B back to global block columns */
  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      int imark = -1;
      if (v) {
        /* B entries left of the diagonal block, then all of A, then the
           B entries right of the diagonal block */
        *v = v_p = mat->rowvalues;
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;   /* first B entry to the right of the diagonal block */
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          /* split point already found while copying values */
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
          }
        } else {
          /* values were not requested; find the split point now */
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]/bs] < cstart)
              idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
            else break;
          }
          imark = i;
        }
        /* A columns are local to the diagonal block: offset by cstart*bs */
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
      }
    } else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
1107 
1108 #undef __FUNC__
1109 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1110 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1111 {
1112   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1113 
1114   PetscFunctionBegin;
1115   if (baij->getrowactive == PETSC_FALSE) {
1116     SETERRQ(1,0,"MatGetRow not called");
1117   }
1118   baij->getrowactive = PETSC_FALSE;
1119   PetscFunctionReturn(0);
1120 }
1121 
1122 #undef __FUNC__
1123 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1124 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1125 {
1126   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1127 
1128   PetscFunctionBegin;
1129   *bs = baij->bs;
1130   PetscFunctionReturn(0);
1131 }
1132 
1133 #undef __FUNC__
1134 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1135 int MatZeroEntries_MPIBAIJ(Mat A)
1136 {
1137   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1138   int         ierr;
1139 
1140   PetscFunctionBegin;
1141   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1142   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1143   PetscFunctionReturn(0);
1144 }
1145 
1146 #undef __FUNC__
1147 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1148 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1149 {
1150   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1151   Mat         A = a->A, B = a->B;
1152   int         ierr;
1153   double      isend[5], irecv[5];
1154 
1155   PetscFunctionBegin;
1156   info->rows_global    = (double)a->M;
1157   info->columns_global = (double)a->N;
1158   info->rows_local     = (double)a->m;
1159   info->columns_local  = (double)a->N;
1160   info->block_size     = (double)a->bs;
1161   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1162   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->memory;
1163   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1164   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->memory;
1165   if (flag == MAT_LOCAL) {
1166     info->nz_used      = isend[0];
1167     info->nz_allocated = isend[1];
1168     info->nz_unneeded  = isend[2];
1169     info->memory       = isend[3];
1170     info->mallocs      = isend[4];
1171   } else if (flag == MAT_GLOBAL_MAX) {
1172     ierr = MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_MAX,matin->comm);CHKERRQ(ierr);
1173     info->nz_used      = irecv[0];
1174     info->nz_allocated = irecv[1];
1175     info->nz_unneeded  = irecv[2];
1176     info->memory       = irecv[3];
1177     info->mallocs      = irecv[4];
1178   } else if (flag == MAT_GLOBAL_SUM) {
1179     ierr = MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_SUM,matin->comm);CHKERRQ(ierr);
1180     info->nz_used      = irecv[0];
1181     info->nz_allocated = irecv[1];
1182     info->nz_unneeded  = irecv[2];
1183     info->memory       = irecv[3];
1184     info->mallocs      = irecv[4];
1185   }
1186   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1187   info->fill_ratio_needed = 0;
1188   info->factor_mallocs    = 0;
1189   PetscFunctionReturn(0);
1190 }
1191 
1192 #undef __FUNC__
1193 #define __FUNC__ "MatSetOption_MPIBAIJ"
1194 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1195 {
1196   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1197 
1198   PetscFunctionBegin;
1199   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1200       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1201       op == MAT_COLUMNS_UNSORTED ||
1202       op == MAT_COLUMNS_SORTED ||
1203       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1204       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1205         MatSetOption(a->A,op);
1206         MatSetOption(a->B,op);
1207   } else if (op == MAT_ROW_ORIENTED) {
1208         a->roworiented = 1;
1209         MatSetOption(a->A,op);
1210         MatSetOption(a->B,op);
1211   } else if (op == MAT_ROWS_SORTED ||
1212              op == MAT_ROWS_UNSORTED ||
1213              op == MAT_SYMMETRIC ||
1214              op == MAT_STRUCTURALLY_SYMMETRIC ||
1215              op == MAT_YES_NEW_DIAGONALS)
1216     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1217   else if (op == MAT_COLUMN_ORIENTED) {
1218     a->roworiented = 0;
1219     MatSetOption(a->A,op);
1220     MatSetOption(a->B,op);
1221   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1222     a->donotstash = 1;
1223   } else if (op == MAT_NO_NEW_DIAGONALS) {
1224     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1225   } else {
1226     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1227   }
1228   PetscFunctionReturn(0);
1229 }
1230 
1231 #undef __FUNC__
1232 #define __FUNC__ "MatTranspose_MPIBAIJ("
1233 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1234 {
1235   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1236   Mat_SeqBAIJ *Aloc;
1237   Mat        B;
1238   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
1239   int        bs=baij->bs,mbs=baij->mbs;
1240   Scalar     *a;
1241 
1242   PetscFunctionBegin;
1243   if (matout == PETSC_NULL && M != N) SETERRQ(1,0,"Square matrix only for in-place");
1244   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1245   CHKERRQ(ierr);
1246 
1247   /* copy over the A part */
1248   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1249   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1250   row = baij->rstart;
1251   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1252 
1253   for ( i=0; i<mbs; i++ ) {
1254     rvals[0] = bs*(baij->rstart + i);
1255     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1256     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1257       col = (baij->cstart+aj[j])*bs;
1258       for (k=0; k<bs; k++ ) {
1259         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1260         col++; a += bs;
1261       }
1262     }
1263   }
1264   /* copy over the B part */
1265   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1266   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1267   row = baij->rstart*bs;
1268   for ( i=0; i<mbs; i++ ) {
1269     rvals[0] = bs*(baij->rstart + i);
1270     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1271     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1272       col = baij->garray[aj[j]]*bs;
1273       for (k=0; k<bs; k++ ) {
1274         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1275         col++; a += bs;
1276       }
1277     }
1278   }
1279   PetscFree(rvals);
1280   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1281   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1282 
1283   if (matout != PETSC_NULL) {
1284     *matout = B;
1285   } else {
1286     /* This isn't really an in-place transpose .... but free data structures from baij */
1287     PetscFree(baij->rowners);
1288     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1289     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1290     if (baij->colmap) PetscFree(baij->colmap);
1291     if (baij->garray) PetscFree(baij->garray);
1292     if (baij->lvec) VecDestroy(baij->lvec);
1293     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1294     PetscFree(baij);
1295     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1296     PetscHeaderDestroy(B);
1297   }
1298   PetscFunctionReturn(0);
1299 }
1300 
1301 #undef __FUNC__
1302 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1303 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1304 {
1305   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1306   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1307   int ierr,s1,s2,s3;
1308 
1309   PetscFunctionBegin;
1310   if (ll)  {
1311     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1312     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1313     if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
1314     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1315     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1316   }
1317   if (rr) SETERRQ(1,0,"not supported for right vector");
1318   PetscFunctionReturn(0);
1319 }
1320 
/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   baij->A and baij->B directly and not through the MatZeroRows()
   routine.
*/
#undef __FUNC__
#define __FUNC__ "MatZeroRows_MPIBAIJ"
/*
   Zeros the global rows listed in is, optionally setting the diagonal
   entry of each zeroed row to *diag.  Rows may be specified on any
   process; they are routed to their owners with a hand-rolled
   send/receive exchange, then each process zeroes its own rows through
   the sub-blocks' MatZeroRows.
*/
int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values,bs=l->bs;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  PetscFunctionBegin;
  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  /* nprocs[j] = #rows destined for process j; procs[j] = 1 if any */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    /* owners[] is in block units; scale by bs to compare point rows */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,0,"Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work   = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  ierr   = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
  nrecvs = work[rank];
  ierr   = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
  nmax   = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack rows by destination; this pass advances starts[], so it is
     rebuilt below before issuing the sends */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
    }
  }
  PetscFree(starts);

  /* first global point row owned by this process */
  base = owners[rank]*bs;

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
    /* unpack receives into our local space */
    ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  /* gather all received global rows, converted to local numbering */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  /* diagonal value goes only into the diagonal block */
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
    ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  PetscFunctionReturn(0);
}
1448 extern int MatPrintHelp_SeqBAIJ(Mat);
1449 #undef __FUNC__
1450 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1451 int MatPrintHelp_MPIBAIJ(Mat A)
1452 {
1453   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1454   int         ierr;
1455 
1456   PetscFunctionBegin;
1457   if (!a->rank) {
1458     ierr = MatPrintHelp_SeqBAIJ(a->A);CHKERRQ(ierr);
1459   }
1460   PetscFunctionReturn(0);
1461 }
1462 
1463 #undef __FUNC__
1464 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1465 int MatSetUnfactored_MPIBAIJ(Mat A)
1466 {
1467   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1468   int         ierr;
1469 
1470   PetscFunctionBegin;
1471   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1472   PetscFunctionReturn(0);
1473 }
1474 
1475 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1476 
1477 /* -------------------------------------------------------------------*/
/*
   Positional function table for the MPIBAIJ matrix type; the slot order
   is fixed by struct _MatOps, and a 0 entry means the operation is not
   supported by this format.  Do not reorder entries.
*/
static struct _MatOps MatOps = {
  MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
  MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
  0,0,0,0,
  0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
  0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
  MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
  MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
  0,0,0,MatGetSize_MPIBAIJ,
  MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
  0,0,MatConvertSameType_MPIBAIJ,0,0,
  0,0,0,MatGetSubMatrices_MPIBAIJ,
  MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
  MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
  0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1493 
1494 
1495 #undef __FUNC__
1496 #define __FUNC__ "MatCreateMPIBAIJ"
1497 /*@C
1498    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1499    (block compressed row).  For good matrix assembly performance
1500    the user should preallocate the matrix storage by setting the parameters
1501    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1502    performance can be increased by more than a factor of 50.
1503 
1504    Input Parameters:
1505 .  comm - MPI communicator
.  bs   - size of block
1507 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1508            This value should be the same as the local size used in creating the
1509            y vector for the matrix-vector product y = Ax.
1510 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1511            This value should be the same as the local size used in creating the
1512            x vector for the matrix-vector product y = Ax.
1513 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1514 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1515 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1516            submatrix  (same for all local rows)
.  d_nnz - array containing the number of block nonzeros in the various block rows
           of the diagonal portion of the local submatrix (possibly different for each
           block row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1520            it is zero.
1521 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1522            submatrix (same for all local rows).
.  o_nnz - array containing the number of block nonzeros in the various block rows of the
1524            off-diagonal portion of the local submatrix (possibly different for
1525            each block row) or PETSC_NULL.
1526 
1527    Output Parameter:
1528 .  A - the matrix
1529 
1530    Notes:
1531    The user MUST specify either the local or global matrix dimensions
1532    (possibly both).
1533 
1534    Storage Information:
1535    For a square global matrix we define each processor's diagonal portion
1536    to be its local rows and the corresponding columns (a square submatrix);
1537    each processor's off-diagonal portion encompasses the remainder of the
1538    local matrix (a rectangular submatrix).
1539 
1540    The user can specify preallocated storage for the diagonal part of
1541    the local submatrix with either d_nz or d_nnz (not both).  Set
1542    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1543    memory allocation.  Likewise, specify preallocated storage for the
1544    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1545 
1546    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1547    the figure below we depict these three local rows and all columns (0-11).
1548 
1549 $          0 1 2 3 4 5 6 7 8 9 10 11
1550 $         -------------------
1551 $  row 3  |  o o o d d d o o o o o o
1552 $  row 4  |  o o o d d d o o o o o o
1553 $  row 5  |  o o o d d d o o o o o o
1554 $         -------------------
1555 $
1556 
1557    Thus, any entries in the d locations are stored in the d (diagonal)
1558    submatrix, and any entries in the o locations are stored in the
1559    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1560    stored simply in the MATSEQBAIJ format for compressed row storage.
1561 
1562    Now d_nz should indicate the number of block nonzeros per row in the d matrix,
1563    and o_nz should indicate the number of block nonzeros per row in the o matrix.
1564    In general, for PDE problems in which most nonzeros are near the diagonal,
1565    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1566    or you will get TERRIBLE performance; see the users' manual chapter on
1567    matrices.
1568 
1569 .keywords: matrix, block, aij, compressed row, sparse, parallel
1570 
1571 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1572 @*/
int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
                    int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  Mat          B;
  Mat_MPIBAIJ  *b;
  int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;

  PetscFunctionBegin;
  if (bs < 1) SETERRQ(1,0,"Invalid block size specified, must be positive");

  /* Uniprocessor case: store as a plain SeqBAIJ matrix.  The off-diagonal
     preallocation arguments (o_nz,o_nnz) are irrelevant and ignored here. */
  MPI_Comm_size(comm,&size);
  if (size == 1) {
    if (M == PETSC_DECIDE) M = m;
    if (N == PETSC_DECIDE) N = n;
    ierr = MatCreateSeqBAIJ(comm,bs,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
    PetscFunctionReturn(0);
  }

  *A = 0;
  PetscHeaderCreate(B,_p_Mat,MAT_COOKIE,MATMPIBAIJ,comm,MatDestroy,MatView);
  PLogObjectCreate(B);
  B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
  /* PetscNew() does not zero the structure */
  PetscMemzero(b,sizeof(Mat_MPIBAIJ));
  PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));

  B->destroy    = MatDestroy_MPIBAIJ;
  B->view       = MatView_MPIBAIJ;
  B->mapping    = 0;
  B->factor     = 0;
  B->assembled  = PETSC_FALSE;

  B->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&b->rank);
  MPI_Comm_size(comm,&b->size);

  /* Reconcile local (m,n) and global (M,N) sizes.  d_nnz/o_nnz are
     per-local-block-row arrays, so they make no sense unless the caller
     fixed the local row count. */
  if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
    SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
  }
  if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
  if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,0,"either N or n should be specified");
  /* if both local and global sizes are given, the local sizes win:
     drop the global value and recompute it from the local ones below */
  if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
  if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;

  /* global size(s) undetermined: sum the local sizes over all processes */
  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    mbs = m/bs; nbs = n/bs;
    ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
    if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
    if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
  }
  /* local size(s) undetermined: split the block rows/cols nearly evenly;
     the first (Mbs % size) processes each get one extra block row */
  if (m == PETSC_DECIDE) {
    Mbs = M/bs;
    if (Mbs*bs != M) SETERRQ(1,0,"No of global rows must be divisible by blocksize");
    mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
    m   = mbs*bs;
  }
  if (n == PETSC_DECIDE) {
    Nbs = N/bs;
    if (Nbs*bs != N) SETERRQ(1,0,"No of global cols must be divisible by blocksize");
    nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
    n   = nbs*bs;
  }
  /* mbs/nbs were computed by integer division; this catches caller-supplied
     local sizes that are not multiples of bs */
  if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"No of local rows, cols must be divisible by blocksize");

  /* point (unblocked) and block sizes, cached in both headers */
  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;
  b->bs  = bs;
  b->bs2 = bs*bs;
  b->mbs = mbs;
  b->nbs = nbs;
  b->Mbs = Mbs;
  b->Nbs = Nbs;

  /* build local table of row and column ownerships: process p owns block
     rows rowners[p]..rowners[p+1]-1 (cowners likewise for block columns);
     the _bs variants are the same ranges in point indices */
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
  b->cowners = b->rowners + b->size + 2;
  ierr = MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];   /* prefix sum -> ownership offsets */
  }
  b->rstart    = b->rowners[b->rank];
  b->rend      = b->rowners[b->rank+1];
  b->rstart_bs = b->rstart * bs;
  b->rend_bs   = b->rend * bs;

  ierr = MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart    = b->cowners[b->rank];
  b->cend      = b->cowners[b->rank+1];
  b->cstart_bs = b->cstart * bs;
  b->cend_bs   = b->cend * bs;

  /* the local matrix is split into A (diagonal part, m x n) and
     B (off-diagonal part, m x N) -- both stored as SeqBAIJ */
  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->donotstash  = 0;
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used in block assembly */
  b->barray      = 0;

  /* stuff used for matrix vector multiply */
  b->lvec        = 0;
  b->Mvctx       = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  *A = B;
  PetscFunctionReturn(0);
}
1701 
1702 #undef __FUNC__
1703 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
1704 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1705 {
1706   Mat         mat;
1707   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1708   int         ierr, len=0, flg;
1709 
1710   PetscFunctionBegin;
1711   *newmat       = 0;
1712   PetscHeaderCreate(mat,_p_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm,MatDestroy,MatView);
1713   PLogObjectCreate(mat);
1714   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1715   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1716   mat->destroy    = MatDestroy_MPIBAIJ;
1717   mat->view       = MatView_MPIBAIJ;
1718   mat->factor     = matin->factor;
1719   mat->assembled  = PETSC_TRUE;
1720 
1721   a->m = mat->m   = oldmat->m;
1722   a->n = mat->n   = oldmat->n;
1723   a->M = mat->M   = oldmat->M;
1724   a->N = mat->N   = oldmat->N;
1725 
1726   a->bs  = oldmat->bs;
1727   a->bs2 = oldmat->bs2;
1728   a->mbs = oldmat->mbs;
1729   a->nbs = oldmat->nbs;
1730   a->Mbs = oldmat->Mbs;
1731   a->Nbs = oldmat->Nbs;
1732 
1733   a->rstart       = oldmat->rstart;
1734   a->rend         = oldmat->rend;
1735   a->cstart       = oldmat->cstart;
1736   a->cend         = oldmat->cend;
1737   a->size         = oldmat->size;
1738   a->rank         = oldmat->rank;
1739   mat->insertmode = NOT_SET_VALUES;
1740   a->rowvalues    = 0;
1741   a->getrowactive = PETSC_FALSE;
1742   a->barray       = 0;
1743 
1744   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1745   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1746   a->cowners = a->rowners + a->size + 2;
1747   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1748   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1749   if (oldmat->colmap) {
1750     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1751     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1752     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1753   } else a->colmap = 0;
1754   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1755     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1756     PLogObjectMemory(mat,len*sizeof(int));
1757     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1758   } else a->garray = 0;
1759 
1760   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1761   PLogObjectParent(mat,a->lvec);
1762   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1763   PLogObjectParent(mat,a->Mvctx);
1764   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1765   PLogObjectParent(mat,a->A);
1766   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1767   PLogObjectParent(mat,a->B);
1768   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1769   if (flg) {
1770     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1771   }
1772   *newmat = mat;
1773   PetscFunctionReturn(0);
1774 }
1775 
1776 #include "sys.h"
1777 
#undef __FUNC__
#define __FUNC__ "MatLoad_MPIBAIJ"
/*
   MatLoad_MPIBAIJ - Loads a matrix stored in PETSc binary format into a
   parallel MPIBAIJ matrix.  Process 0 reads the file (header, row lengths,
   column indices, then values) and ships each process its piece; the block
   size comes from the -matload_block_size option (default 1).  Since the
   file stores a point (AIJ) matrix, extra rows may be appended so the row
   count is divisible by the block size; the padding rows get a single unit
   diagonal entry.  Only square matrices are supported.
*/
int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*buf;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
  int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
  int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
  int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
  int          dcount,kmax,k,nzcount,tmp;

  PetscFunctionBegin;
  ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
  bs2  = bs*bs;

  /* only process 0 touches the file; header = [cookie, M, N, nz-flag] */
  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not matrix object");
    if (header[3] < 0) {
      SETERRQ(1,1,"Matrix stored in special format on disk, cannot load as MPIBAIJ");
    }
  }

  ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
  M = header[1]; N = header[2];

  if (M != N) SETERRQ(1,0,"Can only do square matrices");

  /*
     This code adds extra rows to make sure the number of rows is
     divisible by the blocksize
  */
  Mbs        = M/bs;
  extra_rows = bs - M + bs*(Mbs);
  if (extra_rows == bs) extra_rows = 0;   /* M was already divisible by bs */
  else                  Mbs++;
  if (extra_rows &&!rank) {
    PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
  }

  /* determine ownership of all rows: nearly-even split of the block rows;
     the first (Mbs % size) processes get one extra block row.
     rowners[] is in block rows, browners[] the same offsets in point rows */
  mbs = Mbs/size + ((Mbs % size) > rank);
  m   = mbs * bs;
  rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
  browners = rowners + size + 1;
  ierr = MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];   /* prefix sum */
  for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors; the padding rows get length 1
     (their unit diagonal entry) */
  locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
    for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
    ierr = MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);CHKERRQ(ierr);
    PetscFree(sndcounts);
  } else {
    ierr = MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);CHKERRQ(ierr);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices; the padding entries
       (column M+i for padding row i) are not on disk, so on one process
       they are appended here by hand */
    nz = procsnz[0];
    ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }

    /* read in every ones (except the last) and ship off */
    for ( i=1; i<size-1; i++ ) {
      nz   = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
      ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
    }
    /* read in the stuff for the last proc */
    if ( size != 1 ) {
      nz   = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
      ierr = MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);CHKERRQ(ierr);
    }
    PetscFree(cols);
  } else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += locrowlens[i];
    }
    ibuf   = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* receive message of column indices*/
    ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
    ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
    if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries.
     mask[] flags which block columns were already seen in the current block
     row; masked1/masked2 remember which flags to clear afterwards, so only
     the touched entries are reset (not all Mbs).  Note tmp is a block
     column, compared against the block-row bounds rstart/rend -- valid
     because the matrix is square and rows/columns share the same layout. */
  dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
  odlens = dlens + (rend-rstart);
  mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
  PetscMemzero(mask,3*Mbs*sizeof(int));
  masked1 = mask    + Mbs;
  masked2 = masked1 + Mbs;
  rowcount = 0; nzcount = 0;
  for ( i=0; i<mbs; i++ ) {
    dcount  = 0;
    odcount = 0;
    for ( j=0; j<bs; j++ ) {
      kmax = locrowlens[rowcount];
      for ( k=0; k<kmax; k++ ) {
        tmp = mycols[nzcount++]/bs;
        if (!mask[tmp]) {
          mask[tmp] = 1;
          if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
          else masked1[dcount++] = tmp;
        }
      }
      rowcount++;
    }

    dlens[i]  = dcount;
    odlens[i] = odcount;

    /* zero out the mask elements we set */
    for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
    for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
  }

  /* create our matrix with exact block-row preallocation */
  ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
         CHKERRQ(ierr);
  A = *newmat;
  /* the binary format stores each row's columns in ascending order */
  MatSetOption(A,MAT_COLUMNS_SORTED);

  if (!rank) {
    buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
    /* read in my part of the matrix numerical values; padding rows get 1.0 */
    nz = procsnz[0];
    vals = buf;
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }

    /* insert into matrix, one point row at a time (jj is the global row) */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
    /* read in other processors (except the last one) and ship out */
    for ( i=1; i<size-1; i++ ) {
      nz   = procsnz[i];
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
      ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
    }
    /* the last proc (i == size-1 after the loop above) */
    if ( size != 1 ){
      nz   = procsnz[i] - extra_rows;
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
      ierr = MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);CHKERRQ(ierr);
    }
    PetscFree(procsnz);
  } else {
    /* receive numeric values */
    buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);

    /* receive message of values*/
    vals   = buf;
    mycols = ibuf;
    ierr   = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
    ierr   = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
    if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
  }
  PetscFree(locrowlens);
  PetscFree(buf);
  PetscFree(ibuf);
  PetscFree(rowners);
  PetscFree(dlens);
  PetscFree(mask);
  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
2006 
2007 
2008