xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision f8abc2e8a49348f162ce37debc6ce8f5479b4450)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpibaij.c,v 1.85 1997/10/29 14:08:11 bsmith Exp bsmith $";
3 #endif
4 
5 #include "pinclude/pviewer.h"
6 #include "src/mat/impls/baij/mpi/mpibaij.h"
7 #include "src/vec/vecimpl.h"
8 
9 
10 extern int MatSetUpMultiply_MPIBAIJ(Mat);
11 extern int DisAssemble_MPIBAIJ(Mat);
12 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
13 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
14 
/*
     Local utility routine that creates a mapping from the global column
   number to the local number in the off-diagonal part of the local
   storage of the matrix.  This is done in a non-scalable way since the
   length of colmap equals the global matrix length.
*/
21 #undef __FUNC__
22 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
23 static int CreateColmap_MPIBAIJ_Private(Mat mat)
24 {
25   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
26   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
27   int         nbs = B->nbs,i,bs=B->bs;;
28 
29   PetscFunctionBegin;
30   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
31   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
32   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
33   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
34   PetscFunctionReturn(0);
35 }
36 
#define CHUNKSIZE  10

/*
   MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) - macro that inserts
   (addv==INSERT_VALUES) or accumulates (addv==ADD_VALUES) ONE point value
   into the "diagonal" sequential block A.  row and col are local POINT
   indices; the macro converts them to block indices (brow,bcol) and
   within-block offsets (ridx,cidx).  It performs a biased binary search
   followed by a linear scan over the row's block columns, and enlarges the
   row storage by CHUNKSIZE block entries when the row is full.

   Relies on these names being in scope where it is expanded:
   a, aa, ai, aj, ailen, aimax, bs, bs2, A, and the scratch variables
   rp, ap, bap, rmax, nrow, low, high, t, ridx, cidx, brow, bcol, _i, ii, N.

   a->nonew semantics: 1 = silently ignore new nonzeros, -1/-2 = error on
   new nonzeros (with/without reallocation allowed).

   NOTE(review): a->nz++ happens only inside the enlarge branch, so nz is
   not incremented for insertions that fit in existing space -- ailen[]
   appears to be the authoritative per-row count; confirm before relying
   on a->nz here.
*/
#define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
    rmax = aimax[brow]; nrow = ailen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto a_noinsert; \
        } \
      } \
      if (a->nonew == 1) goto a_noinsert; \
      else if (a->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (a->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
        for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
        PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
                    aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(a->a);  \
        if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
        aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
        a->singlemalloc = 1; \
 \
        rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
        rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        a->maxnz += bs2*CHUNKSIZE; \
        a->reallocs++; \
        a->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      a_noinsert:; \
    ailen[brow] = nrow; \
}
113 
/*
   MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) - identical in
   structure to MatSetValues_SeqBAIJ_A_Private above, but operates on the
   "off-diagonal" sequential block B (b, ba, bi, bj, bilen, bimax, B).
   See the A variant for the full description of the algorithm and of the
   names that must be in scope at the expansion site.
*/
#define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
    rmax = bimax[brow]; nrow = bilen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto b_noinsert; \
        } \
      } \
      if (b->nonew == 1) goto b_noinsert; \
      else if (b->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (b->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
        for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
        PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
                    ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(b->a);  \
        if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
        ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
        b->singlemalloc = 1; \
 \
        rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
        rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        b->maxnz += bs2*CHUNKSIZE; \
        b->reallocs++; \
        b->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      b_noinsert:; \
    bilen[brow] = nrow; \
}
188 
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIBAIJ"
/*
   MatSetValues_MPIBAIJ - Inserts/adds an m-by-n logically dense array of
   POINT values (rows im[], columns in[], values v) into the parallel BAIJ
   matrix.  Locally owned entries go straight into the sequential blocks A
   (diagonal part) or B (off-diagonal part) through the macros above;
   entries for rows owned by other processors are stashed and shipped at
   assembly time.  v is read row-oriented or column-oriented depending on
   baij->roworiented.
*/
int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j,row,col;
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
  int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
  int         cend_orig=baij->cend_bs,bs=baij->bs;

  /* Some Variables required in the macro */
  Mat         A = baij->A;
  Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
  int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
  Scalar      *aa=a->a;

  Mat         B = baij->B;
  Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
  int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
  Scalar      *ba=b->a;

  int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
  int         low,high,t,ridx,cidx,bs2=a->bs2;
  Scalar      *ap,*bap;

  PetscFunctionBegin;
  for ( i=0; i<m; i++ ) {
#if defined(USE_PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(1,0,"Negative row");
    if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
#endif
    if (im[i] >= rstart_orig && im[i] < rend_orig) {
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        if (in[j] >= cstart_orig && in[j] < cend_orig){
          /* column falls in this processor's diagonal block A */
          col = in[j] - cstart_orig;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
#if defined(USE_PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
        else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
#endif
        else {
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }
            /* colmap is 1-based; 0 means this block column is absent
               from B, so col goes negative below */
            col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              /* brand new off-diagonal block column: revert to the
                 unassembled state and reload the cached B pointers */
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
              /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
              B = baij->B;
              b = (Mat_SeqBAIJ *) (B)->data;
              bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
              ba=b->a;
            }
          } else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
      }
    } else {
      /* row owned elsewhere: stash (whole row at once when row-oriented,
         element by element otherwise) unless stashing is disabled */
      if (roworiented && !baij->donotstash) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      } else {
        if (!baij->donotstash) {
          row = im[i];
	  for ( j=0; j<n; j++ ) {
	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
  PetscFunctionReturn(0);
}
270 
271 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
272 #undef __FUNC__
273 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
274 int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
275 {
276   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
277   Scalar      *value,*barray=baij->barray;
278   int         ierr,i,j,ii,jj,row,col,k,l;
279   int         roworiented = baij->roworiented,rstart=baij->rstart ;
280   int         rend=baij->rend,cstart=baij->cstart,stepval;
281   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
282 
283   if(!barray) {
284     baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
285   }
286 
287   if (roworiented) {
288     stepval = (n-1)*bs;
289   } else {
290     stepval = (m-1)*bs;
291   }
292   for ( i=0; i<m; i++ ) {
293 #if defined(USE_PETSC_BOPT_g)
294     if (im[i] < 0) SETERRQ(1,0,"Negative row");
295     if (im[i] >= baij->Mbs) SETERRQ(1,0,"Row too large");
296 #endif
297     if (im[i] >= rstart && im[i] < rend) {
298       row = im[i] - rstart;
299       for ( j=0; j<n; j++ ) {
300         /* If NumCol = 1 then a copy is not required */
301         if ((roworiented) && (n == 1)) {
302           barray = v + i*bs2;
303         } else if((!roworiented) && (m == 1)) {
304           barray = v + j*bs2;
305         } else { /* Here a copy is required */
306           if (roworiented) {
307             value = v + i*(stepval+bs)*bs + j*bs;
308           } else {
309             value = v + j*(stepval+bs)*bs + i*bs;
310           }
311           for ( ii=0; ii<bs; ii++,value+=stepval ) {
312             for (jj=0; jj<bs; jj++ ) {
313               *barray++  = *value++;
314             }
315           }
316           barray -=bs2;
317         }
318 
319         if (in[j] >= cstart && in[j] < cend){
320           col  = in[j] - cstart;
321           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
322         }
323 #if defined(USE_PETSC_BOPT_g)
324         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
325         else if (in[j] >= baij->Nbs) {SETERRQ(1,0,"Column too large");}
326 #endif
327         else {
328           if (mat->was_assembled) {
329             if (!baij->colmap) {
330               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
331             }
332 
333 #if defined(USE_PETSC_BOPT_g)
334             if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(1,0,"Incorrect colmap");}
335 #endif
336             col = (baij->colmap[in[j]] - 1)/bs;
337             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
338               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
339               col =  in[j];
340             }
341           }
342           else col = in[j];
343           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
344         }
345       }
346     } else {
347       if (!baij->donotstash) {
348         if (roworiented ) {
349           row   = im[i]*bs;
350           value = v + i*(stepval+bs)*bs;
351           for ( j=0; j<bs; j++,row++ ) {
352             for ( k=0; k<n; k++ ) {
353               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
354                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
355               }
356             }
357           }
358         } else {
359           for ( j=0; j<n; j++ ) {
360             value = v + j*(stepval+bs)*bs + i*bs;
361             col   = in[j]*bs;
362             for ( k=0; k<bs; k++,col++,value+=stepval) {
363               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
364                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
365               }
366             }
367           }
368         }
369       }
370     }
371   }
372   PetscFunctionReturn(0);
373 }
374 
375 #undef __FUNC__
376 #define __FUNC__ "MatGetValues_MPIBAIJ"
377 int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
378 {
379   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
380   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
381   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
382 
383   for ( i=0; i<m; i++ ) {
384     if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
385     if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
386     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
387       row = idxm[i] - bsrstart;
388       for ( j=0; j<n; j++ ) {
389         if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
390         if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
391         if (idxn[j] >= bscstart && idxn[j] < bscend){
392           col = idxn[j] - bscstart;
393           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
394         } else {
395           if (!baij->colmap) {
396             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
397           }
398           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
399              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
400           else {
401             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
402             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
403           }
404         }
405       }
406     } else {
407       SETERRQ(1,0,"Only local values currently supported");
408     }
409   }
410   PetscFunctionReturn(0);
411 }
412 
413 #undef __FUNC__
414 #define __FUNC__ "MatNorm_MPIBAIJ"
415 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
416 {
417   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
418   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
419   int        ierr, i,bs2=baij->bs2;
420   double     sum = 0.0;
421   Scalar     *v;
422 
423   PetscFunctionBegin;
424   if (baij->size == 1) {
425     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
426   } else {
427     if (type == NORM_FROBENIUS) {
428       v = amat->a;
429       for (i=0; i<amat->nz*bs2; i++ ) {
430 #if defined(USE_PETSC_COMPLEX)
431         sum += real(conj(*v)*(*v)); v++;
432 #else
433         sum += (*v)*(*v); v++;
434 #endif
435       }
436       v = bmat->a;
437       for (i=0; i<bmat->nz*bs2; i++ ) {
438 #if defined(USE_PETSC_COMPLEX)
439         sum += real(conj(*v)*(*v)); v++;
440 #else
441         sum += (*v)*(*v); v++;
442 #endif
443       }
444       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
445       *norm = sqrt(*norm);
446     } else {
447       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
448     }
449   }
450   PetscFunctionReturn(0);
451 }
452 
#undef __FUNC__
#define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
/*
   MatAssemblyBegin_MPIBAIJ - First phase of assembly: ships every stashed
   off-processor (row,col,value) triple to its owning processor with
   nonblocking MPI.  The posted sends/receives are completed in
   MatAssemblyEnd_MPIBAIJ(); the request arrays and buffers are parked in
   the matrix context for that routine to consume and free.
*/
int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  PetscFunctionBegin;
  /* make sure all processors are either in INSERTMODE or ADDMODE */
  MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,0,"Some processors inserted others added");
  }
  mat->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    /* rowners[] stores block-row ownership ranges, hence the bs scaling */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered triples
     (global row index,global column index,value); the indices are stored
     as Scalars to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack the triples; note starts[] is advanced as a cursor here ... */
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  /* ... so it must be rebuilt before launching the sends */
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  /* hand the in-flight communication state to MatAssemblyEnd_MPIBAIJ */
  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  PetscFunctionReturn(0);
}
549 #include <math.h>
550 #define HASH_KEY 0.6180339887
551 #define HASH1(size,key) ((int)((size)*fmod(((key)*HASH_KEY),1)))
552 #define HASH2(size,key) ((int)((size)*fmod(((key+0.5)*HASH_KEY),1)))
553 
554 
555 int CreateHashTable(Mat mat,double factor)
556 {
557   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
558   Mat         A = baij->A, B=baij->B;
559   Mat_SeqBAIJ *a=(Mat_SeqBAIJ *)A->data, *b=(Mat_SeqBAIJ *)B->data;
560   int         i,j,k,nz=a->nz+b->nz,h1,h2,*ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j;
561   int         size=(int)(factor*nz),ct=0,max1=0,max2=0;
562   /* Scalar      *aa=a->a,*ba=b->a; */
563   double      key;
564   static double *HT;
565   static      int flag=1;
566   extern int PetscGlobalRank;
567 
568   PetscFunctionBegin;
569   flag = 1;
570   /* Allocate Memory for Hash Table */
571   if (flag) {
572     HT = (double*)PetscMalloc(size*sizeof(double));
573     flag = 0;
574   }
575   PetscMemzero(HT,size*sizeof(double));
576 
577   /* Loop Over A */
578   for ( i=0; i<a->n; i++ ) {
579     for ( j=ai[i]; j<ai[i+1]; j++ ) {
580       key = i*baij->n+aj[j]+1;
581       h1  = HASH1(size,key);
582       h2  = HASH2(size,key);
583 
584       for ( k=1; k<size; k++ ){
585         if (HT[(h1*k)%size] == 0.0) {
586           HT[(h1*k)%size] = key;
587           break;
588         } else ct++;
589       }
590       if (k> max1) max1 =k;
591     }
592   }
593   /* Loop Over B */
594   for ( i=0; i<b->n; i++ ) {
595     for ( j=bi[i]; j<bi[i+1]; j++ ) {
596       key = i*b->n+bj[j]+1;
597       h1  = HASH1(size,key);
598       for ( k=1; k<size; k++ ){
599         if (HT[(h1*k)%size] == 0.0) {
600           HT[(h1*k)%size] = key;
601           break;
602         } else ct++;
603       }
604       if (k> max2) max2 =k;
605     }
606   }
607 
608   /* Print Summary */
609   for ( i=0,key=0.0,j=0; i<size; i++)
610     if (HT[i]) {j++;}
611 
612   printf("[%d] fact = %3.2f max1 = %5d max2 = %5d Size %5d - Searches %5d Avg %5.2f Keys %5d\n",
613          PetscGlobalRank,factor,max1,max2,size,ct+j,((float)ct+j)/j,j);
614   PetscFree(HT);
615   PetscFunctionReturn(0);
616 }
617 
#undef __FUNC__
#define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
/*
   MatAssemblyEnd_MPIBAIJ - Second phase of assembly: completes the
   receives posted by MatAssemblyBegin_MPIBAIJ(), unpacks the incoming
   (row,col,value) triples into the local A/B blocks, waits on the sends,
   then assembles the sequential sub-matrices.  All processors must agree
   on whether anyone disassembled, so a reduction decides whether this
   processor must disassemble too before B is assembled.
*/
int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
  int         bs=baij->bs,row,col,other_disassembled,flg;
  Scalar      *values,val;
  InsertMode  addv = mat->insertmode;

  PetscFunctionBegin;
  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = baij->rvalues + 3*imdex*baij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;   /* each entry travels as a (row,col,value) triple */
    for ( i=0; i<n; i++ ) {
      /* indices were shipped as Scalars; convert back to ints */
      row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= baij->cstart*bs && col < baij->cend*bs) {
        col -= baij->cstart*bs;
        ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      } else {
        if (mat->was_assembled) {
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
          }
          /* colmap is 1-based; a result < 0 means a brand new block
             column, which forces disassembly (unless nonew is set) */
          col = (baij->colmap[col/bs]) - 1 + col%bs;
          if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
            ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      }
    }
    count--;
  }
  PetscFree(baij->recv_waits); PetscFree(baij->rvalues);

  /* wait on sends */
  if (baij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
    MPI_Waitall(baij->nsends,baij->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(baij->send_waits); PetscFree(baij->svalues);

  ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselfs, in order that we may reassemble. */
  MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
  }

  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);

  /* optional diagnostic: report hash-table statistics for several fill
     factors (see CreateHashTable above) */
  ierr = OptionsHasName(PETSC_NULL,"-use_hash",&flg); CHKERRQ(ierr);
  if (flg) {
    double fact;
    for ( fact=1.2; fact<2.0; fact +=0.05) CreateHashTable(mat,fact);
  }
  if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
  PetscFunctionReturn(0);
}
694 
695 #undef __FUNC__
696 #define __FUNC__ "MatView_MPIBAIJ_Binary"
697 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
698 {
699   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
700   int          ierr;
701 
702   PetscFunctionBegin;
703   if (baij->size == 1) {
704     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
705   } else SETERRQ(1,0,"Only uniprocessor output supported");
706   PetscFunctionReturn(0);
707 }
708 
#undef __FUNC__
#define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
/*
   MatView_MPIBAIJ_ASCIIorDraworMatlab - Viewer dispatch for ASCII, Draw
   and Matlab viewers.  The INFO/INFO_LONG ASCII formats print per-process
   statistics; the plain ASCII_FILE format prints each process's local
   blocks in sequence; every other case gathers the whole matrix onto
   processor 0 as a temporary MPIBAIJ matrix and views its A block there.
*/
static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr, format,rank,bs = baij->bs;
  FILE         *fd;
  ViewerType   vtype;

  PetscFunctionBegin;
  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    /* NOTE(review): the return values of ViewerGetFormat and MatGetInfo
       below are assigned to ierr but never checked */
    ierr = ViewerGetFormat(viewer,&format);
    if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
      MatInfo info;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&info);
      /* serialize output so the per-process lines do not interleave */
      PetscSequentialPhaseBegin(mat->comm,1);
      fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
              rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
              baij->bs,(int)info.memory);
      ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
      PetscFunctionReturn(0);
    } else if (format == VIEWER_FORMAT_ASCII_INFO) {
      PetscPrintf(mat->comm,"  block size is %d\n",bs);
      PetscFunctionReturn(0);
    }
  }

  if (vtype == DRAW_VIEWER) {
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
  }

  if (vtype == ASCII_FILE_VIEWER) {
    /* each process prints its own ownership ranges and local blocks */
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
           baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
            baij->cstart*bs,baij->cend*bs);
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  } else {
    int size = baij->size;
    rank = baij->rank;
    if (size == 1) {
      ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    } else {
      /* assemble the entire matrix onto first processor. */
      Mat         A;
      Mat_SeqBAIJ *Aloc;
      int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
      int         mbs=baij->mbs;
      Scalar      *a;

      /* processor 0 holds all M x N rows of the gathered copy; the
         others contribute 0 local rows */
      if (!rank) {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      } else {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqBAIJ*) baij->A->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      /* NOTE(review): row is assigned here and below but never used;
         rvals[] carries the actual row indices */
      row = baij->rstart;
      rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = (baij->cstart+aj[j])*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      /* copy over the B part; garray maps local to global block columns */
      Aloc = (Mat_SeqBAIJ*) baij->B->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart*bs;
      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = baij->garray[aj[j]]*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      PetscFree(rvals);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  PetscFunctionReturn(0);
}
827 
828 
829 
830 #undef __FUNC__
831 #define __FUNC__ "MatView_MPIBAIJ"
832 int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
833 {
834   Mat         mat = (Mat) obj;
835   int         ierr;
836   ViewerType  vtype;
837 
838   PetscFunctionBegin;
839   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
840   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
841       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
842     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
843   } else if (vtype == BINARY_FILE_VIEWER) {
844     ierr = MatView_MPIBAIJ_Binary(mat,viewer);CHKERRQ(ierr);
845   }
846   PetscFunctionReturn(0);
847 }
848 
849 #undef __FUNC__
850 #define __FUNC__ "MatDestroy_MPIBAIJ"
851 int MatDestroy_MPIBAIJ(PetscObject obj)
852 {
853   Mat         mat = (Mat) obj;
854   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
855   int         ierr;
856 
857   PetscFunctionBegin;
858 #if defined(USE_PETSC_LOG)
859   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
860 #endif
861 
862   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
863   PetscFree(baij->rowners);
864   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
865   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
866   if (baij->colmap) PetscFree(baij->colmap);
867   if (baij->garray) PetscFree(baij->garray);
868   if (baij->lvec)   VecDestroy(baij->lvec);
869   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
870   if (baij->rowvalues) PetscFree(baij->rowvalues);
871   if (baij->barray) PetscFree(baij->barray);
872   PetscFree(baij);
873   PLogObjectDestroy(mat);
874   PetscHeaderDestroy(mat);
875   PetscFunctionReturn(0);
876 }
877 
878 #undef __FUNC__
879 #define __FUNC__ "MatMult_MPIBAIJ"
880 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
881 {
882   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
883   int         ierr, nt;
884 
885   PetscFunctionBegin;
886   VecGetLocalSize_Fast(xx,nt);
887   if (nt != a->n) {
888     SETERRQ(1,0,"Incompatible partition of A and xx");
889   }
890   VecGetLocalSize_Fast(yy,nt);
891   if (nt != a->m) {
892     SETERRQ(1,0,"Incompatible parition of A and yy");
893   }
894   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
895   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
896   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
897   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
898   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
899   PetscFunctionReturn(0);
900 }
901 
#undef __FUNC__
#define __FUNC__ "MatMultAdd_MPIBAIJ"
/*
   MatMultAdd_MPIBAIJ - Computes zz = yy + A*xx.  The ghost-value scatter
   of xx overlaps the diagonal-block multiply-add; the statement order
   below is essential and must not be changed.
*/
int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;

  PetscFunctionBegin;
  /* begin communicating the off-processor entries of xx */
  ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
  /* zz = yy + (diagonal block)*xx while the scatter is in flight */
  ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
  /* zz += (off-diagonal block)*(received ghost values) */
  ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
