xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 6e08aefdc683df73fa21dda75dc95d93cf00f12b)
1 
2 #ifdef PETSC_RCS_HEADER
3 static char vcid[] = "$Id: mpibaij.c,v 1.79 1997/08/22 15:14:37 bsmith Exp bsmith $";
4 #endif
5 
6 #include "pinclude/pviewer.h"
7 #include "src/mat/impls/baij/mpi/mpibaij.h"
8 #include "src/vec/vecimpl.h"
9 
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 
/*
     Local utility routine that creates a mapping from the global column
   number to the local number in the off-diagonal part of the local
   storage of the matrix.  This is done in a non-scalable way since the
   length of colmap equals the global matrix length.
*/
22 #undef __FUNC__
23 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
24 static int CreateColmap_MPIBAIJ_Private(Mat mat)
25 {
26   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
27   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
28   int         nbs = B->nbs,i,bs=B->bs;;
29 
30   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
31   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
32   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
33   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
34   return 0;
35 }
36 
37 #define CHUNKSIZE  10
38 
/*
   MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) - Inlined insertion of
   one scalar into the "diagonal" block part A of the parallel matrix.

   row,col are local point (unblocked) indices; addv is ADD_VALUES or
   INSERT_VALUES.  The macro relies on locals set up by the caller
   (see MatSetValues_MPIBAIJ): a, aimax, ai, ailen, aj, aa, bs, bs2 and the
   scratch variables rp,ap,bap,brow,bcol,ridx,cidx,low,high,t,_i,ii,nrow,
   rmax,N.

   Algorithm: narrow the (sorted) block row by bisection, then scan for
   block column bcol; if present, add/insert in place.  Otherwise, unless
   a->nonew forbids new nonzeros, grow the row -- reallocating the entire
   matrix storage with CHUNKSIZE extra block slots when the row is full --
   and shift the later entries up so the row stays sorted.

   NOTE(review): a->nz is incremented only on the reallocation path, and by
   one rather than by the number of inserted blocks; presumably the true
   count is recomputed at assembly time -- confirm before relying on nz here.
*/
#define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
    rmax = aimax[brow]; nrow = ailen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto a_noinsert; \
        } \
      } \
      if (a->nonew == 1) goto a_noinsert; \
      else if (a->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (a->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
        for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
        PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
                    aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(a->a);  \
        if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
        aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
        a->singlemalloc = 1; \
 \
        rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
        rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        a->maxnz += bs2*CHUNKSIZE; \
        a->reallocs++; \
        a->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      a_noinsert:; \
    ailen[brow] = nrow; \
}
113 
/*
   MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) - Inlined insertion of
   one scalar into the "off-diagonal" block part B of the parallel matrix.

   Mirror image of MatSetValues_SeqBAIJ_A_Private above, operating on the
   caller's b, bimax, bi, bilen, bj, ba locals instead (plus the same shared
   scratch variables).  See the A variant for the algorithm description and
   the NOTE(review) about b->nz.
*/
#define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
    rmax = bimax[brow]; nrow = bilen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto b_noinsert; \
        } \
      } \
      if (b->nonew == 1) goto b_noinsert; \
      else if (b->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (b->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
        for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
        PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
                    ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(b->a);  \
        if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
        ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
        b->singlemalloc = 1; \
 \
        rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
        rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        b->maxnz += bs2*CHUNKSIZE; \
        b->reallocs++; \
        b->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      b_noinsert:; \
    bilen[brow] = nrow; \
}
188 
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIBAIJ"
/*
   MatSetValues_MPIBAIJ - Inserts or adds an m-by-n logically dense block of
   values, addressed with global point (unblocked) indices im[]/in[].

   Rows owned by this process are routed to the local diagonal part A or
   off-diagonal part B through the inlined macros above; rows owned by
   other processes are stashed for communication at assembly time (unless
   donotstash is set).
*/
int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j,row,col;
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
  int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
  int         cend_orig=baij->cend_bs,bs=baij->bs;

  /* Some Variables required in the macro */
  Mat         A = baij->A;
  Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
  int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
  Scalar      *aa=a->a;

  Mat         B = baij->B;
  Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
  int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
  Scalar      *ba=b->a;

  int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
  int         low,high,t,ridx,cidx,bs2=a->bs2;
  Scalar      *ap,*bap;

  for ( i=0; i<m; i++ ) {
#if defined(PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(1,0,"Negative row");
    if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
#endif
    if (im[i] >= rstart_orig && im[i] < rend_orig) {
      /* row is owned by this process */
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        if (in[j] >= cstart_orig && in[j] < cend_orig){
          /* column lies in the diagonal part A */
          col = in[j] - cstart_orig;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
#if defined(PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
        else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
#endif
        else {
          /* column lies in the off-diagonal part B */
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }
            /* colmap[block col] is 1 + first local point column, 0 if absent.
               NOTE(review): when the entry is 0 this gives col = in[j]%bs - 1,
               so the col < 0 test below only detects absence when
               in[j]%bs == 0 -- confirm against later PETSc versions. */
            col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
              /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
              B = baij->B;
              b = (Mat_SeqBAIJ *) (B)->data;
              bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
              ba=b->a;
            }
          }
          else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
      }
    }
    else {
      /* off-process row: stash the values for assembly-time communication */
      if (roworiented && !baij->donotstash) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      }
      else {
        if (!baij->donotstash) {
          row = im[i];
          for ( j=0; j<n; j++ ) {
            ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
  return 0;
}
272 
273 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
#undef __FUNC__
#define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
/*
   MatSetValuesBlocked_MPIBAIJ - Inserts or adds an m-by-n array of bs-by-bs
   blocks, addressed with global block indices im[]/in[]; v holds m*n*bs2
   scalars laid out by rows or columns of blocks according to roworiented.

   Locally owned block rows are forwarded to the sequential A/B parts;
   other rows are stashed value by value, with point indices, for assembly
   (unless donotstash is set).
*/
int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      *value,*barray=baij->barray;
  int         ierr,i,j,ii,jj,row,col,k,l;
  int         roworiented = baij->roworiented,rstart=baij->rstart ;
  int         rend=baij->rend,cstart=baij->cstart,stepval;
  int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;


  /* lazily allocate the bs*bs scratch block used to repack values */
  if(!barray) {
    baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
  }

  /* gap (in Scalars) between consecutive rows of one block inside v */
  if (roworiented) {
    stepval = (n-1)*bs;
  } else {
    stepval = (m-1)*bs;
  }
  for ( i=0; i<m; i++ ) {
#if defined(PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(1,0,"Negative row");
    if (im[i] >= baij->Mbs) SETERRQ(1,0,"Row too large");
#endif
    if (im[i] >= rstart && im[i] < rend) {
      row = im[i] - rstart;
      for ( j=0; j<n; j++ ) {
        /* If NumCol = 1 then a copy is not required */
        if ((roworiented) && (n == 1)) {
          barray = v + i*bs2;
        } else if((!roworiented) && (m == 1)) {
          barray = v + j*bs2;
        } else { /* Here a copy is required */
          if (roworiented) {
            value = v + i*(stepval+bs)*bs + j*bs;
          } else {
            value = v + j*(stepval+bs)*bs + i*bs;
          }
          /* gather the bs-by-bs block into contiguous scratch storage */
          for ( ii=0; ii<bs; ii++,value+=stepval ) {
            for (jj=0; jj<bs; jj++ ) {
              *barray++  = *value++;
            }
          }
          barray -=bs2;  /* rewind to the start of the scratch block */
        }

        if (in[j] >= cstart && in[j] < cend){
          /* block column lies in the diagonal part A */
          col  = in[j] - cstart;
          ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
        }
#if defined(PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
        else if (in[j] >= baij->Nbs) {SETERRQ(1,0,"Column too large");}
#endif
        else {
          /* block column lies in the off-diagonal part B */
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }

#if defined(PETSC_BOPT_g)
            if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(1,0,"Incorrect colmap");}
#endif
            /* convert the 1-based point column back to a local block column */
            col = (baij->colmap[in[j]] - 1)/bs;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
            }
          }
          else col = in[j];
          ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
        }
      }
    }
    else {
      /* off-process block row: stash entry by entry with point indices */
      if (!baij->donotstash) {
        if (roworiented ) {
          row   = im[i]*bs;
          value = v + i*(stepval+bs)*bs;
          for ( j=0; j<bs; j++,row++ ) {
            for ( k=0; k<n; k++ ) {
              for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        }
        else {
          for ( j=0; j<n; j++ ) {
            value = v + j*(stepval+bs)*bs + i*bs;
            col   = in[j]*bs;
            for ( k=0; k<bs; k++,col++,value+=stepval) {
              for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        }
      }
    }
  }
  return 0;
}
379 
#undef __FUNC__
#define __FUNC__ "MatGetValues_MPIBAIJ"
/*
   MatGetValues_MPIBAIJ - Retrieves an m-by-n block of values, addressed
   with global point (unblocked) indices, into v (row-major, v[i*n+j]).

   Only rows owned by this process may be requested; off-diagonal columns
   whose block is not stored locally read back as 0.0.
*/
int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
  int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
    if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
    if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
      row = idxm[i] - bsrstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
        if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
        if (idxn[j] >= bscstart && idxn[j] < bscend){
          /* column lies in the diagonal part A */
          col = idxn[j] - bscstart;
          ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        }
        else {
          /* column lies in the off-diagonal part B (or is absent locally) */
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
          }
          /* a zero colmap entry, or a garray entry that does not map back
             to this block column, means the block is not stored here */
          if((baij->colmap[idxn[j]/bs]-1 < 0) ||
             (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
          else {
            col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
            ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    }
    else {
      SETERRQ(1,0,"Only local values currently supported");
    }
  }
  return 0;
}
419 
420 #undef __FUNC__
421 #define __FUNC__ "MatNorm_MPIBAIJ"
422 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
423 {
424   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
425   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
426   int        ierr, i,bs2=baij->bs2;
427   double     sum = 0.0;
428   Scalar     *v;
429 
430   if (baij->size == 1) {
431     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
432   } else {
433     if (type == NORM_FROBENIUS) {
434       v = amat->a;
435       for (i=0; i<amat->nz*bs2; i++ ) {
436 #if defined(PETSC_COMPLEX)
437         sum += real(conj(*v)*(*v)); v++;
438 #else
439         sum += (*v)*(*v); v++;
440 #endif
441       }
442       v = bmat->a;
443       for (i=0; i<bmat->nz*bs2; i++ ) {
444 #if defined(PETSC_COMPLEX)
445         sum += real(conj(*v)*(*v)); v++;
446 #else
447         sum += (*v)*(*v); v++;
448 #endif
449       }
450       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
451       *norm = sqrt(*norm);
452     }
453     else
454       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
455   }
456   return 0;
457 }
458 
#undef __FUNC__
#define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
/*
   MatAssemblyBegin_MPIBAIJ - Begins assembly: ships all stashed
   off-process values to their owning processes.

   Posts nonblocking receives and sends; the matching waits happen in
   MatAssemblyEnd_MPIBAIJ.  Each shipped entry is a (row,col,value) triple
   of Scalars -- indices are stored as Scalars to simplify the messaging.
*/
int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,0,"Some processors inserted others added");
  }
  mat->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    /* owners[] holds block-row ranges; the stash holds point rows, so scale by bs */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack each stashed entry as a (row,col,value) triple, bumping the
     owner's running offset as we go */
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  /* rebuild starts[], consumed by the packing loop above */
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  /* stash the communication state for MatAssemblyEnd_MPIBAIJ */
  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  return 0;
}
559 #include <math.h>
560 #define HASH_KEY 0.6180339887
561 #define HASH1(size,key) ((int)((size)*fmod(((key)*HASH_KEY),1))+1)
562 
563 int CreateHashTable(Mat mat)
564 {
565   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
566   Mat         A = baij->A, B=baij->B;
567   Mat_SeqBAIJ *a=(Mat_SeqBAIJ *)A->data, *b=(Mat_SeqBAIJ *)B->data;
568   int         i,j,k,nz=a->nz+b->nz,h1,*ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j;
569   int         size=(int)(1.5*nz),ct=0,max=0;
570   /* Scalar      *aa=a->a,*ba=b->a; */
571   double      key;
572   static double *HT;
573   static      int flag=1;
574 
575 
576   /* Allocate Memory for Hash Table */
577   if (flag) {
578     HT = (double*)PetscMalloc(size*sizeof(double));
579     flag = 0;
580   }
581   PetscMemzero(HT,size*sizeof(double));
582 
583   /* Loop Over A */
584   for ( i=0; i<a->n; i++ ) {
585     for ( j=ai[i]; j<ai[i+1]; j++ ) {
586       key = i*baij->n+aj[j]+1;
587       h1  = HASH1(size,key);
588 
589       for ( k=1; k<size; k++ ){
590         if (HT[(h1*k)%size] == 0.0) {
591           HT[(h1*k)%size] = key;
592           break;
593         } else ct++;
594       }
595       if (k> max) max =k;
596     }
597   }
598    printf("***max1 = %d\n",max);
599   /* Loop Over B */
600   for ( i=0; i<b->n; i++ ) {
601     for ( j=bi[i]; j<bi[i+1]; j++ ) {
602       key = i*b->n+bj[j]+1;
603       h1  = HASH1(size,key);
604       for ( k=1; k<size; k++ ){
605         if (HT[(h1*k)%size] == 0.0) {
606           HT[(h1*k)%size] = key;
607           break;
608         } else ct++;
609       }
610       if (k> max) max =k;
611     }
612   }
613 
614   printf("***max2 = %d\n",max);
615   /* Print Summary */
616   for ( i=0,key=0.0,j=0; i<size; i++)
617     if (HT[i]) {j++;}
618 
619   printf("Size %d Average Buckets %d no of Keys %d\n",size,ct,j);
620   return 0;
621 }
622 
#undef __FUNC__
#define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
/*
   MatAssemblyEnd_MPIBAIJ - Completes assembly: drains the receives posted
   by MatAssemblyBegin_MPIBAIJ and inserts the shipped values locally,
   waits on the sends, then assembles the local A and B parts and, on a
   first final assembly, builds the multiply/scatter machinery.
*/
int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
  int         bs=baij->bs,row,col,other_disassembled,flg;
  Scalar      *values,val;
  InsertMode  addv = mat->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = baij->rvalues + 3*imdex*baij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;  /* each shipped entry is a (row,col,value) triple of Scalars */
    for ( i=0; i<n; i++ ) {
      row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= baij->cstart*bs && col < baij->cend*bs) {
        /* column lies in the diagonal part A */
        col -= baij->cstart*bs;
        ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      }
      else {
        /* column lies in the off-diagonal part B */
        if (mat->was_assembled) {
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
          }
          /* NOTE(review): as in MatSetValues_MPIBAIJ, a 0 colmap entry is
             only caught by the col < 0 test when col%bs == 0 -- confirm */
          col = (baij->colmap[col/bs]) - 1 + col%bs;
          if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
            ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      }
    }
    count--;
  }
  PetscFree(baij->recv_waits); PetscFree(baij->rvalues);

  /* wait on sends */
  if (baij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(baij->nsends,baij->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(baij->send_waits); PetscFree(baij->svalues);

  ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselfs, in order that we may reassemble. */
  MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
  }

  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);

  /* optional diagnostic: hash the nonzero pattern and print statistics */
  ierr = OptionsHasName(PETSC_NULL,"-use_hash",&flg); CHKERRQ(ierr);
  if (flg) CreateHashTable(mat);
  if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
  return 0;
}
697 
698 #undef __FUNC__
699 #define __FUNC__ "MatView_MPIBAIJ_Binary"
700 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
701 {
702   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
703   int          ierr;
704 
705   if (baij->size == 1) {
706     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
707   }
708   else SETERRQ(1,0,"Only uniprocessor output supported");
709   return 0;
710 }
711 
#undef __FUNC__
#define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
/*
   MatView_MPIBAIJ_ASCIIorDraworMatlab - Views the parallel matrix on an
   ASCII, draw, or Matlab viewer.

   Info formats print per-process summaries and return early.  For the
   ASCII_FILE (shared file) case each process prints its own A and B parts
   in sequence; otherwise the whole matrix is gathered onto process 0 into
   a temporary matrix and viewed there.
*/
static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr, format,rank,bs = baij->bs;
  FILE         *fd;
  ViewerType   vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerGetFormat(viewer,&format);
    if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
      /* one summary line per process, printed in rank order */
      MatInfo info;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&info);
      PetscSequentialPhaseBegin(mat->comm,1);
      fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
              rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
              baij->bs,(int)info.memory);
      ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == VIEWER_FORMAT_ASCII_INFO) {
      PetscPrintf(mat->comm,"  block size is %d\n",bs);
      return 0;
    }
  }

  if (vtype == DRAW_VIEWER) {
    /* nothing to do for a null draw context */
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
  }

  if (vtype == ASCII_FILE_VIEWER) {
    /* shared file: each process appends its local parts in rank order */
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
           baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
            baij->cstart*bs,baij->cend*bs);
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  }
  else {
    int size = baij->size;
    rank = baij->rank;
    if (size == 1) {
      ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat         A;
      Mat_SeqBAIJ *Aloc;
      int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
      int         mbs=baij->mbs;
      Scalar      *a;

      /* process 0 gets all M x N rows/cols; the others contribute none */
      if (!rank) {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqBAIJ*) baij->A->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart;
      rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

      for ( i=0; i<mbs; i++ ) {
        /* rvals[] holds the bs global point rows of block row i */
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = (baij->cstart+aj[j])*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      /* copy over the B part */
      Aloc = (Mat_SeqBAIJ*) baij->B->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart*bs;
      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          /* garray maps the local off-diagonal block column to its global one */
          col = baij->garray[aj[j]]*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      PetscFree(rvals);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        /* on process 0 the gathered matrix lives entirely in its A part */
        ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}
833 
834 
835 
836 #undef __FUNC__
837 #define __FUNC__ "MatView_MPIBAIJ"
838 int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
839 {
840   Mat         mat = (Mat) obj;
841   int         ierr;
842   ViewerType  vtype;
843 
844   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
845   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
846       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
847     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
848   }
849   else if (vtype == BINARY_FILE_VIEWER) {
850     return MatView_MPIBAIJ_Binary(mat,viewer);
851   }
852   return 0;
853 }
854 
855 #undef __FUNC__
856 #define __FUNC__ "MatDestroy_MPIBAIJ"
857 int MatDestroy_MPIBAIJ(PetscObject obj)
858 {
859   Mat         mat = (Mat) obj;
860   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
861   int         ierr;
862 
863 #if defined(PETSC_LOG)
864   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
865 #endif
866 
867   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
868   PetscFree(baij->rowners);
869   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
870   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
871   if (baij->colmap) PetscFree(baij->colmap);
872   if (baij->garray) PetscFree(baij->garray);
873   if (baij->lvec)   VecDestroy(baij->lvec);
874   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
875   if (baij->rowvalues) PetscFree(baij->rowvalues);
876   if (baij->barray) PetscFree(baij->barray);
877   PetscFree(baij);
878   PLogObjectDestroy(mat);
879   PetscHeaderDestroy(mat);
880   return 0;
881 }
882 
883 #undef __FUNC__
884 #define __FUNC__ "MatMult_MPIBAIJ"
885 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
886 {
887   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
888   int         ierr, nt;
889 
890   VecGetLocalSize_Fast(xx,nt);
891   if (nt != a->n) {
892     SETERRQ(1,0,"Incompatible partition of A and xx");
893   }
894   VecGetLocalSize_Fast(yy,nt);
895   if (nt != a->m) {
896     SETERRQ(1,0,"Incompatible parition of A and yy");
897   }
898   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
899   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
900   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
901   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
902   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
903   return 0;
904 }
905 
906 #undef __FUNC__
907 #define __FUNC__ "MatMultAdd_MPIBAIJ"
908 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
909 {
910   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
911   int        ierr;
912   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
913   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
914   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
915   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
916   return 0;
917 }
918 
919 #undef __FUNC__
920 #define __FUNC__ "MatMultTrans_MPIBAIJ"
921 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
922 {
923   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
924   int        ierr;
925 
926   /* do nondiagonal part */
927   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
928   /* send it on its way */
929   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
930   /* do local part */
931   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
932   /* receive remote parts: note this assumes the values are not actually */
933   /* inserted in yy until the next line, which is true for my implementation*/
934   /* but is not perhaps always true. */
935   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
936   return 0;
937 }
938 
#undef __FUNC__
#define __FUNC__ "MatMultTransAdd_MPIBAIJ"
/* zz = yy + A'*xx */
int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually
     inserted in zz until VecScatterEnd() below, which is true for the
     current scatter implementation but is not guaranteed in general */
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  return 0;
}
958 
959 /*
960   This only works correctly for square matrices where the subblock A->A is the
961    diagonal block
962 */
963 #undef __FUNC__
964 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
965 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
966 {
967   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
968   if (a->M != a->N)
969     SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
970   return MatGetDiagonal(a->A,v);
971 }
972 
973 #undef __FUNC__
974 #define __FUNC__ "MatScale_MPIBAIJ"
975 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
976 {
977   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
978   int        ierr;
979   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
980   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
981   return 0;
982 }
983 
984 #undef __FUNC__
985 #define __FUNC__ "MatGetSize_MPIBAIJ"
986 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
987 {
988   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
989   *m = mat->M; *n = mat->N;
990   return 0;
991 }
992 
993 #undef __FUNC__
994 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
995 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
996 {
997   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
998   *m = mat->m; *n = mat->N;
999   return 0;
1000 }
1001 
1002 #undef __FUNC__
1003 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
1004 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
1005 {
1006   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1007   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
1008   return 0;
1009 }
1010 
1011 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1012 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1013 
#undef __FUNC__
#define __FUNC__ "MatGetRow_MPIBAIJ"
/*
   Returns one locally owned (scalar) row of the parallel matrix as a
   single sorted list of global column indices and values, merged from
   the diagonal (A) and off-diagonal (B) blocks.  Work arrays are cached
   on the matrix and released by MatRestoreRow_MPIBAIJ()/MatDestroy().
*/
int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
  int        *cmap, *idx_p,cstart = mat->cstart;

  /* only one row may be "checked out" at a time */
  if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int     max = 1,mbs = mat->mbs,tmp;
    for ( i=0; i<mbs; i++ ) {
      /* total block count of row i across both blocks */
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    /* one allocation holds both the values and (behind them) the indices */
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }


  if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows")
  lrow = row - brstart;

  /* request only the pieces the caller asked for */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      /* imark is the number of B entries whose global column precedes the
         diagonal block; B entries split into [0,imark) before A and
         [imark,nzB) after A */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          /* imark already computed while copying values */
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
          }
        } else {
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]/bs] < cstart)
              idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
            else break;
          }
          imark = i;
        }
        /* A-block columns are contiguous starting at cstart*bs */
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
      }
    }
    else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  return 0;
}
1096 
1097 #undef __FUNC__
1098 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1099 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1100 {
1101   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1102   if (baij->getrowactive == PETSC_FALSE) {
1103     SETERRQ(1,0,"MatGetRow not called");
1104   }
1105   baij->getrowactive = PETSC_FALSE;
1106   return 0;
1107 }
1108 
1109 #undef __FUNC__
1110 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1111 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1112 {
1113   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1114   *bs = baij->bs;
1115   return 0;
1116 }
1117 
1118 #undef __FUNC__
1119 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1120 int MatZeroEntries_MPIBAIJ(Mat A)
1121 {
1122   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1123   int         ierr;
1124   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1125   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1126   return 0;
1127 }
1128 
1129 #undef __FUNC__
1130 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1131 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1132 {
1133   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1134   Mat         A = a->A, B = a->B;
1135   int         ierr;
1136   double      isend[5], irecv[5];
1137 
1138   info->rows_global    = (double)a->M;
1139   info->columns_global = (double)a->N;
1140   info->rows_local     = (double)a->m;
1141   info->columns_local  = (double)a->N;
1142   info->block_size     = (double)a->bs;
1143   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1144   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->memory;
1145   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1146   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->memory;
1147   if (flag == MAT_LOCAL) {
1148     info->nz_used      = isend[0];
1149     info->nz_allocated = isend[1];
1150     info->nz_unneeded  = isend[2];
1151     info->memory       = isend[3];
1152     info->mallocs      = isend[4];
1153   } else if (flag == MAT_GLOBAL_MAX) {
1154     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_MAX,matin->comm);
1155     info->nz_used      = irecv[0];
1156     info->nz_allocated = irecv[1];
1157     info->nz_unneeded  = irecv[2];
1158     info->memory       = irecv[3];
1159     info->mallocs      = irecv[4];
1160   } else if (flag == MAT_GLOBAL_SUM) {
1161     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_SUM,matin->comm);
1162     info->nz_used      = irecv[0];
1163     info->nz_allocated = irecv[1];
1164     info->nz_unneeded  = irecv[2];
1165     info->memory       = irecv[3];
1166     info->mallocs      = irecv[4];
1167   }
1168   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1169   info->fill_ratio_needed = 0;
1170   info->factor_mallocs    = 0;
1171   return 0;
1172 }
1173 
1174 #undef __FUNC__
1175 #define __FUNC__ "MatSetOption_MPIBAIJ"
1176 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1177 {
1178   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1179 
1180   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1181       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1182       op == MAT_COLUMNS_UNSORTED ||
1183       op == MAT_COLUMNS_SORTED ||
1184       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1185       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1186         MatSetOption(a->A,op);
1187         MatSetOption(a->B,op);
1188   } else if (op == MAT_ROW_ORIENTED) {
1189         a->roworiented = 1;
1190         MatSetOption(a->A,op);
1191         MatSetOption(a->B,op);
1192   } else if (op == MAT_ROWS_SORTED ||
1193              op == MAT_ROWS_UNSORTED ||
1194              op == MAT_SYMMETRIC ||
1195              op == MAT_STRUCTURALLY_SYMMETRIC ||
1196              op == MAT_YES_NEW_DIAGONALS)
1197     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1198   else if (op == MAT_COLUMN_ORIENTED) {
1199     a->roworiented = 0;
1200     MatSetOption(a->A,op);
1201     MatSetOption(a->B,op);
1202   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1203     a->donotstash = 1;
1204   } else if (op == MAT_NO_NEW_DIAGONALS)
1205     {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
1206   else
1207     {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
1208   return 0;
1209 }
1210 
1211 #undef __FUNC__
1212 #define __FUNC__ "MatTranspose_MPIBAIJ("
1213 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1214 {
1215   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1216   Mat_SeqBAIJ *Aloc;
1217   Mat        B;
1218   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
1219   int        bs=baij->bs,mbs=baij->mbs;
1220   Scalar     *a;
1221 
1222   if (matout == PETSC_NULL && M != N)
1223     SETERRQ(1,0,"Square matrix only for in-place");
1224   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1225   CHKERRQ(ierr);
1226 
1227   /* copy over the A part */
1228   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1229   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1230   row = baij->rstart;
1231   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1232 
1233   for ( i=0; i<mbs; i++ ) {
1234     rvals[0] = bs*(baij->rstart + i);
1235     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1236     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1237       col = (baij->cstart+aj[j])*bs;
1238       for (k=0; k<bs; k++ ) {
1239         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1240         col++; a += bs;
1241       }
1242     }
1243   }
1244   /* copy over the B part */
1245   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1246   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1247   row = baij->rstart*bs;
1248   for ( i=0; i<mbs; i++ ) {
1249     rvals[0] = bs*(baij->rstart + i);
1250     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1251     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1252       col = baij->garray[aj[j]]*bs;
1253       for (k=0; k<bs; k++ ) {
1254         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1255         col++; a += bs;
1256       }
1257     }
1258   }
1259   PetscFree(rvals);
1260   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1261   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1262 
1263   if (matout != PETSC_NULL) {
1264     *matout = B;
1265   } else {
1266     /* This isn't really an in-place transpose .... but free data structures from baij */
1267     PetscFree(baij->rowners);
1268     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1269     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1270     if (baij->colmap) PetscFree(baij->colmap);
1271     if (baij->garray) PetscFree(baij->garray);
1272     if (baij->lvec) VecDestroy(baij->lvec);
1273     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1274     PetscFree(baij);
1275     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1276     PetscHeaderDestroy(B);
1277   }
1278   return 0;
1279 }
1280 
1281 #undef __FUNC__
1282 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1283 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1284 {
1285   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1286   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1287   int ierr,s1,s2,s3;
1288 
1289   if (ll)  {
1290     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1291     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1292     if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
1293     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1294     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1295   }
1296   if (rr) SETERRQ(1,0,"not supported for right vector");
1297   return 0;
1298 }
1299 
/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   baij->A and baij->B directly and not through the MatZeroRows()
   routine.
