xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 3369ce9a255b285539dc954b9ad9cefcea2f84b6)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.58 1997/03/13 21:03:42 balay Exp bsmith $";
3 #endif
4 
5 #include "pinclude/pviewer.h"
6 #include "src/mat/impls/baij/mpi/mpibaij.h"
7 #include "src/vec/vecimpl.h"
8 
9 
10 extern int MatSetUpMultiply_MPIBAIJ(Mat);
11 extern int DisAssemble_MPIBAIJ(Mat);
12 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
13 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
14 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
15 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
16 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
17 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
18 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
19 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
20 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
21 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
22 
23 
24 /*
25      Local utility routine that creates a mapping from the global column
26    number to the local number in the off-diagonal part of the local
27    storage of the matrix.  This is done in a non scable way since the
28    length of colmap equals the global matrix length.
29 */
30 #undef __FUNC__
31 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
32 static int CreateColmap_MPIBAIJ_Private(Mat mat)
33 {
34   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
35   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
36   int         nbs = B->nbs,i,bs=B->bs;;
37 
38   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
39   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
40   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
41   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
42   return 0;
43 }
44 
45 #undef __FUNC__
46 #define __FUNC__ "MatGetRowIJ_MPIBAIJ("
47 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
48                             PetscTruth *done)
49 {
50   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
51   int         ierr;
52   if (aij->size == 1) {
53     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
54   } else SETERRQ(1,0,"not supported in parallel");
55   return 0;
56 }
57 
58 #undef __FUNC__
59 #define __FUNC__ "MatRestoreRowIJ_MPIBAIJ"
60 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
61                                 PetscTruth *done)
62 {
63   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
64   int        ierr;
65   if (aij->size == 1) {
66     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
67   } else SETERRQ(1,0,"not supported in parallel");
68   return 0;
69 }
70 #define CHUNKSIZE  10
71 
72 #define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
73 { \
74  \
75     brow = row/bs;  \
76     rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
77     rmax = imax[brow]; nrow = ailen[brow]; \
78       bcol = col/bs; \
79       ridx = row % bs; cidx = col % bs; \
80       low = 0; high = nrow; \
81       while (high-low > 3) { \
82         t = (low+high)/2; \
83         if (rp[t] > bcol) high = t; \
84         else              low  = t; \
85       } \
86       for ( _i=low; _i<high; _i++ ) { \
87         if (rp[_i] > bcol) break; \
88         if (rp[_i] == bcol) { \
89           bap  = ap +  bs2*_i + bs*cidx + ridx; \
90           if (addv == ADD_VALUES) *bap += value;  \
91           else                    *bap  = value;  \
92           goto _noinsert; \
93         } \
94       } \
95       if (a->nonew) goto _noinsert; \
96       if (nrow >= rmax) { \
97         /* there is no extra room in row, therefore enlarge */ \
98         int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
99         Scalar *new_a; \
100  \
101         /* malloc new storage space */ \
102         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
103         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
104         new_j   = (int *) (new_a + bs2*new_nz); \
105         new_i   = new_j + new_nz; \
106  \
107         /* copy over old data into new slots */ \
108         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
109         for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
110         PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
111         len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
112         PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
113                                                            len*sizeof(int)); \
114         PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
115         PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
116         PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
117                     aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
118         /* free up old matrix storage */ \
119         PetscFree(a->a);  \
120         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
121         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
122         a->singlemalloc = 1; \
123  \
124         rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
125         rmax = imax[brow] = imax[brow] + CHUNKSIZE; \
126         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
127         a->maxnz += bs2*CHUNKSIZE; \
128         a->reallocs++; \
129         a->nz++; \
130       } \
131       N = nrow++ - 1;  \
132       /* shift up all the later entries in this row */ \
133       for ( ii=N; ii>=_i; ii-- ) { \
134         rp[ii+1] = rp[ii]; \
135         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
136       } \
137       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
138       rp[_i]                      = bcol;  \
139       ap[bs2*_i + bs*cidx + ridx] = value;  \
140       _noinsert:; \
141     ailen[brow] = nrow; \
142 }
143 
144 extern int MatSetValues_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
145 #undef __FUNC__
146 #define __FUNC__ "MatSetValues_MPIBAIJ"
147 int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
148 {
149   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
150   Scalar      value;
151   int         ierr,i,j,row,col;
152   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
153   int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
154   int         cend_orig=baij->cend_bs,bs=baij->bs;
155 
156   /* Some Variables required in the macro */
157   Mat         A = baij->A;
158   Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
159   int         *rp,ii,nrow,_i,rmax,N;
160   int         *imax=a->imax,*ai=a->i,*ailen=a->ilen;
161   int         *aj=a->j,brow,bcol;
162   int         low,high,t,ridx,cidx,bs2=a->bs2;
163   Scalar      *ap,*aa=a->a,*bap;
164 
165   for ( i=0; i<m; i++ ) {
166 #if defined(PETSC_BOPT_g)
167     if (im[i] < 0) SETERRQ(1,0,"Negative row");
168     if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
169 #endif
170     if (im[i] >= rstart_orig && im[i] < rend_orig) {
171       row = im[i] - rstart_orig;
172       for ( j=0; j<n; j++ ) {
173         if (in[j] >= cstart_orig && in[j] < cend_orig){
174           col = in[j] - cstart_orig;
175           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
176           MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
177           /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
178         }
179 #if defined(PETSC_BOPT_g)
180         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
181         else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
182 #endif
183         else {
184           if (mat->was_assembled) {
185             if (!baij->colmap) {
186               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
187             }
188             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
189             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
190               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
191               col =  in[j];
192             }
193           }
194           else col = in[j];
195           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
196           ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
197         }
198       }
199     }
200     else {
201       if (roworiented && !baij->donotstash) {
202         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
203       }
204       else {
205         if (!baij->donotstash) {
206           row = im[i];
207 	  for ( j=0; j<n; j++ ) {
208 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
209           }
210         }
211       }
212     }
213   }
214   return 0;
215 }
216 
217 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
218 #undef __FUNC__
219 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
220 int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
221 {
222   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
223   Scalar      *value,*tmp;
224   int         ierr,i,j,ii,jj,row,col,k,l;
225   int         roworiented = baij->roworiented,rstart=baij->rstart ;
226   int         rend=baij->rend,cstart=baij->cstart,stepval;
227   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
228 
229   /* Should be stashed somewhere to avoid multiple mallocs */
230   tmp = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(tmp);
231   if (roworiented) {
232     stepval = (n-1)*bs;
233   } else {
234     stepval = (m-1)*bs;
235   }
236   for ( i=0; i<m; i++ ) {
237 #if defined(PETSC_BOPT_g)
238     if (im[i] < 0) SETERRQ(1,0,"Negative row");
239     if (im[i] >= baij->Mbs) SETERRQ(1,0,"Row too large");
240 #endif
241     if (im[i] >= rstart && im[i] < rend) {
242       row = im[i] - rstart;
243       for ( j=0; j<n; j++ ) {
244         if (roworiented) {
245           value = v + i*(stepval+bs)*bs + j*bs;
246         } else {
247           value = v + j*(stepval+bs)*bs + i*bs;
248         }
249         for ( ii=0; ii<bs; ii++,value+=stepval )
250           for (jj=0; jj<bs; jj++ )
251             *tmp++  = *value++;
252         tmp -=bs2;
253 
254         if (in[j] >= cstart && in[j] < cend){
255           col = in[j] - cstart;
256           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,tmp,addv);CHKERRQ(ierr);
257         }
258 #if defined(PETSC_BOPT_g)
259         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
260         else if (in[j] >= baij->Nbs) {SETERRQ(1,0,"Col too large");}
261 #endif
262         else {
263           if (mat->was_assembled) {
264             if (!baij->colmap) {
265               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
266             }
267             col = baij->colmap[in[j]] - 1;
268             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
269               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
270               col =  in[j];
271             }
272           }
273           else col = in[j];
274           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,tmp,addv);CHKERRQ(ierr);
275         }
276       }
277     }
278     else {
279       if (!baij->donotstash) {
280         if (roworiented ) {
281           row   = im[i]*bs;
282           value = v + i*(stepval+bs)*bs;
283           for ( j=0; j<bs; j++,row++ ) {
284             for ( k=0; k<n; k++ ) {
285               for ( col=in[k]*bs,l=0; l<bs; l++,col++) {
286                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
287               }
288             }
289           }
290         }
291         else {
292           for ( j=0; j<n; j++ ) {
293             value = v + j*(stepval+bs)*bs + i*bs;
294             col   = in[j]*bs;
295             for ( k=0; k<bs; k++,col++,value+=stepval) {
296               for ( row = im[i]*bs,l=0; l<bs; l++,row++) {
297                 ierr = StashValues_Private(&baij->stash,row,1,&col,value++,addv);CHKERRQ(ierr);
298               }
299             }
300           }
301 
302         }
303       }
304     }
305   }
306   PetscFree(tmp);
307   return 0;
308 }
309 
310 #undef __FUNC__
311 #define __FUNC__ "MatGetValues_MPIBAIJ"
312 int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
313 {
314   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
315   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
316   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
317 
318   for ( i=0; i<m; i++ ) {
319     if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
320     if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
321     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
322       row = idxm[i] - bsrstart;
323       for ( j=0; j<n; j++ ) {
324         if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
325         if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
326         if (idxn[j] >= bscstart && idxn[j] < bscend){
327           col = idxn[j] - bscstart;
328           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
329         }
330         else {
331           if (!baij->colmap) {
332             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
333           }
334           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
335              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
336           else {
337             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
338             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
339           }
340         }
341       }
342     }
343     else {
344       SETERRQ(1,0,"Only local values currently supported");
345     }
346   }
347   return 0;
348 }
349 
350 #undef __FUNC__
351 #define __FUNC__ "MatNorm_MPIBAIJ"
352 int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
353 {
354   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
355   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
356   int        ierr, i,bs2=baij->bs2;
357   double     sum = 0.0;
358   Scalar     *v;
359 
360   if (baij->size == 1) {
361     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
362   } else {
363     if (type == NORM_FROBENIUS) {
364       v = amat->a;
365       for (i=0; i<amat->nz*bs2; i++ ) {
366 #if defined(PETSC_COMPLEX)
367         sum += real(conj(*v)*(*v)); v++;
368 #else
369         sum += (*v)*(*v); v++;
370 #endif
371       }
372       v = bmat->a;
373       for (i=0; i<bmat->nz*bs2; i++ ) {
374 #if defined(PETSC_COMPLEX)
375         sum += real(conj(*v)*(*v)); v++;
376 #else
377         sum += (*v)*(*v); v++;
378 #endif
379       }
380       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
381       *norm = sqrt(*norm);
382     }
383     else
384       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
385   }
386   return 0;
387 }
388 
389 #undef __FUNC__
390 #define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
391 int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
392 {
393   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
394   MPI_Comm    comm = mat->comm;
395   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
396   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
397   MPI_Request *send_waits,*recv_waits;
398   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
399   InsertMode  addv;
400   Scalar      *rvalues,*svalues;
401 
402   /* make sure all processors are either in INSERTMODE or ADDMODE */
403   MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
404   if (addv == (ADD_VALUES|INSERT_VALUES)) {
405     SETERRQ(1,0,"Some processors inserted others added");
406   }
407   mat->insertmode = addv; /* in case this processor had no cache */
408 
409   /*  first count number of contributors to each processor */
410   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
411   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
412   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
413   for ( i=0; i<baij->stash.n; i++ ) {
414     idx = baij->stash.idx[i];
415     for ( j=0; j<size; j++ ) {
416       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
417         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
418       }
419     }
420   }
421   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
422 
423   /* inform other processors of number of messages and max length*/
424   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
425   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
426   nreceives = work[rank];
427   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
428   nmax = work[rank];
429   PetscFree(work);
430 
431   /* post receives:
432        1) each message will consist of ordered pairs
433      (global index,value) we store the global index as a double
434      to simplify the message passing.
435        2) since we don't know how long each individual message is we
436      allocate the largest needed buffer for each receive. Potentially
437      this is a lot of wasted space.
438 
439 
440        This could be done better.
441   */
442   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
443   CHKPTRQ(rvalues);
444   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
445   CHKPTRQ(recv_waits);
446   for ( i=0; i<nreceives; i++ ) {
447     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
448               comm,recv_waits+i);
449   }
450 
451   /* do sends:
452       1) starts[i] gives the starting index in svalues for stuff going to
453          the ith processor
454   */
455   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
456   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
457   CHKPTRQ(send_waits);
458   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
459   starts[0] = 0;
460   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
461   for ( i=0; i<baij->stash.n; i++ ) {
462     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
463     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
464     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
465   }
466   PetscFree(owner);
467   starts[0] = 0;
468   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
469   count = 0;
470   for ( i=0; i<size; i++ ) {
471     if (procs[i]) {
472       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
473                 comm,send_waits+count++);
474     }
475   }
476   PetscFree(starts); PetscFree(nprocs);
477 
478   /* Free cache space */
479   PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
480   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
481 
482   baij->svalues    = svalues;    baij->rvalues    = rvalues;
483   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
484   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
485   baij->rmax       = nmax;
486 
487   return 0;
488 }
489 
490 
491 #undef __FUNC__
492 #define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
493 int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
494 {
495   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
496   MPI_Status  *send_status,recv_status;
497   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
498   int         bs=baij->bs,row,col,other_disassembled;
499   Scalar      *values,val;
500   InsertMode  addv = mat->insertmode;
501 
502   /*  wait on receives */
503   while (count) {
504     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
505     /* unpack receives into our local space */
506     values = baij->rvalues + 3*imdex*baij->rmax;
507     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
508     n = n/3;
509     for ( i=0; i<n; i++ ) {
510       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
511       col = (int) PetscReal(values[3*i+1]);
512       val = values[3*i+2];
513       if (col >= baij->cstart*bs && col < baij->cend*bs) {
514         col -= baij->cstart*bs;
515         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
516       }
517       else {
518         if (mat->was_assembled) {
519           if (!baij->colmap) {
520             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
521           }
522           col = (baij->colmap[col/bs]-1)*bs + col%bs;
523           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
524             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
525             col = (int) PetscReal(values[3*i+1]);
526           }
527         }
528         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
529       }
530     }
531     count--;
532   }
533   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
534 
535   /* wait on sends */
536   if (baij->nsends) {
537     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
538     CHKPTRQ(send_status);
539     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
540     PetscFree(send_status);
541   }
542   PetscFree(baij->send_waits); PetscFree(baij->svalues);
543 
544   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
545   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
546 
547   /* determine if any processor has disassembled, if so we must
548      also disassemble ourselfs, in order that we may reassemble. */
549   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
550   if (mat->was_assembled && !other_disassembled) {
551     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
552   }
553 
554   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
555     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
556   }
557   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
558   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
559 
560   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
561   return 0;
562 }
563 
564 #undef __FUNC__
565 #define __FUNC__ "MatView_MPIBAIJ_Binary"
566 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
567 {
568   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
569   int          ierr;
570 
571   if (baij->size == 1) {
572     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
573   }
574   else SETERRQ(1,0,"Only uniprocessor output supported");
575   return 0;
576 }
577 
578 #undef __FUNC__
579 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
580 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
581 {
582   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
583   int          ierr, format,rank,bs = baij->bs;
584   FILE         *fd;
585   ViewerType   vtype;
586 
587   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
588   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
589     ierr = ViewerGetFormat(viewer,&format);
590     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
591       MatInfo info;
592       MPI_Comm_rank(mat->comm,&rank);
593       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
594       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
595       PetscSequentialPhaseBegin(mat->comm,1);
596       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
597               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
598               baij->bs,(int)info.memory);
599       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
600       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
601       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
602       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
603       fflush(fd);
604       PetscSequentialPhaseEnd(mat->comm,1);
605       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
606       return 0;
607     }
608     else if (format == VIEWER_FORMAT_ASCII_INFO) {
609       PetscPrintf(mat->comm,"  block size is %d\n",bs);
610       return 0;
611     }
612   }
613 
614   if (vtype == DRAW_VIEWER) {
615     Draw       draw;
616     PetscTruth isnull;
617     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
618     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
619   }
620 
621   if (vtype == ASCII_FILE_VIEWER) {
622     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
623     PetscSequentialPhaseBegin(mat->comm,1);
624     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
625            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
626             baij->cstart*bs,baij->cend*bs);
627     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
628     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
629     fflush(fd);
630     PetscSequentialPhaseEnd(mat->comm,1);
631   }
632   else {
633     int size = baij->size;
634     rank = baij->rank;
635     if (size == 1) {
636       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
637     }
638     else {
639       /* assemble the entire matrix onto first processor. */
640       Mat         A;
641       Mat_SeqBAIJ *Aloc;
642       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
643       int         mbs=baij->mbs;
644       Scalar      *a;
645 
646       if (!rank) {
647         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
648         CHKERRQ(ierr);
649       }
650       else {
651         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
652         CHKERRQ(ierr);
653       }
654       PLogObjectParent(mat,A);
655 
656       /* copy over the A part */
657       Aloc = (Mat_SeqBAIJ*) baij->A->data;
658       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
659       row = baij->rstart;
660       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
661 
662       for ( i=0; i<mbs; i++ ) {
663         rvals[0] = bs*(baij->rstart + i);
664         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
665         for ( j=ai[i]; j<ai[i+1]; j++ ) {
666           col = (baij->cstart+aj[j])*bs;
667           for (k=0; k<bs; k++ ) {
668             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
669             col++; a += bs;
670           }
671         }
672       }
673       /* copy over the B part */
674       Aloc = (Mat_SeqBAIJ*) baij->B->data;
675       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
676       row = baij->rstart*bs;
677       for ( i=0; i<mbs; i++ ) {
678         rvals[0] = bs*(baij->rstart + i);
679         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
680         for ( j=ai[i]; j<ai[i+1]; j++ ) {
681           col = baij->garray[aj[j]]*bs;
682           for (k=0; k<bs; k++ ) {
683             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
684             col++; a += bs;
685           }
686         }
687       }
688       PetscFree(rvals);
689       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
690       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
691       if (!rank) {
692         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
693       }
694       ierr = MatDestroy(A); CHKERRQ(ierr);
695     }
696   }
697   return 0;
698 }
699 
700 
701 
702 #undef __FUNC__
703 #define __FUNC__ "MatView_MPIBAIJ"
704 int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
705 {
706   Mat         mat = (Mat) obj;
707   int         ierr;
708   ViewerType  vtype;
709 
710   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
711   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
712       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
713     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
714   }
715   else if (vtype == BINARY_FILE_VIEWER) {
716     return MatView_MPIBAIJ_Binary(mat,viewer);
717   }
718   return 0;
719 }
720 
721 #undef __FUNC__
722 #define __FUNC__ "MatDestroy_MPIBAIJ"
723 int MatDestroy_MPIBAIJ(PetscObject obj)
724 {
725   Mat         mat = (Mat) obj;
726   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
727   int         ierr;
728 
729 #if defined(PETSC_LOG)
730   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
731 #endif
732 
733   PetscFree(baij->rowners);
734   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
735   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
736   if (baij->colmap) PetscFree(baij->colmap);
737   if (baij->garray) PetscFree(baij->garray);
738   if (baij->lvec)   VecDestroy(baij->lvec);
739   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
740   if (baij->rowvalues) PetscFree(baij->rowvalues);
741   PetscFree(baij);
742   if (mat->mapping) {
743     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
744   }
745   PLogObjectDestroy(mat);
746   PetscHeaderDestroy(mat);
747   return 0;
748 }
749 
750 #undef __FUNC__
751 #define __FUNC__ "MatMult_MPIBAIJ"
752 int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
753 {
754   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
755   int         ierr, nt;
756 
757   VecGetLocalSize_Fast(xx,nt);
758   if (nt != a->n) {
759     SETERRQ(1,0,"Incompatible partition of A and xx");
760   }
761   VecGetLocalSize_Fast(yy,nt);
762   if (nt != a->m) {
763     SETERRQ(1,0,"Incompatible parition of A and yy");
764   }
765   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
766   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
767   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
768   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
769   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
770   return 0;
771 }
772 
773 #undef __FUNC__
774 #define __FUNC__ "MatMultAdd_MPIBAIJ"
775 int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
776 {
777   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
778   int        ierr;
779   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
780   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
781   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
782   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
783   return 0;
784 }
785 
786 #undef __FUNC__
787 #define __FUNC__ "MatMultTrans_MPIBAIJ"
788 int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
789 {
790   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
791   int        ierr;
792 
793   /* do nondiagonal part */
794   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
795   /* send it on its way */
796   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
797   /* do local part */
798   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
799   /* receive remote parts: note this assumes the values are not actually */
800   /* inserted in yy until the next line, which is true for my implementation*/
801   /* but is not perhaps always true. */
802   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
803   return 0;
804 }
805 
806 #undef __FUNC__
807 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
808 int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
809 {
810   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
811   int        ierr;
812 
813   /* do nondiagonal part */
814   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
815   /* send it on its way */
816   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
817   /* do local part */
818   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
819   /* receive remote parts: note this assumes the values are not actually */
820   /* inserted in yy until the next line, which is true for my implementation*/
821   /* but is not perhaps always true. */
822   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
823   return 0;
824 }
825 
826 /*
827   This only works correctly for square matrices where the subblock A->A is the
828    diagonal block
829 */
830 #undef __FUNC__
831 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
832 int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
833 {
834   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
835   if (a->M != a->N)
836     SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
837   return MatGetDiagonal(a->A,v);
838 }
839 
840 #undef __FUNC__
841 #define __FUNC__ "MatScale_MPIBAIJ"
842 int MatScale_MPIBAIJ(Scalar *aa,Mat A)
843 {
844   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
845   int        ierr;
846   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
847   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
848   return 0;
849 }
850 
851 #undef __FUNC__
852 #define __FUNC__ "MatGetSize_MPIBAIJ"
853 int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
854 {
855   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
856   *m = mat->M; *n = mat->N;
857   return 0;
858 }
859 
860 #undef __FUNC__
861 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
862 int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
863 {
864   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
865   *m = mat->m; *n = mat->N;
866   return 0;
867 }
868 
869 #undef __FUNC__
870 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
871 int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
872 {
873   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
874   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
875   return 0;
876 }
877 
878 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
879 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
880 
881 #undef __FUNC__
882 #define __FUNC__ "MatGetRow_MPIBAIJ"
883 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
884 {
885   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
886   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
887   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
888   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
889   int        *cmap, *idx_p,cstart = mat->cstart;
890 
891   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
892   mat->getrowactive = PETSC_TRUE;
893 
894   if (!mat->rowvalues && (idx || v)) {
895     /*
896         allocate enough space to hold information from the longest row.
897     */
898     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
899     int     max = 1,mbs = mat->mbs,tmp;
900     for ( i=0; i<mbs; i++ ) {
901       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
902       if (max < tmp) { max = tmp; }
903     }
904     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
905     CHKPTRQ(mat->rowvalues);
906     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
907   }
908 
909 
910   if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows")
911   lrow = row - brstart;
912 
913   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
914   if (!v)   {pvA = 0; pvB = 0;}
915   if (!idx) {pcA = 0; if (!v) pcB = 0;}
916   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
917   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
918   nztot = nzA + nzB;
919 
920   cmap  = mat->garray;
921   if (v  || idx) {
922     if (nztot) {
923       /* Sort by increasing column numbers, assuming A and B already sorted */
924       int imark = -1;
925       if (v) {
926         *v = v_p = mat->rowvalues;
927         for ( i=0; i<nzB; i++ ) {
928           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
929           else break;
930         }
931         imark = i;
932         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
933         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
934       }
935       if (idx) {
936         *idx = idx_p = mat->rowindices;
937         if (imark > -1) {
938           for ( i=0; i<imark; i++ ) {
939             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
940           }
941         } else {
942           for ( i=0; i<nzB; i++ ) {
943             if (cmap[cworkB[i]/bs] < cstart)
944               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
945             else break;
946           }
947           imark = i;
948         }
949         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
950         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
951       }
952     }
953     else {
954       if (idx) *idx = 0;
955       if (v)   *v   = 0;
956     }
957   }
958   *nz = nztot;
959   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
960   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
961   return 0;
962 }
963 
964 #undef __FUNC__
965 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
966 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
967 {
968   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
969   if (baij->getrowactive == PETSC_FALSE) {
970     SETERRQ(1,0,"MatGetRow not called");
971   }
972   baij->getrowactive = PETSC_FALSE;
973   return 0;
974 }
975 
976 #undef __FUNC__
977 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
978 int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
979 {
980   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
981   *bs = baij->bs;
982   return 0;
983 }
984 
985 #undef __FUNC__
986 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
987 int MatZeroEntries_MPIBAIJ(Mat A)
988 {
989   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
990   int         ierr;
991   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
992   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
993   return 0;
994 }
995 
996 #undef __FUNC__
997 #define __FUNC__ "MatGetInfo_MPIBAIJ"
998 int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
999 {
1000   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1001   Mat         A = a->A, B = a->B;
1002   int         ierr;
1003   double      isend[5], irecv[5];
1004 
1005   info->rows_global    = (double)a->M;
1006   info->columns_global = (double)a->N;
1007   info->rows_local     = (double)a->m;
1008   info->columns_local  = (double)a->N;
1009   info->block_size     = (double)a->bs;
1010   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1011   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->memory;
1012   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1013   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->memory;
1014   if (flag == MAT_LOCAL) {
1015     info->nz_used      = isend[0];
1016     info->nz_allocated = isend[1];
1017     info->nz_unneeded  = isend[2];
1018     info->memory       = isend[3];
1019     info->mallocs      = isend[4];
1020   } else if (flag == MAT_GLOBAL_MAX) {
1021     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_MAX,matin->comm);
1022     info->nz_used      = irecv[0];
1023     info->nz_allocated = irecv[1];
1024     info->nz_unneeded  = irecv[2];
1025     info->memory       = irecv[3];
1026     info->mallocs      = irecv[4];
1027   } else if (flag == MAT_GLOBAL_SUM) {
1028     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_SUM,matin->comm);
1029     info->nz_used      = irecv[0];
1030     info->nz_allocated = irecv[1];
1031     info->nz_unneeded  = irecv[2];
1032     info->memory       = irecv[3];
1033     info->mallocs      = irecv[4];
1034   }
1035   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1036   info->fill_ratio_needed = 0;
1037   info->factor_mallocs    = 0;
1038   return 0;
1039 }
1040 
1041 #undef __FUNC__
1042 #define __FUNC__ "MatSetOption_MPIBAIJ"
1043 int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1044 {
1045   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1046 
1047   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1048       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1049       op == MAT_COLUMNS_UNSORTED ||
1050       op == MAT_COLUMNS_SORTED ||
1051       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1052         MatSetOption(a->A,op);
1053         MatSetOption(a->B,op);
1054   } else if (op == MAT_ROW_ORIENTED) {
1055         a->roworiented = 1;
1056         MatSetOption(a->A,op);
1057         MatSetOption(a->B,op);
1058   } else if (op == MAT_ROWS_SORTED ||
1059              op == MAT_ROWS_UNSORTED ||
1060              op == MAT_SYMMETRIC ||
1061              op == MAT_STRUCTURALLY_SYMMETRIC ||
1062              op == MAT_YES_NEW_DIAGONALS)
1063     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1064   else if (op == MAT_COLUMN_ORIENTED) {
1065     a->roworiented = 0;
1066     MatSetOption(a->A,op);
1067     MatSetOption(a->B,op);
1068   } else if (op == MAT_IGNORE_OFF_PROCESSOR_ENTRIES) {
1069     a->donotstash = 1;
1070   } else if (op == MAT_NO_NEW_DIAGONALS)
1071     {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
1072   else
1073     {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
1074   return 0;
1075 }
1076 
1077 #undef __FUNC__
1078 #define __FUNC__ "MatTranspose_MPIBAIJ("
1079 int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1080 {
1081   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1082   Mat_SeqBAIJ *Aloc;
1083   Mat        B;
1084   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
1085   int        bs=baij->bs,mbs=baij->mbs;
1086   Scalar     *a;
1087 
1088   if (matout == PETSC_NULL && M != N)
1089     SETERRQ(1,0,"Square matrix only for in-place");
1090   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1091   CHKERRQ(ierr);
1092 
1093   /* copy over the A part */
1094   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1095   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1096   row = baij->rstart;
1097   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1098 
1099   for ( i=0; i<mbs; i++ ) {
1100     rvals[0] = bs*(baij->rstart + i);
1101     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1102     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1103       col = (baij->cstart+aj[j])*bs;
1104       for (k=0; k<bs; k++ ) {
1105         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1106         col++; a += bs;
1107       }
1108     }
1109   }
1110   /* copy over the B part */
1111   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1112   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1113   row = baij->rstart*bs;
1114   for ( i=0; i<mbs; i++ ) {
1115     rvals[0] = bs*(baij->rstart + i);
1116     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1117     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1118       col = baij->garray[aj[j]]*bs;
1119       for (k=0; k<bs; k++ ) {
1120         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1121         col++; a += bs;
1122       }
1123     }
1124   }
1125   PetscFree(rvals);
1126   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1127   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1128 
1129   if (matout != PETSC_NULL) {
1130     *matout = B;
1131   } else {
1132     /* This isn't really an in-place transpose .... but free data structures from baij */
1133     PetscFree(baij->rowners);
1134     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1135     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1136     if (baij->colmap) PetscFree(baij->colmap);
1137     if (baij->garray) PetscFree(baij->garray);
1138     if (baij->lvec) VecDestroy(baij->lvec);
1139     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1140     PetscFree(baij);
1141     PetscMemcpy(A,B,sizeof(struct _Mat));
1142     PetscHeaderDestroy(B);
1143   }
1144   return 0;
1145 }
1146 
1147 #undef __FUNC__
1148 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1149 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1150 {
1151   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1152   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1153   int ierr,s1,s2,s3;
1154 
1155   if (ll)  {
1156     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1157     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1158     if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
1159     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1160     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1161   }
1162   if (rr) SETERRQ(1,0,"not supported for right vector");
1163   return 0;
1164 }
1165 
1166 /* the code does not do the diagonal entries correctly unless the
1167    matrix is square and the column and row owerships are identical.
1168    This is a BUG. The only way to fix it seems to be to access
1169    baij->A and baij->B directly and not through the MatZeroRows()
1170    routine.
1171 */
1172 #undef __FUNC__
1173 #define __FUNC__ "MatZeroRows_MPIBAIJ"
1174 int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
1175 {
1176   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
1177   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
1178   int            *procs,*nprocs,j,found,idx,nsends,*work;
1179   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
1180   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
1181   int            *lens,imdex,*lrows,*values,bs=l->bs;
1182   MPI_Comm       comm = A->comm;
1183   MPI_Request    *send_waits,*recv_waits;
1184   MPI_Status     recv_status,*send_status;
1185   IS             istmp;
1186 
1187   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
1188   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
1189 
1190   /*  first count number of contributors to each processor */
1191   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
1192   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
1193   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
1194   for ( i=0; i<N; i++ ) {
1195     idx = rows[i];
1196     found = 0;
1197     for ( j=0; j<size; j++ ) {
1198       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
1199         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
1200       }
1201     }
1202     if (!found) SETERRQ(1,0,"Index out of range");
1203   }
1204   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
1205 
1206   /* inform other processors of number of messages and max length*/
1207   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
1208   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
1209   nrecvs = work[rank];
1210   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
1211   nmax = work[rank];
1212   PetscFree(work);
1213 
1214   /* post receives:   */
1215   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
1216   CHKPTRQ(rvalues);
1217   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
1218   CHKPTRQ(recv_waits);
1219   for ( i=0; i<nrecvs; i++ ) {
1220     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
1221   }
1222 
1223   /* do sends:
1224       1) starts[i] gives the starting index in svalues for stuff going to
1225          the ith processor
1226   */
1227   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1228   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
1229   CHKPTRQ(send_waits);
1230   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1231   starts[0] = 0;
1232   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1233   for ( i=0; i<N; i++ ) {
1234     svalues[starts[owner[i]]++] = rows[i];
1235   }
1236   ISRestoreIndices(is,&rows);
1237 
1238   starts[0] = 0;
1239   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1240   count = 0;
1241   for ( i=0; i<size; i++ ) {
1242     if (procs[i]) {
1243       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
1244     }
1245   }
1246   PetscFree(starts);
1247 
1248   base = owners[rank]*bs;
1249 
1250   /*  wait on receives */
1251   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1252   source = lens + nrecvs;
1253   count  = nrecvs; slen = 0;
1254   while (count) {
1255     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
1256     /* unpack receives into our local space */
1257     MPI_Get_count(&recv_status,MPI_INT,&n);
1258     source[imdex]  = recv_status.MPI_SOURCE;
1259     lens[imdex]  = n;
1260     slen += n;
1261     count--;
1262   }
1263   PetscFree(recv_waits);
1264 
1265   /* move the data into the send scatter */
1266   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1267   count = 0;
1268   for ( i=0; i<nrecvs; i++ ) {
1269     values = rvalues + i*nmax;
1270     for ( j=0; j<lens[i]; j++ ) {
1271       lrows[count++] = values[j] - base;
1272     }
1273   }
1274   PetscFree(rvalues); PetscFree(lens);
1275   PetscFree(owner); PetscFree(nprocs);
1276 
1277   /* actually zap the local rows */
1278   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1279   PLogObjectParent(A,istmp);
1280   PetscFree(lrows);
1281   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1282   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1283   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1284 
1285   /* wait on sends */
1286   if (nsends) {
1287     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1288     CHKPTRQ(send_status);
1289     MPI_Waitall(nsends,send_waits,send_status);
1290     PetscFree(send_status);
1291   }
1292   PetscFree(send_waits); PetscFree(svalues);
1293 
1294   return 0;
1295 }
1296 extern int MatPrintHelp_SeqBAIJ(Mat);
1297 #undef __FUNC__
1298 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1299 int MatPrintHelp_MPIBAIJ(Mat A)
1300 {
1301   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1302 
1303   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1304   else return 0;
1305 }
1306 
1307 #undef __FUNC__
1308 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1309 int MatSetUnfactored_MPIBAIJ(Mat A)
1310 {
1311   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1312   int         ierr;
1313   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1314   return 0;
1315 }
1316 
1317 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1318 
1319 /* -------------------------------------------------------------------*/
1320 static struct _MatOps MatOps = {
1321   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1322   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1323   0,0,0,0,
1324   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1325   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1326   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1327   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1328   0,0,0,MatGetSize_MPIBAIJ,
1329   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1330   0,0,MatConvertSameType_MPIBAIJ,0,0,
1331   0,0,0,MatGetSubMatrices_MPIBAIJ,
1332   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1333   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
1334   0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1335 
1336 
1337 #undef __FUNC__
1338 #define __FUNC__ "MatCreateMPIBAIJ"
1339 /*@C
1340    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1341    (block compressed row).  For good matrix assembly performance
1342    the user should preallocate the matrix storage by setting the parameters
1343    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1344    performance can be increased by more than a factor of 50.
1345 
1346    Input Parameters:
1347 .  comm - MPI communicator
1348 .  bs   - size of blockk
1349 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1350            This value should be the same as the local size used in creating the
1351            y vector for the matrix-vector product y = Ax.
1352 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1353            This value should be the same as the local size used in creating the
1354            x vector for the matrix-vector product y = Ax.
1355 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1356 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1357 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1358            submatrix  (same for all local rows)
1359 .  d_nzz - array containing the number of block nonzeros in the various block rows
1360            of the in diagonal portion of the local (possibly different for each block
1361            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1362            it is zero.
1363 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1364            submatrix (same for all local rows).
1365 .  o_nzz - array containing the number of nonzeros in the various block rows of the
1366            off-diagonal portion of the local submatrix (possibly different for
1367            each block row) or PETSC_NULL.
1368 
1369    Output Parameter:
1370 .  A - the matrix
1371 
1372    Notes:
1373    The user MUST specify either the local or global matrix dimensions
1374    (possibly both).
1375 
1376    Storage Information:
1377    For a square global matrix we define each processor's diagonal portion
1378    to be its local rows and the corresponding columns (a square submatrix);
1379    each processor's off-diagonal portion encompasses the remainder of the
1380    local matrix (a rectangular submatrix).
1381 
1382    The user can specify preallocated storage for the diagonal part of
1383    the local submatrix with either d_nz or d_nnz (not both).  Set
1384    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1385    memory allocation.  Likewise, specify preallocated storage for the
1386    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1387 
1388    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1389    the figure below we depict these three local rows and all columns (0-11).
1390 
1391 $          0 1 2 3 4 5 6 7 8 9 10 11
1392 $         -------------------
1393 $  row 3  |  o o o d d d o o o o o o
1394 $  row 4  |  o o o d d d o o o o o o
1395 $  row 5  |  o o o d d d o o o o o o
1396 $         -------------------
1397 $
1398 
1399    Thus, any entries in the d locations are stored in the d (diagonal)
1400    submatrix, and any entries in the o locations are stored in the
1401    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1402    stored simply in the MATSEQBAIJ format for compressed row storage.
1403 
1404    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1405    and o_nz should indicate the number of nonzeros per row in the o matrix.
1406    In general, for PDE problems in which most nonzeros are near the diagonal,
1407    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1408    or you will get TERRIBLE performance; see the users' manual chapter on
1409    matrices.
1410 
1411 .keywords: matrix, block, aij, compressed row, sparse, parallel
1412 
1413 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1414 @*/
1415 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1416                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1417 {
1418   Mat          B;
1419   Mat_MPIBAIJ  *b;
1420   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1421 
1422   if (bs < 1) SETERRQ(1,0,"invalid block size specified");
1423   *A = 0;
1424   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1425   PLogObjectCreate(B);
1426   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1427   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1428   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1429   MPI_Comm_size(comm,&size);
1430   if (size == 1) {
1431     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1432     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1433     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1434     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1435     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1436     B->ops.solve             = MatSolve_MPIBAIJ;
1437     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1438     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1439     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1440     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1441   }
1442 
1443   B->destroy    = MatDestroy_MPIBAIJ;
1444   B->view       = MatView_MPIBAIJ;
1445   B->mapping    = 0;
1446   B->factor     = 0;
1447   B->assembled  = PETSC_FALSE;
1448 
1449   B->insertmode = NOT_SET_VALUES;
1450   MPI_Comm_rank(comm,&b->rank);
1451   MPI_Comm_size(comm,&b->size);
1452 
1453   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1454     SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1455   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
1456   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,0,"either N or n should be specified");
1457   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1458   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1459 
1460   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1461     work[0] = m; work[1] = n;
1462     mbs = m/bs; nbs = n/bs;
1463     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1464     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1465     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1466   }
1467   if (m == PETSC_DECIDE) {
1468     Mbs = M/bs;
1469     if (Mbs*bs != M) SETERRQ(1,0,"No of global rows must be divisible by blocksize");
1470     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1471     m   = mbs*bs;
1472   }
1473   if (n == PETSC_DECIDE) {
1474     Nbs = N/bs;
1475     if (Nbs*bs != N) SETERRQ(1,0,"No of global cols must be divisible by blocksize");
1476     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1477     n   = nbs*bs;
1478   }
1479   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"No of local rows, cols must be divisible by blocksize");
1480 
1481   b->m = m; B->m = m;
1482   b->n = n; B->n = n;
1483   b->N = N; B->N = N;
1484   b->M = M; B->M = M;
1485   b->bs  = bs;
1486   b->bs2 = bs*bs;
1487   b->mbs = mbs;
1488   b->nbs = nbs;
1489   b->Mbs = Mbs;
1490   b->Nbs = Nbs;
1491 
1492   /* build local table of row and column ownerships */
1493   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1494   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1495   b->cowners = b->rowners + b->size + 2;
1496   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1497   b->rowners[0] = 0;
1498   for ( i=2; i<=b->size; i++ ) {
1499     b->rowners[i] += b->rowners[i-1];
1500   }
1501   b->rstart    = b->rowners[b->rank];
1502   b->rend      = b->rowners[b->rank+1];
1503   b->rstart_bs = b->rstart * bs;
1504   b->rend_bs   = b->rend * bs;
1505 
1506   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1507   b->cowners[0] = 0;
1508   for ( i=2; i<=b->size; i++ ) {
1509     b->cowners[i] += b->cowners[i-1];
1510   }
1511   b->cstart    = b->cowners[b->rank];
1512   b->cend      = b->cowners[b->rank+1];
1513   b->cstart_bs = b->cstart * bs;
1514   b->cend_bs   = b->cend * bs;
1515 
1516   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1517   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1518   PLogObjectParent(B,b->A);
1519   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1520   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1521   PLogObjectParent(B,b->B);
1522 
1523   /* build cache for off array entries formed */
1524   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1525   b->donotstash  = 0;
1526   b->colmap      = 0;
1527   b->garray      = 0;
1528   b->roworiented = 1;
1529 
1530   /* stuff used for matrix vector multiply */
1531   b->lvec      = 0;
1532   b->Mvctx     = 0;
1533 
1534   /* stuff for MatGetRow() */
1535   b->rowindices   = 0;
1536   b->rowvalues    = 0;
1537   b->getrowactive = PETSC_FALSE;
1538 
1539   *A = B;
1540   return 0;
1541 }
1542 
1543 #undef __FUNC__
1544 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
1545 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1546 {
1547   Mat         mat;
1548   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1549   int         ierr, len=0, flg;
1550 
1551   *newmat       = 0;
1552   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1553   PLogObjectCreate(mat);
1554   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1555   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1556   mat->destroy    = MatDestroy_MPIBAIJ;
1557   mat->view       = MatView_MPIBAIJ;
1558   mat->factor     = matin->factor;
1559   mat->assembled  = PETSC_TRUE;
1560 
1561   a->m = mat->m   = oldmat->m;
1562   a->n = mat->n   = oldmat->n;
1563   a->M = mat->M   = oldmat->M;
1564   a->N = mat->N   = oldmat->N;
1565 
1566   a->bs  = oldmat->bs;
1567   a->bs2 = oldmat->bs2;
1568   a->mbs = oldmat->mbs;
1569   a->nbs = oldmat->nbs;
1570   a->Mbs = oldmat->Mbs;
1571   a->Nbs = oldmat->Nbs;
1572 
1573   a->rstart       = oldmat->rstart;
1574   a->rend         = oldmat->rend;
1575   a->cstart       = oldmat->cstart;
1576   a->cend         = oldmat->cend;
1577   a->size         = oldmat->size;
1578   a->rank         = oldmat->rank;
1579   mat->insertmode = NOT_SET_VALUES;
1580   a->rowvalues    = 0;
1581   a->getrowactive = PETSC_FALSE;
1582 
1583   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1584   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1585   a->cowners = a->rowners + a->size + 2;
1586   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1587   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1588   if (oldmat->colmap) {
1589     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1590     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1591     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1592   } else a->colmap = 0;
1593   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1594     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1595     PLogObjectMemory(mat,len*sizeof(int));
1596     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1597   } else a->garray = 0;
1598 
1599   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1600   PLogObjectParent(mat,a->lvec);
1601   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1602   PLogObjectParent(mat,a->Mvctx);
1603   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1604   PLogObjectParent(mat,a->A);
1605   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1606   PLogObjectParent(mat,a->B);
1607   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1608   if (flg) {
1609     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1610   }
1611   *newmat = mat;
1612   return 0;
1613 }
1614 
1615 #include "sys.h"
1616 
1617 #undef __FUNC__
1618 #define __FUNC__ "MatLoad_MPIBAIJ"
1619 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1620 {
1621   Mat          A;
1622   int          i, nz, ierr, j,rstart, rend, fd;
1623   Scalar       *vals,*buf;
1624   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1625   MPI_Status   status;
1626   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1627   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1628   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1629   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1630   int          dcount,kmax,k,nzcount,tmp;
1631 
1632 
1633   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1634   bs2  = bs*bs;
1635 
1636   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1637   if (!rank) {
1638     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1639     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1640     if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not matrix object");
1641   }
1642 
1643   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1644   M = header[1]; N = header[2];
1645 
1646   if (M != N) SETERRQ(1,0,"Can only do square matrices");
1647 
1648   /*
1649      This code adds extra rows to make sure the number of rows is
1650      divisible by the blocksize
1651   */
1652   Mbs        = M/bs;
1653   extra_rows = bs - M + bs*(Mbs);
1654   if (extra_rows == bs) extra_rows = 0;
1655   else                  Mbs++;
1656   if (extra_rows &&!rank) {
1657     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1658   }
1659 
1660   /* determine ownership of all rows */
1661   mbs = Mbs/size + ((Mbs % size) > rank);
1662   m   = mbs * bs;
1663   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1664   browners = rowners + size + 1;
1665   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1666   rowners[0] = 0;
1667   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1668   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1669   rstart = rowners[rank];
1670   rend   = rowners[rank+1];
1671 
1672   /* distribute row lengths to all processors */
1673   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1674   if (!rank) {
1675     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1676     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1677     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1678     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1679     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1680     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1681     PetscFree(sndcounts);
1682   }
1683   else {
1684     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1685   }
1686 
1687   if (!rank) {
1688     /* calculate the number of nonzeros on each processor */
1689     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1690     PetscMemzero(procsnz,size*sizeof(int));
1691     for ( i=0; i<size; i++ ) {
1692       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1693         procsnz[i] += rowlengths[j];
1694       }
1695     }
1696     PetscFree(rowlengths);
1697 
1698     /* determine max buffer needed and allocate it */
1699     maxnz = 0;
1700     for ( i=0; i<size; i++ ) {
1701       maxnz = PetscMax(maxnz,procsnz[i]);
1702     }
1703     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1704 
1705     /* read in my part of the matrix column indices  */
1706     nz = procsnz[0];
1707     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1708     mycols = ibuf;
1709     if (size == 1)  nz -= extra_rows;
1710     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1711     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1712 
1713     /* read in every ones (except the last) and ship off */
1714     for ( i=1; i<size-1; i++ ) {
1715       nz = procsnz[i];
1716       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1717       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1718     }
1719     /* read in the stuff for the last proc */
1720     if ( size != 1 ) {
1721       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1722       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1723       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1724       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1725     }
1726     PetscFree(cols);
1727   }
1728   else {
1729     /* determine buffer space needed for message */
1730     nz = 0;
1731     for ( i=0; i<m; i++ ) {
1732       nz += locrowlens[i];
1733     }
1734     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1735     mycols = ibuf;
1736     /* receive message of column indices*/
1737     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1738     MPI_Get_count(&status,MPI_INT,&maxnz);
1739     if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
1740   }
1741 
1742   /* loop over local rows, determining number of off diagonal entries */
1743   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1744   odlens = dlens + (rend-rstart);
1745   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1746   PetscMemzero(mask,3*Mbs*sizeof(int));
1747   masked1 = mask    + Mbs;
1748   masked2 = masked1 + Mbs;
1749   rowcount = 0; nzcount = 0;
1750   for ( i=0; i<mbs; i++ ) {
1751     dcount  = 0;
1752     odcount = 0;
1753     for ( j=0; j<bs; j++ ) {
1754       kmax = locrowlens[rowcount];
1755       for ( k=0; k<kmax; k++ ) {
1756         tmp = mycols[nzcount++]/bs;
1757         if (!mask[tmp]) {
1758           mask[tmp] = 1;
1759           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1760           else masked1[dcount++] = tmp;
1761         }
1762       }
1763       rowcount++;
1764     }
1765 
1766     dlens[i]  = dcount;
1767     odlens[i] = odcount;
1768 
1769     /* zero out the mask elements we set */
1770     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1771     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1772   }
1773 
1774   /* create our matrix */
1775   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1776          CHKERRQ(ierr);
1777   A = *newmat;
1778   MatSetOption(A,MAT_COLUMNS_SORTED);
1779 
1780   if (!rank) {
1781     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1782     /* read in my part of the matrix numerical values  */
1783     nz = procsnz[0];
1784     vals = buf;
1785     mycols = ibuf;
1786     if (size == 1)  nz -= extra_rows;
1787     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1788     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1789 
1790     /* insert into matrix */
1791     jj      = rstart*bs;
1792     for ( i=0; i<m; i++ ) {
1793       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1794       mycols += locrowlens[i];
1795       vals   += locrowlens[i];
1796       jj++;
1797     }
1798     /* read in other processors (except the last one) and ship out */
1799     for ( i=1; i<size-1; i++ ) {
1800       nz = procsnz[i];
1801       vals = buf;
1802       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1803       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1804     }
1805     /* the last proc */
1806     if ( size != 1 ){
1807       nz = procsnz[i] - extra_rows;
1808       vals = buf;
1809       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1810       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1811       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1812     }
1813     PetscFree(procsnz);
1814   }
1815   else {
1816     /* receive numeric values */
1817     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1818 
1819     /* receive message of values*/
1820     vals = buf;
1821     mycols = ibuf;
1822     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1823     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1824     if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
1825 
1826     /* insert into matrix */
1827     jj      = rstart*bs;
1828     for ( i=0; i<m; i++ ) {
1829       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1830       mycols += locrowlens[i];
1831       vals   += locrowlens[i];
1832       jj++;
1833     }
1834   }
1835   PetscFree(locrowlens);
1836   PetscFree(buf);
1837   PetscFree(ibuf);
1838   PetscFree(rowners);
1839   PetscFree(dlens);
1840   PetscFree(mask);
1841   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1842   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1843   return 0;
1844 }
1845 
1846 
1847