xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 83e2fdc756c13f964f3ed5aaff42e3626c5b72bc)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpibaij.c,v 1.74 1997/07/28 16:13:24 bsmith Exp bsmith $";
3 #endif
4 
5 #include "pinclude/pviewer.h"
6 #include "src/mat/impls/baij/mpi/mpibaij.h"
7 #include "src/vec/vecimpl.h"
8 
9 
10 extern int MatSetUpMultiply_MPIBAIJ(Mat);
11 extern int DisAssemble_MPIBAIJ(Mat);
12 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
13 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
14 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
15 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
16 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
17 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
18 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
19 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
20 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
21 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
22 
23 
24 /*
25      Local utility routine that creates a mapping from the global column
26    number to the local number in the off-diagonal part of the local
27    storage of the matrix.  This is done in a non scable way since the
28    length of colmap equals the global matrix length.
29 */
30 #undef __FUNC__
31 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
32 static int CreateColmap_MPIBAIJ_Private(Mat mat)
33 {
34   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
35   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
36   int         nbs = B->nbs,i,bs=B->bs;;
37 
38   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
39   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
40   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
41   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
42   return 0;
43 }
44 
45 #undef __FUNC__
46 #define __FUNC__ "MatGetRowIJ_MPIBAIJ("
47 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
48                             PetscTruth *done)
49 {
50   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
51   int         ierr;
52   if (aij->size == 1) {
53     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
54   } else SETERRQ(1,0,"not supported in parallel");
55   return 0;
56 }
57 
58 #undef __FUNC__
59 #define __FUNC__ "MatRestoreRowIJ_MPIBAIJ"
60 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
61                                 PetscTruth *done)
62 {
63   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
64   int        ierr;
65   if (aij->size == 1) {
66     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
67   } else SETERRQ(1,0,"not supported in parallel");
68   return 0;
69 }
70 #define CHUNKSIZE  10
71 
/*
   MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) - inlined insert/add of a
   single scalar into the "diagonal" (A) SeqBAIJ piece of the parallel matrix.
   row,col are local POINT indices; bs/bs2 and the a*,ai,aj,aa,... locals must
   already be set up by the caller (see MatSetValues_MPIBAIJ).

   Algorithm: locate block row brow = row/bs, then binary+linear search the
   sorted block-column list for bcol = col/bs.  If the block exists the scalar
   at offset (bs*cidx + ridx) inside it is added/overwritten.  Otherwise, if
   insertion is allowed (a->nonew), the row is grown -- reallocating the whole
   matrix storage by CHUNKSIZE blocks when the row is full -- and the new block
   is zeroed and spliced in, shifting later entries up.

   NOTE(review): a->nz++ happens only on the reallocation path, not on every
   new-block insertion; confirm that is the intended meaning of nz here.
*/
#define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
    rmax = aimax[brow]; nrow = ailen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto a_noinsert; \
        } \
      } \
      if (a->nonew == 1) goto a_noinsert; \
      else if (a->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (a->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
        for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
        PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
                    aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(a->a);  \
        if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
        aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
        a->singlemalloc = 1; \
 \
        rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
        rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        a->maxnz += bs2*CHUNKSIZE; \
        a->reallocs++; \
        a->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      a_noinsert:; \
    ailen[brow] = nrow; \
}
146 
/*
   MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) - identical to the
   A-part macro above, but operates on the off-diagonal (B) SeqBAIJ piece:
   it reads/updates the b*,bi,bj,bimax,bilen,ba locals instead of the a* ones,
   and uses the b_noinsert label.  The two macros must be kept in sync.
*/
#define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
    rmax = bimax[brow]; nrow = bilen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto b_noinsert; \
        } \
      } \
      if (b->nonew == 1) goto b_noinsert; \
      else if (b->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (b->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
        for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
        PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
                    ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(b->a);  \
        if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
        ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
        b->singlemalloc = 1; \
 \
        rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
        rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        b->maxnz += bs2*CHUNKSIZE; \
        b->reallocs++; \
        b->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      b_noinsert:; \
    bilen[brow] = nrow; \
}
221 
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIBAIJ"
/*
   MatSetValues_MPIBAIJ - Inserts or adds a dense logical block of scalars,
   addressed by GLOBAL point indices im[]/in[], into the parallel BAIJ matrix.

   Rows owned locally are routed point-by-point either to the diagonal (A)
   part or the off-diagonal (B) part via the inlined macros above; rows owned
   by other processes are cached in the stash for communication at assembly
   time (unless donotstash is set).  v is read row-major when roworiented,
   column-major otherwise.
*/
int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j,row,col;
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
  int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
  int         cend_orig=baij->cend_bs,bs=baij->bs;

  /* Some Variables required in the macro */
  Mat         A = baij->A;
  Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
  int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
  Scalar      *aa=a->a;

  Mat         B = baij->B;
  Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
  int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
  Scalar      *ba=b->a;

  /* scratch used inside MatSetValues_SeqBAIJ_{A,B}_Private */
  int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
  int         low,high,t,ridx,cidx,bs2=a->bs2;
  Scalar      *ap,*bap;

  for ( i=0; i<m; i++ ) {
#if defined(PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(1,0,"Negative row");
    if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
#endif
    /* row owned by this process? (point index range) */
    if (im[i] >= rstart_orig && im[i] < rend_orig) {
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        /* column in the locally-owned (diagonal) range -> A part */
        if (in[j] >= cstart_orig && in[j] < cend_orig){
          col = in[j] - cstart_orig;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
#if defined(PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
        else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
#endif
        else {
          /* off-diagonal column: translate global -> local B column */
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }
            /* colmap stores local block offset + 1 (0 == absent) */
            col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
            /* NOTE(review): for bs>1 an absent block gives col = in[j]%bs - 1,
               which is >= 0 when in[j]%bs > 0, so this "col < 0" test may miss
               the disassemble case -- verify against later PETSc revisions.
               Also checks A's nonew flag although insertion goes into B. */
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
              /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
              B = baij->B;
              b = (Mat_SeqBAIJ *) (B)->data;
              bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
              ba=b->a;
            }
          }
          else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
      }
    }
    else {
      /* row owned elsewhere: stash for AssemblyBegin/End communication */
      if (roworiented && !baij->donotstash) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      }
      else {
        if (!baij->donotstash) {
          /* column-oriented input: stash one value at a time */
          row = im[i];
	  for ( j=0; j<n; j++ ) {
	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
  return 0;
}
305 
extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
#undef __FUNC__
#define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
/*
   MatSetValuesBlocked_MPIBAIJ - Inserts/adds an m x n array of bs x bs BLOCKS
   addressed by global BLOCK indices im[]/in[].  v holds m*n*bs2 scalars laid
   out row- or column-oriented; each block is (if necessary) repacked into the
   contiguous scratch buffer baij->barray before being handed to the
   sequential blocked setter.  Blocks for rows owned elsewhere are expanded
   to point indices and stashed for assembly-time communication.
*/
int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      *value,*barray=baij->barray;
  int         ierr,i,j,ii,jj,row,col,k,l;
  int         roworiented = baij->roworiented,rstart=baij->rstart ;
  int         rend=baij->rend,cstart=baij->cstart,stepval;
  int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;


  /* lazily allocate the bs x bs repacking buffer (kept for the matrix lifetime) */
  if(!barray) {
    baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
  }

  /* distance (in scalars) between consecutive rows of one block inside v */
  if (roworiented) {
    stepval = (n-1)*bs;
  } else {
    stepval = (m-1)*bs;
  }
  for ( i=0; i<m; i++ ) {
#if defined(PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(1,0,"Negative row");
    if (im[i] >= baij->Mbs) SETERRQ(1,0,"Row too large");
#endif
    if (im[i] >= rstart && im[i] < rend) {
      row = im[i] - rstart;
      for ( j=0; j<n; j++ ) {
        /* If NumCol = 1 then a copy is not required */
        if ((roworiented) && (n == 1)) {
          barray = v + i*bs2;
        } else if((!roworiented) && (m == 1)) {
          barray = v + j*bs2;
        } else { /* Here a copy is required */
          if (roworiented) {
            value = v + i*(stepval+bs)*bs + j*bs;
          } else {
            value = v + j*(stepval+bs)*bs + i*bs;
          }
          /* gather the strided bs x bs block into contiguous barray */
          for ( ii=0; ii<bs; ii++,value+=stepval ) {
            for (jj=0; jj<bs; jj++ ) {
              *barray++  = *value++;
            }
          }
          barray -=bs2;  /* rewind to the start of the packed block */
        }

        if (in[j] >= cstart && in[j] < cend){
          col  = in[j] - cstart;
          ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
        }
#if defined(PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
        else if (in[j] >= baij->Nbs) {SETERRQ(1,0,"Column too large");}
#endif
        else {
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }

#if defined(PETSC_BOPT_g)
            /* NOTE(review): if the block is absent, colmap[in[j]] is 0 and
               (0-1)%bs is nonzero -- this debug check may fire spuriously */
            if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(1,0,"Incorrect colmap");}
#endif
            /* colmap stores first point-column + 1; convert back to block index */
            col = (baij->colmap[in[j]] - 1)/bs;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
            }
          }
          else col = in[j];
          ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
        }
      }
    }
    else {
      /* remote block row: expand to point indices and stash scalar-by-scalar */
      if (!baij->donotstash) {
        if (roworiented ) {
          row   = im[i]*bs;
          value = v + i*(stepval+bs)*bs;
          for ( j=0; j<bs; j++,row++ ) {
            for ( k=0; k<n; k++ ) {
              for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        }
        else {
          for ( j=0; j<n; j++ ) {
            value = v + j*(stepval+bs)*bs + i*bs;
            col   = in[j]*bs;
            for ( k=0; k<bs; k++,col++,value+=stepval) {
              for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        }
      }
    }
  }
  return 0;
}
412 
413 #undef __FUNC__
414 #define __FUNC__ "MatGetValues_MPIBAIJ"
415 int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
416 {
417   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
418   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
419   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
420 
421   for ( i=0; i<m; i++ ) {
422     if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
423     if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
424     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
425       row = idxm[i] - bsrstart;
426       for ( j=0; j<n; j++ ) {
427         if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
428         if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
429         if (idxn[j] >= bscstart && idxn[j] < bscend){
430           col = idxn[j] - bscstart;
431           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
432         }
433         else {
434           if (!baij->colmap) {
435             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
436           }
437           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
438              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
439           else {
440             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
441             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
442           }
443         }
444       }
445     }
446     else {
447       SETERRQ(1,0,"Only local values currently supported");
448     }
449   }
450   return 0;
451 }
452 
453 #undef __FUNC__
454 #define __FUNC__ "MatNorm_MPIBAIJ"
455 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
456 {
457   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
458   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
459   int        ierr, i,bs2=baij->bs2;
460   double     sum = 0.0;
461   Scalar     *v;
462 
463   if (baij->size == 1) {
464     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
465   } else {
466     if (type == NORM_FROBENIUS) {
467       v = amat->a;
468       for (i=0; i<amat->nz*bs2; i++ ) {
469 #if defined(PETSC_COMPLEX)
470         sum += real(conj(*v)*(*v)); v++;
471 #else
472         sum += (*v)*(*v); v++;
473 #endif
474       }
475       v = bmat->a;
476       for (i=0; i<bmat->nz*bs2; i++ ) {
477 #if defined(PETSC_COMPLEX)
478         sum += real(conj(*v)*(*v)); v++;
479 #else
480         sum += (*v)*(*v); v++;
481 #endif
482       }
483       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
484       *norm = sqrt(*norm);
485     }
486     else
487       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
488   }
489   return 0;
490 }
491 
#undef __FUNC__
#define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
/*
   MatAssemblyBegin_MPIBAIJ - Starts assembly: ships all stashed
   off-processor values to their owning processes as (row,col,value)
   triples of Scalars, using nonblocking MPI sends/receives.  The pending
   requests and buffers are parked in the Mat_MPIBAIJ struct for
   MatAssemblyEnd_MPIBAIJ to complete.
*/
int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,0,"Some processors inserted others added");
  }
  mat->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    /* rowners[] holds block-row ownership; scale by bs for point rows */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack (row,col,value) triples; starts[] is advanced while packing */
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  /* rebuild starts[] (it was consumed by the packing loop above) */
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  /* stash the in-flight communication state for MatAssemblyEnd_MPIBAIJ */
  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  return 0;
}
592 
593 
#undef __FUNC__
#define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
/*
   MatAssemblyEnd_MPIBAIJ - Completes assembly: drains the receives posted by
   MatAssemblyBegin_MPIBAIJ, inserting each (row,col,value) triple into the
   A or B part as appropriate, waits on the sends, assembles both sequential
   pieces, and (dis)assembles consistently across all processes.
*/
int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
  int         bs=baij->bs,row,col,other_disassembled;
  Scalar      *values,val;
  InsertMode  addv = mat->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = baij->rvalues + 3*imdex*baij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;    /* each logical entry is a (row,col,value) triple */
    for ( i=0; i<n; i++ ) {
      /* indices travel as Scalars; convert back to ints */
      row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= baij->cstart*bs && col < baij->cend*bs) {
        col -= baij->cstart*bs;
        ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      }
      else {
        if (mat->was_assembled) {
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
          }
          /* colmap stores local point offset + 1; see CreateColmap above.
             NOTE(review): as in MatSetValues_MPIBAIJ, for bs>1 an absent
             block may not yield col < 0 -- confirm the disassemble test. */
          col = (baij->colmap[col/bs]) - 1 + col%bs;
          if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
            ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      }
    }
    count--;
  }
  PetscFree(baij->recv_waits); PetscFree(baij->rvalues);

  /* wait on sends */
  if (baij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(baij->nsends,baij->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(baij->send_waits); PetscFree(baij->svalues);

  ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselfs, in order that we may reassemble. */
  MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
  }

  /* first final assembly: build the off-diagonal scatter machinery */
  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);

  /* cached row workspace (MatGetRow) is invalid after assembly */
  if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
  return 0;
}
666 
667 #undef __FUNC__
668 #define __FUNC__ "MatView_MPIBAIJ_Binary"
669 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
670 {
671   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
672   int          ierr;
673 
674   if (baij->size == 1) {
675     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
676   }
677   else SETERRQ(1,0,"Only uniprocessor output supported");
678   return 0;
679 }
680 
#undef __FUNC__
#define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
/*
   MatView_MPIBAIJ_ASCIIorDraworMatlab - Viewer back-end for ASCII, Draw and
   Matlab viewers.  INFO formats print per-process statistics; otherwise each
   process either prints its own A/B pieces (ASCII_FILE) or the whole matrix
   is gathered onto process 0 as a temporary MPIBAIJ matrix and viewed there.
*/
static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr, format,rank,bs = baij->bs;
  FILE         *fd;
  ViewerType   vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerGetFormat(viewer,&format);
    if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
      /* one line of local statistics per process, in rank order */
      MatInfo info;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&info);
      PetscSequentialPhaseBegin(mat->comm,1);
      fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
              rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
              baij->bs,(int)info.memory);
      ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == VIEWER_FORMAT_ASCII_INFO) {
      PetscPrintf(mat->comm,"  block size is %d\n",bs);
      return 0;
    }
  }

  if (vtype == DRAW_VIEWER) {
    /* nothing to draw on a null device */
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
  }

  if (vtype == ASCII_FILE_VIEWER) {
    /* each process prints its own local pieces, serialized by rank */
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
           baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
            baij->cstart*bs,baij->cend*bs);
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  }
  else {
    int size = baij->size;
    rank = baij->rank;
    if (size == 1) {
      ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat         A;
      Mat_SeqBAIJ *Aloc;
      int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
      int         mbs=baij->mbs;
      Scalar      *a;

      /* process 0 owns all rows of the gathered copy; others own none */
      if (!rank) {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqBAIJ*) baij->A->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart;
      rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

      for ( i=0; i<mbs; i++ ) {
        /* rvals[] holds the bs consecutive global point rows of block row i */
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = (baij->cstart+aj[j])*bs;
          /* insert one point column of the block at a time (a is column-major per block) */
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      /* copy over the B part */
      Aloc = (Mat_SeqBAIJ*) baij->B->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart*bs;
      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          /* garray maps local B block columns back to global block columns */
          col = baij->garray[aj[j]]*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      PetscFree(rvals);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        /* everything landed in process 0's diagonal block; view that */
        ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}
802 
803 
804 
805 #undef __FUNC__
806 #define __FUNC__ "MatView_MPIBAIJ"
807 int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
808 {
809   Mat         mat = (Mat) obj;
810   int         ierr;
811   ViewerType  vtype;
812 
813   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
814   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
815       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
816     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
817   }
818   else if (vtype == BINARY_FILE_VIEWER) {
819     return MatView_MPIBAIJ_Binary(mat,viewer);
820   }
821   return 0;
822 }
823 
#undef __FUNC__
#define __FUNC__ "MatDestroy_MPIBAIJ"
/*
   MatDestroy_MPIBAIJ - Frees all storage owned by a parallel block AIJ
   matrix: the assembly stash, the two sequential blocks, the lazily
   created communication structures, and finally the header itself.
*/
int MatDestroy_MPIBAIJ(PetscObject obj)
{
  Mat         mat = (Mat) obj;
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;

#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
#endif

  /* stash used to cache off-process entries during assembly */
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
  PetscFree(baij->rowners);
  /* the two local sequential blocks (A: diagonal part, B: off-diagonal part) */
  ierr = MatDestroy(baij->A); CHKERRQ(ierr);
  ierr = MatDestroy(baij->B); CHKERRQ(ierr);
  /* the following are created lazily and may never have been allocated */
  if (baij->colmap) PetscFree(baij->colmap);
  if (baij->garray) PetscFree(baij->garray);
  if (baij->lvec)   VecDestroy(baij->lvec);
  if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
  if (baij->rowvalues) PetscFree(baij->rowvalues);
  if (baij->barray) PetscFree(baij->barray);
  PetscFree(baij);
  if (mat->mapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
  }
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}
854 
855 #undef __FUNC__
856 #define __FUNC__ "MatMult_MPIBAIJ"
857 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
858 {
859   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
860   int         ierr, nt;
861 
862   VecGetLocalSize_Fast(xx,nt);
863   if (nt != a->n) {
864     SETERRQ(1,0,"Incompatible partition of A and xx");
865   }
866   VecGetLocalSize_Fast(yy,nt);
867   if (nt != a->m) {
868     SETERRQ(1,0,"Incompatible parition of A and yy");
869   }
870   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
871   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
872   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
873   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
874   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
875   return 0;
876 }
877 
#undef __FUNC__
#define __FUNC__ "MatMultAdd_MPIBAIJ"
/*
   MatMultAdd_MPIBAIJ - Computes zz = yy + A*xx, overlapping the scatter
   of off-process entries of xx with the local diagonal-block product.
*/
int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;
  /* start communication, compute zz = yy + A_diag*xx locally, finish
     communication, then add the off-diagonal contribution B*lvec */
  ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
  return 0;
}
890 
#undef __FUNC__
#define __FUNC__ "MatMultTrans_MPIBAIJ"
/*
   MatMultTrans_MPIBAIJ - Computes yy = A^T * xx.  The off-diagonal
   block's transpose product produces contributions destined for other
   processes; they are sent with a reverse scatter while the local
   diagonal-block transpose product is computed.
*/
int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in yy until the next line, which is true for my implementation*/
  /* but is not perhaps always true. */
  ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
  return 0;
}
910 
#undef __FUNC__
#define __FUNC__ "MatMultTransAdd_MPIBAIJ"
/*
   MatMultTransAdd_MPIBAIJ - Computes zz = yy + A^T * xx, using the same
   communication/computation overlap as MatMultTrans_MPIBAIJ.
*/
int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in zz until the next line, which is true for my implementation*/
  /* but is not perhaps always true. */
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  return 0;
}
930 
931 /*
932   This only works correctly for square matrices where the subblock A->A is the
933    diagonal block
934 */
935 #undef __FUNC__
936 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
937 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
938 {
939   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
940   if (a->M != a->N)
941     SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
942   return MatGetDiagonal(a->A,v);
943 }
944 
945 #undef __FUNC__
946 #define __FUNC__ "MatScale_MPIBAIJ"
947 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
948 {
949   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
950   int        ierr;
951   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
952   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
953   return 0;
954 }
955 
956 #undef __FUNC__
957 #define __FUNC__ "MatGetSize_MPIBAIJ"
958 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
959 {
960   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
961   *m = mat->M; *n = mat->N;
962   return 0;
963 }
964 
965 #undef __FUNC__
966 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
967 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
968 {
969   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
970   *m = mat->m; *n = mat->N;
971   return 0;
972 }
973 
974 #undef __FUNC__
975 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
976 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
977 {
978   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
979   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
980   return 0;
981 }
982 
983 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
984 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
985 
#undef __FUNC__
#define __FUNC__ "MatGetRow_MPIBAIJ"
/*
   MatGetRow_MPIBAIJ - Returns one locally-owned global row of the matrix
   as point (not block) column indices and values, merged from the
   diagonal (A) and off-diagonal (B) sequential blocks in increasing
   global column order.  Must be paired with MatRestoreRow.
*/
int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
  int        *cmap, *idx_p,cstart = mat->cstart;

  /* only one row may be "gotten" at a time */
  if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int     max = 1,mbs = mat->mbs,tmp;
    /* longest block row over both blocks, in block columns */
    for ( i=0; i<mbs; i++ ) {
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    /* single allocation holds both the values and the trailing index array */
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }


  /* NOTE(review): SETERRQ expands to a braced statement, so the missing
     trailing semicolon here is harmless */
  if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows")
  lrow = row - brstart;

  /* pass null value/index pointers through to the blocks when the caller
     did not request them */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  /* garray maps B's block columns back to global block columns */
  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      /* imark marks where B's columns cross from left of the diagonal block
         to right of it; the merged order is B-left, all of A, B-right */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          /* imark already computed by the values pass above */
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
          }
        } else {
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]/bs] < cstart)
              idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
            else break;
          }
          imark = i;
        }
        /* A's point columns are local; offset by the diagonal block start */
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
      }
    }
    else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  return 0;
}
1068 
1069 #undef __FUNC__
1070 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1071 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1072 {
1073   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1074   if (baij->getrowactive == PETSC_FALSE) {
1075     SETERRQ(1,0,"MatGetRow not called");
1076   }
1077   baij->getrowactive = PETSC_FALSE;
1078   return 0;
1079 }
1080 
1081 #undef __FUNC__
1082 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1083 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1084 {
1085   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1086   *bs = baij->bs;
1087   return 0;
1088 }
1089 
1090 #undef __FUNC__
1091 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1092 int MatZeroEntries_MPIBAIJ(Mat A)
1093 {
1094   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1095   int         ierr;
1096   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1097   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1098   return 0;
1099 }
1100 
1101 #undef __FUNC__
1102 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1103 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1104 {
1105   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1106   Mat         A = a->A, B = a->B;
1107   int         ierr;
1108   double      isend[5], irecv[5];
1109 
1110   info->rows_global    = (double)a->M;
1111   info->columns_global = (double)a->N;
1112   info->rows_local     = (double)a->m;
1113   info->columns_local  = (double)a->N;
1114   info->block_size     = (double)a->bs;
1115   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1116   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->memory;
1117   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1118   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->memory;
1119   if (flag == MAT_LOCAL) {
1120     info->nz_used      = isend[0];
1121     info->nz_allocated = isend[1];
1122     info->nz_unneeded  = isend[2];
1123     info->memory       = isend[3];
1124     info->mallocs      = isend[4];
1125   } else if (flag == MAT_GLOBAL_MAX) {
1126     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_MAX,matin->comm);
1127     info->nz_used      = irecv[0];
1128     info->nz_allocated = irecv[1];
1129     info->nz_unneeded  = irecv[2];
1130     info->memory       = irecv[3];
1131     info->mallocs      = irecv[4];
1132   } else if (flag == MAT_GLOBAL_SUM) {
1133     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_SUM,matin->comm);
1134     info->nz_used      = irecv[0];
1135     info->nz_allocated = irecv[1];
1136     info->nz_unneeded  = irecv[2];
1137     info->memory       = irecv[3];
1138     info->mallocs      = irecv[4];
1139   }
1140   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1141   info->fill_ratio_needed = 0;
1142   info->factor_mallocs    = 0;
1143   return 0;
1144 }
1145 
1146 #undef __FUNC__
1147 #define __FUNC__ "MatSetOption_MPIBAIJ"
1148 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1149 {
1150   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1151 
1152   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1153       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1154       op == MAT_COLUMNS_UNSORTED ||
1155       op == MAT_COLUMNS_SORTED ||
1156       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1157       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1158         MatSetOption(a->A,op);
1159         MatSetOption(a->B,op);
1160   } else if (op == MAT_ROW_ORIENTED) {
1161         a->roworiented = 1;
1162         MatSetOption(a->A,op);
1163         MatSetOption(a->B,op);
1164   } else if (op == MAT_ROWS_SORTED ||
1165              op == MAT_ROWS_UNSORTED ||
1166              op == MAT_SYMMETRIC ||
1167              op == MAT_STRUCTURALLY_SYMMETRIC ||
1168              op == MAT_YES_NEW_DIAGONALS)
1169     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1170   else if (op == MAT_COLUMN_ORIENTED) {
1171     a->roworiented = 0;
1172     MatSetOption(a->A,op);
1173     MatSetOption(a->B,op);
1174   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1175     a->donotstash = 1;
1176   } else if (op == MAT_NO_NEW_DIAGONALS)
1177     {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
1178   else
1179     {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
1180   return 0;
1181 }
1182 
1183 #undef __FUNC__
1184 #define __FUNC__ "MatTranspose_MPIBAIJ("
1185 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1186 {
1187   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1188   Mat_SeqBAIJ *Aloc;
1189   Mat        B;
1190   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
1191   int        bs=baij->bs,mbs=baij->mbs;
1192   Scalar     *a;
1193 
1194   if (matout == PETSC_NULL && M != N)
1195     SETERRQ(1,0,"Square matrix only for in-place");
1196   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1197   CHKERRQ(ierr);
1198 
1199   /* copy over the A part */
1200   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1201   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1202   row = baij->rstart;
1203   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1204 
1205   for ( i=0; i<mbs; i++ ) {
1206     rvals[0] = bs*(baij->rstart + i);
1207     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1208     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1209       col = (baij->cstart+aj[j])*bs;
1210       for (k=0; k<bs; k++ ) {
1211         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1212         col++; a += bs;
1213       }
1214     }
1215   }
1216   /* copy over the B part */
1217   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1218   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1219   row = baij->rstart*bs;
1220   for ( i=0; i<mbs; i++ ) {
1221     rvals[0] = bs*(baij->rstart + i);
1222     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1223     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1224       col = baij->garray[aj[j]]*bs;
1225       for (k=0; k<bs; k++ ) {
1226         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1227         col++; a += bs;
1228       }
1229     }
1230   }
1231   PetscFree(rvals);
1232   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1233   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1234 
1235   if (matout != PETSC_NULL) {
1236     *matout = B;
1237   } else {
1238     /* This isn't really an in-place transpose .... but free data structures from baij */
1239     PetscFree(baij->rowners);
1240     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1241     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1242     if (baij->colmap) PetscFree(baij->colmap);
1243     if (baij->garray) PetscFree(baij->garray);
1244     if (baij->lvec) VecDestroy(baij->lvec);
1245     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1246     PetscFree(baij);
1247     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1248     PetscHeaderDestroy(B);
1249   }
1250   return 0;
1251 }
1252 
1253 #undef __FUNC__
1254 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1255 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1256 {
1257   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1258   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1259   int ierr,s1,s2,s3;
1260 
1261   if (ll)  {
1262     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1263     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1264     if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
1265     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1266     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1267   }
1268   if (rr) SETERRQ(1,0,"not supported for right vector");
1269   return 0;
1270 }
1271 
/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   baij->A and baij->B directly and not through the MatZeroRows()
   routine.
