xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 1de57adc8a1ca54985b5e97fb2fb4832b3ccb382)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.71 1997/05/23 18:38:15 balay Exp balay $";
3 #endif
4 
5 #include "pinclude/pviewer.h"
6 #include "src/mat/impls/baij/mpi/mpibaij.h"
7 #include "src/vec/vecimpl.h"
8 
9 
10 extern int MatSetUpMultiply_MPIBAIJ(Mat);
11 extern int DisAssemble_MPIBAIJ(Mat);
12 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
13 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
14 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
15 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
16 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
17 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
18 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
19 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
20 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
21 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
22 
23 
24 /*
25      Local utility routine that creates a mapping from the global column
26    number to the local number in the off-diagonal part of the local
27    storage of the matrix.  This is done in a non-scalable way since the
28    length of colmap equals the global matrix length.
29 */
30 #undef __FUNC__
31 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
32 static int CreateColmap_MPIBAIJ_Private(Mat mat)
33 {
34   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
35   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
36   int         nbs = B->nbs,i,bs=B->bs;;
37 
38   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
39   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
40   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
41   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
42   return 0;
43 }
44 
45 #undef __FUNC__
46 #define __FUNC__ "MatGetRowIJ_MPIBAIJ("
47 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
48                             PetscTruth *done)
49 {
50   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
51   int         ierr;
52   if (aij->size == 1) {
53     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
54   } else SETERRQ(1,0,"not supported in parallel");
55   return 0;
56 }
57 
58 #undef __FUNC__
59 #define __FUNC__ "MatRestoreRowIJ_MPIBAIJ"
60 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
61                                 PetscTruth *done)
62 {
63   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
64   int        ierr;
65   if (aij->size == 1) {
66     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
67   } else SETERRQ(1,0,"not supported in parallel");
68   return 0;
69 }
/* number of extra block slots added to a row when it must be enlarged */
#define CHUNKSIZE  10

