/* xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision abffa341d06f511e7d690deae555d0d92cf30f96) */
#ifndef lint
static char vcid[] = "$Id: mpibaij.c,v 1.61 1997/03/29 00:41:11 balay Exp balay $";
#endif

#include "pinclude/pviewer.h"
#include "src/mat/impls/baij/mpi/mpibaij.h"
#include "src/vec/vecimpl.h"


extern int MatSetUpMultiply_MPIBAIJ(Mat);
extern int DisAssemble_MPIBAIJ(Mat);
extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);

/*
     Local utility routine that creates a mapping from the global column
   number to the local number in the off-diagonal part of the local
   storage of the matrix.  This is done in a non-scalable way, since the
   length of colmap equals the number of block columns in the global matrix.
*/
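/*
   The encoding: colmap[g] holds (local block column)*bs + 1 for every global
   block column g present in the off-diagonal part, and 0 otherwise, so a
   stored 0 marks "absent".  For example (illustrative values, not from the
   original source): with bs = 2 and garray = {3,7}, the loop below sets
   colmap[3] = 1 and colmap[7] = 3.
*/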
#undef __FUNC__
#define __FUNC__ "CreateColmap_MPIBAIJ_Private"
static int CreateColmap_MPIBAIJ_Private(Mat mat)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
  int         nbs = B->nbs,i,bs=B->bs;

  baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
  PLogObjectMemory(mat,baij->Nbs*sizeof(int));
  PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
  for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatGetRowIJ_MPIBAIJ"
static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
                            PetscTruth *done)
{
  Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;
  if (aij->size == 1) {
    ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
  } else SETERRQ(1,0,"not supported in parallel");
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatRestoreRowIJ_MPIBAIJ"
static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
                                PetscTruth *done)
{
  Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;
  if (aij->size == 1) {
    ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
  } else SETERRQ(1,0,"not supported in parallel");
  return 0;
}
#define CHUNKSIZE  10

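/*
   The two macros below insert one scalar entry (row,col) into the local
   diagonal (A) or off-diagonal (B) SeqBAIJ matrix.  They locate the target
   block column within the block row by a truncated binary search followed
   by a linear scan; if the block is absent and new nonzeros are allowed,
   the later blocks of the row are shifted up one slot, first reallocating
   the entire matrix with CHUNKSIZE extra block slots when the row is full.
*/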
#define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
    rmax = aimax[brow]; nrow = ailen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto a_noinsert; \
        } \
      } \
      if (a->nonew) goto a_noinsert; \
      if (nrow >= rmax) { \
        /* there is no extra room in the row, therefore enlarge */ \
        int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
        for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
        PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
                    aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(a->a);  \
        if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
        aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
        a->singlemalloc = 1; \
 \
        rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
        rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        a->maxnz += bs2*CHUNKSIZE; \
        a->reallocs++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      a->nz++; /* count the newly inserted block */ \
      a_noinsert:; \
    ailen[brow] = nrow; \
}

#define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
    rmax = bimax[brow]; nrow = bilen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto b_noinsert; \
        } \
      } \
      if (b->nonew) goto b_noinsert; \
      if (nrow >= rmax) { \
        /* there is no extra room in the row, therefore enlarge */ \
        int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
        for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
        PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
                    ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(b->a);  \
        if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
        ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
        b->singlemalloc = 1; \
 \
        rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
        rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        b->maxnz += bs2*CHUNKSIZE; \
        b->reallocs++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      b->nz++; /* count the newly inserted block */ \
      b_noinsert:; \
    bilen[brow] = nrow; \
}

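/*
   MatSetValues for the parallel BAIJ format: entries whose (point) row is
   owned locally go directly into the diagonal block A or, with the global
   column translated through colmap, into the off-diagonal block B; entries
   destined for rows owned by other processors are stashed here and shipped
   to their owners in MatAssemblyBegin/MatAssemblyEnd.
*/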
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIBAIJ"
int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j,row,col;
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs;
  int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
  int         cend_orig=baij->cend_bs,bs=baij->bs;

  /* Some variables required in the macros */
  Mat         A = baij->A;
  Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
  int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
  Scalar      *aa=a->a;

  Mat         B = baij->B;
  Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
  int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
  Scalar      *ba=b->a;

  int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
  int         low,high,t,ridx,cidx,bs2=a->bs2;
  Scalar      *ap,*bap;

  for ( i=0; i<m; i++ ) {
#if defined(PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(1,0,"Negative row");
    if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
#endif
    if (im[i] >= rstart_orig && im[i] < rend_orig) {
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        if (in[j] >= cstart_orig && in[j] < cend_orig){
          col = in[j] - cstart_orig;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
#if defined(PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
        else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
#endif
        else {
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }
            col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
            }
          }
          else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
      }
    }
    else {
      if (roworiented && !baij->donotstash) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      }
      else {
        if (!baij->donotstash) {
          row = im[i];
          for ( j=0; j<n; j++ ) {
            ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
  return 0;
}

extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
#undef __FUNC__
#define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      *value,*barray=baij->barray;
  int         ierr,i,j,ii,jj,row,col,k,l;
  int         roworiented = baij->roworiented,rstart=baij->rstart;
  int         rend=baij->rend,cstart=baij->cstart,stepval;
  int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;

  if (!barray) {
    barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
    baij->barray = barray;
  }

  if (roworiented) {
    stepval = (n-1)*bs;
  } else {
    stepval = (m-1)*bs;
  }
  for ( i=0; i<m; i++ ) {
#if defined(PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(1,0,"Negative row");
    if (im[i] >= baij->Mbs) SETERRQ(1,0,"Row too large");
#endif
    if (im[i] >= rstart && im[i] < rend) {
      row = im[i] - rstart;
      for ( j=0; j<n; j++ ) {
        /* gather the bs x bs block into the contiguous work array barray */
        if (roworiented) {
          value = v + i*(stepval+bs)*bs + j*bs;
        } else {
          value = v + j*(stepval+bs)*bs + i*bs;
        }
        for ( ii=0; ii<bs; ii++,value+=stepval )
          for (jj=0; jj<bs; jj++ )
            *barray++  = *value++;
        barray -= bs2;

        if (in[j] >= cstart && in[j] < cend){
          col = in[j] - cstart;
          ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
        }
#if defined(PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
        else if (in[j] >= baij->Nbs) {SETERRQ(1,0,"Col too large");}
#endif
        else {
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }
            col = baij->colmap[in[j]] - 1;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
            }
            else col = col/bs; /* colmap stores the local point offset + 1; convert to a block index */
          }
          else col = in[j];
          ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
        }
      }
    }
    else {
      if (!baij->donotstash) {
        if (roworiented) {
          row   = im[i]*bs;
          value = v + i*(stepval+bs)*bs;
          for ( j=0; j<bs; j++,row++ ) {
            for ( k=0; k<n; k++ ) {
              for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        }
        else {
          for ( j=0; j<n; j++ ) {
            value = v + j*(stepval+bs)*bs + i*bs;
            col   = in[j]*bs;
            for ( k=0; k<bs; k++,col++,value+=stepval) {
              for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        }
      }
    }
  }
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatGetValues_MPIBAIJ"
int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
  int         bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
    if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
    if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
      row = idxm[i] - bsrstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
        if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
        if (idxn[j] >= bscstart && idxn[j] < bscend){
          col = idxn[j] - bscstart;
          ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        }
        else {
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
          }
          if ((baij->colmap[idxn[j]/bs]-1 < 0) ||
              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
          else {
            col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
            ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    }
    else {
      SETERRQ(1,0,"Only local values currently supported");
    }
  }
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatNorm_MPIBAIJ"
int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
  int         ierr, i,bs2=baij->bs2;
  double      sum = 0.0;
  Scalar      *v;

  if (baij->size == 1) {
    ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      v = amat->a;
      for (i=0; i<amat->nz*bs2; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      v = bmat->a;
      for (i=0; i<bmat->nz*bs2; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
      *norm = sqrt(*norm);
    }
    else
      SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
  }
  return 0;
}

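/*
   Parallel assembly happens in two phases.  MatAssemblyBegin ships each
   stashed (row,col,value) triple to its owning processor: the number of
   messages and the maximum message length are established with two
   MPI_Allreduce calls, receives are posted, and the triples are packed
   (all three members as Scalars, so each message is homogeneous) and sent
   with nonblocking MPI.  MatAssemblyEnd then unpacks the received triples
   into the local A and B blocks.
*/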
#undef __FUNC__
#define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERT_VALUES or ADD_VALUES mode */
  MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,0,"Some processors inserted while others added");
  }
  mat->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length */
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered triples
     (global row,global column,value); we store the global indices as
     Scalars to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.

       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
  int         bs=baij->bs,row,col,other_disassembled;
  Scalar      *values,val;
  InsertMode  addv = mat->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = baij->rvalues + 3*imdex*baij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;
    for ( i=0; i<n; i++ ) {
      row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= baij->cstart*bs && col < baij->cend*bs) {
        col -= baij->cstart*bs;
        ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
      }
      else {
        if (mat->was_assembled) {
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
          }
          col = baij->colmap[col/bs] - 1 + col%bs;
          if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
            ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
      }
    }
    count--;
  }
  PetscFree(baij->recv_waits); PetscFree(baij->rvalues);

  /* wait on sends */
  if (baij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(baij->nsends,baij->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(baij->send_waits); PetscFree(baij->svalues);

  ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled; if so we must
     also disassemble ourselves, in order that we may reassemble. */
  MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
  }

  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);

  if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatView_MPIBAIJ_Binary"
static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr;

  if (baij->size == 1) {
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
  }
  else SETERRQ(1,0,"Only uniprocessor output supported");
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr, format,rank,bs = baij->bs;
  FILE         *fd;
  ViewerType   vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerGetFormat(viewer,&format); CHKERRQ(ierr);
    if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
      MatInfo info;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&info); CHKERRQ(ierr);
      PetscSequentialPhaseBegin(mat->comm,1);
      fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
              rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
              baij->bs,(int)info.memory);
      ierr = MatGetInfo(baij->A,MAT_LOCAL,&info); CHKERRQ(ierr);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      ierr = MatGetInfo(baij->B,MAT_LOCAL,&info); CHKERRQ(ierr);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == VIEWER_FORMAT_ASCII_INFO) {
      PetscPrintf(mat->comm,"  block size is %d\n",bs);
      return 0;
    }
  }

  if (vtype == DRAW_VIEWER) {
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
  }

  if (vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
            baij->cstart*bs,baij->cend*bs);
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  }
  else {
    int size = baij->size;
    rank = baij->rank;
    if (size == 1) {
      ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto the first processor */
      Mat         A;
      Mat_SeqBAIJ *Aloc;
      int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
      int         mbs=baij->mbs;
      Scalar      *a;

      if (!rank) {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqBAIJ*) baij->A->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart;
      rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = (baij->cstart+aj[j])*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      /* copy over the B part */
      Aloc = (Mat_SeqBAIJ*) baij->B->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart*bs;
      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = baij->garray[aj[j]]*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      PetscFree(rvals);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}


#undef __FUNC__
#define __FUNC__ "MatView_MPIBAIJ"
int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
{
  Mat         mat = (Mat) obj;
  int         ierr;
  ViewerType  vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
      vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
    ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
  }
  else if (vtype == BINARY_FILE_VIEWER) {
    return MatView_MPIBAIJ_Binary(mat,viewer);
  }
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatDestroy_MPIBAIJ"
int MatDestroy_MPIBAIJ(PetscObject obj)
{
  Mat         mat = (Mat) obj;
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;

#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
#endif

  PetscFree(baij->rowners);
  ierr = MatDestroy(baij->A); CHKERRQ(ierr);
  ierr = MatDestroy(baij->B); CHKERRQ(ierr);
  if (baij->colmap) PetscFree(baij->colmap);
  if (baij->garray) PetscFree(baij->garray);
  if (baij->lvec)   VecDestroy(baij->lvec);
  if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
  if (baij->rowvalues) PetscFree(baij->rowvalues);
  if (baij->barray) PetscFree(baij->barray);
  PetscFree(baij);
  if (mat->mapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
  }
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}

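/*
   y = A x is computed so that communication overlaps computation: the
   scatter of the needed off-processor entries of x into lvec is started,
   the diagonal block is multiplied while those messages are in flight,
   and only then is the scatter completed and the off-diagonal contribution
   accumulated; finally the receives for the next scatter are pre-posted.
*/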
#undef __FUNC__
#define __FUNC__ "MatMult_MPIBAIJ"
int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr, nt;

  VecGetLocalSize_Fast(xx,nt);
  if (nt != a->n) {
    SETERRQ(1,0,"Incompatible partition of A and xx");
  }
  VecGetLocalSize_Fast(yy,nt);
  if (nt != a->m) {
    SETERRQ(1,0,"Incompatible partition of A and yy");
  }
  ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
  ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatMultAdd_MPIBAIJ"
int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;
  ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatMultTrans_MPIBAIJ"
int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in yy until the next line, which is true for this          */
  /* implementation but perhaps not always.                              */
  ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatMultTransAdd_MPIBAIJ"
int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in zz until the next line, which is true for this          */
  /* implementation but perhaps not always.                              */
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  return 0;
}

/*
  This only works correctly for square matrices where the subblock A->A is the
  diagonal block
*/
#undef __FUNC__
#define __FUNC__ "MatGetDiagonal_MPIBAIJ"
int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  if (a->M != a->N)
    SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
  return MatGetDiagonal(a->A,v);
}

#undef __FUNC__
#define __FUNC__ "MatScale_MPIBAIJ"
int MatScale_MPIBAIJ(Scalar *aa,Mat A)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;
  ierr = MatScale(aa,a->A); CHKERRQ(ierr);
  ierr = MatScale(aa,a->B); CHKERRQ(ierr);
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatGetSize_MPIBAIJ"
int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  *m = mat->M; *n = mat->N;
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatGetLocalSize_MPIBAIJ"
int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  *m = mat->m; *n = mat->n;
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
  return 0;
}

extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);

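/*
   MatGetRow assembles one global row by merging the (already sorted) local
   rows of A and B: the B entries whose global block column (recovered
   through garray) precedes the diagonal block come first, then the A
   entries, then the remaining B entries.  Work space sized for the longest
   local row is allocated lazily and kept in mat->rowvalues until the next
   assembly.
*/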
#undef __FUNC__
#define __FUNC__ "MatGetRow_MPIBAIJ"
int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar      *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int         bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int         nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
  int         *cmap, *idx_p,cstart = mat->cstart;

  if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int         max = 1,mbs = mat->mbs,tmp;
    for ( i=0; i<mbs; i++ ) {
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }

  if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows");
  lrow = row - brstart;

  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
          }
        } else {
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]/bs] < cstart)
              idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
            else break;
          }
          imark = i;
        }
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
      }
    }
    else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatRestoreRow_MPIBAIJ"
int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  if (baij->getrowactive == PETSC_FALSE) {
    SETERRQ(1,0,"MatGetRow not called");
  }
  baij->getrowactive = PETSC_FALSE;
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatGetBlockSize_MPIBAIJ"
int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  *bs = baij->bs;
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatZeroEntries_MPIBAIJ"
int MatZeroEntries_MPIBAIJ(Mat A)
{
  Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
  int         ierr;
  ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
  ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatGetInfo_MPIBAIJ"
int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
  Mat         A = a->A, B = a->B;
  int         ierr;
  double      isend[5], irecv[5];

  info->rows_global    = (double)a->M;
  info->columns_global = (double)a->N;
  info->rows_local     = (double)a->m;
  info->columns_local  = (double)a->n;
  info->block_size     = (double)a->bs;
  ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
  isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
  isend[3] = info->memory;  isend[4] = info->mallocs;
  ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
  isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
  isend[3] += info->memory;  isend[4] += info->mallocs;
  if (flag == MAT_LOCAL) {
    info->nz_used      = isend[0];
    info->nz_allocated = isend[1];
    info->nz_unneeded  = isend[2];
    info->memory       = isend[3];
    info->mallocs      = isend[4];
  } else if (flag == MAT_GLOBAL_MAX) {
    MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
    info->nz_used      = irecv[0];
    info->nz_allocated = irecv[1];
    info->nz_unneeded  = irecv[2];
    info->memory       = irecv[3];
    info->mallocs      = irecv[4];
  } else if (flag == MAT_GLOBAL_SUM) {
    MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
    info->nz_used      = irecv[0];
    info->nz_allocated = irecv[1];
    info->nz_unneeded  = irecv[2];
    info->memory       = irecv[3];
    info->mallocs      = irecv[4];
  }
  info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
  info->fill_ratio_needed = 0;
  info->factor_mallocs    = 0;
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatSetOption_MPIBAIJ"
int MatSetOption_MPIBAIJ(Mat A,MatOption op)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;

  if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
      op == MAT_YES_NEW_NONZERO_LOCATIONS ||
      op == MAT_COLUMNS_UNSORTED ||
      op == MAT_COLUMNS_SORTED ||
      op == MAT_NEW_NONZERO_LOCATION_ERROR) {
    MatSetOption(a->A,op);
    MatSetOption(a->B,op);
  } else if (op == MAT_ROW_ORIENTED) {
    a->roworiented = 1;
    MatSetOption(a->A,op);
    MatSetOption(a->B,op);
  } else if (op == MAT_ROWS_SORTED ||
             op == MAT_ROWS_UNSORTED ||
             op == MAT_SYMMETRIC ||
             op == MAT_STRUCTURALLY_SYMMETRIC ||
             op == MAT_YES_NEW_DIAGONALS)
    PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
  else if (op == MAT_COLUMN_ORIENTED) {
    a->roworiented = 0;
    MatSetOption(a->A,op);
    MatSetOption(a->B,op);
  } else if (op == MAT_IGNORE_OFF_PROCESSOR_ENTRIES) {
    a->donotstash = 1;
  } else if (op == MAT_NO_NEW_DIAGONALS)
    {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
  else
    {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatTranspose_MPIBAIJ"
int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
  Mat_SeqBAIJ *Aloc;
  Mat         B;
  int         ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
  int         bs=baij->bs,mbs=baij->mbs;
  Scalar      *a;

  if (matout == PETSC_NULL && M != N)
    SETERRQ(1,0,"Square matrix only for in-place");
  ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
  CHKERRQ(ierr);

  /* copy over the A part */
  Aloc = (Mat_SeqBAIJ*) baij->A->data;
  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
  row = baij->rstart;
  rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

  for ( i=0; i<mbs; i++ ) {
    rvals[0] = bs*(baij->rstart + i);
    for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
    for ( j=ai[i]; j<ai[i+1]; j++ ) {
      col = (baij->cstart+aj[j])*bs;
      for (k=0; k<bs; k++ ) {
        ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
        col++; a += bs;
      }
    }
  }
  /* copy over the B part */
  Aloc = (Mat_SeqBAIJ*) baij->B->data;
  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
  row = baij->rstart*bs;
  for ( i=0; i<mbs; i++ ) {
    rvals[0] = bs*(baij->rstart + i);
    for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
    for ( j=ai[i]; j<ai[i+1]; j++ ) {
      col = baij->garray[aj[j]]*bs;
      for (k=0; k<bs; k++ ) {
        ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
        col++; a += bs;
      }
    }
  }
  PetscFree(rvals);
  ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);

  if (matout != PETSC_NULL) {
    *matout = B;
  } else {
    /* This isn't really an in-place transpose, but free the data structures from baij */
    PetscFree(baij->rowners);
    ierr = MatDestroy(baij->A); CHKERRQ(ierr);
    ierr = MatDestroy(baij->B); CHKERRQ(ierr);
    if (baij->colmap) PetscFree(baij->colmap);
    if (baij->garray) PetscFree(baij->garray);
    if (baij->lvec) VecDestroy(baij->lvec);
    if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
    PetscFree(baij);
    PetscMemcpy(A,B,sizeof(struct _Mat));
    PetscHeaderDestroy(B);
  }
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatDiagonalScale_MPIBAIJ"
int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
{
  Mat a = ((Mat_MPIBAIJ *) A->data)->A;
  Mat b = ((Mat_MPIBAIJ *) A->data)->B;
  int ierr,s1,s2,s3;

  if (ll) {
    ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
    ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
    if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
    ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
    ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
  }
  if (rr) SETERRQ(1,0,"not supported for right vector");
  return 0;
}

/* This code does not handle the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   baij->A and baij->B directly and not through the MatZeroRows()
   routine.
*/
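/*
   Each processor sends the global indices of the rows it wants zeroed to
   the processors that own them (using the same count/allreduce/nonblocking
   send pattern as MatAssemblyBegin), collects the indices it receives into
   a local index set, and then zeroes those rows of its own A and B blocks.
*/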
#undef __FUNC__
#define __FUNC__ "MatZeroRows_MPIBAIJ"
int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values,bs=l->bs;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,0,"Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length */
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /* see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  base = owners[rank]*bs;

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex] = recv_status.MPI_SOURCE;
    lens[imdex]   = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}
extern int MatPrintHelp_SeqBAIJ(Mat);
#undef __FUNC__
#define __FUNC__ "MatPrintHelp_MPIBAIJ"
int MatPrintHelp_MPIBAIJ(Mat A)
{
  Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;

  if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
  else return 0;
}

#undef __FUNC__
#define __FUNC__ "MatSetUnfactored_MPIBAIJ"
int MatSetUnfactored_MPIBAIJ(Mat A)
{
  Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
  int         ierr;
  ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
  return 0;
}

static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);

/* -------------------------------------------------------------------*/
static struct _MatOps MatOps = {
  MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
  MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
  0,0,0,0,
  0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
  0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
  MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
  MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
  0,0,0,MatGetSize_MPIBAIJ,
  MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
  0,0,MatConvertSameType_MPIBAIJ,0,0,
  0,0,0,MatGetSubMatrices_MPIBAIJ,
  MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
  MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
  0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};

1419 #undef __FUNC__
1420 #define __FUNC__ "MatCreateMPIBAIJ"
1421 /*@C
1422    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1423    (block compressed row).  For good matrix assembly performance
1424    the user should preallocate the matrix storage by setting the parameters
1425    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1426    performance can be increased by more than a factor of 50.
1427 
1428    Input Parameters:
1429 .  comm - MPI communicator
1430 .  bs   - size of blockk
1431 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1432            This value should be the same as the local size used in creating the
1433            y vector for the matrix-vector product y = Ax.
1434 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1435            This value should be the same as the local size used in creating the
1436            x vector for the matrix-vector product y = Ax.
1437 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1438 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1439 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1440            submatrix  (same for all local rows)
1441 .  d_nzz - array containing the number of block nonzeros in the various block rows
1442            of the in diagonal portion of the local (possibly different for each block
1443            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1444            it is zero.
1445 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1446            submatrix (same for all local rows).
1447 .  o_nzz - array containing the number of nonzeros in the various block rows of the
1448            off-diagonal portion of the local submatrix (possibly different for
1449            each block row) or PETSC_NULL.
1450 
1451    Output Parameter:
1452 .  A - the matrix
1453 
1454    Notes:
1455    The user MUST specify either the local or global matrix dimensions
1456    (possibly both).
1457 
1458    Storage Information:
1459    For a square global matrix we define each processor's diagonal portion
1460    to be its local rows and the corresponding columns (a square submatrix);
1461    each processor's off-diagonal portion encompasses the remainder of the
1462    local matrix (a rectangular submatrix).
1463 
1464    The user can specify preallocated storage for the diagonal part of
1465    the local submatrix with either d_nz or d_nnz (not both).  Set
1466    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1467    memory allocation.  Likewise, specify preallocated storage for the
1468    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1469 
1470    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1471    the figure below we depict these three local rows and all columns (0-11).
1472 
1473 $          0 1 2 3 4 5 6 7 8 9 10 11
1474 $         -------------------
1475 $  row 3  |  o o o d d d o o o o o o
1476 $  row 4  |  o o o d d d o o o o o o
1477 $  row 5  |  o o o d d d o o o o o o
1478 $         -------------------
1479 $
1480 
1481    Thus, any entries in the d locations are stored in the d (diagonal)
1482    submatrix, and any entries in the o locations are stored in the
1483    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1484    stored simply in the MATSEQBAIJ format for compressed row storage.
1485 
1486    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1487    and o_nz should indicate the number of nonzeros per row in the o matrix.
1488    In general, for PDE problems in which most nonzeros are near the diagonal,
1489    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1490    or you will get TERRIBLE performance; see the users' manual chapter on
1491    matrices.
1492 
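   Example:
   A sketch of a typical call, under the illustrative assumption of a 3x3
   block size on a 120x120 system with at most 5 diagonal and 2 off-diagonal
   block nonzeros per block row (these counts are made-up figures, not
   recommendations):

$     Mat A;
$     int ierr;
$     ierr = MatCreateMPIBAIJ(MPI_COMM_WORLD,3,PETSC_DECIDE,PETSC_DECIDE,
$                             120,120,5,PETSC_NULL,2,PETSC_NULL,&A); CHKERRQ(ierr);
$     /* ... MatSetValues()/MatSetValuesBlocked() loop ... */
$     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
$     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
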
.keywords: matrix, block, aij, compressed row, sparse, parallel

.seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
@*/
int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
                    int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  Mat          B;
  Mat_MPIBAIJ  *b;
  int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;

  if (bs < 1) SETERRQ(1,0,"invalid block size specified");
  *A = 0;
  PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
  PLogObjectCreate(B);
  B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
  PetscMemzero(b,sizeof(Mat_MPIBAIJ));
  PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
  MPI_Comm_size(comm,&size);
  if (size == 1) {
    B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
    B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
    B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
    B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
    B->ops.lufactor          = MatLUFactor_MPIBAIJ;
    B->ops.solve             = MatSolve_MPIBAIJ;
    B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
    B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
    B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
    B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
  }

  B->destroy    = MatDestroy_MPIBAIJ;
  B->view       = MatView_MPIBAIJ;
  B->mapping    = 0;
  B->factor     = 0;
  B->assembled  = PETSC_FALSE;

  B->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&b->rank);
  MPI_Comm_size(comm,&b->size);

  if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
    SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
  if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
  if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,0,"either N or n should be specified");
  if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE; /* local sizes take precedence */
  if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;

  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    mbs = m/bs; nbs = n/bs;
    MPI_Allreduce(work,sum,2,MPI_INT,MPI_SUM,comm);
    if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
    if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
  }
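  /* Note: each process gets Mbs/size block rows below, with the first
     Mbs % size processes receiving one extra block row.  For example
     (illustrative figures), Mbs = 10 on 4 processes yields local shares
     3, 3, 2, 2. */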
  if (m == PETSC_DECIDE) {
    Mbs = M/bs;
    if (Mbs*bs != M) SETERRQ(1,0,"Number of global rows must be divisible by blocksize");
    mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
    m   = mbs*bs;
  }
  if (n == PETSC_DECIDE) {
    Nbs = N/bs;
    if (Nbs*bs != N) SETERRQ(1,0,"Number of global cols must be divisible by blocksize");
    nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
    n   = nbs*bs;
  }
  if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"Number of local rows, cols must be divisible by blocksize");

  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;
  b->bs  = bs;
  b->bs2 = bs*bs;
  b->mbs = mbs;
  b->nbs = nbs;
  b->Mbs = Mbs;
  b->Nbs = Nbs;

  /* build local table of row and column ownerships */
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
  b->cowners = b->rowners + b->size + 2;
  MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];
  }
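  /* rowners[] is now a prefix sum of the per-process block-row counts, so
     process p owns block rows rowners[p] through rowners[p+1]-1; e.g., with
     the shares 3, 3, 2, 2 above, rowners = {0,3,6,8,10} (illustrative). */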
  b->rstart    = b->rowners[b->rank];
  b->rend      = b->rowners[b->rank+1];
  b->rstart_bs = b->rstart * bs;
  b->rend_bs   = b->rend * bs;

  MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart    = b->cowners[b->rank];
  b->cend      = b->cowners[b->rank+1];
  b->cstart_bs = b->cstart * bs;
  b->cend_bs   = b->cend * bs;

  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache for off-processor entries formed during assembly */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->donotstash  = 0;
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used in block assembly */
  b->barray      = 0;

  /* stuff used for matrix vector multiply */
  b->lvec        = 0;
  b->Mvctx       = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  *A = B;
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatConvertSameType_MPIBAIJ"
static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
{
  Mat         mat;
  Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
  int         ierr, len=0, flg;

  *newmat       = 0;
  PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
  PetscMemzero(a,sizeof(Mat_MPIBAIJ)); /* zero all fields first, as in MatCreateMPIBAIJ(); PetscNew() does not zero */
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy    = MatDestroy_MPIBAIJ;
  mat->view       = MatView_MPIBAIJ;
  mat->factor     = matin->factor;
  mat->assembled  = PETSC_TRUE;

  a->m = mat->m   = oldmat->m;
  a->n = mat->n   = oldmat->n;
  a->M = mat->M   = oldmat->M;
  a->N = mat->N   = oldmat->N;

  a->bs  = oldmat->bs;
  a->bs2 = oldmat->bs2;
  a->mbs = oldmat->mbs;
  a->nbs = oldmat->nbs;
  a->Mbs = oldmat->Mbs;
  a->Nbs = oldmat->Nbs;

  a->rstart       = oldmat->rstart;
  a->rend         = oldmat->rend;
  a->cstart       = oldmat->cstart;
  a->cend         = oldmat->cend;
  a->size         = oldmat->size;
  a->rank         = oldmat->rank;
  a->donotstash   = oldmat->donotstash;  /* copy flags rather than leave them unset */
  a->roworiented  = oldmat->roworiented;
  mat->insertmode = NOT_SET_VALUES;
  a->rowvalues    = 0;
  a->getrowactive = PETSC_FALSE;
  a->barray       = 0;

  a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
  PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
  a->cowners = a->rowners + a->size + 2;
  PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
  ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
  if (oldmat->colmap) {
    a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int)); CHKPTRQ(a->colmap);
    PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
    PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
  } else a->colmap = 0;
  if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
    a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
    PLogObjectMemory(mat,len*sizeof(int));
    PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
  } else a->garray = 0;

  ierr = VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
  PLogObjectParent(mat,a->lvec);
  ierr = VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
  PLogObjectParent(mat,a->Mvctx);
  ierr = MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);
  ierr = MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
  PLogObjectParent(mat,a->B);
  ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
  if (flg) {
    ierr = MatPrintHelp(mat); CHKERRQ(ierr);
  }
  *newmat = mat;
  return 0;
}

#include "sys.h"

#undef __FUNC__
#define __FUNC__ "MatLoad_MPIBAIJ"
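/*
   MatLoad_MPIBAIJ reads the PETSc binary matrix format: a header of four
   ints (the cookie, M, N, and presumably the total nonzero count; the last
   entry is not used here), followed by the M row lengths, then all column
   indices, then all numerical values.  Process 0 does the reading and ships
   each piece to its owner.  A rough outline of the steps below:

     1) read and broadcast the header; pad M up to a multiple of bs
     2) scatter the row lengths; process 0 counts nonzeros per process
     3) distribute column indices and count diagonal/off-diagonal block
        entries so the matrix can be preallocated exactly
     4) distribute numerical values and insert them with MatSetValues()
*/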
int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j, rstart, rend, fd;
  Scalar       *vals,*buf;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
  int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
  int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
  int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
  int          dcount,kmax,k,nzcount,tmp;

  ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
  bs2  = bs*bs;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not a matrix object");
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];

  if (M != N) SETERRQ(1,0,"Can only do square matrices");

  /*
     This code adds extra rows to make sure the number of rows is
     divisible by the blocksize
  */
  Mbs        = M/bs;
  extra_rows = bs - M + bs*(Mbs);
  if (extra_rows == bs) extra_rows = 0;
  else                  Mbs++;
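  /* extra_rows = bs - (M mod bs), i.e. the padding needed to reach the next
     multiple of bs.  For example (illustrative figures), M = 10 with bs = 3
     gives Mbs = 3 and extra_rows = 3 - 10 + 9 = 2, so Mbs becomes 4 and
     rows 10 and 11 are padded with a single 1.0 on the diagonal below. */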
  if (extra_rows && !rank) {
    PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
  }

  /* determine ownership of all rows */
  mbs = Mbs/size + ((Mbs % size) > rank);
  m   = mbs * bs;
  rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
  browners = rowners + size + 1;
  MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
  for ( i=0; i<=size; i++ ) browners[i] = rowners[i]*bs;
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
    for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
    MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]*bs; j<rowners[i+1]*bs; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices */
    nz = procsnz[0];
    ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
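    /* each padded row is given a single entry on the diagonal (column M+i);
       its value is set to 1.0 when the numerical values are read below */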

    /* read in everyone else's part (except the last process's) and ship it off */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    /* read in the part for the last proc */
    if (size != 1) {
      nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
      MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += locrowlens[i];
    }
    ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* receive message of column indices */
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,0,"something is wrong with the file");
  }

  /* loop over local rows, determining number of diagonal and off-diagonal block entries */
  dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
  odlens = dlens + (rend-rstart);
  mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
  PetscMemzero(mask,3*Mbs*sizeof(int));
  masked1 = mask    + Mbs;
  masked2 = masked1 + Mbs;
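  /* mask[] flags the block columns already seen in the current block row;
     masked1[] and masked2[] record which flags were set (diagonal and
     off-diagonal block columns respectively) so that only those entries
     need to be cleared again, rather than rezeroing all Mbs flags per row */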
  rowcount = 0; nzcount = 0;
  for ( i=0; i<mbs; i++ ) {
    dcount  = 0;
    odcount = 0;
    for ( j=0; j<bs; j++ ) {
      kmax = locrowlens[rowcount];
      for ( k=0; k<kmax; k++ ) {
        tmp = mycols[nzcount++]/bs;
        if (!mask[tmp]) {
          mask[tmp] = 1;
          if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
          else masked1[dcount++] = tmp;
        }
      }
      rowcount++;
    }

    dlens[i]  = dcount;
    odlens[i] = odcount;

    /* zero out the mask elements we set */
    for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
    for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
  }

  /* create our matrix, preallocated with the exact counts just computed */
  ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,
                          newmat); CHKERRQ(ierr);
  A = *newmat;
  ierr = MatSetOption(A,MAT_COLUMNS_SORTED); CHKERRQ(ierr);

  if (!rank) {
    buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
    /* read in my part of the matrix numerical values */
    nz = procsnz[0];
    vals = buf;
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
    /* read in other processors' parts (except the last one's) and ship them out */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    /* the last proc */
    if (size != 1) {
      nz = procsnz[size-1] - extra_rows;
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
      MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);

    /* receive message of values */
    vals = buf;
    mycols = ibuf;
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,0,"something is wrong with the file");

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
  }
  PetscFree(locrowlens);
  PetscFree(buf);
  PetscFree(ibuf);
  PetscFree(rowners);
  PetscFree(dlens);
  PetscFree(mask);
  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}