xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision fbbc08e16cd775b8c27b3e96bde110f8b14868e7)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpibaij.c,v 1.145 1999/01/27 19:47:51 bsmith Exp curfman $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"   /*I  "mat.h"  I*/
6 #include "src/vec/vecimpl.h"
7 
8 extern int MatSetUpMultiply_MPIBAIJ(Mat);
9 extern int DisAssemble_MPIBAIJ(Mat);
10 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
11 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatReuse,Mat **);
12 extern int MatGetValues_SeqBAIJ(Mat,int,int *,int,int *,Scalar *);
13 
14 /*
15      Local utility routine that creates a mapping from the global column
16    number to the local number in the off-diagonal part of the local
   storage of the matrix.  This is done in a non-scalable way since the
18    length of colmap equals the global matrix length.
19 */
20 #undef __FUNC__
21 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
22 static int CreateColmap_MPIBAIJ_Private(Mat mat)
23 {
24   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
25   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
26   int         nbs = B->nbs,i,bs=B->bs;
27 #if defined (USE_CTABLE)
28   int         ierr;
29 #endif
30 
31   PetscFunctionBegin;
32 #if defined (USE_CTABLE)
33   ierr = TableCreate( &baij->colmap, baij->nbs/5 ); CHKERRQ(ierr);
34   for ( i=0; i<nbs; i++ ){
35     ierr = TableAdd( baij->colmap, baij->garray[i] + 1, i*bs+1 );CHKERRQ(ierr);
36   }
37 #else
38   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
39   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
40   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
41   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
42 #endif
43   PetscFunctionReturn(0);
44 }
45 
46 #define CHUNKSIZE  10
47 
/*
   MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) - inlined insertion of a
   single scalar entry (local point indices row,col) into the diagonal-portion
   SeqBAIJ matrix.  Relies on the caller having set up the working variables
   a,aa,ai,aj,aimax,ailen,bs,bs2 and the scratch ints declared in
   MatSetValues_MPIBAIJ().  Expanded as a macro to avoid per-entry call
   overhead of MatSetValues_SeqBAIJ().
*/
#define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
{ \
 \
    /* block row and position of (row,col) inside its bs x bs block */ \
    brow = row/bs;  \
    rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
    rmax = aimax[brow]; nrow = ailen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      /* binary search over the sorted block columns of this row, then a */ \
      /* short linear scan once the range is down to a few entries */ \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          /* block exists: update the (ridx,cidx) entry (column-major block) */ \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto a_noinsert; \
        } \
      } \
      /* block not present: nonew==1 drops it silently, nonew==-1 errors */ \
      if (a->nonew == 1) goto a_noinsert; \
      else if (a->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (a->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space: one allocation holding values, j and i */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots, opening a CHUNKSIZE gap */ \
        /* immediately after block row brow */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
        for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
        PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
                    aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(a->a);  \
        if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
        aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
        a->singlemalloc = 1; \
 \
        rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
        rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        a->maxnz += bs2*CHUNKSIZE; \
        a->reallocs++; \
        /* NOTE(review): nz is bumped only on this realloc path, not on every */ \
        /* insertion -- presumably nz is recomputed from ilen at assembly; confirm */ \
        a->nz++; \
      } \
      /* shift the trailing blocks of this row up one slot and insert at _i */ \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      a_noinsert:; \
    ailen[brow] = nrow; \
}
122 
/*
   MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) - identical in structure
   to MatSetValues_SeqBAIJ_A_Private() but operates on the off-diagonal-portion
   SeqBAIJ matrix through the working variables b,ba,bi,bj,bimax,bilen set up
   by the caller (MatSetValues_MPIBAIJ()).
*/
#define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
{ \
 \
    /* block row and position of (row,col) inside its bs x bs block */ \
    brow = row/bs;  \
    rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
    rmax = bimax[brow]; nrow = bilen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      /* binary search over sorted block columns, then short linear scan */ \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          /* block exists: update the (ridx,cidx) entry (column-major block) */ \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto b_noinsert; \
        } \
      } \
      /* block not present: nonew==1 drops it silently, nonew==-1 errors */ \
      if (b->nonew == 1) goto b_noinsert; \
      else if (b->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (b->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space: one allocation holding values, j and i */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots, opening a CHUNKSIZE gap */ \
        /* immediately after block row brow */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
        for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
        PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
                    ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(b->a);  \
        if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
        ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
        b->singlemalloc = 1; \
 \
        rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
        rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        b->maxnz += bs2*CHUNKSIZE; \
        b->reallocs++; \
        /* NOTE(review): nz bumped only on realloc, as in the A macro -- confirm */ \
        b->nz++; \
      } \
      /* shift the trailing blocks of this row up one slot and insert at _i */ \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      b_noinsert:; \
    bilen[brow] = nrow; \
}
197 
198 #undef __FUNC__
199 #define __FUNC__ "MatSetValues_MPIBAIJ"
/*
   MatSetValues_MPIBAIJ - inserts or adds scalar entries, given in point (not
   block) indices.  Locally owned rows go directly into the diagonal (A) or
   off-diagonal (B) SeqBAIJ part via the inlined macros above; rows owned by
   other processes are stashed for communication at assembly time.
*/
int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j,row,col;
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
  int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
  int         cend_orig=baij->cend_bs,bs=baij->bs;

  /* Some Variables required in the macro */
  Mat         A = baij->A;
  Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
  int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
  Scalar      *aa=a->a;

  Mat         B = baij->B;
  Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
  int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
  Scalar      *ba=b->a;

  /* scratch variables used (and clobbered) by the insertion macros */
  int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
  int         low,high,t,ridx,cidx,bs2=a->bs2;
  Scalar      *ap,*bap;

  PetscFunctionBegin;
  for ( i=0; i<m; i++ ) {
    if (im[i] < 0) continue;  /* negative row indices are ignored */
#if defined(USE_PETSC_BOPT_g)
    if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
#endif
    if (im[i] >= rstart_orig && im[i] < rend_orig) {
      /* row is locally owned */
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        if (in[j] >= cstart_orig && in[j] < cend_orig){
          /* column in the diagonal portion */
          col = in[j] - cstart_orig;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
        else if (in[j] < 0) continue;
#if defined(USE_PETSC_BOPT_g)
        else if (in[j] >= baij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Col too large");}
#endif
        else {
          /* column in the off-diagonal portion: translate global column to
             local B column via colmap (built lazily after first assembly) */
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }
#if defined (USE_CTABLE)
	    col = TableFind( baij->colmap, in[j]/bs + 1 ) - 1 + in[j]%bs;
#else
            col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
#endif
            /* NOTE(review): a missing colmap entry yields -1 + in[j]%bs, which is
               only negative when in[j]%bs == 0 -- verify not-found detection for
               bs > 1 against current PETSc sources */
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              /* column not in B's current pattern: fall back to global indexing */
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
              /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
              B = baij->B;
              b = (Mat_SeqBAIJ *) (B)->data;
              bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
              ba=b->a;
            }
          } else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
      }
    } else {
      /* off-process row: stash for MatAssemblyBegin/End communication */
      if (roworiented && !baij->donotstash) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      } else {
        if (!baij->donotstash) {
          row = im[i];
	  for ( j=0; j<n; j++ ) {
	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
  PetscFunctionReturn(0);
}
283 
284 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
285 #undef __FUNC__
286 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
287 int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
288 {
289   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
290   Scalar      *value,*barray=baij->barray;
291   int         ierr,i,j,ii,jj,row,col,k,l;
292   int         roworiented = baij->roworiented,rstart=baij->rstart ;
293   int         rend=baij->rend,cstart=baij->cstart,stepval;
294   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
295 
296   if(!barray) {
297     baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
298   }
299 
300   if (roworiented) {
301     stepval = (n-1)*bs;
302   } else {
303     stepval = (m-1)*bs;
304   }
305   for ( i=0; i<m; i++ ) {
306     if (im[i] < 0) continue;
307 #if defined(USE_PETSC_BOPT_g)
308     if (im[i] >= baij->Mbs) SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large, row %d max %d",im[i],baij->Mbs);
309 #endif
310     if (im[i] >= rstart && im[i] < rend) {
311       row = im[i] - rstart;
312       for ( j=0; j<n; j++ ) {
313         /* If NumCol = 1 then a copy is not required */
314         if ((roworiented) && (n == 1)) {
315           barray = v + i*bs2;
316         } else if((!roworiented) && (m == 1)) {
317           barray = v + j*bs2;
318         } else { /* Here a copy is required */
319           if (roworiented) {
320             value = v + i*(stepval+bs)*bs + j*bs;
321           } else {
322             value = v + j*(stepval+bs)*bs + i*bs;
323           }
324           for ( ii=0; ii<bs; ii++,value+=stepval ) {
325             for (jj=0; jj<bs; jj++ ) {
326               *barray++  = *value++;
327             }
328           }
329           barray -=bs2;
330         }
331 
332         if (in[j] >= cstart && in[j] < cend){
333           col  = in[j] - cstart;
334           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
335         }
336         else if (in[j] < 0) continue;
337 #if defined(USE_PETSC_BOPT_g)
338         else if (in[j] >= baij->Nbs) {SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large, col %d max %d",in[j],baij->Nbs);}
339 #endif
340         else {
341           if (mat->was_assembled) {
342             if (!baij->colmap) {
343               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
344             }
345 
346 #if defined(USE_PETSC_BOPT_g)
347 #if defined (USE_CTABLE)
348             if( (TableFind( baij->colmap, in[j] + 1 ) - 1) % bs)
349 	      {SETERRQ(PETSC_ERR_PLIB,0,"Incorrect colmap");}
350 #else
351             if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(PETSC_ERR_PLIB,0,"Incorrect colmap");}
352 #endif
353 #endif
354 #if defined (USE_CTABLE)
355 	    col = (TableFind( baij->colmap, in[j] + 1 ) - 1)/bs;
356 #else
357             col = (baij->colmap[in[j]] - 1)/bs;
358 #endif
359             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
360               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
361               col =  in[j];
362             }
363           }
364           else col = in[j];
365           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
366         }
367       }
368     } else {
369       if (!baij->donotstash) {
370         if (roworiented ) {
371           row   = im[i]*bs;
372           value = v + i*(stepval+bs)*bs;
373           for ( j=0; j<bs; j++,row++ ) {
374             for ( k=0; k<n; k++ ) {
375               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
376                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
377               }
378             }
379           }
380         } else {
381           for ( j=0; j<n; j++ ) {
382             value = v + j*(stepval+bs)*bs + i*bs;
383             col   = in[j]*bs;
384             for ( k=0; k<bs; k++,col++,value+=stepval) {
385               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
386                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
387               }
388             }
389           }
390         }
391       }
392     }
393   }
394   PetscFunctionReturn(0);
395 }
/* Multiplicative hashing: HASH_KEY is the golden-ratio conjugate
   (sqrt(5)-1)/2; HASH maps key into [0,size) via the fractional part of
   key*HASH_KEY.  tmp must be a double lvalue supplied by the caller. */
