#ifndef lint
static char vcid[] = "$Id: mpibaij.c,v 1.50 1997/01/14 17:15:35 balay Exp balay $";
#endif

#include "src/mat/impls/baij/mpi/mpibaij.h"
#include "src/vec/vecimpl.h"

#include "draw.h"
#include "pinclude/pviewer.h"

extern int MatSetUpMultiply_MPIBAIJ(Mat);
extern int DisAssemble_MPIBAIJ(Mat);
extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);


/*
     Local utility routine that creates a mapping from the global column
   number to the local number in the off-diagonal part of the local
   storage of the matrix.  This is done in a non-scalable way, since the
   length of colmap equals the number of global block columns.
*/
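/*
   Illustrative sketch of the encoding (hypothetical numbers, not from any
   particular run): with bs = 2 and garray = {3,7} (global block columns 3
   and 7 present in B), the routine below stores

      colmap[3] = 0*bs + 1 = 1;   colmap[7] = 1*bs + 1 = 3;

   so a global point column such as 15 (block 7, offset 15%bs = 1) maps to
   local column colmap[7] - 1 + 1 = 3 in B, while colmap[k] == 0 marks a
   global block column k with no entries in the off-diagonal part.
*/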
#undef __FUNC__
#define __FUNC__ "CreateColmap_MPIBAIJ_Private"
static int CreateColmap_MPIBAIJ_Private(Mat mat)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
  int         nbs = B->nbs,i,bs=B->bs;

  baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
  PLogObjectMemory(mat,baij->Nbs*sizeof(int));
  PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
  for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatGetRowIJ_MPIBAIJ"
static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
                            PetscTruth *done)
{
  Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;
  if (aij->size == 1) {
    ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
  } else SETERRQ(1,0,"not supported in parallel");
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatRestoreRowIJ_MPIBAIJ"
static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
                                PetscTruth *done)
{
  Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
  int        ierr;
  if (aij->size == 1) {
    ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
  } else SETERRQ(1,0,"not supported in parallel");
  return 0;
}
#define CHUNKSIZE  10

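/*
   Inserts (or adds) a single point entry (row,col) into the "diagonal" block
   matrix baij->A, working directly on the Mat_SeqBAIJ arrays that the caller
   has pulled into local variables (rp, ap, ai, aj, ailen, imax, ...).  When
   the target block row is full, storage is enlarged by CHUNKSIZE block
   entries.  A rough outline of its use (names as in MatSetValues_MPIBAIJ
   below):

      row = im[i] - rstart_orig;  col = in[j] - cstart_orig;
      MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);

   which is intended to be equivalent to, but cheaper than, calling
   MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv).
*/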
#define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
{ \
 \
    brow = row/bs;  \
    rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
    rmax = imax[brow]; nrow = ailen[brow]; \
    bcol = col/bs; \
    ridx = row % bs; cidx = col % bs; \
    for ( _i=0; _i<nrow; _i++ ) { \
      if (rp[_i] > bcol) break; \
      if (rp[_i] == bcol) { \
        bap  = ap +  bs2*_i + bs*cidx + ridx; \
        if (addv == ADD_VALUES) *bap += value;  \
        else                    *bap  = value;  \
        goto _noinsert; \
      } \
    } \
    if (a->nonew) goto _noinsert; \
    if (nrow >= rmax) { \
      /* there is no extra room in row, therefore enlarge */ \
      int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
      Scalar *new_a; \
 \
      /* malloc new storage space */ \
      len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
      new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
      new_j   = (int *) (new_a + bs2*new_nz); \
      new_i   = new_j + new_nz; \
 \
      /* copy over old data into new slots */ \
      for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
      for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
      PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
      len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
      PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
                                                         len*sizeof(int)); \
      PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
      PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
      PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
                  aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar)); \
      /* free up old matrix storage */ \
      PetscFree(a->a); \
      if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
      aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j; \
      a->singlemalloc = 1; \
 \
      rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
      rmax = imax[brow] = imax[brow] + CHUNKSIZE; \
      PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
      a->maxnz += bs2*CHUNKSIZE; \
      a->reallocs++; \
    } \
    N = nrow++ - 1; a->nz++; \
    /* shift up all the later entries in this row */ \
    for ( ii=N; ii>=_i; ii-- ) { \
      rp[ii+1] = rp[ii]; \
      PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
    } \
    if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar)); \
    rp[_i]                      = bcol; \
    ap[bs2*_i + bs*cidx + ridx] = value; \
    _noinsert:; \
    ailen[brow] = nrow; \
}

extern int MatSetValues_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIBAIJ"
static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j,row,col;
  int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs;
  int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
  int         cend_orig=baij->cend_bs,bs=baij->bs;

  /* Some variables required in the macro */
  Mat         A = baij->A;
  Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
  int         *rp,ii,nrow,_i,rmax,N;
  int         *imax=a->imax,*ai=a->i,*ailen=a->ilen;
  int         *aj=a->j,brow,bcol;
  int         ridx,cidx,bs2=a->bs2;
  Scalar      *ap,*aa=a->a,*bap;

#if defined(PETSC_BOPT_g)
  if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
    SETERRQ(1,0,"Cannot mix inserts and adds");
  }
#endif
  baij->insertmode = addv;
  for ( i=0; i<m; i++ ) {
#if defined(PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(1,0,"Negative row");
    if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
#endif
    if (im[i] >= rstart_orig && im[i] < rend_orig) {
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        if (in[j] >= cstart_orig && in[j] < cend_orig){
          col = in[j] - cstart_orig;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
#if defined(PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
        else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
#endif
        else {
          if (mat->was_assembled) {
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }
            col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col = in[j];
            }
          }
          else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
        }
      }
    }
    else {
      if (roworiented && !baij->donotstash) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      }
      else {
        if (!baij->donotstash) {
          row = im[i];
          for ( j=0; j<n; j++ ) {
            ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatGetValues_MPIBAIJ"
static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
  int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
    if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
    if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
      row = idxm[i] - bsrstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
        if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
        if (idxn[j] >= bscstart && idxn[j] < bscend){
          col = idxn[j] - bscstart;
          ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        }
        else {
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
          }
          if((baij->colmap[idxn[j]/bs]-1 < 0) ||
             (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
          else {
            col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
            ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    }
    else {
      SETERRQ(1,0,"Only local values currently supported");
    }
  }
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatNorm_MPIBAIJ"
static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
  int        ierr, i,bs2=baij->bs2;
  double     sum = 0.0;
  Scalar     *v;

  if (baij->size == 1) {
    ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      v = amat->a;
      for (i=0; i<amat->nz*bs2; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      v = bmat->a;
      for (i=0; i<bmat->nz*bs2; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
      *norm = sqrt(*norm);
    }
    else
      SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
  }
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERT_VALUES or ADD_VALUES mode;
     the bitwise OR over all insertmodes detects a mixture of the two */
  MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,0,"Some processors inserted others added");
  }
  baij->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length */
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered triples
     (global row index, global column index, value); we store the indices
     as Scalars to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.

       This could be done better.
  */
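  /*
     A sketch of the buffer layout assumed below (three Scalars per stashed
     entry; svalues is allocated 3*(stash.n+1) Scalars long):

         svalues:  [ idx0 idy0 val0 | idx1 idy1 val1 | ... ]

     so the receive loop in MatAssemblyEnd_MPIBAIJ recovers the i-th entry
     of a message as (row,col,val) =
     (values[3*i], values[3*i+1], values[3*i+2]).
  */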
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
  int         bs=baij->bs,row,col,other_disassembled;
  Scalar      *values,val;
  InsertMode  addv = baij->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = baij->rvalues + 3*imdex*baij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;
    for ( i=0; i<n; i++ ) {
      row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= baij->cstart*bs && col < baij->cend*bs) {
        col -= baij->cstart*bs;
        MatSetValues(baij->A,1,&row,1,&col,&val,addv);
      }
      else {
        if (mat->was_assembled) {
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
          }
          col = baij->colmap[col/bs] - 1 + col%bs;
          if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
            ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        MatSetValues(baij->B,1,&row,1,&col,&val,addv);
      }
    }
    count--;
  }
  PetscFree(baij->recv_waits); PetscFree(baij->rvalues);

  /* wait on sends */
  if (baij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(baij->nsends,baij->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(baij->send_waits); PetscFree(baij->svalues);

  baij->insertmode = NOT_SET_VALUES;
  ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled; if so we must
     also disassemble ourselves, in order that we may reassemble.
     (was_assembled is 0 or 1, so the MPI_PROD reduction is 0 exactly
     when at least one processor has disassembled.) */
  MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
  }

  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);

  if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatView_MPIBAIJ_Binary"
static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr;

  if (baij->size == 1) {
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
  }
  else SETERRQ(1,0,"Only uniprocessor output supported");
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr, format,rank,bs = baij->bs;
  FILE         *fd;
  ViewerType   vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerGetFormat(viewer,&format);
    if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
      MatInfo info;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&info);
      PetscSequentialPhaseBegin(mat->comm,1);
      fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
              rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
              baij->bs,(int)info.memory);
      ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == VIEWER_FORMAT_ASCII_INFO) {
      PetscPrintf(mat->comm,"  block size is %d\n",bs);
      return 0;
    }
  }

  if (vtype == DRAW_VIEWER) {
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
  }

  if (vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
            baij->cstart*bs,baij->cend*bs);
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  }
  else {
    int size = baij->size;
    rank = baij->rank;
    if (size == 1) {
      ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat         A;
      Mat_SeqBAIJ *Aloc;
      int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
      int         mbs=baij->mbs;
      Scalar      *a;

      if (!rank) {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqBAIJ*) baij->A->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart;
      rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = (baij->cstart+aj[j])*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      /* copy over the B part */
      Aloc = (Mat_SeqBAIJ*) baij->B->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart*bs;
      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = baij->garray[aj[j]]*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      PetscFree(rvals);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatView_MPIBAIJ"
static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
{
  Mat         mat = (Mat) obj;
  int         ierr;
  ViewerType  vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
      vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
    ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
  }
  else if (vtype == BINARY_FILE_VIEWER) {
    return MatView_MPIBAIJ_Binary(mat,viewer);
  }
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatDestroy_MPIBAIJ"
static int MatDestroy_MPIBAIJ(PetscObject obj)
{
  Mat         mat = (Mat) obj;
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;

#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
#endif

  PetscFree(baij->rowners);
  ierr = MatDestroy(baij->A); CHKERRQ(ierr);
  ierr = MatDestroy(baij->B); CHKERRQ(ierr);
  if (baij->colmap) PetscFree(baij->colmap);
  if (baij->garray) PetscFree(baij->garray);
  if (baij->lvec)   VecDestroy(baij->lvec);
  if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
  if (baij->rowvalues) PetscFree(baij->rowvalues);
  PetscFree(baij);
  if (mat->mapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
  }
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatMult_MPIBAIJ"
static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr, nt;

  VecGetLocalSize_Fast(xx,nt);
  if (nt != a->n) {
    SETERRQ(1,0,"Incompatible partition of A and xx");
  }
  VecGetLocalSize_Fast(yy,nt);
  if (nt != a->m) {
    SETERRQ(1,0,"Incompatible partition of A and yy");
  }
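  /* scatter the needed off-processor entries of xx into a->lvec while the
     diagonal-block product proceeds, overlapping communication with local
     computation; the off-diagonal contribution is then added into yy */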
  ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
  ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatMultAdd_MPIBAIJ"
static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;
  ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatMultTrans_MPIBAIJ"
static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually
     inserted in yy until the VecScatterEnd() below, which holds for this
     implementation but may not always be true */
  ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatMultTransAdd_MPIBAIJ"
static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually
     inserted in zz until the VecScatterEnd() below, which holds for this
     implementation but may not always be true */
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  return 0;
}

/*
  This only works correctly for square matrices where the subblock A->A is
  the diagonal block.
*/
#undef __FUNC__
#define __FUNC__ "MatGetDiagonal_MPIBAIJ"
static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  if (a->M != a->N)
    SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
  return MatGetDiagonal(a->A,v);
}

#undef __FUNC__
#define __FUNC__ "MatScale_MPIBAIJ"
static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;
  ierr = MatScale(aa,a->A); CHKERRQ(ierr);
  ierr = MatScale(aa,a->B); CHKERRQ(ierr);
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatGetSize_MPIBAIJ"
static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  *m = mat->M; *n = mat->N;
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatGetLocalSize_MPIBAIJ"
static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  *m = mat->m; *n = mat->n;
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
  return 0;
}

extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);

#undef __FUNC__
#define __FUNC__ "MatGetRow_MPIBAIJ"
int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
  int        *cmap, *idx_p,cstart = mat->cstart;

  if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int     max = 1,mbs = mat->mbs,tmp;
    for ( i=0; i<mbs; i++ ) {
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }

  if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows");
  lrow = row - brstart;

  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      int imark = -1;
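      /* imark counts the columns of B that lie to the left of the diagonal
         block, so the merged row is assembled as [B-left | A | B-right] */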
      if (v) {
        *v = v_p = mat->rowvalues;
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
          }
        } else {
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]/bs] < cstart)
              idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
            else break;
          }
          imark = i;
        }
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
      }
    }
    else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatRestoreRow_MPIBAIJ"
int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  if (baij->getrowactive == PETSC_FALSE) {
    SETERRQ(1,0,"MatGetRow not called");
  }
  baij->getrowactive = PETSC_FALSE;
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatGetBlockSize_MPIBAIJ"
static int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  *bs = baij->bs;
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatZeroEntries_MPIBAIJ"
static int MatZeroEntries_MPIBAIJ(Mat A)
{
  Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
  int         ierr;
  ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
  ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatGetInfo_MPIBAIJ"
static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
  Mat         A = a->A, B = a->B;
  int         ierr;
  double      isend[5], irecv[5];

  info->rows_global    = (double)a->M;
  info->columns_global = (double)a->N;
  info->rows_local     = (double)a->m;
  info->columns_local  = (double)a->n;
  info->block_size     = (double)a->bs;
  ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
  isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
  isend[3] = info->memory;  isend[4] = info->mallocs;
  ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
  isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
  isend[3] += info->memory;  isend[4] += info->mallocs;
  if (flag == MAT_LOCAL) {
    info->nz_used      = isend[0];
    info->nz_allocated = isend[1];
    info->nz_unneeded  = isend[2];
    info->memory       = isend[3];
    info->mallocs      = isend[4];
  } else if (flag == MAT_GLOBAL_MAX) {
    MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
    info->nz_used      = irecv[0];
    info->nz_allocated = irecv[1];
    info->nz_unneeded  = irecv[2];
    info->memory       = irecv[3];
    info->mallocs      = irecv[4];
  } else if (flag == MAT_GLOBAL_SUM) {
    MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
    info->nz_used      = irecv[0];
    info->nz_allocated = irecv[1];
    info->nz_unneeded  = irecv[2];
    info->memory       = irecv[3];
    info->mallocs      = irecv[4];
  }
  info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
  info->fill_ratio_needed = 0;
  info->factor_mallocs    = 0;
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatSetOption_MPIBAIJ"
static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;

  if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
      op == MAT_YES_NEW_NONZERO_LOCATIONS ||
      op == MAT_COLUMNS_UNSORTED ||
      op == MAT_COLUMNS_SORTED) {
        MatSetOption(a->A,op);
        MatSetOption(a->B,op);
  } else if (op == MAT_ROW_ORIENTED) {
        a->roworiented = 1;
        MatSetOption(a->A,op);
        MatSetOption(a->B,op);
  } else if (op == MAT_ROWS_SORTED ||
             op == MAT_ROWS_UNSORTED ||
             op == MAT_SYMMETRIC ||
             op == MAT_STRUCTURALLY_SYMMETRIC ||
             op == MAT_YES_NEW_DIAGONALS)
    PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
  else if (op == MAT_COLUMN_ORIENTED) {
    a->roworiented = 0;
    MatSetOption(a->A,op);
    MatSetOption(a->B,op);
  } else if (op == MAT_IGNORE_OFF_PROCESSOR_ENTRIES) {
    a->donotstash = 1;
  } else if (op == MAT_NO_NEW_DIAGONALS)
    {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
  else
    {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatTranspose_MPIBAIJ"
static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
  Mat_SeqBAIJ *Aloc;
  Mat        B;
  int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
  int        bs=baij->bs,mbs=baij->mbs;
  Scalar     *a;

  if (matout == PETSC_NULL && M != N)
    SETERRQ(1,0,"Square matrix only for in-place");
  ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
  CHKERRQ(ierr);

  /* copy over the A part */
  Aloc = (Mat_SeqBAIJ*) baij->A->data;
  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
  row = baij->rstart;
  rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

  for ( i=0; i<mbs; i++ ) {
    rvals[0] = bs*(baij->rstart + i);
    for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
    for ( j=ai[i]; j<ai[i+1]; j++ ) {
      col = (baij->cstart+aj[j])*bs;
      for (k=0; k<bs; k++ ) {
        ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
        col++; a += bs;
      }
    }
  }
  /* copy over the B part */
  Aloc = (Mat_SeqBAIJ*) baij->B->data;
  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
  row = baij->rstart*bs;
  for ( i=0; i<mbs; i++ ) {
    rvals[0] = bs*(baij->rstart + i);
    for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
    for ( j=ai[i]; j<ai[i+1]; j++ ) {
      col = baij->garray[aj[j]]*bs;
      for (k=0; k<bs; k++ ) {
        ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
        col++; a += bs;
      }
    }
  }
  PetscFree(rvals);
  ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);

  if (matout != PETSC_NULL) {
    *matout = B;
  } else {
    /* This isn't really an in-place transpose .... but free data structures from baij */
    PetscFree(baij->rowners);
    ierr = MatDestroy(baij->A); CHKERRQ(ierr);
    ierr = MatDestroy(baij->B); CHKERRQ(ierr);
    if (baij->colmap) PetscFree(baij->colmap);
    if (baij->garray) PetscFree(baij->garray);
    if (baij->lvec) VecDestroy(baij->lvec);
    if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
    PetscFree(baij);
    PetscMemcpy(A,B,sizeof(struct _Mat));
    PetscHeaderDestroy(B);
  }
  return 0;
}

#undef __FUNC__
#define __FUNC__ "MatDiagonalScale_MPIBAIJ"
int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
{
  Mat a = ((Mat_MPIBAIJ *) A->data)->A;
  Mat b = ((Mat_MPIBAIJ *) A->data)->B;
  int ierr,s1,s2,s3;

  if (ll)  {
    ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
    ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
    if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
    ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
    ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
  }
  if (rr) SETERRQ(1,0,"not supported for right vector");
  return 0;
}

/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   baij->A and baij->B directly and not through the MatZeroRows()
   routine.
*/
#undef __FUNC__
#define __FUNC__ "MatZeroRows_MPIBAIJ"
static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values,bs=l->bs;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note */
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,0,"Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length */
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /* see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  base = owners[rank]*bs;

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]    = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}
extern int MatPrintHelp_SeqBAIJ(Mat);
#undef __FUNC__
#define __FUNC__ "MatPrintHelp_MPIBAIJ"
static int MatPrintHelp_MPIBAIJ(Mat A)
{
  Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;

  if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
  else return 0;
}

#undef __FUNC__
#define __FUNC__ "MatSetUnfactored_MPIBAIJ"
static int MatSetUnfactored_MPIBAIJ(Mat A)
{
  Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
  int         ierr;
  ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
  return 0;
}

static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);

/* -------------------------------------------------------------------*/
static struct _MatOps MatOps = {
  MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
  MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
  0,0,0,0,
  0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
  0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
  MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
  MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
  0,0,0,MatGetSize_MPIBAIJ,
  MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
  0,0,0,MatConvertSameType_MPIBAIJ,0,0,
  0,0,0,MatGetSubMatrices_MPIBAIJ,
  MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
  MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
  0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ};

#undef __FUNC__
#define __FUNC__ "MatCreateMPIBAIJ"
/*@C
   MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
   (block compressed row).  For good matrix assembly performance
   the user should preallocate the matrix storage by setting the parameters
   d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
   performance can be increased by more than a factor of 50.

   Input Parameters:
.  comm - MPI communicator
.  bs   - size of block
.  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
           This value should be the same as the local size used in creating the
           y vector for the matrix-vector product y = Ax.
.  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
           This value should be the same as the local size used in creating the
           x vector for the matrix-vector product y = Ax.
.  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
.  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
.  d_nz  - number of block nonzeros per block row in the diagonal portion of the
           local submatrix (same for all local rows)
.  d_nnz - array containing the number of block nonzeros in the various block rows
           in the diagonal portion of the local submatrix (possibly different for
           each block row) or PETSC_NULL.  You must leave room for the diagonal
           entry even if it is zero.
.  o_nz  - number of block nonzeros per block row in the off-diagonal portion of
           the local submatrix (same for all local rows).
.  o_nnz - array containing the number of nonzeros in the various block rows of the
           off-diagonal portion of the local submatrix (possibly different for
           each block row) or PETSC_NULL.

   Output Parameter:
.  A - the matrix

   Notes:
   The user MUST specify either the local or global matrix dimensions
   (possibly both).

   Storage Information:
   For a square global matrix we define each processor's diagonal portion
   to be its local rows and the corresponding columns (a square submatrix);
   each processor's off-diagonal portion encompasses the remainder of the
   local matrix (a rectangular submatrix).

   The user can specify preallocated storage for the diagonal part of
   the local submatrix with either d_nz or d_nnz (not both).  Set
   d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
   memory allocation.  Likewise, specify preallocated storage for the
   off-diagonal part of the local submatrix with o_nz or o_nnz (not both).

   Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
   the figure below we depict these three local rows and all columns (0-11).

$          0 1 2 3 4 5 6 7 8 9 10 11
$         -------------------
$  row 3  |  o o o d d d o o o o o o
$  row 4  |  o o o d d d o o o o o o
$  row 5  |  o o o d d d o o o o o o
$         -------------------
$

   Thus, any entries in the d locations are stored in the d (diagonal)
   submatrix, and any entries in the o locations are stored in the
   o (off-diagonal) submatrix.  Note that the d and the o submatrices are
   stored simply in the MATSEQBAIJ format for compressed row storage.

   Now d_nz should indicate the number of nonzeros per row in the d matrix,
   and o_nz should indicate the number of nonzeros per row in the o matrix.
   In general, for PDE problems in which most nonzeros are near the diagonal,
   one expects d_nz >> o_nz.  For large problems you MUST preallocate memory
   or you will get TERRIBLE performance; see the users' manual chapter on
   matrices.

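   Example:
   In the figure above (with bs=1) each local row has 3 entries in the d
   locations and 9 in the o locations, so d_nz=3 and o_nz=9 would preallocate
   exactly.  A minimal calling sketch follows; the dimensions, block size,
   and preallocation figures are illustrative made-up numbers, and error
   checking of ierr is omitted for brevity.  Each process contributes 100
   rows of a global (100*size) x (100*size) matrix with 2x2 blocks, guessing
   5 diagonal and 2 off-diagonal block nonzeros per block row:

$     Mat A;
$     int ierr;
$     ierr = MatCreateMPIBAIJ(MPI_COMM_WORLD,2,100,100,PETSC_DECIDE,
$                             PETSC_DECIDE,5,PETSC_NULL,2,PETSC_NULL,&A);
$     /* ... MatSetValues(), MatAssemblyBegin/End(), use A ... */
$     ierr = MatDestroy(A);
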
.keywords: matrix, block, aij, compressed row, sparse, parallel

.seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
@*/
1323 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1324                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1325 {
1326   Mat          B;
1327   Mat_MPIBAIJ  *b;
1328   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1329 
1330   if (bs < 1) SETERRQ(1,0,"invalid block size specified");
1331   *A = 0;
1332   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1333   PLogObjectCreate(B);
1334   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1335   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1336   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1337   MPI_Comm_size(comm,&size);
1338   if (size == 1) {
1339     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1340     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1341     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1342     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1343     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1344     B->ops.solve             = MatSolve_MPIBAIJ;
1345     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1346     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1347     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1348     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1349   }
1350 
1351   B->destroy    = MatDestroy_MPIBAIJ;
1352   B->view       = MatView_MPIBAIJ;
1353   B->mapping    = 0;
1354   B->factor     = 0;
1355   B->assembled  = PETSC_FALSE;
1356 
1357   b->insertmode = NOT_SET_VALUES;
1358   MPI_Comm_rank(comm,&b->rank);
1359   MPI_Comm_size(comm,&b->size);
1360 
1361   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1362     SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1363   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
1364   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,0,"either N or n should be specified");
1365   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1366   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1367 
1368   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1369     work[0] = m; work[1] = n;
1370     mbs = m/bs; nbs = n/bs;
1371     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1372     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1373     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1374   }
1375   if (m == PETSC_DECIDE) {
1376     Mbs = M/bs;
1377     if (Mbs*bs != M) SETERRQ(1,0,"No of global rows must be divisible by blocksize");
1378     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1379     m   = mbs*bs;
1380   }
1381   if (n == PETSC_DECIDE) {
1382     Nbs = N/bs;
1383     if (Nbs*bs != N) SETERRQ(1,0,"No of global cols must be divisible by blocksize");
1384     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1385     n   = nbs*bs;
1386   }
1387   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"No of local rows, cols must be divisible by blocksize");
1388 
1389   b->m = m; B->m = m;
1390   b->n = n; B->n = n;
1391   b->N = N; B->N = N;
1392   b->M = M; B->M = M;
1393   b->bs  = bs;
1394   b->bs2 = bs*bs;
1395   b->mbs = mbs;
1396   b->nbs = nbs;
1397   b->Mbs = Mbs;
1398   b->Nbs = Nbs;
1399 
1400   /* build local table of row and column ownerships */
1401   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1402   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1403   b->cowners = b->rowners + b->size + 2;
1404   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1405   b->rowners[0] = 0;
1406   for ( i=2; i<=b->size; i++ ) {
1407     b->rowners[i] += b->rowners[i-1];
1408   }
1409   b->rstart    = b->rowners[b->rank];
1410   b->rend      = b->rowners[b->rank+1];
1411   b->rstart_bs = b->rstart * bs;
1412   b->rend_bs   = b->rend * bs;
1413 
1414   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1415   b->cowners[0] = 0;
1416   for ( i=2; i<=b->size; i++ ) {
1417     b->cowners[i] += b->cowners[i-1];
1418   }
1419   b->cstart    = b->cowners[b->rank];
1420   b->cend      = b->cowners[b->rank+1];
1421   b->cstart_bs = b->cstart * bs;
1422   b->cend_bs   = b->cend * bs;
1423 
1424   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1425   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1426   PLogObjectParent(B,b->A);
1427   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1428   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1429   PLogObjectParent(B,b->B);

  /* build cache for off-processor entries generated during assembly */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->donotstash  = 0;
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used for matrix-vector multiply */
  b->lvec      = 0;
  b->Mvctx     = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  *A = B;
  return 0;
}
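
/*
    Example of the calling sequence (a sketch only; the sizes and
    preallocation values are illustrative, following the style of the
    MatCreateMPIBAIJ() call in MatLoad_MPIBAIJ() below):

       Mat A;
       int ierr;
       ierr = MatCreateMPIBAIJ(MPI_COMM_WORLD,2,PETSC_DECIDE,PETSC_DECIDE,
                               100,100,5,PETSC_NULL,2,PETSC_NULL,&A); CHKERRQ(ierr);

    Here bs = 2 (so the global sizes 100 must be divisible by bs), the local
    sizes are left as PETSC_DECIDE, and d_nz = 5, o_nz = 2 preallocate five
    diagonal-part and two off-diagonal-part block nonzeros per block row.
*/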

#undef __FUNC__
#define __FUNC__ "MatConvertSameType_MPIBAIJ"
static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
{
  Mat         mat;
  Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
  int         ierr,len=0,flg;

  *newmat       = 0;
  PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy    = MatDestroy_MPIBAIJ;
  mat->view       = MatView_MPIBAIJ;
  mat->factor     = matin->factor;
  mat->assembled  = PETSC_TRUE;

  a->m = mat->m   = oldmat->m;
  a->n = mat->n   = oldmat->n;
  a->M = mat->M   = oldmat->M;
  a->N = mat->N   = oldmat->N;

  a->bs  = oldmat->bs;
  a->bs2 = oldmat->bs2;
  a->mbs = oldmat->mbs;
  a->nbs = oldmat->nbs;
  a->Mbs = oldmat->Mbs;
  a->Nbs = oldmat->Nbs;

  a->rstart       = oldmat->rstart;
  a->rend         = oldmat->rend;
  a->cstart       = oldmat->cstart;
  a->cend         = oldmat->cend;
  a->size         = oldmat->size;
  a->rank         = oldmat->rank;
  a->insertmode   = NOT_SET_VALUES;
  a->rowvalues    = 0;
  a->getrowactive = PETSC_FALSE;

  a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
  PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
  a->cowners = a->rowners + a->size + 2;
  PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
  ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
  if (oldmat->colmap) {
    a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int)); CHKPTRQ(a->colmap);
    PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
    PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
  } else a->colmap = 0;
  if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
    a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
    PLogObjectMemory(mat,len*sizeof(int));
    PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
  } else a->garray = 0;

  ierr = VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
  PLogObjectParent(mat,a->lvec);
  ierr = VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
  PLogObjectParent(mat,a->Mvctx);
  ierr = MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);
  ierr = MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
  PLogObjectParent(mat,a->B);
  ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
  if (flg) {
    ierr = MatPrintHelp(mat); CHKERRQ(ierr);
  }
  *newmat = mat;
  return 0;
}
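
/*
    Note: this routine is reached through MatConvert() with newtype MATSAME,
    in the same way the recursive MatConvert(...,MATSAME,...) calls above
    duplicate the sequential A and B blocks of the old matrix.
*/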

#include "sys.h"

#undef __FUNC__
#define __FUNC__ "MatLoad_MPIBAIJ"
int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i,nz,ierr,j,rstart,rend,fd;
  Scalar       *vals,*buf;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
  int          *locrowlens,*sndcounts = 0,*procsnz = 0,jj,*mycols,*ibuf;
  int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
  int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
  int          dcount,kmax,k,nzcount,tmp;

  ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg); CHKERRQ(ierr);
  bs2  = bs*bs;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not a matrix object");
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];

  if (M != N) SETERRQ(1,0,"Can only load square matrices");

  /*
     This code adds extra rows to make sure the number of rows is
     divisible by the blocksize
  */
  Mbs        = M/bs;
  extra_rows = bs - M + bs*(Mbs);
  if (extra_rows == bs) extra_rows = 0;
  else                  Mbs++;
  if (extra_rows && !rank) {
    PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
  }
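  /*
     For example, M = 10 with bs = 4 gives Mbs = 3 and extra_rows = 2: the
     matrix is padded to 12 rows, and each padding row later receives a
     single entry of 1.0 on the diagonal (columns M+i, values 1.0 below).
  */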

  /* determine ownership of all rows */
  mbs = Mbs/size + ((Mbs % size) > rank);
  m   = mbs * bs;
  rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
  browners = rowners + size + 1;
  MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
  for ( i=0; i<=size; i++ ) browners[i] = rowners[i]*bs;
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
    for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
    MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
    PetscFree(sndcounts);
  } else {
    MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
  }
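  /* only the root's send arguments are significant to MPI_Scatterv, so the
     remaining processes pass zeros for the send buffer and counts */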

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]*bs; j<rowners[i+1]*bs; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices */
    nz = procsnz[0];
    ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i<extra_rows; i++) { mycols[nz+i] = M+i; }

    /* read in the column indices for all other processes (except the last) and send them off */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    /* read in the column indices for the last process */
    if ( size != 1 ) {
      nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
      MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
    }
    PetscFree(cols);
  } else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += locrowlens[i];
    }
    ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* receive message of column indices */
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,0,"Message length does not match the expected number of nonzeros");
  }

  /* loop over local rows, determining number of off-diagonal entries */
  dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
  odlens = dlens + (rend-rstart);
  mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
  PetscMemzero(mask,3*Mbs*sizeof(int));
  masked1 = mask    + Mbs;
  masked2 = masked1 + Mbs;
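  /* mask[] flags the block columns already seen in the current block row;
     masked1[]/masked2[] record which flags were set (diagonal resp.
     off-diagonal part) so that clearing them afterwards costs only the
     number of entries set, not O(Mbs) */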
  rowcount = 0; nzcount = 0;
  for ( i=0; i<mbs; i++ ) {
    dcount  = 0;
    odcount = 0;
    for ( j=0; j<bs; j++ ) {
      kmax = locrowlens[rowcount];
      for ( k=0; k<kmax; k++ ) {
        tmp = mycols[nzcount++]/bs;
        if (!mask[tmp]) {
          mask[tmp] = 1;
          if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
          else masked1[dcount++] = tmp;
        }
      }
      rowcount++;
    }

    dlens[i]  = dcount;
    odlens[i] = odcount;

    /* zero out the mask elements we set */
    for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
    for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
  }

  /* create our matrix */
  ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat); CHKERRQ(ierr);
  A = *newmat;
  MatSetOption(A,MAT_COLUMNS_SORTED);
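  /* the binary file stores each row's column indices in ascending order,
     which is what MAT_COLUMNS_SORTED asserts; this lets MatSetValues()
     take its faster sorted-column insertion path */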

  if (!rank) {
    buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
    /* read in my part of the matrix numerical values */
    nz = procsnz[0];
    vals = buf;
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i<extra_rows; i++) { vals[nz+i] = 1.0; }

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES); CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
    /* read in the values for all other processes (except the last) and send them off */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    /* read in the values for the last process */
    if ( size != 1 ) {
      nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
      MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
    }
    PetscFree(procsnz);
  } else {
    /* receive numeric values */
    buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);

    /* receive message of values */
    vals = buf;
    mycols = ibuf;
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,0,"Message length does not match the expected number of nonzeros");

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES); CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
  }
  PetscFree(locrowlens);
  PetscFree(buf);
  PetscFree(ibuf);
  PetscFree(rowners);
  PetscFree(dlens);
  PetscFree(mask);
  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}
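
/*
    Example usage (a sketch; assumes a matrix previously written with
    MatView() into a binary viewer, and a hypothetical file name):

       Viewer viewer;
       Mat    A;
       int    ierr;
       ierr = ViewerFileOpenBinary(MPI_COMM_WORLD,"matrix.dat",BINARY_RDONLY,&viewer); CHKERRQ(ierr);
       ierr = MatLoad(viewer,MATMPIBAIJ,&A); CHKERRQ(ierr);
       ierr = ViewerDestroy(viewer); CHKERRQ(ierr);

    The block size defaults to 1 and can be selected at load time with the
    option -matload_block_size <bs>.
*/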