xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision bbb85fb35fa65146d77861245dba6ac34c084efa)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpibaij.c,v 1.155 1999/02/26 17:08:24 balay Exp balay $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"   /*I  "mat.h"  I*/
6 #include "src/vec/vecimpl.h"
7 
8 extern int MatSetUpMultiply_MPIBAIJ(Mat);
9 extern int DisAssemble_MPIBAIJ(Mat);
10 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
11 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatReuse,Mat **);
12 extern int MatGetValues_SeqBAIJ(Mat,int,int *,int,int *,Scalar *);
13 extern int MatSetValues_SeqBAIJ(Mat,int,int *,int,int *,Scalar *,InsertMode);
14 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
15 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
16 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
17 extern int MatPrintHelp_SeqBAIJ(Mat);
18 
/*
     Local utility routine that creates a mapping from the global column
   number to the local number in the off-diagonal part of the local
   storage of the matrix.  This is done in a non scalable way since the
   length of colmap equals the global matrix length.
*/
#undef __FUNC__
#define __FUNC__ "CreateColmap_MPIBAIJ_Private"
static int CreateColmap_MPIBAIJ_Private(Mat mat)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;  /* off-diagonal part */
  int         nbs = B->nbs,i,bs=B->bs;
#if defined (USE_CTABLE)
  int         ierr;
#endif

  PetscFunctionBegin;
#if defined (USE_CTABLE)
  /* Hash-table variant: stores only the columns actually present in B.
     Both key and value are shifted by +1 so that 0 can mean "absent". */
  ierr = TableCreate(baij->nbs/5,&baij->colmap); CHKERRQ(ierr);
  for ( i=0; i<nbs; i++ ){
    ierr = TableAdd(baij->colmap,baij->garray[i]+1,i*bs+1);CHKERRQ(ierr);
  }
#else
  /* Dense variant: one int per global block column (non-scalable).
     colmap[gcol] = local point offset + 1; 0 marks an absent column. */
  baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
  PLogObjectMemory(mat,baij->Nbs*sizeof(int));
  PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
  for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
#endif
  PetscFunctionReturn(0);
}
50 
#define CHUNKSIZE  10

