xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision e18c124a844f8a09db1af4148adb1e1cdba562d7)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpibaij.c,v 1.139 1998/12/17 22:10:47 bsmith Exp bsmith $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"   /*I  "mat.h"  I*/
6 #include "src/vec/vecimpl.h"
7 
8 extern int MatSetUpMultiply_MPIBAIJ(Mat);
9 extern int DisAssemble_MPIBAIJ(Mat);
10 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
11 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
12 extern int MatGetValues_SeqBAIJ(Mat,int,int *,int,int *,Scalar *);
13 
14 /*
15      Local utility routine that creates a mapping from the global column
16    number to the local number in the off-diagonal part of the local
17    storage of the matrix.  This is done in a non-scalable way, since the
18    length of colmap equals the global number of block columns.
19 */
20 #undef __FUNC__
21 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
22 static int CreateColmap_MPIBAIJ_Private(Mat mat)
23 {
24   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
25   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
26   int         nbs = B->nbs,i,bs=B->bs;
27 
28   PetscFunctionBegin;
29   baij->colmap = (int *) PetscMalloc((baij->Nbs+1)*sizeof(int));CHKPTRQ(baij->colmap);
30   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
31   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
32   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
33   PetscFunctionReturn(0);
34 }
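
/*
   A minimal sketch (not part of the original source) of how colmap is
   consumed later in this file: for a global point column gcol, the entry
   colmap[gcol/bs] holds 1 + bs*(local block column), so 0 can mean "this
   block column has no off-diagonal entries on this process".  The helper
   name below is hypothetical.
*/
#undef __FUNC__
#define __FUNC__ "ColmapLookup_Example"
static int ColmapLookup_Example(Mat mat,int gcol,int *lcol)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         bs = baij->bs,entry = baij->colmap[gcol/bs];

  PetscFunctionBegin;
  if (!entry) *lcol = -1;                    /* block column absent locally */
  else        *lcol = entry - 1 + gcol%bs;   /* local off-diagonal point column */
  PetscFunctionReturn(0);
}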
35 
36 #define CHUNKSIZE  10
37 
38 #define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
39 { \
40  \
41     brow = row/bs;  \
42     rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
43     rmax = aimax[brow]; nrow = ailen[brow]; \
44       bcol = col/bs; \
45       ridx = row % bs; cidx = col % bs; \
46       low = 0; high = nrow; \
47       while (high-low > 3) { \
48         t = (low+high)/2; \
49         if (rp[t] > bcol) high = t; \
50         else              low  = t; \
51       } \
52       for ( _i=low; _i<high; _i++ ) { \
53         if (rp[_i] > bcol) break; \
54         if (rp[_i] == bcol) { \
55           bap  = ap +  bs2*_i + bs*cidx + ridx; \
56           if (addv == ADD_VALUES) *bap += value;  \
57           else                    *bap  = value;  \
58           goto a_noinsert; \
59         } \
60       } \
61       if (a->nonew == 1) goto a_noinsert; \
62       else if (a->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
63       if (nrow >= rmax) { \
64         /* there is no extra room in row, therefore enlarge */ \
65         int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
66         Scalar *new_a; \
67  \
68         if (a->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
69  \
70         /* malloc new storage space */ \
71         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
72         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
73         new_j   = (int *) (new_a + bs2*new_nz); \
74         new_i   = new_j + new_nz; \
75  \
76         /* copy over old data into new slots */ \
77         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
78         for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
79         PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
80         len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
81         PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
82                                                            len*sizeof(int)); \
83         PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
84         PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
85         PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
86                     aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
87         /* free up old matrix storage */ \
88         PetscFree(a->a);  \
89         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
90         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
91         a->singlemalloc = 1; \
92  \
93         rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
94         rmax = aimax[brow] = aimax[brow] + CHUNKSIZE; \
95         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
96         a->maxnz += bs2*CHUNKSIZE; \
97         a->reallocs++; \
98         a->nz++; \
99       } \
100       N = nrow++ - 1;  \
101       /* shift up all the later entries in this row */ \
102       for ( ii=N; ii>=_i; ii-- ) { \
103         rp[ii+1] = rp[ii]; \
104         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
105       } \
106       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
107       rp[_i]                      = bcol;  \
108       ap[bs2*_i + bs*cidx + ridx] = value;  \
109       a_noinsert:; \
110     ailen[brow] = nrow; \
111 }
112 
113 #define  MatSetValues_SeqBAIJ_B_Private(row,col,value,addv) \
114 { \
115  \
116     brow = row/bs;  \
117     rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
118     rmax = bimax[brow]; nrow = bilen[brow]; \
119       bcol = col/bs; \
120       ridx = row % bs; cidx = col % bs; \
121       low = 0; high = nrow; \
122       while (high-low > 3) { \
123         t = (low+high)/2; \
124         if (rp[t] > bcol) high = t; \
125         else              low  = t; \
126       } \
127       for ( _i=low; _i<high; _i++ ) { \
128         if (rp[_i] > bcol) break; \
129         if (rp[_i] == bcol) { \
130           bap  = ap +  bs2*_i + bs*cidx + ridx; \
131           if (addv == ADD_VALUES) *bap += value;  \
132           else                    *bap  = value;  \
133           goto b_noinsert; \
134         } \
135       } \
136       if (b->nonew == 1) goto b_noinsert; \
137       else if (b->nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
138       if (nrow >= rmax) { \
139         /* there is no extra room in row, therefore enlarge */ \
140         int    new_nz = bi[b->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
141         Scalar *new_a; \
142  \
143         if (b->nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
144  \
145         /* malloc new storage space */ \
146         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(b->mbs+1)*sizeof(int); \
147         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
148         new_j   = (int *) (new_a + bs2*new_nz); \
149         new_i   = new_j + new_nz; \
150  \
151         /* copy over old data into new slots */ \
152         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = bi[ii];} \
153         for ( ii=brow+1; ii<b->mbs+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
154         PetscMemcpy(new_j,bj,(bi[brow]+nrow)*sizeof(int)); \
155         len = (new_nz - CHUNKSIZE - bi[brow] - nrow); \
156         PetscMemcpy(new_j+bi[brow]+nrow+CHUNKSIZE,bj+bi[brow]+nrow, \
157                                                            len*sizeof(int)); \
158         PetscMemcpy(new_a,ba,(bi[brow]+nrow)*bs2*sizeof(Scalar)); \
159         PetscMemzero(new_a+bs2*(bi[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
160         PetscMemcpy(new_a+bs2*(bi[brow]+nrow+CHUNKSIZE), \
161                     ba+bs2*(bi[brow]+nrow),bs2*len*sizeof(Scalar));  \
162         /* free up old matrix storage */ \
163         PetscFree(b->a);  \
164         if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
165         ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
166         b->singlemalloc = 1; \
167  \
168         rp   = bj + bi[brow]; ap = ba + bs2*bi[brow]; \
169         rmax = bimax[brow] = bimax[brow] + CHUNKSIZE; \
170         PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
171         b->maxnz += bs2*CHUNKSIZE; \
172         b->reallocs++; \
173         b->nz++; \
174       } \
175       N = nrow++ - 1;  \
176       /* shift up all the later entries in this row */ \
177       for ( ii=N; ii>=_i; ii-- ) { \
178         rp[ii+1] = rp[ii]; \
179         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
180       } \
181       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
182       rp[_i]                      = bcol;  \
183       ap[bs2*_i + bs*cidx + ridx] = value;  \
184       b_noinsert:; \
185     bilen[brow] = nrow; \
186 }
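
/*
   Usage note (a summary, not part of the original source): the two macros
   above are hand-inlined variants of MatSetValues_SeqBAIJ() specialized to
   the diagonal (A) and off-diagonal (B) blocks.  Each one binary-searches
   the block row for the block column, updates in place when the block
   already exists, and otherwise shifts the trailing entries up and, if the
   row is full, grows the storage by CHUNKSIZE blocks.  They are macros
   rather than function calls to avoid per-entry call overhead in the inner
   loop of MatSetValues_MPIBAIJ() below.
*/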
187 
188 #undef __FUNC__
189 #define __FUNC__ "MatSetValues_MPIBAIJ"
190 int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
191 {
192   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
193   Scalar      value;
194   int         ierr,i,j,row,col;
195   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs;
196   int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
197   int         cend_orig=baij->cend_bs,bs=baij->bs;
198 
199   /* Some Variables required in the macro */
200   Mat         A = baij->A;
201   Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
202   int         *aimax=a->imax,*ai=a->i,*ailen=a->ilen,*aj=a->j;
203   Scalar      *aa=a->a;
204 
205   Mat         B = baij->B;
206   Mat_SeqBAIJ *b = (Mat_SeqBAIJ *) (B)->data;
207   int         *bimax=b->imax,*bi=b->i,*bilen=b->ilen,*bj=b->j;
208   Scalar      *ba=b->a;
209 
210   int         *rp,ii,nrow,_i,rmax,N,brow,bcol;
211   int         low,high,t,ridx,cidx,bs2=a->bs2;
212   Scalar      *ap,*bap;
213 
214   PetscFunctionBegin;
215   for ( i=0; i<m; i++ ) {
216 #if defined(USE_PETSC_BOPT_g)
217     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
218     if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
219 #endif
220     if (im[i] >= rstart_orig && im[i] < rend_orig) {
221       row = im[i] - rstart_orig;
222       for ( j=0; j<n; j++ ) {
223         if (in[j] >= cstart_orig && in[j] < cend_orig){
224           col = in[j] - cstart_orig;
225           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
226           MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
227           /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
228         }
229 #if defined(USE_PETSC_BOPT_g)
230         else if (in[j] < 0) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");}
231         else if (in[j] >= baij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Col too large");}
232 #endif
233         else {
234           if (mat->was_assembled) {
235             if (!baij->colmap) {
236               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
237             }
238             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
239             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
240               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
241               col =  in[j];
242               /* Reinitialize the variables required by MatSetValues_SeqBAIJ_B_Private() */
243               B = baij->B;
244               b = (Mat_SeqBAIJ *) (B)->data;
245               bimax=b->imax;bi=b->i;bilen=b->ilen;bj=b->j;
246               ba=b->a;
247             }
248           } else col = in[j];
249           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
250           MatSetValues_SeqBAIJ_B_Private(row,col,value,addv);
251           /* ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
252         }
253       }
254     } else {
255       if (roworiented && !baij->donotstash) {
256         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
257       } else {
258         if (!baij->donotstash) {
259           row = im[i];
260           for ( j=0; j<n; j++ ) {
261             ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
262           }
263         }
264       }
265     }
266   }
267   PetscFunctionReturn(0);
268 }
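
/*
   A hedged usage sketch (not part of the original source): callers pass
   global point indices; locally owned entries go directly into the A or B
   block via the macros above, while off-process entries are stashed and
   communicated during assembly.  The indices below are illustrative only.
*/
#undef __FUNC__
#define __FUNC__ "SetValuesUsage_Example"
static int SetValuesUsage_Example(Mat mat)
{
  int    ierr,row = 4,col = 7;
  Scalar value = 3.0;

  PetscFunctionBegin;
  ierr = MatSetValues(mat,1,&row,1,&col,&value,ADD_VALUES);CHKERRQ(ierr);
  /* values are not guaranteed to be in place until assembly completes */
  ierr = MatAssemblyBegin(mat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
  ierr = MatAssemblyEnd(mat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}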
269 
270 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
271 #undef __FUNC__
272 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
273 int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
274 {
275   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
276   Scalar      *value,*barray=baij->barray;
277   int         ierr,i,j,ii,jj,row,col,k,l;
278   int         roworiented = baij->roworiented,rstart=baij->rstart;
279   int         rend=baij->rend,cstart=baij->cstart,stepval;
280   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
281 
      PetscFunctionBegin;
282   if (!barray) {
283     baij->barray = barray = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(barray);
284   }
285 
286   if (roworiented) {
287     stepval = (n-1)*bs;
288   } else {
289     stepval = (m-1)*bs;
290   }
291   for ( i=0; i<m; i++ ) {
292 #if defined(USE_PETSC_BOPT_g)
293     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
294     if (im[i] >= baij->Mbs) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
295 #endif
296     if (im[i] >= rstart && im[i] < rend) {
297       row = im[i] - rstart;
298       for ( j=0; j<n; j++ ) {
299         /* If n == 1 (row oriented) or m == 1 (column oriented) the block is already contiguous, so no copy is required */
300         if ((roworiented) && (n == 1)) {
301           barray = v + i*bs2;
302         } else if((!roworiented) && (m == 1)) {
303           barray = v + j*bs2;
304         } else { /* Here a copy is required */
305           if (roworiented) {
306             value = v + i*(stepval+bs)*bs + j*bs;
307           } else {
308             value = v + j*(stepval+bs)*bs + i*bs;
309           }
310           for ( ii=0; ii<bs; ii++,value+=stepval ) {
311             for (jj=0; jj<bs; jj++ ) {
312               *barray++  = *value++;
313             }
314           }
315           barray -=bs2;
316         }
317 
318         if (in[j] >= cstart && in[j] < cend){
319           col  = in[j] - cstart;
320           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
321         }
322 #if defined(USE_PETSC_BOPT_g)
323         else if (in[j] < 0) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");}
324         else if (in[j] >= baij->Nbs) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");}
325 #endif
326         else {
327           if (mat->was_assembled) {
328             if (!baij->colmap) {
329               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
330             }
331 
332 #if defined(USE_PETSC_BOPT_g)
333             if ((baij->colmap[in[j]] - 1) % bs) {SETERRQ(PETSC_ERR_PLIB,0,"Incorrect colmap");}
334 #endif
335             col = (baij->colmap[in[j]] - 1)/bs;
336             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
337               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
338               col =  in[j];
339             }
340           }
341           else col = in[j];
342           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,barray,addv);CHKERRQ(ierr);
343         }
344       }
345     } else {
346       if (!baij->donotstash) {
347         if (roworiented ) {
348           row   = im[i]*bs;
349           value = v + i*(stepval+bs)*bs;
350           for ( j=0; j<bs; j++,row++ ) {
351             for ( k=0; k<n; k++ ) {
352               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
353                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
354               }
355             }
356           }
357         } else {
358           for ( j=0; j<n; j++ ) {
359             value = v + j*(stepval+bs)*bs + i*bs;
360             col   = in[j]*bs;
361             for ( k=0; k<bs; k++,col++,value+=stepval) {
362               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
363                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
364               }
365             }
366           }
367         }
368       }
369     }
370   }
371   PetscFunctionReturn(0);
372 }
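
/*
   A hedged sketch (not part of the original source) of the blocked
   interface implemented above: indices are in block units and v points at
   a dense bs x bs block, laid out row-major here because row-oriented is
   the default.  The block size and indices are illustrative only.
*/
#undef __FUNC__
#define __FUNC__ "SetValuesBlockedUsage_Example"
static int SetValuesBlockedUsage_Example(Mat mat,int bs)
{
  int    ierr,i,brow = 2,bcol = 5;
  Scalar *block;

  PetscFunctionBegin;
  block = (Scalar *) PetscMalloc(bs*bs*sizeof(Scalar)); CHKPTRQ(block);
  for ( i=0; i<bs*bs; i++ ) block[i] = 1.0;    /* fill one dense block */
  ierr = MatSetValuesBlocked(mat,1,&brow,1,&bcol,block,INSERT_VALUES);CHKERRQ(ierr);
  PetscFree(block);
  PetscFunctionReturn(0);
}
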
373 #define HASH_KEY 0.6180339887
374 /* #define HASH1(size,key) ((int)((size)*fmod(((key)*HASH_KEY),1))) */
375 #define HASH(size,key,tmp) (tmp = (key)*HASH_KEY,(int)((size)*(tmp-(int)tmp)))
376 /* #define HASH(size,key,tmp) ((int)((size)*fmod(((key)*HASH_KEY),1))) */
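/*
   The HASH macro above is multiplicative (Fibonacci) hashing: multiply the
   key by the golden-ratio conjugate, keep the fractional part, and scale
   by the table size.  A small worked example (illustrative values only):
   for key = 100 and size = 64,
       100*0.6180339887 = 61.80339887, fractional part 0.80339887,
       slot = (int)(64*0.80339887)   = 51.
   Collisions are resolved below by linear probing forward from this slot,
   wrapping to the start of the table if necessary.
*/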
377 #undef __FUNC__
378 #define __FUNC__ "MatSetValues_MPIBAIJ_HT"
379 int MatSetValues_MPIBAIJ_HT(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
380 {
381   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
382   int         ierr,i,j,row,col;
383   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs;
384   int         rend_orig=baij->rend_bs,Nbs=baij->Nbs;
385   int         h1,key,size=baij->ht_size,bs=baij->bs,*HT=baij->ht,idx;
386   double      tmp;
387   Scalar      ** HD = baij->hd,value;
388 #if defined(USE_PETSC_BOPT_g)
389   int         total_ct=baij->ht_total_ct,insert_ct=baij->ht_insert_ct;
390 #endif
391 
392   PetscFunctionBegin;
393 
394   for ( i=0; i<m; i++ ) {
395 #if defined(USE_PETSC_BOPT_g)
396     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
397     if (im[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
398 #endif
399     row = im[i];
400     if (row >= rstart_orig && row < rend_orig) {
401       for ( j=0; j<n; j++ ) {
402         col = in[j];
403         if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
404         /* Look the entry up in the hash table */
405         key = (row/bs)*Nbs+(col/bs)+1;
406         h1  = HASH(size,key,tmp);
407 
408 
409         idx = h1;
410 #if defined(USE_PETSC_BOPT_g)
411         insert_ct++;
412         total_ct++;
413         if (HT[idx] != key) {
414           for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++,total_ct++);
415           if (idx == size) {
416             for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++,total_ct++);
417             if (idx == h1) {
418               SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
419             }
420           }
421         }
422 #else
423         if (HT[idx] != key) {
424           for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++);
425           if (idx == size) {
426             for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++);
427             if (idx == h1) {
428               SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
429             }
430           }
431         }
432 #endif
433         /* A HASH table entry is found, so insert the values at the correct address */
434         if (addv == ADD_VALUES) *(HD[idx]+ (col % bs)*bs + (row % bs)) += value;
435         else                    *(HD[idx]+ (col % bs)*bs + (row % bs))  = value;
436       }
437     } else {
438       if (roworiented && !baij->donotstash) {
439         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
440       } else {
441         if (!baij->donotstash) {
442           row = im[i];
443           for ( j=0; j<n; j++ ) {
444             ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
445           }
446         }
447       }
448     }
449   }
450 #if defined(USE_PETSC_BOPT_g)
451   baij->ht_total_ct = total_ct;
452   baij->ht_insert_ct = insert_ct;
453 #endif
454   PetscFunctionReturn(0);
455 }
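
/*
   A hedged sketch (not part of the original source) of the lookup done
   above: the key encodes the block coordinates of a point entry, HD[idx]
   points at the bs x bs block, and the entry itself sits at column-major
   offset (col%bs)*bs + (row%bs) inside that block.  Returns 0 when the
   block is not in the table.
*/
#undef __FUNC__
#define __FUNC__ "HashLookup_Example"
static Scalar *HashLookup_Example(Mat_MPIBAIJ *baij,int row,int col)
{
  int    bs = baij->bs,Nbs = baij->Nbs,size = baij->ht_size;
  int    key,h1,idx,*HT = baij->ht;
  double tmp;

  key = (row/bs)*Nbs + (col/bs) + 1;                 /* 1-based: 0 means empty slot */
  h1  = HASH(size,key,tmp);
  for ( idx=h1; idx<size && HT[idx]!=key; idx++ );   /* probe forward */
  if (idx == size) {
    for ( idx=0; idx<h1 && HT[idx]!=key; idx++ );    /* wrap around */
  }
  if (HT[idx] != key) return 0;                      /* (row,col) not present */
  return baij->hd[idx] + (col % bs)*bs + (row % bs);
}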
456 
457 #undef __FUNC__
458 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ_HT"
459 int MatSetValuesBlocked_MPIBAIJ_HT(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
460 {
461   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
462   int         ierr,i,j,ii,jj,row,col,k,l;
463   int         roworiented = baij->roworiented,rstart=baij->rstart;
464   int         rend=baij->rend,stepval,bs=baij->bs,bs2=baij->bs2;
465   int         h1,key,size=baij->ht_size,idx,*HT=baij->ht,Nbs=baij->Nbs;
466   double      tmp;
467   Scalar      ** HD = baij->hd,*value,*v_t,*baij_a;
468 #if defined(USE_PETSC_BOPT_g)
469   int         total_ct=baij->ht_total_ct,insert_ct=baij->ht_insert_ct;
470 #endif
471 
472   PetscFunctionBegin;
473 
474   if (roworiented) {
475     stepval = (n-1)*bs;
476   } else {
477     stepval = (m-1)*bs;
478   }
479   for ( i=0; i<m; i++ ) {
480 #if defined(USE_PETSC_BOPT_g)
481     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
482     if (im[i] >= baij->Mbs) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
483 #endif
484     row   = im[i];
485     v_t   = v + i*bs2;
486     if (row >= rstart && row < rend) {
487       for ( j=0; j<n; j++ ) {
488         col = in[j];
489 
490         /* Look the block up in the hash table */
491         key = row*Nbs+col+1;
492         h1  = HASH(size,key,tmp);
493 
494         idx = h1;
495 #if defined(USE_PETSC_BOPT_g)
496         total_ct++;
497         insert_ct++;
498         if (HT[idx] != key) {
499           for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++,total_ct++);
500           if (idx == size) {
501             for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++,total_ct++);
502             if (idx == h1) {
503               SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
504             }
505           }
506         }
507 #else
508         if (HT[idx] != key) {
509           for ( idx=h1; (idx<size) && (HT[idx]!=key); idx++);
510           if (idx == size) {
511             for ( idx=0; (idx<h1) && (HT[idx]!=key); idx++);
512             if (idx == h1) {
513               SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"(row,col) has no entry in the hash table");
514             }
515           }
516         }
517 #endif
518         baij_a = HD[idx];
519         if (roworiented) {
520           /*value = v + i*(stepval+bs)*bs + j*bs;*/
521           /* value = v + (i*(stepval+bs)+j)*bs; */
522           value = v_t;
523           v_t  += bs;
524           if (addv == ADD_VALUES) {
525             for ( ii=0; ii<bs; ii++,value+=stepval) {
526               for ( jj=ii; jj<bs2; jj+=bs ) {
527                 baij_a[jj]  += *value++;
528               }
529             }
530           } else {
531             for ( ii=0; ii<bs; ii++,value+=stepval) {
532               for ( jj=ii; jj<bs2; jj+=bs ) {
533                 baij_a[jj]  = *value++;
534               }
535             }
536           }
537         } else {
538           value = v + j*(stepval+bs)*bs + i*bs;
539           if (addv == ADD_VALUES) {
540             for ( ii=0; ii<bs; ii++,value+=stepval,baij_a+=bs ) {
541               for ( jj=0; jj<bs; jj++ ) {
542                 baij_a[jj]  += *value++;
543               }
544             }
545           } else {
546             for ( ii=0; ii<bs; ii++,value+=stepval,baij_a+=bs ) {
547               for ( jj=0; jj<bs; jj++ ) {
548                 baij_a[jj]  = *value++;
549               }
550             }
551           }
552         }
553       }
554     } else {
555       if (!baij->donotstash) {
556         if (roworiented ) {
557           row   = im[i]*bs;
558           value = v + i*(stepval+bs)*bs;
559           for ( j=0; j<bs; j++,row++ ) {
560             for ( k=0; k<n; k++ ) {
561               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
562                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
563               }
564             }
565           }
566         } else {
567           for ( j=0; j<n; j++ ) {
568             value = v + j*(stepval+bs)*bs + i*bs;
569             col   = in[j]*bs;
570             for ( k=0; k<bs; k++,col++,value+=stepval) {
571               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
572                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
573               }
574             }
575           }
576         }
577       }
578     }
579   }
580 #if defined(USE_PETSC_BOPT_g)
581   baij->ht_total_ct = total_ct;
582   baij->ht_insert_ct = insert_ct;
583 #endif
584   PetscFunctionReturn(0);
585 }
586 
587 #undef __FUNC__
588 #define __FUNC__ "MatGetValues_MPIBAIJ"
589 int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
590 {
591   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
592   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
593   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
594 
595   PetscFunctionBegin;
596   for ( i=0; i<m; i++ ) {
597     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
598     if (idxm[i] >= baij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
599     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
600       row = idxm[i] - bsrstart;
601       for ( j=0; j<n; j++ ) {
602         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
603         if (idxn[j] >= baij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
604         if (idxn[j] >= bscstart && idxn[j] < bscend){
605           col = idxn[j] - bscstart;
606           ierr = MatGetValues_SeqBAIJ(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
607         } else {
608           if (!baij->colmap) {
609             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
610           }
611           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
612              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
613           else {
614             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
615             ierr = MatGetValues_SeqBAIJ(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
616           }
617         }
618       }
619     } else {
620       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
621     }
622   }
623  PetscFunctionReturn(0);
624 }
625 
626 #undef __FUNC__
627 #define __FUNC__ "MatNorm_MPIBAIJ"
628 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
629 {
630   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
631   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
632   int        ierr, i,bs2=baij->bs2;
633   double     sum = 0.0;
634   Scalar     *v;
635 
636   PetscFunctionBegin;
637   if (baij->size == 1) {
638     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
639   } else {
640     if (type == NORM_FROBENIUS) {
641       v = amat->a;
642       for (i=0; i<amat->nz*bs2; i++ ) {
643 #if defined(USE_PETSC_COMPLEX)
644         sum += PetscReal(PetscConj(*v)*(*v)); v++;
645 #else
646         sum += (*v)*(*v); v++;
647 #endif
648       }
649       v = bmat->a;
650       for (i=0; i<bmat->nz*bs2; i++ ) {
651 #if defined(USE_PETSC_COMPLEX)
652         sum += PetscReal(PetscConj(*v)*(*v)); v++;
653 #else
654         sum += (*v)*(*v); v++;
655 #endif
656       }
657       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
658       *norm = sqrt(*norm);
659     } else {
660       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
661     }
662   }
663   PetscFunctionReturn(0);
664 }
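
/*
   The Frobenius branch above computes (a summary, not original text)
       ||A||_F = sqrt( sum_ij |a_ij|^2 )
   by accumulating |a_ij|^2 over the locally stored A and B blocks and then
   combining the per-process partial sums with a single MPI_Allreduce
   before taking the square root.
*/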
665 
666 #undef __FUNC__
667 #define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
668 int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
669 {
670   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
671   MPI_Comm    comm = mat->comm;
672   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
673   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
674   MPI_Request *send_waits,*recv_waits;
675   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
676   InsertMode  addv;
677   Scalar      *rvalues,*svalues;
678 
679   PetscFunctionBegin;
680   if (baij->donotstash) {
681     baij->svalues    = 0; baij->rvalues    = 0;
682     baij->nsends     = 0; baij->nrecvs     = 0;
683     baij->send_waits = 0; baij->recv_waits = 0;
684     baij->rmax       = 0;
685     PetscFunctionReturn(0);
686   }
687 
688   /* make sure all processors are using either INSERT_VALUES or ADD_VALUES */
689   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
690   if (addv == (ADD_VALUES|INSERT_VALUES)) {
691     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
692   }
693   mat->insertmode = addv; /* in case this processor had no cache */
694 
695   /*  first count number of contributors to each processor */
696   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
697   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
698   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
699   for ( i=0; i<baij->stash.n; i++ ) {
700     idx = baij->stash.idx[i];
701     for ( j=0; j<size; j++ ) {
702       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
703         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
704       }
705     }
706   }
707   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
708 
709   /* inform other processors of number of messages and max length*/
710   work      = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
711   ierr      = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
712   nreceives = work[rank];
713   ierr      = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
714   nmax      = work[rank];
715   PetscFree(work);
716 
717   /* post receives:
718        1) each message will consist of ordered triples
719      (global row index, global column index, value); we store the indices
720      as Scalars to simplify the message passing.
721        2) since we don't know how long each individual message is we
722      allocate the largest needed buffer for each receive. Potentially
723      this is a lot of wasted space.
724 
725 
726        This could be done better.
727   */
728   rvalues    = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
729   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
730   for ( i=0; i<nreceives; i++ ) {
731     ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
732   }
733 
734   /* do sends:
735       1) starts[i] gives the starting index in svalues for stuff going to
736          the ith processor
737   */
738   svalues    = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
739   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
740   starts     = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
741   starts[0] = 0;
742   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
743   for ( i=0; i<baij->stash.n; i++ ) {
744     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
745     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
746     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
747   }
748   PetscFree(owner);
749   starts[0] = 0;
750   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
751   count = 0;
752   for ( i=0; i<size; i++ ) {
753     if (procs[i]) {
754       ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
755     }
756   }
757   PetscFree(starts); PetscFree(nprocs);
758 
759   /* Free cache space */
760   PLogInfo(baij->A,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
761   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
762 
763   baij->svalues    = svalues;    baij->rvalues    = rvalues;
764   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
765   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
766   baij->rmax       = nmax;
767 
768   PetscFunctionReturn(0);
769 }
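
/*
   Wire format used above (a summary, not original text): stashed entry i
   of a message occupies three consecutive Scalars,
       svalues[3*i]   = (Scalar) global row index,
       svalues[3*i+1] = (Scalar) global column index,
       svalues[3*i+2] = the numerical value,
   which is why every count and offset in the sends and receives is scaled
   by 3, and why MatAssemblyEnd_MPIBAIJ() divides the received count by 3.
*/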
770 
771 /*
772   Creates and fills the hash table.
773   This table is created only once.
774   If new entries need to be added to the matrix
775   then the hash table has to be destroyed and
776   recreated.
777 */
778 #undef __FUNC__
779 #define __FUNC__ "MatCreateHashTable_MPIBAIJ_Private"
780 int MatCreateHashTable_MPIBAIJ_Private(Mat mat,double factor)
781 {
782   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
783   Mat         A = baij->A, B=baij->B;
784   Mat_SeqBAIJ *a=(Mat_SeqBAIJ *)A->data, *b=(Mat_SeqBAIJ *)B->data;
785   int         i,j,k,nz=a->nz+b->nz,h1,*ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j;
786   int         size,bs2=baij->bs2,rstart=baij->rstart;
787   int         cstart=baij->cstart,*garray=baij->garray,row,col,Nbs=baij->Nbs;
788   int         *HT,key;
789   Scalar      **HD;
790   double      tmp;
791 #if defined(USE_PETSC_BOPT_g)
792   int         ct=0,max=0;
793 #endif
794 
795   PetscFunctionBegin;
796   baij->ht_size=(int)(factor*nz);
797   size = baij->ht_size;
798 
799   if (baij->ht) {
800     PetscFunctionReturn(0);
801   }
802 
803   /* Allocate Memory for Hash Table */
804   baij->hd = (Scalar**)PetscMalloc((size)*(sizeof(int)+sizeof(Scalar*))+1); CHKPTRQ(baij->hd);
805   baij->ht = (int*)(baij->hd + size);
806   HD = baij->hd;
807   HT = baij->ht;
808 
809 
810   PetscMemzero(HD,size*(sizeof(int)+sizeof(Scalar*)));
811 
812 
813   /* Loop Over A */
814   for ( i=0; i<a->mbs; i++ ) {
815     for ( j=ai[i]; j<ai[i+1]; j++ ) {
816       row = i+rstart;
817       col = aj[j]+cstart;
818 
819       key = row*Nbs + col + 1;
820       h1  = HASH(size,key,tmp);
821       for ( k=0; k<size; k++ ){
822         if (HT[(h1+k)%size] == 0) {
823           HT[(h1+k)%size] = key;
824           HD[(h1+k)%size] = a->a + j*bs2;
825           break;
826 #if defined(USE_PETSC_BOPT_g)
827         } else {
828           ct++;
829 #endif
830         }
831       }
832 #if defined(USE_PETSC_BOPT_g)
833       if (k> max) max = k;
834 #endif
835     }
836   }
837   /* Loop Over B */
838   for ( i=0; i<b->mbs; i++ ) {
839     for ( j=bi[i]; j<bi[i+1]; j++ ) {
840       row = i+rstart;
841       col = garray[bj[j]];
842       key = row*Nbs + col + 1;
843       h1  = HASH(size,key,tmp);
844       for ( k=0; k<size; k++ ){
845         if (HT[(h1+k)%size] == 0) {
846           HT[(h1+k)%size] = key;
847           HD[(h1+k)%size] = b->a + j*bs2;
848           break;
849 #if defined(USE_PETSC_BOPT_g)
850         } else {
851           ct++;
852 #endif
853         }
854       }
855 #if defined(USE_PETSC_BOPT_g)
856       if (k> max) max = k;
857 #endif
858     }
859   }
860 
861   /* Print Summary */
862 #if defined(USE_PETSC_BOPT_g)
863   for ( i=0,j=0; i<size; i++)
864     if (HT[i]) {j++;}
865   PLogInfo(0,"MatCreateHashTable_MPIBAIJ_Private: Average Search = %5.2f,max search = %d\n",
866            (j== 0)? 0.0:((double)(ct+j))/j,max);
867 #endif
868   PetscFunctionReturn(0);
869 }
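
/*
   Sizing note (a summary, not original text): the table receives exactly
   a->nz + b->nz occupied slots, so with ht_size = factor*nz the load
   factor is 1/factor; e.g. a hypothetical factor of 1.6 gives roughly 62%
   occupancy, which keeps the linear-probe chains short.  The factor is
   the user-settable baij->ht_fact passed in from MatAssemblyEnd_MPIBAIJ().
*/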
870 
871 #undef __FUNC__
872 #define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
873 int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
874 {
875   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
876   MPI_Status  *send_status,recv_status;
877   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
878   int         bs=baij->bs,row,col,other_disassembled;
879   Scalar      *values,val;
880   InsertMode  addv = mat->insertmode;
881 
882   PetscFunctionBegin;
883   /*  wait on receives */
884   while (count) {
885     ierr = MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
886     /* unpack receives into our local space */
887     values = baij->rvalues + 3*imdex*baij->rmax;
888     ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
889     n = n/3;
890     for ( i=0; i<n; i++ ) {
891       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
892       col = (int) PetscReal(values[3*i+1]);
893       val = values[3*i+2];
894       if (col >= baij->cstart*bs && col < baij->cend*bs) {
895         col -= baij->cstart*bs;
896         ierr = MatSetValues(baij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
897       } else {
898         if (mat->was_assembled) {
899           if (!baij->colmap) {
900             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
901           }
902           col = (baij->colmap[col/bs]) - 1 + col%bs;
903           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
904             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
905             col = (int) PetscReal(values[3*i+1]);
906           }
907         }
908         ierr = MatSetValues(baij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
909       }
910     }
911     count--;
912   }
913   if (baij->recv_waits) PetscFree(baij->recv_waits);
914   if (baij->rvalues)    PetscFree(baij->rvalues);
915 
916   /* wait on sends */
917   if (baij->nsends) {
918     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
919     ierr        = MPI_Waitall(baij->nsends,baij->send_waits,send_status);CHKERRQ(ierr);
920     PetscFree(send_status);
921   }
922   if (baij->send_waits) PetscFree(baij->send_waits);
923   if (baij->svalues)    PetscFree(baij->svalues);
924 
925   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
926   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
927 
928   /* determine if any processor has disassembled; if so we must
929      also disassemble ourselves, so that we may reassemble. */
930   /*
931      if the nonzero structure of submatrix B cannot change then we know
932      that no processor disassembled, thus we can skip this step
933   */
934   if (!((Mat_SeqBAIJ*) baij->B->data)->nonew)  {
935     ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
936     if (mat->was_assembled && !other_disassembled) {
937       ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
938     }
939   }
940 
941   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
942     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
943   }
944   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
945   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
946 
947 #if defined(USE_PETSC_BOPT_g)
948   if (baij->ht && mode== MAT_FINAL_ASSEMBLY) {
949     PLogInfo(0,"MatAssemblyEnd_MPIBAIJ:Average Hash Table Search in MatSetValues = %5.2f\n",
950              ((double)baij->ht_total_ct)/baij->ht_insert_ct);
951     baij->ht_total_ct  = 0;
952     baij->ht_insert_ct = 0;
953   }
954 #endif
955   if (baij->ht_flag && !baij->ht && mode == MAT_FINAL_ASSEMBLY) {
956     ierr = MatCreateHashTable_MPIBAIJ_Private(mat,baij->ht_fact); CHKERRQ(ierr);
957     mat->ops->setvalues        = MatSetValues_MPIBAIJ_HT;
958     mat->ops->setvaluesblocked = MatSetValuesBlocked_MPIBAIJ_HT;
959   }
960 
961   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
962   PetscFunctionReturn(0);
963 }
964 
965 #undef __FUNC__
966 #define __FUNC__ "MatView_MPIBAIJ_Binary"
967 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
968 {
969   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
970   int          ierr;
971 
972   PetscFunctionBegin;
973   if (baij->size == 1) {
974     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
975   } else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
976   PetscFunctionReturn(0);
977 }
978 
979 #undef __FUNC__
980 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
981 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
982 {
983   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
984   int          ierr, format,bs = baij->bs, size = baij->size, rank = baij->rank;
985   FILE         *fd;
986   ViewerType   vtype;
987 
988   PetscFunctionBegin;
989   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
990   if (PetscTypeCompare(vtype,ASCII_VIEWER)) {
991     ierr = ViewerGetFormat(viewer,&format);
992     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
993       MatInfo info;
994       MPI_Comm_rank(mat->comm,&rank);
995       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
996       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
997       PetscSequentialPhaseBegin(mat->comm,1);
998       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
999               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
1000               baij->bs,(int)info.memory);
1001       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
1002       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
1003       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
1004       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
1005       fflush(fd);
1006       PetscSequentialPhaseEnd(mat->comm,1);
1007       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
1008       PetscFunctionReturn(0);
1009     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
1010       PetscPrintf(mat->comm,"  block size is %d\n",bs);
1011       PetscFunctionReturn(0);
1012     }
1013   }
1014 
1015   if (PetscTypeCompare(vtype,DRAW_VIEWER)) {
1016     Draw       draw;
1017     PetscTruth isnull;
1018     ierr = ViewerDrawGetDraw(viewer,0,&draw); CHKERRQ(ierr);
1019     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
1020   }
1021 
1022   if (size == 1) {
1023     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
1024   } else {
1025     /* assemble the entire matrix onto first processor. */
1026     Mat         A;
1027     Mat_SeqBAIJ *Aloc;
1028     int         M = baij->M, N = baij->N,*ai,*aj,col,i,j,k,*rvals;
1029     int         mbs = baij->mbs;
1030     Scalar      *a;
1031 
1032     if (!rank) {
1033       ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
1034     } else {
1035       ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
1036     }
1037     PLogObjectParent(mat,A);
1038 
1039     /* copy over the A part */
1040     Aloc = (Mat_SeqBAIJ*) baij->A->data;
1041     ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1042     rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1043 
1044     for ( i=0; i<mbs; i++ ) {
1045       rvals[0] = bs*(baij->rstart + i);
1046       for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1047       for ( j=ai[i]; j<ai[i+1]; j++ ) {
1048         col = (baij->cstart+aj[j])*bs;
1049         for (k=0; k<bs; k++ ) {
1050           ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
1051           col++; a += bs;
1052         }
1053       }
1054     }
1055     /* copy over the B part */
1056     Aloc = (Mat_SeqBAIJ*) baij->B->data;
1057     ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1058     for ( i=0; i<mbs; i++ ) {
1059       rvals[0] = bs*(baij->rstart + i);
1060       for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1061       for ( j=ai[i]; j<ai[i+1]; j++ ) {
1062         col = baij->garray[aj[j]]*bs;
1063         for (k=0; k<bs; k++ ) {
1064           ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
1065           col++; a += bs;
1066         }
1067       }
1068     }
1069     PetscFree(rvals);
1070     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1071     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1072     /*
1073        Everyone has to call to draw the matrix since the graphics waits are
1074        synchronized across all processors that share the Draw object
1075     */
1076     if (!rank || PetscTypeCompare(vtype,DRAW_VIEWER)) {
1077       ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
1078     }
1079     ierr = MatDestroy(A); CHKERRQ(ierr);
1080   }
1081   PetscFunctionReturn(0);
1082 }
1083 
1084 
1085 
1086 #undef __FUNC__
1087 #define __FUNC__ "MatView_MPIBAIJ"
1088 int MatView_MPIBAIJ(Mat mat,Viewer viewer)
1089 {
1090   int         ierr;
1091   ViewerType  vtype;
1092 
1093   PetscFunctionBegin;
1094   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
1095   if (PetscTypeCompare(vtype,ASCII_VIEWER) || PetscTypeCompare(vtype,DRAW_VIEWER) ||
1096       PetscTypeCompare(vtype,MATLAB_VIEWER)) {
1097     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
1098   } else if (PetscTypeCompare(vtype,BINARY_VIEWER)) {
1099     ierr = MatView_MPIBAIJ_Binary(mat,viewer);CHKERRQ(ierr);
1100   } else {
1101     SETERRQ(1,1,"Viewer type not supported by PETSc object");
1102   }
1103   PetscFunctionReturn(0);
1104 }
1105 
1106 #undef __FUNC__
1107 #define __FUNC__ "MatDestroy_MPIBAIJ"
1108 int MatDestroy_MPIBAIJ(Mat mat)
1109 {
1110   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1111   int         ierr;
1112 
1113   PetscFunctionBegin;
1114   if (--mat->refct > 0) PetscFunctionReturn(0);
1115 
1116   if (mat->mapping) {
1117     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
1118   }
1119   if (mat->bmapping) {
1120     ierr = ISLocalToGlobalMappingDestroy(mat->bmapping); CHKERRQ(ierr);
1121   }
1122   if (mat->rmap) {
1123     ierr = MapDestroy(mat->rmap);CHKERRQ(ierr);
1124   }
1125   if (mat->cmap) {
1126     ierr = MapDestroy(mat->cmap);CHKERRQ(ierr);
1127   }
1128 #if defined(USE_PETSC_LOG)
1129   PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",baij->M,baij->N);
1130 #endif
1131 
1132   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
1133   PetscFree(baij->rowners);
1134   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1135   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1136   if (baij->colmap) PetscFree(baij->colmap);
1137   if (baij->garray) PetscFree(baij->garray);
1138   if (baij->lvec)   VecDestroy(baij->lvec);
1139   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
1140   if (baij->rowvalues) PetscFree(baij->rowvalues);
1141   if (baij->barray) PetscFree(baij->barray);
1142   if (baij->hd) PetscFree(baij->hd);
1143   PetscFree(baij);
1144   PLogObjectDestroy(mat);
1145   PetscHeaderDestroy(mat);
1146   PetscFunctionReturn(0);
1147 }
1148 
1149 #undef __FUNC__
1150 #define __FUNC__ "MatMult_MPIBAIJ"
1151 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
1152 {
1153   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1154   int         ierr, nt;
1155 
1156   PetscFunctionBegin;
1157   ierr = VecGetLocalSize(xx,&nt);CHKERRQ(ierr);
1158   if (nt != a->n) {
1159     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
1160   }
1161   ierr = VecGetLocalSize(yy,&nt);CHKERRQ(ierr);
1162   if (nt != a->m) {
1163     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and yy");
1164   }
1165   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1166   ierr = (*a->A->ops->mult)(a->A,xx,yy); CHKERRQ(ierr);
1167   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1168   ierr = (*a->B->ops->multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
1169   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1170   PetscFunctionReturn(0);
1171 }
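
/*
   Structure of the multiply above (a summary, not original text):
       y = A_loc * x_loc + B_off * x_ghost,
   where x_ghost (a->lvec) is filled by the VecScatter.  Splitting the
   scatter into Begin/End lets the purely local product A_loc*x_loc overlap
   the communication that gathers the ghost values needed by B_off.
*/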
1172 
1173 #undef __FUNC__
1174 #define __FUNC__ "MatMultAdd_MPIBAIJ"
1175 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
1176 {
1177   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1178   int        ierr;
1179 
1180   PetscFunctionBegin;
1181   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1182   ierr = (*a->A->ops->multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
1183   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
1184   ierr = (*a->B->ops->multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
1185   PetscFunctionReturn(0);
1186 }
1187 
1188 #undef __FUNC__
1189 #define __FUNC__ "MatMultTrans_MPIBAIJ"
1190 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
1191 {
1192   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1193   int         ierr;
1194 
1195   PetscFunctionBegin;
1196   /* do nondiagonal part */
1197   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
1198   /* send it on its way */
1199   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
1200   /* do local part */
1201   ierr = (*a->A->ops->multtrans)(a->A,xx,yy); CHKERRQ(ierr);
1202   /* receive remote parts: note this assumes the values are not actually */
1203   /* inserted in yy until the next line, which is true for this          */
1204   /* implementation but may not always hold. */
1205   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
1206   PetscFunctionReturn(0);
1207 }
1208 
1209 #undef __FUNC__
1210 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
1211 int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
1212 {
1213   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1214   int         ierr;
1215 
1216   PetscFunctionBegin;
1217   /* do nondiagonal part */
1218   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
1219   /* send it on its way */
1220   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
1221   /* do local part */
1222   ierr = (*a->A->ops->multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
1223   /* receive remote parts: note this assumes the values are not actually */
1224   /* inserted in zz until the next line, which is true for this          */
1225   /* implementation but may not always hold. */
1226   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
1227   PetscFunctionReturn(0);
1228 }
1229 
1230 /*
1231   This only works correctly for square matrices where the subblock A->A is the
1232    diagonal block
1233 */
1234 #undef __FUNC__
1235 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
1236 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
1237 {
1238   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1239   int         ierr;
1240 
1241   PetscFunctionBegin;
1242   if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
1243   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
1244   PetscFunctionReturn(0);
1245 }
1246 
1247 #undef __FUNC__
1248 #define __FUNC__ "MatScale_MPIBAIJ"
1249 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
1250 {
1251   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1252   int         ierr;
1253 
1254   PetscFunctionBegin;
1255   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
1256   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
1257   PetscFunctionReturn(0);
1258 }
1259 
1260 #undef __FUNC__
1261 #define __FUNC__ "MatGetSize_MPIBAIJ"
1262 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
1263 {
1264   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1265 
1266   PetscFunctionBegin;
1267   if (m) *m = mat->M;
1268   if (n) *n = mat->N;
1269   PetscFunctionReturn(0);
1270 }
1271 
1272 #undef __FUNC__
1273 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
1274 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
1275 {
1276   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1277 
1278   PetscFunctionBegin;
1279   *m = mat->m; *n = mat->n;
1280   PetscFunctionReturn(0);
1281 }
1282 
1283 #undef __FUNC__
1284 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
1285 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
1286 {
1287   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1288 
1289   PetscFunctionBegin;
1290   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
1291   PetscFunctionReturn(0);
1292 }
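
/*
   A hedged usage sketch (not part of the original source): the ownership
   range is reported in point (not block) indices, so a loop over locally
   owned rows looks like the following.
*/
#undef __FUNC__
#define __FUNC__ "OwnershipRangeUsage_Example"
static int OwnershipRangeUsage_Example(Mat mat)
{
  int ierr,rstart,rend,row;

  PetscFunctionBegin;
  ierr = MatGetOwnershipRange(mat,&rstart,&rend);CHKERRQ(ierr);
  for ( row=rstart; row<rend; row++ ) {
    /* rows [rstart,rend) can be set by this process without stashing */
  }
  PetscFunctionReturn(0);
}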
1293 
1294 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1295 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
1296 
1297 #undef __FUNC__
1298 #define __FUNC__ "MatGetRow_MPIBAIJ"
1299 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1300 {
1301   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
1302   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
1303   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
1304   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
1305   int        *cmap, *idx_p,cstart = mat->cstart;
1306 
1307   PetscFunctionBegin;
1308   if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
1309   mat->getrowactive = PETSC_TRUE;
1310 
1311   if (!mat->rowvalues && (idx || v)) {
1312     /*
1313         allocate enough space to hold information from the longest row.
1314     */
1315     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
1316     int     max = 1,mbs = mat->mbs,tmp;
1317     for ( i=0; i<mbs; i++ ) {
1318       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1319       if (max < tmp) { max = tmp; }
1320     }
1321     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
1322     CHKPTRQ(mat->rowvalues);
1323     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
1324   }
1325 
1326   if (row < brstart || row >= brend) SETERRQ(PETSC_ERR_SUP,0,"Only local rows");
1327   lrow = row - brstart;
1328 
1329   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1330   if (!v)   {pvA = 0; pvB = 0;}
1331   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1332   ierr = (*mat->A->ops->getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1333   ierr = (*mat->B->ops->getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1334   nztot = nzA + nzB;
1335 
1336   cmap  = mat->garray;
1337   if (v  || idx) {
1338     if (nztot) {
1339       /* Sort by increasing column numbers, assuming A and B already sorted */
1340       int imark = -1;
1341       if (v) {
1342         *v = v_p = mat->rowvalues;
1343         for ( i=0; i<nzB; i++ ) {
1344           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
1345           else break;
1346         }
1347         imark = i;
1348         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1349         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1350       }
1351       if (idx) {
1352         *idx = idx_p = mat->rowindices;
1353         if (imark > -1) {
1354           for ( i=0; i<imark; i++ ) {
1355             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
1356           }
1357         } else {
1358           for ( i=0; i<nzB; i++ ) {
1359             if (cmap[cworkB[i]/bs] < cstart)
1360               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
1361             else break;
1362           }
1363           imark = i;
1364         }
1365         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
1366         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
1367       }
1368     } else {
1369       if (idx) *idx = 0;
1370       if (v)   *v   = 0;
1371     }
1372   }
1373   *nz = nztot;
1374   ierr = (*mat->A->ops->restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1375   ierr = (*mat->B->ops->restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1376   PetscFunctionReturn(0);
1377 }
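
/*
   Merge order used above (a summary, not original text): for a local row
   the off-diagonal (B) columns whose global block column is below cstart
   come first, then all diagonal (A) columns, then the remaining B columns,
   so the returned column indices are globally increasing, assuming A and B
   store their rows sorted.
*/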
1378 
1379 #undef __FUNC__
1380 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
1381 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1382 {
1383   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1384 
1385   PetscFunctionBegin;
1386   if (baij->getrowactive == PETSC_FALSE) {
1387     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
1388   }
1389   baij->getrowactive = PETSC_FALSE;
1390   PetscFunctionReturn(0);
1391 }
1392 
1393 #undef __FUNC__
1394 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
1395 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
1396 {
1397   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1398 
1399   PetscFunctionBegin;
1400   *bs = baij->bs;
1401   PetscFunctionReturn(0);
1402 }
1403 
1404 #undef __FUNC__
1405 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1406 int MatZeroEntries_MPIBAIJ(Mat A)
1407 {
1408   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1409   int         ierr;
1410 
1411   PetscFunctionBegin;
1412   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1413   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1414   PetscFunctionReturn(0);
1415 }
1416 
1417 #undef __FUNC__
1418 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1419 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1420 {
1421   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1422   Mat         A = a->A, B = a->B;
1423   int         ierr;
1424   double      isend[5], irecv[5];
1425 
1426   PetscFunctionBegin;
1427   info->block_size     = (double)a->bs;
1428   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1429   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
1430   isend[3] = info->memory;  isend[4] = info->mallocs;
1431   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1432   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
1433   isend[3] += info->memory;  isend[4] += info->mallocs;
1434   if (flag == MAT_LOCAL) {
1435     info->nz_used      = isend[0];
1436     info->nz_allocated = isend[1];
1437     info->nz_unneeded  = isend[2];
1438     info->memory       = isend[3];
1439     info->mallocs      = isend[4];
1440   } else if (flag == MAT_GLOBAL_MAX) {
1441     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);CHKERRQ(ierr);
1442     info->nz_used      = irecv[0];
1443     info->nz_allocated = irecv[1];
1444     info->nz_unneeded  = irecv[2];
1445     info->memory       = irecv[3];
1446     info->mallocs      = irecv[4];
1447   } else if (flag == MAT_GLOBAL_SUM) {
1448     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);CHKERRQ(ierr);
1449     info->nz_used      = irecv[0];
1450     info->nz_allocated = irecv[1];
1451     info->nz_unneeded  = irecv[2];
1452     info->memory       = irecv[3];
1453     info->mallocs      = irecv[4];
1454   }
1455   info->rows_global       = (double)a->M;
1456   info->columns_global    = (double)a->N;
1457   info->rows_local        = (double)a->m;
1458   info->columns_local     = (double)a->n;
1459   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1460   info->fill_ratio_needed = 0;
1461   info->factor_mallocs    = 0;
1462   PetscFunctionReturn(0);
1463 }
1464 
1465 #undef __FUNC__
1466 #define __FUNC__ "MatSetOption_MPIBAIJ"
1467 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1468 {
1469   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1470 
1471   PetscFunctionBegin;
1472   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1473       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1474       op == MAT_COLUMNS_UNSORTED ||
1475       op == MAT_COLUMNS_SORTED ||
1476       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1477       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1478         MatSetOption(a->A,op);
1479         MatSetOption(a->B,op);
1480   } else if (op == MAT_ROW_ORIENTED) {
1481         a->roworiented = 1;
1482         MatSetOption(a->A,op);
1483         MatSetOption(a->B,op);
1484   } else if (op == MAT_ROWS_SORTED ||
1485              op == MAT_ROWS_UNSORTED ||
1486              op == MAT_SYMMETRIC ||
1487              op == MAT_STRUCTURALLY_SYMMETRIC ||
1488              op == MAT_YES_NEW_DIAGONALS)
1489     /* MAT_USE_HASH_TABLE is handled below; listing it here made that case unreachable */
1490     PLogInfo(A,"MatSetOption_MPIBAIJ:Option ignored\n");
1491   else if (op == MAT_COLUMN_ORIENTED) {
1492     a->roworiented = 0;
1493     MatSetOption(a->A,op);
1494     MatSetOption(a->B,op);
1495   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1496     a->donotstash = 1;
1497   } else if (op == MAT_NO_NEW_DIAGONALS) {
1498     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1499   } else if (op == MAT_USE_HASH_TABLE) {
1500     a->ht_flag = 1;
1501   } else {
1502     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1503   }
1504   PetscFunctionReturn(0);
1505 }
1506 
1507 #undef __FUNC__
1508 #define __FUNC__ "MatTranspose_MPIBAIJ"
1509 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1510 {
1511   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1512   Mat_SeqBAIJ *Aloc;
1513   Mat        B;
1514   int        ierr,M=baij->M,N=baij->N,*ai,*aj,i,*rvals,j,k,col;
1515   int        bs=baij->bs,mbs=baij->mbs;
1516   Scalar     *a;
1517 
1518   PetscFunctionBegin;
1519   if (matout == PETSC_NULL && M != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
1520   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1521   CHKERRQ(ierr);
1522 
1523   /* copy over the A part */
1524   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1525   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1526   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1527 
1528   for ( i=0; i<mbs; i++ ) {
1529     rvals[0] = bs*(baij->rstart + i);
1530     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1531     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1532       col = (baij->cstart+aj[j])*bs;
1533       for (k=0; k<bs; k++ ) {
1534         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1535         col++; a += bs;
1536       }
1537     }
1538   }
1539   /* copy over the B part */
1540   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1541   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1542   for ( i=0; i<mbs; i++ ) {
1543     rvals[0] = bs*(baij->rstart + i);
1544     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1545     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1546       col = baij->garray[aj[j]]*bs;
1547       for (k=0; k<bs; k++ ) {
1548         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1549         col++; a += bs;
1550       }
1551     }
1552   }
1553   PetscFree(rvals);
1554   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1555   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1556 
1557   if (matout != PETSC_NULL) {
1558     *matout = B;
1559   } else {
1560     PetscOps *Abops;
1561     MatOps   Aops;
1562 
1563     /* This isn't really an in-place transpose: we free baij's data structures and take over B's */
1564     PetscFree(baij->rowners);
1565     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1566     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1567     if (baij->colmap) PetscFree(baij->colmap);
1568     if (baij->garray) PetscFree(baij->garray);
1569     if (baij->lvec) VecDestroy(baij->lvec);
1570     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1571     PetscFree(baij);
1572 
1573     /*
1574        This is horrible, horrible code. We need to keep the
1575       A pointers for the bops and ops but copy everything
1576       else from B.
1577     */
1578     Abops = A->bops;
1579     Aops  = A->ops;
1580     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1581     A->bops = Abops;
1582     A->ops  = Aops;
1583 
1584     PetscHeaderDestroy(B);
1585   }
1586   PetscFunctionReturn(0);
1587 }
1588 
1589 #undef __FUNC__
1590 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1591 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1592 {
1593   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1594   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1595   int ierr,s1,s2,s3;
1596 
1597   PetscFunctionBegin;
1598   if (ll)  {
1599     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1600     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1601     if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"non-conforming local sizes");
1602     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1603     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1604   }
1605   if (rr) SETERRQ(PETSC_ERR_SUP,0,"not supported for right vector");
1606   PetscFunctionReturn(0);
1607 }
1608 
1609 #undef __FUNC__
1610 #define __FUNC__ "MatZeroRows_MPIBAIJ"
1611 int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
1612 {
1613   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
1614   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
1615   int            *procs,*nprocs,j,found,idx,nsends,*work,row;
1616   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
1617   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
1618   int            *lens,imdex,*lrows,*values,bs=l->bs,rstart_bs=l->rstart_bs;
1619   MPI_Comm       comm = A->comm;
1620   MPI_Request    *send_waits,*recv_waits;
1621   MPI_Status     recv_status,*send_status;
1622   IS             istmp;
1623   PetscTruth     localdiag;
1624 
1625   PetscFunctionBegin;
1626   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
1627   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
1628 
1629   /*  first count number of contributors to each processor */
1630   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
1631   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
1632   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
1633   for ( i=0; i<N; i++ ) {
1634     idx = rows[i];
1635     found = 0;
1636     for ( j=0; j<size; j++ ) {
1637       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
1638         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
1639       }
1640     }
1641     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
1642   }
1643   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
1644 
1645   /* inform other processors of number of messages and max length*/
1646   work   = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
1647   ierr   = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
1648   nrecvs = work[rank];
1649   ierr   = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
1650   nmax   = work[rank];
1651   PetscFree(work);
1652 
1653   /* post receives:   */
1654   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); CHKPTRQ(rvalues);
1655   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
1656   for ( i=0; i<nrecvs; i++ ) {
1657     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
1658   }
1659 
1660   /* do sends:
1661      1) starts[i] gives the starting index in svalues for stuff going to
1662      the ith processor
1663   */
1664   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1665   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
1666   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1667   starts[0] = 0;
1668   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1669   for ( i=0; i<N; i++ ) {
1670     svalues[starts[owner[i]]++] = rows[i];
1671   }
1672   ISRestoreIndices(is,&rows);
1673 
1674   starts[0] = 0;
1675   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1676   count = 0;
1677   for ( i=0; i<size; i++ ) {
1678     if (procs[i]) {
1679       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
1680     }
1681   }
1682   PetscFree(starts);
1683 
1684   base = owners[rank]*bs;
1685 
1686   /*  wait on receives */
1687   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1688   source = lens + nrecvs;
1689   count  = nrecvs; slen = 0;
1690   while (count) {
1691     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
1692     /* unpack receives into our local space */
1693     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
1694     source[imdex]  = recv_status.MPI_SOURCE;
1695     lens[imdex]  = n;
1696     slen += n;
1697     count--;
1698   }
1699   PetscFree(recv_waits);
1700 
1701   /* move the data into the send scatter */
1702   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1703   count = 0;
1704   for ( i=0; i<nrecvs; i++ ) {
1705     values = rvalues + i*nmax;
1706     for ( j=0; j<lens[i]; j++ ) {
1707       lrows[count++] = values[j] - base;
1708     }
1709   }
1710   PetscFree(rvalues); PetscFree(lens);
1711   PetscFree(owner); PetscFree(nprocs);
1712 
1713   /* actually zap the local rows */
1714   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1715   PLogObjectParent(A,istmp);
1716 
1717   /*
1718         Zero the required rows. If the "diagonal block" of the matrix
1719      is square and the user wishes to set the diagonal we use separate
1720      code so that MatSetValues() is not called for each diagonal entry,
1721      which would allocate new memory with many mallocs and slow things down.
1722 
1723        Contributed by: Mathew Knepley
1724   */
1725   localdiag = PETSC_FALSE;
1726   if (diag && (l->A->M == l->A->N)) {
1727     localdiag = PETSC_TRUE;
1728     ierr      = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1729   } else {
1730     ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
1731   }
1732   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1733   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1734 
1735   if (diag && (localdiag == PETSC_FALSE)) {
1736     for ( i = 0; i < slen; i++ ) {
1737       row = lrows[i] + rstart_bs;
1738       ierr = MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES); CHKERRQ(ierr);
1739     }
1740     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1741     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1742   }
1743   PetscFree(lrows);
1744 
1745   /* wait on sends */
1746   if (nsends) {
1747     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
1748     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
1749     PetscFree(send_status);
1750   }
1751   PetscFree(send_waits); PetscFree(svalues);
1752 
1753   PetscFunctionReturn(0);
1754 }
1755 
1756 extern int MatPrintHelp_SeqBAIJ(Mat);
1757 #undef __FUNC__
1758 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1759 int MatPrintHelp_MPIBAIJ(Mat A)
1760 {
1761   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1762   MPI_Comm    comm = A->comm;
1763   static int  called = 0;
1764   int         ierr;
1765 
1766   PetscFunctionBegin;
1767   if (!a->rank) {
1768     ierr = MatPrintHelp_SeqBAIJ(a->A);CHKERRQ(ierr);
1769   }
1770   if (called) {PetscFunctionReturn(0);} else called = 1;
1771   (*PetscHelpPrintf)(comm," Options for MATMPIBAIJ matrix format (the defaults):\n");
1772   (*PetscHelpPrintf)(comm,"  -mat_use_hash_table <factor>: Use hashtable for efficient matrix assembly\n");
1773   PetscFunctionReturn(0);
1774 }
1775 
1776 #undef __FUNC__
1777 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1778 int MatSetUnfactored_MPIBAIJ(Mat A)
1779 {
1780   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1781   int         ierr;
1782 
1783   PetscFunctionBegin;
1784   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1785   PetscFunctionReturn(0);
1786 }
1787 
1788 static int MatDuplicate_MPIBAIJ(Mat,MatDuplicateOption,Mat *);
1789 
1790 /* -------------------------------------------------------------------*/
1791 static struct _MatOps MatOps_Values = {
1792   MatSetValues_MPIBAIJ,
1793   MatGetRow_MPIBAIJ,
1794   MatRestoreRow_MPIBAIJ,
1795   MatMult_MPIBAIJ,
1796   MatMultAdd_MPIBAIJ,
1797   MatMultTrans_MPIBAIJ,
1798   MatMultTransAdd_MPIBAIJ,
1799   0,
1800   0,
1801   0,
1802   0,
1803   0,
1804   0,
1805   0,
1806   MatTranspose_MPIBAIJ,
1807   MatGetInfo_MPIBAIJ,
1808   0,
1809   MatGetDiagonal_MPIBAIJ,
1810   MatDiagonalScale_MPIBAIJ,
1811   MatNorm_MPIBAIJ,
1812   MatAssemblyBegin_MPIBAIJ,
1813   MatAssemblyEnd_MPIBAIJ,
1814   0,
1815   MatSetOption_MPIBAIJ,
1816   MatZeroEntries_MPIBAIJ,
1817   MatZeroRows_MPIBAIJ,
1818   0,
1819   0,
1820   0,
1821   0,
1822   MatGetSize_MPIBAIJ,
1823   MatGetLocalSize_MPIBAIJ,
1824   MatGetOwnershipRange_MPIBAIJ,
1825   0,
1826   0,
1827   0,
1828   0,
1829   MatDuplicate_MPIBAIJ,
1830   0,
1831   0,
1832   0,
1833   0,
1834   0,
1835   MatGetSubMatrices_MPIBAIJ,
1836   MatIncreaseOverlap_MPIBAIJ,
1837   MatGetValues_MPIBAIJ,
1838   0,
1839   MatPrintHelp_MPIBAIJ,
1840   MatScale_MPIBAIJ,
1841   0,
1842   0,
1843   0,
1844   MatGetBlockSize_MPIBAIJ,
1845   0,
1846   0,
1847   0,
1848   0,
1849   0,
1850   0,
1851   MatSetUnfactored_MPIBAIJ,
1852   0,
1853   MatSetValuesBlocked_MPIBAIJ,
1854   0,
1855   0,
1856   0,
1857   MatGetMaps_Petsc};
1858 
1859 #include "pc.h"
1860 EXTERN_C_BEGIN
1861 extern int PCSetUp_BJacobi_BAIJ(PC);
1862 EXTERN_C_END
1863 
1864 #undef __FUNC__
1865 #define __FUNC__ "MatCreateMPIBAIJ"
1866 /*@C
1867    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1868    (block compressed row).  For good matrix assembly performance
1869    the user should preallocate the matrix storage by setting the parameters
1870    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1871    performance can be increased by more than a factor of 50.
1872 
1873    Collective on MPI_Comm
1874 
1875    Input Parameters:
1876 +  comm - MPI communicator
1877 .  bs   - size of block
1878 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1879            This value should be the same as the local size used in creating the
1880            y vector for the matrix-vector product y = Ax.
1881 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1882            This value should be the same as the local size used in creating the
1883            x vector for the matrix-vector product y = Ax.
1884 .  M - number of global rows (or PETSC_DETERMINE to have calculated if m is given)
1885 .  N - number of global columns (or PETSC_DETERMINE to have calculated if n is given)
1886 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1887            submatrix  (same for all local rows)
1888 .  d_nnz - array containing the number of block nonzeros in the various block rows
1889            in the diagonal portion of the local submatrix (possibly different for each block
1890            row) or PETSC_NULL.  You must leave room for the diagonal entry even if it is zero.
1891 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1892            submatrix (same for all local rows).
1893 -  o_nnz - array containing the number of block nonzeros in the various block rows of the
1894            off-diagonal portion of the local submatrix (possibly different for
1895            each block row) or PETSC_NULL.
1896 
1897    Output Parameter:
1898 .  A - the matrix
1899 
1900    Options Database Keys:
1901 .   -mat_no_unroll - uses code that does not unroll the loops in the
1902                      block calculations (much slower)
1903 .   -mat_block_size - size of the blocks to use
1904 .   -mat_mpi - use the parallel matrix data structures even on one processor
1905                (defaults to using SeqBAIJ format on one processor)
1906 
1907    Notes:
1908    The user MUST specify either the local or global matrix dimensions
1909    (possibly both).
1910 
1911    If PETSC_DECIDE or PETSC_DETERMINE is used for a particular argument on one processor
1912    then it must be used on all processors that share the object for that argument.
1913 
1914    Storage Information:
1915    For a square global matrix we define each processor's diagonal portion
1916    to be its local rows and the corresponding columns (a square submatrix);
1917    each processor's off-diagonal portion encompasses the remainder of the
1918    local matrix (a rectangular submatrix).
1919 
1920    The user can specify preallocated storage for the diagonal part of
1921    the local submatrix with either d_nz or d_nnz (not both).  Set
1922    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1923    memory allocation.  Likewise, specify preallocated storage for the
1924    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1925 
1926    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1927    the figure below we depict these three local rows and all columns (0-11).
1928 
1929 .vb
1930            0 1 2 3 4 5 6 7 8 9 10 11
1931           -------------------
1932    row 3  |  o o o d d d o o o o o o
1933    row 4  |  o o o d d d o o o o o o
1934    row 5  |  o o o d d d o o o o o o
1935           -------------------
1936 .ve
1937 
1938    Thus, any entries in the d locations are stored in the d (diagonal)
1939    submatrix, and any entries in the o locations are stored in the
1940    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1941    stored simply in the MATSEQBAIJ format for compressed row storage.
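
   For the three rows depicted above (taking bs=1 so blocks are scalars),
   each local row has 3 entries in the d submatrix and 9 in the o submatrix,
   so one would set d_nz=3 and o_nz=9 (or d_nnz={3,3,3} and o_nnz={9,9,9}).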
1942 
1943    Now d_nz should indicate the number of block nonzeros per row in the d matrix,
1944    and o_nz should indicate the number of block nonzeros per row in the o matrix.
1945    In general, for PDE problems in which most nonzeros are near the diagonal,
1946    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1947    or you will get TERRIBLE performance; see the users' manual chapter on
1948    matrices.
1949 
1950 .keywords: matrix, block, aij, compressed row, sparse, parallel
1951 
1952 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues(), MatCreateMPIAIJ()
1953 @*/
1954 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1955                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1956 {
1957   Mat          B;
1958   Mat_MPIBAIJ  *b;
1959   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size,flg;
1960   int          flag1 = 0,flag2 = 0;
1961 
1962   PetscFunctionBegin;
1963   if (bs < 1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Invalid block size specified, must be positive");
1964 
1965   MPI_Comm_size(comm,&size);
1966   ierr = OptionsHasName(PETSC_NULL,"-mat_mpibaij",&flag1); CHKERRQ(ierr);
1967   ierr = OptionsHasName(PETSC_NULL,"-mat_mpi",&flag2); CHKERRQ(ierr);
1968   if (!flag1 && !flag2 && size == 1) {
1969     if (M == PETSC_DECIDE) M = m;
1970     if (N == PETSC_DECIDE) N = n;
1971     ierr = MatCreateSeqBAIJ(comm,bs,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
1972     PetscFunctionReturn(0);
1973   }
1974 
1975   *A = 0;
1976   PetscHeaderCreate(B,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIBAIJ,"Mat",comm,MatDestroy,MatView);
1977   PLogObjectCreate(B);
1978   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1979   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1980   PetscMemcpy(B->ops,&MatOps_Values,sizeof(struct _MatOps));
1981 
1982   B->ops->destroy    = MatDestroy_MPIBAIJ;
1983   B->ops->view       = MatView_MPIBAIJ;
1984   B->mapping    = 0;
1985   B->factor     = 0;
1986   B->assembled  = PETSC_FALSE;
1987 
1988   B->insertmode = NOT_SET_VALUES;
1989   MPI_Comm_rank(comm,&b->rank);
1990   MPI_Comm_size(comm,&b->size);
1991 
1992   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
1993     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1994   }
1995   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) {
1996     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either M or m should be specified");
1997   }
1998   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) {
1999     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"either N or n should be specified");
2000   }
2001   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
2002   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
2003 
2004   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
2005     work[0] = m; work[1] = n;
2006     mbs = m/bs; nbs = n/bs;
2007     ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
2008     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
2009     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
2010   }
2011   if (m == PETSC_DECIDE) {
2012     Mbs = M/bs;
2013     if (Mbs*bs != M) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global rows must be divisible by blocksize");
2014     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
2015     m   = mbs*bs;
2016   }
2017   if (n == PETSC_DECIDE) {
2018     Nbs = N/bs;
2019     if (Nbs*bs != N) SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of global cols must be divisible by blocksize");
2020     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
2021     n   = nbs*bs;
2022   }
2023   if (mbs*bs != m || nbs*bs != n) {
2024     SETERRQ(PETSC_ERR_ARG_SIZ,0,"No of local rows, cols must be divisible by blocksize");
2025   }
2026 
2027   b->m = m; B->m = m;
2028   b->n = n; B->n = n;
2029   b->N = N; B->N = N;
2030   b->M = M; B->M = M;
2031   b->bs  = bs;
2032   b->bs2 = bs*bs;
2033   b->mbs = mbs;
2034   b->nbs = nbs;
2035   b->Mbs = Mbs;
2036   b->Nbs = Nbs;
2037 
2038   /* the information in the maps duplicates the information computed below, eventually
2039      we should remove the duplicate information that is not contained in the maps */
2040   ierr = MapCreateMPI(comm,m,M,&B->rmap);CHKERRQ(ierr);
2041   ierr = MapCreateMPI(comm,n,N,&B->cmap);CHKERRQ(ierr);
2042 
2043   /* build local table of row and column ownerships */
2044   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
2045   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
2046   b->cowners = b->rowners + b->size + 2;
2047   ierr = MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
2048   b->rowners[0] = 0;
2049   for ( i=2; i<=b->size; i++ ) {
2050     b->rowners[i] += b->rowners[i-1];
2051   }
2052   b->rstart    = b->rowners[b->rank];
2053   b->rend      = b->rowners[b->rank+1];
2054   b->rstart_bs = b->rstart * bs;
2055   b->rend_bs   = b->rend * bs;
2056 
2057   ierr = MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
2058   b->cowners[0] = 0;
2059   for ( i=2; i<=b->size; i++ ) {
2060     b->cowners[i] += b->cowners[i-1];
2061   }
2062   b->cstart    = b->cowners[b->rank];
2063   b->cend      = b->cowners[b->rank+1];
2064   b->cstart_bs = b->cstart * bs;
2065   b->cend_bs   = b->cend * bs;
2066 
2067 
2068   if (d_nz == PETSC_DEFAULT) d_nz = 5;
2069   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
2070   PLogObjectParent(B,b->A);
2071   if (o_nz == PETSC_DEFAULT) o_nz = 0;
2072   ierr = MatCreateSeqBAIJ(PETSC_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
2073   PLogObjectParent(B,b->B);
2074 
2075   /* build cache for off array entries formed */
2076   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
2077   b->donotstash  = 0;
2078   b->colmap      = 0;
2079   b->garray      = 0;
2080   b->roworiented = 1;
2081 
2082   /* stuff used in block assembly */
2083   b->barray       = 0;
2084 
2085   /* stuff used for matrix vector multiply */
2086   b->lvec         = 0;
2087   b->Mvctx        = 0;
2088 
2089   /* stuff for MatGetRow() */
2090   b->rowindices   = 0;
2091   b->rowvalues    = 0;
2092   b->getrowactive = PETSC_FALSE;
2093 
2094   /* hash table stuff */
2095   b->ht           = 0;
2096   b->hd           = 0;
2097   b->ht_size      = 0;
2098   b->ht_flag      = 0;
2099   b->ht_fact      = 0;
2100   b->ht_total_ct  = 0;
2101   b->ht_insert_ct = 0;
2102 
2103   *A = B;
2104   ierr = OptionsHasName(PETSC_NULL,"-mat_use_hash_table",&flg); CHKERRQ(ierr);
2105   if (flg) {
2106     double fact = 1.39;
2107     ierr = MatSetOption(B,MAT_USE_HASH_TABLE); CHKERRQ(ierr);
2108     ierr = OptionsGetDouble(PETSC_NULL,"-mat_use_hash_table",&fact,&flg); CHKERRQ(ierr);
2109     if (fact <= 1.0) fact = 1.39;
2110     ierr = MatMPIBAIJSetHashTableFactor(B,fact); CHKERRQ(ierr);
2111     PLogInfo(0,"MatCreateMPIBAIJ:Hash table Factor used %5.2f\n",fact);
2112   }
2113   ierr = PetscObjectComposeFunction((PetscObject)B,"PCSetUp_BJacobi_C",
2114                                      "PCSetUp_BJacobi_BAIJ",
2115                                      (void*)PCSetUp_BJacobi_BAIJ);CHKERRQ(ierr);
2116   PetscFunctionReturn(0);
2117 }
2118 
2119 #undef __FUNC__
2120 #define __FUNC__ "MatDuplicate_MPIBAIJ"
2121 static int MatDuplicate_MPIBAIJ(Mat matin,MatDuplicateOption cpvalues,Mat *newmat)
2122 {
2123   Mat         mat;
2124   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
2125   int         ierr, len=0, flg;
2126 
2127   PetscFunctionBegin;
2128   *newmat       = 0;
2129   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIBAIJ,"Mat",matin->comm,MatDestroy,MatView);
2130   PLogObjectCreate(mat);
2131   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
2132   PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));
2133   mat->ops->destroy    = MatDestroy_MPIBAIJ;
2134   mat->ops->view       = MatView_MPIBAIJ;
2135   mat->factor          = matin->factor;
2136   mat->assembled       = PETSC_TRUE;
2137 
2138   a->m = mat->m   = oldmat->m;
2139   a->n = mat->n   = oldmat->n;
2140   a->M = mat->M   = oldmat->M;
2141   a->N = mat->N   = oldmat->N;
2142 
2143   a->bs  = oldmat->bs;
2144   a->bs2 = oldmat->bs2;
2145   a->mbs = oldmat->mbs;
2146   a->nbs = oldmat->nbs;
2147   a->Mbs = oldmat->Mbs;
2148   a->Nbs = oldmat->Nbs;
2149 
2150   a->rstart       = oldmat->rstart;
2151   a->rend         = oldmat->rend;
2152   a->cstart       = oldmat->cstart;
2153   a->cend         = oldmat->cend;
2154   a->size         = oldmat->size;
2155   a->rank         = oldmat->rank;
2156   mat->insertmode = NOT_SET_VALUES;
2157   a->rowvalues    = 0;
2158   a->getrowactive = PETSC_FALSE;
2159   a->barray       = 0;
2160 
2161   /* hash table stuff */
2162   a->ht           = 0;
2163   a->hd           = 0;
2164   a->ht_size      = 0;
2165   a->ht_flag      = oldmat->ht_flag;
2166   a->ht_fact      = oldmat->ht_fact;
2167   a->ht_total_ct  = 0;
2168   a->ht_insert_ct = 0;
2169 
2170 
2171   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
2172   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIBAIJ));
2173   a->cowners = a->rowners + a->size + 2;
2174   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
2175   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
2176   if (oldmat->colmap) {
2177     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
2178     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
2179     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
2180   } else a->colmap = 0;
2181   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
2182     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
2183     PLogObjectMemory(mat,len*sizeof(int));
2184     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
2185   } else a->garray = 0;
2186 
2187   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
2188   PLogObjectParent(mat,a->lvec);
2189   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
2190 
2191   PLogObjectParent(mat,a->Mvctx);
2192   ierr =  MatDuplicate(oldmat->A,cpvalues,&a->A); CHKERRQ(ierr);
2193   PLogObjectParent(mat,a->A);
2194   ierr =  MatDuplicate(oldmat->B,cpvalues,&a->B); CHKERRQ(ierr);
2195   PLogObjectParent(mat,a->B);
2196   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
2197   ierr = FListDuplicate(matin->qlist,&mat->qlist);CHKERRQ(ierr);
2198   if (flg) {
2199     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
2200   }
2201   *newmat = mat;
2202   PetscFunctionReturn(0);
2203 }
2204 
2205 #include "sys.h"
2206 
2207 #undef __FUNC__
2208 #define __FUNC__ "MatLoad_MPIBAIJ"
2209 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
2210 {
2211   Mat          A;
2212   int          i, nz, ierr, j,rstart, rend, fd;
2213   Scalar       *vals,*buf;
2214   MPI_Comm     comm = ((PetscObject)viewer)->comm;
2215   MPI_Status   status;
2216   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
2217   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
2218   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,Mbs,mbs,extra_rows;
2219   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
2220   int          dcount,kmax,k,nzcount,tmp;
2221 
2222   PetscFunctionBegin;
2223   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
2224 
2225   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
2226   if (!rank) {
2227     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
2228     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
2229     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
2230     if (header[3] < 0) {
2231       SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix stored in special format, cannot load as MPIBAIJ");
2232     }
2233   }
2234 
2235   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
2236   M = header[1]; N = header[2];
2237 
2238   if (M != N) SETERRQ(PETSC_ERR_SUP,0,"Can only do square matrices");
2239 
2240   /*
2241      This code adds extra rows to make sure the number of rows is
2242      divisible by the blocksize
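     (for example, M=10 with bs=3 gives Mbs=4 and extra_rows=2; the two
     padded rows later receive a single 1.0 on the diagonal)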
2243   */
2244   Mbs        = M/bs;
2245   extra_rows = bs - M + bs*(Mbs);
2246   if (extra_rows == bs) extra_rows = 0;
2247   else                  Mbs++;
2248   if (extra_rows && !rank) {
2249     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
2250   }
2251 
2252   /* determine ownership of all rows */
2253   mbs = Mbs/size + ((Mbs % size) > rank);
2254   m   = mbs * bs;
2255   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
2256   browners = rowners + size + 1;
2257   ierr = MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
2258   rowners[0] = 0;
2259   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
2260   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
2261   rstart = rowners[rank];
2262   rend   = rowners[rank+1];
2263 
2264   /* distribute row lengths to all processors */
2265   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
2266   if (!rank) {
2267     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
2268     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
2269     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
2270     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
2271     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
2272     ierr = MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);CHKERRQ(ierr);
2273     PetscFree(sndcounts);
2274   } else {
2275     ierr = MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);CHKERRQ(ierr);
2276   }
2277 
2278   if (!rank) {
2279     /* calculate the number of nonzeros on each processor */
2280     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
2281     PetscMemzero(procsnz,size*sizeof(int));
2282     for ( i=0; i<size; i++ ) {
2283       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
2284         procsnz[i] += rowlengths[j];
2285       }
2286     }
2287     PetscFree(rowlengths);
2288 
2289     /* determine max buffer needed and allocate it */
2290     maxnz = 0;
2291     for ( i=0; i<size; i++ ) {
2292       maxnz = PetscMax(maxnz,procsnz[i]);
2293     }
2294     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
2295 
2296     /* read in my part of the matrix column indices  */
2297     nz = procsnz[0];
2298     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
2299     mycols = ibuf;
2300     if (size == 1)  nz -= extra_rows;
2301     ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
2302     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
2303 
2304     /* read in each of the other processors' parts (except the last) and ship them off */
2305     for ( i=1; i<size-1; i++ ) {
2306       nz   = procsnz[i];
2307       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
2308       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
2309     }
2310     /* read in the stuff for the last proc */
2311     if ( size != 1 ) {
2312       nz   = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
2313       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
2314       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
2315       ierr = MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);CHKERRQ(ierr);
2316     }
2317     PetscFree(cols);
2318   } else {
2319     /* determine buffer space needed for message */
2320     nz = 0;
2321     for ( i=0; i<m; i++ ) {
2322       nz += locrowlens[i];
2323     }
2324     ibuf   = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
2325     mycols = ibuf;
2326     /* receive message of column indices*/
2327     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
2328     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
2329     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2330   }
2331 
2332   /* loop over local rows, determining number of off diagonal entries */
2333   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
2334   odlens = dlens + (rend-rstart);
2335   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
2336   PetscMemzero(mask,3*Mbs*sizeof(int));
2337   masked1 = mask    + Mbs;
2338   masked2 = masked1 + Mbs;
2339   rowcount = 0; nzcount = 0;
2340   for ( i=0; i<mbs; i++ ) {
2341     dcount  = 0;
2342     odcount = 0;
2343     for ( j=0; j<bs; j++ ) {
2344       kmax = locrowlens[rowcount];
2345       for ( k=0; k<kmax; k++ ) {
2346         tmp = mycols[nzcount++]/bs;
2347         if (!mask[tmp]) {
2348           mask[tmp] = 1;
2349           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
2350           else masked1[dcount++] = tmp;
2351         }
2352       }
2353       rowcount++;
2354     }
2355 
2356     dlens[i]  = dcount;
2357     odlens[i] = odcount;
2358 
2359     /* zero out the mask elements we set */
2360     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
2361     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
2362   }
2363 
2364   /* create our matrix */
2365   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
2366          CHKERRQ(ierr);
2367   A = *newmat;
2368   MatSetOption(A,MAT_COLUMNS_SORTED);
2369 
2370   if (!rank) {
2371     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
2372     /* read in my part of the matrix numerical values  */
2373     nz = procsnz[0];
2374     vals = buf;
2375     mycols = ibuf;
2376     if (size == 1)  nz -= extra_rows;
2377     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2378     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
2379 
2380     /* insert into matrix */
2381     jj      = rstart*bs;
2382     for ( i=0; i<m; i++ ) {
2383       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
2384       mycols += locrowlens[i];
2385       vals   += locrowlens[i];
2386       jj++;
2387     }
2388     /* read in other processors (except the last one) and ship out */
2389     for ( i=1; i<size-1; i++ ) {
2390       nz   = procsnz[i];
2391       vals = buf;
2392       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2393       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
2394     }
2395     /* the last proc */
2396     if ( size != 1 ){
2397       nz   = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
2398       vals = buf;
2399       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2400       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
2401       ierr = MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);CHKERRQ(ierr);
2402     }
2403     PetscFree(procsnz);
2404   } else {
2405     /* receive numeric values */
2406     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
2407 
2408     /* receive message of values*/
2409     vals   = buf;
2410     mycols = ibuf;
2411     ierr   = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
2412     ierr   = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
2413     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2414 
2415     /* insert into matrix */
2416     jj      = rstart*bs;
2417     for ( i=0; i<m; i++ ) {
2418       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
2419       mycols += locrowlens[i];
2420       vals   += locrowlens[i];
2421       jj++;
2422     }
2423   }
2424   PetscFree(locrowlens);
2425   PetscFree(buf);
2426   PetscFree(ibuf);
2427   PetscFree(rowners);
2428   PetscFree(dlens);
2429   PetscFree(mask);
2430   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2431   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2432   PetscFunctionReturn(0);
2433 }
2434 
2435 
2436 
2437 #undef __FUNC__
2438 #define __FUNC__ "MatMPIBAIJSetHashTableFactor"
2439 /*@
2440    MatMPIBAIJSetHashTableFactor - Sets the factor required to compute the size of the HashTable.
2441 
2442    Collective on Mat
2443 
2444    Input Parameters:
2445 +  mat  - the matrix
2446 -  fact - factor used in computing the size of the hash table (default 1.39)
2447 
2448   Notes:
2449    This can also be set by the command line option: -mat_use_hash_table fact
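
   Example (a small sketch; assumes mat was created with MatCreateMPIBAIJ()
   and that hash table assembly is enabled):
.vb
   ierr = MatSetOption(mat,MAT_USE_HASH_TABLE); CHKERRQ(ierr);
   ierr = MatMPIBAIJSetHashTableFactor(mat,1.39); CHKERRQ(ierr);
.ve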
2450 
2451 .keywords: matrix, hashtable, factor, HT
2452 
2453 .seealso: MatSetOption()
2454 @*/
2455 int MatMPIBAIJSetHashTableFactor(Mat mat,double fact)
2456 {
2457   Mat_MPIBAIJ *baij;
2458 
2459   PetscFunctionBegin;
2460   PetscValidHeaderSpecific(mat,MAT_COOKIE);
2461   if (mat->type != MATMPIBAIJ) {
2462     SETERRQ(PETSC_ERR_ARG_WRONG,1,"Incorrect matrix type. Use MPIBAIJ only.");
2463   }
2464   baij = (Mat_MPIBAIJ*) mat->data;
2465   baij->ht_fact = fact;
2466   PetscFunctionReturn(0);
2467 }
2468