xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 4787f7683dc5ea85e35b4269f7afceca0dae908f)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpibaij.c,v 1.147 1999/02/12 00:04:25 balay Exp balay $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"   /*I  "mat.h"  I*/
6 #include "src/vec/vecimpl.h"
7 
8 extern int MatSetUpMultiply_MPIBAIJ(Mat);
9 extern int DisAssemble_MPIBAIJ(Mat);
10 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
11 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatReuse,Mat **);
12 extern int MatGetValues_SeqBAIJ(Mat,int,int *,int,int *,Scalar *);
13 
/*
     Local utility routine that creates a mapping from the global column
   number to the local number in the off-diagonal part of the local
   storage of the matrix.  This is done in a non-scalable way since the
   length of colmap equals the global matrix length.
*/
#undef __FUNC__
#define __FUNC__ "CreateColmap_MPIBAIJ_Private"
/*
   Builds baij->colmap, mapping a global block-column number to the local
   point-column offset (times bs) in the off-diagonal matrix B.  Entries are
   stored with a +1 bias so that 0 can mean "column not present"; callers
   subtract 1 after lookup.
*/
static int CreateColmap_MPIBAIJ_Private(Mat mat)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;   /* off-diagonal part of the parallel matrix */
  int         nbs = B->nbs,i,bs=B->bs;             /* nbs: local block columns of B; bs: block size */
#if defined (USE_CTABLE)
  int         ierr;
#endif

  PetscFunctionBegin;
#if defined (USE_CTABLE)
  /* Hash-table variant: memory scales with the number of off-diagonal block
     columns actually present rather than with the global matrix size.
     Both key and value carry the +1 bias described above. */
  ierr = TableCreate( &baij->colmap, baij->nbs/5 ); CHKERRQ(ierr);
  for ( i=0; i<nbs; i++ ){
    ierr = TableAdd( baij->colmap, baij->garray[i] + 1, i*bs+1 );CHKERRQ(ierr);
  }
#else
  /* Dense variant: one int per global block column (the non-scalable scheme
     noted in the header comment).  Zeroed entries mean "absent". */
  baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
  PLogObjectMemory(mat,baij->Nbs*sizeof(int));
  PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
  for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
#endif
  PetscFunctionReturn(0);
}
45 
46 #define CHUNKSIZE  10
47 
/*
   Inserts or adds one scalar value at local point indices (row,col) into the
   "diagonal" SeqBAIJ matrix A.  This is an open-coded fast path for
   MatSetValues_SeqBAIJ; it relies on the caller (MatSetValues_MPIBAIJ) having
   the working variables (a, aa, ai, aj, aimax, ailen, rp, ap, bap, brow, bcol,
   ridx, cidx, low, high, t, _i, ii, N, nrow, rmax, bs, bs2) in scope.
   A binary search narrows the row to <= 3 candidates, then a linear scan
   finds the block column.  If the block is absent and a->nonew permits,
   the row is grown by CHUNKSIZE blocks (one realloc of the whole matrix
   storage) and the new block is shifted into sorted position.
   NOTE(review): a->nz is incremented only inside the reallocation branch,
   not for every newly inserted block -- verify against the SeqBAIJ
   nonzero bookkeeping before relying on a->nz here.
*/
#define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
    rmax = aimax[brow]; nrow = ailen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto a_noinsert; \
        } \
      } \
      if (a->nonew == 1) goto a_noinsert; \
      else if (a->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (a->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
        for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
        PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
                    aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(a->a);  \
        if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
        aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
        a->singlemalloc = 1; \
 \
        rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
        rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        a->maxnz += bs2*CHUNKSIZE; \
        a->reallocs++; \
        a->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      a_noinsert:; \
    ailen[brow] = nrow; \
}
122 
/*
   Identical in structure to MatSetValues_SeqBAIJ_A_Private above, but targets
   the off-diagonal SeqBAIJ matrix B (variables b, ba, bi, bj, bimax, bilen;
   label b_noinsert).  See the A variant for the algorithm description.
   NOTE(review): as in the A variant, b->nz is incremented only inside the
   reallocation branch -- confirm against SeqBAIJ bookkeeping.
*/
#define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
    rmax = bimax[brow]; nrow = bilen[brow]; \
      bcol = col/bs; \
      ridx = row % bs; cidx = col % bs; \
      low = 0; high = nrow; \
      while (high-low > 3) { \
        t = (low+high)/2; \
        if (rp[t] > bcol) high = t; \
        else              low  = t; \
      } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > bcol) break; \
        if (rp[_i] == bcol) { \
          bap  = ap +  bs2*_i + bs*cidx + ridx; \
          if (addv == ADD_VALUES) *bap += value;  \
          else                    *bap  = value;  \
          goto b_noinsert; \
        } \
      } \
      if (b->nonew == 1) goto b_noinsert; \
      else if (b->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (b->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + bs2*new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
        for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
        PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
        PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
        PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
                    ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
        PetscFree(b->a);  \
        if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
        ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
        b->singlemalloc = 1; \
 \
        rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
        rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
        PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
        b->maxnz += bs2*CHUNKSIZE; \
        b->reallocs++; \
        b->nz++; \
      } \
      N = nrow++ - 1;  \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
      } \
      if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
      rp[_i]                      = bcol;  \
      ap[bs2*_i + bs*cidx + ridx] = value;  \
      b_noinsert:; \
    bilen[brow] = nrow; \
}
197 
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIBAIJ"
/*
   Inserts/adds m x n scalar values (global point indices im[]/in[]) into the
   parallel BAIJ matrix.  Locally owned rows are routed to the diagonal part A
   (columns inside [cstart,cend)) or the off-diagonal part B (all others, via
   the colmap translation).  Rows owned by other processes are cached in the
   stash for communication during assembly.  Negative row/column indices are
   ignored.  v is read row-major when roworiented, column-major otherwise.
*/
int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j,row,col;
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
  int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
  int         cend_orig=baij->cend_bs,bs=baij->bs;

  /* Some Variables required in the macro */
  Mat         A = baij->A;
  Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
  int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
  Scalar      *aa=a->a;

  Mat         B = baij->B;
  Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
  int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
  Scalar      *ba=b->a;

  /* scratch variables consumed by the MatSetValues_SeqBAIJ_{A,B}_Private macros */
  int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
  int         low,high,t,ridx,cidx,bs2=a->bs2;
  Scalar      *ap,*bap;

  PetscFunctionBegin;
  for ( i=0; i<m; i++ ) {
    if (im[i] < 0) continue;
#if defined(USE_PETSC_BOPT_g)
    if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
#endif
    if (im[i] >= rstart_orig && im[i] < rend_orig) {
      /* row is locally owned */
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        if (in[j] >= cstart_orig && in[j] < cend_orig){
          /* column falls in the diagonal block A */
          col = in[j] - cstart_orig;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
        else if (in[j] < 0) continue;
#if defined(USE_PETSC_BOPT_g)
        else if (in[j] >= baij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Col too large");}
#endif
        else {
          /* off-diagonal column: translate global -> local via colmap (+1 bias) */
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }
#if defined (USE_CTABLE)
	    col = TableFind( baij->colmap, in[j]/bs + 1 ) - 1 + in[j]%bs;
#else
            col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
#endif
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              /* column not in B's current pattern: fall back to the
                 pre-assembly (global-index) representation of B */
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
              /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
              B = baij->B;
              b = (Mat_SeqBAIJ *) (B)->data;
              bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
              ba=b->a;
            }
          } else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
      }
    } else {
      /* row owned elsewhere: stash for MatAssemblyBegin/End communication */
      if (roworiented && !baij->donotstash) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      } else {
        if (!baij->donotstash) {
          row = im[i];
	  for ( j=0; j<n; j++ ) {
	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
  PetscFunctionReturn(0);
}
283 
284 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
285 #undef __FUNC__
286 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
287 int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
288 {
289   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
290   Scalar      *value,*barray=baij->barray;
291   int         ierr,i,j,ii,jj,row,col,k,l;
292   int         roworiented = baij->roworiented,rstart=baij->rstart ;
293   int         rend=baij->rend,cstart=baij->cstart,stepval;
294   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
295 
296   if(!barray) {
297     baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
298   }
299 
300   if (roworiented) {
301     stepval = (n-1)*bs;
302   } else {
303     stepval = (m-1)*bs;
304   }
305   for ( i=0; i<m; i++ ) {
306     if (im[i] < 0) continue;
307 #if defined(USE_PETSC_BOPT_g)
308     if (im[i] >= baij->Mbs) SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large, row %d max %d",im[i],baij->Mbs);
309 #endif
310     if (im[i] >= rstart && im[i] < rend) {
311       row = im[i] - rstart;
312       for ( j=0; j<n; j++ ) {
313         /* If NumCol = 1 then a copy is not required */
314         if ((roworiented) && (n == 1)) {
315           barray = v + i*bs2;
316         } else if((!roworiented) && (m == 1)) {
317           barray = v + j*bs2;
318         } else { /* Here a copy is required */
319           if (roworiented) {
320             value = v + i*(stepval+bs)*bs + j*bs;
321           } else {
322             value = v + j*(stepval+bs)*bs + i*bs;
323           }
324           for ( ii=0; ii<bs; ii++,value+=stepval ) {
325             for (jj=0; jj<bs; jj++ ) {
326               *barray++  = *value++;
327             }
328           }
329           barray -=bs2;
330         }
331 
332         if (in[j] >= cstart && in[j] < cend){
333           col  = in[j] - cstart;
334           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
335         }
336         else if (in[j] < 0) continue;
337 #if defined(USE_PETSC_BOPT_g)
338         else if (in[j] >= baij->Nbs) {SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large, col %d max %d",in[j],baij->Nbs);}
339 #endif
340         else {
341           if (mat->was_assembled) {
342             if (!baij->colmap) {
343               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
344             }
345 
346 #if defined(USE_PETSC_BOPT_g)
347 #if defined (USE_CTABLE)
348             if( (TableFind( baij->colmap, in[j] + 1 ) - 1) % bs)
349 	      {SETERRQ(PETSC_ERR_PLIB,0,"Incorrect colmap");}
350 #else
351             if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(PETSC_ERR_PLIB,0,"Incorrect colmap");}
352 #endif
353 #endif
354 #if defined (USE_CTABLE)
355 	    col = (TableFind( baij->colmap, in[j] + 1 ) - 1)/bs;
356 #else
357             col = (baij->colmap[in[j]] - 1)/bs;
358 #endif
359             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
360               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
361               col =  in[j];
362             }
363           }
364           else col = in[j];
365           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
366         }
367       }
368     } else {
369       if (!baij->donotstash) {
370         if (roworiented ) {
371           row   = im[i]*bs;
372           value = v + i*(stepval+bs)*bs;
373           for ( j=0; j<bs; j++,row++ ) {
374             for ( k=0; k<n; k++ ) {
375               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
376                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
377               }
378             }
379           }
380         } else {
381           for ( j=0; j<n; j++ ) {
382             value = v + j*(stepval+bs)*bs + i*bs;
383             col   = in[j]*bs;
384             for ( k=0; k<bs; k++,col++,value+=stepval) {
385               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
386                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
387               }
388             }
389           }
390         }
391       }
392     }
393   }
394   PetscFunctionReturn(0);
395 }
/* Multiplicative hashing constant: the fractional part of the golden ratio,
   as recommended by Knuth for multiplicative hash functions. */
