xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 3a40ed3dce77c081171d005ae1a6ff4bb9d13b6f)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpibaij.c,v 1.82 1997/09/25 21:43:23 bsmith Exp bsmith $";
3 #endif
4 
5 #include "pinclude/pviewer.h"
6 #include "src/mat/impls/baij/mpi/mpibaij.h"
7 #include "src/vec/vecimpl.h"
8 
9 
10 extern int MatSetUpMultiply_MPIBAIJ(Mat);
11 extern int DisAssemble_MPIBAIJ(Mat);
12 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
13 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
14 
15 /*
16      Local utility routine that creates a mapping from the global column
17    number to the local number in the off-diagonal part of the local
   storage of the matrix.  This is done in a non-scalable way since the
19    length of colmap equals the global matrix length.
20 */
21 #undef __FUNC__
22 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
23 static int CreateColmap_MPIBAIJ_Private(Mat mat)
24 {
25   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
26   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
27   int         nbs = B->nbs,i,bs=B->bs;;
28 
29   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
30   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
31   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
32   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
33   PetscFunctionReturn(0);
34 }
35 
/* Number of extra block entries by which a row is enlarged on overflow. */
#define CHUNKSIZE  10

/*
   MatSetValues_SeqBAIJ_A_Private - macro that inserts or adds one scalar
   value into the local DIAGONAL block A, using variables set up in
   MatSetValues_MPIBAIJ (rp/ap/aimax/ailen/aa/ai/aj etc.).  row/col are
   point indices local to A; the block row/column and the offset within the
   bs x bs block are derived here.  Expanded inline (instead of calling
   MatSetValues_SeqBAIJ) for speed.  If the target block is absent and
   insertion is allowed, later entries are shifted up and, when the row is
   full, the whole matrix storage is reallocated with CHUNKSIZE extra blocks.
*/
#define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
    rmax = aimax[brow]; nrow = ailen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; /* binary search narrows window to <= 3 entries */ \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { /* linear scan of the remaining window */ \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; /* column-major within block */ \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto a_noinsert; \
        } \
      } \
      if (a->nonew == 1) goto a_noinsert; \
      else if (a->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (a->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space: values, column indices and row offsets in one chunk */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots, leaving a CHUNKSIZE gap after brow */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
        for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
        PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
                    aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(a->a);  \
        if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
        aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
        a->singlemalloc = 1; \
 \
        rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
        rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        a->maxnz += bs2*CHUNKSIZE; \
        a->reallocs++; \
        a->nz++; /* NOTE(review): nz only bumped on realloc, not for every new block -- verify intent */ \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      a_noinsert:; \
    ailen[brow] = nrow; \
}
112 
/*
   MatSetValues_SeqBAIJ_B_Private - identical in structure to the A variant
   above, but operates on the OFF-DIAGONAL block B (rp/ap derived from
   bi/bj/ba, limits from bimax/bilen).  Kept as a separate macro because the
   two expansions capture different local variables in MatSetValues_MPIBAIJ.
*/
#define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
    rmax = bimax[brow]; nrow = bilen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; /* binary search narrows window to <= 3 entries */ \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto b_noinsert; \
        } \
      } \
      if (b->nonew == 1) goto b_noinsert; \
      else if (b->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (b->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots, leaving a CHUNKSIZE gap after brow */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
        for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
        PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
                    ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(b->a);  \
        if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
        ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
        b->singlemalloc = 1; \
 \
        rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
        rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        b->maxnz += bs2*CHUNKSIZE; \
        b->reallocs++; \
        b->nz++; /* NOTE(review): nz only bumped on realloc, not for every new block -- verify intent */ \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      b_noinsert:; \
    bilen[brow] = nrow; \
}
187 
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIBAIJ"
/*
   MatSetValues_MPIBAIJ - point-wise (non-blocked) insertion of values into
   a parallel BAIJ matrix.  Locally owned entries go directly into the
   diagonal (A) or off-diagonal (B) sequential matrices via the inlined
   macros above; entries owned by other processes are stashed for
   communication at assembly time.

   Input: m rows im[], n columns in[] (global point indices), values v in
   row-major (roworiented) or column-major order, and the insert/add mode.
*/
int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j,row,col;
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
  int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
  int         cend_orig=baij->cend_bs,bs=baij->bs;

  /* Some Variables required in the macro */
  Mat         A = baij->A;
  Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
  int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
  Scalar      *aa=a->a;

  Mat         B = baij->B;
  Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
  int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
  Scalar      *ba=b->a;

  int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
  int         low,high,t,ridx,cidx,bs2=a->bs2;
  Scalar      *ap,*bap;

  for ( i=0; i<m; i++ ) {
#if defined(USE_PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(1,0,"Negative row");
    if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
#endif
    if (im[i] >= rstart_orig && im[i] < rend_orig) { /* locally owned row */
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        if (in[j] >= cstart_orig && in[j] < cend_orig){ /* diagonal block */
          col = in[j] - cstart_orig;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
#if defined(USE_PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
        else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
#endif
        else { /* off-diagonal block */
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }
            /* colmap stores local_block*bs+1 (0 == not present); subtract 1
               and add the within-block offset to get the local column.
               NOTE(review): when the block column is absent, col is only
               negative if in[j]%bs == 0 -- confirm the test below is enough */
            col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
              /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
              B = baij->B;
              b = (Mat_SeqBAIJ *) (B)->data;
              bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
              ba=b->a;
            }
          }
          else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
      }
    }
    else { /* row owned by another process: stash unless told not to */
      if (roworiented && !baij->donotstash) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      }
      else {
        if (!baij->donotstash) {
          row = im[i];
          for ( j=0; j<n; j++ ) {
            ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
  PetscFunctionReturn(0);
}
271 
extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
#undef __FUNC__
#define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
/*
   MatSetValuesBlocked_MPIBAIJ - insertion of m x n blocks of size bs x bs.
   im[]/in[] are global BLOCK indices.  Each block is (if necessary) copied
   into the contiguous scratch buffer baij->barray and handed to
   MatSetValuesBlocked_SeqBAIJ on A or B; blocks destined for other
   processes are scattered point-by-point into the stash.
*/
int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      *value,*barray=baij->barray;
  int         ierr,i,j,ii,jj,row,col,k,l;
  int         roworiented = baij->roworiented,rstart=baij->rstart ;
  int         rend=baij->rend,cstart=baij->cstart,stepval;
  int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;


  /* scratch buffer for one bs x bs block, allocated lazily and kept on baij */
  if(!barray) {
    baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
  }

  /* stepval = stride (in Scalars) from the end of one block row/column of v
     to the start of the next, given the chosen orientation */
  if (roworiented) {
    stepval = (n-1)*bs;
  } else {
    stepval = (m-1)*bs;
  }
  for ( i=0; i<m; i++ ) {
#if defined(USE_PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(1,0,"Negative row");
    if (im[i] >= baij->Mbs) SETERRQ(1,0,"Row too large");
#endif
    if (im[i] >= rstart && im[i] < rend) { /* locally owned block row */
      row = im[i] - rstart;
      for ( j=0; j<n; j++ ) {
        /* If NumCol = 1 then a copy is not required */
        if ((roworiented) && (n == 1)) {
          barray = v + i*bs2;
        } else if((!roworiented) && (m == 1)) {
          barray = v + j*bs2;
        } else { /* Here a copy is required */
          if (roworiented) {
            value = v + i*(stepval+bs)*bs + j*bs;
          } else {
            value = v + j*(stepval+bs)*bs + i*bs;
          }
          for ( ii=0; ii<bs; ii++,value+=stepval ) {
            for (jj=0; jj<bs; jj++ ) {
              *barray++  = *value++;
            }
          }
          barray -=bs2; /* rewind to the start of the copied block */
        }

        if (in[j] >= cstart && in[j] < cend){ /* diagonal block */
          col  = in[j] - cstart;
          ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
        }
#if defined(USE_PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
        else if (in[j] >= baij->Nbs) {SETERRQ(1,0,"Column too large");}
#endif
        else { /* off-diagonal block: translate through colmap */
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }

#if defined(USE_PETSC_BOPT_g)
            /* colmap entries are local_block*bs+1, so (entry-1) must be a bs multiple */
            if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(1,0,"Incorrect colmap");}
#endif
            col = (baij->colmap[in[j]] - 1)/bs;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
            }
          }
          else col = in[j];
          ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
        }
      }
    }
    else { /* block row owned elsewhere: stash individual point entries */
      if (!baij->donotstash) {
        if (roworiented ) {
          row   = im[i]*bs;
          value = v + i*(stepval+bs)*bs;
          for ( j=0; j<bs; j++,row++ ) {
            for ( k=0; k<n; k++ ) {
              for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        }
        else {
          for ( j=0; j<n; j++ ) {
            value = v + j*(stepval+bs)*bs + i*bs;
            col   = in[j]*bs;
            for ( k=0; k<bs; k++,col++,value+=stepval) {
              for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        }
      }
    }
  }
  PetscFunctionReturn(0);
}
378 
#undef __FUNC__
#define __FUNC__ "MatGetValues_MPIBAIJ"
/*
   MatGetValues_MPIBAIJ - retrieve entries at global point indices
   idxm[] x idxn[] into v (row-major).  Only locally owned rows are
   supported.  Off-diagonal columns are translated through colmap/garray;
   entries whose block column is not stored locally are returned as 0.0.
*/
int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
  int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
    if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
    if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
      row = idxm[i] - bsrstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
        if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
        if (idxn[j] >= bscstart && idxn[j] < bscend){ /* diagonal block A */
          col = idxn[j] - bscstart;
          ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        }
        else { /* off-diagonal block B */
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
          }
          /* absent block column (colmap entry 0) or stale colmap entry that
             no longer maps back to this global block => value is 0.0 */
          if((baij->colmap[idxn[j]/bs]-1 < 0) ||
             (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
          else {
            col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
            ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    }
    else {
      SETERRQ(1,0,"Only local values currently supported");
    }
  }
  PetscFunctionReturn(0);
}
418 
419 #undef __FUNC__
420 #define __FUNC__ "MatNorm_MPIBAIJ"
421 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
422 {
423   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
424   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
425   int        ierr, i,bs2=baij->bs2;
426   double     sum = 0.0;
427   Scalar     *v;
428 
429   if (baij->size == 1) {
430     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
431   } else {
432     if (type == NORM_FROBENIUS) {
433       v = amat->a;
434       for (i=0; i<amat->nz*bs2; i++ ) {
435 #if defined(USE_PETSC_COMPLEX)
436         sum += real(conj(*v)*(*v)); v++;
437 #else
438         sum += (*v)*(*v); v++;
439 #endif
440       }
441       v = bmat->a;
442       for (i=0; i<bmat->nz*bs2; i++ ) {
443 #if defined(USE_PETSC_COMPLEX)
444         sum += real(conj(*v)*(*v)); v++;
445 #else
446         sum += (*v)*(*v); v++;
447 #endif
448       }
449       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
450       *norm = sqrt(*norm);
451     }
452     else
453       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
454   }
455   PetscFunctionReturn(0);
456 }
457 
#undef __FUNC__
#define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
/*
   MatAssemblyBegin_MPIBAIJ - starts communication of stashed off-process
   values.  Each stashed entry is packed as three Scalars (row index, column
   index, value -- indices stored as Scalars to simplify messaging) and sent
   to the owning process with nonblocking MPI; matching receives are posted
   with the worst-case message length.  The requests and buffers are saved
   on the Mat_MPIBAIJ struct for MatAssemblyEnd_MPIBAIJ to complete.
*/
int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,0,"Some processors inserted others added");
  }
  mat->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    /* rowners holds block-row ownership ranges, hence the *bs scaling */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack (row,col,value) triples; starts[] is consumed as a cursor here */
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  /* rebuild starts[] (it was advanced by the packing loop above) */
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  /* save communication state for MatAssemblyEnd_MPIBAIJ */
  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  PetscFunctionReturn(0);
}
#include <math.h>
/* Multiplicative (Fibonacci) hashing: HASH_KEY is the golden-ratio
   conjugate; HASH1/HASH2 map a double key into [0,size). */
