xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 55843e3e646b2cfef4dff8bdf6ad93160bb9339c)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpibaij.c,v 1.87 1997/11/03 04:46:15 bsmith Exp bsmith $";
3 #endif
4 
5 #include "pinclude/pviewer.h"
6 #include "src/mat/impls/baij/mpi/mpibaij.h"
7 #include "src/vec/vecimpl.h"
8 
9 
10 extern int MatSetUpMultiply_MPIBAIJ(Mat);
11 extern int DisAssemble_MPIBAIJ(Mat);
12 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
13 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
14 
15 /*
16      Local utility routine that creates a mapping from the global column
17    number to the local number in the off-diagonal part of the local
18    storage of the matrix.  This is done in a non-scalable way, since the
19    length of colmap equals the global number of block columns.
20 */
21 #undef __FUNC__
22 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
23 static int CreateColmap_MPIBAIJ_Private(Mat mat)
24 {
25   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
26   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
27   int         nbs = B->nbs,i,bs=B->bs;
28 
29   PetscFunctionBegin;
30   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
31   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
32   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
33   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
34   PetscFunctionReturn(0);
35 }
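/*
   Illustration only (guarded by a hypothetical PETSC_ILLUSTRATION_ONLY
   macro, never compiled into the library): recover the local point column
   in the off-diagonal block B for a global point column gcol.  colmap
   stores bs*(local block column)+1 so that a stored 0 can mean "block
   column not present on this process".
*/
#ifdef PETSC_ILLUSTRATION_ONLY
static int ColmapLookup_Example(Mat_MPIBAIJ *baij,int gcol)
{
  int bs = baij->bs;
  if (!baij->colmap[gcol/bs]) return -1;       /* block column absent here */
  return baij->colmap[gcol/bs] - 1 + gcol%bs;  /* local point column in B */
}
#endif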
36 
37 #define CHUNKSIZE  10
38 
39 #define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
40 { \
41  \
42     brow = row/bs;  \
43     rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
44     rmax = aimax[brow]; nrow = ailen[brow]; \
45       bcol = col/bs; \
46       ridx = row % bs; cidx = col % bs; \
47       low = 0; high = nrow; \
48       while (high-low > 3) { \
49         t = (low+high)/2; \
50         if (rp[t] > bcol) high = t; \
51         else              low  = t; \
52       } \
53       for ( _i=low; _i<high; _i++ ) { \
54         if (rp[_i] > bcol) break; \
55         if (rp[_i] == bcol) { \
56           bap  = ap +  bs2*_i + bs*cidx + ridx; \
57           if (addv == ADD_VALUES) *bap += value;  \
58           else                    *bap  = value;  \
59           goto a_noinsert; \
60         } \
61       } \
62       if (a->nonew == 1) goto a_noinsert; \
63       else if (a->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
64       if (nrow >= rmax) { \
65         /* there is no extra room in row, therefore enlarge */ \
66         int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
67         Scalar *new_a; \
68  \
69         if (a->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
70  \
71         /* malloc new storage space */ \
72         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
73         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
74         new_j   = (int *) (new_a + bs2*new_nz); \
75         new_i   = new_j + new_nz; \
76  \
77         /* copy over old data into new slots */ \
78         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
79         for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
80         PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
81         len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
82         PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
83                                                            len*sizeof(int)); \
84         PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
85         PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
86         PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
87                     aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
88         /* free up old matrix storage */ \
89         PetscFree(a->a);  \
90         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
91         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
92         a->singlemalloc = 1; \
93  \
94         rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
95         rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
96         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
97         a->maxnz += bs2*CHUNKSIZE; \
98         a->reallocs++; \
99         a->nz++; \
100       } \
101       N = nrow++ - 1;  \
102       /* shift up all the later entries in this row */ \
103       for ( ii=N; ii>=_i; ii-- ) { \
104         rp[ii+1] = rp[ii]; \
105         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
106       } \
107       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
108       rp[_i]                      = bcol;  \
109       ap[bs2*_i + bs*cidx + ridx] = value;  \
110       a_noinsert:; \
111     ailen[brow] = nrow; \
112 }
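/*
   The macro above locates a block column with a truncated binary search:
   bisect until the window is at most 3 wide, then scan linearly.  A
   standalone sketch of the same search (illustration only, guarded by a
   hypothetical PETSC_ILLUSTRATION_ONLY macro; rp[] is assumed to hold the
   sorted block-column indices of one block row):
*/
#ifdef PETSC_ILLUSTRATION_ONLY
static int FindBlockColumn_Example(int *rp,int nrow,int bcol)
{
  int low = 0, high = nrow, t, i;
  while (high-low > 3) {
    t = (low+high)/2;
    if (rp[t] > bcol) high = t;
    else              low  = t;
  }
  for ( i=low; i<high; i++ ) {
    if (rp[i] == bcol) return i;  /* found */
    if (rp[i] >  bcol) break;     /* passed the insertion point */
  }
  return -(i+1);                  /* not found; encode insertion point */
}
#endif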
113 
114 #define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
115 { \
116  \
117     brow = row/bs;  \
118     rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
119     rmax = bimax[brow]; nrow = bilen[brow]; \
120       bcol = col/bs; \
121       ridx = row % bs; cidx = col % bs; \
122       low = 0; high = nrow; \
123       while (high-low > 3) { \
124         t = (low+high)/2; \
125         if (rp[t] > bcol) high = t; \
126         else              low  = t; \
127       } \
128       for ( _i=low; _i<high; _i++ ) { \
129         if (rp[_i] > bcol) break; \
130         if (rp[_i] == bcol) { \
131           bap  = ap +  bs2*_i + bs*cidx + ridx; \
132           if (addv == ADD_VALUES) *bap += value;  \
133           else                    *bap  = value;  \
134           goto b_noinsert; \
135         } \
136       } \
137       if (b->nonew == 1) goto b_noinsert; \
138       else if (b->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
139       if (nrow >= rmax) { \
140         /* there is no extra room in row, therefore enlarge */ \
141         int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
142         Scalar *new_a; \
143  \
144         if (b->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
145  \
146         /* malloc new storage space */ \
147         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
148         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
149         new_j   = (int *) (new_a + bs2*new_nz); \
150         new_i   = new_j + new_nz; \
151  \
152         /* copy over old data into new slots */ \
153         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
154         for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
155         PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
156         len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
157         PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
158                                                            len*sizeof(int)); \
159         PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
160         PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
161         PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
162                     ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
163         /* free up old matrix storage */ \
164         PetscFree(b->a);  \
165         if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
166         ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
167         b->singlemalloc = 1; \
168  \
169         rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
170         rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
171         PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
172         b->maxnz += bs2*CHUNKSIZE; \
173         b->reallocs++; \
174         b->nz++; \
175       } \
176       N = nrow++ - 1;  \
177       /* shift up all the later entries in this row */ \
178       for ( ii=N; ii>=_i; ii-- ) { \
179         rp[ii+1] = rp[ii]; \
180         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
181       } \
182       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
183       rp[_i]                      = bcol;  \
184       ap[bs2*_i + bs*cidx + ridx] = value;  \
185       b_noinsert:; \
186     bilen[brow] = nrow; \
187 }
188 
189 #undef __FUNC__
190 #define __FUNC__ "MatSetValues_MPIBAIJ"
191 int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
192 {
193   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
194   Scalar      value;
195   int         ierr,i,j,row,col;
196   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
197   int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
198   int         cend_orig=baij->cend_bs,bs=baij->bs;
199 
200   /* Some Variables required in the macro */
201   Mat         A = baij->A;
202   Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
203   int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
204   Scalar      *aa=a->a;
205 
206   Mat         B = baij->B;
207   Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
208   int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
209   Scalar      *ba=b->a;
210 
211   int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
212   int         low,high,t,ridx,cidx,bs2=a->bs2;
213   Scalar      *ap,*bap;
214 
215   PetscFunctionBegin;
216   for ( i=0; i<m; i++ ) {
217 #if defined(USE_PETSC_BOPT_g)
218     if (im[i] < 0) SETERRQ(1,0,"Negative row");
219     if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
220 #endif
221     if (im[i] >= rstart_orig && im[i] < rend_orig) {
222       row = im[i] - rstart_orig;
223       for ( j=0; j<n; j++ ) {
224         if (in[j] >= cstart_orig && in[j] < cend_orig){
225           col = in[j] - cstart_orig;
226           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
227           MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
228           /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
229         }
230 #if defined(USE_PETSC_BOPT_g)
231         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
232         else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
233 #endif
234         else {
235           if (mat->was_assembled) {
236             if (!baij->colmap) {
237               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
238             }
239             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
240             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
241               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
242               col =  in[j];
243               /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
244               B = baij->B;
245               b = (Mat_SeqBAIJ *) (B)->data;
246               bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
247               ba=b->a;
248             }
249           } else col = in[j];
250           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
251           MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
252           /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
253         }
254       }
255     } else {
256       if (roworiented && !baij->donotstash) {
257         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
258       } else {
259         if (!baij->donotstash) {
260           row = im[i];
261 	  for ( j=0; j<n; j++ ) {
262 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
263           }
264         }
265       }
266     }
267   }
268   PetscFunctionReturn(0);
269 }
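/*
   Caller-side sketch (illustration only, hypothetical helper guarded by
   PETSC_ILLUSTRATION_ONLY): entries may be set for any global (i,j);
   off-process entries are stashed above and communicated during assembly.
*/
#ifdef PETSC_ILLUSTRATION_ONLY
static int AddOneEntry_Example(Mat mat,int i,int j,Scalar v)
{
  int ierr;
  ierr = MatSetValues(mat,1,&i,1,&j,&v,ADD_VALUES); CHKERRQ(ierr);
  ierr = MatAssemblyBegin(mat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(mat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}
#endif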
270 
271 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
272 #undef __FUNC__
273 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
274 int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
275 {
276   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
277   Scalar      *value,*barray=baij->barray;
278   int         ierr,i,j,ii,jj,row,col,k,l;
279   int         roworiented = baij->roworiented,rstart=baij->rstart ;
280   int         rend=baij->rend,cstart=baij->cstart,stepval;
281   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
282 
283   PetscFunctionBegin;
  if (!barray) {
284     baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
285   }
286 
287   if (roworiented) {
288     stepval = (n-1)*bs;
289   } else {
290     stepval = (m-1)*bs;
291   }
292   for ( i=0; i<m; i++ ) {
293 #if defined(USE_PETSC_BOPT_g)
294     if (im[i] < 0) SETERRQ(1,0,"Negative row");
295     if (im[i] >= baij->Mbs) SETERRQ(1,0,"Row too large");
296 #endif
297     if (im[i] >= rstart && im[i] < rend) {
298       row = im[i] - rstart;
299       for ( j=0; j<n; j++ ) {
300         /* If n == 1 (row oriented) or m == 1 (column oriented), the block is already contiguous and a copy is not required */
301         if ((roworiented) && (n == 1)) {
302           barray = v + i*bs2;
303         } else if((!roworiented) && (m == 1)) {
304           barray = v + j*bs2;
305         } else { /* Here a copy is required */
306           if (roworiented) {
307             value = v + i*(stepval+bs)*bs + j*bs;
308           } else {
309             value = v + j*(stepval+bs)*bs + i*bs;
310           }
311           for ( ii=0; ii<bs; ii++,value+=stepval ) {
312             for (jj=0; jj<bs; jj++ ) {
313               *barray++  = *value++;
314             }
315           }
316           barray -=bs2;
317         }
318 
319         if (in[j] >= cstart && in[j] < cend){
320           col  = in[j] - cstart;
321           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
322         }
323 #if defined(USE_PETSC_BOPT_g)
324         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
325         else if (in[j] >= baij->Nbs) {SETERRQ(1,0,"Column too large");}
326 #endif
327         else {
328           if (mat->was_assembled) {
329             if (!baij->colmap) {
330               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
331             }
332 
333 #if defined(USE_PETSC_BOPT_g)
334             if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(1,0,"Incorrect colmap");}
335 #endif
336             col = (baij->colmap[in[j]] - 1)/bs;
337             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
338               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
339               col =  in[j];
340             }
341           }
342           else col = in[j];
343           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
344         }
345       }
346     } else {
347       if (!baij->donotstash) {
348         if (roworiented ) {
349           row   = im[i]*bs;
350           value = v + i*(stepval+bs)*bs;
351           for ( j=0; j<bs; j++,row++ ) {
352             for ( k=0; k<n; k++ ) {
353               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
354                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
355               }
356             }
357           }
358         } else {
359           for ( j=0; j<n; j++ ) {
360             value = v + j*(stepval+bs)*bs + i*bs;
361             col   = in[j]*bs;
362             for ( k=0; k<bs; k++,col++,value+=stepval) {
363               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
364                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
365               }
366             }
367           }
368         }
369       }
370     }
371   }
372   PetscFunctionReturn(0);
373 }
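/*
   Layout note (for reference): with row-oriented input, v holds an
   (m*bs) x (n*bs) dense array stored by rows, so entry (ii,jj) of block
   (i,j) lives at v[(i*bs+ii)*(n*bs) + j*bs + jj].  The pointer arithmetic
   above reaches the same entry via
       value = v + i*(stepval+bs)*bs + j*bs,   stepval = (n-1)*bs,
   then walks bs entries per block row and skips stepval to the next one.
*/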
374 
375 #undef __FUNC__
376 #define __FUNC__ "MatGetValues_MPIBAIJ"
377 int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
378 {
379   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
380   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
381   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
382 
383   PetscFunctionBegin;
  for ( i=0; i<m; i++ ) {
384     if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
385     if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
386     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
387       row = idxm[i] - bsrstart;
388       for ( j=0; j<n; j++ ) {
389         if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
390         if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
391         if (idxn[j] >= bscstart && idxn[j] < bscend){
392           col = idxn[j] - bscstart;
393           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
394         } else {
395           if (!baij->colmap) {
396             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
397           }
398           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
399              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
400           else {
401             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
402             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
403           }
404         }
405       }
406     } else {
407       SETERRQ(1,0,"Only local values currently supported");
408     }
409   }
410   PetscFunctionReturn(0);
411 }
412 
413 #undef __FUNC__
414 #define __FUNC__ "MatNorm_MPIBAIJ"
415 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
416 {
417   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
418   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
419   int        ierr, i,bs2=baij->bs2;
420   double     sum = 0.0;
421   Scalar     *v;
422 
423   PetscFunctionBegin;
424   if (baij->size == 1) {
425     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
426   } else {
427     if (type == NORM_FROBENIUS) {
428       v = amat->a;
429       for (i=0; i<amat->nz*bs2; i++ ) {
430 #if defined(USE_PETSC_COMPLEX)
431         sum += real(conj(*v)*(*v)); v++;
432 #else
433         sum += (*v)*(*v); v++;
434 #endif
435       }
436       v = bmat->a;
437       for (i=0; i<bmat->nz*bs2; i++ ) {
438 #if defined(USE_PETSC_COMPLEX)
439         sum += real(conj(*v)*(*v)); v++;
440 #else
441         sum += (*v)*(*v); v++;
442 #endif
443       }
444       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
445       *norm = sqrt(*norm);
446     } else {
447       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
448     }
449   }
450   PetscFunctionReturn(0);
451 }
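/*
   For reference: the Frobenius norm above is assembled as
       ||A||_F = sqrt( sum over processes of sum_ij |a_ij|^2 ),
   with each process summing over the bs2*nz stored entries of its diagonal
   (A) and off-diagonal (B) parts before the MPI_Allreduce().
*/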
452 
453 #undef __FUNC__
454 #define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
455 int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
456 {
457   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
458   MPI_Comm    comm = mat->comm;
459   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
460   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
461   MPI_Request *send_waits,*recv_waits;
462   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
463   InsertMode  addv;
464   Scalar      *rvalues,*svalues;
465 
466   PetscFunctionBegin;
467   /* make sure all processors are in either INSERT_VALUES or ADD_VALUES mode */
468   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
469   if (addv == (ADD_VALUES|INSERT_VALUES)) {
470     SETERRQ(1,0,"Some processors inserted others added");
471   }
472   mat->insertmode = addv; /* in case this processor had no cache */
473 
474   /*  first count number of contributors to each processor */
475   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
476   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
477   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
478   for ( i=0; i<baij->stash.n; i++ ) {
479     idx = baij->stash.idx[i];
480     for ( j=0; j<size; j++ ) {
481       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
482         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
483       }
484     }
485   }
486   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
487 
488   /* inform other processors of number of messages and max length*/
489   work      = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
490   ierr      = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
491   nreceives = work[rank];
492   ierr      = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
493   nmax      = work[rank];
494   PetscFree(work);
495 
496   /* post receives:
497        1) each message will consist of ordered pairs
498      (global index,value); we store the global index as a double
499      to simplify the message passing.
500        2) since we don't know how long each individual message is, we
501      allocate the largest needed buffer for each receive.  Potentially
502      this is a lot of wasted space.
503 
504        This could be done better.
505   */
507   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
508   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
509   for ( i=0; i<nreceives; i++ ) {
510     ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
511   }
512 
513   /* do sends:
514       1) starts[i] gives the starting index in svalues for stuff going to
515          the ith processor
516   */
517   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
518   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
519   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
520   starts[0] = 0;
521   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
522   for ( i=0; i<baij->stash.n; i++ ) {
523     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
524     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
525     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
526   }
527   PetscFree(owner);
528   starts[0] = 0;
529   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
530   count = 0;
531   for ( i=0; i<size; i++ ) {
532     if (procs[i]) {
533       ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
534     }
535   }
536   PetscFree(starts); PetscFree(nprocs);
537 
538   /* Free cache space */
539   PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
540   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
541 
542   baij->svalues    = svalues;    baij->rvalues    = rvalues;
543   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
544   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
545   baij->rmax       = nmax;
546 
547   PetscFunctionReturn(0);
548 }
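/*
   Wire format (for reference): each stashed entry travels as three Scalars
   { (Scalar)row, (Scalar)col, value }, so entry k of a received message is
   unpacked as row = (int)PetscReal(buf[3*k]), col = (int)PetscReal(buf[3*k+1]),
   val = buf[3*k+2]; see MatAssemblyEnd_MPIBAIJ() below.
*/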
549 #include <math.h>
550 #define HASH_KEY 0.6180339887
551 #define HASH1(size,key) ((int)((size)*fmod(((key)*HASH_KEY),1)))
552 #define HASH2(size,key) ((int)((size)*fmod(((key+0.5)*HASH_KEY),1)))
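/*
   HASH1/HASH2 are multiplicative (Fibonacci) hashes: the key is scaled by
   the golden-ratio conjugate, the fractional part is kept, and the result
   is spread over slots 0..size-1.  A quick sanity check (illustration
   only, guarded by a hypothetical PETSC_ILLUSTRATION_ONLY macro):
*/
#ifdef PETSC_ILLUSTRATION_ONLY
static void HashDemo_Example(void)
{
  int    size = 97;       /* table size */
  double key  = 12345.0;  /* any nonzero key */
  /* two different starting slots for the two probe sequences */
  printf("h1 = %d  h2 = %d\n",HASH1(size,key),HASH2(size,key));
}
#endif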
553 
554 
#undef __FUNC__
#define __FUNC__ "CreateHashTable"
555 int CreateHashTable(Mat mat,double factor)
556 {
557   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
558   Mat         A = baij->A, B=baij->B;
559   Mat_SeqBAIJ *a=(Mat_SeqBAIJ *)A->data, *b=(Mat_SeqBAIJ *)B->data;
560   int         i,j,k,nz=a->nz+b->nz,h1,h2,*ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j;
561   int         size=(int)(factor*nz),ct=0,max1=0,max2=0;
562   /* Scalar      *aa=a->a,*ba=b->a; */
563   double      key,*HT;
564   extern int  PetscGlobalRank;
565 
566   PetscFunctionBegin;
567   /* Allocate memory for the hash table; it is allocated and freed on every call */
568   HT = (double *) PetscMalloc(size*sizeof(double)); CHKPTRQ(HT);
569   PetscMemzero(HT,size*sizeof(double));
576 
577   /* Loop Over A */
578   for ( i=0; i<a->n; i++ ) {
579     for ( j=ai[i]; j<ai[i+1]; j++ ) {
580       key = i*baij->n+aj[j]+1;
581       h1  = HASH1(size,key);
582       h2  = HASH2(size,key);
583 
584       for ( k=1; k<size; k++ ){
585         if (HT[(h1*k)%size] == 0.0) {
586           HT[(h1*k)%size] = key;
587           break;
588         } else ct++;
589       }
590       if (k> max1) max1 =k;
591     }
592   }
593   /* Loop Over B */
594   for ( i=0; i<b->n; i++ ) {
595     for ( j=bi[i]; j<bi[i+1]; j++ ) {
596       key = i*b->n+bj[j]+1;
597       h1  = HASH1(size,key);
598       for ( k=1; k<size; k++ ){
599         if (HT[(h1*k)%size] == 0.0) {
600           HT[(h1*k)%size] = key;
601           break;
602         } else ct++;
603       }
604       if (k> max2) max2 =k;
605     }
606   }
607 
608   /* Print Summary */
609   for ( i=0,key=0.0,j=0; i<size; i++)
610     if (HT[i]) {j++;}
611 
612   printf("[%d] fact = %3.2f max1 = %5d max2 = %5d Size %5d - Searches %5d Avg %5.2f Keys %5d\n",
613          PetscGlobalRank,factor,max1,max2,size,ct+j,((float)ct+j)/j,j);
614   PetscFree(HT);
615   PetscFunctionReturn(0);
616 }
617 
618 #undef __FUNC__
619 #define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
620 int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
621 {
622   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
623   MPI_Status  *send_status,recv_status;
624   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
625   int         bs=baij->bs,row,col,other_disassembled,flg;
626   Scalar      *values,val;
627   InsertMode  addv = mat->insertmode;
628 
629   PetscFunctionBegin;
630   /*  wait on receives */
631   while (count) {
632     ierr = MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
633     /* unpack receives into our local space */
634     values = baij->rvalues + 3*imdex*baij->rmax;
635     ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
636     n = n/3;
637     for ( i=0; i<n; i++ ) {
638       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
639       col = (int) PetscReal(values[3*i+1]);
640       val = values[3*i+2];
641       if (col >= baij->cstart*bs && col < baij->cend*bs) {
642         col -= baij->cstart*bs;
643         ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
644       } else {
645         if (mat->was_assembled) {
646           if (!baij->colmap) {
647             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
648           }
649           col = (baij->colmap[col/bs]) - 1 + col%bs;
650           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
651             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
652             col = (int) PetscReal(values[3*i+1]);
653           }
654         }
655         ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
656       }
657     }
658     count--;
659   }
660   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
661 
662   /* wait on sends */
663   if (baij->nsends) {
664     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
665     ierr        = MPI_Waitall(baij->nsends,baij->send_waits,send_status);CHKERRQ(ierr);
666     PetscFree(send_status);
667   }
668   PetscFree(baij->send_waits); PetscFree(baij->svalues);
669 
670   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
671   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
672 
673   /* determine if any processor has disassembled; if so we must
674      also disassemble ourselves, so that we may reassemble. */
675   ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
676   if (mat->was_assembled && !other_disassembled) {
677     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
678   }
679 
680   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
681     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
682   }
683   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
684   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
685 
686   ierr = OptionsHasName(PETSC_NULL,"-use_hash",&flg); CHKERRQ(ierr);
687   if (flg) {
688     double fact;
689     for ( fact=1.2; fact<2.0; fact +=0.05) CreateHashTable(mat,fact);
690   }
691   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
692   PetscFunctionReturn(0);
693 }
694 
695 #undef __FUNC__
696 #define __FUNC__ "MatView_MPIBAIJ_Binary"
697 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
698 {
699   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
700   int          ierr;
701 
702   PetscFunctionBegin;
703   if (baij->size == 1) {
704     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
705   } else SETERRQ(1,0,"Only uniprocessor output supported");
706   PetscFunctionReturn(0);
707 }
708 
709 #undef __FUNC__
710 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
711 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
712 {
713   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
714   int          ierr, format,rank,bs = baij->bs;
715   FILE         *fd;
716   ViewerType   vtype;
717 
718   PetscFunctionBegin;
719   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
720   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
721     ierr = ViewerGetFormat(viewer,&format);
722     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
723       MatInfo info;
724       MPI_Comm_rank(mat->comm,&rank);
725       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
726       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
727       PetscSequentialPhaseBegin(mat->comm,1);
728       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
729               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
730               baij->bs,(int)info.memory);
731       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
732       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
733       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
734       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
735       fflush(fd);
736       PetscSequentialPhaseEnd(mat->comm,1);
737       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
738       PetscFunctionReturn(0);
739     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
740       PetscPrintf(mat->comm,"  block size is %d\n",bs);
741       PetscFunctionReturn(0);
742     }
743   }
744 
745   if (vtype == DRAW_VIEWER) {
746     Draw       draw;
747     PetscTruth isnull;
748     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
749     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
750   }
751 
752   if (vtype == ASCII_FILE_VIEWER) {
753     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
754     PetscSequentialPhaseBegin(mat->comm,1);
755     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
756            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
757             baij->cstart*bs,baij->cend*bs);
758     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
759     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
760     fflush(fd);
761     PetscSequentialPhaseEnd(mat->comm,1);
762   } else {
763     int size = baij->size;
764     rank = baij->rank;
765     if (size == 1) {
766       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
767     } else {
768       /* assemble the entire matrix onto first processor. */
769       Mat         A;
770       Mat_SeqBAIJ *Aloc;
771       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
772       int         mbs=baij->mbs;
773       Scalar      *a;
774 
775       if (!rank) {
776         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
777       } else {
778         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
779       }
780       PLogObjectParent(mat,A);
781 
782       /* copy over the A part */
783       Aloc = (Mat_SeqBAIJ*) baij->A->data;
784       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
785       row = baij->rstart;
786       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
787 
788       for ( i=0; i<mbs; i++ ) {
789         rvals[0] = bs*(baij->rstart + i);
790         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
791         for ( j=ai[i]; j<ai[i+1]; j++ ) {
792           col = (baij->cstart+aj[j])*bs;
793           for (k=0; k<bs; k++ ) {
794             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
795             col++; a += bs;
796           }
797         }
798       }
799       /* copy over the B part */
800       Aloc = (Mat_SeqBAIJ*) baij->B->data;
801       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
802       row = baij->rstart*bs;
803       for ( i=0; i<mbs; i++ ) {
804         rvals[0] = bs*(baij->rstart + i);
805         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
806         for ( j=ai[i]; j<ai[i+1]; j++ ) {
807           col = baij->garray[aj[j]]*bs;
808           for (k=0; k<bs; k++ ) {
809             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
810             col++; a += bs;
811           }
812         }
813       }
814       PetscFree(rvals);
815       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
816       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
817       /*
818          Everyone has to call to draw the matrix since the graphics waits are
819          synchronized across all processors that share the Draw object
820       */
821       if (!rank || vtype == DRAW_VIEWER) {
822         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
823       }
824       ierr = MatDestroy(A); CHKERRQ(ierr);
825     }
826   }
827   PetscFunctionReturn(0);
828 }
829 
830 
831 
832 #undef __FUNC__
833 #define __FUNC__ "MatView_MPIBAIJ"
834 int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
835 {
836   Mat         mat = (Mat) obj;
837   int         ierr;
838   ViewerType  vtype;
839 
840   PetscFunctionBegin;
841   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
842   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
843       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
844     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
845   } else if (vtype == BINARY_FILE_VIEWER) {
846     ierr = MatView_MPIBAIJ_Binary(mat,viewer);CHKERRQ(ierr);
847   }
848   PetscFunctionReturn(0);
849 }
850 
851 #undef __FUNC__
852 #define __FUNC__ "MatDestroy_MPIBAIJ"
853 int MatDestroy_MPIBAIJ(PetscObject obj)
854 {
855   Mat         mat = (Mat) obj;
856   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
857   int         ierr;
858 
859   PetscFunctionBegin;
860 #if defined(USE_PETSC_LOG)
861   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
862 #endif
863 
864   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
865   PetscFree(baij->rowners);
866   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
867   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
868   if (baij->colmap) PetscFree(baij->colmap);
869   if (baij->garray) PetscFree(baij->garray);
870   if (baij->lvec)   VecDestroy(baij->lvec);
871   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
872   if (baij->rowvalues) PetscFree(baij->rowvalues);
873   if (baij->barray) PetscFree(baij->barray);
874   PetscFree(baij);
875   PLogObjectDestroy(mat);
876   PetscHeaderDestroy(mat);
877   PetscFunctionReturn(0);
878 }
879 
880 #undef __FUNC__
881 #define __FUNC__ "MatMult_MPIBAIJ"
882 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
883 {
884   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
885   int         ierr, nt;
886 
887   PetscFunctionBegin;
888   VecGetLocalSize_Fast(xx,nt);
889   if (nt != a->n) {
890     SETERRQ(1,0,"Incompatible partition of A and xx");
891   }
892   VecGetLocalSize_Fast(yy,nt);
893   if (nt != a->m) {
894     SETERRQ(1,0,"Incompatible partition of A and yy");
895   }
896   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
897   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
898   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
899   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
900   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
901   PetscFunctionReturn(0);
902 }
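/*
   For reference: the multiply is split as y = A*x_local + B*x_ghost, with
   the scatter of ghost values overlapped with the local (diagonal block)
   product; VecScatterPostRecvs() pre-posts the receives for the next call.
*/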
903 
904 #undef __FUNC__
905 #define __FUNC__ "MatMultAdd_MPIBAIJ"
906 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
907 {
908   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
909   int        ierr;
910 
911   PetscFunctionBegin;
912   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
913   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
914   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
915   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
916   PetscFunctionReturn(0);
917 }
918 
919 #undef __FUNC__
920 #define __FUNC__ "MatMultTrans_MPIBAIJ"
921 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
922 {
923   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
924   int         ierr;
925 
926   PetscFunctionBegin;
927   /* do nondiagonal part */
928   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
929   /* send it on its way */
930   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
931   /* do local part */
932   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
933   /* receive remote parts: note this assumes the values are not actually
934      inserted in yy until the next line, which is true for this
935      implementation but may not always be true. */
936   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
937   PetscFunctionReturn(0);
938 }
939 
940 #undef __FUNC__
941 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
942 int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
943 {
944   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
945   int         ierr;
946 
947   PetscFunctionBegin;
948   /* do nondiagonal part */
949   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
950   /* send it on its way */
951   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
952   /* do local part */
953   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
954   /* receive remote parts: note this assumes the values are not actually
955      inserted in zz until the next line, which is true for this
956      implementation but may not always be true. */
957   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
958   PetscFunctionReturn(0);
959 }
960 
961 /*
962   This only works correctly for square matrices where the subblock A->A is the
963    diagonal block
964 */
965 #undef __FUNC__
966 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
967 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
968 {
969   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
970   int         ierr;
971 
972   PetscFunctionBegin;
973   if (a->M != a->N) SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
974   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
975   PetscFunctionReturn(0);
976 }
977 
978 #undef __FUNC__
979 #define __FUNC__ "MatScale_MPIBAIJ"
980 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
981 {
982   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
983   int         ierr;
984 
985   PetscFunctionBegin;
986   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
987   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
988   PetscFunctionReturn(0);
989 }
990 
991 #undef __FUNC__
992 #define __FUNC__ "MatGetSize_MPIBAIJ"
993 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
994 {
995   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
996 
997   PetscFunctionBegin;
998   if (m) *m = mat->M;
999   if (n) *n = mat->N;
1000   PetscFunctionReturn(0);
1001 }
1002 
1003 #undef __FUNC__
1004 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
1005 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
1006 {
1007   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1008 
1009   PetscFunctionBegin;
1010   *m = mat->m; *n = mat->n;
1011   PetscFunctionReturn(0);
1012 }
1013 
1014 #undef __FUNC__
1015 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
1016 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
1017 {
1018   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1019 
1020   PetscFunctionBegin;
1021   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
1022   PetscFunctionReturn(0);
1023 }
1024 
1025 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1026 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1027 
1028 #undef __FUNC__
1029 #define __FUNC__ "MatGetRow_MPIBAIJ"
1030 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1031 {
1032   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1033   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
1034   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
1035   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
1036   int        *cmap, *idx_p,cstart = mat->cstart;
1037 
1038   PetscFunctionBegin;
1039   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
1040   mat->getrowactive = PETSC_TRUE;
1041 
1042   if (!mat->rowvalues && (idx || v)) {
1043     /*
1044         allocate enough space to hold information from the longest row.
1045     */
1046     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
1047     int     max = 1,mbs = mat->mbs,tmp;
1048     for ( i=0; i<mbs; i++ ) {
1049       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1050       if (max < tmp) { max = tmp; }
1051     }
1052     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
1053     CHKPTRQ(mat->rowvalues);
1054     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
1055   }
1056 
1057   if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows");
1058   lrow = row - brstart;
1059 
1060   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1061   if (!v)   {pvA = 0; pvB = 0;}
1062   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1063   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1064   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1065   nztot = nzA + nzB;
1066 
1067   cmap  = mat->garray;
1068   if (v  || idx) {
1069     if (nztot) {
1070       /* Sort by increasing column numbers, assuming A and B already sorted */
1071       int imark = -1;
1072       if (v) {
1073         *v = v_p = mat->rowvalues;
1074         for ( i=0; i<nzB; i++ ) {
1075           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
1076           else break;
1077         }
1078         imark = i;
1079         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1080         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1081       }
1082       if (idx) {
1083         *idx = idx_p = mat->rowindices;
1084         if (imark > -1) {
1085           for ( i=0; i<imark; i++ ) {
1086             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
1087           }
1088         } else {
1089           for ( i=0; i<nzB; i++ ) {
1090             if (cmap[cworkB[i]/bs] < cstart)
1091               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
1092             else break;
1093           }
1094           imark = i;
1095         }
1096         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
1097         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
1098       }
1099     } else {
1100       if (idx) *idx = 0;
1101       if (v)   *v   = 0;
1102     }
1103   }
1104   *nz = nztot;
1105   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1106   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1107   PetscFunctionReturn(0);
1108 }
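/*
   Caller-side sketch (illustration only, hypothetical helper guarded by
   PETSC_ILLUSTRATION_ONLY): rows must be local to this process, and each
   MatGetRow() must be matched by a MatRestoreRow() before another row is
   requested.
*/
#ifdef PETSC_ILLUSTRATION_ONLY
static int SumRow_Example(Mat mat,int row,Scalar *sum)
{
  int    ierr,nz,i,*cols;
  Scalar *vals;
  ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
  *sum = 0.0;
  for ( i=0; i<nz; i++ ) *sum += vals[i];
  ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
  return 0;
}
#endif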
1109 
1110 #undef __FUNC__
1111 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1112 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1113 {
1114   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1115 
1116   PetscFunctionBegin;
1117   if (baij->getrowactive == PETSC_FALSE) {
1118     SETERRQ(1,0,"MatGetRow not called");
1119   }
1120   baij->getrowactive = PETSC_FALSE;
1121   PetscFunctionReturn(0);
1122 }
1123 
1124 #undef __FUNC__
1125 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1126 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1127 {
1128   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1129 
1130   PetscFunctionBegin;
1131   *bs = baij->bs;
1132   PetscFunctionReturn(0);
1133 }
1134 
1135 #undef __FUNC__
1136 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1137 int MatZeroEntries_MPIBAIJ(Mat A)
1138 {
1139   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1140   int         ierr;
1141 
1142   PetscFunctionBegin;
1143   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1144   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1145   PetscFunctionReturn(0);
1146 }
1147 
1148 #undef __FUNC__
1149 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1150 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1151 {
1152   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1153   Mat         A = a->A, B = a->B;
1154   int         ierr;
1155   double      isend[5], irecv[5];
1156 
1157   PetscFunctionBegin;
1158   info->rows_global    = (double)a->M;
1159   info->columns_global = (double)a->N;
1160   info->rows_local     = (double)a->m;
1161   info->columns_local  = (double)a->n;
1162   info->block_size     = (double)a->bs;
1163   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1164   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
       isend[3] = info->memory;  isend[4] = info->mallocs;
1165   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1166   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
       isend[3] += info->memory;  isend[4] += info->mallocs;
1167   if (flag == MAT_LOCAL) {
1168     info->nz_used      = isend[0];
1169     info->nz_allocated = isend[1];
1170     info->nz_unneeded  = isend[2];
1171     info->memory       = isend[3];
1172     info->mallocs      = isend[4];
1173   } else if (flag == MAT_GLOBAL_MAX) {
1174     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);CHKERRQ(ierr);
1175     info->nz_used      = irecv[0];
1176     info->nz_allocated = irecv[1];
1177     info->nz_unneeded  = irecv[2];
1178     info->memory       = irecv[3];
1179     info->mallocs      = irecv[4];
1180   } else if (flag == MAT_GLOBAL_SUM) {
1181     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);CHKERRQ(ierr);
1182     info->nz_used      = irecv[0];
1183     info->nz_allocated = irecv[1];
1184     info->nz_unneeded  = irecv[2];
1185     info->memory       = irecv[3];
1186     info->mallocs      = irecv[4];
1187   }
1188   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1189   info->fill_ratio_needed = 0;
1190   info->factor_mallocs    = 0;
1191   PetscFunctionReturn(0);
1192 }
1193 
1194 #undef __FUNC__
1195 #define __FUNC__ "MatSetOption_MPIBAIJ"
1196 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1197 {
1198   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1199 
1200   PetscFunctionBegin;
1201   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1202       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1203       op == MAT_COLUMNS_UNSORTED ||
1204       op == MAT_COLUMNS_SORTED ||
1205       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1206       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1207         MatSetOption(a->A,op);
1208         MatSetOption(a->B,op);
1209   } else if (op == MAT_ROW_ORIENTED) {
1210         a->roworiented = 1;
1211         MatSetOption(a->A,op);
1212         MatSetOption(a->B,op);
1213   } else if (op == MAT_ROWS_SORTED ||
1214              op == MAT_ROWS_UNSORTED ||
1215              op == MAT_SYMMETRIC ||
1216              op == MAT_STRUCTURALLY_SYMMETRIC ||
1217              op == MAT_YES_NEW_DIAGONALS)
1218     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1219   else if (op == MAT_COLUMN_ORIENTED) {
1220     a->roworiented = 0;
1221     MatSetOption(a->A,op);
1222     MatSetOption(a->B,op);
1223   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1224     a->donotstash = 1;
1225   } else if (op == MAT_NO_NEW_DIAGONALS) {
1226     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1227   } else {
1228     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1229   }
1230   PetscFunctionReturn(0);
1231 }
1232 
1233 #undef __FUNC__
1234 #define __FUNC__ "MatTranspose_MPIBAIJ"
1235 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1236 {
1237   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1238   Mat_SeqBAIJ *Aloc;
1239   Mat        B;
1240   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
1241   int        bs=baij->bs,mbs=baij->mbs;
1242   Scalar     *a;
1243 
1244   PetscFunctionBegin;
1245   if (matout == PETSC_NULL && M != N) SETERRQ(1,0,"Square matrix only for in-place");
1246   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1247   CHKERRQ(ierr);
1248 
1249   /* copy over the A part */
1250   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1251   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1252   row = baij->rstart;
1253   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1254 
1255   for ( i=0; i<mbs; i++ ) {
1256     rvals[0] = bs*(baij->rstart + i);
1257     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1258     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1259       col = (baij->cstart+aj[j])*bs;
1260       for (k=0; k<bs; k++ ) {
1261         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1262         col++; a += bs;
1263       }
1264     }
1265   }
1266   /* copy over the B part */
1267   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1268   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1269   row = baij->rstart*bs;
1270   for ( i=0; i<mbs; i++ ) {
1271     rvals[0] = bs*(baij->rstart + i);
1272     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1273     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1274       col = baij->garray[aj[j]]*bs;
1275       for (k=0; k<bs; k++ ) {
1276         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1277         col++; a += bs;
1278       }
1279     }
1280   }
1281   PetscFree(rvals);
1282   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1283   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1284 
1285   if (matout != PETSC_NULL) {
1286     *matout = B;
1287   } else {
1288     /* This isn't really an in-place transpose, but we free the data structures of baij */
1289     PetscFree(baij->rowners);
1290     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1291     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1292     if (baij->colmap) PetscFree(baij->colmap);
1293     if (baij->garray) PetscFree(baij->garray);
1294     if (baij->lvec) VecDestroy(baij->lvec);
1295     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1296     PetscFree(baij);
1297     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1298     PetscHeaderDestroy(B);
1299   }
1300   PetscFunctionReturn(0);
1301 }
1302 
1303 #undef __FUNC__
1304 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1305 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1306 {
1307   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1308   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1309   int ierr,s1,s2,s3;
1310 
1311   PetscFunctionBegin;
1312   if (ll)  {
1313     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1314     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1315     if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
1316     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1317     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1318   }
1319   if (rr) SETERRQ(1,0,"not supported for right vector");
1320   PetscFunctionReturn(0);
1321 }
1322 
1323 /* the code does not do the diagonal entries correctly unless the
1324    matrix is square and the column and row ownerships are identical.
1325    This is a BUG. The only way to fix it seems to be to access
1326    baij->A and baij->B directly and not through the MatZeroRows()
1327    routine.
1328 */
1329 #undef __FUNC__
1330 #define __FUNC__ "MatZeroRows_MPIBAIJ"
1331 int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
1332 {
1333   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
1334   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
1335   int            *procs,*nprocs,j,found,idx,nsends,*work;
1336   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
1337   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
1338   int            *lens,imdex,*lrows,*values,bs=l->bs;
1339   MPI_Comm       comm = A->comm;
1340   MPI_Request    *send_waits,*recv_waits;
1341   MPI_Status     recv_status,*send_status;
1342   IS             istmp;
1343 
1344   PetscFunctionBegin;
1345   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
1346   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
1347 
1348   /*  first count number of contributors to each processor */
1349   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
1350   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
1351   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
1352   for ( i=0; i<N; i++ ) {
1353     idx = rows[i];
1354     found = 0;
1355     for ( j=0; j<size; j++ ) {
1356       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
1357         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
1358       }
1359     }
1360     if (!found) SETERRQ(1,0,"Index out of range");
1361   }
1362   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
1363 
1364   /* inform other processors of number of messages and max length*/
1365   work   = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
1366   ierr   = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
1367   nrecvs = work[rank];
1368   ierr   = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
1369   nmax   = work[rank];
1370   PetscFree(work);
1371 
1372   /* post receives:   */
1373   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); CHKPTRQ(rvalues);
1374   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
1375   for ( i=0; i<nrecvs; i++ ) {
1376     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
1377   }
1378 
1379   /* do sends:
1380       1) starts[i] gives the starting index in svalues for stuff going to
1381          the ith processor
1382   */
1383   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1384   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
1385   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1386   starts[0] = 0;
1387   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1388   for ( i=0; i<N; i++ ) {
1389     svalues[starts[owner[i]]++] = rows[i];
1390   }
1391   ISRestoreIndices(is,&rows);
1392 
1393   starts[0] = 0;
1394   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1395   count = 0;
1396   for ( i=0; i<size; i++ ) {
1397     if (procs[i]) {
1398       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
1399     }
1400   }
1401   PetscFree(starts);
1402 
1403   base = owners[rank]*bs;
1404 
1405   /*  wait on receives */
1406   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1407   source = lens + nrecvs;
1408   count  = nrecvs; slen = 0;
1409   while (count) {
1410     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
1411     /* unpack receives into our local space */
1412     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
1413     source[imdex]  = recv_status.MPI_SOURCE;
1414     lens[imdex]  = n;
1415     slen += n;
1416     count--;
1417   }
1418   PetscFree(recv_waits);
1419 
1420   /* move the data into the send scatter */
1421   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1422   count = 0;
1423   for ( i=0; i<nrecvs; i++ ) {
1424     values = rvalues + i*nmax;
1425     for ( j=0; j<lens[i]; j++ ) {
1426       lrows[count++] = values[j] - base;
1427     }
1428   }
1429   PetscFree(rvalues); PetscFree(lens);
1430   PetscFree(owner); PetscFree(nprocs);
1431 
1432   /* actually zap the local rows */
1433   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1434   PLogObjectParent(A,istmp);
1435   PetscFree(lrows);
1436   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1437   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1438   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1439 
1440   /* wait on sends */
1441   if (nsends) {
1442     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
1443     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
1444     PetscFree(send_status);
1445   }
1446   PetscFree(send_waits); PetscFree(svalues);
1447 
1448   PetscFunctionReturn(0);
1449 }
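
/*
    The communication in MatZeroRows_MPIBAIJ() above follows a standard
    count-then-exchange pattern.  Below is a minimal stand-alone sketch of
    the same idea (purely illustrative -- none of these names are part of
    PETSc).  Two reductions tell each process how many messages it will
    receive and an upper bound on their length; receives are posted with
    MPI_ANY_SOURCE and the true lengths are recovered from the status:

       #include <mpi.h>
       #include <stdlib.h>

       int exchange(MPI_Comm comm,int *flags,int *counts,int **sbufs,int tag)
       {
         int         size,rank,i,j,idx,n,nrecvs,nmax,nsends,total = 0,*work,*rbuf;
         MPI_Request *rwaits,*swaits;
         MPI_Status  status,*sstatus;

         MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
         work = (int *) malloc(size*sizeof(int));
         MPI_Allreduce(flags, work,size,MPI_INT,MPI_SUM,comm); nrecvs = work[rank];
         MPI_Allreduce(counts,work,size,MPI_INT,MPI_MAX,comm); nmax   = work[rank];
         free(work);

         rbuf   = (int *) malloc((nrecvs+1)*(nmax+1)*sizeof(int));
         rwaits = (MPI_Request *) malloc((nrecvs+1)*sizeof(MPI_Request));
         for ( i=0; i<nrecvs; i++ ) {
           MPI_Irecv(rbuf+i*nmax,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,rwaits+i);
         }
         swaits = (MPI_Request *) malloc((size+1)*sizeof(MPI_Request));
         for ( nsends=0, i=0; i<size; i++ ) {
           if (flags[i]) MPI_Isend(sbufs[i],counts[i],MPI_INT,i,tag,comm,swaits+nsends++);
         }
         for ( i=0; i<nrecvs; i++ ) {
           MPI_Waitany(nrecvs,rwaits,&idx,&status);
           MPI_Get_count(&status,MPI_INT,&n);
           for ( j=0; j<n; j++ ) total += rbuf[idx*nmax+j];
         }
         sstatus = (MPI_Status *) malloc((nsends+1)*sizeof(MPI_Status));
         MPI_Waitall(nsends,swaits,sstatus);
         free(sstatus); free(swaits); free(rwaits); free(rbuf);
         return total;
       }
*/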
1450 extern int MatPrintHelp_SeqBAIJ(Mat);
1451 #undef __FUNC__
1452 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1453 int MatPrintHelp_MPIBAIJ(Mat A)
1454 {
1455   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1456   int         ierr;
1457 
1458   PetscFunctionBegin;
1459   if (!a->rank) {
1460     ierr = MatPrintHelp_SeqBAIJ(a->A);CHKERRQ(ierr);
1461   }
1462   PetscFunctionReturn(0);
1463 }
1464 
1465 #undef __FUNC__
1466 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1467 int MatSetUnfactored_MPIBAIJ(Mat A)
1468 {
1469   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1470   int         ierr;
1471 
1472   PetscFunctionBegin;
1473   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1474   PetscFunctionReturn(0);
1475 }
1476 
1477 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1478 
1479 /* -------------------------------------------------------------------*/
1480 static struct _MatOps MatOps = {
1481   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1482   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1483   0,0,0,0,
1484   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1485   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1486   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1487   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1488   0,0,0,MatGetSize_MPIBAIJ,
1489   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1490   0,0,MatConvertSameType_MPIBAIJ,0,0,
1491   0,0,0,MatGetSubMatrices_MPIBAIJ,
1492   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1493   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
1494   0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1495 
1496 
1497 #undef __FUNC__
1498 #define __FUNC__ "MatCreateMPIBAIJ"
1499 /*@C
1500    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1501    (block compressed row).  For good matrix assembly performance
1502    the user should preallocate the matrix storage by setting the parameters
1503    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1504    performance can be increased by more than a factor of 50.
1505 
1506    Input Parameters:
1507 .  comm - MPI communicator
1508 .  bs   - size of block
1509 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1510            This value should be the same as the local size used in creating the
1511            y vector for the matrix-vector product y = Ax.
1512 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1513            This value should be the same as the local size used in creating the
1514            x vector for the matrix-vector product y = Ax.
1515 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
1516 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
1517 .  d_nz  - number of block nonzeros per block row in the diagonal portion of the
1518            local submatrix (same for all local block rows)
1519 .  d_nnz - array containing the number of block nonzeros in the various block rows
1520            in the diagonal portion of the local submatrix (possibly different for
1521            each block row) or PETSC_NULL.  You must leave room for the diagonal
1522            entry even if it is zero.
1523 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of the
1524            local submatrix (same for all local block rows).
1525 .  o_nnz - array containing the number of block nonzeros in the various block rows
1526            of the off-diagonal portion of the local submatrix (possibly different
1527            for each block row) or PETSC_NULL.
1528 
1529    Output Parameter:
1530 .  A - the matrix
1531 
1532    Notes:
1533    The user MUST specify either the local or global matrix dimensions
1534    (possibly both).
1535 
1536    Storage Information:
1537    For a square global matrix we define each processor's diagonal portion
1538    to be its local rows and the corresponding columns (a square submatrix);
1539    each processor's off-diagonal portion encompasses the remainder of the
1540    local matrix (a rectangular submatrix).
1541 
1542    The user can specify preallocated storage for the diagonal part of
1543    the local submatrix with either d_nz or d_nnz (not both).  Set
1544    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1545    memory allocation.  Likewise, specify preallocated storage for the
1546    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1547 
1548    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1549    the figure below we depict these three local rows and all columns (0-11).
1550 
1551 $          0 1 2 3 4 5 6 7 8 9 10 11
1552 $         --------------------------
1553 $  row 3  |  o o o d d d o o o o o o
1554 $  row 4  |  o o o d d d o o o o o o
1555 $  row 5  |  o o o d d d o o o o o o
1556 $         --------------------------
1557 $
1558 
1559    Thus, any entries in the d locations are stored in the d (diagonal)
1560    submatrix, and any entries in the o locations are stored in the
1561    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1562    stored simply in the MATSEQBAIJ format for compressed row storage.
1563 
1564    Now d_nz should indicate the number of block nonzeros per row in the d matrix,
1565    and o_nz should indicate the number of block nonzeros per row in the o matrix.
1566    In general, for PDE problems in which most nonzeros are near the diagonal,
1567    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1568    or you will get TERRIBLE performance; see the users' manual chapter on
1569    matrices.
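
   Example:
   An illustrative call (the block size and preallocation figures below are
   made up for the example), where M and N are global dimensions divisible
   by the block size; PETSc computes the local dimensions, and roughly 3
   nonzero blocks per block row are expected in the diagonal part and 1 in
   the off-diagonal part:

$      Mat A;
$      ierr = MatCreateMPIBAIJ(PETSC_COMM_WORLD,2,PETSC_DECIDE,PETSC_DECIDE,
$                              M,N,3,PETSC_NULL,1,PETSC_NULL,&A); CHKERRQ(ierr);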
1570 
1571 .keywords: matrix, block, aij, compressed row, sparse, parallel
1572 
1573 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1574 @*/
1575 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1576                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1577 {
1578   Mat          B;
1579   Mat_MPIBAIJ  *b;
1580   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1581 
1582   PetscFunctionBegin;
1583   if (bs < 1) SETERRQ(1,0,"Invalid block size specified, must be positive");
1584 
1585   MPI_Comm_size(comm,&size);
1586   if (size == 1) {
1587     if (M == PETSC_DECIDE) M = m;
1588     if (N == PETSC_DECIDE) N = n;
1589     ierr = MatCreateSeqBAIJ(comm,bs,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
1590     PetscFunctionReturn(0);
1591   }
1592 
1593   *A = 0;
1594   PetscHeaderCreate(B,_p_Mat,MAT_COOKIE,MATMPIBAIJ,comm,MatDestroy,MatView);
1595   PLogObjectCreate(B);
1596   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1597   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1598   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1599 
1600   B->destroy    = MatDestroy_MPIBAIJ;
1601   B->view       = MatView_MPIBAIJ;
1602   B->mapping    = 0;
1603   B->factor     = 0;
1604   B->assembled  = PETSC_FALSE;
1605 
1606   B->insertmode = NOT_SET_VALUES;
1607   MPI_Comm_rank(comm,&b->rank);
1608   MPI_Comm_size(comm,&b->size);
1609 
1610   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
1611     SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1612   }
1613   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
1614   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,0,"either N or n should be specified");
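  /* if both the global and the local dimension are given, the local one takes precedence: the global value is discarded and recomputed from the local sizes below */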
1615   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1616   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1617 
1618   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1619     work[0] = m; work[1] = n;
1620     mbs = m/bs; nbs = n/bs;
1621     ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
1622     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1623     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1624   }
1625   if (m == PETSC_DECIDE) {
1626     Mbs = M/bs;
1627     if (Mbs*bs != M) SETERRQ(1,0,"Number of global rows must be divisible by blocksize");
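    /* split the Mbs block rows as evenly as possible: e.g. (illustrative) Mbs=10 on 4 processes gives 3,3,2,2, since ranks with rank < (Mbs % size) get one extra block row */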
1628     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1629     m   = mbs*bs;
1630   }
1631   if (n == PETSC_DECIDE) {
1632     Nbs = N/bs;
1633     if (Nbs*bs != N) SETERRQ(1,0,"Number of global cols must be divisible by blocksize");
1634     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1635     n   = nbs*bs;
1636   }
1637   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"Number of local rows and cols must be divisible by blocksize");
1638 
1639   b->m = m; B->m = m;
1640   b->n = n; B->n = n;
1641   b->N = N; B->N = N;
1642   b->M = M; B->M = M;
1643   b->bs  = bs;
1644   b->bs2 = bs*bs;
1645   b->mbs = mbs;
1646   b->nbs = nbs;
1647   b->Mbs = Mbs;
1648   b->Nbs = Nbs;
1649 
1650   /* build local table of row and column ownerships */
1651   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1652   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1653   b->cowners = b->rowners + b->size + 2;
1654   ierr = MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1655   b->rowners[0] = 0;
1656   for ( i=2; i<=b->size; i++ ) {
1657     b->rowners[i] += b->rowners[i-1];
1658   }
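  /* e.g. (illustrative) local mbs values 3,3,2 on three processes yield rowners = {0,3,6,8}: process p owns block rows [rowners[p],rowners[p+1]) */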
1659   b->rstart    = b->rowners[b->rank];
1660   b->rend      = b->rowners[b->rank+1];
1661   b->rstart_bs = b->rstart * bs;
1662   b->rend_bs   = b->rend * bs;
1663 
1664   ierr = MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1665   b->cowners[0] = 0;
1666   for ( i=2; i<=b->size; i++ ) {
1667     b->cowners[i] += b->cowners[i-1];
1668   }
1669   b->cstart    = b->cowners[b->rank];
1670   b->cend      = b->cowners[b->rank+1];
1671   b->cstart_bs = b->cstart * bs;
1672   b->cend_bs   = b->cend * bs;
1673 
1674   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1675   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1676   PLogObjectParent(B,b->A);
1677   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1678   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1679   PLogObjectParent(B,b->B);
1680 
1681   /* build cache for off array entries formed */
1682   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1683   b->donotstash  = 0;
1684   b->colmap      = 0;
1685   b->garray      = 0;
1686   b->roworiented = 1;
1687 
1688   /* stuff used in block assembly */
1689   b->barray      = 0;
1690 
1691   /* stuff used for matrix vector multiply */
1692   b->lvec        = 0;
1693   b->Mvctx       = 0;
1694 
1695   /* stuff for MatGetRow() */
1696   b->rowindices   = 0;
1697   b->rowvalues    = 0;
1698   b->getrowactive = PETSC_FALSE;
1699 
1700   *A = B;
1701   PetscFunctionReturn(0);
1702 }
1703 
1704 #undef __FUNC__
1705 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
1706 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1707 {
1708   Mat         mat;
1709   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1710   int         ierr, len=0, flg;
1711 
1712   PetscFunctionBegin;
1713   *newmat       = 0;
1714   PetscHeaderCreate(mat,_p_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm,MatDestroy,MatView);
1715   PLogObjectCreate(mat);
1716   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1717   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1718   mat->destroy    = MatDestroy_MPIBAIJ;
1719   mat->view       = MatView_MPIBAIJ;
1720   mat->factor     = matin->factor;
1721   mat->assembled  = PETSC_TRUE;
1722 
1723   a->m = mat->m   = oldmat->m;
1724   a->n = mat->n   = oldmat->n;
1725   a->M = mat->M   = oldmat->M;
1726   a->N = mat->N   = oldmat->N;
1727 
1728   a->bs  = oldmat->bs;
1729   a->bs2 = oldmat->bs2;
1730   a->mbs = oldmat->mbs;
1731   a->nbs = oldmat->nbs;
1732   a->Mbs = oldmat->Mbs;
1733   a->Nbs = oldmat->Nbs;
1734 
1735   a->rstart       = oldmat->rstart;
1736   a->rend         = oldmat->rend;
1737   a->cstart       = oldmat->cstart;
1738   a->cend         = oldmat->cend;
1739   a->size         = oldmat->size;
1740   a->rank         = oldmat->rank;
1741   mat->insertmode = NOT_SET_VALUES;
1742   a->rowvalues    = 0;
1743   a->getrowactive = PETSC_FALSE;
1744   a->barray       = 0;
1745 
1746   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1747   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1748   a->cowners = a->rowners + a->size + 2;
1749   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1750   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1751   if (oldmat->colmap) {
1752     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1753     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1754     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1755   } else a->colmap = 0;
1756   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1757     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1758     PLogObjectMemory(mat,len*sizeof(int));
1759     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1760   } else a->garray = 0;
1761 
1762   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1763   PLogObjectParent(mat,a->lvec);
1764   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1765   PLogObjectParent(mat,a->Mvctx);
1766   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1767   PLogObjectParent(mat,a->A);
1768   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1769   PLogObjectParent(mat,a->B);
1770   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1771   if (flg) {
1772     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1773   }
1774   *newmat = mat;
1775   PetscFunctionReturn(0);
1776 }
1777 
1778 #include "sys.h"
1779 
1780 #undef __FUNC__
1781 #define __FUNC__ "MatLoad_MPIBAIJ"
1782 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1783 {
1784   Mat          A;
1785   int          i, nz, ierr, j,rstart, rend, fd;
1786   Scalar       *vals,*buf;
1787   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1788   MPI_Status   status;
1789   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1790   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1791   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1792   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1793   int          dcount,kmax,k,nzcount,tmp;
1794 
1795   PetscFunctionBegin;
1796   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1797   bs2  = bs*bs;
1798 
1799   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1800   if (!rank) {
1801     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1802     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
1803     if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not a matrix object");
1804     if (header[3] < 0) {
1805       SETERRQ(1,1,"Matrix stored in special format on disk, cannot load as MPIBAIJ");
1806     }
1807   }
1808 
1809   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
1810   M = header[1]; N = header[2];
1811 
1812   if (M != N) SETERRQ(1,0,"Can only do square matrices");
1813 
1814   /*
1815      This code adds extra rows to make sure the number of rows is
1816      divisible by the blocksize
1817   */
1818   Mbs        = M/bs;
1819   extra_rows = bs - M + bs*(Mbs);
1820   if (extra_rows == bs) extra_rows = 0;
1821   else                  Mbs++;
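  /* e.g. (illustrative) M=10 with bs=4: Mbs = 10/4 = 2 and extra_rows = 4-10+8 = 2, so Mbs becomes 3 and two padded rows with unit diagonal entries are appended */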
1822   if (extra_rows && !rank) {
1823     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1824   }
1825 
1826   /* determine ownership of all rows */
1827   mbs = Mbs/size + ((Mbs % size) > rank);
1828   m   = mbs * bs;
1829   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1830   browners = rowners + size + 1;
1831   ierr = MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1832   rowners[0] = 0;
1833   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1834   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1835   rstart = rowners[rank];
1836   rend   = rowners[rank+1];
1837 
1838   /* distribute row lengths to all processors */
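  /* on non-root processes the send arguments to MPI_Scatterv() are ignored by the standard, so zeros are passed below */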
1839   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1840   if (!rank) {
1841     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1842     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
1843     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1844     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1845     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1846     ierr = MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);CHKERRQ(ierr);
1847     PetscFree(sndcounts);
1848   } else {
1849     ierr = MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);CHKERRQ(ierr);
1850   }
1851 
1852   if (!rank) {
1853     /* calculate the number of nonzeros on each processor */
1854     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1855     PetscMemzero(procsnz,size*sizeof(int));
1856     for ( i=0; i<size; i++ ) {
1857       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1858         procsnz[i] += rowlengths[j];
1859       }
1860     }
1861     PetscFree(rowlengths);
1862 
1863     /* determine max buffer needed and allocate it */
1864     maxnz = 0;
1865     for ( i=0; i<size; i++ ) {
1866       maxnz = PetscMax(maxnz,procsnz[i]);
1867     }
1868     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1869 
1870     /* read in my part of the matrix column indices  */
1871     nz = procsnz[0];
1872     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1873     mycols = ibuf;
1874     if (size == 1)  nz -= extra_rows;
1875     ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
1876     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1877 
1878     /* read in everyone else's column indices (except the last proc's) and ship them off */
1879     for ( i=1; i<size-1; i++ ) {
1880       nz   = procsnz[i];
1881       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
1882       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
1883     }
1884     /* read in the stuff for the last proc */
1885     if ( size != 1 ) {
1886       nz   = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1887       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
1888       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1889       ierr = MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);CHKERRQ(ierr);
1890     }
1891     PetscFree(cols);
1892   } else {
1893     /* determine buffer space needed for message */
1894     nz = 0;
1895     for ( i=0; i<m; i++ ) {
1896       nz += locrowlens[i];
1897     }
1898     ibuf   = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1899     mycols = ibuf;
1900     /* receive message of column indices */
1901     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
1902     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
1903     if (maxnz != nz) SETERRQ(1,0,"something is wrong with the file");
1904   }
1905 
1906   /* loop over local rows, determining number of off diagonal entries */
1907   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1908   odlens = dlens + (rend-rstart);
1909   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1910   PetscMemzero(mask,3*Mbs*sizeof(int));
1911   masked1 = mask    + Mbs;
1912   masked2 = masked1 + Mbs;
1913   rowcount = 0; nzcount = 0;
1914   for ( i=0; i<mbs; i++ ) {
1915     dcount  = 0;
1916     odcount = 0;
1917     for ( j=0; j<bs; j++ ) {
1918       kmax = locrowlens[rowcount];
1919       for ( k=0; k<kmax; k++ ) {
1920         tmp = mycols[nzcount++]/bs;
1921         if (!mask[tmp]) {
1922           mask[tmp] = 1;
1923           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1924           else masked1[dcount++] = tmp;
1925         }
1926       }
1927       rowcount++;
1928     }
1929 
1930     dlens[i]  = dcount;
1931     odlens[i] = odcount;
1932 
1933     /* zero out the mask elements we set */
1934     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1935     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1936   }
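  /*
     Illustrative example (numbers made up): with bs=2, rstart=0, rend=2, a
     block row whose bs point rows reference point columns {0,1,7} touches
     block columns 0 and 3; block column 0 lies in [rstart,rend) and is
     counted in dlens[i], while block column 3 is counted in odlens[i].
     The mask[] array merely ensures each block column is counted once per
     block row.
  */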
1937 
1938   /* create our matrix */
1939   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1940   CHKERRQ(ierr);
1941   A = *newmat;
1942   MatSetOption(A,MAT_COLUMNS_SORTED);
1943 
1944   if (!rank) {
1945     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1946     /* read in my part of the matrix numerical values  */
1947     nz = procsnz[0];
1948     vals = buf;
1949     mycols = ibuf;
1950     if (size == 1)  nz -= extra_rows;
1951     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1952     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1953 
1954     /* insert into matrix */
1955     jj      = rstart*bs;
1956     for ( i=0; i<m; i++ ) {
1957       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1958       mycols += locrowlens[i];
1959       vals   += locrowlens[i];
1960       jj++;
1961     }
1962     /* read in the other processors' values (except the last one's) and ship them out */
1963     for ( i=1; i<size-1; i++ ) {
1964       nz   = procsnz[i];
1965       vals = buf;
1966       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1967       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
1968     }
1969     /* the last proc */
1970     if ( size != 1 ){
1971       nz   = procsnz[size-1] - extra_rows;  /* i == size-1 here; the extra rows are not on the disk */
1972       vals = buf;
1973       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1974       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1975       ierr = MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);CHKERRQ(ierr);
1976     }
1977     PetscFree(procsnz);
1978   } else {
1979     /* receive numeric values */
1980     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1981 
1982     /* receive message of values */
1983     vals   = buf;
1984     mycols = ibuf;
1985     ierr   = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
1986     ierr   = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
1987     if (maxnz != nz) SETERRQ(1,0,"something is wrong with the file");
1988 
1989     /* insert into matrix */
1990     jj      = rstart*bs;
1991     for ( i=0; i<m; i++ ) {
1992       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1993       mycols += locrowlens[i];
1994       vals   += locrowlens[i];
1995       jj++;
1996     }
1997   }
1998   PetscFree(locrowlens);
1999   PetscFree(buf);
2000   PetscFree(ibuf);
2001   PetscFree(rowners);
2002   PetscFree(dlens);
2003   PetscFree(mask);
2004   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2005   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2006   PetscFunctionReturn(0);
2007 }
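
/*
    A sketch of how the loader above is typically driven (the file name is
    illustrative; ViewerFileOpenBinary() and MatLoad() are the generic
    entry points of this vintage of PETSc):

       Viewer v;
       Mat    A;
       ierr = ViewerFileOpenBinary(PETSC_COMM_WORLD,"matrix.dat",BINARY_RDONLY,&v);CHKERRQ(ierr);
       ierr = MatLoad(v,MATMPIBAIJ,&A);CHKERRQ(ierr);
       ierr = ViewerDestroy(v);CHKERRQ(ierr);
*/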
2008 
2009 
2010