/*
   MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) -

   Inlined insertion of one scalar entry into the "diagonal" SeqBAIJ part
   (baij->A) of an MPIBAIJ matrix.  (row,col) are LOCAL point indices;
   they are split into block indices (brow,bcol) and intra-block offsets
   (ridx,cidx).  A short binary search narrows the block-column range,
   then a linear scan locates bcol.  If the block exists the value is set
   or added in place (per addv); otherwise a new block is created, with
   the whole matrix storage enlarged by CHUNKSIZE blocks when the row has
   no free slot left.  a->nonew == 1 silently skips new nonzeros;
   a->nonew == -1/-2 makes them an error.

   Relies on locals of the enclosing function being in scope:
   a, A, aa, ai, aj, ailen, aimax, bs, bs2, rp, ap, bap, rmax, nrow,
   low, high, t, ridx, cidx, brow, bcol, ii, _i, N.
*/
#define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
    rmax = aimax[brow]; nrow = ailen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto a_noinsert; \
        } \
      } \
      if (a->nonew == 1) goto a_noinsert; \
      else if (a->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (a->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
        for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
        PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
                    aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(a->a);  \
        if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
        aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
        a->singlemalloc = 1; \
 \
        rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
        rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        a->maxnz += bs2*CHUNKSIZE; \
        a->reallocs++; \
        a->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      a_noinsert:; \
    ailen[brow] = nrow; \
}
146 
/*
   MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) -

   Identical in structure to MatSetValues_SeqBAIJ_A_Private(), but inserts
   one scalar entry into the OFF-DIAGONAL SeqBAIJ part (baij->B) of an
   MPIBAIJ matrix, using the b-prefixed aliases (b, ba, bi, bj, bilen,
   bimax, B) and the b_noinsert label.  See the A-variant above for the
   algorithm description.
*/
#define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
    rmax = bimax[brow]; nrow = bilen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto b_noinsert; \
        } \
      } \
      if (b->nonew == 1) goto b_noinsert; \
      else if (b->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (b->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
        for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
        PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
                    ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(b->a);  \
        if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
        ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
        b->singlemalloc = 1; \
 \
        rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
        rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        b->maxnz += bs2*CHUNKSIZE; \
        b->reallocs++; \
        b->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      b_noinsert:; \
    bilen[brow] = nrow; \
}
221 
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIBAIJ"
/*
   MatSetValues_MPIBAIJ - Inserts or adds scalar (point-indexed) values
   into a parallel block AIJ matrix.

   Locally owned entries go directly into the diagonal part (baij->A) or
   off-diagonal part (baij->B) via the inlined macros above; entries for
   rows owned by other processors are put in the stash and communicated
   during MatAssemblyBegin/End.

   Input:
     mat   - the matrix
     m,im  - number of rows and their GLOBAL point (not block) indices
     n,in  - number of columns and their GLOBAL point indices
     v     - m x n values; layout depends on baij->roworiented
     addv  - INSERT_VALUES or ADD_VALUES
*/
int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j,row,col;
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
  int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
  int         cend_orig=baij->cend_bs,bs=baij->bs;

  /* Some Variables required in the macro */
  Mat         A = baij->A;
  Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
  int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
  Scalar      *aa=a->a;

  Mat         B = baij->B;
  Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
  int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
  Scalar      *ba=b->a;

  int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
  int         low,high,t,ridx,cidx,bs2=a->bs2;
  Scalar      *ap,*bap;

  for ( i=0; i<m; i++ ) {
#if defined(PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(1,0,"Negative row");
    if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
#endif
    /* row owned locally? (ownership range is in point units) */
    if (im[i] >= rstart_orig && im[i] < rend_orig) {
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        /* column in the diagonal block -> insert into baij->A */
        if (in[j] >= cstart_orig && in[j] < cend_orig){
          col = in[j] - cstart_orig;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
#if defined(PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
        else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
#endif
        else {
          /* off-diagonal column -> insert into baij->B */
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }
            /* colmap[g] = (local block col)*bs + 1, or 0 if g not in B;
               hence col < 0 below signals a brand-new block column */
            col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              /* new off-diagonal block column: fall back to unassembled
                 form so the nonzero structure of B can grow */
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
              /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
              B = baij->B;
              b = (Mat_SeqBAIJ *) (B)->data;
              bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
              ba=b->a;
            }
          }
          else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
      }
    }
    else {
      /* row owned elsewhere: stash it (unless the user disabled stashing) */
      if (roworiented && !baij->donotstash) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      }
      else {
        if (!baij->donotstash) {
          row = im[i];
          for ( j=0; j<n; j++ ) {
            /* column-oriented input: stash one value at a time */
            ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
  return 0;
}
305 
extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
#undef __FUNC__
#define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
/*
   MatSetValuesBlocked_MPIBAIJ - Inserts or adds values into a parallel
   block AIJ matrix, one bs x bs block at a time.

   Input:
     mat   - the matrix
     m,im  - number of block rows and their GLOBAL BLOCK indices
     n,in  - number of block columns and their GLOBAL BLOCK indices
     v     - the m x n blocks of values (each bs x bs); layout depends
             on baij->roworiented
     addv  - INSERT_VALUES or ADD_VALUES

   Blocks for locally owned rows go into baij->A / baij->B; others are
   stashed point-by-point for communication at assembly time.
*/
int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      *value,*barray=baij->barray;
  int         ierr,i,j,ii,jj,row,col,k,l;
  int         roworiented = baij->roworiented,rstart=baij->rstart ;
  int         rend=baij->rend,cstart=baij->cstart,stepval;
  int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;


  /* lazily allocate the bs x bs staging buffer, kept on the matrix */
  if(!barray) {
    barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
    baij->barray = barray;
  }

  /* distance (in Scalars) from the end of one block row of v to the
     start of the next, for the copy loop below */
  if (roworiented) {
    stepval = (n-1)*bs;
  } else {
    stepval = (m-1)*bs;
  }
  for ( i=0; i<m; i++ ) {
#if defined(PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(1,0,"Negative row");
    if (im[i] >= baij->Mbs) SETERRQ(1,0,"Row too large");
#endif
    if (im[i] >= rstart && im[i] < rend) {
      row = im[i] - rstart;
      for ( j=0; j<n; j++ ) {
        /* If NumCol = 1 then a copy is not required */
        if ((roworiented) && (n == 1)) {
          barray = v + i*bs2;
        } else if((!roworiented) && (m == 1)) {
          barray = v + j*bs2;
        } else { /* Here a copy is required */
          if (roworiented) {
            value = v + i*(stepval+bs)*bs + j*bs;
          } else {
            value = v + j*(stepval+bs)*bs + i*bs;
          }
          /* gather the strided bs x bs block into contiguous barray */
          for ( ii=0; ii<bs; ii++,value+=stepval )
            for (jj=0; jj<bs; jj++ )
              *barray++  = *value++;
          barray -=bs2;
        }

        if (in[j] >= cstart && in[j] < cend){
          col = in[j] - cstart;
          ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
        }
#if defined(PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
        else if (in[j] >= baij->Nbs) {SETERRQ(1,0,"Col too large");}
#endif
        else {
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }
            /* NOTE(review): CreateColmap_MPIBAIJ_Private stores
               (block index)*bs + 1, so this yields a POINT column index,
               while every other col in this routine (e.g. in[j]-cstart
               above) is a BLOCK index.  Looks like it should be
               (baij->colmap[in[j]] - 1)/bs -- confirm against a later
               revision before relying on this path. */
            col = baij->colmap[in[j]] - 1;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
            }
          }
          else col = in[j];
          ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
        }
      }
    }
    else {
      /* off-processor block row: stash the individual point entries */
      if (!baij->donotstash) {
        if (roworiented ) {
          row   = im[i]*bs;
          value = v + i*(stepval+bs)*bs;
          for ( j=0; j<bs; j++,row++ ) {
            for ( k=0; k<n; k++ ) {
              for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        }
        else {
          for ( j=0; j<n; j++ ) {
            value = v + j*(stepval+bs)*bs + i*bs;
            col   = in[j]*bs;
            for ( k=0; k<bs; k++,col++,value+=stepval) {
              for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }

        }
      }
    }
  }
  return 0;
}
408 
#undef __FUNC__
#define __FUNC__ "MatGetValues_MPIBAIJ"
/*
   MatGetValues_MPIBAIJ - Retrieves a logically dense m x n subblock of
   values, by global POINT indices.  Only rows owned by this processor
   may be requested; columns may be anywhere (off-diagonal columns are
   translated through colmap/garray, and entries outside the stored
   nonzero pattern of B are returned as 0.0).
*/
int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
  int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
    if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
    if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
      row = idxm[i] - bsrstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
        if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
        /* diagonal-block column: read straight from baij->A */
        if (idxn[j] >= bscstart && idxn[j] < bscend){
          col = idxn[j] - bscstart;
          ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        }
        else {
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
          }
          /* colmap[g] == 0 means block column g is not stored in B;
             the garray cross-check guards against a stale/mismatched map */
          if((baij->colmap[idxn[j]/bs]-1 < 0) ||
             (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
          else {
            /* local point column in B = (block start) + offset in block */
            col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
            ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    }
    else {
      SETERRQ(1,0,"Only local values currently supported");
    }
  }
  return 0;
}
448 
449 #undef __FUNC__
450 #define __FUNC__ "MatNorm_MPIBAIJ"
451 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
452 {
453   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
454   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
455   int        ierr, i,bs2=baij->bs2;
456   double     sum = 0.0;
457   Scalar     *v;
458 
459   if (baij->size == 1) {
460     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
461   } else {
462     if (type == NORM_FROBENIUS) {
463       v = amat->a;
464       for (i=0; i<amat->nz*bs2; i++ ) {
465 #if defined(PETSC_COMPLEX)
466         sum += real(conj(*v)*(*v)); v++;
467 #else
468         sum += (*v)*(*v); v++;
469 #endif
470       }
471       v = bmat->a;
472       for (i=0; i<bmat->nz*bs2; i++ ) {
473 #if defined(PETSC_COMPLEX)
474         sum += real(conj(*v)*(*v)); v++;
475 #else
476         sum += (*v)*(*v); v++;
477 #endif
478       }
479       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
480       *norm = sqrt(*norm);
481     }
482     else
483       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
484   }
485   return 0;
486 }
487 
#undef __FUNC__
#define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
/*
   MatAssemblyBegin_MPIBAIJ - Starts communicating the stashed
   off-processor values.  Each stashed entry is sent to its owning
   processor as a (row, col, value) triple of three Scalars (the indices
   are encoded as Scalars to keep the messages homogeneous).  Posts the
   nonblocking receives and sends; MatAssemblyEnd_MPIBAIJ completes them.
*/
int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,0,"Some processors inserted others added");
  }
  mat->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  /* nprocs[j] = #stashed entries for proc j; procs[j] = 1 iff any */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    /* rowners[] is in block units; stash indices are point indices */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack (row,col,value) triples; starts[] is advanced as we pack */
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  /* starts[] was consumed by the packing loop above; rebuild it */
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  /* stash the communication state for MatAssemblyEnd_MPIBAIJ */
  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  return 0;
}
588 
589 
#undef __FUNC__
#define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
/*
   MatAssemblyEnd_MPIBAIJ - Completes the communication started in
   MatAssemblyBegin_MPIBAIJ: drains the incoming (row,col,value) triples
   into the local A/B parts, waits for the sends, then assembles the two
   sequential pieces and sets up the scatter context for MatMult (on
   final assembly).
*/
int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
  int         bs=baij->bs,row,col,other_disassembled;
  Scalar      *values,val;
  InsertMode  addv = mat->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = baij->rvalues + 3*imdex*baij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;  /* each entry is a (row,col,value) triple of Scalars */
    for ( i=0; i<n; i++ ) {
      /* indices arrive encoded as Scalars; convert back to ints */
      row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= baij->cstart*bs && col < baij->cend*bs) {
        col -= baij->cstart*bs;
        ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      }
      else {
        if (mat->was_assembled) {
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
          }
          /* translate global point column to local point column in B */
          col = (baij->colmap[col/bs]-1)*bs + col%bs;
          if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
            /* block column not yet in B: disassemble so it can be added */
            ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      }
    }
    count--;
  }
  PetscFree(baij->recv_waits); PetscFree(baij->rvalues);

  /* wait on sends */
  if (baij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(baij->nsends,baij->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(baij->send_waits); PetscFree(baij->svalues);

  ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselves, in order that we may reassemble. */
  MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
  }

  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);

  /* cached MatGetRow workspace is stale after assembly */
  if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
  return 0;
}
662 
663 #undef __FUNC__
664 #define __FUNC__ "MatView_MPIBAIJ_Binary"
665 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
666 {
667   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
668   int          ierr;
669 
670   if (baij->size == 1) {
671     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
672   }
673   else SETERRQ(1,0,"Only uniprocessor output supported");
674   return 0;
675 }
676 
#undef __FUNC__
#define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
/*
   MatView_MPIBAIJ_ASCIIorDraworMatlab - Views the matrix through an
   ASCII, Draw, or Matlab viewer.  Info formats print per-processor
   statistics; ASCII_FILE prints each processor's pieces in sequence;
   everything else gathers the whole matrix onto processor 0 (via a
   temporary MPIBAIJ matrix stored entirely there) and views it.
*/
static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr, format,rank,bs = baij->bs;
  FILE         *fd;
  ViewerType   vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerGetFormat(viewer,&format);
    if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
      /* per-processor nonzero/memory statistics, printed in rank order */
      MatInfo info;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&info);
      PetscSequentialPhaseBegin(mat->comm,1);
      fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
              rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
              baij->bs,(int)info.memory);
      ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == VIEWER_FORMAT_ASCII_INFO) {
      PetscPrintf(mat->comm,"  block size is %d\n",bs);
      return 0;
    }
  }

  if (vtype == DRAW_VIEWER) {
    /* nothing to do for a null draw context */
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
  }

  if (vtype == ASCII_FILE_VIEWER) {
    /* each processor prints its own A and B parts in rank order */
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
           baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
            baij->cstart*bs,baij->cend*bs);
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  }
  else {
    int size = baij->size;
    rank = baij->rank;
    if (size == 1) {
      ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat         A;
      Mat_SeqBAIJ *Aloc;
      int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
      int         mbs=baij->mbs;
      Scalar      *a;

      /* processor 0 owns all M x N rows/cols; the others own none */
      if (!rank) {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqBAIJ*) baij->A->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart;
      /* rvals holds the bs consecutive global point rows of one block row */
      rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          /* local block column -> global point column */
          col = (baij->cstart+aj[j])*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      /* copy over the B part */
      Aloc = (Mat_SeqBAIJ*) baij->B->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart*bs;
      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          /* garray maps B's local block columns to global block columns */
          col = baij->garray[aj[j]]*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      PetscFree(rvals);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        /* everything now lives in proc 0's diagonal part */
        ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}
798 
799 
800 
801 #undef __FUNC__
802 #define __FUNC__ "MatView_MPIBAIJ"
803 int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
804 {
805   Mat         mat = (Mat) obj;
806   int         ierr;
807   ViewerType  vtype;
808 
809   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
810   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
811       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
812     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
813   }
814   else if (vtype == BINARY_FILE_VIEWER) {
815     return MatView_MPIBAIJ_Binary(mat,viewer);
816   }
817   return 0;
818 }
819 
#undef __FUNC__
#define __FUNC__ "MatDestroy_MPIBAIJ"
/*
   Destroys a parallel block AIJ matrix: releases the two sequential
   submatrices (diagonal part A and off-diagonal part B), the
   scatter/ghost-vector machinery used by matrix-vector products, and
   all auxiliary arrays, then destroys the matrix header itself.
*/
int MatDestroy_MPIBAIJ(PetscObject obj)
{
  Mat         mat = (Mat) obj;
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;

#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
#endif

  PetscFree(baij->rowners);
  ierr = MatDestroy(baij->A); CHKERRQ(ierr);
  ierr = MatDestroy(baij->B); CHKERRQ(ierr);
  /* the following are created lazily, so each may still be null */
  if (baij->colmap) PetscFree(baij->colmap);
  if (baij->garray) PetscFree(baij->garray);
  if (baij->lvec)   VecDestroy(baij->lvec);
  if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
  /* rowvalues also covers rowindices: they share one allocation (see MatGetRow) */
  if (baij->rowvalues) PetscFree(baij->rowvalues);
  if (baij->barray) PetscFree(baij->barray);
  PetscFree(baij);
  if (mat->mapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
  }
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}
849 
850 #undef __FUNC__
851 #define __FUNC__ "MatMult_MPIBAIJ"
852 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
853 {
854   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
855   int         ierr, nt;
856 
857   VecGetLocalSize_Fast(xx,nt);
858   if (nt != a->n) {
859     SETERRQ(1,0,"Incompatible partition of A and xx");
860   }
861   VecGetLocalSize_Fast(yy,nt);
862   if (nt != a->m) {
863     SETERRQ(1,0,"Incompatible parition of A and yy");
864   }
865   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
866   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
867   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
868   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
869   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
870   return 0;
871 }
872 
873 #undef __FUNC__
874 #define __FUNC__ "MatMultAdd_MPIBAIJ"
875 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
876 {
877   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
878   int        ierr;
879   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
880   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
881   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
882   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
883   return 0;
884 }
885 
886 #undef __FUNC__
887 #define __FUNC__ "MatMultTrans_MPIBAIJ"
888 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
889 {
890   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
891   int        ierr;
892 
893   /* do nondiagonal part */
894   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
895   /* send it on its way */
896   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
897   /* do local part */
898   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
899   /* receive remote parts: note this assumes the values are not actually */
900   /* inserted in yy until the next line, which is true for my implementation*/
901   /* but is not perhaps always true. */
902   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
903   return 0;
904 }
905 
#undef __FUNC__
#define __FUNC__ "MatMultTransAdd_MPIBAIJ"
/*
   Computes zz = A^T * xx + yy.  The off-diagonal block's transpose
   product produces contributions destined for other processes; these
   are added into zz with a reverse scatter overlapped with the local
   transpose product.
*/
int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in zz until the next line, which is true for my implementation*/
  /* but is not perhaps always true. */
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  return 0;
}
925 
926 /*
927   This only works correctly for square matrices where the subblock A->A is the
928    diagonal block
929 */
930 #undef __FUNC__
931 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
932 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
933 {
934   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
935   if (a->M != a->N)
936     SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
937   return MatGetDiagonal(a->A,v);
938 }
939 
940 #undef __FUNC__
941 #define __FUNC__ "MatScale_MPIBAIJ"
942 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
943 {
944   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
945   int        ierr;
946   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
947   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
948   return 0;
949 }
950 
951 #undef __FUNC__
952 #define __FUNC__ "MatGetSize_MPIBAIJ"
953 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
954 {
955   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
956   *m = mat->M; *n = mat->N;
957   return 0;
958 }
959 
960 #undef __FUNC__
961 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
962 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
963 {
964   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
965   *m = mat->m; *n = mat->N;
966   return 0;
967 }
968 
969 #undef __FUNC__
970 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
971 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
972 {
973   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
974   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
975   return 0;
976 }
977 
978 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
979 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
980 
#undef __FUNC__
#define __FUNC__ "MatGetRow_MPIBAIJ"
/*
   Returns one locally owned row of the parallel matrix in GLOBAL point
   column numbering.  The row is assembled from the diagonal block (A)
   and the off-diagonal block (B) and merged so that column indices are
   increasing.  Results are returned in buffers owned by the matrix, so
   only one row may be "gotten" at a time (guarded by getrowactive);
   the caller must call MatRestoreRow before the next MatGetRow.
*/
int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
  int        *cmap, *idx_p,cstart = mat->cstart;

  if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int     max = 1,mbs = mat->mbs,tmp;
    /* longest combined block row over both A and B */
    for ( i=0; i<mbs; i++ ) {
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    /* a single allocation holds values first, then indices */
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }


  if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows")
  lrow = row - brstart;

  /* null out the request pointers for the pieces the caller did not ask for */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      /* imark = number of leading B entries whose global block column lies
         before the diagonal block (cstart) */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
          }
        } else {
          /* imark was not computed above (v was null); find the split here */
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]/bs] < cstart)
              idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
            else break;
          }
          imark = i;
        }
        /* A entries are local; shift by cstart*bs to get global columns */
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
      }
    }
    else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  return 0;
}
1063 
1064 #undef __FUNC__
1065 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1066 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1067 {
1068   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1069   if (baij->getrowactive == PETSC_FALSE) {
1070     SETERRQ(1,0,"MatGetRow not called");
1071   }
1072   baij->getrowactive = PETSC_FALSE;
1073   return 0;
1074 }
1075 
1076 #undef __FUNC__
1077 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1078 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1079 {
1080   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1081   *bs = baij->bs;
1082   return 0;
1083 }
1084 
1085 #undef __FUNC__
1086 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1087 int MatZeroEntries_MPIBAIJ(Mat A)
1088 {
1089   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1090   int         ierr;
1091   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1092   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1093   return 0;
1094 }
1095 
1096 #undef __FUNC__
1097 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1098 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1099 {
1100   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1101   Mat         A = a->A, B = a->B;
1102   int         ierr;
1103   double      isend[5], irecv[5];
1104 
1105   info->rows_global    = (double)a->M;
1106   info->columns_global = (double)a->N;
1107   info->rows_local     = (double)a->m;
1108   info->columns_local  = (double)a->N;
1109   info->block_size     = (double)a->bs;
1110   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1111   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->memory;
1112   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1113   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->memory;
1114   if (flag == MAT_LOCAL) {
1115     info->nz_used      = isend[0];
1116     info->nz_allocated = isend[1];
1117     info->nz_unneeded  = isend[2];
1118     info->memory       = isend[3];
1119     info->mallocs      = isend[4];
1120   } else if (flag == MAT_GLOBAL_MAX) {
1121     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_MAX,matin->comm);
1122     info->nz_used      = irecv[0];
1123     info->nz_allocated = irecv[1];
1124     info->nz_unneeded  = irecv[2];
1125     info->memory       = irecv[3];
1126     info->mallocs      = irecv[4];
1127   } else if (flag == MAT_GLOBAL_SUM) {
1128     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_SUM,matin->comm);
1129     info->nz_used      = irecv[0];
1130     info->nz_allocated = irecv[1];
1131     info->nz_unneeded  = irecv[2];
1132     info->memory       = irecv[3];
1133     info->mallocs      = irecv[4];
1134   }
1135   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1136   info->fill_ratio_needed = 0;
1137   info->factor_mallocs    = 0;
1138   return 0;
1139 }
1140 
1141 #undef __FUNC__
1142 #define __FUNC__ "MatSetOption_MPIBAIJ"
1143 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1144 {
1145   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1146 
1147   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1148       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1149       op == MAT_COLUMNS_UNSORTED ||
1150       op == MAT_COLUMNS_SORTED ||
1151       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1152       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1153         MatSetOption(a->A,op);
1154         MatSetOption(a->B,op);
1155   } else if (op == MAT_ROW_ORIENTED) {
1156         a->roworiented = 1;
1157         MatSetOption(a->A,op);
1158         MatSetOption(a->B,op);
1159   } else if (op == MAT_ROWS_SORTED ||
1160              op == MAT_ROWS_UNSORTED ||
1161              op == MAT_SYMMETRIC ||
1162              op == MAT_STRUCTURALLY_SYMMETRIC ||
1163              op == MAT_YES_NEW_DIAGONALS)
1164     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1165   else if (op == MAT_COLUMN_ORIENTED) {
1166     a->roworiented = 0;
1167     MatSetOption(a->A,op);
1168     MatSetOption(a->B,op);
1169   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1170     a->donotstash = 1;
1171   } else if (op == MAT_NO_NEW_DIAGONALS)
1172     {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
1173   else
1174     {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
1175   return 0;
1176 }
1177 
1178 #undef __FUNC__
1179 #define __FUNC__ "MatTranspose_MPIBAIJ("
1180 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1181 {
1182   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1183   Mat_SeqBAIJ *Aloc;
1184   Mat        B;
1185   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
1186   int        bs=baij->bs,mbs=baij->mbs;
1187   Scalar     *a;
1188 
1189   if (matout == PETSC_NULL && M != N)
1190     SETERRQ(1,0,"Square matrix only for in-place");
1191   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1192   CHKERRQ(ierr);
1193 
1194   /* copy over the A part */
1195   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1196   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1197   row = baij->rstart;
1198   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1199 
1200   for ( i=0; i<mbs; i++ ) {
1201     rvals[0] = bs*(baij->rstart + i);
1202     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1203     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1204       col = (baij->cstart+aj[j])*bs;
1205       for (k=0; k<bs; k++ ) {
1206         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1207         col++; a += bs;
1208       }
1209     }
1210   }
1211   /* copy over the B part */
1212   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1213   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1214   row = baij->rstart*bs;
1215   for ( i=0; i<mbs; i++ ) {
1216     rvals[0] = bs*(baij->rstart + i);
1217     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1218     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1219       col = baij->garray[aj[j]]*bs;
1220       for (k=0; k<bs; k++ ) {
1221         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1222         col++; a += bs;
1223       }
1224     }
1225   }
1226   PetscFree(rvals);
1227   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1228   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1229 
1230   if (matout != PETSC_NULL) {
1231     *matout = B;
1232   } else {
1233     /* This isn't really an in-place transpose .... but free data structures from baij */
1234     PetscFree(baij->rowners);
1235     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1236     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1237     if (baij->colmap) PetscFree(baij->colmap);
1238     if (baij->garray) PetscFree(baij->garray);
1239     if (baij->lvec) VecDestroy(baij->lvec);
1240     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1241     PetscFree(baij);
1242     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1243     PetscHeaderDestroy(B);
1244   }
1245   return 0;
1246 }
1247 
1248 #undef __FUNC__
1249 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1250 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1251 {
1252   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1253   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1254   int ierr,s1,s2,s3;
1255 
1256   if (ll)  {
1257     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1258     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1259     if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
1260     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1261     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1262   }
1263   if (rr) SETERRQ(1,0,"not supported for right vector");
1264   return 0;
1265 }
1266 
/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   baij->A and baij->B directly and not through the MatZeroRows()
   routine.
