xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 596b8d2e907766f4f3c92fab62aa7579371552eb)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpibaij.c,v 1.75 1997/07/29 14:10:02 bsmith Exp balay $";
3 #endif
4 
5 #include "pinclude/pviewer.h"
6 #include "src/mat/impls/baij/mpi/mpibaij.h"
7 #include "src/vec/vecimpl.h"
8 
9 
10 extern int MatSetUpMultiply_MPIBAIJ(Mat);
11 extern int DisAssemble_MPIBAIJ(Mat);
12 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
13 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
14 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
15 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
16 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
17 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
18 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
19 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
20 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
21 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
22 
23 
24 /*
25      Local utility routine that creates a mapping from the global column
26    number to the local number in the off-diagonal part of the local
   storage of the matrix.  This is done in a non-scalable way since the
28    length of colmap equals the global matrix length.
29 */
30 #undef __FUNC__
31 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
32 static int CreateColmap_MPIBAIJ_Private(Mat mat)
33 {
34   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
35   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
36   int         nbs = B->nbs,i,bs=B->bs;;
37 
38   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
39   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
40   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
41   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
42   return 0;
43 }
44 
45 #undef __FUNC__
46 #define __FUNC__ "MatGetRowIJ_MPIBAIJ("
47 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
48                             PetscTruth *done)
49 {
50   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
51   int         ierr;
52   if (aij->size == 1) {
53     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
54   } else SETERRQ(1,0,"not supported in parallel");
55   return 0;
56 }
57 
58 #undef __FUNC__
59 #define __FUNC__ "MatRestoreRowIJ_MPIBAIJ"
60 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
61                                 PetscTruth *done)
62 {
63   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
64   int        ierr;
65   if (aij->size == 1) {
66     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
67   } else SETERRQ(1,0,"not supported in parallel");
68   return 0;
69 }
70 #define CHUNKSIZE  10
71 
/*
   MatSetValues_SeqBAIJ_A_Private - Inserts or adds one point value at point
   location (row,col) in the local diagonal block A.  Intended for use only
   inside MatSetValues_MPIBAIJ, which must have the macro's working
   variables (rp, ap, rmax, nrow, low, high, t, _i, ii, N, brow, bcol,
   ridx, cidx, bap) and the cached arrays (a, ai, aj, aa, aimax, ailen)
   in scope.  If the target block-row is full, all row storage is enlarged
   by CHUNKSIZE block entries.
*/
#define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
    rmax = aimax[brow]; nrow = ailen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      /* bisection narrows the range; the final few entries are scanned linearly */ \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          /* block already allocated: update entry (ridx,cidx), column-major */ \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto a_noinsert; \
        } \
      } \
      /* nonew==1: drop new nonzeros silently; nonew==-1: treat them as errors */ \
      if (a->nonew == 1) goto a_noinsert; \
      else if (a->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (a->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space: values, column indices and row pointers \
           are carved out of one allocation (singlemalloc) */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots; rows after brow shift by CHUNKSIZE */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
        for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
        PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
                    aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(a->a);  \
        if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
        aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
        a->singlemalloc = 1; \
 \
        rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
        rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        a->maxnz += bs2*CHUNKSIZE; \
        a->reallocs++; \
        /* NOTE(review): nz is bumped only on this realloc path; inserts that \
           fit in existing space do not increment a->nz -- confirm intended */ \
        a->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      a_noinsert:; \
    ailen[brow] = nrow; \
}
146 
/*
   MatSetValues_SeqBAIJ_B_Private - Same as MatSetValues_SeqBAIJ_A_Private
   but operates on the local off-diagonal block B (arrays b, bi, bj, ba,
   bimax, bilen).  Inserts or adds one point value at (row,col), enlarging
   the storage by CHUNKSIZE block entries when the block-row is full.
*/
#define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
    rmax = bimax[brow]; nrow = bilen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      /* bisection narrows the range; the final few entries are scanned linearly */ \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          /* block already allocated: update entry (ridx,cidx), column-major */ \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto b_noinsert; \
        } \
      } \
      /* nonew==1: drop new nonzeros silently; nonew==-1: treat them as errors */ \
      if (b->nonew == 1) goto b_noinsert; \
      else if (b->nonew == -1) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (b->nonew == -2) SETERRQ(1,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space: values, column indices and row pointers \
           are carved out of one allocation (singlemalloc) */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots; rows after brow shift by CHUNKSIZE */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
        for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
        PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
                    ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(b->a);  \
        if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
        ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
        b->singlemalloc = 1; \
 \
        rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
        rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        b->maxnz += bs2*CHUNKSIZE; \
        b->reallocs++; \
        /* NOTE(review): nz is bumped only on this realloc path; inserts that \
           fit in existing space do not increment b->nz -- confirm intended */ \
        b->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      b_noinsert:; \
    bilen[brow] = nrow; \
}
221 
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIBAIJ"
/*
   MatSetValues_MPIBAIJ - Inserts or adds an m x n logically dense block of
   point values (rows im[], columns in[], values v, global point indices)
   into the parallel BAIJ matrix.  Locally owned entries go directly into
   the diagonal (A) or off-diagonal (B) sequential block via the macros
   above; entries owned by other processors are stashed for communication
   at assembly time (unless donotstash is set).
*/
int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j,row,col;
  /* ownership ranges below are in point (not block) indices */
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
  int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
  int         cend_orig=baij->cend_bs,bs=baij->bs;

  /* Some Variables required in the macro */
  Mat         A = baij->A;
  Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
  int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
  Scalar      *aa=a->a;

  Mat         B = baij->B;
  Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
  int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
  Scalar      *ba=b->a;

  int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
  int         low,high,t,ridx,cidx,bs2=a->bs2;
  Scalar      *ap,*bap;

  for ( i=0; i<m; i++ ) {
#if defined(PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(1,0,"Negative row");
    if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
#endif
    if (im[i] >= rstart_orig && im[i] < rend_orig) {  /* row owned locally */
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        if (in[j] >= cstart_orig && in[j] < cend_orig){  /* diagonal block A */
          col = in[j] - cstart_orig;
          /* v is row-major if roworiented, column-major otherwise */
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
#if defined(PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
        else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
#endif
        else {  /* off-diagonal block B */
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }
            /* colmap stores 1-based offsets; result < 0 means column absent */
            col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              /* new off-processor column and new nonzeros allowed: revert to
                 the unassembled (global column index) representation of B */
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
              /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
              B = baij->B;
              b = (Mat_SeqBAIJ *) (B)->data;
              bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
              ba=b->a;
            }
          }
          else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
      }
    }
    else {  /* row owned elsewhere: stash for assembly-time communication */
      if (roworiented && !baij->donotstash) {
        /* row-oriented: a whole row of v is contiguous, stash it in one call */
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      }
      else {
        if (!baij->donotstash) {
          /* column-oriented: entries of this row are strided, stash one by one */
          row = im[i];
          for ( j=0; j<n; j++ ) {
            ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
  return 0;
}
305 
306 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
#undef __FUNC__
#define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
/*
   MatSetValuesBlocked_MPIBAIJ - Inserts or adds m x n blocks of size bs x bs
   (rows im[], columns in[] in global BLOCK indices; v holds m*n*bs2 values)
   into the parallel BAIJ matrix.  Each bs x bs block is repacked into the
   scratch buffer baij->barray (column-major) before being handed to the
   sequential blocked routine, except when the block row/column of v is
   already contiguous and can be used in place.  Off-processor rows are
   stashed point-by-point for assembly-time communication.
*/
int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      *value,*barray=baij->barray;
  int         ierr,i,j,ii,jj,row,col,k,l;
  /* ownership ranges here are in block indices */
  int         roworiented = baij->roworiented,rstart=baij->rstart ;
  int         rend=baij->rend,cstart=baij->cstart,stepval;
  int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;


  /* lazily allocate the bs x bs repacking buffer, kept on the matrix */
  if(!barray) {
    baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
  }

  /* stride (in Scalars) between consecutive rows of one block inside v */
  if (roworiented) {
    stepval = (n-1)*bs;
  } else {
    stepval = (m-1)*bs;
  }
  for ( i=0; i<m; i++ ) {
#if defined(PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(1,0,"Negative row");
    if (im[i] >= baij->Mbs) SETERRQ(1,0,"Row too large");
#endif
    if (im[i] >= rstart && im[i] < rend) {  /* block row owned locally */
      row = im[i] - rstart;
      for ( j=0; j<n; j++ ) {
        /* If NumCol = 1 then a copy is not required */
        if ((roworiented) && (n == 1)) {
          barray = v + i*bs2;
        } else if((!roworiented) && (m == 1)) {
          barray = v + j*bs2;
        } else { /* Here a copy is required */
          if (roworiented) {
            value = v + i*(stepval+bs)*bs + j*bs;
          } else {
            value = v + j*(stepval+bs)*bs + i*bs;
          }
          for ( ii=0; ii<bs; ii++,value+=stepval ) {
            for (jj=0; jj<bs; jj++ ) {
              *barray++  = *value++;
            }
          }
          /* rewind to the start of the repacked block */
          barray -=bs2;
        }

        if (in[j] >= cstart && in[j] < cend){  /* diagonal block A */
          col  = in[j] - cstart;
          ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
        }
#if defined(PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
        else if (in[j] >= baij->Nbs) {SETERRQ(1,0,"Column too large");}
#endif
        else {  /* off-diagonal block B */
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }

#if defined(PETSC_BOPT_g)
            /* colmap values are i*bs+1, so (value-1) must be a multiple of bs */
            if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(1,0,"Incorrect colmap");}
#endif
            /* recover the local block-column index of B; < 0 means absent */
            col = (baij->colmap[in[j]] - 1)/bs;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              /* new column and new nonzeros allowed: revert B to global indices */
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
            }
          }
          else col = in[j];
          ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
        }
      }
    }
    else {  /* block row owned elsewhere: stash each point entry */
      if (!baij->donotstash) {
        if (roworiented ) {
          row   = im[i]*bs;
          value = v + i*(stepval+bs)*bs;
          for ( j=0; j<bs; j++,row++ ) {
            for ( k=0; k<n; k++ ) {
              for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        }
        else {
          for ( j=0; j<n; j++ ) {
            value = v + j*(stepval+bs)*bs + i*bs;
            col   = in[j]*bs;
            for ( k=0; k<bs; k++,col++,value+=stepval) {
              for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        }
      }
    }
  }
  return 0;
}
412 
#undef __FUNC__
#define __FUNC__ "MatGetValues_MPIBAIJ"
/*
   MatGetValues_MPIBAIJ - Retrieves an m x n set of point values (global
   point indices idxm[] x idxn[]) into v.  Only locally owned rows are
   supported; requesting a remote row is an error.  Columns not present in
   the local off-diagonal part are returned as 0.0.
*/
int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  /* ownership ranges converted from block to point indices */
  int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
  int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
    if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
    if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
      row = idxm[i] - bsrstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
        if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
        if (idxn[j] >= bscstart && idxn[j] < bscend){
          /* column lives in the diagonal block A */
          col = idxn[j] - bscstart;
          ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        }
        else {
          /* column may live in the off-diagonal block B */
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
          }
          /* colmap is 1-based; additionally verify via garray that the
             colmap entry really refers to this block column (0 entries
             alias block 0), otherwise the value is not stored locally */
          if((baij->colmap[idxn[j]/bs]-1 < 0) ||
             (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
          else {
            col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
            ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    }
    else {
      SETERRQ(1,0,"Only local values currently supported");
    }
  }
  return 0;
}
452 
453 #undef __FUNC__
454 #define __FUNC__ "MatNorm_MPIBAIJ"
455 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
456 {
457   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
458   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
459   int        ierr, i,bs2=baij->bs2;
460   double     sum = 0.0;
461   Scalar     *v;
462 
463   if (baij->size == 1) {
464     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
465   } else {
466     if (type == NORM_FROBENIUS) {
467       v = amat->a;
468       for (i=0; i<amat->nz*bs2; i++ ) {
469 #if defined(PETSC_COMPLEX)
470         sum += real(conj(*v)*(*v)); v++;
471 #else
472         sum += (*v)*(*v); v++;
473 #endif
474       }
475       v = bmat->a;
476       for (i=0; i<bmat->nz*bs2; i++ ) {
477 #if defined(PETSC_COMPLEX)
478         sum += real(conj(*v)*(*v)); v++;
479 #else
480         sum += (*v)*(*v); v++;
481 #endif
482       }
483       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
484       *norm = sqrt(*norm);
485     }
486     else
487       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
488   }
489   return 0;
490 }
491 
#undef __FUNC__
#define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
/*
   MatAssemblyBegin_MPIBAIJ - Starts communicating the stashed off-processor
   values: determines the destination of each stashed entry, posts the
   receives, packs and posts the sends, then records the pending requests
   in the matrix for MatAssemblyEnd_MPIBAIJ to complete.
*/
int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,0,"Some processors inserted others added");
  }
  mat->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  /* nprocs[j] = number of stashed entries for proc j; procs[j] = 0/1 flag */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    /* rowners[] holds block-row ownership; scale by bs for point indices */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of triples
     (global row index, global column index, value); the indices are stored
     as Scalars to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack (row, col, value) triples; starts[] is consumed while packing */
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  /* rebuild starts[] (it was destroyed above) before issuing the sends */
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  /* stash pending communication state for MatAssemblyEnd_MPIBAIJ */
  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  return 0;
}
592 #include <math.h>
593 #define HASH_KEY 0.6180339887
594 #define HASH1(size,key) ((int)((size)*fmod(((key)*HASH_KEY),1))+1)
595 
596 int CreateHashTable(Mat mat)
597 {
598   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
599   Mat         A = baij->A, B=baij->B;
600   Mat_SeqBAIJ *a=(Mat_SeqBAIJ *)A->data, *b=(Mat_SeqBAIJ *)B->data;
601   int         i,j,k,nz=a->nz+b->nz,h1,*ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j;
602   int         size=1.5*nz,ct=0,max=0;
603   Scalar      *aa=a->a,*ba=b->a;
604   double      key;
605   static double *HT;
606   static      int flag=1;
607 
608 
609   /* Allocate Memory for Hash Table */
610   if (flag) {
611     HT = (double*)PetscMalloc(size*sizeof(double));
612     flag = 0;
613   }
614   PetscMemzero(HT,size*sizeof(double));
615 
616   /* Loop Over A */
617   for ( i=0; i<a->n; i++ ) {
618     for ( j=ai[i]; j<ai[i+1]; j++ ) {
619       key = i*baij->n+aj[j]+1;
620       h1  = HASH1(size,key);
621 
622       for ( k=1; k<size; k++ ){
623         if (HT[(h1*k)%size] == 0.0) {
624           HT[(h1*k)%size] = key;
625           break;
626         } else ct++;
627       }
628       if (k> max) max =k;
629     }
630   }
631    printf("***max1 = %d\n",max);
632   /* Loop Over B */
633   for ( i=0; i<b->n; i++ ) {
634     for ( j=bi[i]; j<bi[i+1]; j++ ) {
635       key = i*b->n+bj[j]+1;
636       h1  = HASH1(size,key);
637       for ( k=1; k<size; k++ ){
638         if (HT[(h1*k)%size] == 0.0) {
639           HT[(h1*k)%size] = key;
640           break;
641         } else ct++;
642       }
643       if (k> max) max =k;
644     }
645   }
646 
647   printf("***max2 = %d\n",max);
648   /* Print Summary */
649   for ( i=0,key=0.0,j=0; i<size; i++)
650     if (HT[i]) {j++;}
651 
652   printf("Size %d Average Buckets %d no of Keys %d\n",size,ct,j);
653   return 0;
654 }
655 
#undef __FUNC__
#define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
/*
   MatAssemblyEnd_MPIBAIJ - Completes the assembly started by
   MatAssemblyBegin_MPIBAIJ: drains the posted receives, inserting each
   received (row, col, value) triple into the local A or B block, waits on
   the sends, frees the communication buffers, and finishes assembly of
   the two sequential blocks.
*/
int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
  int         bs=baij->bs,row,col,other_disassembled,flg;
  Scalar      *values,val;
  InsertMode  addv = mat->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = baij->rvalues + 3*imdex*baij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;  /* each entry was sent as a (row,col,value) triple of Scalars */
    for ( i=0; i<n; i++ ) {
      /* indices were shipped as Scalars; convert back and localize the row */
      row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= baij->cstart*bs && col < baij->cend*bs) {
        /* column in the diagonal block A */
        col -= baij->cstart*bs;
        ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      }
      else {
        /* column in the off-diagonal block B */
        if (mat->was_assembled) {
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
          }
          /* colmap is 1-based; a negative result means the column is new */
          col = (baij->colmap[col/bs]) - 1 + col%bs;
          if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
            ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      }
    }
    count--;
  }
  PetscFree(baij->recv_waits); PetscFree(baij->rvalues);

  /* wait on sends */
  if (baij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(baij->nsends,baij->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(baij->send_waits); PetscFree(baij->svalues);

  ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselfs, in order that we may reassemble. */
  MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
  }

  /* first final assembly sets up the scatter for matrix-vector products */
  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);

  /* optional experimental hash-table statistics (see CreateHashTable) */
  ierr = OptionsHasName(PETSC_NULL,"-use_hash",&flg); CHKERRQ(ierr);
  if (flg) CreateHashTable(mat);
  if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
  return 0;
}
730 
731 #undef __FUNC__
732 #define __FUNC__ "MatView_MPIBAIJ_Binary"
733 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
734 {
735   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
736   int          ierr;
737 
738   if (baij->size == 1) {
739     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
740   }
741   else SETERRQ(1,0,"Only uniprocessor output supported");
742   return 0;
743 }
744 
745 #undef __FUNC__
746 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
747 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
748 {
749   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
750   int          ierr, format,rank,bs = baij->bs;
751   FILE         *fd;
752   ViewerType   vtype;
753 
754   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
755   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
756     ierr = ViewerGetFormat(viewer,&format);
757     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
758       MatInfo info;
759       MPI_Comm_rank(mat->comm,&rank);
760       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
761       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
762       PetscSequentialPhaseBegin(mat->comm,1);
763       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
764               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
765               baij->bs,(int)info.memory);
766       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
767       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
768       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
769       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
770       fflush(fd);
771       PetscSequentialPhaseEnd(mat->comm,1);
772       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
773       return 0;
774     }
775     else if (format == VIEWER_FORMAT_ASCII_INFO) {
776       PetscPrintf(mat->comm,"  block size is %d\n",bs);
777       return 0;
778     }
779   }
780 
781   if (vtype == DRAW_VIEWER) {
782     Draw       draw;
783     PetscTruth isnull;
784     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
785     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
786   }
787 
788   if (vtype == ASCII_FILE_VIEWER) {
789     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
790     PetscSequentialPhaseBegin(mat->comm,1);
791     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
792            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
793             baij->cstart*bs,baij->cend*bs);
794     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
795     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
796     fflush(fd);
797     PetscSequentialPhaseEnd(mat->comm,1);
798   }
799   else {
800     int size = baij->size;
801     rank = baij->rank;
802     if (size == 1) {
803       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
804     }
805     else {
806       /* assemble the entire matrix onto first processor. */
807       Mat         A;
808       Mat_SeqBAIJ *Aloc;
809       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
810       int         mbs=baij->mbs;
811       Scalar      *a;
812 
813       if (!rank) {
814         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
815         CHKERRQ(ierr);
816       }
817       else {
818         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
819         CHKERRQ(ierr);
820       }
821       PLogObjectParent(mat,A);
822 
823       /* copy over the A part */
824       Aloc = (Mat_SeqBAIJ*) baij->A->data;
825       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
826       row = baij->rstart;
827       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
828 
829       for ( i=0; i<mbs; i++ ) {
830         rvals[0] = bs*(baij->rstart + i);
831         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
832         for ( j=ai[i]; j<ai[i+1]; j++ ) {
833           col = (baij->cstart+aj[j])*bs;
834           for (k=0; k<bs; k++ ) {
835             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
836             col++; a += bs;
837           }
838         }
839       }
840       /* copy over the B part */
841       Aloc = (Mat_SeqBAIJ*) baij->B->data;
842       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
843       row = baij->rstart*bs;
844       for ( i=0; i<mbs; i++ ) {
845         rvals[0] = bs*(baij->rstart + i);
846         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
847         for ( j=ai[i]; j<ai[i+1]; j++ ) {
848           col = baij->garray[aj[j]]*bs;
849           for (k=0; k<bs; k++ ) {
850             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
851             col++; a += bs;
852           }
853         }
854       }
855       PetscFree(rvals);
856       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
857       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
858       if (!rank) {
859         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
860       }
861       ierr = MatDestroy(A); CHKERRQ(ierr);
862     }
863   }
864   return 0;
865 }
866 
867 
868 
869 #undef __FUNC__
870 #define __FUNC__ "MatView_MPIBAIJ"
871 int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
872 {
873   Mat         mat = (Mat) obj;
874   int         ierr;
875   ViewerType  vtype;
876 
877   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
878   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
879       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
880     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
881   }
882   else if (vtype == BINARY_FILE_VIEWER) {
883     return MatView_MPIBAIJ_Binary(mat,viewer);
884   }
885   return 0;
886 }
887 
#undef __FUNC__
#define __FUNC__ "MatDestroy_MPIBAIJ"
/*
   MatDestroy_MPIBAIJ - Frees all storage associated with a parallel BAIJ
   matrix: the assembly stash, the ownership table, the two sequential
   blocks, the column map and scatter machinery, and finally the header.
*/
int MatDestroy_MPIBAIJ(PetscObject obj)
{
  Mat         mat = (Mat) obj;
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;

#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
#endif

  /* stash holds values destined for other processors during assembly */
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
  PetscFree(baij->rowners);
  /* the diagonal and off-diagonal sequential blocks */
  ierr = MatDestroy(baij->A); CHKERRQ(ierr);
  ierr = MatDestroy(baij->B); CHKERRQ(ierr);
  /* these are created lazily, so may be null */
  if (baij->colmap) PetscFree(baij->colmap);
  if (baij->garray) PetscFree(baij->garray);
  if (baij->lvec)   VecDestroy(baij->lvec);
  if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
  if (baij->rowvalues) PetscFree(baij->rowvalues);
  if (baij->barray) PetscFree(baij->barray);
  PetscFree(baij);
  if (mat->mapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
  }
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}
918 
919 #undef __FUNC__
920 #define __FUNC__ "MatMult_MPIBAIJ"
921 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
922 {
923   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
924   int         ierr, nt;
925 
926   VecGetLocalSize_Fast(xx,nt);
927   if (nt != a->n) {
928     SETERRQ(1,0,"Incompatible partition of A and xx");
929   }
930   VecGetLocalSize_Fast(yy,nt);
931   if (nt != a->m) {
932     SETERRQ(1,0,"Incompatible parition of A and yy");
933   }
934   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
935   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
936   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
937   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
938   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
939   return 0;
940 }
941 
942 #undef __FUNC__
943 #define __FUNC__ "MatMultAdd_MPIBAIJ"
944 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
945 {
946   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
947   int        ierr;
948   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
949   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
950   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
951   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
952   return 0;
953 }
954 
955 #undef __FUNC__
956 #define __FUNC__ "MatMultTrans_MPIBAIJ"
957 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
958 {
959   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
960   int        ierr;
961 
962   /* do nondiagonal part */
963   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
964   /* send it on its way */
965   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
966   /* do local part */
967   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
968   /* receive remote parts: note this assumes the values are not actually */
969   /* inserted in yy until the next line, which is true for my implementation*/
970   /* but is not perhaps always true. */
971   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
972   return 0;
973 }
974 
975 #undef __FUNC__
976 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
977 int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
978 {
979   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
980   int        ierr;
981 
982   /* do nondiagonal part */
983   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
984   /* send it on its way */
985   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
986   /* do local part */
987   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
988   /* receive remote parts: note this assumes the values are not actually */
989   /* inserted in yy until the next line, which is true for my implementation*/
990   /* but is not perhaps always true. */
991   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
992   return 0;
993 }
994 
995 /*
996   This only works correctly for square matrices where the subblock A->A is the
997    diagonal block
998 */
999 #undef __FUNC__
1000 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
1001 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
1002 {
1003   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1004   if (a->M != a->N)
1005     SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
1006   return MatGetDiagonal(a->A,v);
1007 }
1008 
1009 #undef __FUNC__
1010 #define __FUNC__ "MatScale_MPIBAIJ"
1011 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
1012 {
1013   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1014   int        ierr;
1015   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
1016   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
1017   return 0;
1018 }
1019 
1020 #undef __FUNC__
1021 #define __FUNC__ "MatGetSize_MPIBAIJ"
1022 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
1023 {
1024   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1025   *m = mat->M; *n = mat->N;
1026   return 0;
1027 }
1028 
1029 #undef __FUNC__
1030 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
1031 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
1032 {
1033   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1034   *m = mat->m; *n = mat->N;
1035   return 0;
1036 }
1037 
1038 #undef __FUNC__
1039 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
1040 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
1041 {
1042   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1043   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
1044   return 0;
1045 }
1046 
1047 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1048 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1049 
#undef __FUNC__
#define __FUNC__ "MatGetRow_MPIBAIJ"
/*
   MatGetRow_MPIBAIJ - Extracts one (local) row of the matrix in global
   point-index form, merging the diagonal block (A) and off-diagonal block
   (B) rows into a single column-sorted list in mat->rowvalues/rowindices.

   Must be matched with MatRestoreRow_MPIBAIJ; only one row may be active
   at a time (guarded by getrowactive).
*/
int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
  int        *cmap, *idx_p,cstart = mat->cstart;

  if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int     max = 1,mbs = mat->mbs,tmp;
    for ( i=0; i<mbs; i++ ) {
      /* combined block count of row i across both local blocks */
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    /* single allocation: Scalar values followed by int indices */
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }


  if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows")
  lrow = row - brstart;

  /* null out the value/index requests the caller did not make */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  /* cmap maps B's compressed block-column indices to global block columns */
  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        /* B entries with global column < cstart come before all A entries */
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          /* split point already found in the value pass above */
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
          }
        } else {
          /* values not requested: find the split point here */
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]/bs] < cstart)
              idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
            else break;
          }
          imark = i;
        }
        /* A columns are local; offset by the global column start */
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
      }
    }
    else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  return 0;
}
1132 
1133 #undef __FUNC__
1134 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1135 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1136 {
1137   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1138   if (baij->getrowactive == PETSC_FALSE) {
1139     SETERRQ(1,0,"MatGetRow not called");
1140   }
1141   baij->getrowactive = PETSC_FALSE;
1142   return 0;
1143 }
1144 
1145 #undef __FUNC__
1146 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1147 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1148 {
1149   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1150   *bs = baij->bs;
1151   return 0;
1152 }
1153 
1154 #undef __FUNC__
1155 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1156 int MatZeroEntries_MPIBAIJ(Mat A)
1157 {
1158   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1159   int         ierr;
1160   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1161   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1162   return 0;
1163 }
1164 
1165 #undef __FUNC__
1166 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1167 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1168 {
1169   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1170   Mat         A = a->A, B = a->B;
1171   int         ierr;
1172   double      isend[5], irecv[5];
1173 
1174   info->rows_global    = (double)a->M;
1175   info->columns_global = (double)a->N;
1176   info->rows_local     = (double)a->m;
1177   info->columns_local  = (double)a->N;
1178   info->block_size     = (double)a->bs;
1179   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1180   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->memory;
1181   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1182   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->memory;
1183   if (flag == MAT_LOCAL) {
1184     info->nz_used      = isend[0];
1185     info->nz_allocated = isend[1];
1186     info->nz_unneeded  = isend[2];
1187     info->memory       = isend[3];
1188     info->mallocs      = isend[4];
1189   } else if (flag == MAT_GLOBAL_MAX) {
1190     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_MAX,matin->comm);
1191     info->nz_used      = irecv[0];
1192     info->nz_allocated = irecv[1];
1193     info->nz_unneeded  = irecv[2];
1194     info->memory       = irecv[3];
1195     info->mallocs      = irecv[4];
1196   } else if (flag == MAT_GLOBAL_SUM) {
1197     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_SUM,matin->comm);
1198     info->nz_used      = irecv[0];
1199     info->nz_allocated = irecv[1];
1200     info->nz_unneeded  = irecv[2];
1201     info->memory       = irecv[3];
1202     info->mallocs      = irecv[4];
1203   }
1204   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1205   info->fill_ratio_needed = 0;
1206   info->factor_mallocs    = 0;
1207   return 0;
1208 }
1209 
1210 #undef __FUNC__
1211 #define __FUNC__ "MatSetOption_MPIBAIJ"
1212 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1213 {
1214   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1215 
1216   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1217       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1218       op == MAT_COLUMNS_UNSORTED ||
1219       op == MAT_COLUMNS_SORTED ||
1220       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1221       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1222         MatSetOption(a->A,op);
1223         MatSetOption(a->B,op);
1224   } else if (op == MAT_ROW_ORIENTED) {
1225         a->roworiented = 1;
1226         MatSetOption(a->A,op);
1227         MatSetOption(a->B,op);
1228   } else if (op == MAT_ROWS_SORTED ||
1229              op == MAT_ROWS_UNSORTED ||
1230              op == MAT_SYMMETRIC ||
1231              op == MAT_STRUCTURALLY_SYMMETRIC ||
1232              op == MAT_YES_NEW_DIAGONALS)
1233     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1234   else if (op == MAT_COLUMN_ORIENTED) {
1235     a->roworiented = 0;
1236     MatSetOption(a->A,op);
1237     MatSetOption(a->B,op);
1238   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1239     a->donotstash = 1;
1240   } else if (op == MAT_NO_NEW_DIAGONALS)
1241     {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
1242   else
1243     {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
1244   return 0;
1245 }
1246 
1247 #undef __FUNC__
1248 #define __FUNC__ "MatTranspose_MPIBAIJ("
1249 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1250 {
1251   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1252   Mat_SeqBAIJ *Aloc;
1253   Mat        B;
1254   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
1255   int        bs=baij->bs,mbs=baij->mbs;
1256   Scalar     *a;
1257 
1258   if (matout == PETSC_NULL && M != N)
1259     SETERRQ(1,0,"Square matrix only for in-place");
1260   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1261   CHKERRQ(ierr);
1262 
1263   /* copy over the A part */
1264   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1265   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1266   row = baij->rstart;
1267   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1268 
1269   for ( i=0; i<mbs; i++ ) {
1270     rvals[0] = bs*(baij->rstart + i);
1271     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1272     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1273       col = (baij->cstart+aj[j])*bs;
1274       for (k=0; k<bs; k++ ) {
1275         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1276         col++; a += bs;
1277       }
1278     }
1279   }
1280   /* copy over the B part */
1281   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1282   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1283   row = baij->rstart*bs;
1284   for ( i=0; i<mbs; i++ ) {
1285     rvals[0] = bs*(baij->rstart + i);
1286     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1287     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1288       col = baij->garray[aj[j]]*bs;
1289       for (k=0; k<bs; k++ ) {
1290         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1291         col++; a += bs;
1292       }
1293     }
1294   }
1295   PetscFree(rvals);
1296   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1297   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1298 
1299   if (matout != PETSC_NULL) {
1300     *matout = B;
1301   } else {
1302     /* This isn't really an in-place transpose .... but free data structures from baij */
1303     PetscFree(baij->rowners);
1304     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1305     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1306     if (baij->colmap) PetscFree(baij->colmap);
1307     if (baij->garray) PetscFree(baij->garray);
1308     if (baij->lvec) VecDestroy(baij->lvec);
1309     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1310     PetscFree(baij);
1311     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1312     PetscHeaderDestroy(B);
1313   }
1314   return 0;
1315 }
1316 
1317 #undef __FUNC__
1318 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1319 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1320 {
1321   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1322   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1323   int ierr,s1,s2,s3;
1324 
1325   if (ll)  {
1326     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1327     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1328     if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
1329     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1330     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1331   }
1332   if (rr) SETERRQ(1,0,"not supported for right vector");
1333   return 0;
1334 }
1335 
/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   baij->A and baij->B directly and not through the MatZeroRows()
   routine.
