xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 52c87ff2747de67dfdd6ccb0bd18d7d7f8d26768)
1 
2 #ifdef PETSC_RCS_HEADER
3 static char vcid[] = "$Id: mpibaij.c,v 1.77 1997/08/07 14:40:00 bsmith Exp balay $";
4 #endif
5 
6 #include "pinclude/pviewer.h"
7 #include "src/mat/impls/baij/mpi/mpibaij.h"
8 #include "src/vec/vecimpl.h"
9 
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 
16 /*
17      Local utility routine that creates a mapping from the global column
18    number to the local number in the off-diagonal part of the local
   storage of the matrix.  This is done in a non-scalable way since the
20    length of colmap equals the global matrix length.
21 */
22 #undef __FUNC__
23 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
24 static int CreateColmap_MPIBAIJ_Private(Mat mat)
25 {
26   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
27   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
28   int         nbs = B->nbs,i,bs=B->bs;;
29 
30   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
31   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
32   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
33   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
34   return 0;
35 }
36 
37 #define CHUNKSIZE  10
38 
/*
   MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) - Inserts or adds one
   scalar entry into the "diagonal" sequential block A.  row/col are local
   unblocked indices; addv is ADD_VALUES or INSERT_VALUES.

   Expanded as a macro (rather than calling MatSetValues_SeqBAIJ) for speed.
   It relies on locals declared by the caller: a, aa, ai, aj, ailen, aimax,
   bs, bs2, and scratch variables rp, ap, bap, brow, bcol, ridx, cidx, low,
   high, t, _i, ii, nrow, rmax, N.  A short binary search narrows the row,
   then a linear scan finds the block column.  If the row is full, storage
   is reallocated with CHUNKSIZE extra block entries (a->nonew == 1 silently
   ignores new locations; -1/-2 make a new nonzero an error).
*/
#define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
    rmax = aimax[brow]; nrow = ailen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto a_noinsert; \
        } \
      } \
      if (a->nonew == 1) goto a_noinsert; \
      else if (a->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (a->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
        for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
        PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
                    aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(a->a);  \
        if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
        aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
        a->singlemalloc = 1; \
 \
        rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
        rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        a->maxnz += bs2*CHUNKSIZE; \
        a->reallocs++; \
        a->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      a_noinsert:; \
    ailen[brow] = nrow; \
}
113 
/*
   MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) - Same as the _A_
   variant above but operates on the "off-diagonal" sequential block B,
   using the caller's locals b, ba, bi, bj, bilen, bimax (plus the shared
   scratch variables).  Kept as a textual twin of the A macro; any fix made
   to one should be mirrored in the other.
*/
#define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
    rmax = bimax[brow]; nrow = bilen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto b_noinsert; \
        } \
      } \
      if (b->nonew == 1) goto b_noinsert; \
      else if (b->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (b->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
        for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
        PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
                    ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(b->a);  \
        if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
        ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
        b->singlemalloc = 1; \
 \
        rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
        rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        b->maxnz += bs2*CHUNKSIZE; \
        b->reallocs++; \
        b->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      b_noinsert:; \
    bilen[brow] = nrow; \
}
188 
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIBAIJ"
/*
   MatSetValues_MPIBAIJ - Inserts or adds an m x n logically-dense block of
   values (global unblocked indices im[]/in[]) into a parallel BAIJ matrix.

   Locally-owned rows are routed to the diagonal part A (columns inside
   [cstart_bs,cend_bs)) or the off-diagonal part B (all other columns) via
   the inline macros above; rows owned by other processes are stashed and
   communicated at assembly time (unless donotstash is set).

   The many locals below (aa/ai/aj/..., rp/ap/bap/...) exist solely because
   the MatSetValues_SeqBAIJ_*_Private macros expect them in scope.
*/
int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j,row,col;
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
  int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
  int         cend_orig=baij->cend_bs,bs=baij->bs;

  /* Some Variables required in the macro */
  Mat         A = baij->A;
  Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
  int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
  Scalar      *aa=a->a;

  Mat         B = baij->B;
  Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
  int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
  Scalar      *ba=b->a;

  int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
  int         low,high,t,ridx,cidx,bs2=a->bs2;
  Scalar      *ap,*bap;

  for ( i=0; i<m; i++ ) {
#if defined(PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(1,0,"Negative row");
    if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
#endif
    if (im[i] >= rstart_orig && im[i] < rend_orig) {
      /* row is locally owned */
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        if (in[j] >= cstart_orig && in[j] < cend_orig){
          /* column in the diagonal block -> A */
          col = in[j] - cstart_orig;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
#if defined(PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
        else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
#endif
        else {
          /* off-diagonal column -> B; translate through colmap when the
             matrix has been assembled before (compressed column space) */
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }
            /* colmap stores i*bs+1; -1 recovers the 0-based local column */
            col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              /* new off-process column: expand B back to full column space */
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
              /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
              B = baij->B;
              b = (Mat_SeqBAIJ *) (B)->data;
              bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
              ba=b->a;
            }
          }
          else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
      }
    }
    else {
      /* row owned elsewhere: stash the values for MatAssemblyBegin/End */
      if (roworiented && !baij->donotstash) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      }
      else {
        if (!baij->donotstash) {
          row = im[i];
          for ( j=0; j<n; j++ ) {
            ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
  return 0;
}
272 
extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
#undef __FUNC__
#define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
/*
   MatSetValuesBlocked_MPIBAIJ - Inserts or adds an m x n array of bs x bs
   blocks; im[]/in[] are global BLOCK indices.  v holds m*n blocks laid out
   either row- or column-oriented (baij->roworiented).

   Each block is repacked (when needed) into the contiguous bs2 scratch
   buffer baij->barray before being forwarded to the sequential blocked
   insert for A or B.  Rows owned by other processes are unpacked into
   individual scalar entries and stashed for assembly time.
*/
int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      *value,*barray=baij->barray;
  int         ierr,i,j,ii,jj,row,col,k,l;
  int         roworiented = baij->roworiented,rstart=baij->rstart ;
  int         rend=baij->rend,cstart=baij->cstart,stepval;
  int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;


  /* lazily allocate the bs x bs repacking buffer (kept on the matrix) */
  if(!barray) {
    baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
  }

  /* stride (in Scalars) between consecutive rows of one block inside v */
  if (roworiented) {
    stepval = (n-1)*bs;
  } else {
    stepval = (m-1)*bs;
  }
  for ( i=0; i<m; i++ ) {
#if defined(PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(1,0,"Negative row");
    if (im[i] >= baij->Mbs) SETERRQ(1,0,"Row too large");
#endif
    if (im[i] >= rstart && im[i] < rend) {
      row = im[i] - rstart;
      for ( j=0; j<n; j++ ) {
        /* If NumCol = 1 then a copy is not required */
        if ((roworiented) && (n == 1)) {
          barray = v + i*bs2;
        } else if((!roworiented) && (m == 1)) {
          barray = v + j*bs2;
        } else { /* Here a copy is required */
          if (roworiented) {
            value = v + i*(stepval+bs)*bs + j*bs;
          } else {
            value = v + j*(stepval+bs)*bs + i*bs;
          }
          /* gather the bs x bs block into contiguous storage */
          for ( ii=0; ii<bs; ii++,value+=stepval ) {
            for (jj=0; jj<bs; jj++ ) {
              *barray++  = *value++;
            }
          }
          barray -=bs2;  /* rewind to the start of the packed block */
        }

        if (in[j] >= cstart && in[j] < cend){
          col  = in[j] - cstart;
          ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
        }
#if defined(PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
        else if (in[j] >= baij->Nbs) {SETERRQ(1,0,"Column too large");}
#endif
        else {
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }

#if defined(PETSC_BOPT_g)
            /* colmap entries are 1 + i*bs; anything else indicates corruption */
            if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(1,0,"Incorrect colmap");}
#endif
            col = (baij->colmap[in[j]] - 1)/bs;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
            }
          }
          else col = in[j];
          ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
        }
      }
    }
    else {
      /* off-process block row: stash scalar-by-scalar in unblocked indices */
      if (!baij->donotstash) {
        if (roworiented ) {
          row   = im[i]*bs;
          value = v + i*(stepval+bs)*bs;
          for ( j=0; j<bs; j++,row++ ) {
            for ( k=0; k<n; k++ ) {
              for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        }
        else {
          for ( j=0; j<n; j++ ) {
            value = v + j*(stepval+bs)*bs + i*bs;
            col   = in[j]*bs;
            for ( k=0; k<bs; k++,col++,value+=stepval) {
              for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        }
      }
    }
  }
  return 0;
}
379 
#undef __FUNC__
#define __FUNC__ "MatGetValues_MPIBAIJ"
/*
   MatGetValues_MPIBAIJ - Retrieves an m x n array of values at global
   unblocked indices idxm[]/idxn[] into v (row-major, v[i*n+j]).

   Only rows owned by this process may be requested.  Columns in the
   diagonal range come from A; all others are looked up in B through
   colmap/garray, and entries whose block column is not present in B are
   returned as 0.0.
*/
int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
  int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
    if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
    if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
      row = idxm[i] - bsrstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
        if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
        if (idxn[j] >= bscstart && idxn[j] < bscend){
          col = idxn[j] - bscstart;
          ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        }
        else {
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
          }
          /* colmap[g] == 0 means block column g absent from B; the garray
             cross-check guards against a stale colmap entry */
          if((baij->colmap[idxn[j]/bs]-1 < 0) ||
             (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
          else {
            /* colmap value is 1-based start of the local block column */
            col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
            ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    }
    else {
      SETERRQ(1,0,"Only local values currently supported");
    }
  }
  return 0;
}
419 
420 #undef __FUNC__
421 #define __FUNC__ "MatNorm_MPIBAIJ"
422 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
423 {
424   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
425   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
426   int        ierr, i,bs2=baij->bs2;
427   double     sum = 0.0;
428   Scalar     *v;
429 
430   if (baij->size == 1) {
431     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
432   } else {
433     if (type == NORM_FROBENIUS) {
434       v = amat->a;
435       for (i=0; i<amat->nz*bs2; i++ ) {
436 #if defined(PETSC_COMPLEX)
437         sum += real(conj(*v)*(*v)); v++;
438 #else
439         sum += (*v)*(*v); v++;
440 #endif
441       }
442       v = bmat->a;
443       for (i=0; i<bmat->nz*bs2; i++ ) {
444 #if defined(PETSC_COMPLEX)
445         sum += real(conj(*v)*(*v)); v++;
446 #else
447         sum += (*v)*(*v); v++;
448 #endif
449       }
450       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
451       *norm = sqrt(*norm);
452     }
453     else
454       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
455   }
456   return 0;
457 }
458 
#undef __FUNC__
#define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
/*
   MatAssemblyBegin_MPIBAIJ - Starts communicating stashed off-process
   values: verifies all processes use the same InsertMode, counts how many
   stashed entries go to each owner, posts the receives, and sends each
   destination a packed stream of (row, col, value) triples (indices are
   shipped as Scalars to keep a single message datatype).  The pending
   requests/buffers are recorded on the matrix for MatAssemblyEnd to drain.
*/
int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,0,"Some processors inserted others added");
  }
  mat->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    /* rowners holds block-row ownership; *bs converts to unblocked rows */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack (row,col,value) triples; starts[] is advanced while packing ... */
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  /* ... so it must be rebuilt before issuing the sends */
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  /* record pending communication for MatAssemblyEnd_MPIBAIJ */
  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  return 0;
}
559 #include <math.h>
560 #define HASH_KEY 0.6180339887
561 #define HASH1(size,key) ((int)((size)*fmod(((key)*HASH_KEY),1))+1)
562 
563 int CreateHashTable(Mat mat)
564 {
565   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
566   Mat         A = baij->A, B=baij->B;
567   Mat_SeqBAIJ *a=(Mat_SeqBAIJ *)A->data, *b=(Mat_SeqBAIJ *)B->data;
568   int         i,j,k,nz=a->nz+b->nz,h1,*ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j;
569   int         size=(int)(1.5*nz),ct=0,max=0;
570   /* Scalar      *aa=a->a,*ba=b->a; */
571   double      key;
572   static double *HT;
573   static      int flag=1;
574 
575 
576   /* Allocate Memory for Hash Table */
577   if (flag) {
578     HT = (double*)PetscMalloc(size*sizeof(double));
579     flag = 0;
580   }
581   PetscMemzero(HT,size*sizeof(double));
582 
583   /* Loop Over A */
584   for ( i=0; i<a->n; i++ ) {
585     for ( j=ai[i]; j<ai[i+1]; j++ ) {
586       key = i*baij->n+aj[j]+1;
587       h1  = HASH1(size,key);
588 
589       for ( k=1; k<size; k++ ){
590         if (HT[(h1*k)%size] == 0.0) {
591           HT[(h1*k)%size] = key;
592           break;
593         } else ct++;
594       }
595       if (k> max) max =k;
596     }
597   }
598    printf("***max1 = %d\n",max);
599   /* Loop Over B */
600   for ( i=0; i<b->n; i++ ) {
601     for ( j=bi[i]; j<bi[i+1]; j++ ) {
602       key = i*b->n+bj[j]+1;
603       h1  = HASH1(size,key);
604       for ( k=1; k<size; k++ ){
605         if (HT[(h1*k)%size] == 0.0) {
606           HT[(h1*k)%size] = key;
607           break;
608         } else ct++;
609       }
610       if (k> max) max =k;
611     }
612   }
613 
614   printf("***max2 = %d\n",max);
615   /* Print Summary */
616   for ( i=0,key=0.0,j=0; i<size; i++)
617     if (HT[i]) {j++;}
618 
619   printf("Size %d Average Buckets %d no of Keys %d\n",size,ct,j);
620   return 0;
621 }
622 
#undef __FUNC__
#define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
/*
   MatAssemblyEnd_MPIBAIJ - Completes the assembly started in
   MatAssemblyBegin_MPIBAIJ: drains the posted receives, unpacks each
   (row, col, value) triple into A or B, waits on the sends, frees the
   communication buffers, and then assembles the sequential parts.  Also
   synchronizes disassembly state across processes and (on first final
   assembly) builds the scatter context for matrix-vector products.
*/
int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
  int         bs=baij->bs,row,col,other_disassembled,flg;
  Scalar      *values,val;
  InsertMode  addv = mat->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = baij->rvalues + 3*imdex*baij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;  /* each entry is an (index,index,value) triple */
    for ( i=0; i<n; i++ ) {
      row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= baij->cstart*bs && col < baij->cend*bs) {
        col -= baij->cstart*bs;
        ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      }
      else {
        /* off-diagonal entry: translate through colmap as in MatSetValues */
        if (mat->was_assembled) {
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
          }
          col = (baij->colmap[col/bs]) - 1 + col%bs;
          if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
            ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      }
    }
    count--;
  }
  PetscFree(baij->recv_waits); PetscFree(baij->rvalues);

  /* wait on sends */
  if (baij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(baij->nsends,baij->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(baij->send_waits); PetscFree(baij->svalues);

  ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselfs, in order that we may reassemble. */
  MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
  }

  /* first final assembly: build scatter for matrix-vector products */
  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);

  /* optional diagnostic: hash-table statistics of nonzero locations */
  ierr = OptionsHasName(PETSC_NULL,"-use_hash",&flg); CHKERRQ(ierr);
  if (flg) CreateHashTable(mat);
  if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
  return 0;
}
697 
698 #undef __FUNC__
699 #define __FUNC__ "MatView_MPIBAIJ_Binary"
700 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
701 {
702   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
703   int          ierr;
704 
705   if (baij->size == 1) {
706     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
707   }
708   else SETERRQ(1,0,"Only uniprocessor output supported");
709   return 0;
710 }
711 
#undef __FUNC__
#define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
/*
   MatView_MPIBAIJ_ASCIIorDraworMatlab - ASCII / draw / Matlab viewing.

   ASCII info formats print per-process nonzero statistics and return.
   ASCII_FILE_VIEWER prints each process's A and B parts in sequence.
   Otherwise (draw, Matlab, ASCII_FILES) the whole matrix is gathered into
   a temporary parallel matrix whose rows all live on process 0, and that
   process views it.
*/
static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr, format,rank,bs = baij->bs;
  FILE         *fd;
  ViewerType   vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerGetFormat(viewer,&format);
    if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
      /* one process at a time prints its local nonzero statistics */
      MatInfo info;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&info);
      PetscSequentialPhaseBegin(mat->comm,1);
      fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
              rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
              baij->bs,(int)info.memory);
      ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == VIEWER_FORMAT_ASCII_INFO) {
      PetscPrintf(mat->comm,"  block size is %d\n",bs);
      return 0;
    }
  }

  if (vtype == DRAW_VIEWER) {
    /* nothing to do for a null draw context */
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
  }

  if (vtype == ASCII_FILE_VIEWER) {
    /* each process prints its own A and B parts in rank order */
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
           baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
            baij->cstart*bs,baij->cend*bs);
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  }
  else {
    int size = baij->size;
    rank = baij->rank;
    if (size == 1) {
      ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat         A;
      Mat_SeqBAIJ *Aloc;
      int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
      int         mbs=baij->mbs;
      Scalar      *a;

      /* process 0 owns all M rows of the gathered copy; others own none */
      if (!rank) {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqBAIJ*) baij->A->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart;   /* NOTE(review): row is assigned but never read */
      rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

      for ( i=0; i<mbs; i++ ) {
        /* rvals holds the bs consecutive global rows of block row i */
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = (baij->cstart+aj[j])*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      /* copy over the B part */
      Aloc = (Mat_SeqBAIJ*) baij->B->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart*bs;
      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          /* garray maps local block columns of B back to global */
          col = baij->garray[aj[j]]*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      PetscFree(rvals);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        /* everything now lives in A's diagonal part on process 0 */
        ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}
833 
834 
835 
836 #undef __FUNC__
837 #define __FUNC__ "MatView_MPIBAIJ"
838 int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
839 {
840   Mat         mat = (Mat) obj;
841   int         ierr;
842   ViewerType  vtype;
843 
844   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
845   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
846       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
847     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
848   }
849   else if (vtype == BINARY_FILE_VIEWER) {
850     return MatView_MPIBAIJ_Binary(mat,viewer);
851   }
852   return 0;
853 }
854 
#undef __FUNC__
#define __FUNC__ "MatDestroy_MPIBAIJ"
/*
   MatDestroy_MPIBAIJ - Frees all storage owned by an MPIBAIJ matrix:
   the assembly stash, the row-ownership table, the two sequential
   blocks (diagonal part A and off-diagonal part B), and the lazily
   created scatter/work objects, then the matrix header itself.
*/
int MatDestroy_MPIBAIJ(PetscObject obj)
{
  Mat         mat = (Mat) obj;
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;

#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
#endif

  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
  PetscFree(baij->rowners);
  ierr = MatDestroy(baij->A); CHKERRQ(ierr);
  ierr = MatDestroy(baij->B); CHKERRQ(ierr);
  /* the following fields are created on demand (assembly, MatGetRow,
     matrix-vector products), so each may still be zero */
  if (baij->colmap) PetscFree(baij->colmap);
  if (baij->garray) PetscFree(baij->garray);
  if (baij->lvec)   VecDestroy(baij->lvec);
  if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
  if (baij->rowvalues) PetscFree(baij->rowvalues);
  if (baij->barray) PetscFree(baij->barray);
  PetscFree(baij);
  if (mat->mapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
  }
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}
885 
886 #undef __FUNC__
887 #define __FUNC__ "MatMult_MPIBAIJ"
888 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
889 {
890   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
891   int         ierr, nt;
892 
893   VecGetLocalSize_Fast(xx,nt);
894   if (nt != a->n) {
895     SETERRQ(1,0,"Incompatible partition of A and xx");
896   }
897   VecGetLocalSize_Fast(yy,nt);
898   if (nt != a->m) {
899     SETERRQ(1,0,"Incompatible parition of A and yy");
900   }
901   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
902   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
903   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
904   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
905   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
906   return 0;
907 }
908 
909 #undef __FUNC__
910 #define __FUNC__ "MatMultAdd_MPIBAIJ"
911 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
912 {
913   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
914   int        ierr;
915   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
916   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
917   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
918   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
919   return 0;
920 }
921 
922 #undef __FUNC__
923 #define __FUNC__ "MatMultTrans_MPIBAIJ"
924 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
925 {
926   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
927   int        ierr;
928 
929   /* do nondiagonal part */
930   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
931   /* send it on its way */
932   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
933   /* do local part */
934   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
935   /* receive remote parts: note this assumes the values are not actually */
936   /* inserted in yy until the next line, which is true for my implementation*/
937   /* but is not perhaps always true. */
938   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
939   return 0;
940 }
941 
942 #undef __FUNC__
943 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
944 int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
945 {
946   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
947   int        ierr;
948 
949   /* do nondiagonal part */
950   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
951   /* send it on its way */
952   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
953   /* do local part */
954   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
955   /* receive remote parts: note this assumes the values are not actually */
956   /* inserted in yy until the next line, which is true for my implementation*/
957   /* but is not perhaps always true. */
958   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
959   return 0;
960 }
961 
962 /*
963   This only works correctly for square matrices where the subblock A->A is the
964    diagonal block
965 */
966 #undef __FUNC__
967 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
968 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
969 {
970   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
971   if (a->M != a->N)
972     SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
973   return MatGetDiagonal(a->A,v);
974 }
975 
976 #undef __FUNC__
977 #define __FUNC__ "MatScale_MPIBAIJ"
978 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
979 {
980   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
981   int        ierr;
982   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
983   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
984   return 0;
985 }
986 
987 #undef __FUNC__
988 #define __FUNC__ "MatGetSize_MPIBAIJ"
989 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
990 {
991   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
992   *m = mat->M; *n = mat->N;
993   return 0;
994 }
995 
996 #undef __FUNC__
997 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
998 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
999 {
1000   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1001   *m = mat->m; *n = mat->N;
1002   return 0;
1003 }
1004 
1005 #undef __FUNC__
1006 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
1007 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
1008 {
1009   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1010   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
1011   return 0;
1012 }
1013 
1014 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1015 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1016 
#undef __FUNC__
#define __FUNC__ "MatGetRow_MPIBAIJ"
/*
   MatGetRow_MPIBAIJ - Returns one (local) global row of the matrix, with
   column indices in increasing global order, by merging the row of the
   diagonal block A with the row of the off-diagonal block B.
   The caller must pair every call with MatRestoreRow().

   Work arrays rowvalues/rowindices are allocated once, sized for the
   longest local row, and reused until the matrix is destroyed.
*/
int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
  int        *cmap, *idx_p,cstart = mat->cstart;

  /* only one row may be "checked out" at a time */
  if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int     max = 1,mbs = mat->mbs,tmp;
    for ( i=0; i<mbs; i++ ) {
      /* combined block count of row i in both A and B */
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    /* one allocation holds both the values and the index work arrays */
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }


  /* row is a global (scalar) index; it must fall in this process's range */
  if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows")
  lrow = row - brstart;

  /* request only what the caller asked for from the sequential blocks */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      /* imark is the count of B entries whose global column lies before
         the diagonal block; the merged order is B[0..imark), A, B[imark..) */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          /* imark already determined by the value pass above */
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
          }
        } else {
          /* values were not requested: find imark here */
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]/bs] < cstart)
              idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
            else break;
          }
          imark = i;
        }
        /* A's columns are local; shift by the owned-column offset */
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
      }
    }
    else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  return 0;
}
1099 
1100 #undef __FUNC__
1101 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1102 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1103 {
1104   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1105   if (baij->getrowactive == PETSC_FALSE) {
1106     SETERRQ(1,0,"MatGetRow not called");
1107   }
1108   baij->getrowactive = PETSC_FALSE;
1109   return 0;
1110 }
1111 
1112 #undef __FUNC__
1113 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1114 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1115 {
1116   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1117   *bs = baij->bs;
1118   return 0;
1119 }
1120 
1121 #undef __FUNC__
1122 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1123 int MatZeroEntries_MPIBAIJ(Mat A)
1124 {
1125   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1126   int         ierr;
1127   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1128   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1129   return 0;
1130 }
1131 
1132 #undef __FUNC__
1133 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1134 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1135 {
1136   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1137   Mat         A = a->A, B = a->B;
1138   int         ierr;
1139   double      isend[5], irecv[5];
1140 
1141   info->rows_global    = (double)a->M;
1142   info->columns_global = (double)a->N;
1143   info->rows_local     = (double)a->m;
1144   info->columns_local  = (double)a->N;
1145   info->block_size     = (double)a->bs;
1146   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1147   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->memory;
1148   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1149   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->memory;
1150   if (flag == MAT_LOCAL) {
1151     info->nz_used      = isend[0];
1152     info->nz_allocated = isend[1];
1153     info->nz_unneeded  = isend[2];
1154     info->memory       = isend[3];
1155     info->mallocs      = isend[4];
1156   } else if (flag == MAT_GLOBAL_MAX) {
1157     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_MAX,matin->comm);
1158     info->nz_used      = irecv[0];
1159     info->nz_allocated = irecv[1];
1160     info->nz_unneeded  = irecv[2];
1161     info->memory       = irecv[3];
1162     info->mallocs      = irecv[4];
1163   } else if (flag == MAT_GLOBAL_SUM) {
1164     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_SUM,matin->comm);
1165     info->nz_used      = irecv[0];
1166     info->nz_allocated = irecv[1];
1167     info->nz_unneeded  = irecv[2];
1168     info->memory       = irecv[3];
1169     info->mallocs      = irecv[4];
1170   }
1171   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1172   info->fill_ratio_needed = 0;
1173   info->factor_mallocs    = 0;
1174   return 0;
1175 }
1176 
1177 #undef __FUNC__
1178 #define __FUNC__ "MatSetOption_MPIBAIJ"
1179 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1180 {
1181   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1182 
1183   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1184       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1185       op == MAT_COLUMNS_UNSORTED ||
1186       op == MAT_COLUMNS_SORTED ||
1187       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1188       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1189         MatSetOption(a->A,op);
1190         MatSetOption(a->B,op);
1191   } else if (op == MAT_ROW_ORIENTED) {
1192         a->roworiented = 1;
1193         MatSetOption(a->A,op);
1194         MatSetOption(a->B,op);
1195   } else if (op == MAT_ROWS_SORTED ||
1196              op == MAT_ROWS_UNSORTED ||
1197              op == MAT_SYMMETRIC ||
1198              op == MAT_STRUCTURALLY_SYMMETRIC ||
1199              op == MAT_YES_NEW_DIAGONALS)
1200     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1201   else if (op == MAT_COLUMN_ORIENTED) {
1202     a->roworiented = 0;
1203     MatSetOption(a->A,op);
1204     MatSetOption(a->B,op);
1205   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1206     a->donotstash = 1;
1207   } else if (op == MAT_NO_NEW_DIAGONALS)
1208     {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
1209   else
1210     {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
1211   return 0;
1212 }
1213 
1214 #undef __FUNC__
1215 #define __FUNC__ "MatTranspose_MPIBAIJ("
1216 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1217 {
1218   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1219   Mat_SeqBAIJ *Aloc;
1220   Mat        B;
1221   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
1222   int        bs=baij->bs,mbs=baij->mbs;
1223   Scalar     *a;
1224 
1225   if (matout == PETSC_NULL && M != N)
1226     SETERRQ(1,0,"Square matrix only for in-place");
1227   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1228   CHKERRQ(ierr);
1229 
1230   /* copy over the A part */
1231   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1232   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1233   row = baij->rstart;
1234   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1235 
1236   for ( i=0; i<mbs; i++ ) {
1237     rvals[0] = bs*(baij->rstart + i);
1238     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1239     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1240       col = (baij->cstart+aj[j])*bs;
1241       for (k=0; k<bs; k++ ) {
1242         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1243         col++; a += bs;
1244       }
1245     }
1246   }
1247   /* copy over the B part */
1248   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1249   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1250   row = baij->rstart*bs;
1251   for ( i=0; i<mbs; i++ ) {
1252     rvals[0] = bs*(baij->rstart + i);
1253     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1254     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1255       col = baij->garray[aj[j]]*bs;
1256       for (k=0; k<bs; k++ ) {
1257         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1258         col++; a += bs;
1259       }
1260     }
1261   }
1262   PetscFree(rvals);
1263   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1264   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1265 
1266   if (matout != PETSC_NULL) {
1267     *matout = B;
1268   } else {
1269     /* This isn't really an in-place transpose .... but free data structures from baij */
1270     PetscFree(baij->rowners);
1271     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1272     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1273     if (baij->colmap) PetscFree(baij->colmap);
1274     if (baij->garray) PetscFree(baij->garray);
1275     if (baij->lvec) VecDestroy(baij->lvec);
1276     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1277     PetscFree(baij);
1278     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1279     PetscHeaderDestroy(B);
1280   }
1281   return 0;
1282 }
1283 
1284 #undef __FUNC__
1285 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1286 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1287 {
1288   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1289   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1290   int ierr,s1,s2,s3;
1291 
1292   if (ll)  {
1293     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1294     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1295     if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
1296     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1297     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1298   }
1299   if (rr) SETERRQ(1,0,"not supported for right vector");
1300   return 0;
1301 }
1302 
/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   baij->A and baij->B directly and not through the MatZeroRows()
   routine.