*/
#undef __FUNC__
#define __FUNC__ "MatZeroRows_MPIBAIJ"
/*
   Zeros all entries of the (global point) rows listed in is; diag is
   forwarded to the sequential MatZeroRows() for the diagonal block.
   Rows may be listed on any process: each index is first routed to the
   owning process with point-to-point messages, then the local rows are
   zeroed through MatZeroRows() on the A and B blocks.
*/
int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values,bs=l->bs;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    /* owners[] holds block-row boundaries, so scale by bs for point rows */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,0,"Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack the row indices grouped by destination process */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  /* recompute starts[], which the packing loop above advanced */
  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  /* first locally owned point row on this process */
  base = owners[rank]*bs;

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      /* convert global point rows to local point rows */
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}
1397 extern int MatPrintHelp_SeqBAIJ(Mat);
1398 #undef __FUNC__
1399 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1400 int MatPrintHelp_MPIBAIJ(Mat A)
1401 {
1402   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1403 
1404   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1405   else return 0;
1406 }
1407 
1408 #undef __FUNC__
1409 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1410 int MatSetUnfactored_MPIBAIJ(Mat A)
1411 {
1412   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1413   int         ierr;
1414   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1415   return 0;
1416 }
1417 
static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);

/* -------------------------------------------------------------------*/
/*
   Function table for the MPIBAIJ matrix type.  Entries are positional
   (matching struct _MatOps); a 0 marks an operation that this format
   does not implement.
*/
static struct _MatOps MatOps = {
  MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
  MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
  0,0,0,0,
  0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
  0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
  MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
  MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
  0,0,0,MatGetSize_MPIBAIJ,
  MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
  0,0,MatConvertSameType_MPIBAIJ,0,0,
  0,0,0,MatGetSubMatrices_MPIBAIJ,
  MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
  MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
  0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1436 
1437 
1438 #undef __FUNC__
1439 #define __FUNC__ "MatCreateMPIBAIJ"
1440 /*@C
1441    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1442    (block compressed row).  For good matrix assembly performance
1443    the user should preallocate the matrix storage by setting the parameters
1444    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1445    performance can be increased by more than a factor of 50.
1446 
1447    Input Parameters:
1448 .  comm - MPI communicator
.  bs   - size of block
1450 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1451            This value should be the same as the local size used in creating the
1452            y vector for the matrix-vector product y = Ax.
1453 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1454            This value should be the same as the local size used in creating the
1455            x vector for the matrix-vector product y = Ax.
1456 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1457 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1458 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1459            submatrix  (same for all local rows)
.  d_nnz - array containing the number of block nonzeros in the various block rows
           in the diagonal portion of the local submatrix (possibly different for
           each block row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1463            it is zero.
1464 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1465            submatrix (same for all local rows).
.  o_nnz - array containing the number of nonzeros in the various block rows of the
1467            off-diagonal portion of the local submatrix (possibly different for
1468            each block row) or PETSC_NULL.
1469 
1470    Output Parameter:
1471 .  A - the matrix
1472 
1473    Notes:
1474    The user MUST specify either the local or global matrix dimensions
1475    (possibly both).
1476 
1477    Storage Information:
1478    For a square global matrix we define each processor's diagonal portion
1479    to be its local rows and the corresponding columns (a square submatrix);
1480    each processor's off-diagonal portion encompasses the remainder of the
1481    local matrix (a rectangular submatrix).
1482 
1483    The user can specify preallocated storage for the diagonal part of
1484    the local submatrix with either d_nz or d_nnz (not both).  Set
1485    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1486    memory allocation.  Likewise, specify preallocated storage for the
1487    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1488 
1489    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1490    the figure below we depict these three local rows and all columns (0-11).
1491 
1492 $          0 1 2 3 4 5 6 7 8 9 10 11
1493 $         -------------------
1494 $  row 3  |  o o o d d d o o o o o o
1495 $  row 4  |  o o o d d d o o o o o o
1496 $  row 5  |  o o o d d d o o o o o o
1497 $         -------------------
1498 $
1499 
1500    Thus, any entries in the d locations are stored in the d (diagonal)
1501    submatrix, and any entries in the o locations are stored in the
1502    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1503    stored simply in the MATSEQBAIJ format for compressed row storage.
1504 
1505    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1506    and o_nz should indicate the number of nonzeros per row in the o matrix.
1507    In general, for PDE problems in which most nonzeros are near the diagonal,
1508    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1509    or you will get TERRIBLE performance; see the users' manual chapter on
1510    matrices.
1511 
1512 .keywords: matrix, block, aij, compressed row, sparse, parallel
1513 
1514 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1515 @*/
1516 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1517                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1518 {
1519   Mat          B;
1520   Mat_MPIBAIJ  *b;
1521   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1522 
1523   if (bs < 1) SETERRQ(1,0,"invalid block size specified");
1524   *A = 0;
1525   PetscHeaderCreate(B,_p_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1526   PLogObjectCreate(B);
1527   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1528   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1529   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1530   MPI_Comm_size(comm,&size);
1531   if (size == 1) {
1532     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1533     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1534     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1535     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1536     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1537     B->ops.solve             = MatSolve_MPIBAIJ;
1538     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1539     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1540     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1541     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1542   }
1543 
1544   B->destroy    = MatDestroy_MPIBAIJ;
1545   B->view       = MatView_MPIBAIJ;
1546   B->mapping    = 0;
1547   B->factor     = 0;
1548   B->assembled  = PETSC_FALSE;
1549 
1550   B->insertmode = NOT_SET_VALUES;
1551   MPI_Comm_rank(comm,&b->rank);
1552   MPI_Comm_size(comm,&b->size);
1553 
1554   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1555     SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1556   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
1557   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,0,"either N or n should be specified");
1558   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1559   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1560 
1561   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1562     work[0] = m; work[1] = n;
1563     mbs = m/bs; nbs = n/bs;
1564     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1565     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1566     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1567   }
1568   if (m == PETSC_DECIDE) {
1569     Mbs = M/bs;
1570     if (Mbs*bs != M) SETERRQ(1,0,"No of global rows must be divisible by blocksize");
1571     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1572     m   = mbs*bs;
1573   }
1574   if (n == PETSC_DECIDE) {
1575     Nbs = N/bs;
1576     if (Nbs*bs != N) SETERRQ(1,0,"No of global cols must be divisible by blocksize");
1577     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1578     n   = nbs*bs;
1579   }
1580   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"No of local rows, cols must be divisible by blocksize");
1581 
1582   b->m = m; B->m = m;
1583   b->n = n; B->n = n;
1584   b->N = N; B->N = N;
1585   b->M = M; B->M = M;
1586   b->bs  = bs;
1587   b->bs2 = bs*bs;
1588   b->mbs = mbs;
1589   b->nbs = nbs;
1590   b->Mbs = Mbs;
1591   b->Nbs = Nbs;
1592 
1593   /* build local table of row and column ownerships */
1594   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1595   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1596   b->cowners = b->rowners + b->size + 2;
1597   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1598   b->rowners[0] = 0;
1599   for ( i=2; i<=b->size; i++ ) {
1600     b->rowners[i] += b->rowners[i-1];
1601   }
1602   b->rstart    = b->rowners[b->rank];
1603   b->rend      = b->rowners[b->rank+1];
1604   b->rstart_bs = b->rstart * bs;
1605   b->rend_bs   = b->rend * bs;
1606 
1607   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1608   b->cowners[0] = 0;
1609   for ( i=2; i<=b->size; i++ ) {
1610     b->cowners[i] += b->cowners[i-1];
1611   }
1612   b->cstart    = b->cowners[b->rank];
1613   b->cend      = b->cowners[b->rank+1];
1614   b->cstart_bs = b->cstart * bs;
1615   b->cend_bs   = b->cend * bs;
1616 
1617   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1618   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1619   PLogObjectParent(B,b->A);
1620   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1621   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1622   PLogObjectParent(B,b->B);
1623 
1624   /* build cache for off array entries formed */
1625   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1626   b->donotstash  = 0;
1627   b->colmap      = 0;
1628   b->garray      = 0;
1629   b->roworiented = 1;
1630 
1631   /* stuff used in block assembly */
1632   b->barray      = 0;
1633 
1634   /* stuff used for matrix vector multiply */
1635   b->lvec        = 0;
1636   b->Mvctx       = 0;
1637 
1638   /* stuff for MatGetRow() */
1639   b->rowindices   = 0;
1640   b->rowvalues    = 0;
1641   b->getrowactive = PETSC_FALSE;
1642 
1643   *A = B;
1644   return 0;
1645 }
1646 
1647 #undef __FUNC__
1648 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
1649 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1650 {
1651   Mat         mat;
1652   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1653   int         ierr, len=0, flg;
1654 
1655   *newmat       = 0;
1656   PetscHeaderCreate(mat,_p_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1657   PLogObjectCreate(mat);
1658   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1659   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1660   mat->destroy    = MatDestroy_MPIBAIJ;
1661   mat->view       = MatView_MPIBAIJ;
1662   mat->factor     = matin->factor;
1663   mat->assembled  = PETSC_TRUE;
1664 
1665   a->m = mat->m   = oldmat->m;
1666   a->n = mat->n   = oldmat->n;
1667   a->M = mat->M   = oldmat->M;
1668   a->N = mat->N   = oldmat->N;
1669 
1670   a->bs  = oldmat->bs;
1671   a->bs2 = oldmat->bs2;
1672   a->mbs = oldmat->mbs;
1673   a->nbs = oldmat->nbs;
1674   a->Mbs = oldmat->Mbs;
1675   a->Nbs = oldmat->Nbs;
1676 
1677   a->rstart       = oldmat->rstart;
1678   a->rend         = oldmat->rend;
1679   a->cstart       = oldmat->cstart;
1680   a->cend         = oldmat->cend;
1681   a->size         = oldmat->size;
1682   a->rank         = oldmat->rank;
1683   mat->insertmode = NOT_SET_VALUES;
1684   a->rowvalues    = 0;
1685   a->getrowactive = PETSC_FALSE;
1686   a->barray       = 0;
1687 
1688   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1689   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1690   a->cowners = a->rowners + a->size + 2;
1691   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1692   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1693   if (oldmat->colmap) {
1694     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1695     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1696     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1697   } else a->colmap = 0;
1698   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1699     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1700     PLogObjectMemory(mat,len*sizeof(int));
1701     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1702   } else a->garray = 0;
1703 
1704   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1705   PLogObjectParent(mat,a->lvec);
1706   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1707   PLogObjectParent(mat,a->Mvctx);
1708   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1709   PLogObjectParent(mat,a->A);
1710   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1711   PLogObjectParent(mat,a->B);
1712   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1713   if (flg) {
1714     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1715   }
1716   *newmat = mat;
1717   return 0;
1718 }
1719 
1720 #include "sys.h"
1721 
1722 #undef __FUNC__
1723 #define __FUNC__ "MatLoad_MPIBAIJ"
/*
   MatLoad_MPIBAIJ - Loads a matrix from a PETSc binary viewer into an
   MPIBAIJ matrix.

   Collective on the viewer's communicator.  Process 0 reads the file in
   order (header, row lengths, column indices, numerical values) and ships
   each remote process its chunk via MPI point-to-point messages; every
   process then inserts its own rows with MatSetValues().

   Input:
     viewer - binary viewer positioned at the matrix
     type   - matrix type (unused in this routine; presumably dispatch
              happened before we got here - confirm against MatLoad())
   Output:
     newmat - the newly created, fully assembled matrix

   The block size comes from the -matload_block_size option (default 1).
   If the global size is not divisible by the block size, the matrix is
   padded with extra rows/columns that carry a single unit diagonal entry.
   Only square matrices are supported.
*/
1724 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1725 {
1726   Mat          A;
1727   int          i, nz, ierr, j,rstart, rend, fd;
1728   Scalar       *vals,*buf;
1729   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1730   MPI_Status   status;
1731   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1732   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1733   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1734   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1735   int          dcount,kmax,k,nzcount,tmp;
1736 
1737 
     /* block size is a runtime option; bs stays 1 if the option is absent */
1738   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1739   bs2  = bs*bs;
1740 
1741   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
     /* only process 0 touches the file; it validates the magic cookie */
1742   if (!rank) {
1743     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1744     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1745     if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not matrix object");
1746   }
1747 
     /* broadcast the global sizes (header[1..3]) read on process 0 */
1748   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1749   M = header[1]; N = header[2];
1750 
1751   if (M != N) SETERRQ(1,0,"Can only do square matrices");
1752 
1753   /*
1754      This code adds extra rows to make sure the number of rows is
1755      divisible by the blocksize
1756   */
1757   Mbs        = M/bs;
     /* extra_rows = bs - (M mod bs); the special case below resets it to 0
        when M is already divisible by bs */
1758   extra_rows = bs - M + bs*(Mbs);
1759   if (extra_rows == bs) extra_rows = 0;
1760   else                  Mbs++;
1761   if (extra_rows &&!rank) {
1762     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1763   }
1764 
1765   /* determine ownership of all rows */
     /* the first (Mbs mod size) processes get one extra block row each */
1766   mbs = Mbs/size + ((Mbs % size) > rank);
1767   m   = mbs * bs;
     /* rowners[0..size] holds block-row ownership prefix sums; browners is
        the same table scaled to scalar rows.  Both live in one allocation;
        rowners uses only size+1 entries, so the offset size+1 is safe and
        the 2*(size+2) allocation leaves slack. */
1768   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1769   browners = rowners + size + 1;
1770   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1771   rowners[0] = 0;
1772   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1773   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
     /* rstart/rend are this process's owned range in BLOCK rows */
1774   rstart = rowners[rank];
1775   rend   = rowners[rank+1];
1776 
1777   /* distribute row lengths to all processors */
1778   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1779   if (!rank) {
1780     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1781     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
       /* each padding row gets length 1: its unit diagonal entry */
1782     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1783     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1784     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1785     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1786     PetscFree(sndcounts);
1787   }
1788   else {
       /* non-root side of the collective: send buffer arguments unused */
1789     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1790   }
1791 
1792   if (!rank) {
1793     /* calculate the number of nonzeros on each processor */
1794     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1795     PetscMemzero(procsnz,size*sizeof(int));
1796     for ( i=0; i<size; i++ ) {
1797       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1798         procsnz[i] += rowlengths[j];
1799       }
1800     }
1801     PetscFree(rowlengths);
1802 
1803     /* determine max buffer needed and allocate it */
1804     maxnz = 0;
1805     for ( i=0; i<size; i++ ) {
1806       maxnz = PetscMax(maxnz,procsnz[i]);
1807     }
1808     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1809 
1810     /* read in my part of the matrix column indices  */
1811     nz = procsnz[0];
1812     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1813     mycols = ibuf;
       /* uniprocessor: padding entries are not on disk, so read fewer and
          append the unit-diagonal column indices by hand */
1814     if (size == 1)  nz -= extra_rows;
1815     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1816     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1817 
1818     /* read in every ones (except the last) and ship off */
1819     for ( i=1; i<size-1; i++ ) {
1820       nz = procsnz[i];
1821       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1822       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1823     }
1824     /* read in the stuff for the last proc */
1825     if ( size != 1 ) {
1826       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1827       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1828       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1829       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1830     }
1831     PetscFree(cols);
1832   }
1833   else {
1834     /* determine buffer space needed for message */
       /* locrowlens already includes any padded rows, so this nz matches
          what process 0 ships (nz+extra_rows on the last process) */
1835     nz = 0;
1836     for ( i=0; i<m; i++ ) {
1837       nz += locrowlens[i];
1838     }
1839     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1840     mycols = ibuf;
1841     /* receive message of column indices*/
1842     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1843     MPI_Get_count(&status,MPI_INT,&maxnz);
1844     if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
1845   }
1846 
1847   /* loop over local rows, determining number of off diagonal entries */
   /* dlens/odlens hold, per local BLOCK row, the number of distinct block
      columns in the diagonal resp. off-diagonal part; mask/masked1/masked2
      are block-column scratch arrays used to count each block column once */