#define HASH_KEY 0.6180339887
/* #define HASH1(size,key) ((int)((size)*fmod(((key)*HASH_KEY),1))) */
/* Maps key to [0,size): scale by HASH_KEY, keep the fractional part, scale to
   the table size.  tmp must be a double lvalue supplied by the caller; it is
   assigned as a side effect (this avoids a libm fmod call, cf. the commented
   variants above/below). */
#define HASH(size,key,tmp) (tmp = (key)*HASH_KEY,(int)((size)*(tmp-(int)tmp)))
/* #define HASH(size,key,tmp) ((int)((size)*fmod(((key)*HASH_KEY),1))) */
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIBAIJ_HT"
/*
   Hash-table variant of MatSetValues_MPIBAIJ: assumes the matrix nonzero
   pattern is fixed and already recorded in the hash table baij->ht (keys) /
   baij->hd (pointers to the bs x bs value blocks), built by
   MatCreateHashTable_MPIBAIJ_Private.  Each (row,col) is located by hashing
   its global block index with linear probing (wrapping once around the
   table); it is an error for the entry to be absent.  Remote rows are
   stashed as in the non-HT path.
*/
int MatSetValues_MPIBAIJ_HT(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr,i,j,row,col;
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
  int         rend_orig=baij->rend_bs,Nbs=baij->Nbs;
  int         h1,key,size=baij->ht_size,bs=baij->bs,*HT=baij->ht,idx;
  double      tmp;           /* scratch for the HASH macro's side effect */
  Scalar      ** HD = baij->hd,value;
#if defined(USE_PETSC_BOPT_g)
  int         total_ct=baij->ht_total_ct,insert_ct=baij->ht_insert_ct;  /* probe statistics */
#endif

  PetscFunctionBegin;

  for ( i=0; i<m; i++ ) {
#if defined(USE_PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
    if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
#endif
      row = im[i];
    if (row >= rstart_orig && row < rend_orig) {
      for ( j=0; j<n; j++ ) {
        col = in[j];
        if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
        /* Look up into the Hash Table */
        /* key encodes the global block coordinates, +1 so 0 means "empty slot" */
        key = (row/bs)*Nbs+(col/bs)+1;
        h1  = HASH(size,key,tmp);


        idx = h1;
#if defined(USE_PETSC_BOPT_g)
        /* debug build: same linear probe as below, but counting probes */
        insert_ct++;
        total_ct++;
        if (HT[idx] != key) {
          for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++,total_ct++);
          if (idx == size) {
            for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++,total_ct++);
            if (idx == h1) {
              SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
            }
          }
        }
#else
        /* linear probe from h1 to end of table, then wrap to the start */
        if (HT[idx] != key) {
          for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++);
          if (idx == size) {
            for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++);
            if (idx == h1) {
              SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
            }
          }
        }
#endif
        /* A HASH table entry is found, so insert the values at the correct address */
        if (addv == ADD_VALUES) *(HD[idx]+ (col % bs)*bs + (row % bs)) += value;
        else                    *(HD[idx]+ (col % bs)*bs + (row % bs))  = value;
      }
    } else {
      /* remote row: stash for assembly-time communication */
      if (roworiented && !baij->donotstash) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      } else {
        if (!baij->donotstash) {
          row = im[i];
	  for ( j=0; j<n; j++ ) {
	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
#if defined(USE_PETSC_BOPT_g)
  baij->ht_total_ct = total_ct;
  baij->ht_insert_ct = insert_ct;
#endif
  PetscFunctionReturn(0);
}
479 
#undef __FUNC__
#define __FUNC__ "MatSetValuesBlocked_MPIBAIJ_HT"
/*
   Blocked hash-table variant: like MatSetValues_MPIBAIJ_HT but each (im[i],
   in[j]) addresses a whole bs x bs block.  The hash table is keyed directly
   on block indices (row*Nbs+col+1) and HD[idx] points at the block's storage,
   which is filled column-major from the row- or column-oriented input v.
   Remote block rows are expanded to point entries and stashed.
*/
int MatSetValuesBlocked_MPIBAIJ_HT(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr,i,j,ii,jj,row,col,k,l;
  int         roworiented = baij->roworiented,rstart=baij->rstart ;
  int         rend=baij->rend,stepval,bs=baij->bs,bs2=baij->bs2;
  int         h1,key,size=baij->ht_size,idx,*HT=baij->ht,Nbs=baij->Nbs;
  double      tmp;           /* scratch for the HASH macro's side effect */
  Scalar      ** HD = baij->hd,*value,*v_t,*baij_a;
#if defined(USE_PETSC_BOPT_g)
  int         total_ct=baij->ht_total_ct,insert_ct=baij->ht_insert_ct;  /* probe statistics */
#endif

  PetscFunctionBegin;

  /* distance (in Scalars) from the end of one block row of v to the next */
  if (roworiented) {
    stepval = (n-1)*bs;
  } else {
    stepval = (m-1)*bs;
  }
  for ( i=0; i<m; i++ ) {
#if defined(USE_PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
    if (im[i] >= baij->Mbs) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
#endif
    row   = im[i];
    v_t   = v + i*bs2;
    if (row >= rstart && row < rend) {
      for ( j=0; j<n; j++ ) {
        col = in[j];

        /* Look up into the Hash Table */
        /* key encodes the global block coordinates, +1 so 0 means "empty slot" */
        key = row*Nbs+col+1;
        h1  = HASH(size,key,tmp);

        idx = h1;
#if defined(USE_PETSC_BOPT_g)
        /* debug build: same linear probe as below, but counting probes */
        total_ct++;
        insert_ct++;
       if (HT[idx] != key) {
          for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++,total_ct++);
          if (idx == size) {
            for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++,total_ct++);
            if (idx == h1) {
              SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
            }
          }
        }
#else
        /* linear probe from h1 to end of table, then wrap to the start */
        if (HT[idx] != key) {
          for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++);
          if (idx == size) {
            for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++);
            if (idx == h1) {
              SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
            }
          }
        }
#endif
        baij_a = HD[idx];   /* start of this block's bs2 values */
        if (roworiented) {
          /*value = v + i*(stepval+bs)*bs + j*bs;*/
          /* value = v + (i*(stepval+bs)+j)*bs; */
          value = v_t;
          v_t  += bs;
          /* transpose row-major input into the column-major block storage */
          if (addv == ADD_VALUES) {
            for ( ii=0; ii<bs; ii++,value+=stepval) {
              for ( jj=ii; jj<bs2; jj+=bs ) {
                baij_a[jj]  += *value++;
              }
            }
          } else {
            for ( ii=0; ii<bs; ii++,value+=stepval) {
              for ( jj=ii; jj<bs2; jj+=bs ) {
                baij_a[jj]  = *value++;
              }
            }
          }
        } else {
          value = v + j*(stepval+bs)*bs + i*bs;
          /* column-major input copies straight through */
          if (addv == ADD_VALUES) {
            for ( ii=0; ii<bs; ii++,value+=stepval,baij_a+=bs ) {
              for ( jj=0; jj<bs; jj++ ) {
                baij_a[jj]  += *value++;
              }
            }
          } else {
            for ( ii=0; ii<bs; ii++,value+=stepval,baij_a+=bs ) {
              for ( jj=0; jj<bs; jj++ ) {
                baij_a[jj]  = *value++;
              }
            }
          }
        }
      }
    } else {
      /* remote block row: stash individual point entries */
      if (!baij->donotstash) {
        if (roworiented ) {
          row   = im[i]*bs;
          value = v + i*(stepval+bs)*bs;
          for ( j=0; j<bs; j++,row++ ) {
            for ( k=0; k<n; k++ ) {
              for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        } else {
          for ( j=0; j<n; j++ ) {
            value = v + j*(stepval+bs)*bs + i*bs;
            col   = in[j]*bs;
            for ( k=0; k<bs; k++,col++,value+=stepval) {
              for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
                ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
              }
            }
          }
        }
      }
    }
  }
