xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 7afbd364210a49753bbc0d0f925bfebaa4e7c628)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpibaij.c,v 1.154 1999/02/18 17:09:28 bsmith Exp balay $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"   /*I  "mat.h"  I*/
6 #include "src/vec/vecimpl.h"
7 
8 extern int MatSetUpMultiply_MPIBAIJ(Mat);
9 extern int DisAssemble_MPIBAIJ(Mat);
10 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
11 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatReuse,Mat **);
12 extern int MatGetValues_SeqBAIJ(Mat,int,int *,int,int *,Scalar *);
13 
14 /*
15      Local utility routine that creates a mapping from the global column
16    number to the local number in the off-diagonal part of the local
17    storage of the matrix.  This is done in a non scable way since the
18    length of colmap equals the global matrix length.
19 */
20 #undef __FUNC__
21 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
22 static int CreateColmap_MPIBAIJ_Private(Mat mat)
23 {
24   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
25   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
26   int         nbs = B->nbs,i,bs=B->bs;
27 #if defined (USE_CTABLE)
28   int         ierr;
29 #endif
30 
31   PetscFunctionBegin;
32 #if defined (USE_CTABLE)
33   ierr = TableCreate(baij->nbs/5,&baij->colmap); CHKERRQ(ierr);
34   for ( i=0; i<nbs; i++ ){
35     ierr = TableAdd(baij->colmap,baij->garray[i]+1,i*bs+1);CHKERRQ(ierr);
36   }
37 #else
38   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
39   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
40   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
41   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
42 #endif
43   PetscFunctionReturn(0);
44 }
45 
/* Number of extra block entries allocated whenever a row runs out of room */
#define CHUNKSIZE  10

/*
   MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) - macro (not a function)
   that inserts or adds one scalar entry at local (row,col) into the
   "diagonal" sequential block A of the parallel BAIJ matrix.

   It performs a binary-search-assisted scan of the block row for the target
   block column; if found it updates the existing bs x bs block in place,
   otherwise it inserts a new block (reallocating the whole matrix storage
   by CHUNKSIZE blocks when the row is full, honoring a->nonew).

   NOTE(review): this macro relies on the following variables being in scope
   at the expansion site (see MatSetValues_MPIBAIJ): a, A, aa, ai, aj, ailen,
   aimax, bs, bs2, rp, ap, bap, brow, bcol, ridx, cidx, low, high, t, _i, ii,
   nrow, rmax, N.  It is deliberately a macro (not wrapped in do/while) so
   that SETERRQ can return from the enclosing function.
*/
#define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
    rmax = aimax[brow]; nrow = ailen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto a_noinsert; \
        } \
      } \
      if (a->nonew == 1) goto a_noinsert; \
      else if (a->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (a->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
        for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
        PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
                    aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(a->a);  \
        if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
        aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
        a->singlemalloc = 1; \
 \
        rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
        rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        a->maxnz += bs2*CHUNKSIZE; \
        a->reallocs++; \
        a->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      a_noinsert:; \
    ailen[brow] = nrow; \
}
122 
/*
   MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) - twin of
   MatSetValues_SeqBAIJ_A_Private, operating on the "off-diagonal"
   sequential block B of the parallel BAIJ matrix (variables b, B, ba,
   bi, bj, bilen, bimax instead of a, A, aa, ai, aj, ailen, aimax).
   See the comment on the A variant for the expansion-site requirements.
*/
#define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
    rmax = bimax[brow]; nrow = bilen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto b_noinsert; \
        } \
      } \
      if (b->nonew == 1) goto b_noinsert; \
      else if (b->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (b->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
        for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
        PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
                    ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(b->a);  \
        if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
        ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
        b->singlemalloc = 1; \
 \
        rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
        rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        b->maxnz += bs2*CHUNKSIZE; \
        b->reallocs++; \
        b->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      b_noinsert:; \
    bilen[brow] = nrow; \
}
197 
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIBAIJ"
/*
   MatSetValues_MPIBAIJ - Inserts or adds an m x n block of scalar values
   (global point indices im[]/in[]) into the parallel BAIJ matrix.

   Values whose row is owned locally go directly into the diagonal (A) or
   off-diagonal (B) sequential block via the expansion macros above;
   off-processor rows are cached in the stash for communication during
   assembly.  Negative row/column indices are silently ignored.
*/
int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j,row,col;
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
  int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
  int         cend_orig=baij->cend_bs,bs=baij->bs;

  /* Some Variables required in the macro */
  Mat         A = baij->A;
  Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
  int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
  Scalar      *aa=a->a;

  Mat         B = baij->B;
  Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
  int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
  Scalar      *ba=b->a;

  int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
  int         low,high,t,ridx,cidx,bs2=a->bs2;
  Scalar      *ap,*bap;

  PetscFunctionBegin;
  for ( i=0; i<m; i++ ) {
    if (im[i] < 0) continue;
#if defined(USE_PETSC_BOPT_g)
    if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
#endif
    if (im[i] >= rstart_orig && im[i] < rend_orig) {
      /* locally owned row: shift to local numbering */
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        if (in[j] >= cstart_orig && in[j] < cend_orig){
          /* column in the locally owned range -> diagonal block A */
          col = in[j] - cstart_orig;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
        else if (in[j] < 0) continue;
#if defined(USE_PETSC_BOPT_g)
        else if (in[j] >= baij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Col too large");}
#endif
        else {
          /* off-diagonal column -> block B; after a previous assembly the
             column must be translated through colmap (built on demand) */
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }
#if defined (USE_CTABLE)
            ierr = TableFind(baij->colmap, in[j]/bs + 1,&col); CHKERRQ(ierr);
            col  = col - 1 + in[j]%bs;
#else
            col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
#endif
            /* colmap miss on a new nonzero: must disassemble and fall back
               to global numbering */
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
              /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
              B = baij->B;
              b = (Mat_SeqBAIJ *) (B)->data;
              bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
              ba=b->a;
            }
          } else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
      }
    } else {
      /* off-processor row: stash for communication at assembly time */
      if (roworiented && !baij->donotstash) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      } else {
        if (!baij->donotstash) {
          row = im[i];
	  for ( j=0; j<n; j++ ) {
	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
  PetscFunctionReturn(0);
}
284 
285 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
286 #undef __FUNC__
287 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
288 int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
289 {
290   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
291   Scalar      *value,*barray=baij->barray;
292   int         ierr,i,j,ii,jj,row,col,k,l;
293   int         roworiented = baij->roworiented,rstart=baij->rstart ;
294   int         rend=baij->rend,cstart=baij->cstart,stepval;
295   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
296 
297   if(!barray) {
298     baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
299   }
300 
301   if (roworiented) {
302     stepval = (n-1)*bs;
303   } else {
304     stepval = (m-1)*bs;
305   }
306   for ( i=0; i<m; i++ ) {
307     if (im[i] < 0) continue;
308 #if defined(USE_PETSC_BOPT_g)
309     if (im[i] >= baij->Mbs) SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large, row %d max %d",im[i],baij->Mbs);
310 #endif
311     if (im[i] >= rstart && im[i] < rend) {
312       row = im[i] - rstart;
313       for ( j=0; j<n; j++ ) {
314         /* If NumCol = 1 then a copy is not required */
315         if ((roworiented) && (n == 1)) {
316           barray = v + i*bs2;
317         } else if((!roworiented) && (m == 1)) {
318           barray = v + j*bs2;
319         } else { /* Here a copy is required */
320           if (roworiented) {
321             value = v + i*(stepval+bs)*bs + j*bs;
322           } else {
323             value = v + j*(stepval+bs)*bs + i*bs;
324           }
325           for ( ii=0; ii<bs; ii++,value+=stepval ) {
326             for (jj=0; jj<bs; jj++ ) {
327               *barray++  = *value++;
328             }
329           }
330           barray -=bs2;
331         }
332 
333         if (in[j] >= cstart && in[j] < cend){
334           col  = in[j] - cstart;
335           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
336         }
337         else if (in[j] < 0) continue;
338 #if defined(USE_PETSC_BOPT_g)
339         else if (in[j] >= baij->Nbs) {SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large, col %d max %d",in[j],baij->Nbs);}
340 #endif
341         else {
342           if (mat->was_assembled) {
343             if (!baij->colmap) {
344               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
345             }
346 
347 #if defined(USE_PETSC_BOPT_g)
348 #if defined (USE_CTABLE)
349             { int data;
350             ierr = TableFind(baij->colmap,in[j]+1,&data); CHKERRQ(ierr);
351             if((data - 1) % bs)
352 	      {SETERRQ(PETSC_ERR_PLIB,0,"Incorrect colmap");}
353             }
354 #else
355             if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(PETSC_ERR_PLIB,0,"Incorrect colmap");}
356 #endif
357 #endif
358 #if defined (USE_CTABLE)
359 	    ierr = TableFind(baij->colmap,in[j]+1,&col); CHKERRQ(ierr);
360             col  = (col - 1)/bs;
361 #else
362             col = (baij->colmap[in[j]] - 1)/bs;
363 #endif
364             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
365               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
366               col =  in[j];
367             }
368           }
369           else col = in[j];
370           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
371         }
372       }
373     } else {
374       if (!baij->donotstash) {
375         if (roworiented ) {
376           row   = im[i]*bs;
377           value = v + i*(stepval+bs)*bs;
378           for ( j=0; j<bs; j++,row++ ) {
379             for ( k=0; k<n; k++ ) {
380               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
381                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
382               }
383             }
384           }
385         } else {
386           for ( j=0; j<n; j++ ) {
387             value = v + j*(stepval+bs)*bs + i*bs;
388             col   = in[j]*bs;
389             for ( k=0; k<bs; k++,col++,value+=stepval) {
390               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
391                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
392               }
393             }
394           }
395         }
396       }
397     }
398   }
399   PetscFunctionReturn(0);
400 }
/* Multiplicative hashing with the golden-ratio constant (Knuth): the key is
   multiplied by HASH_KEY and the fractional part, scaled by the table size,
   gives the starting probe index.  tmp must be a double lvalue. */