1848   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1849   odlens = dlens + (rend-rstart);
1850   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1851   PetscMemzero(mask,3*Mbs*sizeof(int));
1852   masked1 = mask    + Mbs;
1853   masked2 = masked1 + Mbs;
1854   rowcount = 0; nzcount = 0;
1855   for ( i=0; i<mbs; i++ ) {
1856     dcount  = 0;
1857     odcount = 0;
1858     for ( j=0; j<bs; j++ ) {
1859       kmax = locrowlens[rowcount];
1860       for ( k=0; k<kmax; k++ ) {
         /* tmp = block column; compared against the owned block-ROW range,
            which is valid because the matrix is square */
1861         tmp = mycols[nzcount++]/bs;
1862         if (!mask[tmp]) {
1863           mask[tmp] = 1;
1864           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1865           else masked1[dcount++] = tmp;
1866         }
1867       }
1868       rowcount++;
1869     }
1870 
1871     dlens[i]  = dcount;
1872     odlens[i] = odcount;
1873 
1874     /* zero out the mask elements we set */
1875     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1876     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1877   }
1878 
1879   /* create our matrix */
1880   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1881          CHKERRQ(ierr);
1882   A = *newmat;
   /* NOTE(review): assumes the file stores column indices sorted within
      each row - confirm against the binary viewer format */