/*
   MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) -
   macro that inserts or adds one scalar into the "diagonal" sequential
   block matrix A.  row,col are point (not block) indices local to this
   process; addv is ADD_VALUES or INSERT_VALUES.  Relies on the local
   variables declared in MatSetValues_MPIBAIJ being in scope
   (a,A,aa,ai,aj,aimax,ailen,rp,ap,bap,low,high,t,_i,ii,N,nrow,rmax,
   brow,bcol,ridx,cidx,bs,bs2).
*/
#define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
    rmax = aimax[brow]; nrow = ailen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      /* narrow down by bisection, finish with a short linear scan */ \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          /* block already allocated: update entry (ridx,cidx) in place */ \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto a_noinsert; \
        } \
      } \
      /* block not present: honor the nonew flags, otherwise insert */ \
      if (a->nonew == 1) goto a_noinsert; \
      else if (a->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (a->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space; a, j and i live in one allocation */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots, opening a CHUNKSIZE gap
           of zeroed blocks right after row brow */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
        for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
        PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
                    aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(a->a);  \
        if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
        aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
        a->singlemalloc = 1; \
 \
        /* refresh the row pointers invalidated by the reallocation */ \
        rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
        rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        a->maxnz += bs2*CHUNKSIZE; \
        a->reallocs++; \
        /* NOTE(review): nz is bumped only on this realloc path, not on \
           every insertion below -- verify against MatSetValues_SeqBAIJ */ \
        a->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      a_noinsert:; \
    ailen[brow] = nrow; \
}
127 
/*
   MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) -
   twin of MatSetValues_SeqBAIJ_A_Private for the "off-diagonal"
   sequential block matrix B.  row,col are point (not block) indices;
   addv is ADD_VALUES or INSERT_VALUES.  Relies on the local variables
   declared in MatSetValues_MPIBAIJ being in scope
   (b,B,ba,bi,bj,bimax,bilen,rp,ap,bap,low,high,t,_i,ii,N,nrow,rmax,
   brow,bcol,ridx,cidx,bs,bs2).
*/
#define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
    rmax = bimax[brow]; nrow = bilen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      /* narrow down by bisection, finish with a short linear scan */ \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          /* block already allocated: update entry (ridx,cidx) in place */ \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto b_noinsert; \
        } \
      } \
      /* block not present: honor the nonew flags, otherwise insert */ \
      if (b->nonew == 1) goto b_noinsert; \
      else if (b->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (b->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space; a, j and i live in one allocation */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots, opening a CHUNKSIZE gap
           of zeroed blocks right after row brow */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
        for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
        PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
                    ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(b->a);  \
        if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
        ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
        b->singlemalloc = 1; \
 \
        /* refresh the row pointers invalidated by the reallocation */ \
        rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
        rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        b->maxnz += bs2*CHUNKSIZE; \
        b->reallocs++; \
        /* NOTE(review): nz is bumped only on this realloc path, not on \
           every insertion below -- verify against MatSetValues_SeqBAIJ */ \
        b->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      b_noinsert:; \
    bilen[brow] = nrow; \
}
202 
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIBAIJ"
/*
   Inserts or adds individual (point-indexed) entries into a parallel
   BAIJ matrix.  Rows owned by this process go directly into the
   sequential diagonal (A) or off-diagonal (B) parts via the
   MatSetValues_SeqBAIJ_{A,B}_Private macros; entries for rows owned by
   other processes are stashed for communication at assembly time.
   Negative row/column indices are silently skipped.
*/
int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j,row,col;
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
  int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
  int         cend_orig=baij->cend_bs,bs=baij->bs;

  /* Some Variables required in the macro */
  Mat         A = baij->A;
  Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
  int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
  Scalar      *aa=a->a;

  Mat         B = baij->B;
  Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
  int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
  Scalar      *ba=b->a;

  int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
  int         low,high,t,ridx,cidx,bs2=a->bs2;
  Scalar      *ap,*bap;

  PetscFunctionBegin;
  for ( i=0; i<m; i++ ) {
    if (im[i] < 0) continue;  /* negative rows are ignored by convention */
#if defined(USE_PETSC_BOPT_g)
    if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
#endif
    if (im[i] >= rstart_orig && im[i] < rend_orig) {
      /* row owned by this process */
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        if (in[j] >= cstart_orig && in[j] < cend_orig){
          /* column falls in the diagonal block A */
          col = in[j] - cstart_orig;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
        else if (in[j] < 0) continue;
#if defined(USE_PETSC_BOPT_g)
        else if (in[j] >= baij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Col too large");}
#endif
        else {
          /* column falls in the off-diagonal block B */
          if (mat->was_assembled) {
            /* map the global column to B's compressed numbering;
               colmap entries are stored shifted by +1 */
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }
#if defined (USE_CTABLE)
            ierr = TableFind(baij->colmap, in[j]/bs + 1,&col); CHKERRQ(ierr);
            col  = col - 1 + in[j]%bs;
#else
            col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
#endif
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              /* column not yet present: fall back to the unassembled
                 form so B can grow to accommodate it */
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
              /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
              B = baij->B;
              b = (Mat_SeqBAIJ *) (B)->data;
              bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
              ba=b->a;
            }
          } else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
      }
    } else {
      /* off-process row: stash for assembly-time communication */
      if (roworiented && !baij->donotstash) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      } else {
        if (!baij->donotstash) {
          row = im[i];
          for ( j=0; j<n; j++ ) {
            ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
  PetscFunctionReturn(0);
}
289 
290 #undef __FUNC__
291 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
292 int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
293 {
294   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
295   Scalar      *value,*barray=baij->barray;
296   int         ierr,i,j,ii,jj,row,col,k,l;
297   int         roworiented = baij->roworiented,rstart=baij->rstart ;
298   int         rend=baij->rend,cstart=baij->cstart,stepval;
299   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
300 
301   if(!barray) {
302     baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
303   }
304 
305   if (roworiented) {
306     stepval = (n-1)*bs;
307   } else {
308     stepval = (m-1)*bs;
309   }
310   for ( i=0; i<m; i++ ) {
311     if (im[i] < 0) continue;
312 #if defined(USE_PETSC_BOPT_g)
313     if (im[i] >= baij->Mbs) SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large, row %d max %d",im[i],baij->Mbs);
314 #endif
315     if (im[i] >= rstart && im[i] < rend) {
316       row = im[i] - rstart;
317       for ( j=0; j<n; j++ ) {
318         /* If NumCol = 1 then a copy is not required */
319         if ((roworiented) && (n == 1)) {
320           barray = v + i*bs2;
321         } else if((!roworiented) && (m == 1)) {
322           barray = v + j*bs2;
323         } else { /* Here a copy is required */
324           if (roworiented) {
325             value = v + i*(stepval+bs)*bs + j*bs;
326           } else {
327             value = v + j*(stepval+bs)*bs + i*bs;
328           }
329           for ( ii=0; ii<bs; ii++,value+=stepval ) {
330             for (jj=0; jj<bs; jj++ ) {
331               *barray++  = *value++;
332             }
333           }
334           barray -=bs2;
335         }
336 
337         if (in[j] >= cstart && in[j] < cend){
338           col  = in[j] - cstart;
339           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
340         }
341         else if (in[j] < 0) continue;
342 #if defined(USE_PETSC_BOPT_g)
343         else if (in[j] >= baij->Nbs) {SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large, col %d max %d",in[j],baij->Nbs);}
344 #endif
345         else {
346           if (mat->was_assembled) {
347             if (!baij->colmap) {
348               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
349             }
350 
351 #if defined(USE_PETSC_BOPT_g)
352 #if defined (USE_CTABLE)
353             { int data;
354             ierr = TableFind(baij->colmap,in[j]+1,&data); CHKERRQ(ierr);
355             if((data - 1) % bs)
356 	      {SETERRQ(PETSC_ERR_PLIB,0,"Incorrect colmap");}
357             }
358 #else
359             if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(PETSC_ERR_PLIB,0,"Incorrect colmap");}
360 #endif
361 #endif
362 #if defined (USE_CTABLE)
363 	    ierr = TableFind(baij->colmap,in[j]+1,&col); CHKERRQ(ierr);
364             col  = (col - 1)/bs;
365 #else
366             col = (baij->colmap[in[j]] - 1)/bs;
367 #endif
368             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
369               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
370               col =  in[j];
371             }
372           }
373           else col = in[j];
374           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
375         }
376       }
377     } else {
378       if (!baij->donotstash) {
379         if (roworiented ) {
380           row   = im[i]*bs;
381           value = v + i*(stepval+bs)*bs;
382           for ( j=0; j<bs; j++,row++ ) {
383             for ( k=0; k<n; k++ ) {
384               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
385                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
386               }
387             }
388           }
389         } else {
390           for ( j=0; j<n; j++ ) {
391             value = v + j*(stepval+bs)*bs + i*bs;
392             col   = in[j]*bs;
393             for ( k=0; k<bs; k++,col++,value+=stepval) {
394               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
395                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
396               }
397             }
398           }
399         }
400       }
401     }
402   }
403   PetscFunctionReturn(0);
404 }
/* Multiplicative hashing: HASH_KEY is the golden-ratio conjugate, a
   classic choice for spreading sequential keys uniformly. */
#define HASH_KEY 0.6180339887
/* #define HASH1(size,key) ((int)((size)*fmod(((key)*HASH_KEY),1))) */
/* Maps key into [0,size).  tmp must be a double lvalue; it holds the
   intermediate product so fmod() can be avoided. */
#define HASH(size,key,tmp) (tmp = (key)*HASH_KEY,(int)((size)*(tmp-(int)tmp)))
/* #define HASH(size,key,tmp) ((int)((size)*fmod(((key)*HASH_KEY),1))) */
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIBAIJ_HT"
/*
   Hash-table variant of MatSetValues: updates individual (point)
   entries assuming the nonzero structure is frozen and already
   recorded in baij->ht/baij->hd (built by
   MatCreateHashTable_MPIBAIJ_Private).  Attempting to set an entry
   whose block is not in the table is an error.  Off-process rows are
   stashed as in MatSetValues_MPIBAIJ.
*/
int MatSetValues_MPIBAIJ_HT(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr,i,j,row,col;
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
  int         rend_orig=baij->rend_bs,Nbs=baij->Nbs;
  int         h1,key,size=baij->ht_size,bs=baij->bs,*HT=baij->ht,idx;
  double      tmp;
  Scalar      ** HD = baij->hd,value;
#if defined(USE_PETSC_BOPT_g)
  int         total_ct=baij->ht_total_ct,insert_ct=baij->ht_insert_ct;
#endif

  PetscFunctionBegin;

  for ( i=0; i<m; i++ ) {
#if defined(USE_PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
    if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
#endif
      row = im[i];
    if (row >= rstart_orig && row < rend_orig) {
      for ( j=0; j<n; j++ ) {
        col = in[j];
        if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
        /* Look up into the Hash Table: key encodes the (block row,
           block col) pair, +1 so that 0 can mark an empty slot */
        key = (row/bs)*Nbs+(col/bs)+1;
        h1  = HASH(size,key,tmp);


        idx = h1;
#if defined(USE_PETSC_BOPT_g)
        /* debug build: identical probe, but also counts probe lengths */
        insert_ct++;
        total_ct++;
        if (HT[idx] != key) {
          /* linear probe to the end of the table, then wrap to h1 */
          for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++,total_ct++);
          if (idx == size) {
            for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++,total_ct++);
            if (idx == h1) {
              SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
            }
          }
        }
#else
        if (HT[idx] != key) {
          /* linear probe to the end of the table, then wrap to h1 */
          for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++);
          if (idx == size) {
            for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++);
            if (idx == h1) {
              SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
            }
          }
        }
#endif
        /* A HASH table entry is found, so insert the values at the correct address */
        if (addv == ADD_VALUES) *(HD[idx]+ (col % bs)*bs + (row % bs)) += value;
        else                    *(HD[idx]+ (col % bs)*bs + (row % bs))  = value;
      }
    } else {
      /* off-process row: stash for assembly-time communication */
      if (roworiented && !baij->donotstash) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      } else {
        if (!baij->donotstash) {
          row = im[i];
          for ( j=0; j<n; j++ ) {
            ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
#if defined(USE_PETSC_BOPT_g)
  baij->ht_total_ct = total_ct;
  baij->ht_insert_ct = insert_ct;
#endif
  PetscFunctionReturn(0);
}
488 
#undef __FUNC__
#define __FUNC__ "MatSetValuesBlocked_MPIBAIJ_HT"
/*
   Hash-table variant of MatSetValuesBlocked: updates bs x bs blocks
   (im,in are block indices) assuming the nonzero structure is frozen
   and recorded in baij->ht/baij->hd.  A block with no table entry is
   an error.  Off-process block rows are stashed point-by-point.
*/
int MatSetValuesBlocked_MPIBAIJ_HT(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr,i,j,ii,jj,row,col,k,l;
  int         roworiented = baij->roworiented,rstart=baij->rstart ;
  int         rend=baij->rend,stepval,bs=baij->bs,bs2=baij->bs2;
  int         h1,key,size=baij->ht_size,idx,*HT=baij->ht,Nbs=baij->Nbs;
  double      tmp;
  Scalar      ** HD = baij->hd,*value,*v_t,*baij_a;
#if defined(USE_PETSC_BOPT_g)
  int         total_ct=baij->ht_total_ct,insert_ct=baij->ht_insert_ct;
#endif

  PetscFunctionBegin;

  /* distance (in Scalars) from the end of one block row of v to the
     start of the next, for the chosen orientation */
  if (roworiented) {
    stepval = (n-1)*bs;
  } else {
    stepval = (m-1)*bs;
  }
  for ( i=0; i<m; i++ ) {
#if defined(USE_PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
    if (im[i] >= baij->Mbs) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
#endif
    row   = im[i];
    v_t   = v + i*bs2;
    if (row >= rstart && row < rend) {
      for ( j=0; j<n; j++ ) {
        col = in[j];

        /* Look up into the Hash Table: key encodes the (block row,
           block col) pair, +1 so that 0 can mark an empty slot */
        key = row*Nbs+col+1;
        h1  = HASH(size,key,tmp);

        idx = h1;
#if defined(USE_PETSC_BOPT_g)
        /* debug build: identical probe, but also counts probe lengths */
        total_ct++;
        insert_ct++;
       if (HT[idx] != key) {
          /* linear probe to the end of the table, then wrap to h1 */
          for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++,total_ct++);
          if (idx == size) {
            for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++,total_ct++);
            if (idx == h1) {
              SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
            }
          }
        }
#else
        if (HT[idx] != key) {
          /* linear probe to the end of the table, then wrap to h1 */
          for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++);
          if (idx == size) {
            for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++);
            if (idx == h1) {
              SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
            }
          }
        }
#endif
        /* baij_a points at the stored bs x bs block (column-major) */
        baij_a = HD[idx];
        if (roworiented) {
          /*value = v + i*(stepval+bs)*bs + j*bs;*/
          /* value = v + (i*(stepval+bs)+j)*bs; */
          value = v_t;
          v_t  += bs;
          /* transpose row-major input into the column-major block */
          if (addv == ADD_VALUES) {
            for ( ii=0; ii<bs; ii++,value+=stepval) {
              for ( jj=ii; jj<bs2; jj+=bs ) {
                baij_a[jj]  += *value++;
              }
            }
          } else {
            for ( ii=0; ii<bs; ii++,value+=stepval) {
              for ( jj=ii; jj<bs2; jj+=bs ) {
                baij_a[jj]  = *value++;
              }
            }
          }
        } else {
          /* column-oriented input: columns copy over directly */
          value = v + j*(stepval+bs)*bs + i*bs;
          if (addv == ADD_VALUES) {
            for ( ii=0; ii<bs; ii++,value+=stepval,baij_a+=bs ) {
              for ( jj=0; jj<bs; jj++ ) {
                baij_a[jj]  += *value++;
              }
            }
          } else {
            for ( ii=0; ii<bs; ii++,value+=stepval,baij_a+=bs ) {
              for ( jj=0; jj<bs; jj++ ) {
                baij_a[jj]  = *value++;
              }
            }
          }
        }
      }
    } else {
      /* off-process block row: stash every point entry individually */
      if (!baij->donotstash) {
        if (roworiented ) {
          row   = im[i]*bs;
          value = v + i*(stepval+bs)*bs;
          for ( j=0; j<bs; j++,row++ ) {
            for ( k=0; k<n; k++ ) {
              for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        } else {
          for ( j=0; j<n; j++ ) {
            value = v + j*(stepval+bs)*bs + i*bs;
            col   = in[j]*bs;
            for ( k=0; k<bs; k++,col++,value+=stepval) {
              for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        }
      }
    }
  }
#if defined(USE_PETSC_BOPT_g)
  baij->ht_total_ct = total_ct;
  baij->ht_insert_ct = insert_ct;
#endif
  PetscFunctionReturn(0);
}
618 
#undef __FUNC__
#define __FUNC__ "MatGetValues_MPIBAIJ"
/*
   Retrieves individual (point-indexed) entries.  Only rows owned by
   this process may be queried; v is filled as an m x n row-major
   array.  Entries in column blocks not present locally are returned
   as 0.0.
*/
int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
  int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col,data;

  PetscFunctionBegin;
  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
    if (idxm[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
    if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
      row = idxm[i] - bsrstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
        if (idxn[j] >= baij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
        if (idxn[j] >= bscstart && idxn[j] < bscend){
          /* entry lives in the diagonal block A */
          col = idxn[j] - bscstart;
          ierr = MatGetValues_SeqBAIJ(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        } else {
          /* entry lives in the off-diagonal block B: translate the
             global column via colmap (entries stored shifted by +1) */
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
          }
#if defined (USE_CTABLE)
          ierr = TableFind(baij->colmap,idxn[j]/bs+1,&data); CHKERRQ(ierr);
          data --;
#else
          data = baij->colmap[idxn[j]/bs]-1;
#endif
          /* column block absent locally => the value is zero */
          if((data < 0) || (baij->garray[data/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
          else {
            col  = data + idxn[j]%bs;
            ierr = MatGetValues_SeqBAIJ(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    } else {
      SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
    }
  }
 PetscFunctionReturn(0);
}
662 
663 #undef __FUNC__
664 #define __FUNC__ "MatNorm_MPIBAIJ"
665 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
666 {
667   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
668   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
669   int        ierr, i,bs2=baij->bs2;
670   double     sum = 0.0;
671   Scalar     *v;
672 
673   PetscFunctionBegin;
674   if (baij->size == 1) {
675     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
676   } else {
677     if (type == NORM_FROBENIUS) {
678       v = amat->a;
679       for (i=0; i<amat->nz*bs2; i++ ) {
680 #if defined(USE_PETSC_COMPLEX)
681         sum += PetscReal(PetscConj(*v)*(*v)); v++;
682 #else
683         sum += (*v)*(*v); v++;
684 #endif
685       }
686       v = bmat->a;
687       for (i=0; i<bmat->nz*bs2; i++ ) {
688 #if defined(USE_PETSC_COMPLEX)
689         sum += PetscReal(PetscConj(*v)*(*v)); v++;
690 #else
691         sum += (*v)*(*v); v++;
692 #endif
693       }
694       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
695       *norm = sqrt(*norm);
696     } else {
697       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
698     }
699   }
700   PetscFunctionReturn(0);
701 }
702 
703 
704 /*
705   Creates the hash table, and sets the table
706   This table is created only once.
707   If new entried need to be added to the matrix
708   then the hash table has to be destroyed and
709   recreated.
710 */
711 #undef __FUNC__
712 #define __FUNC__ "MatCreateHashTable_MPIBAIJ_Private"
713 int MatCreateHashTable_MPIBAIJ_Private(Mat mat,double factor)
714 {
715   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
716   Mat         A = baij->A, B=baij->B;
717   Mat_SeqBAIJ *a=(Mat_SeqBAIJ *)A->data, *b=(Mat_SeqBAIJ *)B->data;
718   int         i,j,k,nz=a->nz+b->nz,h1,*ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j;
719   int         size,bs2=baij->bs2,rstart=baij->rstart;
720   int         cstart=baij->cstart,*garray=baij->garray,row,col,Nbs=baij->Nbs;
721   int         *HT,key;
722   Scalar      **HD;
723   double      tmp;
724 #if defined(USE_PETSC_BOPT_g)
725   int         ct=0,max=0;
726 #endif
727 
728   PetscFunctionBegin;
729   baij->ht_size=(int)(factor*nz);
730   size = baij->ht_size;
731 
732   if (baij->ht) {
733     PetscFunctionReturn(0);
734   }
735 
736   /* Allocate Memory for Hash Table */
737   baij->hd = (Scalar**)PetscMalloc((size)*(sizeof(int)+sizeof(Scalar*))+1); CHKPTRQ(baij->hd);
738   baij->ht = (int*)(baij->hd + size);
739   HD = baij->hd;
740   HT = baij->ht;
741 
742 
743   PetscMemzero(HD,size*(sizeof(int)+sizeof(Scalar*)));
744 
745 
746   /* Loop Over A */
747   for ( i=0; i<a->mbs; i++ ) {
748     for ( j=ai[i]; j<ai[i+1]; j++ ) {
749       row = i+rstart;
750       col = aj[j]+cstart;
751 
752       key = row*Nbs + col + 1;
753       h1  = HASH(size,key,tmp);
754       for ( k=0; k<size; k++ ){
755         if (HT[(h1+k)%size] == 0.0) {
756           HT[(h1+k)%size] = key;
757           HD[(h1+k)%size] = a->a + j*bs2;
758           break;
759 #if defined(USE_PETSC_BOPT_g)
760         } else {
761           ct++;
762 #endif
763         }
764       }
765 #if defined(USE_PETSC_BOPT_g)
766       if (k> max) max = k;
767 #endif
768     }
769   }
770   /* Loop Over B */
771   for ( i=0; i<b->mbs; i++ ) {
772     for ( j=bi[i]; j<bi[i+1]; j++ ) {
773       row = i+rstart;
774       col = garray[bj[j]];
775       key = row*Nbs + col + 1;
776       h1  = HASH(size,key,tmp);
777       for ( k=0; k<size; k++ ){
778         if (HT[(h1+k)%size] == 0.0) {
779           HT[(h1+k)%size] = key;
780           HD[(h1+k)%size] = b->a + j*bs2;
781           break;
782 #if defined(USE_PETSC_BOPT_g)
783         } else {
784           ct++;
785 #endif
786         }
787       }
788 #if defined(USE_PETSC_BOPT_g)
789       if (k> max) max = k;
790 #endif
791     }
792   }
793 
794   /* Print Summary */
795 #if defined(USE_PETSC_BOPT_g)
796   for ( i=0,j=0; i<size; i++)
797     if (HT[i]) {j++;}
798   PLogInfo(0,"MatCreateHashTable_MPIBAIJ_Private: Average Search = %5.2f,max search = %d\n",
799            (j== 0)? 0.0:((double)(ct+j))/j,max);
800 #endif
801   PetscFunctionReturn(0);
802 }
803 
#if defined (__JUNK__)
#undef __FUNC__
#define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
/*
   Legacy hand-rolled assembly-begin, retained only for reference.

   NOTE(review): this entire region is compiled out by the __JUNK__ guard
   and has been superseded by the StashScatter-based version later in this
   file.  It packs each stashed off-processor entry as three Scalars
   (row index, column index, value), exchanges message counts and maximum
   lengths via MPI_Allreduce, then posts the nonblocking receives and sends
   whose handles are completed by the matching legacy MatAssemblyEnd below.
*/
int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  PetscFunctionBegin;
  /* Nothing was stashed on this process: record an empty communication
     state so MatAssemblyEnd has nothing to wait on, and return. */
  if (baij->donotstash) {
    baij->svalues    = 0; baij->rvalues    = 0;
    baij->nsends     = 0; baij->nrecvs     = 0;
    baij->send_waits = 0; baij->recv_waits = 0;
    baij->rmax       = 0;
    PetscFunctionReturn(0);
  }

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
  }
  mat->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  /* linear search over the block-row ownership ranges (owners[] holds
     block-row boundaries, hence the *bs scaling to point rows) */
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work      = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  ierr      = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
  nreceives = work[rank];
  ierr      = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
  nmax      = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues    = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues    = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
  starts     = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack (row,col,value) triples; note starts[owner[i]] is bumped as
     entries are packed, which is why it is rebuilt below before sending */
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(baij->A,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
  ierr = StashReset_Private(&baij->stash); CHKERRQ(ierr);

  /* stash the communication state for the matching MatAssemblyEnd */
  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  PetscFunctionReturn(0);
}
909 
#undef __FUNC__
#define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
/*
   Legacy assembly-end matching the __JUNK__ MatAssemblyBegin above:
   waits for the (row,col,value) triple messages, inserts each entry into
   the diagonal (A) or off-diagonal (B) sequential block, then completes
   the sends and finishes assembly of both blocks.

   NOTE(review): compiled out by the __JUNK__ guard, and not buildable as
   written — `nrecvs` (used to initialize `count`) and `values` (assigned
   from baij->rvalues) are never declared in this function.  Kept only for
   reference; the live version below uses the Stash scatter routines.
*/
int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,count = nrecvs, i, n, ierr;
  int         bs=baij->bs,row,col,other_disassembled;
  Scalar      val;
  InsertMode  addv = mat->insertmode;

  PetscFunctionBegin;
  /*  wait on receives */
  while (count) {
    ierr = MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
    /* unpack receives into our local space */
    values = baij->rvalues + 3*imdex*baij->rmax;
    ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
    n = n/3;
    for ( i=0; i<n; i++ ) {
      /* decode the triple: row is shifted to this process's local numbering */
      row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= baij->cstart*bs && col < baij->cend*bs) {
        /* column lies in the diagonal block: shift to local and insert in A */
        col -= baij->cstart*bs;
        ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      } else {
        if (mat->was_assembled) {
          /* B is compacted after first assembly; translate the global column
             through colmap back to B's local column numbering */
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
          }
#if defined (USE_CTABLE)
	  ierr = TableFind(baij->colmap,col/bs+1,&col); CHKERRQ(ierr);
          col  = col - 1 + col%bs;
#else
          col = (baij->colmap[col/bs]) - 1 + col%bs;
#endif
          if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
            /* column not present in B's current pattern: disassemble so the
               new location can be inserted with its global index */
            ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      }
    }
    count--;
  }
  if (baij->recv_waits) PetscFree(baij->recv_waits);
  if (baij->rvalues)    PetscFree(baij->rvalues);

  /* wait on sends */
  if (baij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
    ierr        = MPI_Waitall(baij->nsends,baij->send_waits,send_status);CHKERRQ(ierr);
    PetscFree(send_status);
  }
  if (baij->send_waits) PetscFree(baij->send_waits);
  if (baij->svalues)    PetscFree(baij->svalues);

  ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselfs, in order that we may reassemble. */
  /*
     if nonzero structure of submatrix B cannot change then we know that
     no processor disassembled thus we can skip this stuff
  */
  if (!((Mat_SeqBAIJ*) baij->B->data)->nonew)  {
    ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
    if (mat->was_assembled && !other_disassembled) {
      ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
    }
  }

  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);

#if defined(USE_PETSC_BOPT_g)
  if (baij->ht && mode== MAT_FINAL_ASSEMBLY) {
    PLogInfo(0,"MatAssemblyEnd_MPIBAIJ:Average Hash Table Search in MatSetValues = %5.2f\n",
             ((double)baij->ht_total_ct)/baij->ht_insert_ct);
    baij->ht_total_ct  = 0;
    baij->ht_insert_ct = 0;
  }
#endif
  /* lazily build the hash table used by the _HT fast-path setvalues routines */
  if (baij->ht_flag && !baij->ht && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatCreateHashTable_MPIBAIJ_Private(mat,baij->ht_fact); CHKERRQ(ierr);
    mat->ops->setvalues        = MatSetValues_MPIBAIJ_HT;
    mat->ops->setvaluesblocked = MatSetValuesBlocked_MPIBAIJ_HT;
  }

  if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
  PetscFunctionReturn(0);
}
#endif
1009 #undef __FUNC__
1010 #define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
1011 int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
1012 {
1013   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1014   int         ierr;
1015   InsertMode  addv;
1016 
1017   PetscFunctionBegin;
1018   if (baij->donotstash) {
1019     PetscFunctionReturn(0);
1020   }
1021 
1022   /* make sure all processors are either in INSERTMODE or ADDMODE */
1023   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,mat->comm);CHKERRQ(ierr);
1024   if (addv == (ADD_VALUES|INSERT_VALUES)) {
1025     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
1026   }
1027   mat->insertmode = addv; /* in case this processor had no cache */
1028 
1029   ierr =  StashScatterBegin_Private(&baij->stash,baij->rowners); CHKERRQ(ierr);
1030 
1031   /* Free cache space */
1032   PLogInfo(baij->A,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
1033   PetscFunctionReturn(0);
1034 }
1035 
1036 #undef __FUNC__
1037 #define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
1038 int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
1039 {
1040   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1041   int         imdex,i,j,rstart,ncols,n,ierr;
1042   int         *row,*col,other_disassembled;
1043   Scalar      *val;
1044   InsertMode  addv = mat->insertmode;
1045 
1046   PetscFunctionBegin;
1047   if (!baij->donotstash) {
1048     ierr = StashScatterEnd_Private(&baij->stash); CHKERRQ(ierr);
1049     for (imdex=0; imdex<baij->stash.nrecvs; imdex++ ) {
1050       n   = baij->stash.rdata[imdex].n;
1051       row = baij->stash.rdata[imdex].i;
1052       col = baij->stash.rdata[imdex].j;
1053       val = baij->stash.rdata[imdex].a;
1054       for ( i=0; i<n; ) {
1055         /* Now identify the consecutive vals belonging to the same row */
1056         for ( j=i,rstart=row[j]; j<n; j++ ) { if (row[j] != rstart) break; }
1057         if (j < n) ncols = j-i;
1058         else       ncols = n-i;
1059         /* Now assemble all these values with a single function call */
1060         ierr = MatSetValues_MPIBAIJ(mat,1,row+i,ncols,col+i,val+i,addv); CHKERRQ(ierr);
1061         i = j;
1062       }
1063     }
1064   ierr = StashReset_Private(&baij->stash); CHKERRQ(ierr);
1065   }
1066 
1067   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
1068   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
1069 
1070   /* determine if any processor has disassembled, if so we must
1071      also disassemble ourselfs, in order that we may reassemble. */
1072   /*
1073      if nonzero structure of submatrix B cannot change then we know that
1074      no processor disassembled thus we can skip this stuff
1075   */
1076   if (!((Mat_SeqBAIJ*) baij->B->data)->nonew)  {
1077     ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
1078     if (mat->was_assembled && !other_disassembled) {
1079       ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
1080     }
1081   }
1082 
1083   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
1084     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
1085   }
1086   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
1087   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
1088 
1089 #if defined(USE_PETSC_BOPT_g)
1090   if (baij->ht && mode== MAT_FINAL_ASSEMBLY) {
1091     PLogInfo(0,"MatAssemblyEnd_MPIBAIJ:Average Hash Table Search in MatSetValues = %5.2f\n",
1092              ((double)baij->ht_total_ct)/baij->ht_insert_ct);
1093     baij->ht_total_ct  = 0;
1094     baij->ht_insert_ct = 0;
1095   }
1096 #endif
1097   if (baij->ht_flag && !baij->ht && mode == MAT_FINAL_ASSEMBLY) {
1098     ierr = MatCreateHashTable_MPIBAIJ_Private(mat,baij->ht_fact); CHKERRQ(ierr);
1099     mat->ops->setvalues        = MatSetValues_MPIBAIJ_HT;
1100     mat->ops->setvaluesblocked = MatSetValuesBlocked_MPIBAIJ_HT;
1101   }
1102 
1103   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
1104   PetscFunctionReturn(0);
1105 }
1106 
1107 #undef __FUNC__
1108 #define __FUNC__ "MatView_MPIBAIJ_Binary"
1109 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
1110 {
1111   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
1112   int          ierr;
1113 
1114   PetscFunctionBegin;
1115   if (baij->size == 1) {
1116     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
1117   } else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
1118   PetscFunctionReturn(0);
1119 }
1120 
1121 #undef __FUNC__
1122 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworSocket"
1123 static int MatView_MPIBAIJ_ASCIIorDraworSocket(Mat mat,Viewer viewer)
1124 {
1125   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
1126   int          ierr, format,bs = baij->bs, size = baij->size, rank = baij->rank;
1127   FILE         *fd;
1128   ViewerType   vtype;
1129 
1130   PetscFunctionBegin;
1131   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
1132   if (PetscTypeCompare(vtype,ASCII_VIEWER)) {
1133     ierr = ViewerGetFormat(viewer,&format); CHKERRQ(ierr);
1134     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
1135       MatInfo info;
1136       MPI_Comm_rank(mat->comm,&rank);
1137       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
1138       ierr = MatGetInfo(mat,MAT_LOCAL,&info);CHKERRQ(ierr);
1139       PetscSequentialPhaseBegin(mat->comm,1);
1140       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
1141               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
1142               baij->bs,(int)info.memory);
1143       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
1144       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
1145       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
1146       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
1147       fflush(fd);
1148       PetscSequentialPhaseEnd(mat->comm,1);
1149       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
1150       PetscFunctionReturn(0);
1151     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
1152       PetscPrintf(mat->comm,"  block size is %d\n",bs);
1153       PetscFunctionReturn(0);
1154     }
1155   }
1156 
1157   if (PetscTypeCompare(vtype,DRAW_VIEWER)) {
1158     Draw       draw;
1159     PetscTruth isnull;
1160     ierr = ViewerDrawGetDraw(viewer,0,&draw); CHKERRQ(ierr);
1161     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
1162   }
1163 
1164   if (size == 1) {
1165     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
1166   } else {
1167     /* assemble the entire matrix onto first processor. */
1168     Mat         A;
1169     Mat_SeqBAIJ *Aloc;
1170     int         M = baij->M, N = baij->N,*ai,*aj,col,i,j,k,*rvals;
1171     int         mbs = baij->mbs;
1172     Scalar      *a;
1173 
1174     if (!rank) {
1175       ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
1176     } else {
1177       ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
1178     }
1179     PLogObjectParent(mat,A);
1180 
1181     /* copy over the A part */
1182     Aloc = (Mat_SeqBAIJ*) baij->A->data;
1183     ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1184     rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1185 
1186     for ( i=0; i<mbs; i++ ) {
1187       rvals[0] = bs*(baij->rstart + i);
1188       for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1189       for ( j=ai[i]; j<ai[i+1]; j++ ) {
1190         col = (baij->cstart+aj[j])*bs;
1191         for (k=0; k<bs; k++ ) {
1192           ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
1193           col++; a += bs;
1194         }
1195       }
1196     }
1197     /* copy over the B part */
1198     Aloc = (Mat_SeqBAIJ*) baij->B->data;
1199     ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1200     for ( i=0; i<mbs; i++ ) {
1201       rvals[0] = bs*(baij->rstart + i);
1202       for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1203       for ( j=ai[i]; j<ai[i+1]; j++ ) {
1204         col = baij->garray[aj[j]]*bs;
1205         for (k=0; k<bs; k++ ) {
1206           ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
1207           col++; a += bs;
1208         }
1209       }
1210     }
1211     PetscFree(rvals);
1212     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1213     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1214     /*
1215        Everyone has to call to draw the matrix since the graphics waits are
1216        synchronized across all processors that share the Draw object
1217     */
1218     if (!rank || PetscTypeCompare(vtype,DRAW_VIEWER)) {
1219       ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
1220     }
1221     ierr = MatDestroy(A); CHKERRQ(ierr);
1222   }
1223   PetscFunctionReturn(0);
1224 }
1225 
1226 
1227 
1228 #undef __FUNC__
1229 #define __FUNC__ "MatView_MPIBAIJ"
1230 int MatView_MPIBAIJ(Mat mat,Viewer viewer)
1231 {
1232   int         ierr;
1233   ViewerType  vtype;
1234 
1235   PetscFunctionBegin;
1236   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
1237   if (PetscTypeCompare(vtype,ASCII_VIEWER) || PetscTypeCompare(vtype,DRAW_VIEWER) ||
1238       PetscTypeCompare(vtype,SOCKET_VIEWER)) {
1239     ierr = MatView_MPIBAIJ_ASCIIorDraworSocket(mat,viewer); CHKERRQ(ierr);
1240   } else if (PetscTypeCompare(vtype,BINARY_VIEWER)) {
1241     ierr = MatView_MPIBAIJ_Binary(mat,viewer);CHKERRQ(ierr);
1242   } else {
1243     SETERRQ(1,1,"Viewer type not supported by PETSc object");
1244   }
1245   PetscFunctionReturn(0);
1246 }
1247 
1248 #undef __FUNC__
1249 #define __FUNC__ "MatDestroy_MPIBAIJ"
/*
   Releases every resource owned by a parallel BAIJ matrix: the two
   sequential blocks, the stash, the column map, the scatter context,
   work vectors/arrays, and finally the header itself.
*/
int MatDestroy_MPIBAIJ(Mat mat)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;

  PetscFunctionBegin;
  /* Reference counted: only the final release tears the object down. */
  if (--mat->refct > 0) PetscFunctionReturn(0);

  if (mat->mapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
  }
  if (mat->bmapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->bmapping); CHKERRQ(ierr);
  }
  if (mat->rmap) {
    ierr = MapDestroy(mat->rmap);CHKERRQ(ierr);
  }
  if (mat->cmap) {
    ierr = MapDestroy(mat->cmap);CHKERRQ(ierr);
  }
