xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision dd9472c661dc670b920d195e948779028afb0b09)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpibaij.c,v 1.73 1997/07/09 20:55:26 balay Exp bsmith $";
3 #endif
4 
5 #include "pinclude/pviewer.h"
6 #include "src/mat/impls/baij/mpi/mpibaij.h"
7 #include "src/vec/vecimpl.h"
8 
9 
10 extern int MatSetUpMultiply_MPIBAIJ(Mat);
11 extern int DisAssemble_MPIBAIJ(Mat);
12 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
13 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
14 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
15 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
16 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
17 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
18 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
19 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
20 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
21 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
22 
23 
24 /*
25      Local utility routine that creates a mapping from the global column
26    number to the local number in the off-diagonal part of the local
27    storage of the matrix.  This is done in a non scable way since the
28    length of colmap equals the global matrix length.
29 */
30 #undef __FUNC__
31 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
32 static int CreateColmap_MPIBAIJ_Private(Mat mat)
33 {
34   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
35   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
36   int         nbs = B->nbs,i,bs=B->bs;;
37 
38   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
39   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
40   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
41   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
42   return 0;
43 }
44 
45 #undef __FUNC__
46 #define __FUNC__ "MatGetRowIJ_MPIBAIJ("
47 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
48                             PetscTruth *done)
49 {
50   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
51   int         ierr;
52   if (aij->size == 1) {
53     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
54   } else SETERRQ(1,0,"not supported in parallel");
55   return 0;
56 }
57 
58 #undef __FUNC__
59 #define __FUNC__ "MatRestoreRowIJ_MPIBAIJ"
60 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
61                                 PetscTruth *done)
62 {
63   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
64   int        ierr;
65   if (aij->size == 1) {
66     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
67   } else SETERRQ(1,0,"not supported in parallel");
68   return 0;
69 }
70 #define CHUNKSIZE  10
71 
72 #define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
73 { \
74  \
75     brow = row/bs;  \
76     rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
77     rmax = aimax[brow]; nrow = ailen[brow]; \
78       bcol = col/bs; \
79       ridx = row % bs; cidx = col % bs; \
80       low = 0; high = nrow; \
81       while (high-low > 3) { \
82         t = (low+high)/2; \
83         if (rp[t] > bcol) high = t; \
84         else              low  = t; \
85       } \
86       for ( _i=low; _i<high; _i++ ) { \
87         if (rp[_i] > bcol) break; \
88         if (rp[_i] == bcol) { \
89           bap  = ap +  bs2*_i + bs*cidx + ridx; \
90           if (addv == ADD_VALUES) *bap += value;  \
91           else                    *bap  = value;  \
92           goto a_noinsert; \
93         } \
94       } \
95       if (a->nonew == 1) goto a_noinsert; \
96       else if (a->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
97       if (nrow >= rmax) { \
98         /* there is no extra room in row, therefore enlarge */ \
99         int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
100         Scalar *new_a; \
101  \
102         if (a->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
103  \
104         /* malloc new storage space */ \
105         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
106         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
107         new_j   = (int *) (new_a + bs2*new_nz); \
108         new_i   = new_j + new_nz; \
109  \
110         /* copy over old data into new slots */ \
111         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
112         for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
113         PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
114         len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
115         PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
116                                                            len*sizeof(int)); \
117         PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
118         PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
119         PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
120                     aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
121         /* free up old matrix storage */ \
122         PetscFree(a->a);  \
123         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
124         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
125         a->singlemalloc = 1; \
126  \
127         rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
128         rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
129         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
130         a->maxnz += bs2*CHUNKSIZE; \
131         a->reallocs++; \
132         a->nz++; \
133       } \
134       N = nrow++ - 1;  \
135       /* shift up all the later entries in this row */ \
136       for ( ii=N; ii>=_i; ii-- ) { \
137         rp[ii+1] = rp[ii]; \
138         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
139       } \
140       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
141       rp[_i]                      = bcol;  \
142       ap[bs2*_i + bs*cidx + ridx] = value;  \
143       a_noinsert:; \
144     ailen[brow] = nrow; \
145 }
146 
147 #define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
148 { \
149  \
150     brow = row/bs;  \
151     rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
152     rmax = bimax[brow]; nrow = bilen[brow]; \
153       bcol = col/bs; \
154       ridx = row % bs; cidx = col % bs; \
155       low = 0; high = nrow; \
156       while (high-low > 3) { \
157         t = (low+high)/2; \
158         if (rp[t] > bcol) high = t; \
159         else              low  = t; \
160       } \
161       for ( _i=low; _i<high; _i++ ) { \
162         if (rp[_i] > bcol) break; \
163         if (rp[_i] == bcol) { \
164           bap  = ap +  bs2*_i + bs*cidx + ridx; \
165           if (addv == ADD_VALUES) *bap += value;  \
166           else                    *bap  = value;  \
167           goto b_noinsert; \
168         } \
169       } \
170       if (b->nonew == 1) goto b_noinsert; \
171       else if (b->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
172       if (nrow >= rmax) { \
173         /* there is no extra room in row, therefore enlarge */ \
174         int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
175         Scalar *new_a; \
176  \
177         if (b->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
178  \
179         /* malloc new storage space */ \
180         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
181         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
182         new_j   = (int *) (new_a + bs2*new_nz); \
183         new_i   = new_j + new_nz; \
184  \
185         /* copy over old data into new slots */ \
186         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
187         for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
188         PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
189         len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
190         PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
191                                                            len*sizeof(int)); \
192         PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
193         PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
194         PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
195                     ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
196         /* free up old matrix storage */ \
197         PetscFree(b->a);  \
198         if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
199         ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
200         b->singlemalloc = 1; \
201  \
202         rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
203         rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
204         PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
205         b->maxnz += bs2*CHUNKSIZE; \
206         b->reallocs++; \
207         b->nz++; \
208       } \
209       N = nrow++ - 1;  \
210       /* shift up all the later entries in this row */ \
211       for ( ii=N; ii>=_i; ii-- ) { \
212         rp[ii+1] = rp[ii]; \
213         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
214       } \
215       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
216       rp[_i]                      = bcol;  \
217       ap[bs2*_i + bs*cidx + ridx] = value;  \
218       b_noinsert:; \
219     bilen[brow] = nrow; \
220 }
221 
222 #undef __FUNC__
223 #define __FUNC__ "MatSetValues_MPIBAIJ"
224 int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
225 {
226   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
227   Scalar      value;
228   int         ierr,i,j,row,col;
229   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
230   int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
231   int         cend_orig=baij->cend_bs,bs=baij->bs;
232 
233   /* Some Variables required in the macro */
234   Mat         A = baij->A;
235   Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
236   int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
237   Scalar      *aa=a->a;
238 
239   Mat         B = baij->B;
240   Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
241   int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
242   Scalar      *ba=b->a;
243 
244   int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
245   int         low,high,t,ridx,cidx,bs2=a->bs2;
246   Scalar      *ap,*bap;
247 
248   for ( i=0; i<m; i++ ) {
249 #if defined(PETSC_BOPT_g)
250     if (im[i] < 0) SETERRQ(1,0,"Negative row");
251     if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
252 #endif
253     if (im[i] >= rstart_orig && im[i] < rend_orig) {
254       row = im[i] - rstart_orig;
255       for ( j=0; j<n; j++ ) {
256         if (in[j] >= cstart_orig && in[j] < cend_orig){
257           col = in[j] - cstart_orig;
258           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
259           MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
260           /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
261         }
262 #if defined(PETSC_BOPT_g)
263         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
264         else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
265 #endif
266         else {
267           if (mat->was_assembled) {
268             if (!baij->colmap) {
269               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
270             }
271             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
272             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
273               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
274               col =  in[j];
275               /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
276               B = baij->B;
277               b = (Mat_SeqBAIJ *) (B)->data;
278               bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
279               ba=b->a;
280             }
281           }
282           else col = in[j];
283           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
284           MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
285           /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
286         }
287       }
288     }
289     else {
290       if (roworiented && !baij->donotstash) {
291         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
292       }
293       else {
294         if (!baij->donotstash) {
295           row = im[i];
296 	  for ( j=0; j<n; j++ ) {
297 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
298           }
299         }
300       }
301     }
302   }
303   return 0;
304 }
305 
306 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
307 #undef __FUNC__
308 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
309 int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
310 {
311   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
312   Scalar      *value,*barray=baij->barray;
313   int         ierr,i,j,ii,jj,row,col,k,l;
314   int         roworiented = baij->roworiented,rstart=baij->rstart ;
315   int         rend=baij->rend,cstart=baij->cstart,stepval;
316   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
317 
318 
319   if(!barray) {
320     baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
321   }
322 
323   if (roworiented) {
324     stepval = (n-1)*bs;
325   } else {
326     stepval = (m-1)*bs;
327   }
328   for ( i=0; i<m; i++ ) {
329 #if defined(PETSC_BOPT_g)
330     if (im[i] < 0) SETERRQ(1,0,"Negative row");
331     if (im[i] >= baij->Mbs) SETERRQ(1,0,"Row too large");
332 #endif
333     if (im[i] >= rstart && im[i] < rend) {
334       row = im[i] - rstart;
335       for ( j=0; j<n; j++ ) {
336         /* If NumCol = 1 then a copy is not required */
337         if ((roworiented) && (n == 1)) {
338           barray = v + i*bs2;
339         } else if((!roworiented) && (m == 1)) {
340           barray = v + j*bs2;
341         } else { /* Here a copy is required */
342           if (roworiented) {
343             value = v + i*(stepval+bs)*bs + j*bs;
344           } else {
345             value = v + j*(stepval+bs)*bs + i*bs;
346           }
347           for ( ii=0; ii<bs; ii++,value+=stepval ) {
348             for (jj=0; jj<bs; jj++ ) {
349               *barray++  = *value++;
350             }
351           }
352           barray -=bs2;
353         }
354 
355         if (in[j] >= cstart && in[j] < cend){
356           col  = in[j] - cstart;
357           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
358         }
359 #if defined(PETSC_BOPT_g)
360         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
361         else if (in[j] >= baij->Nbs) {SETERRQ(1,0,"Column too large");}
362 #endif
363         else {
364           if (mat->was_assembled) {
365             if (!baij->colmap) {
366               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
367             }
368 
369 #if defined(PETSC_BOPT_g)
370             if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(1,0,"Incorrect colmap");}
371 #endif
372             col = (baij->colmap[in[j]] - 1)/bs;
373             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
374               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
375               col =  in[j];
376             }
377           }
378           else col = in[j];
379           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
380         }
381       }
382     }
383     else {
384       if (!baij->donotstash) {
385         if (roworiented ) {
386           row   = im[i]*bs;
387           value = v + i*(stepval+bs)*bs;
388           for ( j=0; j<bs; j++,row++ ) {
389             for ( k=0; k<n; k++ ) {
390               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
391                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
392               }
393             }
394           }
395         }
396         else {
397           for ( j=0; j<n; j++ ) {
398             value = v + j*(stepval+bs)*bs + i*bs;
399             col   = in[j]*bs;
400             for ( k=0; k<bs; k++,col++,value+=stepval) {
401               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
402                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
403               }
404             }
405           }
406         }
407       }
408     }
409   }
410   return 0;
411 }
412 
413 #undef __FUNC__
414 #define __FUNC__ "MatGetValues_MPIBAIJ"
415 int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
416 {
417   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
418   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
419   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
420 
421   for ( i=0; i<m; i++ ) {
422     if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
423     if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
424     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
425       row = idxm[i] - bsrstart;
426       for ( j=0; j<n; j++ ) {
427         if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
428         if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
429         if (idxn[j] >= bscstart && idxn[j] < bscend){
430           col = idxn[j] - bscstart;
431           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
432         }
433         else {
434           if (!baij->colmap) {
435             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
436           }
437           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
438              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
439           else {
440             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
441             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
442           }
443         }
444       }
445     }
446     else {
447       SETERRQ(1,0,"Only local values currently supported");
448     }
449   }
450   return 0;
451 }
452 
453 #undef __FUNC__
454 #define __FUNC__ "MatNorm_MPIBAIJ"
455 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
456 {
457   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
458   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
459   int        ierr, i,bs2=baij->bs2;
460   double     sum = 0.0;
461   Scalar     *v;
462 
463   if (baij->size == 1) {
464     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
465   } else {
466     if (type == NORM_FROBENIUS) {
467       v = amat->a;
468       for (i=0; i<amat->nz*bs2; i++ ) {
469 #if defined(PETSC_COMPLEX)
470         sum += real(conj(*v)*(*v)); v++;
471 #else
472         sum += (*v)*(*v); v++;
473 #endif
474       }
475       v = bmat->a;
476       for (i=0; i<bmat->nz*bs2; i++ ) {
477 #if defined(PETSC_COMPLEX)
478         sum += real(conj(*v)*(*v)); v++;
479 #else
480         sum += (*v)*(*v); v++;
481 #endif
482       }
483       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
484       *norm = sqrt(*norm);
485     }
486     else
487       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
488   }
489   return 0;
490 }
491 
492 #undef __FUNC__
493 #define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
494 int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
495 {
496   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
497   MPI_Comm    comm = mat->comm;
498   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
499   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
500   MPI_Request *send_waits,*recv_waits;
501   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
502   InsertMode  addv;
503   Scalar      *rvalues,*svalues;
504 
505   /* make sure all processors are either in INSERTMODE or ADDMODE */
506   MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
507   if (addv == (ADD_VALUES|INSERT_VALUES)) {
508     SETERRQ(1,0,"Some processors inserted others added");
509   }
510   mat->insertmode = addv; /* in case this processor had no cache */
511 
512   /*  first count number of contributors to each processor */
513   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
514   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
515   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
516   for ( i=0; i<baij->stash.n; i++ ) {
517     idx = baij->stash.idx[i];
518     for ( j=0; j<size; j++ ) {
519       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
520         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
521       }
522     }
523   }
524   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
525 
526   /* inform other processors of number of messages and max length*/
527   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
528   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
529   nreceives = work[rank];
530   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
531   nmax = work[rank];
532   PetscFree(work);
533 
534   /* post receives:
535        1) each message will consist of ordered pairs
536      (global index,value) we store the global index as a double
537      to simplify the message passing.
538        2) since we don't know how long each individual message is we
539      allocate the largest needed buffer for each receive. Potentially
540      this is a lot of wasted space.
541 
542 
543        This could be done better.
544   */
545   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
546   CHKPTRQ(rvalues);
547   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
548   CHKPTRQ(recv_waits);
549   for ( i=0; i<nreceives; i++ ) {
550     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
551               comm,recv_waits+i);
552   }
553 
554   /* do sends:
555       1) starts[i] gives the starting index in svalues for stuff going to
556          the ith processor
557   */
558   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
559   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
560   CHKPTRQ(send_waits);
561   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
562   starts[0] = 0;
563   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
564   for ( i=0; i<baij->stash.n; i++ ) {
565     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
566     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
567     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
568   }
569   PetscFree(owner);
570   starts[0] = 0;
571   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
572   count = 0;
573   for ( i=0; i<size; i++ ) {
574     if (procs[i]) {
575       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
576                 comm,send_waits+count++);
577     }
578   }
579   PetscFree(starts); PetscFree(nprocs);
580 
581   /* Free cache space */
582   PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
583   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
584 
585   baij->svalues    = svalues;    baij->rvalues    = rvalues;
586   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
587   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
588   baij->rmax       = nmax;
589 
590   return 0;
591 }
592 
593 
594 #undef __FUNC__
595 #define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
596 int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
597 {
598   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
599   MPI_Status  *send_status,recv_status;
600   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
601   int         bs=baij->bs,row,col,other_disassembled;
602   Scalar      *values,val;
603   InsertMode  addv = mat->insertmode;
604 
605   /*  wait on receives */
606   while (count) {
607     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
608     /* unpack receives into our local space */
609     values = baij->rvalues + 3*imdex*baij->rmax;
610     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
611     n = n/3;
612     for ( i=0; i<n; i++ ) {
613       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
614       col = (int) PetscReal(values[3*i+1]);
615       val = values[3*i+2];
616       if (col >= baij->cstart*bs && col < baij->cend*bs) {
617         col -= baij->cstart*bs;
618         ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
619       }
620       else {
621         if (mat->was_assembled) {
622           if (!baij->colmap) {
623             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
624           }
625           col = (baij->colmap[col/bs]) - 1 + col%bs;
626           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
627             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
628             col = (int) PetscReal(values[3*i+1]);
629           }
630         }
631         ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
632       }
633     }
634     count--;
635   }
636   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
637 
638   /* wait on sends */
639   if (baij->nsends) {
640     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
641     CHKPTRQ(send_status);
642     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
643     PetscFree(send_status);
644   }
645   PetscFree(baij->send_waits); PetscFree(baij->svalues);
646 
647   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
648   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
649 
650   /* determine if any processor has disassembled, if so we must
651      also disassemble ourselfs, in order that we may reassemble. */
652   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
653   if (mat->was_assembled && !other_disassembled) {
654     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
655   }
656 
657   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
658     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
659   }
660   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
661   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
662 
663   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
664   return 0;
665 }
666 
667 #undef __FUNC__
668 #define __FUNC__ "MatView_MPIBAIJ_Binary"
669 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
670 {
671   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
672   int          ierr;
673 
674   if (baij->size == 1) {
675     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
676   }
677   else SETERRQ(1,0,"Only uniprocessor output supported");
678   return 0;
679 }
680 
681 #undef __FUNC__
682 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
683 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
684 {
685   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
686   int          ierr, format,rank,bs = baij->bs;
687   FILE         *fd;
688   ViewerType   vtype;
689 
690   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
691   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
692     ierr = ViewerGetFormat(viewer,&format);
693     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
694       MatInfo info;
695       MPI_Comm_rank(mat->comm,&rank);
696       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
697       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
698       PetscSequentialPhaseBegin(mat->comm,1);
699       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
700               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
701               baij->bs,(int)info.memory);
702       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
703       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
704       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
705       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
706       fflush(fd);
707       PetscSequentialPhaseEnd(mat->comm,1);
708       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
709       return 0;
710     }
711     else if (format == VIEWER_FORMAT_ASCII_INFO) {
712       PetscPrintf(mat->comm,"  block size is %d\n",bs);
713       return 0;
714     }
715   }
716 
717   if (vtype == DRAW_VIEWER) {
718     Draw       draw;
719     PetscTruth isnull;
720     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
721     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
722   }
723 
724   if (vtype == ASCII_FILE_VIEWER) {
725     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
726     PetscSequentialPhaseBegin(mat->comm,1);
727     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
728            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
729             baij->cstart*bs,baij->cend*bs);
730     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
731     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
732     fflush(fd);
733     PetscSequentialPhaseEnd(mat->comm,1);
734   }
735   else {
736     int size = baij->size;
737     rank = baij->rank;
738     if (size == 1) {
739       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
740     }
741     else {
742       /* assemble the entire matrix onto first processor. */
743       Mat         A;
744       Mat_SeqBAIJ *Aloc;
745       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
746       int         mbs=baij->mbs;
747       Scalar      *a;
748 
749       if (!rank) {
750         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
751         CHKERRQ(ierr);
752       }
753       else {
754         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
755         CHKERRQ(ierr);
756       }
757       PLogObjectParent(mat,A);
758 
759       /* copy over the A part */
760       Aloc = (Mat_SeqBAIJ*) baij->A->data;
761       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
762       row = baij->rstart;
763       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
764 
765       for ( i=0; i<mbs; i++ ) {
766         rvals[0] = bs*(baij->rstart + i);
767         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
768         for ( j=ai[i]; j<ai[i+1]; j++ ) {
769           col = (baij->cstart+aj[j])*bs;
770           for (k=0; k<bs; k++ ) {
771             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
772             col++; a += bs;
773           }
774         }
775       }
776       /* copy over the B part */
777       Aloc = (Mat_SeqBAIJ*) baij->B->data;
778       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
779       row = baij->rstart*bs;
780       for ( i=0; i<mbs; i++ ) {
781         rvals[0] = bs*(baij->rstart + i);
782         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
783         for ( j=ai[i]; j<ai[i+1]; j++ ) {
784           col = baij->garray[aj[j]]*bs;
785           for (k=0; k<bs; k++ ) {
786             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
787             col++; a += bs;
788           }
789         }
790       }
791       PetscFree(rvals);
792       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
793       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
794       if (!rank) {
795         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
796       }
797       ierr = MatDestroy(A); CHKERRQ(ierr);
798     }
799   }
800   return 0;
801 }
802 
803 
804 
805 #undef __FUNC__
806 #define __FUNC__ "MatView_MPIBAIJ"
807 int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
808 {
809   Mat         mat = (Mat) obj;
810   int         ierr;
811   ViewerType  vtype;
812 
813   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
814   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
815       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
816     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
817   }
818   else if (vtype == BINARY_FILE_VIEWER) {
819     return MatView_MPIBAIJ_Binary(mat,viewer);
820   }
821   return 0;
822 }
823 
824 #undef __FUNC__
825 #define __FUNC__ "MatDestroy_MPIBAIJ"
826 int MatDestroy_MPIBAIJ(PetscObject obj)
827 {
828   Mat         mat = (Mat) obj;
829   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
830   int         ierr;
831 
832 #if defined(PETSC_LOG)
833   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
834 #endif
835 
836   PetscFree(baij->rowners);
837   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
838   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
839   if (baij->colmap) PetscFree(baij->colmap);
840   if (baij->garray) PetscFree(baij->garray);
841   if (baij->lvec)   VecDestroy(baij->lvec);
842   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
843   if (baij->rowvalues) PetscFree(baij->rowvalues);
844   if (baij->barray) PetscFree(baij->barray);
845   PetscFree(baij);
846   if (mat->mapping) {
847     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
848   }
849   PLogObjectDestroy(mat);
850   PetscHeaderDestroy(mat);
851   return 0;
852 }
853 
854 #undef __FUNC__
855 #define __FUNC__ "MatMult_MPIBAIJ"
856 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
857 {
858   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
859   int         ierr, nt;
860 
861   VecGetLocalSize_Fast(xx,nt);
862   if (nt != a->n) {
863     SETERRQ(1,0,"Incompatible partition of A and xx");
864   }
865   VecGetLocalSize_Fast(yy,nt);
866   if (nt != a->m) {
867     SETERRQ(1,0,"Incompatible parition of A and yy");
868   }
869   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
870   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
871   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
872   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
873   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
874   return 0;
875 }
876 
877 #undef __FUNC__
878 #define __FUNC__ "MatMultAdd_MPIBAIJ"
879 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
880 {
881   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
882   int        ierr;
883   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
884   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
885   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
886   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
887   return 0;
888 }
889 
890 #undef __FUNC__
891 #define __FUNC__ "MatMultTrans_MPIBAIJ"
892 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
893 {
894   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
895   int        ierr;
896 
897   /* do nondiagonal part */
898   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
899   /* send it on its way */
900   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
901   /* do local part */
902   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
903   /* receive remote parts: note this assumes the values are not actually */
904   /* inserted in yy until the next line, which is true for my implementation*/
905   /* but is not perhaps always true. */
906   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
907   return 0;
908 }
909 
910 #undef __FUNC__
911 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
912 int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
913 {
914   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
915   int        ierr;
916 
917   /* do nondiagonal part */
918   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
919   /* send it on its way */
920   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
921   /* do local part */
922   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
923   /* receive remote parts: note this assumes the values are not actually */
924   /* inserted in yy until the next line, which is true for my implementation*/
925   /* but is not perhaps always true. */
926   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
927   return 0;
928 }
929 
930 /*
931   This only works correctly for square matrices where the subblock A->A is the
932    diagonal block
933 */
934 #undef __FUNC__
935 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
936 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
937 {
938   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
939   if (a->M != a->N)
940     SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
941   return MatGetDiagonal(a->A,v);
942 }
943 
944 #undef __FUNC__
945 #define __FUNC__ "MatScale_MPIBAIJ"
946 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
947 {
948   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
949   int        ierr;
950   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
951   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
952   return 0;
953 }
954 
955 #undef __FUNC__
956 #define __FUNC__ "MatGetSize_MPIBAIJ"
957 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
958 {
959   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
960   *m = mat->M; *n = mat->N;
961   return 0;
962 }
963 
964 #undef __FUNC__
965 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
966 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
967 {
968   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
969   *m = mat->m; *n = mat->N;
970   return 0;
971 }
972 
973 #undef __FUNC__
974 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
975 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
976 {
977   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
978   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
979   return 0;
980 }
981 
982 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
983 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
984 
985 #undef __FUNC__
986 #define __FUNC__ "MatGetRow_MPIBAIJ"
987 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
988 {
989   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
990   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
991   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
992   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
993   int        *cmap, *idx_p,cstart = mat->cstart;
994 
995   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
996   mat->getrowactive = PETSC_TRUE;
997 
998   if (!mat->rowvalues && (idx || v)) {
999     /*
1000         allocate enough space to hold information from the longest row.
1001     */
1002     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
1003     int     max = 1,mbs = mat->mbs,tmp;
1004     for ( i=0; i<mbs; i++ ) {
1005       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1006       if (max < tmp) { max = tmp; }
1007     }
1008     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
1009     CHKPTRQ(mat->rowvalues);
1010     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
1011   }
1012 
1013 
1014   if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows")
1015   lrow = row - brstart;
1016 
1017   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1018   if (!v)   {pvA = 0; pvB = 0;}
1019   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1020   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1021   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1022   nztot = nzA + nzB;
1023 
1024   cmap  = mat->garray;
1025   if (v  || idx) {
1026     if (nztot) {
1027       /* Sort by increasing column numbers, assuming A and B already sorted */
1028       int imark = -1;
1029       if (v) {
1030         *v = v_p = mat->rowvalues;
1031         for ( i=0; i<nzB; i++ ) {
1032           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
1033           else break;
1034         }
1035         imark = i;
1036         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1037         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1038       }
1039       if (idx) {
1040         *idx = idx_p = mat->rowindices;
1041         if (imark > -1) {
1042           for ( i=0; i<imark; i++ ) {
1043             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
1044           }
1045         } else {
1046           for ( i=0; i<nzB; i++ ) {
1047             if (cmap[cworkB[i]/bs] < cstart)
1048               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
1049             else break;
1050           }
1051           imark = i;
1052         }
1053         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
1054         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
1055       }
1056     }
1057     else {
1058       if (idx) *idx = 0;
1059       if (v)   *v   = 0;
1060     }
1061   }
1062   *nz = nztot;
1063   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1064   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1065   return 0;
1066 }
1067 
1068 #undef __FUNC__
1069 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1070 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1071 {
1072   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1073   if (baij->getrowactive == PETSC_FALSE) {
1074     SETERRQ(1,0,"MatGetRow not called");
1075   }
1076   baij->getrowactive = PETSC_FALSE;
1077   return 0;
1078 }
1079 
1080 #undef __FUNC__
1081 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1082 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1083 {
1084   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1085   *bs = baij->bs;
1086   return 0;
1087 }
1088 
1089 #undef __FUNC__
1090 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1091 int MatZeroEntries_MPIBAIJ(Mat A)
1092 {
1093   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1094   int         ierr;
1095   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1096   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1097   return 0;
1098 }
1099 
1100 #undef __FUNC__
1101 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1102 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1103 {
1104   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1105   Mat         A = a->A, B = a->B;
1106   int         ierr;
1107   double      isend[5], irecv[5];
1108 
1109   info->rows_global    = (double)a->M;
1110   info->columns_global = (double)a->N;
1111   info->rows_local     = (double)a->m;
1112   info->columns_local  = (double)a->N;
1113   info->block_size     = (double)a->bs;
1114   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1115   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->memory;
1116   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1117   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->memory;
1118   if (flag == MAT_LOCAL) {
1119     info->nz_used      = isend[0];
1120     info->nz_allocated = isend[1];
1121     info->nz_unneeded  = isend[2];
1122     info->memory       = isend[3];
1123     info->mallocs      = isend[4];
1124   } else if (flag == MAT_GLOBAL_MAX) {
1125     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_MAX,matin->comm);
1126     info->nz_used      = irecv[0];
1127     info->nz_allocated = irecv[1];
1128     info->nz_unneeded  = irecv[2];
1129     info->memory       = irecv[3];
1130     info->mallocs      = irecv[4];
1131   } else if (flag == MAT_GLOBAL_SUM) {
1132     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_SUM,matin->comm);
1133     info->nz_used      = irecv[0];
1134     info->nz_allocated = irecv[1];
1135     info->nz_unneeded  = irecv[2];
1136     info->memory       = irecv[3];
1137     info->mallocs      = irecv[4];
1138   }
1139   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1140   info->fill_ratio_needed = 0;
1141   info->factor_mallocs    = 0;
1142   return 0;
1143 }
1144 
1145 #undef __FUNC__
1146 #define __FUNC__ "MatSetOption_MPIBAIJ"
1147 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1148 {
1149   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1150 
1151   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1152       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1153       op == MAT_COLUMNS_UNSORTED ||
1154       op == MAT_COLUMNS_SORTED ||
1155       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1156       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1157         MatSetOption(a->A,op);
1158         MatSetOption(a->B,op);
1159   } else if (op == MAT_ROW_ORIENTED) {
1160         a->roworiented = 1;
1161         MatSetOption(a->A,op);
1162         MatSetOption(a->B,op);
1163   } else if (op == MAT_ROWS_SORTED ||
1164              op == MAT_ROWS_UNSORTED ||
1165              op == MAT_SYMMETRIC ||
1166              op == MAT_STRUCTURALLY_SYMMETRIC ||
1167              op == MAT_YES_NEW_DIAGONALS)
1168     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1169   else if (op == MAT_COLUMN_ORIENTED) {
1170     a->roworiented = 0;
1171     MatSetOption(a->A,op);
1172     MatSetOption(a->B,op);
1173   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1174     a->donotstash = 1;
1175   } else if (op == MAT_NO_NEW_DIAGONALS)
1176     {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
1177   else
1178     {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
1179   return 0;
1180 }
1181 
1182 #undef __FUNC__
1183 #define __FUNC__ "MatTranspose_MPIBAIJ("
1184 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1185 {
1186   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1187   Mat_SeqBAIJ *Aloc;
1188   Mat        B;
1189   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
1190   int        bs=baij->bs,mbs=baij->mbs;
1191   Scalar     *a;
1192 
1193   if (matout == PETSC_NULL && M != N)
1194     SETERRQ(1,0,"Square matrix only for in-place");
1195   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1196   CHKERRQ(ierr);
1197 
1198   /* copy over the A part */
1199   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1200   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1201   row = baij->rstart;
1202   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1203 
1204   for ( i=0; i<mbs; i++ ) {
1205     rvals[0] = bs*(baij->rstart + i);
1206     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1207     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1208       col = (baij->cstart+aj[j])*bs;
1209       for (k=0; k<bs; k++ ) {
1210         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1211         col++; a += bs;
1212       }
1213     }
1214   }
1215   /* copy over the B part */
1216   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1217   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1218   row = baij->rstart*bs;
1219   for ( i=0; i<mbs; i++ ) {
1220     rvals[0] = bs*(baij->rstart + i);
1221     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1222     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1223       col = baij->garray[aj[j]]*bs;
1224       for (k=0; k<bs; k++ ) {
1225         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1226         col++; a += bs;
1227       }
1228     }
1229   }
1230   PetscFree(rvals);
1231   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1232   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1233 
1234   if (matout != PETSC_NULL) {
1235     *matout = B;
1236   } else {
1237     /* This isn't really an in-place transpose .... but free data structures from baij */
1238     PetscFree(baij->rowners);
1239     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1240     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1241     if (baij->colmap) PetscFree(baij->colmap);
1242     if (baij->garray) PetscFree(baij->garray);
1243     if (baij->lvec) VecDestroy(baij->lvec);
1244     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1245     PetscFree(baij);
1246     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1247     PetscHeaderDestroy(B);
1248   }
1249   return 0;
1250 }
1251 
1252 #undef __FUNC__
1253 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1254 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1255 {
1256   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1257   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1258   int ierr,s1,s2,s3;
1259 
1260   if (ll)  {
1261     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1262     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1263     if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
1264     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1265     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1266   }
1267   if (rr) SETERRQ(1,0,"not supported for right vector");
1268   return 0;
1269 }
1270 
1271 /* the code does not do the diagonal entries correctly unless the
1272    matrix is square and the column and row owerships are identical.
1273    This is a BUG. The only way to fix it seems to be to access
1274    baij->A and baij->B directly and not through the MatZeroRows()
1275    routine.
1276 */
1277 #undef __FUNC__
1278 #define __FUNC__ "MatZeroRows_MPIBAIJ"
1279 int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
1280 {
1281   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
1282   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
1283   int            *procs,*nprocs,j,found,idx,nsends,*work;
1284   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
1285   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
1286   int            *lens,imdex,*lrows,*values,bs=l->bs;
1287   MPI_Comm       comm = A->comm;
1288   MPI_Request    *send_waits,*recv_waits;
1289   MPI_Status     recv_status,*send_status;
1290   IS             istmp;
1291 
1292   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
1293   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
1294 
1295   /*  first count number of contributors to each processor */
1296   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
1297   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
1298   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
1299   for ( i=0; i<N; i++ ) {
1300     idx = rows[i];
1301     found = 0;
1302     for ( j=0; j<size; j++ ) {
1303       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
1304         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
1305       }
1306     }
1307     if (!found) SETERRQ(1,0,"Index out of range");
1308   }
1309   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
1310 
1311   /* inform other processors of number of messages and max length*/
1312   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
1313   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
1314   nrecvs = work[rank];
1315   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
1316   nmax = work[rank];
1317   PetscFree(work);
1318 
1319   /* post receives:   */
1320   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
1321   CHKPTRQ(rvalues);
1322   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
1323   CHKPTRQ(recv_waits);
1324   for ( i=0; i<nrecvs; i++ ) {
1325     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
1326   }
1327 
1328   /* do sends:
1329       1) starts[i] gives the starting index in svalues for stuff going to
1330          the ith processor
1331   */
1332   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1333   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
1334   CHKPTRQ(send_waits);
1335   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1336   starts[0] = 0;
1337   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1338   for ( i=0; i<N; i++ ) {
1339     svalues[starts[owner[i]]++] = rows[i];
1340   }
1341   ISRestoreIndices(is,&rows);
1342 
1343   starts[0] = 0;
1344   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1345   count = 0;
1346   for ( i=0; i<size; i++ ) {
1347     if (procs[i]) {
1348       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
1349     }
1350   }
1351   PetscFree(starts);
1352 
1353   base = owners[rank]*bs;
1354 
1355   /*  wait on receives */
1356   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1357   source = lens + nrecvs;
1358   count  = nrecvs; slen = 0;
1359   while (count) {
1360     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
1361     /* unpack receives into our local space */
1362     MPI_Get_count(&recv_status,MPI_INT,&n);
1363     source[imdex]  = recv_status.MPI_SOURCE;
1364     lens[imdex]  = n;
1365     slen += n;
1366     count--;
1367   }
1368   PetscFree(recv_waits);
1369 
1370   /* move the data into the send scatter */
1371   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1372   count = 0;
1373   for ( i=0; i<nrecvs; i++ ) {
1374     values = rvalues + i*nmax;
1375     for ( j=0; j<lens[i]; j++ ) {
1376       lrows[count++] = values[j] - base;
1377     }
1378   }
1379   PetscFree(rvalues); PetscFree(lens);
1380   PetscFree(owner); PetscFree(nprocs);
1381 
1382   /* actually zap the local rows */
1383   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1384   PLogObjectParent(A,istmp);
1385   PetscFree(lrows);
1386   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1387   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1388   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1389 
1390   /* wait on sends */
1391   if (nsends) {
1392     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1393     CHKPTRQ(send_status);
1394     MPI_Waitall(nsends,send_waits,send_status);
1395     PetscFree(send_status);
1396   }
1397   PetscFree(send_waits); PetscFree(svalues);
1398 
1399   return 0;
1400 }
1401 extern int MatPrintHelp_SeqBAIJ(Mat);
1402 #undef __FUNC__
1403 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1404 int MatPrintHelp_MPIBAIJ(Mat A)
1405 {
1406   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1407 
1408   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1409   else return 0;
1410 }
1411 
1412 #undef __FUNC__
1413 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1414 int MatSetUnfactored_MPIBAIJ(Mat A)
1415 {
1416   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1417   int         ierr;
1418   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1419   return 0;
1420 }
1421 
1422 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1423 
1424 /* -------------------------------------------------------------------*/
1425 static struct _MatOps MatOps = {
1426   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1427   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1428   0,0,0,0,
1429   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1430   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1431   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1432   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1433   0,0,0,MatGetSize_MPIBAIJ,
1434   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1435   0,0,MatConvertSameType_MPIBAIJ,0,0,
1436   0,0,0,MatGetSubMatrices_MPIBAIJ,
1437   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1438   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
1439   0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1440 
1441 
1442 #undef __FUNC__
1443 #define __FUNC__ "MatCreateMPIBAIJ"
1444 /*@C
1445    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1446    (block compressed row).  For good matrix assembly performance
1447    the user should preallocate the matrix storage by setting the parameters
1448    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1449    performance can be increased by more than a factor of 50.
1450 
1451    Input Parameters:
1452 .  comm - MPI communicator
1453 .  bs   - size of blockk
1454 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1455            This value should be the same as the local size used in creating the
1456            y vector for the matrix-vector product y = Ax.
1457 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1458            This value should be the same as the local size used in creating the
1459            x vector for the matrix-vector product y = Ax.
1460 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1461 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1462 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1463            submatrix  (same for all local rows)
1464 .  d_nzz - array containing the number of block nonzeros in the various block rows
1465            of the in diagonal portion of the local (possibly different for each block
1466            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1467            it is zero.
1468 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1469            submatrix (same for all local rows).
1470 .  o_nzz - array containing the number of nonzeros in the various block rows of the
1471            off-diagonal portion of the local submatrix (possibly different for
1472            each block row) or PETSC_NULL.
1473 
1474    Output Parameter:
1475 .  A - the matrix
1476 
1477    Notes:
1478    The user MUST specify either the local or global matrix dimensions
1479    (possibly both).
1480 
1481    Storage Information:
1482    For a square global matrix we define each processor's diagonal portion
1483    to be its local rows and the corresponding columns (a square submatrix);
1484    each processor's off-diagonal portion encompasses the remainder of the
1485    local matrix (a rectangular submatrix).
1486 
1487    The user can specify preallocated storage for the diagonal part of
1488    the local submatrix with either d_nz or d_nnz (not both).  Set
1489    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1490    memory allocation.  Likewise, specify preallocated storage for the
1491    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1492 
1493    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1494    the figure below we depict these three local rows and all columns (0-11).
1495 
1496 $          0 1 2 3 4 5 6 7 8 9 10 11
1497 $         -------------------
1498 $  row 3  |  o o o d d d o o o o o o
1499 $  row 4  |  o o o d d d o o o o o o
1500 $  row 5  |  o o o d d d o o o o o o
1501 $         -------------------
1502 $
1503 
1504    Thus, any entries in the d locations are stored in the d (diagonal)
1505    submatrix, and any entries in the o locations are stored in the
1506    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1507    stored simply in the MATSEQBAIJ format for compressed row storage.
1508 
1509    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1510    and o_nz should indicate the number of nonzeros per row in the o matrix.
1511    In general, for PDE problems in which most nonzeros are near the diagonal,
1512    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1513    or you will get TERRIBLE performance; see the users' manual chapter on
1514    matrices.
1515 
1516 .keywords: matrix, block, aij, compressed row, sparse, parallel
1517 
1518 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1519 @*/
1520 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1521                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1522 {
1523   Mat          B;
1524   Mat_MPIBAIJ  *b;
1525   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1526 
1527   if (bs < 1) SETERRQ(1,0,"invalid block size specified");
1528   *A = 0;
1529   PetscHeaderCreate(B,_p_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1530   PLogObjectCreate(B);
1531   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1532   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1533   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1534   MPI_Comm_size(comm,&size);
1535   if (size == 1) {
1536     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1537     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1538     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1539     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1540     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1541     B->ops.solve             = MatSolve_MPIBAIJ;
1542     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1543     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1544     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1545     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1546   }
1547 
1548   B->destroy    = MatDestroy_MPIBAIJ;
1549   B->view       = MatView_MPIBAIJ;
1550   B->mapping    = 0;
1551   B->factor     = 0;
1552   B->assembled  = PETSC_FALSE;
1553 
1554   B->insertmode = NOT_SET_VALUES;
1555   MPI_Comm_rank(comm,&b->rank);
1556   MPI_Comm_size(comm,&b->size);
1557 
1558   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1559     SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1560   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
1561   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,0,"either N or n should be specified");
1562   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1563   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1564 
1565   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1566     work[0] = m; work[1] = n;
1567     mbs = m/bs; nbs = n/bs;
1568     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1569     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1570     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1571   }
1572   if (m == PETSC_DECIDE) {
1573     Mbs = M/bs;
1574     if (Mbs*bs != M) SETERRQ(1,0,"No of global rows must be divisible by blocksize");
1575     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1576     m   = mbs*bs;
1577   }
1578   if (n == PETSC_DECIDE) {
1579     Nbs = N/bs;
1580     if (Nbs*bs != N) SETERRQ(1,0,"No of global cols must be divisible by blocksize");
1581     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1582     n   = nbs*bs;
1583   }
1584   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"No of local rows, cols must be divisible by blocksize");
1585 
1586   b->m = m; B->m = m;
1587   b->n = n; B->n = n;
1588   b->N = N; B->N = N;
1589   b->M = M; B->M = M;
1590   b->bs  = bs;
1591   b->bs2 = bs*bs;
1592   b->mbs = mbs;
1593   b->nbs = nbs;
1594   b->Mbs = Mbs;
1595   b->Nbs = Nbs;
1596 
1597   /* build local table of row and column ownerships */
1598   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1599   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1600   b->cowners = b->rowners + b->size + 2;
1601   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1602   b->rowners[0] = 0;
1603   for ( i=2; i<=b->size; i++ ) {
1604     b->rowners[i] += b->rowners[i-1];
1605   }
1606   b->rstart    = b->rowners[b->rank];
1607   b->rend      = b->rowners[b->rank+1];
1608   b->rstart_bs = b->rstart * bs;
1609   b->rend_bs   = b->rend * bs;
1610 
1611   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1612   b->cowners[0] = 0;
1613   for ( i=2; i<=b->size; i++ ) {
1614     b->cowners[i] += b->cowners[i-1];
1615   }
1616   b->cstart    = b->cowners[b->rank];
1617   b->cend      = b->cowners[b->rank+1];
1618   b->cstart_bs = b->cstart * bs;
1619   b->cend_bs   = b->cend * bs;
1620 
1621   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1622   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1623   PLogObjectParent(B,b->A);
1624   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1625   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1626   PLogObjectParent(B,b->B);
1627 
1628   /* build cache for off array entries formed */
1629   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1630   b->donotstash  = 0;
1631   b->colmap      = 0;
1632   b->garray      = 0;
1633   b->roworiented = 1;
1634 
1635   /* stuff used in block assembly */
1636   b->barray      = 0;
1637 
1638   /* stuff used for matrix vector multiply */
1639   b->lvec        = 0;
1640   b->Mvctx       = 0;
1641 
1642   /* stuff for MatGetRow() */
1643   b->rowindices   = 0;
1644   b->rowvalues    = 0;
1645   b->getrowactive = PETSC_FALSE;
1646 
1647   *A = B;
1648   return 0;
1649 }
1650 
1651 #undef __FUNC__
1652 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
1653 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1654 {
1655   Mat         mat;
1656   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1657   int         ierr, len=0, flg;
1658 
1659   *newmat       = 0;
1660   PetscHeaderCreate(mat,_p_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1661   PLogObjectCreate(mat);
1662   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1663   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1664   mat->destroy    = MatDestroy_MPIBAIJ;
1665   mat->view       = MatView_MPIBAIJ;
1666   mat->factor     = matin->factor;
1667   mat->assembled  = PETSC_TRUE;
1668 
1669   a->m = mat->m   = oldmat->m;
1670   a->n = mat->n   = oldmat->n;
1671   a->M = mat->M   = oldmat->M;
1672   a->N = mat->N   = oldmat->N;
1673 
1674   a->bs  = oldmat->bs;
1675   a->bs2 = oldmat->bs2;
1676   a->mbs = oldmat->mbs;
1677   a->nbs = oldmat->nbs;
1678   a->Mbs = oldmat->Mbs;
1679   a->Nbs = oldmat->Nbs;
1680 
1681   a->rstart       = oldmat->rstart;
1682   a->rend         = oldmat->rend;
1683   a->cstart       = oldmat->cstart;
1684   a->cend         = oldmat->cend;
1685   a->size         = oldmat->size;
1686   a->rank         = oldmat->rank;
1687   mat->insertmode = NOT_SET_VALUES;
1688   a->rowvalues    = 0;
1689   a->getrowactive = PETSC_FALSE;
1690   a->barray       = 0;
1691 
1692   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1693   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1694   a->cowners = a->rowners + a->size + 2;
1695   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1696   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1697   if (oldmat->colmap) {
1698     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1699     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1700     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1701   } else a->colmap = 0;
1702   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1703     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1704     PLogObjectMemory(mat,len*sizeof(int));
1705     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1706   } else a->garray = 0;
1707 
1708   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1709   PLogObjectParent(mat,a->lvec);
1710   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1711   PLogObjectParent(mat,a->Mvctx);
1712   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1713   PLogObjectParent(mat,a->A);
1714   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1715   PLogObjectParent(mat,a->B);
1716   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1717   if (flg) {
1718     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1719   }
1720   *newmat = mat;
1721   return 0;
1722 }
1723 
1724 #include "sys.h"
1725 
1726 #undef __FUNC__
1727 #define __FUNC__ "MatLoad_MPIBAIJ"
1728 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1729 {
1730   Mat          A;
1731   int          i, nz, ierr, j,rstart, rend, fd;
1732   Scalar       *vals,*buf;
1733   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1734   MPI_Status   status;
1735   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1736   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1737   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1738   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1739   int          dcount,kmax,k,nzcount,tmp;
1740 
1741 
1742   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1743   bs2  = bs*bs;
1744 
1745   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1746   if (!rank) {
1747     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1748     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1749     if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not matrix object");
1750   }
1751 
1752   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1753   M = header[1]; N = header[2];
1754 
1755   if (M != N) SETERRQ(1,0,"Can only do square matrices");
1756 
1757   /*
1758      This code adds extra rows to make sure the number of rows is
1759      divisible by the blocksize
1760   */
1761   Mbs        = M/bs;
1762   extra_rows = bs - M + bs*(Mbs);
1763   if (extra_rows == bs) extra_rows = 0;
1764   else                  Mbs++;
1765   if (extra_rows &&!rank) {
1766     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1767   }
1768 
1769   /* determine ownership of all rows */
1770   mbs = Mbs/size + ((Mbs % size) > rank);
1771   m   = mbs * bs;
1772   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1773   browners = rowners + size + 1;
1774   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1775   rowners[0] = 0;
1776   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1777   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1778   rstart = rowners[rank];
1779   rend   = rowners[rank+1];
1780 
1781   /* distribute row lengths to all processors */
1782   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1783   if (!rank) {
1784     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1785     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1786     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1787     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1788     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1789     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1790     PetscFree(sndcounts);
1791   }
1792   else {
1793     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1794   }
1795 
1796   if (!rank) {
1797     /* calculate the number of nonzeros on each processor */
1798     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1799     PetscMemzero(procsnz,size*sizeof(int));
1800     for ( i=0; i<size; i++ ) {
1801       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1802         procsnz[i] += rowlengths[j];
1803       }
1804     }
1805     PetscFree(rowlengths);
1806 
1807     /* determine max buffer needed and allocate it */
1808     maxnz = 0;
1809     for ( i=0; i<size; i++ ) {
1810       maxnz = PetscMax(maxnz,procsnz[i]);
1811     }
1812     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1813 
1814     /* read in my part of the matrix column indices  */
1815     nz = procsnz[0];
1816     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1817     mycols = ibuf;
1818     if (size == 1)  nz -= extra_rows;
1819     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1820     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1821 
1822     /* read in every ones (except the last) and ship off */
1823     for ( i=1; i<size-1; i++ ) {
1824       nz = procsnz[i];
1825       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1826       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1827     }
1828     /* read in the stuff for the last proc */
1829     if ( size != 1 ) {
1830       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1831       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1832       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1833       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1834     }
1835     PetscFree(cols);
1836   }
1837   else {
1838     /* determine buffer space needed for message */
1839     nz = 0;
1840     for ( i=0; i<m; i++ ) {
1841       nz += locrowlens[i];
1842     }
1843     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1844     mycols = ibuf;
1845     /* receive message of column indices*/
1846     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1847     MPI_Get_count(&status,MPI_INT,&maxnz);
1848     if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
1849   }
1850 
1851   /* loop over local rows, determining number of off diagonal entries */
1852   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1853   odlens = dlens + (rend-rstart);
1854   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1855   PetscMemzero(mask,3*Mbs*sizeof(int));
1856   masked1 = mask    + Mbs;
1857   masked2 = masked1 + Mbs;
1858   rowcount = 0; nzcount = 0;
1859   for ( i=0; i<mbs; i++ ) {
1860     dcount  = 0;
1861     odcount = 0;
1862     for ( j=0; j<bs; j++ ) {
1863       kmax = locrowlens[rowcount];
1864       for ( k=0; k<kmax; k++ ) {
1865         tmp = mycols[nzcount++]/bs;
1866         if (!mask[tmp]) {
1867           mask[tmp] = 1;
1868           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1869           else masked1[dcount++] = tmp;
1870         }
1871       }
1872       rowcount++;
1873     }
1874 
1875     dlens[i]  = dcount;
1876     odlens[i] = odcount;
1877 
1878     /* zero out the mask elements we set */
1879     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1880     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1881   }
1882 
1883   /* create our matrix */
1884   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1885          CHKERRQ(ierr);
1886   A = *newmat;
1887   MatSetOption(A,MAT_COLUMNS_SORTED);
1888 
1889   if (!rank) {
1890     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1891     /* read in my part of the matrix numerical values  */
1892     nz = procsnz[0];
1893     vals = buf;
1894     mycols = ibuf;
1895     if (size == 1)  nz -= extra_rows;
1896     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1897     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1898 
1899     /* insert into matrix */
1900     jj      = rstart*bs;
1901     for ( i=0; i<m; i++ ) {
1902       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1903       mycols += locrowlens[i];
1904       vals   += locrowlens[i];
1905       jj++;
1906     }
1907     /* read in other processors (except the last one) and ship out */
1908     for ( i=1; i<size-1; i++ ) {
1909       nz = procsnz[i];
1910       vals = buf;
1911       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1912       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1913     }
1914     /* the last proc */
1915     if ( size != 1 ){
1916       nz = procsnz[i] - extra_rows;
1917       vals = buf;
1918       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1919       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1920       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1921     }
1922     PetscFree(procsnz);
1923   }
1924   else {
1925     /* receive numeric values */
1926     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1927 
1928     /* receive message of values*/
1929     vals = buf;
1930     mycols = ibuf;
1931     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1932     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1933     if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
1934 
1935     /* insert into matrix */
1936     jj      = rstart*bs;
1937     for ( i=0; i<m; i++ ) {
1938       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1939       mycols += locrowlens[i];
1940       vals   += locrowlens[i];
1941       jj++;
1942     }
1943   }
1944   PetscFree(locrowlens);
1945   PetscFree(buf);
1946   PetscFree(ibuf);
1947   PetscFree(rowners);
1948   PetscFree(dlens);
1949   PetscFree(mask);
1950   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1951   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1952   return 0;
1953 }
1954 
1955 
1956