#define HASH_KEY 0.6180339887
/* #define HASH1(size,key) ((int)((size)*fmod(((key)*HASH_KEY),1))) */
#define HASH(size,key,tmp) (tmp = (key)*HASH_KEY,(int)((size)*(tmp-(int)tmp)))
/* #define HASH(size,key,tmp) ((int)((size)*fmod(((key)*HASH_KEY),1))) */
400 #undef __FUNC__
401 #define __FUNC__ "MatSetValues_MPIBAIJ_HT"
/*
   MatSetValues_MPIBAIJ_HT - hash-table variant of MatSetValues_MPIBAIJ.
   Requires the (row-block,col-block) -> value-block hash table built by
   MatCreateHashTable_MPIBAIJ_Private(); it cannot create new nonzero blocks,
   only update existing ones.  Indices are point (not block) indices.
*/
int MatSetValues_MPIBAIJ_HT(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr,i,j,row,col;
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
  int         rend_orig=baij->rend_bs,Nbs=baij->Nbs;
  int         h1,key,size=baij->ht_size,bs=baij->bs,*HT=baij->ht,idx;
  double      tmp;
  Scalar      ** HD = baij->hd,value;
#if defined(USE_PETSC_BOPT_g)
  /* probe-count statistics kept only in debug builds */
  int         total_ct=baij->ht_total_ct,insert_ct=baij->ht_insert_ct;
#endif

  PetscFunctionBegin;

  for ( i=0; i<m; i++ ) {
#if defined(USE_PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
    if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
#endif
      row = im[i];
    if (row >= rstart_orig && row < rend_orig) {
      /* locally owned row */
      for ( j=0; j<n; j++ ) {
        col = in[j];
        if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
        /* Look up into the Hash Table */
        key = (row/bs)*Nbs+(col/bs)+1;   /* key 0 marks an empty slot */
        h1  = HASH(size,key,tmp);


        idx = h1;
#if defined(USE_PETSC_BOPT_g)
        insert_ct++;
        total_ct++;
        /* linear probing from h1, wrapping once around the table */
        if (HT[idx] != key) {
          for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++,total_ct++);
          if (idx == size) {
            for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++,total_ct++);
            if (idx == h1) {
              SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
            }
          }
        }
#else
        /* linear probing from h1, wrapping once around the table */
        if (HT[idx] != key) {
          for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++);
          if (idx == size) {
            for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++);
            if (idx == h1) {
              SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
            }
          }
        }
