xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 04e1fe25b050a09aca077fa0c61d9e38a6d584c0)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpibaij.c,v 1.90 1997/11/09 03:57:58 bsmith Exp bsmith $";
3 #endif
4 
5 #include "pinclude/pviewer.h"
6 #include "src/mat/impls/baij/mpi/mpibaij.h"
7 #include "src/vec/vecimpl.h"
8 
9 
10 extern int MatSetUpMultiply_MPIBAIJ(Mat);
11 extern int DisAssemble_MPIBAIJ(Mat);
12 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
13 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
14 
15 /*
16      Local utility routine that creates a mapping from the global column
17    number to the local number in the off-diagonal part of the local
18    storage of the matrix.  This is done in a non scable way since the
19    length of colmap equals the global matrix length.
20 */
21 #undef __FUNC__
22 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
23 static int CreateColmap_MPIBAIJ_Private(Mat mat)
24 {
25   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
26   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
27   int         nbs = B->nbs,i,bs=B->bs;;
28 
29   PetscFunctionBegin;
30   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
31   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
32   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
33   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
34   PetscFunctionReturn(0);
35 }
36 
37 #define CHUNKSIZE  10
38 
39 #define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
40 { \
41  \
42     brow = row/bs;  \
43     rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
44     rmax = aimax[brow]; nrow = ailen[brow]; \
45       bcol = col/bs; \
46       ridx = row % bs; cidx = col % bs; \
47       low = 0; high = nrow; \
48       while (high-low > 3) { \
49         t = (low+high)/2; \
50         if (rp[t] > bcol) high = t; \
51         else              low  = t; \
52       } \
53       for ( _i=low; _i<high; _i++ ) { \
54         if (rp[_i] > bcol) break; \
55         if (rp[_i] == bcol) { \
56           bap  = ap +  bs2*_i + bs*cidx + ridx; \
57           if (addv == ADD_VALUES) *bap += value;  \
58           else                    *bap  = value;  \
59           goto a_noinsert; \
60         } \
61       } \
62       if (a->nonew == 1) goto a_noinsert; \
63       else if (a->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
64       if (nrow >= rmax) { \
65         /* there is no extra room in row, therefore enlarge */ \
66         int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
67         Scalar *new_a; \
68  \
69         if (a->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
70  \
71         /* malloc new storage space */ \
72         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
73         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
74         new_j   = (int *) (new_a + bs2*new_nz); \
75         new_i   = new_j + new_nz; \
76  \
77         /* copy over old data into new slots */ \
78         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
79         for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
80         PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
81         len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
82         PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
83                                                            len*sizeof(int)); \
84         PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
85         PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
86         PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
87                     aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
88         /* free up old matrix storage */ \
89         PetscFree(a->a);  \
90         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
91         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
92         a->singlemalloc = 1; \
93  \
94         rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
95         rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
96         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
97         a->maxnz += bs2*CHUNKSIZE; \
98         a->reallocs++; \
99         a->nz++; \
100       } \
101       N = nrow++ - 1;  \
102       /* shift up all the later entries in this row */ \
103       for ( ii=N; ii>=_i; ii-- ) { \
104         rp[ii+1] = rp[ii]; \
105         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
106       } \
107       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
108       rp[_i]                      = bcol;  \
109       ap[bs2*_i + bs*cidx + ridx] = value;  \
110       a_noinsert:; \
111     ailen[brow] = nrow; \
112 }
113 
114 #define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
115 { \
116  \
117     brow = row/bs;  \
118     rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
119     rmax = bimax[brow]; nrow = bilen[brow]; \
120       bcol = col/bs; \
121       ridx = row % bs; cidx = col % bs; \
122       low = 0; high = nrow; \
123       while (high-low > 3) { \
124         t = (low+high)/2; \
125         if (rp[t] > bcol) high = t; \
126         else              low  = t; \
127       } \
128       for ( _i=low; _i<high; _i++ ) { \
129         if (rp[_i] > bcol) break; \
130         if (rp[_i] == bcol) { \
131           bap  = ap +  bs2*_i + bs*cidx + ridx; \
132           if (addv == ADD_VALUES) *bap += value;  \
133           else                    *bap  = value;  \
134           goto b_noinsert; \
135         } \
136       } \
137       if (b->nonew == 1) goto b_noinsert; \
138       else if (b->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
139       if (nrow >= rmax) { \
140         /* there is no extra room in row, therefore enlarge */ \
141         int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
142         Scalar *new_a; \
143  \
144         if (b->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
145  \
146         /* malloc new storage space */ \
147         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
148         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
149         new_j   = (int *) (new_a + bs2*new_nz); \
150         new_i   = new_j + new_nz; \
151  \
152         /* copy over old data into new slots */ \
153         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
154         for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
155         PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
156         len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
157         PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
158                                                            len*sizeof(int)); \
159         PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
160         PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
161         PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
162                     ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
163         /* free up old matrix storage */ \
164         PetscFree(b->a);  \
165         if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
166         ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
167         b->singlemalloc = 1; \
168  \
169         rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
170         rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
171         PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
172         b->maxnz += bs2*CHUNKSIZE; \
173         b->reallocs++; \
174         b->nz++; \
175       } \
176       N = nrow++ - 1;  \
177       /* shift up all the later entries in this row */ \
178       for ( ii=N; ii>=_i; ii-- ) { \
179         rp[ii+1] = rp[ii]; \
180         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
181       } \
182       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
183       rp[_i]                      = bcol;  \
184       ap[bs2*_i + bs*cidx + ridx] = value;  \
185       b_noinsert:; \
186     bilen[brow] = nrow; \
187 }
188 
189 #undef __FUNC__
190 #define __FUNC__ "MatSetValues_MPIBAIJ"
191 int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
192 {
193   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
194   Scalar      value;
195   int         ierr,i,j,row,col;
196   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
197   int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
198   int         cend_orig=baij->cend_bs,bs=baij->bs;
199 
200   /* Some Variables required in the macro */
201   Mat         A = baij->A;
202   Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
203   int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
204   Scalar      *aa=a->a;
205 
206   Mat         B = baij->B;
207   Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
208   int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
209   Scalar      *ba=b->a;
210 
211   int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
212   int         low,high,t,ridx,cidx,bs2=a->bs2;
213   Scalar      *ap,*bap;
214 
215   PetscFunctionBegin;
216   for ( i=0; i<m; i++ ) {
217 #if defined(USE_PETSC_BOPT_g)
218     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
219     if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
220 #endif
221     if (im[i] >= rstart_orig && im[i] < rend_orig) {
222       row = im[i] - rstart_orig;
223       for ( j=0; j<n; j++ ) {
224         if (in[j] >= cstart_orig && in[j] < cend_orig){
225           col = in[j] - cstart_orig;
226           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
227           MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
228           /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
229         }
230 #if defined(USE_PETSC_BOPT_g)
231         else if (in[j] < 0) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");}
232         else if (in[j] >= baij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Col too large");}
233 #endif
234         else {
235           if (mat->was_assembled) {
236             if (!baij->colmap) {
237               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
238             }
239             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
240             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
241               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
242               col =  in[j];
243               /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
244               B = baij->B;
245               b = (Mat_SeqBAIJ *) (B)->data;
246               bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
247               ba=b->a;
248             }
249           } else col = in[j];
250           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
251           MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
252           /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
253         }
254       }
255     } else {
256       if (roworiented && !baij->donotstash) {
257         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
258       } else {
259         if (!baij->donotstash) {
260           row = im[i];
261 	  for ( j=0; j<n; j++ ) {
262 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
263           }
264         }
265       }
266     }
267   }
268   PetscFunctionReturn(0);
269 }
270 
271 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
272 #undef __FUNC__
273 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
274 int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
275 {
276   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
277   Scalar      *value,*barray=baij->barray;
278   int         ierr,i,j,ii,jj,row,col,k,l;
279   int         roworiented = baij->roworiented,rstart=baij->rstart ;
280   int         rend=baij->rend,cstart=baij->cstart,stepval;
281   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
282 
283   if(!barray) {
284     baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
285   }
286 
287   if (roworiented) {
288     stepval = (n-1)*bs;
289   } else {
290     stepval = (m-1)*bs;
291   }
292   for ( i=0; i<m; i++ ) {
293 #if defined(USE_PETSC_BOPT_g)
294     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
295     if (im[i] >= baij->Mbs) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
296 #endif
297     if (im[i] >= rstart && im[i] < rend) {
298       row = im[i] - rstart;
299       for ( j=0; j<n; j++ ) {
300         /* If NumCol = 1 then a copy is not required */
301         if ((roworiented) && (n == 1)) {
302           barray = v + i*bs2;
303         } else if((!roworiented) && (m == 1)) {
304           barray = v + j*bs2;
305         } else { /* Here a copy is required */
306           if (roworiented) {
307             value = v + i*(stepval+bs)*bs + j*bs;
308           } else {
309             value = v + j*(stepval+bs)*bs + i*bs;
310           }
311           for ( ii=0; ii<bs; ii++,value+=stepval ) {
312             for (jj=0; jj<bs; jj++ ) {
313               *barray++  = *value++;
314             }
315           }
316           barray -=bs2;
317         }
318 
319         if (in[j] >= cstart && in[j] < cend){
320           col  = in[j] - cstart;
321           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
322         }
323 #if defined(USE_PETSC_BOPT_g)
324         else if (in[j] < 0) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");}
325         else if (in[j] >= baij->Nbs) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");}
326 #endif
327         else {
328           if (mat->was_assembled) {
329             if (!baij->colmap) {
330               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
331             }
332 
333 #if defined(USE_PETSC_BOPT_g)
334             if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(PETSC_ERR_PLIB,0,"Incorrect colmap");}
335 #endif
336             col = (baij->colmap[in[j]] - 1)/bs;
337             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
338               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
339               col =  in[j];
340             }
341           }
342           else col = in[j];
343           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
344         }
345       }
346     } else {
347       if (!baij->donotstash) {
348         if (roworiented ) {
349           row   = im[i]*bs;
350           value = v + i*(stepval+bs)*bs;
351           for ( j=0; j<bs; j++,row++ ) {
352             for ( k=0; k<n; k++ ) {
353               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
354                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
355               }
356             }
357           }
358         } else {
359           for ( j=0; j<n; j++ ) {
360             value = v + j*(stepval+bs)*bs + i*bs;
361             col   = in[j]*bs;
362             for ( k=0; k<bs; k++,col++,value+=stepval) {
363               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
364                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
365               }
366             }
367           }
368         }
369       }
370     }
371   }
372   PetscFunctionReturn(0);
373 }
374 
375 #undef __FUNC__
376 #define __FUNC__ "MatGetValues_MPIBAIJ"
377 int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
378 {
379   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
380   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
381   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
382 
383   for ( i=0; i<m; i++ ) {
384     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
385     if (idxm[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
386     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
387       row = idxm[i] - bsrstart;
388       for ( j=0; j<n; j++ ) {
389         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
390         if (idxn[j] >= baij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
391         if (idxn[j] >= bscstart && idxn[j] < bscend){
392           col = idxn[j] - bscstart;
393           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
394         } else {
395           if (!baij->colmap) {
396             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
397           }
398           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
399              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
400           else {
401             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
402             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
403           }
404         }
405       }
406     } else {
407       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
408     }
409   }
410   PetscFunctionReturn(0);
411 }
412 
413 #undef __FUNC__
414 #define __FUNC__ "MatNorm_MPIBAIJ"
415 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
416 {
417   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
418   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
419   int        ierr, i,bs2=baij->bs2;
420   double     sum = 0.0;
421   Scalar     *v;
422 
423   PetscFunctionBegin;
424   if (baij->size == 1) {
425     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
426   } else {
427     if (type == NORM_FROBENIUS) {
428       v = amat->a;
429       for (i=0; i<amat->nz*bs2; i++ ) {
430 #if defined(USE_PETSC_COMPLEX)
431         sum += real(conj(*v)*(*v)); v++;
432 #else
433         sum += (*v)*(*v); v++;
434 #endif
435       }
436       v = bmat->a;
437       for (i=0; i<bmat->nz*bs2; i++ ) {
438 #if defined(USE_PETSC_COMPLEX)
439         sum += real(conj(*v)*(*v)); v++;
440 #else
441         sum += (*v)*(*v); v++;
442 #endif
443       }
444       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
445       *norm = sqrt(*norm);
446     } else {
447       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
448     }
449   }
450   PetscFunctionReturn(0);
451 }
452 
453 #undef __FUNC__
454 #define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
455 int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
456 {
457   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
458   MPI_Comm    comm = mat->comm;
459   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
460   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
461   MPI_Request *send_waits,*recv_waits;
462   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
463   InsertMode  addv;
464   Scalar      *rvalues,*svalues;
465 
466   PetscFunctionBegin;
467   /* make sure all processors are either in INSERTMODE or ADDMODE */
468   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
469   if (addv == (ADD_VALUES|INSERT_VALUES)) {
470     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
471   }
472   mat->insertmode = addv; /* in case this processor had no cache */
473 
474   /*  first count number of contributors to each processor */
475   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
476   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
477   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
478   for ( i=0; i<baij->stash.n; i++ ) {
479     idx = baij->stash.idx[i];
480     for ( j=0; j<size; j++ ) {
481       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
482         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
483       }
484     }
485   }
486   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
487 
488   /* inform other processors of number of messages and max length*/
489   work      = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
490   ierr      = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
491   nreceives = work[rank];
492   ierr      = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
493   nmax      = work[rank];
494   PetscFree(work);
495 
496   /* post receives:
497        1) each message will consist of ordered pairs
498      (global index,value) we store the global index as a double
499      to simplify the message passing.
500        2) since we don't know how long each individual message is we
501      allocate the largest needed buffer for each receive. Potentially
502      this is a lot of wasted space.
503 
504 
505        This could be done better.
506   */
507   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
508   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
509   for ( i=0; i<nreceives; i++ ) {
510     ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
511   }
512 
513   /* do sends:
514       1) starts[i] gives the starting index in svalues for stuff going to
515          the ith processor
516   */
517   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
518   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
519   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
520   starts[0] = 0;
521   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
522   for ( i=0; i<baij->stash.n; i++ ) {
523     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
524     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
525     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
526   }
527   PetscFree(owner);
528   starts[0] = 0;
529   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
530   count = 0;
531   for ( i=0; i<size; i++ ) {
532     if (procs[i]) {
533       ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
534     }
535   }
536   PetscFree(starts); PetscFree(nprocs);
537 
538   /* Free cache space */
539   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",rank,baij->stash.n);
540   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
541 
542   baij->svalues    = svalues;    baij->rvalues    = rvalues;
543   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
544   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
545   baij->rmax       = nmax;
546 
547   PetscFunctionReturn(0);
548 }
549 #include <math.h>
550 #define HASH_KEY 0.6180339887
551 #define HASH1(size,key) ((int)((size)*fmod(((key)*HASH_KEY),1)))
552 #define HASH2(size,key) ((int)((size)*fmod(((key+0.5)*HASH_KEY),1)))
553 
554 
555 int CreateHashTable(Mat mat,double factor)
556 {
557   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
558   Mat         A = baij->A, B=baij->B;
559   Mat_SeqBAIJ *a=(Mat_SeqBAIJ *)A->data, *b=(Mat_SeqBAIJ *)B->data;
560   int         i,j,k,nz=a->nz+b->nz,h1,h2,*ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j;
561   int         size=(int)(factor*nz),ct=0,max1=0,max2=0;
562   /* Scalar      *aa=a->a,*ba=b->a; */
563   double      key;
564   static double *HT;
565   static      int flag=1;
566   extern int PetscGlobalRank;
567 
568   PetscFunctionBegin;
569   flag = 1;
570   /* Allocate Memory for Hash Table */
571   if (flag) {
572     HT = (double*)PetscMalloc(size*sizeof(double));
573     flag = 0;
574   }
575   PetscMemzero(HT,size*sizeof(double));
576 
577   /* Loop Over A */
578   for ( i=0; i<a->n; i++ ) {
579     for ( j=ai[i]; j<ai[i+1]; j++ ) {
580       key = i*baij->n+aj[j]+1;
581       h1  = HASH1(size,key);
582       h2  = HASH2(size,key);
583 
584       for ( k=1; k<size; k++ ){
585         if (HT[(h1*k)%size] == 0.0) {
586           HT[(h1*k)%size] = key;
587           break;
588         } else ct++;
589       }
590       if (k> max1) max1 =k;
591     }
592   }
593   /* Loop Over B */
594   for ( i=0; i<b->n; i++ ) {
595     for ( j=bi[i]; j<bi[i+1]; j++ ) {
596       key = i*b->n+bj[j]+1;
597       h1  = HASH1(size,key);
598       for ( k=1; k<size; k++ ){
599         if (HT[(h1*k)%size] == 0.0) {
600           HT[(h1*k)%size] = key;
601           break;
602         } else ct++;
603       }
604       if (k> max2) max2 =k;
605     }
606   }
607 
608   /* Print Summary */
609   for ( i=0,key=0.0,j=0; i<size; i++)
610     if (HT[i]) {j++;}
611 
612   printf("[%d] fact = %3.2f max1 = %5d max2 = %5d Size %5d - Searches %5d Avg %5.2f Keys %5d\n",
613          PetscGlobalRank,factor,max1,max2,size,ct+j,((float)ct+j)/j,j);
614   PetscFree(HT);
615   PetscFunctionReturn(0);
616 }
617 
618 #undef __FUNC__
619 #define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
620 int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
621 {
622   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
623   MPI_Status  *send_status,recv_status;
624   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
625   int         bs=baij->bs,row,col,other_disassembled,flg;
626   Scalar      *values,val;
627   InsertMode  addv = mat->insertmode;
628 
629   PetscFunctionBegin;
630   /*  wait on receives */
631   while (count) {
632     ierr = MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
633     /* unpack receives into our local space */
634     values = baij->rvalues + 3*imdex*baij->rmax;
635     ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
636     n = n/3;
637     for ( i=0; i<n; i++ ) {
638       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
639       col = (int) PetscReal(values[3*i+1]);
640       val = values[3*i+2];
641       if (col >= baij->cstart*bs && col < baij->cend*bs) {
642         col -= baij->cstart*bs;
643         ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
644       } else {
645         if (mat->was_assembled) {
646           if (!baij->colmap) {
647             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
648           }
649           col = (baij->colmap[col/bs]) - 1 + col%bs;
650           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
651             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
652             col = (int) PetscReal(values[3*i+1]);
653           }
654         }
655         ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
656       }
657     }
658     count--;
659   }
660   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
661 
662   /* wait on sends */
663   if (baij->nsends) {
664     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
665     ierr        = MPI_Waitall(baij->nsends,baij->send_waits,send_status);CHKERRQ(ierr);
666     PetscFree(send_status);
667   }
668   PetscFree(baij->send_waits); PetscFree(baij->svalues);
669 
670   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
671   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
672 
673   /* determine if any processor has disassembled, if so we must
674      also disassemble ourselfs, in order that we may reassemble. */
675   ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
676   if (mat->was_assembled && !other_disassembled) {
677     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
678   }
679 
680   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
681     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
682   }
683   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
684   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
685 
686   ierr = OptionsHasName(PETSC_NULL,"-use_hash",&flg); CHKERRQ(ierr);
687   if (flg) {
688     double fact;
689     for ( fact=1.2; fact<2.0; fact +=0.05) CreateHashTable(mat,fact);
690   }
691   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
692   PetscFunctionReturn(0);
693 }
694 
695 #undef __FUNC__
696 #define __FUNC__ "MatView_MPIBAIJ_Binary"
697 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
698 {
699   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
700   int          ierr;
701 
702   PetscFunctionBegin;
703   if (baij->size == 1) {
704     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
705   } else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
706   PetscFunctionReturn(0);
707 }
708 
709 #undef __FUNC__
710 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
711 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
712 {
713   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
714   int          ierr, format,rank,bs = baij->bs;
715   FILE         *fd;
716   ViewerType   vtype;
717 
718   PetscFunctionBegin;
719   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
720   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
721     ierr = ViewerGetFormat(viewer,&format);
722     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
723       MatInfo info;
724       MPI_Comm_rank(mat->comm,&rank);
725       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
726       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
727       PetscSequentialPhaseBegin(mat->comm,1);
728       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
729               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
730               baij->bs,(int)info.memory);
731       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
732       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
733       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
734       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
735       fflush(fd);
736       PetscSequentialPhaseEnd(mat->comm,1);
737       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
738       PetscFunctionReturn(0);
739     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
740       PetscPrintf(mat->comm,"  block size is %d\n",bs);
741       PetscFunctionReturn(0);
742     }
743   }
744 
745   if (vtype == DRAW_VIEWER) {
746     Draw       draw;
747     PetscTruth isnull;
748     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
749     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
750   }
751 
752   if (vtype == ASCII_FILE_VIEWER) {
753     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
754     PetscSequentialPhaseBegin(mat->comm,1);
755     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
756            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
757             baij->cstart*bs,baij->cend*bs);
758     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
759     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
760     fflush(fd);
761     PetscSequentialPhaseEnd(mat->comm,1);
762   } else {
763     int size = baij->size;
764     rank = baij->rank;
765     if (size == 1) {
766       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
767     } else {
768       /* assemble the entire matrix onto first processor. */
769       Mat         A;
770       Mat_SeqBAIJ *Aloc;
771       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
772       int         mbs=baij->mbs;
773       Scalar      *a;
774 
775       if (!rank) {
776         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
777       } else {
778         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
779       }
780       PLogObjectParent(mat,A);
781 
782       /* copy over the A part */
783       Aloc = (Mat_SeqBAIJ*) baij->A->data;
784       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
785       row = baij->rstart;
786       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
787 
788       for ( i=0; i<mbs; i++ ) {
789         rvals[0] = bs*(baij->rstart + i);
790         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
791         for ( j=ai[i]; j<ai[i+1]; j++ ) {
792           col = (baij->cstart+aj[j])*bs;
793           for (k=0; k<bs; k++ ) {
794             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
795             col++; a += bs;
796           }
797         }
798       }
799       /* copy over the B part */
800       Aloc = (Mat_SeqBAIJ*) baij->B->data;
801       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
802       row = baij->rstart*bs;
803       for ( i=0; i<mbs; i++ ) {
804         rvals[0] = bs*(baij->rstart + i);
805         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
806         for ( j=ai[i]; j<ai[i+1]; j++ ) {
807           col = baij->garray[aj[j]]*bs;
808           for (k=0; k<bs; k++ ) {
809             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
810             col++; a += bs;
811           }
812         }
813       }
814       PetscFree(rvals);
815       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
816       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
817       /*
818          Everyone has to call to draw the matrix since the graphics waits are
819          synchronized across all processors that share the Draw object
820       */
821       if (!rank || vtype == DRAW_VIEWER) {
822         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
823       }
824       ierr = MatDestroy(A); CHKERRQ(ierr);
825     }
826   }
827   PetscFunctionReturn(0);
828 }
829 
830 
831 
832 #undef __FUNC__
833 #define __FUNC__ "MatView_MPIBAIJ"
834 int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
835 {
836   Mat         mat = (Mat) obj;
837   int         ierr;
838   ViewerType  vtype;
839 
840   PetscFunctionBegin;
841   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
842   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
843       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
844     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
845   } else if (vtype == BINARY_FILE_VIEWER) {
846     ierr = MatView_MPIBAIJ_Binary(mat,viewer);CHKERRQ(ierr);
847   }
848   PetscFunctionReturn(0);
849 }
850 
851 #undef __FUNC__
852 #define __FUNC__ "MatDestroy_MPIBAIJ"
853 int MatDestroy_MPIBAIJ(PetscObject obj)
854 {
855   Mat         mat = (Mat) obj;
856   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
857   int         ierr;
858 
859   PetscFunctionBegin;
860 #if defined(USE_PETSC_LOG)
861   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
862 #endif
863 
864   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
865   PetscFree(baij->rowners);
866   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
867   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
868   if (baij->colmap) PetscFree(baij->colmap);
869   if (baij->garray) PetscFree(baij->garray);
870   if (baij->lvec)   VecDestroy(baij->lvec);
871   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
872   if (baij->rowvalues) PetscFree(baij->rowvalues);
873   if (baij->barray) PetscFree(baij->barray);
874   PetscFree(baij);
875   PLogObjectDestroy(mat);
876   PetscHeaderDestroy(mat);
877   PetscFunctionReturn(0);
878 }
879 
880 #undef __FUNC__
881 #define __FUNC__ "MatMult_MPIBAIJ"
882 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
883 {
884   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
885   int         ierr, nt;
886 
887   PetscFunctionBegin;
888   VecGetLocalSize_Fast(xx,nt);
889   if (nt != a->n) {
890     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
891   }
892   VecGetLocalSize_Fast(yy,nt);
893   if (nt != a->m) {
894     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible parition of A and yy");
895   }
896   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
897   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
898   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
899   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
900   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
901   PetscFunctionReturn(0);
902 }
903 
904 #undef __FUNC__
905 #define __FUNC__ "MatMultAdd_MPIBAIJ"
906 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
907 {
908   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
909   int        ierr;
910 
911   PetscFunctionBegin;
912   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
913   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
914   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
915   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
916   PetscFunctionReturn(0);
917 }
918 
919 #undef __FUNC__
920 #define __FUNC__ "MatMultTrans_MPIBAIJ"
921 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
922 {
923   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
924   int         ierr;
925 
926   PetscFunctionBegin;
927   /* do nondiagonal part */
928   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
929   /* send it on its way */
930   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
931   /* do local part */
932   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
933   /* receive remote parts: note this assumes the values are not actually */
934   /* inserted in yy until the next line, which is true for my implementation*/
935   /* but is not perhaps always true. */
936   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
937   PetscFunctionReturn(0);
938 }
939 
940 #undef __FUNC__
941 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
942 int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
943 {
944   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
945   int         ierr;
946 
947   PetscFunctionBegin;
948   /* do nondiagonal part */
949   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
950   /* send it on its way */
951   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
952   /* do local part */
953   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
954   /* receive remote parts: note this assumes the values are not actually */
955   /* inserted in yy until the next line, which is true for my implementation*/
956   /* but is not perhaps always true. */
957   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
958   PetscFunctionReturn(0);
959 }
960 
961 /*
962   This only works correctly for square matrices where the subblock A->A is the
963    diagonal block
964 */
965 #undef __FUNC__
966 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
967 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
968 {
969   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
970   int         ierr;
971 
972   PetscFunctionBegin;
973   if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
974   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
975   PetscFunctionReturn(0);
976 }
977 
978 #undef __FUNC__
979 #define __FUNC__ "MatScale_MPIBAIJ"
980 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
981 {
982   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
983   int         ierr;
984 
985   PetscFunctionBegin;
986   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
987   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
988   PetscFunctionReturn(0);
989 }
990 
991 #undef __FUNC__
992 #define __FUNC__ "MatGetSize_MPIBAIJ"
993 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
994 {
995   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
996 
997   PetscFunctionBegin;
998   if (m) *m = mat->M;
999   if (n) *n = mat->N;
1000   PetscFunctionReturn(0);
1001 }
1002 
1003 #undef __FUNC__
1004 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
1005 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
1006 {
1007   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1008 
1009   PetscFunctionBegin;
1010   *m = mat->m; *n = mat->N;
1011   PetscFunctionReturn(0);
1012 }
1013 
1014 #undef __FUNC__
1015 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
1016 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
1017 {
1018   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1019 
1020   PetscFunctionBegin;
1021   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
1022   PetscFunctionReturn(0);
1023 }
1024 
1025 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1026 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1027 
1028 #undef __FUNC__
1029 #define __FUNC__ "MatGetRow_MPIBAIJ"
1030 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1031 {
1032   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1033   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
1034   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
1035   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
1036   int        *cmap, *idx_p,cstart = mat->cstart;
1037 
1038   PetscFunctionBegin;
1039   if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
1040   mat->getrowactive = PETSC_TRUE;
1041 
1042   if (!mat->rowvalues && (idx || v)) {
1043     /*
1044         allocate enough space to hold information from the longest row.
1045     */
1046     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
1047     int     max = 1,mbs = mat->mbs,tmp;
1048     for ( i=0; i<mbs; i++ ) {
1049       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1050       if (max < tmp) { max = tmp; }
1051     }
1052     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
1053     CHKPTRQ(mat->rowvalues);
1054     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
1055   }
1056 
1057   if (row < brstart || row >= brend) SETERRQ(PETSC_ERR_SUP,0,"Only local rows")
1058   lrow = row - brstart;
1059 
1060   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1061   if (!v)   {pvA = 0; pvB = 0;}
1062   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1063   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1064   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1065   nztot = nzA + nzB;
1066 
1067   cmap  = mat->garray;
1068   if (v  || idx) {
1069     if (nztot) {
1070       /* Sort by increasing column numbers, assuming A and B already sorted */
1071       int imark = -1;
1072       if (v) {
1073         *v = v_p = mat->rowvalues;
1074         for ( i=0; i<nzB; i++ ) {
1075           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
1076           else break;
1077         }
1078         imark = i;
1079         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1080         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1081       }
1082       if (idx) {
1083         *idx = idx_p = mat->rowindices;
1084         if (imark > -1) {
1085           for ( i=0; i<imark; i++ ) {
1086             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
1087           }
1088         } else {
1089           for ( i=0; i<nzB; i++ ) {
1090             if (cmap[cworkB[i]/bs] < cstart)
1091               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
1092             else break;
1093           }
1094           imark = i;
1095         }
1096         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
1097         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
1098       }
1099     } else {
1100       if (idx) *idx = 0;
1101       if (v)   *v   = 0;
1102     }
1103   }
1104   *nz = nztot;
1105   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1106   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1107   PetscFunctionReturn(0);
1108 }
1109 
1110 #undef __FUNC__
1111 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1112 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1113 {
1114   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1115 
1116   PetscFunctionBegin;
1117   if (baij->getrowactive == PETSC_FALSE) {
1118     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
1119   }
1120   baij->getrowactive = PETSC_FALSE;
1121   PetscFunctionReturn(0);
1122 }
1123 
1124 #undef __FUNC__
1125 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1126 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1127 {
1128   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1129 
1130   PetscFunctionBegin;
1131   *bs = baij->bs;
1132   PetscFunctionReturn(0);
1133 }
1134 
1135 #undef __FUNC__
1136 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1137 int MatZeroEntries_MPIBAIJ(Mat A)
1138 {
1139   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1140   int         ierr;
1141 
1142   PetscFunctionBegin;
1143   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1144   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1145   PetscFunctionReturn(0);
1146 }
1147 
1148 #undef __FUNC__
1149 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1150 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1151 {
1152   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1153   Mat         A = a->A, B = a->B;
1154   int         ierr;
1155   double      isend[5], irecv[5];
1156 
1157   PetscFunctionBegin;
1158   info->block_size     = (double)a->bs;
1159   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1160   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->memory;
1161   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1162   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->memory;
1163   if (flag == MAT_LOCAL) {
1164     info->nz_used      = isend[0];
1165     info->nz_allocated = isend[1];
1166     info->nz_unneeded  = isend[2];
1167     info->memory       = isend[3];
1168     info->mallocs      = isend[4];
1169   } else if (flag == MAT_GLOBAL_MAX) {
1170     ierr = MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_MAX,matin->comm);CHKERRQ(ierr);
1171     info->nz_used      = irecv[0];
1172     info->nz_allocated = irecv[1];
1173     info->nz_unneeded  = irecv[2];
1174     info->memory       = irecv[3];
1175     info->mallocs      = irecv[4];
1176   } else if (flag == MAT_GLOBAL_SUM) {
1177     ierr = MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_SUM,matin->comm);CHKERRQ(ierr);
1178     info->nz_used      = irecv[0];
1179     info->nz_allocated = irecv[1];
1180     info->nz_unneeded  = irecv[2];
1181     info->memory       = irecv[3];
1182     info->mallocs      = irecv[4];
1183   }
1184   info->rows_global       = (double)a->M;
1185   info->columns_global    = (double)a->N;
1186   info->rows_local        = (double)a->m;
1187   info->columns_local     = (double)a->N;
1188   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1189   info->fill_ratio_needed = 0;
1190   info->factor_mallocs    = 0;
1191   PetscFunctionReturn(0);
1192 }
1193 
1194 #undef __FUNC__
1195 #define __FUNC__ "MatSetOption_MPIBAIJ"
1196 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1197 {
1198   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1199 
1200   PetscFunctionBegin;
1201   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1202       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1203       op == MAT_COLUMNS_UNSORTED ||
1204       op == MAT_COLUMNS_SORTED ||
1205       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1206       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1207         MatSetOption(a->A,op);
1208         MatSetOption(a->B,op);
1209   } else if (op == MAT_ROW_ORIENTED) {
1210         a->roworiented = 1;
1211         MatSetOption(a->A,op);
1212         MatSetOption(a->B,op);
1213   } else if (op == MAT_ROWS_SORTED ||
1214              op == MAT_ROWS_UNSORTED ||
1215              op == MAT_SYMMETRIC ||
1216              op == MAT_STRUCTURALLY_SYMMETRIC ||
1217              op == MAT_YES_NEW_DIAGONALS)
1218     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1219   else if (op == MAT_COLUMN_ORIENTED) {
1220     a->roworiented = 0;
1221     MatSetOption(a->A,op);
1222     MatSetOption(a->B,op);
1223   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1224     a->donotstash = 1;
1225   } else if (op == MAT_NO_NEW_DIAGONALS) {
1226     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1227   } else {
1228     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1229   }
1230   PetscFunctionReturn(0);
1231 }
1232 
1233 #undef __FUNC__
1234 #define __FUNC__ "MatTranspose_MPIBAIJ("
1235 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1236 {
1237   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1238   Mat_SeqBAIJ *Aloc;
1239   Mat        B;
1240   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
1241   int        bs=baij->bs,mbs=baij->mbs;
1242   Scalar     *a;
1243 
1244   PetscFunctionBegin;
1245   if (matout == PETSC_NULL && M != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
1246   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1247   CHKERRQ(ierr);
1248 
1249   /* copy over the A part */
1250   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1251   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1252   row = baij->rstart;
1253   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1254 
1255   for ( i=0; i<mbs; i++ ) {
1256     rvals[0] = bs*(baij->rstart + i);
1257     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1258     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1259       col = (baij->cstart+aj[j])*bs;
1260       for (k=0; k<bs; k++ ) {
1261         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1262         col++; a += bs;
1263       }
1264     }
1265   }
1266   /* copy over the B part */
1267   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1268   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1269   row = baij->rstart*bs;
1270   for ( i=0; i<mbs; i++ ) {
1271     rvals[0] = bs*(baij->rstart + i);
1272     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1273     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1274       col = baij->garray[aj[j]]*bs;
1275       for (k=0; k<bs; k++ ) {
1276         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1277         col++; a += bs;
1278       }
1279     }
1280   }
1281   PetscFree(rvals);
1282   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1283   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1284 
1285   if (matout != PETSC_NULL) {
1286     *matout = B;
1287   } else {
1288     /* This isn't really an in-place transpose .... but free data structures from baij */
1289     PetscFree(baij->rowners);
1290     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1291     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1292     if (baij->colmap) PetscFree(baij->colmap);
1293     if (baij->garray) PetscFree(baij->garray);
1294     if (baij->lvec) VecDestroy(baij->lvec);
1295     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1296     PetscFree(baij);
1297     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1298     PetscHeaderDestroy(B);
1299   }
1300   PetscFunctionReturn(0);
1301 }
1302 
1303 #undef __FUNC__
1304 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1305 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1306 {
1307   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1308   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1309   int ierr,s1,s2,s3;
1310 
1311   PetscFunctionBegin;
1312   if (ll)  {
1313     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1314     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1315     if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"non-conforming local sizes");
1316     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1317     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1318   }
1319   if (rr) SETERRQ(PETSC_ERR_SUP,0,"not supported for right vector");
1320   PetscFunctionReturn(0);
1321 }
1322 
1323 /* the code does not do the diagonal entries correctly unless the
1324    matrix is square and the column and row owerships are identical.
1325    This is a BUG. The only way to fix it seems to be to access
1326    baij->A and baij->B directly and not through the MatZeroRows()
1327    routine.
1328 */
1329 #undef __FUNC__
1330 #define __FUNC__ "MatZeroRows_MPIBAIJ"
1331 int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
1332 {
1333   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
1334   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
1335   int            *procs,*nprocs,j,found,idx,nsends,*work;
1336   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
1337   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
1338   int            *lens,imdex,*lrows,*values,bs=l->bs;
1339   MPI_Comm       comm = A->comm;
1340   MPI_Request    *send_waits,*recv_waits;
1341   MPI_Status     recv_status,*send_status;
1342   IS             istmp;
1343 
1344   PetscFunctionBegin;
1345   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
1346   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
1347 
1348   /*  first count number of contributors to each processor */
1349   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
1350   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
1351   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
1352   for ( i=0; i<N; i++ ) {
1353     idx = rows[i];
1354     found = 0;
1355     for ( j=0; j<size; j++ ) {
1356       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
1357         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
1358       }
1359     }
1360     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
1361   }
1362   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
1363 
1364   /* inform other processors of number of messages and max length*/
1365   work   = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
1366   ierr   = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
1367   nrecvs = work[rank];
1368   ierr   = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
1369   nmax   = work[rank];
1370   PetscFree(work);
1371 
1372   /* post receives:   */
1373   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); CHKPTRQ(rvalues);
1374   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
1375   for ( i=0; i<nrecvs; i++ ) {
1376     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
1377   }
1378 
1379   /* do sends:
1380       1) starts[i] gives the starting index in svalues for stuff going to
1381          the ith processor
1382   */
1383   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1384   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
1385   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1386   starts[0] = 0;
1387   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1388   for ( i=0; i<N; i++ ) {
1389     svalues[starts[owner[i]]++] = rows[i];
1390   }
1391   ISRestoreIndices(is,&rows);
1392 
1393   starts[0] = 0;
1394   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1395   count = 0;
1396   for ( i=0; i<size; i++ ) {
1397     if (procs[i]) {
1398       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
1399     }
1400   }
1401   PetscFree(starts);
1402 
1403   base = owners[rank]*bs;
1404 
1405   /*  wait on receives */
1406   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1407   source = lens + nrecvs;
1408   count  = nrecvs; slen = 0;
1409   while (count) {
1410     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
1411     /* unpack receives into our local space */
1412     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
1413     source[imdex]  = recv_status.MPI_SOURCE;
1414     lens[imdex]  = n;
1415     slen += n;
1416     count--;
1417   }
1418   PetscFree(recv_waits);
1419 
1420   /* move the data into the send scatter */
1421   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1422   count = 0;
1423   for ( i=0; i<nrecvs; i++ ) {
1424     values = rvalues + i*nmax;
1425     for ( j=0; j<lens[i]; j++ ) {
1426       lrows[count++] = values[j] - base;
1427     }
1428   }
1429   PetscFree(rvalues); PetscFree(lens);
1430   PetscFree(owner); PetscFree(nprocs);
1431 
1432   /* actually zap the local rows */
1433   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1434   PLogObjectParent(A,istmp);
1435   PetscFree(lrows);
1436   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1437   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1438   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1439 
1440   /* wait on sends */
1441   if (nsends) {
1442     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
1443     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
1444     PetscFree(send_status);
1445   }
1446   PetscFree(send_waits); PetscFree(svalues);
1447 
1448   PetscFunctionReturn(0);
1449 }
1450 extern int MatPrintHelp_SeqBAIJ(Mat);
1451 #undef __FUNC__
1452 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1453 int MatPrintHelp_MPIBAIJ(Mat A)
1454 {
1455   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1456   int         ierr;
1457 
1458   PetscFunctionBegin;
1459   if (!a->rank) {
1460     ierr = MatPrintHelp_SeqBAIJ(a->A);CHKERRQ(ierr);
1461   }
1462   PetscFunctionReturn(0);
1463 }
1464 
1465 #undef __FUNC__
1466 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1467 int MatSetUnfactored_MPIBAIJ(Mat A)
1468 {
1469   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1470   int         ierr;
1471 
1472   PetscFunctionBegin;
1473   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1474   PetscFunctionReturn(0);
1475 }
1476 
1477 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1478 
1479 /* -------------------------------------------------------------------*/
1480 static struct _MatOps MatOps = {
1481   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1482   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1483   0,0,0,0,
1484   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1485   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1486   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1487   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1488   0,0,0,MatGetSize_MPIBAIJ,
1489   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1490   0,0,MatConvertSameType_MPIBAIJ,0,0,
1491   0,0,0,MatGetSubMatrices_MPIBAIJ,
1492   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1493   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
1494   0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1495 
1496 
1497 #undef __FUNC__
1498 #define __FUNC__ "MatCreateMPIBAIJ"
1499 /*@C
1500    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1501    (block compressed row).  For good matrix assembly performance
1502    the user should preallocate the matrix storage by setting the parameters
1503    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1504    performance can be increased by more than a factor of 50.
1505 
1506    Input Parameters:
1507 .  comm - MPI communicator
1508 .  bs   - size of blockk
1509 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1510            This value should be the same as the local size used in creating the
1511            y vector for the matrix-vector product y = Ax.
1512 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1513            This value should be the same as the local size used in creating the
1514            x vector for the matrix-vector product y = Ax.
1515 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1516 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1517 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1518            submatrix  (same for all local rows)
1519 .  d_nzz - array containing the number of block nonzeros in the various block rows
1520            of the in diagonal portion of the local (possibly different for each block
1521            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1522            it is zero.
1523 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1524            submatrix (same for all local rows).
1525 .  o_nzz - array containing the number of nonzeros in the various block rows of the
1526            off-diagonal portion of the local submatrix (possibly different for
1527            each block row) or PETSC_NULL.
1528 
1529    Output Parameter:
1530 .  A - the matrix
1531 
1532    Notes:
1533    The user MUST specify either the local or global matrix dimensions
1534    (possibly both).
1535 
1536    Storage Information:
1537    For a square global matrix we define each processor's diagonal portion
1538    to be its local rows and the corresponding columns (a square submatrix);
1539    each processor's off-diagonal portion encompasses the remainder of the
1540    local matrix (a rectangular submatrix).
1541 
1542    The user can specify preallocated storage for the diagonal part of
1543    the local submatrix with either d_nz or d_nnz (not both).  Set
1544    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1545    memory allocation.  Likewise, specify preallocated storage for the
1546    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1547 
1548    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1549    the figure below we depict these three local rows and all columns (0-11).
1550 
1551 $          0 1 2 3 4 5 6 7 8 9 10 11
1552 $         -------------------
1553 $  row 3  |  o o o d d d o o o o o o
1554 $  row 4  |  o o o d d d o o o o o o
1555 $  row 5  |  o o o d d d o o o o o o
1556 $         -------------------
1557 $
1558 
1559    Thus, any entries in the d locations are stored in the d (diagonal)
1560    submatrix, and any entries in the o locations are stored in the
1561    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1562    stored simply in the MATSEQBAIJ format for compressed row storage.
1563 
1564    Now d_nz should indicate the number of block nonzeros per row in the d matrix,
1565    and o_nz should indicate the number of block nonzeros per row in the o matrix.
1566    In general, for PDE problems in which most nonzeros are near the diagonal,
1567    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1568    or you will get TERRIBLE performance; see the users' manual chapter on
1569    matrices.
1570 
1571 .keywords: matrix, block, aij, compressed row, sparse, parallel
1572 
1573 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1574 @*/
1575 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1576                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1577 {
1578   Mat          B;
1579   Mat_MPIBAIJ  *b;
1580   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1581 
1582   PetscFunctionBegin;
1583   if (bs < 1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Invalid block size specified, must be positive");
1584 
1585   MPI_Comm_size(comm,&size);
1586   if (size == 1) {
1587     if (M == PETSC_DECIDE) M = m;
1588     if (N == PETSC_DECIDE) N = n;
1589     ierr = MatCreateSeqBAIJ(comm,bs,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
1590     PetscFunctionReturn(0);
1591   }
1592 
1593   *A = 0;
1594   PetscHeaderCreate(B,_p_Mat,MAT_COOKIE,MATMPIBAIJ,comm,MatDestroy,MatView);
1595   PLogObjectCreate(B);
1596   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1597   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1598   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1599 
1600   B->destroy    = MatDestroy_MPIBAIJ;
1601   B->view       = MatView_MPIBAIJ;
1602   B->mapping    = 0;
1603   B->factor     = 0;
1604   B->assembled  = PETSC_FALSE;
1605 
1606   B->insertmode = NOT_SET_VALUES;
1607   MPI_Comm_rank(comm,&b->rank);
1608   MPI_Comm_size(comm,&b->size);
1609 
1610   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
1611     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1612   }
1613   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) {
1614     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either M or m should be specified");
1615   }
1616   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) {
1617     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either N or n should be specified");
1618   }
1619   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1620   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1621 
1622   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1623     work[0] = m; work[1] = n;
1624     mbs = m/bs; nbs = n/bs;
1625     ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
1626     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1627     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1628   }
1629   if (m == PETSC_DECIDE) {
1630     Mbs = M/bs;
1631     if (Mbs*bs != M) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global rows must be divisible by blocksize");
1632     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1633     m   = mbs*bs;
1634   }
1635   if (n == PETSC_DECIDE) {
1636     Nbs = N/bs;
1637     if (Nbs*bs != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global cols must be divisible by blocksize");
1638     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1639     n   = nbs*bs;
1640   }
1641   if (mbs*bs != m || nbs*bs != n) {
1642     SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of local rows, cols must be divisible by blocksize");
1643   }
1644 
1645   b->m = m; B->m = m;
1646   b->n = n; B->n = n;
1647   b->N = N; B->N = N;
1648   b->M = M; B->M = M;
1649   b->bs  = bs;
1650   b->bs2 = bs*bs;
1651   b->mbs = mbs;
1652   b->nbs = nbs;
1653   b->Mbs = Mbs;
1654   b->Nbs = Nbs;
1655 
1656   /* build local table of row and column ownerships */
1657   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1658   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1659   b->cowners = b->rowners + b->size + 2;
1660   ierr = MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1661   b->rowners[0] = 0;
1662   for ( i=2; i<=b->size; i++ ) {
1663     b->rowners[i] += b->rowners[i-1];
1664   }
1665   b->rstart    = b->rowners[b->rank];
1666   b->rend      = b->rowners[b->rank+1];
1667   b->rstart_bs = b->rstart * bs;
1668   b->rend_bs   = b->rend * bs;
1669 
1670   ierr = MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1671   b->cowners[0] = 0;
1672   for ( i=2; i<=b->size; i++ ) {
1673     b->cowners[i] += b->cowners[i-1];
1674   }
1675   b->cstart    = b->cowners[b->rank];
1676   b->cend      = b->cowners[b->rank+1];
1677   b->cstart_bs = b->cstart * bs;
1678   b->cend_bs   = b->cend * bs;
1679 
1680   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1681   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1682   PLogObjectParent(B,b->A);
1683   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1684   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1685   PLogObjectParent(B,b->B);
1686 
1687   /* build cache for off array entries formed */
1688   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1689   b->donotstash  = 0;
1690   b->colmap      = 0;
1691   b->garray      = 0;
1692   b->roworiented = 1;
1693 
1694   /* stuff used in block assembly */
1695   b->barray      = 0;
1696 
1697   /* stuff used for matrix vector multiply */
1698   b->lvec        = 0;
1699   b->Mvctx       = 0;
1700 
1701   /* stuff for MatGetRow() */
1702   b->rowindices   = 0;
1703   b->rowvalues    = 0;
1704   b->getrowactive = PETSC_FALSE;
1705 
1706   *A = B;
1707   PetscFunctionReturn(0);
1708 }
1709 
1710 #undef __FUNC__
1711 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
1712 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1713 {
1714   Mat         mat;
1715   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1716   int         ierr, len=0, flg;
1717 
1718   PetscFunctionBegin;
1719   *newmat       = 0;
1720   PetscHeaderCreate(mat,_p_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm,MatDestroy,MatView);
1721   PLogObjectCreate(mat);
1722   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1723   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1724   mat->destroy    = MatDestroy_MPIBAIJ;
1725   mat->view       = MatView_MPIBAIJ;
1726   mat->factor     = matin->factor;
1727   mat->assembled  = PETSC_TRUE;
1728 
1729   a->m = mat->m   = oldmat->m;
1730   a->n = mat->n   = oldmat->n;
1731   a->M = mat->M   = oldmat->M;
1732   a->N = mat->N   = oldmat->N;
1733 
1734   a->bs  = oldmat->bs;
1735   a->bs2 = oldmat->bs2;
1736   a->mbs = oldmat->mbs;
1737   a->nbs = oldmat->nbs;
1738   a->Mbs = oldmat->Mbs;
1739   a->Nbs = oldmat->Nbs;
1740 
1741   a->rstart       = oldmat->rstart;
1742   a->rend         = oldmat->rend;
1743   a->cstart       = oldmat->cstart;
1744   a->cend         = oldmat->cend;
1745   a->size         = oldmat->size;
1746   a->rank         = oldmat->rank;
1747   mat->insertmode = NOT_SET_VALUES;
1748   a->rowvalues    = 0;
1749   a->getrowactive = PETSC_FALSE;
1750   a->barray       = 0;
1751 
1752   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1753   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1754   a->cowners = a->rowners + a->size + 2;
1755   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1756   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1757   if (oldmat->colmap) {
1758     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1759     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1760     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1761   } else a->colmap = 0;
1762   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1763     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1764     PLogObjectMemory(mat,len*sizeof(int));
1765     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1766   } else a->garray = 0;
1767 
1768   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1769   PLogObjectParent(mat,a->lvec);
1770   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1771   PLogObjectParent(mat,a->Mvctx);
1772   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1773   PLogObjectParent(mat,a->A);
1774   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1775   PLogObjectParent(mat,a->B);
1776   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1777   if (flg) {
1778     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1779   }
1780   *newmat = mat;
1781   PetscFunctionReturn(0);
1782 }
1783 
1784 #include "sys.h"
1785 
1786 #undef __FUNC__
1787 #define __FUNC__ "MatLoad_MPIBAIJ"
1788 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1789 {
1790   Mat          A;
1791   int          i, nz, ierr, j,rstart, rend, fd;
1792   Scalar       *vals,*buf;
1793   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1794   MPI_Status   status;
1795   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1796   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1797   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1798   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1799   int          dcount,kmax,k,nzcount,tmp;
1800 
1801   PetscFunctionBegin;
1802   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1803   bs2  = bs*bs;
1804 
1805   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1806   if (!rank) {
1807     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1808     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
1809     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
1810     if (header[3] < 0) {
1811       SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix stored in special format, cannot load as MPIBAIJ");
1812     }
1813   }
1814 
1815   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
1816   M = header[1]; N = header[2];
1817 
1818   if (M != N) SETERRQ(PETSC_ERR_SUP,0,"Can only do square matrices");
1819 
1820   /*
1821      This code adds extra rows to make sure the number of rows is
1822      divisible by the blocksize
1823   */
1824   Mbs        = M/bs;
1825   extra_rows = bs - M + bs*(Mbs);
1826   if (extra_rows == bs) extra_rows = 0;
1827   else                  Mbs++;
1828   if (extra_rows &&!rank) {
1829     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1830   }
1831 
1832   /* determine ownership of all rows */
1833   mbs = Mbs/size + ((Mbs % size) > rank);
1834   m   = mbs * bs;
1835   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1836   browners = rowners + size + 1;
1837   ierr = MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1838   rowners[0] = 0;
1839   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1840   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1841   rstart = rowners[rank];
1842   rend   = rowners[rank+1];
1843 
1844   /* distribute row lengths to all processors */
1845   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1846   if (!rank) {
1847     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1848     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
1849     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1850     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1851     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1852     ierr = MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);CHKERRQ(ierr);
1853     PetscFree(sndcounts);
1854   } else {
1855     ierr = MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);CHKERRQ(ierr);
1856   }
1857 
1858   if (!rank) {
1859     /* calculate the number of nonzeros on each processor */
1860     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1861     PetscMemzero(procsnz,size*sizeof(int));
1862     for ( i=0; i<size; i++ ) {
1863       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1864         procsnz[i] += rowlengths[j];
1865       }
1866     }
1867     PetscFree(rowlengths);
1868 
1869     /* determine max buffer needed and allocate it */
1870     maxnz = 0;
1871     for ( i=0; i<size; i++ ) {
1872       maxnz = PetscMax(maxnz,procsnz[i]);
1873     }
1874     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1875 
1876     /* read in my part of the matrix column indices  */
1877     nz = procsnz[0];
1878     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1879     mycols = ibuf;
1880     if (size == 1)  nz -= extra_rows;
1881     ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
1882     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1883 
1884     /* read in every ones (except the last) and ship off */
1885     for ( i=1; i<size-1; i++ ) {
1886       nz   = procsnz[i];
1887       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
1888       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
1889     }
1890     /* read in the stuff for the last proc */
1891     if ( size != 1 ) {
1892       nz   = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1893       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
1894       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1895       ierr = MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);CHKERRQ(ierr);
1896     }
1897     PetscFree(cols);
1898   } else {
1899     /* determine buffer space needed for message */
1900     nz = 0;
1901     for ( i=0; i<m; i++ ) {
1902       nz += locrowlens[i];
1903     }
1904     ibuf   = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1905     mycols = ibuf;
1906     /* receive message of column indices*/
1907     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
1908     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
1909     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
1910   }
1911 
1912   /* loop over local rows, determining number of off diagonal entries */
1913   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1914   odlens = dlens + (rend-rstart);
1915   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1916   PetscMemzero(mask,3*Mbs*sizeof(int));
1917   masked1 = mask    + Mbs;
1918   masked2 = masked1 + Mbs;
1919   rowcount = 0; nzcount = 0;
1920   for ( i=0; i<mbs; i++ ) {
1921     dcount  = 0;
1922     odcount = 0;
1923     for ( j=0; j<bs; j++ ) {
1924       kmax = locrowlens[rowcount];
1925       for ( k=0; k<kmax; k++ ) {
1926         tmp = mycols[nzcount++]/bs;
1927         if (!mask[tmp]) {
1928           mask[tmp] = 1;
1929           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1930           else masked1[dcount++] = tmp;
1931         }
1932       }
1933       rowcount++;
1934     }
1935 
1936     dlens[i]  = dcount;
1937     odlens[i] = odcount;
1938 
1939     /* zero out the mask elements we set */
1940     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1941     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1942   }
1943 
1944   /* create our matrix */
1945   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1946          CHKERRQ(ierr);
1947   A = *newmat;
1948   MatSetOption(A,MAT_COLUMNS_SORTED);
1949 
1950   if (!rank) {
1951     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1952     /* read in my part of the matrix numerical values  */
1953     nz = procsnz[0];
1954     vals = buf;
1955     mycols = ibuf;
1956     if (size == 1)  nz -= extra_rows;
1957     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1958     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1959 
1960     /* insert into matrix */
1961     jj      = rstart*bs;
1962     for ( i=0; i<m; i++ ) {
1963       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1964       mycols += locrowlens[i];
1965       vals   += locrowlens[i];
1966       jj++;
1967     }
1968     /* read in other processors (except the last one) and ship out */
1969     for ( i=1; i<size-1; i++ ) {
1970       nz   = procsnz[i];
1971       vals = buf;
1972       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1973       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
1974     }
1975     /* the last proc */
1976     if ( size != 1 ){
1977       nz   = procsnz[i] - extra_rows;
1978       vals = buf;
1979       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1980       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1981       ierr = MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);CHKERRQ(ierr);
1982     }
1983     PetscFree(procsnz);
1984   } else {
1985     /* receive numeric values */
1986     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1987 
1988     /* receive message of values*/
1989     vals   = buf;
1990     mycols = ibuf;
1991     ierr   = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
1992     ierr   = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
1993     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
1994 
1995     /* insert into matrix */
1996     jj      = rstart*bs;
1997     for ( i=0; i<m; i++ ) {
1998       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1999       mycols += locrowlens[i];
2000       vals   += locrowlens[i];
2001       jj++;
2002     }
2003   }
2004   PetscFree(locrowlens);
2005   PetscFree(buf);
2006   PetscFree(ibuf);
2007   PetscFree(rowners);
2008   PetscFree(dlens);
2009   PetscFree(mask);
2010   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2011   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2012   PetscFunctionReturn(0);
2013 }
2014 
2015 
2016