xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision a74b47ca65aaa1a4193175fce9cb999199946187)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpibaij.c,v 1.97 1998/01/12 20:34:41 balay Exp balay $";
3 #endif
4 
5 #include "pinclude/pviewer.h"
6 #include "src/mat/impls/baij/mpi/mpibaij.h"
7 #include "src/vec/vecimpl.h"
8 
9 
10 extern int MatSetUpMultiply_MPIBAIJ(Mat);
11 extern int DisAssemble_MPIBAIJ(Mat);
12 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
13 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
14 
15 /*
16      Local utility routine that creates a mapping from the global column
17    number to the local number in the off-diagonal part of the local
18    storage of the matrix.  This is done in a non scable way since the
19    length of colmap equals the global matrix length.
20 */
21 #undef __FUNC__
22 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
23 static int CreateColmap_MPIBAIJ_Private(Mat mat)
24 {
25   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
26   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
27   int         nbs = B->nbs,i,bs=B->bs;;
28 
29   PetscFunctionBegin;
30   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
31   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
32   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
33   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
34   PetscFunctionReturn(0);
35 }
36 
37 #define CHUNKSIZE  10
38 
39 #define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
40 { \
41  \
42     brow = row/bs;  \
43     rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
44     rmax = aimax[brow]; nrow = ailen[brow]; \
45       bcol = col/bs; \
46       ridx = row % bs; cidx = col % bs; \
47       low = 0; high = nrow; \
48       while (high-low > 3) { \
49         t = (low+high)/2; \
50         if (rp[t] > bcol) high = t; \
51         else              low  = t; \
52       } \
53       for ( _i=low; _i<high; _i++ ) { \
54         if (rp[_i] > bcol) break; \
55         if (rp[_i] == bcol) { \
56           bap  = ap +  bs2*_i + bs*cidx + ridx; \
57           if (addv == ADD_VALUES) *bap += value;  \
58           else                    *bap  = value;  \
59           goto a_noinsert; \
60         } \
61       } \
62       if (a->nonew == 1) goto a_noinsert; \
63       else if (a->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
64       if (nrow >= rmax) { \
65         /* there is no extra room in row, therefore enlarge */ \
66         int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
67         Scalar *new_a; \
68  \
69         if (a->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
70  \
71         /* malloc new storage space */ \
72         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
73         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
74         new_j   = (int *) (new_a + bs2*new_nz); \
75         new_i   = new_j + new_nz; \
76  \
77         /* copy over old data into new slots */ \
78         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
79         for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
80         PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
81         len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
82         PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
83                                                            len*sizeof(int)); \
84         PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
85         PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
86         PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
87                     aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
88         /* free up old matrix storage */ \
89         PetscFree(a->a);  \
90         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
91         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
92         a->singlemalloc = 1; \
93  \
94         rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
95         rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
96         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
97         a->maxnz += bs2*CHUNKSIZE; \
98         a->reallocs++; \
99         a->nz++; \
100       } \
101       N = nrow++ - 1;  \
102       /* shift up all the later entries in this row */ \
103       for ( ii=N; ii>=_i; ii-- ) { \
104         rp[ii+1] = rp[ii]; \
105         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
106       } \
107       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
108       rp[_i]                      = bcol;  \
109       ap[bs2*_i + bs*cidx + ridx] = value;  \
110       a_noinsert:; \
111     ailen[brow] = nrow; \
112 }
113 
114 #define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
115 { \
116  \
117     brow = row/bs;  \
118     rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
119     rmax = bimax[brow]; nrow = bilen[brow]; \
120       bcol = col/bs; \
121       ridx = row % bs; cidx = col % bs; \
122       low = 0; high = nrow; \
123       while (high-low > 3) { \
124         t = (low+high)/2; \
125         if (rp[t] > bcol) high = t; \
126         else              low  = t; \
127       } \
128       for ( _i=low; _i<high; _i++ ) { \
129         if (rp[_i] > bcol) break; \
130         if (rp[_i] == bcol) { \
131           bap  = ap +  bs2*_i + bs*cidx + ridx; \
132           if (addv == ADD_VALUES) *bap += value;  \
133           else                    *bap  = value;  \
134           goto b_noinsert; \
135         } \
136       } \
137       if (b->nonew == 1) goto b_noinsert; \
138       else if (b->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
139       if (nrow >= rmax) { \
140         /* there is no extra room in row, therefore enlarge */ \
141         int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
142         Scalar *new_a; \
143  \
144         if (b->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
145  \
146         /* malloc new storage space */ \
147         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
148         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
149         new_j   = (int *) (new_a + bs2*new_nz); \
150         new_i   = new_j + new_nz; \
151  \
152         /* copy over old data into new slots */ \
153         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
154         for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
155         PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
156         len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
157         PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
158                                                            len*sizeof(int)); \
159         PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
160         PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
161         PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
162                     ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
163         /* free up old matrix storage */ \
164         PetscFree(b->a);  \
165         if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
166         ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
167         b->singlemalloc = 1; \
168  \
169         rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
170         rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
171         PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
172         b->maxnz += bs2*CHUNKSIZE; \
173         b->reallocs++; \
174         b->nz++; \
175       } \
176       N = nrow++ - 1;  \
177       /* shift up all the later entries in this row */ \
178       for ( ii=N; ii>=_i; ii-- ) { \
179         rp[ii+1] = rp[ii]; \
180         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
181       } \
182       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
183       rp[_i]                      = bcol;  \
184       ap[bs2*_i + bs*cidx + ridx] = value;  \
185       b_noinsert:; \
186     bilen[brow] = nrow; \
187 }
188 
189 #undef __FUNC__
190 #define __FUNC__ "MatSetValues_MPIBAIJ"
191 int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
192 {
193   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
194   Scalar      value;
195   int         ierr,i,j,row,col;
196   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
197   int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
198   int         cend_orig=baij->cend_bs,bs=baij->bs;
199 
200   /* Some Variables required in the macro */
201   Mat         A = baij->A;
202   Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
203   int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
204   Scalar      *aa=a->a;
205 
206   Mat         B = baij->B;
207   Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
208   int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
209   Scalar      *ba=b->a;
210 
211   int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
212   int         low,high,t,ridx,cidx,bs2=a->bs2;
213   Scalar      *ap,*bap;
214 
215   PetscFunctionBegin;
216   for ( i=0; i<m; i++ ) {
217 #if defined(USE_PETSC_BOPT_g)
218     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
219     if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
220 #endif
221     if (im[i] >= rstart_orig && im[i] < rend_orig) {
222       row = im[i] - rstart_orig;
223       for ( j=0; j<n; j++ ) {
224         if (in[j] >= cstart_orig && in[j] < cend_orig){
225           col = in[j] - cstart_orig;
226           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
227           MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
228           /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
229         }
230 #if defined(USE_PETSC_BOPT_g)
231         else if (in[j] < 0) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");}
232         else if (in[j] >= baij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Col too large");}
233 #endif
234         else {
235           if (mat->was_assembled) {
236             if (!baij->colmap) {
237               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
238             }
239             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
240             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
241               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
242               col =  in[j];
243               /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
244               B = baij->B;
245               b = (Mat_SeqBAIJ *) (B)->data;
246               bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
247               ba=b->a;
248             }
249           } else col = in[j];
250           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
251           MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
252           /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
253         }
254       }
255     } else {
256       if (roworiented && !baij->donotstash) {
257         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
258       } else {
259         if (!baij->donotstash) {
260           row = im[i];
261 	  for ( j=0; j<n; j++ ) {
262 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
263           }
264         }
265       }
266     }
267   }
268   PetscFunctionReturn(0);
269 }
270 
271 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
272 #undef __FUNC__
273 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
274 int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
275 {
276   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
277   Scalar      *value,*barray=baij->barray;
278   int         ierr,i,j,ii,jj,row,col,k,l;
279   int         roworiented = baij->roworiented,rstart=baij->rstart ;
280   int         rend=baij->rend,cstart=baij->cstart,stepval;
281   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
282 
283   if(!barray) {
284     baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
285   }
286 
287   if (roworiented) {
288     stepval = (n-1)*bs;
289   } else {
290     stepval = (m-1)*bs;
291   }
292   for ( i=0; i<m; i++ ) {
293 #if defined(USE_PETSC_BOPT_g)
294     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
295     if (im[i] >= baij->Mbs) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
296 #endif
297     if (im[i] >= rstart && im[i] < rend) {
298       row = im[i] - rstart;
299       for ( j=0; j<n; j++ ) {
300         /* If NumCol = 1 then a copy is not required */
301         if ((roworiented) && (n == 1)) {
302           barray = v + i*bs2;
303         } else if((!roworiented) && (m == 1)) {
304           barray = v + j*bs2;
305         } else { /* Here a copy is required */
306           if (roworiented) {
307             value = v + i*(stepval+bs)*bs + j*bs;
308           } else {
309             value = v + j*(stepval+bs)*bs + i*bs;
310           }
311           for ( ii=0; ii<bs; ii++,value+=stepval ) {
312             for (jj=0; jj<bs; jj++ ) {
313               *barray++  = *value++;
314             }
315           }
316           barray -=bs2;
317         }
318 
319         if (in[j] >= cstart && in[j] < cend){
320           col  = in[j] - cstart;
321           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
322         }
323 #if defined(USE_PETSC_BOPT_g)
324         else if (in[j] < 0) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");}
325         else if (in[j] >= baij->Nbs) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");}
326 #endif
327         else {
328           if (mat->was_assembled) {
329             if (!baij->colmap) {
330               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
331             }
332 
333 #if defined(USE_PETSC_BOPT_g)
334             if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(PETSC_ERR_PLIB,0,"Incorrect colmap");}
335 #endif
336             col = (baij->colmap[in[j]] - 1)/bs;
337             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
338               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
339               col =  in[j];
340             }
341           }
342           else col = in[j];
343           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
344         }
345       }
346     } else {
347       if (!baij->donotstash) {
348         if (roworiented ) {
349           row   = im[i]*bs;
350           value = v + i*(stepval+bs)*bs;
351           for ( j=0; j<bs; j++,row++ ) {
352             for ( k=0; k<n; k++ ) {
353               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
354                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
355               }
356             }
357           }
358         } else {
359           for ( j=0; j<n; j++ ) {
360             value = v + j*(stepval+bs)*bs + i*bs;
361             col   = in[j]*bs;
362             for ( k=0; k<bs; k++,col++,value+=stepval) {
363               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
364                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
365               }
366             }
367           }
368         }
369       }
370     }
371   }
372   PetscFunctionReturn(0);
373 }
374 #include <math.h>
375 #define HASH_KEY 0.6180339887
376 #define HASH1(size,key) ((int)((size)*fmod(((key)*HASH_KEY),1)))
377 
378 #undef __FUNC__
379 #define __FUNC__ "MatSetValues_MPIBAIJ_HT"
380 int MatSetValues_MPIBAIJ_HT(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
381 {
382   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
383   int         ierr,i,j,row,col;
384   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
385   int         rend_orig=baij->rend_bs;
386 
387   int         h1,key,size=baij->ht_size,k,bs=baij->bs;
388   int         * HT  = baij->ht;
389   Scalar      ** HD = (Scalar **)(HT + size),value;
390 
391 
392   PetscFunctionBegin;
393 
394   for ( i=0; i<m; i++ ) {
395 #if defined(USE_PETSC_BOPT_g)
396     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
397     if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
398 #endif
399     if (im[i] >= rstart_orig && im[i] < rend_orig) {
400       row = im[i];
401       for ( j=0; j<n; j++ ) {
402         col = in[j];
403         if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
404         /* Look up into the Hash Table */
405         key = (row/bs)*baij->Nbs+(col/bs)+1;
406         h1  = HASH1(size,key);
407 
408         for ( k=0; k<size; k++ ){
409           if (HT[(h1+k)%size] == key) {
410             if (addv == ADD_VALUES) *(HD[(h1+k)%size]+ (col % bs)*bs + row % bs) += value;
411             else                    *(HD[(h1+k)%size]+ (col % bs)*bs + row % bs)  = value;
412             break;
413           }
414         }
415         if ( k==size) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
416       }
417     } else {
418       if (roworiented && !baij->donotstash) {
419         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
420       } else {
421         if (!baij->donotstash) {
422           row = im[i];
423 	  for ( j=0; j<n; j++ ) {
424 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
425           }
426         }
427       }
428     }
429   }
430   PetscFunctionReturn(0);
431 }
432 
433 #undef __FUNC__
434 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ_HT"
435 int MatSetValuesBlocked_MPIBAIJ_HT(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
436 {
437   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
438   int         ierr,i,j,ii,jj,row,col,k,l;
439   int         roworiented = baij->roworiented,rstart=baij->rstart ;
440   int         rend=baij->rend,stepval,bs=baij->bs,bs2=baij->bs2;
441 
442   int         h1,key,size=baij->ht_size;
443   int         * HT  = baij->ht;
444   Scalar      ** HD = (Scalar **)(HT + size),*value,*baij_a;
445 
446   PetscFunctionBegin;
447 
448   if (roworiented) {
449     stepval = (n-1)*bs;
450   } else {
451     stepval = (m-1)*bs;
452   }
453   for ( i=0; i<m; i++ ) {
454 #if defined(USE_PETSC_BOPT_g)
455     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
456     if (im[i] >= baij->Mbs) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
457 #endif
458     if (im[i] >= rstart && im[i] < rend) {
459       row = im[i];
460       for ( j=0; j<n; j++ ) {
461         col = in[j];
462 
463         /* Look up into the Hash Table */
464         key = row*baij->Nbs+col+1;
465         h1  = HASH1(size,key);
466 
467         for ( k=0; k<size; k++ ){
468           if (HT[(h1+k)%size] == key) {
469             baij_a = HD[(h1+k)%size];
470 
471             if (roworiented) {
472               value = v + i*(stepval+bs)*bs + j*bs;
473               if (addv == ADD_VALUES) {
474                 for ( ii=0; ii<bs; ii++,value+=stepval,baij_a++ ) {
475                   for ( jj=0; jj<bs2; jj+=bs ) {
476                     baij_a[jj]  += *value++;
477                   }
478                 }
479               } else {
480                 for ( ii=0; ii<bs; ii++,value+=stepval,baij_a++ ) {
481                   for ( jj=0; jj<bs2; jj+=bs ) {
482                     baij_a[jj]  = *value++;
483                   }
484                 }
485               }
486             } else {
487               value = v + j*(stepval+bs)*bs + i*bs;
488               if (addv == ADD_VALUES) {
489                 for ( ii=0; ii<bs; ii++,value+=stepval,baij_a+=bs ) {
490                   for ( jj=0; jj<bs; jj++ ) {
491                     baij_a[jj]  += *value++;
492                   }
493                 }
494               } else {
495                 for ( ii=0; ii<bs; ii++,value+=stepval,baij_a+=bs ) {
496                   for ( jj=0; jj<bs; jj++ ) {
497                     baij_a[jj]  = *value++;
498                   }
499                 }
500               }
501             }
502             break;
503           }
504         }
505         if ( k==size) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
506       }
507     } else {
508       if (!baij->donotstash) {
509         if (roworiented ) {
510           row   = im[i]*bs;
511           value = v + i*(stepval+bs)*bs;
512           for ( j=0; j<bs; j++,row++ ) {
513             for ( k=0; k<n; k++ ) {
514               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
515                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
516               }
517             }
518           }
519         } else {
520           for ( j=0; j<n; j++ ) {
521             value = v + j*(stepval+bs)*bs + i*bs;
522             col   = in[j]*bs;
523             for ( k=0; k<bs; k++,col++,value+=stepval) {
524               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
525                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
526               }
527             }
528           }
529         }
530       }
531     }
532   }
533   PetscFunctionReturn(0);
534 }
535 #undef __FUNC__
536 #define __FUNC__ "MatGetValues_MPIBAIJ"
537 int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
538 {
539   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
540   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
541   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
542 
543   for ( i=0; i<m; i++ ) {
544     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
545     if (idxm[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
546     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
547       row = idxm[i] - bsrstart;
548       for ( j=0; j<n; j++ ) {
549         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
550         if (idxn[j] >= baij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
551         if (idxn[j] >= bscstart && idxn[j] < bscend){
552           col = idxn[j] - bscstart;
553           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
554         } else {
555           if (!baij->colmap) {
556             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
557           }
558           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
559              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
560           else {
561             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
562             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
563           }
564         }
565       }
566     } else {
567       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
568     }
569   }
570   PetscFunctionReturn(0);
571 }
572 
573 #undef __FUNC__
574 #define __FUNC__ "MatNorm_MPIBAIJ"
575 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
576 {
577   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
578   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
579   int        ierr, i,bs2=baij->bs2;
580   double     sum = 0.0;
581   Scalar     *v;
582 
583   PetscFunctionBegin;
584   if (baij->size == 1) {
585     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
586   } else {
587     if (type == NORM_FROBENIUS) {
588       v = amat->a;
589       for (i=0; i<amat->nz*bs2; i++ ) {
590 #if defined(USE_PETSC_COMPLEX)
591         sum += real(conj(*v)*(*v)); v++;
592 #else
593         sum += (*v)*(*v); v++;
594 #endif
595       }
596       v = bmat->a;
597       for (i=0; i<bmat->nz*bs2; i++ ) {
598 #if defined(USE_PETSC_COMPLEX)
599         sum += real(conj(*v)*(*v)); v++;
600 #else
601         sum += (*v)*(*v); v++;
602 #endif
603       }
604       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
605       *norm = sqrt(*norm);
606     } else {
607       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
608     }
609   }
610   PetscFunctionReturn(0);
611 }
612 
613 #undef __FUNC__
614 #define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
615 int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
616 {
617   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
618   MPI_Comm    comm = mat->comm;
619   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
620   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
621   MPI_Request *send_waits,*recv_waits;
622   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
623   InsertMode  addv;
624   Scalar      *rvalues,*svalues;
625 
626   PetscFunctionBegin;
627   /* make sure all processors are either in INSERTMODE or ADDMODE */
628   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
629   if (addv == (ADD_VALUES|INSERT_VALUES)) {
630     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
631   }
632   mat->insertmode = addv; /* in case this processor had no cache */
633 
634   /*  first count number of contributors to each processor */
635   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
636   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
637   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
638   for ( i=0; i<baij->stash.n; i++ ) {
639     idx = baij->stash.idx[i];
640     for ( j=0; j<size; j++ ) {
641       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
642         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
643       }
644     }
645   }
646   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
647 
648   /* inform other processors of number of messages and max length*/
649   work      = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
650   ierr      = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
651   nreceives = work[rank];
652   ierr      = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
653   nmax      = work[rank];
654   PetscFree(work);
655 
656   /* post receives:
657        1) each message will consist of ordered pairs
658      (global index,value) we store the global index as a double
659      to simplify the message passing.
660        2) since we don't know how long each individual message is we
661      allocate the largest needed buffer for each receive. Potentially
662      this is a lot of wasted space.
663 
664 
665        This could be done better.
666   */
667   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
668   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
669   for ( i=0; i<nreceives; i++ ) {
670     ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
671   }
672 
673   /* do sends:
674       1) starts[i] gives the starting index in svalues for stuff going to
675          the ith processor
676   */
677   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
678   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
679   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
680   starts[0] = 0;
681   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
682   for ( i=0; i<baij->stash.n; i++ ) {
683     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
684     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
685     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
686   }
687   PetscFree(owner);
688   starts[0] = 0;
689   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
690   count = 0;
691   for ( i=0; i<size; i++ ) {
692     if (procs[i]) {
693       ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
694     }
695   }
696   PetscFree(starts); PetscFree(nprocs);
697 
698   /* Free cache space */
699   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",rank,baij->stash.n);
700   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
701 
702   baij->svalues    = svalues;    baij->rvalues    = rvalues;
703   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
704   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
705   baij->rmax       = nmax;
706 
707   PetscFunctionReturn(0);
708 }
709 
710 /*
711   Creates the hash table, and sets the table
712   This table is created only once.
713   If new entried need to be added to the matrix
714   then the hash table has to be destroyed and
715   recreated.
716 */
717 #undef __FUNC__
718 #define __FUNC__ "MatCreateHashTable_MPIBAIJ_Private"
719 int MatCreateHashTable_MPIBAIJ_Private(Mat mat,double factor)
720 {
721   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
722   Mat         A = baij->A, B=baij->B;
723   Mat_SeqBAIJ *a=(Mat_SeqBAIJ *)A->data, *b=(Mat_SeqBAIJ *)B->data;
724   int         i,j,k,nz=a->nz+b->nz,h1,*ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j;
725   int         size,ct=0,max1=0,max2=0,bs2=baij->bs2,rstart=baij->rstart;
726   int         cstart=baij->cstart,*garray=baij->garray,row,col;
727   int         *HT,key;
728   Scalar      **HD;
729 
730   PetscFunctionBegin;
731   baij->ht_size=(int)(factor*nz);
732   size = baij->ht_size;
733 
734   if (baij->ht) {
735     PetscFunctionReturn(0);
736   }
737 
738   /* Allocate Memory for Hash Table */
739   baij->ht = (int*)PetscMalloc(size*(sizeof(int)+sizeof(Scalar*))+1); CHKPTRQ(baij->ht);
740   HT = baij->ht;
741   HD = (Scalar**)(HT + size);
742   PetscMemzero(HT,size*sizeof(int)+sizeof(Scalar*));
743 
744 
745   /* Loop Over A */
746   for ( i=0; i<a->mbs; i++ ) {
747     for ( j=ai[i]; j<ai[i+1]; j++ ) {
748       row = i+rstart;
749       col = aj[j]+cstart;
750 
751       key = row*baij->Nbs + col + 1;
752       /* printf("[%d]row,col,key = %2d,%2d,%5.2f\n",PetscGlobalRank,row,col,key); */
753       h1  = HASH1(size,key);
754 
755       for ( k=0; k<size; k++ ){
756         if (HT[(h1+k)%size] == 0.0) {
757           HT[(h1+k)%size] = key;
758           HD[(h1+k)%size] = a->a + j*bs2;
759           break;
760         } else ct++;
761       }
762       if (k> max1) max1 = k;
763     }
764   }
765   /* Loop Over B */
766   for ( i=0; i<b->mbs; i++ ) {
767     for ( j=bi[i]; j<bi[i+1]; j++ ) {
768       row = i+rstart;
769       col = garray[bj[j]];
770       key = row*baij->Nbs + col + 1;
771       /* printf("[%d]row,col,key = %2d,%2d,%5.2f\n",PetscGlobalRank,row,col,key); */
772       h1  = HASH1(size,key);
773       for ( k=0; k<size; k++ ){
774         if (HT[(h1+k)%size] == 0.0) {
775           HT[(h1+k)%size] = key;
776           HD[(h1+k)%size] = b->a + j*bs2;
777           break;
778         } else ct++;
779       }
780       if (k> max2) max2 = k;
781     }
782   }
783 
784   /* Print Summary */
785   for ( i=0,key=0.0,j=0; i<size; i++)
786     if (HT[i]) {j++;}
787 
788   /*printf("[%d] fact = %3.2f max1 = %5d max2 = %5d Size %5d - Searches %5d Avg %5.2f Keys %5d\n",
789          PetscGlobalRank,factor,max1,max2,size,ct+j,((float)ct+j),j); */
790   fflush(stdout);
791   PetscFunctionReturn(0);
792 }
793 
794 #undef __FUNC__
795 #define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
796 int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
797 {
798   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
799   MPI_Status  *send_status,recv_status;
800   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
801   int         bs=baij->bs,row,col,other_disassembled,flg;
802   Scalar      *values,val;
803   InsertMode  addv = mat->insertmode;
804 
805   PetscFunctionBegin;
806   /*  wait on receives */
807   while (count) {
808     ierr = MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
809     /* unpack receives into our local space */
810     values = baij->rvalues + 3*imdex*baij->rmax;
811     ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
812     n = n/3;
813     for ( i=0; i<n; i++ ) {
814       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
815       col = (int) PetscReal(values[3*i+1]);
816       val = values[3*i+2];
817       if (col >= baij->cstart*bs && col < baij->cend*bs) {
818         col -= baij->cstart*bs;
819         ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
820       } else {
821         if (mat->was_assembled) {
822           if (!baij->colmap) {
823             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
824           }
825           col = (baij->colmap[col/bs]) - 1 + col%bs;
826           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
827             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
828             col = (int) PetscReal(values[3*i+1]);
829           }
830         }
831         ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
832       }
833     }
834     count--;
835   }
836   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
837 
838   /* wait on sends */
839   if (baij->nsends) {
840     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
841     ierr        = MPI_Waitall(baij->nsends,baij->send_waits,send_status);CHKERRQ(ierr);
842     PetscFree(send_status);
843   }
844   PetscFree(baij->send_waits); PetscFree(baij->svalues);
845 
846   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
847   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
848 
849   /* determine if any processor has disassembled, if so we must
850      also disassemble ourselfs, in order that we may reassemble. */
851   ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
852   if (mat->was_assembled && !other_disassembled) {
853     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
854   }
855 
856   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
857     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
858   }
859   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
860   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
861 
862   ierr = OptionsHasName(PETSC_NULL,"-use_hash",&flg); CHKERRQ(ierr);
863   if (flg && !baij->ht && mode== MAT_FINAL_ASSEMBLY) {
864     double fact = 1.39;
865     ierr = OptionsGetDouble(PETSC_NULL,"-use_hash",&fact,&flg); CHKERRQ(ierr);
866     if (fact <= 1.0) fact = 1.39;
867     PLogInfo(0,"[%d]MatAssemblyEnd_MPIBAIJ:Hash table Factor used %5.2f\n",PetscGlobalRank,fact);
868     ierr = MatCreateHashTable_MPIBAIJ_Private(mat,fact); CHKERRQ(ierr);
869     mat->ops.setvalues        = MatSetValues_MPIBAIJ_HT;
870     mat->ops.setvaluesblocked = MatSetValuesBlocked_MPIBAIJ_HT;
871   }
872   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
873   PetscFunctionReturn(0);
874 }
875 
876 #undef __FUNC__
877 #define __FUNC__ "MatView_MPIBAIJ_Binary"
878 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
879 {
880   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
881   int          ierr;
882 
883   PetscFunctionBegin;
884   if (baij->size == 1) {
885     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
886   } else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
887   PetscFunctionReturn(0);
888 }
889 
890 #undef __FUNC__
891 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
892 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
893 {
894   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
895   int          ierr, format,rank,bs = baij->bs;
896   FILE         *fd;
897   ViewerType   vtype;
898 
899   PetscFunctionBegin;
900   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
901   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
902     ierr = ViewerGetFormat(viewer,&format);
903     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
904       MatInfo info;
905       MPI_Comm_rank(mat->comm,&rank);
906       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
907       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
908       PetscSequentialPhaseBegin(mat->comm,1);
909       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
910               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
911               baij->bs,(int)info.memory);
912       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
913       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
914       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
915       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
916       fflush(fd);
917       PetscSequentialPhaseEnd(mat->comm,1);
918       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
919       PetscFunctionReturn(0);
920     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
921       PetscPrintf(mat->comm,"  block size is %d\n",bs);
922       PetscFunctionReturn(0);
923     }
924   }
925 
926   if (vtype == DRAW_VIEWER) {
927     Draw       draw;
928     PetscTruth isnull;
929     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
930     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
931   }
932 
933   if (vtype == ASCII_FILE_VIEWER) {
934     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
935     PetscSequentialPhaseBegin(mat->comm,1);
936     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
937            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
938             baij->cstart*bs,baij->cend*bs);
939     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
940     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
941     fflush(fd);
942     PetscSequentialPhaseEnd(mat->comm,1);
943   } else {
944     int size = baij->size;
945     rank = baij->rank;
946     if (size == 1) {
947       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
948     } else {
949       /* assemble the entire matrix onto first processor. */
950       Mat         A;
951       Mat_SeqBAIJ *Aloc;
952       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
953       int         mbs=baij->mbs;
954       Scalar      *a;
955 
956       if (!rank) {
957         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
958       } else {
959         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
960       }
961       PLogObjectParent(mat,A);
962 
963       /* copy over the A part */
964       Aloc = (Mat_SeqBAIJ*) baij->A->data;
965       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
966       row = baij->rstart;
967       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
968 
969       for ( i=0; i<mbs; i++ ) {
970         rvals[0] = bs*(baij->rstart + i);
971         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
972         for ( j=ai[i]; j<ai[i+1]; j++ ) {
973           col = (baij->cstart+aj[j])*bs;
974           for (k=0; k<bs; k++ ) {
975             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
976             col++; a += bs;
977           }
978         }
979       }
980       /* copy over the B part */
981       Aloc = (Mat_SeqBAIJ*) baij->B->data;
982       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
983       row = baij->rstart*bs;
984       for ( i=0; i<mbs; i++ ) {
985         rvals[0] = bs*(baij->rstart + i);
986         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
987         for ( j=ai[i]; j<ai[i+1]; j++ ) {
988           col = baij->garray[aj[j]]*bs;
989           for (k=0; k<bs; k++ ) {
990             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
991             col++; a += bs;
992           }
993         }
994       }
995       PetscFree(rvals);
996       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
997       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
998       /*
999          Everyone has to call to draw the matrix since the graphics waits are
1000          synchronized across all processors that share the Draw object
1001       */
1002       if (!rank || vtype == DRAW_VIEWER) {
1003         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
1004       }
1005       ierr = MatDestroy(A); CHKERRQ(ierr);
1006     }
1007   }
1008   PetscFunctionReturn(0);
1009 }
1010 
1011 
1012 
1013 #undef __FUNC__
1014 #define __FUNC__ "MatView_MPIBAIJ"
1015 int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
1016 {
1017   Mat         mat = (Mat) obj;
1018   int         ierr;
1019   ViewerType  vtype;
1020 
1021   PetscFunctionBegin;
1022   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
1023   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
1024       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
1025     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
1026   } else if (vtype == BINARY_FILE_VIEWER) {
1027     ierr = MatView_MPIBAIJ_Binary(mat,viewer);CHKERRQ(ierr);
1028   }
1029   PetscFunctionReturn(0);
1030 }
1031 
1032 #undef __FUNC__
1033 #define __FUNC__ "MatDestroy_MPIBAIJ"
1034 int MatDestroy_MPIBAIJ(PetscObject obj)
1035 {
1036   Mat         mat = (Mat) obj;
1037   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1038   int         ierr;
1039 
1040   PetscFunctionBegin;
1041 #if defined(USE_PETSC_LOG)
1042   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
1043 #endif
1044 
1045   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
1046   PetscFree(baij->rowners);
1047   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1048   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1049   if (baij->colmap) PetscFree(baij->colmap);
1050   if (baij->garray) PetscFree(baij->garray);
1051   if (baij->lvec)   VecDestroy(baij->lvec);
1052   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
1053   if (baij->rowvalues) PetscFree(baij->rowvalues);
1054   if (baij->barray) PetscFree(baij->barray);
1055   if (baij->ht) PetscFree(baij->ht);
1056   PetscFree(baij);
1057   PLogObjectDestroy(mat);
1058   PetscHeaderDestroy(mat);
1059   PetscFunctionReturn(0);
1060 }
1061 
1062 #undef __FUNC__
1063 #define __FUNC__ "MatMult_MPIBAIJ"
1064 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
1065 {
1066   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1067   int         ierr, nt;
1068 
1069   PetscFunctionBegin;
1070   VecGetLocalSize_Fast(xx,nt);
1071   if (nt != a->n) {
1072     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
1073   }
1074   VecGetLocalSize_Fast(yy,nt);
1075   if (nt != a->m) {
1076     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible parition of A and yy");
1077   }
1078   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1079   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
1080   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1081   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
1082   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1083   PetscFunctionReturn(0);
1084 }
1085 
1086 #undef __FUNC__
1087 #define __FUNC__ "MatMultAdd_MPIBAIJ"
1088 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
1089 {
1090   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1091   int        ierr;
1092 
1093   PetscFunctionBegin;
1094   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1095   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
1096   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1097   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
1098   PetscFunctionReturn(0);
1099 }
1100 
1101 #undef __FUNC__
1102 #define __FUNC__ "MatMultTrans_MPIBAIJ"
1103 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
1104 {
1105   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1106   int         ierr;
1107 
1108   PetscFunctionBegin;
1109   /* do nondiagonal part */
1110   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
1111   /* send it on its way */
1112   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
1113   /* do local part */
1114   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
1115   /* receive remote parts: note this assumes the values are not actually */
1116   /* inserted in yy until the next line, which is true for my implementation*/
1117   /* but is not perhaps always true. */
1118   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
1119   PetscFunctionReturn(0);
1120 }
1121 
1122 #undef __FUNC__
1123 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
1124 int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
1125 {
1126   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1127   int         ierr;
1128 
1129   PetscFunctionBegin;
1130   /* do nondiagonal part */
1131   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
1132   /* send it on its way */
1133   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
1134   /* do local part */
1135   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
1136   /* receive remote parts: note this assumes the values are not actually */
1137   /* inserted in yy until the next line, which is true for my implementation*/
1138   /* but is not perhaps always true. */
1139   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
1140   PetscFunctionReturn(0);
1141 }
1142 
1143 /*
1144   This only works correctly for square matrices where the subblock A->A is the
1145    diagonal block
1146 */
1147 #undef __FUNC__
1148 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
1149 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
1150 {
1151   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1152   int         ierr;
1153 
1154   PetscFunctionBegin;
1155   if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
1156   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
1157   PetscFunctionReturn(0);
1158 }
1159 
1160 #undef __FUNC__
1161 #define __FUNC__ "MatScale_MPIBAIJ"
1162 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
1163 {
1164   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1165   int         ierr;
1166 
1167   PetscFunctionBegin;
1168   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
1169   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
1170   PetscFunctionReturn(0);
1171 }
1172 
1173 #undef __FUNC__
1174 #define __FUNC__ "MatGetSize_MPIBAIJ"
1175 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
1176 {
1177   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1178 
1179   PetscFunctionBegin;
1180   if (m) *m = mat->M;
1181   if (n) *n = mat->N;
1182   PetscFunctionReturn(0);
1183 }
1184 
1185 #undef __FUNC__
1186 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
1187 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
1188 {
1189   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1190 
1191   PetscFunctionBegin;
1192   *m = mat->m; *n = mat->N;
1193   PetscFunctionReturn(0);
1194 }
1195 
1196 #undef __FUNC__
1197 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
1198 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
1199 {
1200   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1201 
1202   PetscFunctionBegin;
1203   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
1204   PetscFunctionReturn(0);
1205 }
1206 
1207 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1208 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1209 
1210 #undef __FUNC__
1211 #define __FUNC__ "MatGetRow_MPIBAIJ"
1212 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1213 {
1214   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1215   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
1216   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
1217   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
1218   int        *cmap, *idx_p,cstart = mat->cstart;
1219 
1220   PetscFunctionBegin;
1221   if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
1222   mat->getrowactive = PETSC_TRUE;
1223 
1224   if (!mat->rowvalues && (idx || v)) {
1225     /*
1226         allocate enough space to hold information from the longest row.
1227     */
1228     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
1229     int     max = 1,mbs = mat->mbs,tmp;
1230     for ( i=0; i<mbs; i++ ) {
1231       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1232       if (max < tmp) { max = tmp; }
1233     }
1234     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
1235     CHKPTRQ(mat->rowvalues);
1236     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
1237   }
1238 
1239   if (row < brstart || row >= brend) SETERRQ(PETSC_ERR_SUP,0,"Only local rows")
1240   lrow = row - brstart;
1241 
1242   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1243   if (!v)   {pvA = 0; pvB = 0;}
1244   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1245   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1246   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1247   nztot = nzA + nzB;
1248 
1249   cmap  = mat->garray;
1250   if (v  || idx) {
1251     if (nztot) {
1252       /* Sort by increasing column numbers, assuming A and B already sorted */
1253       int imark = -1;
1254       if (v) {
1255         *v = v_p = mat->rowvalues;
1256         for ( i=0; i<nzB; i++ ) {
1257           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
1258           else break;
1259         }
1260         imark = i;
1261         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1262         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1263       }
1264       if (idx) {
1265         *idx = idx_p = mat->rowindices;
1266         if (imark > -1) {
1267           for ( i=0; i<imark; i++ ) {
1268             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
1269           }
1270         } else {
1271           for ( i=0; i<nzB; i++ ) {
1272             if (cmap[cworkB[i]/bs] < cstart)
1273               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
1274             else break;
1275           }
1276           imark = i;
1277         }
1278         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
1279         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
1280       }
1281     } else {
1282       if (idx) *idx = 0;
1283       if (v)   *v   = 0;
1284     }
1285   }
1286   *nz = nztot;
1287   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1288   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1289   PetscFunctionReturn(0);
1290 }
1291 
1292 #undef __FUNC__
1293 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1294 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1295 {
1296   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1297 
1298   PetscFunctionBegin;
1299   if (baij->getrowactive == PETSC_FALSE) {
1300     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
1301   }
1302   baij->getrowactive = PETSC_FALSE;
1303   PetscFunctionReturn(0);
1304 }
1305 
1306 #undef __FUNC__
1307 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1308 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1309 {
1310   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1311 
1312   PetscFunctionBegin;
1313   *bs = baij->bs;
1314   PetscFunctionReturn(0);
1315 }
1316 
1317 #undef __FUNC__
1318 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1319 int MatZeroEntries_MPIBAIJ(Mat A)
1320 {
1321   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1322   int         ierr;
1323 
1324   PetscFunctionBegin;
1325   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1326   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1327   PetscFunctionReturn(0);
1328 }
1329 
1330 #undef __FUNC__
1331 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1332 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1333 {
1334   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1335   Mat         A = a->A, B = a->B;
1336   int         ierr;
1337   double      isend[5], irecv[5];
1338 
1339   PetscFunctionBegin;
1340   info->block_size     = (double)a->bs;
1341   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1342   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->memory;
1343   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1344   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->memory;
1345   if (flag == MAT_LOCAL) {
1346     info->nz_used      = isend[0];
1347     info->nz_allocated = isend[1];
1348     info->nz_unneeded  = isend[2];
1349     info->memory       = isend[3];
1350     info->mallocs      = isend[4];
1351   } else if (flag == MAT_GLOBAL_MAX) {
1352     ierr = MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_MAX,matin->comm);CHKERRQ(ierr);
1353     info->nz_used      = irecv[0];
1354     info->nz_allocated = irecv[1];
1355     info->nz_unneeded  = irecv[2];
1356     info->memory       = irecv[3];
1357     info->mallocs      = irecv[4];
1358   } else if (flag == MAT_GLOBAL_SUM) {
1359     ierr = MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_SUM,matin->comm);CHKERRQ(ierr);
1360     info->nz_used      = irecv[0];
1361     info->nz_allocated = irecv[1];
1362     info->nz_unneeded  = irecv[2];
1363     info->memory       = irecv[3];
1364     info->mallocs      = irecv[4];
1365   }
1366   info->rows_global       = (double)a->M;
1367   info->columns_global    = (double)a->N;
1368   info->rows_local        = (double)a->m;
1369   info->columns_local     = (double)a->N;
1370   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1371   info->fill_ratio_needed = 0;
1372   info->factor_mallocs    = 0;
1373   PetscFunctionReturn(0);
1374 }
1375 
1376 #undef __FUNC__
1377 #define __FUNC__ "MatSetOption_MPIBAIJ"
1378 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1379 {
1380   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1381 
1382   PetscFunctionBegin;
1383   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1384       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1385       op == MAT_COLUMNS_UNSORTED ||
1386       op == MAT_COLUMNS_SORTED ||
1387       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1388       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1389         MatSetOption(a->A,op);
1390         MatSetOption(a->B,op);
1391   } else if (op == MAT_ROW_ORIENTED) {
1392         a->roworiented = 1;
1393         MatSetOption(a->A,op);
1394         MatSetOption(a->B,op);
1395   } else if (op == MAT_ROWS_SORTED ||
1396              op == MAT_ROWS_UNSORTED ||
1397              op == MAT_SYMMETRIC ||
1398              op == MAT_STRUCTURALLY_SYMMETRIC ||
1399              op == MAT_YES_NEW_DIAGONALS)
1400     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1401   else if (op == MAT_COLUMN_ORIENTED) {
1402     a->roworiented = 0;
1403     MatSetOption(a->A,op);
1404     MatSetOption(a->B,op);
1405   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1406     a->donotstash = 1;
1407   } else if (op == MAT_NO_NEW_DIAGONALS) {
1408     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1409   } else {
1410     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1411   }
1412   PetscFunctionReturn(0);
1413 }
1414 
1415 #undef __FUNC__
1416 #define __FUNC__ "MatTranspose_MPIBAIJ("
1417 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1418 {
1419   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1420   Mat_SeqBAIJ *Aloc;
1421   Mat        B;
1422   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
1423   int        bs=baij->bs,mbs=baij->mbs;
1424   Scalar     *a;
1425 
1426   PetscFunctionBegin;
1427   if (matout == PETSC_NULL && M != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
1428   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1429   CHKERRQ(ierr);
1430 
1431   /* copy over the A part */
1432   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1433   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1434   row = baij->rstart;
1435   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1436 
1437   for ( i=0; i<mbs; i++ ) {
1438     rvals[0] = bs*(baij->rstart + i);
1439     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1440     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1441       col = (baij->cstart+aj[j])*bs;
1442       for (k=0; k<bs; k++ ) {
1443         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1444         col++; a += bs;
1445       }
1446     }
1447   }
1448   /* copy over the B part */
1449   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1450   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1451   row = baij->rstart*bs;
1452   for ( i=0; i<mbs; i++ ) {
1453     rvals[0] = bs*(baij->rstart + i);
1454     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1455     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1456       col = baij->garray[aj[j]]*bs;
1457       for (k=0; k<bs; k++ ) {
1458         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1459         col++; a += bs;
1460       }
1461     }
1462   }
1463   PetscFree(rvals);
1464   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1465   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1466 
1467   if (matout != PETSC_NULL) {
1468     *matout = B;
1469   } else {
1470     /* This isn't really an in-place transpose .... but free data structures from baij */
1471     PetscFree(baij->rowners);
1472     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1473     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1474     if (baij->colmap) PetscFree(baij->colmap);
1475     if (baij->garray) PetscFree(baij->garray);
1476     if (baij->lvec) VecDestroy(baij->lvec);
1477     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1478     PetscFree(baij);
1479     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1480     PetscHeaderDestroy(B);
1481   }
1482   PetscFunctionReturn(0);
1483 }
1484 
1485 #undef __FUNC__
1486 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1487 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1488 {
1489   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1490   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1491   int ierr,s1,s2,s3;
1492 
1493   PetscFunctionBegin;
1494   if (ll)  {
1495     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1496     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1497     if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"non-conforming local sizes");
1498     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1499     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1500   }
1501   if (rr) SETERRQ(PETSC_ERR_SUP,0,"not supported for right vector");
1502   PetscFunctionReturn(0);
1503 }
1504 
1505 #undef __FUNC__
1506 #define __FUNC__ "MatZeroRows_MPIBAIJ"
1507 int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
1508 {
1509   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
1510   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
1511   int            *procs,*nprocs,j,found,idx,nsends,*work,row;
1512   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
1513   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
1514   int            *lens,imdex,*lrows,*values,bs=l->bs,rstart_bs=l->rstart_bs;
1515   MPI_Comm       comm = A->comm;
1516   MPI_Request    *send_waits,*recv_waits;
1517   MPI_Status     recv_status,*send_status;
1518   IS             istmp;
1519 
1520   PetscFunctionBegin;
1521   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
1522   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
1523 
1524   /*  first count number of contributors to each processor */
1525   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
1526   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
1527   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
1528   for ( i=0; i<N; i++ ) {
1529     idx = rows[i];
1530     found = 0;
1531     for ( j=0; j<size; j++ ) {
1532       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
1533         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
1534       }
1535     }
1536     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
1537   }
1538   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
1539 
1540   /* inform other processors of number of messages and max length*/
1541   work   = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
1542   ierr   = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
1543   nrecvs = work[rank];
1544   ierr   = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
1545   nmax   = work[rank];
1546   PetscFree(work);
1547 
1548   /* post receives:   */
1549   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); CHKPTRQ(rvalues);
1550   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
1551   for ( i=0; i<nrecvs; i++ ) {
1552     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
1553   }
1554 
1555   /* do sends:
1556      1) starts[i] gives the starting index in svalues for stuff going to
1557      the ith processor
1558   */
1559   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1560   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
1561   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1562   starts[0] = 0;
1563   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1564   for ( i=0; i<N; i++ ) {
1565     svalues[starts[owner[i]]++] = rows[i];
1566   }
1567   ISRestoreIndices(is,&rows);
1568 
1569   starts[0] = 0;
1570   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1571   count = 0;
1572   for ( i=0; i<size; i++ ) {
1573     if (procs[i]) {
1574       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
1575     }
1576   }
1577   PetscFree(starts);
1578 
1579   base = owners[rank]*bs;
1580 
1581   /*  wait on receives */
1582   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1583   source = lens + nrecvs;
1584   count  = nrecvs; slen = 0;
1585   while (count) {
1586     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
1587     /* unpack receives into our local space */
1588     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
1589     source[imdex]  = recv_status.MPI_SOURCE;
1590     lens[imdex]  = n;
1591     slen += n;
1592     count--;
1593   }
1594   PetscFree(recv_waits);
1595 
1596   /* move the data into the send scatter */
1597   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1598   count = 0;
1599   for ( i=0; i<nrecvs; i++ ) {
1600     values = rvalues + i*nmax;
1601     for ( j=0; j<lens[i]; j++ ) {
1602       lrows[count++] = values[j] - base;
1603     }
1604   }
1605   PetscFree(rvalues); PetscFree(lens);
1606   PetscFree(owner); PetscFree(nprocs);
1607 
1608   /* actually zap the local rows */
1609   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1610   PLogObjectParent(A,istmp);
1611 
1612   ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
1613   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1614   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1615 
1616   if (diag) {
1617     for ( i = 0; i < slen; i++ ) {
1618       row = lrows[i] + rstart_bs;
1619       ierr = MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES); CHKERRQ(ierr);
1620     }
1621     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1622     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1623   }
1624   PetscFree(lrows);
1625 
1626   /* wait on sends */
1627   if (nsends) {
1628     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
1629     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
1630     PetscFree(send_status);
1631   }
1632   PetscFree(send_waits); PetscFree(svalues);
1633 
1634   PetscFunctionReturn(0);
1635 }
1636 extern int MatPrintHelp_SeqBAIJ(Mat);
1637 #undef __FUNC__
1638 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1639 int MatPrintHelp_MPIBAIJ(Mat A)
1640 {
1641   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1642   int         ierr;
1643 
1644   PetscFunctionBegin;
1645   if (!a->rank) {
1646     ierr = MatPrintHelp_SeqBAIJ(a->A);CHKERRQ(ierr);
1647   }
1648   PetscFunctionReturn(0);
1649 }
1650 
1651 #undef __FUNC__
1652 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1653 int MatSetUnfactored_MPIBAIJ(Mat A)
1654 {
1655   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1656   int         ierr;
1657 
1658   PetscFunctionBegin;
1659   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1660   PetscFunctionReturn(0);
1661 }
1662 
1663 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1664 
1665 /* -------------------------------------------------------------------*/
1666 static struct _MatOps MatOps = {
1667   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1668   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1669   0,0,0,0,
1670   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1671   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1672   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1673   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1674   0,0,0,MatGetSize_MPIBAIJ,
1675   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1676   0,0,MatConvertSameType_MPIBAIJ,0,0,
1677   0,0,0,MatGetSubMatrices_MPIBAIJ,
1678   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1679   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
1680   0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1681 
1682 
1683 #undef __FUNC__
1684 #define __FUNC__ "MatCreateMPIBAIJ"
1685 /*@C
1686    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1687    (block compressed row).  For good matrix assembly performance
1688    the user should preallocate the matrix storage by setting the parameters
1689    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1690    performance can be increased by more than a factor of 50.
1691 
1692    Input Parameters:
1693 .  comm - MPI communicator
1694 .  bs   - size of blockk
1695 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1696            This value should be the same as the local size used in creating the
1697            y vector for the matrix-vector product y = Ax.
1698 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1699            This value should be the same as the local size used in creating the
1700            x vector for the matrix-vector product y = Ax.
1701 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1702 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1703 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1704            submatrix  (same for all local rows)
1705 .  d_nzz - array containing the number of block nonzeros in the various block rows
1706            of the in diagonal portion of the local (possibly different for each block
1707            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1708            it is zero.
1709 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1710            submatrix (same for all local rows).
1711 .  o_nzz - array containing the number of nonzeros in the various block rows of the
1712            off-diagonal portion of the local submatrix (possibly different for
1713            each block row) or PETSC_NULL.
1714 
1715    Output Parameter:
1716 .  A - the matrix
1717 
1718    Notes:
1719    The user MUST specify either the local or global matrix dimensions
1720    (possibly both).
1721 
1722    Storage Information:
1723    For a square global matrix we define each processor's diagonal portion
1724    to be its local rows and the corresponding columns (a square submatrix);
1725    each processor's off-diagonal portion encompasses the remainder of the
1726    local matrix (a rectangular submatrix).
1727 
1728    The user can specify preallocated storage for the diagonal part of
1729    the local submatrix with either d_nz or d_nnz (not both).  Set
1730    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1731    memory allocation.  Likewise, specify preallocated storage for the
1732    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1733 
1734    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1735    the figure below we depict these three local rows and all columns (0-11).
1736 
1737 $          0 1 2 3 4 5 6 7 8 9 10 11
1738 $         -------------------
1739 $  row 3  |  o o o d d d o o o o o o
1740 $  row 4  |  o o o d d d o o o o o o
1741 $  row 5  |  o o o d d d o o o o o o
1742 $         -------------------
1743 $
1744 
1745    Thus, any entries in the d locations are stored in the d (diagonal)
1746    submatrix, and any entries in the o locations are stored in the
1747    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1748    stored simply in the MATSEQBAIJ format for compressed row storage.
1749 
1750    Now d_nz should indicate the number of block nonzeros per row in the d matrix,
1751    and o_nz should indicate the number of block nonzeros per row in the o matrix.
1752    In general, for PDE problems in which most nonzeros are near the diagonal,
1753    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1754    or you will get TERRIBLE performance; see the users' manual chapter on
1755    matrices.
1756 
1757 .keywords: matrix, block, aij, compressed row, sparse, parallel
1758 
1759 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1760 @*/
1761 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1762                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1763 {
1764   Mat          B;
1765   Mat_MPIBAIJ  *b;
1766   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1767 
1768   PetscFunctionBegin;
1769   if (bs < 1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Invalid block size specified, must be positive");
1770 
1771   MPI_Comm_size(comm,&size);
1772   if (size == 1) {
1773     if (M == PETSC_DECIDE) M = m;
1774     if (N == PETSC_DECIDE) N = n;
1775     ierr = MatCreateSeqBAIJ(comm,bs,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
1776     PetscFunctionReturn(0);
1777   }
1778 
1779   *A = 0;
1780   PetscHeaderCreate(B,_p_Mat,MAT_COOKIE,MATMPIBAIJ,comm,MatDestroy,MatView);
1781   PLogObjectCreate(B);
1782   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1783   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1784   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1785 
1786   B->destroy    = MatDestroy_MPIBAIJ;
1787   B->view       = MatView_MPIBAIJ;
1788   B->mapping    = 0;
1789   B->factor     = 0;
1790   B->assembled  = PETSC_FALSE;
1791 
1792   B->insertmode = NOT_SET_VALUES;
1793   MPI_Comm_rank(comm,&b->rank);
1794   MPI_Comm_size(comm,&b->size);
1795 
1796   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
1797     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1798   }
1799   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) {
1800     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either M or m should be specified");
1801   }
1802   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) {
1803     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either N or n should be specified");
1804   }
1805   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1806   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1807 
1808   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1809     work[0] = m; work[1] = n;
1810     mbs = m/bs; nbs = n/bs;
1811     ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
1812     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1813     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1814   }
1815   if (m == PETSC_DECIDE) {
1816     Mbs = M/bs;
1817     if (Mbs*bs != M) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global rows must be divisible by blocksize");
1818     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1819     m   = mbs*bs;
1820   }
1821   if (n == PETSC_DECIDE) {
1822     Nbs = N/bs;
1823     if (Nbs*bs != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global cols must be divisible by blocksize");
1824     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1825     n   = nbs*bs;
1826   }
1827   if (mbs*bs != m || nbs*bs != n) {
1828     SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of local rows, cols must be divisible by blocksize");
1829   }
1830 
1831   b->m = m; B->m = m;
1832   b->n = n; B->n = n;
1833   b->N = N; B->N = N;
1834   b->M = M; B->M = M;
1835   b->bs  = bs;
1836   b->bs2 = bs*bs;
1837   b->mbs = mbs;
1838   b->nbs = nbs;
1839   b->Mbs = Mbs;
1840   b->Nbs = Nbs;
1841 
1842   /* build local table of row and column ownerships */
1843   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1844   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1845   b->cowners = b->rowners + b->size + 2;
1846   ierr = MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1847   b->rowners[0] = 0;
1848   for ( i=2; i<=b->size; i++ ) {
1849     b->rowners[i] += b->rowners[i-1];
1850   }
1851   b->rstart    = b->rowners[b->rank];
1852   b->rend      = b->rowners[b->rank+1];
1853   b->rstart_bs = b->rstart * bs;
1854   b->rend_bs   = b->rend * bs;
1855 
1856   ierr = MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1857   b->cowners[0] = 0;
1858   for ( i=2; i<=b->size; i++ ) {
1859     b->cowners[i] += b->cowners[i-1];
1860   }
1861   b->cstart    = b->cowners[b->rank];
1862   b->cend      = b->cowners[b->rank+1];
1863   b->cstart_bs = b->cstart * bs;
1864   b->cend_bs   = b->cend * bs;
1865 
1866 
1867   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1868   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1869   PLogObjectParent(B,b->A);
1870   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1871   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1872   PLogObjectParent(B,b->B);
1873 
1874   /* build cache for off array entries formed */
1875   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1876   b->donotstash  = 0;
1877   b->colmap      = 0;
1878   b->garray      = 0;
1879   b->roworiented = 1;
1880 
1881   /* stuff used in block assembly */
1882   b->barray       = 0;
1883 
1884   /* stuff used for matrix vector multiply */
1885   b->lvec         = 0;
1886   b->Mvctx        = 0;
1887 
1888   /* stuff for MatGetRow() */
1889   b->rowindices   = 0;
1890   b->rowvalues    = 0;
1891   b->getrowactive = PETSC_FALSE;
1892 
1893   /* hash table stuff */
1894   b->ht          = 0;
1895   b->ht_size     = 0;
1896 
1897   *A = B;
1898   PetscFunctionReturn(0);
1899 }
1900 
1901 #undef __FUNC__
1902 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
1903 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1904 {
1905   Mat         mat;
1906   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1907   int         ierr, len=0, flg;
1908 
1909   PetscFunctionBegin;
1910   *newmat       = 0;
1911   PetscHeaderCreate(mat,_p_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm,MatDestroy,MatView);
1912   PLogObjectCreate(mat);
1913   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1914   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1915   mat->destroy    = MatDestroy_MPIBAIJ;
1916   mat->view       = MatView_MPIBAIJ;
1917   mat->factor     = matin->factor;
1918   mat->assembled  = PETSC_TRUE;
1919 
1920   a->m = mat->m   = oldmat->m;
1921   a->n = mat->n   = oldmat->n;
1922   a->M = mat->M   = oldmat->M;
1923   a->N = mat->N   = oldmat->N;
1924 
1925   a->bs  = oldmat->bs;
1926   a->bs2 = oldmat->bs2;
1927   a->mbs = oldmat->mbs;
1928   a->nbs = oldmat->nbs;
1929   a->Mbs = oldmat->Mbs;
1930   a->Nbs = oldmat->Nbs;
1931 
1932   a->rstart       = oldmat->rstart;
1933   a->rend         = oldmat->rend;
1934   a->cstart       = oldmat->cstart;
1935   a->cend         = oldmat->cend;
1936   a->size         = oldmat->size;
1937   a->rank         = oldmat->rank;
1938   mat->insertmode = NOT_SET_VALUES;
1939   a->rowvalues    = 0;
1940   a->getrowactive = PETSC_FALSE;
1941   a->barray       = 0;
1942 
1943   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1944   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
1945   a->cowners = a->rowners + a->size + 2;
1946   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1947   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1948   if (oldmat->colmap) {
1949     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1950     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1951     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1952   } else a->colmap = 0;
1953   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1954     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1955     PLogObjectMemory(mat,len*sizeof(int));
1956     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1957   } else a->garray = 0;
1958 
1959   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1960   PLogObjectParent(mat,a->lvec);
1961   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1962   PLogObjectParent(mat,a->Mvctx);
1963   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1964   PLogObjectParent(mat,a->A);
1965   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1966   PLogObjectParent(mat,a->B);
1967   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1968   if (flg) {
1969     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1970   }
1971   *newmat = mat;
1972   PetscFunctionReturn(0);
1973 }
1974 
1975 #include "sys.h"
1976 
1977 #undef __FUNC__
1978 #define __FUNC__ "MatLoad_MPIBAIJ"
1979 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1980 {
1981   Mat          A;
1982   int          i, nz, ierr, j,rstart, rend, fd;
1983   Scalar       *vals,*buf;
1984   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1985   MPI_Status   status;
1986   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1987   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1988   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1989   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1990   int          dcount,kmax,k,nzcount,tmp;
1991 
1992   PetscFunctionBegin;
1993   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1994   bs2  = bs*bs;
1995 
1996   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1997   if (!rank) {
1998     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1999     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
2000     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
2001     if (header[3] < 0) {
2002       SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix stored in special format, cannot load as MPIBAIJ");
2003     }
2004   }
2005 
2006   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
2007   M = header[1]; N = header[2];
2008 
2009   if (M != N) SETERRQ(PETSC_ERR_SUP,0,"Can only do square matrices");
2010 
2011   /*
2012      This code adds extra rows to make sure the number of rows is
2013      divisible by the blocksize
2014   */
2015   Mbs        = M/bs;
2016   extra_rows = bs - M + bs*(Mbs);
2017   if (extra_rows == bs) extra_rows = 0;
2018   else                  Mbs++;
2019   if (extra_rows &&!rank) {
2020     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
2021   }
2022 
2023   /* determine ownership of all rows */
2024   mbs = Mbs/size + ((Mbs % size) > rank);
2025   m   = mbs * bs;
2026   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
2027   browners = rowners + size + 1;
2028   ierr = MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
2029   rowners[0] = 0;
2030   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
2031   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
2032   rstart = rowners[rank];
2033   rend   = rowners[rank+1];
2034 
2035   /* distribute row lengths to all processors */
2036   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
2037   if (!rank) {
2038     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
2039     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
2040     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
2041     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
2042     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
2043     ierr = MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);CHKERRQ(ierr);
2044     PetscFree(sndcounts);
2045   } else {
2046     ierr = MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);CHKERRQ(ierr);
2047   }
2048 
2049   if (!rank) {
2050     /* calculate the number of nonzeros on each processor */
2051     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
2052     PetscMemzero(procsnz,size*sizeof(int));
2053     for ( i=0; i<size; i++ ) {
2054       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
2055         procsnz[i] += rowlengths[j];
2056       }
2057     }
2058     PetscFree(rowlengths);
2059 
2060     /* determine max buffer needed and allocate it */
2061     maxnz = 0;
2062     for ( i=0; i<size; i++ ) {
2063       maxnz = PetscMax(maxnz,procsnz[i]);
2064     }
2065     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
2066 
2067     /* read in my part of the matrix column indices  */
2068     nz = procsnz[0];
2069     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
2070     mycols = ibuf;
2071     if (size == 1)  nz -= extra_rows;
2072     ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
2073     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
2074 
2075     /* read in every ones (except the last) and ship off */
2076     for ( i=1; i<size-1; i++ ) {
2077       nz   = procsnz[i];
2078       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
2079       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
2080     }
2081     /* read in the stuff for the last proc */
2082     if ( size != 1 ) {
2083       nz   = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
2084       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
2085       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
2086       ierr = MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);CHKERRQ(ierr);
2087     }
2088     PetscFree(cols);
2089   } else {
2090     /* determine buffer space needed for message */
2091     nz = 0;
2092     for ( i=0; i<m; i++ ) {
2093       nz += locrowlens[i];
2094     }
2095     ibuf   = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
2096     mycols = ibuf;
2097     /* receive message of column indices*/
2098     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
2099     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
2100     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2101   }
2102 
2103   /* loop over local rows, determining number of off diagonal entries */
2104   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
2105   odlens = dlens + (rend-rstart);
2106   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
2107   PetscMemzero(mask,3*Mbs*sizeof(int));
2108   masked1 = mask    + Mbs;
2109   masked2 = masked1 + Mbs;
2110   rowcount = 0; nzcount = 0;
2111   for ( i=0; i<mbs; i++ ) {
2112     dcount  = 0;
2113     odcount = 0;
2114     for ( j=0; j<bs; j++ ) {
2115       kmax = locrowlens[rowcount];
2116       for ( k=0; k<kmax; k++ ) {
2117         tmp = mycols[nzcount++]/bs;
2118         if (!mask[tmp]) {
2119           mask[tmp] = 1;
2120           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
2121           else masked1[dcount++] = tmp;
2122         }
2123       }
2124       rowcount++;
2125     }
2126 
2127     dlens[i]  = dcount;
2128     odlens[i] = odcount;
2129 
2130     /* zero out the mask elements we set */
2131     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
2132     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
2133   }
2134 
2135   /* create our matrix */
2136   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
2137          CHKERRQ(ierr);
2138   A = *newmat;
2139   MatSetOption(A,MAT_COLUMNS_SORTED);
2140 
2141   if (!rank) {
2142     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
2143     /* read in my part of the matrix numerical values  */
2144     nz = procsnz[0];
2145     vals = buf;
2146     mycols = ibuf;
2147     if (size == 1)  nz -= extra_rows;
2148     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2149     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
2150 
2151     /* insert into matrix */
2152     jj      = rstart*bs;
2153     for ( i=0; i<m; i++ ) {
2154       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
2155       mycols += locrowlens[i];
2156       vals   += locrowlens[i];
2157       jj++;
2158     }
2159     /* read in other processors (except the last one) and ship out */
2160     for ( i=1; i<size-1; i++ ) {
2161       nz   = procsnz[i];
2162       vals = buf;
2163       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2164       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
2165     }
2166     /* the last proc */
2167     if ( size != 1 ){
2168       nz   = procsnz[i] - extra_rows;
2169       vals = buf;
2170       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2171       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
2172       ierr = MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);CHKERRQ(ierr);
2173     }
2174     PetscFree(procsnz);
2175   } else {
2176     /* receive numeric values */
2177     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
2178 
2179     /* receive message of values*/
2180     vals   = buf;
2181     mycols = ibuf;
2182     ierr   = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
2183     ierr   = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
2184     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2185 
2186     /* insert into matrix */
2187     jj      = rstart*bs;
2188     for ( i=0; i<m; i++ ) {
2189       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
2190       mycols += locrowlens[i];
2191       vals   += locrowlens[i];
2192       jj++;
2193     }
2194   }
2195   PetscFree(locrowlens);
2196   PetscFree(buf);
2197   PetscFree(ibuf);
2198   PetscFree(rowners);
2199   PetscFree(dlens);
2200   PetscFree(mask);
2201   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2202   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2203   PetscFunctionReturn(0);
2204 }
2205 
2206 
2207