1883   MatSetOption(A,MAT_COLUMNS_SORTED);
1884 
1885   if (!rank) {
1886     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1887     /* read in my part of the matrix numerical values  */
1888     nz = procsnz[0];
1889     vals = buf;
1890     mycols = ibuf;
1891     if (size == 1)  nz -= extra_rows;
1892     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
       /* padding rows get a unit diagonal value */
1893     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1894 
1895     /* insert into matrix */
       /* jj is the global SCALAR row index of this process's first row */
1896     jj      = rstart*bs;
1897     for ( i=0; i<m; i++ ) {
1898       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1899       mycols += locrowlens[i];
1900       vals   += locrowlens[i];
1901       jj++;
1902     }
1903     /* read in other processors (except the last one) and ship out */
1904     for ( i=1; i<size-1; i++ ) {
1905       nz = procsnz[i];
1906       vals = buf;
1907       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1908       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1909     }
1910     /* the last proc */
1911     if ( size != 1 ){
       /* NOTE(review): relies on i == size-1 left over from the loop above
          (true for every size > 1, including size == 2 where the loop body
          never runs); procsnz[size-1] would be clearer, matching line 1826 */
1912       nz = procsnz[i] - extra_rows;
1913       vals = buf;
1914       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1915       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1916       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1917     }
1918     PetscFree(procsnz);
1919   }
1920   else {
1921     /* receive numeric values */
       /* nz still holds the column-index count computed above, which equals
          the number of values being sent to this process */
1922     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1923 
1924     /* receive message of values*/
1925     vals = buf;
1926     mycols = ibuf;
1927     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1928     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1929     if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
1930 
1931     /* insert into matrix */
1932     jj      = rstart*bs;
1933     for ( i=0; i<m; i++ ) {
1934       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1935       mycols += locrowlens[i];
1936       vals   += locrowlens[i];
1937       jj++;
1938     }
1939   }
   /* release scratch storage and finish the (collective) assembly */
1940   PetscFree(locrowlens);
1941   PetscFree(buf);
1942   PetscFree(ibuf);
1943   PetscFree(rowners);
1944   PetscFree(dlens);
1945   PetscFree(mask);
1946   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1947   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1948   return 0;
1949 }
1950 
1951 
1952