*/
#undef __FUNC__
#define __FUNC__ "MatZeroRows_MPIBAIJ"
/*
   MatZeroRows_MPIBAIJ - Zeroes the (global, scalar-indexed) rows listed
   in is; if diag is non-null the diagonal entry of each zeroed row is
   set to *diag (delegated to the sequential MatZeroRows of the blocks).

   Rows may be listed on any process, so the routine first routes each
   requested row to its owning process with point-to-point messages,
   then zeroes the locally owned rows.
*/
int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values,bs=l->bs;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  /*  nprocs[j] = #rows destined for proc j; procs[j] = 1 if any */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    /* rowners[] holds block-row bounds, hence the scaling by bs */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,0,"Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack the row indices by destination process */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  /* starts[] was consumed by the packing loop; rebuild it for the sends */
  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  /* first locally owned scalar row; used to localize received indices */
  base = owners[rank]*bs;

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  /* gather all received global rows into lrows[], localized by base */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  /* only the diagonal block receives the diag value (see note above) */
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}
1433 extern int MatPrintHelp_SeqBAIJ(Mat);
1434 #undef __FUNC__
1435 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1436 int MatPrintHelp_MPIBAIJ(Mat A)
1437 {
1438   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1439 
1440   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1441   else return 0;
1442 }
1443 
1444 #undef __FUNC__
1445 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1446 int MatSetUnfactored_MPIBAIJ(Mat A)
1447 {
1448   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1449   int         ierr;
1450   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1451   return 0;
1452 }
1453 
1454 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1455 
1456 /* -------------------------------------------------------------------*/
/* Function table for the MPIBAIJ format.  NOTE: the entries are
   POSITIONAL - each slot corresponds to a fixed member of struct
   _MatOps, and a 0 means the operation is not supported.  Do not
   reorder these initializers. */
