xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 6525c446fdebc33f8f2914de551b2af5a320bf84)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpibaij.c,v 1.150 1999/02/17 19:09:54 balay Exp balay $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"   /*I  "mat.h"  I*/
6 #include "src/vec/vecimpl.h"
7 
8 extern int MatSetUpMultiply_MPIBAIJ(Mat);
9 extern int DisAssemble_MPIBAIJ(Mat);
10 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
11 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatReuse,Mat **);
12 extern int MatGetValues_SeqBAIJ(Mat,int,int *,int,int *,Scalar *);
13 
14 /*
15      Local utility routine that creates a mapping from the global column
16    number to the local number in the off-diagonal part of the local
17    storage of the matrix.  This is done in a non scable way since the
18    length of colmap equals the global matrix length.
19 */
20 #undef __FUNC__
21 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
22 static int CreateColmap_MPIBAIJ_Private(Mat mat)
23 {
24   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
25   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
26   int         nbs = B->nbs,i,bs=B->bs;
27 #if defined (USE_CTABLE)
28   int         ierr;
29 #endif
30 
31   PetscFunctionBegin;
32 #if defined (USE_CTABLE)
33   ierr = TableCreate( &baij->colmap, baij->nbs/5 ); CHKERRQ(ierr);
34   for ( i=0; i<nbs; i++ ){
35     ierr = TableAdd( baij->colmap, baij->garray[i] + 1, i*bs+1 );CHKERRQ(ierr);
36   }
37 #else
38   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
39   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
40   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
41   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
42 #endif
43   PetscFunctionReturn(0);
44 }
45 
46 #define CHUNKSIZE  10
47 
48 #define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
49 { \
50  \
51     brow = row/bs;  \
52     rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
53     rmax = aimax[brow]; nrow = ailen[brow]; \
54       bcol = col/bs; \
55       ridx = row % bs; cidx = col % bs; \
56       low = 0; high = nrow; \
57       while (high-low > 3) { \
58         t = (low+high)/2; \
59         if (rp[t] > bcol) high = t; \
60         else              low  = t; \
61       } \
62       for ( _i=low; _i<high; _i++ ) { \
63         if (rp[_i] > bcol) break; \
64         if (rp[_i] == bcol) { \
65           bap  = ap +  bs2*_i + bs*cidx + ridx; \
66           if (addv == ADD_VALUES) *bap += value;  \
67           else                    *bap  = value;  \
68           goto a_noinsert; \
69         } \
70       } \
71       if (a->nonew == 1) goto a_noinsert; \
72       else if (a->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
73       if (nrow >= rmax) { \
74         /* there is no extra room in row, therefore enlarge */ \
75         int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
76         Scalar *new_a; \
77  \
78         if (a->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
79  \
80         /* malloc new storage space */ \
81         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
82         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
83         new_j   = (int *) (new_a + bs2*new_nz); \
84         new_i   = new_j + new_nz; \
85  \
86         /* copy over old data into new slots */ \
87         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
88         for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
89         PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
90         len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
91         PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
92                                                            len*sizeof(int)); \
93         PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
94         PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
95         PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
96                     aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
97         /* free up old matrix storage */ \
98         PetscFree(a->a);  \
99         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
100         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
101         a->singlemalloc = 1; \
102  \
103         rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
104         rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
105         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
106         a->maxnz += bs2*CHUNKSIZE; \
107         a->reallocs++; \
108         a->nz++; \
109       } \
110       N = nrow++ - 1;  \
111       /* shift up all the later entries in this row */ \
112       for ( ii=N; ii>=_i; ii-- ) { \
113         rp[ii+1] = rp[ii]; \
114         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
115       } \
116       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
117       rp[_i]                      = bcol;  \
118       ap[bs2*_i + bs*cidx + ridx] = value;  \
119       a_noinsert:; \
120     ailen[brow] = nrow; \
121 }
122 
123 #define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
124 { \
125  \
126     brow = row/bs;  \
127     rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
128     rmax = bimax[brow]; nrow = bilen[brow]; \
129       bcol = col/bs; \
130       ridx = row % bs; cidx = col % bs; \
131       low = 0; high = nrow; \
132       while (high-low > 3) { \
133         t = (low+high)/2; \
134         if (rp[t] > bcol) high = t; \
135         else              low  = t; \
136       } \
137       for ( _i=low; _i<high; _i++ ) { \
138         if (rp[_i] > bcol) break; \
139         if (rp[_i] == bcol) { \
140           bap  = ap +  bs2*_i + bs*cidx + ridx; \
141           if (addv == ADD_VALUES) *bap += value;  \
142           else                    *bap  = value;  \
143           goto b_noinsert; \
144         } \
145       } \
146       if (b->nonew == 1) goto b_noinsert; \
147       else if (b->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
148       if (nrow >= rmax) { \
149         /* there is no extra room in row, therefore enlarge */ \
150         int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
151         Scalar *new_a; \
152  \
153         if (b->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
154  \
155         /* malloc new storage space */ \
156         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
157         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
158         new_j   = (int *) (new_a + bs2*new_nz); \
159         new_i   = new_j + new_nz; \
160  \
161         /* copy over old data into new slots */ \
162         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
163         for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
164         PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
165         len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
166         PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
167                                                            len*sizeof(int)); \
168         PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
169         PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
170         PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
171                     ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
172         /* free up old matrix storage */ \
173         PetscFree(b->a);  \
174         if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
175         ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
176         b->singlemalloc = 1; \
177  \
178         rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
179         rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
180         PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
181         b->maxnz += bs2*CHUNKSIZE; \
182         b->reallocs++; \
183         b->nz++; \
184       } \
185       N = nrow++ - 1;  \
186       /* shift up all the later entries in this row */ \
187       for ( ii=N; ii>=_i; ii-- ) { \
188         rp[ii+1] = rp[ii]; \
189         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
190       } \
191       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
192       rp[_i]                      = bcol;  \
193       ap[bs2*_i + bs*cidx + ridx] = value;  \
194       b_noinsert:; \
195     bilen[brow] = nrow; \
196 }
197 
198 #undef __FUNC__
199 #define __FUNC__ "MatSetValues_MPIBAIJ"
200 int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
201 {
202   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
203   Scalar      value;
204   int         ierr,i,j,row,col;
205   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
206   int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
207   int         cend_orig=baij->cend_bs,bs=baij->bs;
208 
209   /* Some Variables required in the macro */
210   Mat         A = baij->A;
211   Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
212   int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
213   Scalar      *aa=a->a;
214 
215   Mat         B = baij->B;
216   Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
217   int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
218   Scalar      *ba=b->a;
219 
220   int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
221   int         low,high,t,ridx,cidx,bs2=a->bs2;
222   Scalar      *ap,*bap;
223 
224   PetscFunctionBegin;
225   for ( i=0; i<m; i++ ) {
226     if (im[i] < 0) continue;
227 #if defined(USE_PETSC_BOPT_g)
228     if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
229 #endif
230     if (im[i] >= rstart_orig && im[i] < rend_orig) {
231       row = im[i] - rstart_orig;
232       for ( j=0; j<n; j++ ) {
233         if (in[j] >= cstart_orig && in[j] < cend_orig){
234           col = in[j] - cstart_orig;
235           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
236           MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
237           /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
238         }
239         else if (in[j] < 0) continue;
240 #if defined(USE_PETSC_BOPT_g)
241         else if (in[j] >= baij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Col too large");}
242 #endif
243         else {
244           if (mat->was_assembled) {
245             if (!baij->colmap) {
246               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
247             }
248 #if defined (USE_CTABLE)
249 	    col = TableFind( baij->colmap, in[j]/bs + 1 ) - 1 + in[j]%bs;
250 #else
251             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
252 #endif
253             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
254               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
255               col =  in[j];
256               /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
257               B = baij->B;
258               b = (Mat_SeqBAIJ *) (B)->data;
259               bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
260               ba=b->a;
261             }
262           } else col = in[j];
263           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
264           MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
265           /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
266         }
267       }
268     } else {
269       if (roworiented && !baij->donotstash) {
270         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
271       } else {
272         if (!baij->donotstash) {
273           row = im[i];
274 	  for ( j=0; j<n; j++ ) {
275 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
276           }
277         }
278       }
279     }
280   }
281   PetscFunctionReturn(0);
282 }
283 
284 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
285 #undef __FUNC__
286 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
287 int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
288 {
289   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
290   Scalar      *value,*barray=baij->barray;
291   int         ierr,i,j,ii,jj,row,col,k,l;
292   int         roworiented = baij->roworiented,rstart=baij->rstart ;
293   int         rend=baij->rend,cstart=baij->cstart,stepval;
294   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
295 
296   if(!barray) {
297     baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
298   }
299 
300   if (roworiented) {
301     stepval = (n-1)*bs;
302   } else {
303     stepval = (m-1)*bs;
304   }
305   for ( i=0; i<m; i++ ) {
306     if (im[i] < 0) continue;
307 #if defined(USE_PETSC_BOPT_g)
308     if (im[i] >= baij->Mbs) SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large, row %d max %d",im[i],baij->Mbs);
309 #endif
310     if (im[i] >= rstart && im[i] < rend) {
311       row = im[i] - rstart;
312       for ( j=0; j<n; j++ ) {
313         /* If NumCol = 1 then a copy is not required */
314         if ((roworiented) && (n == 1)) {
315           barray = v + i*bs2;
316         } else if((!roworiented) && (m == 1)) {
317           barray = v + j*bs2;
318         } else { /* Here a copy is required */
319           if (roworiented) {
320             value = v + i*(stepval+bs)*bs + j*bs;
321           } else {
322             value = v + j*(stepval+bs)*bs + i*bs;
323           }
324           for ( ii=0; ii<bs; ii++,value+=stepval ) {
325             for (jj=0; jj<bs; jj++ ) {
326               *barray++  = *value++;
327             }
328           }
329           barray -=bs2;
330         }
331 
332         if (in[j] >= cstart && in[j] < cend){
333           col  = in[j] - cstart;
334           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
335         }
336         else if (in[j] < 0) continue;
337 #if defined(USE_PETSC_BOPT_g)
338         else if (in[j] >= baij->Nbs) {SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large, col %d max %d",in[j],baij->Nbs);}
339 #endif
340         else {
341           if (mat->was_assembled) {
342             if (!baij->colmap) {
343               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
344             }
345 
346 #if defined(USE_PETSC_BOPT_g)
347 #if defined (USE_CTABLE)
348             if( (TableFind( baij->colmap, in[j] + 1 ) - 1) % bs)
349 	      {SETERRQ(PETSC_ERR_PLIB,0,"Incorrect colmap");}
350 #else
351             if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(PETSC_ERR_PLIB,0,"Incorrect colmap");}
352 #endif
353 #endif
354 #if defined (USE_CTABLE)
355 	    col = (TableFind( baij->colmap, in[j] + 1 ) - 1)/bs;
356 #else
357             col = (baij->colmap[in[j]] - 1)/bs;
358 #endif
359             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
360               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
361               col =  in[j];
362             }
363           }
364           else col = in[j];
365           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
366         }
367       }
368     } else {
369       if (!baij->donotstash) {
370         if (roworiented ) {
371           row   = im[i]*bs;
372           value = v + i*(stepval+bs)*bs;
373           for ( j=0; j<bs; j++,row++ ) {
374             for ( k=0; k<n; k++ ) {
375               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
376                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
377               }
378             }
379           }
380         } else {
381           for ( j=0; j<n; j++ ) {
382             value = v + j*(stepval+bs)*bs + i*bs;
383             col   = in[j]*bs;
384             for ( k=0; k<bs; k++,col++,value+=stepval) {
385               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
386                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
387               }
388             }
389           }
390         }
391       }
392     }
393   }
394   PetscFunctionReturn(0);
395 }
396 #define HASH_KEY 0.6180339887
397 /* #define HASH1(size,key) ((int)((size)*fmod(((key)*HASH_KEY),1))) */
398 #define HASH(size,key,tmp) (tmp = (key)*HASH_KEY,(int)((size)*(tmp-(int)tmp)))
399 /* #define HASH(size,key,tmp) ((int)((size)*fmod(((key)*HASH_KEY),1))) */
400 #undef __FUNC__
401 #define __FUNC__ "MatSetValues_MPIBAIJ_HT"
402 int MatSetValues_MPIBAIJ_HT(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
403 {
404   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
405   int         ierr,i,j,row,col;
406   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
407   int         rend_orig=baij->rend_bs,Nbs=baij->Nbs;
408   int         h1,key,size=baij->ht_size,bs=baij->bs,*HT=baij->ht,idx;
409   double      tmp;
410   Scalar      ** HD = baij->hd,value;
411 #if defined(USE_PETSC_BOPT_g)
412   int         total_ct=baij->ht_total_ct,insert_ct=baij->ht_insert_ct;
413 #endif
414 
415   PetscFunctionBegin;
416 
417   for ( i=0; i<m; i++ ) {
418 #if defined(USE_PETSC_BOPT_g)
419     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
420     if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
421 #endif
422       row = im[i];
423     if (row >= rstart_orig && row < rend_orig) {
424       for ( j=0; j<n; j++ ) {
425         col = in[j];
426         if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
427         /* Look up into the Hash Table */
428         key = (row/bs)*Nbs+(col/bs)+1;
429         h1  = HASH(size,key,tmp);
430 
431 
432         idx = h1;
433 #if defined(USE_PETSC_BOPT_g)
434         insert_ct++;
435         total_ct++;
436         if (HT[idx] != key) {
437           for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++,total_ct++);
438           if (idx == size) {
439             for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++,total_ct++);
440             if (idx == h1) {
441               SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
442             }
443           }
444         }
445 #else
446         if (HT[idx] != key) {
447           for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++);
448           if (idx == size) {
449             for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++);
450             if (idx == h1) {
451               SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
452             }
453           }
454         }
455 #endif
456         /* A HASH table entry is found, so insert the values at the correct address */
457         if (addv == ADD_VALUES) *(HD[idx]+ (col % bs)*bs + (row % bs)) += value;
458         else                    *(HD[idx]+ (col % bs)*bs + (row % bs))  = value;
459       }
460     } else {
461       if (roworiented && !baij->donotstash) {
462         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
463       } else {
464         if (!baij->donotstash) {
465           row = im[i];
466 	  for ( j=0; j<n; j++ ) {
467 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
468           }
469         }
470       }
471     }
472   }
473 #if defined(USE_PETSC_BOPT_g)
474   baij->ht_total_ct = total_ct;
475   baij->ht_insert_ct = insert_ct;
476 #endif
477   PetscFunctionReturn(0);
478 }
479 
480 #undef __FUNC__
481 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ_HT"
482 int MatSetValuesBlocked_MPIBAIJ_HT(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
483 {
484   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
485   int         ierr,i,j,ii,jj,row,col,k,l;
486   int         roworiented = baij->roworiented,rstart=baij->rstart ;
487   int         rend=baij->rend,stepval,bs=baij->bs,bs2=baij->bs2;
488   int         h1,key,size=baij->ht_size,idx,*HT=baij->ht,Nbs=baij->Nbs;
489   double      tmp;
490   Scalar      ** HD = baij->hd,*value,*v_t,*baij_a;
491 #if defined(USE_PETSC_BOPT_g)
492   int         total_ct=baij->ht_total_ct,insert_ct=baij->ht_insert_ct;
493 #endif
494 
495   PetscFunctionBegin;
496 
497   if (roworiented) {
498     stepval = (n-1)*bs;
499   } else {
500     stepval = (m-1)*bs;
501   }
502   for ( i=0; i<m; i++ ) {
503 #if defined(USE_PETSC_BOPT_g)
504     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
505     if (im[i] >= baij->Mbs) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
506 #endif
507     row   = im[i];
508     v_t   = v + i*bs2;
509     if (row >= rstart && row < rend) {
510       for ( j=0; j<n; j++ ) {
511         col = in[j];
512 
513         /* Look up into the Hash Table */
514         key = row*Nbs+col+1;
515         h1  = HASH(size,key,tmp);
516 
517         idx = h1;
518 #if defined(USE_PETSC_BOPT_g)
519         total_ct++;
520         insert_ct++;
521        if (HT[idx] != key) {
522           for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++,total_ct++);
523           if (idx == size) {
524             for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++,total_ct++);
525             if (idx == h1) {
526               SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
527             }
528           }
529         }
530 #else
531         if (HT[idx] != key) {
532           for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++);
533           if (idx == size) {
534             for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++);
535             if (idx == h1) {
536               SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
537             }
538           }
539         }
540 #endif
541         baij_a = HD[idx];
542         if (roworiented) {
543           /*value = v + i*(stepval+bs)*bs + j*bs;*/
544           /* value = v + (i*(stepval+bs)+j)*bs; */
545           value = v_t;
546           v_t  += bs;
547           if (addv == ADD_VALUES) {
548             for ( ii=0; ii<bs; ii++,value+=stepval) {
549               for ( jj=ii; jj<bs2; jj+=bs ) {
550                 baij_a[jj]  += *value++;
551               }
552             }
553           } else {
554             for ( ii=0; ii<bs; ii++,value+=stepval) {
555               for ( jj=ii; jj<bs2; jj+=bs ) {
556                 baij_a[jj]  = *value++;
557               }
558             }
559           }
560         } else {
561           value = v + j*(stepval+bs)*bs + i*bs;
562           if (addv == ADD_VALUES) {
563             for ( ii=0; ii<bs; ii++,value+=stepval,baij_a+=bs ) {
564               for ( jj=0; jj<bs; jj++ ) {
565                 baij_a[jj]  += *value++;
566               }
567             }
568           } else {
569             for ( ii=0; ii<bs; ii++,value+=stepval,baij_a+=bs ) {
570               for ( jj=0; jj<bs; jj++ ) {
571                 baij_a[jj]  = *value++;
572               }
573             }
574           }
575         }
576       }
577     } else {
578       if (!baij->donotstash) {
579         if (roworiented ) {
580           row   = im[i]*bs;
581           value = v + i*(stepval+bs)*bs;
582           for ( j=0; j<bs; j++,row++ ) {
583             for ( k=0; k<n; k++ ) {
584               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
585                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
586               }
587             }
588           }
589         } else {
590           for ( j=0; j<n; j++ ) {
591             value = v + j*(stepval+bs)*bs + i*bs;
592             col   = in[j]*bs;
593             for ( k=0; k<bs; k++,col++,value+=stepval) {
594               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
595                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
596               }
597             }
598           }
599         }
600       }
601     }
602   }
603 #if defined(USE_PETSC_BOPT_g)
604   baij->ht_total_ct = total_ct;
605   baij->ht_insert_ct = insert_ct;
606 #endif
607   PetscFunctionReturn(0);
608 }
609 
610 #undef __FUNC__
611 #define __FUNC__ "MatGetValues_MPIBAIJ"
612 int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
613 {
614   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
615   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
616   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col,data;
617 
618   PetscFunctionBegin;
619   for ( i=0; i<m; i++ ) {
620     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
621     if (idxm[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
622     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
623       row = idxm[i] - bsrstart;
624       for ( j=0; j<n; j++ ) {
625         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
626         if (idxn[j] >= baij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
627         if (idxn[j] >= bscstart && idxn[j] < bscend){
628           col = idxn[j] - bscstart;
629           ierr = MatGetValues_SeqBAIJ(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
630         } else {
631           if (!baij->colmap) {
632             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
633           }
634 #if defined (USE_CTABLE)
635           data = TableFind( baij->colmap, idxn[j]/bs + 1 ) - 1;
636 #else
637           data = baij->colmap[idxn[j]/bs]-1;
638 #endif
639           if((data < 0) || (baij->garray[data/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
640           else {
641             col  = data + idxn[j]%bs;
642             ierr = MatGetValues_SeqBAIJ(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
643           }
644         }
645       }
646     } else {
647       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
648     }
649   }
650  PetscFunctionReturn(0);
651 }
652 
653 #undef __FUNC__
654 #define __FUNC__ "MatNorm_MPIBAIJ"
655 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
656 {
657   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
658   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
659   int        ierr, i,bs2=baij->bs2;
660   double     sum = 0.0;
661   Scalar     *v;
662 
663   PetscFunctionBegin;
664   if (baij->size == 1) {
665     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
666   } else {
667     if (type == NORM_FROBENIUS) {
668       v = amat->a;
669       for (i=0; i<amat->nz*bs2; i++ ) {
670 #if defined(USE_PETSC_COMPLEX)
671         sum += PetscReal(PetscConj(*v)*(*v)); v++;
672 #else
673         sum += (*v)*(*v); v++;
674 #endif
675       }
676       v = bmat->a;
677       for (i=0; i<bmat->nz*bs2; i++ ) {
678 #if defined(USE_PETSC_COMPLEX)
679         sum += PetscReal(PetscConj(*v)*(*v)); v++;
680 #else
681         sum += (*v)*(*v); v++;
682 #endif
683       }
684       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
685       *norm = sqrt(*norm);
686     } else {
687       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
688     }
689   }
690   PetscFunctionReturn(0);
691 }
692 
693 #undef __FUNC__
694 #define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
695 int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
696 {
697   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
698   MPI_Comm    comm = mat->comm;
699   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
700   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
701   MPI_Request *send_waits,*recv_waits;
702   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
703   InsertMode  addv;
704   Scalar      *rvalues,*svalues;
705 
706   PetscFunctionBegin;
707   if (baij->donotstash) {
708     baij->svalues    = 0; baij->rvalues    = 0;
709     baij->nsends     = 0; baij->nrecvs     = 0;
710     baij->send_waits = 0; baij->recv_waits = 0;
711     baij->rmax       = 0;
712     PetscFunctionReturn(0);
713   }
714 
715   /* make sure all processors are either in INSERTMODE or ADDMODE */
716   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
717   if (addv == (ADD_VALUES|INSERT_VALUES)) {
718     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
719   }
720   mat->insertmode = addv; /* in case this processor had no cache */
721 
722   /*  first count number of contributors to each processor */
723   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
724   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
725   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
726   for ( i=0; i<baij->stash.n; i++ ) {
727     idx = baij->stash.idx[i];
728     for ( j=0; j<size; j++ ) {
729       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
730         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
731       }
732     }
733   }
734   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
735 
736   /* inform other processors of number of messages and max length*/
737   work      = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
738   ierr      = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
739   nreceives = work[rank];
740   ierr      = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
741   nmax      = work[rank];
742   PetscFree(work);
743 
744   /* post receives:
745        1) each message will consist of ordered pairs
746      (global index,value) we store the global index as a double
747      to simplify the message passing.
748        2) since we don't know how long each individual message is we
749      allocate the largest needed buffer for each receive. Potentially
750      this is a lot of wasted space.
751 
752 
753        This could be done better.
754   */
755   rvalues    = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
756   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
757   for ( i=0; i<nreceives; i++ ) {
758     ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
759   }
760 
761   /* do sends:
762       1) starts[i] gives the starting index in svalues for stuff going to
763          the ith processor
764   */
765   svalues    = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
766   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
767   starts     = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
768   starts[0] = 0;
769   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
770   for ( i=0; i<baij->stash.n; i++ ) {
771     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
772     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
773     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
774   }
775   PetscFree(owner);
776   starts[0] = 0;
777   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
778   count = 0;
779   for ( i=0; i<size; i++ ) {
780     if (procs[i]) {
781       ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
782     }
783   }
784   PetscFree(starts); PetscFree(nprocs);
785 
786   /* Free cache space */
787   PLogInfo(baij->A,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
788   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
789 
790   baij->svalues    = svalues;    baij->rvalues    = rvalues;
791   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
792   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
793   baij->rmax       = nmax;
794 
795   PetscFunctionReturn(0);
796 }
797 
798 /*
799   Creates the hash table, and sets the table
800   This table is created only once.
801   If new entried need to be added to the matrix
802   then the hash table has to be destroyed and
803   recreated.
804 */
805 #undef __FUNC__
806 #define __FUNC__ "MatCreateHashTable_MPIBAIJ_Private"
807 int MatCreateHashTable_MPIBAIJ_Private(Mat mat,double factor)
808 {
809   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
810   Mat         A = baij->A, B=baij->B;
811   Mat_SeqBAIJ *a=(Mat_SeqBAIJ *)A->data, *b=(Mat_SeqBAIJ *)B->data;
812   int         i,j,k,nz=a->nz+b->nz,h1,*ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j;
813   int         size,bs2=baij->bs2,rstart=baij->rstart;
814   int         cstart=baij->cstart,*garray=baij->garray,row,col,Nbs=baij->Nbs;
815   int         *HT,key;
816   Scalar      **HD;
817   double      tmp;
818 #if defined(USE_PETSC_BOPT_g)
819   int         ct=0,max=0;
820 #endif
821 
822   PetscFunctionBegin;
823   baij->ht_size=(int)(factor*nz);
824   size = baij->ht_size;
825 
826   if (baij->ht) {
827     PetscFunctionReturn(0);
828   }
829 
830   /* Allocate Memory for Hash Table */
831   baij->hd = (Scalar**)PetscMalloc((size)*(sizeof(int)+sizeof(Scalar*))+1); CHKPTRQ(baij->hd);
832   baij->ht = (int*)(baij->hd + size);
833   HD = baij->hd;
834   HT = baij->ht;
835 
836 
837   PetscMemzero(HD,size*(sizeof(int)+sizeof(Scalar*)));
838 
839 
840   /* Loop Over A */
841   for ( i=0; i<a->mbs; i++ ) {
842     for ( j=ai[i]; j<ai[i+1]; j++ ) {
843       row = i+rstart;
844       col = aj[j]+cstart;
845 
846       key = row*Nbs + col + 1;
847       h1  = HASH(size,key,tmp);
848       for ( k=0; k<size; k++ ){
849         if (HT[(h1+k)%size] == 0.0) {
850           HT[(h1+k)%size] = key;
851           HD[(h1+k)%size] = a->a + j*bs2;
852           break;
853 #if defined(USE_PETSC_BOPT_g)
854         } else {
855           ct++;
856 #endif
857         }
858       }
859 #if defined(USE_PETSC_BOPT_g)
860       if (k> max) max = k;
861 #endif
862     }
863   }
864   /* Loop Over B */
865   for ( i=0; i<b->mbs; i++ ) {
866     for ( j=bi[i]; j<bi[i+1]; j++ ) {
867       row = i+rstart;
868       col = garray[bj[j]];
869       key = row*Nbs + col + 1;
870       h1  = HASH(size,key,tmp);
871       for ( k=0; k<size; k++ ){
872         if (HT[(h1+k)%size] == 0.0) {
873           HT[(h1+k)%size] = key;
874           HD[(h1+k)%size] = b->a + j*bs2;
875           break;
876 #if defined(USE_PETSC_BOPT_g)
877         } else {
878           ct++;
879 #endif
880         }
881       }
882 #if defined(USE_PETSC_BOPT_g)
883       if (k> max) max = k;
884 #endif
885     }
886   }
887 
888   /* Print Summary */
889 #if defined(USE_PETSC_BOPT_g)
890   for ( i=0,j=0; i<size; i++)
891     if (HT[i]) {j++;}
892   PLogInfo(0,"MatCreateHashTable_MPIBAIJ_Private: Average Search = %5.2f,max search = %d\n",
893            (j== 0)? 0.0:((double)(ct+j))/j,max);
894 #endif
895   PetscFunctionReturn(0);
896 }
897 
898 #undef __FUNC__
899 #define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
900 int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
901 {
902   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
903   MPI_Status  *send_status,recv_status;
904   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
905   int         bs=baij->bs,row,col,other_disassembled;
906   Scalar      *values,val;
907   InsertMode  addv = mat->insertmode;
908 
909   PetscFunctionBegin;
910   /*  wait on receives */
911   while (count) {
912     ierr = MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
913     /* unpack receives into our local space */
914     values = baij->rvalues + 3*imdex*baij->rmax;
915     ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
916     n = n/3;
917     for ( i=0; i<n; i++ ) {
918       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
919       col = (int) PetscReal(values[3*i+1]);
920       val = values[3*i+2];
921       if (col >= baij->cstart*bs && col < baij->cend*bs) {
922         col -= baij->cstart*bs;
923         ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
924       } else {
925         if (mat->was_assembled) {
926           if (!baij->colmap) {
927             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
928           }
929 #if defined (USE_CTABLE)
930 	  col = TableFind( baij->colmap, col/bs + 1 ) - 1 + col%bs;
931 #else
932           col = (baij->colmap[col/bs]) - 1 + col%bs;
933 #endif
934           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
935             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
936             col = (int) PetscReal(values[3*i+1]);
937           }
938         }
939         ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
940       }
941     }
942     count--;
943   }
944   if (baij->recv_waits) PetscFree(baij->recv_waits);
945   if (baij->rvalues)    PetscFree(baij->rvalues);
946 
947   /* wait on sends */
948   if (baij->nsends) {
949     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
950     ierr        = MPI_Waitall(baij->nsends,baij->send_waits,send_status);CHKERRQ(ierr);
951     PetscFree(send_status);
952   }
953   if (baij->send_waits) PetscFree(baij->send_waits);
954   if (baij->svalues)    PetscFree(baij->svalues);
955 
956   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
957   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
958 
959   /* determine if any processor has disassembled, if so we must
960      also disassemble ourselfs, in order that we may reassemble. */
961   /*
962      if nonzero structure of submatrix B cannot change then we know that
963      no processor disassembled thus we can skip this stuff
964   */
965   if (!((Mat_SeqBAIJ*) baij->B->data)->nonew)  {
966     ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
967     if (mat->was_assembled && !other_disassembled) {
968       ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
969     }
970   }
971 
972   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
973     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
974   }
975   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
976   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
977 
978 #if defined(USE_PETSC_BOPT_g)
979   if (baij->ht && mode== MAT_FINAL_ASSEMBLY) {
980     PLogInfo(0,"MatAssemblyEnd_MPIBAIJ:Average Hash Table Search in MatSetValues = %5.2f\n",
981              ((double)baij->ht_total_ct)/baij->ht_insert_ct);
982     baij->ht_total_ct  = 0;
983     baij->ht_insert_ct = 0;
984   }
985 #endif
986   if (baij->ht_flag && !baij->ht && mode == MAT_FINAL_ASSEMBLY) {
987     ierr = MatCreateHashTable_MPIBAIJ_Private(mat,baij->ht_fact); CHKERRQ(ierr);
988     mat->ops->setvalues        = MatSetValues_MPIBAIJ_HT;
989     mat->ops->setvaluesblocked = MatSetValuesBlocked_MPIBAIJ_HT;
990   }
991 
992   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
993   PetscFunctionReturn(0);
994 }
995 
996 #undef __FUNC__
997 #define __FUNC__ "MatView_MPIBAIJ_Binary"
998 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
999 {
1000   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
1001   int          ierr;
1002 
1003   PetscFunctionBegin;
1004   if (baij->size == 1) {
1005     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
1006   } else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
1007   PetscFunctionReturn(0);
1008 }
1009 
1010 #undef __FUNC__
1011 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworSocket"
1012 static int MatView_MPIBAIJ_ASCIIorDraworSocket(Mat mat,Viewer viewer)
1013 {
1014   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
1015   int          ierr, format,bs = baij->bs, size = baij->size, rank = baij->rank;
1016   FILE         *fd;
1017   ViewerType   vtype;
1018 
1019   PetscFunctionBegin;
1020   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
1021   if (PetscTypeCompare(vtype,ASCII_VIEWER)) {
1022     ierr = ViewerGetFormat(viewer,&format);
1023     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
1024       MatInfo info;
1025       MPI_Comm_rank(mat->comm,&rank);
1026       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
1027       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
1028       PetscSequentialPhaseBegin(mat->comm,1);
1029       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
1030               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
1031               baij->bs,(int)info.memory);
1032       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
1033       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
1034       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
1035       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
1036       fflush(fd);
1037       PetscSequentialPhaseEnd(mat->comm,1);
1038       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
1039       PetscFunctionReturn(0);
1040     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
1041       PetscPrintf(mat->comm,"  block size is %d\n",bs);
1042       PetscFunctionReturn(0);
1043     }
1044   }
1045 
1046   if (PetscTypeCompare(vtype,DRAW_VIEWER)) {
1047     Draw       draw;
1048     PetscTruth isnull;
1049     ierr = ViewerDrawGetDraw(viewer,0,&draw); CHKERRQ(ierr);
1050     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
1051   }
1052 
1053   if (size == 1) {
1054     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
1055   } else {
1056     /* assemble the entire matrix onto first processor. */
1057     Mat         A;
1058     Mat_SeqBAIJ *Aloc;
1059     int         M = baij->M, N = baij->N,*ai,*aj,col,i,j,k,*rvals;
1060     int         mbs = baij->mbs;
1061     Scalar      *a;
1062 
1063     if (!rank) {
1064       ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
1065     } else {
1066       ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
1067     }
1068     PLogObjectParent(mat,A);
1069 
1070     /* copy over the A part */
1071     Aloc = (Mat_SeqBAIJ*) baij->A->data;
1072     ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1073     rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1074 
1075     for ( i=0; i<mbs; i++ ) {
1076       rvals[0] = bs*(baij->rstart + i);
1077       for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1078       for ( j=ai[i]; j<ai[i+1]; j++ ) {
1079         col = (baij->cstart+aj[j])*bs;
1080         for (k=0; k<bs; k++ ) {
1081           ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
1082           col++; a += bs;
1083         }
1084       }
1085     }
1086     /* copy over the B part */
1087     Aloc = (Mat_SeqBAIJ*) baij->B->data;
1088     ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1089     for ( i=0; i<mbs; i++ ) {
1090       rvals[0] = bs*(baij->rstart + i);
1091       for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1092       for ( j=ai[i]; j<ai[i+1]; j++ ) {
1093         col = baij->garray[aj[j]]*bs;
1094         for (k=0; k<bs; k++ ) {
1095           ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
1096           col++; a += bs;
1097         }
1098       }
1099     }
1100     PetscFree(rvals);
1101     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1102     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1103     /*
1104        Everyone has to call to draw the matrix since the graphics waits are
1105        synchronized across all processors that share the Draw object
1106     */
1107     if (!rank || PetscTypeCompare(vtype,DRAW_VIEWER)) {
1108       ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
1109     }
1110     ierr = MatDestroy(A); CHKERRQ(ierr);
1111   }
1112   PetscFunctionReturn(0);
1113 }
1114 
1115 
1116 
1117 #undef __FUNC__
1118 #define __FUNC__ "MatView_MPIBAIJ"
1119 int MatView_MPIBAIJ(Mat mat,Viewer viewer)
1120 {
1121   int         ierr;
1122   ViewerType  vtype;
1123 
1124   PetscFunctionBegin;
1125   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
1126   if (PetscTypeCompare(vtype,ASCII_VIEWER) || PetscTypeCompare(vtype,DRAW_VIEWER) ||
1127       PetscTypeCompare(vtype,SOCKET_VIEWER)) {
1128     ierr = MatView_MPIBAIJ_ASCIIorDraworSocket(mat,viewer); CHKERRQ(ierr);
1129   } else if (PetscTypeCompare(vtype,BINARY_VIEWER)) {
1130     ierr = MatView_MPIBAIJ_Binary(mat,viewer);CHKERRQ(ierr);
1131   } else {
1132     SETERRQ(1,1,"Viewer type not supported by PETSc object");
1133   }
1134   PetscFunctionReturn(0);
1135 }
1136 
1137 #undef __FUNC__
1138 #define __FUNC__ "MatDestroy_MPIBAIJ"
1139 int MatDestroy_MPIBAIJ(Mat mat)
1140 {
1141   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1142   int         ierr;
1143 
1144   PetscFunctionBegin;
1145   if (--mat->refct > 0) PetscFunctionReturn(0);
1146 
1147   if (mat->mapping) {
1148     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
1149   }
1150   if (mat->bmapping) {
1151     ierr = ISLocalToGlobalMappingDestroy(mat->bmapping); CHKERRQ(ierr);
1152   }
1153   if (mat->rmap) {
1154     ierr = MapDestroy(mat->rmap);CHKERRQ(ierr);
1155   }
1156   if (mat->cmap) {
1157     ierr = MapDestroy(mat->cmap);CHKERRQ(ierr);
1158   }
1159 #if defined(USE_PETSC_LOG)
1160   PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",baij->M,baij->N);
1161 #endif
1162 
1163   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
1164   PetscFree(baij->rowners);
1165   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1166   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1167 #if defined (USE_CTABLE)
1168   if (baij->colmap) TableDelete(baij->colmap);
1169 #else
1170   if (baij->colmap) PetscFree(baij->colmap);
1171 #endif
1172   if (baij->garray) PetscFree(baij->garray);
1173   if (baij->lvec)   VecDestroy(baij->lvec);
1174   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
1175   if (baij->rowvalues) PetscFree(baij->rowvalues);
1176   if (baij->barray) PetscFree(baij->barray);
1177   if (baij->hd) PetscFree(baij->hd);
1178   PetscFree(baij);
1179   PLogObjectDestroy(mat);
1180   PetscHeaderDestroy(mat);
1181   PetscFunctionReturn(0);
1182 }
1183 
1184 #undef __FUNC__
1185 #define __FUNC__ "MatMult_MPIBAIJ"
1186 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
1187 {
1188   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1189   int         ierr, nt;
1190 
1191   PetscFunctionBegin;
1192   ierr = VecGetLocalSize(xx,&nt);CHKERRQ(ierr);
1193   if (nt != a->n) {
1194     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
1195   }
1196   ierr = VecGetLocalSize(yy,&nt);CHKERRQ(ierr);
1197   if (nt != a->m) {
1198     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible parition of A and yy");
1199   }
1200   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1201   ierr = (*a->A->ops->mult)(a->A,xx,yy); CHKERRQ(ierr);
1202   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1203   ierr = (*a->B->ops->multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
1204   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1205   PetscFunctionReturn(0);
1206 }
1207 
1208 #undef __FUNC__
1209 #define __FUNC__ "MatMultAdd_MPIBAIJ"
1210 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
1211 {
1212   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1213   int        ierr;
1214 
1215   PetscFunctionBegin;
1216   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1217   ierr = (*a->A->ops->multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
1218   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1219   ierr = (*a->B->ops->multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
1220   PetscFunctionReturn(0);
1221 }
1222 
1223 #undef __FUNC__
1224 #define __FUNC__ "MatMultTrans_MPIBAIJ"
1225 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
1226 {
1227   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1228   int         ierr;
1229 
1230   PetscFunctionBegin;
1231   /* do nondiagonal part */
1232   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
1233   /* send it on its way */
1234   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
1235   /* do local part */
1236   ierr = (*a->A->ops->multtrans)(a->A,xx,yy); CHKERRQ(ierr);
1237   /* receive remote parts: note this assumes the values are not actually */
1238   /* inserted in yy until the next line, which is true for my implementation*/
1239   /* but is not perhaps always true. */
1240   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
1241   PetscFunctionReturn(0);
1242 }
1243 
1244 #undef __FUNC__
1245 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
1246 int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
1247 {
1248   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1249   int         ierr;
1250 
1251   PetscFunctionBegin;
1252   /* do nondiagonal part */
1253   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
1254   /* send it on its way */
1255   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
1256   /* do local part */
1257   ierr = (*a->A->ops->multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
1258   /* receive remote parts: note this assumes the values are not actually */
1259   /* inserted in yy until the next line, which is true for my implementation*/
1260   /* but is not perhaps always true. */
1261   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
1262   PetscFunctionReturn(0);
1263 }
1264 
1265 /*
1266   This only works correctly for square matrices where the subblock A->A is the
1267    diagonal block
1268 */
1269 #undef __FUNC__
1270 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
1271 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
1272 {
1273   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1274   int         ierr;
1275 
1276   PetscFunctionBegin;
1277   if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
1278   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
1279   PetscFunctionReturn(0);
1280 }
1281 
1282 #undef __FUNC__
1283 #define __FUNC__ "MatScale_MPIBAIJ"
1284 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
1285 {
1286   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1287   int         ierr;
1288 
1289   PetscFunctionBegin;
1290   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
1291   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
1292   PetscFunctionReturn(0);
1293 }
1294 
1295 #undef __FUNC__
1296 #define __FUNC__ "MatGetSize_MPIBAIJ"
1297 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
1298 {
1299   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1300 
1301   PetscFunctionBegin;
1302   if (m) *m = mat->M;
1303   if (n) *n = mat->N;
1304   PetscFunctionReturn(0);
1305 }
1306 
1307 #undef __FUNC__
1308 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
1309 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
1310 {
1311   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1312 
1313   PetscFunctionBegin;
1314   *m = mat->m; *n = mat->n;
1315   PetscFunctionReturn(0);
1316 }
1317 
1318 #undef __FUNC__
1319 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
1320 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
1321 {
1322   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1323 
1324   PetscFunctionBegin;
1325   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
1326   PetscFunctionReturn(0);
1327 }
1328 
1329 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1330 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1331 
1332 #undef __FUNC__
1333 #define __FUNC__ "MatGetRow_MPIBAIJ"
1334 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1335 {
1336   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1337   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
1338   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
1339   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
1340   int        *cmap, *idx_p,cstart = mat->cstart;
1341 
1342   PetscFunctionBegin;
1343   if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
1344   mat->getrowactive = PETSC_TRUE;
1345 
1346   if (!mat->rowvalues && (idx || v)) {
1347     /*
1348         allocate enough space to hold information from the longest row.
1349     */
1350     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
1351     int     max = 1,mbs = mat->mbs,tmp;
1352     for ( i=0; i<mbs; i++ ) {
1353       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1354       if (max < tmp) { max = tmp; }
1355     }
1356     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
1357     CHKPTRQ(mat->rowvalues);
1358     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
1359   }
1360 
1361   if (row < brstart || row >= brend) SETERRQ(PETSC_ERR_SUP,0,"Only local rows")
1362   lrow = row - brstart;
1363 
1364   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1365   if (!v)   {pvA = 0; pvB = 0;}
1366   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1367   ierr = (*mat->A->ops->getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1368   ierr = (*mat->B->ops->getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1369   nztot = nzA + nzB;
1370 
1371   cmap  = mat->garray;
1372   if (v  || idx) {
1373     if (nztot) {
1374       /* Sort by increasing column numbers, assuming A and B already sorted */
1375       int imark = -1;
1376       if (v) {
1377         *v = v_p = mat->rowvalues;
1378         for ( i=0; i<nzB; i++ ) {
1379           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
1380           else break;
1381         }
1382         imark = i;
1383         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1384         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1385       }
1386       if (idx) {
1387         *idx = idx_p = mat->rowindices;
1388         if (imark > -1) {
1389           for ( i=0; i<imark; i++ ) {
1390             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
1391           }
1392         } else {
1393           for ( i=0; i<nzB; i++ ) {
1394             if (cmap[cworkB[i]/bs] < cstart)
1395               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
1396             else break;
1397           }
1398           imark = i;
1399         }
1400         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
1401         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
1402       }
1403     } else {
1404       if (idx) *idx = 0;
1405       if (v)   *v   = 0;
1406     }
1407   }
1408   *nz = nztot;
1409   ierr = (*mat->A->ops->restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1410   ierr = (*mat->B->ops->restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1411   PetscFunctionReturn(0);
1412 }
1413 
1414 #undef __FUNC__
1415 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1416 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1417 {
1418   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1419 
1420   PetscFunctionBegin;
1421   if (baij->getrowactive == PETSC_FALSE) {
1422     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
1423   }
1424   baij->getrowactive = PETSC_FALSE;
1425   PetscFunctionReturn(0);
1426 }
1427 
1428 #undef __FUNC__
1429 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1430 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1431 {
1432   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1433 
1434   PetscFunctionBegin;
1435   *bs = baij->bs;
1436   PetscFunctionReturn(0);
1437 }
1438 
1439 #undef __FUNC__
1440 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1441 int MatZeroEntries_MPIBAIJ(Mat A)
1442 {
1443   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1444   int         ierr;
1445 
1446   PetscFunctionBegin;
1447   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1448   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1449   PetscFunctionReturn(0);
1450 }
1451 
1452 #undef __FUNC__
1453 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1454 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1455 {
1456   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1457   Mat         A = a->A, B = a->B;
1458   int         ierr;
1459   double      isend[5], irecv[5];
1460 
1461   PetscFunctionBegin;
1462   info->block_size     = (double)a->bs;
1463   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1464   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
1465   isend[3] = info->memory;  isend[4] = info->mallocs;
1466   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1467   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
1468   isend[3] += info->memory;  isend[4] += info->mallocs;
1469   if (flag == MAT_LOCAL) {
1470     info->nz_used      = isend[0];
1471     info->nz_allocated = isend[1];
1472     info->nz_unneeded  = isend[2];
1473     info->memory       = isend[3];
1474     info->mallocs      = isend[4];
1475   } else if (flag == MAT_GLOBAL_MAX) {
1476     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);CHKERRQ(ierr);
1477     info->nz_used      = irecv[0];
1478     info->nz_allocated = irecv[1];
1479     info->nz_unneeded  = irecv[2];
1480     info->memory       = irecv[3];
1481     info->mallocs      = irecv[4];
1482   } else if (flag == MAT_GLOBAL_SUM) {
1483     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);CHKERRQ(ierr);
1484     info->nz_used      = irecv[0];
1485     info->nz_allocated = irecv[1];
1486     info->nz_unneeded  = irecv[2];
1487     info->memory       = irecv[3];
1488     info->mallocs      = irecv[4];
1489   }
1490   info->rows_global       = (double)a->M;
1491   info->columns_global    = (double)a->N;
1492   info->rows_local        = (double)a->m;
1493   info->columns_local     = (double)a->N;
1494   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1495   info->fill_ratio_needed = 0;
1496   info->factor_mallocs    = 0;
1497   PetscFunctionReturn(0);
1498 }
1499 
1500 #undef __FUNC__
1501 #define __FUNC__ "MatSetOption_MPIBAIJ"
1502 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1503 {
1504   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1505 
1506   PetscFunctionBegin;
1507   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1508       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1509       op == MAT_COLUMNS_UNSORTED ||
1510       op == MAT_COLUMNS_SORTED ||
1511       op == MAT_NEW_NONZERO_ALLOCATION_ERR ||
1512       op == MAT_NEW_NONZERO_LOCATION_ERR) {
1513         MatSetOption(a->A,op);
1514         MatSetOption(a->B,op);
1515   } else if (op == MAT_ROW_ORIENTED) {
1516         a->roworiented = 1;
1517         MatSetOption(a->A,op);
1518         MatSetOption(a->B,op);
1519   } else if (op == MAT_ROWS_SORTED ||
1520              op == MAT_ROWS_UNSORTED ||
1521              op == MAT_SYMMETRIC ||
1522              op == MAT_STRUCTURALLY_SYMMETRIC ||
1523              op == MAT_YES_NEW_DIAGONALS ||
1524              op == MAT_USE_HASH_TABLE)
1525     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1526   else if (op == MAT_COLUMN_ORIENTED) {
1527     a->roworiented = 0;
1528     MatSetOption(a->A,op);
1529     MatSetOption(a->B,op);
1530   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1531     a->donotstash = 1;
1532   } else if (op == MAT_NO_NEW_DIAGONALS) {
1533     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1534   } else if (op == MAT_USE_HASH_TABLE) {
1535     a->ht_flag = 1;
1536   } else {
1537     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1538   }
1539   PetscFunctionReturn(0);
1540 }
1541 
1542 #undef __FUNC__
1543 #define __FUNC__ "MatTranspose_MPIBAIJ("
1544 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1545 {
1546   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1547   Mat_SeqBAIJ *Aloc;
1548   Mat        B;
1549   int        ierr,M=baij->M,N=baij->N,*ai,*aj,i,*rvals,j,k,col;
1550   int        bs=baij->bs,mbs=baij->mbs;
1551   Scalar     *a;
1552 
1553   PetscFunctionBegin;
1554   if (matout == PETSC_NULL && M != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
1555   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1556   CHKERRQ(ierr);
1557 
1558   /* copy over the A part */
1559   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1560   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1561   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1562 
1563   for ( i=0; i<mbs; i++ ) {
1564     rvals[0] = bs*(baij->rstart + i);
1565     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1566     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1567       col = (baij->cstart+aj[j])*bs;
1568       for (k=0; k<bs; k++ ) {
1569         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1570         col++; a += bs;
1571       }
1572     }
1573   }
1574   /* copy over the B part */
1575   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1576   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1577   for ( i=0; i<mbs; i++ ) {
1578     rvals[0] = bs*(baij->rstart + i);
1579     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1580     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1581       col = baij->garray[aj[j]]*bs;
1582       for (k=0; k<bs; k++ ) {
1583         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1584         col++; a += bs;
1585       }
1586     }
1587   }
1588   PetscFree(rvals);
1589   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1590   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1591 
1592   if (matout != PETSC_NULL) {
1593     *matout = B;
1594   } else {
1595     PetscOps *Abops;
1596     MatOps   Aops;
1597 
1598     /* This isn't really an in-place transpose .... but free data structures from baij */
1599     PetscFree(baij->rowners);
1600     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1601     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1602 #if defined (USE_CTABLE)
1603     if (baij->colmap) TableDelete(baij->colmap);
1604 #else
1605     if (baij->colmap) PetscFree(baij->colmap);
1606 #endif
1607     if (baij->garray) PetscFree(baij->garray);
1608     if (baij->lvec) VecDestroy(baij->lvec);
1609     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1610     PetscFree(baij);
1611 
1612     /*
1613        This is horrible, horrible code. We need to keep the
1614       A pointers for the bops and ops but copy everything
1615       else from C.
1616     */
1617     Abops = A->bops;
1618     Aops  = A->ops;
1619     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1620     A->bops = Abops;
1621     A->ops  = Aops;
1622 
1623     PetscHeaderDestroy(B);
1624   }
1625   PetscFunctionReturn(0);
1626 }
1627 
1628 #undef __FUNC__
1629 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1630 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1631 {
1632   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1633   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1634   int ierr,s1,s2,s3;
1635 
1636   PetscFunctionBegin;
1637   if (ll)  {
1638     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1639     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1640     if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"non-conforming local sizes");
1641     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1642     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1643   }
1644   if (rr) SETERRQ(PETSC_ERR_SUP,0,"not supported for right vector");
1645   PetscFunctionReturn(0);
1646 }
1647 
1648 #undef __FUNC__
1649 #define __FUNC__ "MatZeroRows_MPIBAIJ"
1650 int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
1651 {
1652   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
1653   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
1654   int            *procs,*nprocs,j,found,idx,nsends,*work,row;
1655   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
1656   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
1657   int            *lens,imdex,*lrows,*values,bs=l->bs,rstart_bs=l->rstart_bs;
1658   MPI_Comm       comm = A->comm;
1659   MPI_Request    *send_waits,*recv_waits;
1660   MPI_Status     recv_status,*send_status;
1661   IS             istmp;
1662 
1663   PetscFunctionBegin;
1664   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
1665   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
1666 
1667   /*  first count number of contributors to each processor */
1668   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
1669   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
1670   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
1671   for ( i=0; i<N; i++ ) {
1672     idx = rows[i];
1673     found = 0;
1674     for ( j=0; j<size; j++ ) {
1675       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
1676         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
1677       }
1678     }
1679     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
1680   }
1681   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
1682 
1683   /* inform other processors of number of messages and max length*/
1684   work   = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
1685   ierr   = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
1686   nrecvs = work[rank];
1687   ierr   = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
1688   nmax   = work[rank];
1689   PetscFree(work);
1690 
1691   /* post receives:   */
1692   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); CHKPTRQ(rvalues);
1693   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
1694   for ( i=0; i<nrecvs; i++ ) {
1695     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
1696   }
1697 
1698   /* do sends:
1699      1) starts[i] gives the starting index in svalues for stuff going to
1700      the ith processor
1701   */
1702   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1703   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
1704   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1705   starts[0] = 0;
1706   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1707   for ( i=0; i<N; i++ ) {
1708     svalues[starts[owner[i]]++] = rows[i];
1709   }
1710   ISRestoreIndices(is,&rows);
1711 
1712   starts[0] = 0;
1713   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1714   count = 0;
1715   for ( i=0; i<size; i++ ) {
1716     if (procs[i]) {
1717       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
1718     }
1719   }
1720   PetscFree(starts);
1721 
1722   base = owners[rank]*bs;
1723 
1724   /*  wait on receives */
1725   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1726   source = lens + nrecvs;
1727   count  = nrecvs; slen = 0;
1728   while (count) {
1729     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
1730     /* unpack receives into our local space */
1731     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
1732     source[imdex]  = recv_status.MPI_SOURCE;
1733     lens[imdex]  = n;
1734     slen += n;
1735     count--;
1736   }
1737   PetscFree(recv_waits);
1738 
1739   /* move the data into the send scatter */
1740   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1741   count = 0;
1742   for ( i=0; i<nrecvs; i++ ) {
1743     values = rvalues + i*nmax;
1744     for ( j=0; j<lens[i]; j++ ) {
1745       lrows[count++] = values[j] - base;
1746     }
1747   }
1748   PetscFree(rvalues); PetscFree(lens);
1749   PetscFree(owner); PetscFree(nprocs);
1750 
1751   /* actually zap the local rows */
1752   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1753   PLogObjectParent(A,istmp);
1754 
1755   /*
1756         Zero the required rows. If the "diagonal block" of the matrix
1757      is square and the user wishes to set the diagonal we use seperate
1758      code so that MatSetValues() is not called for each diagonal allocating
1759      new memory, thus calling lots of mallocs and slowing things down.
1760 
1761        Contributed by: Mathew Knepley
1762   */
1763   /* must zero l->B before l->A because the (diag) case below may put values into l->B*/
1764   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1765   if (diag && (l->A->M == l->A->N)) {
1766     ierr      = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1767   } else if (diag) {
1768     ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
1769     if (!((Mat_SeqBAIJ*)l->A->data)->nonew) {
1770       SETERRQ(PETSC_ERR_SUP,0,"MatZeroRows() on rectangular matrices cannot be used with \n\
1771 the Mat options MAT_NO_NEW_NONZERO_LOCATIONS, MAT_NEW_NONZERO_LOCATION_ERR");
1772     }
1773     for ( i = 0; i < slen; i++ ) {
1774       row  = lrows[i] + rstart_bs;
1775       ierr = MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES);CHKERRQ(ierr);
1776     }
1777     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1778     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1779   } else {
1780     ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
1781   }
1782 
1783   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1784   PetscFree(lrows);
1785 
1786   /* wait on sends */
1787   if (nsends) {
1788     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
1789     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
1790     PetscFree(send_status);
1791   }
1792   PetscFree(send_waits); PetscFree(svalues);
1793 
1794   PetscFunctionReturn(0);
1795 }
1796 
1797 extern int MatPrintHelp_SeqBAIJ(Mat);
1798 #undef __FUNC__
1799 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1800 int MatPrintHelp_MPIBAIJ(Mat A)
1801 {
1802   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1803   MPI_Comm    comm = A->comm;
1804   static int  called = 0;
1805   int         ierr;
1806 
1807   PetscFunctionBegin;
1808   if (!a->rank) {
1809     ierr = MatPrintHelp_SeqBAIJ(a->A);CHKERRQ(ierr);
1810   }
1811   if (called) {PetscFunctionReturn(0);} else called = 1;
1812   (*PetscHelpPrintf)(comm," Options for MATMPIBAIJ matrix format (the defaults):\n");
1813   (*PetscHelpPrintf)(comm,"  -mat_use_hash_table <factor>: Use hashtable for efficient matrix assembly\n");
1814   PetscFunctionReturn(0);
1815 }
1816 
1817 #undef __FUNC__
1818 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1819 int MatSetUnfactored_MPIBAIJ(Mat A)
1820 {
1821   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1822   int         ierr;
1823 
1824   PetscFunctionBegin;
1825   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1826   PetscFunctionReturn(0);
1827 }
1828 
1829 static int MatDuplicate_MPIBAIJ(Mat,MatDuplicateOption,Mat *);
1830 
1831 /* -------------------------------------------------------------------*/
1832 static struct _MatOps MatOps_Values = {
1833   MatSetValues_MPIBAIJ,
1834   MatGetRow_MPIBAIJ,
1835   MatRestoreRow_MPIBAIJ,
1836   MatMult_MPIBAIJ,
1837   MatMultAdd_MPIBAIJ,
1838   MatMultTrans_MPIBAIJ,
1839   MatMultTransAdd_MPIBAIJ,
1840   0,
1841   0,
1842   0,
1843   0,
1844   0,
1845   0,
1846   0,
1847   MatTranspose_MPIBAIJ,
1848   MatGetInfo_MPIBAIJ,
1849   0,
1850   MatGetDiagonal_MPIBAIJ,
1851   MatDiagonalScale_MPIBAIJ,
1852   MatNorm_MPIBAIJ,
1853   MatAssemblyBegin_MPIBAIJ,
1854   MatAssemblyEnd_MPIBAIJ,
1855   0,
1856   MatSetOption_MPIBAIJ,
1857   MatZeroEntries_MPIBAIJ,
1858   MatZeroRows_MPIBAIJ,
1859   0,
1860   0,
1861   0,
1862   0,
1863   MatGetSize_MPIBAIJ,
1864   MatGetLocalSize_MPIBAIJ,
1865   MatGetOwnershipRange_MPIBAIJ,
1866   0,
1867   0,
1868   0,
1869   0,
1870   MatDuplicate_MPIBAIJ,
1871   0,
1872   0,
1873   0,
1874   0,
1875   0,
1876   MatGetSubMatrices_MPIBAIJ,
1877   MatIncreaseOverlap_MPIBAIJ,
1878   MatGetValues_MPIBAIJ,
1879   0,
1880   MatPrintHelp_MPIBAIJ,
1881   MatScale_MPIBAIJ,
1882   0,
1883   0,
1884   0,
1885   MatGetBlockSize_MPIBAIJ,
1886   0,
1887   0,
1888   0,
1889   0,
1890   0,
1891   0,
1892   MatSetUnfactored_MPIBAIJ,
1893   0,
1894   MatSetValuesBlocked_MPIBAIJ,
1895   0,
1896   0,
1897   0,
1898   MatGetMaps_Petsc};
1899 
1900 
1901 EXTERN_C_BEGIN
1902 #undef __FUNC__
1903 #define __FUNC__ "MatGetDiagonalBlock_MPIBAIJ"
1904 int MatGetDiagonalBlock_MPIBAIJ(Mat A,PetscTruth *iscopy,MatReuse reuse,Mat *a)
1905 {
1906   PetscFunctionBegin;
1907   *a      = ((Mat_MPIBAIJ *)A->data)->A;
1908   *iscopy = PETSC_FALSE;
1909   PetscFunctionReturn(0);
1910 }
1911 EXTERN_C_END
1912 
1913 #undef __FUNC__
1914 #define __FUNC__ "MatCreateMPIBAIJ"
1915 /*@C
1916    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1917    (block compressed row).  For good matrix assembly performance
1918    the user should preallocate the matrix storage by setting the parameters
1919    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1920    performance can be increased by more than a factor of 50.
1921 
1922    Collective on MPI_Comm
1923 
1924    Input Parameters:
1925 +  comm - MPI communicator
1926 .  bs   - size of blockk
1927 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1928            This value should be the same as the local size used in creating the
1929            y vector for the matrix-vector product y = Ax.
1930 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1931            This value should be the same as the local size used in creating the
1932            x vector for the matrix-vector product y = Ax.
1933 .  M - number of global rows (or PETSC_DETERMINE to have calculated if m is given)
1934 .  N - number of global columns (or PETSC_DETERMINE to have calculated if n is given)
1935 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1936            submatrix  (same for all local rows)
1937 .  d_nzz - array containing the number of block nonzeros in the various block rows
1938            of the in diagonal portion of the local (possibly different for each block
1939            row) or PETSC_NULL.  You must leave room for the diagonal entry even if it is zero.
1940 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1941            submatrix (same for all local rows).
1942 -  o_nzz - array containing the number of nonzeros in the various block rows of the
1943            off-diagonal portion of the local submatrix (possibly different for
1944            each block row) or PETSC_NULL.
1945 
1946    Output Parameter:
1947 .  A - the matrix
1948 
1949    Options Database Keys:
1950 .   -mat_no_unroll - uses code that does not unroll the loops in the
1951                      block calculations (much slower)
1952 .   -mat_block_size - size of the blocks to use
1953 .   -mat_mpi - use the parallel matrix data structures even on one processor
1954                (defaults to using SeqBAIJ format on one processor)
1955 
1956    Notes:
1957    The user MUST specify either the local or global matrix dimensions
1958    (possibly both).
1959 
1960    If PETSC_DECIDE or  PETSC_DETERMINE is used for a particular argument on one processor
1961    than it must be used on all processors that share the object for that argument.
1962 
1963    Storage Information:
1964    For a square global matrix we define each processor's diagonal portion
1965    to be its local rows and the corresponding columns (a square submatrix);
1966    each processor's off-diagonal portion encompasses the remainder of the
1967    local matrix (a rectangular submatrix).
1968 
1969    The user can specify preallocated storage for the diagonal part of
1970    the local submatrix with either d_nz or d_nnz (not both).  Set
1971    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1972    memory allocation.  Likewise, specify preallocated storage for the
1973    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1974 
1975    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1976    the figure below we depict these three local rows and all columns (0-11).
1977 
1978 .vb
1979            0 1 2 3 4 5 6 7 8 9 10 11
1980           -------------------
1981    row 3  |  o o o d d d o o o o o o
1982    row 4  |  o o o d d d o o o o o o
1983    row 5  |  o o o d d d o o o o o o
1984           -------------------
1985 .ve
1986 
1987    Thus, any entries in the d locations are stored in the d (diagonal)
1988    submatrix, and any entries in the o locations are stored in the
1989    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1990    stored simply in the MATSEQBAIJ format for compressed row storage.
1991 
1992    Now d_nz should indicate the number of block nonzeros per row in the d matrix,
1993    and o_nz should indicate the number of block nonzeros per row in the o matrix.
1994    In general, for PDE problems in which most nonzeros are near the diagonal,
1995    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1996    or you will get TERRIBLE performance; see the users' manual chapter on
1997    matrices.
1998 
1999    Level: intermediate
2000 
2001 .keywords: matrix, block, aij, compressed row, sparse, parallel
2002 
2003 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues(), MatCreateMPIAIJ()
2004 @*/
2005 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
2006                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
2007 {
2008   Mat          B;
2009   Mat_MPIBAIJ  *b;
2010   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size,flg;
2011   int          flag1 = 0,flag2 = 0;
2012 
2013   PetscFunctionBegin;
2014   if (bs < 1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Invalid block size specified, must be positive");
2015 
2016   MPI_Comm_size(comm,&size);
2017   ierr = OptionsHasName(PETSC_NULL,"-mat_mpibaij",&flag1); CHKERRQ(ierr);
2018   ierr = OptionsHasName(PETSC_NULL,"-mat_mpi",&flag2); CHKERRQ(ierr);
2019   if (!flag1 && !flag2 && size == 1) {
2020     if (M == PETSC_DECIDE) M = m;
2021     if (N == PETSC_DECIDE) N = n;
2022     ierr = MatCreateSeqBAIJ(comm,bs,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
2023     PetscFunctionReturn(0);
2024   }
2025 
2026   *A = 0;
2027   PetscHeaderCreate(B,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIBAIJ,"Mat",comm,MatDestroy,MatView);
2028   PLogObjectCreate(B);
2029   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
2030   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
2031   PetscMemcpy(B->ops,&MatOps_Values,sizeof(struct _MatOps));
2032 
2033   B->ops->destroy    = MatDestroy_MPIBAIJ;
2034   B->ops->view       = MatView_MPIBAIJ;
2035   B->mapping    = 0;
2036   B->factor     = 0;
2037   B->assembled  = PETSC_FALSE;
2038 
2039   B->insertmode = NOT_SET_VALUES;
2040   MPI_Comm_rank(comm,&b->rank);
2041   MPI_Comm_size(comm,&b->size);
2042 
2043   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
2044     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
2045   }
2046   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) {
2047     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either M or m should be specified");
2048   }
2049   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) {
2050     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either N or n should be specified");
2051   }
2052   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
2053   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
2054 
2055   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
2056     work[0] = m; work[1] = n;
2057     mbs = m/bs; nbs = n/bs;
2058     ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
2059     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
2060     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
2061   }
2062   if (m == PETSC_DECIDE) {
2063     Mbs = M/bs;
2064     if (Mbs*bs != M) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global rows must be divisible by blocksize");
2065     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
2066     m   = mbs*bs;
2067   }
2068   if (n == PETSC_DECIDE) {
2069     Nbs = N/bs;
2070     if (Nbs*bs != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global cols must be divisible by blocksize");
2071     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
2072     n   = nbs*bs;
2073   }
2074   if (mbs*bs != m || nbs*bs != n) {
2075     SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of local rows, cols must be divisible by blocksize");
2076   }
2077 
2078   b->m = m; B->m = m;
2079   b->n = n; B->n = n;
2080   b->N = N; B->N = N;
2081   b->M = M; B->M = M;
2082   b->bs  = bs;
2083   b->bs2 = bs*bs;
2084   b->mbs = mbs;
2085   b->nbs = nbs;
2086   b->Mbs = Mbs;
2087   b->Nbs = Nbs;
2088 
2089   /* the information in the maps duplicates the information computed below, eventually
2090      we should remove the duplicate information that is not contained in the maps */
2091   ierr = MapCreateMPI(comm,m,M,&B->rmap);CHKERRQ(ierr);
2092   ierr = MapCreateMPI(comm,n,N,&B->cmap);CHKERRQ(ierr);
2093 
2094   /* build local table of row and column ownerships */
2095   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
2096   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
2097   b->cowners = b->rowners + b->size + 2;
2098   ierr = MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
2099   b->rowners[0] = 0;
2100   for ( i=2; i<=b->size; i++ ) {
2101     b->rowners[i] += b->rowners[i-1];
2102   }
2103   b->rstart    = b->rowners[b->rank];
2104   b->rend      = b->rowners[b->rank+1];
2105   b->rstart_bs = b->rstart * bs;
2106   b->rend_bs   = b->rend * bs;
2107 
2108   ierr = MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
2109   b->cowners[0] = 0;
2110   for ( i=2; i<=b->size; i++ ) {
2111     b->cowners[i] += b->cowners[i-1];
2112   }
2113   b->cstart    = b->cowners[b->rank];
2114   b->cend      = b->cowners[b->rank+1];
2115   b->cstart_bs = b->cstart * bs;
2116   b->cend_bs   = b->cend * bs;
2117 
2118 
2119   if (d_nz == PETSC_DEFAULT) d_nz = 5;
2120   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
2121   PLogObjectParent(B,b->A);
2122   if (o_nz == PETSC_DEFAULT) o_nz = 0;
2123   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
2124   PLogObjectParent(B,b->B);
2125 
2126   /* build cache for off array entries formed */
2127   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
2128   b->donotstash  = 0;
2129   b->colmap      = 0;
2130   b->garray      = 0;
2131   b->roworiented = 1;
2132 
2133   /* stuff used in block assembly */
2134   b->barray       = 0;
2135 
2136   /* stuff used for matrix vector multiply */
2137   b->lvec         = 0;
2138   b->Mvctx        = 0;
2139 
2140   /* stuff for MatGetRow() */
2141   b->rowindices   = 0;
2142   b->rowvalues    = 0;
2143   b->getrowactive = PETSC_FALSE;
2144 
2145   /* hash table stuff */
2146   b->ht           = 0;
2147   b->hd           = 0;
2148   b->ht_size      = 0;
2149   b->ht_flag      = 0;
2150   b->ht_fact      = 0;
2151   b->ht_total_ct  = 0;
2152   b->ht_insert_ct = 0;
2153 
2154   *A = B;
2155   ierr = OptionsHasName(PETSC_NULL,"-mat_use_hash_table",&flg); CHKERRQ(ierr);
2156   if (flg) {
2157     double fact = 1.39;
2158     ierr = MatSetOption(B,MAT_USE_HASH_TABLE); CHKERRQ(ierr);
2159     ierr = OptionsGetDouble(PETSC_NULL,"-mat_use_hash_table",&fact,&flg); CHKERRQ(ierr);
2160     if (fact <= 1.0) fact = 1.39;
2161     ierr = MatMPIBAIJSetHashTableFactor(B,fact); CHKERRQ(ierr);
2162     PLogInfo(0,"MatCreateMPIBAIJ:Hash table Factor used %5.2f\n",fact);
2163   }
2164   ierr = PetscObjectComposeFunction((PetscObject)B,"MatGetDiagonalBlock_C",
2165                                      "MatGetDiagonalBlock_MPIBAIJ",
2166                                      (void*)MatGetDiagonalBlock_MPIBAIJ);CHKERRQ(ierr);
2167   PetscFunctionReturn(0);
2168 }
2169 
2170 #undef __FUNC__
2171 #define __FUNC__ "MatDuplicate_MPIBAIJ"
2172 static int MatDuplicate_MPIBAIJ(Mat matin,MatDuplicateOption cpvalues,Mat *newmat)
2173 {
2174   Mat         mat;
2175   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
2176   int         ierr, len=0, flg;
2177 
2178   PetscFunctionBegin;
2179   *newmat       = 0;
2180   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIBAIJ,"Mat",matin->comm,MatDestroy,MatView);
2181   PLogObjectCreate(mat);
2182   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
2183   PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));
2184   mat->ops->destroy    = MatDestroy_MPIBAIJ;
2185   mat->ops->view       = MatView_MPIBAIJ;
2186   mat->factor          = matin->factor;
2187   mat->assembled       = PETSC_TRUE;
2188   mat->insertmode      = NOT_SET_VALUES;
2189 
2190   a->m = mat->m   = oldmat->m;
2191   a->n = mat->n   = oldmat->n;
2192   a->M = mat->M   = oldmat->M;
2193   a->N = mat->N   = oldmat->N;
2194 
2195   a->bs  = oldmat->bs;
2196   a->bs2 = oldmat->bs2;
2197   a->mbs = oldmat->mbs;
2198   a->nbs = oldmat->nbs;
2199   a->Mbs = oldmat->Mbs;
2200   a->Nbs = oldmat->Nbs;
2201 
2202   a->rstart       = oldmat->rstart;
2203   a->rend         = oldmat->rend;
2204   a->cstart       = oldmat->cstart;
2205   a->cend         = oldmat->cend;
2206   a->size         = oldmat->size;
2207   a->rank         = oldmat->rank;
2208   a->donotstash   = oldmat->donotstash;
2209   a->roworiented  = oldmat->roworiented;
2210   a->rowindices   = 0;
2211   a->rowvalues    = 0;
2212   a->getrowactive = PETSC_FALSE;
2213   a->barray       = 0;
2214   a->rstart_bs    = oldmat->rstart_bs;
2215   a->rend_bs      = oldmat->rend_bs;
2216   a->cstart_bs    = oldmat->cstart_bs;
2217   a->cend_bs      = oldmat->cend_bs;
2218 
2219   /* hash table stuff */
2220   a->ht           = 0;
2221   a->hd           = 0;
2222   a->ht_size      = 0;
2223   a->ht_flag      = oldmat->ht_flag;
2224   a->ht_fact      = oldmat->ht_fact;
2225   a->ht_total_ct  = 0;
2226   a->ht_insert_ct = 0;
2227 
2228 
2229   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
2230   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
2231   a->cowners = a->rowners + a->size + 2;
2232   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
2233   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
2234   if (oldmat->colmap) {
2235 #if defined (USE_CTABLE)
2236   ierr = TableCreateCopy( &a->colmap,oldmat->colmap ); CHKERRQ(ierr);
2237 #else
2238     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
2239     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
2240     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
2241 #endif
2242   } else a->colmap = 0;
2243   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
2244     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
2245     PLogObjectMemory(mat,len*sizeof(int));
2246     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
2247   } else a->garray = 0;
2248 
2249   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
2250   PLogObjectParent(mat,a->lvec);
2251   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
2252 
2253   PLogObjectParent(mat,a->Mvctx);
2254   ierr =  MatDuplicate(oldmat->A,cpvalues,&a->A); CHKERRQ(ierr);
2255   PLogObjectParent(mat,a->A);
2256   ierr =  MatDuplicate(oldmat->B,cpvalues,&a->B); CHKERRQ(ierr);
2257   PLogObjectParent(mat,a->B);
2258   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
2259   ierr = FListDuplicate(mat->qlist,&matin->qlist);CHKERRQ(ierr);
2260   if (flg) {
2261     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
2262   }
2263   *newmat = mat;
2264   PetscFunctionReturn(0);
2265 }
2266 
2267 #include "sys.h"
2268 
2269 #undef __FUNC__
2270 #define __FUNC__ "MatLoad_MPIBAIJ"
2271 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
2272 {
2273   Mat          A;
2274   int          i, nz, ierr, j,rstart, rend, fd;
2275   Scalar       *vals,*buf;
2276   MPI_Comm     comm = ((PetscObject)viewer)->comm;
2277   MPI_Status   status;
2278   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
2279   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
2280   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,Mbs,mbs,extra_rows;
2281   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
2282   int          dcount,kmax,k,nzcount,tmp;
2283 
2284   PetscFunctionBegin;
2285   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
2286 
2287   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
2288   if (!rank) {
2289     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
2290     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
2291     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
2292     if (header[3] < 0) {
2293       SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix stored in special format, cannot load as MPIBAIJ");
2294     }
2295   }
2296 
2297   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
2298   M = header[1]; N = header[2];
2299 
2300   if (M != N) SETERRQ(PETSC_ERR_SUP,0,"Can only do square matrices");
2301 
2302   /*
2303      This code adds extra rows to make sure the number of rows is
2304      divisible by the blocksize
2305   */
2306   Mbs        = M/bs;
2307   extra_rows = bs - M + bs*(Mbs);
2308   if (extra_rows == bs) extra_rows = 0;
2309   else                  Mbs++;
2310   if (extra_rows &&!rank) {
2311     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
2312   }
2313 
2314   /* determine ownership of all rows */
2315   mbs = Mbs/size + ((Mbs % size) > rank);
2316   m   = mbs * bs;
2317   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
2318   browners = rowners + size + 1;
2319   ierr = MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
2320   rowners[0] = 0;
2321   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
2322   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
2323   rstart = rowners[rank];
2324   rend   = rowners[rank+1];
2325 
2326   /* distribute row lengths to all processors */
2327   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
2328   if (!rank) {
2329     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
2330     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
2331     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
2332     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
2333     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
2334     ierr = MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);CHKERRQ(ierr);
2335     PetscFree(sndcounts);
2336   } else {
2337     ierr = MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);CHKERRQ(ierr);
2338   }
2339 
2340   if (!rank) {
2341     /* calculate the number of nonzeros on each processor */
2342     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
2343     PetscMemzero(procsnz,size*sizeof(int));
2344     for ( i=0; i<size; i++ ) {
2345       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
2346         procsnz[i] += rowlengths[j];
2347       }
2348     }
2349     PetscFree(rowlengths);
2350 
2351     /* determine max buffer needed and allocate it */
2352     maxnz = 0;
2353     for ( i=0; i<size; i++ ) {
2354       maxnz = PetscMax(maxnz,procsnz[i]);
2355     }
2356     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
2357 
2358     /* read in my part of the matrix column indices  */
2359     nz = procsnz[0];
2360     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
2361     mycols = ibuf;
2362     if (size == 1)  nz -= extra_rows;
2363     ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
2364     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
2365 
2366     /* read in every ones (except the last) and ship off */
2367     for ( i=1; i<size-1; i++ ) {
2368       nz   = procsnz[i];
2369       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
2370       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
2371     }
2372     /* read in the stuff for the last proc */
2373     if ( size != 1 ) {
2374       nz   = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
2375       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
2376       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
2377       ierr = MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);CHKERRQ(ierr);
2378     }
2379     PetscFree(cols);
2380   } else {
2381     /* determine buffer space needed for message */
2382     nz = 0;
2383     for ( i=0; i<m; i++ ) {
2384       nz += locrowlens[i];
2385     }
2386     ibuf   = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
2387     mycols = ibuf;
2388     /* receive message of column indices*/
2389     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
2390     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
2391     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2392   }
2393 
2394   /* loop over local rows, determining number of off diagonal entries */
2395   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
2396   odlens = dlens + (rend-rstart);
2397   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
2398   PetscMemzero(mask,3*Mbs*sizeof(int));
2399   masked1 = mask    + Mbs;
2400   masked2 = masked1 + Mbs;
2401   rowcount = 0; nzcount = 0;
2402   for ( i=0; i<mbs; i++ ) {
2403     dcount  = 0;
2404     odcount = 0;
2405     for ( j=0; j<bs; j++ ) {
2406       kmax = locrowlens[rowcount];
2407       for ( k=0; k<kmax; k++ ) {
2408         tmp = mycols[nzcount++]/bs;
2409         if (!mask[tmp]) {
2410           mask[tmp] = 1;
2411           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
2412           else masked1[dcount++] = tmp;
2413         }
2414       }
2415       rowcount++;
2416     }
2417 
2418     dlens[i]  = dcount;
2419     odlens[i] = odcount;
2420 
2421     /* zero out the mask elements we set */
2422     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
2423     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
2424   }
2425 
2426   /* create our matrix */
2427   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
2428          CHKERRQ(ierr);
2429   A = *newmat;
2430   MatSetOption(A,MAT_COLUMNS_SORTED);
2431 
2432   if (!rank) {
2433     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
2434     /* read in my part of the matrix numerical values  */
2435     nz = procsnz[0];
2436     vals = buf;
2437     mycols = ibuf;
2438     if (size == 1)  nz -= extra_rows;
2439     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2440     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
2441 
2442     /* insert into matrix */
2443     jj      = rstart*bs;
2444     for ( i=0; i<m; i++ ) {
2445       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
2446       mycols += locrowlens[i];
2447       vals   += locrowlens[i];
2448       jj++;
2449     }
2450     /* read in other processors (except the last one) and ship out */
2451     for ( i=1; i<size-1; i++ ) {
2452       nz   = procsnz[i];
2453       vals = buf;
2454       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2455       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
2456     }
2457     /* the last proc */
2458     if ( size != 1 ){
2459       nz   = procsnz[i] - extra_rows;
2460       vals = buf;
2461       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2462       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
2463       ierr = MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);CHKERRQ(ierr);
2464     }
2465     PetscFree(procsnz);
2466   } else {
2467     /* receive numeric values */
2468     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
2469 
2470     /* receive message of values*/
2471     vals   = buf;
2472     mycols = ibuf;
2473     ierr   = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
2474     ierr   = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
2475     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2476 
2477     /* insert into matrix */
2478     jj      = rstart*bs;
2479     for ( i=0; i<m; i++ ) {
2480       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
2481       mycols += locrowlens[i];
2482       vals   += locrowlens[i];
2483       jj++;
2484     }
2485   }
2486   PetscFree(locrowlens);
2487   PetscFree(buf);
2488   PetscFree(ibuf);
2489   PetscFree(rowners);
2490   PetscFree(dlens);
2491   PetscFree(mask);
2492   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2493   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2494   PetscFunctionReturn(0);
2495 }
2496 
2497 
2498 
2499 #undef __FUNC__
2500 #define __FUNC__ "MatMPIBAIJSetHashTableFactor"
2501 /*@
2502    MatMPIBAIJSetHashTableFactor - Sets the factor required to compute the size of the HashTable.
2503 
2504    Input Parameters:
2505 .  mat  - the matrix
2506 .  fact - factor
2507 
2508    Collective on Mat
2509 
2510   Notes:
2511    This can also be set by the command line option: -mat_use_hash_table fact
2512 
2513 .keywords: matrix, hashtable, factor, HT
2514 
2515 .seealso: MatSetOption()
2516 @*/
2517 int MatMPIBAIJSetHashTableFactor(Mat mat,double fact)
2518 {
2519   Mat_MPIBAIJ *baij;
2520 
2521   PetscFunctionBegin;
2522   PetscValidHeaderSpecific(mat,MAT_COOKIE);
2523   if (mat->type != MATMPIBAIJ) {
2524       SETERRQ(PETSC_ERR_ARG_WRONG,1,"Incorrect matrix type. Use MPIBAIJ only.");
2525   }
2526   baij = (Mat_MPIBAIJ*) mat->data;
2527   baij->ht_fact = fact;
2528   PetscFunctionReturn(0);
2529 }
2530