#ifdef PETSC_RCS_HEADER
static char vcid[] = "$Id: mpibaij.c,v 1.91 1997/12/01 01:55:07 bsmith Exp balay $";
#endif

#include "pinclude/pviewer.h"
#include "src/mat/impls/baij/mpi/mpibaij.h"
#include "src/vec/vecimpl.h"


extern int MatSetUpMultiply_MPIBAIJ(Mat);
extern int DisAssemble_MPIBAIJ(Mat);
extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);

/*
     Local utility routine that creates a mapping from the global column
   number to the local number in the off-diagonal part of the local
   storage of the matrix.  This is done in a non-scalable way, since the
   length of colmap equals the number of global block columns.
*/
#undef __FUNC__
#define __FUNC__ "CreateColmap_MPIBAIJ_Private"
static int CreateColmap_MPIBAIJ_Private(Mat mat)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
  int         nbs = B->nbs,i,bs=B->bs;

  PetscFunctionBegin;
  baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
  PLogObjectMemory(mat,baij->Nbs*sizeof(int));
  PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
  for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
  PetscFunctionReturn(0);
}
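/*
   An illustrative note (not part of the original source) on reading the
   colmap convention established above: entries are stored shifted by +1 so
   that 0 can mean "global block column not present in the off-diagonal part
   B".  A lookup for a global point column gcol therefore goes

       entry = baij->colmap[gcol/bs];          (1-based; 0 means absent)
       if (!entry) lcol = -1;                  (column not in B)
       else        lcol = entry - 1 + gcol%bs; (local point column in B)

   which is exactly the pattern used in MatSetValues_MPIBAIJ and
   MatAssemblyEnd_MPIBAIJ below.
*/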

#define CHUNKSIZE  10

#define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
    rmax = aimax[brow]; nrow = ailen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto a_noinsert; \
        } \
      } \
      if (a->nonew == 1) goto a_noinsert; \
      else if (a->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (a->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
        for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
        PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
                    aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(a->a);  \
        if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
        aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
        a->singlemalloc = 1; \
 \
        rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
        rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        a->maxnz += bs2*CHUNKSIZE; \
        a->reallocs++; \
        a->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      a_noinsert:; \
    ailen[brow] = nrow; \
}
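/*
   How the macro above locates an entry, sketched on a small example (this
   note is illustrative, not part of the original source).  Suppose block row
   brow currently holds block columns rp[] = {2,5,9,14} (nrow = 4) and we set
   an entry in global block column 9 with bs = 2:

       row = 19, col = 18  ->  brow = 9, bcol = 9, ridx = 1, cidx = 0

   The while loop narrows [low,high) by bisection until fewer than four
   candidates remain, the for loop scans the rest, finds rp[2] == 9, and
   writes ap[bs2*2 + bs*cidx + ridx] (entries within a block are stored by
   column: index bs*cidx + ridx).  Only when bcol is absent does the code
   fall through to the CHUNKSIZE reallocation and the shift-up insertion.
*/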

#define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
    rmax = bimax[brow]; nrow = bilen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto b_noinsert; \
        } \
      } \
      if (b->nonew == 1) goto b_noinsert; \
      else if (b->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (b->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
        for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
        PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
                    ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(b->a);  \
        if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
        ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
        b->singlemalloc = 1; \
 \
        rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
        rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        b->maxnz += bs2*CHUNKSIZE; \
        b->reallocs++; \
        b->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      b_noinsert:; \
    bilen[brow] = nrow; \
}

#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIBAIJ"
int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j,row,col;
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs;
  int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
  int         cend_orig=baij->cend_bs,bs=baij->bs;

  /* Some variables required by the macros above */
  Mat         A = baij->A;
  Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
  int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
  Scalar      *aa=a->a;

  Mat         B = baij->B;
  Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
  int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
  Scalar      *ba=b->a;

  int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
  int         low,high,t,ridx,cidx,bs2=a->bs2;
  Scalar      *ap,*bap;

  PetscFunctionBegin;
  for ( i=0; i<m; i++ ) {
#if defined(USE_PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
    if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
#endif
    if (im[i] >= rstart_orig && im[i] < rend_orig) {
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        if (in[j] >= cstart_orig && in[j] < cend_orig){
          col = in[j] - cstart_orig;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
#if defined(USE_PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");}
        else if (in[j] >= baij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");}
#endif
        else {
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }
            col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
              /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
              B = baij->B;
              b = (Mat_SeqBAIJ *) (B)->data;
              bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
              ba=b->a;
            }
          } else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
      }
    } else {
      if (roworiented && !baij->donotstash) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      } else {
        if (!baij->donotstash) {
          row = im[i];
          for ( j=0; j<n; j++ ) {
            ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
  PetscFunctionReturn(0);
}
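/*
   Usage sketch (illustrative only; the indices and value are made up):
   callers use global point indices even though the storage is blocked, and
   off-processor entries are routed through the stash by the code above and
   communicated in MatAssemblyBegin/End_MPIBAIJ:

       Scalar val = 1.0;
       int    row = 7, col = 12;            (global point indices)
       ierr = MatSetValues(mat,1,&row,1,&col,&val,ADD_VALUES);CHKERRQ(ierr);
       ierr = MatAssemblyBegin(mat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
       ierr = MatAssemblyEnd(mat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
*/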

extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
#undef __FUNC__
#define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      *value,*barray=baij->barray;
  int         ierr,i,j,ii,jj,row,col,k,l;
  int         roworiented = baij->roworiented,rstart=baij->rstart;
  int         rend=baij->rend,cstart=baij->cstart,stepval;
  int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;

  PetscFunctionBegin;
  if (!barray) {
    baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
  }

  if (roworiented) {
    stepval = (n-1)*bs;
  } else {
    stepval = (m-1)*bs;
  }
  for ( i=0; i<m; i++ ) {
#if defined(USE_PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
    if (im[i] >= baij->Mbs) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
#endif
    if (im[i] >= rstart && im[i] < rend) {
      row = im[i] - rstart;
      for ( j=0; j<n; j++ ) {
        /* If n == 1 (row oriented) or m == 1 (column oriented), the block
           is already contiguous in v and a copy is not required */
        if ((roworiented) && (n == 1)) {
          barray = v + i*bs2;
        } else if((!roworiented) && (m == 1)) {
          barray = v + j*bs2;
        } else { /* Here a copy is required */
          if (roworiented) {
            value = v + i*(stepval+bs)*bs + j*bs;
          } else {
            value = v + j*(stepval+bs)*bs + i*bs;
          }
          for ( ii=0; ii<bs; ii++,value+=stepval ) {
            for (jj=0; jj<bs; jj++ ) {
              *barray++  = *value++;
            }
          }
          barray -= bs2;
        }

        if (in[j] >= cstart && in[j] < cend){
          col  = in[j] - cstart;
          ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
        }
#if defined(USE_PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");}
        else if (in[j] >= baij->Nbs) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");}
#endif
        else {
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }

#if defined(USE_PETSC_BOPT_g)
            if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(PETSC_ERR_PLIB,0,"Incorrect colmap");}
#endif
            col = (baij->colmap[in[j]] - 1)/bs;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
            }
          }
          else col = in[j];
          ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
        }
      }
    } else {
      if (!baij->donotstash) {
        if (roworiented) {
          row   = im[i]*bs;
          value = v + i*(stepval+bs)*bs;
          for ( j=0; j<bs; j++,row++ ) {
            for ( k=0; k<n; k++ ) {
              for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        } else {
          for ( j=0; j<n; j++ ) {
            value = v + j*(stepval+bs)*bs + i*bs;
            col   = in[j]*bs;
            for ( k=0; k<bs; k++,col++,value+=stepval) {
              for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        }
      }
    }
  }
  PetscFunctionReturn(0);
}
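/*
   Usage sketch for the blocked interface (illustrative only; the values are
   hypothetical): with bs = 2, one 2x2 block is inserted at block row 3,
   block column 5, i.e. point rows 6-7 and point columns 10-11.  In
   row-oriented mode v[] is read row by row, matching the copy loop above:

       Scalar v[4] = {a00,a01,a10,a11};
       int    brow = 3, bcol = 5;           (block indices, not point ones)
       ierr = MatSetValuesBlocked(mat,1,&brow,1,&bcol,v,INSERT_VALUES);CHKERRQ(ierr);
*/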

#undef __FUNC__
#define __FUNC__ "MatGetValues_MPIBAIJ"
int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
  int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;

  PetscFunctionBegin;
  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
    if (idxm[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
    if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
      row = idxm[i] - bsrstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
        if (idxn[j] >= baij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
        if (idxn[j] >= bscstart && idxn[j] < bscend){
          col = idxn[j] - bscstart;
          ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        } else {
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
          }
          if ((baij->colmap[idxn[j]/bs]-1 < 0) ||
              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
          else {
            col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
            ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    } else {
      SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
    }
  }
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatNorm_MPIBAIJ"
int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
  int        ierr, i,bs2=baij->bs2;
  double     sum = 0.0;
  Scalar     *v;

  PetscFunctionBegin;
  if (baij->size == 1) {
    ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      v = amat->a;
      for (i=0; i<amat->nz*bs2; i++ ) {
#if defined(USE_PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      v = bmat->a;
      for (i=0; i<bmat->nz*bs2; i++ ) {
#if defined(USE_PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
      *norm = sqrt(*norm);
    } else {
      SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
    }
  }
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  PetscFunctionBegin;
  /* make sure all processors are using the same mode (INSERT_VALUES or ADD_VALUES) */
  ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
  }
  mat->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work      = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  ierr      = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
  nreceives = work[rank];
  ierr      = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
  nmax      = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message consists of ordered triples (global row, global column,
     value); the indices are stored as Scalars to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.

       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",rank,baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  PetscFunctionReturn(0);
}
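/*
   Illustrative note (not from the original source): the wire format set up
   above is three Scalars per stashed entry, so the receive side recovers
   entry k of a message buffer vals[] as

       row = (int) PetscReal(vals[3*k]);
       col = (int) PetscReal(vals[3*k+1]);
       val = vals[3*k+2];

   which is exactly the unpacking done in MatAssemblyEnd_MPIBAIJ below.
*/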
#include <math.h>
#define HASH_KEY 0.6180339887
#define HASH1(size,key) ((int)((size)*fmod(((key)*HASH_KEY),1)))
#define HASH2(size,key) ((int)((size)*fmod(((key+0.5)*HASH_KEY),1)))


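/*
   A note on the hash macros above (added for clarity): HASH1 is
   multiplicative hashing with the golden-ratio constant, taking the
   fractional part of key*0.6180339887 and scaling it by the table size.
   A worked example, with made-up numbers: for size = 100 and key = 37,

       37 * 0.6180339887 = 22.867...  ->  fractional part 0.867  ->  slot 86

   HASH2 shifts the key by 0.5 to obtain a second, independent probe start.
*/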
int CreateHashTable(Mat mat,double factor)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Mat         A = baij->A, B=baij->B;
  Mat_SeqBAIJ *a=(Mat_SeqBAIJ *)A->data, *b=(Mat_SeqBAIJ *)B->data;
  int         i,j,k,nz=a->nz+b->nz,h1,h2,*ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j;
  int         size=(int)(factor*nz),ct=0,max1=0,max2=0;
  double      *HT,key;
  extern int  PetscGlobalRank;
  /*
     Scalar      *aa=a->a,*ba=b->a;
     static double *HT;
     static      int flag=1;
     */

  PetscFunctionBegin;
  /* Allocate memory for the hash table */
  if (!baij->ht) {
    baij->ht = (double*)PetscMalloc(size*sizeof(double)); CHKPTRQ(baij->ht);
  }
  HT = baij->ht;
  PetscMemzero(HT,size*sizeof(double));

  /* Loop over A */
  for ( i=0; i<a->n; i++ ) {
    for ( j=ai[i]; j<ai[i+1]; j++ ) {
      key = i*baij->n+aj[j]+1;
      h1  = HASH1(size,key);
      h2  = HASH2(size,key);

      for ( k=1; k<size; k++ ){
        if (HT[(h1*k)%size] == 0.0) {
          HT[(h1*k)%size] = key;
          break;
        } else ct++;
      }
      if (k > max1) max1 = k;
    }
  }
  /* Loop over B */
  for ( i=0; i<b->n; i++ ) {
    for ( j=bi[i]; j<bi[i+1]; j++ ) {
      key = i*b->n+bj[j]+1;
      h1  = HASH1(size,key);
      for ( k=1; k<size; k++ ){
        if (HT[(h1*k)%size] == 0.0) {
          HT[(h1*k)%size] = key;
          break;
        } else ct++;
      }
      if (k > max2) max2 = k;
    }
  }

  /* Print a summary of the table occupancy and search costs */
  for ( i=0,key=0.0,j=0; i<size; i++ ) {
    if (HT[i]) {j++;}
  }
  printf("[%d] fact = %3.2f max1 = %5d max2 = %5d Size %5d - Searches %5d Avg %5.2f Keys %5d\n",
         PetscGlobalRank,factor,max1,max2,size,ct+j,((float)ct+j)/j,j);
  /* free the table and clear the cached pointer so it is not reused after the free */
  PetscFree(baij->ht); baij->ht = 0;
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
  int         bs=baij->bs,row,col,other_disassembled,flg;
  Scalar      *values,val;
  InsertMode  addv = mat->insertmode;

  PetscFunctionBegin;
  /*  wait on receives */
  while (count) {
    ierr = MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
    /* unpack receives into our local space */
    values = baij->rvalues + 3*imdex*baij->rmax;
    ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
    n = n/3;
    for ( i=0; i<n; i++ ) {
      row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= baij->cstart*bs && col < baij->cend*bs) {
        col -= baij->cstart*bs;
        ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
      } else {
        if (mat->was_assembled) {
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
          }
          col = (baij->colmap[col/bs]) - 1 + col%bs;
          if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
            ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
      }
    }
    count--;
  }
  PetscFree(baij->recv_waits); PetscFree(baij->rvalues);

  /* wait on sends */
  if (baij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
    ierr        = MPI_Waitall(baij->nsends,baij->send_waits,send_status);CHKERRQ(ierr);
    PetscFree(send_status);
  }
  PetscFree(baij->send_waits); PetscFree(baij->svalues);

  ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled; if so we must
     also disassemble ourselves, in order that we may reassemble. */
  ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
  }

  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);

  ierr = OptionsHasName(PETSC_NULL,"-use_hash",&flg); CHKERRQ(ierr);
  if (flg) {
    double fact = 1.39;
    /* for ( fact=1.2; fact<2.0; fact +=0.05) */ CreateHashTable(mat,fact);
  }
  if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatView_MPIBAIJ_Binary"
static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr;

  PetscFunctionBegin;
  if (baij->size == 1) {
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
  } else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr, format,rank,bs = baij->bs;
  FILE         *fd;
  ViewerType   vtype;

  PetscFunctionBegin;
  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerGetFormat(viewer,&format); CHKERRQ(ierr);
    if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
      MatInfo info;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&info); CHKERRQ(ierr);
      PetscSequentialPhaseBegin(mat->comm,1);
      fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
              rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
              baij->bs,(int)info.memory);
      ierr = MatGetInfo(baij->A,MAT_LOCAL,&info); CHKERRQ(ierr);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      ierr = MatGetInfo(baij->B,MAT_LOCAL,&info); CHKERRQ(ierr);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
      PetscFunctionReturn(0);
    } else if (format == VIEWER_FORMAT_ASCII_INFO) {
      PetscPrintf(mat->comm,"  block size is %d\n",bs);
      PetscFunctionReturn(0);
    }
  }

  if (vtype == DRAW_VIEWER) {
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
  }

  if (vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
            baij->cstart*bs,baij->cend*bs);
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  } else {
    int size = baij->size;
    rank = baij->rank;
    if (size == 1) {
      ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    } else {
      /* assemble the entire matrix onto the first processor */
      Mat         A;
      Mat_SeqBAIJ *Aloc;
      int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
      int         mbs=baij->mbs;
      Scalar      *a;

      if (!rank) {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
      } else {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqBAIJ*) baij->A->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart;
      rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = (baij->cstart+aj[j])*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      /* copy over the B part */
      Aloc = (Mat_SeqBAIJ*) baij->B->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart*bs;
      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = baij->garray[aj[j]]*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      PetscFree(rvals);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      /*
         Everyone has to call to draw the matrix since the graphics waits are
         synchronized across all processors that share the Draw object
      */
      if (!rank || vtype == DRAW_VIEWER) {
        ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  PetscFunctionReturn(0);
}



#undef __FUNC__
#define __FUNC__ "MatView_MPIBAIJ"
int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
{
  Mat         mat = (Mat) obj;
  int         ierr;
  ViewerType  vtype;

  PetscFunctionBegin;
  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
      vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
    ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
  } else if (vtype == BINARY_FILE_VIEWER) {
    ierr = MatView_MPIBAIJ_Binary(mat,viewer);CHKERRQ(ierr);
  }
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatDestroy_MPIBAIJ"
int MatDestroy_MPIBAIJ(PetscObject obj)
{
  Mat         mat = (Mat) obj;
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;

  PetscFunctionBegin;
#if defined(USE_PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
#endif

  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
  PetscFree(baij->rowners);
  ierr = MatDestroy(baij->A); CHKERRQ(ierr);
  ierr = MatDestroy(baij->B); CHKERRQ(ierr);
  if (baij->colmap) PetscFree(baij->colmap);
  if (baij->garray) PetscFree(baij->garray);
  if (baij->lvec)   VecDestroy(baij->lvec);
  if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
  if (baij->rowvalues) PetscFree(baij->rowvalues);
  if (baij->barray) PetscFree(baij->barray);
  PetscFree(baij);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatMult_MPIBAIJ"
int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr, nt;

  PetscFunctionBegin;
  VecGetLocalSize_Fast(xx,nt);
  if (nt != a->n) {
    SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
  }
  VecGetLocalSize_Fast(yy,nt);
  if (nt != a->m) {
    SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and yy");
  }
  ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
  ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
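/*
   A note added for clarity (not from the original source): the multiply
   above overlaps communication with computation.  With A_d the local
   diagonal block and B_o the off-diagonal block acting on ghost values
   gathered into lvec,

       y = A_d * x_local + B_o * x_ghost

   the scatter of x_ghost is started, A_d*x_local is computed while the
   messages are in flight, and B_o's contribution is added once the
   scatter completes.
*/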

#undef __FUNC__
#define __FUNC__ "MatMultAdd_MPIBAIJ"
int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;

  PetscFunctionBegin;
  ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatMultTrans_MPIBAIJ"
int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;

  PetscFunctionBegin;
  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually
     inserted in yy until the VecScatterEnd below, which is true for this
     implementation but perhaps not for all */
  ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatMultTransAdd_MPIBAIJ"
int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;

  PetscFunctionBegin;
  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually
     inserted in zz until the VecScatterEnd below, which is true for this
     implementation but perhaps not for all */
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

/*
  This only works correctly for square matrices where the subblock A->A is the
   diagonal block
*/
#undef __FUNC__
#define __FUNC__ "MatGetDiagonal_MPIBAIJ"
int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;

  PetscFunctionBegin;
  if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
  ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatScale_MPIBAIJ"
int MatScale_MPIBAIJ(Scalar *aa,Mat A)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;

  PetscFunctionBegin;
  ierr = MatScale(aa,a->A); CHKERRQ(ierr);
  ierr = MatScale(aa,a->B); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatGetSize_MPIBAIJ"
int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;

  PetscFunctionBegin;
  if (m) *m = mat->M;
  if (n) *n = mat->N;
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatGetLocalSize_MPIBAIJ"
int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;

  PetscFunctionBegin;
  *m = mat->m; *n = mat->N;
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;

  PetscFunctionBegin;
  *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
  PetscFunctionReturn(0);
}
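/*
   Usage sketch (illustrative only): the range is returned in point indices,
   already scaled by the block size, so a loop over locally owned point rows
   is simply

       int rstart,rend,row;
       ierr = MatGetOwnershipRange(mat,&rstart,&rend);CHKERRQ(ierr);
       for ( row=rstart; row<rend; row++ ) { ... work on row ... }
*/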

extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);

#undef __FUNC__
#define __FUNC__ "MatGetRow_MPIBAIJ"
int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
  int        *cmap, *idx_p,cstart = mat->cstart;

  PetscFunctionBegin;
  if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int     max = 1,mbs = mat->mbs,tmp;
    for ( i=0; i<mbs; i++ ) {
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }

  if (row < brstart || row >= brend) SETERRQ(PETSC_ERR_SUP,0,"Only local rows");
  lrow = row - brstart;

  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
          }
        } else {
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]/bs] < cstart)
              idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
            else break;
          }
          imark = i;
        }
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
      }
    } else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
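/*
   Usage sketch (illustrative only): callers must bracket each row query with
   MatGetRow/MatRestoreRow and may only ask for locally owned point rows:

       int    nz,*cols;
       Scalar *vals;
       ierr = MatGetRow(mat,row,&nz,&cols,&vals);CHKERRQ(ierr);
       ... use the nz entries cols[k],vals[k] ...
       ierr = MatRestoreRow(mat,row,&nz,&cols,&vals);CHKERRQ(ierr);
*/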

#undef __FUNC__
#define __FUNC__ "MatRestoreRow_MPIBAIJ"
int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;

  PetscFunctionBegin;
  if (baij->getrowactive == PETSC_FALSE) {
    SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
  }
  baij->getrowactive = PETSC_FALSE;
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatGetBlockSize_MPIBAIJ"
int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;

  PetscFunctionBegin;
  *bs = baij->bs;
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatZeroEntries_MPIBAIJ"
int MatZeroEntries_MPIBAIJ(Mat A)
{
  Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
  int         ierr;

  PetscFunctionBegin;
  ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
  ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatGetInfo_MPIBAIJ"
int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
  Mat         A = a->A, B = a->B;
  int         ierr;
  double      isend[5], irecv[5];

  PetscFunctionBegin;
  info->block_size     = (double)a->bs;
  ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
  isend[0] = info->nz_used;  isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
  isend[3] = info->memory;   isend[4] = info->mallocs;
  ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
  isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
  isend[3] += info->memory;  isend[4] += info->mallocs;
  if (flag == MAT_LOCAL) {
    info->nz_used      = isend[0];
    info->nz_allocated = isend[1];
    info->nz_unneeded  = isend[2];
    info->memory       = isend[3];
    info->mallocs      = isend[4];
  } else if (flag == MAT_GLOBAL_MAX) {
    ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);CHKERRQ(ierr);
    info->nz_used      = irecv[0];
    info->nz_allocated = irecv[1];
    info->nz_unneeded  = irecv[2];
    info->memory       = irecv[3];
    info->mallocs      = irecv[4];
  } else if (flag == MAT_GLOBAL_SUM) {
    ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);CHKERRQ(ierr);
    info->nz_used      = irecv[0];
    info->nz_allocated = irecv[1];
    info->nz_unneeded  = irecv[2];
    info->memory       = irecv[3];
    info->mallocs      = irecv[4];
  }
  info->rows_global       = (double)a->M;
  info->columns_global    = (double)a->N;
  info->rows_local        = (double)a->m;
  info->columns_local     = (double)a->N;
  info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
  info->fill_ratio_needed = 0;
  info->factor_mallocs    = 0;
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatSetOption_MPIBAIJ"
int MatSetOption_MPIBAIJ(Mat A,MatOption op)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;

  PetscFunctionBegin;
  if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
      op == MAT_YES_NEW_NONZERO_LOCATIONS ||
      op == MAT_COLUMNS_UNSORTED ||
      op == MAT_COLUMNS_SORTED ||
      op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
      op == MAT_NEW_NONZERO_LOCATION_ERROR) {
    MatSetOption(a->A,op);
    MatSetOption(a->B,op);
  } else if (op == MAT_ROW_ORIENTED) {
    a->roworiented = 1;
    MatSetOption(a->A,op);
    MatSetOption(a->B,op);
  } else if (op == MAT_ROWS_SORTED ||
             op == MAT_ROWS_UNSORTED ||
             op == MAT_SYMMETRIC ||
             op == MAT_STRUCTURALLY_SYMMETRIC ||
             op == MAT_YES_NEW_DIAGONALS) {
    PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
  } else if (op == MAT_COLUMN_ORIENTED) {
    a->roworiented = 0;
    MatSetOption(a->A,op);
    MatSetOption(a->B,op);
  } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
    a->donotstash = 1;
  } else if (op == MAT_NO_NEW_DIAGONALS) {
    SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
  } else {
    SETERRQ(PETSC_ERR_SUP,0,"unknown option");
  }
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatTranspose_MPIBAIJ"
int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
  Mat_SeqBAIJ *Aloc;
  Mat        B;
  int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
  int        bs=baij->bs,mbs=baij->mbs;
  Scalar     *a;

  PetscFunctionBegin;
  if (matout == PETSC_NULL && M != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
  ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
  CHKERRQ(ierr);

  /* copy over the A part */
  Aloc = (Mat_SeqBAIJ*) baij->A->data;
  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
  row = baij->rstart;
  rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

  for ( i=0; i<mbs; i++ ) {
    rvals[0] = bs*(baij->rstart + i);
    for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
    for ( j=ai[i]; j<ai[i+1]; j++ ) {
      col = (baij->cstart+aj[j])*bs;
      for (k=0; k<bs; k++ ) {
        ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
        col++; a += bs;
      }
    }
  }
  /* copy over the B part */
  Aloc = (Mat_SeqBAIJ*) baij->B->data;
  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
  row = baij->rstart*bs;
  for ( i=0; i<mbs; i++ ) {
    rvals[0] = bs*(baij->rstart + i);
    for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
    for ( j=ai[i]; j<ai[i+1]; j++ ) {
      col = baij->garray[aj[j]]*bs;
      for (k=0; k<bs; k++ ) {
        ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
        col++; a += bs;
      }
    }
  }
  PetscFree(rvals);
  ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);

  if (matout != PETSC_NULL) {
    *matout = B;
  } else {
    /* This isn't truly an in-place transpose: B is built separately, A's
       data structures are freed, and B's header is copied over A */
    PetscFree(baij->rowners);
    ierr = MatDestroy(baij->A); CHKERRQ(ierr);
    ierr = MatDestroy(baij->B); CHKERRQ(ierr);
    if (baij->colmap) PetscFree(baij->colmap);
    if (baij->garray) PetscFree(baij->garray);
    if (baij->lvec) VecDestroy(baij->lvec);
    if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
    PetscFree(baij);
    PetscMemcpy(A,B,sizeof(struct _p_Mat));
    PetscHeaderDestroy(B);
  }
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatDiagonalScale_MPIBAIJ"
int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
{
  Mat a = ((Mat_MPIBAIJ *) A->data)->A;
  Mat b = ((Mat_MPIBAIJ *) A->data)->B;
  int ierr,s1,s2,s3;

  PetscFunctionBegin;
  if (ll) {
    ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
    ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
    if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"non-conforming local sizes");
    ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
    ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
  }
  if (rr) SETERRQ(PETSC_ERR_SUP,0,"not supported for right vector");
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatZeroRows_MPIBAIJ"
int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work,row;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values,bs=l->bs,rstart_bs=l->rstart_bs;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  PetscFunctionBegin;
  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work   = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  ierr   = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
  nrecvs = work[rank];
  ierr   = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
  nmax   = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
  }

  /* do sends:
     1) starts[i] gives the starting index in svalues for stuff going to
     the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
    }
  }
  PetscFree(starts);

  base = owners[rank]*bs;

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
    /* unpack receives into our local space */
    ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);

  ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  if (diag) {
    for ( i = 0; i < slen; i++ ) {
      row = lrows[i] + rstart_bs;
      ierr = MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES); CHKERRQ(ierr);
    }
    ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
    ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  }
  PetscFree(lrows);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
    ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  PetscFunctionReturn(0);
}
1455 extern int MatPrintHelp_SeqBAIJ(Mat);
1456 #undef __FUNC__
1457 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1458 int MatPrintHelp_MPIBAIJ(Mat A)
1459 {
1460   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1461   int         ierr;
1462 
1463   PetscFunctionBegin;
1464   if (!a->rank) {
1465     ierr = MatPrintHelp_SeqBAIJ(a->A);CHKERRQ(ierr);
1466   }
1467   PetscFunctionReturn(0);
1468 }
1469 
1470 #undef __FUNC__
1471 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1472 int MatSetUnfactored_MPIBAIJ(Mat A)
1473 {
1474   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1475   int         ierr;
1476 
1477   PetscFunctionBegin;
1478   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1479   PetscFunctionReturn(0);
1480 }
1481 
1482 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1483 
1484 /* -------------------------------------------------------------------*/
1485 static struct _MatOps MatOps = {
1486   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1487   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1488   0,0,0,0,
1489   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1490   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1491   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1492   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1493   0,0,0,MatGetSize_MPIBAIJ,
1494   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1495   0,0,MatConvertSameType_MPIBAIJ,0,0,
1496   0,0,0,MatGetSubMatrices_MPIBAIJ,
1497   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1498   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
1499   0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1500 
1501 
1502 #undef __FUNC__
1503 #define __FUNC__ "MatCreateMPIBAIJ"
1504 /*@C
1505    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1506    (block compressed row).  For good matrix assembly performance
1507    the user should preallocate the matrix storage by setting the parameters
1508    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1509    performance can be increased by more than a factor of 50.
1510 
1511    Input Parameters:
1512 .  comm - MPI communicator
1513 .  bs   - size of block
1514 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1515            This value should be the same as the local size used in creating the
1516            y vector for the matrix-vector product y = Ax.
1517 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1518            This value should be the same as the local size used in creating the
1519            x vector for the matrix-vector product y = Ax.
1520 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
1521 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
1522 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1523            submatrix  (same for all local rows)
1524 .  d_nnz - array containing the number of block nonzeros in the various block rows
1525            in the diagonal portion of the local submatrix (possibly different for
1526            each block row) or PETSC_NULL.  You must leave room for the diagonal
1527            entry even if it is zero.
1528 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1529            submatrix (same for all local rows).
1530 .  o_nnz - array containing the number of block nonzeros in the various block rows
1531            of the off-diagonal portion of the local submatrix (possibly different
1532            for each block row) or PETSC_NULL.
1533 
1534    Output Parameter:
1535 .  A - the matrix
1536 
1537    Notes:
1538    The user MUST specify either the local or global matrix dimensions
1539    (possibly both).
1540 
1541    Storage Information:
1542    For a square global matrix we define each processor's diagonal portion
1543    to be its local rows and the corresponding columns (a square submatrix);
1544    each processor's off-diagonal portion encompasses the remainder of the
1545    local matrix (a rectangular submatrix).
1546 
1547    The user can specify preallocated storage for the diagonal part of
1548    the local submatrix with either d_nz or d_nnz (not both).  Set
1549    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1550    memory allocation.  Likewise, specify preallocated storage for the
1551    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1552 
1553    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1554    the figure below we depict these three local rows and all columns (0-11).
1555 
1556 $          0 1 2 3 4 5 6 7 8 9 10 11
1557 $         --------------------------
1558 $  row 3  |  o o o d d d o o o o o o
1559 $  row 4  |  o o o d d d o o o o o o
1560 $  row 5  |  o o o d d d o o o o o o
1561 $         --------------------------
1562 $
1563 
1564    Thus, any entries in the d locations are stored in the d (diagonal)
1565    submatrix, and any entries in the o locations are stored in the
1566    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1567    stored simply in the MATSEQBAIJ format for compressed row storage.
1568 
1569    Now d_nz should indicate the number of block nonzeros per row in the d matrix,
1570    and o_nz should indicate the number of block nonzeros per row in the o matrix.
1571    In general, for PDE problems in which most nonzeros are near the diagonal,
1572    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1573    or you will get TERRIBLE performance; see the users' manual chapter on
1574    matrices.
1575 
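   Example:
   A minimal usage sketch (illustrative values only; error checking omitted):

$      Mat A;
$      int ierr;
$      ierr = MatCreateMPIBAIJ(PETSC_COMM_WORLD,2,PETSC_DECIDE,PETSC_DECIDE,
$                              100,100,5,PETSC_NULL,2,PETSC_NULL,&A);
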
1576 .keywords: matrix, block, aij, compressed row, sparse, parallel
1577 
1578 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1579 @*/
1580 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1581                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1582 {
1583   Mat          B;
1584   Mat_MPIBAIJ  *b;
1585   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1586 
1587   PetscFunctionBegin;
1588   if (bs < 1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Invalid block size specified, must be positive");
1589 
1590   MPI_Comm_size(comm,&size);
1591   if (size == 1) {
1592     if (M == PETSC_DECIDE) M = m;
1593     if (N == PETSC_DECIDE) N = n;
1594     ierr = MatCreateSeqBAIJ(comm,bs,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
1595     PetscFunctionReturn(0);
1596   }
1597 
1598   *A = 0;
1599   PetscHeaderCreate(B,_p_Mat,MAT_COOKIE,MATMPIBAIJ,comm,MatDestroy,MatView);
1600   PLogObjectCreate(B);
1601   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1602   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1603   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1604 
1605   B->destroy    = MatDestroy_MPIBAIJ;
1606   B->view       = MatView_MPIBAIJ;
1607   B->mapping    = 0;
1608   B->factor     = 0;
1609   B->assembled  = PETSC_FALSE;
1610 
1611   B->insertmode = NOT_SET_VALUES;
1612   MPI_Comm_rank(comm,&b->rank);
1613   MPI_Comm_size(comm,&b->size);
1614 
1615   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
1616     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1617   }
1618   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) {
1619     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either M or m should be specified");
1620   }
1621   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) {
1622     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either N or n should be specified");
1623   }
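  /* explanatory comment: if both the local and the global size are given,
     the global size is discarded here and recomputed from the local sizes
     by the Allreduce below, i.e. the local sizes take precedence */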
1624   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1625   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1626 
1627   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1628     work[0] = m; work[1] = n;
1629     mbs = m/bs; nbs = n/bs;
1630     ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
1631     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1632     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1633   }
1634   if (m == PETSC_DECIDE) {
1635     Mbs = M/bs;
1636     if (Mbs*bs != M) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global rows must be divisible by blocksize");
1637     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1638     m   = mbs*bs;
1639   }
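  /* worked example (illustrative): Mbs = 10 block rows on b->size = 4
     processes gives mbs = 3,3,2,2 on ranks 0..3; the remainder Mbs % size
     is spread over the first ranks */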
1640   if (n == PETSC_DECIDE) {
1641     Nbs = N/bs;
1642     if (Nbs*bs != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global cols must be divisible by blocksize");
1643     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1644     n   = nbs*bs;
1645   }
1646   if (mbs*bs != m || nbs*bs != n) {
1647     SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of local rows, cols must be divisible by blocksize");
1648   }
1649 
1650   b->m = m; B->m = m;
1651   b->n = n; B->n = n;
1652   b->N = N; B->N = N;
1653   b->M = M; B->M = M;
1654   b->bs  = bs;
1655   b->bs2 = bs*bs;
1656   b->mbs = mbs;
1657   b->nbs = nbs;
1658   b->Mbs = Mbs;
1659   b->Nbs = Nbs;
1660 
1661   /* build local table of row and column ownerships */
1662   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1663   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1664   b->cowners = b->rowners + b->size + 2;
1665   ierr = MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1666   b->rowners[0] = 0;
1667   for ( i=2; i<=b->size; i++ ) {
1668     b->rowners[i] += b->rowners[i-1];
1669   }
1670   b->rstart    = b->rowners[b->rank];
1671   b->rend      = b->rowners[b->rank+1];
1672   b->rstart_bs = b->rstart * bs;
1673   b->rend_bs   = b->rend * bs;
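  /* e.g. (illustrative): three processes owning 2,3,2 block rows yield
     rowners = {0,2,5,7}; the point-row range of this process is then
     [rstart*bs, rend*bs) */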
1674 
1675   ierr = MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1676   b->cowners[0] = 0;
1677   for ( i=2; i<=b->size; i++ ) {
1678     b->cowners[i] += b->cowners[i-1];
1679   }
1680   b->cstart    = b->cowners[b->rank];
1681   b->cend      = b->cowners[b->rank+1];
1682   b->cstart_bs = b->cstart * bs;
1683   b->cend_bs   = b->cend * bs;
1684 
1685 
1686   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1687   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1688   PLogObjectParent(B,b->A);
1689   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1690   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1691   PLogObjectParent(B,b->B);
1692 
1693   /* build stash to cache entries destined for other processors during assembly */
1694   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1695   b->donotstash  = 0;
1696   b->colmap      = 0;
1697   b->garray      = 0;
1698   b->roworiented = 1;
1699 
1700   /* stuff used in block assembly */
1701   b->barray       = 0;
1702 
1703   /* stuff used for matrix vector multiply */
1704   b->lvec         = 0;
1705   b->Mvctx        = 0;
1706 
1707   /* stuff for MatGetRow() */
1708   b->rowindices   = 0;
1709   b->rowvalues    = 0;
1710   b->getrowactive = PETSC_FALSE;
1711 
1712   /* hash table stuff */
1713   b->ht          = 0;
1714 
1715   *A = B;
1716   PetscFunctionReturn(0);
1717 }
1718 
1719 #undef __FUNC__
1720 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
1721 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1722 {
1723   Mat         mat;
1724   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1725   int         ierr, len=0, flg;
1726 
1727   PetscFunctionBegin;
1728   *newmat       = 0;
1729   PetscHeaderCreate(mat,_p_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm,MatDestroy,MatView);
1730   PLogObjectCreate(mat);
1731   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1732   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1733   mat->destroy    = MatDestroy_MPIBAIJ;
1734   mat->view       = MatView_MPIBAIJ;
1735   mat->factor     = matin->factor;
1736   mat->assembled  = PETSC_TRUE;
1737 
1738   a->m = mat->m   = oldmat->m;
1739   a->n = mat->n   = oldmat->n;
1740   a->M = mat->M   = oldmat->M;
1741   a->N = mat->N   = oldmat->N;
1742 
1743   a->bs  = oldmat->bs;
1744   a->bs2 = oldmat->bs2;
1745   a->mbs = oldmat->mbs;
1746   a->nbs = oldmat->nbs;
1747   a->Mbs = oldmat->Mbs;
1748   a->Nbs = oldmat->Nbs;
1749 
1750   a->rstart       = oldmat->rstart;
1751   a->rend         = oldmat->rend;
1752   a->cstart       = oldmat->cstart;
1753   a->cend         = oldmat->cend;
1754   a->size         = oldmat->size;
1755   a->rank         = oldmat->rank;
1756   mat->insertmode = NOT_SET_VALUES;
1757   a->rowvalues    = 0;
1758   a->getrowactive = PETSC_FALSE;
1759   a->barray       = 0;
1760 
1761   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1762   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1763   a->cowners = a->rowners + a->size + 2;
1764   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
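  /* explanatory note: cowners sits at rowners + size + 2 within the same
     allocation, so the single PetscMemcpy() above copies both the row and
     the column ownership tables */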
1765   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1766   if (oldmat->colmap) {
1767     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1768     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1769     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1770   } else a->colmap = 0;
1771   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1772     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1773     PLogObjectMemory(mat,len*sizeof(int));
1774     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1775   } else a->garray = 0;
1776 
1777   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1778   PLogObjectParent(mat,a->lvec);
1779   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1780   PLogObjectParent(mat,a->Mvctx);
1781   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1782   PLogObjectParent(mat,a->A);
1783   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1784   PLogObjectParent(mat,a->B);
1785   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1786   if (flg) {
1787     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1788   }
1789   *newmat = mat;
1790   PetscFunctionReturn(0);
1791 }
1792 
1793 #include "sys.h"
1794 
1795 #undef __FUNC__
1796 #define __FUNC__ "MatLoad_MPIBAIJ"
1797 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1798 {
1799   Mat          A;
1800   int          i, nz, ierr, j,rstart, rend, fd;
1801   Scalar       *vals,*buf;
1802   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1803   MPI_Status   status;
1804   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1805   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1806   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1807   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1808   int          dcount,kmax,k,nzcount,tmp;
1809 
1810   PetscFunctionBegin;
1811   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1812   bs2  = bs*bs;
1813 
1814   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1815   if (!rank) {
1816     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1817     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
1818     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not a matrix object");
1819     if (header[3] < 0) {
1820       SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix stored in special format, cannot load as MPIBAIJ");
1821     }
1822   }
1823 
1824   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
1825   M = header[1]; N = header[2];
1826 
1827   if (M != N) SETERRQ(PETSC_ERR_SUP,0,"Can only do square matrices");
1828 
1829   /*
1830      This code adds extra rows to make sure the number of rows is
1831      divisible by the blocksize
1832   */
1833   Mbs        = M/bs;
1834   extra_rows = bs - M + bs*(Mbs);
1835   if (extra_rows == bs) extra_rows = 0;
1836   else                  Mbs++;
1837   if (extra_rows && !rank) {
1838     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1839   }
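  /* worked example (illustrative): M = 10 with bs = 4 gives Mbs = 2 and
     extra_rows = 4 - 10 + 8 = 2, so Mbs becomes 3; the two padding rows are
     given a 1.0 on the diagonal when the values are read in below */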
1840 
1841   /* determine ownership of all rows */
1842   mbs = Mbs/size + ((Mbs % size) > rank);
1843   m   = mbs * bs;
1844   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1845   browners = rowners + size + 1;
1846   ierr = MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1847   rowners[0] = 0;
1848   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1849   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1850   rstart = rowners[rank];
1851   rend   = rowners[rank+1];
1852 
1853   /* distribute row lengths to all processors */
1854   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1855   if (!rank) {
1856     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1857     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
1858     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1859     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1860     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1861     ierr = MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);CHKERRQ(ierr);
1862     PetscFree(sndcounts);
1863   } else {
1864     ierr = MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);CHKERRQ(ierr);
1865   }
1866 
1867   if (!rank) {
1868     /* calculate the number of nonzeros on each processor */
1869     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1870     PetscMemzero(procsnz,size*sizeof(int));
1871     for ( i=0; i<size; i++ ) {
1872       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1873         procsnz[i] += rowlengths[j];
1874       }
1875     }
1876     PetscFree(rowlengths);
1877 
1878     /* determine max buffer needed and allocate it */
1879     maxnz = 0;
1880     for ( i=0; i<size; i++ ) {
1881       maxnz = PetscMax(maxnz,procsnz[i]);
1882     }
1883     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1884 
1885     /* read in my part of the matrix column indices  */
1886     nz = procsnz[0];
1887     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1888     mycols = ibuf;
1889     if (size == 1)  nz -= extra_rows;
1890     ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
1891     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1892 
1893     /* read in the column indices of all other processors (except the last) and ship them off */
1894     for ( i=1; i<size-1; i++ ) {
1895       nz   = procsnz[i];
1896       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
1897       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
1898     }
1899     /* read in the part for the last processor */
1900     if ( size != 1 ) {
1901       nz   = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1902       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
1903       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1904       ierr = MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);CHKERRQ(ierr);
1905     }
1906     PetscFree(cols);
1907   } else {
1908     /* determine buffer space needed for message */
1909     nz = 0;
1910     for ( i=0; i<m; i++ ) {
1911       nz += locrowlens[i];
1912     }
1913     ibuf   = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1914     mycols = ibuf;
1915     /* receive message of column indices */
1916     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
1917     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
1918     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"received column count does not match row lengths; file may be corrupt");
1919   }
1920 
1921   /* loop over local block rows, counting diagonal and off-diagonal block entries */
1922   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1923   odlens = dlens + (rend-rstart);
1924   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1925   PetscMemzero(mask,3*Mbs*sizeof(int));
1926   masked1 = mask    + Mbs;
1927   masked2 = masked1 + Mbs;
1928   rowcount = 0; nzcount = 0;
1929   for ( i=0; i<mbs; i++ ) {
1930     dcount  = 0;
1931     odcount = 0;
1932     for ( j=0; j<bs; j++ ) {
1933       kmax = locrowlens[rowcount];
1934       for ( k=0; k<kmax; k++ ) {
1935         tmp = mycols[nzcount++]/bs;
1936         if (!mask[tmp]) {
1937           mask[tmp] = 1;
1938           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1939           else masked1[dcount++] = tmp;
1940         }
1941       }
1942       rowcount++;
1943     }
1944 
1945     dlens[i]  = dcount;
1946     odlens[i] = odcount;
1947 
1948     /* zero out the mask elements we set */
1949     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1950     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1951   }
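  /* explanatory note (an inference from the code): mask[] is a scratch
     "block column already seen" flag so each block column is counted once
     per block row; masked1[]/masked2[] record which flags were set so they
     can be cleared in O(dcount+odcount) time instead of rezeroing all Mbs
     entries for every block row */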
1952 
1953   /* create our matrix */
1954   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,
1955                           0,dlens,0,odlens,newmat); CHKERRQ(ierr);
1956   A = *newmat;
1957   ierr = MatSetOption(A,MAT_COLUMNS_SORTED); CHKERRQ(ierr);
1958 
1959   if (!rank) {
1960     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1961     /* read in my part of the matrix numerical values  */
1962     nz = procsnz[0];
1963     vals = buf;
1964     mycols = ibuf;
1965     if (size == 1)  nz -= extra_rows;
1966     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1967     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1968 
1969     /* insert into matrix */
1970     jj      = rstart*bs;
1971     for ( i=0; i<m; i++ ) {
1972       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1973       mycols += locrowlens[i];
1974       vals   += locrowlens[i];
1975       jj++;
1976     }
1977     /* read in the values for the other processors (except the last one) and ship them out */
1978     for ( i=1; i<size-1; i++ ) {
1979       nz   = procsnz[i];
1980       vals = buf;
1981       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1982       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
1983     }
1984     /* the last proc */
1985     if ( size != 1 ){
1986       nz   = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1987       vals = buf;
1988       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1989       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1990       ierr = MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);CHKERRQ(ierr);
1991     }
1992     PetscFree(procsnz);
1993   } else {
1994     /* receive numeric values */
1995     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1996 
1997     /* receive message of values */
1998     vals   = buf;
1999     mycols = ibuf;
2000     ierr   = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
2001     ierr   = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
2002     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"received value count does not match row lengths; file may be corrupt");
2003 
2004     /* insert into matrix */
2005     jj      = rstart*bs;
2006     for ( i=0; i<m; i++ ) {
2007       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
2008       mycols += locrowlens[i];
2009       vals   += locrowlens[i];
2010       jj++;
2011     }
2012   }
2013   PetscFree(locrowlens);
2014   PetscFree(buf);
2015   PetscFree(ibuf);
2016   PetscFree(rowners);
2017   PetscFree(dlens);
2018   PetscFree(mask);
2019   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2020   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2021   PetscFunctionReturn(0);
2022 }
2023 
2024 
2025