#define HASH_KEY 0.6180339887
/* #define HASH1(size,key) ((int)((size)*fmod(((key)*HASH_KEY),1))) */
#define HASH(size,key,tmp) (tmp = (key)*HASH_KEY,(int)((size)*(tmp-(int)tmp)))
/* #define HASH(size,key,tmp) ((int)((size)*fmod(((key)*HASH_KEY),1))) */
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIBAIJ_HT"
/*
   MatSetValues_MPIBAIJ_HT - Hash-table variant of MatSetValues_MPIBAIJ.

   Assumes the hash table mapping (block row, block col) -> block address
   has already been built (see MatCreateHashTable_MPIBAIJ_Private), so no
   new nonzero locations may be introduced: a lookup miss is an error.
   Probing is linear from the hashed slot, wrapping once around the table.
   Off-processor rows are stashed as in the non-HT version.
*/
int MatSetValues_MPIBAIJ_HT(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr,i,j,row,col;
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
  int         rend_orig=baij->rend_bs,Nbs=baij->Nbs;
  int         h1,key,size=baij->ht_size,bs=baij->bs,*HT=baij->ht,idx;
  double      tmp;
  Scalar      ** HD = baij->hd,value;
#if defined(USE_PETSC_BOPT_g)
  int         total_ct=baij->ht_total_ct,insert_ct=baij->ht_insert_ct;
#endif

  PetscFunctionBegin;

  for ( i=0; i<m; i++ ) {
#if defined(USE_PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
    if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
#endif
      row = im[i];
    if (row >= rstart_orig && row < rend_orig) {
      for ( j=0; j<n; j++ ) {
        col = in[j];
        if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
        /* Look up into the Hash Table */
        /* key encodes (block row, block col), shifted by 1 so 0 = empty */
        key = (row/bs)*Nbs+(col/bs)+1;
        h1  = HASH(size,key,tmp);


        idx = h1;
#if defined(USE_PETSC_BOPT_g)
        /* debug build additionally counts probes for profiling */
        insert_ct++;
        total_ct++;
        if (HT[idx] != key) {
          for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++,total_ct++);
          if (idx == size) {
            for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++,total_ct++);
            if (idx == h1) {
              SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
            }
          }
        }
#else
        /* linear probe from h1 to end of table, then wrap to the start */
        if (HT[idx] != key) {
          for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++);
          if (idx == size) {
            for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++);
            if (idx == h1) {
              SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
            }
          }
        }