#endif
        /* A HASH table entry is found, so insert the values at the correct address */
        if (addv == ADD_VALUES) *(HD[idx]+ (col % bs)*bs + (row % bs)) += value;
        else                    *(HD[idx]+ (col % bs)*bs + (row % bs))  = value;
      }
    } else {
      /* off-process row: stash for assembly-time communication */
      if (roworiented && !baij->donotstash) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      } else {
        if (!baij->donotstash) {
          row = im[i];
	  for ( j=0; j<n; j++ ) {
	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
#if defined(USE_PETSC_BOPT_g)
  baij->ht_total_ct = total_ct;
  baij->ht_insert_ct = insert_ct;
#endif
  PetscFunctionReturn(0);
}
479 
480 #undef __FUNC__
481 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ_HT"
/*
   MatSetValuesBlocked_MPIBAIJ_HT - hash-table variant of
   MatSetValuesBlocked_MPIBAIJ.  Indices are block indices; each lookup
   locates the bs2-long value block and copies/accumulates the full bs x bs
   block into it.  Cannot introduce new nonzero blocks.
*/
int MatSetValuesBlocked_MPIBAIJ_HT(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr,i,j,ii,jj,row,col,k,l;
  int         roworiented = baij->roworiented,rstart=baij->rstart ;
  int         rend=baij->rend,stepval,bs=baij->bs,bs2=baij->bs2;
  int         h1,key,size=baij->ht_size,idx,*HT=baij->ht,Nbs=baij->Nbs;
  double      tmp;
  Scalar      ** HD = baij->hd,*value,*v_t,*baij_a;
#if defined(USE_PETSC_BOPT_g)
  /* probe-count statistics kept only in debug builds */
  int         total_ct=baij->ht_total_ct,insert_ct=baij->ht_insert_ct;
#endif

  PetscFunctionBegin;

  /* distance (in Scalars) between consecutive rows of one block inside v */
  if (roworiented) {
    stepval = (n-1)*bs;
  } else {
    stepval = (m-1)*bs;
  }
  for ( i=0; i<m; i++ ) {
#if defined(USE_PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
    if (im[i] >= baij->Mbs) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
#endif
    row   = im[i];
    v_t   = v + i*bs2;
    if (row >= rstart && row < rend) {
      /* locally owned block row */
      for ( j=0; j<n; j++ ) {
        col = in[j];

        /* Look up into the Hash Table */
        key = row*Nbs+col+1;   /* key 0 marks an empty slot */
        h1  = HASH(size,key,tmp);

        idx = h1;
#if defined(USE_PETSC_BOPT_g)
        total_ct++;
        insert_ct++;
        /* linear probing from h1, wrapping once around the table */
       if (HT[idx] != key) {
          for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++,total_ct++);
          if (idx == size) {
            for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++,total_ct++);
            if (idx == h1) {
              SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
            }
          }
        }
#else
        /* linear probing from h1, wrapping once around the table */
        if (HT[idx] != key) {
          for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++);
          if (idx == size) {
            for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++);
            if (idx == h1) {
              SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
            }
          }
        }
#endif
        baij_a = HD[idx];  /* start of the stored bs x bs value block */
        if (roworiented) {
          /*value = v + i*(stepval+bs)*bs + j*bs;*/
          /* value = v + (i*(stepval+bs)+j)*bs; */
          value = v_t;
          v_t  += bs;
          /* transpose while copying: input rows become stored columns */
          if (addv == ADD_VALUES) {
            for ( ii=0; ii<bs; ii++,value+=stepval) {
              for ( jj=ii; jj<bs2; jj+=bs ) {
                baij_a[jj]  += *value++;
              }
            }
          } else {
            for ( ii=0; ii<bs; ii++,value+=stepval) {
              for ( jj=ii; jj<bs2; jj+=bs ) {
                baij_a[jj]  = *value++;
              }
            }
          }
        } else {
          value = v + j*(stepval+bs)*bs + i*bs;
          /* input already column oriented: straight copy/accumulate */
          if (addv == ADD_VALUES) {
            for ( ii=0; ii<bs; ii++,value+=stepval,baij_a+=bs ) {
              for ( jj=0; jj<bs; jj++ ) {
                baij_a[jj]  += *value++;
              }
            }
          } else {
            for ( ii=0; ii<bs; ii++,value+=stepval,baij_a+=bs ) {
              for ( jj=0; jj<bs; jj++ ) {
                baij_a[jj]  = *value++;
              }
            }
          }
        }
      }
    } else {
      /* off-process block row: stash individual (point) entries */
      if (!baij->donotstash) {
        if (roworiented ) {
          row   = im[i]*bs;
          value = v + i*(stepval+bs)*bs;
          for ( j=0; j<bs; j++,row++ ) {
            for ( k=0; k<n; k++ ) {
              for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        } else {
          for ( j=0; j<n; j++ ) {
            value = v + j*(stepval+bs)*bs + i*bs;
            col   = in[j]*bs;
            for ( k=0; k<bs; k++,col++,value+=stepval) {
              for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        }
      }
    }
  }
#if defined(USE_PETSC_BOPT_g)
  baij->ht_total_ct = total_ct;
  baij->ht_insert_ct = insert_ct;
#endif
  PetscFunctionReturn(0);
}
609 
610 #undef __FUNC__
611 #define __FUNC__ "MatGetValues_MPIBAIJ"
/*
   MatGetValues_MPIBAIJ - retrieves entries by global point indices.  Only
   locally owned rows are supported; off-diagonal columns are resolved through
   colmap, and entries outside the stored pattern read as 0.0.
*/
int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
  int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col,data;

  PetscFunctionBegin;
  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
    if (idxm[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
    if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
      row = idxm[i] - bsrstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
        if (idxn[j] >= baij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
        if (idxn[j] >= bscstart && idxn[j] < bscend){
          /* diagonal portion */
          col = idxn[j] - bscstart;
          ierr = MatGetValues_SeqBAIJ(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        } else {
          /* off-diagonal portion: map global block column to local B column */
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
          }
#if defined (USE_CTABLE)
          data = TableFind( baij->colmap, idxn[j]/bs + 1 ) - 1;
#else
          data = baij->colmap[idxn[j]/bs]-1;
#endif
          /* data < 0 or garray mismatch means the block is not stored here;
             such entries are zero by definition */
          if((data < 0) || (baij->garray[data/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
          else {
            col  = data + idxn[j]%bs;
            ierr = MatGetValues_SeqBAIJ(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    } else {
      SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
    }
  }
 PetscFunctionReturn(0);
}
652 
653 #undef __FUNC__
654 #define __FUNC__ "MatNorm_MPIBAIJ"
655 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
656 {
657   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
658   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
659   int        ierr, i,bs2=baij->bs2;
660   double     sum = 0.0;
661   Scalar     *v;
662 
663   PetscFunctionBegin;
664   if (baij->size == 1) {
665     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
666   } else {
667     if (type == NORM_FROBENIUS) {
668       v = amat->a;
669       for (i=0; i<amat->nz*bs2; i++ ) {
670 #if defined(USE_PETSC_COMPLEX)
671         sum += PetscReal(PetscConj(*v)*(*v)); v++;
672 #else
673         sum += (*v)*(*v); v++;
674 #endif
675       }
676       v = bmat->a;
677       for (i=0; i<bmat->nz*bs2; i++ ) {
678 #if defined(USE_PETSC_COMPLEX)
679         sum += PetscReal(PetscConj(*v)*(*v)); v++;
680 #else
681         sum += (*v)*(*v); v++;
682 #endif
683       }
684       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
685       *norm = sqrt(*norm);
686     } else {
687       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
688     }
689   }
690   PetscFunctionReturn(0);
691 }
692 
693 #undef __FUNC__
694 #define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
/*
   MatAssemblyBegin_MPIBAIJ - starts communicating stashed off-process values.
   Each stashed entry is shipped as a (row,col,value) triple of Scalars (the
   indices are stored as Scalars to keep the message a single MPI type).
   Nonblocking receives are posted first, then the sends; the requests and
   buffers are saved in the Mat_MPIBAIJ structure for MatAssemblyEnd.
*/
int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  PetscFunctionBegin;
  if (baij->donotstash) {
    /* nothing was stashed: record empty communication state and return */
    baij->svalues    = 0; baij->rvalues    = 0;
    baij->nsends     = 0; baij->nrecvs     = 0;
    baij->send_waits = 0; baij->recv_waits = 0;
    baij->rmax       = 0;
    PetscFunctionReturn(0);
  }

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
  }
  mat->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  /*  nprocs[j] = #entries for proc j; procs[j] = 1 iff any; owner[i] = dest of entry i */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    /* ownership ranges are in block rows; scale by bs for point indices */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work      = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  ierr      = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
  nreceives = work[rank];
  ierr      = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
  nmax      = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues    = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues    = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
  starts     = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack each stashed entry as the triple (row,col,value); the packing loop
     consumes starts[], which is rebuilt below for the sends */
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(baij->A,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  /* saved for MatAssemblyEnd, which waits on these requests */
  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  PetscFunctionReturn(0);
}
797 
798 /*
799   Creates the hash table, and sets the table
800   This table is created only once.
801   If new entried need to be added to the matrix
802   then the hash table has to be destroyed and
803   recreated.
804 */
805 #undef __FUNC__
806 #define __FUNC__ "MatCreateHashTable_MPIBAIJ_Private"
807 int MatCreateHashTable_MPIBAIJ_Private(Mat mat,double factor)
808 {
809   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
810   Mat         A = baij->A, B=baij->B;
811   Mat_SeqBAIJ *a=(Mat_SeqBAIJ *)A->data, *b=(Mat_SeqBAIJ *)B->data;
812   int         i,j,k,nz=a->nz+b->nz,h1,*ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j;
813   int         size,bs2=baij->bs2,rstart=baij->rstart;
814   int         cstart=baij->cstart,*garray=baij->garray,row,col,Nbs=baij->Nbs;
815   int         *HT,key;
816   Scalar      **HD;
817   double      tmp;
818 #if defined(USE_PETSC_BOPT_g)
819   int         ct=0,max=0;
820 #endif
821 
822   PetscFunctionBegin;
823   baij->ht_size=(int)(factor*nz);
824   size = baij->ht_size;
825 
826   if (baij->ht) {
827     PetscFunctionReturn(0);
828   }
829 
830   /* Allocate Memory for Hash Table */
831   baij->hd = (Scalar**)PetscMalloc((size)*(sizeof(int)+sizeof(Scalar*))+1); CHKPTRQ(baij->hd);
832   baij->ht = (int*)(baij->hd + size);
833   HD = baij->hd;
834   HT = baij->ht;
835 
836 
837   PetscMemzero(HD,size*(sizeof(int)+sizeof(Scalar*)));
838 
839 
840   /* Loop Over A */
841   for ( i=0; i<a->mbs; i++ ) {
842     for ( j=ai[i]; j<ai[i+1]; j++ ) {
843       row = i+rstart;
844       col = aj[j]+cstart;
845 
846       key = row*Nbs + col + 1;
847       h1  = HASH(size,key,tmp);
848       for ( k=0; k<size; k++ ){
849         if (HT[(h1+k)%size] == 0.0) {
850           HT[(h1+k)%size] = key;
851           HD[(h1+k)%size] = a->a + j*bs2;
852           break;
853 #if defined(USE_PETSC_BOPT_g)
854         } else {
855           ct++;
856 #endif
857         }
858       }
859 #if defined(USE_PETSC_BOPT_g)
860       if (k> max) max = k;
861 #endif
862     }
863   }
864   /* Loop Over B */
865   for ( i=0; i<b->mbs; i++ ) {
866     for ( j=bi[i]; j<bi[i+1]; j++ ) {
867       row = i+rstart;
868       col = garray[bj[j]];
869       key = row*Nbs + col + 1;
870       h1  = HASH(size,key,tmp);
871       for ( k=0; k<size; k++ ){
872         if (HT[(h1+k)%size] == 0.0) {
873           HT[(h1+k)%size] = key;
874           HD[(h1+k)%size] = b->a + j*bs2;
875           break;
876 #if defined(USE_PETSC_BOPT_g)
877         } else {
878           ct++;
879 #endif
880         }
881       }
882 #if defined(USE_PETSC_BOPT_g)
883       if (k> max) max = k;
884 #endif
885     }
886   }
887 
888   /* Print Summary */
889 #if defined(USE_PETSC_BOPT_g)
890   for ( i=0,j=0; i<size; i++)
891     if (HT[i]) {j++;}
892   PLogInfo(0,"MatCreateHashTable_MPIBAIJ_Private: Average Search = %5.2f,max search = %d\n",
893            (j== 0)? 0.0:((double)(ct+j))/j,max);
894 #endif
895   PetscFunctionReturn(0);
896 }
897 
#undef __FUNC__
#define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
/*
   Completes the assembly started in MatAssemblyBegin_MPIBAIJ():
   receives the stashed off-processor (row,col,value) triples, inserts
   each into the diagonal (A) or off-diagonal (B) local submatrix,
   waits for the outstanding sends, then assembles A and B themselves.
*/
int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
  int         bs=baij->bs,row,col,other_disassembled;
  Scalar      *values,val;
  InsertMode  addv = mat->insertmode;

  PetscFunctionBegin;
  /*  wait on receives */
  while (count) {
    ierr = MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
    /* unpack receives into our local space */
    /* each message is a run of (row,col,value) triples packed as 3 Scalars */
    values = baij->rvalues + 3*imdex*baij->rmax;
    ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
    n = n/3;
    for ( i=0; i<n; i++ ) {
      /* convert the global point row to this processor's local row */
      row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= baij->cstart*bs && col < baij->cend*bs) {
        /* column lies in the diagonal block: shift to A's local numbering */
        col -= baij->cstart*bs;
        ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      } else {
        if (mat->was_assembled) {
          /* colmap translates global block columns into the compacted
             off-diagonal (B) column numbering; build it on first use */
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
          }
#if defined (USE_CTABLE)
	  col = TableFind( baij->colmap, col/bs + 1 ) - 1 + col%bs;
#else
          col = (baij->colmap[col/bs]) - 1 + col%bs;
#endif
          if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
            /* a previously unseen off-diagonal block column: disassemble
               and fall back to the original global column index */
            ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      }
    }
    count--;
  }
  if (baij->recv_waits) PetscFree(baij->recv_waits);
  if (baij->rvalues)    PetscFree(baij->rvalues);

  /* wait on sends */
  if (baij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
    ierr        = MPI_Waitall(baij->nsends,baij->send_waits,send_status);CHKERRQ(ierr);
    PetscFree(send_status);
  }
  if (baij->send_waits) PetscFree(baij->send_waits);
  if (baij->svalues)    PetscFree(baij->svalues);

  ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselfs, in order that we may reassemble. */
  /*
     if nonzero structure of submatrix B cannot change then we know that
     no processor disassembled thus we can skip this stuff
  */
  if (!((Mat_SeqBAIJ*) baij->B->data)->nonew)  {
    ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
    if (mat->was_assembled && !other_disassembled) {
      ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
    }
  }

  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);

#if defined(USE_PETSC_BOPT_g)
  if (baij->ht && mode== MAT_FINAL_ASSEMBLY) {
    PLogInfo(0,"MatAssemblyEnd_MPIBAIJ:Average Hash Table Search in MatSetValues = %5.2f\n",
             ((double)baij->ht_total_ct)/baij->ht_insert_ct);
    baij->ht_total_ct  = 0;
    baij->ht_insert_ct = 0;
  }
#endif
  /* if requested via MAT_USE_HASH_TABLE, build the table and switch to
     the hash-table accelerated MatSetValues paths */
  if (baij->ht_flag && !baij->ht && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatCreateHashTable_MPIBAIJ_Private(mat,baij->ht_fact); CHKERRQ(ierr);
    mat->ops->setvalues        = MatSetValues_MPIBAIJ_HT;
    mat->ops->setvaluesblocked = MatSetValuesBlocked_MPIBAIJ_HT;
  }

  if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
  PetscFunctionReturn(0);
}
995 
996 #undef __FUNC__
997 #define __FUNC__ "MatView_MPIBAIJ_Binary"
998 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
999 {
1000   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
1001   int          ierr;
1002 
1003   PetscFunctionBegin;
1004   if (baij->size == 1) {
1005     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
1006   } else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
1007   PetscFunctionReturn(0);
1008 }
1009 
1010 #undef __FUNC__
1011 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworSocket"
1012 static int MatView_MPIBAIJ_ASCIIorDraworSocket(Mat mat,Viewer viewer)
1013 {
1014   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
1015   int          ierr, format,bs = baij->bs, size = baij->size, rank = baij->rank;
1016   FILE         *fd;
1017   ViewerType   vtype;
1018 
1019   PetscFunctionBegin;
1020   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
1021   if (PetscTypeCompare(vtype,ASCII_VIEWER)) {
1022     ierr = ViewerGetFormat(viewer,&format);
1023     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
1024       MatInfo info;
1025       MPI_Comm_rank(mat->comm,&rank);
1026       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
1027       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
1028       PetscSequentialPhaseBegin(mat->comm,1);
1029       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
1030               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
1031               baij->bs,(int)info.memory);
1032       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
1033       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
1034       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
1035       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
1036       fflush(fd);
1037       PetscSequentialPhaseEnd(mat->comm,1);
1038       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
1039       PetscFunctionReturn(0);
1040     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
1041       PetscPrintf(mat->comm,"  block size is %d\n",bs);
1042       PetscFunctionReturn(0);
1043     }
1044   }
1045 
1046   if (PetscTypeCompare(vtype,DRAW_VIEWER)) {
1047     Draw       draw;
1048     PetscTruth isnull;
1049     ierr = ViewerDrawGetDraw(viewer,0,&draw); CHKERRQ(ierr);
1050     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
1051   }
1052 
1053   if (size == 1) {
1054     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
1055   } else {
1056     /* assemble the entire matrix onto first processor. */
1057     Mat         A;
1058     Mat_SeqBAIJ *Aloc;
1059     int         M = baij->M, N = baij->N,*ai,*aj,col,i,j,k,*rvals;
1060     int         mbs = baij->mbs;
1061     Scalar      *a;
1062 
1063     if (!rank) {
1064       ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
1065     } else {
1066       ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
1067     }
1068     PLogObjectParent(mat,A);
1069 
1070     /* copy over the A part */
1071     Aloc = (Mat_SeqBAIJ*) baij->A->data;
1072     ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1073     rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1074 
1075     for ( i=0; i<mbs; i++ ) {
1076       rvals[0] = bs*(baij->rstart + i);
1077       for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1078       for ( j=ai[i]; j<ai[i+1]; j++ ) {
1079         col = (baij->cstart+aj[j])*bs;
1080         for (k=0; k<bs; k++ ) {
1081           ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
1082           col++; a += bs;
1083         }
1084       }
1085     }
1086     /* copy over the B part */
1087     Aloc = (Mat_SeqBAIJ*) baij->B->data;
1088     ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1089     for ( i=0; i<mbs; i++ ) {
1090       rvals[0] = bs*(baij->rstart + i);
1091       for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1092       for ( j=ai[i]; j<ai[i+1]; j++ ) {
1093         col = baij->garray[aj[j]]*bs;
1094         for (k=0; k<bs; k++ ) {
1095           ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
1096           col++; a += bs;
1097         }
1098       }
1099     }
1100     PetscFree(rvals);
1101     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1102     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1103     /*
1104        Everyone has to call to draw the matrix since the graphics waits are
1105        synchronized across all processors that share the Draw object
1106     */
1107     if (!rank || PetscTypeCompare(vtype,DRAW_VIEWER)) {
1108       ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
1109     }
1110     ierr = MatDestroy(A); CHKERRQ(ierr);
1111   }
1112   PetscFunctionReturn(0);
1113 }
1114 
1115 
1116 
1117 #undef __FUNC__
1118 #define __FUNC__ "MatView_MPIBAIJ"
1119 int MatView_MPIBAIJ(Mat mat,Viewer viewer)
1120 {
1121   int         ierr;
1122   ViewerType  vtype;
1123 
1124   PetscFunctionBegin;
1125   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
1126   if (PetscTypeCompare(vtype,ASCII_VIEWER) || PetscTypeCompare(vtype,DRAW_VIEWER) ||
1127       PetscTypeCompare(vtype,SOCKET_VIEWER)) {
1128     ierr = MatView_MPIBAIJ_ASCIIorDraworSocket(mat,viewer); CHKERRQ(ierr);
1129   } else if (PetscTypeCompare(vtype,BINARY_VIEWER)) {
1130     ierr = MatView_MPIBAIJ_Binary(mat,viewer);CHKERRQ(ierr);
1131   } else {
1132     SETERRQ(1,1,"Viewer type not supported by PETSc object");
1133   }
1134   PetscFunctionReturn(0);
1135 }
1136 
#undef __FUNC__
#define __FUNC__ "MatDestroy_MPIBAIJ"
/*
   Releases all storage held by an MPIBAIJ matrix once the reference
   count drops to zero: the index mappings, the local A and B
   submatrices, the communication scatter, and the cached work arrays.
*/
int MatDestroy_MPIBAIJ(Mat mat)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;

  PetscFunctionBegin;
  /* the matrix may be referenced elsewhere; only the last reference destroys */
  if (--mat->refct > 0) PetscFunctionReturn(0);

  if (mat->mapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
  }
  if (mat->bmapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->bmapping); CHKERRQ(ierr);
  }
  if (mat->rmap) {
    ierr = MapDestroy(mat->rmap);CHKERRQ(ierr);
  }
  if (mat->cmap) {
    ierr = MapDestroy(mat->cmap);CHKERRQ(ierr);
  }
#if defined(USE_PETSC_LOG)
  PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",baij->M,baij->N);
#endif

  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
  PetscFree(baij->rowners);
  /* destroy the diagonal and off-diagonal sequential submatrices */
  ierr = MatDestroy(baij->A); CHKERRQ(ierr);
  ierr = MatDestroy(baij->B); CHKERRQ(ierr);
#if defined (USE_CTABLE)
  if (baij->colmap) TableDelete(baij->colmap);
#else
  if (baij->colmap) PetscFree(baij->colmap);
#endif
  if (baij->garray) PetscFree(baij->garray);
  if (baij->lvec)   VecDestroy(baij->lvec);
  if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
  if (baij->rowvalues) PetscFree(baij->rowvalues);
  if (baij->barray) PetscFree(baij->barray);
  /* hd is the single allocation backing both the hash table arrays */
  if (baij->hd) PetscFree(baij->hd);
  PetscFree(baij);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  PetscFunctionReturn(0);
}
1183 
1184 #undef __FUNC__
1185 #define __FUNC__ "MatMult_MPIBAIJ"
1186 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
1187 {
1188   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1189   int         ierr, nt;
1190 
1191   PetscFunctionBegin;
1192   ierr = VecGetLocalSize(xx,&nt);CHKERRQ(ierr);
1193   if (nt != a->n) {
1194     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
1195   }
1196   ierr = VecGetLocalSize(yy,&nt);CHKERRQ(ierr);
1197   if (nt != a->m) {
1198     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible parition of A and yy");
1199   }
1200   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1201   ierr = (*a->A->ops->mult)(a->A,xx,yy); CHKERRQ(ierr);
1202   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1203   ierr = (*a->B->ops->multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
1204   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1205   PetscFunctionReturn(0);
1206 }
1207 
1208 #undef __FUNC__
1209 #define __FUNC__ "MatMultAdd_MPIBAIJ"
1210 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
1211 {
1212   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1213   int        ierr;
1214 
1215   PetscFunctionBegin;
1216   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1217   ierr = (*a->A->ops->multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
1218   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1219   ierr = (*a->B->ops->multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
1220   PetscFunctionReturn(0);
1221 }
1222 
1223 #undef __FUNC__
1224 #define __FUNC__ "MatMultTrans_MPIBAIJ"
1225 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
1226 {
1227   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1228   int         ierr;
1229 
1230   PetscFunctionBegin;
1231   /* do nondiagonal part */
1232   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
1233   /* send it on its way */
1234   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
1235   /* do local part */
1236   ierr = (*a->A->ops->multtrans)(a->A,xx,yy); CHKERRQ(ierr);
1237   /* receive remote parts: note this assumes the values are not actually */
1238   /* inserted in yy until the next line, which is true for my implementation*/
1239   /* but is not perhaps always true. */
1240   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
1241   PetscFunctionReturn(0);
1242 }
1243 
#undef __FUNC__
#define __FUNC__ "MatMultTransAdd_MPIBAIJ"
/*
   Computes zz = yy + A'*xx, overlapping the reverse scatter of the
   off-diagonal contribution with the local transpose multiply-add.
*/
int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;

  PetscFunctionBegin;
  /* do nondiagonal part */
  ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops->multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in zz until the next line, which is true for my implementation*/
  /* but is not perhaps always true. */
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
1264 
1265 /*
1266   This only works correctly for square matrices where the subblock A->A is the
1267    diagonal block
1268 */
1269 #undef __FUNC__
1270 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
1271 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
1272 {
1273   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1274   int         ierr;
1275 
1276   PetscFunctionBegin;
1277   if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
1278   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
1279   PetscFunctionReturn(0);
1280 }
1281 
1282 #undef __FUNC__
1283 #define __FUNC__ "MatScale_MPIBAIJ"
1284 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
1285 {
1286   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1287   int         ierr;
1288 
1289   PetscFunctionBegin;
1290   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
1291   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
1292   PetscFunctionReturn(0);
1293 }
1294 
1295 #undef __FUNC__
1296 #define __FUNC__ "MatGetSize_MPIBAIJ"
1297 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
1298 {
1299   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1300 
1301   PetscFunctionBegin;
1302   if (m) *m = mat->M;
1303   if (n) *n = mat->N;
1304   PetscFunctionReturn(0);
1305 }
1306 
1307 #undef __FUNC__
1308 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
1309 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
1310 {
1311   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1312 
1313   PetscFunctionBegin;
1314   *m = mat->m; *n = mat->n;
1315   PetscFunctionReturn(0);
1316 }
1317 
1318 #undef __FUNC__
1319 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
1320 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
1321 {
1322   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1323 
1324   PetscFunctionBegin;
1325   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
1326   PetscFunctionReturn(0);
1327 }
1328 
1329 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1330 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1331 
#undef __FUNC__
#define __FUNC__ "MatGetRow_MPIBAIJ"
/*
   Returns one locally owned (point, not block) row of the matrix, with
   column indices in increasing global order.  Values and indices are
   copied into work arrays cached on the matrix, so only one row may be
   active at a time (released by MatRestoreRow_MPIBAIJ).
*/
int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
  int        *cmap, *idx_p,cstart = mat->cstart;

  PetscFunctionBegin;
  if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int     max = 1,mbs = mat->mbs,tmp;
    for ( i=0; i<mbs; i++ ) {
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    /* one allocation backs both the value and the index work arrays */
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }

  /* only rows stored on this processor may be requested */
  if (row < brstart || row >= brend) SETERRQ(PETSC_ERR_SUP,0,"Only local rows")
  lrow = row - brstart;

  /* request from A and B only the pieces the caller actually wants */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops->getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops->getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  /* cmap maps B's local block columns to global block columns */
  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        /* B entries whose global columns precede the diagonal block come first */
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
          }
        } else {
          /* values were not requested, so imark was never computed; find it now */
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]/bs] < cstart)
              idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
            else break;
          }
          imark = i;
        }
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
      }
    } else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops->restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops->restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
