xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision c2653b3d308917c4b5698e5f59fd76add26d7f41)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.55 1997/03/09 17:57:57 curfman Exp bsmith $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 
25 /*
26      Local utility routine that creates a mapping from the global column
27    number to the local number in the off-diagonal part of the local
28    storage of the matrix.  This is done in a non-scalable way since the
29    length of colmap equals the number of block columns in the global matrix.
30 */
31 #undef __FUNC__
32 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
33 static int CreateColmap_MPIBAIJ_Private(Mat mat)
34 {
35   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
36   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
37   int         nbs = B->nbs,i,bs=B->bs;
38 
39   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
40   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
41   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
42   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
43   return 0;
44 }
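/*
   A worked sketch of the colmap encoding above (illustration only, not part
   of the library).  Assume bs = 2 and garray = {3,7}, i.e. local off-diagonal
   block columns 0 and 1 hold global block columns 3 and 7.  Then

      colmap[3] = 0*bs+1 = 1;   colmap[7] = 1*bs+1 = 3;   all other entries 0

   and a global point column c is translated to a local point column of B by

      lcol = colmap[c/bs] - 1 + c%bs;     (colmap[c/bs] == 0 means not local)

   For example c = 15 gives colmap[7] - 1 + 15%2 = 3, i.e. point column 3 of B.
*/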
45 
46 #undef __FUNC__
47 #define __FUNC__ "MatGetRowIJ_MPIBAIJ"
48 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
49                             PetscTruth *done)
50 {
51   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
52   int         ierr;
53   if (aij->size == 1) {
54     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
55   } else SETERRQ(1,0,"not supported in parallel");
56   return 0;
57 }
58 
59 #undef __FUNC__
60 #define __FUNC__ "MatRestoreRowIJ_MPIBAIJ"
61 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
62                                 PetscTruth *done)
63 {
64   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
65   int        ierr;
66   if (aij->size == 1) {
67     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
68   } else SETERRQ(1,0,"not supported in parallel");
69   return 0;
70 }
71 #define CHUNKSIZE  10
72 
73 #define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
74 { \
75  \
76     brow = row/bs;  \
77     rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
78     rmax = imax[brow]; nrow = ailen[brow]; \
79       bcol = col/bs; \
80       ridx = row % bs; cidx = col % bs; \
81       low = 0; high = nrow; \
82       while (high-low > 3) { \
83         t = (low+high)/2; \
84         if (rp[t] > bcol) high = t; \
85         else              low  = t; \
86       } \
87       for ( _i=low; _i<high; _i++ ) { \
88         if (rp[_i] > bcol) break; \
89         if (rp[_i] == bcol) { \
90           bap  = ap +  bs2*_i + bs*cidx + ridx; \
91           if (addv == ADD_VALUES) *bap += value;  \
92           else                    *bap  = value;  \
93           goto _noinsert; \
94         } \
95       } \
96       if (a->nonew) goto _noinsert; \
97       if (nrow >= rmax) { \
98         /* there is no extra room in row, therefore enlarge */ \
99         int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
100         Scalar *new_a; \
101  \
102         /* malloc new storage space */ \
103         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
104         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
105         new_j   = (int *) (new_a + bs2*new_nz); \
106         new_i   = new_j + new_nz; \
107  \
108         /* copy over old data into new slots */ \
109         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
110         for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
111         PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
112         len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
113         PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
114                                                            len*sizeof(int)); \
115         PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
116         PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
117         PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
118                     aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
119         /* free up old matrix storage */ \
120         PetscFree(a->a);  \
121         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
122         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
123         a->singlemalloc = 1; \
124  \
125         rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
126         rmax = imax[brow] = imax[brow] + CHUNKSIZE; \
127         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
128         a->maxnz += bs2*CHUNKSIZE; \
129         a->reallocs++; \
131       } \
132       N = nrow++ - 1; a->nz++;  \
133       /* shift up all the later entries in this row */ \
134       for ( ii=N; ii>=_i; ii-- ) { \
135         rp[ii+1] = rp[ii]; \
136         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
137       } \
138       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
139       rp[_i]                      = bcol;  \
140       ap[bs2*_i + bs*cidx + ridx] = value;  \
141       _noinsert:; \
142     ailen[brow] = nrow; \
143 }
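/*
   Example of the index arithmetic used in the macro above (illustration
   only).  With bs = 3, the point entry (row,col) = (7,5) lands in block row
   brow = 7/3 = 2 and block column bcol = 5/3 = 1, at local position
   ridx = 7%3 = 1, cidx = 5%3 = 2 within the block.  Since each bs x bs block
   is stored column major, the entry sits at offset bs*cidx + ridx = 7 of the
   bs2 = 9 Scalars of that block, matching ap[bs2*_i + bs*cidx + ridx].
*/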
144 
145 extern int MatSetValues_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
146 #undef __FUNC__
147 #define __FUNC__ "MatSetValues_MPIBAIJ"
148 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
149 {
150   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
151   Scalar      value;
152   int         ierr,i,j,row,col;
153   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
154   int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
155   int         cend_orig=baij->cend_bs,bs=baij->bs;
156 
157   /* Some Variables required in the macro */
158   Mat         A = baij->A;
159   Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
160   int         *rp,ii,nrow,_i,rmax,N;
161   int         *imax=a->imax,*ai=a->i,*ailen=a->ilen;
162   int         *aj=a->j,brow,bcol;
163   int         low,high,t,ridx,cidx,bs2=a->bs2;
164   Scalar      *ap,*aa=a->a,*bap;
165 
166   for ( i=0; i<m; i++ ) {
167 #if defined(PETSC_BOPT_g)
168     if (im[i] < 0) SETERRQ(1,0,"Negative row");
169     if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
170 #endif
171     if (im[i] >= rstart_orig && im[i] < rend_orig) {
172       row = im[i] - rstart_orig;
173       for ( j=0; j<n; j++ ) {
174         if (in[j] >= cstart_orig && in[j] < cend_orig){
175           col = in[j] - cstart_orig;
176           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
177           MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
178           /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
179         }
180 #if defined(PETSC_BOPT_g)
181         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
182         else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
183 #endif
184         else {
185           if (mat->was_assembled) {
186             if (!baij->colmap) {
187               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
188             }
189             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
190             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
191               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
192               col =  in[j];
193             }
194           }
195           else col = in[j];
196           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
197           ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
198         }
199       }
200     }
201     else {
202       if (roworiented && !baij->donotstash) {
203         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
204       }
205       else {
206         if (!baij->donotstash) {
207           row = im[i];
208 	  for ( j=0; j<n; j++ ) {
209 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
210           }
211         }
212       }
213     }
214   }
215   return 0;
216 }
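/*
   A minimal usage sketch (hypothetical values).  Entries may be set with
   global point indices from any processor; off-processor entries are stashed
   here and communicated during assembly:

      int    row = 3, col = 7;
      Scalar v   = 1.0;
      ierr = MatSetValues(mat,1,&row,1,&col,&v,ADD_VALUES); CHKERRQ(ierr);
      ierr = MatAssemblyBegin(mat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(mat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
*/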
217 
218 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
219 #undef __FUNC__
220 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
221 static int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
222 {
223   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
224   Scalar      *value,*tmp;
225   int         ierr,i,j,ii,jj,row,col;
226   int         roworiented = baij->roworiented,rstart=baij->rstart ;
227   int         rend=baij->rend,cstart=baij->cstart,stepval;
228   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
229 
230   /* Should be stashed somewhere to avoid multiple mallocs */
231   tmp = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(tmp);
232   if (roworiented) {
233     stepval = (n-1)*bs;
234   } else {
235     stepval = (m-1)*bs;
236   }
238   for ( i=0; i<m; i++ ) {
239 #if defined(PETSC_BOPT_g)
240     if (im[i] < 0) SETERRQ(1,0,"Negative row");
241     if (im[i] >= baij->Mbs) SETERRQ(1,0,"Row too large");
242 #endif
243     if (im[i] >= rstart && im[i] < rend) {
244       row = im[i] - rstart;
245       for ( j=0; j<n; j++ ) {
246         if (in[j] >= cstart && in[j] < cend){
247           col = in[j] - cstart;
248           if (roworiented) {
249             value = v + i*(stepval+bs)*bs + j*bs;
250             for ( ii=0; ii<bs; ii++,value+=stepval )
251               for (jj=ii; jj<bs2; jj+=bs )
252                 tmp[jj] = *value++;
253           } else {
254             value = v + j*(stepval+bs)*bs + i*bs;
255             for ( ii=0; ii<bs; ii++,value+=stepval )
256               for (jj=0; jj<bs; jj++ )
257                 *tmp++  = *value++;
258             tmp -=bs2;
259           }
260           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,tmp,addv);CHKERRQ(ierr);
261         }
262 #if defined(PETSC_BOPT_g)
263         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
264         else if (in[j] >= baij->Nbs) {SETERRQ(1,0,"Col too large");}
265 #endif
266         else {
267           if (mat->was_assembled) {
268             if (!baij->colmap) {
269               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
270             }
271             col = baij->colmap[in[j]] - 1;
272             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
273               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
274               col =  in[j];
275             }
276           }
277           else col = in[j];
278           if (roworiented) {
279             value = v + i*(stepval+bs)*bs + j*bs;
280             for ( ii=0; ii<bs; ii++,value+=stepval )
281               for (jj=ii; jj<bs2; jj+=bs )
282                 tmp[jj] = *value++;
283           } else {
284             value = v + j*(stepval+bs)*bs + i*bs;
285             for ( ii=0; ii<bs; ii++,value+=stepval )
286               for (jj=0; jj<bs; jj++ )
287                 *tmp++  = *value++;
288             tmp -=bs2;
289           }
290           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,tmp,addv);CHKERRQ(ierr);
291         }
292       }
293     }
294     else {
295       if (!baij->donotstash) {
296         row = im[i]*bs;
297         if (roworiented ) {
298           for ( ii=0; ii<bs; ii++,row++ ) {
299             for ( jj=0; jj<bs; jj++ ) {
300               ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
301             }
302           }
303         }
304         else {
305           row = im[i];
306 	  for ( j=0; j<n; j++ ) {
307 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
308           }
309         }
310       }
311     }
312   }
313   PetscFree(tmp);
314   return 0;
315 }
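/*
   A minimal usage sketch of the blocked interface (hypothetical values,
   assuming the public MatSetValuesBlocked() dispatch and the default
   row-oriented ordering).  Indices are BLOCK indices and v holds a full
   bs x bs block; for bs = 2 the block [[1,2],[3,4]] is inserted by

      int    brow = 1, bcol = 2;
      Scalar v[4] = {1.0, 2.0, 3.0, 4.0};
      ierr = MatSetValuesBlocked(mat,1,&brow,1,&bcol,v,INSERT_VALUES); CHKERRQ(ierr);
*/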
316 
317 #undef __FUNC__
318 #define __FUNC__ "MatGetValues_MPIBAIJ"
319 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
320 {
321   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
322   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
323   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
324 
325   for ( i=0; i<m; i++ ) {
326     if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
327     if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
328     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
329       row = idxm[i] - bsrstart;
330       for ( j=0; j<n; j++ ) {
331         if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
332         if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
333         if (idxn[j] >= bscstart && idxn[j] < bscend){
334           col = idxn[j] - bscstart;
335           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
336         }
337         else {
338           if (!baij->colmap) {
339             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
340           }
341           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
342              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
343           else {
344             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
345             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
346           }
347         }
348       }
349     }
350     else {
351       SETERRQ(1,0,"Only local values currently supported");
352     }
353   }
354   return 0;
355 }
356 
357 #undef __FUNC__
358 #define __FUNC__ "MatNorm_MPIBAIJ"
359 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
360 {
361   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
362   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
363   int        ierr, i,bs2=baij->bs2;
364   double     sum = 0.0;
365   Scalar     *v;
366 
367   if (baij->size == 1) {
368     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
369   } else {
370     if (type == NORM_FROBENIUS) {
371       v = amat->a;
372       for (i=0; i<amat->nz*bs2; i++ ) {
373 #if defined(PETSC_COMPLEX)
374         sum += real(conj(*v)*(*v)); v++;
375 #else
376         sum += (*v)*(*v); v++;
377 #endif
378       }
379       v = bmat->a;
380       for (i=0; i<bmat->nz*bs2; i++ ) {
381 #if defined(PETSC_COMPLEX)
382         sum += real(conj(*v)*(*v)); v++;
383 #else
384         sum += (*v)*(*v); v++;
385 #endif
386       }
387       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
388       *norm = sqrt(*norm);
389     }
390     else
391       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
392   }
393   return 0;
394 }
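/*
   The Frobenius norm computed above is

      ||A||_F = sqrt( sum_ij |a_ij|^2 )

   where each processor sums |a_ij|^2 over its diagonal (A) and off-diagonal
   (B) parts and the partial sums are combined with MPI_Allreduce.
*/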
395 
396 #undef __FUNC__
397 #define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
398 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
399 {
400   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
401   MPI_Comm    comm = mat->comm;
402   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
403   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
404   MPI_Request *send_waits,*recv_waits;
405   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
406   InsertMode  addv;
407   Scalar      *rvalues,*svalues;
408 
409   /* make sure all processors are in either INSERT_VALUES or ADD_VALUES mode */
410   MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
411   if (addv == (ADD_VALUES|INSERT_VALUES)) {
412     SETERRQ(1,0,"Some processors inserted others added");
413   }
414   mat->insertmode = addv; /* in case this processor had no cache */
415 
416   /*  first count number of contributors to each processor */
417   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
418   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
419   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
420   for ( i=0; i<baij->stash.n; i++ ) {
421     idx = baij->stash.idx[i];
422     for ( j=0; j<size; j++ ) {
423       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
424         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
425       }
426     }
427   }
428   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
429 
430   /* inform other processors of number of messages and max length*/
431   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
432   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
433   nreceives = work[rank];
434   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
435   nmax = work[rank];
436   PetscFree(work);
437 
438   /* post receives:
439        1) each message will consist of ordered triples
440      (global row,global column,value); we store the indices as Scalars
441      to simplify the message passing.
442        2) since we don't know how long each individual message is we
443      allocate the largest needed buffer for each receive. Potentially
444      this is a lot of wasted space.
445 
446 
447        This could be done better.
448   */
449   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
450   CHKPTRQ(rvalues);
451   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
452   CHKPTRQ(recv_waits);
453   for ( i=0; i<nreceives; i++ ) {
454     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
455               comm,recv_waits+i);
456   }
457 
458   /* do sends:
459       1) starts[i] gives the starting index in svalues for stuff going to
460          the ith processor
461   */
462   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
463   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
464   CHKPTRQ(send_waits);
465   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
466   starts[0] = 0;
467   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
468   for ( i=0; i<baij->stash.n; i++ ) {
469     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
470     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
471     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
472   }
473   PetscFree(owner);
474   starts[0] = 0;
475   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
476   count = 0;
477   for ( i=0; i<size; i++ ) {
478     if (procs[i]) {
479       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
480                 comm,send_waits+count++);
481     }
482   }
483   PetscFree(starts); PetscFree(nprocs);
484 
485   /* Free cache space */
486   PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
487   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
488 
489   baij->svalues    = svalues;    baij->rvalues    = rvalues;
490   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
491   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
492   baij->rmax       = nmax;
493 
494   return 0;
495 }
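/*
   Sketch of the message layout used above: stashed entry k destined for a
   given processor occupies three consecutive Scalars,

      svalues[3*k]   = global row
      svalues[3*k+1] = global column
      svalues[3*k+2] = value

   and is decoded in exactly the same way in MatAssemblyEnd_MPIBAIJ() below.
*/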
496 
497 
498 #undef __FUNC__
499 #define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
500 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
501 {
502   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
503   MPI_Status  *send_status,recv_status;
504   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
505   int         bs=baij->bs,row,col,other_disassembled;
506   Scalar      *values,val;
507   InsertMode  addv = mat->insertmode;
508 
509   /*  wait on receives */
510   while (count) {
511     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
512     /* unpack receives into our local space */
513     values = baij->rvalues + 3*imdex*baij->rmax;
514     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
515     n = n/3;
516     for ( i=0; i<n; i++ ) {
517       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
518       col = (int) PetscReal(values[3*i+1]);
519       val = values[3*i+2];
520       if (col >= baij->cstart*bs && col < baij->cend*bs) {
521         col -= baij->cstart*bs;
522         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
523       }
524       else {
525         if (mat->was_assembled) {
526           if (!baij->colmap) {
527             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
528           }
529           col = baij->colmap[col/bs] - 1 + col%bs;
530           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
531             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
532             col = (int) PetscReal(values[3*i+1]);
533           }
534         }
535         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
536       }
537     }
538     count--;
539   }
540   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
541 
542   /* wait on sends */
543   if (baij->nsends) {
544     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
545     CHKPTRQ(send_status);
546     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
547     PetscFree(send_status);
548   }
549   PetscFree(baij->send_waits); PetscFree(baij->svalues);
550 
551   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
552   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
553 
554   /* determine if any processor has disassembled, if so we must
555      also disassemble ourselves, in order that we may reassemble. */
556   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
557   if (mat->was_assembled && !other_disassembled) {
558     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
559   }
560 
561   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
562     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
563   }
564   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
565   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
566 
567   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
568   return 0;
569 }
570 
571 #undef __FUNC__
572 #define __FUNC__ "MatView_MPIBAIJ_Binary"
573 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
574 {
575   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
576   int          ierr;
577 
578   if (baij->size == 1) {
579     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
580   }
581   else SETERRQ(1,0,"Only uniprocessor output supported");
582   return 0;
583 }
584 
585 #undef __FUNC__
586 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
587 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
588 {
589   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
590   int          ierr, format,rank,bs = baij->bs;
591   FILE         *fd;
592   ViewerType   vtype;
593 
594   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
595   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
596     ierr = ViewerGetFormat(viewer,&format);
597     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
598       MatInfo info;
599       MPI_Comm_rank(mat->comm,&rank);
600       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
601       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
602       PetscSequentialPhaseBegin(mat->comm,1);
603       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
604               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
605               baij->bs,(int)info.memory);
606       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
607       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
608       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
609       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
610       fflush(fd);
611       PetscSequentialPhaseEnd(mat->comm,1);
612       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
613       return 0;
614     }
615     else if (format == VIEWER_FORMAT_ASCII_INFO) {
616       PetscPrintf(mat->comm,"  block size is %d\n",bs);
617       return 0;
618     }
619   }
620 
621   if (vtype == DRAW_VIEWER) {
622     Draw       draw;
623     PetscTruth isnull;
624     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
625     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
626   }
627 
628   if (vtype == ASCII_FILE_VIEWER) {
629     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
630     PetscSequentialPhaseBegin(mat->comm,1);
631     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
632            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
633             baij->cstart*bs,baij->cend*bs);
634     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
635     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
636     fflush(fd);
637     PetscSequentialPhaseEnd(mat->comm,1);
638   }
639   else {
640     int size = baij->size;
641     rank = baij->rank;
642     if (size == 1) {
643       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
644     }
645     else {
646       /* assemble the entire matrix onto first processor. */
647       Mat         A;
648       Mat_SeqBAIJ *Aloc;
649       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
650       int         mbs=baij->mbs;
651       Scalar      *a;
652 
653       if (!rank) {
654         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
655         CHKERRQ(ierr);
656       }
657       else {
658         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
659         CHKERRQ(ierr);
660       }
661       PLogObjectParent(mat,A);
662 
663       /* copy over the A part */
664       Aloc = (Mat_SeqBAIJ*) baij->A->data;
665       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
666       row = baij->rstart;
667       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
668 
669       for ( i=0; i<mbs; i++ ) {
670         rvals[0] = bs*(baij->rstart + i);
671         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
672         for ( j=ai[i]; j<ai[i+1]; j++ ) {
673           col = (baij->cstart+aj[j])*bs;
674           for (k=0; k<bs; k++ ) {
675             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
676             col++; a += bs;
677           }
678         }
679       }
680       /* copy over the B part */
681       Aloc = (Mat_SeqBAIJ*) baij->B->data;
682       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
683       row = baij->rstart*bs;
684       for ( i=0; i<mbs; i++ ) {
685         rvals[0] = bs*(baij->rstart + i);
686         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
687         for ( j=ai[i]; j<ai[i+1]; j++ ) {
688           col = baij->garray[aj[j]]*bs;
689           for (k=0; k<bs; k++ ) {
690             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
691             col++; a += bs;
692           }
693         }
694       }
695       PetscFree(rvals);
696       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
697       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
698       if (!rank) {
699         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
700       }
701       ierr = MatDestroy(A); CHKERRQ(ierr);
702     }
703   }
704   return 0;
705 }
706 
707 
708 
709 #undef __FUNC__
710 #define __FUNC__ "MatView_MPIBAIJ"
711 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
712 {
713   Mat         mat = (Mat) obj;
714   int         ierr;
715   ViewerType  vtype;
716 
717   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
718   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
719       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
720     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
721   }
722   else if (vtype == BINARY_FILE_VIEWER) {
723     return MatView_MPIBAIJ_Binary(mat,viewer);
724   }
725   return 0;
726 }
727 
728 #undef __FUNC__
729 #define __FUNC__ "MatDestroy_MPIBAIJ"
730 static int MatDestroy_MPIBAIJ(PetscObject obj)
731 {
732   Mat         mat = (Mat) obj;
733   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
734   int         ierr;
735 
736 #if defined(PETSC_LOG)
737   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
738 #endif
739 
740   PetscFree(baij->rowners);
741   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
742   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
743   if (baij->colmap) PetscFree(baij->colmap);
744   if (baij->garray) PetscFree(baij->garray);
745   if (baij->lvec)   VecDestroy(baij->lvec);
746   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
747   if (baij->rowvalues) PetscFree(baij->rowvalues);
748   PetscFree(baij);
749   if (mat->mapping) {
750     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
751   }
752   PLogObjectDestroy(mat);
753   PetscHeaderDestroy(mat);
754   return 0;
755 }
756 
757 #undef __FUNC__
758 #define __FUNC__ "MatMult_MPIBAIJ"
759 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
760 {
761   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
762   int         ierr, nt;
763 
764   VecGetLocalSize_Fast(xx,nt);
765   if (nt != a->n) {
766     SETERRQ(1,0,"Incompatible partition of A and xx");
767   }
768   VecGetLocalSize_Fast(yy,nt);
769   if (nt != a->m) {
770     SETERRQ(1,0,"Incompatible partition of A and yy");
771   }
772   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
773   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
774   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
775   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
776   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
777   return 0;
778 }
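/*
   Sketch of the multiply above: the ghost values of x are gathered into
   a->lvec while the local product proceeds, overlapping communication with
   computation, so that

      y = A_d * x_local + B_o * lvec

   where A_d = a->A is the diagonal block and B_o = a->B the off-diagonal part.
*/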
779 
780 #undef __FUNC__
781 #define __FUNC__ "MatMultAdd_MPIBAIJ"
782 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
783 {
784   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
785   int        ierr;
786   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
787   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
788   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
789   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
790   return 0;
791 }
792 
793 #undef __FUNC__
794 #define __FUNC__ "MatMultTrans_MPIBAIJ"
795 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
796 {
797   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
798   int        ierr;
799 
800   /* do nondiagonal part */
801   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
802   /* send it on its way */
803   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
804   /* do local part */
805   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
806   /* receive remote parts: note this assumes the values are not actually */
807   /* inserted in yy until the next line, which is true for this implementation */
808   /* but may not always hold. */
809   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
810   return 0;
811 }
812 
813 #undef __FUNC__
814 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
815 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
816 {
817   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
818   int        ierr;
819 
820   /* do nondiagonal part */
821   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
822   /* send it on its way */
823   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
824   /* do local part */
825   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
826   /* receive remote parts: note this assumes the values are not actually */
827   /* inserted in zz until the next line, which is true for this implementation */
828   /* but may not always hold. */
829   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
830   return 0;
831 }
832 
833 /*
834   This only works correctly for square matrices where the subblock A->A is the
835    diagonal block
836 */
837 #undef __FUNC__
838 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
839 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
840 {
841   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
842   if (a->M != a->N)
843     SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
844   return MatGetDiagonal(a->A,v);
845 }
846 
847 #undef __FUNC__
848 #define __FUNC__ "MatScale_MPIBAIJ"
849 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
850 {
851   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
852   int        ierr;
853   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
854   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
855   return 0;
856 }
857 
858 #undef __FUNC__
859 #define __FUNC__ "MatGetSize_MPIBAIJ"
860 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
861 {
862   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
863   *m = mat->M; *n = mat->N;
864   return 0;
865 }
866 
867 #undef __FUNC__
868 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
869 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
870 {
871   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
872   *m = mat->m; *n = mat->n;
873   return 0;
874 }
875 
876 #undef __FUNC__
877 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
878 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
879 {
880   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
881   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
882   return 0;
883 }
884 
885 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
886 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
887 
888 #undef __FUNC__
889 #define __FUNC__ "MatGetRow_MPIBAIJ"
890 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
891 {
892   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
893   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
894   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
895   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
896   int        *cmap, *idx_p,cstart = mat->cstart;
897 
898   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
899   mat->getrowactive = PETSC_TRUE;
900 
901   if (!mat->rowvalues && (idx || v)) {
902     /*
903         allocate enough space to hold information from the longest row.
904     */
905     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
906     int     max = 1,mbs = mat->mbs,tmp;
907     for ( i=0; i<mbs; i++ ) {
908       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
909       if (max < tmp) { max = tmp; }
910     }
911     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
912     CHKPTRQ(mat->rowvalues);
913     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
914   }
915 
916 
917   if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows");
918   lrow = row - brstart;
919 
920   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
921   if (!v)   {pvA = 0; pvB = 0;}
922   if (!idx) {pcA = 0; if (!v) pcB = 0;}
923   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
924   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
925   nztot = nzA + nzB;
926 
927   cmap  = mat->garray;
928   if (v  || idx) {
929     if (nztot) {
930       /* Sort by increasing column numbers, assuming A and B already sorted */
931       int imark = -1;
932       if (v) {
933         *v = v_p = mat->rowvalues;
934         for ( i=0; i<nzB; i++ ) {
935           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
936           else break;
937         }
938         imark = i;
939         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
940         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
941       }
942       if (idx) {
943         *idx = idx_p = mat->rowindices;
944         if (imark > -1) {
945           for ( i=0; i<imark; i++ ) {
946             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
947           }
948         } else {
949           for ( i=0; i<nzB; i++ ) {
950             if (cmap[cworkB[i]/bs] < cstart)
951               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
952             else break;
953           }
954           imark = i;
955         }
956         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
957         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
958       }
959     }
960     else {
961       if (idx) *idx = 0;
962       if (v)   *v   = 0;
963     }
964   }
965   *nz = nztot;
966   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
967   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
968   return 0;
969 }
970 
971 #undef __FUNC__
972 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
973 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
974 {
975   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
976   if (baij->getrowactive == PETSC_FALSE) {
977     SETERRQ(1,0,"MatGetRow not called");
978   }
979   baij->getrowactive = PETSC_FALSE;
980   return 0;
981 }
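/*
   A minimal usage sketch (hypothetical row number; only locally owned rows
   may be requested, and MatRestoreRow() must follow each MatGetRow()):

      int    nz,*cols,row = 3;
      Scalar *vals;
      ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
      (examine nz, cols[], vals[] here)
      ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
*/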
982 
983 #undef __FUNC__
984 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
985 static int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
986 {
987   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
988   *bs = baij->bs;
989   return 0;
990 }
991 
992 #undef __FUNC__
993 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
994 static int MatZeroEntries_MPIBAIJ(Mat A)
995 {
996   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
997   int         ierr;
998   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
999   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1000   return 0;
1001 }
1002 
1003 #undef __FUNC__
1004 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1005 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1006 {
1007   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1008   Mat         A = a->A, B = a->B;
1009   int         ierr;
1010   double      isend[5], irecv[5];
1011 
1012   info->rows_global    = (double)a->M;
1013   info->columns_global = (double)a->N;
1014   info->rows_local     = (double)a->m;
1015   info->columns_local  = (double)a->N;
1016   info->block_size     = (double)a->bs;
1017   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1018   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded; isend[3] = info->memory; isend[4] = info->mallocs;
1019   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1020   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded; isend[3] += info->memory; isend[4] += info->mallocs;
1021   if (flag == MAT_LOCAL) {
1022     info->nz_used      = isend[0];
1023     info->nz_allocated = isend[1];
1024     info->nz_unneeded  = isend[2];
1025     info->memory       = isend[3];
1026     info->mallocs      = isend[4];
1027   } else if (flag == MAT_GLOBAL_MAX) {
1028     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
1029     info->nz_used      = irecv[0];
1030     info->nz_allocated = irecv[1];
1031     info->nz_unneeded  = irecv[2];
1032     info->memory       = irecv[3];
1033     info->mallocs      = irecv[4];
1034   } else if (flag == MAT_GLOBAL_SUM) {
1035     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
1036     info->nz_used      = irecv[0];
1037     info->nz_allocated = irecv[1];
1038     info->nz_unneeded  = irecv[2];
1039     info->memory       = irecv[3];
1040     info->mallocs      = irecv[4];
1041   }
1042   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1043   info->fill_ratio_needed = 0;
1044   info->factor_mallocs    = 0;
1045   return 0;
1046 }
1047 
1048 #undef __FUNC__
1049 #define __FUNC__ "MatSetOption_MPIBAIJ"
1050 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1051 {
1052   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1053 
1054   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1055       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1056       op == MAT_COLUMNS_UNSORTED ||
1057       op == MAT_COLUMNS_SORTED ||
1058       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1059         MatSetOption(a->A,op);
1060         MatSetOption(a->B,op);
1061   } else if (op == MAT_ROW_ORIENTED) {
1062         a->roworiented = 1;
1063         MatSetOption(a->A,op);
1064         MatSetOption(a->B,op);
1065   } else if (op == MAT_ROWS_SORTED ||
1066              op == MAT_ROWS_UNSORTED ||
1067              op == MAT_SYMMETRIC ||
1068              op == MAT_STRUCTURALLY_SYMMETRIC ||
1069              op == MAT_YES_NEW_DIAGONALS)
1070     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1071   else if (op == MAT_COLUMN_ORIENTED) {
1072     a->roworiented = 0;
1073     MatSetOption(a->A,op);
1074     MatSetOption(a->B,op);
1075   } else if (op == MAT_IGNORE_OFF_PROCESSOR_ENTRIES) {
1076     a->donotstash = 1;
1077   } else if (op == MAT_NO_NEW_DIAGONALS)
1078     {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
1079   else
1080     {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
1081   return 0;
1082 }
1083 
1084 #undef __FUNC__
1085 #define __FUNC__ "MatTranspose_MPIBAIJ"
1086 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1087 {
1088   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1089   Mat_SeqBAIJ *Aloc;
1090   Mat        B;
1091   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
1092   int        bs=baij->bs,mbs=baij->mbs;
1093   Scalar     *a;
1094 
1095   if (matout == PETSC_NULL && M != N)
1096     SETERRQ(1,0,"Square matrix only for in-place");
1097   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1098   CHKERRQ(ierr);
1099 
1100   /* copy over the A part */
1101   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1102   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1103   row = baij->rstart;
1104   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1105 
1106   for ( i=0; i<mbs; i++ ) {
1107     rvals[0] = bs*(baij->rstart + i);
1108     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1109     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1110       col = (baij->cstart+aj[j])*bs;
1111       for (k=0; k<bs; k++ ) {
1112         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1113         col++; a += bs;
1114       }
1115     }
1116   }
1117   /* copy over the B part */
1118   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1119   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1120   row = baij->rstart*bs;
1121   for ( i=0; i<mbs; i++ ) {
1122     rvals[0] = bs*(baij->rstart + i);
1123     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1124     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1125       col = baij->garray[aj[j]]*bs;
1126       for (k=0; k<bs; k++ ) {
1127         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1128         col++; a += bs;
1129       }
1130     }
1131   }
1132   PetscFree(rvals);
1133   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1134   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1135 
1136   if (matout != PETSC_NULL) {
1137     *matout = B;
1138   } else {
1139     /* This isn't really an in-place transpose, but we free the data structures from baij */
1140     PetscFree(baij->rowners);
1141     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1142     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1143     if (baij->colmap) PetscFree(baij->colmap);
1144     if (baij->garray) PetscFree(baij->garray);
1145     if (baij->lvec) VecDestroy(baij->lvec);
1146     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1147     PetscFree(baij);
1148     PetscMemcpy(A,B,sizeof(struct _Mat));
1149     PetscHeaderDestroy(B);
1150   }
1151   return 0;
1152 }
1153 
1154 #undef __FUNC__
1155 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1156 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1157 {
1158   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1159   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1160   int ierr,s1,s2,s3;
1161 
1162   if (ll)  {
1163     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1164     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1165     if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
1166     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1167     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1168   }
1169   if (rr) SETERRQ(1,0,"not supported for right vector");
1170   return 0;
1171 }
1172 
1173 /* the code does not do the diagonal entries correctly unless the
1174    matrix is square and the column and row ownerships are identical.
1175    This is a BUG. The only way to fix it seems to be to access
1176    baij->A and baij->B directly and not through the MatZeroRows()
1177    routine.
1178 */
1179 #undef __FUNC__
1180 #define __FUNC__ "MatZeroRows_MPIBAIJ"
1181 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
1182 {
1183   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
1184   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
1185   int            *procs,*nprocs,j,found,idx,nsends,*work;
1186   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
1187   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
1188   int            *lens,imdex,*lrows,*values,bs=l->bs;
1189   MPI_Comm       comm = A->comm;
1190   MPI_Request    *send_waits,*recv_waits;
1191   MPI_Status     recv_status,*send_status;
1192   IS             istmp;
1193 
1194   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
1195   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
1196 
1197   /*  first count number of contributors to each processor */
1198   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
1199   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
1200   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
1201   for ( i=0; i<N; i++ ) {
1202     idx = rows[i];
1203     found = 0;
1204     for ( j=0; j<size; j++ ) {
1205       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
1206         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
1207       }
1208     }
1209     if (!found) SETERRQ(1,0,"Index out of range");
1210   }
1211   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
1212 
1213   /* inform other processors of number of messages and max length*/
1214   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
1215   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
1216   nrecvs = work[rank];
1217   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
1218   nmax = work[rank];
1219   PetscFree(work);
1220 
1221   /* post receives:   */
1222   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
1223   CHKPTRQ(rvalues);
1224   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
1225   CHKPTRQ(recv_waits);
1226   for ( i=0; i<nrecvs; i++ ) {
1227     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
1228   }
1229 
1230   /* do sends:
1231       1) starts[i] gives the starting index in svalues for stuff going to
1232          the ith processor
1233   */
1234   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1235   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
1236   CHKPTRQ(send_waits);
1237   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1238   starts[0] = 0;
1239   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1240   for ( i=0; i<N; i++ ) {
1241     svalues[starts[owner[i]]++] = rows[i];
1242   }
1243   ISRestoreIndices(is,&rows);
1244 
1245   starts[0] = 0;
1246   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1247   count = 0;
1248   for ( i=0; i<size; i++ ) {
1249     if (procs[i]) {
1250       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
1251     }
1252   }
1253   PetscFree(starts);
1254 
1255   base = owners[rank]*bs;
1256 
1257   /*  wait on receives */
1258   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1259   source = lens + nrecvs;
1260   count  = nrecvs; slen = 0;
1261   while (count) {
1262     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
1263     /* unpack receives into our local space */
1264     MPI_Get_count(&recv_status,MPI_INT,&n);
1265     source[imdex]  = recv_status.MPI_SOURCE;
1266     lens[imdex]  = n;
1267     slen += n;
1268     count--;
1269   }
1270   PetscFree(recv_waits);
1271 
1272   /* move the data into the send scatter */
1273   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1274   count = 0;
1275   for ( i=0; i<nrecvs; i++ ) {
1276     values = rvalues + i*nmax;
1277     for ( j=0; j<lens[i]; j++ ) {
1278       lrows[count++] = values[j] - base;
1279     }
1280   }
1281   PetscFree(rvalues); PetscFree(lens);
1282   PetscFree(owner); PetscFree(nprocs);
1283 
1284   /* actually zap the local rows */
1285   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1286   PLogObjectParent(A,istmp);
1287   PetscFree(lrows);
1288   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1289   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1290   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1291 
1292   /* wait on sends */
1293   if (nsends) {
1294     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1295     CHKPTRQ(send_status);
1296     MPI_Waitall(nsends,send_waits,send_status);
1297     PetscFree(send_status);
1298   }
1299   PetscFree(send_waits); PetscFree(svalues);
1300 
1301   return 0;
1302 }
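/*
   A minimal usage sketch (hypothetical rows).  Any processor may list any
   global rows; the code above routes each row to its owner:

      IS     is;
      int    rows[2] = {0,5};
      Scalar one = 1.0;
      ierr = ISCreateGeneral(comm,2,rows,&is); CHKERRQ(ierr);
      ierr = MatZeroRows(mat,is,&one); CHKERRQ(ierr);   (puts 1.0 on each zeroed diagonal)
      ierr = ISDestroy(is); CHKERRQ(ierr);
*/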
1303 extern int MatPrintHelp_SeqBAIJ(Mat);
1304 #undef __FUNC__
1305 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1306 static int MatPrintHelp_MPIBAIJ(Mat A)
1307 {
1308   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1309 
1310   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1311   else return 0;
1312 }
1313 
1314 #undef __FUNC__
1315 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1316 static int MatSetUnfactored_MPIBAIJ(Mat A)
1317 {
1318   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1319   int         ierr;
1320   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1321   return 0;
1322 }
1323 
1324 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1325 
1326 /* -------------------------------------------------------------------*/
1327 static struct _MatOps MatOps = {
1328   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1329   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1330   0,0,0,0,
1331   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1332   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1333   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1334   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1335   0,0,0,MatGetSize_MPIBAIJ,
1336   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1337   0,0,MatConvertSameType_MPIBAIJ,0,0,
1338   0,0,0,MatGetSubMatrices_MPIBAIJ,
1339   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1340   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
1341   0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1342 
1343 
1344 #undef __FUNC__
1345 #define __FUNC__ "MatCreateMPIBAIJ"
1346 /*@C
1347    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1348    (block compressed row).  For good matrix assembly performance
1349    the user should preallocate the matrix storage by setting the parameters
1350    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1351    performance can be increased by more than a factor of 50.
1352 
1353    Input Parameters:
1354 .  comm - MPI communicator
1355 .  bs   - size of block
1356 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1357            This value should be the same as the local size used in creating the
1358            y vector for the matrix-vector product y = Ax.
1359 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1360            This value should be the same as the local size used in creating the
1361            x vector for the matrix-vector product y = Ax.
1362 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
1363 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
1364 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1365            submatrix  (same for all local rows)
1366 .  d_nnz - array containing the number of block nonzeros in the various block rows
1367            of the diagonal portion of the local submatrix (possibly different for each
1368            block row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1369            it is zero.
1370 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1371            submatrix (same for all local rows).
1372 .  o_nnz - array containing the number of block nonzeros in the various block rows of the
1373            off-diagonal portion of the local submatrix (possibly different for
1374            each block row) or PETSC_NULL.
1375 
1376    Output Parameter:
1377 .  A - the matrix
1378 
1379    Notes:
1380    The user MUST specify either the local or global matrix dimensions
1381    (possibly both).
1382 
1383    Storage Information:
1384    For a square global matrix we define each processor's diagonal portion
1385    to be its local rows and the corresponding columns (a square submatrix);
1386    each processor's off-diagonal portion encompasses the remainder of the
1387    local matrix (a rectangular submatrix).
1388 
1389    The user can specify preallocated storage for the diagonal part of
1390    the local submatrix with either d_nz or d_nnz (not both).  Set
1391    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1392    memory allocation.  Likewise, specify preallocated storage for the
1393    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1394 
1395    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1396    the figure below we depict these three local rows and all columns (0-11).
1397 
1398 $          0 1 2 3 4 5 6 7 8 9 10 11
1399 $         -------------------
1400 $  row 3  |  o o o d d d o o o o o o
1401 $  row 4  |  o o o d d d o o o o o o
1402 $  row 5  |  o o o d d d o o o o o o
1403 $         -------------------
1404 $
1405 
1406    Thus, any entries in the d locations are stored in the d (diagonal)
1407    submatrix, and any entries in the o locations are stored in the
1408    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1409    stored simply in the MATSEQBAIJ format for compressed row storage.
1410 
1411    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1412    and o_nz should indicate the number of nonzeros per row in the o matrix.
1413    In general, for PDE problems in which most nonzeros are near the diagonal,
1414    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1415    or you will get TERRIBLE performance; see the users' manual chapter on
1416    matrices.
1417 
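$   Example (hypothetical sizes and preallocation): a matrix with 2x2 blocks,
$   local sizes chosen by PETSc, and roughly 5 diagonal and 2 off-diagonal
$   block nonzeros preallocated per block row, could be created as
$
$      ierr = MatCreateMPIBAIJ(comm,2,PETSC_DECIDE,PETSC_DECIDE,M,N,
$                              5,PETSC_NULL,2,PETSC_NULL,&A); CHKERRQ(ierr);
$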
1418 .keywords: matrix, block, aij, compressed row, sparse, parallel
1419 
1420 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1421 @*/
int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
                    int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  Mat          B;
  Mat_MPIBAIJ  *b;
  int          ierr,i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;

  if (bs < 1) SETERRQ(1,0,"invalid block size specified");
  *A = 0;
  PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
  PLogObjectCreate(B);
  B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
  PetscMemzero(b,sizeof(Mat_MPIBAIJ));
  PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
  MPI_Comm_size(comm,&size);
  if (size == 1) {
    B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
    B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
    B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
    B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
    B->ops.lufactor          = MatLUFactor_MPIBAIJ;
    B->ops.solve             = MatSolve_MPIBAIJ;
    B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
    B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
    B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
    B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
  }

  B->destroy    = MatDestroy_MPIBAIJ;
  B->view       = MatView_MPIBAIJ;
  B->mapping    = 0;
  B->factor     = 0;
  B->assembled  = PETSC_FALSE;

  B->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&b->rank);
  MPI_Comm_size(comm,&b->size);

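  /* Resolve the matrix dimensions: the local sizes take precedence, so if
     both a local and a global size are given the global value is discarded
     and recomputed by summing the local sizes over the communicator. */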
  if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
    SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
  if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
  if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,0,"either N or n should be specified");
  if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
  if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;

  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    mbs = m/bs; nbs = n/bs;
    MPI_Allreduce(work,sum,2,MPI_INT,MPI_SUM,comm);
    if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
    if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
  }
  if (m == PETSC_DECIDE) {
    Mbs = M/bs;
    if (Mbs*bs != M) SETERRQ(1,0,"Number of global rows must be divisible by blocksize");
    mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
    m   = mbs*bs;
  }
  if (n == PETSC_DECIDE) {
    Nbs = N/bs;
    if (Nbs*bs != N) SETERRQ(1,0,"Number of global cols must be divisible by blocksize");
    nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
    n   = nbs*bs;
  }
  if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"Number of local rows, cols must be divisible by blocksize");

  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;
  b->bs  = bs;
  b->bs2 = bs*bs;
  b->mbs = mbs;
  b->nbs = nbs;
  b->Mbs = Mbs;
  b->Nbs = Nbs;

  /* build local table of row and column ownerships */
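  /* rowners[i] holds the global index of the first block row owned by
     process i (so rowners[b->size] = Mbs); cowners is laid out the same
     way for block columns.  Both share a single allocation. */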
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
  b->cowners = b->rowners + b->size + 2;
  MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];
  }
  b->rstart    = b->rowners[b->rank];
  b->rend      = b->rowners[b->rank+1];
  b->rstart_bs = b->rstart * bs;
  b->rend_bs   = b->rend * bs;

  MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart    = b->cowners[b->rank];
  b->cend      = b->cowners[b->rank+1];
  b->cstart_bs = b->cstart * bs;
  b->cend_bs   = b->cend * bs;

  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache for off-processor entries; values destined for other
     processes are stashed here until assembly */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->donotstash  = 0;
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used for matrix-vector multiply */
  b->lvec      = 0;
  b->Mvctx     = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  *A = B;
  return 0;
}

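/*
    MatConvertSameType_MPIBAIJ - produces a complete copy of the matrix
  data structures; typically reached through MatConvert() when the target
  type is the same as the source type (cf. the MATSAME calls below).
*/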
#undef __FUNC__
#define __FUNC__ "MatConvertSameType_MPIBAIJ"
static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
{
  Mat         mat;
  Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
  int         ierr, len=0, flg;

  *newmat       = 0;
  PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
  PetscMemzero(a,sizeof(Mat_MPIBAIJ));
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy    = MatDestroy_MPIBAIJ;
  mat->view       = MatView_MPIBAIJ;
  mat->factor     = matin->factor;
  mat->assembled  = PETSC_TRUE;

  a->m = mat->m   = oldmat->m;
  a->n = mat->n   = oldmat->n;
  a->M = mat->M   = oldmat->M;
  a->N = mat->N   = oldmat->N;

  a->bs  = oldmat->bs;
  a->bs2 = oldmat->bs2;
  a->mbs = oldmat->mbs;
  a->nbs = oldmat->nbs;
  a->Mbs = oldmat->Mbs;
  a->Nbs = oldmat->Nbs;

  a->rstart       = oldmat->rstart;
  a->rend         = oldmat->rend;
  a->cstart       = oldmat->cstart;
  a->cend         = oldmat->cend;
  a->rstart_bs    = oldmat->rstart_bs;
  a->rend_bs      = oldmat->rend_bs;
  a->cstart_bs    = oldmat->cstart_bs;
  a->cend_bs      = oldmat->cend_bs;
  a->size         = oldmat->size;
  a->rank         = oldmat->rank;
  a->donotstash   = oldmat->donotstash;
  a->roworiented  = oldmat->roworiented;
  mat->insertmode = NOT_SET_VALUES;
  a->rowindices   = 0;
  a->rowvalues    = 0;
  a->getrowactive = PETSC_FALSE;

  a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
  PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
  a->cowners = a->rowners + a->size + 2;
  PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
  ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
  if (oldmat->colmap) {
    a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int)); CHKPTRQ(a->colmap);
    PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
    PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
  } else a->colmap = 0;
  if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
    a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
    PLogObjectMemory(mat,len*sizeof(int));
    PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
  } else a->garray = 0;

  ierr = VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
  PLogObjectParent(mat,a->lvec);
  ierr = VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
  PLogObjectParent(mat,a->Mvctx);
  ierr = MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);
  ierr = MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
  PLogObjectParent(mat,a->B);
  ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
  if (flg) {
    ierr = MatPrintHelp(mat); CHKERRQ(ierr);
  }
  *newmat = mat;
  return 0;
}

#include "sys.h"

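/*
   MatLoad_MPIBAIJ - loads an MPIBAIJ matrix from a PETSc binary file.

   A sketch of a typical calling sequence (the file name here is purely
   illustrative):

     Viewer viewer;
     Mat    A;
     ierr = ViewerFileOpenBinary(MPI_COMM_WORLD,"matrix.dat",BINARY_RDONLY,&viewer);
     ierr = MatLoad(viewer,MATMPIBAIJ,&A);
*/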
#undef __FUNC__
#define __FUNC__ "MatLoad_MPIBAIJ"
int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*buf;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
  int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
  int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
  int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
  int          dcount,kmax,k,nzcount,tmp;

  ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg); CHKERRQ(ierr);
  bs2  = bs*bs;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not matrix object");
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];

  if (M != N) SETERRQ(1,0,"Can only do square matrices");

  /*
     This code adds extra rows to make sure the number of rows is
     divisible by the blocksize
  */
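  /* Example (hypothetical numbers): M = 10 with bs = 3 gives Mbs = 3 and
     extra_rows = 3 - 10 + 9 = 2; two padded rows are appended (each later
     assigned a 1.0 on the diagonal) and Mbs becomes 4. */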
  Mbs        = M/bs;
  extra_rows = bs - M + bs*(Mbs);
  if (extra_rows == bs) extra_rows = 0;
  else                  Mbs++;
  if (extra_rows && !rank) {
    PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
  }

  /* determine ownership of all rows */
  mbs = Mbs/size + ((Mbs % size) > rank);
  m   = mbs * bs;
  rowners  = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
  browners = rowners + size + 1;
  MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
  for ( i=0; i<=size; i++ ) browners[i] = rowners[i]*bs;
  rstart = rowners[rank];
  rend   = rowners[rank+1];
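  /* rowners[] partitions the block rows over the processes; browners[] is
     the same partition scaled to point rows, and below it doubles as the
     displacement array for MPI_Scatterv() */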

  /* distribute row lengths to all processors */
  locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
    for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
    MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]*bs; j<rowners[i+1]*bs; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices */
    nz   = procsnz[0];
    ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    if (size == 1) nz -= extra_rows;
    ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
    if (size == 1) for (i=0; i<extra_rows; i++) { mycols[nz+i] = M+i; }

    /* read in the parts for all other processes (except the last) and send them off */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    /* read in the part for the last process */
    if ( size != 1 ) {
      nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
      MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += locrowlens[i];
    }
    ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* receive message of column indices */
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,0,"received wrong number of column indices; file may be corrupt");
  }

  /* loop over local rows, determining number of off-diagonal entries */
  dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
  odlens = dlens + (rend-rstart);
  mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
  PetscMemzero(mask,3*Mbs*sizeof(int));
  masked1 = mask    + Mbs;
  masked2 = masked1 + Mbs;
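  /* For each block row: walk its bs point rows, mapping every column index
     to its block column (tmp).  mask[] flags block columns already counted
     for this block row; masked1[] collects diagonal-block columns and
     masked2[] off-diagonal ones, giving the preallocation lengths
     dlens[i]/odlens[i]. */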
  rowcount = 0; nzcount = 0;
  for ( i=0; i<mbs; i++ ) {
    dcount  = 0;
    odcount = 0;
    for ( j=0; j<bs; j++ ) {
      kmax = locrowlens[rowcount];
      for ( k=0; k<kmax; k++ ) {
        tmp = mycols[nzcount++]/bs;
        if (!mask[tmp]) {
          mask[tmp] = 1;
          if (tmp < rstart || tmp >= rend) masked2[odcount++] = tmp;
          else masked1[dcount++] = tmp;
        }
      }
      rowcount++;
    }

    dlens[i]  = dcount;
    odlens[i] = odcount;

    /* zero out the mask elements we set */
    for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
    for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
  }

  /* create our matrix */
  ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,
                          0,dlens,0,odlens,newmat); CHKERRQ(ierr);
  A = *newmat;
  ierr = MatSetOption(A,MAT_COLUMNS_SORTED); CHKERRQ(ierr);

  if (!rank) {
    buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
    /* read in my part of the matrix numerical values */
    nz = procsnz[0];
    vals = buf;
    mycols = ibuf;
    if (size == 1) nz -= extra_rows;
    ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
    if (size == 1) for (i=0; i<extra_rows; i++) { vals[nz+i] = 1.0; }

    /* insert into matrix */
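    /* locrowlens[] is in point rows, so insertion proceeds one point row
       at a time; MatSetValues() maps the entries into the block structure */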
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES); CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
    /* read in the parts for the other processes (except the last one) and ship them out */
    for ( i=1; i<size-1; i++ ) {
      nz   = procsnz[i];
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    /* the last process */
    if ( size != 1 ) {
      nz   = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
      MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);

    /* receive message of values */
    vals = buf;
    mycols = ibuf;
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,0,"received wrong number of values; file may be corrupt");

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES); CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
  }
  PetscFree(locrowlens);
  PetscFree(buf);
  PetscFree(ibuf);
  PetscFree(rowners);
  PetscFree(dlens);
  PetscFree(mask);
  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}