#endif
        /* A HASH table entry is found, so insert the values at the correct address */
        if (addv == ADD_VALUES) *(HD[idx]+ (col % bs)*bs + (row % bs)) += value;
        else                    *(HD[idx]+ (col % bs)*bs + (row % bs))  = value;
      }
    } else {
      /* off-processor row: stash for assembly-time communication */
      if (roworiented && !baij->donotstash) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      } else {
        if (!baij->donotstash) {
          row = im[i];
	  for ( j=0; j<n; j++ ) {
	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
#if defined(USE_PETSC_BOPT_g)
  baij->ht_total_ct = total_ct;
  baij->ht_insert_ct = insert_ct;
#endif
  PetscFunctionReturn(0);
}
484 
#undef __FUNC__
#define __FUNC__ "MatSetValuesBlocked_MPIBAIJ_HT"
/*
   MatSetValuesBlocked_MPIBAIJ_HT - Hash-table variant of
   MatSetValuesBlocked_MPIBAIJ: im[]/in[] are global BLOCK indices and each
   entry of v is a bs x bs block.  The hash table maps (block row, block
   col) to the address of the stored block, so a lookup miss is an error
   (no new nonzero blocks may be introduced).  Off-processor block rows are
   expanded to point values and stashed.
*/
int MatSetValuesBlocked_MPIBAIJ_HT(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr,i,j,ii,jj,row,col,k,l;
  int         roworiented = baij->roworiented,rstart=baij->rstart ;
  int         rend=baij->rend,stepval,bs=baij->bs,bs2=baij->bs2;
  int         h1,key,size=baij->ht_size,idx,*HT=baij->ht,Nbs=baij->Nbs;
  double      tmp;
  Scalar      ** HD = baij->hd,*value,*v_t,*baij_a;
#if defined(USE_PETSC_BOPT_g)
  int         total_ct=baij->ht_total_ct,insert_ct=baij->ht_insert_ct;
#endif

  PetscFunctionBegin;

  /* distance (in scalars) between consecutive rows of one block inside v */
  if (roworiented) {
    stepval = (n-1)*bs;
  } else {
    stepval = (m-1)*bs;
  }
  for ( i=0; i<m; i++ ) {
#if defined(USE_PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
    if (im[i] >= baij->Mbs) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
#endif
    row   = im[i];
    v_t   = v + i*bs2;
    if (row >= rstart && row < rend) {
      for ( j=0; j<n; j++ ) {
        col = in[j];

        /* Look up into the Hash Table */
        /* key encodes (block row, block col), shifted by 1 so 0 = empty */
        key = row*Nbs+col+1;
        h1  = HASH(size,key,tmp);

        idx = h1;
#if defined(USE_PETSC_BOPT_g)
        /* debug build additionally counts probes for profiling */
        total_ct++;
        insert_ct++;
       if (HT[idx] != key) {
          for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++,total_ct++);
          if (idx == size) {
            for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++,total_ct++);
            if (idx == h1) {
              SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
            }
          }
        }
#else
        /* linear probe from h1 to end of table, then wrap to the start */
        if (HT[idx] != key) {
          for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++);
          if (idx == size) {
            for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++);
            if (idx == h1) {
              SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
            }
          }
        }
#endif
        baij_a = HD[idx];
        if (roworiented) {
          /*value = v + i*(stepval+bs)*bs + j*bs;*/
          /* value = v + (i*(stepval+bs)+j)*bs; */
          value = v_t;
          v_t  += bs;
          /* copy/accumulate the block, transposing from row-major input
             to the column-major stored layout (jj strides by bs) */
          if (addv == ADD_VALUES) {
            for ( ii=0; ii<bs; ii++,value+=stepval) {
              for ( jj=ii; jj<bs2; jj+=bs ) {
                baij_a[jj]  += *value++;
              }
            }
          } else {
            for ( ii=0; ii<bs; ii++,value+=stepval) {
              for ( jj=ii; jj<bs2; jj+=bs ) {
                baij_a[jj]  = *value++;
              }
            }
          }
        } else {
          value = v + j*(stepval+bs)*bs + i*bs;
          /* column-oriented input already matches storage order */
          if (addv == ADD_VALUES) {
            for ( ii=0; ii<bs; ii++,value+=stepval,baij_a+=bs ) {
              for ( jj=0; jj<bs; jj++ ) {
                baij_a[jj]  += *value++;
              }
            }
          } else {
            for ( ii=0; ii<bs; ii++,value+=stepval,baij_a+=bs ) {
              for ( jj=0; jj<bs; jj++ ) {
                baij_a[jj]  = *value++;
              }
            }
          }
        }
      }
    } else {
      /* off-processor block row: expand blocks to point values and stash */
      if (!baij->donotstash) {
        if (roworiented ) {
          row   = im[i]*bs;
          value = v + i*(stepval+bs)*bs;
          for ( j=0; j<bs; j++,row++ ) {
            for ( k=0; k<n; k++ ) {
              for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        } else {
          for ( j=0; j<n; j++ ) {
            value = v + j*(stepval+bs)*bs + i*bs;
            col   = in[j]*bs;
            for ( k=0; k<bs; k++,col++,value+=stepval) {
              for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        }
      }
    }
  }
#if defined(USE_PETSC_BOPT_g)
  baij->ht_total_ct = total_ct;
  baij->ht_insert_ct = insert_ct;
#endif
  PetscFunctionReturn(0);
}
614 
#undef __FUNC__
#define __FUNC__ "MatGetValues_MPIBAIJ"
/*
   MatGetValues_MPIBAIJ - Retrieves an m x n logically dense array of values
   (global point indices idxm[]/idxn[]) from the parallel BAIJ matrix into v
   (row oriented).  Only locally owned rows may be requested; requesting an
   off-processor row is an error.  A position with no stored entry reads
   back as 0.0.
*/
int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
  int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col,data;

  PetscFunctionBegin;
  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
    if (idxm[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
    if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
      row = idxm[i] - bsrstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
        if (idxn[j] >= baij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
        if (idxn[j] >= bscstart && idxn[j] < bscend){
          /* column in the locally owned range -> diagonal block A */
          col = idxn[j] - bscstart;
          ierr = MatGetValues_SeqBAIJ(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        } else {
          /* off-diagonal column: translate global block column to B's
             local numbering via colmap (built on demand) */
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
          }
#if defined (USE_CTABLE)
          ierr = TableFind(baij->colmap,idxn[j]/bs+1,&data); CHKERRQ(ierr);
          data --;
#else
          data = baij->colmap[idxn[j]/bs]-1;
#endif
          /* colmap miss (or stale slot) means no stored entry -> 0.0 */
          if((data < 0) || (baij->garray[data/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
          else {
            col  = data + idxn[j]%bs;
            ierr = MatGetValues_SeqBAIJ(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    } else {
      SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
    }
  }
 PetscFunctionReturn(0);
}
658 
659 #undef __FUNC__
660 #define __FUNC__ "MatNorm_MPIBAIJ"
661 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
662 {
663   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
664   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
665   int        ierr, i,bs2=baij->bs2;
666   double     sum = 0.0;
667   Scalar     *v;
668 
669   PetscFunctionBegin;
670   if (baij->size == 1) {
671     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
672   } else {
673     if (type == NORM_FROBENIUS) {
674       v = amat->a;
675       for (i=0; i<amat->nz*bs2; i++ ) {
676 #if defined(USE_PETSC_COMPLEX)
677         sum += PetscReal(PetscConj(*v)*(*v)); v++;
678 #else
679         sum += (*v)*(*v); v++;
680 #endif
681       }
682       v = bmat->a;
683       for (i=0; i<bmat->nz*bs2; i++ ) {
684 #if defined(USE_PETSC_COMPLEX)
685         sum += PetscReal(PetscConj(*v)*(*v)); v++;
686 #else
687         sum += (*v)*(*v); v++;
688 #endif
689       }
690       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
691       *norm = sqrt(*norm);
692     } else {
693       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
694     }
695   }
696   PetscFunctionReturn(0);
697 }
698 
#undef __FUNC__
#define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
/*
   MatAssemblyBegin_MPIBAIJ - Starts communicating the stashed off-processor
   values to their owning processes.

   Each stashed (row,col,value) triple is packed as three Scalars (the
   indices are stored as Scalars to simplify the message passing) and sent
   with nonblocking MPI calls; the matching receives are posted first.
   The send/receive buffers and request handles are saved in the Mat_MPIBAIJ
   struct for MatAssemblyEnd to complete.
*/
int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  PetscFunctionBegin;
  /* nothing was stashed: record empty communication state and return */
  if (baij->donotstash) {
    baij->svalues    = 0; baij->rvalues    = 0;
    baij->nsends     = 0; baij->nrecvs     = 0;
    baij->send_waits = 0; baij->recv_waits = 0;
    baij->rmax       = 0;
    PetscFunctionReturn(0);
  }

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
  }
  mat->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    /* find the process owning global row idx (ownership is in block rows,
       hence the *bs scaling) */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work      = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  ierr      = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
  nreceives = work[rank];
  ierr      = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
  nmax      = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues    = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues    = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
  starts     = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack each stashed triple at its destination's next free slot
     (starts[] is advanced as a cursor, hence the rebuild below) */
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  /* rebuild starts[] (it was consumed as a cursor while packing) */
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(baij->A,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  /* saved for MatAssemblyEnd to wait on and unpack */
  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  PetscFunctionReturn(0);
}
803 
804 /*
805   Creates the hash table, and sets the table
806   This table is created only once.
807   If new entried need to be added to the matrix
808   then the hash table has to be destroyed and
809   recreated.
810 */
811 #undef __FUNC__
812 #define __FUNC__ "MatCreateHashTable_MPIBAIJ_Private"
813 int MatCreateHashTable_MPIBAIJ_Private(Mat mat,double factor)
814 {
815   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
816   Mat         A = baij->A, B=baij->B;
817   Mat_SeqBAIJ *a=(Mat_SeqBAIJ *)A->data, *b=(Mat_SeqBAIJ *)B->data;
818   int         i,j,k,nz=a->nz+b->nz,h1,*ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j;
819   int         size,bs2=baij->bs2,rstart=baij->rstart;
820   int         cstart=baij->cstart,*garray=baij->garray,row,col,Nbs=baij->Nbs;
821   int         *HT,key;
822   Scalar      **HD;
823   double      tmp;
824 #if defined(USE_PETSC_BOPT_g)
825   int         ct=0,max=0;
826 #endif
827 
828   PetscFunctionBegin;
829   baij->ht_size=(int)(factor*nz);
830   size = baij->ht_size;
831 
832   if (baij->ht) {
833     PetscFunctionReturn(0);
834   }
835 
836   /* Allocate Memory for Hash Table */
837   baij->hd = (Scalar**)PetscMalloc((size)*(sizeof(int)+sizeof(Scalar*))+1); CHKPTRQ(baij->hd);
838   baij->ht = (int*)(baij->hd + size);
839   HD = baij->hd;
840   HT = baij->ht;
841 
842 
843   PetscMemzero(HD,size*(sizeof(int)+sizeof(Scalar*)));
844 
845 
846   /* Loop Over A */
847   for ( i=0; i<a->mbs; i++ ) {
848     for ( j=ai[i]; j<ai[i+1]; j++ ) {
849       row = i+rstart;
850       col = aj[j]+cstart;
851 
852       key = row*Nbs + col + 1;
853       h1  = HASH(size,key,tmp);
854       for ( k=0; k<size; k++ ){
855         if (HT[(h1+k)%size] == 0.0) {
856           HT[(h1+k)%size] = key;
857           HD[(h1+k)%size] = a->a + j*bs2;
858           break;
859 #if defined(USE_PETSC_BOPT_g)
860         } else {
861           ct++;
862 #endif
863         }
864       }
865 #if defined(USE_PETSC_BOPT_g)
866       if (k> max) max = k;
867 #endif
868     }
869   }
870   /* Loop Over B */
871   for ( i=0; i<b->mbs; i++ ) {
872     for ( j=bi[i]; j<bi[i+1]; j++ ) {
873       row = i+rstart;
874       col = garray[bj[j]];
875       key = row*Nbs + col + 1;
876       h1  = HASH(size,key,tmp);
877       for ( k=0; k<size; k++ ){
878         if (HT[(h1+k)%size] == 0.0) {
879           HT[(h1+k)%size] = key;
880           HD[(h1+k)%size] = b->a + j*bs2;
881           break;
882 #if defined(USE_PETSC_BOPT_g)
883         } else {
884           ct++;
885 #endif
886         }
887       }
888 #if defined(USE_PETSC_BOPT_g)
889       if (k> max) max = k;
890 #endif
891     }
892   }
893 
894   /* Print Summary */
895 #if defined(USE_PETSC_BOPT_g)
896   for ( i=0,j=0; i<size; i++)
897     if (HT[i]) {j++;}
898   PLogInfo(0,"MatCreateHashTable_MPIBAIJ_Private: Average Search = %5.2f,max search = %d\n",
899            (j== 0)? 0.0:((double)(ct+j))/j,max);
900 #endif
901   PetscFunctionReturn(0);
902 }
903 
904 extern int MatSetValues_SeqBAIJ(Mat,int,int *,int,int *,Scalar *,InsertMode);
905 
906 #undef __FUNC__
907 #define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
908 int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
909 {
910   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
911   MPI_Status  *send_status,recv_status;
912   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
913   int         bs=baij->bs,row,col,other_disassembled;
914   Scalar      *values,val;
915   InsertMode  addv = mat->insertmode;
916 
917   PetscFunctionBegin;
918   /*  wait on receives */
919   while (count) {
920     ierr = MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
921     /* unpack receives into our local space */
922     values = baij->rvalues + 3*imdex*baij->rmax;
923     ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
924     n = n/3;
925     for ( i=0; i<n; i++ ) {
926       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
927       col = (int) PetscReal(values[3*i+1]);
928       val = values[3*i+2];
929       if (col >= baij->cstart*bs && col < baij->cend*bs) {
930         col -= baij->cstart*bs;
931         ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
932       } else {
933         if (mat->was_assembled) {
934           if (!baij->colmap) {
935             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
936           }
937 #if defined (USE_CTABLE)
938 	  ierr = TableFind(baij->colmap,col/bs+1,&col); CHKERRQ(ierr);
939           col  = col - 1 + col%bs;
940 #else
941           col = (baij->colmap[col/bs]) - 1 + col%bs;
942 #endif
943           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
944             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
945             col = (int) PetscReal(values[3*i+1]);
946           }
947         }
948         ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
949       }
950     }
951     count--;
952   }
953   if (baij->recv_waits) PetscFree(baij->recv_waits);
954   if (baij->rvalues)    PetscFree(baij->rvalues);
955 
956   /* wait on sends */
957   if (baij->nsends) {
958     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
959     ierr        = MPI_Waitall(baij->nsends,baij->send_waits,send_status);CHKERRQ(ierr);
960     PetscFree(send_status);
961   }
962   if (baij->send_waits) PetscFree(baij->send_waits);
963   if (baij->svalues)    PetscFree(baij->svalues);
964 
965   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
966   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
967 
968   /* determine if any processor has disassembled, if so we must
969      also disassemble ourselfs, in order that we may reassemble. */
970   /*
971      if nonzero structure of submatrix B cannot change then we know that
972      no processor disassembled thus we can skip this stuff
973   */
974   if (!((Mat_SeqBAIJ*) baij->B->data)->nonew)  {
975     ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
976     if (mat->was_assembled && !other_disassembled) {
977       ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
978     }
979   }
980 
981   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
982     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
983   }
984   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
985   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
986 
987 #if defined(USE_PETSC_BOPT_g)
988   if (baij->ht && mode== MAT_FINAL_ASSEMBLY) {
989     PLogInfo(0,"MatAssemblyEnd_MPIBAIJ:Average Hash Table Search in MatSetValues = %5.2f\n",
990              ((double)baij->ht_total_ct)/baij->ht_insert_ct);
991     baij->ht_total_ct  = 0;
992     baij->ht_insert_ct = 0;
993   }
994 #endif
995   if (baij->ht_flag && !baij->ht && mode == MAT_FINAL_ASSEMBLY) {
996     ierr = MatCreateHashTable_MPIBAIJ_Private(mat,baij->ht_fact); CHKERRQ(ierr);
997     mat->ops->setvalues        = MatSetValues_MPIBAIJ_HT;
998     mat->ops->setvaluesblocked = MatSetValuesBlocked_MPIBAIJ_HT;
999   }
1000 
1001   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
1002   PetscFunctionReturn(0);
1003 }
1004 
1005 #undef __FUNC__
1006 #define __FUNC__ "MatView_MPIBAIJ_Binary"
1007 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
1008 {
1009   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
1010   int          ierr;
1011 
1012   PetscFunctionBegin;
1013   if (baij->size == 1) {
1014     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
1015   } else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
1016   PetscFunctionReturn(0);
1017 }
1018 
1019 #undef __FUNC__
1020 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworSocket"
1021 static int MatView_MPIBAIJ_ASCIIorDraworSocket(Mat mat,Viewer viewer)
1022 {
1023   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
1024   int          ierr, format,bs = baij->bs, size = baij->size, rank = baij->rank;
1025   FILE         *fd;
1026   ViewerType   vtype;
1027 
1028   PetscFunctionBegin;
1029   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
1030   if (PetscTypeCompare(vtype,ASCII_VIEWER)) {
1031     ierr = ViewerGetFormat(viewer,&format); CHKERRQ(ierr);
1032     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
1033       MatInfo info;
1034       MPI_Comm_rank(mat->comm,&rank);
1035       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
1036       ierr = MatGetInfo(mat,MAT_LOCAL,&info);CHKERRQ(ierr);
1037       PetscSequentialPhaseBegin(mat->comm,1);
1038       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
1039               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
1040               baij->bs,(int)info.memory);
1041       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
1042       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
1043       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
1044       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
1045       fflush(fd);
1046       PetscSequentialPhaseEnd(mat->comm,1);
1047       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
1048       PetscFunctionReturn(0);
1049     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
1050       PetscPrintf(mat->comm,"  block size is %d\n",bs);
1051       PetscFunctionReturn(0);
1052     }
1053   }
1054 
1055   if (PetscTypeCompare(vtype,DRAW_VIEWER)) {
1056     Draw       draw;
1057     PetscTruth isnull;
1058     ierr = ViewerDrawGetDraw(viewer,0,&draw); CHKERRQ(ierr);
1059     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
1060   }
1061 
1062   if (size == 1) {
1063     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
1064   } else {
1065     /* assemble the entire matrix onto first processor. */
1066     Mat         A;
1067     Mat_SeqBAIJ *Aloc;
1068     int         M = baij->M, N = baij->N,*ai,*aj,col,i,j,k,*rvals;
1069     int         mbs = baij->mbs;
1070     Scalar      *a;
1071 
1072     if (!rank) {
1073       ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
1074     } else {
1075       ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
1076     }
1077     PLogObjectParent(mat,A);
1078 
1079     /* copy over the A part */
1080     Aloc = (Mat_SeqBAIJ*) baij->A->data;
1081     ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1082     rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1083 
1084     for ( i=0; i<mbs; i++ ) {
1085       rvals[0] = bs*(baij->rstart + i);
1086       for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1087       for ( j=ai[i]; j<ai[i+1]; j++ ) {
1088         col = (baij->cstart+aj[j])*bs;
1089         for (k=0; k<bs; k++ ) {
1090           ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
1091           col++; a += bs;
1092         }
1093       }
1094     }
1095     /* copy over the B part */
1096     Aloc = (Mat_SeqBAIJ*) baij->B->data;
1097     ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1098     for ( i=0; i<mbs; i++ ) {
1099       rvals[0] = bs*(baij->rstart + i);
1100       for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1101       for ( j=ai[i]; j<ai[i+1]; j++ ) {
1102         col = baij->garray[aj[j]]*bs;
1103         for (k=0; k<bs; k++ ) {
1104           ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
1105           col++; a += bs;
1106         }
1107       }
1108     }
1109     PetscFree(rvals);
1110     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1111     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1112     /*
1113        Everyone has to call to draw the matrix since the graphics waits are
1114        synchronized across all processors that share the Draw object
1115     */
1116     if (!rank || PetscTypeCompare(vtype,DRAW_VIEWER)) {
1117       ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
1118     }
1119     ierr = MatDestroy(A); CHKERRQ(ierr);
1120   }
1121   PetscFunctionReturn(0);
1122 }
1123 
1124 
1125 
1126 #undef __FUNC__
1127 #define __FUNC__ "MatView_MPIBAIJ"
1128 int MatView_MPIBAIJ(Mat mat,Viewer viewer)
1129 {
1130   int         ierr;
1131   ViewerType  vtype;
1132 
1133   PetscFunctionBegin;
1134   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
1135   if (PetscTypeCompare(vtype,ASCII_VIEWER) || PetscTypeCompare(vtype,DRAW_VIEWER) ||
1136       PetscTypeCompare(vtype,SOCKET_VIEWER)) {
1137     ierr = MatView_MPIBAIJ_ASCIIorDraworSocket(mat,viewer); CHKERRQ(ierr);
1138   } else if (PetscTypeCompare(vtype,BINARY_VIEWER)) {
1139     ierr = MatView_MPIBAIJ_Binary(mat,viewer);CHKERRQ(ierr);
1140   } else {
1141     SETERRQ(1,1,"Viewer type not supported by PETSc object");
1142   }
1143   PetscFunctionReturn(0);
1144 }
1145 
#undef __FUNC__
#define __FUNC__ "MatDestroy_MPIBAIJ"
/*
   Destroys an MPIBAIJ matrix: releases the mappings, the two local
   SeqBAIJ blocks A and B, the communication machinery, and all the
   auxiliary arrays owned by the Mat_MPIBAIJ structure.
*/
int MatDestroy_MPIBAIJ(Mat mat)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;

  PetscFunctionBegin;
  /* reference counted: only the last reference actually destroys */
  if (--mat->refct > 0) PetscFunctionReturn(0);

  if (mat->mapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
  }
  if (mat->bmapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->bmapping); CHKERRQ(ierr);
  }
  if (mat->rmap) {
    ierr = MapDestroy(mat->rmap);CHKERRQ(ierr);
  }
  if (mat->cmap) {
    ierr = MapDestroy(mat->cmap);CHKERRQ(ierr);
  }
#if defined(USE_PETSC_LOG)
  PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",baij->M,baij->N);
#endif

  /* release the stash of off-processor values */
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
  PetscFree(baij->rowners);
  /* destroy the diagonal and off-diagonal sequential blocks */
  ierr = MatDestroy(baij->A); CHKERRQ(ierr);
  ierr = MatDestroy(baij->B); CHKERRQ(ierr);
  /* colmap is either a Table or a plain array depending on the build */
#if defined (USE_CTABLE)
  if (baij->colmap) TableDelete(baij->colmap);
#else
  if (baij->colmap) PetscFree(baij->colmap);
#endif
  if (baij->garray) PetscFree(baij->garray);
  /* communication objects used by MatMult et al. */
  if (baij->lvec)   VecDestroy(baij->lvec);
  if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
  if (baij->rowvalues) PetscFree(baij->rowvalues);
  if (baij->barray) PetscFree(baij->barray);
  /* hd and ht share one allocation (see MatCreateHashTable), so only hd is freed */
  if (baij->hd) PetscFree(baij->hd);
  PetscFree(baij);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  PetscFunctionReturn(0);
}
1192 
1193 #undef __FUNC__
1194 #define __FUNC__ "MatMult_MPIBAIJ"
1195 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
1196 {
1197   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1198   int         ierr, nt;
1199 
1200   PetscFunctionBegin;
1201   ierr = VecGetLocalSize(xx,&nt);CHKERRQ(ierr);
1202   if (nt != a->n) {
1203     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
1204   }
1205   ierr = VecGetLocalSize(yy,&nt);CHKERRQ(ierr);
1206   if (nt != a->m) {
1207     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible parition of A and yy");
1208   }
1209   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1210   ierr = (*a->A->ops->mult)(a->A,xx,yy); CHKERRQ(ierr);
1211   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1212   ierr = (*a->B->ops->multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
1213   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1214   PetscFunctionReturn(0);
1215 }
1216 
1217 #undef __FUNC__
1218 #define __FUNC__ "MatMultAdd_MPIBAIJ"
1219 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
1220 {
1221   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1222   int        ierr;
1223 
1224   PetscFunctionBegin;
1225   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1226   ierr = (*a->A->ops->multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
1227   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1228   ierr = (*a->B->ops->multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
1229   PetscFunctionReturn(0);
1230 }
1231 
1232 #undef __FUNC__
1233 #define __FUNC__ "MatMultTrans_MPIBAIJ"
1234 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
1235 {
1236   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1237   int         ierr;
1238 
1239   PetscFunctionBegin;
1240   /* do nondiagonal part */
1241   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
1242   /* send it on its way */
1243   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
1244   /* do local part */
1245   ierr = (*a->A->ops->multtrans)(a->A,xx,yy); CHKERRQ(ierr);
1246   /* receive remote parts: note this assumes the values are not actually */
1247   /* inserted in yy until the next line, which is true for my implementation*/
1248   /* but is not perhaps always true. */
1249   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
1250   PetscFunctionReturn(0);
1251 }
1252 
1253 #undef __FUNC__
1254 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
1255 int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
1256 {
1257   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1258   int         ierr;
1259 
1260   PetscFunctionBegin;
1261   /* do nondiagonal part */
1262   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
1263   /* send it on its way */
1264   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
1265   /* do local part */
1266   ierr = (*a->A->ops->multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
1267   /* receive remote parts: note this assumes the values are not actually */
1268   /* inserted in yy until the next line, which is true for my implementation*/
1269   /* but is not perhaps always true. */
1270   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
1271   PetscFunctionReturn(0);
1272 }
1273 
1274 /*
1275   This only works correctly for square matrices where the subblock A->A is the
1276    diagonal block
1277 */
1278 #undef __FUNC__
1279 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
1280 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
1281 {
1282   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1283   int         ierr;
1284 
1285   PetscFunctionBegin;
1286   if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
1287   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
1288   PetscFunctionReturn(0);
1289 }
1290 
1291 #undef __FUNC__
1292 #define __FUNC__ "MatScale_MPIBAIJ"
1293 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
1294 {
1295   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1296   int         ierr;
1297 
1298   PetscFunctionBegin;
1299   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
1300   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
1301   PetscFunctionReturn(0);
1302 }
1303 
1304 #undef __FUNC__
1305 #define __FUNC__ "MatGetSize_MPIBAIJ"
1306 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
1307 {
1308   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1309 
1310   PetscFunctionBegin;
1311   if (m) *m = mat->M;
1312   if (n) *n = mat->N;
1313   PetscFunctionReturn(0);
1314 }
1315 
1316 #undef __FUNC__
1317 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
1318 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
1319 {
1320   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1321 
1322   PetscFunctionBegin;
1323   *m = mat->m; *n = mat->n;
1324   PetscFunctionReturn(0);
1325 }
1326 
1327 #undef __FUNC__
1328 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
1329 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
1330 {
1331   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1332 
1333   PetscFunctionBegin;
1334   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
1335   PetscFunctionReturn(0);
1336 }
1337 
1338 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1339 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1340 
#undef __FUNC__
#define __FUNC__ "MatGetRow_MPIBAIJ"
/*
   Returns one locally owned row of the matrix, with column indices in
   global numbering and sorted ascending.  The row is assembled by
   merging the diagonal block's entries (A) with the off-diagonal
   block's entries (B), whose columns are mapped to global numbering
   via garray.  The caller must pair this with MatRestoreRow_MPIBAIJ.
*/
int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
  int        *cmap, *idx_p,cstart = mat->cstart;

  PetscFunctionBegin;
  /* only one row may be "gotten" at a time per matrix */
  if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int     max = 1,mbs = mat->mbs,tmp;
    for ( i=0; i<mbs; i++ ) {
      /* total block count of row i across both A and B */
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    /* one allocation for both values and indices; indices stored after values */
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }

  /* row must be owned by this process (in global point-row numbering) */
  if (row < brstart || row >= brend) SETERRQ(PETSC_ERR_SUP,0,"Only local rows")
  lrow = row - brstart;

  /* only request from A/B the pieces the caller actually asked for */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops->getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops->getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      /* imark = number of B entries whose global column precedes cstart;
         merged order is B[0..imark), all of A, then B[imark..nzB) */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          /* imark already computed in the values pass above */
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
          }
        } else {
          /* values were not requested: compute imark here instead */
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]/bs] < cstart)
              idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
            else break;
          }
          imark = i;
        }
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
      }
    } else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops->restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops->restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