#if defined(USE_PETSC_LOG)
  PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",baij->M,baij->N);
#endif

  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
  PetscFree(baij->rowners);
  /* destroy the diagonal and off-diagonal sequential blocks */
  ierr = MatDestroy(baij->A); CHKERRQ(ierr);
  ierr = MatDestroy(baij->B); CHKERRQ(ierr);
  /* colmap is either a hash table or a plain array depending on the build */
#if defined (USE_CTABLE)
  if (baij->colmap) TableDelete(baij->colmap);
#else
  if (baij->colmap) PetscFree(baij->colmap);
#endif
  if (baij->garray) PetscFree(baij->garray);
  if (baij->lvec)   VecDestroy(baij->lvec);
  if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
  if (baij->rowvalues) PetscFree(baij->rowvalues);
  if (baij->barray) PetscFree(baij->barray);
  if (baij->hd) PetscFree(baij->hd);
  PetscFree(baij);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  PetscFunctionReturn(0);
}
1294 
1295 #undef __FUNC__
1296 #define __FUNC__ "MatMult_MPIBAIJ"
1297 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
1298 {
1299   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1300   int         ierr, nt;
1301 
1302   PetscFunctionBegin;
1303   ierr = VecGetLocalSize(xx,&nt);CHKERRQ(ierr);
1304   if (nt != a->n) {
1305     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
1306   }
1307   ierr = VecGetLocalSize(yy,&nt);CHKERRQ(ierr);
1308   if (nt != a->m) {
1309     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible parition of A and yy");
1310   }
1311   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1312   ierr = (*a->A->ops->mult)(a->A,xx,yy); CHKERRQ(ierr);
1313   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1314   ierr = (*a->B->ops->multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
1315   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1316   PetscFunctionReturn(0);
1317 }
1318 
1319 #undef __FUNC__
1320 #define __FUNC__ "MatMultAdd_MPIBAIJ"
1321 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
1322 {
1323   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1324   int        ierr;
1325 
1326   PetscFunctionBegin;
1327   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1328   ierr = (*a->A->ops->multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
1329   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1330   ierr = (*a->B->ops->multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
1331   PetscFunctionReturn(0);
1332 }
1333 
1334 #undef __FUNC__
1335 #define __FUNC__ "MatMultTrans_MPIBAIJ"
1336 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
1337 {
1338   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1339   int         ierr;
1340 
1341   PetscFunctionBegin;
1342   /* do nondiagonal part */
1343   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
1344   /* send it on its way */
1345   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
1346   /* do local part */
1347   ierr = (*a->A->ops->multtrans)(a->A,xx,yy); CHKERRQ(ierr);
1348   /* receive remote parts: note this assumes the values are not actually */
1349   /* inserted in yy until the next line, which is true for my implementation*/
1350   /* but is not perhaps always true. */
1351   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
1352   PetscFunctionReturn(0);
1353 }
1354 
1355 #undef __FUNC__
1356 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
/* zz = A^T*xx + yy, overlapping the reverse scatter of off-diagonal
   contributions with the local transpose product. */
int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;

  PetscFunctionBegin;
  /* do nondiagonal part */
  ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops->multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in zz until the next line, which is true for my implementation*/
  /* but is not perhaps always true. */
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
1375 
1376 /*
1377   This only works correctly for square matrices where the subblock A->A is the
1378    diagonal block
1379 */
1380 #undef __FUNC__
1381 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
1382 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
1383 {
1384   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1385   int         ierr;
1386 
1387   PetscFunctionBegin;
1388   if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
1389   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
1390   PetscFunctionReturn(0);
1391 }
1392 
1393 #undef __FUNC__
1394 #define __FUNC__ "MatScale_MPIBAIJ"
1395 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
1396 {
1397   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1398   int         ierr;
1399 
1400   PetscFunctionBegin;
1401   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
1402   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
1403   PetscFunctionReturn(0);
1404 }
1405 
1406 #undef __FUNC__
1407 #define __FUNC__ "MatGetSize_MPIBAIJ"
1408 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
1409 {
1410   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1411 
1412   PetscFunctionBegin;
1413   if (m) *m = mat->M;
1414   if (n) *n = mat->N;
1415   PetscFunctionReturn(0);
1416 }
1417 
1418 #undef __FUNC__
1419 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
1420 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
1421 {
1422   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1423 
1424   PetscFunctionBegin;
1425   *m = mat->m; *n = mat->n;
1426   PetscFunctionReturn(0);
1427 }
1428 
1429 #undef __FUNC__
1430 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
1431 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
1432 {
1433   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1434 
1435   PetscFunctionBegin;
1436   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
1437   PetscFunctionReturn(0);
1438 }
1439 
1440 #undef __FUNC__
1441 #define __FUNC__ "MatGetRow_MPIBAIJ"
/*
   Extracts one (point) row of the parallel matrix as global column
   indices and values, by fetching the row from both sequential blocks
   and merging them in increasing global-column order.  Only rows owned
   by this process may be requested; must be paired with MatRestoreRow.
*/
int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
  int        *cmap, *idx_p,cstart = mat->cstart;

  PetscFunctionBegin;
  /* only one row may be "gotten" at a time */
  if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int     max = 1,mbs = mat->mbs,tmp;
    for ( i=0; i<mbs; i++ ) {
      /* total block count of block row i across both pieces */
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    /* one allocation holds both the value and the index work arrays */
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }

  if (row < brstart || row >= brend) SETERRQ(PETSC_ERR_SUP,0,"Only local rows")
  lrow = row - brstart;

  /* request only the pieces the caller asked for */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops->getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops->getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  /* cmap translates B's compacted block columns to global block columns */
  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        /* imark splits B's entries into those left of the diagonal block
           (global column < cstart) and those right of it */
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          /* imark already computed by the value pass above */
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
          }
        } else {
          /* values were not requested: compute imark here instead */
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]/bs] < cstart)
              idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
            else break;
          }
          imark = i;
        }
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
      }
    } else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops->restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops->restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
