xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision e5638eb34a8b16d498de18cd9b3470fac7fcaa21)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpibaij.c,v 1.81 1997/09/25 21:42:10 balay Exp bsmith $";
3 #endif
4 
5 #include "pinclude/pviewer.h"
6 #include "src/mat/impls/baij/mpi/mpibaij.h"
7 #include "src/vec/vecimpl.h"
8 
9 
10 extern int MatSetUpMultiply_MPIBAIJ(Mat);
11 extern int DisAssemble_MPIBAIJ(Mat);
12 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
13 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
14 
15 /*
16      Local utility routine that creates a mapping from the global column
17    number to the local number in the off-diagonal part of the local
   storage of the matrix.  This is done in a non-scalable way since the
19    length of colmap equals the global matrix length.
20 */
21 #undef __FUNC__
22 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
23 static int CreateColmap_MPIBAIJ_Private(Mat mat)
24 {
25   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
26   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
27   int         nbs = B->nbs,i,bs=B->bs;;
28 
29   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
30   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
31   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
32   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
33   return 0;
34 }
35 
#define CHUNKSIZE  10

/*
   MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) - inserts (or, with
   ADD_VALUES, adds) a single point entry (row,col) into the "diagonal"
   sequential part a of the parallel matrix.  row/col are local POINT
   indices; they are split into block indices (brow,bcol) and intra-block
   offsets (ridx,cidx).  A short binary search plus linear scan locates the
   block column; if absent and a->nonew allows insertion, the whole matrix
   storage is reallocated with CHUNKSIZE extra block slots for this row and
   the trailing entries of the row are shifted up.  Expands in the caller's
   scope and relies on its locals: a, A, aa, ai, aj, aimax, ailen, bs, bs2,
   rp, ap, bap, nrow, rmax, low, high, t, _i, ii, N, brow, bcol, ridx, cidx.
*/
#define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
    rmax = aimax[brow]; nrow = ailen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto a_noinsert; \
        } \
      } \
      if (a->nonew == 1) goto a_noinsert; \
      else if (a->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (a->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
        for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
        PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
                    aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(a->a);  \
        if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
        aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
        a->singlemalloc = 1; \
 \
        rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
        rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        a->maxnz += bs2*CHUNKSIZE; \
        a->reallocs++; \
        a->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      a_noinsert:; \
    ailen[brow] = nrow; \
}
112 
/*
   MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) - identical in
   structure to MatSetValues_SeqBAIJ_A_Private but targets the off-diagonal
   sequential part b (arrays ba, bi, bj, bimax, bilen, Mat B).  Inserts or
   adds one point entry, growing the row storage by CHUNKSIZE block slots
   when full, subject to b->nonew.  Expands in the caller's scope and uses
   the caller's locals, exactly like the A variant.
*/
#define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
    rmax = bimax[brow]; nrow = bilen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto b_noinsert; \
        } \
      } \
      if (b->nonew == 1) goto b_noinsert; \
      else if (b->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (b->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
        for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
        PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
                    ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(b->a);  \
        if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
        ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
        b->singlemalloc = 1; \
 \
        rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
        rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        b->maxnz += bs2*CHUNKSIZE; \
        b->reallocs++; \
        b->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      b_noinsert:; \
    bilen[brow] = nrow; \
}
187 
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIBAIJ"
/*
   MatSetValues_MPIBAIJ - Inserts or adds point entries (im[i],in[j]) into a
   parallel BAIJ matrix.  Locally owned rows are folded directly into the
   diagonal (A) or off-diagonal (B) sequential parts via the
   MatSetValues_SeqBAIJ_{A,B}_Private macros (which use the locals declared
   below); rows owned by other processors are stashed for communication at
   assembly time, unless donotstash is set.
*/
int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j,row,col;
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
  int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
  int         cend_orig=baij->cend_bs,bs=baij->bs;

  /* Some Variables required in the macro */
  Mat         A = baij->A;
  Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
  int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
  Scalar      *aa=a->a;

  Mat         B = baij->B;
  Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
  int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
  Scalar      *ba=b->a;

  int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
  int         low,high,t,ridx,cidx,bs2=a->bs2;
  Scalar      *ap,*bap;

  for ( i=0; i<m; i++ ) {
#if defined(PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(1,0,"Negative row");
    if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
#endif
    if (im[i] >= rstart_orig && im[i] < rend_orig) {
      /* row is owned by this processor; switch to local point numbering */
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        if (in[j] >= cstart_orig && in[j] < cend_orig){
          /* column falls inside the diagonal part A */
          col = in[j] - cstart_orig;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
#if defined(PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
        else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
#endif
        else {
          /* column belongs to the off-diagonal part B */
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }
            /* colmap stores 1 + first local point column of the block;
               0 (hence col < 0 below) means the block is absent from B */
            col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              /* new off-processor block column: revert to unassembled state */
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
              /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
              B = baij->B;
              b = (Mat_SeqBAIJ *) (B)->data;
              bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
              ba=b->a;
            }
          }
          else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
      }
    }
    else {
      /* off-processor row: stash the values for communication at assembly */
      if (roworiented && !baij->donotstash) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      }
      else {
        if (!baij->donotstash) {
          row = im[i];
          for ( j=0; j<n; j++ ) {
            ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
  return 0;
}
271 
extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
#undef __FUNC__
#define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
/*
   MatSetValuesBlocked_MPIBAIJ - Inserts or adds m x n blocks of size
   bs x bs.  im[]/in[] are BLOCK row/column indices; v contains the blocks,
   laid out row- or column-oriented according to baij->roworiented.  Locally
   owned block rows are forwarded to the sequential diagonal (A) or
   off-diagonal (B) parts; other rows are stashed entry-by-entry for
   assembly time.  baij->barray is a lazily allocated bs2-sized scratch
   block used to repack one block out of v.
*/
int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      *value,*barray=baij->barray;
  int         ierr,i,j,ii,jj,row,col,k,l;
  int         roworiented = baij->roworiented,rstart=baij->rstart ;
  int         rend=baij->rend,cstart=baij->cstart,stepval;
  int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;


  /* lazily allocate the bs2 scratch block used to repack one block */
  if(!barray) {
    baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
  }

  /* stepval = distance (in Scalars) from the end of one bs-long segment of
     a block to the start of the next, for the orientation in use */
  if (roworiented) {
    stepval = (n-1)*bs;
  } else {
    stepval = (m-1)*bs;
  }
  for ( i=0; i<m; i++ ) {
#if defined(PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(1,0,"Negative row");
    if (im[i] >= baij->Mbs) SETERRQ(1,0,"Row too large");
#endif
    if (im[i] >= rstart && im[i] < rend) {
      row = im[i] - rstart;
      for ( j=0; j<n; j++ ) {
        /* If NumCol = 1 then a copy is not required */
        if ((roworiented) && (n == 1)) {
          barray = v + i*bs2;
        } else if((!roworiented) && (m == 1)) {
          barray = v + j*bs2;
        } else { /* Here a copy is required */
          if (roworiented) {
            value = v + i*(stepval+bs)*bs + j*bs;
          } else {
            value = v + j*(stepval+bs)*bs + i*bs;
          }
          for ( ii=0; ii<bs; ii++,value+=stepval ) {
            for (jj=0; jj<bs; jj++ ) {
              *barray++  = *value++;
            }
          }
          barray -=bs2; /* rewind to the start of the packed block */
        }

        if (in[j] >= cstart && in[j] < cend){
          /* block column lies in the diagonal part */
          col  = in[j] - cstart;
          ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
        }
#if defined(PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
        else if (in[j] >= baij->Nbs) {SETERRQ(1,0,"Column too large");}
#endif
        else {
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }

#if defined(PETSC_BOPT_g)
            if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(1,0,"Incorrect colmap");}
#endif
            /* colmap holds 1 + first point column of the block; divide by bs
               to recover the local BLOCK column of B (negative = absent) */
            col = (baij->colmap[in[j]] - 1)/bs;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              /* new off-processor block column: revert to unassembled state */
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
            }
          }
          else col = in[j];
          ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
        }
      }
    }
    else {
      /* off-processor block row: stash the individual point entries */
      if (!baij->donotstash) {
        if (roworiented ) {
          row   = im[i]*bs;
          value = v + i*(stepval+bs)*bs;
          for ( j=0; j<bs; j++,row++ ) {
            for ( k=0; k<n; k++ ) {
              for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        }
        else {
          for ( j=0; j<n; j++ ) {
            value = v + j*(stepval+bs)*bs + i*bs;
            col   = in[j]*bs;
            for ( k=0; k<bs; k++,col++,value+=stepval) {
              for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        }
      }
    }
  }
  return 0;
}
378 
#undef __FUNC__
#define __FUNC__ "MatGetValues_MPIBAIJ"
/*
   MatGetValues_MPIBAIJ - Retrieves the point entries (idxm[i],idxn[j]) into
   v (row-oriented: v[i*n+j]).  Only locally owned rows may be requested.
   Columns outside the diagonal block are translated through colmap/garray;
   entries whose block column is not present in the off-diagonal part are
   returned as 0.0.
*/
int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
  int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
    if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
    if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
      row = idxm[i] - bsrstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
        if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
        if (idxn[j] >= bscstart && idxn[j] < bscend){
          /* entry lies in the diagonal part A */
          col = idxn[j] - bscstart;
          ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        }
        else {
          /* off-diagonal part: map global column through colmap/garray */
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
          }
          /* colmap[c]-1 < 0 means the block column is absent from B; the
             garray cross-check guards against a stale colmap entry */
          if((baij->colmap[idxn[j]/bs]-1 < 0) ||
             (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
          else {
            col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
            ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    }
    else {
      SETERRQ(1,0,"Only local values currently supported");
    }
  }
  return 0;
}
418 
419 #undef __FUNC__
420 #define __FUNC__ "MatNorm_MPIBAIJ"
421 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
422 {
423   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
424   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
425   int        ierr, i,bs2=baij->bs2;
426   double     sum = 0.0;
427   Scalar     *v;
428 
429   if (baij->size == 1) {
430     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
431   } else {
432     if (type == NORM_FROBENIUS) {
433       v = amat->a;
434       for (i=0; i<amat->nz*bs2; i++ ) {
435 #if defined(PETSC_COMPLEX)
436         sum += real(conj(*v)*(*v)); v++;
437 #else
438         sum += (*v)*(*v); v++;
439 #endif
440       }
441       v = bmat->a;
442       for (i=0; i<bmat->nz*bs2; i++ ) {
443 #if defined(PETSC_COMPLEX)
444         sum += real(conj(*v)*(*v)); v++;
445 #else
446         sum += (*v)*(*v); v++;
447 #endif
448       }
449       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
450       *norm = sqrt(*norm);
451     }
452     else
453       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
454   }
455   return 0;
456 }
457 
#undef __FUNC__
#define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
/*
   MatAssemblyBegin_MPIBAIJ - First stage of assembly: ships stashed
   off-processor (row,col,value) entries to their owning processors using
   non-blocking MPI sends/receives.  The matching waits, and the insertion
   of the received values, happen in MatAssemblyEnd_MPIBAIJ(); the buffers,
   requests and counts are parked in the Mat_MPIBAIJ structure in between.
*/
int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,0,"Some processors inserted others added");
  }
  mat->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    /* rowners[] holds block-row ownership ranges, hence the *bs scaling */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered triples
     (global row index,global column index,value); the two indices are
     stored as Scalars to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack the (row,col,value) triples, consuming starts[] as we go */
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  /* starts[] was consumed above; rebuild it for posting the sends */
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  /* park the communication state for MatAssemblyEnd_MPIBAIJ() */
  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  return 0;
}
558 #include <math.h>
559 #define HASH_KEY 0.6180339887
560 #define HASH1(size,key) ((int)((size)*fmod(((key)*HASH_KEY),1)))
561 #define HASH2(size,key) ((int)((size)*fmod(((key+0.5)*HASH_KEY),1)))
562 
563 
564 int CreateHashTable(Mat mat,double factor)
565 {
566   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
567   Mat         A = baij->A, B=baij->B;
568   Mat_SeqBAIJ *a=(Mat_SeqBAIJ *)A->data, *b=(Mat_SeqBAIJ *)B->data;
569   int         i,j,k,nz=a->nz+b->nz,h1,h2,*ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j;
570   int         size=(int)(factor*nz),ct=0,max1=0,max2=0;
571   /* Scalar      *aa=a->a,*ba=b->a; */
572   double      key;
573   static double *HT;
574   static      int flag=1;
575   extern int PetscGlobalRank;
576   flag = 1;
577   /* Allocate Memory for Hash Table */
578   if (flag) {
579     HT = (double*)PetscMalloc(size*sizeof(double));
580     flag = 0;
581   }
582   PetscMemzero(HT,size*sizeof(double));
583 
584   /* Loop Over A */
585   for ( i=0; i<a->n; i++ ) {
586     for ( j=ai[i]; j<ai[i+1]; j++ ) {
587       key = i*baij->n+aj[j]+1;
588       h1  = HASH1(size,key);
589       h2  = HASH2(size,key);
590 
591       for ( k=1; k<size; k++ ){
592         if (HT[(h1*k)%size] == 0.0) {
593           HT[(h1*k)%size] = key;
594           break;
595         } else ct++;
596       }
597       if (k> max1) max1 =k;
598     }
599   }
600   /* Loop Over B */
601   for ( i=0; i<b->n; i++ ) {
602     for ( j=bi[i]; j<bi[i+1]; j++ ) {
603       key = i*b->n+bj[j]+1;
604       h1  = HASH1(size,key);
605       for ( k=1; k<size; k++ ){
606         if (HT[(h1*k)%size] == 0.0) {
607           HT[(h1*k)%size] = key;
608           break;
609         } else ct++;
610       }
611       if (k> max2) max2 =k;
612     }
613   }
614 
615   /* Print Summary */
616   for ( i=0,key=0.0,j=0; i<size; i++)
617     if (HT[i]) {j++;}
618 
619   printf("[%d] fact = %3.2f max1 = %5d max2 = %5d Size %5d - Searches %5d Avg %5.2f Keys %5d\n",
620          PetscGlobalRank,factor,max1,max2,size,ct+j,((float)ct+j)/j,j);
621   PetscFree(HT);
622   return 0;
623 }
624 
#undef __FUNC__
#define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
/*
   MatAssemblyEnd_MPIBAIJ - Completes the assembly started in
   MatAssemblyBegin_MPIBAIJ(): drains the posted receives, inserting each
   received (row,col,value) triple into the local A or B part, waits on the
   outstanding sends, then assembles the two sequential parts and (on first
   final assembly) sets up the scatter used by matrix-vector products.
*/
int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
  int         bs=baij->bs,row,col,other_disassembled,flg;
  Scalar      *values,val;
  InsertMode  addv = mat->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = baij->rvalues + 3*imdex*baij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3; /* three Scalars (row,col,value) per entry */
    for ( i=0; i<n; i++ ) {
      row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= baij->cstart*bs && col < baij->cend*bs) {
        col -= baij->cstart*bs;
        ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      }
      else {
        if (mat->was_assembled) {
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
          }
          /* colmap stores 1 + first local point column; 0 means absent */
          col = (baij->colmap[col/bs]) - 1 + col%bs;
          if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
            /* new off-processor block column: revert to unassembled state
               and fall back to the global column index */
            ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      }
    }
    count--;
  }
  PetscFree(baij->recv_waits); PetscFree(baij->rvalues);

  /* wait on sends */
  if (baij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(baij->nsends,baij->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(baij->send_waits); PetscFree(baij->svalues);

  ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselves, in order that we may reassemble. */
  MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
  }

  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);

  /* optional diagnostic: hash-table fill statistics, see CreateHashTable() */
  ierr = OptionsHasName(PETSC_NULL,"-use_hash",&flg); CHKERRQ(ierr);
  if (flg) {
    double fact;
    for ( fact=1.2; fact<2.0; fact +=0.05) CreateHashTable(mat,fact);
  }
  if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
  return 0;
}
702 
703 #undef __FUNC__
704 #define __FUNC__ "MatView_MPIBAIJ_Binary"
705 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
706 {
707   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
708   int          ierr;
709 
710   if (baij->size == 1) {
711     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
712   }
713   else SETERRQ(1,0,"Only uniprocessor output supported");
714   return 0;
715 }
716 
#undef __FUNC__
#define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
/*
   MatView_MPIBAIJ_ASCIIorDraworMatlab - Viewer back-end for the ASCII, draw
   and Matlab viewer types.  Handles the INFO formats with short summaries;
   for ASCII_FILE each processor prints its local A and B parts in rank
   order; for everything else the whole matrix is gathered onto processor 0
   (via a temporary parallel matrix) and viewed there.
*/
static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr, format,rank,bs = baij->bs;
  FILE         *fd;
  ViewerType   vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerGetFormat(viewer,&format);
    if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
      /* per-processor summary of sizes/nonzeros plus the scatter context */
      MatInfo info;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&info);
      PetscSequentialPhaseBegin(mat->comm,1);
      fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
              rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
              baij->bs,(int)info.memory);
      ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == VIEWER_FORMAT_ASCII_INFO) {
      PetscPrintf(mat->comm,"  block size is %d\n",bs);
      return 0;
    }
  }

  if (vtype == DRAW_VIEWER) {
    /* nothing to do for a null drawing window */
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
  }

  if (vtype == ASCII_FILE_VIEWER) {
    /* every processor prints its ownership range and local parts in order */
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
           baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
            baij->cstart*bs,baij->cend*bs);
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  }
  else {
    int size = baij->size;
    rank = baij->rank;
    if (size == 1) {
      ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat         A;
      Mat_SeqBAIJ *Aloc;
      int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
      int         mbs=baij->mbs;
      Scalar      *a;

      /* processor 0 owns the whole gathered matrix, all others 0 rows */
      if (!rank) {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqBAIJ*) baij->A->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart; /* NOTE(review): row is set but never used below */
      rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

      for ( i=0; i<mbs; i++ ) {
        /* rvals = the bs consecutive global point rows of block row i */
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = (baij->cstart+aj[j])*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      /* copy over the B part */
      Aloc = (Mat_SeqBAIJ*) baij->B->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart*bs; /* NOTE(review): also set but never used */
      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          /* garray maps local off-diagonal block columns back to global */
          col = baij->garray[aj[j]]*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      PetscFree(rvals);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}
838 
839 
840 
841 #undef __FUNC__
842 #define __FUNC__ "MatView_MPIBAIJ"
843 int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
844 {
845   Mat         mat = (Mat) obj;
846   int         ierr;
847   ViewerType  vtype;
848 
849   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
850   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
851       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
852     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
853   }
854   else if (vtype == BINARY_FILE_VIEWER) {
855     return MatView_MPIBAIJ_Binary(mat,viewer);
856   }
857   return 0;
858 }
859 
860 #undef __FUNC__
861 #define __FUNC__ "MatDestroy_MPIBAIJ"
862 int MatDestroy_MPIBAIJ(PetscObject obj)
863 {
864   Mat         mat = (Mat) obj;
865   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
866   int         ierr;
867 
868 #if defined(PETSC_LOG)
869   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
870 #endif
871 
872   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
873   PetscFree(baij->rowners);
874   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
875   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
876   if (baij->colmap) PetscFree(baij->colmap);
877   if (baij->garray) PetscFree(baij->garray);
878   if (baij->lvec)   VecDestroy(baij->lvec);
879   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
880   if (baij->rowvalues) PetscFree(baij->rowvalues);
881   if (baij->barray) PetscFree(baij->barray);
882   PetscFree(baij);
883   PLogObjectDestroy(mat);
884   PetscHeaderDestroy(mat);
885   return 0;
886 }
887 
888 #undef __FUNC__
889 #define __FUNC__ "MatMult_MPIBAIJ"
890 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
891 {
892   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
893   int         ierr, nt;
894 
895   VecGetLocalSize_Fast(xx,nt);
896   if (nt != a->n) {
897     SETERRQ(1,0,"Incompatible partition of A and xx");
898   }
899   VecGetLocalSize_Fast(yy,nt);
900   if (nt != a->m) {
901     SETERRQ(1,0,"Incompatible parition of A and yy");
902   }
903   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
904   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
905   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
906   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
907   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
908   return 0;
909 }
910 
911 #undef __FUNC__
912 #define __FUNC__ "MatMultAdd_MPIBAIJ"
913 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
914 {
915   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
916   int        ierr;
917   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
918   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
919   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
920   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
921   return 0;
922 }
923 
924 #undef __FUNC__
925 #define __FUNC__ "MatMultTrans_MPIBAIJ"
926 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
927 {
928   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
929   int        ierr;
930 
931   /* do nondiagonal part */
932   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
933   /* send it on its way */
934   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
935   /* do local part */
936   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
937   /* receive remote parts: note this assumes the values are not actually */
938   /* inserted in yy until the next line, which is true for my implementation*/
939   /* but is not perhaps always true. */
940   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
941   return 0;
942 }
943 
944 #undef __FUNC__
945 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
946 int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
947 {
948   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
949   int        ierr;
950 
951   /* do nondiagonal part */
952   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
953   /* send it on its way */
954   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
955   /* do local part */
956   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
957   /* receive remote parts: note this assumes the values are not actually */
958   /* inserted in yy until the next line, which is true for my implementation*/
959   /* but is not perhaps always true. */
960   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
961   return 0;
962 }
963 
964 /*
965   This only works correctly for square matrices where the subblock A->A is the
966    diagonal block
967 */
968 #undef __FUNC__
969 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
970 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
971 {
972   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
973   if (a->M != a->N)
974     SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
975   return MatGetDiagonal(a->A,v);
976 }
977 
978 #undef __FUNC__
979 #define __FUNC__ "MatScale_MPIBAIJ"
980 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
981 {
982   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
983   int        ierr;
984   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
985   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
986   return 0;
987 }
988 
989 #undef __FUNC__
990 #define __FUNC__ "MatGetSize_MPIBAIJ"
991 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
992 {
993   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
994   if (m) *m = mat->M;
995   if (n) *n = mat->N;
996   return 0;
997 }
998 
#undef __FUNC__
#define __FUNC__ "MatGetLocalSize_MPIBAIJ"
/*
   Returns the local row count and a column count for the local storage.
   NOTE(review): *n is set to the GLOBAL column count mat->N rather than
   the local count mat->n -- presumably because each processor stores
   complete rows (A is m x n, B is m x N locally); confirm this is the
   intended convention.  MatDiagonalScale_MPIBAIJ() only ever uses the
   row value returned here.
*/
int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  *m = mat->m; *n = mat->N;
  return 0;
}
1007 
1008 #undef __FUNC__
1009 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
1010 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
1011 {
1012   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1013   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
1014   return 0;
1015 }
1016 
1017 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1018 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1019 
#undef __FUNC__
#define __FUNC__ "MatGetRow_MPIBAIJ"
/*
   Returns one (scalar) row of the parallel matrix with global column
   indices.  The entries of the diagonal block (stored with local
   column indices) and of the off-diagonal block (stored with
   compressed, garray-mapped block columns) are merged so the result is
   sorted by increasing global column.  Only locally owned rows may be
   requested, and only one row may be active at a time (released with
   MatRestoreRow_MPIBAIJ()).
*/
int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
  int        *cmap, *idx_p,cstart = mat->cstart;

  if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int     max = 1,mbs = mat->mbs,tmp;
    /* longest block row, counting both A and B parts */
    for ( i=0; i<mbs; i++ ) {
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    /* a single buffer holds the values followed by the indices */
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }


  if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows")
  lrow = row - brstart;

  /* zero out the request pointers the caller is not interested in */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        /* B entries whose global block column precedes cstart come first ... */
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        /* ... then all of A, then the remaining B entries */
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
          }
        } else {
          /* imark was not computed above (v == 0), so determine it here */
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]/bs] < cstart)
              idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
            else break;
          }
          imark = i;
        }
        /* A part: local columns offset by the start of the diagonal block */
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
      }
    }
    else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  return 0;
}
1102 
1103 #undef __FUNC__
1104 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1105 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1106 {
1107   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1108   if (baij->getrowactive == PETSC_FALSE) {
1109     SETERRQ(1,0,"MatGetRow not called");
1110   }
1111   baij->getrowactive = PETSC_FALSE;
1112   return 0;
1113 }
1114 
1115 #undef __FUNC__
1116 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1117 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1118 {
1119   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1120   *bs = baij->bs;
1121   return 0;
1122 }
1123 
1124 #undef __FUNC__
1125 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1126 int MatZeroEntries_MPIBAIJ(Mat A)
1127 {
1128   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1129   int         ierr;
1130   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1131   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1132   return 0;
1133 }
1134 
1135 #undef __FUNC__
1136 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1137 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1138 {
1139   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1140   Mat         A = a->A, B = a->B;
1141   int         ierr;
1142   double      isend[5], irecv[5];
1143 
1144   info->rows_global    = (double)a->M;
1145   info->columns_global = (double)a->N;
1146   info->rows_local     = (double)a->m;
1147   info->columns_local  = (double)a->N;
1148   info->block_size     = (double)a->bs;
1149   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1150   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->memory;
1151   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1152   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->memory;
1153   if (flag == MAT_LOCAL) {
1154     info->nz_used      = isend[0];
1155     info->nz_allocated = isend[1];
1156     info->nz_unneeded  = isend[2];
1157     info->memory       = isend[3];
1158     info->mallocs      = isend[4];
1159   } else if (flag == MAT_GLOBAL_MAX) {
1160     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_MAX,matin->comm);
1161     info->nz_used      = irecv[0];
1162     info->nz_allocated = irecv[1];
1163     info->nz_unneeded  = irecv[2];
1164     info->memory       = irecv[3];
1165     info->mallocs      = irecv[4];
1166   } else if (flag == MAT_GLOBAL_SUM) {
1167     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_SUM,matin->comm);
1168     info->nz_used      = irecv[0];
1169     info->nz_allocated = irecv[1];
1170     info->nz_unneeded  = irecv[2];
1171     info->memory       = irecv[3];
1172     info->mallocs      = irecv[4];
1173   }
1174   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1175   info->fill_ratio_needed = 0;
1176   info->factor_mallocs    = 0;
1177   return 0;
1178 }
1179 
1180 #undef __FUNC__
1181 #define __FUNC__ "MatSetOption_MPIBAIJ"
1182 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1183 {
1184   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1185 
1186   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1187       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1188       op == MAT_COLUMNS_UNSORTED ||
1189       op == MAT_COLUMNS_SORTED ||
1190       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1191       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1192         MatSetOption(a->A,op);
1193         MatSetOption(a->B,op);
1194   } else if (op == MAT_ROW_ORIENTED) {
1195         a->roworiented = 1;
1196         MatSetOption(a->A,op);
1197         MatSetOption(a->B,op);
1198   } else if (op == MAT_ROWS_SORTED ||
1199              op == MAT_ROWS_UNSORTED ||
1200              op == MAT_SYMMETRIC ||
1201              op == MAT_STRUCTURALLY_SYMMETRIC ||
1202              op == MAT_YES_NEW_DIAGONALS)
1203     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1204   else if (op == MAT_COLUMN_ORIENTED) {
1205     a->roworiented = 0;
1206     MatSetOption(a->A,op);
1207     MatSetOption(a->B,op);
1208   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1209     a->donotstash = 1;
1210   } else if (op == MAT_NO_NEW_DIAGONALS)
1211     {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
1212   else
1213     {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
1214   return 0;
1215 }
1216 
1217 #undef __FUNC__
1218 #define __FUNC__ "MatTranspose_MPIBAIJ("
1219 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1220 {
1221   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1222   Mat_SeqBAIJ *Aloc;
1223   Mat        B;
1224   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
1225   int        bs=baij->bs,mbs=baij->mbs;
1226   Scalar     *a;
1227 
1228   if (matout == PETSC_NULL && M != N)
1229     SETERRQ(1,0,"Square matrix only for in-place");
1230   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1231   CHKERRQ(ierr);
1232 
1233   /* copy over the A part */
1234   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1235   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1236   row = baij->rstart;
1237   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1238 
1239   for ( i=0; i<mbs; i++ ) {
1240     rvals[0] = bs*(baij->rstart + i);
1241     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1242     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1243       col = (baij->cstart+aj[j])*bs;
1244       for (k=0; k<bs; k++ ) {
1245         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1246         col++; a += bs;
1247       }
1248     }
1249   }
1250   /* copy over the B part */
1251   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1252   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1253   row = baij->rstart*bs;
1254   for ( i=0; i<mbs; i++ ) {
1255     rvals[0] = bs*(baij->rstart + i);
1256     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1257     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1258       col = baij->garray[aj[j]]*bs;
1259       for (k=0; k<bs; k++ ) {
1260         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1261         col++; a += bs;
1262       }
1263     }
1264   }
1265   PetscFree(rvals);
1266   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1267   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1268 
1269   if (matout != PETSC_NULL) {
1270     *matout = B;
1271   } else {
1272     /* This isn't really an in-place transpose .... but free data structures from baij */
1273     PetscFree(baij->rowners);
1274     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1275     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1276     if (baij->colmap) PetscFree(baij->colmap);
1277     if (baij->garray) PetscFree(baij->garray);
1278     if (baij->lvec) VecDestroy(baij->lvec);
1279     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1280     PetscFree(baij);
1281     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1282     PetscHeaderDestroy(B);
1283   }
1284   return 0;
1285 }
1286 
1287 #undef __FUNC__
1288 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1289 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1290 {
1291   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1292   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1293   int ierr,s1,s2,s3;
1294 
1295   if (ll)  {
1296     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1297     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1298     if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
1299     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1300     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1301   }
1302   if (rr) SETERRQ(1,0,"not supported for right vector");
1303   return 0;
1304 }
1305 
1306 /* the code does not do the diagonal entries correctly unless the
1307    matrix is square and the column and row owerships are identical.
1308    This is a BUG. The only way to fix it seems to be to access
1309    baij->A and baij->B directly and not through the MatZeroRows()
1310    routine.
1311 */
1312 #undef __FUNC__
1313 #define __FUNC__ "MatZeroRows_MPIBAIJ"
1314 int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
1315 {
1316   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
1317   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
1318   int            *procs,*nprocs,j,found,idx,nsends,*work;
1319   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
1320   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
1321   int            *lens,imdex,*lrows,*values,bs=l->bs;
1322   MPI_Comm       comm = A->comm;
1323   MPI_Request    *send_waits,*recv_waits;
1324   MPI_Status     recv_status,*send_status;
1325   IS             istmp;
1326 
1327   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
1328   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
1329 
1330   /*  first count number of contributors to each processor */
1331   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
1332   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
1333   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
1334   for ( i=0; i<N; i++ ) {
1335     idx = rows[i];
1336     found = 0;
1337     for ( j=0; j<size; j++ ) {
1338       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
1339         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
1340       }
1341     }
1342     if (!found) SETERRQ(1,0,"Index out of range");
1343   }
1344   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
1345 
1346   /* inform other processors of number of messages and max length*/
1347   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
1348   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
1349   nrecvs = work[rank];
1350   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
1351   nmax = work[rank];
1352   PetscFree(work);
1353 
1354   /* post receives:   */
1355   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
1356   CHKPTRQ(rvalues);
1357   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
1358   CHKPTRQ(recv_waits);
1359   for ( i=0; i<nrecvs; i++ ) {
1360     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
1361   }
1362 
1363   /* do sends:
1364       1) starts[i] gives the starting index in svalues for stuff going to
1365          the ith processor
1366   */
1367   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1368   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
1369   CHKPTRQ(send_waits);
1370   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1371   starts[0] = 0;
1372   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1373   for ( i=0; i<N; i++ ) {
1374     svalues[starts[owner[i]]++] = rows[i];
1375   }
1376   ISRestoreIndices(is,&rows);
1377 
1378   starts[0] = 0;
1379   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1380   count = 0;
1381   for ( i=0; i<size; i++ ) {
1382     if (procs[i]) {
1383       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
1384     }
1385   }
1386   PetscFree(starts);
1387 
1388   base = owners[rank]*bs;
1389 
1390   /*  wait on receives */
1391   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1392   source = lens + nrecvs;
1393   count  = nrecvs; slen = 0;
1394   while (count) {
1395     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
1396     /* unpack receives into our local space */
1397     MPI_Get_count(&recv_status,MPI_INT,&n);
1398     source[imdex]  = recv_status.MPI_SOURCE;
1399     lens[imdex]  = n;
1400     slen += n;
1401     count--;
1402   }
1403   PetscFree(recv_waits);
1404 
1405   /* move the data into the send scatter */
1406   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1407   count = 0;
1408   for ( i=0; i<nrecvs; i++ ) {
1409     values = rvalues + i*nmax;
1410     for ( j=0; j<lens[i]; j++ ) {
1411       lrows[count++] = values[j] - base;
1412     }
1413   }
1414   PetscFree(rvalues); PetscFree(lens);
1415   PetscFree(owner); PetscFree(nprocs);
1416 
1417   /* actually zap the local rows */
1418   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1419   PLogObjectParent(A,istmp);
1420   PetscFree(lrows);
1421   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1422   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1423   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1424 
1425   /* wait on sends */
1426   if (nsends) {
1427     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1428     CHKPTRQ(send_status);
1429     MPI_Waitall(nsends,send_waits,send_status);
1430     PetscFree(send_status);
1431   }
1432   PetscFree(send_waits); PetscFree(svalues);
1433 
1434   return 0;
1435 }
1436 extern int MatPrintHelp_SeqBAIJ(Mat);
1437 #undef __FUNC__
1438 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1439 int MatPrintHelp_MPIBAIJ(Mat A)
1440 {
1441   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1442 
1443   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1444   else return 0;
1445 }
1446 
1447 #undef __FUNC__
1448 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1449 int MatSetUnfactored_MPIBAIJ(Mat A)
1450 {
1451   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1452   int         ierr;
1453   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1454   return 0;
1455 }
1456 
static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);