#if defined(USE_PETSC_BOPT_g)
  baij->ht_total_ct = total_ct;
  baij->ht_insert_ct = insert_ct;
#endif
  PetscFunctionReturn(0);
}
609 
#undef __FUNC__
#define __FUNC__ "MatGetValues_MPIBAIJ"
/*
   Retrieves the m x n values at global point indices idxm[]/idxn[] into v
   (row-major).  Only locally owned rows are supported.  Columns in the
   diagonal range are read from A; others are translated through colmap
   (+1 bias, see CreateColmap_MPIBAIJ_Private) and read from B.  Entries
   outside B's nonzero pattern are returned as 0.0.
*/
int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
  int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col,data;

  PetscFunctionBegin;
  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
    if (idxm[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
    if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
      row = idxm[i] - bsrstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
        if (idxn[j] >= baij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
        if (idxn[j] >= bscstart && idxn[j] < bscend){
          /* diagonal part */
          col = idxn[j] - bscstart;
          ierr = MatGetValues_SeqBAIJ(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        } else {
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
          }
          /* data = local point-column offset of the block, or -1 if absent */
#if defined (USE_CTABLE)
          data = TableFind( baij->colmap, idxn[j]/bs + 1 ) - 1;
#else
          data = baij->colmap[idxn[j]/bs]-1;
#endif
          /* verify against garray: a stale/missing mapping means a zero entry */
          if((data < 0) || (baij->garray[data/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
          else {
            col  = data + idxn[j]%bs;
            ierr = MatGetValues_SeqBAIJ(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    } else {
      SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
    }
  }
 PetscFunctionReturn(0);
}
652 
#undef __FUNC__
#define __FUNC__ "MatNorm_MPIBAIJ"
/*
   Computes a norm of the parallel matrix.  On one process this delegates to
   the sequential MatNorm of A.  In parallel only NORM_FROBENIUS is supported:
   each process sums the squares of its A and B entries (nz*bs2 stored scalars
   each) and the squares are combined with an MPI_SUM reduction before the
   final square root.
*/
int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
  int        ierr, i,bs2=baij->bs2;
  double     sum = 0.0;
  Scalar     *v;

  PetscFunctionBegin;
  if (baij->size == 1) {
    ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      v = amat->a;
      for (i=0; i<amat->nz*bs2; i++ ) {
#if defined(USE_PETSC_COMPLEX)
        sum += PetscReal(PetscConj(*v)*(*v)); v++;   /* |v|^2 for complex scalars */
#else
        sum += (*v)*(*v); v++;
#endif
      }
      v = bmat->a;
      for (i=0; i<bmat->nz*bs2; i++ ) {
#if defined(USE_PETSC_COMPLEX)
        sum += PetscReal(PetscConj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
      *norm = sqrt(*norm);
    } else {
      SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
    }
  }
  PetscFunctionReturn(0);
}
692 
#undef __FUNC__
#define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
/*
   Starts assembly: exchanges the stashed off-process (row,col,value) triples.
   Each triple travels as three Scalars (indices cast to Scalar to keep a
   single MPI datatype).  Posts all non-blocking receives and sends and
   records the requests/buffers in baij for MatAssemblyEnd to complete.
   Also verifies that all processes used a consistent InsertMode.
*/
int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  PetscFunctionBegin;
  if (baij->donotstash) {
    /* nothing was stashed; leave empty communication state for AssemblyEnd */
    baij->svalues    = 0; baij->rvalues    = 0;
    baij->nsends     = 0; baij->nrecvs     = 0;
    baij->send_waits = 0; baij->recv_waits = 0;
    baij->rmax       = 0;
    PetscFunctionReturn(0);
  }

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  /* bitwise-OR of the modes: if both appear anywhere the OR equals their union */
  ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
  }
  mat->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    /* rowners[] holds block-row boundaries, hence the *bs scaling */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work      = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  ierr      = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
  nreceives = work[rank];
  ierr      = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
  nmax      = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues    = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues    = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
  starts     = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack triples (row, col, value); starts[] is consumed (incremented) here */
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  /* rebuild starts[] for the send loop, since packing advanced it */
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(baij->A,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  /* hand the in-flight communication state to MatAssemblyEnd */
  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  PetscFunctionReturn(0);
}
797 
798 /*
799   Creates the hash table, and sets the table
800   This table is created only once.
801   If new entried need to be added to the matrix
802   then the hash table has to be destroyed and
803   recreated.
804 */
805 #undef __FUNC__
806 #define __FUNC__ "MatCreateHashTable_MPIBAIJ_Private"
807 int MatCreateHashTable_MPIBAIJ_Private(Mat mat,double factor)
808 {
809   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
810   Mat         A = baij->A, B=baij->B;
811   Mat_SeqBAIJ *a=(Mat_SeqBAIJ *)A->data, *b=(Mat_SeqBAIJ *)B->data;
812   int         i,j,k,nz=a->nz+b->nz,h1,*ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j;
813   int         size,bs2=baij->bs2,rstart=baij->rstart;
814   int         cstart=baij->cstart,*garray=baij->garray,row,col,Nbs=baij->Nbs;
815   int         *HT,key;
816   Scalar      **HD;
817   double      tmp;
818 #if defined(USE_PETSC_BOPT_g)
819   int         ct=0,max=0;
820 #endif
821 
822   PetscFunctionBegin;
823   baij->ht_size=(int)(factor*nz);
824   size = baij->ht_size;
825 
826   if (baij->ht) {
827     PetscFunctionReturn(0);
828   }
829 
830   /* Allocate Memory for Hash Table */
831   baij->hd = (Scalar**)PetscMalloc((size)*(sizeof(int)+sizeof(Scalar*))+1); CHKPTRQ(baij->hd);
832   baij->ht = (int*)(baij->hd + size);
833   HD = baij->hd;
834   HT = baij->ht;
835 
836 
837   PetscMemzero(HD,size*(sizeof(int)+sizeof(Scalar*)));
838 
839 
840   /* Loop Over A */
841   for ( i=0; i<a->mbs; i++ ) {
842     for ( j=ai[i]; j<ai[i+1]; j++ ) {
843       row = i+rstart;
844       col = aj[j]+cstart;
845 
846       key = row*Nbs + col + 1;
847       h1  = HASH(size,key,tmp);
848       for ( k=0; k<size; k++ ){
849         if (HT[(h1+k)%size] == 0.0) {
850           HT[(h1+k)%size] = key;
851           HD[(h1+k)%size] = a->a + j*bs2;
852           break;
853 #if defined(USE_PETSC_BOPT_g)
854         } else {
855           ct++;
856 #endif
857         }
858       }
859 #if defined(USE_PETSC_BOPT_g)
860       if (k> max) max = k;
861 #endif
862     }
863   }
864   /* Loop Over B */
865   for ( i=0; i<b->mbs; i++ ) {
866     for ( j=bi[i]; j<bi[i+1]; j++ ) {
867       row = i+rstart;
868       col = garray[bj[j]];
869       key = row*Nbs + col + 1;
870       h1  = HASH(size,key,tmp);
871       for ( k=0; k<size; k++ ){
872         if (HT[(h1+k)%size] == 0.0) {
873           HT[(h1+k)%size] = key;
874           HD[(h1+k)%size] = b->a + j*bs2;
875           break;
876 #if defined(USE_PETSC_BOPT_g)
877         } else {
878           ct++;
879 #endif
880         }
881       }
882 #if defined(USE_PETSC_BOPT_g)
883       if (k> max) max = k;
884 #endif
885     }
886   }
887 
888   /* Print Summary */
889 #if defined(USE_PETSC_BOPT_g)
890   for ( i=0,j=0; i<size; i++)
891     if (HT[i]) {j++;}
892   PLogInfo(0,"MatCreateHashTable_MPIBAIJ_Private: Average Search = %5.2f,max search = %d\n",
893            (j== 0)? 0.0:((double)(ct+j))/j,max);
894 #endif
895   PetscFunctionReturn(0);
896 }
897 
#undef __FUNC__
#define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
/*
   Completes the assembly started in MatAssemblyBegin: drains the
   receives of stashed off-processor (row,col,value) triples and inserts
   them into the local diagonal (A) or off-diagonal (B) blocks, waits for
   the sends to complete, assembles A and B, and on final assembly sets
   up the scatter context and (optionally) the hash table.
*/
int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
  int         bs=baij->bs,row,col,other_disassembled;
  Scalar      *values,val;
  InsertMode  addv = mat->insertmode;

  PetscFunctionBegin;
  /*  wait on receives */
  while (count) {
    ierr = MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
    /* unpack receives into our local space */
    values = baij->rvalues + 3*imdex*baij->rmax;
    ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
    /* each entry was packed as the triple (row, col, value) */
    n = n/3;
    for ( i=0; i<n; i++ ) {
      /* row/col were shipped as Scalars; convert back to ints.
         row becomes local, col stays global for now */
      row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= baij->cstart*bs && col < baij->cend*bs) {
        /* column falls in the diagonal block: make it local and insert */
        col -= baij->cstart*bs;
        ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      } else {
        /* off-diagonal block: translate the global column into B's
           compressed local numbering via colmap */
        if (mat->was_assembled) {
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
          }
#if defined (USE_CTABLE)
	  col = TableFind( baij->colmap, col/bs + 1 ) - 1 + col%bs;
#else
          col = (baij->colmap[col/bs]) - 1 + col%bs;
#endif
          /* a negative col means a block column B has never seen: must
             disassemble so the insertion can use global numbering */
          if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
            ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr)
      }
    }
    count--;
  }
  if (baij->recv_waits) PetscFree(baij->recv_waits);
  if (baij->rvalues)    PetscFree(baij->rvalues);

  /* wait on sends */
  if (baij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
    ierr        = MPI_Waitall(baij->nsends,baij->send_waits,send_status);CHKERRQ(ierr);
    PetscFree(send_status);
  }
  if (baij->send_waits) PetscFree(baij->send_waits);
  if (baij->svalues)    PetscFree(baij->svalues);

  ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselfs, in order that we may reassemble. */
  /*
     if nonzero structure of submatrix B cannot change then we know that
     no processor disassembled thus we can skip this stuff
  */
  if (!((Mat_SeqBAIJ*) baij->B->data)->nonew)  {
    ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
    if (mat->was_assembled && !other_disassembled) {
      ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
    }
  }

  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);

#if defined(USE_PETSC_BOPT_g)
  if (baij->ht && mode== MAT_FINAL_ASSEMBLY) {
    PLogInfo(0,"MatAssemblyEnd_MPIBAIJ:Average Hash Table Search in MatSetValues = %5.2f\n",
             ((double)baij->ht_total_ct)/baij->ht_insert_ct);
    baij->ht_total_ct  = 0;
    baij->ht_insert_ct = 0;
  }
#endif
  /* on the first final assembly with hash tables enabled, build the
     table and switch MatSetValues to the hash-table fast paths */
  if (baij->ht_flag && !baij->ht && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatCreateHashTable_MPIBAIJ_Private(mat,baij->ht_fact); CHKERRQ(ierr);
    mat->ops->setvalues        = MatSetValues_MPIBAIJ_HT;
    mat->ops->setvaluesblocked = MatSetValuesBlocked_MPIBAIJ_HT;
  }

  /* scratch space used by MatGetRow is invalidated by assembly */
  if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
  PetscFunctionReturn(0);
}
995 
996 #undef __FUNC__
997 #define __FUNC__ "MatView_MPIBAIJ_Binary"
998 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
999 {
1000   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
1001   int          ierr;
1002 
1003   PetscFunctionBegin;
1004   if (baij->size == 1) {
1005     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
1006   } else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
1007   PetscFunctionReturn(0);
1008 }
1009 
1010 #undef __FUNC__
1011 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworSocket"
1012 static int MatView_MPIBAIJ_ASCIIorDraworSocket(Mat mat,Viewer viewer)
1013 {
1014   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
1015   int          ierr, format,bs = baij->bs, size = baij->size, rank = baij->rank;
1016   FILE         *fd;
1017   ViewerType   vtype;
1018 
1019   PetscFunctionBegin;
1020   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
1021   if (PetscTypeCompare(vtype,ASCII_VIEWER)) {
1022     ierr = ViewerGetFormat(viewer,&format);
1023     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
1024       MatInfo info;
1025       MPI_Comm_rank(mat->comm,&rank);
1026       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
1027       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
1028       PetscSequentialPhaseBegin(mat->comm,1);
1029       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
1030               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
1031               baij->bs,(int)info.memory);
1032       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
1033       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
1034       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
1035       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
1036       fflush(fd);
1037       PetscSequentialPhaseEnd(mat->comm,1);
1038       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
1039       PetscFunctionReturn(0);
1040     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
1041       PetscPrintf(mat->comm,"  block size is %d\n",bs);
1042       PetscFunctionReturn(0);
1043     }
1044   }
1045 
1046   if (PetscTypeCompare(vtype,DRAW_VIEWER)) {
1047     Draw       draw;
1048     PetscTruth isnull;
1049     ierr = ViewerDrawGetDraw(viewer,0,&draw); CHKERRQ(ierr);
1050     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
1051   }
1052 
1053   if (size == 1) {
1054     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
1055   } else {
1056     /* assemble the entire matrix onto first processor. */
1057     Mat         A;
1058     Mat_SeqBAIJ *Aloc;
1059     int         M = baij->M, N = baij->N,*ai,*aj,col,i,j,k,*rvals;
1060     int         mbs = baij->mbs;
1061     Scalar      *a;
1062 
1063     if (!rank) {
1064       ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
1065     } else {
1066       ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
1067     }
1068     PLogObjectParent(mat,A);
1069 
1070     /* copy over the A part */
1071     Aloc = (Mat_SeqBAIJ*) baij->A->data;
1072     ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1073     rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1074 
1075     for ( i=0; i<mbs; i++ ) {
1076       rvals[0] = bs*(baij->rstart + i);
1077       for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1078       for ( j=ai[i]; j<ai[i+1]; j++ ) {
1079         col = (baij->cstart+aj[j])*bs;
1080         for (k=0; k<bs; k++ ) {
1081           ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
1082           col++; a += bs;
1083         }
1084       }
1085     }
1086     /* copy over the B part */
1087     Aloc = (Mat_SeqBAIJ*) baij->B->data;
1088     ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1089     for ( i=0; i<mbs; i++ ) {
1090       rvals[0] = bs*(baij->rstart + i);
1091       for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1092       for ( j=ai[i]; j<ai[i+1]; j++ ) {
1093         col = baij->garray[aj[j]]*bs;
1094         for (k=0; k<bs; k++ ) {
1095           ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
1096           col++; a += bs;
1097         }
1098       }
1099     }
1100     PetscFree(rvals);
1101     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1102     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1103     /*
1104        Everyone has to call to draw the matrix since the graphics waits are
1105        synchronized across all processors that share the Draw object
1106     */
1107     if (!rank || PetscTypeCompare(vtype,DRAW_VIEWER)) {
1108       ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
1109     }
1110     ierr = MatDestroy(A); CHKERRQ(ierr);
1111   }
1112   PetscFunctionReturn(0);
1113 }
1114 
1115 
1116 
1117 #undef __FUNC__
1118 #define __FUNC__ "MatView_MPIBAIJ"
1119 int MatView_MPIBAIJ(Mat mat,Viewer viewer)
1120 {
1121   int         ierr;
1122   ViewerType  vtype;
1123 
1124   PetscFunctionBegin;
1125   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
1126   if (PetscTypeCompare(vtype,ASCII_VIEWER) || PetscTypeCompare(vtype,DRAW_VIEWER) ||
1127       PetscTypeCompare(vtype,SOCKET_VIEWER)) {
1128     ierr = MatView_MPIBAIJ_ASCIIorDraworSocket(mat,viewer); CHKERRQ(ierr);
1129   } else if (PetscTypeCompare(vtype,BINARY_VIEWER)) {
1130     ierr = MatView_MPIBAIJ_Binary(mat,viewer);CHKERRQ(ierr);
1131   } else {
1132     SETERRQ(1,1,"Viewer type not supported by PETSc object");
1133   }
1134   PetscFunctionReturn(0);
1135 }
1136 
#undef __FUNC__
#define __FUNC__ "MatDestroy_MPIBAIJ"
/*
   Destroys the parallel matrix: releases the mappings, the two local
   blocks A and B, the column map, the scatter context, and all scratch
   arrays, then destroys the header itself.  A positive remaining
   reference count only decrements and returns.
*/
int MatDestroy_MPIBAIJ(Mat mat)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;

  PetscFunctionBegin;
  /* only the last reference actually frees anything */
  if (--mat->refct > 0) PetscFunctionReturn(0);

  if (mat->mapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
  }
  if (mat->bmapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->bmapping); CHKERRQ(ierr);
  }
  if (mat->rmap) {
    ierr = MapDestroy(mat->rmap);CHKERRQ(ierr);
  }
  if (mat->cmap) {
    ierr = MapDestroy(mat->cmap);CHKERRQ(ierr);
  }