1521 
1522 #undef __FUNC__
1523 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1524 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1525 {
1526   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1527 
1528   PetscFunctionBegin;
1529   if (baij->getrowactive == PETSC_FALSE) {
1530     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
1531   }
1532   baij->getrowactive = PETSC_FALSE;
1533   PetscFunctionReturn(0);
1534 }
1535 
1536 #undef __FUNC__
1537 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1538 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1539 {
1540   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1541 
1542   PetscFunctionBegin;
1543   *bs = baij->bs;
1544   PetscFunctionReturn(0);
1545 }
1546 
1547 #undef __FUNC__
1548 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1549 int MatZeroEntries_MPIBAIJ(Mat A)
1550 {
1551   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1552   int         ierr;
1553 
1554   PetscFunctionBegin;
1555   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1556   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1557   PetscFunctionReturn(0);
1558 }
1559 
1560 #undef __FUNC__
1561 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1562 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1563 {
1564   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1565   Mat         A = a->A, B = a->B;
1566   int         ierr;
1567   double      isend[5], irecv[5];
1568 
1569   PetscFunctionBegin;
1570   info->block_size     = (double)a->bs;
1571   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1572   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
1573   isend[3] = info->memory;  isend[4] = info->mallocs;
1574   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1575   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
1576   isend[3] += info->memory;  isend[4] += info->mallocs;
1577   if (flag == MAT_LOCAL) {
1578     info->nz_used      = isend[0];
1579     info->nz_allocated = isend[1];
1580     info->nz_unneeded  = isend[2];
1581     info->memory       = isend[3];
1582     info->mallocs      = isend[4];
1583   } else if (flag == MAT_GLOBAL_MAX) {
1584     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);CHKERRQ(ierr);
1585     info->nz_used      = irecv[0];
1586     info->nz_allocated = irecv[1];
1587     info->nz_unneeded  = irecv[2];
1588     info->memory       = irecv[3];
1589     info->mallocs      = irecv[4];
1590   } else if (flag == MAT_GLOBAL_SUM) {
1591     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);CHKERRQ(ierr);
1592     info->nz_used      = irecv[0];
1593     info->nz_allocated = irecv[1];
1594     info->nz_unneeded  = irecv[2];
1595     info->memory       = irecv[3];
1596     info->mallocs      = irecv[4];
1597   } else {
1598     SETERRQ1(1,1,"Unknown MatInfoType argument %d",flag);
1599   }
1600   info->rows_global       = (double)a->M;
1601   info->columns_global    = (double)a->N;
1602   info->rows_local        = (double)a->m;
1603   info->columns_local     = (double)a->N;
1604   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1605   info->fill_ratio_needed = 0;
1606   info->factor_mallocs    = 0;
1607   PetscFunctionReturn(0);
1608 }
1609 
1610 #undef __FUNC__
1611 #define __FUNC__ "MatSetOption_MPIBAIJ"
1612 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1613 {
1614   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1615   int         ierr;
1616 
1617   PetscFunctionBegin;
1618   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1619       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1620       op == MAT_COLUMNS_UNSORTED ||
1621       op == MAT_COLUMNS_SORTED ||
1622       op == MAT_NEW_NONZERO_ALLOCATION_ERR ||
1623       op == MAT_NEW_NONZERO_LOCATION_ERR) {
1624         ierr = MatSetOption(a->A,op);CHKERRQ(ierr);
1625         ierr = MatSetOption(a->B,op);CHKERRQ(ierr);
1626   } else if (op == MAT_ROW_ORIENTED) {
1627         a->roworiented = 1;
1628         ierr = MatSetOption(a->A,op);CHKERRQ(ierr);
1629         ierr = MatSetOption(a->B,op);CHKERRQ(ierr);
1630   } else if (op == MAT_ROWS_SORTED ||
1631              op == MAT_ROWS_UNSORTED ||
1632              op == MAT_SYMMETRIC ||
1633              op == MAT_STRUCTURALLY_SYMMETRIC ||
1634              op == MAT_YES_NEW_DIAGONALS ||
1635              op == MAT_USE_HASH_TABLE) {
1636     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1637   } else if (op == MAT_COLUMN_ORIENTED) {
1638     a->roworiented = 0;
1639     ierr = MatSetOption(a->A,op);CHKERRQ(ierr);
1640     ierr = MatSetOption(a->B,op);CHKERRQ(ierr);
1641   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1642     a->donotstash = 1;
1643   } else if (op == MAT_NO_NEW_DIAGONALS) {
1644     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1645   } else if (op == MAT_USE_HASH_TABLE) {
1646     a->ht_flag = 1;
1647   } else {
1648     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1649   }
1650   PetscFunctionReturn(0);
1651 }
1652 
1653 #undef __FUNC__
1654 #define __FUNC__ "MatTranspose_MPIBAIJ("
1655 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1656 {
1657   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1658   Mat_SeqBAIJ *Aloc;
1659   Mat        B;
1660   int        ierr,M=baij->M,N=baij->N,*ai,*aj,i,*rvals,j,k,col;
1661   int        bs=baij->bs,mbs=baij->mbs;
1662   Scalar     *a;
1663 
1664   PetscFunctionBegin;
1665   if (matout == PETSC_NULL && M != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
1666   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1667   CHKERRQ(ierr);
1668 
1669   /* copy over the A part */
1670   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1671   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1672   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1673 
1674   for ( i=0; i<mbs; i++ ) {
1675     rvals[0] = bs*(baij->rstart + i);
1676     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1677     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1678       col = (baij->cstart+aj[j])*bs;
1679       for (k=0; k<bs; k++ ) {
1680         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1681         col++; a += bs;
1682       }
1683     }
1684   }
1685   /* copy over the B part */
1686   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1687   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1688   for ( i=0; i<mbs; i++ ) {
1689     rvals[0] = bs*(baij->rstart + i);
1690     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1691     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1692       col = baij->garray[aj[j]]*bs;
1693       for (k=0; k<bs; k++ ) {
1694         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1695         col++; a += bs;
1696       }
1697     }
1698   }
1699   PetscFree(rvals);
1700   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1701   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1702 
1703   if (matout != PETSC_NULL) {
1704     *matout = B;
1705   } else {
1706     PetscOps *Abops;
1707     MatOps   Aops;
1708 
1709     /* This isn't really an in-place transpose .... but free data structures from baij */
1710     PetscFree(baij->rowners);
1711     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1712     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1713 #if defined (USE_CTABLE)
1714     if (baij->colmap) TableDelete(baij->colmap);
1715 #else
1716     if (baij->colmap) PetscFree(baij->colmap);
1717 #endif
1718     if (baij->garray) PetscFree(baij->garray);
1719     if (baij->lvec) VecDestroy(baij->lvec);
1720     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1721     PetscFree(baij);
1722 
1723     /*
1724        This is horrible, horrible code. We need to keep the
1725       A pointers for the bops and ops but copy everything
1726       else from C.
1727     */
1728     Abops = A->bops;
1729     Aops  = A->ops;
1730     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1731     A->bops = Abops;
1732     A->ops  = Aops;
1733 
1734     PetscHeaderDestroy(B);
1735   }
1736   PetscFunctionReturn(0);
1737 }
1738 
1739 #undef __FUNC__
1740 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1741 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1742 {
1743   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1744   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1745   int ierr,s1,s2,s3;
1746 
1747   PetscFunctionBegin;
1748   if (ll)  {
1749     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1750     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1751     if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"non-conforming local sizes");
1752     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1753     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1754   }
1755   if (rr) SETERRQ(PETSC_ERR_SUP,0,"not supported for right vector");
1756   PetscFunctionReturn(0);
1757 }
1758 
#undef __FUNC__
#define __FUNC__ "MatZeroRows_MPIBAIJ"
/*
   MatZeroRows_MPIBAIJ - Zeros the global (point, not block) rows of the
   matrix listed in is, optionally placing *diag on the diagonal of each
   zeroed row.

   The index set may reference rows owned by any processor, so the indices
   are first routed to their owning processors: message counts are agreed
   on with two MPI_Allreduce calls, the indices travel via nonblocking
   point-to-point sends/receives, and the zeroing is then applied locally.

   NOTE(review): l->rowners stores BLOCK-row ownership, hence the
   owners[j]*bs scaling when testing point-row ownership below.
*/
int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work,row;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values,bs=l->bs,rstart_bs=l->rstart_bs;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  PetscFunctionBegin;
  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  /* single allocation: nprocs[0..size-1] holds per-proc counts,
     procs[0..size-1] (= nprocs+size) holds 0/1 "any data for proc" flags */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<size; j++ ) {
      /* ownership ranges are in block rows; scale by bs for point rows */
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work   = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  /* after the SUM-reduce, work[rank] = number of messages this proc receives */
  ierr   = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
  nrecvs = work[rank];
  /* after the MAX-reduce, work[rank] = length of the longest incoming message */
  ierr   = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
  nmax   = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
  }

  /* do sends:
     1) starts[i] gives the starting index in svalues for stuff going to
     the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* bucket the row indices by destination processor */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  /* the loop above advanced starts[]; rebuild it before sending */
  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
    }
  }
  PetscFree(starts);

  /* offset for converting received global point rows to local point rows */
  base = owners[rank]*bs;

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
    /* unpack receives into our local space */
    ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);

  /*
        Zero the required rows. If the "diagonal block" of the matrix
     is square and the user wishes to set the diagonal we use separate
     code so that MatSetValues() is not called for each diagonal allocating
     new memory, thus calling lots of mallocs and slowing things down.

       Contributed by: Mathew Knepley
  */
  /* must zero l->B before l->A because the (diag) case below may put values into l->B*/
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  if (diag && (l->A->M == l->A->N)) {
    ierr      = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  } else if (diag) {
    /* rectangular diagonal block: zero first, then insert the diagonal
       entries one at a time (requires new-nonzero insertion to be allowed) */
    ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
    if (((Mat_SeqBAIJ*)l->A->data)->nonew) {
      SETERRQ(PETSC_ERR_SUP,0,"MatZeroRows() on rectangular matrices cannot be used with the Mat options \n\
MAT_NO_NEW_NONZERO_LOCATIONS,MAT_NEW_NONZERO_LOCATION_ERR,MAT_NEW_NONZERO_ALLOCATION_ERR");
    }
    for ( i = 0; i < slen; i++ ) {
      row  = lrows[i] + rstart_bs;
      ierr = MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES);CHKERRQ(ierr);
    }
    ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
    ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
  } else {
    ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
  }

  ierr = ISDestroy(istmp); CHKERRQ(ierr);
  PetscFree(lrows);

  /* wait on sends so svalues can be safely freed */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
    ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  PetscFunctionReturn(0);
}
1907 
1908 #undef __FUNC__
1909 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1910 int MatPrintHelp_MPIBAIJ(Mat A)
1911 {
1912   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1913   MPI_Comm    comm = A->comm;
1914   static int  called = 0;
1915   int         ierr;
1916 
1917   PetscFunctionBegin;
1918   if (!a->rank) {
1919     ierr = MatPrintHelp_SeqBAIJ(a->A);CHKERRQ(ierr);
1920   }
1921   if (called) {PetscFunctionReturn(0);} else called = 1;
1922   (*PetscHelpPrintf)(comm," Options for MATMPIBAIJ matrix format (the defaults):\n");
1923   (*PetscHelpPrintf)(comm,"  -mat_use_hash_table <factor>: Use hashtable for efficient matrix assembly\n");
1924   PetscFunctionReturn(0);
1925 }
1926 
1927 #undef __FUNC__
1928 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1929 int MatSetUnfactored_MPIBAIJ(Mat A)
1930 {
1931   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1932   int         ierr;
1933 
1934   PetscFunctionBegin;
1935   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1936   PetscFunctionReturn(0);
1937 }
1938 
1939 static int MatDuplicate_MPIBAIJ(Mat,MatDuplicateOption,Mat *);
1940 
/* -------------------------------------------------------------------*/
/*
   Function table for the MATMPIBAIJ matrix type.  The initializers are
   positional and must follow the slot order of struct _MatOps exactly --
   do not reorder or insert entries.  A 0 entry means the corresponding
   operation is not provided by this type.
*/
static struct _MatOps MatOps_Values = {
  MatSetValues_MPIBAIJ,
  MatGetRow_MPIBAIJ,
  MatRestoreRow_MPIBAIJ,
  MatMult_MPIBAIJ,
  MatMultAdd_MPIBAIJ,
  MatMultTrans_MPIBAIJ,
  MatMultTransAdd_MPIBAIJ,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  MatTranspose_MPIBAIJ,
  MatGetInfo_MPIBAIJ,
  0,
  MatGetDiagonal_MPIBAIJ,
  MatDiagonalScale_MPIBAIJ,
  MatNorm_MPIBAIJ,
  MatAssemblyBegin_MPIBAIJ,
  MatAssemblyEnd_MPIBAIJ,
  0,
  MatSetOption_MPIBAIJ,
  MatZeroEntries_MPIBAIJ,
  MatZeroRows_MPIBAIJ,
  0,
  0,
  0,
  0,
  MatGetSize_MPIBAIJ,
  MatGetLocalSize_MPIBAIJ,
  MatGetOwnershipRange_MPIBAIJ,
  0,
  0,
  0,
  0,
  MatDuplicate_MPIBAIJ,
  0,
  0,
  0,
  0,
  0,
  MatGetSubMatrices_MPIBAIJ,
  MatIncreaseOverlap_MPIBAIJ,
  MatGetValues_MPIBAIJ,
  0,
  MatPrintHelp_MPIBAIJ,
  MatScale_MPIBAIJ,
  0,
  0,
  0,
  MatGetBlockSize_MPIBAIJ,
  0,
  0,
  0,
  0,
  0,
  0,
  MatSetUnfactored_MPIBAIJ,
  0,
  MatSetValuesBlocked_MPIBAIJ,
  0,
  0,
  0,
  MatGetMaps_Petsc};