916 
#undef __FUNC__
#define __FUNC__ "MatMultTrans_MPIBAIJ"
/*
   MatMultTrans_MPIBAIJ - Computes yy = A^T * xx.  The off-diagonal block
   is applied first and its result scattered back (with addition) to the
   owning processes while the local transpose multiply proceeds; the
   statement order is essential.
*/
int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;

  PetscFunctionBegin;
  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in yy until the next line, which is true for my implementation*/
  /* but is not perhaps always true. */
  ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
937 
#undef __FUNC__
#define __FUNC__ "MatMultTransAdd_MPIBAIJ"
/*
   MatMultTransAdd_MPIBAIJ - Computes zz = yy + A^T * xx, using the same
   overlap of reverse scatter and local transpose multiply-add as
   MatMultTrans_MPIBAIJ; the statement order is essential.
*/
int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;

  PetscFunctionBegin;
  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in yy until the next line, which is true for my implementation*/
  /* but is not perhaps always true. */
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
958 
959 /*
960   This only works correctly for square matrices where the subblock A->A is the
961    diagonal block
962 */
963 #undef __FUNC__
964 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
965 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
966 {
967   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
968   int         ierr;
969 
970   PetscFunctionBegin;
971   if (a->M != a->N) SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
972   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
973   PetscFunctionReturn(0);
974 }
975 
976 #undef __FUNC__
977 #define __FUNC__ "MatScale_MPIBAIJ"
978 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
979 {
980   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
981   int         ierr;
982 
983   PetscFunctionBegin;
984   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
985   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
986   PetscFunctionReturn(0);
987 }
988 
989 #undef __FUNC__
990 #define __FUNC__ "MatGetSize_MPIBAIJ"
991 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
992 {
993   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
994 
995   PetscFunctionBegin;
996   if (m) *m = mat->M;
997   if (n) *n = mat->N;
998   PetscFunctionReturn(0);
999 }
1000 
1001 #undef __FUNC__
1002 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
1003 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
1004 {
1005   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1006 
1007   PetscFunctionBegin;
1008   *m = mat->m; *n = mat->N;
1009   PetscFunctionReturn(0);
1010 }
1011 
1012 #undef __FUNC__
1013 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
1014 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
1015 {
1016   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1017 
1018   PetscFunctionBegin;
1019   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
1020   PetscFunctionReturn(0);
1021 }
1022 
1023 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1024 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1025 
#undef __FUNC__
#define __FUNC__ "MatGetRow_MPIBAIJ"
/*
   MatGetRow_MPIBAIJ - Returns one (local) global row of the matrix by
   fetching the corresponding rows of the diagonal (A) and off-diagonal
   (B) blocks and merging them into a single column-sorted row in the
   workspace kept on the matrix.  Must be paired with MatRestoreRow.

   The merge relies on A and B each storing their columns sorted: B's
   columns below the diagonal range come first, then all of A's columns,
   then B's columns above the diagonal range.
*/
int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
  int        *cmap, *idx_p,cstart = mat->cstart;

  PetscFunctionBegin;
  /* only one row may be "gotten" at a time; workspace is shared */
  if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int     max = 1,mbs = mat->mbs,tmp;
    for ( i=0; i<mbs; i++ ) {
      /* longest combined block row of A and B, in block columns */
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    /* one allocation holds both the values and (after them) the indices */
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }

  if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows")
  lrow = row - brstart;

  /* request columns and/or values from A and B only as needed */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  /* garray maps B's local block columns to global block columns */
  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        /* B entries whose global block column precedes the diagonal range */
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          /* split point already found while copying the values */
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
          }
        } else {
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]/bs] < cstart)
              idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
            else break;
          }
          imark = i;
        }
        /* A's columns are local; shift them into the global numbering */
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
      }
    } else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
