xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 96854ed6ad2ded53123cd28a73515071866c9e4e)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.66 1997/04/10 00:03:33 bsmith Exp curfman $";
3 #endif
4 
5 #include "pinclude/pviewer.h"
6 #include "src/mat/impls/baij/mpi/mpibaij.h"
7 #include "src/vec/vecimpl.h"
8 
9 
10 extern int MatSetUpMultiply_MPIBAIJ(Mat);
11 extern int DisAssemble_MPIBAIJ(Mat);
12 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
13 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
14 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
15 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
16 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
17 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
18 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
19 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
20 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
21 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
22 
23 
24 /*
25      Local utility routine that creates a mapping from the global column
26    number to the local number in the off-diagonal part of the local
27    storage of the matrix.  This is done in a non-scalable way since the
28    length of colmap equals the global matrix length.
29 */
30 #undef __FUNC__
31 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
32 static int CreateColmap_MPIBAIJ_Private(Mat mat)
33 {
34   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
35   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
36   int         nbs = B->nbs,i,bs=B->bs;
37 
38   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
39   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
40   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
41   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
42   return 0;
43 }
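
/*
   Worked example for the routine above (a sketch, not part of the source):
   with bs = 2 and garray = {3,7} (the global block columns present in the
   off-diagonal part B), the loop sets colmap[3] = 1 and colmap[7] = 3
   (i*bs+1 for local block i); every other entry stays 0, meaning "not
   present".  A later lookup of global element column 15 then computes
   colmap[15/2] - 1 + 15%2 = 3 - 1 + 1 = 3, the local element column in B.
*/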
44 
45 #undef __FUNC__
46 #define __FUNC__ "MatGetRowIJ_MPIBAIJ"
47 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
48                             PetscTruth *done)
49 {
50   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
51   int         ierr;
52   if (aij->size == 1) {
53     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
54   } else SETERRQ(1,0,"not supported in parallel");
55   return 0;
56 }
57 
58 #undef __FUNC__
59 #define __FUNC__ "MatRestoreRowIJ_MPIBAIJ"
60 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
61                                 PetscTruth *done)
62 {
63   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
64   int        ierr;
65   if (aij->size == 1) {
66     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
67   } else SETERRQ(1,0,"not supported in parallel");
68   return 0;
69 }
70 #define CHUNKSIZE  10
71 
72 #define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
73 { \
74  \
75     brow = row/bs;  \
76     rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
77     rmax = aimax[brow]; nrow = ailen[brow]; \
78       bcol = col/bs; \
79       ridx = row % bs; cidx = col % bs; \
80       low = 0; high = nrow; \
81       while (high-low > 3) { \
82         t = (low+high)/2; \
83         if (rp[t] > bcol) high = t; \
84         else              low  = t; \
85       } \
86       for ( _i=low; _i<high; _i++ ) { \
87         if (rp[_i] > bcol) break; \
88         if (rp[_i] == bcol) { \
89           bap  = ap +  bs2*_i + bs*cidx + ridx; \
90           if (addv == ADD_VALUES) *bap += value;  \
91           else                    *bap  = value;  \
92           goto a_noinsert; \
93         } \
94       } \
95       if (a->nonew) goto a_noinsert; \
96       if (nrow >= rmax) { \
97         /* there is no extra room in row, therefore enlarge */ \
98         int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
99         Scalar *new_a; \
100  \
101         /* malloc new storage space */ \
102         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
103         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
104         new_j   = (int *) (new_a + bs2*new_nz); \
105         new_i   = new_j + new_nz; \
106  \
107         /* copy over old data into new slots */ \
108         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
109         for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
110         PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
111         len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
112         PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
113                                                            len*sizeof(int)); \
114         PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
115         PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
116         PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
117                     aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
118         /* free up old matrix storage */ \
119         PetscFree(a->a);  \
120         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
121         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
122         a->singlemalloc = 1; \
123  \
124         rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
125         rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
126         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
127         a->maxnz += bs2*CHUNKSIZE; \
128         a->reallocs++; \
129         a->nz++; \
130       } \
131       N = nrow++ - 1;  \
132       /* shift up all the later entries in this row */ \
133       for ( ii=N; ii>=_i; ii-- ) { \
134         rp[ii+1] = rp[ii]; \
135         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
136       } \
137       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
138       rp[_i]                      = bcol;  \
139       ap[bs2*_i + bs*cidx + ridx] = value;  \
140       a_noinsert:; \
141     ailen[brow] = nrow; \
142 }
143 
144 #define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
145 { \
146  \
147     brow = row/bs;  \
148     rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
149     rmax = bimax[brow]; nrow = bilen[brow]; \
150       bcol = col/bs; \
151       ridx = row % bs; cidx = col % bs; \
152       low = 0; high = nrow; \
153       while (high-low > 3) { \
154         t = (low+high)/2; \
155         if (rp[t] > bcol) high = t; \
156         else              low  = t; \
157       } \
158       for ( _i=low; _i<high; _i++ ) { \
159         if (rp[_i] > bcol) break; \
160         if (rp[_i] == bcol) { \
161           bap  = ap +  bs2*_i + bs*cidx + ridx; \
162           if (addv == ADD_VALUES) *bap += value;  \
163           else                    *bap  = value;  \
164           goto b_noinsert; \
165         } \
166       } \
167       if (b->nonew) goto b_noinsert; \
168       if (nrow >= rmax) { \
169         /* there is no extra room in row, therefore enlarge */ \
170         int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
171         Scalar *new_a; \
172  \
173         /* malloc new storage space */ \
174         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
175         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
176         new_j   = (int *) (new_a + bs2*new_nz); \
177         new_i   = new_j + new_nz; \
178  \
179         /* copy over old data into new slots */ \
180         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
181         for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
182         PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
183         len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
184         PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
185                                                            len*sizeof(int)); \
186         PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
187         PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
188         PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
189                     ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
190         /* free up old matrix storage */ \
191         PetscFree(b->a);  \
192         if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
193         ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
194         b->singlemalloc = 1; \
195  \
196         rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
197         rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
198         PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
199         b->maxnz += bs2*CHUNKSIZE; \
200         b->reallocs++; \
201         b->nz++; \
202       } \
203       N = nrow++ - 1;  \
204       /* shift up all the later entries in this row */ \
205       for ( ii=N; ii>=_i; ii-- ) { \
206         rp[ii+1] = rp[ii]; \
207         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
208       } \
209       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
210       rp[_i]                      = bcol;  \
211       ap[bs2*_i + bs*cidx + ridx] = value;  \
212       b_noinsert:; \
213     bilen[brow] = nrow; \
214 }
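
/*
   Note on the two macros above (explanatory comment only): both implement the
   same insertion algorithm on the block compressed row structure.  For a point
   entry (row,col) the block coordinates are brow = row/bs, bcol = col/bs and
   the offset inside the bs x bs block is (ridx,cidx) = (row%bs,col%bs); blocks
   are stored column-major, so the entry lives at ap[bs2*_i + bs*cidx + ridx].
   The bisection loop narrows the search window to at most 3 candidates before
   the linear scan, and when a block row is full an extra CHUNKSIZE block slots
   are reallocated and spliced in after brow.
*/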
215 
216 #undef __FUNC__
217 #define __FUNC__ "MatSetValues_MPIBAIJ"
218 int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
219 {
220   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
221   Scalar      value;
222   int         ierr,i,j,row,col;
223   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
224   int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
225   int         cend_orig=baij->cend_bs,bs=baij->bs;
226 
227   /* Some variables required by the macros above */
228   Mat         A = baij->A;
229   Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
230   int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
231   Scalar      *aa=a->a;
232 
233   Mat         B = baij->B;
234   Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
235   int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
236   Scalar      *ba=b->a;
237 
238   int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
239   int         low,high,t,ridx,cidx,bs2=a->bs2;
240   Scalar      *ap,*bap;
241 
242   for ( i=0; i<m; i++ ) {
243 #if defined(PETSC_BOPT_g)
244     if (im[i] < 0) SETERRQ(1,0,"Negative row");
245     if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
246 #endif
247     if (im[i] >= rstart_orig && im[i] < rend_orig) {
248       row = im[i] - rstart_orig;
249       for ( j=0; j<n; j++ ) {
250         if (in[j] >= cstart_orig && in[j] < cend_orig){
251           col = in[j] - cstart_orig;
252           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
253           MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
254           /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
255         }
256 #if defined(PETSC_BOPT_g)
257         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
258         else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
259 #endif
260         else {
261           if (mat->was_assembled) {
262             if (!baij->colmap) {
263               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
264             }
265             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
266             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
267               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
268               col =  in[j];
269             }
270           }
271           else col = in[j];
272           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
273           MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
274           /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
275         }
276       }
277     }
278     else {
279       if (roworiented && !baij->donotstash) {
280         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
281       }
282       else {
283         if (!baij->donotstash) {
284           row = im[i];
285 	  for ( j=0; j<n; j++ ) {
286 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
287           }
288         }
289       }
290     }
291   }
292   return 0;
293 }
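
/*
   Usage sketch (hypothetical caller code, not part of this file): values in
   locally owned rows are inserted directly into baij->A or baij->B by the
   routine above; everything else is stashed and communicated at assembly.

      int    rows[1] = {3}, cols[2] = {4,11};
      Scalar vals[2] = {1.0,-2.0};
      ierr = MatSetValues(mat,1,rows,2,cols,vals,ADD_VALUES); CHKERRQ(ierr);
      ierr = MatAssemblyBegin(mat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(mat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
*/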
294 
295 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
296 #undef __FUNC__
297 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
298 int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
299 {
300   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
301   Scalar      *value,*barray=baij->barray;
302   int         ierr,i,j,ii,jj,row,col,k,l;
303   int         roworiented = baij->roworiented,rstart=baij->rstart ;
304   int         rend=baij->rend,cstart=baij->cstart,stepval;
305   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
306 
307 
308   if(!barray) {
309     barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
310     baij->barray = barray;
311   }
312 
313   if (roworiented) {
314     stepval = (n-1)*bs;
315   } else {
316     stepval = (m-1)*bs;
317   }
318   for ( i=0; i<m; i++ ) {
319 #if defined(PETSC_BOPT_g)
320     if (im[i] < 0) SETERRQ(1,0,"Negative row");
321     if (im[i] >= baij->Mbs) SETERRQ(1,0,"Row too large");
322 #endif
323     if (im[i] >= rstart && im[i] < rend) {
324       row = im[i] - rstart;
325       for ( j=0; j<n; j++ ) {
326         /* If the block to insert is already contiguous in v, a copy is not required */
327         if ((roworiented) && (n == 1)) {
328           barray = v + i*bs2;
329         } else if((!roworiented) && (m == 1)) {
330           barray = v + j*bs2;
331         } else { /* Here a copy is required */
332           if (roworiented) {
333             value = v + i*(stepval+bs)*bs + j*bs;
334           } else {
335             value = v + j*(stepval+bs)*bs + i*bs;
336           }
337           for ( ii=0; ii<bs; ii++,value+=stepval )
338             for (jj=0; jj<bs; jj++ )
339               *barray++  = *value++;
340           barray -=bs2;
341         }
342 
343         if (in[j] >= cstart && in[j] < cend){
344           col = in[j] - cstart;
345           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
346         }
347 #if defined(PETSC_BOPT_g)
348         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
349         else if (in[j] >= baij->Nbs) {SETERRQ(1,0,"Col too large");}
350 #endif
351         else {
352           if (mat->was_assembled) {
353             if (!baij->colmap) {
354               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
355             }
356             col = baij->colmap[in[j]] - 1;
357             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
358               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
359               col =  in[j];
360             } else col = col/bs; /* colmap stores element offsets (i*bs+1); convert to a block index */
361           }
362           else col = in[j];
363           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
364         }
365       }
366     }
367     else {
368       if (!baij->donotstash) {
369         if (roworiented ) {
370           row   = im[i]*bs;
371           value = v + i*(stepval+bs)*bs;
372           for ( j=0; j<bs; j++,row++ ) {
373             for ( k=0; k<n; k++ ) {
374               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
375                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
376               }
377             }
378           }
379         }
380         else {
381           for ( j=0; j<n; j++ ) {
382             value = v + j*(stepval+bs)*bs + i*bs;
383             col   = in[j]*bs;
384             for ( k=0; k<bs; k++,col++,value+=stepval) {
385               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
386                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
387               }
388             }
389           }
390 
391         }
392       }
393     }
394   }
395   return 0;
396 }
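
/*
   Layout sketch for the routine above (explanatory comment only): im[] and
   in[] are BLOCK indices and v holds m*n dense bs x bs blocks.  In the
   row-oriented case the (i,j) block starts at v + i*(stepval+bs)*bs + j*bs
   with its rows separated by stepval = (n-1)*bs, i.e. v is one dense
   (m*bs) x (n*bs) row-major array; the copy loop gathers each block into
   the contiguous bs2 buffer barray expected by MatSetValuesBlocked_SeqBAIJ.
*/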
397 
398 #undef __FUNC__
399 #define __FUNC__ "MatGetValues_MPIBAIJ"
400 int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
401 {
402   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
403   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
404   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
405 
406   for ( i=0; i<m; i++ ) {
407     if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
408     if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
409     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
410       row = idxm[i] - bsrstart;
411       for ( j=0; j<n; j++ ) {
412         if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
413         if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
414         if (idxn[j] >= bscstart && idxn[j] < bscend){
415           col = idxn[j] - bscstart;
416           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
417         }
418         else {
419           if (!baij->colmap) {
420             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
421           }
422           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
423              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
424           else {
425             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
426             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
427           }
428         }
429       }
430     }
431     else {
432       SETERRQ(1,0,"Only local values currently supported");
433     }
434   }
435   return 0;
436 }
437 
438 #undef __FUNC__
439 #define __FUNC__ "MatNorm_MPIBAIJ"
440 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
441 {
442   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
443   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
444   int        ierr, i,bs2=baij->bs2;
445   double     sum = 0.0;
446   Scalar     *v;
447 
448   if (baij->size == 1) {
449     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
450   } else {
451     if (type == NORM_FROBENIUS) {
452       v = amat->a;
453       for (i=0; i<amat->nz*bs2; i++ ) {
454 #if defined(PETSC_COMPLEX)
455         sum += real(conj(*v)*(*v)); v++;
456 #else
457         sum += (*v)*(*v); v++;
458 #endif
459       }
460       v = bmat->a;
461       for (i=0; i<bmat->nz*bs2; i++ ) {
462 #if defined(PETSC_COMPLEX)
463         sum += real(conj(*v)*(*v)); v++;
464 #else
465         sum += (*v)*(*v); v++;
466 #endif
467       }
468       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
469       *norm = sqrt(*norm);
470     }
471     else
472       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
473   }
474   return 0;
475 }
476 
477 #undef __FUNC__
478 #define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
479 int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
480 {
481   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
482   MPI_Comm    comm = mat->comm;
483   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
484   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
485   MPI_Request *send_waits,*recv_waits;
486   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
487   InsertMode  addv;
488   Scalar      *rvalues,*svalues;
489 
490   /* make sure all processors are in either INSERT_VALUES or ADD_VALUES mode */
491   MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
492   if (addv == (ADD_VALUES|INSERT_VALUES)) {
493     SETERRQ(1,0,"Some processors inserted others added");
494   }
495   mat->insertmode = addv; /* in case this processor had no cache */
496 
497   /*  first count number of contributors to each processor */
498   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
499   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
500   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
501   for ( i=0; i<baij->stash.n; i++ ) {
502     idx = baij->stash.idx[i];
503     for ( j=0; j<size; j++ ) {
504       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
505         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
506       }
507     }
508   }
509   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
510 
511   /* inform other processors of number of messages and max length*/
512   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
513   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
514   nreceives = work[rank];
515   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
516   nmax = work[rank];
517   PetscFree(work);
518 
519   /* post receives:
520        1) each message will consist of ordered triples
521      (global row, global column, value); the indices are stored as Scalars
522      to simplify the message passing.
523        2) since we don't know how long each individual message is we
524      allocate the largest needed buffer for each receive. Potentially
525      this is a lot of wasted space.
526 
527 
528        This could be done better.
529   */
530   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
531   CHKPTRQ(rvalues);
532   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
533   CHKPTRQ(recv_waits);
534   for ( i=0; i<nreceives; i++ ) {
535     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
536               comm,recv_waits+i);
537   }
538 
539   /* do sends:
540       1) starts[i] gives the starting index in svalues for stuff going to
541          the ith processor
542   */
543   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
544   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
545   CHKPTRQ(send_waits);
546   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
547   starts[0] = 0;
548   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
549   for ( i=0; i<baij->stash.n; i++ ) {
550     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
551     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
552     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
553   }
554   PetscFree(owner);
555   starts[0] = 0;
556   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
557   count = 0;
558   for ( i=0; i<size; i++ ) {
559     if (procs[i]) {
560       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
561                 comm,send_waits+count++);
562     }
563   }
564   PetscFree(starts); PetscFree(nprocs);
565 
566   /* Free cache space */
567   PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
568   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
569 
570   baij->svalues    = svalues;    baij->rvalues    = rvalues;
571   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
572   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
573   baij->rmax       = nmax;
574 
575   return 0;
576 }
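
/*
   Communication pattern used above (comment only): each process counts how
   many stashed entries go to every other process (nprocs/procs); the two
   Allreduces tell each process how many messages to expect (nreceives) and
   an upper bound on their length (nmax); receives are pre-posted with
   MPI_ANY_SOURCE, and each message is a packed array of (row,col,value)
   triples sent with MPI_Isend.  MatAssemblyEnd_MPIBAIJ drains the receives.
*/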
577 
578 
579 #undef __FUNC__
580 #define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
581 int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
582 {
583   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
584   MPI_Status  *send_status,recv_status;
585   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
586   int         bs=baij->bs,row,col,other_disassembled;
587   Scalar      *values,val;
588   InsertMode  addv = mat->insertmode;
589 
590   /*  wait on receives */
591   while (count) {
592     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
593     /* unpack receives into our local space */
594     values = baij->rvalues + 3*imdex*baij->rmax;
595     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
596     n = n/3;
597     for ( i=0; i<n; i++ ) {
598       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
599       col = (int) PetscReal(values[3*i+1]);
600       val = values[3*i+2];
601       if (col >= baij->cstart*bs && col < baij->cend*bs) {
602         col -= baij->cstart*bs;
603         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
604       }
605       else {
606         if (mat->was_assembled) {
607           if (!baij->colmap) {
608             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
609           }
610           col = (baij->colmap[col/bs]-1) + col%bs; /* colmap stores element offsets (i*bs+1) */
611           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
612             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
613             col = (int) PetscReal(values[3*i+1]);
614           }
615         }
616         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
617       }
618     }
619     count--;
620   }
621   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
622 
623   /* wait on sends */
624   if (baij->nsends) {
625     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
626     CHKPTRQ(send_status);
627     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
628     PetscFree(send_status);
629   }
630   PetscFree(baij->send_waits); PetscFree(baij->svalues);
631 
632   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
633   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
634 
635   /* determine if any processor has disassembled, if so we must
636      also disassemble ourselves, in order that we may reassemble. */
637   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
638   if (mat->was_assembled && !other_disassembled) {
639     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
640   }
641 
642   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
643     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
644   }
645   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
646   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
647 
648   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
649   return 0;
650 }
651 
652 #undef __FUNC__
653 #define __FUNC__ "MatView_MPIBAIJ_Binary"
654 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
655 {
656   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
657   int          ierr;
658 
659   if (baij->size == 1) {
660     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
661   }
662   else SETERRQ(1,0,"Only uniprocessor output supported");
663   return 0;
664 }
665 
666 #undef __FUNC__
667 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
668 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
669 {
670   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
671   int          ierr, format,rank,bs = baij->bs;
672   FILE         *fd;
673   ViewerType   vtype;
674 
675   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
676   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
677     ierr = ViewerGetFormat(viewer,&format);
678     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
679       MatInfo info;
680       MPI_Comm_rank(mat->comm,&rank);
681       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
682       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
683       PetscSequentialPhaseBegin(mat->comm,1);
684       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
685               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
686               baij->bs,(int)info.memory);
687       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
688       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
689       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
690       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
691       fflush(fd);
692       PetscSequentialPhaseEnd(mat->comm,1);
693       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
694       return 0;
695     }
696     else if (format == VIEWER_FORMAT_ASCII_INFO) {
697       PetscPrintf(mat->comm,"  block size is %d\n",bs);
698       return 0;
699     }
700   }
701 
702   if (vtype == DRAW_VIEWER) {
703     Draw       draw;
704     PetscTruth isnull;
705     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
706     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
707   }
708 
709   if (vtype == ASCII_FILE_VIEWER) {
710     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
711     PetscSequentialPhaseBegin(mat->comm,1);
712     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
713            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
714             baij->cstart*bs,baij->cend*bs);
715     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
716     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
717     fflush(fd);
718     PetscSequentialPhaseEnd(mat->comm,1);
719   }
720   else {
721     int size = baij->size;
722     rank = baij->rank;
723     if (size == 1) {
724       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
725     }
726     else {
727       /* assemble the entire matrix onto first processor. */
728       Mat         A;
729       Mat_SeqBAIJ *Aloc;
730       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
731       int         mbs=baij->mbs;
732       Scalar      *a;
733 
734       if (!rank) {
735         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
736         CHKERRQ(ierr);
737       }
738       else {
739         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
740         CHKERRQ(ierr);
741       }
742       PLogObjectParent(mat,A);
743 
744       /* copy over the A part */
745       Aloc = (Mat_SeqBAIJ*) baij->A->data;
746       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
747       row = baij->rstart;
748       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
749 
750       for ( i=0; i<mbs; i++ ) {
751         rvals[0] = bs*(baij->rstart + i);
752         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
753         for ( j=ai[i]; j<ai[i+1]; j++ ) {
754           col = (baij->cstart+aj[j])*bs;
755           for (k=0; k<bs; k++ ) {
756             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
757             col++; a += bs;
758           }
759         }
760       }
761       /* copy over the B part */
762       Aloc = (Mat_SeqBAIJ*) baij->B->data;
763       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
764       row = baij->rstart*bs;
765       for ( i=0; i<mbs; i++ ) {
766         rvals[0] = bs*(baij->rstart + i);
767         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
768         for ( j=ai[i]; j<ai[i+1]; j++ ) {
769           col = baij->garray[aj[j]]*bs;
770           for (k=0; k<bs; k++ ) {
771             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
772             col++; a += bs;
773           }
774         }
775       }
776       PetscFree(rvals);
777       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
778       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
779       if (!rank) {
780         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
781       }
782       ierr = MatDestroy(A); CHKERRQ(ierr);
783     }
784   }
785   return 0;
786 }
787 
788 
789 
790 #undef __FUNC__
791 #define __FUNC__ "MatView_MPIBAIJ"
792 int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
793 {
794   Mat         mat = (Mat) obj;
795   int         ierr;
796   ViewerType  vtype;
797 
798   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
799   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
800       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
801     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
802   }
803   else if (vtype == BINARY_FILE_VIEWER) {
804     return MatView_MPIBAIJ_Binary(mat,viewer);
805   }
806   return 0;
807 }
808 
809 #undef __FUNC__
810 #define __FUNC__ "MatDestroy_MPIBAIJ"
811 int MatDestroy_MPIBAIJ(PetscObject obj)
812 {
813   Mat         mat = (Mat) obj;
814   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
815   int         ierr;
816 
817 #if defined(PETSC_LOG)
818   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
819 #endif
820 
821   PetscFree(baij->rowners);
822   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
823   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
824   if (baij->colmap) PetscFree(baij->colmap);
825   if (baij->garray) PetscFree(baij->garray);
826   if (baij->lvec)   VecDestroy(baij->lvec);
827   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
828   if (baij->rowvalues) PetscFree(baij->rowvalues);
829   if (baij->barray) PetscFree(baij->barray);
830   PetscFree(baij);
831   if (mat->mapping) {
832     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
833   }
834   PLogObjectDestroy(mat);
835   PetscHeaderDestroy(mat);
836   return 0;
837 }
838 
839 #undef __FUNC__
840 #define __FUNC__ "MatMult_MPIBAIJ"
841 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
842 {
843   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
844   int         ierr, nt;
845 
846   VecGetLocalSize_Fast(xx,nt);
847   if (nt != a->n) {
848     SETERRQ(1,0,"Incompatible partition of A and xx");
849   }
850   VecGetLocalSize_Fast(yy,nt);
851   if (nt != a->m) {
852     SETERRQ(1,0,"Incompatible partition of A and yy");
853   }
854   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
855   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
856   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
857   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
858   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
859   return 0;
860 }
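
/*
   Data-flow note for the multiply above (comment only): it uses the usual
   splitting y = A x_local + B x_ghost.  The scatter of the ghost entries of
   xx into a->lvec is begun, the diagonal-block product is computed while
   those messages are in flight, and the off-diagonal contribution is added
   once the scatter completes; VecScatterPostRecvs() then posts the receives
   for the next scatter in advance.
*/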
861 
862 #undef __FUNC__
863 #define __FUNC__ "MatMultAdd_MPIBAIJ"
864 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
865 {
866   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
867   int        ierr;
868   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
869   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
870   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
871   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
872   return 0;
873 }
874 
875 #undef __FUNC__
876 #define __FUNC__ "MatMultTrans_MPIBAIJ"
877 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
878 {
879   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
880   int        ierr;
881 
882   /* do nondiagonal part */
883   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
884   /* send it on its way */
885   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
886   /* do local part */
887   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
888   /* receive remote parts: note this assumes the values are not actually */
889   /* inserted in yy until the next line, which is true for my implementation */
890   /* but perhaps is not always true. */
891   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
892   return 0;
893 }
894 
895 #undef __FUNC__
896 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
897 int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
898 {
899   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
900   int        ierr;
901 
902   /* do nondiagonal part */
903   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
904   /* send it on its way */
905   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
906   /* do local part */
907   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
908   /* receive remote parts: note this assumes the values are not actually */
909   /* inserted in zz until the next line, which is true for my implementation */
910   /* but perhaps is not always true. */
911   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
912   return 0;
913 }
914 
915 /*
916   This only works correctly for square matrices where the subblock A->A is the
917    diagonal block
918 */
919 #undef __FUNC__
920 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
921 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
922 {
923   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
924   if (a->M != a->N)
925     SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
926   return MatGetDiagonal(a->A,v);
927 }
928 
929 #undef __FUNC__
930 #define __FUNC__ "MatScale_MPIBAIJ"
931 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
932 {
933   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
934   int        ierr;
935   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
936   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
937   return 0;
938 }
939 
940 #undef __FUNC__
941 #define __FUNC__ "MatGetSize_MPIBAIJ"
942 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
943 {
944   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
945   *m = mat->M; *n = mat->N;
946   return 0;
947 }
948 
949 #undef __FUNC__
950 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
951 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
952 {
953   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
954   *m = mat->m; *n = mat->n;
955   return 0;
956 }
957 
958 #undef __FUNC__
959 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
960 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
961 {
962   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
963   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
964   return 0;
965 }
966 
967 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
968 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
969 
970 #undef __FUNC__
971 #define __FUNC__ "MatGetRow_MPIBAIJ"
972 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
973 {
974   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
975   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
976   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
977   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
978   int        *cmap, *idx_p,cstart = mat->cstart;
979 
980   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
981   mat->getrowactive = PETSC_TRUE;
982 
983   if (!mat->rowvalues && (idx || v)) {
984     /*
985         allocate enough space to hold information from the longest row.
986     */
987     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
988     int     max = 1,mbs = mat->mbs,tmp;
989     for ( i=0; i<mbs; i++ ) {
990       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
991       if (max < tmp) { max = tmp; }
992     }
993     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
994     CHKPTRQ(mat->rowvalues);
995     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
996   }
997 
998 
999   if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows");
1000   lrow = row - brstart;
1001 
1002   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1003   if (!v)   {pvA = 0; pvB = 0;}
1004   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1005   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1006   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1007   nztot = nzA + nzB;
1008 
1009   cmap  = mat->garray;
1010   if (v  || idx) {
1011     if (nztot) {
1012       /* Sort by increasing column numbers, assuming A and B already sorted */
1013       int imark = -1;
1014       if (v) {
1015         *v = v_p = mat->rowvalues;
1016         for ( i=0; i<nzB; i++ ) {
1017           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
1018           else break;
1019         }
1020         imark = i;
1021         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1022         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1023       }
1024       if (idx) {
1025         *idx = idx_p = mat->rowindices;
1026         if (imark > -1) {
1027           for ( i=0; i<imark; i++ ) {
1028             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
1029           }
1030         } else {
1031           for ( i=0; i<nzB; i++ ) {
1032             if (cmap[cworkB[i]/bs] < cstart)
1033               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
1034             else break;
1035           }
1036           imark = i;
1037         }
1038         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
1039         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
1040       }
1041     }
1042     else {
1043       if (idx) *idx = 0;
1044       if (v)   *v   = 0;
1045     }
1046   }
1047   *nz = nztot;
1048   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1049   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1050   return 0;
1051 }
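
/*
   Merge note for the routine above (comment only): the diagonal part A holds
   columns [cstart*bs, cend*bs) and B holds the rest under compact local
   column numbers; garray (cmap) translates B's block columns back to global
   ones.  Since both row fragments are already sorted, the output is assembled
   as B-entries-left-of-A, then A, then the remaining B entries, which yields
   globally increasing column numbers.
*/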
1052 
1053 #undef __FUNC__
1054 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1055 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1056 {
1057   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1058   if (baij->getrowactive == PETSC_FALSE) {
1059     SETERRQ(1,0,"MatGetRow not called");
1060   }
1061   baij->getrowactive = PETSC_FALSE;
1062   return 0;
1063 }
1064 
1065 #undef __FUNC__
1066 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1067 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1068 {
1069   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1070   *bs = baij->bs;
1071   return 0;
1072 }
1073 
1074 #undef __FUNC__
1075 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1076 int MatZeroEntries_MPIBAIJ(Mat A)
1077 {
1078   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1079   int         ierr;
1080   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1081   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1082   return 0;
1083 }
1084 
1085 #undef __FUNC__
1086 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1087 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1088 {
1089   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1090   Mat         A = a->A, B = a->B;
1091   int         ierr;
1092   double      isend[5], irecv[5];
1093 
1094   info->rows_global    = (double)a->M;
1095   info->columns_global = (double)a->N;
1096   info->rows_local     = (double)a->m;
1097   info->columns_local  = (double)a->n;
1098   info->block_size     = (double)a->bs;
1099   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1100   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded; isend[3] = info->memory; isend[4] = info->mallocs;
1101   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1102   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded; isend[3] += info->memory; isend[4] += info->mallocs;
1103   if (flag == MAT_LOCAL) {
1104     info->nz_used      = isend[0];
1105     info->nz_allocated = isend[1];
1106     info->nz_unneeded  = isend[2];
1107     info->memory       = isend[3];
1108     info->mallocs      = isend[4];
1109   } else if (flag == MAT_GLOBAL_MAX) {
1110     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
1111     info->nz_used      = irecv[0];
1112     info->nz_allocated = irecv[1];
1113     info->nz_unneeded  = irecv[2];
1114     info->memory       = irecv[3];
1115     info->mallocs      = irecv[4];
1116   } else if (flag == MAT_GLOBAL_SUM) {
1117     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
1118     info->nz_used      = irecv[0];
1119     info->nz_allocated = irecv[1];
1120     info->nz_unneeded  = irecv[2];
1121     info->memory       = irecv[3];
1122     info->mallocs      = irecv[4];
1123   }
1124   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1125   info->fill_ratio_needed = 0;
1126   info->factor_mallocs    = 0;
1127   return 0;
1128 }
1129 
1130 #undef __FUNC__
1131 #define __FUNC__ "MatSetOption_MPIBAIJ"
1132 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1133 {
1134   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1135 
1136   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1137       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1138       op == MAT_COLUMNS_UNSORTED ||
1139       op == MAT_COLUMNS_SORTED ||
1140       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1141       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1142         MatSetOption(a->A,op);
1143         MatSetOption(a->B,op);
1144   } else if (op == MAT_ROW_ORIENTED) {
1145         a->roworiented = 1;
1146         MatSetOption(a->A,op);
1147         MatSetOption(a->B,op);
1148   } else if (op == MAT_ROWS_SORTED ||
1149              op == MAT_ROWS_UNSORTED ||
1150              op == MAT_SYMMETRIC ||
1151              op == MAT_STRUCTURALLY_SYMMETRIC ||
1152              op == MAT_YES_NEW_DIAGONALS)
1153     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1154   else if (op == MAT_COLUMN_ORIENTED) {
1155     a->roworiented = 0;
1156     MatSetOption(a->A,op);
1157     MatSetOption(a->B,op);
1158   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1159     a->donotstash = 1;
1160   } else if (op == MAT_NO_NEW_DIAGONALS)
1161     {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
1162   else
1163     {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
1164   return 0;
1165 }
1166 
1167 #undef __FUNC__
1168 #define __FUNC__ "MatTranspose_MPIBAIJ"
1169 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1170 {
1171   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1172   Mat_SeqBAIJ *Aloc;
1173   Mat        B;
1174   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
1175   int        bs=baij->bs,mbs=baij->mbs;
1176   Scalar     *a;
1177 
1178   if (matout == PETSC_NULL && M != N)
1179     SETERRQ(1,0,"Square matrix only for in-place");
1180   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1181   CHKERRQ(ierr);
1182 
1183   /* copy over the A part */
1184   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1185   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1186   row = baij->rstart;
1187   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1188 
1189   for ( i=0; i<mbs; i++ ) {
1190     rvals[0] = bs*(baij->rstart + i);
1191     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1192     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1193       col = (baij->cstart+aj[j])*bs;
1194       for (k=0; k<bs; k++ ) {
1195         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1196         col++; a += bs;
1197       }
1198     }
1199   }
1200   /* copy over the B part */
1201   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1202   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1203   row = baij->rstart*bs;
1204   for ( i=0; i<mbs; i++ ) {
1205     rvals[0] = bs*(baij->rstart + i);
1206     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1207     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1208       col = baij->garray[aj[j]]*bs;
1209       for (k=0; k<bs; k++ ) {
1210         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1211         col++; a += bs;
1212       }
1213     }
1214   }
1215   PetscFree(rvals);
1216   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1217   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1218 
1219   if (matout != PETSC_NULL) {
1220     *matout = B;
1221   } else {
1222     /* This isn't really an in-place transpose, but it frees the data structures from baij */
1223     PetscFree(baij->rowners);
1224     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1225     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1226     if (baij->colmap) PetscFree(baij->colmap);
1227     if (baij->garray) PetscFree(baij->garray);
1228     if (baij->lvec) VecDestroy(baij->lvec);
1229     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1230     PetscFree(baij);
1231     PetscMemcpy(A,B,sizeof(struct _Mat));
1232     PetscHeaderDestroy(B);
1233   }
1234   return 0;
1235 }
1236 
1237 #undef __FUNC__
1238 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1239 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1240 {
1241   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1242   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1243   int ierr,s1,s2,s3;
1244 
1245   if (ll)  {
1246     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1247     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1248     if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
1249     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1250     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1251   }
1252   if (rr) SETERRQ(1,0,"not supported for right vector");
1253   return 0;
1254 }
1255 
1256 /* the code does not do the diagonal entries correctly unless the
1257    matrix is square and the column and row ownerships are identical.
1258    This is a BUG. The only way to fix it seems to be to access
1259    baij->A and baij->B directly and not through the MatZeroRows()
1260    routine.
1261 */
1262 #undef __FUNC__
1263 #define __FUNC__ "MatZeroRows_MPIBAIJ"
1264 int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
1265 {
1266   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
1267   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
1268   int            *procs,*nprocs,j,found,idx,nsends,*work;
1269   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
1270   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
1271   int            *lens,imdex,*lrows,*values,bs=l->bs;
1272   MPI_Comm       comm = A->comm;
1273   MPI_Request    *send_waits,*recv_waits;
1274   MPI_Status     recv_status,*send_status;
1275   IS             istmp;
1276 
1277   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
1278   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
1279 
1280   /*  first count number of contributors to each processor */
1281   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
1282   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
1283   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
1284   for ( i=0; i<N; i++ ) {
1285     idx = rows[i];
1286     found = 0;
1287     for ( j=0; j<size; j++ ) {
1288       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
1289         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
1290       }
1291     }
1292     if (!found) SETERRQ(1,0,"Index out of range");
1293   }
1294   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
1295 
1296   /* inform other processors of number of messages and max length*/
1297   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
1298   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
1299   nrecvs = work[rank];
1300   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
1301   nmax = work[rank];
1302   PetscFree(work);
1303 
1304   /* post receives:   */
1305   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
1306   CHKPTRQ(rvalues);
1307   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
1308   CHKPTRQ(recv_waits);
1309   for ( i=0; i<nrecvs; i++ ) {
1310     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
1311   }
1312 
1313   /* do sends:
1314       1) starts[i] gives the starting index in svalues for stuff going to
1315          the ith processor
1316   */
1317   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1318   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
1319   CHKPTRQ(send_waits);
1320   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1321   starts[0] = 0;
1322   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1323   for ( i=0; i<N; i++ ) {
1324     svalues[starts[owner[i]]++] = rows[i];
1325   }
1326   ISRestoreIndices(is,&rows);
1327 
1328   starts[0] = 0;
1329   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1330   count = 0;
1331   for ( i=0; i<size; i++ ) {
1332     if (procs[i]) {
1333       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
1334     }
1335   }
1336   PetscFree(starts);
1337 
1338   base = owners[rank]*bs;
1339 
1340   /*  wait on receives */
1341   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1342   source = lens + nrecvs;
1343   count  = nrecvs; slen = 0;
1344   while (count) {
1345     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
1346     /* unpack receives into our local space */
1347     MPI_Get_count(&recv_status,MPI_INT,&n);
1348     source[imdex]  = recv_status.MPI_SOURCE;
1349     lens[imdex]  = n;
1350     slen += n;
1351     count--;
1352   }
1353   PetscFree(recv_waits);
1354 
1355   /* move the data into the send scatter */
1356   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1357   count = 0;
1358   for ( i=0; i<nrecvs; i++ ) {
1359     values = rvalues + i*nmax;
1360     for ( j=0; j<lens[i]; j++ ) {
1361       lrows[count++] = values[j] - base;
1362     }
1363   }
1364   PetscFree(rvalues); PetscFree(lens);
1365   PetscFree(owner); PetscFree(nprocs);
1366 
1367   /* actually zap the local rows */
1368   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1369   PLogObjectParent(A,istmp);
1370   PetscFree(lrows);
1371   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1372   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1373   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1374 
1375   /* wait on sends */
1376   if (nsends) {
1377     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1378     CHKPTRQ(send_status);
1379     MPI_Waitall(nsends,send_waits,send_status);
1380     PetscFree(send_status);
1381   }
1382   PetscFree(send_waits); PetscFree(svalues);
1383 
1384   return 0;
1385 }
1386 extern int MatPrintHelp_SeqBAIJ(Mat);
1387 #undef __FUNC__
1388 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1389 int MatPrintHelp_MPIBAIJ(Mat A)
1390 {
1391   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1392 
1393   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1394   else return 0;
1395 }
1396 
1397 #undef __FUNC__
1398 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1399 int MatSetUnfactored_MPIBAIJ(Mat A)
1400 {
1401   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1402   int         ierr;
1403   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1404   return 0;
1405 }
1406 
1407 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1408 
1409 /* -------------------------------------------------------------------*/
1410 static struct _MatOps MatOps = {
1411   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1412   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1413   0,0,0,0,
1414   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1415   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1416   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1417   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1418   0,0,0,MatGetSize_MPIBAIJ,
1419   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1420   0,0,MatConvertSameType_MPIBAIJ,0,0,
1421   0,0,0,MatGetSubMatrices_MPIBAIJ,
1422   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1423   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
1424   0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1425 
1426 
1427 #undef __FUNC__
1428 #define __FUNC__ "MatCreateMPIBAIJ"
1429 /*@C
1430    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1431    (block compressed row).  For good matrix assembly performance
1432    the user should preallocate the matrix storage by setting the parameters
1433    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1434    performance can be increased by more than a factor of 50.
1435 
1436    Input Parameters:
1437 .  comm - MPI communicator
1438 .  bs   - size of block
1439 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1440            This value should be the same as the local size used in creating the
1441            y vector for the matrix-vector product y = Ax.
1442 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1443            This value should be the same as the local size used in creating the
1444            x vector for the matrix-vector product y = Ax.
1445 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1446 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1447 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1448            submatrix  (same for all local rows)
1449 .  d_nnz - array containing the number of block nonzeros in the various block rows
1450            of the diagonal portion of the local submatrix (possibly different for
1451            each block row) or PETSC_NULL.  You must leave room for the diagonal entry
1452            even if it is zero.
1453 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1454            submatrix (same for all local rows).
1455 .  o_nnz - array containing the number of nonzeros in the various block rows of the
1456            off-diagonal portion of the local submatrix (possibly different for
1457            each block row) or PETSC_NULL.

   Output Parameter:
.  A - the matrix

   Notes:
   The user MUST specify either the local or global matrix dimensions
   (possibly both).

   Storage Information:
   For a square global matrix we define each processor's diagonal portion
   to be its local rows and the corresponding columns (a square submatrix);
   each processor's off-diagonal portion encompasses the remainder of the
   local matrix (a rectangular submatrix).

   The user can specify preallocated storage for the diagonal part of
   the local submatrix with either d_nz or d_nnz (not both).  Set
   d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
   memory allocation.  Likewise, specify preallocated storage for the
   off-diagonal part of the local submatrix with o_nz or o_nnz (not both).

   Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
   the figure below we depict these three local rows and all columns (0-11).

$          0 1 2 3 4 5 6 7 8 9 10 11
$         --------------------------
$  row 3  |  o o o d d d o o o o o o
$  row 4  |  o o o d d d o o o o o o
$  row 5  |  o o o d d d o o o o o o
$         --------------------------
$

   Thus, any entries in the d locations are stored in the d (diagonal)
   submatrix, and any entries in the o locations are stored in the
   o (off-diagonal) submatrix.  Note that the d and the o submatrices are
   stored simply in the MATSEQBAIJ format for compressed row storage.

   Now d_nz should indicate the number of nonzeros per row in the d matrix,
   and o_nz should indicate the number of nonzeros per row in the o matrix.
   In general, for PDE problems in which most nonzeros are near the diagonal,
   one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
   or you will get TERRIBLE performance; see the users' manual chapter on
   matrices.
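
   Example (an illustrative sketch, not taken from the PETSc examples):
   the following creates a matrix with 2x2 blocks, distributed across the
   processors of PETSC_COMM_WORLD, with global size 100x100, local sizes
   chosen by PETSc, and rough preallocation of 5 block nonzeros per block
   row in the diagonal portion and 1 in the off-diagonal portion:

$      Mat A;
$      int ierr;
$      ierr = MatCreateMPIBAIJ(PETSC_COMM_WORLD,2,PETSC_DECIDE,PETSC_DECIDE,
$                              100,100,5,PETSC_NULL,1,PETSC_NULL,&A); CHKERRQ(ierr);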

.keywords: matrix, block, aij, compressed row, sparse, parallel

.seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
@*/
int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
                    int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  Mat          B;
  Mat_MPIBAIJ  *b;
  int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;

  if (bs < 1) SETERRQ(1,0,"invalid block size specified");
  *A = 0;
  PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
  PLogObjectCreate(B);
  B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
  PetscMemzero(b,sizeof(Mat_MPIBAIJ));
  PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
  MPI_Comm_size(comm,&size);
  if (size == 1) {
    B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
    B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
    B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
    B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
    B->ops.lufactor          = MatLUFactor_MPIBAIJ;
    B->ops.solve             = MatSolve_MPIBAIJ;
    B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
    B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
    B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
    B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
  }

  B->destroy    = MatDestroy_MPIBAIJ;
  B->view       = MatView_MPIBAIJ;
  B->mapping    = 0;
  B->factor     = 0;
  B->assembled  = PETSC_FALSE;

  B->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&b->rank);
  MPI_Comm_size(comm,&b->size);

  if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
    SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
  if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
  if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,0,"either N or n should be specified");
  if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
  if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;

  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    mbs = m/bs; nbs = n/bs;
    MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
    if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
    if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
  }
  if (m == PETSC_DECIDE) {
    Mbs = M/bs;
    if (Mbs*bs != M) SETERRQ(1,0,"Number of global rows must be divisible by blocksize");
    mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
    m   = mbs*bs;
  }
  if (n == PETSC_DECIDE) {
    Nbs = N/bs;
    if (Nbs*bs != N) SETERRQ(1,0,"Number of global cols must be divisible by blocksize");
    nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
    n   = nbs*bs;
  }
  if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"Number of local rows, cols must be divisible by blocksize");

  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;
  b->bs  = bs;
  b->bs2 = bs*bs;
  b->mbs = mbs;
  b->nbs = nbs;
  b->Mbs = Mbs;
  b->Nbs = Nbs;

  /* build local table of row and column ownerships */
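  /* illustrative example: with b->size=4 and local mbs values 3,3,2,2, the
     MPI_Allgather below fills b->rowners[1..4] = {3,3,2,2}; the prefix sum
     then yields b->rowners = {0,3,6,8,10}, so process r owns block rows
     [b->rowners[r],b->rowners[r+1]) */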
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
  b->cowners = b->rowners + b->size + 2;
  MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];
  }
  b->rstart    = b->rowners[b->rank];
  b->rend      = b->rowners[b->rank+1];
  b->rstart_bs = b->rstart * bs;
  b->rend_bs   = b->rend * bs;

  MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart    = b->cowners[b->rank];
  b->cend      = b->cowners[b->rank+1];
  b->cstart_bs = b->cstart * bs;
  b->cend_bs   = b->cend * bs;

  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache for off-processor entries formed during assembly */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->donotstash  = 0;
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used in block assembly */
  b->barray      = 0;

  /* stuff used for matrix vector multiply */
  b->lvec        = 0;
  b->Mvctx       = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  *A = B;
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatConvertSameType_MPIBAIJ"
static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
{
  Mat         mat;
  Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
  int         ierr, len=0, flg;

  *newmat       = 0;
  PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy    = MatDestroy_MPIBAIJ;
  mat->view       = MatView_MPIBAIJ;
  mat->factor     = matin->factor;
  mat->assembled  = PETSC_TRUE;

  a->m = mat->m   = oldmat->m;
  a->n = mat->n   = oldmat->n;
  a->M = mat->M   = oldmat->M;
  a->N = mat->N   = oldmat->N;

  a->bs  = oldmat->bs;
  a->bs2 = oldmat->bs2;
  a->mbs = oldmat->mbs;
  a->nbs = oldmat->nbs;
  a->Mbs = oldmat->Mbs;
  a->Nbs = oldmat->Nbs;

  a->rstart       = oldmat->rstart;
  a->rend         = oldmat->rend;
  a->cstart       = oldmat->cstart;
  a->cend         = oldmat->cend;
  a->size         = oldmat->size;
  a->rank         = oldmat->rank;
  mat->insertmode = NOT_SET_VALUES;
  a->rowvalues    = 0;
  a->getrowactive = PETSC_FALSE;
  a->barray       = 0;

  a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
  PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
  a->cowners = a->rowners + a->size + 2;
  PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
  ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
  if (oldmat->colmap) {
    a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
    PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
    PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
  } else a->colmap = 0;
  if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
    a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
    PLogObjectMemory(mat,len*sizeof(int));
    PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
  } else a->garray = 0;

  ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
  PLogObjectParent(mat,a->lvec);
  ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
  PLogObjectParent(mat,a->Mvctx);
  ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);
  ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
  PLogObjectParent(mat,a->B);
  ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
  if (flg) {
    ierr = MatPrintHelp(mat); CHKERRQ(ierr);
  }
  *newmat = mat;
  return 0;
}

#include "sys.h"

#undef __FUNC__
#define __FUNC__ "MatLoad_MPIBAIJ"
int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*buf;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
  int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
  int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
  int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
  int          dcount,kmax,k,nzcount,tmp;

  ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
  bs2  = bs*bs;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
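  /* the PETSc binary matrix format begins with an integer header: header[0]
     holds the class cookie, header[1] the row count M, header[2] the column
     count N, and header[3] the total nonzero count (not used here); the row
     lengths, column indices, and numerical values follow in that order */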
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not a matrix object");
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];

  if (M != N) SETERRQ(1,0,"Can only do square matrices");

  /*
     This code adds extra rows to make sure the number of rows is
     divisible by the blocksize
  */
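  /* illustrative numbers: M=10 with bs=4 gives Mbs=2 and
     extra_rows = 4 - 10 + 8 = 2, so Mbs becomes 3 and the matrix is treated
     as having 12 rows; each padded row later receives a single 1.0 on its
     diagonal */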
  Mbs        = M/bs;
  extra_rows = bs - M + bs*(Mbs);
  if (extra_rows == bs) extra_rows = 0;
  else                  Mbs++;
  if (extra_rows && !rank) {
    PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
  }

  /* determine ownership of all rows */
  mbs = Mbs/size + ((Mbs % size) > rank);
  m   = mbs * bs;
  rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
  browners = rowners + size + 1;
  MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
  for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
    for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
    MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
    PetscFree(sndcounts);
  } else {
    MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices */
    nz = procsnz[0];
    ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }

    /* read in the column indices for all other processors (except the last) and ship them off */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    /* read in the column indices for the last processor */
    if ( size != 1 ) {
      nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
      MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
    }
    PetscFree(cols);
  } else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += locrowlens[i];
    }
    ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* receive message of column indices */
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,0,"something is wrong with the file");
  }

  /* loop over local rows, determining number of off-diagonal entries */
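  /* illustrative example: if this process owns block rows [1,3) of a matrix
     with Mbs=4 block rows, then a block row whose entries fall in block
     columns {0,1,3} contributes 1 to dlens (column 1 lies in [rstart,rend))
     and 2 to odlens (columns 0 and 3 do not) */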
  dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
  odlens = dlens + (rend-rstart);
  mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
  PetscMemzero(mask,3*Mbs*sizeof(int));
  masked1 = mask    + Mbs;
  masked2 = masked1 + Mbs;
  rowcount = 0; nzcount = 0;
  for ( i=0; i<mbs; i++ ) {
    dcount  = 0;
    odcount = 0;
    for ( j=0; j<bs; j++ ) {
      kmax = locrowlens[rowcount];
      for ( k=0; k<kmax; k++ ) {
        tmp = mycols[nzcount++]/bs;
        if (!mask[tmp]) {
          mask[tmp] = 1;
          if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
          else masked1[dcount++] = tmp;
        }
      }
      rowcount++;
    }

    dlens[i]  = dcount;
    odlens[i] = odcount;

    /* zero out the mask elements we set */
    for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
    for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
  }

  /* create our matrix */
  ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
  CHKERRQ(ierr);
  A = *newmat;
  MatSetOption(A,MAT_COLUMNS_SORTED);
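  /* assumes the file stores the column indices of each row in ascending
     order, so the insertions below arrive sorted and MAT_COLUMNS_SORTED
     lets MatSetValues exploit this */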

  if (!rank) {
    buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
    /* read in my part of the matrix numerical values */
    nz = procsnz[0];
    vals = buf;
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
    /* read in the values for the other processors (except the last one) and ship them out */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    /* the last processor */
    if ( size != 1 ) {
      nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
      MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
    }
    PetscFree(procsnz);
  } else {
    /* receive numeric values */
    buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);

    /* receive message of values */
    vals = buf;
    mycols = ibuf;
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,0,"something is wrong with the file");

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
  }
  PetscFree(locrowlens);
  PetscFree(buf);
  PetscFree(ibuf);
  PetscFree(rowners);
  PetscFree(dlens);
  PetscFree(mask);
  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}