1413 
1414 #undef __FUNC__
1415 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1416 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1417 {
1418   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1419 
1420   PetscFunctionBegin;
1421   if (baij->getrowactive == PETSC_FALSE) {
1422     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
1423   }
1424   baij->getrowactive = PETSC_FALSE;
1425   PetscFunctionReturn(0);
1426 }
1427 
1428 #undef __FUNC__
1429 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1430 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1431 {
1432   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1433 
1434   PetscFunctionBegin;
1435   *bs = baij->bs;
1436   PetscFunctionReturn(0);
1437 }
1438 
1439 #undef __FUNC__
1440 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1441 int MatZeroEntries_MPIBAIJ(Mat A)
1442 {
1443   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1444   int         ierr;
1445 
1446   PetscFunctionBegin;
1447   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1448   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1449   PetscFunctionReturn(0);
1450 }
1451 
1452 #undef __FUNC__
1453 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1454 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1455 {
1456   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1457   Mat         A = a->A, B = a->B;
1458   int         ierr;
1459   double      isend[5], irecv[5];
1460 
1461   PetscFunctionBegin;
1462   info->block_size     = (double)a->bs;
1463   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1464   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
1465   isend[3] = info->memory;  isend[4] = info->mallocs;
1466   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1467   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
1468   isend[3] += info->memory;  isend[4] += info->mallocs;
1469   if (flag == MAT_LOCAL) {
1470     info->nz_used      = isend[0];
1471     info->nz_allocated = isend[1];
1472     info->nz_unneeded  = isend[2];
1473     info->memory       = isend[3];
1474     info->mallocs      = isend[4];
1475   } else if (flag == MAT_GLOBAL_MAX) {
1476     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);CHKERRQ(ierr);
1477     info->nz_used      = irecv[0];
1478     info->nz_allocated = irecv[1];
1479     info->nz_unneeded  = irecv[2];
1480     info->memory       = irecv[3];
1481     info->mallocs      = irecv[4];
1482   } else if (flag == MAT_GLOBAL_SUM) {
1483     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);CHKERRQ(ierr);
1484     info->nz_used      = irecv[0];
1485     info->nz_allocated = irecv[1];
1486     info->nz_unneeded  = irecv[2];
1487     info->memory       = irecv[3];
1488     info->mallocs      = irecv[4];
1489   }
1490   info->rows_global       = (double)a->M;
1491   info->columns_global    = (double)a->N;
1492   info->rows_local        = (double)a->m;
1493   info->columns_local     = (double)a->N;
1494   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1495   info->fill_ratio_needed = 0;
1496   info->factor_mallocs    = 0;
1497   PetscFunctionReturn(0);
1498 }
1499 
1500 #undef __FUNC__
1501 #define __FUNC__ "MatSetOption_MPIBAIJ"
1502 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1503 {
1504   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1505 
1506   PetscFunctionBegin;
1507   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1508       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1509       op == MAT_COLUMNS_UNSORTED ||
1510       op == MAT_COLUMNS_SORTED ||
1511       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1512       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1513         MatSetOption(a->A,op);
1514         MatSetOption(a->B,op);
1515   } else if (op == MAT_ROW_ORIENTED) {
1516         a->roworiented = 1;
1517         MatSetOption(a->A,op);
1518         MatSetOption(a->B,op);
1519   } else if (op == MAT_ROWS_SORTED ||
1520              op == MAT_ROWS_UNSORTED ||
1521              op == MAT_SYMMETRIC ||
1522              op == MAT_STRUCTURALLY_SYMMETRIC ||
1523              op == MAT_YES_NEW_DIAGONALS ||
1524              op == MAT_USE_HASH_TABLE)
1525     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1526   else if (op == MAT_COLUMN_ORIENTED) {
1527     a->roworiented = 0;
1528     MatSetOption(a->A,op);
1529     MatSetOption(a->B,op);
1530   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1531     a->donotstash = 1;
1532   } else if (op == MAT_NO_NEW_DIAGONALS) {
1533     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1534   } else if (op == MAT_USE_HASH_TABLE) {
1535     a->ht_flag = 1;
1536   } else {
1537     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1538   }
1539   PetscFunctionReturn(0);
1540 }
1541 
1542 #undef __FUNC__
1543 #define __FUNC__ "MatTranspose_MPIBAIJ("
1544 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1545 {
1546   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1547   Mat_SeqBAIJ *Aloc;
1548   Mat        B;
1549   int        ierr,M=baij->M,N=baij->N,*ai,*aj,i,*rvals,j,k,col;
1550   int        bs=baij->bs,mbs=baij->mbs;
1551   Scalar     *a;
1552 
1553   PetscFunctionBegin;
1554   if (matout == PETSC_NULL && M != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
1555   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1556   CHKERRQ(ierr);
1557 
1558   /* copy over the A part */
1559   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1560   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1561   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1562 
1563   for ( i=0; i<mbs; i++ ) {
1564     rvals[0] = bs*(baij->rstart + i);
1565     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1566     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1567       col = (baij->cstart+aj[j])*bs;
1568       for (k=0; k<bs; k++ ) {
1569         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1570         col++; a += bs;
1571       }
1572     }
1573   }
1574   /* copy over the B part */
1575   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1576   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1577   for ( i=0; i<mbs; i++ ) {
1578     rvals[0] = bs*(baij->rstart + i);
1579     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1580     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1581       col = baij->garray[aj[j]]*bs;
1582       for (k=0; k<bs; k++ ) {
1583         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1584         col++; a += bs;
1585       }
1586     }
1587   }
1588   PetscFree(rvals);
1589   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1590   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1591 
1592   if (matout != PETSC_NULL) {
1593     *matout = B;
1594   } else {
1595     PetscOps *Abops;
1596     MatOps   Aops;
1597 
1598     /* This isn't really an in-place transpose .... but free data structures from baij */
1599     PetscFree(baij->rowners);
1600     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1601     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1602 #if defined (USE_CTABLE)
1603     if (baij->colmap) TableDelete(baij->colmap);
1604 #else
1605     if (baij->colmap) PetscFree(baij->colmap);
1606 #endif
1607     if (baij->garray) PetscFree(baij->garray);
1608     if (baij->lvec) VecDestroy(baij->lvec);
1609     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1610     PetscFree(baij);
1611 
1612     /*
1613        This is horrible, horrible code. We need to keep the
1614       A pointers for the bops and ops but copy everything
1615       else from C.
1616     */
1617     Abops = A->bops;
1618     Aops  = A->ops;
1619     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1620     A->bops = Abops;
1621     A->ops  = Aops;
1622 
1623     PetscHeaderDestroy(B);
1624   }
1625   PetscFunctionReturn(0);
1626 }
1627 
1628 #undef __FUNC__
1629 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1630 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1631 {
1632   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1633   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1634   int ierr,s1,s2,s3;
1635 
1636   PetscFunctionBegin;
1637   if (ll)  {
1638     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1639     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1640     if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"non-conforming local sizes");
1641     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1642     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1643   }
1644   if (rr) SETERRQ(PETSC_ERR_SUP,0,"not supported for right vector");
1645   PetscFunctionReturn(0);
1646 }
1647 
#undef __FUNC__
#define __FUNC__ "MatZeroRows_MPIBAIJ"
/*
   MatZeroRows_MPIBAIJ - Zeros the global rows listed in is (point rows,
   not block rows).  The row indices may refer to rows owned by any
   processor, so each index is first shipped to its owning processor; each
   processor then zeros its own rows.  If diag is non-null the diagonal
   entry of every zeroed row is set to *diag.
*/
int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work,row;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values,bs=l->bs,rstart_bs=l->rstart_bs;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;
  PetscTruth     localdiag;

  PetscFunctionBegin;
  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    /* rowners[] holds block-row ownership ranges, hence the scaling by bs */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work   = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  /* after the SUM reduce, work[rank] = number of processors sending to me */
  ierr   = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
  nrecvs = work[rank];
  /* after the MAX reduce, work[rank] = longest message any processor sends me */
  ierr   = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
  nmax   = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
  }

  /* do sends:
     1) starts[i] gives the starting index in svalues for stuff going to
     the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* bucket the row indices by destination; this pass advances starts[],
     so the offsets are rebuilt below before the sends are posted */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
    }
  }
  PetscFree(starts);

  /* first point row owned by this processor, used to localize indices */
  base = owners[rank]*bs;

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
    /* unpack receives into our local space */
    ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);

  /*
        Zero the required rows. If the "diagonal block" of the matrix
     is square and the user wishes to set the diagonal we use separate
     code so that MatSetValues() is not called for each diagonal allocating
     new memory, thus calling lots of mallocs and slowing things down.

       Contributed by: Mathew Knepley
  */
  localdiag = PETSC_FALSE;
  if (diag && (l->A->M == l->A->N)) {
    localdiag = PETSC_TRUE;
    ierr      = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  } else {
    ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
  }
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* slow fallback: set each diagonal entry via MatSetValues() when the
     diagonal block is not square */
  if (diag && (localdiag == PETSC_FALSE)) {
    for ( i = 0; i < slen; i++ ) {
      row = lrows[i] + rstart_bs;
      ierr = MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES); CHKERRQ(ierr);
    }
    ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
    ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  }
  PetscFree(lrows);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
    ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  PetscFunctionReturn(0);
}
1794 
1795 extern int MatPrintHelp_SeqBAIJ(Mat);
1796 #undef __FUNC__
1797 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1798 int MatPrintHelp_MPIBAIJ(Mat A)
1799 {
1800   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1801   MPI_Comm    comm = A->comm;
1802   static int  called = 0;
1803   int         ierr;
1804 
1805   PetscFunctionBegin;
1806   if (!a->rank) {
1807     ierr = MatPrintHelp_SeqBAIJ(a->A);CHKERRQ(ierr);
1808   }
1809   if (called) {PetscFunctionReturn(0);} else called = 1;
1810   (*PetscHelpPrintf)(comm," Options for MATMPIBAIJ matrix format (the defaults):\n");
1811   (*PetscHelpPrintf)(comm,"  -mat_use_hash_table <factor>: Use hashtable for efficient matrix assembly\n");
1812   PetscFunctionReturn(0);
1813 }
1814 
1815 #undef __FUNC__
1816 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1817 int MatSetUnfactored_MPIBAIJ(Mat A)
1818 {
1819   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1820   int         ierr;
1821 
1822   PetscFunctionBegin;
1823   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1824   PetscFunctionReturn(0);
1825 }
1826 
static int MatDuplicate_MPIBAIJ(Mat,MatDuplicateOption,Mat *);