/* -------------------------------------------------------------------*/
/*
   Operation table for the MPIBAIJ matrix type.  The initializer is
   positional: each entry must line up with the corresponding field of
   struct _MatOps, and a 0 marks an operation this format does not
   support.  Do not reorder entries.
*/
static struct _MatOps MatOps = {
  MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
  MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
  0,0,0,0,
  0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
  0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
  MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
  MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
  0,0,0,MatGetSize_MPIBAIJ,
  MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
  0,0,MatConvertSameType_MPIBAIJ,0,0,
  0,0,0,MatGetSubMatrices_MPIBAIJ,
  MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
  MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
  0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1475 
1476 
1477 #undef __FUNC__
1478 #define __FUNC__ "MatCreateMPIBAIJ"
1479 /*@C
1480    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1481    (block compressed row).  For good matrix assembly performance
1482    the user should preallocate the matrix storage by setting the parameters
1483    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1484    performance can be increased by more than a factor of 50.
1485 
1486    Input Parameters:
1487 .  comm - MPI communicator
.  bs   - size of block
1489 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1490            This value should be the same as the local size used in creating the
1491            y vector for the matrix-vector product y = Ax.
1492 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1493            This value should be the same as the local size used in creating the
1494            x vector for the matrix-vector product y = Ax.
1495 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1496 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1497 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1498            submatrix  (same for all local rows)
.  d_nnz - array containing the number of block nonzeros in the various block rows
           of the diagonal portion of the local submatrix (possibly different for each
           block row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1502            it is zero.
1503 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1504            submatrix (same for all local rows).
.  o_nnz - array containing the number of nonzeros in the various block rows of the
1506            off-diagonal portion of the local submatrix (possibly different for
1507            each block row) or PETSC_NULL.
1508 
1509    Output Parameter:
1510 .  A - the matrix
1511 
1512    Notes:
1513    The user MUST specify either the local or global matrix dimensions
1514    (possibly both).
1515 
1516    Storage Information:
1517    For a square global matrix we define each processor's diagonal portion
1518    to be its local rows and the corresponding columns (a square submatrix);
1519    each processor's off-diagonal portion encompasses the remainder of the
1520    local matrix (a rectangular submatrix).
1521 
1522    The user can specify preallocated storage for the diagonal part of
1523    the local submatrix with either d_nz or d_nnz (not both).  Set
1524    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1525    memory allocation.  Likewise, specify preallocated storage for the
1526    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1527 
1528    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1529    the figure below we depict these three local rows and all columns (0-11).
1530 
1531 $          0 1 2 3 4 5 6 7 8 9 10 11
1532 $         -------------------
1533 $  row 3  |  o o o d d d o o o o o o
1534 $  row 4  |  o o o d d d o o o o o o
1535 $  row 5  |  o o o d d d o o o o o o
1536 $         -------------------
1537 $
1538 
1539    Thus, any entries in the d locations are stored in the d (diagonal)
1540    submatrix, and any entries in the o locations are stored in the
1541    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1542    stored simply in the MATSEQBAIJ format for compressed row storage.
1543 
1544    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1545    and o_nz should indicate the number of nonzeros per row in the o matrix.
1546    In general, for PDE problems in which most nonzeros are near the diagonal,
1547    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1548    or you will get TERRIBLE performance; see the users' manual chapter on
1549    matrices.
1550 
1551 .keywords: matrix, block, aij, compressed row, sparse, parallel
1552 
1553 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1554 @*/
1555 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1556                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1557 {
1558   Mat          B;
1559   Mat_MPIBAIJ  *b;
1560   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1561 
1562   if (bs < 1) SETERRQ(1,0,"Invalid block size specified, must be positive");
1563 
1564   MPI_Comm_size(comm,&size);
1565   if (size == 1) {
1566     if (M == PETSC_DECIDE) M = m;
1567     if (N == PETSC_DECIDE) N = n;
1568     ierr = MatCreateSeqBAIJ(comm,bs,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
1569     return 0;
1570   }
1571 
1572   *A = 0;
1573   PetscHeaderCreate(B,_p_Mat,MAT_COOKIE,MATMPIBAIJ,comm,MatDestroy,MatView);
1574   PLogObjectCreate(B);
1575   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1576   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1577   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1578 
1579   B->destroy    = MatDestroy_MPIBAIJ;
1580   B->view       = MatView_MPIBAIJ;
1581   B->mapping    = 0;
1582   B->factor     = 0;
1583   B->assembled  = PETSC_FALSE;
1584 
1585   B->insertmode = NOT_SET_VALUES;
1586   MPI_Comm_rank(comm,&b->rank);
1587   MPI_Comm_size(comm,&b->size);
1588 
1589   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1590     SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1591   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
1592   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,0,"either N or n should be specified");
1593   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1594   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1595 
1596   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1597     work[0] = m; work[1] = n;
1598     mbs = m/bs; nbs = n/bs;
1599     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1600     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1601     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1602   }
1603   if (m == PETSC_DECIDE) {
1604     Mbs = M/bs;
1605     if (Mbs*bs != M) SETERRQ(1,0,"No of global rows must be divisible by blocksize");
1606     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1607     m   = mbs*bs;
1608   }
1609   if (n == PETSC_DECIDE) {
1610     Nbs = N/bs;
1611     if (Nbs*bs != N) SETERRQ(1,0,"No of global cols must be divisible by blocksize");
1612     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1613     n   = nbs*bs;
1614   }
1615   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"No of local rows, cols must be divisible by blocksize");
1616 
1617   b->m = m; B->m = m;
1618   b->n = n; B->n = n;
1619   b->N = N; B->N = N;
1620   b->M = M; B->M = M;
1621   b->bs  = bs;
1622   b->bs2 = bs*bs;
1623   b->mbs = mbs;
1624   b->nbs = nbs;
1625   b->Mbs = Mbs;
1626   b->Nbs = Nbs;
1627 
1628   /* build local table of row and column ownerships */
1629   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1630   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1631   b->cowners = b->rowners + b->size + 2;
1632   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1633   b->rowners[0] = 0;
1634   for ( i=2; i<=b->size; i++ ) {
1635     b->rowners[i] += b->rowners[i-1];
1636   }
1637   b->rstart    = b->rowners[b->rank];
1638   b->rend      = b->rowners[b->rank+1];
1639   b->rstart_bs = b->rstart * bs;
1640   b->rend_bs   = b->rend * bs;
1641 
1642   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1643   b->cowners[0] = 0;
1644   for ( i=2; i<=b->size; i++ ) {
1645     b->cowners[i] += b->cowners[i-1];
1646   }
1647   b->cstart    = b->cowners[b->rank];
1648   b->cend      = b->cowners[b->rank+1];
1649   b->cstart_bs = b->cstart * bs;
1650   b->cend_bs   = b->cend * bs;
1651 
1652   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1653   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1654   PLogObjectParent(B,b->A);
1655   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1656   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1657   PLogObjectParent(B,b->B);
1658 
1659   /* build cache for off array entries formed */
1660   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1661   b->donotstash  = 0;
1662   b->colmap      = 0;
1663   b->garray      = 0;
1664   b->roworiented = 1;
1665 
1666   /* stuff used in block assembly */
1667   b->barray      = 0;
1668 
1669   /* stuff used for matrix vector multiply */
1670   b->lvec        = 0;
1671   b->Mvctx       = 0;
1672 
1673   /* stuff for MatGetRow() */
1674   b->rowindices   = 0;
1675   b->rowvalues    = 0;
1676   b->getrowactive = PETSC_FALSE;
1677 
1678   *A = B;
1679   return 0;
1680 }
1681 
1682 #undef __FUNC__
1683 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
1684 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1685 {
1686   Mat         mat;
1687   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1688   int         ierr, len=0, flg;
1689 
1690   *newmat       = 0;
1691   PetscHeaderCreate(mat,_p_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm,MatDestroy,MatView);
1692   PLogObjectCreate(mat);
1693   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1694   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1695   mat->destroy    = MatDestroy_MPIBAIJ;
1696   mat->view       = MatView_MPIBAIJ;
1697   mat->factor     = matin->factor;
1698   mat->assembled  = PETSC_TRUE;
1699 
1700   a->m = mat->m   = oldmat->m;
1701   a->n = mat->n   = oldmat->n;
1702   a->M = mat->M   = oldmat->M;
1703   a->N = mat->N   = oldmat->N;
1704 
1705   a->bs  = oldmat->bs;
1706   a->bs2 = oldmat->bs2;
1707   a->mbs = oldmat->mbs;
1708   a->nbs = oldmat->nbs;
1709   a->Mbs = oldmat->Mbs;
1710   a->Nbs = oldmat->Nbs;
1711 
1712   a->rstart       = oldmat->rstart;
1713   a->rend         = oldmat->rend;
1714   a->cstart       = oldmat->cstart;
1715   a->cend         = oldmat->cend;
1716   a->size         = oldmat->size;
1717   a->rank         = oldmat->rank;
1718   mat->insertmode = NOT_SET_VALUES;
1719   a->rowvalues    = 0;
1720   a->getrowactive = PETSC_FALSE;
1721   a->barray       = 0;
1722 
1723   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1724   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1725   a->cowners = a->rowners + a->size + 2;
1726   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1727   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1728   if (oldmat->colmap) {
1729     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1730     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1731     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1732   } else a->colmap = 0;
1733   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1734     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1735     PLogObjectMemory(mat,len*sizeof(int));
1736     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1737   } else a->garray = 0;
1738 
1739   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1740   PLogObjectParent(mat,a->lvec);
1741   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1742   PLogObjectParent(mat,a->Mvctx);
1743   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1744   PLogObjectParent(mat,a->A);
1745   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1746   PLogObjectParent(mat,a->B);
1747   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1748   if (flg) {
1749     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1750   }
1751   *newmat = mat;
1752   return 0;
1753 }
1754 
1755 #include "sys.h"
1756 
#undef __FUNC__
#define __FUNC__ "MatLoad_MPIBAIJ"
/*
   MatLoad_MPIBAIJ - Loads a square matrix stored in PETSc binary format
   and builds a parallel block AIJ (MPIBAIJ) matrix from it.

   Collective on the viewer's communicator.  Process 0 alone reads the file
   and ships each other process its slice of the row lengths, column indices
   and numerical values; the other processes only receive.  If the global
   size is not divisible by the block size (-matload_block_size, default 1),
   extra rows/columns are appended with a single unit diagonal entry each so
   that it becomes divisible.

   Input:   viewer - binary viewer to read from
            type   - requested matrix type (not consulted here)
   Output:  newmat - the newly created, fully assembled matrix
*/
int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*buf;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
  int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
  int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
  int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
  int          dcount,kmax,k,nzcount,tmp;


  ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
  bs2  = bs*bs;   /* NOTE(review): bs2 is computed but never used in this routine */

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    /* only process 0 touches the file; header = {cookie, M, N, total nz} */
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not matrix object");
  }

  /* everyone learns the global dimensions */
  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];

  if (M != N) SETERRQ(1,0,"Can only do square matrices");

  /*
     This code adds extra rows to make sure the number of rows is
     divisible by the blocksize
  */
  Mbs        = M/bs;
  extra_rows = bs - M + bs*(Mbs);   /* == bs - (M mod bs); equals bs when M divides evenly */
  if (extra_rows == bs) extra_rows = 0;
  else                  Mbs++;
  if (extra_rows &&!rank) {
    PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
  }

  /* determine ownership of all rows; the first (Mbs mod size) procs get one extra block row */
  mbs = Mbs/size + ((Mbs % size) > rank);
  m   = mbs * bs;
  /* rowners = block-row ownership ranges; browners = same ranges in point rows */
  rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
  browners = rowners + size + 1;
  MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  /* prefix-sum the per-process counts into ownership offsets */
  for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
  for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
  rstart = rowners[rank];   /* first owned block row */
  rend   = rowners[rank+1]; /* one past last owned block row */

  /* distribute row lengths to all processors */
  locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
    /* each padding row gets exactly one entry (its diagonal) */
    for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
    MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    /* matching receive side of the scatter above */
    MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices  */
    nz = procsnz[0];
    ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* uniprocessor case: the padding entries are not on disk, so read fewer
       and append the diagonal column indices of the extra rows by hand */
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }

    /* read in every ones (except the last) and ship off */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    /* read in the stuff for the last proc; the padding rows belong to it */
    if ( size != 1 ) {
      nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
      MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += locrowlens[i];
    }
    ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* receive message of column indices*/
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    /* sanity check: sum of my row lengths must match what root sent */
    if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries */
  /* dlens/odlens: per-block-row counts of distinct diagonal-block and
     off-diagonal-block columns, used to preallocate the new matrix */
  dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
  odlens = dlens + (rend-rstart);
  /* mask marks block columns already counted in the current block row;
     masked1/masked2 remember which marks to clear afterwards */
  mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
  PetscMemzero(mask,3*Mbs*sizeof(int));
  masked1 = mask    + Mbs;
  masked2 = masked1 + Mbs;
  rowcount = 0; nzcount = 0;
  for ( i=0; i<mbs; i++ ) {
    dcount  = 0;
    odcount = 0;
    for ( j=0; j<bs; j++ ) {
      kmax = locrowlens[rowcount];
      for ( k=0; k<kmax; k++ ) {
        tmp = mycols[nzcount++]/bs;   /* block column of this entry */
        if (!mask[tmp]) {
          mask[tmp] = 1;
          /* matrix is square, so the block-row ownership range doubles as
             the block-column range for the diagonal-block test */
          if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
          else masked1[dcount++] = tmp;
        }
      }
      rowcount++;
    }

    dlens[i]  = dcount;
    odlens[i] = odcount;

    /* zero out the mask elements we set */
    for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
    for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
  }

  /* create our matrix, preallocated exactly from the counts just computed */
  ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
         CHKERRQ(ierr);
  A = *newmat;
  MatSetOption(A,MAT_COLUMNS_SORTED);

  /* second pass over the file: numerical values, same shipping pattern as
     the column indices above */
  if (!rank) {
    buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    vals = buf;
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
    /* padding rows get a unit diagonal to keep the matrix nonsingular */
    if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }

    /* insert into matrix */
    jj      = rstart*bs;   /* global (point) row number */
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
    /* read in other processors (except the last one) and ship out */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    /* the last proc; note i == size-1 here, left over from the loop above */
    if ( size != 1 ){
      nz = procsnz[i] - extra_rows;
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
      MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);

    /* receive message of values*/
    vals = buf;
    mycols = ibuf;
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
  }
  PetscFree(locrowlens);
  PetscFree(buf);
  PetscFree(ibuf);
  PetscFree(rowners);
  PetscFree(dlens);
  PetscFree(mask);
  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}
1985 
1986 
1987