#if defined(USE_PETSC_LOG)
  PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",baij->M,baij->N);
#endif

  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
  PetscFree(baij->rowners);
  ierr = MatDestroy(baij->A); CHKERRQ(ierr);
  ierr = MatDestroy(baij->B); CHKERRQ(ierr);
#if defined (USE_CTABLE)
  if (baij->colmap) TableDelete(baij->colmap);
#else
  if (baij->colmap) PetscFree(baij->colmap);
#endif
  if (baij->garray) PetscFree(baij->garray);
  if (baij->lvec)   VecDestroy(baij->lvec);
  if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
  if (baij->rowvalues) PetscFree(baij->rowvalues);
  if (baij->barray) PetscFree(baij->barray);
  /* hd and ht share one allocation; freeing hd releases both */
  if (baij->hd) PetscFree(baij->hd);
  PetscFree(baij);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  PetscFunctionReturn(0);
}
1183 
1184 #undef __FUNC__
1185 #define __FUNC__ "MatMult_MPIBAIJ"
1186 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
1187 {
1188   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1189   int         ierr, nt;
1190 
1191   PetscFunctionBegin;
1192   ierr = VecGetLocalSize(xx,&nt);CHKERRQ(ierr);
1193   if (nt != a->n) {
1194     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
1195   }
1196   ierr = VecGetLocalSize(yy,&nt);CHKERRQ(ierr);
1197   if (nt != a->m) {
1198     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible parition of A and yy");
1199   }
1200   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1201   ierr = (*a->A->ops->mult)(a->A,xx,yy); CHKERRQ(ierr);
1202   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1203   ierr = (*a->B->ops->multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
1204   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1205   PetscFunctionReturn(0);
1206 }
1207 
1208 #undef __FUNC__
1209 #define __FUNC__ "MatMultAdd_MPIBAIJ"
1210 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
1211 {
1212   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1213   int        ierr;
1214 
1215   PetscFunctionBegin;
1216   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1217   ierr = (*a->A->ops->multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
1218   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1219   ierr = (*a->B->ops->multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
1220   PetscFunctionReturn(0);
1221 }
1222 
1223 #undef __FUNC__
1224 #define __FUNC__ "MatMultTrans_MPIBAIJ"
1225 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
1226 {
1227   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1228   int         ierr;
1229 
1230   PetscFunctionBegin;
1231   /* do nondiagonal part */
1232   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
1233   /* send it on its way */
1234   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
1235   /* do local part */
1236   ierr = (*a->A->ops->multtrans)(a->A,xx,yy); CHKERRQ(ierr);
1237   /* receive remote parts: note this assumes the values are not actually */
1238   /* inserted in yy until the next line, which is true for my implementation*/
1239   /* but is not perhaps always true. */
1240   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
1241   PetscFunctionReturn(0);
1242 }
1243 
#undef __FUNC__
#define __FUNC__ "MatMultTransAdd_MPIBAIJ"
/*
   Computes zz = A^T*xx + yy, overlapping the reverse scatter of the
   off-diagonal contribution with the local transpose multiply-add.
*/
int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;

  PetscFunctionBegin;
  /* do nondiagonal part */
  ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops->multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in zz until the VecScatterEnd below, which is true for the */
  /* current implementation but is not perhaps always true. */
  /* (the original comment said "yy"; this routine scatters into zz) */
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
1264 
1265 /*
1266   This only works correctly for square matrices where the subblock A->A is the
1267    diagonal block
1268 */
1269 #undef __FUNC__
1270 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
1271 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
1272 {
1273   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1274   int         ierr;
1275 
1276   PetscFunctionBegin;
1277   if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
1278   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
1279   PetscFunctionReturn(0);
1280 }
1281 
1282 #undef __FUNC__
1283 #define __FUNC__ "MatScale_MPIBAIJ"
1284 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
1285 {
1286   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1287   int         ierr;
1288 
1289   PetscFunctionBegin;
1290   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
1291   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
1292   PetscFunctionReturn(0);
1293 }
1294 
1295 #undef __FUNC__
1296 #define __FUNC__ "MatGetSize_MPIBAIJ"
1297 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
1298 {
1299   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1300 
1301   PetscFunctionBegin;
1302   if (m) *m = mat->M;
1303   if (n) *n = mat->N;
1304   PetscFunctionReturn(0);
1305 }
1306 
1307 #undef __FUNC__
1308 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
1309 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
1310 {
1311   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1312 
1313   PetscFunctionBegin;
1314   *m = mat->m; *n = mat->n;
1315   PetscFunctionReturn(0);
1316 }
1317 
1318 #undef __FUNC__
1319 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
1320 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
1321 {
1322   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1323 
1324   PetscFunctionBegin;
1325   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
1326   PetscFunctionReturn(0);
1327 }
1328 
1329 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1330 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1331 
#undef __FUNC__
#define __FUNC__ "MatGetRow_MPIBAIJ"
/*
   Returns one (local) row of the parallel matrix by merging the
   corresponding rows of the diagonal block A and off-diagonal block B,
   with column indices translated back to global numbering and sorted in
   increasing order.  Scratch space (rowvalues/rowindices) is allocated
   lazily, sized for the longest local row, and reused until assembly.
   Must be paired with MatRestoreRow.
*/
int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
  int        *cmap, *idx_p,cstart = mat->cstart;

  PetscFunctionBegin;
  if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int     max = 1,mbs = mat->mbs,tmp;
    for ( i=0; i<mbs; i++ ) {
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    /* one allocation holds both the values and (after them) the indices */
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }

  if (row < brstart || row >= brend) SETERRQ(PETSC_ERR_SUP,0,"Only local rows")
  lrow = row - brstart;

  /* request columns/values from A and B only if the caller wants them */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops->getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops->getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  /* cmap translates B's compressed block columns to global block columns */
  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      /* imark = number of B entries that fall before the diagonal block */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          /* imark already computed by the values pass above */
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
          }
        } else {
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]/bs] < cstart)
              idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
            else break;
          }
          imark = i;
        }
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
      }
    } else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops->restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops->restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