/* -------------------------------------------------------------------*/
/*
   Function table for the MPIBAIJ matrix type.  The slot order is fixed
   by struct _MatOps (declared elsewhere); a 0 entry means the operation
   is not supported by this format.
*/
static struct _MatOps MatOps_Values = {
  MatSetValues_MPIBAIJ,
  MatGetRow_MPIBAIJ,
  MatRestoreRow_MPIBAIJ,
  MatMult_MPIBAIJ,
  MatMultAdd_MPIBAIJ,
  MatMultTrans_MPIBAIJ,
  MatMultTransAdd_MPIBAIJ,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  MatTranspose_MPIBAIJ,
  MatGetInfo_MPIBAIJ,
  0,
  MatGetDiagonal_MPIBAIJ,
  MatDiagonalScale_MPIBAIJ,
  MatNorm_MPIBAIJ,
  MatAssemblyBegin_MPIBAIJ,
  MatAssemblyEnd_MPIBAIJ,
  0,
  MatSetOption_MPIBAIJ,
  MatZeroEntries_MPIBAIJ,
  MatZeroRows_MPIBAIJ,
  0,
  0,
  0,
  0,
  MatGetSize_MPIBAIJ,
  MatGetLocalSize_MPIBAIJ,
  MatGetOwnershipRange_MPIBAIJ,
  0,
  0,
  0,
  0,
  MatDuplicate_MPIBAIJ,
  0,
  0,
  0,
  0,
  0,
  MatGetSubMatrices_MPIBAIJ,
  MatIncreaseOverlap_MPIBAIJ,
  MatGetValues_MPIBAIJ,
  0,
  MatPrintHelp_MPIBAIJ,
  MatScale_MPIBAIJ,
  0,
  0,
  0,
  MatGetBlockSize_MPIBAIJ,
  0,
  0,
  0,
  0,
  0,
  0,
  MatSetUnfactored_MPIBAIJ,
  0,
  MatSetValuesBlocked_MPIBAIJ,
  0,
  0,
  0,
  MatGetMaps_Petsc};