#define HASH_KEY 0.6180339887
#define HASH1(size,key) ((int)((size)*fmod(((key)*HASH_KEY),1)))
#define HASH2(size,key) ((int)((size)*fmod(((key+0.5)*HASH_KEY),1)))
562 
563 
564 int CreateHashTable(Mat mat,double factor)
565 {
566   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
567   Mat         A = baij->A, B=baij->B;
568   Mat_SeqBAIJ *a=(Mat_SeqBAIJ *)A->data, *b=(Mat_SeqBAIJ *)B->data;
569   int         i,j,k,nz=a->nz+b->nz,h1,h2,*ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j;
570   int         size=(int)(factor*nz),ct=0,max1=0,max2=0;
571   /* Scalar      *aa=a->a,*ba=b->a; */
572   double      key;
573   static double *HT;
574   static      int flag=1;
575   extern int PetscGlobalRank;
576   flag = 1;
577   /* Allocate Memory for Hash Table */
578   if (flag) {
579     HT = (double*)PetscMalloc(size*sizeof(double));
580     flag = 0;
581   }
582   PetscMemzero(HT,size*sizeof(double));
583 
584   /* Loop Over A */
585   for ( i=0; i<a->n; i++ ) {
586     for ( j=ai[i]; j<ai[i+1]; j++ ) {
587       key = i*baij->n+aj[j]+1;
588       h1  = HASH1(size,key);
589       h2  = HASH2(size,key);
590 
591       for ( k=1; k<size; k++ ){
592         if (HT[(h1*k)%size] == 0.0) {
593           HT[(h1*k)%size] = key;
594           break;
595         } else ct++;
596       }
597       if (k> max1) max1 =k;
598     }
599   }
600   /* Loop Over B */
601   for ( i=0; i<b->n; i++ ) {
602     for ( j=bi[i]; j<bi[i+1]; j++ ) {
603       key = i*b->n+bj[j]+1;
604       h1  = HASH1(size,key);
605       for ( k=1; k<size; k++ ){
606         if (HT[(h1*k)%size] == 0.0) {
607           HT[(h1*k)%size] = key;
608           break;
609         } else ct++;
610       }
611       if (k> max2) max2 =k;
612     }
613   }
614 
615   /* Print Summary */
616   for ( i=0,key=0.0,j=0; i<size; i++)
617     if (HT[i]) {j++;}
618 
619   printf("[%d] fact = %3.2f max1 = %5d max2 = %5d Size %5d - Searches %5d Avg %5.2f Keys %5d\n",
620          PetscGlobalRank,factor,max1,max2,size,ct+j,((float)ct+j)/j,j);
621   PetscFree(HT);
622   PetscFunctionReturn(0);
623 }
624 
#undef __FUNC__
#define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
/*
   MatAssemblyEnd_MPIBAIJ - completes assembly started by
   MatAssemblyBegin_MPIBAIJ: waits on each receive, unpacks the
   (row,col,value) Scalar triples into the local A/B parts, waits on the
   sends, assembles A and B, coordinates global disassembly if any process
   disassembled, and builds the multiply scatter on first final assembly.
*/
int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
  int         bs=baij->bs,row,col,other_disassembled,flg;
  Scalar      *values,val;
  InsertMode  addv = mat->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = baij->rvalues + 3*imdex*baij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3; /* each entry travels as a (row,col,value) Scalar triple */
    for ( i=0; i<n; i++ ) {
      row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= baij->cstart*bs && col < baij->cend*bs) {
        col -= baij->cstart*bs;
        ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      }
      else { /* off-diagonal entry: translate through colmap, like MatSetValues */
        if (mat->was_assembled) {
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
          }
          /* colmap stores local_block*bs+1; 0 means not present.
             NOTE(review): for an absent block column this is only negative
             when col%bs == 0 -- confirm the test below catches all cases */
          col = (baij->colmap[col/bs]) - 1 + col%bs;
          if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
            ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      }
    }
    count--;
  }
  PetscFree(baij->recv_waits); PetscFree(baij->rvalues);

  /* wait on sends */
  if (baij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(baij->nsends,baij->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(baij->send_waits); PetscFree(baij->svalues);

  ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselfs, in order that we may reassemble. */
  MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
  }

  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);

  /* optional diagnostic: hash-table fill experiment (see CreateHashTable) */
  ierr = OptionsHasName(PETSC_NULL,"-use_hash",&flg); CHKERRQ(ierr);
  if (flg) {
    double fact;
    for ( fact=1.2; fact<2.0; fact +=0.05) CreateHashTable(mat,fact);
  }
  if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
  PetscFunctionReturn(0);
}
702 
703 #undef __FUNC__
704 #define __FUNC__ "MatView_MPIBAIJ_Binary"
705 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
706 {
707   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
708   int          ierr;
709 
710   if (baij->size == 1) {
711     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
712   }
713   else SETERRQ(1,0,"Only uniprocessor output supported");
714   PetscFunctionReturn(0);
715 }
716 
#undef __FUNC__
#define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
/*
   MatView_MPIBAIJ_ASCIIorDraworMatlab - viewing for ASCII, draw and Matlab
   viewers.  ASCII info formats print per-process summaries and return
   early; ASCII_FILE prints each process's local parts in sequence; all
   other cases (draw, Matlab, ASCII_FILES) gather the whole matrix onto
   process 0 and view it there.
*/
static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr, format,rank,bs = baij->bs;
  FILE         *fd;
  ViewerType   vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerGetFormat(viewer,&format);
    if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
      /* per-process storage summary, printed one process at a time */
      MatInfo info;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&info);
      PetscSequentialPhaseBegin(mat->comm,1);
      fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
              rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
              baij->bs,(int)info.memory);
      ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
      PetscFunctionReturn(0);
    }
    else if (format == VIEWER_FORMAT_ASCII_INFO) {
      PetscPrintf(mat->comm,"  block size is %d\n",bs);
      PetscFunctionReturn(0);
    }
  }

  if (vtype == DRAW_VIEWER) {
    /* nothing to do for a null draw context */
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
  }

  if (vtype == ASCII_FILE_VIEWER) {
    /* each process prints its own A and B parts in rank order */
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
           baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
            baij->cstart*bs,baij->cend*bs);
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  }
  else {
    int size = baij->size;
    rank = baij->rank;
    if (size == 1) {
      ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat         A;
      Mat_SeqBAIJ *Aloc;
      int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
      int         mbs=baij->mbs;
      Scalar      *a;

      /* process 0 gets the full M x N local size, everyone else 0 x 0 */
      if (!rank) {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqBAIJ*) baij->A->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart; /* NOTE(review): assigned but never used below */
      rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i); /* the bs point rows of block row i */
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = (baij->cstart+aj[j])*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      /* copy over the B part */
      Aloc = (Mat_SeqBAIJ*) baij->B->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart*bs; /* NOTE(review): assigned but never used below */
      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = baij->garray[aj[j]]*bs; /* garray maps local to global block column */
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      PetscFree(rvals);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        /* everything now lives in the A part of the gathered matrix */
        ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  PetscFunctionReturn(0);
}
838 
839 
840 
841 #undef __FUNC__
842 #define __FUNC__ "MatView_MPIBAIJ"
843 int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
844 {
845   Mat         mat = (Mat) obj;
846   int         ierr;
847   ViewerType  vtype;
848 
849   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
850   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
851       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
852     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
853   } else if (vtype == BINARY_FILE_VIEWER) {
854     ierr = MatView_MPIBAIJ_Binary(mat,viewer);CHKERRQ(ierr);
855   }
856   PetscFunctionReturn(0);
857 }
858 
#undef __FUNC__
#define __FUNC__ "MatDestroy_MPIBAIJ"
/*
   MatDestroy_MPIBAIJ - Frees all storage associated with a parallel BAIJ
   matrix: the stash, the row/column ownership table, the two sequential
   blocks (diagonal A and off-diagonal B), and the communication/work
   arrays that are allocated only on demand.
*/
int MatDestroy_MPIBAIJ(PetscObject obj)
{
  Mat         mat = (Mat) obj;
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;

#if defined(USE_PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
#endif

  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
  PetscFree(baij->rowners);
  ierr = MatDestroy(baij->A); CHKERRQ(ierr);
  ierr = MatDestroy(baij->B); CHKERRQ(ierr);
  /* the following fields are created lazily and may never have been set */
  if (baij->colmap) PetscFree(baij->colmap);
  if (baij->garray) PetscFree(baij->garray);
  if (baij->lvec)   VecDestroy(baij->lvec);
  if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
  if (baij->rowvalues) PetscFree(baij->rowvalues);
  if (baij->barray) PetscFree(baij->barray);
  PetscFree(baij);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  PetscFunctionReturn(0);
}
886 
887 #undef __FUNC__
888 #define __FUNC__ "MatMult_MPIBAIJ"
889 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
890 {
891   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
892   int         ierr, nt;
893 
894   VecGetLocalSize_Fast(xx,nt);
895   if (nt != a->n) {
896     SETERRQ(1,0,"Incompatible partition of A and xx");
897   }
898   VecGetLocalSize_Fast(yy,nt);
899   if (nt != a->m) {
900     SETERRQ(1,0,"Incompatible parition of A and yy");
901   }
902   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
903   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
904   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
905   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
906   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
907   PetscFunctionReturn(0);
908 }
909 
910 #undef __FUNC__
911 #define __FUNC__ "MatMultAdd_MPIBAIJ"
912 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
913 {
914   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
915   int        ierr;
916   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
917   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
918   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
919   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
920   PetscFunctionReturn(0);
921 }
922 
923 #undef __FUNC__
924 #define __FUNC__ "MatMultTrans_MPIBAIJ"
925 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
926 {
927   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
928   int        ierr;
929 
930   /* do nondiagonal part */
931   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
932   /* send it on its way */
933   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
934   /* do local part */
935   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
936   /* receive remote parts: note this assumes the values are not actually */
937   /* inserted in yy until the next line, which is true for my implementation*/
938   /* but is not perhaps always true. */
939   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
940   PetscFunctionReturn(0);
941 }
942 
943 #undef __FUNC__
944 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
945 int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
946 {
947   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
948   int        ierr;
949 
950   /* do nondiagonal part */
951   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
952   /* send it on its way */
953   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
954   /* do local part */
955   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
956   /* receive remote parts: note this assumes the values are not actually */
957   /* inserted in yy until the next line, which is true for my implementation*/
958   /* but is not perhaps always true. */
959   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
960   PetscFunctionReturn(0);
961 }
962 
963 /*
964   This only works correctly for square matrices where the subblock A->A is the
965    diagonal block
966 */
967 #undef __FUNC__
968 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
969 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
970 {
971   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
972   int         ierr;
973   if (a->M != a->N) SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
974   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
975   PetscFunctionReturn(0);
976 }
977 
978 #undef __FUNC__
979 #define __FUNC__ "MatScale_MPIBAIJ"
980 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
981 {
982   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
983   int        ierr;
984   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
985   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
986   PetscFunctionReturn(0);
987 }
988 
989 #undef __FUNC__
990 #define __FUNC__ "MatGetSize_MPIBAIJ"
991 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
992 {
993   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
994   if (m) *m = mat->M;
995   if (n) *n = mat->N;
996   PetscFunctionReturn(0);
997 }
998 
999 #undef __FUNC__
1000 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
1001 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
1002 {
1003   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1004   *m = mat->m; *n = mat->N;
1005   PetscFunctionReturn(0);
1006 }
1007 
1008 #undef __FUNC__
1009 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
1010 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
1011 {
1012   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1013   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
1014   PetscFunctionReturn(0);
1015 }
1016 
1017 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1018 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1019 
#undef __FUNC__
#define __FUNC__ "MatGetRow_MPIBAIJ"
/*
   MatGetRow_MPIBAIJ - Returns one locally owned row (point numbering,
   global column indices), merged from the diagonal block (mat->A) and the
   off-diagonal block (mat->B) so the columns come out in increasing
   global order.

   The returned idx/v arrays point into work space owned by the matrix
   (mat->rowindices / mat->rowvalues); MatRestoreRow_MPIBAIJ must be
   called before the next MatGetRow.
*/
int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
  int        *cmap, *idx_p,cstart = mat->cstart;

  if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int     max = 1,mbs = mat->mbs,tmp;
    for ( i=0; i<mbs; i++ ) {
      /* total block count of block row i across both A and B */
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    /* single buffer: values first, indices stored right after them */
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }


  if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows")
  lrow = row - brstart;

  /* pass null pointers down when the caller does not want values/indices */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  /* garray maps local block columns of B to global block columns */
  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        /* copy B entries whose global block column lies left of cstart */
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        /* then all of A, then the remaining (right-of-diagonal) B entries */
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
          }
        } else {
          /* imark not yet known (v was null): find the split point here */
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]/bs] < cstart)
              idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
            else break;
          }
          imark = i;
        }
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
      }
    }
    else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