1413 
1414 #undef __FUNC__
1415 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1416 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1417 {
1418   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1419 
1420   PetscFunctionBegin;
1421   if (baij->getrowactive == PETSC_FALSE) {
1422     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
1423   }
1424   baij->getrowactive = PETSC_FALSE;
1425   PetscFunctionReturn(0);
1426 }
1427 
1428 #undef __FUNC__
1429 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1430 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1431 {
1432   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1433 
1434   PetscFunctionBegin;
1435   *bs = baij->bs;
1436   PetscFunctionReturn(0);
1437 }
1438 
1439 #undef __FUNC__
1440 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1441 int MatZeroEntries_MPIBAIJ(Mat A)
1442 {
1443   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1444   int         ierr;
1445 
1446   PetscFunctionBegin;
1447   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1448   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1449   PetscFunctionReturn(0);
1450 }
1451 
1452 #undef __FUNC__
1453 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1454 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1455 {
1456   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1457   Mat         A = a->A, B = a->B;
1458   int         ierr;
1459   double      isend[5], irecv[5];
1460 
1461   PetscFunctionBegin;
1462   info->block_size     = (double)a->bs;
1463   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1464   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
1465   isend[3] = info->memory;  isend[4] = info->mallocs;
1466   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1467   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
1468   isend[3] += info->memory;  isend[4] += info->mallocs;
1469   if (flag == MAT_LOCAL) {
1470     info->nz_used      = isend[0];
1471     info->nz_allocated = isend[1];
1472     info->nz_unneeded  = isend[2];
1473     info->memory       = isend[3];
1474     info->mallocs      = isend[4];
1475   } else if (flag == MAT_GLOBAL_MAX) {
1476     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);CHKERRQ(ierr);
1477     info->nz_used      = irecv[0];
1478     info->nz_allocated = irecv[1];
1479     info->nz_unneeded  = irecv[2];
1480     info->memory       = irecv[3];
1481     info->mallocs      = irecv[4];
1482   } else if (flag == MAT_GLOBAL_SUM) {
1483     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);CHKERRQ(ierr);
1484     info->nz_used      = irecv[0];
1485     info->nz_allocated = irecv[1];
1486     info->nz_unneeded  = irecv[2];
1487     info->memory       = irecv[3];
1488     info->mallocs      = irecv[4];
1489   }
1490   info->rows_global       = (double)a->M;
1491   info->columns_global    = (double)a->N;
1492   info->rows_local        = (double)a->m;
1493   info->columns_local     = (double)a->N;
1494   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1495   info->fill_ratio_needed = 0;
1496   info->factor_mallocs    = 0;
1497   PetscFunctionReturn(0);
1498 }
1499 
1500 #undef __FUNC__
1501 #define __FUNC__ "MatSetOption_MPIBAIJ"
1502 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1503 {
1504   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1505 
1506   PetscFunctionBegin;
1507   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1508       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1509       op == MAT_COLUMNS_UNSORTED ||
1510       op == MAT_COLUMNS_SORTED ||
1511       op == MAT_NEW_NONZERO_ALLOCATION_ERR ||
1512       op == MAT_NEW_NONZERO_LOCATION_ERR) {
1513         MatSetOption(a->A,op);
1514         MatSetOption(a->B,op);
1515   } else if (op == MAT_ROW_ORIENTED) {
1516         a->roworiented = 1;
1517         MatSetOption(a->A,op);
1518         MatSetOption(a->B,op);
1519   } else if (op == MAT_ROWS_SORTED ||
1520              op == MAT_ROWS_UNSORTED ||
1521              op == MAT_SYMMETRIC ||
1522              op == MAT_STRUCTURALLY_SYMMETRIC ||
1523              op == MAT_YES_NEW_DIAGONALS ||
1524              op == MAT_USE_HASH_TABLE)
1525     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1526   else if (op == MAT_COLUMN_ORIENTED) {
1527     a->roworiented = 0;
1528     MatSetOption(a->A,op);
1529     MatSetOption(a->B,op);
1530   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1531     a->donotstash = 1;
1532   } else if (op == MAT_NO_NEW_DIAGONALS) {
1533     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1534   } else if (op == MAT_USE_HASH_TABLE) {
1535     a->ht_flag = 1;
1536   } else {
1537     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1538   }
1539   PetscFunctionReturn(0);
1540 }
1541 
1542 #undef __FUNC__
1543 #define __FUNC__ "MatTranspose_MPIBAIJ("
1544 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1545 {
1546   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1547   Mat_SeqBAIJ *Aloc;
1548   Mat        B;
1549   int        ierr,M=baij->M,N=baij->N,*ai,*aj,i,*rvals,j,k,col;
1550   int        bs=baij->bs,mbs=baij->mbs;
1551   Scalar     *a;
1552 
1553   PetscFunctionBegin;
1554   if (matout == PETSC_NULL && M != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
1555   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1556   CHKERRQ(ierr);
1557 
1558   /* copy over the A part */
1559   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1560   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1561   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1562 
1563   for ( i=0; i<mbs; i++ ) {
1564     rvals[0] = bs*(baij->rstart + i);
1565     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1566     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1567       col = (baij->cstart+aj[j])*bs;
1568       for (k=0; k<bs; k++ ) {
1569         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1570         col++; a += bs;
1571       }
1572     }
1573   }
1574   /* copy over the B part */
1575   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1576   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1577   for ( i=0; i<mbs; i++ ) {
1578     rvals[0] = bs*(baij->rstart + i);
1579     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1580     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1581       col = baij->garray[aj[j]]*bs;
1582       for (k=0; k<bs; k++ ) {
1583         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1584         col++; a += bs;
1585       }
1586     }
1587   }
1588   PetscFree(rvals);
1589   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1590   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1591 
1592   if (matout != PETSC_NULL) {
1593     *matout = B;
1594   } else {
1595     PetscOps *Abops;
1596     MatOps   Aops;
1597 
1598     /* This isn't really an in-place transpose .... but free data structures from baij */
1599     PetscFree(baij->rowners);
1600     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1601     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1602 #if defined (USE_CTABLE)
1603     if (baij->colmap) TableDelete(baij->colmap);
1604 #else
1605     if (baij->colmap) PetscFree(baij->colmap);
1606 #endif
1607     if (baij->garray) PetscFree(baij->garray);
1608     if (baij->lvec) VecDestroy(baij->lvec);
1609     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1610     PetscFree(baij);
1611 
1612     /*
1613        This is horrible, horrible code. We need to keep the
1614       A pointers for the bops and ops but copy everything
1615       else from C.
1616     */
1617     Abops = A->bops;
1618     Aops  = A->ops;
1619     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1620     A->bops = Abops;
1621     A->ops  = Aops;
1622 
1623     PetscHeaderDestroy(B);
1624   }
1625   PetscFunctionReturn(0);
1626 }
1627 
1628 #undef __FUNC__
1629 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1630 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1631 {
1632   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1633   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1634   int ierr,s1,s2,s3;
1635 
1636   PetscFunctionBegin;
1637   if (ll)  {
1638     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1639     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1640     if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"non-conforming local sizes");
1641     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1642     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1643   }
1644   if (rr) SETERRQ(PETSC_ERR_SUP,0,"not supported for right vector");
1645   PetscFunctionReturn(0);
1646 }
1647 
#undef __FUNC__
#define __FUNC__ "MatZeroRows_MPIBAIJ"
/*
    Zeros the (unblocked, global) rows of A listed in is; if diag is
  non-null the diagonal entry of each zeroed row is set to *diag.
  A row listed on one process may be owned by another, so the routine
  first routes each requested index to its owning process with
  point-to-point MPI messages, then each process zeros the rows it owns
  in its local diagonal (l->A) and off-diagonal (l->B) blocks.
*/
int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work,row;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values,bs=l->bs,rstart_bs=l->rstart_bs;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  PetscFunctionBegin;
  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  /* nprocs[j] = how many of our indices go to process j; procs[j] = 0/1 flag;
     owner[i] = destination process of the ith index */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    /* owners[] holds block-row ownership ranges; scale by bs to compare
       against the unblocked row index idx */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  /* SUM of the 0/1 flags gives how many messages each rank will receive;
     MAX of the counts bounds the longest message it must buffer */
  work   = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  ierr   = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
  nrecvs = work[rank];
  ierr   = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
  nmax   = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
  }

  /* do sends:
     1) starts[i] gives the starting index in svalues for stuff going to
     the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  /* the packing loop above advanced starts[]; rebuild it before sending */
  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
    }
  }
  PetscFree(starts);

  /* first unblocked row owned by this process */
  base = owners[rank]*bs;

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
    /* unpack receives into our local space */
    ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  /* convert every received global row index to local numbering */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);

  /*
        Zero the required rows. If the "diagonal block" of the matrix
     is square and the user wishes to set the diagonal we use seperate
     code so that MatSetValues() is not called for each diagonal allocating
     new memory, thus calling lots of mallocs and slowing things down.

       Contributed by: Mathew Knepley
  */
  /* must zero l->B before l->A because the (diag) case below may put values into l->B*/
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  if (diag && (l->A->M == l->A->N)) {
    ierr      = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  } else if (diag) {
    /* non-square diagonal block: fall back to MatSetValues() per entry */
    ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
    for ( i = 0; i < slen; i++ ) {
      row  = lrows[i] + rstart_bs;
      ierr = MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES);CHKERRQ(ierr);
    }
    ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
    ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
  } else {
    ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
  }

  ierr = ISDestroy(istmp); CHKERRQ(ierr);
  PetscFree(lrows);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
    ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  PetscFunctionReturn(0);
}
1792 
1793 extern int MatPrintHelp_SeqBAIJ(Mat);
1794 #undef __FUNC__
1795 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1796 int MatPrintHelp_MPIBAIJ(Mat A)
1797 {
1798   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1799   MPI_Comm    comm = A->comm;
1800   static int  called = 0;
1801   int         ierr;
1802 
1803   PetscFunctionBegin;
1804   if (!a->rank) {
1805     ierr = MatPrintHelp_SeqBAIJ(a->A);CHKERRQ(ierr);
1806   }
1807   if (called) {PetscFunctionReturn(0);} else called = 1;
1808   (*PetscHelpPrintf)(comm," Options for MATMPIBAIJ matrix format (the defaults):\n");
1809   (*PetscHelpPrintf)(comm,"  -mat_use_hash_table <factor>: Use hashtable for efficient matrix assembly\n");
1810   PetscFunctionReturn(0);
1811 }
1812 
1813 #undef __FUNC__
1814 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1815 int MatSetUnfactored_MPIBAIJ(Mat A)
1816 {
1817   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1818   int         ierr;
1819 
1820   PetscFunctionBegin;
1821   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1822   PetscFunctionReturn(0);
1823 }
1824 
1825 static int MatDuplicate_MPIBAIJ(Mat,MatDuplicateOption,Mat *);
1826 
1827 /* -------------------------------------------------------------------*/
1828 static struct _MatOps MatOps_Values = {
1829   MatSetValues_MPIBAIJ,
1830   MatGetRow_MPIBAIJ,
1831   MatRestoreRow_MPIBAIJ,
1832   MatMult_MPIBAIJ,
1833   MatMultAdd_MPIBAIJ,
1834   MatMultTrans_MPIBAIJ,
1835   MatMultTransAdd_MPIBAIJ,
1836   0,
1837   0,
1838   0,
1839   0,
1840   0,
1841   0,
1842   0,
1843   MatTranspose_MPIBAIJ,
1844   MatGetInfo_MPIBAIJ,
1845   0,
1846   MatGetDiagonal_MPIBAIJ,
1847   MatDiagonalScale_MPIBAIJ,
1848   MatNorm_MPIBAIJ,
1849   MatAssemblyBegin_MPIBAIJ,
1850   MatAssemblyEnd_MPIBAIJ,
1851   0,
1852   MatSetOption_MPIBAIJ,
1853   MatZeroEntries_MPIBAIJ,
1854   MatZeroRows_MPIBAIJ,
1855   0,
1856   0,
1857   0,
1858   0,
1859   MatGetSize_MPIBAIJ,
1860   MatGetLocalSize_MPIBAIJ,
1861   MatGetOwnershipRange_MPIBAIJ,
1862   0,
1863   0,
1864   0,
1865   0,
1866   MatDuplicate_MPIBAIJ,
1867   0,
1868   0,
1869   0,
1870   0,
1871   0,
1872   MatGetSubMatrices_MPIBAIJ,
1873   MatIncreaseOverlap_MPIBAIJ,
1874   MatGetValues_MPIBAIJ,
1875   0,
1876   MatPrintHelp_MPIBAIJ,
1877   MatScale_MPIBAIJ,
1878   0,
1879   0,
1880   0,
1881   MatGetBlockSize_MPIBAIJ,
1882   0,
1883   0,
1884   0,
1885   0,
1886   0,
1887   0,
1888   MatSetUnfactored_MPIBAIJ,
1889   0,
1890   MatSetValuesBlocked_MPIBAIJ,
1891   0,
1892   0,
1893   0,
1894   MatGetMaps_Petsc};
1895 
1896 
1897 EXTERN_C_BEGIN
1898 #undef __FUNC__
1899 #define __FUNC__ "MatGetDiagonalBlock_MPIBAIJ"
1900 int MatGetDiagonalBlock_MPIBAIJ(Mat A,PetscTruth *iscopy,MatReuse reuse,Mat *a)
1901 {
1902   PetscFunctionBegin;
1903   *a      = ((Mat_MPIBAIJ *)A->data)->A;
1904   *iscopy = PETSC_FALSE;
1905   PetscFunctionReturn(0);
1906 }
1907 EXTERN_C_END
1908 
1909 #undef __FUNC__
1910 #define __FUNC__ "MatCreateMPIBAIJ"
1911 /*@C
1912    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1913    (block compressed row).  For good matrix assembly performance
1914    the user should preallocate the matrix storage by setting the parameters
1915    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1916    performance can be increased by more than a factor of 50.
1917 
1918    Collective on MPI_Comm
1919 
1920    Input Parameters:
1921 +  comm - MPI communicator
.  bs   - size of block
1923 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1924            This value should be the same as the local size used in creating the
1925            y vector for the matrix-vector product y = Ax.
1926 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1927            This value should be the same as the local size used in creating the
1928            x vector for the matrix-vector product y = Ax.
1929 .  M - number of global rows (or PETSC_DETERMINE to have calculated if m is given)
1930 .  N - number of global columns (or PETSC_DETERMINE to have calculated if n is given)
1931 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1932            submatrix  (same for all local rows)
.  d_nnz - array containing the number of block nonzeros in the various block rows
           in the diagonal portion of the local submatrix (possibly different for each
           block row) or PETSC_NULL.  You must leave room for the diagonal entry even if it is zero.
1936 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1937            submatrix (same for all local rows).
-  o_nnz - array containing the number of block nonzeros in the various block rows of the
           off-diagonal portion of the local submatrix (possibly different for
           each block row) or PETSC_NULL.
1941 
1942    Output Parameter:
1943 .  A - the matrix
1944 
1945    Options Database Keys:
1946 .   -mat_no_unroll - uses code that does not unroll the loops in the
1947                      block calculations (much slower)
1948 .   -mat_block_size - size of the blocks to use
1949 .   -mat_mpi - use the parallel matrix data structures even on one processor
1950                (defaults to using SeqBAIJ format on one processor)
1951 
1952    Notes:
1953    The user MUST specify either the local or global matrix dimensions
1954    (possibly both).
1955 
   If PETSC_DECIDE or PETSC_DETERMINE is used for a particular argument on one processor
   then it must be used on all processors that share the object for that argument.
1958 
1959    Storage Information:
1960    For a square global matrix we define each processor's diagonal portion
1961    to be its local rows and the corresponding columns (a square submatrix);
1962    each processor's off-diagonal portion encompasses the remainder of the
1963    local matrix (a rectangular submatrix).
1964 
1965    The user can specify preallocated storage for the diagonal part of
1966    the local submatrix with either d_nz or d_nnz (not both).  Set
1967    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1968    memory allocation.  Likewise, specify preallocated storage for the
1969    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1970 
1971    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1972    the figure below we depict these three local rows and all columns (0-11).
1973 
1974 .vb
1975            0 1 2 3 4 5 6 7 8 9 10 11
1976           -------------------
1977    row 3  |  o o o d d d o o o o o o
1978    row 4  |  o o o d d d o o o o o o
1979    row 5  |  o o o d d d o o o o o o
1980           -------------------
1981 .ve
1982 
1983    Thus, any entries in the d locations are stored in the d (diagonal)
1984    submatrix, and any entries in the o locations are stored in the
1985    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1986    stored simply in the MATSEQBAIJ format for compressed row storage.
1987 
1988    Now d_nz should indicate the number of block nonzeros per row in the d matrix,
1989    and o_nz should indicate the number of block nonzeros per row in the o matrix.
1990    In general, for PDE problems in which most nonzeros are near the diagonal,
1991    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1992    or you will get TERRIBLE performance; see the users' manual chapter on
1993    matrices.
1994 
1995    Level: intermediate
1996 
1997 .keywords: matrix, block, aij, compressed row, sparse, parallel
1998 
1999 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues(), MatCreateMPIAIJ()
2000 @*/
int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
                    int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  Mat          B;
  Mat_MPIBAIJ  *b;
  int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size,flg;
  int          flag1 = 0,flag2 = 0;

  PetscFunctionBegin;
  if (bs < 1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Invalid block size specified, must be positive");

  MPI_Comm_size(comm,&size);
  ierr = OptionsHasName(PETSC_NULL,"-mat_mpibaij",&flag1); CHKERRQ(ierr);
  ierr = OptionsHasName(PETSC_NULL,"-mat_mpi",&flag2); CHKERRQ(ierr);
  /* on a single process fall back to sequential BAIJ unless the user
     explicitly requested the parallel data structures */
  if (!flag1 && !flag2 && size == 1) {
    if (M == PETSC_DECIDE) M = m;
    if (N == PETSC_DECIDE) N = n;
    ierr = MatCreateSeqBAIJ(comm,bs,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
    PetscFunctionReturn(0);
  }

  *A = 0;
  PetscHeaderCreate(B,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIBAIJ,"Mat",comm,MatDestroy,MatView);
  PLogObjectCreate(B);
  B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
  PetscMemzero(b,sizeof(Mat_MPIBAIJ));
  PetscMemcpy(B->ops,&MatOps_Values,sizeof(struct _MatOps));

  B->ops->destroy    = MatDestroy_MPIBAIJ;
  B->ops->view       = MatView_MPIBAIJ;
  B->mapping    = 0;
  B->factor     = 0;
  B->assembled  = PETSC_FALSE;

  B->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&b->rank);
  MPI_Comm_size(comm,&b->size);

  /* validate and resolve local/global dimensions; at least one of each
     (local,global) pair must be supplied */
  if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
    SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
  }
  if ( M == PETSC_DECIDE && m == PETSC_DECIDE) {
    SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either M or m should be specified");
  }
  if ( N == PETSC_DECIDE && n == PETSC_DECIDE) {
    SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either N or n should be specified");
  }
  /* if both are given, the local values win and the globals are recomputed */
  if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
  if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;

  /* collective: sum the local sizes to obtain the global ones */
  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    mbs = m/bs; nbs = n/bs;
    ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
    if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
    if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
  }
  /* split the global block rows/cols as evenly as possible */
  if (m == PETSC_DECIDE) {
    Mbs = M/bs;
    if (Mbs*bs != M) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global rows must be divisible by blocksize");
    mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
    m   = mbs*bs;
  }
  if (n == PETSC_DECIDE) {
    Nbs = N/bs;
    if (Nbs*bs != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global cols must be divisible by blocksize");
    nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
    n   = nbs*bs;
  }
  if (mbs*bs != m || nbs*bs != n) {
    SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of local rows, cols must be divisible by blocksize");
  }

  /* record sizes both in points (m,n,M,N) and in blocks (mbs,...,Nbs) */
  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;
  b->bs  = bs;
  b->bs2 = bs*bs;
  b->mbs = mbs;
  b->nbs = nbs;
  b->Mbs = Mbs;
  b->Nbs = Nbs;

  /* the information in the maps duplicates the information computed below, eventually
     we should remove the duplicate information that is not contained in the maps */
  ierr = MapCreateMPI(comm,m,M,&B->rmap);CHKERRQ(ierr);
  ierr = MapCreateMPI(comm,n,N,&B->cmap);CHKERRQ(ierr);

  /* build local table of row and column ownerships */
  /* rowners/cowners[p]..[p+1] is the block-row/col range owned by rank p */
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
  b->cowners = b->rowners + b->size + 2;
  ierr = MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];
  }
  b->rstart    = b->rowners[b->rank];
  b->rend      = b->rowners[b->rank+1];
  b->rstart_bs = b->rstart * bs;   /* ownership range in point (unblocked) numbering */
  b->rend_bs   = b->rend * bs;

  ierr = MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart    = b->cowners[b->rank];
  b->cend      = b->cowners[b->rank+1];
  b->cstart_bs = b->cstart * bs;
  b->cend_bs   = b->cend * bs;


  /* create the two local pieces: A = diagonal block (m x n),
     B = off-diagonal block (m x N, columns later compacted) */
  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->donotstash  = 0;
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used in block assembly */
  b->barray       = 0;

  /* stuff used for matrix vector multiply */
  b->lvec         = 0;
  b->Mvctx        = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  /* hash table stuff */
  b->ht           = 0;
  b->hd           = 0;
  b->ht_size      = 0;
  b->ht_flag      = 0;
  b->ht_fact      = 0;
  b->ht_total_ct  = 0;
  b->ht_insert_ct = 0;

  *A = B;
  /* optional hash-table-based assembly; factor below 1.0 is rejected */
  ierr = OptionsHasName(PETSC_NULL,"-mat_use_hash_table",&flg); CHKERRQ(ierr);
  if (flg) {
    double fact = 1.39;
    ierr = MatSetOption(B,MAT_USE_HASH_TABLE); CHKERRQ(ierr);
    ierr = OptionsGetDouble(PETSC_NULL,"-mat_use_hash_table",&fact,&flg); CHKERRQ(ierr);
    if (fact <= 1.0) fact = 1.39;
    ierr = MatMPIBAIJSetHashTableFactor(B,fact); CHKERRQ(ierr);
    PLogInfo(0,"MatCreateMPIBAIJ:Hash table Factor used %5.2f\n",fact);
  }
  ierr = PetscObjectComposeFunction((PetscObject)B,"MatGetDiagonalBlock_C",
                                     "MatGetDiagonalBlock_MPIBAIJ",
                                     (void*)MatGetDiagonalBlock_MPIBAIJ);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
