xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 2b36279985d181bf1ae28d424577ae2c2fbebdf4)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.63 1997/04/01 14:34:09 balay Exp balay $";
3 #endif
4 
5 #include "pinclude/pviewer.h"
6 #include "src/mat/impls/baij/mpi/mpibaij.h"
7 #include "src/vec/vecimpl.h"
8 
9 
10 extern int MatSetUpMultiply_MPIBAIJ(Mat);
11 extern int DisAssemble_MPIBAIJ(Mat);
12 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
13 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
14 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
15 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
16 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
17 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
18 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
19 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
20 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
21 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
22 
23 
24 /*
25      Local utility routine that creates a mapping from the global column
26    number to the local number in the off-diagonal part of the local
27    storage of the matrix.  This is done in a non-scalable way, since the
28    length of colmap equals the global number of block columns.
29 */
30 #undef __FUNC__
31 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
32 static int CreateColmap_MPIBAIJ_Private(Mat mat)
33 {
34   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
35   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
36   int         nbs = B->nbs,i,bs=B->bs;
37 
38   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
39   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
40   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
41   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
42   return 0;
43 }
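/*
   A worked example (hypothetical values): with bs = 2 and garray = {3,7},
   the loop above sets colmap[3] = 1 and colmap[7] = 3, and every other
   entry remains 0.  Thus colmap[g] == 0 means global block column g is not
   present in the off-diagonal part, while a nonzero value c means that the
   corresponding blocks are stored locally starting at point column c-1.
*/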
44 
45 #undef __FUNC__
46 #define __FUNC__ "MatGetRowIJ_MPIBAIJ"
47 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
48                             PetscTruth *done)
49 {
50   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
51   int         ierr;
52   if (aij->size == 1) {
53     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
54   } else SETERRQ(1,0,"not supported in parallel");
55   return 0;
56 }
57 
58 #undef __FUNC__
59 #define __FUNC__ "MatRestoreRowIJ_MPIBAIJ"
60 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
61                                 PetscTruth *done)
62 {
63   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
64   int        ierr;
65   if (aij->size == 1) {
66     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
67   } else SETERRQ(1,0,"not supported in parallel");
68   return 0;
69 }
70 #define CHUNKSIZE  10
71 
72 #define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
73 { \
74  \
75     brow = row/bs;  \
76     rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
77     rmax = aimax[brow]; nrow = ailen[brow]; \
78       bcol = col/bs; \
79       ridx = row % bs; cidx = col % bs; \
80       low = 0; high = nrow; \
81       while (high-low > 3) { \
82         t = (low+high)/2; \
83         if (rp[t] > bcol) high = t; \
84         else              low  = t; \
85       } \
86       for ( _i=low; _i<high; _i++ ) { \
87         if (rp[_i] > bcol) break; \
88         if (rp[_i] == bcol) { \
89           bap  = ap +  bs2*_i + bs*cidx + ridx; \
90           if (addv == ADD_VALUES) *bap += value;  \
91           else                    *bap  = value;  \
92           goto a_noinsert; \
93         } \
94       } \
95       if (a->nonew) goto a_noinsert; \
96       if (nrow >= rmax) { \
97         /* there is no extra room in row, therefore enlarge */ \
98         int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
99         Scalar *new_a; \
100  \
101         /* malloc new storage space */ \
102         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
103         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
104         new_j   = (int *) (new_a + bs2*new_nz); \
105         new_i   = new_j + new_nz; \
106  \
107         /* copy over old data into new slots */ \
108         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
109         for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
110         PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
111         len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
112         PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
113                                                            len*sizeof(int)); \
114         PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
115         PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
116         PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
117                     aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
118         /* free up old matrix storage */ \
119         PetscFree(a->a);  \
120         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
121         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
122         a->singlemalloc = 1; \
123  \
124         rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
125         rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
126         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
127         a->maxnz += bs2*CHUNKSIZE; \
128         a->reallocs++; \
129         a->nz++; \
130       } \
131       N = nrow++ - 1;  \
132       /* shift up all the later entries in this row */ \
133       for ( ii=N; ii>=_i; ii-- ) { \
134         rp[ii+1] = rp[ii]; \
135         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
136       } \
137       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
138       rp[_i]                      = bcol;  \
139       ap[bs2*_i + bs*cidx + ridx] = value;  \
140       a_noinsert:; \
141     ailen[brow] = nrow; \
142 }
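/*
   A note on the addressing used in the macro above: each bs x bs block is
   stored contiguously in column-major order, so entry (ridx,cidx) of the
   _i-th block in a row lives at ap[bs2*_i + bs*cidx + ridx].  For example,
   with bs = 2 (so bs2 = 4), the (1,0) entry of block _i = 2 is
   ap[4*2 + 2*0 + 1] = ap[9].
*/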
143 
144 #define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
145 { \
146  \
147     brow = row/bs;  \
148     rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
149     rmax = bimax[brow]; nrow = bilen[brow]; \
150       bcol = col/bs; \
151       ridx = row % bs; cidx = col % bs; \
152       low = 0; high = nrow; \
153       while (high-low > 3) { \
154         t = (low+high)/2; \
155         if (rp[t] > bcol) high = t; \
156         else              low  = t; \
157       } \
158       for ( _i=low; _i<high; _i++ ) { \
159         if (rp[_i] > bcol) break; \
160         if (rp[_i] == bcol) { \
161           bap  = ap +  bs2*_i + bs*cidx + ridx; \
162           if (addv == ADD_VALUES) *bap += value;  \
163           else                    *bap  = value;  \
164           goto b_noinsert; \
165         } \
166       } \
167       if (b->nonew) goto b_noinsert; \
168       if (nrow >= rmax) { \
169         /* there is no extra room in row, therefore enlarge */ \
170         int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
171         Scalar *new_a; \
172  \
173         /* malloc new storage space */ \
174         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
175         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
176         new_j   = (int *) (new_a + bs2*new_nz); \
177         new_i   = new_j + new_nz; \
178  \
179         /* copy over old data into new slots */ \
180         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
181         for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
182         PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
183         len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
184         PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
185                                                            len*sizeof(int)); \
186         PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
187         PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
188         PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
189                     ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
190         /* free up old matrix storage */ \
191         PetscFree(b->a);  \
192         if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
193         ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
194         b->singlemalloc = 1; \
195  \
196         rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
197         rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
198         PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
199         b->maxnz += bs2*CHUNKSIZE; \
200         b->reallocs++; \
201         b->nz++; \
202       } \
203       N = nrow++ - 1;  \
204       /* shift up all the later entries in this row */ \
205       for ( ii=N; ii>=_i; ii-- ) { \
206         rp[ii+1] = rp[ii]; \
207         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
208       } \
209       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
210       rp[_i]                      = bcol;  \
211       ap[bs2*_i + bs*cidx + ridx] = value;  \
212       b_noinsert:; \
213     bilen[brow] = nrow; \
214 }
215 
216 #undef __FUNC__
217 #define __FUNC__ "MatSetValues_MPIBAIJ"
218 int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
219 {
220   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
221   Scalar      value;
222   int         ierr,i,j,row,col;
223   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs;
224   int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
225   int         cend_orig=baij->cend_bs,bs=baij->bs;
226 
227   /* Some Variables required in the macro */
228   Mat         A = baij->A;
229   Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
230   int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
231   Scalar      *aa=a->a;
232 
233   Mat         B = baij->B;
234   Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
235   int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
236   Scalar      *ba=b->a;
237 
238   int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
239   int         low,high,t,ridx,cidx,bs2=a->bs2;
240   Scalar      *ap,*bap;
241 
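  /* Dispatch summary: entries in rows owned by this processor are inserted
     directly -- into A for columns inside the owned range, into B otherwise
     (using the colmap translation once the matrix has been assembled);
     entries for rows owned elsewhere are stashed and communicated during
     MatAssemblyBegin/End(). */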
242   for ( i=0; i<m; i++ ) {
243 #if defined(PETSC_BOPT_g)
244     if (im[i] < 0) SETERRQ(1,0,"Negative row");
245     if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
246 #endif
247     if (im[i] >= rstart_orig && im[i] < rend_orig) {
248       row = im[i] - rstart_orig;
249       for ( j=0; j<n; j++ ) {
250         if (in[j] >= cstart_orig && in[j] < cend_orig){
251           col = in[j] - cstart_orig;
252           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
253           MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
254           /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
255         }
256 #if defined(PETSC_BOPT_g)
257         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
258         else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
259 #endif
260         else {
261           if (mat->was_assembled) {
262             if (!baij->colmap) {
263               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
264             }
265             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
266             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
267               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
268               col =  in[j];
269             }
270           }
271           else col = in[j];
272           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
273           MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
274           /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
275         }
276       }
277     }
278     else {
279       if (roworiented && !baij->donotstash) {
280         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
281       }
282       else {
283         if (!baij->donotstash) {
284           row = im[i];
285           for ( j=0; j<n; j++ ) {
286             ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
287           }
288         }
289       }
290     }
291   }
292   return 0;
293 }
294 
295 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
296 #undef __FUNC__
297 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
298 int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
299 {
300   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
301   Scalar      *value,*barray=baij->barray;
302   int         ierr,i,j,ii,jj,row,col,k,l;
303   int         roworiented = baij->roworiented,rstart=baij->rstart;
304   int         rend=baij->rend,cstart=baij->cstart,stepval;
305   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
306 
307 
308   if(!barray) {
309     barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
310     baij->barray = barray;
311   }
312 
313   if (roworiented) {
314     stepval = (n-1)*bs;
315   } else {
316     stepval = (m-1)*bs;
317   }
318   for ( i=0; i<m; i++ ) {
319 #if defined(PETSC_BOPT_g)
320     if (im[i] < 0) SETERRQ(1,0,"Negative row");
321     if (im[i] >= baij->Mbs) SETERRQ(1,0,"Row too large");
322 #endif
323     if (im[i] >= rstart && im[i] < rend) {
324       row = im[i] - rstart;
325       for ( j=0; j<n; j++ ) {
326         /* If n == 1 (row-oriented) or m == 1 (column-oriented) a copy is not required */
327         if ((roworiented) && (n == 1)) {
328           barray = v + i*bs2;
329         } else if((!roworiented) && (m == 1)) {
330           barray = v + j*bs2;
331         } else { /* Here a copy is required */
332           if (roworiented) {
333             value = v + i*(stepval+bs)*bs + j*bs;
334           } else {
335             value = v + j*(stepval+bs)*bs + i*bs;
336           }
337           for ( ii=0; ii<bs; ii++,value+=stepval )
338             for (jj=0; jj<bs; jj++ )
339               *barray++  = *value++;
340           barray -=bs2;
341         }
342 
343         if (in[j] >= cstart && in[j] < cend){
344           col = in[j] - cstart;
345           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
346         }
347 #if defined(PETSC_BOPT_g)
348         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
349         else if (in[j] >= baij->Nbs) {SETERRQ(1,0,"Col too large");}
350 #endif
351         else {
352           if (mat->was_assembled) {
353             if (!baij->colmap) {
354               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
355             }
356             col = baij->colmap[in[j]] - 1;
357             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
358               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
359               col =  in[j];
360             }
361           }
362           else col = in[j];
363           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
364         }
365       }
366     }
367     else {
368       if (!baij->donotstash) {
369         if (roworiented ) {
370           row   = im[i]*bs;
371           value = v + i*(stepval+bs)*bs;
372           for ( j=0; j<bs; j++,row++ ) {
373             for ( k=0; k<n; k++ ) {
374               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
375                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
376               }
377             }
378           }
379         }
380         else {
381           for ( j=0; j<n; j++ ) {
382             value = v + j*(stepval+bs)*bs + i*bs;
383             col   = in[j]*bs;
384             for ( k=0; k<bs; k++,col++,value+=stepval) {
385               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
386                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
387               }
388             }
389           }
390 
391         }
392       }
393     }
394   }
395   return 0;
396 }
397 
398 #undef __FUNC__
399 #define __FUNC__ "MatGetValues_MPIBAIJ"
400 int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
401 {
402   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
403   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
404   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
405 
406   for ( i=0; i<m; i++ ) {
407     if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
408     if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
409     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
410       row = idxm[i] - bsrstart;
411       for ( j=0; j<n; j++ ) {
412         if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
413         if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
414         if (idxn[j] >= bscstart && idxn[j] < bscend){
415           col = idxn[j] - bscstart;
416           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
417         }
418         else {
419           if (!baij->colmap) {
420             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
421           }
422           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
423              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
424           else {
425             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
426             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
427           }
428         }
429       }
430     }
431     else {
432       SETERRQ(1,0,"Only local values currently supported");
433     }
434   }
435   return 0;
436 }
437 
438 #undef __FUNC__
439 #define __FUNC__ "MatNorm_MPIBAIJ"
440 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
441 {
442   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
443   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
444   int        ierr, i,bs2=baij->bs2;
445   double     sum = 0.0;
446   Scalar     *v;
447 
448   if (baij->size == 1) {
449     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
450   } else {
451     if (type == NORM_FROBENIUS) {
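      /* ||A||_F = sqrt(sum_ij |a_ij|^2): accumulate the squares of the
         local entries over both the diagonal (A) and off-diagonal (B)
         parts, then sum the partial results across all processors below. */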
452       v = amat->a;
453       for (i=0; i<amat->nz*bs2; i++ ) {
454 #if defined(PETSC_COMPLEX)
455         sum += real(conj(*v)*(*v)); v++;
456 #else
457         sum += (*v)*(*v); v++;
458 #endif
459       }
460       v = bmat->a;
461       for (i=0; i<bmat->nz*bs2; i++ ) {
462 #if defined(PETSC_COMPLEX)
463         sum += real(conj(*v)*(*v)); v++;
464 #else
465         sum += (*v)*(*v); v++;
466 #endif
467       }
468       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
469       *norm = sqrt(*norm);
470     }
471     else
472       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
473   }
474   return 0;
475 }
476 
477 #undef __FUNC__
478 #define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
479 int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
480 {
481   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
482   MPI_Comm    comm = mat->comm;
483   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
484   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
485   MPI_Request *send_waits,*recv_waits;
486   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
487   InsertMode  addv;
488   Scalar      *rvalues,*svalues;
489 
490   /* make sure all processors are using either INSERT_VALUES or ADD_VALUES */
491   MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
492   if (addv == (ADD_VALUES|INSERT_VALUES)) {
493     SETERRQ(1,0,"Some processors inserted others added");
494   }
495   mat->insertmode = addv; /* in case this processor had no cache */
496 
497   /*  first count number of contributors to each processor */
498   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
499   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
500   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
501   for ( i=0; i<baij->stash.n; i++ ) {
502     idx = baij->stash.idx[i];
503     for ( j=0; j<size; j++ ) {
504       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
505         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
506       }
507     }
508   }
509   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
510 
511   /* inform other processors of number of messages and max length*/
512   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
513   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
514   nreceives = work[rank];
515   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
516   nmax = work[rank];
517   PetscFree(work);
518 
519   /* post receives:
520        1) each message will consist of ordered triples
521      (global row, global column, value); we store the global indices as
522      Scalars to simplify the message passing.
523        2) since we don't know how long each individual message is, we
524      allocate the largest needed buffer for each receive. Potentially
525      this is a lot of wasted space.
526 
527 
528        This could be done better.
529   */
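  /* A sketch of the resulting wire format (hypothetical values): setting
     global entry (12,5) to 3.0 produces the three consecutive Scalars
     12.0, 5.0, 3.0 in svalues, which is why the buffers below are sized
     in multiples of 3. */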
530   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
531   CHKPTRQ(rvalues);
532   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
533   CHKPTRQ(recv_waits);
534   for ( i=0; i<nreceives; i++ ) {
535     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
536               comm,recv_waits+i);
537   }
538 
539   /* do sends:
540       1) starts[i] gives the starting index in svalues for stuff going to
541          the ith processor
542   */
543   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
544   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
545   CHKPTRQ(send_waits);
546   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
547   starts[0] = 0;
548   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
549   for ( i=0; i<baij->stash.n; i++ ) {
550     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
551     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
552     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
553   }
554   PetscFree(owner);
555   starts[0] = 0;
556   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
557   count = 0;
558   for ( i=0; i<size; i++ ) {
559     if (procs[i]) {
560       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
561                 comm,send_waits+count++);
562     }
563   }
564   PetscFree(starts); PetscFree(nprocs);
565 
566   /* Free cache space */
567   PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
568   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
569 
570   baij->svalues    = svalues;    baij->rvalues    = rvalues;
571   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
572   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
573   baij->rmax       = nmax;
574 
575   return 0;
576 }
577 
578 
579 #undef __FUNC__
580 #define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
581 int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
582 {
583   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
584   MPI_Status  *send_status,recv_status;
585   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
586   int         bs=baij->bs,row,col,other_disassembled;
587   Scalar      *values,val;
588   InsertMode  addv = mat->insertmode;
589 
590   /*  wait on receives */
591   while (count) {
592     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
593     /* unpack receives into our local space */
594     values = baij->rvalues + 3*imdex*baij->rmax;
595     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
596     n = n/3;
597     for ( i=0; i<n; i++ ) {
598       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
599       col = (int) PetscReal(values[3*i+1]);
600       val = values[3*i+2];
601       if (col >= baij->cstart*bs && col < baij->cend*bs) {
602         col -= baij->cstart*bs;
603         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
604       }
605       else {
606         if (mat->was_assembled) {
607           if (!baij->colmap) {
608             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
609           }
610           col = baij->colmap[col/bs] - 1 + col%bs;
611           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
612             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
613             col = (int) PetscReal(values[3*i+1]);
614           }
615         }
616         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
617       }
618     }
619     count--;
620   }
621   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
622 
623   /* wait on sends */
624   if (baij->nsends) {
625     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
626     CHKPTRQ(send_status);
627     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
628     PetscFree(send_status);
629   }
630   PetscFree(baij->send_waits); PetscFree(baij->svalues);
631 
632   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
633   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
634 
635   /* determine if any processor has disassembled, if so we must
636      also disassemble ourselves, in order that we may reassemble. */
637   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
638   if (mat->was_assembled && !other_disassembled) {
639     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
640   }
641 
642   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
643     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
644   }
645   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
646   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
647 
648   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
649   return 0;
650 }
651 
652 #undef __FUNC__
653 #define __FUNC__ "MatView_MPIBAIJ_Binary"
654 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
655 {
656   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
657   int          ierr;
658 
659   if (baij->size == 1) {
660     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
661   }
662   else SETERRQ(1,0,"Only uniprocessor output supported");
663   return 0;
664 }
665 
666 #undef __FUNC__
667 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
668 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
669 {
670   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
671   int          ierr, format,rank,bs = baij->bs;
672   FILE         *fd;
673   ViewerType   vtype;
674 
675   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
676   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
677     ierr = ViewerGetFormat(viewer,&format);
678     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
679       MatInfo info;
680       MPI_Comm_rank(mat->comm,&rank);
681       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
682       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
683       PetscSequentialPhaseBegin(mat->comm,1);
684       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
685               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
686               baij->bs,(int)info.memory);
687       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
688       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
689       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
690       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
691       fflush(fd);
692       PetscSequentialPhaseEnd(mat->comm,1);
693       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
694       return 0;
695     }
696     else if (format == VIEWER_FORMAT_ASCII_INFO) {
697       PetscPrintf(mat->comm,"  block size is %d\n",bs);
698       return 0;
699     }
700   }
701 
702   if (vtype == DRAW_VIEWER) {
703     Draw       draw;
704     PetscTruth isnull;
705     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
706     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
707   }
708 
709   if (vtype == ASCII_FILE_VIEWER) {
710     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
711     PetscSequentialPhaseBegin(mat->comm,1);
712     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
713            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
714             baij->cstart*bs,baij->cend*bs);
715     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
716     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
717     fflush(fd);
718     PetscSequentialPhaseEnd(mat->comm,1);
719   }
720   else {
721     int size = baij->size;
722     rank = baij->rank;
723     if (size == 1) {
724       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
725     }
726     else {
727       /* assemble the entire matrix onto first processor. */
728       Mat         A;
729       Mat_SeqBAIJ *Aloc;
730       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
731       int         mbs=baij->mbs;
732       Scalar      *a;
733 
734       if (!rank) {
735         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
736         CHKERRQ(ierr);
737       }
738       else {
739         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
740         CHKERRQ(ierr);
741       }
742       PLogObjectParent(mat,A);
743 
744       /* copy over the A part */
745       Aloc = (Mat_SeqBAIJ*) baij->A->data;
746       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
747       row = baij->rstart;
748       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
749 
750       for ( i=0; i<mbs; i++ ) {
751         rvals[0] = bs*(baij->rstart + i);
752         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
753         for ( j=ai[i]; j<ai[i+1]; j++ ) {
754           col = (baij->cstart+aj[j])*bs;
755           for (k=0; k<bs; k++ ) {
756             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
757             col++; a += bs;
758           }
759         }
760       }
761       /* copy over the B part */
762       Aloc = (Mat_SeqBAIJ*) baij->B->data;
763       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
764       row = baij->rstart*bs;
765       for ( i=0; i<mbs; i++ ) {
766         rvals[0] = bs*(baij->rstart + i);
767         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
768         for ( j=ai[i]; j<ai[i+1]; j++ ) {
769           col = baij->garray[aj[j]]*bs;
770           for (k=0; k<bs; k++ ) {
771             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
772             col++; a += bs;
773           }
774         }
775       }
776       PetscFree(rvals);
777       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
778       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
779       if (!rank) {
780         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
781       }
782       ierr = MatDestroy(A); CHKERRQ(ierr);
783     }
784   }
785   return 0;
786 }
787 
788 
789 
790 #undef __FUNC__
791 #define __FUNC__ "MatView_MPIBAIJ"
792 int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
793 {
794   Mat         mat = (Mat) obj;
795   int         ierr;
796   ViewerType  vtype;
797 
798   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
799   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
800       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
801     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
802   }
803   else if (vtype == BINARY_FILE_VIEWER) {
804     return MatView_MPIBAIJ_Binary(mat,viewer);
805   }
806   return 0;
807 }
808 
809 #undef __FUNC__
810 #define __FUNC__ "MatDestroy_MPIBAIJ"
811 int MatDestroy_MPIBAIJ(PetscObject obj)
812 {
813   Mat         mat = (Mat) obj;
814   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
815   int         ierr;
816 
817 #if defined(PETSC_LOG)
818   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
819 #endif
820 
821   PetscFree(baij->rowners);
822   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
823   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
824   if (baij->colmap) PetscFree(baij->colmap);
825   if (baij->garray) PetscFree(baij->garray);
826   if (baij->lvec)   VecDestroy(baij->lvec);
827   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
828   if (baij->rowvalues) PetscFree(baij->rowvalues);
829   if (baij->barray) PetscFree(baij->barray);
830   PetscFree(baij);
831   if (mat->mapping) {
832     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
833   }
834   PLogObjectDestroy(mat);
835   PetscHeaderDestroy(mat);
836   return 0;
837 }
838 
839 #undef __FUNC__
840 #define __FUNC__ "MatMult_MPIBAIJ"
841 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
842 {
843   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
844   int         ierr, nt;
845 
846   VecGetLocalSize_Fast(xx,nt);
847   if (nt != a->n) {
848     SETERRQ(1,0,"Incompatible partition of A and xx");
849   }
850   VecGetLocalSize_Fast(yy,nt);
851   if (nt != a->m) {
852     SETERRQ(1,0,"Incompatible partition of A and yy");
853   }
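  /* The multiply is split as y = A_d*x_local + B_o*x_ghost, where A_d is
     the diagonal block and B_o the off-diagonal block; the scatter that
     fills the ghost values in a->lvec is overlapped with the local
     multiply below. */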
854   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
855   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
856   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
857   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
858   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
859   return 0;
860 }
861 
862 #undef __FUNC__
863 #define __FUNC__ "MatMultAdd_MPIBAIJ"
864 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
865 {
866   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
867   int        ierr;
868   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
869   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
870   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
871   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
872   return 0;
873 }
874 
875 #undef __FUNC__
876 #define __FUNC__ "MatMultTrans_MPIBAIJ"
877 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
878 {
879   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
880   int        ierr;
881 
882   /* do nondiagonal part */
883   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
884   /* send it on its way */
885   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
886   /* do local part */
887   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
888   /* receive remote parts: note this assumes the values are not actually */
889   /* inserted in yy until the next line, which is true for my implementation*/
890   /* but is not perhaps always true. */
891   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
892   return 0;
893 }
894 
895 #undef __FUNC__
896 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
897 int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
898 {
899   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
900   int        ierr;
901 
902   /* do nondiagonal part */
903   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
904   /* send it on its way */
905   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
906   /* do local part */
907   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
908   /* receive remote parts: note this assumes the values are not actually */
909   /* inserted in zz until the next line, which is true for my implementation*/
910   /* but is not perhaps always true. */
911   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
912   return 0;
913 }
914 
915 /*
916   This only works correctly for square matrices where the subblock A->A is the
917    diagonal block
918 */
919 #undef __FUNC__
920 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
921 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
922 {
923   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
924   if (a->M != a->N)
925     SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
926   return MatGetDiagonal(a->A,v);
927 }
928 
929 #undef __FUNC__
930 #define __FUNC__ "MatScale_MPIBAIJ"
931 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
932 {
933   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
934   int        ierr;
935   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
936   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
937   return 0;
938 }
939 
940 #undef __FUNC__
941 #define __FUNC__ "MatGetSize_MPIBAIJ"
942 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
943 {
944   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
945   *m = mat->M; *n = mat->N;
946   return 0;
947 }
948 
949 #undef __FUNC__
950 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
951 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
952 {
953   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
954   *m = mat->m; *n = mat->n;
955   return 0;
956 }
957 
958 #undef __FUNC__
959 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
960 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
961 {
962   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
963   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
964   return 0;
965 }
966 
967 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
968 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
969 
970 #undef __FUNC__
971 #define __FUNC__ "MatGetRow_MPIBAIJ"
972 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
973 {
974   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
975   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
976   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
977   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
978   int        *cmap, *idx_p,cstart = mat->cstart;
979 
980   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
981   mat->getrowactive = PETSC_TRUE;
982 
983   if (!mat->rowvalues && (idx || v)) {
984     /*
985         allocate enough space to hold information from the longest row.
986     */
987     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
988     int     max = 1,mbs = mat->mbs,tmp;
989     for ( i=0; i<mbs; i++ ) {
990       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
991       if (max < tmp) { max = tmp; }
992     }
993     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
994     CHKPTRQ(mat->rowvalues);
995     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
996   }
997 
998 
999   if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows");
1000   lrow = row - brstart;
1001 
1002   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1003   if (!v)   {pvA = 0; pvB = 0;}
1004   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1005   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1006   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1007   nztot = nzA + nzB;
1008 
1009   cmap  = mat->garray;
1010   if (v  || idx) {
1011     if (nztot) {
1012       /* Sort by increasing column numbers, assuming A and B already sorted */
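      /* A small worked example (hypothetical): if B holds global point
         columns {1,40} and A holds the owned columns {10,11}, the merged
         row comes out as 1,10,11,40 -- the B entries below cstart first,
         then all of A, then the remaining B entries. */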
1013       int imark = -1;
1014       if (v) {
1015         *v = v_p = mat->rowvalues;
1016         for ( i=0; i<nzB; i++ ) {
1017           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
1018           else break;
1019         }
1020         imark = i;
1021         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1022         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1023       }
1024       if (idx) {
1025         *idx = idx_p = mat->rowindices;
1026         if (imark > -1) {
1027           for ( i=0; i<imark; i++ ) {
1028             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
1029           }
1030         } else {
1031           for ( i=0; i<nzB; i++ ) {
1032             if (cmap[cworkB[i]/bs] < cstart)
1033               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
1034             else break;
1035           }
1036           imark = i;
1037         }
1038         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
1039         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
1040       }
1041     }
1042     else {
1043       if (idx) *idx = 0;
1044       if (v)   *v   = 0;
1045     }
1046   }
1047   *nz = nztot;
1048   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1049   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1050   return 0;
1051 }
1052 
1053 #undef __FUNC__
1054 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1055 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1056 {
1057   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1058   if (baij->getrowactive == PETSC_FALSE) {
1059     SETERRQ(1,0,"MatGetRow not called");
1060   }
1061   baij->getrowactive = PETSC_FALSE;
1062   return 0;
1063 }
1064 
1065 #undef __FUNC__
1066 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1067 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1068 {
1069   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1070   *bs = baij->bs;
1071   return 0;
1072 }
1073 
1074 #undef __FUNC__
1075 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1076 int MatZeroEntries_MPIBAIJ(Mat A)
1077 {
1078   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1079   int         ierr;
1080   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1081   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1082   return 0;
1083 }
1084 
1085 #undef __FUNC__
1086 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1087 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1088 {
1089   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1090   Mat         A = a->A, B = a->B;
1091   int         ierr;
1092   double      isend[5], irecv[5];
1093 
1094   info->rows_global    = (double)a->M;
1095   info->columns_global = (double)a->N;
1096   info->rows_local     = (double)a->m;
1097   info->columns_local  = (double)a->n;
1098   info->block_size     = (double)a->bs;
1099   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1100   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded; isend[3] = info->memory; isend[4] = info->mallocs;
1101   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1102   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded; isend[3] += info->memory; isend[4] += info->mallocs;
1103   if (flag == MAT_LOCAL) {
1104     info->nz_used      = isend[0];
1105     info->nz_allocated = isend[1];
1106     info->nz_unneeded  = isend[2];
1107     info->memory       = isend[3];
1108     info->mallocs      = isend[4];
1109   } else if (flag == MAT_GLOBAL_MAX) {
1110     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
1111     info->nz_used      = irecv[0];
1112     info->nz_allocated = irecv[1];
1113     info->nz_unneeded  = irecv[2];
1114     info->memory       = irecv[3];
1115     info->mallocs      = irecv[4];
1116   } else if (flag == MAT_GLOBAL_SUM) {
1117     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
1118     info->nz_used      = irecv[0];
1119     info->nz_allocated = irecv[1];
1120     info->nz_unneeded  = irecv[2];
1121     info->memory       = irecv[3];
1122     info->mallocs      = irecv[4];
1123   }
1124   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1125   info->fill_ratio_needed = 0;
1126   info->factor_mallocs    = 0;
1127   return 0;
1128 }
1129 
1130 #undef __FUNC__
1131 #define __FUNC__ "MatSetOption_MPIBAIJ"
1132 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1133 {
1134   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1135 
1136   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1137       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1138       op == MAT_COLUMNS_UNSORTED ||
1139       op == MAT_COLUMNS_SORTED ||
1140       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1141         MatSetOption(a->A,op);
1142         MatSetOption(a->B,op);
1143   } else if (op == MAT_ROW_ORIENTED) {
1144         a->roworiented = 1;
1145         MatSetOption(a->A,op);
1146         MatSetOption(a->B,op);
1147   } else if (op == MAT_ROWS_SORTED ||
1148              op == MAT_ROWS_UNSORTED ||
1149              op == MAT_SYMMETRIC ||
1150              op == MAT_STRUCTURALLY_SYMMETRIC ||
1151              op == MAT_YES_NEW_DIAGONALS)
1152     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1153   else if (op == MAT_COLUMN_ORIENTED) {
1154     a->roworiented = 0;
1155     MatSetOption(a->A,op);
1156     MatSetOption(a->B,op);
1157   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1158     a->donotstash = 1;
1159   } else if (op == MAT_NO_NEW_DIAGONALS)
1160     {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
1161   else
1162     {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
1163   return 0;
1164 }
1165 
1166 #undef __FUNC__
1167 #define __FUNC__ "MatTranspose_MPIBAIJ"
1168 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1169 {
1170   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1171   Mat_SeqBAIJ *Aloc;
1172   Mat        B;
1173   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
1174   int        bs=baij->bs,mbs=baij->mbs;
1175   Scalar     *a;
1176 
1177   if (matout == PETSC_NULL && M != N)
1178     SETERRQ(1,0,"Square matrix only for in-place");
1179   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1180   CHKERRQ(ierr);
1181 
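  /* The approach (a summary): walk the local block rows of both pieces and
     insert each point entry into B with row and column exchanged;
     MatSetValues() routes entries that now belong to other processors. */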
1182   /* copy over the A part */
1183   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1184   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1185   row = baij->rstart;
1186   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1187 
1188   for ( i=0; i<mbs; i++ ) {
1189     rvals[0] = bs*(baij->rstart + i);
1190     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1191     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1192       col = (baij->cstart+aj[j])*bs;
1193       for (k=0; k<bs; k++ ) {
1194         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1195         col++; a += bs;
1196       }
1197     }
1198   }
1199   /* copy over the B part */
1200   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1201   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1202   row = baij->rstart*bs;
1203   for ( i=0; i<mbs; i++ ) {
1204     rvals[0] = bs*(baij->rstart + i);
1205     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1206     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1207       col = baij->garray[aj[j]]*bs;
1208       for (k=0; k<bs; k++ ) {
1209         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1210         col++; a += bs;
1211       }
1212     }
1213   }
1214   PetscFree(rvals);
1215   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1216   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1217 
1218   if (matout != PETSC_NULL) {
1219     *matout = B;
1220   } else {
1221     /* This isn't really an in-place transpose, but we free the data structures from baij */
1222     PetscFree(baij->rowners);
1223     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1224     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1225     if (baij->colmap) PetscFree(baij->colmap);
1226     if (baij->garray) PetscFree(baij->garray);
1227     if (baij->lvec) VecDestroy(baij->lvec);
1228     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1229     PetscFree(baij);
1230     PetscMemcpy(A,B,sizeof(struct _Mat));
1231     PetscHeaderDestroy(B);
1232   }
1233   return 0;
1234 }
1235 
1236 #undef __FUNC__
1237 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1238 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1239 {
1240   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1241   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1242   int ierr,s1,s2,s3;
1243 
1244   if (ll)  {
1245     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1246     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1247     if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
1248     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1249     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1250   }
1251   if (rr) SETERRQ(1,0,"not supported for right vector");
1252   return 0;
1253 }
1254 
1255 /* the code does not do the diagonal entries correctly unless the
1256    matrix is square and the column and row ownerships are identical.
1257    This is a BUG. The only way to fix it seems to be to access
1258    baij->A and baij->B directly and not through the MatZeroRows()
1259    routine.
1260 */
1261 #undef __FUNC__
1262 #define __FUNC__ "MatZeroRows_MPIBAIJ"
1263 int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
1264 {
1265   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
1266   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
1267   int            *procs,*nprocs,j,found,idx,nsends,*work;
1268   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
1269   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
1270   int            *lens,imdex,*lrows,*values,bs=l->bs;
1271   MPI_Comm       comm = A->comm;
1272   MPI_Request    *send_waits,*recv_waits;
1273   MPI_Status     recv_status,*send_status;
1274   IS             istmp;
1275 
1276   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
1277   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
1278 
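  /* Overall scheme (a summary): route each global row index in the index
     set to its owning processor, gather the locally owned rows into lrows,
     and then zero them by calling MatZeroRows() on the local A (with diag)
     and B (with 0) pieces. */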
1279   /*  first count number of contributors to each processor */
1280   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
1281   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
1282   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
1283   for ( i=0; i<N; i++ ) {
1284     idx = rows[i];
1285     found = 0;
1286     for ( j=0; j<size; j++ ) {
1287       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
1288         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
1289       }
1290     }
1291     if (!found) SETERRQ(1,0,"Index out of range");
1292   }
1293   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
1294 
1295   /* inform other processors of number of messages and max length*/
1296   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
1297   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
1298   nrecvs = work[rank];
1299   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
1300   nmax = work[rank];
1301   PetscFree(work);
1302 
1303   /* post receives:   */
1304   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
1305   CHKPTRQ(rvalues);
1306   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
1307   CHKPTRQ(recv_waits);
1308   for ( i=0; i<nrecvs; i++ ) {
1309     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
1310   }
1311 
1312   /* do sends:
1313       1) starts[i] gives the starting index in svalues for stuff going to
1314          the ith processor
1315   */
1316   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1317   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
1318   CHKPTRQ(send_waits);
1319   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1320   starts[0] = 0;
1321   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1322   for ( i=0; i<N; i++ ) {
1323     svalues[starts[owner[i]]++] = rows[i];
1324   }
1325   ISRestoreIndices(is,&rows);
1326 
1327   starts[0] = 0;
1328   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1329   count = 0;
1330   for ( i=0; i<size; i++ ) {
1331     if (procs[i]) {
1332       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
1333     }
1334   }
1335   PetscFree(starts);
1336 
1337   base = owners[rank]*bs;
1338 
1339   /*  wait on receives */
1340   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1341   source = lens + nrecvs;
1342   count  = nrecvs; slen = 0;
1343   while (count) {
1344     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
1345     /* unpack receives into our local space */
1346     MPI_Get_count(&recv_status,MPI_INT,&n);
1347     source[imdex]  = recv_status.MPI_SOURCE;
1348     lens[imdex]  = n;
1349     slen += n;
1350     count--;
1351   }
1352   PetscFree(recv_waits);
1353 
1354   /* move the data into the send scatter */
1355   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1356   count = 0;
1357   for ( i=0; i<nrecvs; i++ ) {
1358     values = rvalues + i*nmax;
1359     for ( j=0; j<lens[i]; j++ ) {
1360       lrows[count++] = values[j] - base;
1361     }
1362   }
1363   PetscFree(rvalues); PetscFree(lens);
1364   PetscFree(owner); PetscFree(nprocs);
1365 
1366   /* actually zap the local rows */
1367   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1368   PLogObjectParent(A,istmp);
1369   PetscFree(lrows);
1370   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1371   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1372   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1373 
1374   /* wait on sends */
1375   if (nsends) {
1376     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1377     CHKPTRQ(send_status);
1378     MPI_Waitall(nsends,send_waits,send_status);
1379     PetscFree(send_status);
1380   }
1381   PetscFree(send_waits); PetscFree(svalues);
1382 
1383   return 0;
1384 }
1385 extern int MatPrintHelp_SeqBAIJ(Mat);
1386 #undef __FUNC__
1387 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1388 int MatPrintHelp_MPIBAIJ(Mat A)
1389 {
1390   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1391 
1392   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1393   else return 0;
1394 }
1395 
1396 #undef __FUNC__
1397 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1398 int MatSetUnfactored_MPIBAIJ(Mat A)
1399 {
1400   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1401   int         ierr;
1402   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1403   return 0;
1404 }
1405 
1406 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1407 
1408 /* -------------------------------------------------------------------*/
1409 static struct _MatOps MatOps = {
1410   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1411   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1412   0,0,0,0,
1413   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1414   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1415   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1416   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1417   0,0,0,MatGetSize_MPIBAIJ,
1418   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1419   0,0,MatConvertSameType_MPIBAIJ,0,0,
1420   0,0,0,MatGetSubMatrices_MPIBAIJ,
1421   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1422   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
1423   0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1424 
1425 
1426 #undef __FUNC__
1427 #define __FUNC__ "MatCreateMPIBAIJ"
1428 /*@C
1429    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1430    (block compressed row).  For good matrix assembly performance
1431    the user should preallocate the matrix storage by setting the parameters
1432    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1433    performance can be increased by more than a factor of 50.
1434 
1435    Input Parameters:
1436 .  comm - MPI communicator
1437 .  bs   - size of block
1438 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1439            This value should be the same as the local size used in creating the
1440            y vector for the matrix-vector product y = Ax.
1441 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1442            This value should be the same as the local size used in creating the
1443            x vector for the matrix-vector product y = Ax.
1444 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1445 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1446 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1447            submatrix  (same for all local rows)
1448 .  d_nnz - array containing the number of block nonzeros in the various block rows
1449            of the diagonal portion of the local submatrix (possibly different for
1450            each block row) or PETSC_NULL.  You must leave room for the diagonal
1451            entry even if it is zero.
1452 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1453            submatrix (same for all local rows).
1454 .  o_nnz - array containing the number of block nonzeros in the various block rows of the
1455            off-diagonal portion of the local submatrix (possibly different for
1456            each block row) or PETSC_NULL.
1457 
1458    Output Parameter:
1459 .  A - the matrix
1460 
1461    Notes:
1462    The user MUST specify either the local or global matrix dimensions
1463    (possibly both).
1464 
1465    Storage Information:
1466    For a square global matrix we define each processor's diagonal portion
1467    to be its local rows and the corresponding columns (a square submatrix);
1468    each processor's off-diagonal portion encompasses the remainder of the
1469    local matrix (a rectangular submatrix).
1470 
1471    The user can specify preallocated storage for the diagonal part of
1472    the local submatrix with either d_nz or d_nnz (not both).  Set
1473    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1474    memory allocation.  Likewise, specify preallocated storage for the
1475    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1476 
1477    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1478    the figure below we depict these three local rows and all columns (0-11).
1479 
1480 $          0 1 2 3 4 5 6 7 8 9 10 11
1481 $         --------------------------
1482 $  row 3  |  o o o d d d o o o o o o
1483 $  row 4  |  o o o d d d o o o o o o
1484 $  row 5  |  o o o d d d o o o o o o
1485 $         --------------------------
1486 $
1487 
1488    Thus, any entries in the d locations are stored in the d (diagonal)
1489    submatrix, and any entries in the o locations are stored in the
1490    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1491    stored simply in the MATSEQBAIJ format for compressed row storage.
1492 
1493    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1494    and o_nz should indicate the number of nonzeros per row in the o matrix.
1495    In general, for PDE problems in which most nonzeros are near the diagonal,
1496    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1497    or you will get TERRIBLE performance; see the users' manual chapter on
1498    matrices.
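
   Example:
   A minimal sketch of a call (the sizes and preallocation counts below are
   hypothetical, chosen only for illustration): create a matrix of 2x2 blocks
   that is 100x100 globally, letting PETSc select the local sizes, with at
   most 5 diagonal and 2 off-diagonal block nonzeros preallocated per block row.
$      Mat A;
$      int ierr;
$      ierr = MatCreateMPIBAIJ(PETSC_COMM_WORLD,2,PETSC_DECIDE,PETSC_DECIDE,100,100,
$                              5,PETSC_NULL,2,PETSC_NULL,&A); CHKERRQ(ierr);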
1499 
1500 .keywords: matrix, block, aij, compressed row, sparse, parallel
1501 
1502 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1503 @*/
1504 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1505                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1506 {
1507   Mat          B;
1508   Mat_MPIBAIJ  *b;
1509   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1510 
1511   if (bs < 1) SETERRQ(1,0,"invalid block size specified");
1512   *A = 0;
1513   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1514   PLogObjectCreate(B);
1515   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1516   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1517   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1518   MPI_Comm_size(comm,&size);
1519   if (size == 1) {
1520     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1521     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1522     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1523     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1524     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1525     B->ops.solve             = MatSolve_MPIBAIJ;
1526     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1527     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1528     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1529     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1530   }
1531 
1532   B->destroy    = MatDestroy_MPIBAIJ;
1533   B->view       = MatView_MPIBAIJ;
1534   B->mapping    = 0;
1535   B->factor     = 0;
1536   B->assembled  = PETSC_FALSE;
1537 
1538   B->insertmode = NOT_SET_VALUES;
1539   MPI_Comm_rank(comm,&b->rank);
1540   MPI_Comm_size(comm,&b->size);
1541 
1542   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1543     SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1544   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
1545   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,0,"either N or n should be specified");
1546   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1547   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1548 
1549   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1550     work[0] = m; work[1] = n;
1551     mbs = m/bs; nbs = n/bs;
1552     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1553     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1554     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1555   }
1556   if (m == PETSC_DECIDE) {
1557     Mbs = M/bs;
1558     if (Mbs*bs != M) SETERRQ(1,0,"No of global rows must be divisible by blocksize");
1559     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1560     m   = mbs*bs;
1561   }
1562   if (n == PETSC_DECIDE) {
1563     Nbs = N/bs;
1564     if (Nbs*bs != N) SETERRQ(1,0,"No of global cols must be divisible by blocksize");
1565     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1566     n   = nbs*bs;
1567   }
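  /* Worked example of the split above (hypothetical numbers): with M = 12,
     bs = 2 and 4 processes, Mbs = 6, so ranks 0 and 1 receive mbs = 2 block
     rows (m = 4 point rows) while ranks 2 and 3 receive mbs = 1 (m = 2) */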
1568   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"No of local rows, cols must be divisible by blocksize");
1569 
1570   b->m = m; B->m = m;
1571   b->n = n; B->n = n;
1572   b->N = N; B->N = N;
1573   b->M = M; B->M = M;
1574   b->bs  = bs;
1575   b->bs2 = bs*bs;
1576   b->mbs = mbs;
1577   b->nbs = nbs;
1578   b->Mbs = Mbs;
1579   b->Nbs = Nbs;
1580 
1581   /* build local table of row and column ownerships */
1582   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1583   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1584   b->cowners = b->rowners + b->size + 2;
1585   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1586   b->rowners[0] = 0;
1587   for ( i=2; i<=b->size; i++ ) {
1588     b->rowners[i] += b->rowners[i-1];
1589   }
1590   b->rstart    = b->rowners[b->rank];
1591   b->rend      = b->rowners[b->rank+1];
1592   b->rstart_bs = b->rstart * bs;
1593   b->rend_bs   = b->rend * bs;
1594 
1595   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1596   b->cowners[0] = 0;
1597   for ( i=2; i<=b->size; i++ ) {
1598     b->cowners[i] += b->cowners[i-1];
1599   }
1600   b->cstart    = b->cowners[b->rank];
1601   b->cend      = b->cowners[b->rank+1];
1602   b->cstart_bs = b->cstart * bs;
1603   b->cend_bs   = b->cend * bs;
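  /* Illustration of the ownership tables just built (hypothetical numbers):
     if the gathered block counts are mbs = {2,2,1} on 3 processes, then
     rowners = {0,2,4,5}; rank 1 owns block rows [2,4), i.e. point rows
     [2*bs,4*bs) via rstart_bs and rend_bs */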
1604 
1605   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1606   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1607   PLogObjectParent(B,b->A);
1608   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1609   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1610   PLogObjectParent(B,b->B);
1611 
1612   /* build cache for off array entries formed */
1613   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1614   b->donotstash  = 0;
1615   b->colmap      = 0;
1616   b->garray      = 0;
1617   b->roworiented = 1;
1618 
1619   /* stuff used in block assembly */
1620   b->barray      = 0;
1621 
1622   /* stuff used for matrix vector multiply */
1623   b->lvec        = 0;
1624   b->Mvctx       = 0;
1625 
1626   /* stuff for MatGetRow() */
1627   b->rowindices   = 0;
1628   b->rowvalues    = 0;
1629   b->getrowactive = PETSC_FALSE;
1630 
1631   *A = B;
1632   return 0;
1633 }
1634 
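/*
   Makes a complete copy of the parallel matrix, including the communication
   structures; presumably reached through MatConvert() with type MATSAME,
   via the convert-same-type slot in the dispatch table above.
*/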
1635 #undef __FUNC__
1636 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
1637 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1638 {
1639   Mat         mat;
1640   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1641   int         ierr, len=0, flg;
1642 
1643   *newmat       = 0;
1644   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1645   PLogObjectCreate(mat);
1646   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1647   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1648   mat->destroy    = MatDestroy_MPIBAIJ;
1649   mat->view       = MatView_MPIBAIJ;
1650   mat->factor     = matin->factor;
1651   mat->assembled  = PETSC_TRUE;
1652 
1653   a->m = mat->m   = oldmat->m;
1654   a->n = mat->n   = oldmat->n;
1655   a->M = mat->M   = oldmat->M;
1656   a->N = mat->N   = oldmat->N;
1657 
1658   a->bs  = oldmat->bs;
1659   a->bs2 = oldmat->bs2;
1660   a->mbs = oldmat->mbs;
1661   a->nbs = oldmat->nbs;
1662   a->Mbs = oldmat->Mbs;
1663   a->Nbs = oldmat->Nbs;
1664 
1665   a->rstart       = oldmat->rstart;
1666   a->rend         = oldmat->rend;
1667   a->cstart       = oldmat->cstart;
1668   a->cend         = oldmat->cend;
1669   a->size         = oldmat->size;
1670   a->rank         = oldmat->rank;
1671   mat->insertmode = NOT_SET_VALUES;
1672   a->rowvalues    = 0;
1673   a->getrowactive = PETSC_FALSE;
1674   a->barray       = 0;
1675 
1676   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1677   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1678   a->cowners = a->rowners + a->size + 2;
1679   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1680   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1681   if (oldmat->colmap) {
1682     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1683     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1684     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1685   } else a->colmap = 0;
1686   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1687     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1688     PLogObjectMemory(mat,len*sizeof(int));
1689     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1690   } else a->garray = 0;
1691 
1692   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1693   PLogObjectParent(mat,a->lvec);
1694   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1695   PLogObjectParent(mat,a->Mvctx);
1696   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1697   PLogObjectParent(mat,a->A);
1698   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1699   PLogObjectParent(mat,a->B);
1700   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1701   if (flg) {
1702     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1703   }
1704   *newmat = mat;
1705   return 0;
1706 }
1707 
1708 #include "sys.h"
1709 
1710 #undef __FUNC__
1711 #define __FUNC__ "MatLoad_MPIBAIJ"
1712 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1713 {
1714   Mat          A;
1715   int          i, nz, ierr, j,rstart, rend, fd;
1716   Scalar       *vals,*buf;
1717   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1718   MPI_Status   status;
1719   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1720   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1721   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1722   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1723   int          dcount,kmax,k,nzcount,tmp;
1724 
1725 
1726   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1727   bs2  = bs*bs;
1728 
1729   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1730   if (!rank) {
1731     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1732     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1733     if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not a matrix object");
1734   }
1735 
1736   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1737   M = header[1]; N = header[2];
1738 
1739   if (M != N) SETERRQ(1,0,"Can only do square matrices");
1740 
1741   /*
1742      This code adds extra rows to make sure the number of rows is
1743      divisible by the blocksize
1744   */
1745   Mbs        = M/bs;
1746   extra_rows = bs - M + bs*(Mbs);
1747   if (extra_rows == bs) extra_rows = 0;
1748   else                  Mbs++;
1749   if (extra_rows && !rank) {
1750     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1751   }
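  /* Worked example of the padding arithmetic (hypothetical sizes): M = 10,
     bs = 4 gives Mbs = 2 and extra_rows = 4 - 10 + 8 = 2, so Mbs becomes 3
     and the matrix is treated as 12x12; the padded rows receive a unit
     diagonal entry when the values are read in below */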
1752 
1753   /* determine ownership of all rows */
1754   mbs = Mbs/size + ((Mbs % size) > rank);
1755   m   = mbs * bs;
1756   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1757   browners = rowners + size + 1;
1758   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1759   rowners[0] = 0;
1760   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1761   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1762   rstart = rowners[rank];
1763   rend   = rowners[rank+1];
1764 
1765   /* distribute row lengths to all processors */
1766   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1767   if (!rank) {
1768     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1769     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1770     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1771     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1772     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1773     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1774     PetscFree(sndcounts);
1775   }
1776   else {
1777     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1778   }
1779 
1780   if (!rank) {
1781     /* calculate the number of nonzeros on each processor */
1782     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1783     PetscMemzero(procsnz,size*sizeof(int));
1784     for ( i=0; i<size; i++ ) {
1785       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1786         procsnz[i] += rowlengths[j];
1787       }
1788     }
1789     PetscFree(rowlengths);
1790 
1791     /* determine max buffer needed and allocate it */
1792     maxnz = 0;
1793     for ( i=0; i<size; i++ ) {
1794       maxnz = PetscMax(maxnz,procsnz[i]);
1795     }
1796     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1797 
1798     /* read in my part of the matrix column indices  */
1799     nz = procsnz[0];
1800     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1801     mycols = ibuf;
1802     if (size == 1)  nz -= extra_rows;
1803     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1804     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1805 
1806     /* read in the parts for all other processors (except the last one) and ship off */
1807     for ( i=1; i<size-1; i++ ) {
1808       nz = procsnz[i];
1809       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1810       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1811     }
1812     /* read in the stuff for the last proc */
1813     if ( size != 1 ) {
1814       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1815       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1816       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1817       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1818     }
1819     PetscFree(cols);
1820   }
1821   else {
1822     /* determine buffer space needed for message */
1823     nz = 0;
1824     for ( i=0; i<m; i++ ) {
1825       nz += locrowlens[i];
1826     }
1827     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1828     mycols = ibuf;
1829     /* receive message of column indices */
1830     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1831     MPI_Get_count(&status,MPI_INT,&maxnz);
1832     if (maxnz != nz) SETERRQ(1,0,"something is wrong with the file");
1833   }
1834 
1835   /* loop over local block rows, counting the diagonal and off-diagonal block entries in each */
1836   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1837   odlens = dlens + (rend-rstart);
1838   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1839   PetscMemzero(mask,3*Mbs*sizeof(int));
1840   masked1 = mask    + Mbs;
1841   masked2 = masked1 + Mbs;
1842   rowcount = 0; nzcount = 0;
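  /* mask records which block columns have already been counted for the
     current block row. Hypothetical example: if this process owns block
     rows [2,4) and a block row references block columns {1,3,1}, column 1
     is counted once as off-diagonal and column 3 once as diagonal, giving
     dlens[i] = 1 and odlens[i] = 1 */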
1843   for ( i=0; i<mbs; i++ ) {
1844     dcount  = 0;
1845     odcount = 0;
1846     for ( j=0; j<bs; j++ ) {
1847       kmax = locrowlens[rowcount];
1848       for ( k=0; k<kmax; k++ ) {
1849         tmp = mycols[nzcount++]/bs;
1850         if (!mask[tmp]) {
1851           mask[tmp] = 1;
1852           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1853           else masked1[dcount++] = tmp;
1854         }
1855       }
1856       rowcount++;
1857     }
1858 
1859     dlens[i]  = dcount;
1860     odlens[i] = odcount;
1861 
1862     /* zero out the mask elements we set */
1863     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1864     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1865   }
1866 
1867   /* create our matrix */
1868   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1869          CHKERRQ(ierr);
1870   A = *newmat;
1871   MatSetOption(A,MAT_COLUMNS_SORTED);
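  /* the binary format stores each row's column indices in ascending order
     (assumed here), so the sorted-columns hint lets MatSetValues() insert
     without searching within each row */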
1872 
1873   if (!rank) {
1874     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1875     /* read in my part of the matrix numerical values  */
1876     nz = procsnz[0];
1877     vals = buf;
1878     mycols = ibuf;
1879     if (size == 1)  nz -= extra_rows;
1880     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1881     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1882 
1883     /* insert into matrix */
1884     jj      = rstart*bs;
1885     for ( i=0; i<m; i++ ) {
1886       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1887       mycols += locrowlens[i];
1888       vals   += locrowlens[i];
1889       jj++;
1890     }
1891     /* read in other processors (except the last one) and ship out */
1892     for ( i=1; i<size-1; i++ ) {
1893       nz = procsnz[i];
1894       vals = buf;
1895       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1896       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1897     }
1898     /* the last proc */
1899     if ( size != 1 ) {
1900       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1901       vals = buf;
1902       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1903       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1904       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1905     }
1906     PetscFree(procsnz);
1907   }
1908   else {
1909     /* receive numeric values */
1910     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1911 
1912     /* receive message of values */
1913     vals = buf;
1914     mycols = ibuf;
1915     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1916     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1917     if (maxnz != nz) SETERRQ(1,0,"something is wrong with the file");
1918 
1919     /* insert into matrix */
1920     jj      = rstart*bs;
1921     for ( i=0; i<m; i++ ) {
1922       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1923       mycols += locrowlens[i];
1924       vals   += locrowlens[i];
1925       jj++;
1926     }
1927   }
1928   PetscFree(locrowlens);
1929   PetscFree(buf);
1930   PetscFree(ibuf);
1931   PetscFree(rowners);
1932   PetscFree(dlens);
1933   PetscFree(mask);
1934   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1935   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1936   return 0;
1937 }
1938 
1939 
1940