1897 
1898 
EXTERN_C_BEGIN
#undef __FUNC__
#define __FUNC__ "MatGetDiagonalBlock_MPIBAIJ"
/*
   Returns the sequential matrix holding the diagonal portion of this
   processor's rows.  A reference to the internal matrix is returned (not
   a copy), so *iscopy is set to PETSC_FALSE; the reuse argument is
   unused here.
*/
int MatGetDiagonalBlock_MPIBAIJ(Mat A,PetscTruth *iscopy,MatReuse reuse,Mat *a)
{
  PetscFunctionBegin;
  *a      = ((Mat_MPIBAIJ *)A->data)->A;
  *iscopy = PETSC_FALSE;
  PetscFunctionReturn(0);
}
EXTERN_C_END
1910 
#undef __FUNC__
#define __FUNC__ "MatCreateMPIBAIJ"
/*@C
   MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
   (block compressed row).  For good matrix assembly performance
   the user should preallocate the matrix storage by setting the parameters
   d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
   performance can be increased by more than a factor of 50.

   Collective on MPI_Comm

   Input Parameters:
+  comm - MPI communicator
.  bs   - size of block
.  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
           This value should be the same as the local size used in creating the
           y vector for the matrix-vector product y = Ax.
.  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
           This value should be the same as the local size used in creating the
           x vector for the matrix-vector product y = Ax.
.  M - number of global rows (or PETSC_DETERMINE to have calculated if m is given)
.  N - number of global columns (or PETSC_DETERMINE to have calculated if n is given)
.  d_nz  - number of block nonzeros per block row in diagonal portion of local
           submatrix  (same for all local rows)
.  d_nnz - array containing the number of block nonzeros in the various block rows
           in the diagonal portion of the local (possibly different for each block
           row) or PETSC_NULL.  You must leave room for the diagonal entry even if it is zero.
.  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
           submatrix (same for all local rows).
-  o_nnz - array containing the number of nonzeros in the various block rows of the
           off-diagonal portion of the local submatrix (possibly different for
           each block row) or PETSC_NULL.

   Output Parameter:
.  A - the matrix

   Options Database Keys:
.   -mat_no_unroll - uses code that does not unroll the loops in the
                     block calculations (much slower)
.   -mat_block_size - size of the blocks to use
.   -mat_mpi - use the parallel matrix data structures even on one processor
               (defaults to using SeqBAIJ format on one processor)

   Notes:
   The user MUST specify either the local or global matrix dimensions
   (possibly both).

   If PETSC_DECIDE or  PETSC_DETERMINE is used for a particular argument on one processor
   then it must be used on all processors that share the object for that argument.

   Storage Information:
   For a square global matrix we define each processor's diagonal portion
   to be its local rows and the corresponding columns (a square submatrix);
   each processor's off-diagonal portion encompasses the remainder of the
   local matrix (a rectangular submatrix).

   The user can specify preallocated storage for the diagonal part of
   the local submatrix with either d_nz or d_nnz (not both).  Set
   d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
   memory allocation.  Likewise, specify preallocated storage for the
   off-diagonal part of the local submatrix with o_nz or o_nnz (not both).

   Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
   the figure below we depict these three local rows and all columns (0-11).

.vb
           0 1 2 3 4 5 6 7 8 9 10 11
          -------------------
   row 3  |  o o o d d d o o o o o o
   row 4  |  o o o d d d o o o o o o
   row 5  |  o o o d d d o o o o o o
          -------------------
.ve

   Thus, any entries in the d locations are stored in the d (diagonal)
   submatrix, and any entries in the o locations are stored in the
   o (off-diagonal) submatrix.  Note that the d and the o submatrices are
   stored simply in the MATSEQBAIJ format for compressed row storage.

   Now d_nz should indicate the number of block nonzeros per row in the d matrix,
   and o_nz should indicate the number of block nonzeros per row in the o matrix.
   In general, for PDE problems in which most nonzeros are near the diagonal,
   one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
   or you will get TERRIBLE performance; see the users' manual chapter on
   matrices.

   Level: intermediate

.keywords: matrix, block, aij, compressed row, sparse, parallel

.seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues(), MatCreateMPIAIJ()
@*/
int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
                    int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  Mat          B;
  Mat_MPIBAIJ  *b;
  int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size,flg;
  int          flag1 = 0,flag2 = 0;

  PetscFunctionBegin;
  if (bs < 1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Invalid block size specified, must be positive");

  /* on a single processor fall back to the sequential BAIJ format unless
     the parallel structures were explicitly requested on the command line */
  MPI_Comm_size(comm,&size);
  ierr = OptionsHasName(PETSC_NULL,"-mat_mpibaij",&flag1); CHKERRQ(ierr);
  ierr = OptionsHasName(PETSC_NULL,"-mat_mpi",&flag2); CHKERRQ(ierr);
  if (!flag1 && !flag2 && size == 1) {
    if (M == PETSC_DECIDE) M = m;
    if (N == PETSC_DECIDE) N = n;
    ierr = MatCreateSeqBAIJ(comm,bs,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
    PetscFunctionReturn(0);
  }

  *A = 0;
  PetscHeaderCreate(B,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIBAIJ,"Mat",comm,MatDestroy,MatView);
  PLogObjectCreate(B);
  B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
  PetscMemzero(b,sizeof(Mat_MPIBAIJ));
  PetscMemcpy(B->ops,&MatOps_Values,sizeof(struct _MatOps));

  B->ops->destroy    = MatDestroy_MPIBAIJ;
  B->ops->view       = MatView_MPIBAIJ;
  B->mapping    = 0;
  B->factor     = 0;
  B->assembled  = PETSC_FALSE;

  B->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&b->rank);
  MPI_Comm_size(comm,&b->size);

  /* validate the size arguments: per-row preallocation requires a known
     local row count, and at least one of each local/global pair must be set */
  if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
    SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
  }
  if ( M == PETSC_DECIDE && m == PETSC_DECIDE) {
    SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either M or m should be specified");
  }
  if ( N == PETSC_DECIDE && n == PETSC_DECIDE) {
    SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either N or n should be specified");
  }
  /* when both local and global sizes are given, the local sizes win and
     the global sizes are recomputed from them below */
  if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
  if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;

  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    mbs = m/bs; nbs = n/bs;
    ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
    if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
    if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
  }
  /* split the global block rows/cols as evenly as possible, giving the
     first (Mbs % size) processors one extra block row */
  if (m == PETSC_DECIDE) {
    Mbs = M/bs;
    if (Mbs*bs != M) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global rows must be divisible by blocksize");
    mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
    m   = mbs*bs;
  }
  if (n == PETSC_DECIDE) {
    Nbs = N/bs;
    if (Nbs*bs != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global cols must be divisible by blocksize");
    nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
    n   = nbs*bs;
  }
  if (mbs*bs != m || nbs*bs != n) {
    SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of local rows, cols must be divisible by blocksize");
  }

  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;
  b->bs  = bs;
  b->bs2 = bs*bs;
  b->mbs = mbs;
  b->nbs = nbs;
  b->Mbs = Mbs;
  b->Nbs = Nbs;

  /* the information in the maps duplicates the information computed below, eventually
     we should remove the duplicate information that is not contained in the maps */
  ierr = MapCreateMPI(comm,m,M,&B->rmap);CHKERRQ(ierr);
  ierr = MapCreateMPI(comm,n,N,&B->cmap);CHKERRQ(ierr);

  /* build local table of row and column ownerships */
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
  b->cowners = b->rowners + b->size + 2;
  /* gather every processor's block-row count, then prefix-sum so that
     rowners[p]..rowners[p+1] is the block-row range owned by processor p */
  ierr = MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];
  }
  b->rstart    = b->rowners[b->rank];
  b->rend      = b->rowners[b->rank+1];
  b->rstart_bs = b->rstart * bs;
  b->rend_bs   = b->rend * bs;

  ierr = MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart    = b->cowners[b->rank];
  b->cend      = b->cowners[b->rank+1];
  b->cstart_bs = b->cstart * bs;
  b->cend_bs   = b->cend * bs;


  /* the diagonal block is m x n; the off-diagonal block is m x N since it
     may hold entries in any non-local column */
  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->donotstash  = 0;
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used in block assembly */
  b->barray       = 0;

  /* stuff used for matrix vector multiply */
  b->lvec         = 0;
  b->Mvctx        = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  /* hash table stuff */
  b->ht           = 0;
  b->hd           = 0;
  b->ht_size      = 0;
  b->ht_flag      = 0;
  b->ht_fact      = 0;
  b->ht_total_ct  = 0;
  b->ht_insert_ct = 0;

  *A = B;
  ierr = OptionsHasName(PETSC_NULL,"-mat_use_hash_table",&flg); CHKERRQ(ierr);
  if (flg) {
    double fact = 1.39;
    ierr = MatSetOption(B,MAT_USE_HASH_TABLE); CHKERRQ(ierr);
    ierr = OptionsGetDouble(PETSC_NULL,"-mat_use_hash_table",&fact,&flg); CHKERRQ(ierr);
    if (fact <= 1.0) fact = 1.39;
    ierr = MatMPIBAIJSetHashTableFactor(B,fact); CHKERRQ(ierr);
    PLogInfo(0,"MatCreateMPIBAIJ:Hash table Factor used %5.2f\n",fact);
  }
  ierr = PetscObjectComposeFunction((PetscObject)B,"MatGetDiagonalBlock_C",
                                     "MatGetDiagonalBlock_MPIBAIJ",
                                     (void*)MatGetDiagonalBlock_MPIBAIJ);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