*/
#undef __FUNC__
#define __FUNC__ "MatZeroRows_MPIBAIJ"
/*
   MatZeroRows_MPIBAIJ - Zeros the (global point) rows listed in is,
   optionally placing *diag on the diagonal of each zeroed row.  Each
   process may list rows owned by any process; the routine routes every
   row index to its owner with point-to-point messages, then each owner
   zeros its local rows via the sequential MatZeroRows on both blocks.
*/
int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values,bs=l->bs;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  /* nprocs[j] = #rows destined for process j; procs[j] = 1 if any */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    /* rowners holds block-row ownership, so scale by bs for point rows */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,0,"Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  /* each incoming message holds at most nmax row indices */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* bucket the row indices by destination (starts[] is consumed here) */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  /* rebuild starts[] and launch one send per destination process */
  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  /* first locally owned point row; used to localize received indices */
  base = owners[rank]*bs;

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  /* gather all received global rows into one list of local row indices */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  /* diag goes only to the diagonal block; B gets plain zeroing */
  ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}
1402 extern int MatPrintHelp_SeqBAIJ(Mat);
1403 #undef __FUNC__
1404 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1405 int MatPrintHelp_MPIBAIJ(Mat A)
1406 {
1407   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1408 
1409   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1410   else return 0;
1411 }
1412 
1413 #undef __FUNC__
1414 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1415 int MatSetUnfactored_MPIBAIJ(Mat A)
1416 {
1417   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1418   int         ierr;
1419   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1420   return 0;
1421 }
1422 
1423 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1424 
1425 /* -------------------------------------------------------------------*/
/*
    Function ("operations") table for the MATMPIBAIJ matrix type.  The
    ordering of entries is fixed by struct _MatOps; a 0 entry means the
    operation is not supported by this format (some entries are patched
    in MatCreateMPIBAIJ for the single-process case).
*/
static struct _MatOps MatOps = {
  MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
  MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
  0,0,0,0,
  0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
  0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
  MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
  MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
  0,0,0,MatGetSize_MPIBAIJ,
  MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
  0,0,MatConvertSameType_MPIBAIJ,0,0,
  0,0,0,MatGetSubMatrices_MPIBAIJ,
  MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
  MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
  0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1441 
1442 
1443 #undef __FUNC__
1444 #define __FUNC__ "MatCreateMPIBAIJ"
1445 /*@C
1446    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1447    (block compressed row).  For good matrix assembly performance
1448    the user should preallocate the matrix storage by setting the parameters
1449    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1450    performance can be increased by more than a factor of 50.
1451 
1452    Input Parameters:
1453 .  comm - MPI communicator
.  bs   - size of block
1455 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1456            This value should be the same as the local size used in creating the
1457            y vector for the matrix-vector product y = Ax.
1458 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1459            This value should be the same as the local size used in creating the
1460            x vector for the matrix-vector product y = Ax.
1461 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1462 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1463 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1464            submatrix  (same for all local rows)
.  d_nnz - array containing the number of block nonzeros in the various block rows
           in the diagonal portion of the local submatrix (possibly different for
           each block row) or PETSC_NULL.  You must leave room for the diagonal
           entry even if it is zero.
1469 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1470            submatrix (same for all local rows).
.  o_nnz - array containing the number of block nonzeros in the various block rows of the
           off-diagonal portion of the local submatrix (possibly different for
           each block row) or PETSC_NULL.
1474 
1475    Output Parameter:
1476 .  A - the matrix
1477 
1478    Notes:
1479    The user MUST specify either the local or global matrix dimensions
1480    (possibly both).
1481 
1482    Storage Information:
1483    For a square global matrix we define each processor's diagonal portion
1484    to be its local rows and the corresponding columns (a square submatrix);
1485    each processor's off-diagonal portion encompasses the remainder of the
1486    local matrix (a rectangular submatrix).
1487 
1488    The user can specify preallocated storage for the diagonal part of
1489    the local submatrix with either d_nz or d_nnz (not both).  Set
1490    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1491    memory allocation.  Likewise, specify preallocated storage for the
1492    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1493 
1494    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1495    the figure below we depict these three local rows and all columns (0-11).
1496 
1497 $          0 1 2 3 4 5 6 7 8 9 10 11
1498 $         -------------------
1499 $  row 3  |  o o o d d d o o o o o o
1500 $  row 4  |  o o o d d d o o o o o o
1501 $  row 5  |  o o o d d d o o o o o o
1502 $         -------------------
1503 $
1504 
1505    Thus, any entries in the d locations are stored in the d (diagonal)
1506    submatrix, and any entries in the o locations are stored in the
1507    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1508    stored simply in the MATSEQBAIJ format for compressed row storage.
1509 
1510    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1511    and o_nz should indicate the number of nonzeros per row in the o matrix.
1512    In general, for PDE problems in which most nonzeros are near the diagonal,
1513    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1514    or you will get TERRIBLE performance; see the users' manual chapter on
1515    matrices.
1516 
1517 .keywords: matrix, block, aij, compressed row, sparse, parallel
1518 
1519 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1520 @*/
1521 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1522                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1523 {
1524   Mat          B;
1525   Mat_MPIBAIJ  *b;
1526   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1527 
1528   if (bs < 1) SETERRQ(1,0,"invalid block size specified");
1529   *A = 0;
1530   PetscHeaderCreate(B,_p_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1531   PLogObjectCreate(B);
1532   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1533   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1534   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1535   MPI_Comm_size(comm,&size);
1536   if (size == 1) {
1537     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1538     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1539     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1540     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1541     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1542     B->ops.solve             = MatSolve_MPIBAIJ;
1543     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1544     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1545     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1546     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1547   }
1548 
1549   B->destroy    = MatDestroy_MPIBAIJ;
1550   B->view       = MatView_MPIBAIJ;
1551   B->mapping    = 0;
1552   B->factor     = 0;
1553   B->assembled  = PETSC_FALSE;
1554 
1555   B->insertmode = NOT_SET_VALUES;
1556   MPI_Comm_rank(comm,&b->rank);
1557   MPI_Comm_size(comm,&b->size);
1558 
1559   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1560     SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1561   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
1562   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,0,"either N or n should be specified");
1563   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1564   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1565 
1566   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1567     work[0] = m; work[1] = n;
1568     mbs = m/bs; nbs = n/bs;
1569     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1570     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1571     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1572   }
1573   if (m == PETSC_DECIDE) {
1574     Mbs = M/bs;
1575     if (Mbs*bs != M) SETERRQ(1,0,"No of global rows must be divisible by blocksize");
1576     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1577     m   = mbs*bs;
1578   }
1579   if (n == PETSC_DECIDE) {
1580     Nbs = N/bs;
1581     if (Nbs*bs != N) SETERRQ(1,0,"No of global cols must be divisible by blocksize");
1582     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1583     n   = nbs*bs;
1584   }
1585   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"No of local rows, cols must be divisible by blocksize");
1586 
1587   b->m = m; B->m = m;
1588   b->n = n; B->n = n;
1589   b->N = N; B->N = N;
1590   b->M = M; B->M = M;
1591   b->bs  = bs;
1592   b->bs2 = bs*bs;
1593   b->mbs = mbs;
1594   b->nbs = nbs;
1595   b->Mbs = Mbs;
1596   b->Nbs = Nbs;
1597 
1598   /* build local table of row and column ownerships */
1599   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1600   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1601   b->cowners = b->rowners + b->size + 2;
1602   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1603   b->rowners[0] = 0;
1604   for ( i=2; i<=b->size; i++ ) {
1605     b->rowners[i] += b->rowners[i-1];
1606   }
1607   b->rstart    = b->rowners[b->rank];
1608   b->rend      = b->rowners[b->rank+1];
1609   b->rstart_bs = b->rstart * bs;
1610   b->rend_bs   = b->rend * bs;
1611 
1612   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1613   b->cowners[0] = 0;
1614   for ( i=2; i<=b->size; i++ ) {
1615     b->cowners[i] += b->cowners[i-1];
1616   }
1617   b->cstart    = b->cowners[b->rank];
1618   b->cend      = b->cowners[b->rank+1];
1619   b->cstart_bs = b->cstart * bs;
1620   b->cend_bs   = b->cend * bs;
1621 
1622   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1623   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1624   PLogObjectParent(B,b->A);
1625   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1626   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1627   PLogObjectParent(B,b->B);
1628 
1629   /* build cache for off array entries formed */
1630   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1631   b->donotstash  = 0;
1632   b->colmap      = 0;
1633   b->garray      = 0;
1634   b->roworiented = 1;
1635 
1636   /* stuff used in block assembly */
1637   b->barray      = 0;
1638 
1639   /* stuff used for matrix vector multiply */
1640   b->lvec        = 0;
1641   b->Mvctx       = 0;
1642 
1643   /* stuff for MatGetRow() */
1644   b->rowindices   = 0;
1645   b->rowvalues    = 0;
1646   b->getrowactive = PETSC_FALSE;
1647 
1648   *A = B;
1649   return 0;
1650 }
1651 
1652 #undef __FUNC__
1653 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
1654 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1655 {
1656   Mat         mat;
1657   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1658   int         ierr, len=0, flg;
1659 
1660   *newmat       = 0;
1661   PetscHeaderCreate(mat,_p_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1662   PLogObjectCreate(mat);
1663   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1664   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1665   mat->destroy    = MatDestroy_MPIBAIJ;
1666   mat->view       = MatView_MPIBAIJ;
1667   mat->factor     = matin->factor;
1668   mat->assembled  = PETSC_TRUE;
1669 
1670   a->m = mat->m   = oldmat->m;
1671   a->n = mat->n   = oldmat->n;
1672   a->M = mat->M   = oldmat->M;
1673   a->N = mat->N   = oldmat->N;
1674 
1675   a->bs  = oldmat->bs;
1676   a->bs2 = oldmat->bs2;
1677   a->mbs = oldmat->mbs;
1678   a->nbs = oldmat->nbs;
1679   a->Mbs = oldmat->Mbs;
1680   a->Nbs = oldmat->Nbs;
1681 
1682   a->rstart       = oldmat->rstart;
1683   a->rend         = oldmat->rend;
1684   a->cstart       = oldmat->cstart;
1685   a->cend         = oldmat->cend;
1686   a->size         = oldmat->size;
1687   a->rank         = oldmat->rank;
1688   mat->insertmode = NOT_SET_VALUES;
1689   a->rowvalues    = 0;
1690   a->getrowactive = PETSC_FALSE;
1691   a->barray       = 0;
1692 
1693   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1694   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1695   a->cowners = a->rowners + a->size + 2;
1696   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1697   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1698   if (oldmat->colmap) {
1699     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1700     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1701     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1702   } else a->colmap = 0;
1703   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1704     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1705     PLogObjectMemory(mat,len*sizeof(int));
1706     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1707   } else a->garray = 0;
1708 
1709   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1710   PLogObjectParent(mat,a->lvec);
1711   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1712   PLogObjectParent(mat,a->Mvctx);
1713   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1714   PLogObjectParent(mat,a->A);
1715   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1716   PLogObjectParent(mat,a->B);
1717   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1718   if (flg) {
1719     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1720   }
1721   *newmat = mat;
1722   return 0;
1723 }
1724 
1725 #include "sys.h"
1726 
1727 #undef __FUNC__
1728 #define __FUNC__ "MatLoad_MPIBAIJ"
/*
   MatLoad_MPIBAIJ - Creates a parallel block AIJ (MPIBAIJ) matrix from a
   matrix stored in PETSc binary format on the given viewer.

   Collective on the viewer's communicator.  Process 0 reads everything
   from the file (header, row lengths, column indices, numerical values)
   and ships each other process its portion with MPI point-to-point
   messages; the remaining processes only receive.

   The block size is taken from the -matload_block_size runtime option
   (default 1).  If the global dimension is not divisible by the block
   size, the matrix is padded with extra rows/columns carrying a 1.0 on
   the diagonal so the padded size is divisible.

   NOTE(review): the 'type' argument is never consulted; the result is
   always an MPIBAIJ matrix.  Only square input matrices are accepted.
*/
int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*buf;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
  int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
  int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
  int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
  int          dcount,kmax,k,nzcount,tmp;


  /* block size may be chosen at runtime; bs2 (entries per block) is unused below */
  ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
  bs2  = bs*bs;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    /* only process 0 touches the file; header[0] is the class cookie,
       header[1]/header[2] the dimensions; header[3] is read but never used */
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not matrix object");
  }

  /* everyone learns the global dimensions */
  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];

  if (M != N) SETERRQ(1,0,"Can only do square matrices");

  /*
     This code adds extra rows to make sure the number of rows is
     divisible by the blocksize
  */
  Mbs        = M/bs;
  extra_rows = bs - M + bs*(Mbs);   /* == bs - M%bs; equals bs when already divisible */
  if (extra_rows == bs) extra_rows = 0;
  else                  Mbs++;
  if (extra_rows &&!rank) {
    PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
  }

  /* determine ownership of all rows: the first (Mbs % size) processes get
     one extra block row; mbs/m are the local block/scalar row counts */
  mbs = Mbs/size + ((Mbs % size) > rank);
  m   = mbs * bs;
  /* rowners[0..size]  : prefix sums of block rows owned by each process
     browners[0..size] : the same ranges in scalar rows (rowners[i]*bs)  */
  rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
  browners = rowners + size + 1;
  MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
  for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
  rstart = rowners[rank];    /* first local block row */
  rend   = rowners[rank+1];  /* one past the last local block row */

  /* distribute row lengths to all processors */
  locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
  if (!rank) {
    /* root reads the per-(scalar)row lengths; each padded row gets length 1 */
    rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
    for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
    MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    /* non-root ranks: send buffer arguments are ignored by MPI_Scatterv */
    MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices  */
    nz = procsnz[0];
    ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* uniprocessor case: the padded entries are not on disk, so read fewer
       and synthesize diagonal column indices for the extra rows */
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }

    /* read in every ones (except the last) and ship off */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    /* read in the stuff for the last proc */
    if ( size != 1 ) {
      nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
      MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += locrowlens[i];
    }
    ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* receive message of column indices*/
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries */
  /* dlens[i]/odlens[i] count the distinct block columns falling in the
     diagonal / off-diagonal portion of local block row i; they become the
     per-row preallocation hints for MatCreateMPIBAIJ below */
  dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
  odlens = dlens + (rend-rstart);
  /* mask[] flags block columns already seen in the current block row;
     masked1/masked2 remember which flags were set so only those are cleared */
  mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
  PetscMemzero(mask,3*Mbs*sizeof(int));
  masked1 = mask    + Mbs;
  masked2 = masked1 + Mbs;
  rowcount = 0; nzcount = 0;
  for ( i=0; i<mbs; i++ ) {
    dcount  = 0;
    odcount = 0;
    for ( j=0; j<bs; j++ ) {
      kmax = locrowlens[rowcount];
      for ( k=0; k<kmax; k++ ) {
        tmp = mycols[nzcount++]/bs;   /* block column of this entry */
        if (!mask[tmp]) {
          mask[tmp] = 1;
          /* rstart/rend are in block units, matching tmp */
          if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
          else masked1[dcount++] = tmp;
        }
      }
      rowcount++;
    }

    dlens[i]  = dcount;
    odlens[i] = odcount;

    /* zero out the mask elements we set */
    for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
    for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
  }

  /* create our matrix with exact per-row preallocation from dlens/odlens */
  ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
         CHKERRQ(ierr);
  A = *newmat;
  /* assumes the file stores each row's column indices in ascending order */
  MatSetOption(A,MAT_COLUMNS_SORTED);

  if (!rank) {
    buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    vals = buf;
    mycols = ibuf;
    /* uniprocessor case: values for the padded rows are not on disk; use 1.0 */
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }

    /* insert into matrix, one scalar row at a time (jj is the global row) */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
    /* read in other processors (except the last one) and ship out */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    /* the last proc */
    if ( size != 1 ){
      /* NOTE(review): this relies on the loop above leaving i == size-1
         (true even when it never runs, since size >= 2 here), and the
         inner loop below then reuses i.  The column-index pass indexes
         procsnz[size-1] explicitly instead -- consider matching that. */
      nz = procsnz[i] - extra_rows;
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
      MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values (nz still holds this process's nonzero count) */
    buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);

    /* receive message of values*/
    vals = buf;
    mycols = ibuf;
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
  }
  PetscFree(locrowlens);
  PetscFree(buf);
  PetscFree(ibuf);
  PetscFree(rowners);
  PetscFree(dlens);
  PetscFree(mask);
  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}
1955 
1956 
1957