1422 
1423 #undef __FUNC__
1424 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1425 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1426 {
1427   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1428 
1429   PetscFunctionBegin;
1430   if (baij->getrowactive == PETSC_FALSE) {
1431     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
1432   }
1433   baij->getrowactive = PETSC_FALSE;
1434   PetscFunctionReturn(0);
1435 }
1436 
1437 #undef __FUNC__
1438 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1439 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1440 {
1441   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1442 
1443   PetscFunctionBegin;
1444   *bs = baij->bs;
1445   PetscFunctionReturn(0);
1446 }
1447 
1448 #undef __FUNC__
1449 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1450 int MatZeroEntries_MPIBAIJ(Mat A)
1451 {
1452   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1453   int         ierr;
1454 
1455   PetscFunctionBegin;
1456   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1457   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1458   PetscFunctionReturn(0);
1459 }
1460 
1461 #undef __FUNC__
1462 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1463 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1464 {
1465   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1466   Mat         A = a->A, B = a->B;
1467   int         ierr;
1468   double      isend[5], irecv[5];
1469 
1470   PetscFunctionBegin;
1471   info->block_size     = (double)a->bs;
1472   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1473   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
1474   isend[3] = info->memory;  isend[4] = info->mallocs;
1475   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1476   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
1477   isend[3] += info->memory;  isend[4] += info->mallocs;
1478   if (flag == MAT_LOCAL) {
1479     info->nz_used      = isend[0];
1480     info->nz_allocated = isend[1];
1481     info->nz_unneeded  = isend[2];
1482     info->memory       = isend[3];
1483     info->mallocs      = isend[4];
1484   } else if (flag == MAT_GLOBAL_MAX) {
1485     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);CHKERRQ(ierr);
1486     info->nz_used      = irecv[0];
1487     info->nz_allocated = irecv[1];
1488     info->nz_unneeded  = irecv[2];
1489     info->memory       = irecv[3];
1490     info->mallocs      = irecv[4];
1491   } else if (flag == MAT_GLOBAL_SUM) {
1492     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);CHKERRQ(ierr);
1493     info->nz_used      = irecv[0];
1494     info->nz_allocated = irecv[1];
1495     info->nz_unneeded  = irecv[2];
1496     info->memory       = irecv[3];
1497     info->mallocs      = irecv[4];
1498   } else {
1499     SETERRQ1(1,1,"Unknown MatInfoType argument %d",flag);
1500   }
1501   info->rows_global       = (double)a->M;
1502   info->columns_global    = (double)a->N;
1503   info->rows_local        = (double)a->m;
1504   info->columns_local     = (double)a->N;
1505   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1506   info->fill_ratio_needed = 0;
1507   info->factor_mallocs    = 0;
1508   PetscFunctionReturn(0);
1509 }
1510 
1511 #undef __FUNC__
1512 #define __FUNC__ "MatSetOption_MPIBAIJ"
1513 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1514 {
1515   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1516   int         ierr;
1517 
1518   PetscFunctionBegin;
1519   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1520       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1521       op == MAT_COLUMNS_UNSORTED ||
1522       op == MAT_COLUMNS_SORTED ||
1523       op == MAT_NEW_NONZERO_ALLOCATION_ERR ||
1524       op == MAT_NEW_NONZERO_LOCATION_ERR) {
1525         ierr = MatSetOption(a->A,op);CHKERRQ(ierr);
1526         ierr = MatSetOption(a->B,op);CHKERRQ(ierr);
1527   } else if (op == MAT_ROW_ORIENTED) {
1528         a->roworiented = 1;
1529         ierr = MatSetOption(a->A,op);CHKERRQ(ierr);
1530         ierr = MatSetOption(a->B,op);CHKERRQ(ierr);
1531   } else if (op == MAT_ROWS_SORTED ||
1532              op == MAT_ROWS_UNSORTED ||
1533              op == MAT_SYMMETRIC ||
1534              op == MAT_STRUCTURALLY_SYMMETRIC ||
1535              op == MAT_YES_NEW_DIAGONALS ||
1536              op == MAT_USE_HASH_TABLE) {
1537     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1538   } else if (op == MAT_COLUMN_ORIENTED) {
1539     a->roworiented = 0;
1540     ierr = MatSetOption(a->A,op);CHKERRQ(ierr);
1541     ierr = MatSetOption(a->B,op);CHKERRQ(ierr);
1542   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1543     a->donotstash = 1;
1544   } else if (op == MAT_NO_NEW_DIAGONALS) {
1545     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1546   } else if (op == MAT_USE_HASH_TABLE) {
1547     a->ht_flag = 1;
1548   } else {
1549     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1550   }
1551   PetscFunctionReturn(0);
1552 }
1553 
1554 #undef __FUNC__
1555 #define __FUNC__ "MatTranspose_MPIBAIJ("
1556 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1557 {
1558   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1559   Mat_SeqBAIJ *Aloc;
1560   Mat        B;
1561   int        ierr,M=baij->M,N=baij->N,*ai,*aj,i,*rvals,j,k,col;
1562   int        bs=baij->bs,mbs=baij->mbs;
1563   Scalar     *a;
1564 
1565   PetscFunctionBegin;
1566   if (matout == PETSC_NULL && M != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
1567   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1568   CHKERRQ(ierr);
1569 
1570   /* copy over the A part */
1571   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1572   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1573   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1574 
1575   for ( i=0; i<mbs; i++ ) {
1576     rvals[0] = bs*(baij->rstart + i);
1577     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1578     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1579       col = (baij->cstart+aj[j])*bs;
1580       for (k=0; k<bs; k++ ) {
1581         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1582         col++; a += bs;
1583       }
1584     }
1585   }
1586   /* copy over the B part */
1587   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1588   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1589   for ( i=0; i<mbs; i++ ) {
1590     rvals[0] = bs*(baij->rstart + i);
1591     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1592     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1593       col = baij->garray[aj[j]]*bs;
1594       for (k=0; k<bs; k++ ) {
1595         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1596         col++; a += bs;
1597       }
1598     }
1599   }
1600   PetscFree(rvals);
1601   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1602   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1603 
1604   if (matout != PETSC_NULL) {
1605     *matout = B;
1606   } else {
1607     PetscOps *Abops;
1608     MatOps   Aops;
1609 
1610     /* This isn't really an in-place transpose .... but free data structures from baij */
1611     PetscFree(baij->rowners);
1612     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1613     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1614 #if defined (USE_CTABLE)
1615     if (baij->colmap) TableDelete(baij->colmap);
1616 #else
1617     if (baij->colmap) PetscFree(baij->colmap);
1618 #endif
1619     if (baij->garray) PetscFree(baij->garray);
1620     if (baij->lvec) VecDestroy(baij->lvec);
1621     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1622     PetscFree(baij);
1623 
1624     /*
1625        This is horrible, horrible code. We need to keep the
1626       A pointers for the bops and ops but copy everything
1627       else from C.
1628     */
1629     Abops = A->bops;
1630     Aops  = A->ops;
1631     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1632     A->bops = Abops;
1633     A->ops  = Aops;
1634 
1635     PetscHeaderDestroy(B);
1636   }
1637   PetscFunctionReturn(0);
1638 }
1639 
1640 #undef __FUNC__
1641 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1642 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1643 {
1644   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1645   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1646   int ierr,s1,s2,s3;
1647 
1648   PetscFunctionBegin;
1649   if (ll)  {
1650     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1651     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1652     if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"non-conforming local sizes");
1653     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1654     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1655   }
1656   if (rr) SETERRQ(PETSC_ERR_SUP,0,"not supported for right vector");
1657   PetscFunctionReturn(0);
1658 }
1659 
#undef __FUNC__
#define __FUNC__ "MatZeroRows_MPIBAIJ"
/*
   Zeros the rows of A listed (in global, unblocked numbering) in is; if
   diag is non-null it is placed on the diagonal of each zeroed row.

   A process may list rows owned by other processes, so the row indices
   are first routed to their owners with point-to-point messages and each
   owner then zeros its own local rows.
*/
int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work,row;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values,bs=l->bs,rstart_bs=l->rstart_bs;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  PetscFunctionBegin;
  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    /* owners[] is in block units; scale by bs to compare unblocked rows */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work   = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  ierr   = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
  nrecvs = work[rank];
  ierr   = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
  nmax   = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
  }

  /* do sends:
     1) starts[i] gives the starting index in svalues for stuff going to
     the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack the rows destined for each processor contiguously into svalues */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  /* the packing loop above advanced starts[]; rebuild it before sending */
  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
    }
  }
  PetscFree(starts);

  /* first (unblocked) row owned by this process */
  base = owners[rank]*bs;

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
    /* unpack receives into our local space */
    ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      /* convert to local (process-relative) unblocked row numbers */
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);

  /*
        Zero the required rows. If the "diagonal block" of the matrix
     is square and the user wishes to set the diagonal we use separate
     code so that MatSetValues() is not called for each diagonal allocating
     new memory, thus calling lots of mallocs and slowing things down.

       Contributed by: Mathew Knepley
  */
  /* must zero l->B before l->A because the (diag) case below may put values into l->B*/
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  if (diag && (l->A->M == l->A->N)) {
    ierr      = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  } else if (diag) {
    ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
    if (((Mat_SeqBAIJ*)l->A->data)->nonew) {
      SETERRQ(PETSC_ERR_SUP,0,"MatZeroRows() on rectangular matrices cannot be used with the Mat options \n\
MAT_NO_NEW_NONZERO_LOCATIONS,MAT_NEW_NONZERO_LOCATION_ERR,MAT_NEW_NONZERO_ALLOCATION_ERR");
    }
    /* rectangular diagonal block: insert the diagonal entries one at a
       time using global indices, then re-assemble */
    for ( i = 0; i < slen; i++ ) {
      row  = lrows[i] + rstart_bs;
      ierr = MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES);CHKERRQ(ierr);
    }
    ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
    ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
  } else {
    ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
  }

  ierr = ISDestroy(istmp); CHKERRQ(ierr);
  PetscFree(lrows);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
    ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  PetscFunctionReturn(0);
}
1808 
1809 extern int MatPrintHelp_SeqBAIJ(Mat);
1810 #undef __FUNC__
1811 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1812 int MatPrintHelp_MPIBAIJ(Mat A)
1813 {
1814   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1815   MPI_Comm    comm = A->comm;
1816   static int  called = 0;
1817   int         ierr;
1818 
1819   PetscFunctionBegin;
1820   if (!a->rank) {
1821     ierr = MatPrintHelp_SeqBAIJ(a->A);CHKERRQ(ierr);
1822   }
1823   if (called) {PetscFunctionReturn(0);} else called = 1;
1824   (*PetscHelpPrintf)(comm," Options for MATMPIBAIJ matrix format (the defaults):\n");
1825   (*PetscHelpPrintf)(comm,"  -mat_use_hash_table <factor>: Use hashtable for efficient matrix assembly\n");
1826   PetscFunctionReturn(0);
1827 }
1828 
1829 #undef __FUNC__
1830 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1831 int MatSetUnfactored_MPIBAIJ(Mat A)
1832 {
1833   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1834   int         ierr;
1835 
1836   PetscFunctionBegin;
1837   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1838   PetscFunctionReturn(0);
1839 }
1840 
1841 static int MatDuplicate_MPIBAIJ(Mat,MatDuplicateOption,Mat *);
1842 
/* -------------------------------------------------------------------*/
/*
   Function table for the MPIBAIJ matrix type.  Entries are positional
   (they fill struct _MatOps in declaration order); a 0 marks an
   operation that is not implemented for this format.  Do not reorder.
*/
static struct _MatOps MatOps_Values = {
  MatSetValues_MPIBAIJ,
  MatGetRow_MPIBAIJ,
  MatRestoreRow_MPIBAIJ,
  MatMult_MPIBAIJ,
  MatMultAdd_MPIBAIJ,
  MatMultTrans_MPIBAIJ,
  MatMultTransAdd_MPIBAIJ,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  MatTranspose_MPIBAIJ,
  MatGetInfo_MPIBAIJ,
  0,
  MatGetDiagonal_MPIBAIJ,
  MatDiagonalScale_MPIBAIJ,
  MatNorm_MPIBAIJ,
  MatAssemblyBegin_MPIBAIJ,
  MatAssemblyEnd_MPIBAIJ,
  0,
  MatSetOption_MPIBAIJ,
  MatZeroEntries_MPIBAIJ,
  MatZeroRows_MPIBAIJ,
  0,
  0,
  0,
  0,
  MatGetSize_MPIBAIJ,
  MatGetLocalSize_MPIBAIJ,
  MatGetOwnershipRange_MPIBAIJ,
  0,
  0,
  0,
  0,
  MatDuplicate_MPIBAIJ,
  0,
  0,
  0,
  0,
  0,
  MatGetSubMatrices_MPIBAIJ,
  MatIncreaseOverlap_MPIBAIJ,
  MatGetValues_MPIBAIJ,
  0,
  MatPrintHelp_MPIBAIJ,
  MatScale_MPIBAIJ,
  0,
  0,
  0,
  MatGetBlockSize_MPIBAIJ,
  0,
  0,
  0,
  0,
  0,
  0,
  MatSetUnfactored_MPIBAIJ,
  0,
  MatSetValuesBlocked_MPIBAIJ,
  0,
  0,
  0,
  MatGetMaps_Petsc};