2009 
2010 
2011 EXTERN_C_BEGIN
2012 #undef __FUNC__
2013 #define __FUNC__ "MatGetDiagonalBlock_MPIBAIJ"
2014 int MatGetDiagonalBlock_MPIBAIJ(Mat A,PetscTruth *iscopy,MatReuse reuse,Mat *a)
2015 {
2016   PetscFunctionBegin;
2017   *a      = ((Mat_MPIBAIJ *)A->data)->A;
2018   *iscopy = PETSC_FALSE;
2019   PetscFunctionReturn(0);
2020 }
2021 EXTERN_C_END
2022 
2023 #undef __FUNC__
2024 #define __FUNC__ "MatCreateMPIBAIJ"
2025 /*@C
2026    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
2027    (block compressed row).  For good matrix assembly performance
2028    the user should preallocate the matrix storage by setting the parameters
2029    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
2030    performance can be increased by more than a factor of 50.
2031 
2032    Collective on MPI_Comm
2033 
2034    Input Parameters:
2035 +  comm - MPI communicator
.  bs   - size of block
2037 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
2038            This value should be the same as the local size used in creating the
2039            y vector for the matrix-vector product y = Ax.
2040 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
2041            This value should be the same as the local size used in creating the
2042            x vector for the matrix-vector product y = Ax.
2043 .  M - number of global rows (or PETSC_DETERMINE to have calculated if m is given)
2044 .  N - number of global columns (or PETSC_DETERMINE to have calculated if n is given)
2045 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
2046            submatrix  (same for all local rows)
.  d_nnz - array containing the number of block nonzeros in the various block rows
           of the diagonal portion of the local submatrix (possibly different for each block
           row) or PETSC_NULL.  You must leave room for the diagonal entry even if it is zero.
2050 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
2051            submatrix (same for all local rows).
-  o_nnz - array containing the number of nonzeros in the various block rows of the
2053            off-diagonal portion of the local submatrix (possibly different for
2054            each block row) or PETSC_NULL.
2055 
2056    Output Parameter:
2057 .  A - the matrix
2058 
2059    Options Database Keys:
2060 .   -mat_no_unroll - uses code that does not unroll the loops in the
2061                      block calculations (much slower)
2062 .   -mat_block_size - size of the blocks to use
2063 .   -mat_mpi - use the parallel matrix data structures even on one processor
2064                (defaults to using SeqBAIJ format on one processor)
2065 
2066    Notes:
2067    The user MUST specify either the local or global matrix dimensions
2068    (possibly both).
2069 
   If PETSC_DECIDE or  PETSC_DETERMINE is used for a particular argument on one processor
   then it must be used on all processors that share the object for that argument.
2072 
2073    Storage Information:
2074    For a square global matrix we define each processor's diagonal portion
2075    to be its local rows and the corresponding columns (a square submatrix);
2076    each processor's off-diagonal portion encompasses the remainder of the
2077    local matrix (a rectangular submatrix).
2078 
2079    The user can specify preallocated storage for the diagonal part of
2080    the local submatrix with either d_nz or d_nnz (not both).  Set
2081    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
2082    memory allocation.  Likewise, specify preallocated storage for the
2083    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
2084 
2085    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
2086    the figure below we depict these three local rows and all columns (0-11).
2087 
2088 .vb
2089            0 1 2 3 4 5 6 7 8 9 10 11
2090           -------------------
2091    row 3  |  o o o d d d o o o o o o
2092    row 4  |  o o o d d d o o o o o o
2093    row 5  |  o o o d d d o o o o o o
2094           -------------------
2095 .ve
2096 
2097    Thus, any entries in the d locations are stored in the d (diagonal)
2098    submatrix, and any entries in the o locations are stored in the
2099    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
2100    stored simply in the MATSEQBAIJ format for compressed row storage.
2101 
2102    Now d_nz should indicate the number of block nonzeros per row in the d matrix,
2103    and o_nz should indicate the number of block nonzeros per row in the o matrix.
2104    In general, for PDE problems in which most nonzeros are near the diagonal,
2105    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
2106    or you will get TERRIBLE performance; see the users' manual chapter on
2107    matrices.
2108 
2109    Level: intermediate
2110 
2111 .keywords: matrix, block, aij, compressed row, sparse, parallel
2112 
2113 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues(), MatCreateMPIAIJ()
2114 @*/
int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
                    int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  Mat          B;
  Mat_MPIBAIJ  *b;
  int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size,flg;
  int          flag1 = 0,flag2 = 0;

  PetscFunctionBegin;
  if (bs < 1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Invalid block size specified, must be positive");

  /* on one processor fall back to the sequential format unless the user
     explicitly asked for the parallel data structures */
  MPI_Comm_size(comm,&size);
  ierr = OptionsHasName(PETSC_NULL,"-mat_mpibaij",&flag1); CHKERRQ(ierr);
  ierr = OptionsHasName(PETSC_NULL,"-mat_mpi",&flag2); CHKERRQ(ierr);
  if (!flag1 && !flag2 && size == 1) {
    if (M == PETSC_DECIDE) M = m;
    if (N == PETSC_DECIDE) N = n;
    ierr = MatCreateSeqBAIJ(comm,bs,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
    PetscFunctionReturn(0);
  }

  *A = 0;
  PetscHeaderCreate(B,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIBAIJ,"Mat",comm,MatDestroy,MatView);
  PLogObjectCreate(B);
  B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
  PetscMemzero(b,sizeof(Mat_MPIBAIJ));
  PetscMemcpy(B->ops,&MatOps_Values,sizeof(struct _MatOps));

  B->ops->destroy    = MatDestroy_MPIBAIJ;
  B->ops->view       = MatView_MPIBAIJ;
  B->mapping    = 0;
  B->factor     = 0;
  B->assembled  = PETSC_FALSE;

  B->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&b->rank);
  MPI_Comm_size(comm,&b->size);

  /* validate the local/global size combinations */
  if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
    SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
  }
  if ( M == PETSC_DECIDE && m == PETSC_DECIDE) {
    SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either M or m should be specified");
  }
  if ( N == PETSC_DECIDE && n == PETSC_DECIDE) {
    SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either N or n should be specified");
  }
  /* when both local and global sizes are given, the local ones win and the
     global sizes are recomputed from them below */
  if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
  if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;

  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    /* global sizes = sum of the local sizes over all processors */
    work[0] = m; work[1] = n;
    mbs = m/bs; nbs = n/bs;
    ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
    if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
    if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
  }
  if (m == PETSC_DECIDE) {
    /* split the global block rows as evenly as possible; the first
       (Mbs % size) processors get one extra block row */
    Mbs = M/bs;
    if (Mbs*bs != M) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global rows must be divisible by blocksize");
    mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
    m   = mbs*bs;
  }
  if (n == PETSC_DECIDE) {
    Nbs = N/bs;
    if (Nbs*bs != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global cols must be divisible by blocksize");
    nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
    n   = nbs*bs;
  }
  if (mbs*bs != m || nbs*bs != n) {
    SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of local rows, cols must be divisible by blocksize");
  }

  /* cache the point and block dimensions */
  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;
  b->bs  = bs;
  b->bs2 = bs*bs;
  b->mbs = mbs;
  b->nbs = nbs;
  b->Mbs = Mbs;
  b->Nbs = Nbs;

  /* the information in the maps duplicates the information computed below, eventually
     we should remove the duplicate information that is not contained in the maps */
  ierr = MapCreateMPI(comm,m,M,&B->rmap);CHKERRQ(ierr);
  ierr = MapCreateMPI(comm,n,N,&B->cmap);CHKERRQ(ierr);

  /* build local table of row and column ownerships */
  /* single allocation: rowners[0..size+1] then cowners (= rowners+size+2);
     each is a prefix sum of the per-processor block counts */
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
  b->cowners = b->rowners + b->size + 2;
  ierr = MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];
  }
  b->rstart    = b->rowners[b->rank];
  b->rend      = b->rowners[b->rank+1];
  b->rstart_bs = b->rstart * bs;
  b->rend_bs   = b->rend * bs;

  ierr = MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart    = b->cowners[b->rank];
  b->cend      = b->cowners[b->rank+1];
  b->cstart_bs = b->cstart * bs;
  b->cend_bs   = b->cend * bs;


  /* create the local pieces: A is the m x n diagonal block; B holds the
     off-diagonal entries and is allocated with the full global width N */
  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache for off array entries formed */
  ierr = StashCreate_Private(comm,bs,&b->stash); CHKERRQ(ierr);
  b->donotstash  = 0;
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used in block assembly */
  b->barray       = 0;

  /* stuff used for matrix vector multiply */
  b->lvec         = 0;
  b->Mvctx        = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  /* hash table stuff */
  b->ht           = 0;
  b->hd           = 0;
  b->ht_size      = 0;
  b->ht_flag      = 0;
  b->ht_fact      = 0;
  b->ht_total_ct  = 0;
  b->ht_insert_ct = 0;

  *A = B;
  /* optionally enable hash-table-based assembly; a factor <= 1.0 is
     replaced by the default 1.39 */
  ierr = OptionsHasName(PETSC_NULL,"-mat_use_hash_table",&flg); CHKERRQ(ierr);
  if (flg) {
    double fact = 1.39;
    ierr = MatSetOption(B,MAT_USE_HASH_TABLE); CHKERRQ(ierr);
    ierr = OptionsGetDouble(PETSC_NULL,"-mat_use_hash_table",&fact,&flg); CHKERRQ(ierr);
    if (fact <= 1.0) fact = 1.39;
    ierr = MatMPIBAIJSetHashTableFactor(B,fact); CHKERRQ(ierr);
    PLogInfo(0,"MatCreateMPIBAIJ:Hash table Factor used %5.2f\n",fact);
  }
  /* register the diagonal-block accessor so MatGetDiagonalBlock() works */
  ierr = PetscObjectComposeFunction((PetscObject)B,"MatGetDiagonalBlock_C",
                                     "MatGetDiagonalBlock_MPIBAIJ",
                                     (void*)MatGetDiagonalBlock_MPIBAIJ);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