*/
#undef __FUNC__
#define __FUNC__ "MatZeroRows_MPIBAIJ"
/*
   Zeros the (global, scalar-indexed) rows listed in is, optionally
   placing *diag on the diagonal.  Each processor may name any global
   row; the rows are routed to their owners with point-to-point MPI
   messages before the local MatZeroRows() calls are made.
*/
int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values,bs=l->bs;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  /* nprocs[j] = number of rows destined for processor j;
     procs[j]  = 1 iff we send anything to processor j */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<size; j++ ) {
      /* owners[] is in block rows; scale by bs to compare scalar rows */
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,0,"Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* bucket the row numbers by destination processor */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  /* rebuild starts[] (it was consumed by the bucketing above) */
  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  /* first locally owned scalar row, for converting to local numbering */
  base = owners[rank]*bs;

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  /* only the diagonal block gets the diagonal value; the off-diagonal
     block is zeroed outright */
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}
1430 extern int MatPrintHelp_SeqBAIJ(Mat);
1431 #undef __FUNC__
1432 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1433 int MatPrintHelp_MPIBAIJ(Mat A)
1434 {
1435   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1436 
1437   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1438   else return 0;
1439 }
1440 
1441 #undef __FUNC__
1442 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1443 int MatSetUnfactored_MPIBAIJ(Mat A)
1444 {
1445   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1446   int         ierr;
1447   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1448   return 0;
1449 }
1450 
1451 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1452 
1453 /* -------------------------------------------------------------------*/
static struct _MatOps MatOps = {
  /* NOTE: positional initializer -- entries must stay in the exact order
     of the struct _MatOps declaration, with a 0 for every operation
     MPIBAIJ does not implement */
  MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
  MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
  0,0,0,0,
  0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
  0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
  MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
  MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
  0,0,0,MatGetSize_MPIBAIJ,
  MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
  0,0,MatConvertSameType_MPIBAIJ,0,0,
  0,0,0,MatGetSubMatrices_MPIBAIJ,
  MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
  MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
  0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1469 
1470 
1471 #undef __FUNC__
1472 #define __FUNC__ "MatCreateMPIBAIJ"
1473 /*@C
1474    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1475    (block compressed row).  For good matrix assembly performance
1476    the user should preallocate the matrix storage by setting the parameters
1477    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1478    performance can be increased by more than a factor of 50.
1479 
1480    Input Parameters:
1481 .  comm - MPI communicator
.  bs   - size of block
1483 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1484            This value should be the same as the local size used in creating the
1485            y vector for the matrix-vector product y = Ax.
1486 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1487            This value should be the same as the local size used in creating the
1488            x vector for the matrix-vector product y = Ax.
1489 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1490 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1491 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1492            submatrix  (same for all local rows)
.  d_nnz - array containing the number of block nonzeros in the various block rows
           in the diagonal portion of the local submatrix (possibly different for
           each block row) or PETSC_NULL.  You must leave room for the diagonal
           entry even if it is zero.
1497 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1498            submatrix (same for all local rows).
.  o_nnz - array containing the number of nonzeros in the various block rows of the
           off-diagonal portion of the local submatrix (possibly different for
           each block row) or PETSC_NULL.
1502 
1503    Output Parameter:
1504 .  A - the matrix
1505 
1506    Notes:
1507    The user MUST specify either the local or global matrix dimensions
1508    (possibly both).
1509 
1510    Storage Information:
1511    For a square global matrix we define each processor's diagonal portion
1512    to be its local rows and the corresponding columns (a square submatrix);
1513    each processor's off-diagonal portion encompasses the remainder of the
1514    local matrix (a rectangular submatrix).
1515 
1516    The user can specify preallocated storage for the diagonal part of
1517    the local submatrix with either d_nz or d_nnz (not both).  Set
1518    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1519    memory allocation.  Likewise, specify preallocated storage for the
1520    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1521 
1522    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1523    the figure below we depict these three local rows and all columns (0-11).
1524 
1525 $          0 1 2 3 4 5 6 7 8 9 10 11
1526 $         -------------------
1527 $  row 3  |  o o o d d d o o o o o o
1528 $  row 4  |  o o o d d d o o o o o o
1529 $  row 5  |  o o o d d d o o o o o o
1530 $         -------------------
1531 $
1532 
1533    Thus, any entries in the d locations are stored in the d (diagonal)
1534    submatrix, and any entries in the o locations are stored in the
1535    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1536    stored simply in the MATSEQBAIJ format for compressed row storage.
1537 
1538    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1539    and o_nz should indicate the number of nonzeros per row in the o matrix.
1540    In general, for PDE problems in which most nonzeros are near the diagonal,
1541    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1542    or you will get TERRIBLE performance; see the users' manual chapter on
1543    matrices.
1544 
1545 .keywords: matrix, block, aij, compressed row, sparse, parallel
1546 
1547 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1548 @*/
1549 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1550                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1551 {
1552   Mat          B;
1553   Mat_MPIBAIJ  *b;
1554   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1555 
1556   if (bs < 1) SETERRQ(1,0,"Invalid block size specified, must be positive");
1557 
1558   MPI_Comm_size(comm,&size);
1559   if (size == 1) {
1560     if (M == PETSC_DECIDE) M = m;
1561     if (N == PETSC_DECIDE) N = n;
1562     ierr = MatCreateSeqBAIJ(comm,bs,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
1563     return 0;
1564   }
1565 
1566   *A = 0;
1567   PetscHeaderCreate(B,_p_Mat,MAT_COOKIE,MATMPIBAIJ,comm,MatDestroy,MatView);
1568   PLogObjectCreate(B);
1569   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1570   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1571   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1572 
1573   B->destroy    = MatDestroy_MPIBAIJ;
1574   B->view       = MatView_MPIBAIJ;
1575   B->mapping    = 0;
1576   B->factor     = 0;
1577   B->assembled  = PETSC_FALSE;
1578 
1579   B->insertmode = NOT_SET_VALUES;
1580   MPI_Comm_rank(comm,&b->rank);
1581   MPI_Comm_size(comm,&b->size);
1582 
1583   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1584     SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1585   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
1586   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,0,"either N or n should be specified");
1587   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1588   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1589 
1590   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1591     work[0] = m; work[1] = n;
1592     mbs = m/bs; nbs = n/bs;
1593     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1594     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1595     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1596   }
1597   if (m == PETSC_DECIDE) {
1598     Mbs = M/bs;
1599     if (Mbs*bs != M) SETERRQ(1,0,"No of global rows must be divisible by blocksize");
1600     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1601     m   = mbs*bs;
1602   }
1603   if (n == PETSC_DECIDE) {
1604     Nbs = N/bs;
1605     if (Nbs*bs != N) SETERRQ(1,0,"No of global cols must be divisible by blocksize");
1606     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1607     n   = nbs*bs;
1608   }
1609   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"No of local rows, cols must be divisible by blocksize");
1610 
1611   b->m = m; B->m = m;
1612   b->n = n; B->n = n;
1613   b->N = N; B->N = N;
1614   b->M = M; B->M = M;
1615   b->bs  = bs;
1616   b->bs2 = bs*bs;
1617   b->mbs = mbs;
1618   b->nbs = nbs;
1619   b->Mbs = Mbs;
1620   b->Nbs = Nbs;
1621 
1622   /* build local table of row and column ownerships */
1623   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1624   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1625   b->cowners = b->rowners + b->size + 2;
1626   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1627   b->rowners[0] = 0;
1628   for ( i=2; i<=b->size; i++ ) {
1629     b->rowners[i] += b->rowners[i-1];
1630   }
1631   b->rstart    = b->rowners[b->rank];
1632   b->rend      = b->rowners[b->rank+1];
1633   b->rstart_bs = b->rstart * bs;
1634   b->rend_bs   = b->rend * bs;
1635 
1636   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1637   b->cowners[0] = 0;
1638   for ( i=2; i<=b->size; i++ ) {
1639     b->cowners[i] += b->cowners[i-1];
1640   }
1641   b->cstart    = b->cowners[b->rank];
1642   b->cend      = b->cowners[b->rank+1];
1643   b->cstart_bs = b->cstart * bs;
1644   b->cend_bs   = b->cend * bs;
1645 
1646   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1647   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1648   PLogObjectParent(B,b->A);
1649   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1650   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1651   PLogObjectParent(B,b->B);
1652 
1653   /* build cache for off array entries formed */
1654   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1655   b->donotstash  = 0;
1656   b->colmap      = 0;
1657   b->garray      = 0;
1658   b->roworiented = 1;
1659 
1660   /* stuff used in block assembly */
1661   b->barray      = 0;
1662 
1663   /* stuff used for matrix vector multiply */
1664   b->lvec        = 0;
1665   b->Mvctx       = 0;
1666 
1667   /* stuff for MatGetRow() */
1668   b->rowindices   = 0;
1669   b->rowvalues    = 0;
1670   b->getrowactive = PETSC_FALSE;
1671 
1672   *A = B;
1673   return 0;
1674 }
1675 
1676 #undef __FUNC__
1677 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
1678 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1679 {
1680   Mat         mat;
1681   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1682   int         ierr, len=0, flg;
1683 
1684   *newmat       = 0;
1685   PetscHeaderCreate(mat,_p_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm,MatDestroy,MatView);
1686   PLogObjectCreate(mat);
1687   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1688   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1689   mat->destroy    = MatDestroy_MPIBAIJ;
1690   mat->view       = MatView_MPIBAIJ;
1691   mat->factor     = matin->factor;
1692   mat->assembled  = PETSC_TRUE;
1693 
1694   a->m = mat->m   = oldmat->m;
1695   a->n = mat->n   = oldmat->n;
1696   a->M = mat->M   = oldmat->M;
1697   a->N = mat->N   = oldmat->N;
1698 
1699   a->bs  = oldmat->bs;
1700   a->bs2 = oldmat->bs2;
1701   a->mbs = oldmat->mbs;
1702   a->nbs = oldmat->nbs;
1703   a->Mbs = oldmat->Mbs;
1704   a->Nbs = oldmat->Nbs;
1705 
1706   a->rstart       = oldmat->rstart;
1707   a->rend         = oldmat->rend;
1708   a->cstart       = oldmat->cstart;
1709   a->cend         = oldmat->cend;
1710   a->size         = oldmat->size;
1711   a->rank         = oldmat->rank;
1712   mat->insertmode = NOT_SET_VALUES;
1713   a->rowvalues    = 0;
1714   a->getrowactive = PETSC_FALSE;
1715   a->barray       = 0;
1716 
1717   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1718   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1719   a->cowners = a->rowners + a->size + 2;
1720   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1721   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1722   if (oldmat->colmap) {
1723     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1724     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1725     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1726   } else a->colmap = 0;
1727   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1728     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1729     PLogObjectMemory(mat,len*sizeof(int));
1730     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1731   } else a->garray = 0;
1732 
1733   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1734   PLogObjectParent(mat,a->lvec);
1735   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1736   PLogObjectParent(mat,a->Mvctx);
1737   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1738   PLogObjectParent(mat,a->A);
1739   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1740   PLogObjectParent(mat,a->B);
1741   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1742   if (flg) {
1743     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1744   }
1745   *newmat = mat;
1746   return 0;
1747 }
1748 
1749 #include "sys.h"
1750 
#undef __FUNC__
#define __FUNC__ "MatLoad_MPIBAIJ"
/*
   MatLoad_MPIBAIJ - Loads a square matrix stored in PETSc binary format
   into a parallel block AIJ matrix.

   Protocol: process 0 opens the binary file and reads the header, the
   row-length array, the column indices and the numerical values; the row
   lengths are scattered to all processes, while column indices and values
   are read in per-process chunks and shipped with MPI_Send to their owner
   (process 0 keeps its own chunk).  If the global size M is not divisible
   by the block size (-matload_block_size, default 1), extra rows/columns
   are appended, each padded row receiving a single unit entry on the
   diagonal so the matrix stays nonsingular in those rows.

   Only square matrices are accepted (the format read here stores
   header = {MAT_COOKIE, M, N, ...}).
*/
int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*buf;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
  int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
  int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
  int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
  int          dcount,kmax,k,nzcount,tmp;


  ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
  bs2  = bs*bs;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  /* only process 0 touches the file */
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not matrix object");
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];

  if (M != N) SETERRQ(1,0,"Can only do square matrices");

  /*
     This code adds extra rows to make sure the number of rows is
     divisible by the blocksize
  */
  Mbs        = M/bs;
  extra_rows = bs - M + bs*(Mbs);
  if (extra_rows == bs) extra_rows = 0;  /* M was already divisible by bs */
  else                  Mbs++;
  if (extra_rows &&!rank) {
    PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
  }

  /* determine ownership of all rows: block rows split as evenly as possible,
     with the first (Mbs % size) processes getting one extra block row;
     rowners[] is in block units, browners[] in scalar-row units */
  mbs = Mbs/size + ((Mbs % size) > rank);
  m   = mbs * bs;
  rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
  browners = rowners + size + 1;
  MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
  for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors; padding rows get length 1
     (their single unit diagonal entry) */
  locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
    for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
    MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    /* send buffer arguments are significant only on the root */
    MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices  */
    nz = procsnz[0];
    ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* uniprocessor case: the padding entries are not in the file, so read
       fewer and append the diagonal column indices M..M+extra_rows-1 */
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }

    /* read in every ones (except the last) and ship off */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    /* read in the stuff for the last proc */
    if ( size != 1 ) {
      nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
      MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += locrowlens[i];
    }
    ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* receive message of column indices*/
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    /* sanity check: message length must match the sum of the row lengths */
    if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries;
     mask[] is a per-block-column "already seen in this block row" flag,
     masked1[] collects block columns inside [rstart,rend) (diagonal part),
     masked2[] collects the rest (off-diagonal part) */
  dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
  odlens = dlens + (rend-rstart);
  mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
  PetscMemzero(mask,3*Mbs*sizeof(int));
  masked1 = mask    + Mbs;
  masked2 = masked1 + Mbs;
  rowcount = 0; nzcount = 0;
  for ( i=0; i<mbs; i++ ) {
    dcount  = 0;
    odcount = 0;
    for ( j=0; j<bs; j++ ) {
      kmax = locrowlens[rowcount];
      for ( k=0; k<kmax; k++ ) {
        tmp = mycols[nzcount++]/bs;   /* block column of this entry */
        if (!mask[tmp]) {
          mask[tmp] = 1;
          if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
          else masked1[dcount++] = tmp;
        }
      }
      rowcount++;
    }

    dlens[i]  = dcount;
    odlens[i] = odcount;

    /* zero out the mask elements we set */
    for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
    for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
  }

  /* create our matrix with exact preallocation from dlens/odlens */
  ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
         CHKERRQ(ierr);
  A = *newmat;
  MatSetOption(A,MAT_COLUMNS_SORTED);

  if (!rank) {
    buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    vals = buf;
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
    /* padding rows get a unit diagonal value */
    if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }

    /* insert into matrix; mycols/vals are advanced past each row */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
    /* read in other processors (except the last one) and ship out */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    /* the last proc */
    /* NOTE: procsnz[i] here relies on i == size-1 after the loop above
       (also true when size == 2 and the loop body never runs) */
    if ( size != 1 ){
      nz = procsnz[i] - extra_rows;
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
      MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);

    /* receive message of values*/
    vals = buf;
    mycols = ibuf;
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
  }
  PetscFree(locrowlens);
  PetscFree(buf);
  PetscFree(ibuf);
  PetscFree(rowners);
  PetscFree(dlens);
  PetscFree(mask);
  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}
1979 
1980 
1981