1911 
1912 
1913 EXTERN_C_BEGIN
1914 #undef __FUNC__
1915 #define __FUNC__ "MatGetDiagonalBlock_MPIBAIJ"
1916 int MatGetDiagonalBlock_MPIBAIJ(Mat A,PetscTruth *iscopy,MatReuse reuse,Mat *a)
1917 {
1918   PetscFunctionBegin;
1919   *a      = ((Mat_MPIBAIJ *)A->data)->A;
1920   *iscopy = PETSC_FALSE;
1921   PetscFunctionReturn(0);
1922 }
1923 EXTERN_C_END
1924 
1925 #undef __FUNC__
1926 #define __FUNC__ "MatCreateMPIBAIJ"
1927 /*@C
1928    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1929    (block compressed row).  For good matrix assembly performance
1930    the user should preallocate the matrix storage by setting the parameters
1931    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1932    performance can be increased by more than a factor of 50.
1933 
1934    Collective on MPI_Comm
1935 
1936    Input Parameters:
1937 +  comm - MPI communicator
.  bs   - size of block
1939 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1940            This value should be the same as the local size used in creating the
1941            y vector for the matrix-vector product y = Ax.
1942 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1943            This value should be the same as the local size used in creating the
1944            x vector for the matrix-vector product y = Ax.
1945 .  M - number of global rows (or PETSC_DETERMINE to have calculated if m is given)
1946 .  N - number of global columns (or PETSC_DETERMINE to have calculated if n is given)
1947 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1948            submatrix  (same for all local rows)
.  d_nnz - array containing the number of block nonzeros in the various block rows
           in the diagonal portion of the local submatrix (possibly different for each block
           row) or PETSC_NULL.  You must leave room for the diagonal entry even if it is zero.
1952 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1953            submatrix (same for all local rows).
-  o_nnz - array containing the number of nonzeros in the various block rows of the
1955            off-diagonal portion of the local submatrix (possibly different for
1956            each block row) or PETSC_NULL.
1957 
1958    Output Parameter:
1959 .  A - the matrix
1960 
1961    Options Database Keys:
1962 .   -mat_no_unroll - uses code that does not unroll the loops in the
1963                      block calculations (much slower)
1964 .   -mat_block_size - size of the blocks to use
1965 .   -mat_mpi - use the parallel matrix data structures even on one processor
1966                (defaults to using SeqBAIJ format on one processor)
1967 
1968    Notes:
1969    The user MUST specify either the local or global matrix dimensions
1970    (possibly both).
1971 
   If PETSC_DECIDE or PETSC_DETERMINE is used for a particular argument on one processor
   then it must be used on all processors that share the object for that argument.
1974 
1975    Storage Information:
1976    For a square global matrix we define each processor's diagonal portion
1977    to be its local rows and the corresponding columns (a square submatrix);
1978    each processor's off-diagonal portion encompasses the remainder of the
1979    local matrix (a rectangular submatrix).
1980 
1981    The user can specify preallocated storage for the diagonal part of
1982    the local submatrix with either d_nz or d_nnz (not both).  Set
1983    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1984    memory allocation.  Likewise, specify preallocated storage for the
1985    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1986 
1987    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1988    the figure below we depict these three local rows and all columns (0-11).
1989 
1990 .vb
1991            0 1 2 3 4 5 6 7 8 9 10 11
1992           -------------------
1993    row 3  |  o o o d d d o o o o o o
1994    row 4  |  o o o d d d o o o o o o
1995    row 5  |  o o o d d d o o o o o o
1996           -------------------
1997 .ve
1998 
1999    Thus, any entries in the d locations are stored in the d (diagonal)
2000    submatrix, and any entries in the o locations are stored in the
2001    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
2002    stored simply in the MATSEQBAIJ format for compressed row storage.
2003 
2004    Now d_nz should indicate the number of block nonzeros per row in the d matrix,
2005    and o_nz should indicate the number of block nonzeros per row in the o matrix.
2006    In general, for PDE problems in which most nonzeros are near the diagonal,
2007    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
2008    or you will get TERRIBLE performance; see the users' manual chapter on
2009    matrices.
2010 
2011    Level: intermediate
2012 
2013 .keywords: matrix, block, aij, compressed row, sparse, parallel
2014 
2015 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues(), MatCreateMPIAIJ()
2016 @*/
int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
                    int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  Mat          B;
  Mat_MPIBAIJ  *b;
  int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size,flg;
  int          flag1 = 0,flag2 = 0;

  PetscFunctionBegin;
  if (bs < 1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Invalid block size specified, must be positive");

  /* on a single process fall back to the sequential format unless the
     user explicitly requested the parallel data structures */
  MPI_Comm_size(comm,&size);
  ierr = OptionsHasName(PETSC_NULL,"-mat_mpibaij",&flag1); CHKERRQ(ierr);
  ierr = OptionsHasName(PETSC_NULL,"-mat_mpi",&flag2); CHKERRQ(ierr);
  if (!flag1 && !flag2 && size == 1) {
    if (M == PETSC_DECIDE) M = m;
    if (N == PETSC_DECIDE) N = n;
    ierr = MatCreateSeqBAIJ(comm,bs,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
    PetscFunctionReturn(0);
  }

  *A = 0;
  PetscHeaderCreate(B,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIBAIJ,"Mat",comm,MatDestroy,MatView);
  PLogObjectCreate(B);
  B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
  PetscMemzero(b,sizeof(Mat_MPIBAIJ));
  PetscMemcpy(B->ops,&MatOps_Values,sizeof(struct _MatOps));

  B->ops->destroy    = MatDestroy_MPIBAIJ;
  B->ops->view       = MatView_MPIBAIJ;
  B->mapping    = 0;
  B->factor     = 0;
  B->assembled  = PETSC_FALSE;

  B->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&b->rank);
  MPI_Comm_size(comm,&b->size);

  /* validate the local/global size combinations */
  if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
    SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
  }
  if ( M == PETSC_DECIDE && m == PETSC_DECIDE) {
    SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either M or m should be specified");
  }
  if ( N == PETSC_DECIDE && n == PETSC_DECIDE) {
    SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either N or n should be specified");
  }
  /* when both are given, the local sizes win and the global sizes are
     recomputed from them below */
  if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
  if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;

  /* sum local sizes across the communicator to obtain the global sizes */
  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    mbs = m/bs; nbs = n/bs;
    ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
    if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
    if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
  }
  /* partition the global block rows/columns evenly when no local size given */
  if (m == PETSC_DECIDE) {
    Mbs = M/bs;
    if (Mbs*bs != M) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global rows must be divisible by blocksize");
    mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
    m   = mbs*bs;
  }
  if (n == PETSC_DECIDE) {
    Nbs = N/bs;
    if (Nbs*bs != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global cols must be divisible by blocksize");
    nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
    n   = nbs*bs;
  }
  if (mbs*bs != m || nbs*bs != n) {
    SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of local rows, cols must be divisible by blocksize");
  }

  /* record sizes both in unblocked and block (bs) units */
  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;
  b->bs  = bs;
  b->bs2 = bs*bs;
  b->mbs = mbs;
  b->nbs = nbs;
  b->Mbs = Mbs;
  b->Nbs = Nbs;

  /* the information in the maps duplicates the information computed below, eventually
     we should remove the duplicate information that is not contained in the maps */
  ierr = MapCreateMPI(comm,m,M,&B->rmap);CHKERRQ(ierr);
  ierr = MapCreateMPI(comm,n,N,&B->cmap);CHKERRQ(ierr);

  /* build local table of row and column ownerships */
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
  b->cowners = b->rowners + b->size + 2;
  /* prefix-sum the gathered block counts so rowners[i] is the first block
     row owned by process i */
  ierr = MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];
  }
  b->rstart    = b->rowners[b->rank];
  b->rend      = b->rowners[b->rank+1];
  b->rstart_bs = b->rstart * bs;
  b->rend_bs   = b->rend * bs;

  ierr = MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart    = b->cowners[b->rank];
  b->cend      = b->cowners[b->rank+1];
  b->cstart_bs = b->cstart * bs;
  b->cend_bs   = b->cend * bs;


  /* create the sequential diagonal (m x n) and off-diagonal (m x N) parts */
  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->donotstash  = 0;
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used in block assembly */
  b->barray       = 0;

  /* stuff used for matrix vector multiply */
  b->lvec         = 0;
  b->Mvctx        = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  /* hash table stuff */
  b->ht           = 0;
  b->hd           = 0;
  b->ht_size      = 0;
  b->ht_flag      = 0;
  b->ht_fact      = 0;
  b->ht_total_ct  = 0;
  b->ht_insert_ct = 0;

  *A = B;
  /* optional hash-table assembly; factor must exceed 1.0 */
  ierr = OptionsHasName(PETSC_NULL,"-mat_use_hash_table",&flg); CHKERRQ(ierr);
  if (flg) {
    double fact = 1.39;
    ierr = MatSetOption(B,MAT_USE_HASH_TABLE); CHKERRQ(ierr);
    ierr = OptionsGetDouble(PETSC_NULL,"-mat_use_hash_table",&fact,&flg); CHKERRQ(ierr);
    if (fact <= 1.0) fact = 1.39;
    ierr = MatMPIBAIJSetHashTableFactor(B,fact); CHKERRQ(ierr);
    PLogInfo(0,"MatCreateMPIBAIJ:Hash table Factor used %5.2f\n",fact);
  }
  ierr = PetscObjectComposeFunction((PetscObject)B,"MatGetDiagonalBlock_C",
                                     "MatGetDiagonalBlock_MPIBAIJ",
                                     (void*)MatGetDiagonalBlock_MPIBAIJ);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