1107 
1108 #undef __FUNC__
1109 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1110 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1111 {
1112   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1113 
1114   PetscFunctionBegin;
1115   if (baij->getrowactive == PETSC_FALSE) {
1116     SETERRQ(1,0,"MatGetRow not called");
1117   }
1118   baij->getrowactive = PETSC_FALSE;
1119   PetscFunctionReturn(0);
1120 }
1121 
1122 #undef __FUNC__
1123 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1124 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1125 {
1126   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1127 
1128   PetscFunctionBegin;
1129   *bs = baij->bs;
1130   PetscFunctionReturn(0);
1131 }
1132 
1133 #undef __FUNC__
1134 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1135 int MatZeroEntries_MPIBAIJ(Mat A)
1136 {
1137   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1138   int         ierr;
1139 
1140   PetscFunctionBegin;
1141   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1142   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1143   PetscFunctionReturn(0);
1144 }
1145 
1146 #undef __FUNC__
1147 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1148 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1149 {
1150   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1151   Mat         A = a->A, B = a->B;
1152   int         ierr;
1153   double      isend[5], irecv[5];
1154 
1155   PetscFunctionBegin;
1156   info->rows_global    = (double)a->M;
1157   info->columns_global = (double)a->N;
1158   info->rows_local     = (double)a->m;
1159   info->columns_local  = (double)a->N;
1160   info->block_size     = (double)a->bs;
1161   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1162   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->memory;
1163   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1164   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->memory;
1165   if (flag == MAT_LOCAL) {
1166     info->nz_used      = isend[0];
1167     info->nz_allocated = isend[1];
1168     info->nz_unneeded  = isend[2];
1169     info->memory       = isend[3];
1170     info->mallocs      = isend[4];
1171   } else if (flag == MAT_GLOBAL_MAX) {
1172     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_MAX,matin->comm);
1173     info->nz_used      = irecv[0];
1174     info->nz_allocated = irecv[1];
1175     info->nz_unneeded  = irecv[2];
1176     info->memory       = irecv[3];
1177     info->mallocs      = irecv[4];
1178   } else if (flag == MAT_GLOBAL_SUM) {
1179     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_SUM,matin->comm);
1180     info->nz_used      = irecv[0];
1181     info->nz_allocated = irecv[1];
1182     info->nz_unneeded  = irecv[2];
1183     info->memory       = irecv[3];
1184     info->mallocs      = irecv[4];
1185   }
1186   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1187   info->fill_ratio_needed = 0;
1188   info->factor_mallocs    = 0;
1189   PetscFunctionReturn(0);
1190 }
1191 
1192 #undef __FUNC__
1193 #define __FUNC__ "MatSetOption_MPIBAIJ"
1194 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1195 {
1196   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1197 
1198   PetscFunctionBegin;
1199   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1200       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1201       op == MAT_COLUMNS_UNSORTED ||
1202       op == MAT_COLUMNS_SORTED ||
1203       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1204       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1205         MatSetOption(a->A,op);
1206         MatSetOption(a->B,op);
1207   } else if (op == MAT_ROW_ORIENTED) {
1208         a->roworiented = 1;
1209         MatSetOption(a->A,op);
1210         MatSetOption(a->B,op);
1211   } else if (op == MAT_ROWS_SORTED ||
1212              op == MAT_ROWS_UNSORTED ||
1213              op == MAT_SYMMETRIC ||
1214              op == MAT_STRUCTURALLY_SYMMETRIC ||
1215              op == MAT_YES_NEW_DIAGONALS)
1216     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1217   else if (op == MAT_COLUMN_ORIENTED) {
1218     a->roworiented = 0;
1219     MatSetOption(a->A,op);
1220     MatSetOption(a->B,op);
1221   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1222     a->donotstash = 1;
1223   } else if (op == MAT_NO_NEW_DIAGONALS) {
1224     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1225   } else {
1226     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1227   }
1228   PetscFunctionReturn(0);
1229 }
1230 
1231 #undef __FUNC__
1232 #define __FUNC__ "MatTranspose_MPIBAIJ("
1233 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1234 {
1235   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1236   Mat_SeqBAIJ *Aloc;
1237   Mat        B;
1238   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
1239   int        bs=baij->bs,mbs=baij->mbs;
1240   Scalar     *a;
1241 
1242   PetscFunctionBegin;
1243   if (matout == PETSC_NULL && M != N) SETERRQ(1,0,"Square matrix only for in-place");
1244   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1245   CHKERRQ(ierr);
1246 
1247   /* copy over the A part */
1248   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1249   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1250   row = baij->rstart;
1251   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1252 
1253   for ( i=0; i<mbs; i++ ) {
1254     rvals[0] = bs*(baij->rstart + i);
1255     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1256     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1257       col = (baij->cstart+aj[j])*bs;
1258       for (k=0; k<bs; k++ ) {
1259         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1260         col++; a += bs;
1261       }
1262     }
1263   }
1264   /* copy over the B part */
1265   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1266   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1267   row = baij->rstart*bs;
1268   for ( i=0; i<mbs; i++ ) {
1269     rvals[0] = bs*(baij->rstart + i);
1270     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1271     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1272       col = baij->garray[aj[j]]*bs;
1273       for (k=0; k<bs; k++ ) {
1274         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1275         col++; a += bs;
1276       }
1277     }
1278   }
1279   PetscFree(rvals);
1280   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1281   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1282 
1283   if (matout != PETSC_NULL) {
1284     *matout = B;
1285   } else {
1286     /* This isn't really an in-place transpose .... but free data structures from baij */
1287     PetscFree(baij->rowners);
1288     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1289     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1290     if (baij->colmap) PetscFree(baij->colmap);
1291     if (baij->garray) PetscFree(baij->garray);
1292     if (baij->lvec) VecDestroy(baij->lvec);
1293     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1294     PetscFree(baij);
1295     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1296     PetscHeaderDestroy(B);
1297   }
1298   PetscFunctionReturn(0);
1299 }
1300 
1301 #undef __FUNC__
1302 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1303 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1304 {
1305   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1306   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1307   int ierr,s1,s2,s3;
1308 
1309   PetscFunctionBegin;
1310   if (ll)  {
1311     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1312     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1313     if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
1314     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1315     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1316   }
1317   if (rr) SETERRQ(1,0,"not supported for right vector");
1318   PetscFunctionReturn(0);
1319 }
1320 
/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   baij->A and baij->B directly and not through the MatZeroRows()
   routine.