static struct _MatOps MatOps = {
  MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
  MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
  0,0,0,0,
  0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
  0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
  MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
  MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
  0,0,0,MatGetSize_MPIBAIJ,
  MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
  0,0,MatConvertSameType_MPIBAIJ,0,0,
  0,0,0,MatGetSubMatrices_MPIBAIJ,
  MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
  MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
  0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1472 
1473 
1474 #undef __FUNC__
1475 #define __FUNC__ "MatCreateMPIBAIJ"
1476 /*@C
1477    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1478    (block compressed row).  For good matrix assembly performance
1479    the user should preallocate the matrix storage by setting the parameters
1480    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1481    performance can be increased by more than a factor of 50.
1482 
1483    Input Parameters:
1484 .  comm - MPI communicator
.  bs   - size of block
1486 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1487            This value should be the same as the local size used in creating the
1488            y vector for the matrix-vector product y = Ax.
1489 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1490            This value should be the same as the local size used in creating the
1491            x vector for the matrix-vector product y = Ax.
1492 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1493 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1494 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1495            submatrix  (same for all local rows)
.  d_nnz - array containing the number of block nonzeros in the various block rows
           in the diagonal portion of the local submatrix (possibly different for each
           block row) or PETSC_NULL.  You must leave room for the diagonal entry even if
           it is zero.
1500 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1501            submatrix (same for all local rows).
.  o_nnz - array containing the number of block nonzeros in the various block rows of the
           off-diagonal portion of the local submatrix (possibly different for
           each block row) or PETSC_NULL.
1505 
1506    Output Parameter:
1507 .  A - the matrix
1508 
1509    Notes:
1510    The user MUST specify either the local or global matrix dimensions
1511    (possibly both).
1512 
1513    Storage Information:
1514    For a square global matrix we define each processor's diagonal portion
1515    to be its local rows and the corresponding columns (a square submatrix);
1516    each processor's off-diagonal portion encompasses the remainder of the
1517    local matrix (a rectangular submatrix).
1518 
1519    The user can specify preallocated storage for the diagonal part of
1520    the local submatrix with either d_nz or d_nnz (not both).  Set
1521    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1522    memory allocation.  Likewise, specify preallocated storage for the
1523    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1524 
1525    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1526    the figure below we depict these three local rows and all columns (0-11).
1527 
1528 $          0 1 2 3 4 5 6 7 8 9 10 11
1529 $         -------------------
1530 $  row 3  |  o o o d d d o o o o o o
1531 $  row 4  |  o o o d d d o o o o o o
1532 $  row 5  |  o o o d d d o o o o o o
1533 $         -------------------
1534 $
1535 
1536    Thus, any entries in the d locations are stored in the d (diagonal)
1537    submatrix, and any entries in the o locations are stored in the
1538    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1539    stored simply in the MATSEQBAIJ format for compressed row storage.
1540 
1541    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1542    and o_nz should indicate the number of nonzeros per row in the o matrix.
1543    In general, for PDE problems in which most nonzeros are near the diagonal,
1544    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1545    or you will get TERRIBLE performance; see the users' manual chapter on
1546    matrices.
1547 
1548 .keywords: matrix, block, aij, compressed row, sparse, parallel
1549 
1550 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1551 @*/
1552 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1553                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1554 {
1555   Mat          B;
1556   Mat_MPIBAIJ  *b;
1557   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1558 
1559   if (bs < 1) SETERRQ(1,0,"Invalid block size specified, must be positive");
1560 
1561   MPI_Comm_size(comm,&size);
1562   if (size == 1) {
1563     if (M == PETSC_DECIDE) M = m;
1564     if (N == PETSC_DECIDE) N = n;
1565     ierr = MatCreateSeqBAIJ(comm,bs,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
1566     return 0;
1567   }
1568 
1569   *A = 0;
1570   PetscHeaderCreate(B,_p_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1571   PLogObjectCreate(B);
1572   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1573   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1574   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1575 
1576   B->destroy    = MatDestroy_MPIBAIJ;
1577   B->view       = MatView_MPIBAIJ;
1578   B->mapping    = 0;
1579   B->factor     = 0;
1580   B->assembled  = PETSC_FALSE;
1581 
1582   B->insertmode = NOT_SET_VALUES;
1583   MPI_Comm_rank(comm,&b->rank);
1584   MPI_Comm_size(comm,&b->size);
1585 
1586   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1587     SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1588   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
1589   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,0,"either N or n should be specified");
1590   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1591   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1592 
1593   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1594     work[0] = m; work[1] = n;
1595     mbs = m/bs; nbs = n/bs;
1596     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1597     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1598     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1599   }
1600   if (m == PETSC_DECIDE) {
1601     Mbs = M/bs;
1602     if (Mbs*bs != M) SETERRQ(1,0,"No of global rows must be divisible by blocksize");
1603     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1604     m   = mbs*bs;
1605   }
1606   if (n == PETSC_DECIDE) {
1607     Nbs = N/bs;
1608     if (Nbs*bs != N) SETERRQ(1,0,"No of global cols must be divisible by blocksize");
1609     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1610     n   = nbs*bs;
1611   }
1612   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"No of local rows, cols must be divisible by blocksize");
1613 
1614   b->m = m; B->m = m;
1615   b->n = n; B->n = n;
1616   b->N = N; B->N = N;
1617   b->M = M; B->M = M;
1618   b->bs  = bs;
1619   b->bs2 = bs*bs;
1620   b->mbs = mbs;
1621   b->nbs = nbs;
1622   b->Mbs = Mbs;
1623   b->Nbs = Nbs;
1624 
1625   /* build local table of row and column ownerships */
1626   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1627   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1628   b->cowners = b->rowners + b->size + 2;
1629   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1630   b->rowners[0] = 0;
1631   for ( i=2; i<=b->size; i++ ) {
1632     b->rowners[i] += b->rowners[i-1];
1633   }
1634   b->rstart    = b->rowners[b->rank];
1635   b->rend      = b->rowners[b->rank+1];
1636   b->rstart_bs = b->rstart * bs;
1637   b->rend_bs   = b->rend * bs;
1638 
1639   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1640   b->cowners[0] = 0;
1641   for ( i=2; i<=b->size; i++ ) {
1642     b->cowners[i] += b->cowners[i-1];
1643   }
1644   b->cstart    = b->cowners[b->rank];
1645   b->cend      = b->cowners[b->rank+1];
1646   b->cstart_bs = b->cstart * bs;
1647   b->cend_bs   = b->cend * bs;
1648 
1649   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1650   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1651   PLogObjectParent(B,b->A);
1652   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1653   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1654   PLogObjectParent(B,b->B);
1655 
1656   /* build cache for off array entries formed */
1657   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1658   b->donotstash  = 0;
1659   b->colmap      = 0;
1660   b->garray      = 0;
1661   b->roworiented = 1;
1662 
1663   /* stuff used in block assembly */
1664   b->barray      = 0;
1665 
1666   /* stuff used for matrix vector multiply */
1667   b->lvec        = 0;
1668   b->Mvctx       = 0;
1669 
1670   /* stuff for MatGetRow() */
1671   b->rowindices   = 0;
1672   b->rowvalues    = 0;
1673   b->getrowactive = PETSC_FALSE;
1674 
1675   *A = B;
1676   return 0;
1677 }
1678 
1679 #undef __FUNC__
1680 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
1681 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1682 {
1683   Mat         mat;
1684   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1685   int         ierr, len=0, flg;
1686 
1687   *newmat       = 0;
1688   PetscHeaderCreate(mat,_p_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1689   PLogObjectCreate(mat);
1690   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1691   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1692   mat->destroy    = MatDestroy_MPIBAIJ;
1693   mat->view       = MatView_MPIBAIJ;
1694   mat->factor     = matin->factor;
1695   mat->assembled  = PETSC_TRUE;
1696 
1697   a->m = mat->m   = oldmat->m;
1698   a->n = mat->n   = oldmat->n;
1699   a->M = mat->M   = oldmat->M;
1700   a->N = mat->N   = oldmat->N;
1701 
1702   a->bs  = oldmat->bs;
1703   a->bs2 = oldmat->bs2;
1704   a->mbs = oldmat->mbs;
1705   a->nbs = oldmat->nbs;
1706   a->Mbs = oldmat->Mbs;
1707   a->Nbs = oldmat->Nbs;
1708 
1709   a->rstart       = oldmat->rstart;
1710   a->rend         = oldmat->rend;
1711   a->cstart       = oldmat->cstart;
1712   a->cend         = oldmat->cend;
1713   a->size         = oldmat->size;
1714   a->rank         = oldmat->rank;
1715   mat->insertmode = NOT_SET_VALUES;
1716   a->rowvalues    = 0;
1717   a->getrowactive = PETSC_FALSE;
1718   a->barray       = 0;
1719 
1720   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1721   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1722   a->cowners = a->rowners + a->size + 2;
1723   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1724   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1725   if (oldmat->colmap) {
1726     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1727     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1728     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1729   } else a->colmap = 0;
1730   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1731     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1732     PLogObjectMemory(mat,len*sizeof(int));
1733     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1734   } else a->garray = 0;
1735 
1736   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1737   PLogObjectParent(mat,a->lvec);
1738   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1739   PLogObjectParent(mat,a->Mvctx);
1740   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1741   PLogObjectParent(mat,a->A);
1742   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1743   PLogObjectParent(mat,a->B);
1744   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1745   if (flg) {
1746     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1747   }
1748   *newmat = mat;
1749   return 0;
1750 }
1751 
1752 #include "sys.h"
1753 
#undef __FUNC__
#define __FUNC__ "MatLoad_MPIBAIJ"
/*
   MatLoad_MPIBAIJ - Loads a distributed block AIJ matrix from a PETSc
   binary viewer.

   Process 0 reads the header, the row lengths, the column indices and the
   numerical values from the file and ships each processor its own slice
   (column indices are sent with the viewer tag, values with the matrix
   tag); the other processes only post matching receives and insert their
   rows locally.

   The block size is taken from the option -matload_block_size (default 1).
   If the global dimension is not divisible by the block size, extra rows
   and columns are appended, each holding a single 1.0 on the diagonal, so
   the padded size is divisible.

   NOTE(review): only square matrices (M == N) are accepted; the 'type'
   argument is not examined, and bs2 is computed but never used here.
*/
int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*buf;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
  int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
  int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
  int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
  int          dcount,kmax,k,nzcount,tmp;


  ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
  bs2  = bs*bs;

  /* process 0 reads and validates the binary header: cookie, M, N, nz */
  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not matrix object");
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];

  if (M != N) SETERRQ(1,0,"Can only do square matrices");

  /*
     This code adds extra rows to make sure the number of rows is
     divisible by the blocksize
  */
  Mbs        = M/bs;
  extra_rows = bs - M + bs*(Mbs);
  if (extra_rows == bs) extra_rows = 0;   /* M already divisible: no padding */
  else                  Mbs++;            /* padding fills out one more block row */
  if (extra_rows &&!rank) {
    PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
  }

  /* determine ownership of all rows: the first Mbs%size ranks own one
     extra block row each */
  mbs = Mbs/size + ((Mbs % size) > rank);
  m   = mbs * bs;
  rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
  browners = rowners + size + 1;   /* browners[] = rowners[] in scalar (unblocked) rows */
  MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];   /* prefix sum -> block-row offsets */
  for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
  rstart = rowners[rank];     /* first owned block row */
  rend   = rowners[rank+1];   /* one past last owned block row */

  /* distribute row lengths to all processors */
  locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
    for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;   /* each padded row holds one entry */
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
    MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices  */
    nz = procsnz[0];
    ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* uniprocessor case: the padded rows are not on disk, so read fewer
       entries and synthesize diagonal column indices for them */
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }

    /* read in every ones (except the last) and ship off */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    /* read in the stuff for the last proc */
    if ( size != 1 ) {
      nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;   /* diagonal entries of padded rows */
      MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += locrowlens[i];
    }
    ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* receive message of column indices*/
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    /* the message length must match the sum of my row lengths */
    if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries */
  dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
  odlens = dlens + (rend-rstart);   /* second half of the same allocation */
  /* mask/masked1/masked2 are Mbs-sized scratch arrays: mask marks block
     columns already seen in this block row, masked1/masked2 record which
     entries to clear afterwards (diagonal resp. off-diagonal part) */
  mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
  PetscMemzero(mask,3*Mbs*sizeof(int));
  masked1 = mask    + Mbs;
  masked2 = masked1 + Mbs;
  rowcount = 0; nzcount = 0;
  for ( i=0; i<mbs; i++ ) {
    dcount  = 0;
    odcount = 0;
    /* scan the bs scalar rows making up block row i, counting distinct
       block columns inside ([rstart,rend)) and outside the diagonal part */
    for ( j=0; j<bs; j++ ) {
      kmax = locrowlens[rowcount];
      for ( k=0; k<kmax; k++ ) {
        tmp = mycols[nzcount++]/bs;   /* block column of this entry */
        if (!mask[tmp]) {
          mask[tmp] = 1;
          if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
          else masked1[dcount++] = tmp;
        }
      }
      rowcount++;
    }

    dlens[i]  = dcount;
    odlens[i] = odcount;

    /* zero out the mask elements we set */
    for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
    for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
  }

  /* create our matrix, preallocated exactly from the counts above */
  ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
         CHKERRQ(ierr);
  A = *newmat;
  MatSetOption(A,MAT_COLUMNS_SORTED);

  if (!rank) {
    buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    vals = buf;
    mycols = ibuf;
    /* uniprocessor case: pad with 1.0 on the diagonal of the extra rows */
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }

    /* insert into matrix, one scalar row at a time */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
    /* read in other processors (except the last one) and ship out */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    /* the last proc: padded rows get a 1.0 appended (i == size-1 here) */
    if ( size != 1 ){
      nz = procsnz[i] - extra_rows;
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
      MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);

    /* receive message of values*/
    vals = buf;
    mycols = ibuf;
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
  }
  PetscFree(locrowlens);
  PetscFree(buf);
  PetscFree(ibuf);
  PetscFree(rowners);
  PetscFree(dlens);
  PetscFree(mask);
  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}
1982 
1983 
1984