2167 
2168 #undef __FUNC__
2169 #define __FUNC__ "MatDuplicate_MPIBAIJ"
2170 static int MatDuplicate_MPIBAIJ(Mat matin,MatDuplicateOption cpvalues,Mat *newmat)
2171 {
2172   Mat         mat;
2173   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
2174   int         ierr, len=0, flg;
2175 
2176   PetscFunctionBegin;
2177   *newmat       = 0;
2178   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIBAIJ,"Mat",matin->comm,MatDestroy,MatView);
2179   PLogObjectCreate(mat);
2180   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
2181   PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));
2182   mat->ops->destroy    = MatDestroy_MPIBAIJ;
2183   mat->ops->view       = MatView_MPIBAIJ;
2184   mat->factor          = matin->factor;
2185   mat->assembled       = PETSC_TRUE;
2186 
2187   a->m = mat->m   = oldmat->m;
2188   a->n = mat->n   = oldmat->n;
2189   a->M = mat->M   = oldmat->M;
2190   a->N = mat->N   = oldmat->N;
2191 
2192   a->bs  = oldmat->bs;
2193   a->bs2 = oldmat->bs2;
2194   a->mbs = oldmat->mbs;
2195   a->nbs = oldmat->nbs;
2196   a->Mbs = oldmat->Mbs;
2197   a->Nbs = oldmat->Nbs;
2198 
2199   a->rstart       = oldmat->rstart;
2200   a->rend         = oldmat->rend;
2201   a->cstart       = oldmat->cstart;
2202   a->cend         = oldmat->cend;
2203   a->size         = oldmat->size;
2204   a->rank         = oldmat->rank;
2205   mat->insertmode = NOT_SET_VALUES;
2206   a->rowvalues    = 0;
2207   a->getrowactive = PETSC_FALSE;
2208   a->barray       = 0;
2209 
2210   /* hash table stuff */
2211   a->ht           = 0;
2212   a->hd           = 0;
2213   a->ht_size      = 0;
2214   a->ht_flag      = oldmat->ht_flag;
2215   a->ht_fact      = oldmat->ht_fact;
2216   a->ht_total_ct  = 0;
2217   a->ht_insert_ct = 0;
2218 
2219 
2220   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
2221   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
2222   a->cowners = a->rowners + a->size + 2;
2223   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
2224   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
2225   if (oldmat->colmap) {
2226 #if defined (USE_CTABLE)
2227   ierr = TableCreateCopy( &a->colmap,oldmat->colmap ); CHKERRQ(ierr);
2228 #else
2229     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
2230     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
2231     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
2232 #endif
2233   } else a->colmap = 0;
2234   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
2235     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
2236     PLogObjectMemory(mat,len*sizeof(int));
2237     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
2238   } else a->garray = 0;
2239 
2240   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
2241   PLogObjectParent(mat,a->lvec);
2242   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
2243 
2244   PLogObjectParent(mat,a->Mvctx);
2245   ierr =  MatDuplicate(oldmat->A,cpvalues,&a->A); CHKERRQ(ierr);
2246   PLogObjectParent(mat,a->A);
2247   ierr =  MatDuplicate(oldmat->B,cpvalues,&a->B); CHKERRQ(ierr);
2248   PLogObjectParent(mat,a->B);
2249   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
2250   ierr = FListDuplicate(mat->qlist,&matin->qlist);CHKERRQ(ierr);
2251   if (flg) {
2252     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
2253   }
2254   *newmat = mat;
2255   PetscFunctionReturn(0);
2256 }
2257 
2258 #include "sys.h"
2259 
#undef __FUNC__
#define __FUNC__ "MatLoad_MPIBAIJ"
/*
   MatLoad_MPIBAIJ - Loads a square matrix from a PETSc binary viewer into
   a parallel BAIJ matrix.  Process 0 reads the file (header, row lengths,
   column indices, then values) and ships each processor its slice; if the
   global size is not divisible by the block size the matrix is padded with
   extra rows/columns carrying a unit diagonal.
*/
int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*buf;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
  int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
  int          flg,tag = ((PetscObject)viewer)->tag,bs=1,Mbs,mbs,extra_rows;
  int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
  int          dcount,kmax,k,nzcount,tmp;

  PetscFunctionBegin;
  ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);

  /* only process 0 touches the file; header = [cookie, M, N, nz-format] */
  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
    if (header[3] < 0) {
      SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix stored in special format, cannot load as MPIBAIJ");
    }
  }

  ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
  M = header[1]; N = header[2];

  if (M != N) SETERRQ(PETSC_ERR_SUP,0,"Can only do square matrices");

  /*
     This code adds extra rows to make sure the number of rows is
     divisible by the blocksize
  */
  Mbs        = M/bs;
  extra_rows = bs - M + bs*(Mbs);
  if (extra_rows == bs) extra_rows = 0;
  else                  Mbs++;
  if (extra_rows &&!rank) {
    PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
  }

  /* determine ownership of all rows */
  mbs = Mbs/size + ((Mbs % size) > rank);
  m   = mbs * bs;
  /* rowners[] is in block rows, browners[] in point rows */
  rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
  browners = rowners + size + 1;
  ierr = MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
  for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
    /* each padding row holds exactly one entry (its unit diagonal) */
    for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
    ierr = MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);CHKERRQ(ierr);
    PetscFree(sndcounts);
  } else {
    ierr = MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);CHKERRQ(ierr);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices  */
    nz = procsnz[0];
    ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* with one processor the padding entries are not on disk and are
       appended here instead of in the last-processor branch below */
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }

    /* read in every ones (except the last) and ship off */
    for ( i=1; i<size-1; i++ ) {
      nz   = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
      ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
    }
    /* read in the stuff for the last proc */
    if ( size != 1 ) {
      nz   = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
      ierr = MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);CHKERRQ(ierr);
    }
    PetscFree(cols);
  } else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += locrowlens[i];
    }
    ibuf   = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* receive message of column indices*/
    ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
    ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
    if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries;
     mask[] marks block columns already seen in the current block row,
     masked1[]/masked2[] record which marks to clear afterwards */
  dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
  odlens = dlens + (rend-rstart);
  mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
  PetscMemzero(mask,3*Mbs*sizeof(int));
  masked1 = mask    + Mbs;
  masked2 = masked1 + Mbs;
  rowcount = 0; nzcount = 0;
  for ( i=0; i<mbs; i++ ) {
    dcount  = 0;
    odcount = 0;
    for ( j=0; j<bs; j++ ) {
      kmax = locrowlens[rowcount];
      for ( k=0; k<kmax; k++ ) {
        tmp = mycols[nzcount++]/bs;
        if (!mask[tmp]) {
          mask[tmp] = 1;
          /* rstart/rend are block indices, as is tmp */
          if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
          else masked1[dcount++] = tmp;
        }
      }
      rowcount++;
    }

    dlens[i]  = dcount;
    odlens[i] = odcount;

    /* zero out the mask elements we set */
    for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
    for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
  }

  /* create our matrix */
  ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
         CHKERRQ(ierr);
  A = *newmat;
  MatSetOption(A,MAT_COLUMNS_SORTED);

  if (!rank) {
    buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    vals = buf;
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
    /* read in other processors (except the last one) and ship out */
    for ( i=1; i<size-1; i++ ) {
      nz   = procsnz[i];
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
      ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
    }
    /* the last proc */
    if ( size != 1 ){
      /* i == size-1 here (value left by the loop above), so procsnz[i]
         is the last processor's count */
      nz   = procsnz[i] - extra_rows;
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
      /* padding rows get a unit diagonal */
      for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
      ierr = MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);CHKERRQ(ierr);
    }
    PetscFree(procsnz);
  } else {
    /* receive numeric values */
    buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);

    /* receive message of values*/
    vals   = buf;
    mycols = ibuf;
    ierr   = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
    ierr   = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
    if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
  }
  PetscFree(locrowlens);
  PetscFree(buf);
  PetscFree(ibuf);
  PetscFree(rowners);
  PetscFree(dlens);
  PetscFree(mask);
  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