*/
#undef __FUNC__
#define __FUNC__ "MatZeroRows_MPIBAIJ"
/*
   MatZeroRows_MPIBAIJ - Zeros the (global point) rows listed in is,
   optionally placing *diag on the diagonal of the zeroed rows.  Because
   callers may list rows owned by other processes, the row indices are
   first routed to their owners with a rendezvous protocol (counts are
   exchanged via two MPI_Allreduce calls, then the indices themselves via
   point-to-point messages) before the local rows are zeroed.
*/
int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values,bs=l->bs;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  PetscFunctionBegin;
  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    /* owners[] holds block-row ranges, so scale by bs for point rows */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,0,"Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  /* each incoming message is at most nmax indices long */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* bucket the row indices by destination process */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  /* rebuild starts[] (the bucketing above advanced the offsets) */
  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  /* first global point row owned by this process */
  base = owners[rank]*bs;

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  /* collect all received global rows, converted to local numbering */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  /* diag applies only to the diagonal block; B gets plain zeroing */
  ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  PetscFunctionReturn(0);
}
1449 extern int MatPrintHelp_SeqBAIJ(Mat);
1450 #undef __FUNC__
1451 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1452 int MatPrintHelp_MPIBAIJ(Mat A)
1453 {
1454   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1455   int         ierr;
1456 
1457   PetscFunctionBegin;
1458   if (!a->rank) {
1459     ierr = MatPrintHelp_SeqBAIJ(a->A);CHKERRQ(ierr);
1460   }
1461   PetscFunctionReturn(0);
1462 }
1463 
1464 #undef __FUNC__
1465 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1466 int MatSetUnfactored_MPIBAIJ(Mat A)
1467 {
1468   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1469   int         ierr;
1470 
1471   PetscFunctionBegin;
1472   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1473   PetscFunctionReturn(0);
1474 }
1475 
static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);