2279 
2280 #undef __FUNC__
2281 #define __FUNC__ "MatDuplicate_MPIBAIJ"
2282 static int MatDuplicate_MPIBAIJ(Mat matin,MatDuplicateOption cpvalues,Mat *newmat)
2283 {
2284   Mat         mat;
2285   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
2286   int         ierr, len=0, flg;
2287 
2288   PetscFunctionBegin;
2289   *newmat       = 0;
2290   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIBAIJ,"Mat",matin->comm,MatDestroy,MatView);
2291   PLogObjectCreate(mat);
2292   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
2293   PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));
2294   mat->ops->destroy    = MatDestroy_MPIBAIJ;
2295   mat->ops->view       = MatView_MPIBAIJ;
2296   mat->factor          = matin->factor;
2297   mat->assembled       = PETSC_TRUE;
2298   mat->insertmode      = NOT_SET_VALUES;
2299 
2300   a->m = mat->m   = oldmat->m;
2301   a->n = mat->n   = oldmat->n;
2302   a->M = mat->M   = oldmat->M;
2303   a->N = mat->N   = oldmat->N;
2304 
2305   a->bs  = oldmat->bs;
2306   a->bs2 = oldmat->bs2;
2307   a->mbs = oldmat->mbs;
2308   a->nbs = oldmat->nbs;
2309   a->Mbs = oldmat->Mbs;
2310   a->Nbs = oldmat->Nbs;
2311 
2312   a->rstart       = oldmat->rstart;
2313   a->rend         = oldmat->rend;
2314   a->cstart       = oldmat->cstart;
2315   a->cend         = oldmat->cend;
2316   a->size         = oldmat->size;
2317   a->rank         = oldmat->rank;
2318   a->donotstash   = oldmat->donotstash;
2319   a->roworiented  = oldmat->roworiented;
2320   a->rowindices   = 0;
2321   a->rowvalues    = 0;
2322   a->getrowactive = PETSC_FALSE;
2323   a->barray       = 0;
2324   a->rstart_bs    = oldmat->rstart_bs;
2325   a->rend_bs      = oldmat->rend_bs;
2326   a->cstart_bs    = oldmat->cstart_bs;
2327   a->cend_bs      = oldmat->cend_bs;
2328 
2329   /* hash table stuff */
2330   a->ht           = 0;
2331   a->hd           = 0;
2332   a->ht_size      = 0;
2333   a->ht_flag      = oldmat->ht_flag;
2334   a->ht_fact      = oldmat->ht_fact;
2335   a->ht_total_ct  = 0;
2336   a->ht_insert_ct = 0;
2337 
2338 
2339   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
2340   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
2341   a->cowners = a->rowners + a->size + 2;
2342   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
2343   ierr = StashCreate_Private(matin->comm,oldmat->bs,&a->stash); CHKERRQ(ierr);
2344   if (oldmat->colmap) {
2345 #if defined (USE_CTABLE)
2346   ierr = TableCreateCopy(oldmat->colmap,&a->colmap); CHKERRQ(ierr);
2347 #else
2348     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
2349     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
2350     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
2351 #endif
2352   } else a->colmap = 0;
2353   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
2354     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
2355     PLogObjectMemory(mat,len*sizeof(int));
2356     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
2357   } else a->garray = 0;
2358 
2359   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
2360   PLogObjectParent(mat,a->lvec);
2361   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
2362 
2363   PLogObjectParent(mat,a->Mvctx);
2364   ierr =  MatDuplicate(oldmat->A,cpvalues,&a->A); CHKERRQ(ierr);
2365   PLogObjectParent(mat,a->A);
2366   ierr =  MatDuplicate(oldmat->B,cpvalues,&a->B); CHKERRQ(ierr);
2367   PLogObjectParent(mat,a->B);
2368   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
2369   ierr = FListDuplicate(mat->qlist,&matin->qlist);CHKERRQ(ierr);
2370   if (flg) {
2371     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
2372   }
2373   *newmat = mat;
2374   PetscFunctionReturn(0);
2375 }
2376 
2377 #include "sys.h"
2378 
2379 #undef __FUNC__
2380 #define __FUNC__ "MatLoad_MPIBAIJ"
2381 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
2382 {
2383   Mat          A;
2384   int          i, nz, ierr, j,rstart, rend, fd;
2385   Scalar       *vals,*buf;
2386   MPI_Comm     comm = ((PetscObject)viewer)->comm;
2387   MPI_Status   status;
2388   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
2389   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
2390   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,Mbs,mbs,extra_rows;
2391   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
2392   int          dcount,kmax,k,nzcount,tmp;
2393 
2394   PetscFunctionBegin;
2395   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
2396 
2397   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
2398   if (!rank) {
2399     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
2400     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
2401     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
2402     if (header[3] < 0) {
2403       SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix stored in special format, cannot load as MPIBAIJ");
2404     }
2405   }
2406 
2407   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
2408   M = header[1]; N = header[2];
2409 
2410   if (M != N) SETERRQ(PETSC_ERR_SUP,0,"Can only do square matrices");
2411 
2412   /*
2413      This code adds extra rows to make sure the number of rows is
2414      divisible by the blocksize
2415   */
2416   Mbs        = M/bs;
2417   extra_rows = bs - M + bs*(Mbs);
2418   if (extra_rows == bs) extra_rows = 0;
2419   else                  Mbs++;
2420   if (extra_rows &&!rank) {
2421     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
2422   }
2423 
2424   /* determine ownership of all rows */
2425   mbs = Mbs/size + ((Mbs % size) > rank);
2426   m   = mbs * bs;
2427   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
2428   browners = rowners + size + 1;
2429   ierr = MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
2430   rowners[0] = 0;
2431   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
2432   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
2433   rstart = rowners[rank];
2434   rend   = rowners[rank+1];
2435 
2436   /* distribute row lengths to all processors */
2437   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
2438   if (!rank) {
2439     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
2440     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
2441     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
2442     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
2443     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
2444     ierr = MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);CHKERRQ(ierr);
2445     PetscFree(sndcounts);
2446   } else {
2447     ierr = MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);CHKERRQ(ierr);
2448   }
2449 
2450   if (!rank) {
2451     /* calculate the number of nonzeros on each processor */
2452     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
2453     PetscMemzero(procsnz,size*sizeof(int));
2454     for ( i=0; i<size; i++ ) {
2455       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
2456         procsnz[i] += rowlengths[j];
2457       }
2458     }
2459     PetscFree(rowlengths);
2460 
2461     /* determine max buffer needed and allocate it */
2462     maxnz = 0;
2463     for ( i=0; i<size; i++ ) {
2464       maxnz = PetscMax(maxnz,procsnz[i]);
2465     }
2466     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
2467 
2468     /* read in my part of the matrix column indices  */
2469     nz = procsnz[0];
2470     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
2471     mycols = ibuf;
2472     if (size == 1)  nz -= extra_rows;
2473     ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
2474     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
2475 
2476     /* read in every ones (except the last) and ship off */
2477     for ( i=1; i<size-1; i++ ) {
2478       nz   = procsnz[i];
2479       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
2480       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
2481     }
2482     /* read in the stuff for the last proc */
2483     if ( size != 1 ) {
2484       nz   = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
2485       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
2486       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
2487       ierr = MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);CHKERRQ(ierr);
2488     }
2489     PetscFree(cols);
2490   } else {
2491     /* determine buffer space needed for message */
2492     nz = 0;
2493     for ( i=0; i<m; i++ ) {
2494       nz += locrowlens[i];
2495     }
2496     ibuf   = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
2497     mycols = ibuf;
2498     /* receive message of column indices*/
2499     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
2500     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
2501     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2502   }
2503 
2504   /* loop over local rows, determining number of off diagonal entries */
2505   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
2506   odlens = dlens + (rend-rstart);
2507   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
2508   PetscMemzero(mask,3*Mbs*sizeof(int));
2509   masked1 = mask    + Mbs;
2510   masked2 = masked1 + Mbs;
2511   rowcount = 0; nzcount = 0;
2512   for ( i=0; i<mbs; i++ ) {
2513     dcount  = 0;
2514     odcount = 0;
2515     for ( j=0; j<bs; j++ ) {
2516       kmax = locrowlens[rowcount];
2517       for ( k=0; k<kmax; k++ ) {
2518         tmp = mycols[nzcount++]/bs;
2519         if (!mask[tmp]) {
2520           mask[tmp] = 1;
2521           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
2522           else masked1[dcount++] = tmp;
2523         }
2524       }
2525       rowcount++;
2526     }
2527 
2528     dlens[i]  = dcount;
2529     odlens[i] = odcount;
2530 
2531     /* zero out the mask elements we set */
2532     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
2533     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
2534   }
2535 
2536   /* create our matrix */
2537   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
2538          CHKERRQ(ierr);
2539   A = *newmat;
2540   MatSetOption(A,MAT_COLUMNS_SORTED);
2541 
2542   if (!rank) {
2543     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
2544     /* read in my part of the matrix numerical values  */
2545     nz = procsnz[0];
2546     vals = buf;
2547     mycols = ibuf;
2548     if (size == 1)  nz -= extra_rows;
2549     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2550     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
2551 
2552     /* insert into matrix */
2553     jj      = rstart*bs;
2554     for ( i=0; i<m; i++ ) {
2555       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
2556       mycols += locrowlens[i];
2557       vals   += locrowlens[i];
2558       jj++;
2559     }
2560     /* read in other processors (except the last one) and ship out */
2561     for ( i=1; i<size-1; i++ ) {
2562       nz   = procsnz[i];
2563       vals = buf;
2564       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2565       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
2566     }
2567     /* the last proc */
2568     if ( size != 1 ){
2569       nz   = procsnz[i] - extra_rows;
2570       vals = buf;
2571       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2572       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
2573       ierr = MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);CHKERRQ(ierr);
2574     }
2575     PetscFree(procsnz);
2576   } else {
2577     /* receive numeric values */
2578     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
2579 
2580     /* receive message of values*/
2581     vals   = buf;
2582     mycols = ibuf;
2583     ierr   = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
2584     ierr   = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
2585     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2586 
2587     /* insert into matrix */
2588     jj      = rstart*bs;
2589     for ( i=0; i<m; i++ ) {
2590       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
2591       mycols += locrowlens[i];
2592       vals   += locrowlens[i];
2593       jj++;
2594     }
2595   }
2596   PetscFree(locrowlens);
2597   PetscFree(buf);
2598   PetscFree(ibuf);
2599   PetscFree(rowners);
2600   PetscFree(dlens);
2601   PetscFree(mask);
2602   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2603   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2604   PetscFunctionReturn(0);
2605 }
2606 
2607 
2608 
2609 #undef __FUNC__
2610 #define __FUNC__ "MatMPIBAIJSetHashTableFactor"
2611 /*@
2612    MatMPIBAIJSetHashTableFactor - Sets the factor required to compute the size of the HashTable.
2613 
2614    Input Parameters:
2615 .  mat  - the matrix
2616 .  fact - factor
2617 
2618    Collective on Mat
2619 
2620   Notes:
2621    This can also be set by the command line option: -mat_use_hash_table fact
2622 
2623 .keywords: matrix, hashtable, factor, HT
2624 
2625 .seealso: MatSetOption()
2626 @*/
2627 int MatMPIBAIJSetHashTableFactor(Mat mat,double fact)
2628 {
2629   Mat_MPIBAIJ *baij;
2630 
2631   PetscFunctionBegin;
2632   PetscValidHeaderSpecific(mat,MAT_COOKIE);
2633   if (mat->type != MATMPIBAIJ) {
2634       SETERRQ(PETSC_ERR_ARG_WRONG,1,"Incorrect matrix type. Use MPIBAIJ only.");
2635   }
2636   baij = (Mat_MPIBAIJ*) mat->data;
2637   baij->ht_fact = fact;
2638   PetscFunctionReturn(0);
2639 }
2640