2487 
2488 
2489 
2490 #undef __FUNC__
2491 #define __FUNC__ "MatMPIBAIJSetHashTableFactor"
2492 /*@
2493    MatMPIBAIJSetHashTableFactor - Sets the factor required to compute the size of the HashTable.
2494 
2495    Input Parameters:
2496 .  mat  - the matrix
2497 .  fact - factor
2498 
2499    Collective on Mat
2500 
2501   Notes:
2502    This can also be set by the command line option: -mat_use_hash_table fact
2503 
2504 .keywords: matrix, hashtable, factor, HT
2505 
2506 .seealso: MatSetOption()
2507 @*/
2508 int MatMPIBAIJSetHashTableFactor(Mat mat,double fact)
2509 {
2510   Mat_MPIBAIJ *baij;
2511 
2512   PetscFunctionBegin;
2513   PetscValidHeaderSpecific(mat,MAT_COOKIE);
2514   if (mat->type != MATMPIBAIJ) {
2515       SETERRQ(PETSC_ERR_ARG_WRONG,1,"Incorrect matrix type. Use MPIBAIJ only.");
2516   }
2517   baij = (Mat_MPIBAIJ*) mat->data;
2518   baij->ht_fact = fact;
2519   PetscFunctionReturn(0);
2520 }
2521