/* -------------------------------------------------------------------*/
/*
   Operation table for the MPIBAIJ matrix type.  The entries are
   positional and must line up with the member order of struct _MatOps
   (declared elsewhere); a 0 marks an operation this format does not
   implement.
*/
static struct _MatOps MatOps = {
  MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
  MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
  0,0,0,0,
  0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
  0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
  MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
  MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
  0,0,0,MatGetSize_MPIBAIJ,
  MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
  0,0,MatConvertSameType_MPIBAIJ,0,0,
  0,0,0,MatGetSubMatrices_MPIBAIJ,
  MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
  MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
  0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1494 
1495 
1496 #undef __FUNC__
1497 #define __FUNC__ "MatCreateMPIBAIJ"
1498 /*@C
1499    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1500    (block compressed row).  For good matrix assembly performance
1501    the user should preallocate the matrix storage by setting the parameters
1502    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1503    performance can be increased by more than a factor of 50.
1504 
1505    Input Parameters:
1506 .  comm - MPI communicator
.  bs   - size of block
1508 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1509            This value should be the same as the local size used in creating the
1510            y vector for the matrix-vector product y = Ax.
1511 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1512            This value should be the same as the local size used in creating the
1513            x vector for the matrix-vector product y = Ax.
1514 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1515 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1516 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1517            submatrix  (same for all local rows)
.  d_nnz - array containing the number of block nonzeros in the various block rows
           of the diagonal portion of the local submatrix (possibly different for
           each block row) or PETSC_NULL.  You must leave room for the diagonal
           entry even if it is zero.
1522 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1523            submatrix (same for all local rows).
.  o_nnz - array containing the number of block nonzeros in the various block rows of the
           off-diagonal portion of the local submatrix (possibly different for
           each block row) or PETSC_NULL.
1527 
1528    Output Parameter:
1529 .  A - the matrix
1530 
1531    Notes:
1532    The user MUST specify either the local or global matrix dimensions
1533    (possibly both).
1534 
1535    Storage Information:
1536    For a square global matrix we define each processor's diagonal portion
1537    to be its local rows and the corresponding columns (a square submatrix);
1538    each processor's off-diagonal portion encompasses the remainder of the
1539    local matrix (a rectangular submatrix).
1540 
1541    The user can specify preallocated storage for the diagonal part of
1542    the local submatrix with either d_nz or d_nnz (not both).  Set
1543    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1544    memory allocation.  Likewise, specify preallocated storage for the
1545    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1546 
1547    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1548    the figure below we depict these three local rows and all columns (0-11).
1549 
1550 $          0 1 2 3 4 5 6 7 8 9 10 11
1551 $         -------------------
1552 $  row 3  |  o o o d d d o o o o o o
1553 $  row 4  |  o o o d d d o o o o o o
1554 $  row 5  |  o o o d d d o o o o o o
1555 $         -------------------
1556 $
1557 
1558    Thus, any entries in the d locations are stored in the d (diagonal)
1559    submatrix, and any entries in the o locations are stored in the
1560    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1561    stored simply in the MATSEQBAIJ format for compressed row storage.
1562 
1563    Now d_nz should indicate the number of block nonzeros per row in the d matrix,
1564    and o_nz should indicate the number of block nonzeros per row in the o matrix.
1565    In general, for PDE problems in which most nonzeros are near the diagonal,
1566    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1567    or you will get TERRIBLE performance; see the users' manual chapter on
1568    matrices.
1569 
1570 .keywords: matrix, block, aij, compressed row, sparse, parallel
1571 
1572 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1573 @*/
int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
                    int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  Mat          B;
  Mat_MPIBAIJ  *b;
  int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;