2181 
2182 #undef __FUNC__
2183 #define __FUNC__ "MatDuplicate_MPIBAIJ"
2184 static int MatDuplicate_MPIBAIJ(Mat matin,MatDuplicateOption cpvalues,Mat *newmat)
2185 {
2186   Mat         mat;
2187   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
2188   int         ierr, len=0, flg;
2189 
2190   PetscFunctionBegin;
2191   *newmat       = 0;
2192   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIBAIJ,"Mat",matin->comm,MatDestroy,MatView);
2193   PLogObjectCreate(mat);
2194   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
2195   PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));
2196   mat->ops->destroy    = MatDestroy_MPIBAIJ;
2197   mat->ops->view       = MatView_MPIBAIJ;
2198   mat->factor          = matin->factor;
2199   mat->assembled       = PETSC_TRUE;
2200   mat->insertmode      = NOT_SET_VALUES;
2201 
2202   a->m = mat->m   = oldmat->m;
2203   a->n = mat->n   = oldmat->n;
2204   a->M = mat->M   = oldmat->M;
2205   a->N = mat->N   = oldmat->N;
2206 
2207   a->bs  = oldmat->bs;
2208   a->bs2 = oldmat->bs2;
2209   a->mbs = oldmat->mbs;
2210   a->nbs = oldmat->nbs;
2211   a->Mbs = oldmat->Mbs;
2212   a->Nbs = oldmat->Nbs;
2213 
2214   a->rstart       = oldmat->rstart;
2215   a->rend         = oldmat->rend;
2216   a->cstart       = oldmat->cstart;
2217   a->cend         = oldmat->cend;
2218   a->size         = oldmat->size;
2219   a->rank         = oldmat->rank;
2220   a->donotstash   = oldmat->donotstash;
2221   a->roworiented  = oldmat->roworiented;
2222   a->rowindices   = 0;
2223   a->rowvalues    = 0;
2224   a->getrowactive = PETSC_FALSE;
2225   a->barray       = 0;
2226   a->rstart_bs    = oldmat->rstart_bs;
2227   a->rend_bs      = oldmat->rend_bs;
2228   a->cstart_bs    = oldmat->cstart_bs;
2229   a->cend_bs      = oldmat->cend_bs;
2230 
2231   /* hash table stuff */
2232   a->ht           = 0;
2233   a->hd           = 0;
2234   a->ht_size      = 0;
2235   a->ht_flag      = oldmat->ht_flag;
2236   a->ht_fact      = oldmat->ht_fact;
2237   a->ht_total_ct  = 0;
2238   a->ht_insert_ct = 0;
2239 
2240 
2241   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
2242   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
2243   a->cowners = a->rowners + a->size + 2;
2244   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
2245   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
2246   if (oldmat->colmap) {
2247 #if defined (USE_CTABLE)
2248   ierr = TableCreateCopy(oldmat->colmap,&a->colmap); CHKERRQ(ierr);
2249 #else
2250     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
2251     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
2252     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
2253 #endif
2254   } else a->colmap = 0;
2255   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
2256     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
2257     PLogObjectMemory(mat,len*sizeof(int));
2258     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
2259   } else a->garray = 0;
2260 
2261   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
2262   PLogObjectParent(mat,a->lvec);
2263   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
2264 
2265   PLogObjectParent(mat,a->Mvctx);
2266   ierr =  MatDuplicate(oldmat->A,cpvalues,&a->A); CHKERRQ(ierr);
2267   PLogObjectParent(mat,a->A);
2268   ierr =  MatDuplicate(oldmat->B,cpvalues,&a->B); CHKERRQ(ierr);
2269   PLogObjectParent(mat,a->B);
2270   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
2271   ierr = FListDuplicate(mat->qlist,&matin->qlist);CHKERRQ(ierr);
2272   if (flg) {
2273     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
2274   }
2275   *newmat = mat;
2276   PetscFunctionReturn(0);
2277 }
2278 
2279 #include "sys.h"
2280 
2281 #undef __FUNC__
2282 #define __FUNC__ "MatLoad_MPIBAIJ"
2283 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
2284 {
2285   Mat          A;
2286   int          i, nz, ierr, j,rstart, rend, fd;
2287   Scalar       *vals,*buf;
2288   MPI_Comm     comm = ((PetscObject)viewer)->comm;
2289   MPI_Status   status;
2290   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
2291   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
2292   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,Mbs,mbs,extra_rows;
2293   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
2294   int          dcount,kmax,k,nzcount,tmp;
2295 
2296   PetscFunctionBegin;
2297   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
2298 
2299   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
2300   if (!rank) {
2301     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
2302     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
2303     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
2304     if (header[3] < 0) {
2305       SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix stored in special format, cannot load as MPIBAIJ");
2306     }
2307   }
2308 
2309   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
2310   M = header[1]; N = header[2];
2311 
2312   if (M != N) SETERRQ(PETSC_ERR_SUP,0,"Can only do square matrices");
2313 
2314   /*
2315      This code adds extra rows to make sure the number of rows is
2316      divisible by the blocksize
2317   */
2318   Mbs        = M/bs;
2319   extra_rows = bs - M + bs*(Mbs);
2320   if (extra_rows == bs) extra_rows = 0;
2321   else                  Mbs++;
2322   if (extra_rows &&!rank) {
2323     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
2324   }
2325 
2326   /* determine ownership of all rows */
2327   mbs = Mbs/size + ((Mbs % size) > rank);
2328   m   = mbs * bs;
2329   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
2330   browners = rowners + size + 1;
2331   ierr = MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
2332   rowners[0] = 0;
2333   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
2334   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
2335   rstart = rowners[rank];
2336   rend   = rowners[rank+1];
2337 
2338   /* distribute row lengths to all processors */
2339   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
2340   if (!rank) {
2341     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
2342     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
2343     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
2344     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
2345     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
2346     ierr = MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);CHKERRQ(ierr);
2347     PetscFree(sndcounts);
2348   } else {
2349     ierr = MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);CHKERRQ(ierr);
2350   }
2351 
2352   if (!rank) {
2353     /* calculate the number of nonzeros on each processor */
2354     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
2355     PetscMemzero(procsnz,size*sizeof(int));
2356     for ( i=0; i<size; i++ ) {
2357       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
2358         procsnz[i] += rowlengths[j];
2359       }
2360     }
2361     PetscFree(rowlengths);
2362 
2363     /* determine max buffer needed and allocate it */
2364     maxnz = 0;
2365     for ( i=0; i<size; i++ ) {
2366       maxnz = PetscMax(maxnz,procsnz[i]);
2367     }
2368     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
2369 
2370     /* read in my part of the matrix column indices  */
2371     nz = procsnz[0];
2372     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
2373     mycols = ibuf;
2374     if (size == 1)  nz -= extra_rows;
2375     ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
2376     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
2377 
2378     /* read in every ones (except the last) and ship off */
2379     for ( i=1; i<size-1; i++ ) {
2380       nz   = procsnz[i];
2381       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
2382       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
2383     }
2384     /* read in the stuff for the last proc */
2385     if ( size != 1 ) {
2386       nz   = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
2387       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
2388       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
2389       ierr = MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);CHKERRQ(ierr);
2390     }
2391     PetscFree(cols);
2392   } else {
2393     /* determine buffer space needed for message */
2394     nz = 0;
2395     for ( i=0; i<m; i++ ) {
2396       nz += locrowlens[i];
2397     }
2398     ibuf   = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
2399     mycols = ibuf;
2400     /* receive message of column indices*/
2401     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
2402     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
2403     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2404   }
2405 
2406   /* loop over local rows, determining number of off diagonal entries */
2407   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
2408   odlens = dlens + (rend-rstart);
2409   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
2410   PetscMemzero(mask,3*Mbs*sizeof(int));
2411   masked1 = mask    + Mbs;
2412   masked2 = masked1 + Mbs;
2413   rowcount = 0; nzcount = 0;
2414   for ( i=0; i<mbs; i++ ) {
2415     dcount  = 0;
2416     odcount = 0;
2417     for ( j=0; j<bs; j++ ) {
2418       kmax = locrowlens[rowcount];
2419       for ( k=0; k<kmax; k++ ) {
2420         tmp = mycols[nzcount++]/bs;
2421         if (!mask[tmp]) {
2422           mask[tmp] = 1;
2423           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
2424           else masked1[dcount++] = tmp;
2425         }
2426       }
2427       rowcount++;
2428     }
2429 
2430     dlens[i]  = dcount;
2431     odlens[i] = odcount;
2432 
2433     /* zero out the mask elements we set */
2434     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
2435     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
2436   }
2437 
2438   /* create our matrix */
2439   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
2440          CHKERRQ(ierr);
2441   A = *newmat;
2442   MatSetOption(A,MAT_COLUMNS_SORTED);
2443 
2444   if (!rank) {
2445     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
2446     /* read in my part of the matrix numerical values  */
2447     nz = procsnz[0];
2448     vals = buf;
2449     mycols = ibuf;
2450     if (size == 1)  nz -= extra_rows;
2451     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2452     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
2453 
2454     /* insert into matrix */
2455     jj      = rstart*bs;
2456     for ( i=0; i<m; i++ ) {
2457       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
2458       mycols += locrowlens[i];
2459       vals   += locrowlens[i];
2460       jj++;
2461     }
2462     /* read in other processors (except the last one) and ship out */
2463     for ( i=1; i<size-1; i++ ) {
2464       nz   = procsnz[i];
2465       vals = buf;
2466       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2467       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
2468     }
2469     /* the last proc */
2470     if ( size != 1 ){
2471       nz   = procsnz[i] - extra_rows;
2472       vals = buf;
2473       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2474       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
2475       ierr = MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);CHKERRQ(ierr);
2476     }
2477     PetscFree(procsnz);
2478   } else {
2479     /* receive numeric values */
2480     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
2481 
2482     /* receive message of values*/
2483     vals   = buf;
2484     mycols = ibuf;
2485     ierr   = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
2486     ierr   = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
2487     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2488 
2489     /* insert into matrix */
2490     jj      = rstart*bs;
2491     for ( i=0; i<m; i++ ) {
2492       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
2493       mycols += locrowlens[i];
2494       vals   += locrowlens[i];
2495       jj++;
2496     }
2497   }
2498   PetscFree(locrowlens);
2499   PetscFree(buf);
2500   PetscFree(ibuf);
2501   PetscFree(rowners);
2502   PetscFree(dlens);
2503   PetscFree(mask);
2504   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2505   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2506   PetscFunctionReturn(0);
2507 }
2508 
2509 
2510 
2511 #undef __FUNC__
2512 #define __FUNC__ "MatMPIBAIJSetHashTableFactor"
2513 /*@
2514    MatMPIBAIJSetHashTableFactor - Sets the factor required to compute the size of the HashTable.
2515 
2516    Input Parameters:
2517 .  mat  - the matrix
2518 .  fact - factor
2519 
2520    Collective on Mat
2521 
2522   Notes:
2523    This can also be set by the command line option: -mat_use_hash_table fact
2524 
2525 .keywords: matrix, hashtable, factor, HT
2526 
2527 .seealso: MatSetOption()
2528 @*/
2529 int MatMPIBAIJSetHashTableFactor(Mat mat,double fact)
2530 {
2531   Mat_MPIBAIJ *baij;
2532 
2533   PetscFunctionBegin;
2534   PetscValidHeaderSpecific(mat,MAT_COOKIE);
2535   if (mat->type != MATMPIBAIJ) {
2536       SETERRQ(PETSC_ERR_ARG_WRONG,1,"Incorrect matrix type. Use MPIBAIJ only.");
2537   }
2538   baij = (Mat_MPIBAIJ*) mat->data;
2539   baij->ht_fact = fact;
2540   PetscFunctionReturn(0);
2541 }
2542