1102 
1103 #undef __FUNC__
1104 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1105 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1106 {
1107   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1108   if (baij->getrowactive == PETSC_FALSE) {
1109     SETERRQ(1,0,"MatGetRow not called");
1110   }
1111   baij->getrowactive = PETSC_FALSE;
1112   PetscFunctionReturn(0);
1113 }
1114 
1115 #undef __FUNC__
1116 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1117 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1118 {
1119   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1120   *bs = baij->bs;
1121   PetscFunctionReturn(0);
1122 }
1123 
1124 #undef __FUNC__
1125 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1126 int MatZeroEntries_MPIBAIJ(Mat A)
1127 {
1128   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1129   int         ierr;
1130   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1131   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1132   PetscFunctionReturn(0);
1133 }
1134 
1135 #undef __FUNC__
1136 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1137 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1138 {
1139   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1140   Mat         A = a->A, B = a->B;
1141   int         ierr;
1142   double      isend[5], irecv[5];
1143 
1144   info->rows_global    = (double)a->M;
1145   info->columns_global = (double)a->N;
1146   info->rows_local     = (double)a->m;
1147   info->columns_local  = (double)a->N;
1148   info->block_size     = (double)a->bs;
1149   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1150   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->memory;
1151   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1152   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->memory;
1153   if (flag == MAT_LOCAL) {
1154     info->nz_used      = isend[0];
1155     info->nz_allocated = isend[1];
1156     info->nz_unneeded  = isend[2];
1157     info->memory       = isend[3];
1158     info->mallocs      = isend[4];
1159   } else if (flag == MAT_GLOBAL_MAX) {
1160     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_MAX,matin->comm);
1161     info->nz_used      = irecv[0];
1162     info->nz_allocated = irecv[1];
1163     info->nz_unneeded  = irecv[2];
1164     info->memory       = irecv[3];
1165     info->mallocs      = irecv[4];
1166   } else if (flag == MAT_GLOBAL_SUM) {
1167     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_SUM,matin->comm);
1168     info->nz_used      = irecv[0];
1169     info->nz_allocated = irecv[1];
1170     info->nz_unneeded  = irecv[2];
1171     info->memory       = irecv[3];
1172     info->mallocs      = irecv[4];
1173   }
1174   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1175   info->fill_ratio_needed = 0;
1176   info->factor_mallocs    = 0;
1177   PetscFunctionReturn(0);
1178 }
1179 
1180 #undef __FUNC__
1181 #define __FUNC__ "MatSetOption_MPIBAIJ"
1182 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1183 {
1184   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1185 
1186   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1187       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1188       op == MAT_COLUMNS_UNSORTED ||
1189       op == MAT_COLUMNS_SORTED ||
1190       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1191       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1192         MatSetOption(a->A,op);
1193         MatSetOption(a->B,op);
1194   } else if (op == MAT_ROW_ORIENTED) {
1195         a->roworiented = 1;
1196         MatSetOption(a->A,op);
1197         MatSetOption(a->B,op);
1198   } else if (op == MAT_ROWS_SORTED ||
1199              op == MAT_ROWS_UNSORTED ||
1200              op == MAT_SYMMETRIC ||
1201              op == MAT_STRUCTURALLY_SYMMETRIC ||
1202              op == MAT_YES_NEW_DIAGONALS)
1203     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1204   else if (op == MAT_COLUMN_ORIENTED) {
1205     a->roworiented = 0;
1206     MatSetOption(a->A,op);
1207     MatSetOption(a->B,op);
1208   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1209     a->donotstash = 1;
1210   } else if (op == MAT_NO_NEW_DIAGONALS)
1211     {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
1212   else
1213     {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
1214   PetscFunctionReturn(0);
1215 }
1216 
1217 #undef __FUNC__
1218 #define __FUNC__ "MatTranspose_MPIBAIJ("
1219 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1220 {
1221   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1222   Mat_SeqBAIJ *Aloc;
1223   Mat        B;
1224   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
1225   int        bs=baij->bs,mbs=baij->mbs;
1226   Scalar     *a;
1227 
1228   if (matout == PETSC_NULL && M != N)
1229     SETERRQ(1,0,"Square matrix only for in-place");
1230   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1231   CHKERRQ(ierr);
1232 
1233   /* copy over the A part */
1234   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1235   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1236   row = baij->rstart;
1237   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1238 
1239   for ( i=0; i<mbs; i++ ) {
1240     rvals[0] = bs*(baij->rstart + i);
1241     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1242     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1243       col = (baij->cstart+aj[j])*bs;
1244       for (k=0; k<bs; k++ ) {
1245         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1246         col++; a += bs;
1247       }
1248     }
1249   }
1250   /* copy over the B part */
1251   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1252   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1253   row = baij->rstart*bs;
1254   for ( i=0; i<mbs; i++ ) {
1255     rvals[0] = bs*(baij->rstart + i);
1256     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1257     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1258       col = baij->garray[aj[j]]*bs;
1259       for (k=0; k<bs; k++ ) {
1260         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1261         col++; a += bs;
1262       }
1263     }
1264   }
1265   PetscFree(rvals);
1266   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1267   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1268 
1269   if (matout != PETSC_NULL) {
1270     *matout = B;
1271   } else {
1272     /* This isn't really an in-place transpose .... but free data structures from baij */
1273     PetscFree(baij->rowners);
1274     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1275     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1276     if (baij->colmap) PetscFree(baij->colmap);
1277     if (baij->garray) PetscFree(baij->garray);
1278     if (baij->lvec) VecDestroy(baij->lvec);
1279     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1280     PetscFree(baij);
1281     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1282     PetscHeaderDestroy(B);
1283   }
1284   PetscFunctionReturn(0);
1285 }
1286 
1287 #undef __FUNC__
1288 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1289 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1290 {
1291   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1292   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1293   int ierr,s1,s2,s3;
1294 
1295   if (ll)  {
1296     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1297     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1298     if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
1299     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1300     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1301   }
1302   if (rr) SETERRQ(1,0,"not supported for right vector");
1303   PetscFunctionReturn(0);
1304 }
1305 
/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   baij->A and baij->B directly and not through the MatZeroRows()
   routine.