  PetscFunctionBegin;
  if (bs < 1) SETERRQ(1,0,"Invalid block size specified, must be positive");

  /* On one process the parallel format degenerates to sequential BAIJ;
     the off-diagonal arguments (o_nz/o_nnz) are irrelevant and ignored. */
  MPI_Comm_size(comm,&size);
  if (size == 1) {
    if (M == PETSC_DECIDE) M = m;
    if (N == PETSC_DECIDE) N = n;
    ierr = MatCreateSeqBAIJ(comm,bs,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
    PetscFunctionReturn(0);
  }

  *A = 0;
  PetscHeaderCreate(B,_p_Mat,MAT_COOKIE,MATMPIBAIJ,comm,MatDestroy,MatView);
  PLogObjectCreate(B);
  B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
  /* PetscNew() does not zero the allocation; clear it so every field has
     a well-defined default before selective initialization below */
  PetscMemzero(b,sizeof(Mat_MPIBAIJ));
  PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));

  B->destroy    = MatDestroy_MPIBAIJ;
  B->view       = MatView_MPIBAIJ;
  B->mapping    = 0;
  B->factor     = 0;
  B->assembled  = PETSC_FALSE;

  B->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&b->rank);
  MPI_Comm_size(comm,&b->size);

  /* Sanity-check the mix of global (M,N) vs local (m,n) size specifications.
     nnz preallocation arrays are per local row, hence require a known m. */
  if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
    SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
  }
  if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
  if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,0,"either N or n should be specified");
  /* when both local and global are given, the local values win: force the
     global size to be recomputed from the sum of the local sizes */
  if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
  if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;

  /* derive missing global sizes by summing local sizes across the communicator */
  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    mbs = m/bs; nbs = n/bs;
    MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
    if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
    if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
  }
  /* derive missing local sizes by splitting the global block count evenly,
     giving the first (Mbs % size) ranks one extra block row */
  if (m == PETSC_DECIDE) {
    Mbs = M/bs;
    if (Mbs*bs != M) SETERRQ(1,0,"No of global rows must be divisible by blocksize");
    mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
    m   = mbs*bs;
  }
  if (n == PETSC_DECIDE) {
    Nbs = N/bs;
    if (Nbs*bs != N) SETERRQ(1,0,"No of global cols must be divisible by blocksize");
    nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
    n   = nbs*bs;
  }
  if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"No of local rows, cols must be divisible by blocksize");

  /* record sizes both in point (m,n,M,N) and block (mbs,...) units */
  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;
  b->bs  = bs;
  b->bs2 = bs*bs;
  b->mbs = mbs;
  b->nbs = nbs;
  b->Mbs = Mbs;
  b->Nbs = Nbs;

  /* build local table of row and column ownerships */
  /* single allocation holds both tables; cowners aliases the second half */
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
  b->cowners = b->rowners + b->size + 2;
  /* gather every rank's block-row count, then prefix-sum so that
     rowners[r]..rowners[r+1] is the block-row range owned by rank r */
  MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];
  }
  b->rstart    = b->rowners[b->rank];
  b->rend      = b->rowners[b->rank+1];
  b->rstart_bs = b->rstart * bs;       /* same range in point (scalar) units */
  b->rend_bs   = b->rend * bs;

  /* identical construction for the column ownership table */
  MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart    = b->cowners[b->rank];
  b->cend      = b->cowners[b->rank+1];
  b->cstart_bs = b->cstart * bs;
  b->cend_bs   = b->cend * bs;

  /* local storage: A holds the diagonal block (m x n), B holds the
     off-diagonal part with full global column width (m x N) */
  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->donotstash  = 0;
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used in block assembly */
  b->barray      = 0;

  /* stuff used for matrix vector multiply */
  b->lvec        = 0;
  b->Mvctx       = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  *A = B;
  PetscFunctionReturn(0);
}
1702 
1703 #undef __FUNC__
1704 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
1705 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1706 {
1707   Mat         mat;
1708   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1709   int         ierr, len=0, flg;
1710 
1711   PetscFunctionBegin;
1712   *newmat       = 0;
1713   PetscHeaderCreate(mat,_p_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm,MatDestroy,MatView);
1714   PLogObjectCreate(mat);
1715   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1716   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1717   mat->destroy    = MatDestroy_MPIBAIJ;
1718   mat->view       = MatView_MPIBAIJ;
1719   mat->factor     = matin->factor;
1720   mat->assembled  = PETSC_TRUE;
1721 
1722   a->m = mat->m   = oldmat->m;
1723   a->n = mat->n   = oldmat->n;
1724   a->M = mat->M   = oldmat->M;
1725   a->N = mat->N   = oldmat->N;
1726 
1727   a->bs  = oldmat->bs;
1728   a->bs2 = oldmat->bs2;
1729   a->mbs = oldmat->mbs;
1730   a->nbs = oldmat->nbs;
1731   a->Mbs = oldmat->Mbs;
1732   a->Nbs = oldmat->Nbs;
1733 
1734   a->rstart       = oldmat->rstart;
1735   a->rend         = oldmat->rend;
1736   a->cstart       = oldmat->cstart;
1737   a->cend         = oldmat->cend;
1738   a->size         = oldmat->size;
1739   a->rank         = oldmat->rank;
1740   mat->insertmode = NOT_SET_VALUES;
1741   a->rowvalues    = 0;
1742   a->getrowactive = PETSC_FALSE;
1743   a->barray       = 0;
1744 
1745   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1746   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1747   a->cowners = a->rowners + a->size + 2;
1748   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1749   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1750   if (oldmat->colmap) {
1751     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1752     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1753     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1754   } else a->colmap = 0;
1755   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1756     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1757     PLogObjectMemory(mat,len*sizeof(int));
1758     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1759   } else a->garray = 0;
1760 
1761   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1762   PLogObjectParent(mat,a->lvec);
1763   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1764   PLogObjectParent(mat,a->Mvctx);
1765   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1766   PLogObjectParent(mat,a->A);
1767   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1768   PLogObjectParent(mat,a->B);
1769   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1770   if (flg) {
1771     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1772   }
1773   *newmat = mat;
1774   PetscFunctionReturn(0);
1775 }
1776 
1777 #include "sys.h"
1778 
#undef __FUNC__
#define __FUNC__ "MatLoad_MPIBAIJ"
/*
    Loads a square matrix from a PETSc binary viewer into an MPIBAIJ matrix.
  Rank 0 reads the header, row lengths, column indices and values from the
  file and ships each remote rank its portion; every rank then does a
  counting pass to preallocate the diagonal/off-diagonal blocks before
  inserting its rows.  If the global size is not divisible by the chosen
  blocksize (-matload_block_size, default 1), the matrix is padded with
  extra rows/columns carrying a unit diagonal.
*/
int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*buf;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
  int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
  int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
  int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
  int          dcount,kmax,k,nzcount,tmp;

  PetscFunctionBegin;
  ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
  bs2  = bs*bs;

  /* only rank 0 touches the file; header = {cookie, M, N, nnz-or-format} */
  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not matrix object");
    if (header[3] < 0) {
      SETERRQ(1,1,"Matrix stored in special format on disk, cannot load as MPIBAIJ");
    }
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];

  if (M != N) SETERRQ(1,0,"Can only do square matrices");

  /*
     This code adds extra rows to make sure the number of rows is
     divisible by the blocksize
  */
  Mbs        = M/bs;
  extra_rows = bs - M + bs*(Mbs);
  if (extra_rows == bs) extra_rows = 0;  /* M was already divisible by bs */
  else                  Mbs++;
  if (extra_rows &&!rank) {
    PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
  }

  /* determine ownership of all rows: even split of block rows, first
     (Mbs % size) ranks get one extra; rowners in block units, browners
     the same boundaries in point (scalar row) units */
  mbs = Mbs/size + ((Mbs % size) > rank);
  m   = mbs * bs;
  rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
  browners = rowners + size + 1;
  MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
  for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
    /* padding rows contain exactly one entry (the unit diagonal) */
    for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
    MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
    PetscFree(sndcounts);
  } else {
    MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices  */
    nz = procsnz[0];
    ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* on one process the padding entries are not on disk: read fewer
       and append the diagonal column indices of the padding rows */
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }

    /* read in every ones (except the last) and ship off */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    /* read in the stuff for the last proc */
    if ( size != 1 ) {
      nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
      MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
    }
    PetscFree(cols);
  } else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += locrowlens[i];
    }
    ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* receive message of column indices*/
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries */
  /* dlens/odlens: per local block row, the number of distinct block
     columns falling inside/outside the diagonal ownership range.
     mask marks block columns already counted for the current block row;
     masked1/masked2 remember which marks to clear afterwards. */
  dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
  odlens = dlens + (rend-rstart);
  mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
  PetscMemzero(mask,3*Mbs*sizeof(int));
  masked1 = mask    + Mbs;
  masked2 = masked1 + Mbs;
  rowcount = 0; nzcount = 0;
  for ( i=0; i<mbs; i++ ) {
    dcount  = 0;
    odcount = 0;
    for ( j=0; j<bs; j++ ) {          /* scan the bs point rows of block row i */
      kmax = locrowlens[rowcount];
      for ( k=0; k<kmax; k++ ) {
        tmp = mycols[nzcount++]/bs;   /* block column of this entry */
        if (!mask[tmp]) {
          mask[tmp] = 1;
          if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
          else masked1[dcount++] = tmp;
        }
      }
      rowcount++;
    }

    dlens[i]  = dcount;
    odlens[i] = odcount;

    /* zero out the mask elements we set */
    for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
    for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
  }

  /* create our matrix */
  ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
         CHKERRQ(ierr);
  A = *newmat;
  /* file stores columns sorted within each row; tell the assembly code */
  MatSetOption(A,MAT_COLUMNS_SORTED);

  if (!rank) {
    buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    vals = buf;
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
    /* padding rows get a unit diagonal so the matrix stays nonsingular */
    if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
    /* read in other processors (except the last one) and ship out */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    /* the last proc */
    if ( size != 1 ){
      /* note: i == size-1 here, left over from the loop above */
      nz = procsnz[i] - extra_rows;
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
      MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
    }
    PetscFree(procsnz);
  } else {
    /* receive numeric values */
    buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);

    /* receive message of values*/
    vals = buf;
    mycols = ibuf;
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
  }
  PetscFree(locrowlens);
  PetscFree(buf);
  PetscFree(ibuf);
  PetscFree(rowners);
  PetscFree(dlens);
  PetscFree(mask);
  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
2007 
2008 
2009