xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision fa46199cd6736f835a9dfd5fb06fcbb3763bab4e)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpibaij.c,v 1.151 1999/02/17 19:29:46 balay Exp balay $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"   /*I  "mat.h"  I*/
6 #include "src/vec/vecimpl.h"
7 
8 extern int MatSetUpMultiply_MPIBAIJ(Mat);
9 extern int DisAssemble_MPIBAIJ(Mat);
10 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
11 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatReuse,Mat **);
12 extern int MatGetValues_SeqBAIJ(Mat,int,int *,int,int *,Scalar *);
13 
14 /*
15      Local utility routine that creates a mapping from the global column
16    number to the local number in the off-diagonal part of the local
17    storage of the matrix.  This is done in a non-scalable way since the
18    length of colmap equals the number of block columns of the global matrix.
19 */
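/*
     For illustration (hypothetical values): with bs = 2 and garray = {3,7},
   the non-CTABLE branch below produces

       colmap[3] = 0*bs + 1 = 1     (global block column 3 -> local block 0)
       colmap[7] = 1*bs + 1 = 3     (global block column 7 -> local block 1)

   while every other entry stays 0, so a lookup that returns 0 means "column
   not present".  The stored values are shifted by +1 precisely so that 0 can
   serve as that marker.
*/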
20 #undef __FUNC__
21 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
22 static int CreateColmap_MPIBAIJ_Private(Mat mat)
23 {
24   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
25   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
26   int         nbs = B->nbs,i,bs=B->bs;
27 #if defined (USE_CTABLE)
28   int         ierr;
29 #endif
30 
31   PetscFunctionBegin;
32 #if defined (USE_CTABLE)
33   ierr = TableCreate(baij->nbs/5,&baij->colmap); CHKERRQ(ierr);
34   for ( i=0; i<nbs; i++ ){
35     ierr = TableAdd(baij->colmap,baij->garray[i]+1,i*bs+1);CHKERRQ(ierr);
36   }
37 #else
38   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
39   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
40   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
41   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
42 #endif
43   PetscFunctionReturn(0);
44 }
45 
46 #define CHUNKSIZE  10
47 
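/*
   MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) inserts or adds a single
   scalar into the "diagonal" local block A using point (unblocked) indices:
     - brow = row/bs, bcol = col/bs pick the bs x bs block; ridx,cidx locate the
       entry inside it (blocks are stored column-major, offset bs*cidx + ridx)
     - a short binary search plus linear scan looks for bcol in block row brow
     - if the block exists, the value is written or accumulated according to addv
     - otherwise, unless a->nonew forbids new nonzeros, later blocks in the row
       are shifted up and a zeroed block is inserted, with the row's storage
       enlarged by CHUNKSIZE blocks when it is already full.
   MatSetValues_SeqBAIJ_B_Private below is the same logic for the off-diagonal
   block B.
*/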
48 #define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
49 { \
50  \
51     brow = row/bs;  \
52     rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
53     rmax = aimax[brow]; nrow = ailen[brow]; \
54       bcol = col/bs; \
55       ridx = row % bs; cidx = col % bs; \
56       low = 0; high = nrow; \
57       while (high-low > 3) { \
58         t = (low+high)/2; \
59         if (rp[t] > bcol) high = t; \
60         else              low  = t; \
61       } \
62       for ( _i=low; _i<high; _i++ ) { \
63         if (rp[_i] > bcol) break; \
64         if (rp[_i] == bcol) { \
65           bap  = ap +  bs2*_i + bs*cidx + ridx; \
66           if (addv == ADD_VALUES) *bap += value;  \
67           else                    *bap  = value;  \
68           goto a_noinsert; \
69         } \
70       } \
71       if (a->nonew == 1) goto a_noinsert; \
72       else if (a->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
73       if (nrow >= rmax) { \
74         /* there is no extra room in row, therefore enlarge */ \
75         int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
76         Scalar *new_a; \
77  \
78         if (a->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
79  \
80         /* malloc new storage space */ \
81         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
82         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
83         new_j   = (int *) (new_a + bs2*new_nz); \
84         new_i   = new_j + new_nz; \
85  \
86         /* copy over old data into new slots */ \
87         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
88         for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
89         PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
90         len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
91         PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
92                                                            len*sizeof(int)); \
93         PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
94         PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
95         PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
96                     aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
97         /* free up old matrix storage */ \
98         PetscFree(a->a);  \
99         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
100         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
101         a->singlemalloc = 1; \
102  \
103         rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
104         rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
105         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
106         a->maxnz += bs2*CHUNKSIZE; \
107         a->reallocs++; \
108         a->nz++; \
109       } \
110       N = nrow++ - 1;  \
111       /* shift up all the later entries in this row */ \
112       for ( ii=N; ii>=_i; ii-- ) { \
113         rp[ii+1] = rp[ii]; \
114         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
115       } \
116       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
117       rp[_i]                      = bcol;  \
118       ap[bs2*_i + bs*cidx + ridx] = value;  \
119       a_noinsert:; \
120     ailen[brow] = nrow; \
121 }
122 
123 #define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
124 { \
125  \
126     brow = row/bs;  \
127     rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
128     rmax = bimax[brow]; nrow = bilen[brow]; \
129       bcol = col/bs; \
130       ridx = row % bs; cidx = col % bs; \
131       low = 0; high = nrow; \
132       while (high-low > 3) { \
133         t = (low+high)/2; \
134         if (rp[t] > bcol) high = t; \
135         else              low  = t; \
136       } \
137       for ( _i=low; _i<high; _i++ ) { \
138         if (rp[_i] > bcol) break; \
139         if (rp[_i] == bcol) { \
140           bap  = ap +  bs2*_i + bs*cidx + ridx; \
141           if (addv == ADD_VALUES) *bap += value;  \
142           else                    *bap  = value;  \
143           goto b_noinsert; \
144         } \
145       } \
146       if (b->nonew == 1) goto b_noinsert; \
147       else if (b->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
148       if (nrow >= rmax) { \
149         /* there is no extra room in row, therefore enlarge */ \
150         int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
151         Scalar *new_a; \
152  \
153         if (b->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
154  \
155         /* malloc new storage space */ \
156         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
157         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
158         new_j   = (int *) (new_a + bs2*new_nz); \
159         new_i   = new_j + new_nz; \
160  \
161         /* copy over old data into new slots */ \
162         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
163         for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
164         PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
165         len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
166         PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
167                                                            len*sizeof(int)); \
168         PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
169         PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
170         PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
171                     ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
172         /* free up old matrix storage */ \
173         PetscFree(b->a);  \
174         if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
175         ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
176         b->singlemalloc = 1; \
177  \
178         rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
179         rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
180         PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
181         b->maxnz += bs2*CHUNKSIZE; \
182         b->reallocs++; \
183         b->nz++; \
184       } \
185       N = nrow++ - 1;  \
186       /* shift up all the later entries in this row */ \
187       for ( ii=N; ii>=_i; ii-- ) { \
188         rp[ii+1] = rp[ii]; \
189         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
190       } \
191       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
192       rp[_i]                      = bcol;  \
193       ap[bs2*_i + bs*cidx + ridx] = value;  \
194       b_noinsert:; \
195     bilen[brow] = nrow; \
196 }
197 
198 #undef __FUNC__
199 #define __FUNC__ "MatSetValues_MPIBAIJ"
200 int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
201 {
202   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
203   Scalar      value;
204   int         ierr,i,j,row,col;
205   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
206   int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
207   int         cend_orig=baij->cend_bs,bs=baij->bs;
208 
209   /* Some variables required by the MatSetValues_SeqBAIJ_{A,B}_Private() macros */
210   Mat         A = baij->A;
211   Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
212   int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
213   Scalar      *aa=a->a;
214 
215   Mat         B = baij->B;
216   Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
217   int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
218   Scalar      *ba=b->a;
219 
220   int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
221   int         low,high,t,ridx,cidx,bs2=a->bs2;
222   Scalar      *ap,*bap;
223 
224   PetscFunctionBegin;
225   for ( i=0; i<m; i++ ) {
226     if (im[i] < 0) continue;
227 #if defined(USE_PETSC_BOPT_g)
228     if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
229 #endif
230     if (im[i] >= rstart_orig && im[i] < rend_orig) {
231       row = im[i] - rstart_orig;
232       for ( j=0; j<n; j++ ) {
233         if (in[j] >= cstart_orig && in[j] < cend_orig){
234           col = in[j] - cstart_orig;
235           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
236           MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
237           /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
238         }
239         else if (in[j] < 0) continue;
240 #if defined(USE_PETSC_BOPT_g)
241         else if (in[j] >= baij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Col too large");}
242 #endif
243         else {
244           if (mat->was_assembled) {
245             if (!baij->colmap) {
246               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
247             }
248 #if defined (USE_CTABLE)
249             ierr = TableFind(baij->colmap, in[j]/bs + 1,&col); CHKERRQ(ierr);
250             col  = col - 1 + in[j]%bs;
251 #else
252             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
253 #endif
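            /* col < 0 means the colmap lookup returned 0, i.e. this block column
               is not yet present in the off-diagonal part B; if new nonzeros are
               allowed, the matrix is disassembled so B can be rebuilt, and the
               global column index is used directly. */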
254             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
255               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
256               col =  in[j];
257               /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
258               B = baij->B;
259               b = (Mat_SeqBAIJ *) (B)->data;
260               bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
261               ba=b->a;
262             }
263           } else col = in[j];
264           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
265           MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
266           /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
267         }
268       }
269     } else {
270       if (roworiented && !baij->donotstash) {
271         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
272       } else {
273         if (!baij->donotstash) {
274           row = im[i];
275 	  for ( j=0; j<n; j++ ) {
276 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
277           }
278         }
279       }
280     }
281   }
282   PetscFunctionReturn(0);
283 }
284 
285 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
286 #undef __FUNC__
287 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
288 int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
289 {
290   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
291   Scalar      *value,*barray=baij->barray;
292   int         ierr,i,j,ii,jj,row,col,k,l;
293   int         roworiented = baij->roworiented,rstart=baij->rstart ;
294   int         rend=baij->rend,cstart=baij->cstart,stepval;
295   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
296   PetscFunctionBegin;
297   if (!barray) {
298     baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
299   }
300 
301   if (roworiented) {
302     stepval = (n-1)*bs;
303   } else {
304     stepval = (m-1)*bs;
305   }
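  /* stepval = number of Scalars to skip, after copying bs consecutive entries of
     a block out of the user array v, to reach that block's next row (or column):
     successive rows (columns) of one block lie n*bs (m*bs) Scalars apart in v,
     of which bs are consumed by the copy itself. */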
306   for ( i=0; i<m; i++ ) {
307     if (im[i] < 0) continue;
308 #if defined(USE_PETSC_BOPT_g)
309     if (im[i] >= baij->Mbs) SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large, row %d max %d",im[i],baij->Mbs);
310 #endif
311     if (im[i] >= rstart && im[i] < rend) {
312       row = im[i] - rstart;
313       for ( j=0; j<n; j++ ) {
314         /* If n == 1 (row oriented) or m == 1 (column oriented), v already holds the block contiguously and a copy is not required */
315         if ((roworiented) && (n == 1)) {
316           barray = v + i*bs2;
317         } else if((!roworiented) && (m == 1)) {
318           barray = v + j*bs2;
319         } else { /* Here a copy is required */
320           if (roworiented) {
321             value = v + i*(stepval+bs)*bs + j*bs;
322           } else {
323             value = v + j*(stepval+bs)*bs + i*bs;
324           }
325           for ( ii=0; ii<bs; ii++,value+=stepval ) {
326             for (jj=0; jj<bs; jj++ ) {
327               *barray++  = *value++;
328             }
329           }
330           barray -=bs2;
331         }
332 
333         if (in[j] >= cstart && in[j] < cend){
334           col  = in[j] - cstart;
335           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
336         }
337         else if (in[j] < 0) continue;
338 #if defined(USE_PETSC_BOPT_g)
339         else if (in[j] >= baij->Nbs) {SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large, col %d max %d",in[j],baij->Nbs);}
340 #endif
341         else {
342           if (mat->was_assembled) {
343             if (!baij->colmap) {
344               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
345             }
346 
347 #if defined(USE_PETSC_BOPT_g)
348 #if defined (USE_CTABLE)
349             { int data;
350             ierr = TableFind(baij->colmap,in[j]+1,&data); CHKERRQ(ierr);
351             if((data - 1) % bs)
352 	      {SETERRQ(PETSC_ERR_PLIB,0,"Incorrect colmap");}
353             }
354 #else
355             if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(PETSC_ERR_PLIB,0,"Incorrect colmap");}
356 #endif
357 #endif
358 #if defined (USE_CTABLE)
359 	    ierr = TableFind(baij->colmap,in[j]+1,&col); CHKERRQ(ierr);
360             col  = (col - 1)/bs;
361 #else
362             col = (baij->colmap[in[j]] - 1)/bs;
363 #endif
364             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
365               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
366               col =  in[j];
367             }
368           }
369           else col = in[j];
370           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
371         }
372       }
373     } else {
374       if (!baij->donotstash) {
375         if (roworiented ) {
376           row   = im[i]*bs;
377           value = v + i*(stepval+bs)*bs;
378           for ( j=0; j<bs; j++,row++ ) {
379             for ( k=0; k<n; k++ ) {
380               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
381                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
382               }
383             }
384           }
385         } else {
386           for ( j=0; j<n; j++ ) {
387             value = v + j*(stepval+bs)*bs + i*bs;
388             col   = in[j]*bs;
389             for ( k=0; k<bs; k++,col++,value+=stepval) {
390               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
391                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
392               }
393             }
394           }
395         }
396       }
397     }
398   }
399   PetscFunctionReturn(0);
400 }
401 #define HASH_KEY 0.6180339887
402 /* #define HASH1(size,key) ((int)((size)*fmod(((key)*HASH_KEY),1))) */
403 #define HASH(size,key,tmp) (tmp = (key)*HASH_KEY,(int)((size)*(tmp-(int)tmp)))
404 /* #define HASH(size,key,tmp) ((int)((size)*fmod(((key)*HASH_KEY),1))) */
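/*
   HASH() is multiplicative hashing: h = (int)(size * frac(key * 0.6180339887)).
   Purely illustrative numbers: with size = 100 and key = 1000, key*HASH_KEY =
   618.0339887, the fractional part is 0.0339887, and the slot is
   (int)(100*0.0339887) = 3.  Collisions are resolved by linear probing with
   wraparound in the routines below.
*/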
405 #undef __FUNC__
406 #define __FUNC__ "MatSetValues_MPIBAIJ_HT"
407 int MatSetValues_MPIBAIJ_HT(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
408 {
409   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
410   int         ierr,i,j,row,col;
411   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
412   int         rend_orig=baij->rend_bs,Nbs=baij->Nbs;
413   int         h1,key,size=baij->ht_size,bs=baij->bs,*HT=baij->ht,idx;
414   double      tmp;
415   Scalar      ** HD = baij->hd,value;
416 #if defined(USE_PETSC_BOPT_g)
417   int         total_ct=baij->ht_total_ct,insert_ct=baij->ht_insert_ct;
418 #endif
419 
420   PetscFunctionBegin;
421 
422   for ( i=0; i<m; i++ ) {
423 #if defined(USE_PETSC_BOPT_g)
424     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
425     if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
426 #endif
427       row = im[i];
428     if (row >= rstart_orig && row < rend_orig) {
429       for ( j=0; j<n; j++ ) {
430         col = in[j];
431         if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
432         /* Look up the entry in the hash table */
433         key = (row/bs)*Nbs+(col/bs)+1;
434         h1  = HASH(size,key,tmp);
435 
436 
437         idx = h1;
438 #if defined(USE_PETSC_BOPT_g)
439         insert_ct++;
440         total_ct++;
441         if (HT[idx] != key) {
442           for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++,total_ct++);
443           if (idx == size) {
444             for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++,total_ct++);
445             if (idx == h1) {
446               SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
447             }
448           }
449         }
450 #else
451         if (HT[idx] != key) {
452           for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++);
453           if (idx == size) {
454             for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++);
455             if (idx == h1) {
456               SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
457             }
458           }
459         }
460 #endif
461         /* A HASH table entry is found, so insert the values at the correct address */
462         if (addv == ADD_VALUES) *(HD[idx]+ (col % bs)*bs + (row % bs)) += value;
463         else                    *(HD[idx]+ (col % bs)*bs + (row % bs))  = value;
464       }
465     } else {
466       if (roworiented && !baij->donotstash) {
467         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
468       } else {
469         if (!baij->donotstash) {
470           row = im[i];
471 	  for ( j=0; j<n; j++ ) {
472 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
473           }
474         }
475       }
476     }
477   }
478 #if defined(USE_PETSC_BOPT_g)
479   baij->ht_total_ct = total_ct;
480   baij->ht_insert_ct = insert_ct;
481 #endif
482   PetscFunctionReturn(0);
483 }
484 
485 #undef __FUNC__
486 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ_HT"
487 int MatSetValuesBlocked_MPIBAIJ_HT(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
488 {
489   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
490   int         ierr,i,j,ii,jj,row,col,k,l;
491   int         roworiented = baij->roworiented,rstart=baij->rstart ;
492   int         rend=baij->rend,stepval,bs=baij->bs,bs2=baij->bs2;
493   int         h1,key,size=baij->ht_size,idx,*HT=baij->ht,Nbs=baij->Nbs;
494   double      tmp;
495   Scalar      ** HD = baij->hd,*value,*v_t,*baij_a;
496 #if defined(USE_PETSC_BOPT_g)
497   int         total_ct=baij->ht_total_ct,insert_ct=baij->ht_insert_ct;
498 #endif
499 
500   PetscFunctionBegin;
501 
502   if (roworiented) {
503     stepval = (n-1)*bs;
504   } else {
505     stepval = (m-1)*bs;
506   }
507   for ( i=0; i<m; i++ ) {
508 #if defined(USE_PETSC_BOPT_g)
509     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
510     if (im[i] >= baij->Mbs) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
511 #endif
512     row   = im[i];
513     v_t   = v + i*bs2;
514     if (row >= rstart && row < rend) {
515       for ( j=0; j<n; j++ ) {
516         col = in[j];
517 
518         /* Look up the entry in the hash table */
519         key = row*Nbs+col+1;
520         h1  = HASH(size,key,tmp);
521 
522         idx = h1;
523 #if defined(USE_PETSC_BOPT_g)
524         total_ct++;
525         insert_ct++;
526        if (HT[idx] != key) {
527           for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++,total_ct++);
528           if (idx == size) {
529             for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++,total_ct++);
530             if (idx == h1) {
531               SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
532             }
533           }
534         }
535 #else
536         if (HT[idx] != key) {
537           for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++);
538           if (idx == size) {
539             for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++);
540             if (idx == h1) {
541               SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
542             }
543           }
544         }
545 #endif
546         baij_a = HD[idx];
547         if (roworiented) {
548           /*value = v + i*(stepval+bs)*bs + j*bs;*/
549           /* value = v + (i*(stepval+bs)+j)*bs; */
550           value = v_t;
551           v_t  += bs;
552           if (addv == ADD_VALUES) {
553             for ( ii=0; ii<bs; ii++,value+=stepval) {
554               for ( jj=ii; jj<bs2; jj+=bs ) {
555                 baij_a[jj]  += *value++;
556               }
557             }
558           } else {
559             for ( ii=0; ii<bs; ii++,value+=stepval) {
560               for ( jj=ii; jj<bs2; jj+=bs ) {
561                 baij_a[jj]  = *value++;
562               }
563             }
564           }
565         } else {
566           value = v + j*(stepval+bs)*bs + i*bs;
567           if (addv == ADD_VALUES) {
568             for ( ii=0; ii<bs; ii++,value+=stepval,baij_a+=bs ) {
569               for ( jj=0; jj<bs; jj++ ) {
570                 baij_a[jj]  += *value++;
571               }
572             }
573           } else {
574             for ( ii=0; ii<bs; ii++,value+=stepval,baij_a+=bs ) {
575               for ( jj=0; jj<bs; jj++ ) {
576                 baij_a[jj]  = *value++;
577               }
578             }
579           }
580         }
581       }
582     } else {
583       if (!baij->donotstash) {
584         if (roworiented ) {
585           row   = im[i]*bs;
586           value = v + i*(stepval+bs)*bs;
587           for ( j=0; j<bs; j++,row++ ) {
588             for ( k=0; k<n; k++ ) {
589               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
590                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
591               }
592             }
593           }
594         } else {
595           for ( j=0; j<n; j++ ) {
596             value = v + j*(stepval+bs)*bs + i*bs;
597             col   = in[j]*bs;
598             for ( k=0; k<bs; k++,col++,value+=stepval) {
599               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
600                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
601               }
602             }
603           }
604         }
605       }
606     }
607   }
608 #if defined(USE_PETSC_BOPT_g)
609   baij->ht_total_ct = total_ct;
610   baij->ht_insert_ct = insert_ct;
611 #endif
612   PetscFunctionReturn(0);
613 }
614 
615 #undef __FUNC__
616 #define __FUNC__ "MatGetValues_MPIBAIJ"
617 int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
618 {
619   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
620   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
621   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col,data;
622 
623   PetscFunctionBegin;
624   for ( i=0; i<m; i++ ) {
625     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
626     if (idxm[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
627     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
628       row = idxm[i] - bsrstart;
629       for ( j=0; j<n; j++ ) {
630         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
631         if (idxn[j] >= baij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
632         if (idxn[j] >= bscstart && idxn[j] < bscend){
633           col = idxn[j] - bscstart;
634           ierr = MatGetValues_SeqBAIJ(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
635         } else {
636           if (!baij->colmap) {
637             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
638           }
639 #if defined (USE_CTABLE)
640           ierr = TableFind(baij->colmap,idxn[j]/bs+1,&data); CHKERRQ(ierr);
641           data --;
642 #else
643           data = baij->colmap[idxn[j]/bs]-1;
644 #endif
645           if((data < 0) || (baij->garray[data/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
646           else {
647             col  = data + idxn[j]%bs;
648             ierr = MatGetValues_SeqBAIJ(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
649           }
650         }
651       }
652     } else {
653       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
654     }
655   }
656  PetscFunctionReturn(0);
657 }
658 
659 #undef __FUNC__
660 #define __FUNC__ "MatNorm_MPIBAIJ"
661 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
662 {
663   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
664   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
665   int        ierr, i,bs2=baij->bs2;
666   double     sum = 0.0;
667   Scalar     *v;
668 
669   PetscFunctionBegin;
670   if (baij->size == 1) {
671     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
672   } else {
673     if (type == NORM_FROBENIUS) {
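      /* ||A||_F = sqrt( sum_ij |a_ij|^2 ): square and sum the entries of the
         local diagonal (A) and off-diagonal (B) blocks, add the partial sums
         across all processes, then take the square root. */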
674       v = amat->a;
675       for (i=0; i<amat->nz*bs2; i++ ) {
676 #if defined(USE_PETSC_COMPLEX)
677         sum += PetscReal(PetscConj(*v)*(*v)); v++;
678 #else
679         sum += (*v)*(*v); v++;
680 #endif
681       }
682       v = bmat->a;
683       for (i=0; i<bmat->nz*bs2; i++ ) {
684 #if defined(USE_PETSC_COMPLEX)
685         sum += PetscReal(PetscConj(*v)*(*v)); v++;
686 #else
687         sum += (*v)*(*v); v++;
688 #endif
689       }
690       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
691       *norm = sqrt(*norm);
692     } else {
693       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
694     }
695   }
696   PetscFunctionReturn(0);
697 }
698 
699 #undef __FUNC__
700 #define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
701 int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
702 {
703   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
704   MPI_Comm    comm = mat->comm;
705   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
706   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
707   MPI_Request *send_waits,*recv_waits;
708   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
709   InsertMode  addv;
710   Scalar      *rvalues,*svalues;
711 
712   PetscFunctionBegin;
713   if (baij->donotstash) {
714     baij->svalues    = 0; baij->rvalues    = 0;
715     baij->nsends     = 0; baij->nrecvs     = 0;
716     baij->send_waits = 0; baij->recv_waits = 0;
717     baij->rmax       = 0;
718     PetscFunctionReturn(0);
719   }
720 
721   /* make sure all processors are either in INSERTMODE or ADDMODE */
722   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
723   if (addv == (ADD_VALUES|INSERT_VALUES)) {
724     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
725   }
726   mat->insertmode = addv; /* in case this processor had no cache */
727 
728   /*  first count number of contributors to each processor */
729   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
730   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
731   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
732   for ( i=0; i<baij->stash.n; i++ ) {
733     idx = baij->stash.idx[i];
734     for ( j=0; j<size; j++ ) {
735       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
736         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
737       }
738     }
739   }
740   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
741 
742   /* inform other processors of number of messages and max length*/
743   work      = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
744   ierr      = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
745   nreceives = work[rank];
746   ierr      = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
747   nmax      = work[rank];
748   PetscFree(work);
749 
750   /* post receives:
751        1) each message will consist of ordered triples
752      (global row index, global column index, value); we store the indices
753      as Scalars to simplify the message passing.
754        2) since we don't know how long each individual message is we
755      allocate the largest needed buffer for each receive. Potentially
756      this is a lot of wasted space.
757 
758 
759        This could be done better.
760   */
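  /*
     Layout of one stashed entry as packed below (one triple per entry):
        svalues[3*k]   = (Scalar) global row index
        svalues[3*k+1] = (Scalar) global column index
        svalues[3*k+2] = the value itself
  */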
761   rvalues    = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
762   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
763   for ( i=0; i<nreceives; i++ ) {
764     ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
765   }
766 
767   /* do sends:
768       1) starts[i] gives the starting index in svalues for stuff going to
769          the ith processor
770   */
771   svalues    = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
772   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
773   starts     = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
774   starts[0] = 0;
775   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
776   for ( i=0; i<baij->stash.n; i++ ) {
777     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
778     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
779     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
780   }
781   PetscFree(owner);
782   starts[0] = 0;
783   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
784   count = 0;
785   for ( i=0; i<size; i++ ) {
786     if (procs[i]) {
787       ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
788     }
789   }
790   PetscFree(starts); PetscFree(nprocs);
791 
792   /* Free cache space */
793   PLogInfo(baij->A,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
794   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
795 
796   baij->svalues    = svalues;    baij->rvalues    = rvalues;
797   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
798   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
799   baij->rmax       = nmax;
800 
801   PetscFunctionReturn(0);
802 }
803 
804 /*
805   Creates and fills the hash table used by the _HT MatSetValues routines.
806   This table is created only once.
807   If new entries need to be added to the matrix
808   then the hash table has to be destroyed and
809   recreated.
810 */
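/*
   Layout built below: for every nonzero block, the key row*Nbs + col + 1
   (global block row and column, shifted so that 0 marks an empty slot) is
   placed in HT[] by linear probing, and the corresponding HD[] entry points
   directly at that block's bs2 values inside A or B.  The _HT MatSetValues
   routines can then write straight into a block without searching a BAIJ row.
*/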
811 #undef __FUNC__
812 #define __FUNC__ "MatCreateHashTable_MPIBAIJ_Private"
813 int MatCreateHashTable_MPIBAIJ_Private(Mat mat,double factor)
814 {
815   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
816   Mat         A = baij->A, B=baij->B;
817   Mat_SeqBAIJ *a=(Mat_SeqBAIJ *)A->data, *b=(Mat_SeqBAIJ *)B->data;
818   int         i,j,k,nz=a->nz+b->nz,h1,*ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j;
819   int         size,bs2=baij->bs2,rstart=baij->rstart;
820   int         cstart=baij->cstart,*garray=baij->garray,row,col,Nbs=baij->Nbs;
821   int         *HT,key;
822   Scalar      **HD;
823   double      tmp;
824 #if defined(USE_PETSC_BOPT_g)
825   int         ct=0,max=0;
826 #endif
827 
828   PetscFunctionBegin;
829   baij->ht_size=(int)(factor*nz);
830   size = baij->ht_size;
831 
832   if (baij->ht) {
833     PetscFunctionReturn(0);
834   }
835 
836   /* Allocate Memory for Hash Table */
837   baij->hd = (Scalar**)PetscMalloc((size)*(sizeof(int)+sizeof(Scalar*))+1); CHKPTRQ(baij->hd);
838   baij->ht = (int*)(baij->hd + size);
839   HD = baij->hd;
840   HT = baij->ht;
841 
842 
843   PetscMemzero(HD,size*(sizeof(int)+sizeof(Scalar*)));
844 
845 
846   /* Loop Over A */
847   for ( i=0; i<a->mbs; i++ ) {
848     for ( j=ai[i]; j<ai[i+1]; j++ ) {
849       row = i+rstart;
850       col = aj[j]+cstart;
851 
852       key = row*Nbs + col + 1;
853       h1  = HASH(size,key,tmp);
854       for ( k=0; k<size; k++ ){
855         if (HT[(h1+k)%size] == 0) {
856           HT[(h1+k)%size] = key;
857           HD[(h1+k)%size] = a->a + j*bs2;
858           break;
859 #if defined(USE_PETSC_BOPT_g)
860         } else {
861           ct++;
862 #endif
863         }
864       }
865 #if defined(USE_PETSC_BOPT_g)
866       if (k> max) max = k;
867 #endif
868     }
869   }
870   /* Loop Over B */
871   for ( i=0; i<b->mbs; i++ ) {
872     for ( j=bi[i]; j<bi[i+1]; j++ ) {
873       row = i+rstart;
874       col = garray[bj[j]];
875       key = row*Nbs + col + 1;
876       h1  = HASH(size,key,tmp);
877       for ( k=0; k<size; k++ ){
878         if (HT[(h1+k)%size] == 0) {
879           HT[(h1+k)%size] = key;
880           HD[(h1+k)%size] = b->a + j*bs2;
881           break;
882 #if defined(USE_PETSC_BOPT_g)
883         } else {
884           ct++;
885 #endif
886         }
887       }
888 #if defined(USE_PETSC_BOPT_g)
889       if (k> max) max = k;
890 #endif
891     }
892   }
893 
894   /* Print Summary */
895 #if defined(USE_PETSC_BOPT_g)
896   for ( i=0,j=0; i<size; i++)
897     if (HT[i]) {j++;}
898   PLogInfo(0,"MatCreateHashTable_MPIBAIJ_Private: Average Search = %5.2f,max search = %d\n",
899            (j== 0)? 0.0:((double)(ct+j))/j,max);
900 #endif
901   PetscFunctionReturn(0);
902 }
903 
904 #undef __FUNC__
905 #define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
906 int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
907 {
908   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
909   MPI_Status  *send_status,recv_status;
910   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
911   int         bs=baij->bs,row,col,other_disassembled;
912   Scalar      *values,val;
913   InsertMode  addv = mat->insertmode;
914 
915   PetscFunctionBegin;
916   /*  wait on receives */
917   while (count) {
918     ierr = MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
919     /* unpack receives into our local space */
920     values = baij->rvalues + 3*imdex*baij->rmax;
921     ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
922     n = n/3;
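    /* each received message is a run of (row,col,value) triples; n now counts triples */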
923     for ( i=0; i<n; i++ ) {
924       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
925       col = (int) PetscReal(values[3*i+1]);
926       val = values[3*i+2];
927       if (col >= baij->cstart*bs && col < baij->cend*bs) {
928         col -= baij->cstart*bs;
929         ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
930       } else {
931         if (mat->was_assembled) {
932           if (!baij->colmap) {
933             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
934           }
935 #if defined (USE_CTABLE)
936 	  ierr = TableFind(baij->colmap,col/bs+1,&col); CHKERRQ(ierr);
937           col  = col - 1 + col%bs;
938 #else
939           col = (baij->colmap[col/bs]) - 1 + col%bs;
940 #endif
941           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
942             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
943             col = (int) PetscReal(values[3*i+1]);
944           }
945         }
946         ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
947       }
948     }
949     count--;
950   }
951   if (baij->recv_waits) PetscFree(baij->recv_waits);
952   if (baij->rvalues)    PetscFree(baij->rvalues);
953 
954   /* wait on sends */
955   if (baij->nsends) {
956     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
957     ierr        = MPI_Waitall(baij->nsends,baij->send_waits,send_status);CHKERRQ(ierr);
958     PetscFree(send_status);
959   }
960   if (baij->send_waits) PetscFree(baij->send_waits);
961   if (baij->svalues)    PetscFree(baij->svalues);
962 
963   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
964   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
965 
966   /* determine if any processor has disassembled, if so we must
967      also disassemble ourselves, in order that we may reassemble. */
968   /*
969      if nonzero structure of submatrix B cannot change then we know that
970      no processor disassembled thus we can skip this stuff
971   */
972   if (!((Mat_SeqBAIJ*) baij->B->data)->nonew)  {
973     ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
974     if (mat->was_assembled && !other_disassembled) {
975       ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
976     }
977   }
978 
979   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
980     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
981   }
982   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
983   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
984 
985 #if defined(USE_PETSC_BOPT_g)
986   if (baij->ht && mode== MAT_FINAL_ASSEMBLY) {
987     PLogInfo(0,"MatAssemblyEnd_MPIBAIJ:Average Hash Table Search in MatSetValues = %5.2f\n",
988              ((double)baij->ht_total_ct)/baij->ht_insert_ct);
989     baij->ht_total_ct  = 0;
990     baij->ht_insert_ct = 0;
991   }
992 #endif
993   if (baij->ht_flag && !baij->ht && mode == MAT_FINAL_ASSEMBLY) {
994     ierr = MatCreateHashTable_MPIBAIJ_Private(mat,baij->ht_fact); CHKERRQ(ierr);
995     mat->ops->setvalues        = MatSetValues_MPIBAIJ_HT;
996     mat->ops->setvaluesblocked = MatSetValuesBlocked_MPIBAIJ_HT;
997   }
998 
999   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
1000   PetscFunctionReturn(0);
1001 }
1002 
1003 #undef __FUNC__
1004 #define __FUNC__ "MatView_MPIBAIJ_Binary"
1005 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
1006 {
1007   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
1008   int          ierr;
1009 
1010   PetscFunctionBegin;
1011   if (baij->size == 1) {
1012     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
1013   } else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
1014   PetscFunctionReturn(0);
1015 }
1016 
1017 #undef __FUNC__
1018 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworSocket"
1019 static int MatView_MPIBAIJ_ASCIIorDraworSocket(Mat mat,Viewer viewer)
1020 {
1021   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
1022   int          ierr, format,bs = baij->bs, size = baij->size, rank = baij->rank;
1023   FILE         *fd;
1024   ViewerType   vtype;
1025 
1026   PetscFunctionBegin;
1027   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
1028   if (PetscTypeCompare(vtype,ASCII_VIEWER)) {
1029     ierr = ViewerGetFormat(viewer,&format);
1030     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
1031       MatInfo info;
1032       MPI_Comm_rank(mat->comm,&rank);
1033       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
1034       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
1035       PetscSequentialPhaseBegin(mat->comm,1);
1036       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
1037               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
1038               baij->bs,(int)info.memory);
1039       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
1040       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
1041       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
1042       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
1043       fflush(fd);
1044       PetscSequentialPhaseEnd(mat->comm,1);
1045       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
1046       PetscFunctionReturn(0);
1047     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
1048       PetscPrintf(mat->comm,"  block size is %d\n",bs);
1049       PetscFunctionReturn(0);
1050     }
1051   }
1052 
1053   if (PetscTypeCompare(vtype,DRAW_VIEWER)) {
1054     Draw       draw;
1055     PetscTruth isnull;
1056     ierr = ViewerDrawGetDraw(viewer,0,&draw); CHKERRQ(ierr);
1057     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
1058   }
1059 
1060   if (size == 1) {
1061     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
1062   } else {
1063     /* assemble the entire matrix onto first processor. */
1064     Mat         A;
1065     Mat_SeqBAIJ *Aloc;
1066     int         M = baij->M, N = baij->N,*ai,*aj,col,i,j,k,*rvals;
1067     int         mbs = baij->mbs;
1068     Scalar      *a;
1069 
1070     if (!rank) {
1071       ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
1072     } else {
1073       ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
1074     }
1075     PLogObjectParent(mat,A);
1076 
1077     /* copy over the A part */
1078     Aloc = (Mat_SeqBAIJ*) baij->A->data;
1079     ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1080     rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1081 
1082     for ( i=0; i<mbs; i++ ) {
1083       rvals[0] = bs*(baij->rstart + i);
1084       for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1085       for ( j=ai[i]; j<ai[i+1]; j++ ) {
1086         col = (baij->cstart+aj[j])*bs;
1087         for (k=0; k<bs; k++ ) {
1088           ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
1089           col++; a += bs;
1090         }
1091       }
1092     }
1093     /* copy over the B part */
1094     Aloc = (Mat_SeqBAIJ*) baij->B->data;
1095     ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1096     for ( i=0; i<mbs; i++ ) {
1097       rvals[0] = bs*(baij->rstart + i);
1098       for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1099       for ( j=ai[i]; j<ai[i+1]; j++ ) {
1100         col = baij->garray[aj[j]]*bs;
1101         for (k=0; k<bs; k++ ) {
1102           ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
1103           col++; a += bs;
1104         }
1105       }
1106     }
1107     PetscFree(rvals);
1108     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1109     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1110     /*
1111        Everyone has to call to draw the matrix since the graphics waits are
1112        synchronized across all processors that share the Draw object
1113     */
1114     if (!rank || PetscTypeCompare(vtype,DRAW_VIEWER)) {
1115       ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
1116     }
1117     ierr = MatDestroy(A); CHKERRQ(ierr);
1118   }
1119   PetscFunctionReturn(0);
1120 }
1121 
1122 
1123 
1124 #undef __FUNC__
1125 #define __FUNC__ "MatView_MPIBAIJ"
1126 int MatView_MPIBAIJ(Mat mat,Viewer viewer)
1127 {
1128   int         ierr;
1129   ViewerType  vtype;
1130 
1131   PetscFunctionBegin;
1132   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
1133   if (PetscTypeCompare(vtype,ASCII_VIEWER) || PetscTypeCompare(vtype,DRAW_VIEWER) ||
1134       PetscTypeCompare(vtype,SOCKET_VIEWER)) {
1135     ierr = MatView_MPIBAIJ_ASCIIorDraworSocket(mat,viewer); CHKERRQ(ierr);
1136   } else if (PetscTypeCompare(vtype,BINARY_VIEWER)) {
1137     ierr = MatView_MPIBAIJ_Binary(mat,viewer);CHKERRQ(ierr);
1138   } else {
1139     SETERRQ(1,1,"Viewer type not supported by PETSc object");
1140   }
1141   PetscFunctionReturn(0);
1142 }
1143 
1144 #undef __FUNC__
1145 #define __FUNC__ "MatDestroy_MPIBAIJ"
1146 int MatDestroy_MPIBAIJ(Mat mat)
1147 {
1148   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1149   int         ierr;
1150 
1151   PetscFunctionBegin;
1152   if (--mat->refct > 0) PetscFunctionReturn(0);
1153 
1154   if (mat->mapping) {
1155     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
1156   }
1157   if (mat->bmapping) {
1158     ierr = ISLocalToGlobalMappingDestroy(mat->bmapping); CHKERRQ(ierr);
1159   }
1160   if (mat->rmap) {
1161     ierr = MapDestroy(mat->rmap);CHKERRQ(ierr);
1162   }
1163   if (mat->cmap) {
1164     ierr = MapDestroy(mat->cmap);CHKERRQ(ierr);
1165   }
1166 #if defined(USE_PETSC_LOG)
1167   PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",baij->M,baij->N);
1168 #endif
1169 
1170   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
1171   PetscFree(baij->rowners);
1172   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1173   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1174 #if defined (USE_CTABLE)
1175   if (baij->colmap) TableDelete(baij->colmap);
1176 #else
1177   if (baij->colmap) PetscFree(baij->colmap);
1178 #endif
1179   if (baij->garray) PetscFree(baij->garray);
1180   if (baij->lvec)   VecDestroy(baij->lvec);
1181   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
1182   if (baij->rowvalues) PetscFree(baij->rowvalues);
1183   if (baij->barray) PetscFree(baij->barray);
1184   if (baij->hd) PetscFree(baij->hd);
1185   PetscFree(baij);
1186   PLogObjectDestroy(mat);
1187   PetscHeaderDestroy(mat);
1188   PetscFunctionReturn(0);
1189 }
1190 
1191 #undef __FUNC__
1192 #define __FUNC__ "MatMult_MPIBAIJ"
1193 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
1194 {
1195   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1196   int         ierr, nt;
1197 
1198   PetscFunctionBegin;
1199   ierr = VecGetLocalSize(xx,&nt);CHKERRQ(ierr);
1200   if (nt != a->n) {
1201     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
1202   }
1203   ierr = VecGetLocalSize(yy,&nt);CHKERRQ(ierr);
1204   if (nt != a->m) {
1205     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and yy");
1206   }
1207   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1208   ierr = (*a->A->ops->mult)(a->A,xx,yy); CHKERRQ(ierr);
1209   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1210   ierr = (*a->B->ops->multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
1211   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1212   PetscFunctionReturn(0);
1213 }
1214 
1215 #undef __FUNC__
1216 #define __FUNC__ "MatMultAdd_MPIBAIJ"
1217 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
1218 {
1219   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1220   int        ierr;
1221 
1222   PetscFunctionBegin;
1223   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1224   ierr = (*a->A->ops->multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
1225   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1226   ierr = (*a->B->ops->multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
1227   PetscFunctionReturn(0);
1228 }
1229 
1230 #undef __FUNC__
1231 #define __FUNC__ "MatMultTrans_MPIBAIJ"
1232 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
1233 {
1234   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1235   int         ierr;
1236 
1237   PetscFunctionBegin;
1238   /* do nondiagonal part */
1239   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
1240   /* send it on its way */
1241   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
1242   /* do local part */
1243   ierr = (*a->A->ops->multtrans)(a->A,xx,yy); CHKERRQ(ierr);
1244   /* receive remote parts: note this assumes the values are not actually */
1245   /* inserted in yy until the next line, which is true for my implementation*/
1246   /* but is not perhaps always true. */
1247   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
1248   PetscFunctionReturn(0);
1249 }
1250 
1251 #undef __FUNC__
1252 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
1253 int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
1254 {
1255   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1256   int         ierr;
1257 
1258   PetscFunctionBegin;
1259   /* do nondiagonal part */
1260   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
1261   /* send it on its way */
1262   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
1263   /* do local part */
1264   ierr = (*a->A->ops->multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
1265   /* receive remote parts: note this assumes the values are not actually */
1266   /* inserted in zz until the next line, which is true for my implementation*/
1267   /* but is not perhaps always true. */
1268   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
1269   PetscFunctionReturn(0);
1270 }
1271 
1272 /*
1273   This only works correctly for square matrices where the subblock A->A is the
1274    diagonal block
1275 */
1276 #undef __FUNC__
1277 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
1278 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
1279 {
1280   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1281   int         ierr;
1282 
1283   PetscFunctionBegin;
1284   if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
1285   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
1286   PetscFunctionReturn(0);
1287 }
1288 
1289 #undef __FUNC__
1290 #define __FUNC__ "MatScale_MPIBAIJ"
1291 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
1292 {
1293   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1294   int         ierr;
1295 
1296   PetscFunctionBegin;
1297   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
1298   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
1299   PetscFunctionReturn(0);
1300 }
1301 
1302 #undef __FUNC__
1303 #define __FUNC__ "MatGetSize_MPIBAIJ"
1304 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
1305 {
1306   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1307 
1308   PetscFunctionBegin;
1309   if (m) *m = mat->M;
1310   if (n) *n = mat->N;
1311   PetscFunctionReturn(0);
1312 }
1313 
1314 #undef __FUNC__
1315 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
1316 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
1317 {
1318   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1319 
1320   PetscFunctionBegin;
1321   *m = mat->m; *n = mat->n;
1322   PetscFunctionReturn(0);
1323 }
1324 
1325 #undef __FUNC__
1326 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
1327 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
1328 {
1329   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1330 
1331   PetscFunctionBegin;
1332   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
1333   PetscFunctionReturn(0);
1334 }
1335 
1336 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1337 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1338 
1339 #undef __FUNC__
1340 #define __FUNC__ "MatGetRow_MPIBAIJ"
1341 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1342 {
1343   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1344   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
1345   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
1346   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
1347   int        *cmap, *idx_p,cstart = mat->cstart;
1348 
1349   PetscFunctionBegin;
1350   if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
1351   mat->getrowactive = PETSC_TRUE;
1352 
1353   if (!mat->rowvalues && (idx || v)) {
1354     /*
1355         allocate enough space to hold information from the longest row.
1356     */
1357     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
1358     int     max = 1,mbs = mat->mbs,tmp;
1359     for ( i=0; i<mbs; i++ ) {
1360       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1361       if (max < tmp) { max = tmp; }
1362     }
1363     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
1364     CHKPTRQ(mat->rowvalues);
1365     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
1366   }
1367 
1368   if (row < brstart || row >= brend) SETERRQ(PETSC_ERR_SUP,0,"Only local rows");
1369   lrow = row - brstart;
1370 
1371   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1372   if (!v)   {pvA = 0; pvB = 0;}
1373   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1374   ierr = (*mat->A->ops->getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1375   ierr = (*mat->B->ops->getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1376   nztot = nzA + nzB;
1377 
1378   cmap  = mat->garray;
1379   if (v  || idx) {
1380     if (nztot) {
1381       /* Sort by increasing column numbers, assuming A and B already sorted */
1382       int imark = -1;
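      /* imark = number of B entries whose global block column lies to the left of
         this process's diagonal block; those come first, then all of A's entries,
         then the remaining B entries, giving globally increasing column order. */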
1383       if (v) {
1384         *v = v_p = mat->rowvalues;
1385         for ( i=0; i<nzB; i++ ) {
1386           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
1387           else break;
1388         }
1389         imark = i;
1390         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1391         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1392       }
1393       if (idx) {
1394         *idx = idx_p = mat->rowindices;
1395         if (imark > -1) {
1396           for ( i=0; i<imark; i++ ) {
1397             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
1398           }
1399         } else {
1400           for ( i=0; i<nzB; i++ ) {
1401             if (cmap[cworkB[i]/bs] < cstart)
1402               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
1403             else break;
1404           }
1405           imark = i;
1406         }
1407         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
1408         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
1409       }
1410     } else {
1411       if (idx) *idx = 0;
1412       if (v)   *v   = 0;
1413     }
1414   }
1415   *nz = nztot;
1416   ierr = (*mat->A->ops->restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1417   ierr = (*mat->B->ops->restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1418   PetscFunctionReturn(0);
1419 }
1420 
1421 #undef __FUNC__
1422 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1423 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1424 {
1425   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1426 
1427   PetscFunctionBegin;
1428   if (baij->getrowactive == PETSC_FALSE) {
1429     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
1430   }
1431   baij->getrowactive = PETSC_FALSE;
1432   PetscFunctionReturn(0);
1433 }
1434 
1435 #undef __FUNC__
1436 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1437 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1438 {
1439   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1440 
1441   PetscFunctionBegin;
1442   *bs = baij->bs;
1443   PetscFunctionReturn(0);
1444 }
1445 
1446 #undef __FUNC__
1447 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1448 int MatZeroEntries_MPIBAIJ(Mat A)
1449 {
1450   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1451   int         ierr;
1452 
1453   PetscFunctionBegin;
1454   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1455   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1456   PetscFunctionReturn(0);
1457 }
1458 
1459 #undef __FUNC__
1460 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1461 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1462 {
1463   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1464   Mat         A = a->A, B = a->B;
1465   int         ierr;
1466   double      isend[5], irecv[5];
1467 
1468   PetscFunctionBegin;
1469   info->block_size     = (double)a->bs;
1470   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1471   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
1472   isend[3] = info->memory;  isend[4] = info->mallocs;
1473   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1474   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
1475   isend[3] += info->memory;  isend[4] += info->mallocs;
1476   if (flag == MAT_LOCAL) {
1477     info->nz_used      = isend[0];
1478     info->nz_allocated = isend[1];
1479     info->nz_unneeded  = isend[2];
1480     info->memory       = isend[3];
1481     info->mallocs      = isend[4];
1482   } else if (flag == MAT_GLOBAL_MAX) {
1483     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);CHKERRQ(ierr);
1484     info->nz_used      = irecv[0];
1485     info->nz_allocated = irecv[1];
1486     info->nz_unneeded  = irecv[2];
1487     info->memory       = irecv[3];
1488     info->mallocs      = irecv[4];
1489   } else if (flag == MAT_GLOBAL_SUM) {
1490     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);CHKERRQ(ierr);
1491     info->nz_used      = irecv[0];
1492     info->nz_allocated = irecv[1];
1493     info->nz_unneeded  = irecv[2];
1494     info->memory       = irecv[3];
1495     info->mallocs      = irecv[4];
1496   }
1497   info->rows_global       = (double)a->M;
1498   info->columns_global    = (double)a->N;
1499   info->rows_local        = (double)a->m;
1500   info->columns_local     = (double)a->N;
1501   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1502   info->fill_ratio_needed = 0;
1503   info->factor_mallocs    = 0;
1504   PetscFunctionReturn(0);
1505 }
1506 
1507 #undef __FUNC__
1508 #define __FUNC__ "MatSetOption_MPIBAIJ"
1509 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1510 {
1511   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1512 
1513   PetscFunctionBegin;
1514   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1515       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1516       op == MAT_COLUMNS_UNSORTED ||
1517       op == MAT_COLUMNS_SORTED ||
1518       op == MAT_NEW_NONZERO_ALLOCATION_ERR ||
1519       op == MAT_NEW_NONZERO_LOCATION_ERR) {
1520         MatSetOption(a->A,op);
1521         MatSetOption(a->B,op);
1522   } else if (op == MAT_ROW_ORIENTED) {
1523         a->roworiented = 1;
1524         MatSetOption(a->A,op);
1525         MatSetOption(a->B,op);
1526   } else if (op == MAT_ROWS_SORTED ||
1527              op == MAT_ROWS_UNSORTED ||
1528              op == MAT_SYMMETRIC ||
1529              op == MAT_STRUCTURALLY_SYMMETRIC ||
1530              op == MAT_YES_NEW_DIAGONALS ||
1531              op == MAT_USE_HASH_TABLE)
1532     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1533   else if (op == MAT_COLUMN_ORIENTED) {
1534     a->roworiented = 0;
1535     MatSetOption(a->A,op);
1536     MatSetOption(a->B,op);
1537   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1538     a->donotstash = 1;
1539   } else if (op == MAT_NO_NEW_DIAGONALS) {
1540     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1541   } else if (op == MAT_USE_HASH_TABLE) {
1542     a->ht_flag = 1;
1543   } else {
1544     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1545   }
1546   PetscFunctionReturn(0);
1547 }
1548 
1549 #undef __FUNC__
1550 #define __FUNC__ "MatTranspose_MPIBAIJ"
1551 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1552 {
1553   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1554   Mat_SeqBAIJ *Aloc;
1555   Mat        B;
1556   int        ierr,M=baij->M,N=baij->N,*ai,*aj,i,*rvals,j,k,col;
1557   int        bs=baij->bs,mbs=baij->mbs;
1558   Scalar     *a;
1559 
1560   PetscFunctionBegin;
1561   if (matout == PETSC_NULL && M != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
1562   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1563   CHKERRQ(ierr);
1564 
1565   /* copy over the A part */
1566   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1567   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1568   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1569 
1570   for ( i=0; i<mbs; i++ ) {
1571     rvals[0] = bs*(baij->rstart + i);
1572     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1573     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1574       col = (baij->cstart+aj[j])*bs;
1575       for (k=0; k<bs; k++ ) {
1576         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1577         col++; a += bs;
1578       }
1579     }
1580   }
1581   /* copy over the B part */
1582   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1583   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1584   for ( i=0; i<mbs; i++ ) {
1585     rvals[0] = bs*(baij->rstart + i);
1586     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1587     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1588       col = baij->garray[aj[j]]*bs;
1589       for (k=0; k<bs; k++ ) {
1590         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1591         col++; a += bs;
1592       }
1593     }
1594   }
1595   PetscFree(rvals);
1596   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1597   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1598 
1599   if (matout != PETSC_NULL) {
1600     *matout = B;
1601   } else {
1602     PetscOps *Abops;
1603     MatOps   Aops;
1604 
1605     /* This isn't really an in-place transpose ... but we free the old data structures from baij */
1606     PetscFree(baij->rowners);
1607     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1608     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1609 #if defined (USE_CTABLE)
1610     if (baij->colmap) TableDelete(baij->colmap);
1611 #else
1612     if (baij->colmap) PetscFree(baij->colmap);
1613 #endif
1614     if (baij->garray) PetscFree(baij->garray);
1615     if (baij->lvec) VecDestroy(baij->lvec);
1616     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1617     PetscFree(baij);
1618 
1619     /*
1620        This is horrible, horrible code. We need to keep A's
1621       bops and ops pointers but copy everything
1622       else over from B.
1623     */
1624     Abops = A->bops;
1625     Aops  = A->ops;
1626     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1627     A->bops = Abops;
1628     A->ops  = Aops;
1629 
1630     PetscHeaderDestroy(B);
1631   }
1632   PetscFunctionReturn(0);
1633 }
1634 
1635 #undef __FUNC__
1636 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1637 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1638 {
1639   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1640   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1641   int ierr,s1,s2,s3;
1642 
1643   PetscFunctionBegin;
1644   if (ll)  {
1645     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1646     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1647     if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"non-conforming local sizes");
1648     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1649     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1650   }
1651   if (rr) SETERRQ(PETSC_ERR_SUP,0,"not supported for right vector");
1652   PetscFunctionReturn(0);
1653 }
1654 
1655 #undef __FUNC__
1656 #define __FUNC__ "MatZeroRows_MPIBAIJ"
1657 int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
1658 {
1659   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
1660   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
1661   int            *procs,*nprocs,j,found,idx,nsends,*work,row;
1662   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
1663   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
1664   int            *lens,imdex,*lrows,*values,bs=l->bs,rstart_bs=l->rstart_bs;
1665   MPI_Comm       comm = A->comm;
1666   MPI_Request    *send_waits,*recv_waits;
1667   MPI_Status     recv_status,*send_status;
1668   IS             istmp;
1669 
1670   PetscFunctionBegin;
1671   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
1672   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
1673 
1674   /*  first count number of contributors to each processor */
1675   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
1676   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
1677   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
1678   for ( i=0; i<N; i++ ) {
1679     idx = rows[i];
1680     found = 0;
1681     for ( j=0; j<size; j++ ) {
1682       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
1683         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
1684       }
1685     }
1686     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
1687   }
1688   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
1689 
1690   /* inform other processors of number of messages and max length*/
1691   work   = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
1692   ierr   = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
1693   nrecvs = work[rank];
1694   ierr   = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
1695   nmax   = work[rank];
1696   PetscFree(work);
1697 
1698   /* post receives:   */
1699   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); CHKPTRQ(rvalues);
1700   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
1701   for ( i=0; i<nrecvs; i++ ) {
1702     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
1703   }
1704 
1705   /* do sends:
1706      1) starts[i] gives the starting index in svalues for stuff going to
1707      the ith processor
1708   */
1709   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1710   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
1711   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1712   starts[0] = 0;
1713   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1714   for ( i=0; i<N; i++ ) {
1715     svalues[starts[owner[i]]++] = rows[i];
1716   }
1717   ISRestoreIndices(is,&rows);
1718 
1719   starts[0] = 0;
1720   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1721   count = 0;
1722   for ( i=0; i<size; i++ ) {
1723     if (procs[i]) {
1724       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
1725     }
1726   }
1727   PetscFree(starts);
1728 
1729   base = owners[rank]*bs;
1730 
1731   /*  wait on receives */
1732   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1733   source = lens + nrecvs;
1734   count  = nrecvs; slen = 0;
1735   while (count) {
1736     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
1737     /* unpack receives into our local space */
1738     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
1739     source[imdex]  = recv_status.MPI_SOURCE;
1740     lens[imdex]  = n;
1741     slen += n;
1742     count--;
1743   }
1744   PetscFree(recv_waits);
1745 
1746   /* move the data into the send scatter */
1747   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1748   count = 0;
1749   for ( i=0; i<nrecvs; i++ ) {
1750     values = rvalues + i*nmax;
1751     for ( j=0; j<lens[i]; j++ ) {
1752       lrows[count++] = values[j] - base;
1753     }
1754   }
1755   PetscFree(rvalues); PetscFree(lens);
1756   PetscFree(owner); PetscFree(nprocs);
1757 
1758   /* actually zap the local rows */
1759   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1760   PLogObjectParent(A,istmp);
1761 
1762   /*
1763         Zero the required rows. If the "diagonal block" of the matrix
1764      is square and the user wishes to set the diagonal we use seperate
1765      code so that MatSetValues() is not called for each diagonal allocating
1766      new memory, thus calling lots of mallocs and slowing things down.
1767 
1768        Contributed by: Mathew Knepley
1769   */
1770   /* must zero l->B before l->A because the (diag) case below may put values into l->B*/
1771   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1772   if (diag && (l->A->M == l->A->N)) {
1773     ierr      = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1774   } else if (diag) {
1775     ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
1776     if (((Mat_SeqBAIJ*)l->A->data)->nonew) {
1777       SETERRQ(PETSC_ERR_SUP,0,"MatZeroRows() on rectangular matrices cannot be used with the Mat options \n\
1778 MAT_NO_NEW_NONZERO_LOCATIONS,MAT_NEW_NONZERO_LOCATION_ERR,MAT_NEW_NONZERO_ALLOCATION_ERR");
1779     }
1780     for ( i = 0; i < slen; i++ ) {
1781       row  = lrows[i] + rstart_bs;
1782       ierr = MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES);CHKERRQ(ierr);
1783     }
1784     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1785     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1786   } else {
1787     ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
1788   }
1789 
1790   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1791   PetscFree(lrows);
1792 
1793   /* wait on sends */
1794   if (nsends) {
1795     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
1796     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
1797     PetscFree(send_status);
1798   }
1799   PetscFree(send_waits); PetscFree(svalues);
1800 
1801   PetscFunctionReturn(0);
1802 }
1803 
1804 extern int MatPrintHelp_SeqBAIJ(Mat);
1805 #undef __FUNC__
1806 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1807 int MatPrintHelp_MPIBAIJ(Mat A)
1808 {
1809   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1810   MPI_Comm    comm = A->comm;
1811   static int  called = 0;
1812   int         ierr;
1813 
1814   PetscFunctionBegin;
1815   if (!a->rank) {
1816     ierr = MatPrintHelp_SeqBAIJ(a->A);CHKERRQ(ierr);
1817   }
1818   if (called) {PetscFunctionReturn(0);} else called = 1;
1819   (*PetscHelpPrintf)(comm," Options for MATMPIBAIJ matrix format (the defaults):\n");
1820   (*PetscHelpPrintf)(comm,"  -mat_use_hash_table <factor>: Use hashtable for efficient matrix assembly\n");
1821   PetscFunctionReturn(0);
1822 }
1823 
1824 #undef __FUNC__
1825 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1826 int MatSetUnfactored_MPIBAIJ(Mat A)
1827 {
1828   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1829   int         ierr;
1830 
1831   PetscFunctionBegin;
1832   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1833   PetscFunctionReturn(0);
1834 }
1835 
1836 static int MatDuplicate_MPIBAIJ(Mat,MatDuplicateOption,Mat *);
1837 
1838 /* -------------------------------------------------------------------*/
1839 static struct _MatOps MatOps_Values = {
1840   MatSetValues_MPIBAIJ,
1841   MatGetRow_MPIBAIJ,
1842   MatRestoreRow_MPIBAIJ,
1843   MatMult_MPIBAIJ,
1844   MatMultAdd_MPIBAIJ,
1845   MatMultTrans_MPIBAIJ,
1846   MatMultTransAdd_MPIBAIJ,
1847   0,
1848   0,
1849   0,
1850   0,
1851   0,
1852   0,
1853   0,
1854   MatTranspose_MPIBAIJ,
1855   MatGetInfo_MPIBAIJ,
1856   0,
1857   MatGetDiagonal_MPIBAIJ,
1858   MatDiagonalScale_MPIBAIJ,
1859   MatNorm_MPIBAIJ,
1860   MatAssemblyBegin_MPIBAIJ,
1861   MatAssemblyEnd_MPIBAIJ,
1862   0,
1863   MatSetOption_MPIBAIJ,
1864   MatZeroEntries_MPIBAIJ,
1865   MatZeroRows_MPIBAIJ,
1866   0,
1867   0,
1868   0,
1869   0,
1870   MatGetSize_MPIBAIJ,
1871   MatGetLocalSize_MPIBAIJ,
1872   MatGetOwnershipRange_MPIBAIJ,
1873   0,
1874   0,
1875   0,
1876   0,
1877   MatDuplicate_MPIBAIJ,
1878   0,
1879   0,
1880   0,
1881   0,
1882   0,
1883   MatGetSubMatrices_MPIBAIJ,
1884   MatIncreaseOverlap_MPIBAIJ,
1885   MatGetValues_MPIBAIJ,
1886   0,
1887   MatPrintHelp_MPIBAIJ,
1888   MatScale_MPIBAIJ,
1889   0,
1890   0,
1891   0,
1892   MatGetBlockSize_MPIBAIJ,
1893   0,
1894   0,
1895   0,
1896   0,
1897   0,
1898   0,
1899   MatSetUnfactored_MPIBAIJ,
1900   0,
1901   MatSetValuesBlocked_MPIBAIJ,
1902   0,
1903   0,
1904   0,
1905   MatGetMaps_Petsc};
1906 
1907 
1908 EXTERN_C_BEGIN
1909 #undef __FUNC__
1910 #define __FUNC__ "MatGetDiagonalBlock_MPIBAIJ"
1911 int MatGetDiagonalBlock_MPIBAIJ(Mat A,PetscTruth *iscopy,MatReuse reuse,Mat *a)
1912 {
1913   PetscFunctionBegin;
1914   *a      = ((Mat_MPIBAIJ *)A->data)->A;
1915   *iscopy = PETSC_FALSE;
1916   PetscFunctionReturn(0);
1917 }
1918 EXTERN_C_END
1919 
1920 #undef __FUNC__
1921 #define __FUNC__ "MatCreateMPIBAIJ"
1922 /*@C
1923    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1924    (block compressed row).  For good matrix assembly performance
1925    the user should preallocate the matrix storage by setting the parameters
1926    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1927    performance can be increased by more than a factor of 50.
1928 
1929    Collective on MPI_Comm
1930 
1931    Input Parameters:
1932 +  comm - MPI communicator
1933 .  bs   - size of the blocks
1934 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1935            This value should be the same as the local size used in creating the
1936            y vector for the matrix-vector product y = Ax.
1937 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1938            This value should be the same as the local size used in creating the
1939            x vector for the matrix-vector product y = Ax.
1940 .  M - number of global rows (or PETSC_DETERMINE to have calculated if m is given)
1941 .  N - number of global columns (or PETSC_DETERMINE to have calculated if n is given)
1942 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1943            submatrix  (same for all local rows)
1944 .  d_nnz - array containing the number of block nonzeros in the various block rows
1945            in the diagonal portion of the local submatrix (possibly different for each block
1946            row) or PETSC_NULL.  You must leave room for the diagonal entry even if it is zero.
1947 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1948            submatrix (same for all local rows).
1949 -  o_nnz - array containing the number of block nonzeros in the various block rows of the
1950            off-diagonal portion of the local submatrix (possibly different for
1951            each block row) or PETSC_NULL.
1952 
1953    Output Parameter:
1954 .  A - the matrix
1955 
1956    Options Database Keys:
1957 .   -mat_no_unroll - uses code that does not unroll the loops in the
1958                      block calculations (much slower)
1959 .   -mat_block_size - size of the blocks to use
1960 .   -mat_mpi - use the parallel matrix data structures even on one processor
1961                (defaults to using SeqBAIJ format on one processor)
1962 
1963    Notes:
1964    The user MUST specify either the local or global matrix dimensions
1965    (possibly both).
1966 
1967    If PETSC_DECIDE or PETSC_DETERMINE is used for a particular argument on one processor
1968    then it must be used on all processors that share the object for that argument.
1969 
1970    Storage Information:
1971    For a square global matrix we define each processor's diagonal portion
1972    to be its local rows and the corresponding columns (a square submatrix);
1973    each processor's off-diagonal portion encompasses the remainder of the
1974    local matrix (a rectangular submatrix).
1975 
1976    The user can specify preallocated storage for the diagonal part of
1977    the local submatrix with either d_nz or d_nnz (not both).  Set
1978    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1979    memory allocation.  Likewise, specify preallocated storage for the
1980    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1981 
1982    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1983    the figure below we depict these three local rows and all columns (0-11).
1984 
1985 .vb
1986            0 1 2 3 4 5 6 7 8 9 10 11
1987           -------------------
1988    row 3  |  o o o d d d o o o o o o
1989    row 4  |  o o o d d d o o o o o o
1990    row 5  |  o o o d d d o o o o o o
1991           -------------------
1992 .ve
1993 
1994    Thus, any entries in the d locations are stored in the d (diagonal)
1995    submatrix, and any entries in the o locations are stored in the
1996    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1997    stored simply in the MATSEQBAIJ format for compressed row storage.
1998 
1999    Now d_nz should indicate the number of block nonzeros per row in the d matrix,
2000    and o_nz should indicate the number of block nonzeros per row in the o matrix.
2001    In general, for PDE problems in which most nonzeros are near the diagonal,
2002    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
2003    or you will get TERRIBLE performance; see the users' manual chapter on
2004    matrices.
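
   As an illustration only (a minimal sketch; the block size, local sizes, and
   preallocation counts below are made up and not taken from a real application),
   a matrix with bs=2 and 6 local rows and columns per processor could be created as

.vb
   Mat A;
   int ierr;
   int d_nnz[3] = {1,2,1};   /* block nonzeros per block row, diagonal part     */
   int o_nnz[3] = {2,2,3};   /* block nonzeros per block row, off-diagonal part */
   ierr = MatCreateMPIBAIJ(PETSC_COMM_WORLD,2,6,6,PETSC_DETERMINE,PETSC_DETERMINE,
                           0,d_nnz,0,o_nnz,&A); CHKERRQ(ierr);
   /* ... set entries with MatSetValues() or MatSetValuesBlocked() ... */
   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
.ve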
2005 
2006    Level: intermediate
2007 
2008 .keywords: matrix, block, aij, compressed row, sparse, parallel
2009 
2010 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues(), MatCreateMPIAIJ()
2011 @*/
2012 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
2013                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
2014 {
2015   Mat          B;
2016   Mat_MPIBAIJ  *b;
2017   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size,flg;
2018   int          flag1 = 0,flag2 = 0;
2019 
2020   PetscFunctionBegin;
2021   if (bs < 1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Invalid block size specified, must be positive");
2022 
2023   MPI_Comm_size(comm,&size);
2024   ierr = OptionsHasName(PETSC_NULL,"-mat_mpibaij",&flag1); CHKERRQ(ierr);
2025   ierr = OptionsHasName(PETSC_NULL,"-mat_mpi",&flag2); CHKERRQ(ierr);
2026   if (!flag1 && !flag2 && size == 1) {
2027     if (M == PETSC_DECIDE) M = m;
2028     if (N == PETSC_DECIDE) N = n;
2029     ierr = MatCreateSeqBAIJ(comm,bs,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
2030     PetscFunctionReturn(0);
2031   }
2032 
2033   *A = 0;
2034   PetscHeaderCreate(B,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIBAIJ,"Mat",comm,MatDestroy,MatView);
2035   PLogObjectCreate(B);
2036   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
2037   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
2038   PetscMemcpy(B->ops,&MatOps_Values,sizeof(struct _MatOps));
2039 
2040   B->ops->destroy    = MatDestroy_MPIBAIJ;
2041   B->ops->view       = MatView_MPIBAIJ;
2042   B->mapping    = 0;
2043   B->factor     = 0;
2044   B->assembled  = PETSC_FALSE;
2045 
2046   B->insertmode = NOT_SET_VALUES;
2047   MPI_Comm_rank(comm,&b->rank);
2048   MPI_Comm_size(comm,&b->size);
2049 
2050   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
2051     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
2052   }
2053   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) {
2054     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either M or m should be specified");
2055   }
2056   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) {
2057     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either N or n should be specified");
2058   }
2059   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
2060   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
2061 
2062   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
2063     work[0] = m; work[1] = n;
2064     mbs = m/bs; nbs = n/bs;
2065     ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
2066     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
2067     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
2068   }
2069   if (m == PETSC_DECIDE) {
2070     Mbs = M/bs;
2071     if (Mbs*bs != M) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global rows must be divisible by blocksize");
2072     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
2073     m   = mbs*bs;
2074   }
2075   if (n == PETSC_DECIDE) {
2076     Nbs = N/bs;
2077     if (Nbs*bs != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global cols must be divisible by blocksize");
2078     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
2079     n   = nbs*bs;
2080   }
2081   if (mbs*bs != m || nbs*bs != n) {
2082     SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of local rows, cols must be divisible by blocksize");
2083   }
2084 
2085   b->m = m; B->m = m;
2086   b->n = n; B->n = n;
2087   b->N = N; B->N = N;
2088   b->M = M; B->M = M;
2089   b->bs  = bs;
2090   b->bs2 = bs*bs;
2091   b->mbs = mbs;
2092   b->nbs = nbs;
2093   b->Mbs = Mbs;
2094   b->Nbs = Nbs;
2095 
2096   /* the information in the maps duplicates the information computed below, eventually
2097      we should remove the duplicate information that is not contained in the maps */
2098   ierr = MapCreateMPI(comm,m,M,&B->rmap);CHKERRQ(ierr);
2099   ierr = MapCreateMPI(comm,n,N,&B->cmap);CHKERRQ(ierr);
2100 
2101   /* build local table of row and column ownerships */
2102   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
2103   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
2104   b->cowners = b->rowners + b->size + 2;
2105   ierr = MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
2106   b->rowners[0] = 0;
2107   for ( i=2; i<=b->size; i++ ) {
2108     b->rowners[i] += b->rowners[i-1];
2109   }
2110   b->rstart    = b->rowners[b->rank];
2111   b->rend      = b->rowners[b->rank+1];
2112   b->rstart_bs = b->rstart * bs;
2113   b->rend_bs   = b->rend * bs;
2114 
2115   ierr = MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
2116   b->cowners[0] = 0;
2117   for ( i=2; i<=b->size; i++ ) {
2118     b->cowners[i] += b->cowners[i-1];
2119   }
2120   b->cstart    = b->cowners[b->rank];
2121   b->cend      = b->cowners[b->rank+1];
2122   b->cstart_bs = b->cstart * bs;
2123   b->cend_bs   = b->cend * bs;
2124 
2125 
2126   if (d_nz == PETSC_DEFAULT) d_nz = 5;
2127   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
2128   PLogObjectParent(B,b->A);
2129   if (o_nz == PETSC_DEFAULT) o_nz = 0;
2130   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
2131   PLogObjectParent(B,b->B);
2132 
2133   /* build cache (stash) for off-processor entries generated during assembly */
2134   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
2135   b->donotstash  = 0;
2136   b->colmap      = 0;
2137   b->garray      = 0;
2138   b->roworiented = 1;
2139 
2140   /* stuff used in block assembly */
2141   b->barray       = 0;
2142 
2143   /* stuff used for matrix vector multiply */
2144   b->lvec         = 0;
2145   b->Mvctx        = 0;
2146 
2147   /* stuff for MatGetRow() */
2148   b->rowindices   = 0;
2149   b->rowvalues    = 0;
2150   b->getrowactive = PETSC_FALSE;
2151 
2152   /* hash table stuff */
2153   b->ht           = 0;
2154   b->hd           = 0;
2155   b->ht_size      = 0;
2156   b->ht_flag      = 0;
2157   b->ht_fact      = 0;
2158   b->ht_total_ct  = 0;
2159   b->ht_insert_ct = 0;
2160 
2161   *A = B;
2162   ierr = OptionsHasName(PETSC_NULL,"-mat_use_hash_table",&flg); CHKERRQ(ierr);
2163   if (flg) {
2164     double fact = 1.39;
2165     ierr = MatSetOption(B,MAT_USE_HASH_TABLE); CHKERRQ(ierr);
2166     ierr = OptionsGetDouble(PETSC_NULL,"-mat_use_hash_table",&fact,&flg); CHKERRQ(ierr);
2167     if (fact <= 1.0) fact = 1.39;
2168     ierr = MatMPIBAIJSetHashTableFactor(B,fact); CHKERRQ(ierr);
2169     PLogInfo(0,"MatCreateMPIBAIJ:Hash table Factor used %5.2f\n",fact);
2170   }
2171   ierr = PetscObjectComposeFunction((PetscObject)B,"MatGetDiagonalBlock_C",
2172                                      "MatGetDiagonalBlock_MPIBAIJ",
2173                                      (void*)MatGetDiagonalBlock_MPIBAIJ);CHKERRQ(ierr);
2174   PetscFunctionReturn(0);
2175 }
2176 
2177 #undef __FUNC__
2178 #define __FUNC__ "MatDuplicate_MPIBAIJ"
2179 static int MatDuplicate_MPIBAIJ(Mat matin,MatDuplicateOption cpvalues,Mat *newmat)
2180 {
2181   Mat         mat;
2182   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
2183   int         ierr, len=0, flg;
2184 
2185   PetscFunctionBegin;
2186   *newmat       = 0;
2187   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIBAIJ,"Mat",matin->comm,MatDestroy,MatView);
2188   PLogObjectCreate(mat);
2189   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
2190   PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));
2191   mat->ops->destroy    = MatDestroy_MPIBAIJ;
2192   mat->ops->view       = MatView_MPIBAIJ;
2193   mat->factor          = matin->factor;
2194   mat->assembled       = PETSC_TRUE;
2195   mat->insertmode      = NOT_SET_VALUES;
2196 
2197   a->m = mat->m   = oldmat->m;
2198   a->n = mat->n   = oldmat->n;
2199   a->M = mat->M   = oldmat->M;
2200   a->N = mat->N   = oldmat->N;
2201 
2202   a->bs  = oldmat->bs;
2203   a->bs2 = oldmat->bs2;
2204   a->mbs = oldmat->mbs;
2205   a->nbs = oldmat->nbs;
2206   a->Mbs = oldmat->Mbs;
2207   a->Nbs = oldmat->Nbs;
2208 
2209   a->rstart       = oldmat->rstart;
2210   a->rend         = oldmat->rend;
2211   a->cstart       = oldmat->cstart;
2212   a->cend         = oldmat->cend;
2213   a->size         = oldmat->size;
2214   a->rank         = oldmat->rank;
2215   a->donotstash   = oldmat->donotstash;
2216   a->roworiented  = oldmat->roworiented;
2217   a->rowindices   = 0;
2218   a->rowvalues    = 0;
2219   a->getrowactive = PETSC_FALSE;
2220   a->barray       = 0;
2221   a->rstart_bs    = oldmat->rstart_bs;
2222   a->rend_bs      = oldmat->rend_bs;
2223   a->cstart_bs    = oldmat->cstart_bs;
2224   a->cend_bs      = oldmat->cend_bs;
2225 
2226   /* hash table stuff */
2227   a->ht           = 0;
2228   a->hd           = 0;
2229   a->ht_size      = 0;
2230   a->ht_flag      = oldmat->ht_flag;
2231   a->ht_fact      = oldmat->ht_fact;
2232   a->ht_total_ct  = 0;
2233   a->ht_insert_ct = 0;
2234 
2235 
2236   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
2237   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
2238   a->cowners = a->rowners + a->size + 2;
2239   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
2240   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
2241   if (oldmat->colmap) {
2242 #if defined (USE_CTABLE)
2243   ierr = TableCreateCopy(oldmat->colmap,&a->colmap); CHKERRQ(ierr);
2244 #else
2245     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
2246     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
2247     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
2248 #endif
2249   } else a->colmap = 0;
2250   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
2251     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
2252     PLogObjectMemory(mat,len*sizeof(int));
2253     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
2254   } else a->garray = 0;
2255 
2256   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
2257   PLogObjectParent(mat,a->lvec);
2258   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
2259 
2260   PLogObjectParent(mat,a->Mvctx);
2261   ierr =  MatDuplicate(oldmat->A,cpvalues,&a->A); CHKERRQ(ierr);
2262   PLogObjectParent(mat,a->A);
2263   ierr =  MatDuplicate(oldmat->B,cpvalues,&a->B); CHKERRQ(ierr);
2264   PLogObjectParent(mat,a->B);
2265   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
2266   ierr = FListDuplicate(matin->qlist,&mat->qlist);CHKERRQ(ierr);
2267   if (flg) {
2268     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
2269   }
2270   *newmat = mat;
2271   PetscFunctionReturn(0);
2272 }
2273 
2274 #include "sys.h"
2275 
2276 #undef __FUNC__
2277 #define __FUNC__ "MatLoad_MPIBAIJ"
2278 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
2279 {
2280   Mat          A;
2281   int          i, nz, ierr, j,rstart, rend, fd;
2282   Scalar       *vals,*buf;
2283   MPI_Comm     comm = ((PetscObject)viewer)->comm;
2284   MPI_Status   status;
2285   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
2286   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
2287   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,Mbs,mbs,extra_rows;
2288   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
2289   int          dcount,kmax,k,nzcount,tmp;
2290 
2291   PetscFunctionBegin;
2292   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
2293 
2294   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
2295   if (!rank) {
2296     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
2297     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
2298     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
2299     if (header[3] < 0) {
2300       SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix stored in special format, cannot load as MPIBAIJ");
2301     }
2302   }
2303 
2304   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
2305   M = header[1]; N = header[2];
2306 
2307   if (M != N) SETERRQ(PETSC_ERR_SUP,0,"Can only do square matrices");
2308 
2309   /*
2310      This code adds extra rows to make sure the number of rows is
2311      divisible by the blocksize
2312   */
2313   Mbs        = M/bs;
2314   extra_rows = bs - M + bs*(Mbs);
2315   if (extra_rows == bs) extra_rows = 0;
2316   else                  Mbs++;
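  /* illustrative example (values made up): with bs=3 and M=10, Mbs = 10/3 = 3,
     extra_rows = 3 - 10 + 9 = 2, so Mbs is bumped to 4 and the matrix is padded to 12 rows */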
2317   if (extra_rows &&!rank) {
2318     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
2319   }
2320 
2321   /* determine ownership of all rows */
2322   mbs = Mbs/size + ((Mbs % size) > rank);
2323   m   = mbs * bs;
2324   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
2325   browners = rowners + size + 1;
2326   ierr = MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
2327   rowners[0] = 0;
2328   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
2329   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
2330   rstart = rowners[rank];
2331   rend   = rowners[rank+1];
2332 
2333   /* distribute row lengths to all processors */
2334   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
2335   if (!rank) {
2336     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
2337     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
2338     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
2339     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
2340     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
2341     ierr = MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);CHKERRQ(ierr);
2342     PetscFree(sndcounts);
2343   } else {
2344     ierr = MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);CHKERRQ(ierr);
2345   }
2346 
2347   if (!rank) {
2348     /* calculate the number of nonzeros on each processor */
2349     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
2350     PetscMemzero(procsnz,size*sizeof(int));
2351     for ( i=0; i<size; i++ ) {
2352       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
2353         procsnz[i] += rowlengths[j];
2354       }
2355     }
2356     PetscFree(rowlengths);
2357 
2358     /* determine max buffer needed and allocate it */
2359     maxnz = 0;
2360     for ( i=0; i<size; i++ ) {
2361       maxnz = PetscMax(maxnz,procsnz[i]);
2362     }
2363     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
2364 
2365     /* read in my part of the matrix column indices  */
2366     nz = procsnz[0];
2367     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
2368     mycols = ibuf;
2369     if (size == 1)  nz -= extra_rows;
2370     ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
2371     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
2372 
2373     /* read in the column indices for each of the other processors (except the last) and ship them off */
2374     for ( i=1; i<size-1; i++ ) {
2375       nz   = procsnz[i];
2376       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
2377       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
2378     }
2379     /* read in the stuff for the last proc */
2380     if ( size != 1 ) {
2381       nz   = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
2382       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
2383       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
2384       ierr = MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);CHKERRQ(ierr);
2385     }
2386     PetscFree(cols);
2387   } else {
2388     /* determine buffer space needed for message */
2389     nz = 0;
2390     for ( i=0; i<m; i++ ) {
2391       nz += locrowlens[i];
2392     }
2393     ibuf   = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
2394     mycols = ibuf;
2395     /* receive message of column indices*/
2396     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
2397     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
2398     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2399   }
2400 
2401   /* loop over local rows, determining number of off diagonal entries */
2402   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
2403   odlens = dlens + (rend-rstart);
2404   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
2405   PetscMemzero(mask,3*Mbs*sizeof(int));
2406   masked1 = mask    + Mbs;
2407   masked2 = masked1 + Mbs;
2408   rowcount = 0; nzcount = 0;
2409   for ( i=0; i<mbs; i++ ) {
2410     dcount  = 0;
2411     odcount = 0;
2412     for ( j=0; j<bs; j++ ) {
2413       kmax = locrowlens[rowcount];
2414       for ( k=0; k<kmax; k++ ) {
2415         tmp = mycols[nzcount++]/bs;
2416         if (!mask[tmp]) {
2417           mask[tmp] = 1;
2418           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
2419           else masked1[dcount++] = tmp;
2420         }
2421       }
2422       rowcount++;
2423     }
2424 
2425     dlens[i]  = dcount;
2426     odlens[i] = odcount;
2427 
2428     /* zero out the mask elements we set */
2429     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
2430     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
2431   }
2432 
2433   /* create our matrix */
2434   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
2435          CHKERRQ(ierr);
2436   A = *newmat;
2437   MatSetOption(A,MAT_COLUMNS_SORTED);
2438 
2439   if (!rank) {
2440     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
2441     /* read in my part of the matrix numerical values  */
2442     nz = procsnz[0];
2443     vals = buf;
2444     mycols = ibuf;
2445     if (size == 1)  nz -= extra_rows;
2446     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2447     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
2448 
2449     /* insert into matrix */
2450     jj      = rstart*bs;
2451     for ( i=0; i<m; i++ ) {
2452       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
2453       mycols += locrowlens[i];
2454       vals   += locrowlens[i];
2455       jj++;
2456     }
2457     /* read in other processors (except the last one) and ship out */
2458     for ( i=1; i<size-1; i++ ) {
2459       nz   = procsnz[i];
2460       vals = buf;
2461       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2462       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
2463     }
2464     /* the last proc */
2465     if ( size != 1 ){
2466       nz   = procsnz[i] - extra_rows;
2467       vals = buf;
2468       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2469       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
2470       ierr = MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);CHKERRQ(ierr);
2471     }
2472     PetscFree(procsnz);
2473   } else {
2474     /* receive numeric values */
2475     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
2476 
2477     /* receive message of values*/
2478     vals   = buf;
2479     mycols = ibuf;
2480     ierr   = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
2481     ierr   = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
2482     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2483 
2484     /* insert into matrix */
2485     jj      = rstart*bs;
2486     for ( i=0; i<m; i++ ) {
2487       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
2488       mycols += locrowlens[i];
2489       vals   += locrowlens[i];
2490       jj++;
2491     }
2492   }
2493   PetscFree(locrowlens);
2494   PetscFree(buf);
2495   PetscFree(ibuf);
2496   PetscFree(rowners);
2497   PetscFree(dlens);
2498   PetscFree(mask);
2499   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2500   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2501   PetscFunctionReturn(0);
2502 }
2503 
2504 
2505 
2506 #undef __FUNC__
2507 #define __FUNC__ "MatMPIBAIJSetHashTableFactor"
2508 /*@
2509    MatMPIBAIJSetHashTableFactor - Sets the factor required to compute the size of the HashTable.
2510 
2511    Input Parameters:
2512 .  mat  - the matrix
2513 .  fact - factor used in computing the size of the hash table
2514 
2515    Collective on Mat
2516 
2517   Notes:
2518    This can also be set by the command line option: -mat_use_hash_table fact
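
   A possible calling sequence (a sketch only; 1.39 matches the default factor
   applied when -mat_use_hash_table is given without a value):

.vb
   ierr = MatSetOption(mat,MAT_USE_HASH_TABLE); CHKERRQ(ierr);
   ierr = MatMPIBAIJSetHashTableFactor(mat,1.39); CHKERRQ(ierr);
.ve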
2519 
2520 .keywords: matrix, hashtable, factor, HT
2521 
2522 .seealso: MatSetOption()
2523 @*/
2524 int MatMPIBAIJSetHashTableFactor(Mat mat,double fact)
2525 {
2526   Mat_MPIBAIJ *baij;
2527 
2528   PetscFunctionBegin;
2529   PetscValidHeaderSpecific(mat,MAT_COOKIE);
2530   if (mat->type != MATMPIBAIJ) {
2531       SETERRQ(PETSC_ERR_ARG_WRONG,1,"Incorrect matrix type. Use MPIBAIJ only.");
2532   }
2533   baij = (Mat_MPIBAIJ*) mat->data;
2534   baij->ht_fact = fact;
2535   PetscFunctionReturn(0);
2536 }
2537