*/
#undef __FUNC__
#define __FUNC__ "MatZeroRows_MPIBAIJ"
/*
   MatZeroRows_MPIBAIJ - Zeros the (globally, point-numbered) rows listed
   in is, optionally placing *diag on the diagonal of each zeroed row.

   Every processor may list any global rows, so each requested row is
   first routed to its owning processor by a hand-rolled exchange
   (message counts via MPI_Allreduce, then Irecv/Isend pairs).  The rows
   received are converted to local numbering and zeroed in both
   sequential blocks; the diagonal value is applied only through l->A --
   see the note above this routine regarding non-square matrices.
*/
int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values,bs=l->bs;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    /* rowners[] is in block units, so scale by bs to compare point rows */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,0,"Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* bucket the requested rows by destination processor */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  /* starts[] was advanced by the bucketing above; rebuild it */
  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  base = owners[rank]*bs;  /* first point row owned by this processor */

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base; /* convert to local numbering */
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  PetscFunctionReturn(0);
}
1436 extern int MatPrintHelp_SeqBAIJ(Mat);
1437 #undef __FUNC__
1438 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1439 int MatPrintHelp_MPIBAIJ(Mat A)
1440 {
1441   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1442   int         ierr;
1443 
1444   if (!a->rank) {
1445     ierr = MatPrintHelp_SeqBAIJ(a->A);CHKERRQ(ierr);
1446   }
1447   PetscFunctionReturn(0);
1448 }
1449 
1450 #undef __FUNC__
1451 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1452 int MatSetUnfactored_MPIBAIJ(Mat A)
1453 {
1454   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1455   int         ierr;
1456   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1457   PetscFunctionReturn(0);
1458 }
1459 
1460 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1461 
/* -------------------------------------------------------------------*/
/*
   Function table for MATMPIBAIJ.  The ordering of the entries is fixed
   by struct _MatOps; a 0 entry means the operation is not provided by
   this matrix type.
*/
static struct _MatOps MatOps = {
  MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
  MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
  0,0,0,0,
  0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
  0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
  MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
  MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
  0,0,0,MatGetSize_MPIBAIJ,
  MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
  0,0,MatConvertSameType_MPIBAIJ,0,0,
  0,0,0,MatGetSubMatrices_MPIBAIJ,
  MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
  MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
  0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1478 
1479 
1480 #undef __FUNC__
1481 #define __FUNC__ "MatCreateMPIBAIJ"
1482 /*@C
1483    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1484    (block compressed row).  For good matrix assembly performance
1485    the user should preallocate the matrix storage by setting the parameters
1486    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1487    performance can be increased by more than a factor of 50.
1488 
1489    Input Parameters:
1490 .  comm - MPI communicator
.  bs   - size of block
1492 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1493            This value should be the same as the local size used in creating the
1494            y vector for the matrix-vector product y = Ax.
1495 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1496            This value should be the same as the local size used in creating the
1497            x vector for the matrix-vector product y = Ax.
1498 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1499 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1500 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1501            submatrix  (same for all local rows)
.  d_nnz - array containing the number of block nonzeros in the various block rows
           in the diagonal portion of the local submatrix (possibly different for
           each block row) or PETSC_NULL.  You must leave room for the diagonal
           entry even if it is zero.
1506 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1507            submatrix (same for all local rows).
.  o_nnz - array containing the number of nonzeros in the various block rows of the
           off-diagonal portion of the local submatrix (possibly different for
           each block row) or PETSC_NULL.
1511 
1512    Output Parameter:
1513 .  A - the matrix
1514 
1515    Notes:
1516    The user MUST specify either the local or global matrix dimensions
1517    (possibly both).
1518 
1519    Storage Information:
1520    For a square global matrix we define each processor's diagonal portion
1521    to be its local rows and the corresponding columns (a square submatrix);
1522    each processor's off-diagonal portion encompasses the remainder of the
1523    local matrix (a rectangular submatrix).
1524 
1525    The user can specify preallocated storage for the diagonal part of
1526    the local submatrix with either d_nz or d_nnz (not both).  Set
1527    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1528    memory allocation.  Likewise, specify preallocated storage for the
1529    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1530 
1531    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1532    the figure below we depict these three local rows and all columns (0-11).
1533 
1534 $          0 1 2 3 4 5 6 7 8 9 10 11
1535 $         -------------------
1536 $  row 3  |  o o o d d d o o o o o o
1537 $  row 4  |  o o o d d d o o o o o o
1538 $  row 5  |  o o o d d d o o o o o o
1539 $         -------------------
1540 $
1541 
1542    Thus, any entries in the d locations are stored in the d (diagonal)
1543    submatrix, and any entries in the o locations are stored in the
1544    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1545    stored simply in the MATSEQBAIJ format for compressed row storage.
1546 
1547    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1548    and o_nz should indicate the number of nonzeros per row in the o matrix.
1549    In general, for PDE problems in which most nonzeros are near the diagonal,
1550    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1551    or you will get TERRIBLE performance; see the users' manual chapter on
1552    matrices.
1553 
1554 .keywords: matrix, block, aij, compressed row, sparse, parallel
1555 
1556 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1557 @*/
1558 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1559                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1560 {
1561   Mat          B;
1562   Mat_MPIBAIJ  *b;
1563   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1564 
1565   if (bs < 1) SETERRQ(1,0,"Invalid block size specified, must be positive");
1566 
1567   MPI_Comm_size(comm,&size);
1568   if (size == 1) {
1569     if (M == PETSC_DECIDE) M = m;
1570     if (N == PETSC_DECIDE) N = n;
1571     ierr = MatCreateSeqBAIJ(comm,bs,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
1572     PetscFunctionReturn(0);
1573   }
1574 
1575   *A = 0;
1576   PetscHeaderCreate(B,_p_Mat,MAT_COOKIE,MATMPIBAIJ,comm,MatDestroy,MatView);
1577   PLogObjectCreate(B);
1578   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1579   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1580   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1581 
1582   B->destroy    = MatDestroy_MPIBAIJ;
1583   B->view       = MatView_MPIBAIJ;
1584   B->mapping    = 0;
1585   B->factor     = 0;
1586   B->assembled  = PETSC_FALSE;
1587 
1588   B->insertmode = NOT_SET_VALUES;
1589   MPI_Comm_rank(comm,&b->rank);
1590   MPI_Comm_size(comm,&b->size);
1591 
1592   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1593     SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1594   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
1595   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,0,"either N or n should be specified");
1596   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1597   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1598 
1599   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1600     work[0] = m; work[1] = n;
1601     mbs = m/bs; nbs = n/bs;
1602     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1603     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1604     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1605   }
1606   if (m == PETSC_DECIDE) {
1607     Mbs = M/bs;
1608     if (Mbs*bs != M) SETERRQ(1,0,"No of global rows must be divisible by blocksize");
1609     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1610     m   = mbs*bs;
1611   }
1612   if (n == PETSC_DECIDE) {
1613     Nbs = N/bs;
1614     if (Nbs*bs != N) SETERRQ(1,0,"No of global cols must be divisible by blocksize");
1615     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1616     n   = nbs*bs;
1617   }
1618   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"No of local rows, cols must be divisible by blocksize");
1619 
1620   b->m = m; B->m = m;
1621   b->n = n; B->n = n;
1622   b->N = N; B->N = N;
1623   b->M = M; B->M = M;
1624   b->bs  = bs;
1625   b->bs2 = bs*bs;
1626   b->mbs = mbs;
1627   b->nbs = nbs;
1628   b->Mbs = Mbs;
1629   b->Nbs = Nbs;
1630 
1631   /* build local table of row and column ownerships */
1632   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1633   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1634   b->cowners = b->rowners + b->size + 2;
1635   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1636   b->rowners[0] = 0;
1637   for ( i=2; i<=b->size; i++ ) {
1638     b->rowners[i] += b->rowners[i-1];
1639   }
1640   b->rstart    = b->rowners[b->rank];
1641   b->rend      = b->rowners[b->rank+1];
1642   b->rstart_bs = b->rstart * bs;
1643   b->rend_bs   = b->rend * bs;
1644 
1645   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1646   b->cowners[0] = 0;
1647   for ( i=2; i<=b->size; i++ ) {
1648     b->cowners[i] += b->cowners[i-1];
1649   }
1650   b->cstart    = b->cowners[b->rank];
1651   b->cend      = b->cowners[b->rank+1];
1652   b->cstart_bs = b->cstart * bs;
1653   b->cend_bs   = b->cend * bs;
1654 
1655   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1656   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1657   PLogObjectParent(B,b->A);
1658   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1659   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1660   PLogObjectParent(B,b->B);
1661 
1662   /* build cache for off array entries formed */
1663   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1664   b->donotstash  = 0;
1665   b->colmap      = 0;
1666   b->garray      = 0;
1667   b->roworiented = 1;
1668 
1669   /* stuff used in block assembly */
1670   b->barray      = 0;
1671 
1672   /* stuff used for matrix vector multiply */
1673   b->lvec        = 0;
1674   b->Mvctx       = 0;
1675 
1676   /* stuff for MatGetRow() */
1677   b->rowindices   = 0;
1678   b->rowvalues    = 0;
1679   b->getrowactive = PETSC_FALSE;
1680 
1681   *A = B;
1682   PetscFunctionReturn(0);
1683 }
1684 
1685 #undef __FUNC__
1686 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
1687 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1688 {
1689   Mat         mat;
1690   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1691   int         ierr, len=0, flg;
1692 
1693   *newmat       = 0;
1694   PetscHeaderCreate(mat,_p_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm,MatDestroy,MatView);
1695   PLogObjectCreate(mat);
1696   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1697   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1698   mat->destroy    = MatDestroy_MPIBAIJ;
1699   mat->view       = MatView_MPIBAIJ;
1700   mat->factor     = matin->factor;
1701   mat->assembled  = PETSC_TRUE;
1702 
1703   a->m = mat->m   = oldmat->m;
1704   a->n = mat->n   = oldmat->n;
1705   a->M = mat->M   = oldmat->M;
1706   a->N = mat->N   = oldmat->N;
1707 
1708   a->bs  = oldmat->bs;
1709   a->bs2 = oldmat->bs2;
1710   a->mbs = oldmat->mbs;
1711   a->nbs = oldmat->nbs;
1712   a->Mbs = oldmat->Mbs;
1713   a->Nbs = oldmat->Nbs;
1714 
1715   a->rstart       = oldmat->rstart;
1716   a->rend         = oldmat->rend;
1717   a->cstart       = oldmat->cstart;
1718   a->cend         = oldmat->cend;
1719   a->size         = oldmat->size;
1720   a->rank         = oldmat->rank;
1721   mat->insertmode = NOT_SET_VALUES;
1722   a->rowvalues    = 0;
1723   a->getrowactive = PETSC_FALSE;
1724   a->barray       = 0;
1725 
1726   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1727   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1728   a->cowners = a->rowners + a->size + 2;
1729   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1730   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1731   if (oldmat->colmap) {
1732     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1733     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1734     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1735   } else a->colmap = 0;
1736   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1737     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1738     PLogObjectMemory(mat,len*sizeof(int));
1739     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1740   } else a->garray = 0;
1741 
1742   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1743   PLogObjectParent(mat,a->lvec);
1744   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1745   PLogObjectParent(mat,a->Mvctx);
1746   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1747   PLogObjectParent(mat,a->A);
1748   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1749   PLogObjectParent(mat,a->B);
1750   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1751   if (flg) {
1752     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1753   }
1754   *newmat = mat;
1755   PetscFunctionReturn(0);
1756 }
1757 
1758 #include "sys.h"
1759 
#undef __FUNC__
#define __FUNC__ "MatLoad_MPIBAIJ"
/*
   MatLoad_MPIBAIJ - Reads a matrix from a PETSc binary viewer and builds a
   parallel block AIJ (MPIBAIJ) matrix from it.

   Collective on the viewer's communicator.  Process 0 reads the header,
   row lengths, column indices and numerical values from the file and ships
   each other process its piece via MPI point-to-point messages; every
   process then inserts its own rows with MatSetValues().

   Input Parameters:
     viewer - binary viewer holding the open file
     type   - matrix type (unused in this routine)

   Output Parameter:
     newmat - the assembled MPIBAIJ matrix

   Notes:
     Only square matrices are accepted.  If the global size is not divisible
     by the block size (option -matload_block_size, default 1), extra rows
     carrying a single 1.0 on the diagonal are appended to pad it out.
*/
int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*buf;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
  int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
  int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
  int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
  int          dcount,kmax,k,nzcount,tmp;


  /* block size is a runtime option; default is 1 */
  ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
  bs2  = bs*bs;

  /* only process 0 touches the file; it reads and broadcasts the header */
  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not matrix object");
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];

  if (M != N) SETERRQ(1,0,"Can only do square matrices");

  /*
     This code adds extra rows to make sure the number of rows is
     divisible by the blocksize
  */
  Mbs        = M/bs;
  extra_rows = bs - M + bs*(Mbs);
  if (extra_rows == bs) extra_rows = 0;
  else                  Mbs++;
  if (extra_rows &&!rank) {
    PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
  }

  /* determine ownership of all rows */
  mbs = Mbs/size + ((Mbs % size) > rank);
  m   = mbs * bs;
  /* rowners[] holds cumulative block-row starts per process; browners[]
     the same in point rows; both live in the one rowners allocation */
  rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
  browners = rowners + size + 1;
  MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
  for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
    /* each padding row contributes exactly one (diagonal) entry */
    for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
    MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    /* non-root ranks pass dummy send arguments; only the root's count */
    MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices  */
    nz = procsnz[0];
    ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* the padding rows are not in the file; synthesize their diagonal
       column indices (M..M+extra_rows-1) after reading */
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }

    /* read in every ones (except the last) and ship off */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    /* read in the stuff for the last proc */
    if ( size != 1 ) {
      nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
      MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += locrowlens[i];
    }
    ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* receive message of column indices*/
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries */
  /* dlens[i]/odlens[i] count the distinct diagonal/off-diagonal BLOCK
     columns of local block row i; mask[] flags block columns already seen
     in this row, masked1/masked2 remember which flags to clear afterwards */
  dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
  odlens = dlens + (rend-rstart);
  mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
  PetscMemzero(mask,3*Mbs*sizeof(int));
  masked1 = mask    + Mbs;
  masked2 = masked1 + Mbs;
  rowcount = 0; nzcount = 0;
  for ( i=0; i<mbs; i++ ) {
    dcount  = 0;
    odcount = 0;
    for ( j=0; j<bs; j++ ) {
      kmax = locrowlens[rowcount];
      for ( k=0; k<kmax; k++ ) {
        tmp = mycols[nzcount++]/bs;
        if (!mask[tmp]) {
          mask[tmp] = 1;
          /* square matrix: the local block-row range doubles as the
             diagonal block-column range */
          if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
          else masked1[dcount++] = tmp;
        }
      }
      rowcount++;
    }

    dlens[i]  = dcount;
    odlens[i] = odcount;

    /* zero out the mask elements we set */
    for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
    for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
  }

  /* create our matrix */
  ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
         CHKERRQ(ierr);
  A = *newmat;
  MatSetOption(A,MAT_COLUMNS_SORTED);

  if (!rank) {
    buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    vals = buf;
    mycols = ibuf;
    /* as with the column indices, append 1.0 diagonal values for the
       padding rows, which are not on disk */
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
    /* read in other processors (except the last one) and ship out */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    /* the last proc */
    /* NOTE(review): procsnz[i] relies on the loop above leaving i == size-1
       (its init expression sets i=1 even when size==2, so this holds);
       procsnz[size-1] would say the same thing more robustly */
    if ( size != 1 ){
      nz = procsnz[i] - extra_rows;
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
      MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);

    /* receive message of values*/
    vals = buf;
    mycols = ibuf;
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
  }
  PetscFree(locrowlens);
  PetscFree(buf);
  PetscFree(ibuf);
  PetscFree(rowners);
  PetscFree(dlens);
  PetscFree(mask);
  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
1988 
1989 
1990