*/
#undef __FUNC__
#define __FUNC__ "MatZeroRows_MPIBAIJ"
/*
   MatZeroRows_MPIBAIJ - Zeros the (global, point-indexed) rows listed in
   is, optionally placing *diag on the diagonal.  The index set may name
   rows owned by any processor, so the indices are first routed to their
   owners with point-to-point messages, then each processor zeros its own
   rows via MatZeroRows on its two sequential blocks.
*/
int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values,bs=l->bs;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    /* owners[] holds block-row bounds, so scale by bs for point rows */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,0,"Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  /* column sum of procs gives the number of messages each rank receives */
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  /* max over nprocs gives an upper bound on any incoming message length */
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* bucket the row indices by destination processor */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  /* rebuild starts[], consumed by the bucketing loop above */
  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  /* first global point row owned by this processor */
  base = owners[rank]*bs;

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    /* convert received global row indices to local row indices */
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  /* diag goes only into the diagonal block; see BUG note at top */
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}
1466 extern int MatPrintHelp_SeqBAIJ(Mat);
1467 #undef __FUNC__
1468 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1469 int MatPrintHelp_MPIBAIJ(Mat A)
1470 {
1471   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1472 
1473   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1474   else return 0;
1475 }
1476 
1477 #undef __FUNC__
1478 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1479 int MatSetUnfactored_MPIBAIJ(Mat A)
1480 {
1481   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1482   int         ierr;
1483   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1484   return 0;
1485 }
1486 
1487 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1488 
1489 /* -------------------------------------------------------------------*/
1490 static struct _MatOps MatOps = {
1491   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1492   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1493   0,0,0,0,
1494   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1495   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1496   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1497   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1498   0,0,0,MatGetSize_MPIBAIJ,
1499   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1500   0,0,MatConvertSameType_MPIBAIJ,0,0,
1501   0,0,0,MatGetSubMatrices_MPIBAIJ,
1502   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1503   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
1504   0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1505 
1506 
1507 #undef __FUNC__
1508 #define __FUNC__ "MatCreateMPIBAIJ"
1509 /*@C
1510    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1511    (block compressed row).  For good matrix assembly performance
1512    the user should preallocate the matrix storage by setting the parameters
1513    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1514    performance can be increased by more than a factor of 50.
1515 
1516    Input Parameters:
1517 .  comm - MPI communicator
1518 .  bs   - size of blockk
1519 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1520            This value should be the same as the local size used in creating the
1521            y vector for the matrix-vector product y = Ax.
1522 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1523            This value should be the same as the local size used in creating the
1524            x vector for the matrix-vector product y = Ax.
1525 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1526 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1527 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1528            submatrix  (same for all local rows)
1529 .  d_nzz - array containing the number of block nonzeros in the various block rows
1530            of the in diagonal portion of the local (possibly different for each block
1531            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1532            it is zero.
1533 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1534            submatrix (same for all local rows).
1535 .  o_nzz - array containing the number of nonzeros in the various block rows of the
1536            off-diagonal portion of the local submatrix (possibly different for
1537            each block row) or PETSC_NULL.
1538 
1539    Output Parameter:
1540 .  A - the matrix
1541 
1542    Notes:
1543    The user MUST specify either the local or global matrix dimensions
1544    (possibly both).
1545 
1546    Storage Information:
1547    For a square global matrix we define each processor's diagonal portion
1548    to be its local rows and the corresponding columns (a square submatrix);
1549    each processor's off-diagonal portion encompasses the remainder of the
1550    local matrix (a rectangular submatrix).
1551 
1552    The user can specify preallocated storage for the diagonal part of
1553    the local submatrix with either d_nz or d_nnz (not both).  Set
1554    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1555    memory allocation.  Likewise, specify preallocated storage for the
1556    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1557 
1558    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1559    the figure below we depict these three local rows and all columns (0-11).
1560 
1561 $          0 1 2 3 4 5 6 7 8 9 10 11
1562 $         -------------------
1563 $  row 3  |  o o o d d d o o o o o o
1564 $  row 4  |  o o o d d d o o o o o o
1565 $  row 5  |  o o o d d d o o o o o o
1566 $         -------------------
1567 $
1568 
1569    Thus, any entries in the d locations are stored in the d (diagonal)
1570    submatrix, and any entries in the o locations are stored in the
1571    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1572    stored simply in the MATSEQBAIJ format for compressed row storage.
1573 
1574    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1575    and o_nz should indicate the number of nonzeros per row in the o matrix.
1576    In general, for PDE problems in which most nonzeros are near the diagonal,
1577    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1578    or you will get TERRIBLE performance; see the users' manual chapter on
1579    matrices.
1580 
1581 .keywords: matrix, block, aij, compressed row, sparse, parallel
1582 
1583 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1584 @*/
1585 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1586                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1587 {
1588   Mat          B;
1589   Mat_MPIBAIJ  *b;
1590   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1591 
1592   if (bs < 1) SETERRQ(1,0,"invalid block size specified");
1593   *A = 0;
1594   PetscHeaderCreate(B,_p_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1595   PLogObjectCreate(B);
1596   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1597   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1598   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1599   MPI_Comm_size(comm,&size);
1600   if (size == 1) {
1601     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1602     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1603     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1604     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1605     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1606     B->ops.solve             = MatSolve_MPIBAIJ;
1607     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1608     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1609     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1610     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1611   }
1612 
1613   B->destroy    = MatDestroy_MPIBAIJ;
1614   B->view       = MatView_MPIBAIJ;
1615   B->mapping    = 0;
1616   B->factor     = 0;
1617   B->assembled  = PETSC_FALSE;
1618 
1619   B->insertmode = NOT_SET_VALUES;
1620   MPI_Comm_rank(comm,&b->rank);
1621   MPI_Comm_size(comm,&b->size);
1622 
1623   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1624     SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1625   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
1626   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,0,"either N or n should be specified");
1627   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1628   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1629 
1630   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1631     work[0] = m; work[1] = n;
1632     mbs = m/bs; nbs = n/bs;
1633     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1634     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1635     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1636   }
1637   if (m == PETSC_DECIDE) {
1638     Mbs = M/bs;
1639     if (Mbs*bs != M) SETERRQ(1,0,"No of global rows must be divisible by blocksize");
1640     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1641     m   = mbs*bs;
1642   }
1643   if (n == PETSC_DECIDE) {
1644     Nbs = N/bs;
1645     if (Nbs*bs != N) SETERRQ(1,0,"No of global cols must be divisible by blocksize");
1646     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1647     n   = nbs*bs;
1648   }
1649   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"No of local rows, cols must be divisible by blocksize");
1650 
1651   b->m = m; B->m = m;
1652   b->n = n; B->n = n;
1653   b->N = N; B->N = N;
1654   b->M = M; B->M = M;
1655   b->bs  = bs;
1656   b->bs2 = bs*bs;
1657   b->mbs = mbs;
1658   b->nbs = nbs;
1659   b->Mbs = Mbs;
1660   b->Nbs = Nbs;
1661 
1662   /* build local table of row and column ownerships */
1663   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1664   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1665   b->cowners = b->rowners + b->size + 2;
1666   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1667   b->rowners[0] = 0;
1668   for ( i=2; i<=b->size; i++ ) {
1669     b->rowners[i] += b->rowners[i-1];
1670   }
1671   b->rstart    = b->rowners[b->rank];
1672   b->rend      = b->rowners[b->rank+1];
1673   b->rstart_bs = b->rstart * bs;
1674   b->rend_bs   = b->rend * bs;
1675 
1676   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1677   b->cowners[0] = 0;
1678   for ( i=2; i<=b->size; i++ ) {
1679     b->cowners[i] += b->cowners[i-1];
1680   }
1681   b->cstart    = b->cowners[b->rank];
1682   b->cend      = b->cowners[b->rank+1];
1683   b->cstart_bs = b->cstart * bs;
1684   b->cend_bs   = b->cend * bs;
1685 
1686   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1687   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1688   PLogObjectParent(B,b->A);
1689   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1690   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1691   PLogObjectParent(B,b->B);
1692 
1693   /* build cache for off array entries formed */
1694   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1695   b->donotstash  = 0;
1696   b->colmap      = 0;
1697   b->garray      = 0;
1698   b->roworiented = 1;
1699 
1700   /* stuff used in block assembly */
1701   b->barray      = 0;
1702 
1703   /* stuff used for matrix vector multiply */
1704   b->lvec        = 0;
1705   b->Mvctx       = 0;
1706 
1707   /* stuff for MatGetRow() */
1708   b->rowindices   = 0;
1709   b->rowvalues    = 0;
1710   b->getrowactive = PETSC_FALSE;
1711 
1712   *A = B;
1713   return 0;
1714 }
1715 
1716 #undef __FUNC__
1717 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
1718 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1719 {
1720   Mat         mat;
1721   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1722   int         ierr, len=0, flg;
1723 
1724   *newmat       = 0;
1725   PetscHeaderCreate(mat,_p_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1726   PLogObjectCreate(mat);
1727   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1728   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1729   mat->destroy    = MatDestroy_MPIBAIJ;
1730   mat->view       = MatView_MPIBAIJ;
1731   mat->factor     = matin->factor;
1732   mat->assembled  = PETSC_TRUE;
1733 
1734   a->m = mat->m   = oldmat->m;
1735   a->n = mat->n   = oldmat->n;
1736   a->M = mat->M   = oldmat->M;
1737   a->N = mat->N   = oldmat->N;
1738 
1739   a->bs  = oldmat->bs;
1740   a->bs2 = oldmat->bs2;
1741   a->mbs = oldmat->mbs;
1742   a->nbs = oldmat->nbs;
1743   a->Mbs = oldmat->Mbs;
1744   a->Nbs = oldmat->Nbs;
1745 
1746   a->rstart       = oldmat->rstart;
1747   a->rend         = oldmat->rend;
1748   a->cstart       = oldmat->cstart;
1749   a->cend         = oldmat->cend;
1750   a->size         = oldmat->size;
1751   a->rank         = oldmat->rank;
1752   mat->insertmode = NOT_SET_VALUES;
1753   a->rowvalues    = 0;
1754   a->getrowactive = PETSC_FALSE;
1755   a->barray       = 0;
1756 
1757   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1758   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1759   a->cowners = a->rowners + a->size + 2;
1760   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1761   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1762   if (oldmat->colmap) {
1763     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1764     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1765     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1766   } else a->colmap = 0;
1767   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1768     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1769     PLogObjectMemory(mat,len*sizeof(int));
1770     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1771   } else a->garray = 0;
1772 
1773   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1774   PLogObjectParent(mat,a->lvec);
1775   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1776   PLogObjectParent(mat,a->Mvctx);
1777   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1778   PLogObjectParent(mat,a->A);
1779   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1780   PLogObjectParent(mat,a->B);
1781   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1782   if (flg) {
1783     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1784   }
1785   *newmat = mat;
1786   return 0;
1787 }
1788 
1789 #include "sys.h"
1790 
1791 #undef __FUNC__
1792 #define __FUNC__ "MatLoad_MPIBAIJ"
1793 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1794 {
1795   Mat          A;
1796   int          i, nz, ierr, j,rstart, rend, fd;
1797   Scalar       *vals,*buf;
1798   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1799   MPI_Status   status;
1800   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1801   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1802   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1803   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1804   int          dcount,kmax,k,nzcount,tmp;
1805 
1806 
1807   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1808   bs2  = bs*bs;
1809 
1810   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1811   if (!rank) {
1812     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1813     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1814     if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not matrix object");
1815   }
1816 
1817   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1818   M = header[1]; N = header[2];
1819 
1820   if (M != N) SETERRQ(1,0,"Can only do square matrices");
1821 
1822   /*
1823      This code adds extra rows to make sure the number of rows is
1824      divisible by the blocksize
1825   */
1826   Mbs        = M/bs;
1827   extra_rows = bs - M + bs*(Mbs);
1828   if (extra_rows == bs) extra_rows = 0;
1829   else                  Mbs++;
1830   if (extra_rows &&!rank) {
1831     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1832   }
1833 
1834   /* determine ownership of all rows */
1835   mbs = Mbs/size + ((Mbs % size) > rank);
1836   m   = mbs * bs;
1837   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1838   browners = rowners + size + 1;
1839   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1840   rowners[0] = 0;
1841   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1842   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1843   rstart = rowners[rank];
1844   rend   = rowners[rank+1];
1845 
1846   /* distribute row lengths to all processors */
1847   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1848   if (!rank) {
1849     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1850     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1851     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1852     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1853     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1854     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1855     PetscFree(sndcounts);
1856   }
1857   else {
1858     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1859   }
1860 
1861   if (!rank) {
1862     /* calculate the number of nonzeros on each processor */
1863     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1864     PetscMemzero(procsnz,size*sizeof(int));
1865     for ( i=0; i<size; i++ ) {
1866       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1867         procsnz[i] += rowlengths[j];
1868       }
1869     }
1870     PetscFree(rowlengths);
1871 
1872     /* determine max buffer needed and allocate it */
1873     maxnz = 0;
1874     for ( i=0; i<size; i++ ) {
1875       maxnz = PetscMax(maxnz,procsnz[i]);
1876     }
1877     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1878 
1879     /* read in my part of the matrix column indices  */
1880     nz = procsnz[0];
1881     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1882     mycols = ibuf;
1883     if (size == 1)  nz -= extra_rows;
1884     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1885     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1886 
1887     /* read in every ones (except the last) and ship off */
1888     for ( i=1; i<size-1; i++ ) {
1889       nz = procsnz[i];
1890       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1891       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1892     }
1893     /* read in the stuff for the last proc */
1894     if ( size != 1 ) {
1895       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1896       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1897       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1898       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1899     }
1900     PetscFree(cols);
1901   }
1902   else {
1903     /* determine buffer space needed for message */
1904     nz = 0;
1905     for ( i=0; i<m; i++ ) {
1906       nz += locrowlens[i];
1907     }
1908     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1909     mycols = ibuf;
1910     /* receive message of column indices*/
1911     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1912     MPI_Get_count(&status,MPI_INT,&maxnz);
1913     if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
1914   }
1915 
1916   /* loop over local rows, determining number of off diagonal entries */
1917   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1918   odlens = dlens + (rend-rstart);
1919   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1920   PetscMemzero(mask,3*Mbs*sizeof(int));
1921   masked1 = mask    + Mbs;
1922   masked2 = masked1 + Mbs;
1923   rowcount = 0; nzcount = 0;
1924   for ( i=0; i<mbs; i++ ) {
1925     dcount  = 0;
1926     odcount = 0;
1927     for ( j=0; j<bs; j++ ) {
1928       kmax = locrowlens[rowcount];
1929       for ( k=0; k<kmax; k++ ) {
1930         tmp = mycols[nzcount++]/bs;
1931         if (!mask[tmp]) {
1932           mask[tmp] = 1;
1933           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1934           else masked1[dcount++] = tmp;
1935         }
1936       }
1937       rowcount++;
1938     }
1939 
1940     dlens[i]  = dcount;
1941     odlens[i] = odcount;
1942 
1943     /* zero out the mask elements we set */
1944     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1945     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1946   }
1947 
1948   /* create our matrix */
1949   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1950          CHKERRQ(ierr);
1951   A = *newmat;
1952   MatSetOption(A,MAT_COLUMNS_SORTED);
1953 
1954   if (!rank) {
1955     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1956     /* read in my part of the matrix numerical values  */
1957     nz = procsnz[0];
1958     vals = buf;
1959     mycols = ibuf;
1960     if (size == 1)  nz -= extra_rows;
1961     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1962     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1963 
1964     /* insert into matrix */
1965     jj      = rstart*bs;
1966     for ( i=0; i<m; i++ ) {
1967       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1968       mycols += locrowlens[i];
1969       vals   += locrowlens[i];
1970       jj++;
1971     }
1972     /* read in other processors (except the last one) and ship out */
1973     for ( i=1; i<size-1; i++ ) {
1974       nz = procsnz[i];
1975       vals = buf;
1976       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1977       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1978     }
1979     /* the last proc */
1980     if ( size != 1 ){
1981       nz = procsnz[i] - extra_rows;
1982       vals = buf;
1983       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1984       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1985       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1986     }
1987     PetscFree(procsnz);
1988   }
1989   else {
1990     /* receive numeric values */
1991     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1992 
1993     /* receive message of values*/
1994     vals = buf;
1995     mycols = ibuf;
1996     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1997     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1998     if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
1999 
2000     /* insert into matrix */
2001     jj      = rstart*bs;
2002     for ( i=0; i<m; i++ ) {
2003       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
2004       mycols += locrowlens[i];
2005       vals   += locrowlens[i];
2006       jj++;
2007     }
2008   }
2009   PetscFree(locrowlens);
2010   PetscFree(buf);
2011   PetscFree(ibuf);
2012   PetscFree(rowners);
2013   PetscFree(dlens);
2014   PetscFree(mask);
2015   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2016   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2017   return 0;
2018 }
2019 
2020 
2021