2165 
2166 #undef __FUNC__
2167 #define __FUNC__ "MatDuplicate_MPIBAIJ"
2168 static int MatDuplicate_MPIBAIJ(Mat matin,MatDuplicateOption cpvalues,Mat *newmat)
2169 {
2170   Mat         mat;
2171   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
2172   int         ierr, len=0, flg;
2173 
2174   PetscFunctionBegin;
2175   *newmat       = 0;
2176   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIBAIJ,"Mat",matin->comm,MatDestroy,MatView);
2177   PLogObjectCreate(mat);
2178   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
2179   PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));
2180   mat->ops->destroy    = MatDestroy_MPIBAIJ;
2181   mat->ops->view       = MatView_MPIBAIJ;
2182   mat->factor          = matin->factor;
2183   mat->assembled       = PETSC_TRUE;
2184 
2185   a->m = mat->m   = oldmat->m;
2186   a->n = mat->n   = oldmat->n;
2187   a->M = mat->M   = oldmat->M;
2188   a->N = mat->N   = oldmat->N;
2189 
2190   a->bs  = oldmat->bs;
2191   a->bs2 = oldmat->bs2;
2192   a->mbs = oldmat->mbs;
2193   a->nbs = oldmat->nbs;
2194   a->Mbs = oldmat->Mbs;
2195   a->Nbs = oldmat->Nbs;
2196 
2197   a->rstart       = oldmat->rstart;
2198   a->rend         = oldmat->rend;
2199   a->cstart       = oldmat->cstart;
2200   a->cend         = oldmat->cend;
2201   a->size         = oldmat->size;
2202   a->rank         = oldmat->rank;
2203   mat->insertmode = NOT_SET_VALUES;
2204   a->rowvalues    = 0;
2205   a->getrowactive = PETSC_FALSE;
2206   a->barray       = 0;
2207 
2208   /* hash table stuff */
2209   a->ht           = 0;
2210   a->hd           = 0;
2211   a->ht_size      = 0;
2212   a->ht_flag      = oldmat->ht_flag;
2213   a->ht_fact      = oldmat->ht_fact;
2214   a->ht_total_ct  = 0;
2215   a->ht_insert_ct = 0;
2216 
2217 
2218   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
2219   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
2220   a->cowners = a->rowners + a->size + 2;
2221   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
2222   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
2223   if (oldmat->colmap) {
2224 #if defined (USE_CTABLE)
2225   ierr = TableCreateCopy( &a->colmap,oldmat->colmap ); CHKERRQ(ierr);
2226 #else
2227     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
2228     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
2229     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
2230 #endif
2231   } else a->colmap = 0;
2232   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
2233     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
2234     PLogObjectMemory(mat,len*sizeof(int));
2235     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
2236   } else a->garray = 0;
2237 
2238   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
2239   PLogObjectParent(mat,a->lvec);
2240   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
2241 
2242   PLogObjectParent(mat,a->Mvctx);
2243   ierr =  MatDuplicate(oldmat->A,cpvalues,&a->A); CHKERRQ(ierr);
2244   PLogObjectParent(mat,a->A);
2245   ierr =  MatDuplicate(oldmat->B,cpvalues,&a->B); CHKERRQ(ierr);
2246   PLogObjectParent(mat,a->B);
2247   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
2248   ierr = FListDuplicate(mat->qlist,&matin->qlist);CHKERRQ(ierr);
2249   if (flg) {
2250     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
2251   }
2252   *newmat = mat;
2253   PetscFunctionReturn(0);
2254 }
2255 
2256 #include "sys.h"
2257 
#undef __FUNC__
#define __FUNC__ "MatLoad_MPIBAIJ"
/*
    Loads a matrix from a PETSc binary viewer into a parallel MPIBAIJ
  matrix.  Rank 0 reads the file (header, row lengths, column indices,
  values) and ships each process its share; the matrix may be padded
  with identity rows/columns so the global size is divisible by the
  block size (set with -matload_block_size, default 1).  Only square
  matrices are supported.
*/
int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*buf;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
  int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
  int          flg,tag = ((PetscObject)viewer)->tag,bs=1,Mbs,mbs,extra_rows;
  int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
  int          dcount,kmax,k,nzcount,tmp;

  PetscFunctionBegin;
  ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);

  /* rank 0 reads and validates the 4-int file header */
  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
    if (header[3] < 0) {
      SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix stored in special format, cannot load as MPIBAIJ");
    }
  }

  ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
  M = header[1]; N = header[2];

  if (M != N) SETERRQ(PETSC_ERR_SUP,0,"Can only do square matrices");

  /*
     This code adds extra rows to make sure the number of rows is
     divisible by the blocksize
  */
  Mbs        = M/bs;
  extra_rows = bs - M + bs*(Mbs);
  if (extra_rows == bs) extra_rows = 0;
  else                  Mbs++;
  if (extra_rows &&!rank) {
    PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
  }

  /* determine ownership of all rows */
  /* rowners[] is in block numbering, browners[] in point numbering */
  mbs = Mbs/size + ((Mbs % size) > rank);
  m   = mbs * bs;
  rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
  browners = rowners + size + 1;
  ierr = MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
  for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  /* padding rows get length 1 (a single diagonal entry) */
  locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
    for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
    ierr = MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);CHKERRQ(ierr);
    PetscFree(sndcounts);
  } else {
    ierr = MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);CHKERRQ(ierr);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices  */
    nz = procsnz[0];
    ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* the padding entries are not on disk; synthesize diagonal columns */
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }

    /* read in every ones (except the last) and ship off */
    for ( i=1; i<size-1; i++ ) {
      nz   = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
      ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
    }
    /* read in the stuff for the last proc */
    if ( size != 1 ) {
      nz   = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
      ierr = MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);CHKERRQ(ierr);
    }
    PetscFree(cols);
  } else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += locrowlens[i];
    }
    ibuf   = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* receive message of column indices*/
    ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
    ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
    if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries */
  /* mask[] marks block columns already seen in the current block row;
     dlens/odlens become the per-block-row preallocation counts */
  dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
  odlens = dlens + (rend-rstart);
  mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
  PetscMemzero(mask,3*Mbs*sizeof(int));
  masked1 = mask    + Mbs;
  masked2 = masked1 + Mbs;
  rowcount = 0; nzcount = 0;
  for ( i=0; i<mbs; i++ ) {
    dcount  = 0;
    odcount = 0;
    for ( j=0; j<bs; j++ ) {
      kmax = locrowlens[rowcount];
      for ( k=0; k<kmax; k++ ) {
        tmp = mycols[nzcount++]/bs;
        if (!mask[tmp]) {
          mask[tmp] = 1;
          if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
          else masked1[dcount++] = tmp;
        }
      }
      rowcount++;
    }

    dlens[i]  = dcount;
    odlens[i] = odcount;

    /* zero out the mask elements we set */
    for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
    for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
  }

  /* create our matrix */
  ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
         CHKERRQ(ierr);
  A = *newmat;
  MatSetOption(A,MAT_COLUMNS_SORTED);

  if (!rank) {
    buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    vals = buf;
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
    /* padding rows get a unit diagonal */
    if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
    /* read in other processors (except the last one) and ship out */
    for ( i=1; i<size-1; i++ ) {
      nz   = procsnz[i];
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
      ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
    }
    /* the last proc */
    if ( size != 1 ){
      nz   = procsnz[i] - extra_rows;
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
      ierr = MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);CHKERRQ(ierr);
    }
    PetscFree(procsnz);
  } else {
    /* receive numeric values */
    buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);

    /* receive message of values*/
    vals   = buf;
    mycols = ibuf;
    ierr   = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
    ierr   = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
    if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
  }
  PetscFree(locrowlens);
  PetscFree(buf);
  PetscFree(ibuf);
  PetscFree(rowners);
  PetscFree(dlens);
  PetscFree(mask);
  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
