xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision cd2245dea71a3aa4fce336c3d5cb6692bd00b1f7)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.54 1997/03/09 17:44:41 curfman Exp curfman $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 
25 /*
26      Local utility routine that creates a mapping from the global column
27    number to the local number in the off-diagonal part of the local
28    storage of the matrix.  This is done in a non-scalable way, since the
29    length of colmap equals the number of global block columns.
30 */
31 #undef __FUNC__
32 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
33 static int CreateColmap_MPIBAIJ_Private(Mat mat)
34 {
35   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
36   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
37   int         nbs = B->nbs,i,bs=B->bs;
38 
39   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
40   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
41   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
42   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
43   return 0;
44 }
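/*
   A small worked example of the colmap convention above (hypothetical
   numbers): with bs = 2 and garray = {1,4}, the loop stores colmap[1] = 1 and
   colmap[4] = 3, and every other entry remains 0, meaning "this global block
   column does not appear in the off-diagonal part".  A global point column 9
   (block column 4) is then translated to the local point column
   colmap[4] - 1 + 9%2 = 3, which is exactly how MatSetValues_MPIBAIJ below
   uses the table.
*/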
45 
46 #undef __FUNC__
47 #define __FUNC__ "MatGetRowIJ_MPIBAIJ"
48 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
49                             PetscTruth *done)
50 {
51   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
52   int         ierr;
53   if (aij->size == 1) {
54     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
55   } else SETERRQ(1,0,"not supported in parallel");
56   return 0;
57 }
58 
59 #undef __FUNC__
60 #define __FUNC__ "MatRestoreRowIJ_MPIBAIJ"
61 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
62                                 PetscTruth *done)
63 {
64   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
65   int        ierr;
66   if (aij->size == 1) {
67     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
68   } else SETERRQ(1,0,"not supported in parallel");
69   return 0;
70 }
71 #define CHUNKSIZE  10
72 
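/*
   The macro below inserts a single (point row, point column, value) entry
   into the "diagonal" SeqBAIJ block baij->A.  It finds the block column with
   a short binary search followed by a linear scan; if the block already
   exists the value is added or overwritten, otherwise later entries in the
   row are shifted up and, when the row is full, the storage is reallocated
   in CHUNKSIZE block increments.  The working variables (rp, ap, aa, ai, aj,
   imax, ailen, bs, bs2, ...) must be set up by the caller, as is done in
   MatSetValues_MPIBAIJ below.
*/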
73 #define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
74 { \
75  \
76     brow = row/bs;  \
77     rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
78     rmax = imax[brow]; nrow = ailen[brow]; \
79       bcol = col/bs; \
80       ridx = row % bs; cidx = col % bs; \
81       low = 0; high = nrow; \
82       while (high-low > 3) { \
83         t = (low+high)/2; \
84         if (rp[t] > bcol) high = t; \
85         else              low  = t; \
86       } \
87       for ( _i=low; _i<high; _i++ ) { \
88         if (rp[_i] > bcol) break; \
89         if (rp[_i] == bcol) { \
90           bap  = ap +  bs2*_i + bs*cidx + ridx; \
91           if (addv == ADD_VALUES) *bap += value;  \
92           else                    *bap  = value;  \
93           goto _noinsert; \
94         } \
95       } \
96       if (a->nonew) goto _noinsert; \
97       if (nrow >= rmax) { \
98         /* there is no extra room in row, therefore enlarge */ \
99         int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
100         Scalar *new_a; \
101  \
102         /* malloc new storage space */ \
103         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
104         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
105         new_j   = (int *) (new_a + bs2*new_nz); \
106         new_i   = new_j + new_nz; \
107  \
108         /* copy over old data into new slots */ \
109         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
110         for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
111         PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
112         len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
113         PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
114                                                            len*sizeof(int)); \
115         PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
116         PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
117         PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
118                     aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
119         /* free up old matrix storage */ \
120         PetscFree(a->a);  \
121         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
122         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
123         a->singlemalloc = 1; \
124  \
125         rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
126         rmax = imax[brow] = imax[brow] + CHUNKSIZE; \
127         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
128         a->maxnz += bs2*CHUNKSIZE; \
129         a->reallocs++; \
130         a->nz++; \
131       } \
132       N = nrow++ - 1;  \
133       /* shift up all the later entries in this row */ \
134       for ( ii=N; ii>=_i; ii-- ) { \
135         rp[ii+1] = rp[ii]; \
136         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
137       } \
138       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
139       rp[_i]                      = bcol;  \
140       ap[bs2*_i + bs*cidx + ridx] = value;  \
141       _noinsert:; \
142     ailen[brow] = nrow; \
143 }
144 
145 extern int MatSetValues_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
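/*
   Point-wise insertion for the parallel BAIJ matrix: entries in locally owned
   rows go either to the diagonal block baij->A (via the macro above) or, after
   translating the global column through colmap, to the off-diagonal block
   baij->B; entries for rows owned by other processes are stashed and are
   communicated during MatAssemblyBegin/MatAssemblyEnd.
*/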
146 #undef __FUNC__
147 #define __FUNC__ "MatSetValues_MPIBAIJ"
148 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
149 {
150   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
151   Scalar      value;
152   int         ierr,i,j,row,col;
153   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
154   int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
155   int         cend_orig=baij->cend_bs,bs=baij->bs;
156 
157   /* Some Variables required in the macro */
158   Mat         A = baij->A;
159   Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
160   int         *rp,ii,nrow,_i,rmax,N;
161   int         *imax=a->imax,*ai=a->i,*ailen=a->ilen;
162   int         *aj=a->j,brow,bcol;
163   int         low,high,t,ridx,cidx,bs2=a->bs2;
164   Scalar      *ap,*aa=a->a,*bap;
165 
166   for ( i=0; i<m; i++ ) {
167 #if defined(PETSC_BOPT_g)
168     if (im[i] < 0) SETERRQ(1,0,"Negative row");
169     if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
170 #endif
171     if (im[i] >= rstart_orig && im[i] < rend_orig) {
172       row = im[i] - rstart_orig;
173       for ( j=0; j<n; j++ ) {
174         if (in[j] >= cstart_orig && in[j] < cend_orig){
175           col = in[j] - cstart_orig;
176           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
177           MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
178           /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
179         }
180 #if defined(PETSC_BOPT_g)
181         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
182         else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
183 #endif
184         else {
185           if (mat->was_assembled) {
186             if (!baij->colmap) {
187               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
188             }
189             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
190             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
191               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
192               col =  in[j];
193             }
194           }
195           else col = in[j];
196           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
197           ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
198         }
199       }
200     }
201     else {
202       if (roworiented && !baij->donotstash) {
203         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
204       }
205       else {
206         if (!baij->donotstash) {
207           row = im[i];
208 	  for ( j=0; j<n; j++ ) {
209 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
210           }
211         }
212       }
213     }
214   }
215   return 0;
216 }
217 
218 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
219 #undef __FUNC__
220 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
221 static int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
222 {
223   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
224   Scalar      *value,*tmp;
225   int         ierr,i,j,ii,jj,row,col;
226   int         roworiented = baij->roworiented,rstart=baij->rstart ;
227   int         rend=baij->rend,cstart=baij->cstart,stepval;
228   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
229 
230   /* Should be stashed somewhere to avoid multiple mallocs */
231   tmp = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(tmp);
232   if (roworiented) {
233     stepval = (n-1)*bs;
234   } else {
235     stepval = (m-1)*bs;
236   }
238   for ( i=0; i<m; i++ ) {
239 #if defined(PETSC_BOPT_g)
240     if (im[i] < 0) SETERRQ(1,0,"Negative row");
241     if (im[i] >= baij->Mbs) SETERRQ(1,0,"Row too large");
242 #endif
243     if (im[i] >= rstart && im[i] < rend) {
244       row = im[i] - rstart;
245       for ( j=0; j<n; j++ ) {
246         if (in[j] >= cstart && in[j] < cend){
247           col = in[j] - cstart;
248           if (roworiented) {
249             value = v + i*(stepval+bs)*bs + j*bs;
250             for ( ii=0; ii<bs; ii++,value+=stepval )
251               for (jj=ii; jj<bs2; jj+=bs )
252                 tmp[jj] = *value++;
253           } else {
254             value = v + j*(stepval+bs)*bs + i*bs;
255             for ( ii=0; ii<bs; ii++,value+=stepval )
256               for (jj=0; jj<bs; jj++ )
257                 *tmp++  = *value++;
258             tmp -=bs2;
259           }
260           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,tmp,addv);CHKERRQ(ierr);
261         }
262 #if defined(PETSC_BOPT_g)
263         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
264         else if (in[j] >= baij->Nbs) {SETERRQ(1,0,"Col too large");}
265 #endif
266         else {
267           if (mat->was_assembled) {
268             if (!baij->colmap) {
269               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
270             }
271             col = baij->colmap[in[j]] - 1;
272             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
273               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
274               col =  in[j];
275             }
276           }
277           else col = in[j];
278           if (roworiented) {
279             value = v + i*(stepval+bs)*bs + j*bs;
280             for ( ii=0; ii<bs; ii++,value+=stepval )
281               for (jj=ii; jj<bs2; jj+=bs )
282                 tmp[jj] = *value++;
283           } else {
284             value = v + j*(stepval+bs)*bs + i*bs;
285             for ( ii=0; ii<bs; ii++,value+=stepval )
286               for (jj=0; jj<bs; jj++ )
287                 *tmp++  = *value++;
288             tmp -=bs2;
289           }
290           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,tmp,addv);CHKERRQ(ierr);
291         }
292       }
293     }
294     else {
295       if (!baij->donotstash) {
296         row = im[i]*bs;
297         if (roworiented ) {
298           for ( ii=0; ii<bs; ii++,row++ ) {
299             for ( jj=0; jj<bs; jj++ ) {
300               ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
301             }
302           }
303         }
304         else {
305           row = im[i];
306 	  for ( j=0; j<n; j++ ) {
307 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
308           }
309         }
310       }
311     }
312   }
313   PetscFree(tmp);
314   return 0;
315 }
316 
317 #undef __FUNC__
318 #define __FUNC__ "MatGetValues_MPIBAIJ"
319 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
320 {
321   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
322   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
323   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
324 
325   for ( i=0; i<m; i++ ) {
326     if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
327     if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
328     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
329       row = idxm[i] - bsrstart;
330       for ( j=0; j<n; j++ ) {
331         if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
332         if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
333         if (idxn[j] >= bscstart && idxn[j] < bscend){
334           col = idxn[j] - bscstart;
335           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
336         }
337         else {
338           if (!baij->colmap) {
339             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
340           }
341           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
342              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
343           else {
344             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
345             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
346           }
347         }
348       }
349     }
350     else {
351       SETERRQ(1,0,"Only local values currently supported");
352     }
353   }
354   return 0;
355 }
356 
357 #undef __FUNC__
358 #define __FUNC__ "MatNorm_MPIBAIJ"
359 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
360 {
361   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
362   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
363   int        ierr, i,bs2=baij->bs2;
364   double     sum = 0.0;
365   Scalar     *v;
366 
367   if (baij->size == 1) {
368     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
369   } else {
370     if (type == NORM_FROBENIUS) {
371       v = amat->a;
372       for (i=0; i<amat->nz*bs2; i++ ) {
373 #if defined(PETSC_COMPLEX)
374         sum += real(conj(*v)*(*v)); v++;
375 #else
376         sum += (*v)*(*v); v++;
377 #endif
378       }
379       v = bmat->a;
380       for (i=0; i<bmat->nz*bs2; i++ ) {
381 #if defined(PETSC_COMPLEX)
382         sum += real(conj(*v)*(*v)); v++;
383 #else
384         sum += (*v)*(*v); v++;
385 #endif
386       }
387       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
388       *norm = sqrt(*norm);
389     }
390     else
391       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
392   }
393   return 0;
394 }
395 
396 #undef __FUNC__
397 #define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
398 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
399 {
400   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
401   MPI_Comm    comm = mat->comm;
402   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
403   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
404   MPI_Request *send_waits,*recv_waits;
405   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
406   InsertMode  addv;
407   Scalar      *rvalues,*svalues;
408 
409   /* make sure all processors are in the same mode, either INSERT_VALUES or ADD_VALUES */
410   MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
411   if (addv == (ADD_VALUES|INSERT_VALUES)) {
412     SETERRQ(1,0,"Some processors inserted others added");
413   }
414   mat->insertmode = addv; /* in case this processor had no cache */
415 
416   /*  first count number of contributors to each processor */
417   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
418   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
419   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
420   for ( i=0; i<baij->stash.n; i++ ) {
421     idx = baij->stash.idx[i];
422     for ( j=0; j<size; j++ ) {
423       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
424         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
425       }
426     }
427   }
428   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
429 
430   /* inform other processors of number of messages and max length*/
431   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
432   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
433   nreceives = work[rank];
434   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
435   nmax = work[rank];
436   PetscFree(work);
437 
438   /* post receives:
439        1) each message will consist of ordered triples
440      (global row index, global column index, value); we store the global
441      indices as Scalars (doubles) to simplify the message passing.
442        2) since we don't know how long each individual message is we
443      allocate the largest needed buffer for each receive. Potentially
444      this is a lot of wasted space.
445 
446 
447        This could be done better.
448   */
449   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
450   CHKPTRQ(rvalues);
451   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
452   CHKPTRQ(recv_waits);
453   for ( i=0; i<nreceives; i++ ) {
454     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
455               comm,recv_waits+i);
456   }
457 
458   /* do sends:
459       1) starts[i] gives the starting index in svalues for stuff going to
460          the ith processor
461   */
462   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
463   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
464   CHKPTRQ(send_waits);
465   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
466   starts[0] = 0;
467   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
468   for ( i=0; i<baij->stash.n; i++ ) {
469     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
470     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
471     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
472   }
473   PetscFree(owner);
474   starts[0] = 0;
475   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
476   count = 0;
477   for ( i=0; i<size; i++ ) {
478     if (procs[i]) {
479       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
480                 comm,send_waits+count++);
481     }
482   }
483   PetscFree(starts); PetscFree(nprocs);
484 
485   /* Free cache space */
486   PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
487   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
488 
489   baij->svalues    = svalues;    baij->rvalues    = rvalues;
490   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
491   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
492   baij->rmax       = nmax;
493 
494   return 0;
495 }
496 
497 
498 #undef __FUNC__
499 #define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
500 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
501 {
502   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
503   MPI_Status  *send_status,recv_status;
504   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
505   int         bs=baij->bs,row,col,other_disassembled;
506   Scalar      *values,val;
507   InsertMode  addv = mat->insertmode;
508 
509   /*  wait on receives */
510   while (count) {
511     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
512     /* unpack receives into our local space */
513     values = baij->rvalues + 3*imdex*baij->rmax;
514     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
515     n = n/3;
516     for ( i=0; i<n; i++ ) {
517       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
518       col = (int) PetscReal(values[3*i+1]);
519       val = values[3*i+2];
520       if (col >= baij->cstart*bs && col < baij->cend*bs) {
521         col -= baij->cstart*bs;
522         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
523       }
524       else {
525         if (mat->was_assembled) {
526           if (!baij->colmap) {
527             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
528           }
529           col = baij->colmap[col/bs] - 1 + col%bs;
530           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
531             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
532             col = (int) PetscReal(values[3*i+1]);
533           }
534         }
535         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
536       }
537     }
538     count--;
539   }
540   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
541 
542   /* wait on sends */
543   if (baij->nsends) {
544     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
545     CHKPTRQ(send_status);
546     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
547     PetscFree(send_status);
548   }
549   PetscFree(baij->send_waits); PetscFree(baij->svalues);
550 
551   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
552   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
553 
554   /* determine if any processor has disassembled, if so we must
555      also disassemble ourselves, in order that we may reassemble. */
556   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
557   if (mat->was_assembled && !other_disassembled) {
558     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
559   }
560 
561   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
562     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
563   }
564   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
565   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
566 
567   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
568   return 0;
569 }
570 
571 #undef __FUNC__
572 #define __FUNC__ "MatView_MPIBAIJ_Binary"
573 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
574 {
575   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
576   int          ierr;
577 
578   if (baij->size == 1) {
579     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
580   }
581   else SETERRQ(1,0,"Only uniprocessor output supported");
582   return 0;
583 }
584 
585 #undef __FUNC__
586 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
587 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
588 {
589   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
590   int          ierr, format,rank,bs = baij->bs;
591   FILE         *fd;
592   ViewerType   vtype;
593 
594   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
595   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
596     ierr = ViewerGetFormat(viewer,&format);
597     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
598       MatInfo info;
599       MPI_Comm_rank(mat->comm,&rank);
600       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
601       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
602       PetscSequentialPhaseBegin(mat->comm,1);
603       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
604               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
605               baij->bs,(int)info.memory);
606       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
607       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
608       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
609       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
610       fflush(fd);
611       PetscSequentialPhaseEnd(mat->comm,1);
612       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
613       return 0;
614     }
615     else if (format == VIEWER_FORMAT_ASCII_INFO) {
616       PetscPrintf(mat->comm,"  block size is %d\n",bs);
617       return 0;
618     }
619   }
620 
621   if (vtype == DRAW_VIEWER) {
622     Draw       draw;
623     PetscTruth isnull;
624     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
625     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
626   }
627 
628   if (vtype == ASCII_FILE_VIEWER) {
629     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
630     PetscSequentialPhaseBegin(mat->comm,1);
631     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
632            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
633             baij->cstart*bs,baij->cend*bs);
634     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
635     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
636     fflush(fd);
637     PetscSequentialPhaseEnd(mat->comm,1);
638   }
639   else {
640     int size = baij->size;
641     rank = baij->rank;
642     if (size == 1) {
643       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
644     }
645     else {
646       /* assemble the entire matrix onto first processor. */
647       Mat         A;
648       Mat_SeqBAIJ *Aloc;
649       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
650       int         mbs=baij->mbs;
651       Scalar      *a;
652 
653       if (!rank) {
654         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
655         CHKERRQ(ierr);
656       }
657       else {
658         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
659         CHKERRQ(ierr);
660       }
661       PLogObjectParent(mat,A);
662 
663       /* copy over the A part */
664       Aloc = (Mat_SeqBAIJ*) baij->A->data;
665       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
666       row = baij->rstart;
667       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
668 
669       for ( i=0; i<mbs; i++ ) {
670         rvals[0] = bs*(baij->rstart + i);
671         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
672         for ( j=ai[i]; j<ai[i+1]; j++ ) {
673           col = (baij->cstart+aj[j])*bs;
674           for (k=0; k<bs; k++ ) {
675             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
676             col++; a += bs;
677           }
678         }
679       }
680       /* copy over the B part */
681       Aloc = (Mat_SeqBAIJ*) baij->B->data;
682       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
683       row = baij->rstart*bs;
684       for ( i=0; i<mbs; i++ ) {
685         rvals[0] = bs*(baij->rstart + i);
686         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
687         for ( j=ai[i]; j<ai[i+1]; j++ ) {
688           col = baij->garray[aj[j]]*bs;
689           for (k=0; k<bs; k++ ) {
690             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
691             col++; a += bs;
692           }
693         }
694       }
695       PetscFree(rvals);
696       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
697       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
698       if (!rank) {
699         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
700       }
701       ierr = MatDestroy(A); CHKERRQ(ierr);
702     }
703   }
704   return 0;
705 }
706 
707 
708 
709 #undef __FUNC__
710 #define __FUNC__ "MatView_MPIBAIJ"
711 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
712 {
713   Mat         mat = (Mat) obj;
714   int         ierr;
715   ViewerType  vtype;
716 
717   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
718   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
719       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
720     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
721   }
722   else if (vtype == BINARY_FILE_VIEWER) {
723     return MatView_MPIBAIJ_Binary(mat,viewer);
724   }
725   return 0;
726 }
727 
728 #undef __FUNC__
729 #define __FUNC__ "MatDestroy_MPIBAIJ"
730 static int MatDestroy_MPIBAIJ(PetscObject obj)
731 {
732   Mat         mat = (Mat) obj;
733   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
734   int         ierr;
735 
736 #if defined(PETSC_LOG)
737   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
738 #endif
739 
740   PetscFree(baij->rowners);
741   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
742   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
743   if (baij->colmap) PetscFree(baij->colmap);
744   if (baij->garray) PetscFree(baij->garray);
745   if (baij->lvec)   VecDestroy(baij->lvec);
746   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
747   if (baij->rowvalues) PetscFree(baij->rowvalues);
748   PetscFree(baij);
749   if (mat->mapping) {
750     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
751   }
752   PLogObjectDestroy(mat);
753   PetscHeaderDestroy(mat);
754   return 0;
755 }
756 
757 #undef __FUNC__
758 #define __FUNC__ "MatMult_MPIBAIJ"
759 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
760 {
761   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
762   int         ierr, nt;
763 
764   VecGetLocalSize_Fast(xx,nt);
765   if (nt != a->n) {
766     SETERRQ(1,0,"Incompatible partition of A and xx");
767   }
768   VecGetLocalSize_Fast(yy,nt);
769   if (nt != a->m) {
770     SETERRQ(1,0,"Incompatible parition of A and yy");
771   }
772   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
773   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
774   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
775   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
776   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
777   return 0;
778 }
779 
780 #undef __FUNC__
781 #define __FUNC__ "MatMultAdd_MPIBAIJ"
782 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
783 {
784   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
785   int        ierr;
786   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
787   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
788   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
789   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
790   return 0;
791 }
792 
793 #undef __FUNC__
794 #define __FUNC__ "MatMultTrans_MPIBAIJ"
795 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
796 {
797   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
798   int        ierr;
799 
800   /* do nondiagonal part */
801   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
802   /* send it on its way */
803   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
804   /* do local part */
805   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
806   /* receive remote parts: note this assumes the values are not actually */
807   /* inserted in yy until the next line, which is true for my implementation*/
808   /* but may not always be true. */
809   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
810   return 0;
811 }
812 
813 #undef __FUNC__
814 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
815 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
816 {
817   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
818   int        ierr;
819 
820   /* do nondiagonal part */
821   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
822   /* send it on its way */
823   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
824   /* do local part */
825   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
826   /* receive remote parts: note this assumes the values are not actually */
827   /* inserted in zz until the next line, which is true for my implementation*/
828   /* but may not always be true. */
829   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
830   return 0;
831 }
832 
833 /*
834   This only works correctly for square matrices where the subblock A->A is the
835    diagonal block
836 */
837 #undef __FUNC__
838 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
839 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
840 {
841   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
842   if (a->M != a->N)
843     SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
844   return MatGetDiagonal(a->A,v);
845 }
846 
847 #undef __FUNC__
848 #define __FUNC__ "MatScale_MPIBAIJ"
849 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
850 {
851   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
852   int        ierr;
853   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
854   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
855   return 0;
856 }
857 
858 #undef __FUNC__
859 #define __FUNC__ "MatGetSize_MPIBAIJ"
860 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
861 {
862   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
863   *m = mat->M; *n = mat->N;
864   return 0;
865 }
866 
867 #undef __FUNC__
868 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
869 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
870 {
871   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
872   *m = mat->m; *n = mat->n;
873   return 0;
874 }
875 
876 #undef __FUNC__
877 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
878 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
879 {
880   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
881   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
882   return 0;
883 }
884 
885 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
886 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
887 
888 #undef __FUNC__
889 #define __FUNC__ "MatGetRow_MPIBAIJ"
890 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
891 {
892   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
893   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
894   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
895   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
896   int        *cmap, *idx_p,cstart = mat->cstart;
897 
898   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
899   mat->getrowactive = PETSC_TRUE;
900 
901   if (!mat->rowvalues && (idx || v)) {
902     /*
903         allocate enough space to hold information from the longest row.
904     */
905     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
906     int     max = 1,mbs = mat->mbs,tmp;
907     for ( i=0; i<mbs; i++ ) {
908       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
909       if (max < tmp) { max = tmp; }
910     }
911     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
912     CHKPTRQ(mat->rowvalues);
913     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
914   }
915 
916 
917   if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows");
918   lrow = row - brstart;
919 
920   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
921   if (!v)   {pvA = 0; pvB = 0;}
922   if (!idx) {pcA = 0; if (!v) pcB = 0;}
923   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
924   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
925   nztot = nzA + nzB;
926 
927   cmap  = mat->garray;
928   if (v  || idx) {
929     if (nztot) {
930       /* Sort by increasing column numbers, assuming A and B already sorted */
931       int imark = -1;
932       if (v) {
933         *v = v_p = mat->rowvalues;
934         for ( i=0; i<nzB; i++ ) {
935           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
936           else break;
937         }
938         imark = i;
939         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
940         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
941       }
942       if (idx) {
943         *idx = idx_p = mat->rowindices;
944         if (imark > -1) {
945           for ( i=0; i<imark; i++ ) {
946             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
947           }
948         } else {
949           for ( i=0; i<nzB; i++ ) {
950             if (cmap[cworkB[i]/bs] < cstart)
951               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
952             else break;
953           }
954           imark = i;
955         }
956         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
957         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
958       }
959     }
960     else {
961       if (idx) *idx = 0;
962       if (v)   *v   = 0;
963     }
964   }
965   *nz = nztot;
966   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
967   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
968   return 0;
969 }
970 
971 #undef __FUNC__
972 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
973 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
974 {
975   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
976   if (baij->getrowactive == PETSC_FALSE) {
977     SETERRQ(1,0,"MatGetRow not called");
978   }
979   baij->getrowactive = PETSC_FALSE;
980   return 0;
981 }
982 
983 #undef __FUNC__
984 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
985 static int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
986 {
987   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
988   *bs = baij->bs;
989   return 0;
990 }
991 
992 #undef __FUNC__
993 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
994 static int MatZeroEntries_MPIBAIJ(Mat A)
995 {
996   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
997   int         ierr;
998   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
999   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1000   return 0;
1001 }
1002 
1003 #undef __FUNC__
1004 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1005 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1006 {
1007   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1008   Mat         A = a->A, B = a->B;
1009   int         ierr;
1010   double      isend[5], irecv[5];
1011 
1012   info->rows_global    = (double)a->M;
1013   info->columns_global = (double)a->N;
1014   info->rows_local     = (double)a->m;
1015   info->columns_local  = (double)a->N;
1016   info->block_size     = (double)a->bs;
1017   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1018   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
       isend[3] = info->memory;  isend[4] = info->mallocs;
1019   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1020   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
       isend[3] += info->memory;  isend[4] += info->mallocs;
1021   if (flag == MAT_LOCAL) {
1022     info->nz_used      = isend[0];
1023     info->nz_allocated = isend[1];
1024     info->nz_unneeded  = isend[2];
1025     info->memory       = isend[3];
1026     info->mallocs      = isend[4];
1027   } else if (flag == MAT_GLOBAL_MAX) {
1028     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
1029     info->nz_used      = irecv[0];
1030     info->nz_allocated = irecv[1];
1031     info->nz_unneeded  = irecv[2];
1032     info->memory       = irecv[3];
1033     info->mallocs      = irecv[4];
1034   } else if (flag == MAT_GLOBAL_SUM) {
1035     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
1036     info->nz_used      = irecv[0];
1037     info->nz_allocated = irecv[1];
1038     info->nz_unneeded  = irecv[2];
1039     info->memory       = irecv[3];
1040     info->mallocs      = irecv[4];
1041   }
1042   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1043   info->fill_ratio_needed = 0;
1044   info->factor_mallocs    = 0;
1045   return 0;
1046 }
1047 
1048 #undef __FUNC__
1049 #define __FUNC__ "MatSetOption_MPIBAIJ"
1050 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1051 {
1052   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1053 
1054   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1055       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1056       op == MAT_COLUMNS_UNSORTED ||
1057       op == MAT_COLUMNS_SORTED) {
1058         MatSetOption(a->A,op);
1059         MatSetOption(a->B,op);
1060   } else if (op == MAT_ROW_ORIENTED) {
1061         a->roworiented = 1;
1062         MatSetOption(a->A,op);
1063         MatSetOption(a->B,op);
1064   } else if (op == MAT_ROWS_SORTED ||
1065              op == MAT_ROWS_UNSORTED ||
1066              op == MAT_SYMMETRIC ||
1067              op == MAT_STRUCTURALLY_SYMMETRIC ||
1068              op == MAT_YES_NEW_DIAGONALS)
1069     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1070   else if (op == MAT_COLUMN_ORIENTED) {
1071     a->roworiented = 0;
1072     MatSetOption(a->A,op);
1073     MatSetOption(a->B,op);
1074   } else if (op == MAT_IGNORE_OFF_PROCESSOR_ENTRIES) {
1075     a->donotstash = 1;
1076   } else if (op == MAT_NO_NEW_DIAGONALS)
1077     {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
1078   else
1079     {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
1080   return 0;
1081 }
1082 
1083 #undef __FUNC__
1084 #define __FUNC__ "MatTranspose_MPIBAIJ"
1085 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1086 {
1087   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1088   Mat_SeqBAIJ *Aloc;
1089   Mat        B;
1090   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
1091   int        bs=baij->bs,mbs=baij->mbs;
1092   Scalar     *a;
1093 
1094   if (matout == PETSC_NULL && M != N)
1095     SETERRQ(1,0,"Square matrix only for in-place");
1096   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1097   CHKERRQ(ierr);
1098 
1099   /* copy over the A part */
1100   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1101   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1102   row = baij->rstart;
1103   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1104 
1105   for ( i=0; i<mbs; i++ ) {
1106     rvals[0] = bs*(baij->rstart + i);
1107     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1108     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1109       col = (baij->cstart+aj[j])*bs;
1110       for (k=0; k<bs; k++ ) {
1111         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1112         col++; a += bs;
1113       }
1114     }
1115   }
1116   /* copy over the B part */
1117   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1118   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1119   row = baij->rstart*bs;
1120   for ( i=0; i<mbs; i++ ) {
1121     rvals[0] = bs*(baij->rstart + i);
1122     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1123     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1124       col = baij->garray[aj[j]]*bs;
1125       for (k=0; k<bs; k++ ) {
1126         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1127         col++; a += bs;
1128       }
1129     }
1130   }
1131   PetscFree(rvals);
1132   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1133   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1134 
1135   if (matout != PETSC_NULL) {
1136     *matout = B;
1137   } else {
1138     /* This isn't really an in-place transpose .... but free data structures from baij */
1139     PetscFree(baij->rowners);
1140     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1141     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1142     if (baij->colmap) PetscFree(baij->colmap);
1143     if (baij->garray) PetscFree(baij->garray);
1144     if (baij->lvec) VecDestroy(baij->lvec);
1145     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1146     PetscFree(baij);
1147     PetscMemcpy(A,B,sizeof(struct _Mat));
1148     PetscHeaderDestroy(B);
1149   }
1150   return 0;
1151 }
1152 
1153 #undef __FUNC__
1154 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1155 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1156 {
1157   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1158   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1159   int ierr,s1,s2,s3;
1160 
1161   if (ll)  {
1162     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1163     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1164     if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
1165     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1166     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1167   }
1168   if (rr) SETERRQ(1,0,"not supported for right vector");
1169   return 0;
1170 }
1171 
1172 /* the code does not do the diagonal entries correctly unless the
1173    matrix is square and the column and row ownerships are identical.
1174    This is a BUG. The only way to fix it seems to be to access
1175    baij->A and baij->B directly and not through the MatZeroRows()
1176    routine.
1177 */
1178 #undef __FUNC__
1179 #define __FUNC__ "MatZeroRows_MPIBAIJ"
1180 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
1181 {
1182   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
1183   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
1184   int            *procs,*nprocs,j,found,idx,nsends,*work;
1185   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
1186   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
1187   int            *lens,imdex,*lrows,*values,bs=l->bs;
1188   MPI_Comm       comm = A->comm;
1189   MPI_Request    *send_waits,*recv_waits;
1190   MPI_Status     recv_status,*send_status;
1191   IS             istmp;
1192 
1193   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
1194   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
1195 
1196   /*  first count number of contributors to each processor */
1197   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
1198   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
1199   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
1200   for ( i=0; i<N; i++ ) {
1201     idx = rows[i];
1202     found = 0;
1203     for ( j=0; j<size; j++ ) {
1204       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
1205         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
1206       }
1207     }
1208     if (!found) SETERRQ(1,0,"Index out of range");
1209   }
1210   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
1211 
1212   /* inform other processors of number of messages and max length*/
1213   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
1214   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
1215   nrecvs = work[rank];
1216   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
1217   nmax = work[rank];
1218   PetscFree(work);
1219 
1220   /* post receives:   */
1221   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
1222   CHKPTRQ(rvalues);
1223   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
1224   CHKPTRQ(recv_waits);
1225   for ( i=0; i<nrecvs; i++ ) {
1226     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
1227   }
1228 
1229   /* do sends:
1230       1) starts[i] gives the starting index in svalues for stuff going to
1231          the ith processor
1232   */
1233   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1234   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
1235   CHKPTRQ(send_waits);
1236   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1237   starts[0] = 0;
1238   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1239   for ( i=0; i<N; i++ ) {
1240     svalues[starts[owner[i]]++] = rows[i];
1241   }
1242   ISRestoreIndices(is,&rows);
1243 
1244   starts[0] = 0;
1245   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1246   count = 0;
1247   for ( i=0; i<size; i++ ) {
1248     if (procs[i]) {
1249       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
1250     }
1251   }
1252   PetscFree(starts);
1253 
1254   base = owners[rank]*bs;
1255 
1256   /*  wait on receives */
1257   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1258   source = lens + nrecvs;
1259   count  = nrecvs; slen = 0;
1260   while (count) {
1261     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
1262     /* unpack receives into our local space */
1263     MPI_Get_count(&recv_status,MPI_INT,&n);
1264     source[imdex]  = recv_status.MPI_SOURCE;
1265     lens[imdex]  = n;
1266     slen += n;
1267     count--;
1268   }
1269   PetscFree(recv_waits);
1270 
1271   /* move the data into the send scatter */
1272   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1273   count = 0;
1274   for ( i=0; i<nrecvs; i++ ) {
1275     values = rvalues + i*nmax;
1276     for ( j=0; j<lens[i]; j++ ) {
1277       lrows[count++] = values[j] - base;
1278     }
1279   }
1280   PetscFree(rvalues); PetscFree(lens);
1281   PetscFree(owner); PetscFree(nprocs);
1282 
1283   /* actually zap the local rows */
1284   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1285   PLogObjectParent(A,istmp);
1286   PetscFree(lrows);
1287   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1288   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1289   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1290 
1291   /* wait on sends */
1292   if (nsends) {
1293     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1294     CHKPTRQ(send_status);
1295     MPI_Waitall(nsends,send_waits,send_status);
1296     PetscFree(send_status);
1297   }
1298   PetscFree(send_waits); PetscFree(svalues);
1299 
1300   return 0;
1301 }
1302 extern int MatPrintHelp_SeqBAIJ(Mat);
1303 #undef __FUNC__
1304 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1305 static int MatPrintHelp_MPIBAIJ(Mat A)
1306 {
1307   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1308 
1309   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1310   else return 0;
1311 }
1312 
1313 #undef __FUNC__
1314 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1315 static int MatSetUnfactored_MPIBAIJ(Mat A)
1316 {
1317   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1318   int         ierr;
1319   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1320   return 0;
1321 }
1322 
1323 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1324 
1325 /* -------------------------------------------------------------------*/
1326 static struct _MatOps MatOps = {
1327   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1328   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1329   0,0,0,0,
1330   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1331   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1332   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1333   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1334   0,0,0,MatGetSize_MPIBAIJ,
1335   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1336   0,0,MatConvertSameType_MPIBAIJ,0,0,
1337   0,0,0,MatGetSubMatrices_MPIBAIJ,
1338   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1339   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
1340   0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1341 
1342 
1343 #undef __FUNC__
1344 #define __FUNC__ "MatCreateMPIBAIJ"
1345 /*@C
1346    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1347    (block compressed row).  For good matrix assembly performance
1348    the user should preallocate the matrix storage by setting the parameters
1349    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1350    performance can be increased by more than a factor of 50.
1351 
1352    Input Parameters:
1353 .  comm - MPI communicator
1354 .  bs   - size of block
1355 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1356            This value should be the same as the local size used in creating the
1357            y vector for the matrix-vector product y = Ax.
1358 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1359            This value should be the same as the local size used in creating the
1360            x vector for the matrix-vector product y = Ax.
1361 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1362 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1363 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1364            submatrix  (same for all local rows)
1365 .  d_nnz - array containing the number of block nonzeros in the various block rows
1366            of the diagonal portion of the local submatrix (possibly different for each
1367            block row) or PETSC_NULL.  You must leave room for the diagonal entry even
1368            if it is zero.
1369 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1370            submatrix (same for all local rows).
1371 .  o_nnz - array containing the number of block nonzeros in the various block rows of the
1372            off-diagonal portion of the local submatrix (possibly different for
1373            each block row) or PETSC_NULL.
1374 
1375    Output Parameter:
1376 .  A - the matrix
1377 
1378    Notes:
1379    The user MUST specify either the local or global matrix dimensions
1380    (possibly both).
1381 
1382    Storage Information:
1383    For a square global matrix we define each processor's diagonal portion
1384    to be its local rows and the corresponding columns (a square submatrix);
1385    each processor's off-diagonal portion encompasses the remainder of the
1386    local matrix (a rectangular submatrix).
1387 
1388    The user can specify preallocated storage for the diagonal part of
1389    the local submatrix with either d_nz or d_nnz (not both).  Set
1390    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1391    memory allocation.  Likewise, specify preallocated storage for the
1392    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1393 
1394    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1395    the figure below we depict these three local rows and all columns (0-11).
1396 
1397 $          0 1 2 3 4 5 6 7 8 9 10 11
1398 $         -------------------
1399 $  row 3  |  o o o d d d o o o o o o
1400 $  row 4  |  o o o d d d o o o o o o
1401 $  row 5  |  o o o d d d o o o o o o
1402 $         -------------------
1403 $
1404 
1405    Thus, any entries in the d locations are stored in the d (diagonal)
1406    submatrix, and any entries in the o locations are stored in the
1407    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1408    stored simply in the MATSEQBAIJ format for compressed row storage.
1409 
1410    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1411    and o_nz should indicate the number of nonzeros per row in the o matrix.
1412    In general, for PDE problems in which most nonzeros are near the diagonal,
1413    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1414    or you will get TERRIBLE performance; see the users' manual chapter on
1415    matrices.
1416 
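   Example:
   A minimal calling sketch (the block size, local sizes, and preallocation
   counts below are illustrative only; error checking is abbreviated):
$      Mat A;
$      int ierr, bs = 2, m = 4, n = 4;   /* local sizes, multiples of bs */
$      ierr = MatCreateMPIBAIJ(MPI_COMM_WORLD,bs,m,n,PETSC_DECIDE,PETSC_DECIDE,
$                              3,PETSC_NULL,1,PETSC_NULL,&A); CHKERRQ(ierr);
$      /* set entries with MatSetValues() or MatSetValuesBlocked(), then assemble */
$      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
$      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
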
1417 .keywords: matrix, block, aij, compressed row, sparse, parallel
1418 
1419 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1420 @*/
1421 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1422                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1423 {
1424   Mat          B;
1425   Mat_MPIBAIJ  *b;
1426   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1427 
1428   if (bs < 1) SETERRQ(1,0,"invalid block size specified");
1429   *A = 0;
1430   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1431   PLogObjectCreate(B);
1432   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1433   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1434   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1435   MPI_Comm_size(comm,&size);
1436   if (size == 1) {
1437     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1438     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1439     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1440     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1441     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1442     B->ops.solve             = MatSolve_MPIBAIJ;
1443     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1444     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1445     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1446     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1447   }
1448 
1449   B->destroy    = MatDestroy_MPIBAIJ;
1450   B->view       = MatView_MPIBAIJ;
1451   B->mapping    = 0;
1452   B->factor     = 0;
1453   B->assembled  = PETSC_FALSE;
1454 
1455   B->insertmode = NOT_SET_VALUES;
1456   MPI_Comm_rank(comm,&b->rank);
1457   MPI_Comm_size(comm,&b->size);
1458 
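  /*
     Resolve the local and global sizes: at least one of (M,m) and one of
     (N,n) must be specified.  If both a local and a global value are given,
     the local one wins and the global value is recomputed by summing the
     local sizes over the communicator; local sizes left as PETSC_DECIDE are
     derived below by splitting the block rows/columns as evenly as possible
     across the processes (lower ranks receive the extra blocks).
  */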
1459   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1460     SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1461   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
1462   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,0,"either N or n should be specified");
1463   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1464   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1465 
1466   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1467     work[0] = m; work[1] = n;
1468     mbs = m/bs; nbs = n/bs;
1469     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1470     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1471     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1472   }
1473   if (m == PETSC_DECIDE) {
1474     Mbs = M/bs;
1475     if (Mbs*bs != M) SETERRQ(1,0,"No of global rows must be divisible by blocksize");
1476     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1477     m   = mbs*bs;
1478   }
1479   if (n == PETSC_DECIDE) {
1480     Nbs = N/bs;
1481     if (Nbs*bs != N) SETERRQ(1,0,"No of global cols must be divisible by blocksize");
1482     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1483     n   = nbs*bs;
1484   }
1485   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"No of local rows, cols must be divisible by blocksize");
1486 
1487   b->m = m; B->m = m;
1488   b->n = n; B->n = n;
1489   b->N = N; B->N = N;
1490   b->M = M; B->M = M;
1491   b->bs  = bs;
1492   b->bs2 = bs*bs;
1493   b->mbs = mbs;
1494   b->nbs = nbs;
1495   b->Mbs = Mbs;
1496   b->Nbs = Nbs;
1497 
1498   /* build local table of block row and block column ownerships: rowners[i] (cowners[i]) is the first block row (column) owned by process i */
1499   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1500   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1501   b->cowners = b->rowners + b->size + 2;
1502   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1503   b->rowners[0] = 0;
1504   for ( i=2; i<=b->size; i++ ) {
1505     b->rowners[i] += b->rowners[i-1];
1506   }
1507   b->rstart    = b->rowners[b->rank];
1508   b->rend      = b->rowners[b->rank+1];
1509   b->rstart_bs = b->rstart * bs;
1510   b->rend_bs   = b->rend * bs;
1511 
1512   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1513   b->cowners[0] = 0;
1514   for ( i=2; i<=b->size; i++ ) {
1515     b->cowners[i] += b->cowners[i-1];
1516   }
1517   b->cstart    = b->cowners[b->rank];
1518   b->cend      = b->cowners[b->rank+1];
1519   b->cstart_bs = b->cstart * bs;
1520   b->cend_bs   = b->cend * bs;
1521 
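  /*
     b->A holds the diagonal part as a sequential BAIJ matrix of local size
     m x n; b->B holds the off-diagonal part and is created with the full
     global column width N.  During assembly the columns of b->B are
     renumbered so that only the block columns actually used are kept, and
     b->garray maps those local block columns back to global block indices
     (see MatSetUpMultiply_MPIBAIJ).
  */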
1522   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1523   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1524   PLogObjectParent(B,b->A);
1525   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1526   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1527   PLogObjectParent(B,b->B);
1528 
1529   /* build cache (stash) for entries destined for other processes, generated by MatSetValues() */
1530   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1531   b->donotstash  = 0;
1532   b->colmap      = 0;
1533   b->garray      = 0;
1534   b->roworiented = 1;
1535 
1536   /* stuff used for matrix vector multiply */
1537   b->lvec      = 0;
1538   b->Mvctx     = 0;
1539 
1540   /* stuff for MatGetRow() */
1541   b->rowindices   = 0;
1542   b->rowvalues    = 0;
1543   b->getrowactive = PETSC_FALSE;
1544 
1545   *A = B;
1546   return 0;
1547 }
1548 
1549 #undef __FUNC__
1550 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
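/*
    Makes a complete copy of a parallel BAIJ matrix: the ownership tables,
    colmap/garray, the local diagonal (A) and off-diagonal (B) blocks, the
    local work vector and the scatter context used for matrix-vector
    products are all duplicated; a fresh stash is initialized.
*/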
1551 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1552 {
1553   Mat         mat;
1554   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1555   int         ierr, len=0, flg;
1556 
1557   *newmat       = 0;
1558   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1559   PLogObjectCreate(mat);
1560   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1561   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1562   mat->destroy    = MatDestroy_MPIBAIJ;
1563   mat->view       = MatView_MPIBAIJ;
1564   mat->factor     = matin->factor;
1565   mat->assembled  = PETSC_TRUE;
1566 
1567   a->m = mat->m   = oldmat->m;
1568   a->n = mat->n   = oldmat->n;
1569   a->M = mat->M   = oldmat->M;
1570   a->N = mat->N   = oldmat->N;
1571 
1572   a->bs  = oldmat->bs;
1573   a->bs2 = oldmat->bs2;
1574   a->mbs = oldmat->mbs;
1575   a->nbs = oldmat->nbs;
1576   a->Mbs = oldmat->Mbs;
1577   a->Nbs = oldmat->Nbs;
1578 
1579   a->rstart       = oldmat->rstart;
1580   a->rend         = oldmat->rend;
1581   a->cstart       = oldmat->cstart;
1582   a->cend         = oldmat->cend;
1583   a->size         = oldmat->size;
1584   a->rank         = oldmat->rank;
1585   mat->insertmode = NOT_SET_VALUES;
1586   a->rowvalues    = 0;
1587   a->getrowactive = PETSC_FALSE;
1588 
1589   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1590   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1591   a->cowners = a->rowners + a->size + 2;
1592   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1593   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1594   if (oldmat->colmap) {
1595     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1596     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1597     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1598   } else a->colmap = 0;
1599   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1600     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1601     PLogObjectMemory(mat,len*sizeof(int));
1602     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1603   } else a->garray = 0;
1604 
1605   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1606   PLogObjectParent(mat,a->lvec);
1607   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1608   PLogObjectParent(mat,a->Mvctx);
1609   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1610   PLogObjectParent(mat,a->A);
1611   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1612   PLogObjectParent(mat,a->B);
1613   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1614   if (flg) {
1615     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1616   }
1617   *newmat = mat;
1618   return 0;
1619 }
1620 
1621 #include "sys.h"
1622 
1623 #undef __FUNC__
1624 #define __FUNC__ "MatLoad_MPIBAIJ"
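/*
    Loads a matrix stored in PETSc binary format into a parallel BAIJ matrix.
    Process 0 reads the header, the row lengths, the column indices and the
    numerical values, shipping each other process the rows it owns; every
    process then computes its block-row preallocation (dlens/odlens) before
    inserting its rows with MatSetValues().  If the global size is not
    divisible by the block size, padding rows/columns with a unit diagonal
    are appended.
*/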
1625 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1626 {
1627   Mat          A;
1628   int          i, nz, ierr, j,rstart, rend, fd;
1629   Scalar       *vals,*buf;
1630   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1631   MPI_Status   status;
1632   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1633   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1634   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1635   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1636   int          dcount,kmax,k,nzcount,tmp;
1637 
1638 
1639   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1640   bs2  = bs*bs;
1641 
1642   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1643   if (!rank) {
1644     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1645     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1646     if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not a matrix object");
1647   }
1648 
1649   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1650   M = header[1]; N = header[2];
1651 
1652   if (M != N) SETERRQ(1,0,"Can only do square matrices");
1653 
1654   /*
1655      This code adds extra rows to make sure the number of rows is
1656      divisible by the blocksize
1657   */
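  /* extra_rows = bs - (M mod bs) when M is not divisible by bs; these padded
     rows/columns receive a single 1.0 on the diagonal when the values are
     read in below */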
1658   Mbs        = M/bs;
1659   extra_rows = bs - M + bs*(Mbs);
1660   if (extra_rows == bs) extra_rows = 0;
1661   else                  Mbs++;
1662   if (extra_rows && !rank) {
1663     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1664   }
1665 
1666   /* determine ownership of all rows */
1667   mbs = Mbs/size + ((Mbs % size) > rank);
1668   m   = mbs * bs;
1669   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1670   browners = rowners + size + 1;
1671   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1672   rowners[0] = 0;
1673   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1674   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1675   rstart = rowners[rank];
1676   rend   = rowners[rank+1];
1677 
1678   /* distribute row lengths to all processors */
1679   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1680   if (!rank) {
1681     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1682     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1683     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1684     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1685     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1686     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1687     PetscFree(sndcounts);
1688   }
1689   else {
1690     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1691   }
1692 
1693   if (!rank) {
1694     /* calculate the number of nonzeros on each processor */
1695     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1696     PetscMemzero(procsnz,size*sizeof(int));
1697     for ( i=0; i<size; i++ ) {
1698       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1699         procsnz[i] += rowlengths[j];
1700       }
1701     }
1702     PetscFree(rowlengths);
1703 
1704     /* determine max buffer needed and allocate it */
1705     maxnz = 0;
1706     for ( i=0; i<size; i++ ) {
1707       maxnz = PetscMax(maxnz,procsnz[i]);
1708     }
1709     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1710 
1711     /* read in my part of the matrix column indices  */
1712     nz = procsnz[0];
1713     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1714     mycols = ibuf;
1715     if (size == 1)  nz -= extra_rows;
1716     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1717     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1718 
1719     /* read in each remaining processor's column indices (except the last one's) and ship them off */
1720     for ( i=1; i<size-1; i++ ) {
1721       nz = procsnz[i];
1722       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1723       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1724     }
1725     /* read in the last processor's column indices, appending the padding rows */
1726     if ( size != 1 ) {
1727       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1728       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1729       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1730       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1731     }
1732     PetscFree(cols);
1733   }
1734   else {
1735     /* determine buffer space needed for message */
1736     nz = 0;
1737     for ( i=0; i<m; i++ ) {
1738       nz += locrowlens[i];
1739     }
1740     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1741     mycols = ibuf;
1742     /* receive message of column indices */
1743     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1744     MPI_Get_count(&status,MPI_INT,&maxnz);
1745     if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
1746   }
1747 
1748   /* loop over local block rows, counting the nonzero blocks in the diagonal (dlens) and off-diagonal (odlens) parts */
1749   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1750   odlens = dlens + (rend-rstart);
1751   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1752   PetscMemzero(mask,3*Mbs*sizeof(int));
1753   masked1 = mask    + Mbs;
1754   masked2 = masked1 + Mbs;
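  /*
     mask[]    marks the block columns already seen in the current block row,
     masked1[] lists the diagonal-part block columns found in that block row,
     masked2[] lists the off-diagonal-part block columns; the marks are
     cleared again at the bottom of the loop
  */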
1755   rowcount = 0; nzcount = 0;
1756   for ( i=0; i<mbs; i++ ) {
1757     dcount  = 0;
1758     odcount = 0;
1759     for ( j=0; j<bs; j++ ) {
1760       kmax = locrowlens[rowcount];
1761       for ( k=0; k<kmax; k++ ) {
1762         tmp = mycols[nzcount++]/bs;
1763         if (!mask[tmp]) {
1764           mask[tmp] = 1;
1765           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1766           else masked1[dcount++] = tmp;
1767         }
1768       }
1769       rowcount++;
1770     }
1771 
1772     dlens[i]  = dcount;
1773     odlens[i] = odcount;
1774 
1775     /* zero out the mask elements we set */
1776     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1777     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1778   }
1779 
1780   /* create our matrix */
1781   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1782          CHKERRQ(ierr);
1783   A = *newmat;
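  /* the column indices read from the file are assumed to be sorted within
     each row; MAT_COLUMNS_SORTED lets MatSetValues() take advantage of this
     during insertion */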
1784   MatSetOption(A,MAT_COLUMNS_SORTED);
1785 
1786   if (!rank) {
1787     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1788     /* read in my part of the matrix numerical values  */
1789     nz = procsnz[0];
1790     vals = buf;
1791     mycols = ibuf;
1792     if (size == 1)  nz -= extra_rows;
1793     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1794     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1795 
1796     /* insert into matrix */
1797     jj      = rstart*bs;
1798     for ( i=0; i<m; i++ ) {
1799       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1800       mycols += locrowlens[i];
1801       vals   += locrowlens[i];
1802       jj++;
1803     }
1804     /* read in the other processors' values (except the last one's) and ship them out */
1805     for ( i=1; i<size-1; i++ ) {
1806       nz = procsnz[i];
1807       vals = buf;
1808       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1809       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1810     }
1811     /* the last proc */
1812     if ( size != 1 ){
1813       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1814       vals = buf;
1815       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1816       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1817       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1818     }
1819     PetscFree(procsnz);
1820   }
1821   else {
1822     /* receive numeric values */
1823     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1824 
1825     /* receive message of values */
1826     vals = buf;
1827     mycols = ibuf;
1828     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1829     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1830     if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
1831 
1832     /* insert into matrix */
1833     jj      = rstart*bs;
1834     for ( i=0; i<m; i++ ) {
1835       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1836       mycols += locrowlens[i];
1837       vals   += locrowlens[i];
1838       jj++;
1839     }
1840   }
1841   PetscFree(locrowlens);
1842   PetscFree(buf);
1843   PetscFree(ibuf);
1844   PetscFree(rowners);
1845   PetscFree(dlens);
1846   PetscFree(mask);
1847   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1848   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1849   return 0;
1850 }
1851 
1852 
1853