xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 352017bd5aa6ceda2bc2a2dc683edf1965bb8960)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpibaij.c,v 1.99 1998/01/13 00:07:02 balay Exp balay $";
3 #endif
4 
5 #include "pinclude/pviewer.h"
6 #include "src/mat/impls/baij/mpi/mpibaij.h"
7 #include "src/vec/vecimpl.h"
8 
9 
10 extern int MatSetUpMultiply_MPIBAIJ(Mat);
11 extern int DisAssemble_MPIBAIJ(Mat);
12 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
13 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
14 
15 /*
16      Local utility routine that creates a mapping from the global column
17    number to the local number in the off-diagonal part of the local
18    storage of the matrix.  This is done in a non-scalable way since the
19    length of colmap equals the global matrix length.
20 */
21 #undef __FUNC__
22 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
23 static int CreateColmap_MPIBAIJ_Private(Mat mat)
24 {
25   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
26   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
27   int         nbs = B->nbs,i,bs=B->bs;
28 
29   PetscFunctionBegin;
30   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
31   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
32   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
33   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
34   PetscFunctionReturn(0);
35 }
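
/*
   For illustration (an editorial sketch, not part of the original source):
   with block size bs, the off-diagonal (B) location of a global column gcol
   is recovered as

       lcol = baij->colmap[gcol/bs] - 1 + gcol%bs;

   as done in MatSetValues_MPIBAIJ() below.  The table stores "local block
   offset + 1" so that a zero entry (left by PetscMemzero) marks a block
   column that is not present in the off-diagonal part on this process.
*/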
36 
37 #define CHUNKSIZE  10
38 
39 #define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
40 { \
41  \
42     brow = row/bs;  \
43     rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
44     rmax = aimax[brow]; nrow = ailen[brow]; \
45       bcol = col/bs; \
46       ridx = row % bs; cidx = col % bs; \
47       low = 0; high = nrow; \
48       while (high-low > 3) { \
49         t = (low+high)/2; \
50         if (rp[t] > bcol) high = t; \
51         else              low  = t; \
52       } \
53       for ( _i=low; _i<high; _i++ ) { \
54         if (rp[_i] > bcol) break; \
55         if (rp[_i] == bcol) { \
56           bap  = ap +  bs2*_i + bs*cidx + ridx; \
57           if (addv == ADD_VALUES) *bap += value;  \
58           else                    *bap  = value;  \
59           goto a_noinsert; \
60         } \
61       } \
62       if (a->nonew == 1) goto a_noinsert; \
63       else if (a->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
64       if (nrow >= rmax) { \
65         /* there is no extra room in row, therefore enlarge */ \
66         int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
67         Scalar *new_a; \
68  \
69         if (a->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
70  \
71         /* malloc new storage space */ \
72         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
73         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
74         new_j   = (int *) (new_a + bs2*new_nz); \
75         new_i   = new_j + new_nz; \
76  \
77         /* copy over old data into new slots */ \
78         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
79         for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
80         PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
81         len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
82         PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
83                                                            len*sizeof(int)); \
84         PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
85         PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
86         PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
87                     aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
88         /* free up old matrix storage */ \
89         PetscFree(a->a);  \
90         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
91         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
92         a->singlemalloc = 1; \
93  \
94         rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
95         rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
96         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
97         a->maxnz += bs2*CHUNKSIZE; \
98         a->reallocs++; \
99         a->nz++; \
100       } \
101       N = nrow++ - 1;  \
102       /* shift up all the later entries in this row */ \
103       for ( ii=N; ii>=_i; ii-- ) { \
104         rp[ii+1] = rp[ii]; \
105         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
106       } \
107       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
108       rp[_i]                      = bcol;  \
109       ap[bs2*_i + bs*cidx + ridx] = value;  \
110       a_noinsert:; \
111     ailen[brow] = nrow; \
112 }
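
/*
   A sketch of the insertion path used by the macro above (the _B_ variant
   below is identical, operating on the off-diagonal part):

     1) locate block row  brow = row/bs  and block column  bcol = col/bs;
     2) binary-search rp[] (the sorted block-column indices of row brow),
        then scan linearly once the remaining interval is short;
     3) if the block column already exists, update entry (ridx,cidx) of the
        bs x bs block stored column-major at ap + bs2*_i;
     4) otherwise, grow the arrays by CHUNKSIZE block slots if the row is
        full, shift the trailing entries up, and insert the new block.
*/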
113 
114 #define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
115 { \
116  \
117     brow = row/bs;  \
118     rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
119     rmax = bimax[brow]; nrow = bilen[brow]; \
120       bcol = col/bs; \
121       ridx = row % bs; cidx = col % bs; \
122       low = 0; high = nrow; \
123       while (high-low > 3) { \
124         t = (low+high)/2; \
125         if (rp[t] > bcol) high = t; \
126         else              low  = t; \
127       } \
128       for ( _i=low; _i<high; _i++ ) { \
129         if (rp[_i] > bcol) break; \
130         if (rp[_i] == bcol) { \
131           bap  = ap +  bs2*_i + bs*cidx + ridx; \
132           if (addv == ADD_VALUES) *bap += value;  \
133           else                    *bap  = value;  \
134           goto b_noinsert; \
135         } \
136       } \
137       if (b->nonew == 1) goto b_noinsert; \
138       else if (b->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
139       if (nrow >= rmax) { \
140         /* there is no extra room in row, therefore enlarge */ \
141         int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
142         Scalar *new_a; \
143  \
144         if (b->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
145  \
146         /* malloc new storage space */ \
147         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
148         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
149         new_j   = (int *) (new_a + bs2*new_nz); \
150         new_i   = new_j + new_nz; \
151  \
152         /* copy over old data into new slots */ \
153         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
154         for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
155         PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
156         len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
157         PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
158                                                            len*sizeof(int)); \
159         PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
160         PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
161         PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
162                     ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
163         /* free up old matrix storage */ \
164         PetscFree(b->a);  \
165         if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
166         ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
167         b->singlemalloc = 1; \
168  \
169         rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
170         rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
171         PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
172         b->maxnz += bs2*CHUNKSIZE; \
173         b->reallocs++; \
174         b->nz++; \
175       } \
176       N = nrow++ - 1;  \
177       /* shift up all the later entries in this row */ \
178       for ( ii=N; ii>=_i; ii-- ) { \
179         rp[ii+1] = rp[ii]; \
180         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
181       } \
182       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
183       rp[_i]                      = bcol;  \
184       ap[bs2*_i + bs*cidx + ridx] = value;  \
185       b_noinsert:; \
186     bilen[brow] = nrow; \
187 }
188 
189 #undef __FUNC__
190 #define __FUNC__ "MatSetValues_MPIBAIJ"
191 int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
192 {
193   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
194   Scalar      value;
195   int         ierr,i,j,row,col;
196   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
197   int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
198   int         cend_orig=baij->cend_bs,bs=baij->bs;
199 
200   /* Some Variables required in the macro */
201   Mat         A = baij->A;
202   Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
203   int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
204   Scalar      *aa=a->a;
205 
206   Mat         B = baij->B;
207   Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
208   int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
209   Scalar      *ba=b->a;
210 
211   int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
212   int         low,high,t,ridx,cidx,bs2=a->bs2;
213   Scalar      *ap,*bap;
214 
215   PetscFunctionBegin;
216   for ( i=0; i<m; i++ ) {
217 #if defined(USE_PETSC_BOPT_g)
218     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
219     if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
220 #endif
221     if (im[i] >= rstart_orig && im[i] < rend_orig) {
222       row = im[i] - rstart_orig;
223       for ( j=0; j<n; j++ ) {
224         if (in[j] >= cstart_orig && in[j] < cend_orig){
225           col = in[j] - cstart_orig;
226           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
227           MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
228           /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
229         }
230 #if defined(USE_PETSC_BOPT_g)
231         else if (in[j] < 0) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");}
232         else if (in[j] >= baij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Col too large");}
233 #endif
234         else {
235           if (mat->was_assembled) {
236             if (!baij->colmap) {
237               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
238             }
239             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
240             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
241               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
242               col =  in[j];
243               /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
244               B = baij->B;
245               b = (Mat_SeqBAIJ *) (B)->data;
246               bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
247               ba=b->a;
248             }
249           } else col = in[j];
250           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
251           MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
252           /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
253         }
254       }
255     } else {
256       if (roworiented && !baij->donotstash) {
257         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
258       } else {
259         if (!baij->donotstash) {
260           row = im[i];
261           for ( j=0; j<n; j++ ) {
262             ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
263           }
264         }
265       }
266     }
267   }
268   PetscFunctionReturn(0);
269 }
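
/*
   Typical caller-side usage, as a sketch (error checking omitted; the
   dimensions and values below are placeholders):

       Mat    A;
       int    row = 3, col = 7, ierr;
       Scalar v   = 1.0;

       ierr = MatCreateMPIBAIJ(PETSC_COMM_WORLD,2,PETSC_DECIDE,PETSC_DECIDE,
                               8,8,0,PETSC_NULL,0,PETSC_NULL,&A);
       ierr = MatSetValues(A,1,&row,1,&col,&v,ADD_VALUES);
       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);
       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);

   Entries destined for rows owned by another process are routed through
   the stash above and communicated during assembly.
*/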
270 
271 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
272 #undef __FUNC__
273 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
274 int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
275 {
276   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
277   Scalar      *value,*barray=baij->barray;
278   int         ierr,i,j,ii,jj,row,col,k,l;
279   int         roworiented = baij->roworiented,rstart=baij->rstart ;
280   int         rend=baij->rend,cstart=baij->cstart,stepval;
281   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
282 
  PetscFunctionBegin;
283   if (!barray) {
284     baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
285   }
286 
287   if (roworiented) {
288     stepval = (n-1)*bs;
289   } else {
290     stepval = (m-1)*bs;
291   }
292   for ( i=0; i<m; i++ ) {
293 #if defined(USE_PETSC_BOPT_g)
294     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
295     if (im[i] >= baij->Mbs) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
296 #endif
297     if (im[i] >= rstart && im[i] < rend) {
298       row = im[i] - rstart;
299       for ( j=0; j<n; j++ ) {
300         /* If the block is already contiguous in v (n == 1 and row-oriented, or m == 1 and column-oriented) a copy is not required */
301         if ((roworiented) && (n == 1)) {
302           barray = v + i*bs2;
303         } else if((!roworiented) && (m == 1)) {
304           barray = v + j*bs2;
305         } else { /* Here a copy is required */
306           if (roworiented) {
307             value = v + i*(stepval+bs)*bs + j*bs;
308           } else {
309             value = v + j*(stepval+bs)*bs + i*bs;
310           }
311           for ( ii=0; ii<bs; ii++,value+=stepval ) {
312             for (jj=0; jj<bs; jj++ ) {
313               *barray++  = *value++;
314             }
315           }
316           barray -=bs2;
317         }
318 
319         if (in[j] >= cstart && in[j] < cend){
320           col  = in[j] - cstart;
321           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
322         }
323 #if defined(USE_PETSC_BOPT_g)
324         else if (in[j] < 0) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");}
325         else if (in[j] >= baij->Nbs) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");}
326 #endif
327         else {
328           if (mat->was_assembled) {
329             if (!baij->colmap) {
330               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
331             }
332 
333 #if defined(USE_PETSC_BOPT_g)
334             if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(PETSC_ERR_PLIB,0,"Incorrect colmap");}
335 #endif
336             col = (baij->colmap[in[j]] - 1)/bs;
337             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
338               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
339               col =  in[j];
340             }
341           }
342           else col = in[j];
343           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
344         }
345       }
346     } else {
347       if (!baij->donotstash) {
348         if (roworiented ) {
349           row   = im[i]*bs;
350           value = v + i*(stepval+bs)*bs;
351           for ( j=0; j<bs; j++,row++ ) {
352             for ( k=0; k<n; k++ ) {
353               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
354                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
355               }
356             }
357           }
358         } else {
359           for ( j=0; j<n; j++ ) {
360             value = v + j*(stepval+bs)*bs + i*bs;
361             col   = in[j]*bs;
362             for ( k=0; k<bs; k++,col++,value+=stepval) {
363               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
364                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
365               }
366             }
367           }
368         }
369       }
370     }
371   }
372   PetscFunctionReturn(0);
373 }
374 #include <math.h>
375 #define HASH_KEY 0.6180339887
376 /* #define HASH1(size,key) ((int)((size)*fmod(((key)*HASH_KEY),1))) */
377 #define HASH(size,key,tmp) (tmp = (key)*HASH_KEY,(int)((size)*(tmp-(int)tmp)))
378 /* #define HASH(size,key,tmp) ((int)((size)*fmod(((key)*HASH_KEY),1))) */
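
/*
   Example of the multiplicative hash, as a sketch: HASH(size,key,tmp)
   multiplies the key by the golden-ratio constant, keeps the fractional
   part, and scales it into [0,size).  E.g. with size = 100 and key = 42,

       tmp  = 42*0.6180339887 = 25.957...
       slot = (int)(100*0.957...) = 95.

   Collisions are resolved by the linear probing (with wraparound) coded
   explicitly in the lookup loops below.
*/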
379 #undef __FUNC__
380 #define __FUNC__ "MatSetValues_MPIBAIJ_HT"
381 int MatSetValues_MPIBAIJ_HT(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
382 {
383   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
384   int         ierr,i,j,row,col;
385   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
386   int         rend_orig=baij->rend_bs,Nbs=baij->Nbs;
387 
388   int         h1,key,size=baij->ht_size,bs=baij->bs,*HT=baij->ht,idx;
389   double      tmp;
390   Scalar      ** HD = baij->hd,value;
391 
392 
393   PetscFunctionBegin;
394 
395   for ( i=0; i<m; i++ ) {
396 #if defined(USE_PETSC_BOPT_g)
397     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
398     if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
399 #endif
400     row = im[i];
401     if (row >= rstart_orig && row < rend_orig) {
402       for ( j=0; j<n; j++ ) {
403         col = in[j];
404         if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
405         /* Look the entry up in the hash table */
406         key = (row/bs)*Nbs+(col/bs)+1;
407         h1  = HASH(size,key,tmp);
408 
409 
410         idx = h1;
411         if (HT[idx] != key) {
412           for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++);
413           if (idx == size) {
414             for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++);
415             if (idx == h1) {
416               SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
417             }
418           }
419         }
420         /* A hash-table entry was found, so insert the value at the correct address */
421         if (addv == ADD_VALUES) *(HD[idx]+ (col % bs)*bs + (row % bs)) += value;
422         else                    *(HD[idx]+ (col % bs)*bs + (row % bs))  = value;
424       }
425     } else {
426       if (roworiented && !baij->donotstash) {
427         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
428       } else {
429         if (!baij->donotstash) {
430           row = im[i];
431           for ( j=0; j<n; j++ ) {
432             ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
433           }
434         }
435       }
436     }
437   }
438   PetscFunctionReturn(0);
439 }
440 
441 #undef __FUNC__
442 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ_HT"
443 int MatSetValuesBlocked_MPIBAIJ_HT(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
444 {
445   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
446   int         ierr,i,j,ii,jj,row,col,k,l;
447   int         roworiented = baij->roworiented,rstart=baij->rstart ;
448   int         rend=baij->rend,stepval,bs=baij->bs,bs2=baij->bs2;
449 
450   int         h1,key,size=baij->ht_size,idx,*HT=baij->ht,Nbs=baij->Nbs;
451   double      tmp;
452   Scalar      ** HD = baij->hd,*value,*baij_a;
453 
454   PetscFunctionBegin;
455 
456   if (roworiented) {
457     stepval = (n-1)*bs;
458   } else {
459     stepval = (m-1)*bs;
460   }
461   for ( i=0; i<m; i++ ) {
462 #if defined(USE_PETSC_BOPT_g)
463     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
464     if (im[i] >= baij->Mbs) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
465 #endif
466     row = im[i];
467     if (row >= rstart && row < rend) {
468       for ( j=0; j<n; j++ ) {
469         col = in[j];
470 
471         /* Look the entry up in the hash table */
472         key = row*Nbs+col+1;
473         h1  = HASH(size,key,tmp);
474 
475         idx = h1;
476         if (HT[idx] != key) {
477           for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++);
478           if (idx == size) {
479             for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++);
480             if (idx == h1) {
481               SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
482             }
483           }
484         }
485         baij_a = HD[idx];
486         if (roworiented) {
487           /*value = v + i*(stepval+bs)*bs + j*bs;*/
488           value = v + (i*(stepval+bs)+j)*bs;
489           if (addv == ADD_VALUES) {
490             for ( ii=0; ii<bs; ii++,value+=stepval) {
491               for ( jj=ii; jj<bs2; jj+=bs ) {
492                 baij_a[jj]  += *value++;
493               }
494             }
495           } else {
496             for ( ii=0; ii<bs; ii++,value+=stepval) {
497               for ( jj=ii; jj<bs2; jj+=bs ) {
498                 baij_a[jj]  = *value++;
499               }
500             }
501           }
502         } else {
503           value = v + j*(stepval+bs)*bs + i*bs;
504           if (addv == ADD_VALUES) {
505             for ( ii=0; ii<bs; ii++,value+=stepval,baij_a+=bs ) {
506               for ( jj=0; jj<bs; jj++ ) {
507                 baij_a[jj]  += *value++;
508               }
509             }
510           } else {
511             for ( ii=0; ii<bs; ii++,value+=stepval,baij_a+=bs ) {
512               for ( jj=0; jj<bs; jj++ ) {
513                 baij_a[jj]  = *value++;
514               }
515             }
516           }
517         }
518       }
519     } else {
520       if (!baij->donotstash) {
521         if (roworiented ) {
522           row   = im[i]*bs;
523           value = v + i*(stepval+bs)*bs;
524           for ( j=0; j<bs; j++,row++ ) {
525             for ( k=0; k<n; k++ ) {
526               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
527                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
528               }
529             }
530           }
531         } else {
532           for ( j=0; j<n; j++ ) {
533             value = v + j*(stepval+bs)*bs + i*bs;
534             col   = in[j]*bs;
535             for ( k=0; k<bs; k++,col++,value+=stepval) {
536               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
537                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
538               }
539             }
540           }
541         }
542       }
543     }
544   }
545   PetscFunctionReturn(0);
546 }
547 #undef __FUNC__
548 #define __FUNC__ "MatGetValues_MPIBAIJ"
549 int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
550 {
551   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
552   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
553   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
554 
  PetscFunctionBegin;
555   for ( i=0; i<m; i++ ) {
556     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
557     if (idxm[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
558     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
559       row = idxm[i] - bsrstart;
560       for ( j=0; j<n; j++ ) {
561         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
562         if (idxn[j] >= baij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
563         if (idxn[j] >= bscstart && idxn[j] < bscend){
564           col = idxn[j] - bscstart;
565           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
566         } else {
567           if (!baij->colmap) {
568             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
569           }
570           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
571              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
572           else {
573             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
574             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
575           }
576         }
577       }
578     } else {
579       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
580     }
581   }
582   PetscFunctionReturn(0);
583 }
584 
585 #undef __FUNC__
586 #define __FUNC__ "MatNorm_MPIBAIJ"
587 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
588 {
589   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
590   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
591   int        ierr, i,bs2=baij->bs2;
592   double     sum = 0.0;
593   Scalar     *v;
594 
595   PetscFunctionBegin;
596   if (baij->size == 1) {
597     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
598   } else {
599     if (type == NORM_FROBENIUS) {
600       v = amat->a;
601       for (i=0; i<amat->nz*bs2; i++ ) {
602 #if defined(USE_PETSC_COMPLEX)
603         sum += real(conj(*v)*(*v)); v++;
604 #else
605         sum += (*v)*(*v); v++;
606 #endif
607       }
608       v = bmat->a;
609       for (i=0; i<bmat->nz*bs2; i++ ) {
610 #if defined(USE_PETSC_COMPLEX)
611         sum += real(conj(*v)*(*v)); v++;
612 #else
613         sum += (*v)*(*v); v++;
614 #endif
615       }
616       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
617       *norm = sqrt(*norm);
618     } else {
619       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
620     }
621   }
622   PetscFunctionReturn(0);
623 }
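
/*
   The Frobenius norm computed above is, in effect,

       ||A||_F = sqrt( sum_{i,j} |a_ij|^2 )

   evaluated over the local A and B parts, summed across all processes with
   MPI_Allreduce, and square-rooted at the end.
*/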
624 
625 #undef __FUNC__
626 #define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
627 int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
628 {
629   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
630   MPI_Comm    comm = mat->comm;
631   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
632   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
633   MPI_Request *send_waits,*recv_waits;
634   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
635   InsertMode  addv;
636   Scalar      *rvalues,*svalues;
637 
638   PetscFunctionBegin;
639   /* make sure all processors are in either INSERT_VALUES or ADD_VALUES mode */
640   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
641   if (addv == (ADD_VALUES|INSERT_VALUES)) {
642     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
643   }
644   mat->insertmode = addv; /* in case this processor had no cache */
645 
646   /*  first count number of contributors to each processor */
647   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
648   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
649   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
650   for ( i=0; i<baij->stash.n; i++ ) {
651     idx = baij->stash.idx[i];
652     for ( j=0; j<size; j++ ) {
653       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
654         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
655       }
656     }
657   }
658   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
659 
660   /* inform other processors of number of messages and max length*/
661   work      = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
662   ierr      = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
663   nreceives = work[rank];
664   ierr      = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
665   nmax      = work[rank];
666   PetscFree(work);
667 
668   /* post receives:
669        1) each message consists of ordered triples
670      (global row index, global column index, value); we store the indices
671      as Scalars to simplify the message passing.
672        2) since we don't know how long each individual message is we
673      allocate the largest needed buffer for each receive. Potentially
674      this is a lot of wasted space.
675 
676 
677        This could be done better.
678   */
679   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
680   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
681   for ( i=0; i<nreceives; i++ ) {
682     ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
683   }
684 
685   /* do sends:
686       1) starts[i] gives the starting index in svalues for stuff going to
687          the ith processor
688   */
689   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
690   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
691   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
692   starts[0] = 0;
693   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
694   for ( i=0; i<baij->stash.n; i++ ) {
695     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
696     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
697     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
698   }
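  /*
     Layout of the send buffer, for illustration: stashed entry k occupies
     three consecutive Scalars,

         svalues[3*k]   = (Scalar) global row index
         svalues[3*k+1] = (Scalar) global column index
         svalues[3*k+2] = numerical value

     which is why the receive buffers above are posted in multiples of
     3*nmax.
  */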
699   PetscFree(owner);
700   starts[0] = 0;
701   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
702   count = 0;
703   for ( i=0; i<size; i++ ) {
704     if (procs[i]) {
705       ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
706     }
707   }
708   PetscFree(starts); PetscFree(nprocs);
709 
710   /* Free cache space */
711   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",rank,baij->stash.n);
712   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
713 
714   baij->svalues    = svalues;    baij->rvalues    = rvalues;
715   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
716   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
717   baij->rmax       = nmax;
718 
719   PetscFunctionReturn(0);
720 }
721 
722 /*
723   Creates and populates the hash table.
724   This table is created only once.
725   If new entries need to be added to the matrix
726   then the hash table has to be destroyed and
727   recreated.
728 */
729 #undef __FUNC__
730 #define __FUNC__ "MatCreateHashTable_MPIBAIJ_Private"
731 int MatCreateHashTable_MPIBAIJ_Private(Mat mat,double factor)
732 {
733   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
734   Mat         A = baij->A, B=baij->B;
735   Mat_SeqBAIJ *a=(Mat_SeqBAIJ *)A->data, *b=(Mat_SeqBAIJ *)B->data;
736   int         i,j,k,nz=a->nz+b->nz,h1,*ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j;
737   int         size,ct=0,max1=0,max2=0,bs2=baij->bs2,rstart=baij->rstart;
738   int         cstart=baij->cstart,*garray=baij->garray,row,col;
739   int         *HT,key;
740   Scalar      **HD;
741   double      tmp;
742 
743   PetscFunctionBegin;
744   if (baij->ht) {
745     PetscFunctionReturn(0);
746   }
747
748   baij->ht_size = (int)(factor*nz);
749   size          = baij->ht_size;
750 
751   /* Allocate Memory for Hash Table */
752   baij->hd = (Scalar**)PetscMalloc((size)*(sizeof(int)+sizeof(Scalar*))+1); CHKPTRQ(baij->hd);
753   baij->ht = (int*)(baij->hd + size);
754   HD = baij->hd;
755   HT = baij->ht;
756 
757 
758   PetscMemzero(HD,size*(sizeof(int)+sizeof(Scalar*)));
759 
760 
761   /* Loop Over A */
762   for ( i=0; i<a->mbs; i++ ) {
763     for ( j=ai[i]; j<ai[i+1]; j++ ) {
764       row = i+rstart;
765       col = aj[j]+cstart;
766 
767       key = row*baij->Nbs + col + 1;
768       /* printf("[%d]row,col,key = %2d,%2d,%5.2f\n",PetscGlobalRank,row,col,key); */
769       h1  = HASH(size,key,tmp);
770 
771       for ( k=0; k<size; k++ ){
772         if (HT[(h1+k)%size] == 0) {
773           HT[(h1+k)%size] = key;
774           HD[(h1+k)%size] = a->a + j*bs2;
775           break;
776         } else ct++;
777       }
778       if (k > max1) max1 = k;
779     }
780   }
781   /* Loop Over B */
782   for ( i=0; i<b->mbs; i++ ) {
783     for ( j=bi[i]; j<bi[i+1]; j++ ) {
784       row = i+rstart;
785       col = garray[bj[j]];
786       key = row*baij->Nbs + col + 1;
787       /* printf("[%d]row,col,key = %2d,%2d,%5.2f\n",PetscGlobalRank,row,col,key); */
788       h1  = HASH(size,key,tmp);
789       for ( k=0; k<size; k++ ){
790         if (HT[(h1+k)%size] == 0) {
791           HT[(h1+k)%size] = key;
792           HD[(h1+k)%size] = b->a + j*bs2;
793           break;
794         } else ct++;
795       }
796       if (k > max2) max2 = k;
797     }
798   }
799 
800   /* Print Summary */
801   for ( i=0,j=0; i<size; i++)
802     if (HT[i]) {j++;}
803 
804   /*printf("[%d] fact = %3.2f max1 = %5d max2 = %5d Size %5d - Searches %5d Avg %5.2f Keys %5d\n",
805          PetscGlobalRank,factor,max1,max2,size,ct+j,((float)ct+j),j); */
806   fflush(stdout);
807   PetscFunctionReturn(0);
808 }
809 
810 #undef __FUNC__
811 #define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
812 int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
813 {
814   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
815   MPI_Status  *send_status,recv_status;
816   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
817   int         bs=baij->bs,row,col,other_disassembled,flg;
818   Scalar      *values,val;
819   InsertMode  addv = mat->insertmode;
820 
821   PetscFunctionBegin;
822   /*  wait on receives */
823   while (count) {
824     ierr = MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
825     /* unpack receives into our local space */
826     values = baij->rvalues + 3*imdex*baij->rmax;
827     ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
828     n = n/3;
829     for ( i=0; i<n; i++ ) {
830       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
831       col = (int) PetscReal(values[3*i+1]);
832       val = values[3*i+2];
833       if (col >= baij->cstart*bs && col < baij->cend*bs) {
834         col -= baij->cstart*bs;
835         ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
836       } else {
837         if (mat->was_assembled) {
838           if (!baij->colmap) {
839             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
840           }
841           col = (baij->colmap[col/bs]) - 1 + col%bs;
842           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
843             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
844             col = (int) PetscReal(values[3*i+1]);
845           }
846         }
847         ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
848       }
849     }
850     count--;
851   }
852   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
853 
854   /* wait on sends */
855   if (baij->nsends) {
856     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
857     ierr        = MPI_Waitall(baij->nsends,baij->send_waits,send_status);CHKERRQ(ierr);
858     PetscFree(send_status);
859   }
860   PetscFree(baij->send_waits); PetscFree(baij->svalues);
861 
862   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
863   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
864 
865   /* determine if any processor has disassembled, if so we must
866      also disassemble ourselfs, in order that we may reassemble. */
867   ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
868   if (mat->was_assembled && !other_disassembled) {
869     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
870   }
871 
872   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
873     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
874   }
875   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
876   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
877 
878   ierr = OptionsHasName(PETSC_NULL,"-use_hash",&flg); CHKERRQ(ierr);
879   if (flg && !baij->ht && mode== MAT_FINAL_ASSEMBLY) {
880     double fact = 1.39;
881     ierr = OptionsGetDouble(PETSC_NULL,"-use_hash",&fact,&flg); CHKERRQ(ierr);
882     if (fact <= 1.0) fact = 1.39;
883     PLogInfo(0,"[%d]MatAssemblyEnd_MPIBAIJ:Hash table Factor used %5.2f\n",PetscGlobalRank,fact);
884     ierr = MatCreateHashTable_MPIBAIJ_Private(mat,fact); CHKERRQ(ierr);
885     mat->ops.setvalues        = MatSetValues_MPIBAIJ_HT;
886     mat->ops.setvaluesblocked = MatSetValuesBlocked_MPIBAIJ_HT;
887   }
888   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
889   PetscFunctionReturn(0);
890 }
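
/*
   Example, as a sketch: the hash-table insertion path is enabled at run
   time with

       -use_hash          (table sized with the default factor 1.39)
       -use_hash 1.5      (table sized as 1.5 * number of nonzero blocks)

   after which MatSetValues()/MatSetValuesBlocked() dispatch to the _HT
   variants installed above.  New nonzero locations cannot be created
   through that path; the hash table must be destroyed and recreated first.
*/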
891 
892 #undef __FUNC__
893 #define __FUNC__ "MatView_MPIBAIJ_Binary"
894 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
895 {
896   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
897   int          ierr;
898 
899   PetscFunctionBegin;
900   if (baij->size == 1) {
901     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
902   } else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
903   PetscFunctionReturn(0);
904 }
905 
906 #undef __FUNC__
907 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
908 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
909 {
910   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
911   int          ierr, format,rank,bs = baij->bs;
912   FILE         *fd;
913   ViewerType   vtype;
914 
915   PetscFunctionBegin;
916   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
917   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
918     ierr = ViewerGetFormat(viewer,&format); CHKERRQ(ierr);
919     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
920       MatInfo info;
921       MPI_Comm_rank(mat->comm,&rank);
922       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
923       ierr = MatGetInfo(mat,MAT_LOCAL,&info); CHKERRQ(ierr);
924       PetscSequentialPhaseBegin(mat->comm,1);
925       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
926               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
927               baij->bs,(int)info.memory);
928       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info); CHKERRQ(ierr);
929       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
930       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info); CHKERRQ(ierr);
931       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
932       fflush(fd);
933       PetscSequentialPhaseEnd(mat->comm,1);
934       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
935       PetscFunctionReturn(0);
936     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
937       PetscPrintf(mat->comm,"  block size is %d\n",bs);
938       PetscFunctionReturn(0);
939     }
940   }
941 
942   if (vtype == DRAW_VIEWER) {
943     Draw       draw;
944     PetscTruth isnull;
945     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
946     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
947   }
948 
949   if (vtype == ASCII_FILE_VIEWER) {
950     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
951     PetscSequentialPhaseBegin(mat->comm,1);
952     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
953            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
954             baij->cstart*bs,baij->cend*bs);
955     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
956     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
957     fflush(fd);
958     PetscSequentialPhaseEnd(mat->comm,1);
959   } else {
960     int size = baij->size;
961     rank = baij->rank;
962     if (size == 1) {
963       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
964     } else {
965       /* assemble the entire matrix onto first processor. */
966       Mat         A;
967       Mat_SeqBAIJ *Aloc;
968       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
969       int         mbs=baij->mbs;
970       Scalar      *a;
971 
972       if (!rank) {
973         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
974       } else {
975         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
976       }
977       PLogObjectParent(mat,A);
978 
979       /* copy over the A part */
980       Aloc = (Mat_SeqBAIJ*) baij->A->data;
981       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
982       row = baij->rstart;
983       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
984 
985       for ( i=0; i<mbs; i++ ) {
986         rvals[0] = bs*(baij->rstart + i);
987         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
988         for ( j=ai[i]; j<ai[i+1]; j++ ) {
989           col = (baij->cstart+aj[j])*bs;
990           for (k=0; k<bs; k++ ) {
991             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
992             col++; a += bs;
993           }
994         }
995       }
996       /* copy over the B part */
997       Aloc = (Mat_SeqBAIJ*) baij->B->data;
998       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
999       row = baij->rstart*bs;
1000       for ( i=0; i<mbs; i++ ) {
1001         rvals[0] = bs*(baij->rstart + i);
1002         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1003         for ( j=ai[i]; j<ai[i+1]; j++ ) {
1004           col = baij->garray[aj[j]]*bs;
1005           for (k=0; k<bs; k++ ) {
1006             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
1007             col++; a += bs;
1008           }
1009         }
1010       }
1011       PetscFree(rvals);
1012       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1013       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1014       /*
1015          Everyone has to call to draw the matrix since the graphics waits are
1016          synchronized across all processors that share the Draw object
1017       */
1018       if (!rank || vtype == DRAW_VIEWER) {
1019         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
1020       }
1021       ierr = MatDestroy(A); CHKERRQ(ierr);
1022     }
1023   }
1024   PetscFunctionReturn(0);
1025 }
1026 
1027 
1028 
1029 #undef __FUNC__
1030 #define __FUNC__ "MatView_MPIBAIJ"
1031 int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
1032 {
1033   Mat         mat = (Mat) obj;
1034   int         ierr;
1035   ViewerType  vtype;
1036 
1037   PetscFunctionBegin;
1038   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
1039   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
1040       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
1041     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
1042   } else if (vtype == BINARY_FILE_VIEWER) {
1043     ierr = MatView_MPIBAIJ_Binary(mat,viewer);CHKERRQ(ierr);
1044   }
1045   PetscFunctionReturn(0);
1046 }
1047 
1048 #undef __FUNC__
1049 #define __FUNC__ "MatDestroy_MPIBAIJ"
1050 int MatDestroy_MPIBAIJ(PetscObject obj)
1051 {
1052   Mat         mat = (Mat) obj;
1053   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1054   int         ierr;
1055 
1056   PetscFunctionBegin;
1057 #if defined(USE_PETSC_LOG)
1058   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
1059 #endif
1060 
1061   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
1062   PetscFree(baij->rowners);
1063   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1064   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1065   if (baij->colmap) PetscFree(baij->colmap);
1066   if (baij->garray) PetscFree(baij->garray);
1067   if (baij->lvec)   VecDestroy(baij->lvec);
1068   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
1069   if (baij->rowvalues) PetscFree(baij->rowvalues);
1070   if (baij->barray) PetscFree(baij->barray);
1071   if (baij->hd) PetscFree(baij->hd);
1072   PetscFree(baij);
1073   PLogObjectDestroy(mat);
1074   PetscHeaderDestroy(mat);
1075   PetscFunctionReturn(0);
1076 }
1077 
1078 #undef __FUNC__
1079 #define __FUNC__ "MatMult_MPIBAIJ"
1080 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
1081 {
1082   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1083   int         ierr, nt;
1084 
1085   PetscFunctionBegin;
1086   VecGetLocalSize_Fast(xx,nt);
1087   if (nt != a->n) {
1088     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
1089   }
1090   VecGetLocalSize_Fast(yy,nt);
1091   if (nt != a->m) {
1092     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and yy");
1093   }
1094   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1095   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
1096   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1097   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
1098   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1099   PetscFunctionReturn(0);
1100 }
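
/*
   The multiply above overlaps communication with computation, roughly:

       VecScatterBegin     -- start gathering the needed ghost values of xx
       A->ops.mult         -- diagonal-block product; uses only local xx
       VecScatterEnd       -- ghost values now sit in a->lvec
       B->ops.multadd      -- add the off-diagonal contribution
       VecScatterPostRecvs -- pre-post receives for the next multiply
*/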
1101 
1102 #undef __FUNC__
1103 #define __FUNC__ "MatMultAdd_MPIBAIJ"
1104 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
1105 {
1106   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1107   int        ierr;
1108 
1109   PetscFunctionBegin;
1110   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1111   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
1112   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1113   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
1114   PetscFunctionReturn(0);
1115 }
1116 
1117 #undef __FUNC__
1118 #define __FUNC__ "MatMultTrans_MPIBAIJ"
1119 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
1120 {
1121   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1122   int         ierr;
1123 
1124   PetscFunctionBegin;
1125   /* do nondiagonal part */
1126   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
1127   /* send it on its way */
1128   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
1129   /* do local part */
1130   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
1131   /* receive remote parts: note this assumes the values are not actually */
1132   /* inserted in yy until the next line, which is true for my implementation */
1133   /* but may not always be true. */
1134   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
1135   PetscFunctionReturn(0);
1136 }
1137 
1138 #undef __FUNC__
1139 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
1140 int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
1141 {
1142   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1143   int         ierr;
1144 
1145   PetscFunctionBegin;
1146   /* do nondiagonal part */
1147   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
1148   /* send it on its way */
1149   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
1150   /* do local part */
1151   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
1152   /* receive remote parts: note this assumes the values are not actually */
1153   /* inserted in zz until the next line, which is true for my implementation */
1154   /* but may not always be true. */
1155   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
1156   PetscFunctionReturn(0);
1157 }
1158 
1159 /*
1160   This only works correctly for square matrices where the subblock A->A is the
1161    diagonal block
1162 */
1163 #undef __FUNC__
1164 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
1165 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
1166 {
1167   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1168   int         ierr;
1169 
1170   PetscFunctionBegin;
1171   if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
1172   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
1173   PetscFunctionReturn(0);
1174 }
1175 
1176 #undef __FUNC__
1177 #define __FUNC__ "MatScale_MPIBAIJ"
1178 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
1179 {
1180   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1181   int         ierr;
1182 
1183   PetscFunctionBegin;
1184   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
1185   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
1186   PetscFunctionReturn(0);
1187 }
1188 
1189 #undef __FUNC__
1190 #define __FUNC__ "MatGetSize_MPIBAIJ"
1191 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
1192 {
1193   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1194 
1195   PetscFunctionBegin;
1196   if (m) *m = mat->M;
1197   if (n) *n = mat->N;
1198   PetscFunctionReturn(0);
1199 }
1200 
1201 #undef __FUNC__
1202 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
1203 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
1204 {
1205   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1206 
1207   PetscFunctionBegin;
1208   *m = mat->m; *n = mat->n;
1209   PetscFunctionReturn(0);
1210 }
1211 
1212 #undef __FUNC__
1213 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
1214 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
1215 {
1216   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1217 
1218   PetscFunctionBegin;
1219   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
1220   PetscFunctionReturn(0);
1221 }
1222 
1223 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1224 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1225 
1226 #undef __FUNC__
1227 #define __FUNC__ "MatGetRow_MPIBAIJ"
1228 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1229 {
1230   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1231   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
1232   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
1233   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
1234   int        *cmap, *idx_p,cstart = mat->cstart;
1235 
1236   PetscFunctionBegin;
1237   if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
1238   mat->getrowactive = PETSC_TRUE;
1239 
1240   if (!mat->rowvalues && (idx || v)) {
1241     /*
1242         allocate enough space to hold information from the longest row.
1243     */
1244     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
1245     int     max = 1,mbs = mat->mbs,tmp;
1246     for ( i=0; i<mbs; i++ ) {
1247       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1248       if (max < tmp) { max = tmp; }
1249     }
1250     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
1251     CHKPTRQ(mat->rowvalues);
1252     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
1253   }
1254 
1255   if (row < brstart || row >= brend) SETERRQ(PETSC_ERR_SUP,0,"Only local rows");
1256   lrow = row - brstart;
1257 
1258   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1259   if (!v)   {pvA = 0; pvB = 0;}
1260   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1261   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1262   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1263   nztot = nzA + nzB;
1264 
1265   cmap  = mat->garray;
1266   if (v  || idx) {
1267     if (nztot) {
1268       /* Sort by increasing column numbers, assuming A and B already sorted */
1269       int imark = -1;
1270       if (v) {
1271         *v = v_p = mat->rowvalues;
1272         for ( i=0; i<nzB; i++ ) {
1273           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
1274           else break;
1275         }
1276         imark = i;
1277         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1278         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1279       }
1280       if (idx) {
1281         *idx = idx_p = mat->rowindices;
1282         if (imark > -1) {
1283           for ( i=0; i<imark; i++ ) {
1284             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
1285           }
1286         } else {
1287           for ( i=0; i<nzB; i++ ) {
1288             if (cmap[cworkB[i]/bs] < cstart)
1289               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
1290             else break;
1291           }
1292           imark = i;
1293         }
1294         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
1295         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
1296       }
1297     } else {
1298       if (idx) *idx = 0;
1299       if (v)   *v   = 0;
1300     }
1301   }
1302   *nz = nztot;
1303   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1304   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1305   PetscFunctionReturn(0);
1306 }
1307 
1308 #undef __FUNC__
1309 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1310 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1311 {
1312   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1313 
1314   PetscFunctionBegin;
1315   if (baij->getrowactive == PETSC_FALSE) {
1316     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
1317   }
1318   baij->getrowactive = PETSC_FALSE;
1319   PetscFunctionReturn(0);
1320 }
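
/*
   Caller-side usage, as a sketch (only rows owned by this process may be
   requested, as enforced above):

       int    nz,*cols,rstart,rend,ierr;
       Scalar *vals;

       ierr = MatGetOwnershipRange(A,&rstart,&rend);
       ierr = MatGetRow(A,rstart,&nz,&cols,&vals);
       ... examine nz, cols[], and vals[] ...
       ierr = MatRestoreRow(A,rstart,&nz,&cols,&vals);
*/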
1321 
1322 #undef __FUNC__
1323 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1324 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1325 {
1326   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1327 
1328   PetscFunctionBegin;
1329   *bs = baij->bs;
1330   PetscFunctionReturn(0);
1331 }
1332 
1333 #undef __FUNC__
1334 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1335 int MatZeroEntries_MPIBAIJ(Mat A)
1336 {
1337   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1338   int         ierr;
1339 
1340   PetscFunctionBegin;
1341   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1342   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1343   PetscFunctionReturn(0);
1344 }
1345 
1346 #undef __FUNC__
1347 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1348 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1349 {
1350   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1351   Mat         A = a->A, B = a->B;
1352   int         ierr;
1353   double      isend[5], irecv[5];
1354 
1355   PetscFunctionBegin;
1356   info->block_size     = (double)a->bs;
1357   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1358   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
       isend[3] = info->memory;  isend[4] = info->mallocs;
1359   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1360   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
       isend[3] += info->memory;  isend[4] += info->mallocs;
1361   if (flag == MAT_LOCAL) {
1362     info->nz_used      = isend[0];
1363     info->nz_allocated = isend[1];
1364     info->nz_unneeded  = isend[2];
1365     info->memory       = isend[3];
1366     info->mallocs      = isend[4];
1367   } else if (flag == MAT_GLOBAL_MAX) {
1368     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);CHKERRQ(ierr);
1369     info->nz_used      = irecv[0];
1370     info->nz_allocated = irecv[1];
1371     info->nz_unneeded  = irecv[2];
1372     info->memory       = irecv[3];
1373     info->mallocs      = irecv[4];
1374   } else if (flag == MAT_GLOBAL_SUM) {
1375     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);CHKERRQ(ierr);
1376     info->nz_used      = irecv[0];
1377     info->nz_allocated = irecv[1];
1378     info->nz_unneeded  = irecv[2];
1379     info->memory       = irecv[3];
1380     info->mallocs      = irecv[4];
1381   }
1382   info->rows_global       = (double)a->M;
1383   info->columns_global    = (double)a->N;
1384   info->rows_local        = (double)a->m;
1385   info->columns_local     = (double)a->N;
1386   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1387   info->fill_ratio_needed = 0;
1388   info->factor_mallocs    = 0;
1389   PetscFunctionReturn(0);
1390 }
1391 
1392 #undef __FUNC__
1393 #define __FUNC__ "MatSetOption_MPIBAIJ"
1394 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1395 {
1396   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1397 
1398   PetscFunctionBegin;
1399   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1400       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1401       op == MAT_COLUMNS_UNSORTED ||
1402       op == MAT_COLUMNS_SORTED ||
1403       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1404       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1405         MatSetOption(a->A,op);
1406         MatSetOption(a->B,op);
1407   } else if (op == MAT_ROW_ORIENTED) {
1408         a->roworiented = 1;
1409         MatSetOption(a->A,op);
1410         MatSetOption(a->B,op);
1411   } else if (op == MAT_ROWS_SORTED ||
1412              op == MAT_ROWS_UNSORTED ||
1413              op == MAT_SYMMETRIC ||
1414              op == MAT_STRUCTURALLY_SYMMETRIC ||
1415              op == MAT_YES_NEW_DIAGONALS)
1416     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1417   else if (op == MAT_COLUMN_ORIENTED) {
1418     a->roworiented = 0;
1419     MatSetOption(a->A,op);
1420     MatSetOption(a->B,op);
1421   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1422     a->donotstash = 1;
1423   } else if (op == MAT_NO_NEW_DIAGONALS) {
1424     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1425   } else {
1426     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1427   }
1428   PetscFunctionReturn(0);
1429 }
1430 
1431 #undef __FUNC__
1432 #define __FUNC__ "MatTranspose_MPIBAIJ"
1433 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1434 {
1435   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1436   Mat_SeqBAIJ *Aloc;
1437   Mat        B;
1438   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
1439   int        bs=baij->bs,mbs=baij->mbs;
1440   Scalar     *a;
1441 
1442   PetscFunctionBegin;
1443   if (matout == PETSC_NULL && M != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
1444   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1445   CHKERRQ(ierr);
1446 
1447   /* copy over the A part */
1448   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1449   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1451   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1452 
1453   for ( i=0; i<mbs; i++ ) {
1454     rvals[0] = bs*(baij->rstart + i);
1455     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1456     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1457       col = (baij->cstart+aj[j])*bs;
1458       for (k=0; k<bs; k++ ) {
1459         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1460         col++; a += bs;
1461       }
1462     }
1463   }
1464   /* copy over the B part */
1465   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1466   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1468   for ( i=0; i<mbs; i++ ) {
1469     rvals[0] = bs*(baij->rstart + i);
1470     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1471     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1472       col = baij->garray[aj[j]]*bs;
1473       for (k=0; k<bs; k++ ) {
1474         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1475         col++; a += bs;
1476       }
1477     }
1478   }
1479   PetscFree(rvals);
1480   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1481   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1482 
1483   if (matout != PETSC_NULL) {
1484     *matout = B;
1485   } else {
1486     /* not a true in-place transpose: free A's parallel data structures and move B's header into A */
1487     PetscFree(baij->rowners);
1488     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1489     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1490     if (baij->colmap) PetscFree(baij->colmap);
1491     if (baij->garray) PetscFree(baij->garray);
1492     if (baij->lvec) VecDestroy(baij->lvec);
1493     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1494     PetscFree(baij);
1495     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1496     PetscHeaderDestroy(B);
1497   }
1498   PetscFunctionReturn(0);
1499 }
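
/*
     Usage sketch (an illustrative fragment): out-of-place versus
   "in-place" transposition; the in-place form requires a square matrix
   and replaces A with its transpose.

     Mat At;
     ierr = MatTranspose(A,&At); CHKERRQ(ierr);
     ierr = MatTranspose(A,PETSC_NULL); CHKERRQ(ierr);
*/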
1500 
1501 #undef __FUNC__
1502 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1503 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1504 {
1505   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1506   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1507   int ierr,s1,s2,s3;
1508 
1509   PetscFunctionBegin;
1510   if (ll)  {
1511     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1512     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1513     if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"non-conforming local sizes");
1514     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1515     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1516   }
1517   if (rr) SETERRQ(PETSC_ERR_SUP,0,"not supported for right vector");
1518   PetscFunctionReturn(0);
1519 }
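
/*
     Usage sketch (an illustrative fragment; l is a made-up vector name):
   only left scaling is supported here, so the right vector must be
   PETSC_NULL; l must have the same local size as the matrix rows.

     ierr = MatDiagonalScale(A,l,PETSC_NULL); CHKERRQ(ierr);
*/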
1520 
1521 #undef __FUNC__
1522 #define __FUNC__ "MatZeroRows_MPIBAIJ"
1523 int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
1524 {
1525   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
1526   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
1527   int            *procs,*nprocs,j,found,idx,nsends,*work,row;
1528   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
1529   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
1530   int            *lens,imdex,*lrows,*values,bs=l->bs,rstart_bs=l->rstart_bs;
1531   MPI_Comm       comm = A->comm;
1532   MPI_Request    *send_waits,*recv_waits;
1533   MPI_Status     recv_status,*send_status;
1534   IS             istmp;
1535 
1536   PetscFunctionBegin;
1537   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
1538   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
1539 
1540   /*  first count number of contributors to each processor */
1541   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
1542   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
1543   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* N+1 avoids a zero-size malloc when N is 0 */
1544   for ( i=0; i<N; i++ ) {
1545     idx = rows[i];
1546     found = 0;
1547     for ( j=0; j<size; j++ ) {
1548       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
1549         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
1550       }
1551     }
1552     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
1553   }
1554   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
1555 
1556   /* inform other processors of number of messages and max length*/
1557   work   = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
1558   ierr   = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
1559   nrecvs = work[rank];
1560   ierr   = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
1561   nmax   = work[rank];
1562   PetscFree(work);
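
  /*
     The two reductions above implement a rendezvous: procs[] holds a 0/1
     flag per destination rank, so summing column r across all processors
     tells rank r how many messages to expect (nrecvs), while taking the
     elementwise max of nprocs[] bounds the length of any one message (nmax).
  */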
1563 
1564   /* post receives:   */
1565   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); CHKPTRQ(rvalues);
1566   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
1567   for ( i=0; i<nrecvs; i++ ) {
1568     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
1569   }
1570 
1571   /* do sends: starts[i] gives the starting index in svalues for the
1572      rows destined for the ith processor */
1575   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1576   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
1577   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1578   starts[0] = 0;
1579   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1580   for ( i=0; i<N; i++ ) {
1581     svalues[starts[owner[i]]++] = rows[i];
1582   }
1583   ISRestoreIndices(is,&rows);
1584 
1585   starts[0] = 0;
1586   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1587   count = 0;
1588   for ( i=0; i<size; i++ ) {
1589     if (procs[i]) {
1590       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
1591     }
1592   }
1593   PetscFree(starts);
1594 
1595   base = owners[rank]*bs;
1596 
1597   /*  wait on receives */
1598   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1599   source = lens + nrecvs;
1600   count  = nrecvs; slen = 0;
1601   while (count) {
1602     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
1603     /* record the source and length of each incoming message */
1604     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
1605     source[imdex]  = recv_status.MPI_SOURCE;
1606     lens[imdex]  = n;
1607     slen += n;
1608     count--;
1609   }
1610   PetscFree(recv_waits);
1611 
1612   /* move the data into the send scatter */
1613   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1614   count = 0;
1615   for ( i=0; i<nrecvs; i++ ) {
1616     values = rvalues + i*nmax;
1617     for ( j=0; j<lens[i]; j++ ) {
1618       lrows[count++] = values[j] - base;
1619     }
1620   }
1621   PetscFree(rvalues); PetscFree(lens);
1622   PetscFree(owner); PetscFree(nprocs);
1623 
1624   /* actually zap the local rows */
1625   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1626   PLogObjectParent(A,istmp);
1627 
1628   ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
1629   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1630   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1631 
1632   if (diag) {
1633     for ( i = 0; i < slen; i++ ) {
1634       row = lrows[i] + rstart_bs;
1635       ierr = MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES); CHKERRQ(ierr);
1636     }
1637     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1638     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1639   }
1640   PetscFree(lrows);
1641 
1642   /* wait on sends */
1643   if (nsends) {
1644     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
1645     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
1646     PetscFree(send_status);
1647   }
1648   PetscFree(send_waits); PetscFree(svalues);
1649 
1650   PetscFunctionReturn(0);
1651 }
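
/*
     Usage sketch (an illustrative fragment with made-up names): zero the
   global rows listed in rows[] and put 1.0 on the diagonal of each
   zeroed row.

     IS     is;
     Scalar one = 1.0;
     ierr = ISCreateGeneral(PETSC_COMM_SELF,nrows,rows,&is); CHKERRQ(ierr);
     ierr = MatZeroRows(A,is,&one); CHKERRQ(ierr);
     ierr = ISDestroy(is); CHKERRQ(ierr);
*/
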
1652 extern int MatPrintHelp_SeqBAIJ(Mat);
1653 #undef __FUNC__
1654 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1655 int MatPrintHelp_MPIBAIJ(Mat A)
1656 {
1657   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1658   int         ierr;
1659 
1660   PetscFunctionBegin;
1661   if (!a->rank) {
1662     ierr = MatPrintHelp_SeqBAIJ(a->A);CHKERRQ(ierr);
1663   }
1664   PetscFunctionReturn(0);
1665 }
1666 
1667 #undef __FUNC__
1668 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1669 int MatSetUnfactored_MPIBAIJ(Mat A)
1670 {
1671   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1672   int         ierr;
1673 
1674   PetscFunctionBegin;
1675   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1676   PetscFunctionReturn(0);
1677 }
1678 
1679 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1680 
1681 /* -------------------------------------------------------------------*/
1682 static struct _MatOps MatOps = {
1683   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1684   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1685   0,0,0,0,
1686   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1687   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1688   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1689   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1690   0,0,0,MatGetSize_MPIBAIJ,
1691   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1692   0,0,MatConvertSameType_MPIBAIJ,0,0,
1693   0,0,0,MatGetSubMatrices_MPIBAIJ,
1694   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1695   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
1696   0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1697 
1698 
1699 #undef __FUNC__
1700 #define __FUNC__ "MatCreateMPIBAIJ"
1701 /*@C
1702    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1703    (block compressed row).  For good matrix assembly performance
1704    the user should preallocate the matrix storage by setting the parameters
1705    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1706    performance can be increased by more than a factor of 50.
1707 
1708    Input Parameters:
1709 .  comm - MPI communicator
1710 .  bs   - size of block
1711 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1712            This value should be the same as the local size used in creating the
1713            y vector for the matrix-vector product y = Ax.
1714 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1715            This value should be the same as the local size used in creating the
1716            x vector for the matrix-vector product y = Ax.
1717 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
1718 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
1719 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1720            submatrix  (same for all local rows)
1721 .  d_nnz - array containing the number of block nonzeros in the various block rows
1722            of the diagonal portion of the local submatrix (possibly different for each
1723            block row) or PETSC_NULL.  You must leave room for the diagonal entry even
1724            if it is zero.
1725 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1726            submatrix (same for all local rows).
1727 .  o_nnz - array containing the number of block nonzeros in the various block rows of the
1728            off-diagonal portion of the local submatrix (possibly different for
1729            each block row) or PETSC_NULL.
1730 
1731    Output Parameter:
1732 .  A - the matrix
1733 
1734    Notes:
1735    The user MUST specify either the local or global matrix dimensions
1736    (possibly both).
1737 
1738    Storage Information:
1739    For a square global matrix we define each processor's diagonal portion
1740    to be its local rows and the corresponding columns (a square submatrix);
1741    each processor's off-diagonal portion encompasses the remainder of the
1742    local matrix (a rectangular submatrix).
1743 
1744    The user can specify preallocated storage for the diagonal part of
1745    the local submatrix with either d_nz or d_nnz (not both).  Set
1746    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1747    memory allocation.  Likewise, specify preallocated storage for the
1748    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1749 
1750    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1751    the figure below we depict these three local rows and all columns (0-11).
1752 
1753 $          0 1 2 3 4 5 6 7 8 9 10 11
1754 $         --------------------------
1755 $  row 3  |  o o o d d d o o o o o o
1756 $  row 4  |  o o o d d d o o o o o o
1757 $  row 5  |  o o o d d d o o o o o o
1758 $         --------------------------
1759 $
1760 
1761    Thus, any entries in the d locations are stored in the d (diagonal)
1762    submatrix, and any entries in the o locations are stored in the
1763    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1764    stored simply in the MATSEQBAIJ format for compressed row storage.
1765 
1766    Now d_nz should indicate the number of block nonzeros per row in the d matrix,
1767    and o_nz should indicate the number of block nonzeros per row in the o matrix.
1768    In general, for PDE problems in which most nonzeros are near the diagonal,
1769    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1770    or you will get TERRIBLE performance; see the users' manual chapter on
1771    matrices.
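
   Example (an illustrative sketch with made-up sizes): if every location
   depicted in the figure above held a nonzero and bs=1, the correct
   preallocation would be d_nz=3 and o_nz=9.  A typical call creating a
   100x100 matrix with 2x2 blocks, using 5 diagonal and 2 off-diagonal
   block nonzeros per block row, is
$      ierr = MatCreateMPIBAIJ(PETSC_COMM_WORLD,2,PETSC_DECIDE,PETSC_DECIDE,
$             100,100,5,PETSC_NULL,2,PETSC_NULL,&A);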
1772 
1773 .keywords: matrix, block, aij, compressed row, sparse, parallel
1774 
1775 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1776 @*/
1777 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1778                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1779 {
1780   Mat          B;
1781   Mat_MPIBAIJ  *b;
1782   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1783 
1784   PetscFunctionBegin;
1785   if (bs < 1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Invalid block size specified, must be positive");
1786 
1787   MPI_Comm_size(comm,&size);
1788   if (size == 1) {
1789     if (M == PETSC_DECIDE) M = m;
1790     if (N == PETSC_DECIDE) N = n;
1791     ierr = MatCreateSeqBAIJ(comm,bs,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
1792     PetscFunctionReturn(0);
1793   }
1794 
1795   *A = 0;
1796   PetscHeaderCreate(B,_p_Mat,MAT_COOKIE,MATMPIBAIJ,comm,MatDestroy,MatView);
1797   PLogObjectCreate(B);
1798   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1799   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1800   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1801 
1802   B->destroy    = MatDestroy_MPIBAIJ;
1803   B->view       = MatView_MPIBAIJ;
1804   B->mapping    = 0;
1805   B->factor     = 0;
1806   B->assembled  = PETSC_FALSE;
1807 
1808   B->insertmode = NOT_SET_VALUES;
1809   MPI_Comm_rank(comm,&b->rank);
1810   MPI_Comm_size(comm,&b->size);
1811 
1812   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
1813     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1814   }
1815   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) {
1816     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either M or m should be specified");
1817   }
1818   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) {
1819     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either N or n should be specified");
1820   }
1821   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1822   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1823 
1824   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1825     work[0] = m; work[1] = n;
1826     mbs = m/bs; nbs = n/bs;
1827     ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
1828     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1829     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1830   }
1831   if (m == PETSC_DECIDE) {
1832     Mbs = M/bs;
1833     if (Mbs*bs != M) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global rows must be divisible by blocksize");
1834     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1835     m   = mbs*bs;
1836   }
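  /*
     Worked instance of the split above (illustrative): Mbs = 10 block rows
     on b->size = 3 processors gives mbs = 4, 3, 3 on ranks 0, 1, 2; each
     rank gets 10/3 = 3 blocks and the one leftover block goes to rank 0.
  */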
1837   if (n == PETSC_DECIDE) {
1838     Nbs = N/bs;
1839     if (Nbs*bs != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global cols must be divisible by blocksize");
1840     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1841     n   = nbs*bs;
1842   }
1843   if (mbs*bs != m || nbs*bs != n) {
1844     SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of local rows, cols must be divisible by blocksize");
1845   }
1846 
1847   b->m = m; B->m = m;
1848   b->n = n; B->n = n;
1849   b->N = N; B->N = N;
1850   b->M = M; B->M = M;
1851   b->bs  = bs;
1852   b->bs2 = bs*bs;
1853   b->mbs = mbs;
1854   b->nbs = nbs;
1855   b->Mbs = Mbs;
1856   b->Nbs = Nbs;
1857 
1858   /* build local table of row and column ownerships */
1859   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1860   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1861   b->cowners = b->rowners + b->size + 2;
1862   ierr = MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1863   b->rowners[0] = 0;
1864   for ( i=2; i<=b->size; i++ ) {
1865     b->rowners[i] += b->rowners[i-1];
1866   }
1867   b->rstart    = b->rowners[b->rank];
1868   b->rend      = b->rowners[b->rank+1];
1869   b->rstart_bs = b->rstart * bs;
1870   b->rend_bs   = b->rend * bs;
1871 
1872   ierr = MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1873   b->cowners[0] = 0;
1874   for ( i=2; i<=b->size; i++ ) {
1875     b->cowners[i] += b->cowners[i-1];
1876   }
1877   b->cstart    = b->cowners[b->rank];
1878   b->cend      = b->cowners[b->rank+1];
1879   b->cstart_bs = b->cstart * bs;
1880   b->cend_bs   = b->cend * bs;
1881 
1882 
1883   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1884   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1885   PLogObjectParent(B,b->A);
1886   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1887   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1888   PLogObjectParent(B,b->B);
1889 
1890   /* build cache for off array entries formed */
1891   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1892   b->donotstash  = 0;
1893   b->colmap      = 0;
1894   b->garray      = 0;
1895   b->roworiented = 1;
1896 
1897   /* stuff used in block assembly */
1898   b->barray       = 0;
1899 
1900   /* stuff used for matrix vector multiply */
1901   b->lvec         = 0;
1902   b->Mvctx        = 0;
1903 
1904   /* stuff for MatGetRow() */
1905   b->rowindices   = 0;
1906   b->rowvalues    = 0;
1907   b->getrowactive = PETSC_FALSE;
1908 
1909   /* hash table stuff */
1910   b->ht          = 0;
1911   b->ht_size     = 0;
1912 
1913   *A = B;
1914   PetscFunctionReturn(0);
1915 }
1916 
1917 #undef __FUNC__
1918 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
1919 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1920 {
1921   Mat         mat;
1922   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1923   int         ierr, len=0, flg;
1924 
1925   PetscFunctionBegin;
1926   *newmat       = 0;
1927   PetscHeaderCreate(mat,_p_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm,MatDestroy,MatView);
1928   PLogObjectCreate(mat);
1929   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1930   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1931   mat->destroy    = MatDestroy_MPIBAIJ;
1932   mat->view       = MatView_MPIBAIJ;
1933   mat->factor     = matin->factor;
1934   mat->assembled  = PETSC_TRUE;
1935 
1936   a->m = mat->m   = oldmat->m;
1937   a->n = mat->n   = oldmat->n;
1938   a->M = mat->M   = oldmat->M;
1939   a->N = mat->N   = oldmat->N;
1940 
1941   a->bs  = oldmat->bs;
1942   a->bs2 = oldmat->bs2;
1943   a->mbs = oldmat->mbs;
1944   a->nbs = oldmat->nbs;
1945   a->Mbs = oldmat->Mbs;
1946   a->Nbs = oldmat->Nbs;
1947 
1948   a->rstart       = oldmat->rstart;
1949   a->rend         = oldmat->rend;
1950   a->cstart       = oldmat->cstart;
1951   a->cend         = oldmat->cend;
1952   a->size         = oldmat->size;
1953   a->rank         = oldmat->rank;
1954   mat->insertmode = NOT_SET_VALUES;
1955   a->rowvalues    = 0;
1956   a->getrowactive = PETSC_FALSE;
1957   a->barray       = 0;
1958 
1959   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1960   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1961   a->cowners = a->rowners + a->size + 2;
1962   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1963   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1964   if (oldmat->colmap) {
1965     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1966     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1967     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1968   } else a->colmap = 0;
1969   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1970     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1971     PLogObjectMemory(mat,len*sizeof(int));
1972     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1973   } else a->garray = 0;
1974 
1975   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1976   PLogObjectParent(mat,a->lvec);
1977   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1978   PLogObjectParent(mat,a->Mvctx);
1979   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1980   PLogObjectParent(mat,a->A);
1981   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1982   PLogObjectParent(mat,a->B);
1983   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1984   if (flg) {
1985     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1986   }
1987   *newmat = mat;
1988   PetscFunctionReturn(0);
1989 }
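
/*
     Usage sketch (an illustrative fragment): this routine is reached
   through MatConvert() with MATSAME as the target type, which produces
   a complete copy of the parallel BAIJ matrix.

     Mat C;
     ierr = MatConvert(A,MATSAME,&C); CHKERRQ(ierr);
*/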
1990 
1991 #include "sys.h"
1992 
1993 #undef __FUNC__
1994 #define __FUNC__ "MatLoad_MPIBAIJ"
1995 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1996 {
1997   Mat          A;
1998   int          i, nz, ierr, j,rstart, rend, fd;
1999   Scalar       *vals,*buf;
2000   MPI_Comm     comm = ((PetscObject)viewer)->comm;
2001   MPI_Status   status;
2002   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
2003   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
2004   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
2005   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
2006   int          dcount,kmax,k,nzcount,tmp;
2007 
2008   PetscFunctionBegin;
2009   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
2010   bs2  = bs*bs;
2011 
2012   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
2013   if (!rank) {
2014     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
2015     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
2016     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
2017     if (header[3] < 0) {
2018       SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix stored in special format, cannot load as MPIBAIJ");
2019     }
2020   }
2021 
2022   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
2023   M = header[1]; N = header[2];
2024 
2025   if (M != N) SETERRQ(PETSC_ERR_SUP,0,"Can only do square matrices");
2026 
2027   /*
2028      This code adds extra rows to make sure the number of rows is
2029      divisible by the blocksize
2030   */
2031   Mbs        = M/bs;
2032   extra_rows = bs - M + bs*(Mbs);
2033   if (extra_rows == bs) extra_rows = 0;
2034   else                  Mbs++;
2035   if (extra_rows && !rank) {
2036     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
2037   }
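  /*
     Worked instance (illustrative): M = 10 and bs = 3 give Mbs = 3 and
     extra_rows = 3 - 10 + 9 = 2, so Mbs becomes 4 and the matrix is padded
     to 12 rows; the padded rows later receive a 1.0 on the diagonal.
  */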
2038 
2039   /* determine ownership of all rows */
2040   mbs = Mbs/size + ((Mbs % size) > rank);
2041   m   = mbs * bs;
2042   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
2043   browners = rowners + size + 1;
2044   ierr = MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
2045   rowners[0] = 0;
2046   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
2047   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
2048   rstart = rowners[rank];
2049   rend   = rowners[rank+1];
2050 
2051   /* distribute row lengths to all processors */
2052   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
2053   if (!rank) {
2054     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
2055     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
2056     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
2057     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
2058     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
2059     ierr = MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);CHKERRQ(ierr);
2060     PetscFree(sndcounts);
2061   } else {
2062     ierr = MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);CHKERRQ(ierr);
2063   }
2064 
2065   if (!rank) {
2066     /* calculate the number of nonzeros on each processor */
2067     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
2068     PetscMemzero(procsnz,size*sizeof(int));
2069     for ( i=0; i<size; i++ ) {
2070       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
2071         procsnz[i] += rowlengths[j];
2072       }
2073     }
2074     PetscFree(rowlengths);
2075 
2076     /* determine max buffer needed and allocate it */
2077     maxnz = 0;
2078     for ( i=0; i<size; i++ ) {
2079       maxnz = PetscMax(maxnz,procsnz[i]);
2080     }
2081     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
2082 
2083     /* read in my part of the matrix column indices  */
2084     nz = procsnz[0];
2085     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
2086     mycols = ibuf;
2087     if (size == 1)  nz -= extra_rows;
2088     ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
2089     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
2090 
2091     /* read in the parts for all the other processors (except the last) and ship them off */
2092     for ( i=1; i<size-1; i++ ) {
2093       nz   = procsnz[i];
2094       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
2095       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
2096     }
2097     /* read in the stuff for the last proc */
2098     if ( size != 1 ) {
2099       nz   = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
2100       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
2101       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
2102       ierr = MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);CHKERRQ(ierr);
2103     }
2104     PetscFree(cols);
2105   } else {
2106     /* determine buffer space needed for message */
2107     nz = 0;
2108     for ( i=0; i<m; i++ ) {
2109       nz += locrowlens[i];
2110     }
2111     ibuf   = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
2112     mycols = ibuf;
2113     /* receive message of column indices */
2114     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
2115     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
2116     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2117   }
2118 
2119   /* loop over local rows, determining number of off diagonal entries */
2120   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
2121   odlens = dlens + (rend-rstart);
2122   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
2123   PetscMemzero(mask,3*Mbs*sizeof(int));
2124   masked1 = mask    + Mbs;
2125   masked2 = masked1 + Mbs;
2126   rowcount = 0; nzcount = 0;
2127   for ( i=0; i<mbs; i++ ) {
2128     dcount  = 0;
2129     odcount = 0;
2130     for ( j=0; j<bs; j++ ) {
2131       kmax = locrowlens[rowcount];
2132       for ( k=0; k<kmax; k++ ) {
2133         tmp = mycols[nzcount++]/bs;
2134         if (!mask[tmp]) {
2135           mask[tmp] = 1;
2136           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
2137           else masked1[dcount++] = tmp;
2138         }
2139       }
2140       rowcount++;
2141     }
2142 
2143     dlens[i]  = dcount;
2144     odlens[i] = odcount;
2145 
2146     /* zero out the mask elements we set */
2147     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
2148     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
2149   }
2150 
2151   /* create our matrix */
2152   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
2153          CHKERRQ(ierr);
2154   A = *newmat;
2155   MatSetOption(A,MAT_COLUMNS_SORTED);
2156 
2157   if (!rank) {
2158     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
2159     /* read in my part of the matrix numerical values  */
2160     nz = procsnz[0];
2161     vals = buf;
2162     mycols = ibuf;
2163     if (size == 1)  nz -= extra_rows;
2164     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2165     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
2166 
2167     /* insert into matrix */
2168     jj      = rstart*bs;
2169     for ( i=0; i<m; i++ ) {
2170       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
2171       mycols += locrowlens[i];
2172       vals   += locrowlens[i];
2173       jj++;
2174     }
2175     /* read in other processors (except the last one) and ship out */
2176     for ( i=1; i<size-1; i++ ) {
2177       nz   = procsnz[i];
2178       vals = buf;
2179       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2180       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
2181     }
2182     /* the last proc */
2183     if (size != 1) {
2184       nz   = procsnz[size-1] - extra_rows; /* the extra rows are not stored on disk */
2185       vals = buf;
2186       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2187       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
2188       ierr = MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);CHKERRQ(ierr);
2189     }
2190     PetscFree(procsnz);
2191   } else {
2192     /* receive numeric values */
2193     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
2194 
2195     /* receive message of values*/
2196     vals   = buf;
2197     mycols = ibuf;
2198     ierr   = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
2199     ierr   = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
2200     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2201 
2202     /* insert into matrix */
2203     jj      = rstart*bs;
2204     for ( i=0; i<m; i++ ) {
2205       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
2206       mycols += locrowlens[i];
2207       vals   += locrowlens[i];
2208       jj++;
2209     }
2210   }
2211   PetscFree(locrowlens);
2212   PetscFree(buf);
2213   PetscFree(ibuf);
2214   PetscFree(rowners);
2215   PetscFree(dlens);
2216   PetscFree(mask);
2217   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2218   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2219   PetscFunctionReturn(0);
2220 }
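
/*
     Usage sketch (an illustrative fragment; the file name is made up):
   load a parallel BAIJ matrix from a PETSc binary file, taking the block
   size from the -matload_block_size option.

     Viewer viewer;
     Mat    A;
     ierr = ViewerFileOpenBinary(PETSC_COMM_WORLD,"matrix.dat",BINARY_RDONLY,&viewer);CHKERRQ(ierr);
     ierr = MatLoad(viewer,MATMPIBAIJ,&A); CHKERRQ(ierr);
     ierr = ViewerDestroy(viewer); CHKERRQ(ierr);
*/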
2221 
2222 
2223