2485 
2486 
2487 
2488 #undef __FUNC__
2489 #define __FUNC__ "MatMPIBAIJSetHashTableFactor"
2490 /*@
2491    MatMPIBAIJSetHashTableFactor - Sets the factor required to compute the size of the HashTable.
2492 
2493    Input Parameters:
2494 .  mat  - the matrix
2495 .  fact - factor
2496 
2497    Collective on Mat
2498 
2499   Notes:
2500    This can also be set by the command line option: -mat_use_hash_table fact
2501 
2502 .keywords: matrix, hashtable, factor, HT
2503 
2504 .seealso: MatSetOption()
2505 @*/
2506 int MatMPIBAIJSetHashTableFactor(Mat mat,double fact)
2507 {
2508   Mat_MPIBAIJ *baij;
2509 
2510   PetscFunctionBegin;
2511   PetscValidHeaderSpecific(mat,MAT_COOKIE);
2512   if (mat->type != MATMPIBAIJ) {
2513       SETERRQ(PETSC_ERR_ARG_WRONG,1,"Incorrect matrix type. Use MPIBAIJ only.");
2514   }
2515   baij = (Mat_MPIBAIJ*) mat->data;
2516   baij->ht_fact = fact;
2517   PetscFunctionReturn(0);
2518 }
2519