xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision eada665151041bf9d7614ddbfca52443d2b77989)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.47 1997/01/13 17:27:40 balay Exp balay $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 
25 /*
26      Local utility routine that creates a mapping from the global block
27    column number to the local column number in the off-diagonal part of
28    the local storage of the matrix.  This is done in a non-scalable way,
29    since the length of colmap equals the number of global block columns.
30 */
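/*
   Example: with bs = 2 and garray = {3,7} (global block columns 3 and 7
   stored locally as blocks 0 and 1), the loop below sets colmap[3] = 1 and
   colmap[7] = 3.  The +1 shift lets a value of 0 mean "block column not
   present locally"; callers recover the local scalar column as
   colmap[g] - 1 plus the within-block offset.
*/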
31 #undef __FUNC__
32 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
33 static int CreateColmap_MPIBAIJ_Private(Mat mat)
34 {
35   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
36   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
37   int         nbs = B->nbs,i,bs=B->bs;
38 
39   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
40   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
41   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
42   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
43   return 0;
44 }
45 
46 #undef __FUNC__
47 #define __FUNC__ "MatGetRowIJ_MPIBAIJ"
48 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
49                             PetscTruth *done)
50 {
51   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
52   int         ierr;
53   if (aij->size == 1) {
54     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
55   } else SETERRQ(1,0,"not supported in parallel");
56   return 0;
57 }
58 
59 #undef __FUNC__
60 #define __FUNC__ "MatRestoreRowIJ_MPIBAIJ"
61 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
62                                 PetscTruth *done)
63 {
64   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
65   int        ierr;
66   if (aij->size == 1) {
67     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
68   } else SETERRQ(1,0,"not supported in parallel");
69   return 0;
70 }
71 #define CHUNKSIZE  10
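/* number of extra block entries allocated whenever a row runs out of space */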
72 
73 #define  MatSetValues_SeqBAIJ_A_Private( row, col, value) \
74 { \
75  \
76       brow = row/bs;  \
77       rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
78       rmax = imax[brow]; nrow = ailen[brow]; \
79       bcol = col/bs; \
80       ridx = row % bs; cidx = col % bs; \
81       for ( _i=0; _i<nrow; _i++ ) { \
82         if (rp[_i] > bcol) break; \
83         if (rp[_i] == bcol) { \
84           bap  = ap +  bs2*_i + bs*cidx + ridx; \
85           if (addv == ADD_VALUES) *bap += value;  \
86           else                    *bap  = value;  \
87           goto _noinsert; \
88         } \
89       } \
90       if (nrow >= rmax) { \
91         /* there is no extra room in row, therefore enlarge */ \
92         int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
93         Scalar *new_a; \
94  \
95         /* malloc new storage space */ \
96         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
97         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
98         new_j   = (int *) (new_a + bs2*new_nz); \
99         new_i   = new_j + new_nz; \
100  \
101         /* copy over old data into new slots */ \
102         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
103         for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
104         PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
105         len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
106         PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
107                                                            len*sizeof(int)); \
108         PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
109         PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
110         PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
111                     aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
112         /* free up old matrix storage */ \
113         PetscFree(a->a);  \
114         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
115         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
116         a->singlemalloc = 1; \
117  \
118         rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
119         rmax = imax[brow] = imax[brow] + CHUNKSIZE; \
120         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
121         a->maxnz += bs2*CHUNKSIZE; \
122         a->reallocs++; \
124       } \
125       N = nrow++ - 1;  \
126       /* shift up all the later entries in this row */ \
127       for ( ii=N; ii>=_i; ii-- ) { \
128         rp[ii+1] = rp[ii]; \
129         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
130       } \
131       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
132       rp[_i]                      = bcol;  \
133       ap[bs2*_i + bs*cidx + ridx] = value; a->nz++;  \
134       _noinsert:; \
135     ailen[brow] = nrow; \
136 }
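/*
   Addressing used by the macro above: blocks are stored consecutively and
   entries are column-major within each bs x bs block.  For example, with
   bs = 2 the scalar entry at local (row,col) = (3,5) lies in block row 1,
   block column 2 (ridx = 1, cidx = 1), so once rp[_i] == 2 it is found at
   ap[bs2*_i + bs*cidx + ridx] = ap[4*_i + 3].
*/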
137 
138 extern int MatSetValues_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
139 #undef __FUNC__
140 #define __FUNC__ "MatSetValues_MPIBAIJ"
141 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
142 {
143   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
144   Scalar      value;
145   int         ierr,i,j,row,col;
146   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs;
147   int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
148   int         cend_orig=baij->cend_bs,bs=baij->bs;
149 
150   /* Some variables required by the MatSetValues_SeqBAIJ_A_Private() macro */
151   Mat A = baij->A;
152   Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
153   int         *rp,ii,nrow,_i,rmax,N;
154   int         *imax=a->imax,*ai=a->i,*ailen=a->ilen;
155   int         *aj=a->j,brow,bcol;
156   int          ridx,cidx,bs2=a->bs2;
157   Scalar      *ap,*aa=a->a,*bap;
158 
159 #if defined(PETSC_BOPT_g)
160   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
161     SETERRQ(1,0,"Cannot mix inserts and adds");
162   }
163 #endif
164   baij->insertmode = addv;
165   for ( i=0; i<m; i++ ) {
166 #if defined(PETSC_BOPT_g)
167     if (im[i] < 0) SETERRQ(1,0,"Negative row");
168     if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
169 #endif
170     if (im[i] >= rstart_orig && im[i] < rend_orig) {
171       row = im[i] - rstart_orig;
172       for ( j=0; j<n; j++ ) {
173         if (in[j] >= cstart_orig && in[j] < cend_orig){
174           col = in[j] - cstart_orig;
175           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
176           MatSetValues_SeqBAIJ_A_Private(row,col,value);
177           /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
178         }
179 #if defined(PETSC_BOPT_g)
180         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
181         else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
182 #endif
183         else {
184           if (mat->was_assembled) {
185             if (!baij->colmap) {
186               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
187             }
188             col = baij->colmap[in[j]/bs] - 1;
189             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
190               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
191               col = in[j];
192             } else col += in[j]%bs;
193           }
194           else col = in[j];
195           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
196           ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
197         }
198       }
199     }
200     else {
201       if (roworiented && !baij->donotstash) {
202         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
203       }
204       else {
205         if (!baij->donotstash) {
206           row = im[i];
207           for ( j=0; j<n; j++ ) {
208             ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
209           }
210         }
211       }
212     }
213   }
214   return 0;
215 }
216 
217 #undef __FUNC__
218 #define __FUNC__ "MatGetValues_MPIBAIJ"
219 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
220 {
221   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
222   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
223   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
224 
225   for ( i=0; i<m; i++ ) {
226     if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
227     if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
228     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
229       row = idxm[i] - bsrstart;
230       for ( j=0; j<n; j++ ) {
231         if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
232         if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
233         if (idxn[j] >= bscstart && idxn[j] < bscend){
234           col = idxn[j] - bscstart;
235           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
236         }
237         else {
238           if (!baij->colmap) {
239             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
240           }
241           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
242              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
243           else {
244             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
245             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
246           }
247         }
248       }
249     }
250     else {
251       SETERRQ(1,0,"Only local values currently supported");
252     }
253   }
254   return 0;
255 }
256 
257 #undef __FUNC__
258 #define __FUNC__ "MatNorm_MPIBAIJ"
259 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
260 {
261   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
262   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
263   int        ierr, i,bs2=baij->bs2;
264   double     sum = 0.0;
265   Scalar     *v;
266 
267   if (baij->size == 1) {
268     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
269   } else {
270     if (type == NORM_FROBENIUS) {
271       v = amat->a;
272       for (i=0; i<amat->nz*bs2; i++ ) {
273 #if defined(PETSC_COMPLEX)
274         sum += real(conj(*v)*(*v)); v++;
275 #else
276         sum += (*v)*(*v); v++;
277 #endif
278       }
279       v = bmat->a;
280       for (i=0; i<bmat->nz*bs2; i++ ) {
281 #if defined(PETSC_COMPLEX)
282         sum += real(conj(*v)*(*v)); v++;
283 #else
284         sum += (*v)*(*v); v++;
285 #endif
286       }
287       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
288       *norm = sqrt(*norm);
289     }
290     else
291       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
292   }
293   return 0;
294 }
295 
296 #undef __FUNC__
297 #define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
298 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
299 {
300   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
301   MPI_Comm    comm = mat->comm;
302   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
303   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
304   MPI_Request *send_waits,*recv_waits;
305   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
306   InsertMode  addv;
307   Scalar      *rvalues,*svalues;
308 
309   /* make sure all processors are either in INSERT_VALUES or ADD_VALUES mode */
310   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
311   if (addv == (ADD_VALUES|INSERT_VALUES)) {
312     SETERRQ(1,0,"Some processors inserted, others added");
313   }
314   baij->insertmode = addv; /* in case this processor had no cache */
315 
316   /*  first count number of contributors to each processor */
317   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
318   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
319   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
320   for ( i=0; i<baij->stash.n; i++ ) {
321     idx = baij->stash.idx[i];
322     for ( j=0; j<size; j++ ) {
323       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
324         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
325       }
326     }
327   }
328   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
329 
330   /* inform other processors of number of messages and max length*/
331   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
332   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
333   nreceives = work[rank];
334   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
335   nmax = work[rank];
336   PetscFree(work);
337 
338   /* post receives:
339        1) each message will consist of ordered triples
340      (global row index, global column index, value); we store the
341      indices as Scalars to simplify the message passing.
342        2) since we don't know how long each individual message is, we
343      allocate the largest needed buffer for each receive. Potentially
344      this is a lot of wasted space.
345 
346 
347        This could be done better.
348   */
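  /* a message carrying k such entries thus occupies 3k Scalars, laid out
     as [row_0, col_0, val_0, row_1, col_1, val_1, ...] */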
349   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
350   CHKPTRQ(rvalues);
351   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
352   CHKPTRQ(recv_waits);
353   for ( i=0; i<nreceives; i++ ) {
354     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
355               comm,recv_waits+i);
356   }
357 
358   /* do sends:
359       1) starts[i] gives the starting index in svalues for stuff going to
360          the ith processor
361   */
362   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
363   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
364   CHKPTRQ(send_waits);
365   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
366   starts[0] = 0;
367   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
368   for ( i=0; i<baij->stash.n; i++ ) {
369     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
370     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
371     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
372   }
373   PetscFree(owner);
374   starts[0] = 0;
375   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
376   count = 0;
377   for ( i=0; i<size; i++ ) {
378     if (procs[i]) {
379       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
380                 comm,send_waits+count++);
381     }
382   }
383   PetscFree(starts); PetscFree(nprocs);
384 
385   /* Free cache space */
386   PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
387   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
388 
389   baij->svalues    = svalues;    baij->rvalues    = rvalues;
390   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
391   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
392   baij->rmax       = nmax;
393 
394   return 0;
395 }
396 
397 
398 #undef __FUNC__
399 #define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
400 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
401 {
402   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
403   MPI_Status  *send_status,recv_status;
404   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
405   int         bs=baij->bs,row,col,other_disassembled;
406   Scalar      *values,val;
407   InsertMode  addv = baij->insertmode;
408 
409   /*  wait on receives */
410   while (count) {
411     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
412     /* unpack receives into our local space */
413     values = baij->rvalues + 3*imdex*baij->rmax;
414     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
415     n = n/3;
416     for ( i=0; i<n; i++ ) {
417       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
418       col = (int) PetscReal(values[3*i+1]);
419       val = values[3*i+2];
420       if (col >= baij->cstart*bs && col < baij->cend*bs) {
421         col -= baij->cstart*bs;
422         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
423       }
424       else {
425         if (mat->was_assembled) {
426           if (!baij->colmap) {
427             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
428           }
429           col = baij->colmap[col/bs] - 1;
430           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
431             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
432             col = (int) PetscReal(values[3*i+1]);
433           } else col += ((int) PetscReal(values[3*i+1]))%bs;
434         }
435         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
436       }
437     }
438     count--;
439   }
440   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
441 
442   /* wait on sends */
443   if (baij->nsends) {
444     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
445     CHKPTRQ(send_status);
446     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
447     PetscFree(send_status);
448   }
449   PetscFree(baij->send_waits); PetscFree(baij->svalues);
450 
451   baij->insertmode = NOT_SET_VALUES;
452   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
453   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
454 
455   /* determine if any processor has disassembled, if so we must
456      also disassemble ourselves, in order that we may reassemble. */
457   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
458   if (mat->was_assembled && !other_disassembled) {
459     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
460   }
461 
462   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
463     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
464   }
465   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
466   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
467 
468   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
469   return 0;
470 }
471 
472 #undef __FUNC__
473 #define __FUNC__ "MatView_MPIBAIJ_Binary"
474 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
475 {
476   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
477   int          ierr;
478 
479   if (baij->size == 1) {
480     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
481   }
482   else SETERRQ(1,0,"Only uniprocessor output supported");
483   return 0;
484 }
485 
486 #undef __FUNC__
487 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
488 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
489 {
490   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
491   int          ierr, format,rank,bs = baij->bs;
492   FILE         *fd;
493   ViewerType   vtype;
494 
495   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
496   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
497     ierr = ViewerGetFormat(viewer,&format);
498     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
499       MatInfo info;
500       MPI_Comm_rank(mat->comm,&rank);
501       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
502       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
503       PetscSequentialPhaseBegin(mat->comm,1);
504       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
505               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
506               baij->bs,(int)info.memory);
507       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
508       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
509       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
510       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
511       fflush(fd);
512       PetscSequentialPhaseEnd(mat->comm,1);
513       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
514       return 0;
515     }
516     else if (format == VIEWER_FORMAT_ASCII_INFO) {
517       PetscPrintf(mat->comm,"  block size is %d\n",bs);
518       return 0;
519     }
520   }
521 
522   if (vtype == DRAW_VIEWER) {
523     Draw       draw;
524     PetscTruth isnull;
525     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
526     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
527   }
528 
529   if (vtype == ASCII_FILE_VIEWER) {
530     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
531     PetscSequentialPhaseBegin(mat->comm,1);
532     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
533            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
534             baij->cstart*bs,baij->cend*bs);
535     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
536     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
537     fflush(fd);
538     PetscSequentialPhaseEnd(mat->comm,1);
539   }
540   else {
541     int size = baij->size;
542     rank = baij->rank;
543     if (size == 1) {
544       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
545     }
546     else {
547       /* assemble the entire matrix onto first processor. */
548       Mat         A;
549       Mat_SeqBAIJ *Aloc;
550       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
551       int         mbs=baij->mbs;
552       Scalar      *a;
553 
554       if (!rank) {
555         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
556         CHKERRQ(ierr);
557       }
558       else {
559         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
560         CHKERRQ(ierr);
561       }
562       PLogObjectParent(mat,A);
563 
564       /* copy over the A part */
565       Aloc = (Mat_SeqBAIJ*) baij->A->data;
566       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
567       row = baij->rstart;
568       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
569 
570       for ( i=0; i<mbs; i++ ) {
571         rvals[0] = bs*(baij->rstart + i);
572         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
573         for ( j=ai[i]; j<ai[i+1]; j++ ) {
574           col = (baij->cstart+aj[j])*bs;
575           for (k=0; k<bs; k++ ) {
576             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
577             col++; a += bs;
578           }
579         }
580       }
581       /* copy over the B part */
582       Aloc = (Mat_SeqBAIJ*) baij->B->data;
583       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
584       row = baij->rstart*bs;
585       for ( i=0; i<mbs; i++ ) {
586         rvals[0] = bs*(baij->rstart + i);
587         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
588         for ( j=ai[i]; j<ai[i+1]; j++ ) {
589           col = baij->garray[aj[j]]*bs;
590           for (k=0; k<bs; k++ ) {
591             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
592             col++; a += bs;
593           }
594         }
595       }
596       PetscFree(rvals);
597       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
598       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
599       if (!rank) {
600         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
601       }
602       ierr = MatDestroy(A); CHKERRQ(ierr);
603     }
604   }
605   return 0;
606 }
607 
608 
609 
610 #undef __FUNC__
611 #define __FUNC__ "MatView_MPIBAIJ"
612 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
613 {
614   Mat         mat = (Mat) obj;
615   int         ierr;
616   ViewerType  vtype;
617 
618   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
619   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
620       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
621     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
622   }
623   else if (vtype == BINARY_FILE_VIEWER) {
624     return MatView_MPIBAIJ_Binary(mat,viewer);
625   }
626   return 0;
627 }
628 
629 #undef __FUNC__
630 #define __FUNC__ "MatDestroy_MPIBAIJ"
631 static int MatDestroy_MPIBAIJ(PetscObject obj)
632 {
633   Mat         mat = (Mat) obj;
634   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
635   int         ierr;
636 
637 #if defined(PETSC_LOG)
638   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
639 #endif
640 
641   PetscFree(baij->rowners);
642   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
643   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
644   if (baij->colmap) PetscFree(baij->colmap);
645   if (baij->garray) PetscFree(baij->garray);
646   if (baij->lvec)   VecDestroy(baij->lvec);
647   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
648   if (baij->rowvalues) PetscFree(baij->rowvalues);
649   PetscFree(baij);
650   if (mat->mapping) {
651     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
652   }
653   PLogObjectDestroy(mat);
654   PetscHeaderDestroy(mat);
655   return 0;
656 }
657 
658 #undef __FUNC__
659 #define __FUNC__ "MatMult_MPIBAIJ"
660 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
661 {
662   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
663   int         ierr, nt;
664 
665   VecGetLocalSize_Fast(xx,nt);
666   if (nt != a->n) {
667     SETERRQ(1,0,"Incompatible partition of A and xx");
668   }
669   VecGetLocalSize_Fast(yy,nt);
670   if (nt != a->m) {
671     SETERRQ(1,0,"Incompatible partition of A and yy");
672   }
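  /* overlap communication with computation: start scattering the
     off-processor entries of xx, multiply by the local diagonal block,
     then complete the scatter and accumulate the off-diagonal product */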
673   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
674   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
675   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
676   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
677   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
678   return 0;
679 }
680 
681 #undef __FUNC__
682 #define __FUNC__ "MatMultAdd_MPIBAIJ"
683 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
684 {
685   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
686   int        ierr;
687   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
688   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
689   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
690   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
691   return 0;
692 }
693 
694 #undef __FUNC__
695 #define __FUNC__ "MatMultTrans_MPIBAIJ"
696 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
697 {
698   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
699   int        ierr;
700 
701   /* do nondiagonal part */
702   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
703   /* send it on its way */
704   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
705   /* do local part */
706   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
707   /* receive remote parts: note this assumes the values are not actually */
708   /* inserted in yy until the next line, which is true for this implementation */
709   /* but may not always be true. */
710   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
711   return 0;
712 }
713 
714 #undef __FUNC__
715 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
716 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
717 {
718   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
719   int        ierr;
720 
721   /* do nondiagonal part */
722   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
723   /* send it on its way */
724   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
725   /* do local part */
726   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
727   /* receive remote parts: note this assumes the values are not actually */
728   /* inserted in zz until the next line, which is true for this implementation */
729   /* but may not always be true. */
730   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
731   return 0;
732 }
733 
734 /*
735   This only works correctly for square matrices where the subblock a->A is the
736   diagonal block.
737 */
738 #undef __FUNC__
739 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
740 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
741 {
742   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
743   if (a->M != a->N)
744     SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
745   return MatGetDiagonal(a->A,v);
746 }
747 
748 #undef __FUNC__
749 #define __FUNC__ "MatScale_MPIBAIJ"
750 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
751 {
752   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
753   int        ierr;
754   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
755   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
756   return 0;
757 }
758 
759 #undef __FUNC__
760 #define __FUNC__ "MatGetSize_MPIBAIJ"
761 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
762 {
763   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
764   *m = mat->M; *n = mat->N;
765   return 0;
766 }
767 
768 #undef __FUNC__
769 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
770 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
771 {
772   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
773   *m = mat->m; *n = mat->n;
774   return 0;
775 }
776 
777 #undef __FUNC__
778 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
779 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
780 {
781   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
782   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
783   return 0;
784 }
785 
786 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
787 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
788 
789 #undef __FUNC__
790 #define __FUNC__ "MatGetRow_MPIBAIJ"
791 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
792 {
793   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
794   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
795   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
796   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
797   int        *cmap, *idx_p,cstart = mat->cstart;
798 
799   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
800   mat->getrowactive = PETSC_TRUE;
801 
802   if (!mat->rowvalues && (idx || v)) {
803     /*
804         allocate enough space to hold information from the longest row.
805     */
806     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
807     int     max = 1,mbs = mat->mbs,tmp;
808     for ( i=0; i<mbs; i++ ) {
809       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
810       if (max < tmp) { max = tmp; }
811     }
812     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
813     CHKPTRQ(mat->rowvalues);
814     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
815   }
816 
817 
818   if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows");
819   lrow = row - brstart;
820 
821   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
822   if (!v)   {pvA = 0; pvB = 0;}
823   if (!idx) {pcA = 0; if (!v) pcB = 0;}
824   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
825   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
826   nztot = nzA + nzB;
827 
828   cmap  = mat->garray;
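  /* cmap translates local block columns of B back to global block columns */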
829   if (v  || idx) {
830     if (nztot) {
831       /* Sort by increasing column numbers, assuming A and B already sorted */
832       int imark = -1;
833       if (v) {
834         *v = v_p = mat->rowvalues;
835         for ( i=0; i<nzB; i++ ) {
836           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
837           else break;
838         }
839         imark = i;
840         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
841         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
842       }
843       if (idx) {
844         *idx = idx_p = mat->rowindices;
845         if (imark > -1) {
846           for ( i=0; i<imark; i++ ) {
847             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
848           }
849         } else {
850           for ( i=0; i<nzB; i++ ) {
851             if (cmap[cworkB[i]/bs] < cstart)
852               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
853             else break;
854           }
855           imark = i;
856         }
857         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
858         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
859       }
860     }
861     else {
862       if (idx) *idx = 0;
863       if (v)   *v   = 0;
864     }
865   }
866   *nz = nztot;
867   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
868   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
869   return 0;
870 }
871 
872 #undef __FUNC__
873 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
874 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
875 {
876   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
877   if (baij->getrowactive == PETSC_FALSE) {
878     SETERRQ(1,0,"MatGetRow not called");
879   }
880   baij->getrowactive = PETSC_FALSE;
881   return 0;
882 }
883 
884 #undef __FUNC__
885 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
886 static int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
887 {
888   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
889   *bs = baij->bs;
890   return 0;
891 }
892 
893 #undef __FUNC__
894 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
895 static int MatZeroEntries_MPIBAIJ(Mat A)
896 {
897   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
898   int         ierr;
899   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
900   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
901   return 0;
902 }
903 
904 #undef __FUNC__
905 #define __FUNC__ "MatGetInfo_MPIBAIJ"
906 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
907 {
908   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
909   Mat         A = a->A, B = a->B;
910   int         ierr;
911   double      isend[5], irecv[5];
912 
913   info->rows_global    = (double)a->M;
914   info->columns_global = (double)a->N;
915   info->rows_local     = (double)a->m;
916   info->columns_local  = (double)a->n;
917   info->block_size     = (double)a->bs;
918   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
919   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
      isend[3] = info->memory; isend[4] = info->mallocs;
920   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
921   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
      isend[3] += info->memory; isend[4] += info->mallocs;
922   if (flag == MAT_LOCAL) {
923     info->nz_used      = isend[0];
924     info->nz_allocated = isend[1];
925     info->nz_unneeded  = isend[2];
926     info->memory       = isend[3];
927     info->mallocs      = isend[4];
928   } else if (flag == MAT_GLOBAL_MAX) {
929     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
930     info->nz_used      = irecv[0];
931     info->nz_allocated = irecv[1];
932     info->nz_unneeded  = irecv[2];
933     info->memory       = irecv[3];
934     info->mallocs      = irecv[4];
935   } else if (flag == MAT_GLOBAL_SUM) {
936     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
937     info->nz_used      = irecv[0];
938     info->nz_allocated = irecv[1];
939     info->nz_unneeded  = irecv[2];
940     info->memory       = irecv[3];
941     info->mallocs      = irecv[4];
942   }
943   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
944   info->fill_ratio_needed = 0;
945   info->factor_mallocs    = 0;
946   return 0;
947 }
948 
949 #undef __FUNC__
950 #define __FUNC__ "MatSetOption_MPIBAIJ"
951 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
952 {
953   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
954 
955   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
956       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
957       op == MAT_COLUMNS_UNSORTED ||
958       op == MAT_COLUMNS_SORTED) {
959     MatSetOption(a->A,op);
960     MatSetOption(a->B,op);
961   } else if (op == MAT_ROW_ORIENTED) {
962     a->roworiented = 1;
963     MatSetOption(a->A,op);
964     MatSetOption(a->B,op);
965   } else if (op == MAT_ROWS_SORTED ||
966              op == MAT_ROWS_UNSORTED ||
967              op == MAT_SYMMETRIC ||
968              op == MAT_STRUCTURALLY_SYMMETRIC ||
969              op == MAT_YES_NEW_DIAGONALS)
970     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
971   else if (op == MAT_COLUMN_ORIENTED) {
972     a->roworiented = 0;
973     MatSetOption(a->A,op);
974     MatSetOption(a->B,op);
975   } else if (op == MAT_IGNORE_OFF_PROCESSOR_ENTRIES) {
976     a->donotstash = 1;
977   } else if (op == MAT_NO_NEW_DIAGONALS)
978     {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
979   else
980     {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
981   return 0;
982 }
983 
984 #undef __FUNC__
985 #define __FUNC__ "MatTranspose_MPIBAIJ"
986 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
987 {
988   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
989   Mat_SeqBAIJ *Aloc;
990   Mat        B;
991   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
992   int        bs=baij->bs,mbs=baij->mbs;
993   Scalar     *a;
994 
995   if (matout == PETSC_NULL && M != N)
996     SETERRQ(1,0,"Square matrix only for in-place");
997   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
998   CHKERRQ(ierr);
999 
1000   /* copy over the A part */
1001   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1002   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1003   row = baij->rstart;
1004   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1005 
1006   for ( i=0; i<mbs; i++ ) {
1007     rvals[0] = bs*(baij->rstart + i);
1008     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1009     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1010       col = (baij->cstart+aj[j])*bs;
1011       for (k=0; k<bs; k++ ) {
1012         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1013         col++; a += bs;
1014       }
1015     }
1016   }
1017   /* copy over the B part */
1018   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1019   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1020   row = baij->rstart*bs;
1021   for ( i=0; i<mbs; i++ ) {
1022     rvals[0] = bs*(baij->rstart + i);
1023     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1024     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1025       col = baij->garray[aj[j]]*bs;
1026       for (k=0; k<bs; k++ ) {
1027         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1028         col++; a += bs;
1029       }
1030     }
1031   }
1032   PetscFree(rvals);
1033   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1034   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1035 
1036   if (matout != PETSC_NULL) {
1037     *matout = B;
1038   } else {
1039     /* This isn't really an in-place transpose, but we free the data structures from baij */
1040     PetscFree(baij->rowners);
1041     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1042     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1043     if (baij->colmap) PetscFree(baij->colmap);
1044     if (baij->garray) PetscFree(baij->garray);
1045     if (baij->lvec) VecDestroy(baij->lvec);
1046     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1047     PetscFree(baij);
1048     PetscMemcpy(A,B,sizeof(struct _Mat));
1049     PetscHeaderDestroy(B);
1050   }
1051   return 0;
1052 }
1053 
1054 #undef __FUNC__
1055 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1056 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1057 {
1058   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1059   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1060   int ierr,s1,s2,s3;
1061 
1062   if (ll)  {
1063     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1064     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1065     if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
1066     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1067     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1068   }
1069   if (rr) SETERRQ(1,0,"not supported for right vector");
1070   return 0;
1071 }
1072 
1073 /* the code does not do the diagonal entries correctly unless the
1074    matrix is square and the column and row ownerships are identical.
1075    This is a BUG. The only way to fix it seems to be to access
1076    baij->A and baij->B directly and not through the MatZeroRows()
1077    routine.
1078 */
1079 #undef __FUNC__
1080 #define __FUNC__ "MatZeroRows_MPIBAIJ"
1081 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
1082 {
1083   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
1084   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
1085   int            *procs,*nprocs,j,found,idx,nsends,*work;
1086   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
1087   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
1088   int            *lens,imdex,*lrows,*values,bs=l->bs;
1089   MPI_Comm       comm = A->comm;
1090   MPI_Request    *send_waits,*recv_waits;
1091   MPI_Status     recv_status,*send_status;
1092   IS             istmp;
1093 
1094   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
1095   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
1096 
1097   /*  first count number of contributors to each processor */
1098   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
1099   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
1100   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
1101   for ( i=0; i<N; i++ ) {
1102     idx = rows[i];
1103     found = 0;
1104     for ( j=0; j<size; j++ ) {
1105       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
1106         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
1107       }
1108     }
1109     if (!found) SETERRQ(1,0,"Index out of range");
1110   }
1111   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
1112 
1113   /* inform other processors of number of messages and max length*/
1114   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
1115   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
1116   nrecvs = work[rank];
1117   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
1118   nmax = work[rank];
1119   PetscFree(work);
1120 
1121   /* post receives:   */
1122   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
1123   CHKPTRQ(rvalues);
1124   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
1125   CHKPTRQ(recv_waits);
1126   for ( i=0; i<nrecvs; i++ ) {
1127     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
1128   }
1129 
1130   /* do sends:
1131       1) starts[i] gives the starting index in svalues for stuff going to
1132          the ith processor
1133   */
1134   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1135   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
1136   CHKPTRQ(send_waits);
1137   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1138   starts[0] = 0;
1139   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1140   for ( i=0; i<N; i++ ) {
1141     svalues[starts[owner[i]]++] = rows[i];
1142   }
1143   ISRestoreIndices(is,&rows);
1144 
1145   starts[0] = 0;
1146   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1147   count = 0;
1148   for ( i=0; i<size; i++ ) {
1149     if (procs[i]) {
1150       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
1151     }
1152   }
1153   PetscFree(starts);
1154 
1155   base = owners[rank]*bs;
1156 
1157   /*  wait on receives */
1158   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1159   source = lens + nrecvs;
1160   count  = nrecvs; slen = 0;
1161   while (count) {
1162     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
1163     /* unpack receives into our local space */
1164     MPI_Get_count(&recv_status,MPI_INT,&n);
1165     source[imdex]  = recv_status.MPI_SOURCE;
1166     lens[imdex]  = n;
1167     slen += n;
1168     count--;
1169   }
1170   PetscFree(recv_waits);
1171 
1172   /* move the data into the send scatter */
1173   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1174   count = 0;
1175   for ( i=0; i<nrecvs; i++ ) {
1176     values = rvalues + i*nmax;
1177     for ( j=0; j<lens[i]; j++ ) {
1178       lrows[count++] = values[j] - base;
1179     }
1180   }
1181   PetscFree(rvalues); PetscFree(lens);
1182   PetscFree(owner); PetscFree(nprocs);
1183 
1184   /* actually zap the local rows */
1185   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1186   PLogObjectParent(A,istmp);
1187   PetscFree(lrows);
1188   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1189   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1190   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1191 
1192   /* wait on sends */
1193   if (nsends) {
1194     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1195     CHKPTRQ(send_status);
1196     MPI_Waitall(nsends,send_waits,send_status);
1197     PetscFree(send_status);
1198   }
1199   PetscFree(send_waits); PetscFree(svalues);
1200 
1201   return 0;
1202 }
1203 extern int MatPrintHelp_SeqBAIJ(Mat);
1204 #undef __FUNC__
1205 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1206 static int MatPrintHelp_MPIBAIJ(Mat A)
1207 {
1208   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1209 
1210   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1211   else return 0;
1212 }
1213 
1214 #undef __FUNC__
1215 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1216 static int MatSetUnfactored_MPIBAIJ(Mat A)
1217 {
1218   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1219   int         ierr;
1220   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1221   return 0;
1222 }
1223 
1224 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1225 
1226 /* -------------------------------------------------------------------*/
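/* the entries in this table are positional; a 0 marks an operation that is
   not implemented for the MPIBAIJ format */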
1227 static struct _MatOps MatOps = {
1228   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1229   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1230   0,0,0,0,
1231   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1232   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1233   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1234   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1235   0,0,0,MatGetSize_MPIBAIJ,
1236   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1237   0,0,0,MatConvertSameType_MPIBAIJ,0,0,
1238   0,0,0,MatGetSubMatrices_MPIBAIJ,
1239   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1240   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
1241   0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ};
1242 
1243 
1244 #undef __FUNC__
1245 #define __FUNC__ "MatCreateMPIBAIJ"
1246 /*@C
1247    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1248    (block compressed row).  For good matrix assembly performance
1249    the user should preallocate the matrix storage by setting the parameters
1250    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1251    performance can be increased by more than a factor of 50.
1252 
1253    Input Parameters:
1254 .  comm - MPI communicator
1255 .  bs   - size of block
1256 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1257            This value should be the same as the local size used in creating the
1258            y vector for the matrix-vector product y = Ax.
1259 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1260            This value should be the same as the local size used in creating the
1261            x vector for the matrix-vector product y = Ax.
1262 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
1263 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
1264 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1265            submatrix  (same for all local rows)
1266 .  d_nnz - array containing the number of block nonzeros in the various block rows
1267            of the diagonal portion of the local submatrix (possibly different for each
1268            block row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1269            it is zero.
1270 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1271            submatrix (same for all local rows).
1272 .  o_nnz - array containing the number of block nonzeros in the various block rows of the
1273            off-diagonal portion of the local submatrix (possibly different for
1274            each block row) or PETSC_NULL.
1275 
1276    Output Parameter:
1277 .  A - the matrix
1278 
1279    Notes:
1280    The user MUST specify either the local or global matrix dimensions
1281    (possibly both).
1282 
1283    Storage Information:
1284    For a square global matrix we define each processor's diagonal portion
1285    to be its local rows and the corresponding columns (a square submatrix);
1286    each processor's off-diagonal portion encompasses the remainder of the
1287    local matrix (a rectangular submatrix).
1288 
1289    The user can specify preallocated storage for the diagonal part of
1290    the local submatrix with either d_nz or d_nnz (not both).  Set
1291    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1292    memory allocation.  Likewise, specify preallocated storage for the
1293    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1294 
1295    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1296    the figure below we depict these three local rows and all columns (0-11).
1297 
1298 $          0 1 2 3 4 5 6 7 8 9 10 11
1299 $         -------------------
1300 $  row 3  |  o o o d d d o o o o o o
1301 $  row 4  |  o o o d d d o o o o o o
1302 $  row 5  |  o o o d d d o o o o o o
1303 $         -------------------
1304 $
1305 
1306    Thus, any entries in the d locations are stored in the d (diagonal)
1307    submatrix, and any entries in the o locations are stored in the
1308    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1309    stored simply in the MATSEQBAIJ format for compressed row storage.
1310 
1311    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1312    and o_nz should indicate the number of nonzeros per row in the o matrix.
1313    In general, for PDE problems in which most nonzeros are near the diagonal,
1314    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1315    or you will get TERRIBLE performance; see the users' manual chapter on
1316    matrices.
1317 
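   Example:
   A minimal sketch of typical usage (error checking omitted; the sizes,
   block size, and preallocation figures below are placeholders):

$      Mat    A;
$      int    row = 0, col = 0;
$      Scalar value = 1.0;
$
$      MatCreateMPIBAIJ(MPI_COMM_WORLD,2,PETSC_DECIDE,PETSC_DECIDE,100,100,
$                       5,PETSC_NULL,2,PETSC_NULL,&A);
$      MatSetValues(A,1,&row,1,&col,&value,INSERT_VALUES);
$      MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);
$      MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);
$      /* ... use the matrix ... */
$      MatDestroy(A);
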
1318 .keywords: matrix, block, aij, compressed row, sparse, parallel
1319 
1320 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1321 @*/
1322 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1323                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1324 {
1325   Mat          B;
1326   Mat_MPIBAIJ  *b;
1327   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1328 
1329   if (bs < 1) SETERRQ(1,0,"invalid block size specified");
1330   *A = 0;
1331   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1332   PLogObjectCreate(B);
1333   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1334   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1335   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1336   MPI_Comm_size(comm,&size);
1337   if (size == 1) {
1338     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1339     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1340     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1341     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1342     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1343     B->ops.solve             = MatSolve_MPIBAIJ;
1344     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1345     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1346     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1347     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1348   }
1349 
1350   B->destroy    = MatDestroy_MPIBAIJ;
1351   B->view       = MatView_MPIBAIJ;
1352   B->mapping    = 0;
1353   B->factor     = 0;
1354   B->assembled  = PETSC_FALSE;
1355 
1356   b->insertmode = NOT_SET_VALUES;
1357   MPI_Comm_rank(comm,&b->rank);
1358   MPI_Comm_size(comm,&b->size);
1359 
1360   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1361     SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1362   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
1363   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,0,"either N or n should be specified");
1364   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1365   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1366 
1367   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1368     work[0] = m; work[1] = n;
1369     mbs = m/bs; nbs = n/bs;
1370     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1371     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1372     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1373   }
1374   if (m == PETSC_DECIDE) {
1375     Mbs = M/bs;
1376     if (Mbs*bs != M) SETERRQ(1,0,"No of global rows must be divisible by blocksize");
1377     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1378     m   = mbs*bs;
1379   }
1380   if (n == PETSC_DECIDE) {
1381     Nbs = N/bs;
1382     if (Nbs*bs != N) SETERRQ(1,0,"No of global cols must be divisible by blocksize");
1383     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1384     n   = nbs*bs;
1385   }
1386   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"No of local rows, cols must be divisible by blocksize");
1387 
1388   b->m = m; B->m = m;
1389   b->n = n; B->n = n;
1390   b->N = N; B->N = N;
1391   b->M = M; B->M = M;
1392   b->bs  = bs;
1393   b->bs2 = bs*bs;
1394   b->mbs = mbs;
1395   b->nbs = nbs;
1396   b->Mbs = Mbs;
1397   b->Nbs = Nbs;
1398 
1399   /* build local table of row and column ownerships */
1400   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1401   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1402   b->cowners = b->rowners + b->size + 2;
1403   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1404   b->rowners[0] = 0;
1405   for ( i=2; i<=b->size; i++ ) {
1406     b->rowners[i] += b->rowners[i-1];
1407   }
1408   b->rstart    = b->rowners[b->rank];
1409   b->rend      = b->rowners[b->rank+1];
1410   b->rstart_bs = b->rstart * bs;
1411   b->rend_bs   = b->rend * bs;
1412 
1413   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1414   b->cowners[0] = 0;
1415   for ( i=2; i<=b->size; i++ ) {
1416     b->cowners[i] += b->cowners[i-1];
1417   }
1418   b->cstart    = b->cowners[b->rank];
1419   b->cend      = b->cowners[b->rank+1];
1420   b->cstart_bs = b->cstart * bs;
1421   b->cend_bs   = b->cend * bs;
1422 
1423   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1424   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1425   PLogObjectParent(B,b->A);
1426   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1427   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1428   PLogObjectParent(B,b->B);
1429 
1430   /* build cache for off array entries formed */
1431   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1432   b->donotstash  = 0;
1433   b->colmap      = 0;
1434   b->garray      = 0;
1435   b->roworiented = 1;
1436 
1437   /* stuff used for matrix vector multiply */
1438   b->lvec      = 0;
1439   b->Mvctx     = 0;
1440 
1441   /* stuff for MatGetRow() */
1442   b->rowindices   = 0;
1443   b->rowvalues    = 0;
1444   b->getrowactive = PETSC_FALSE;
1445 
1446   *A = B;
1447   return 0;
1448 }
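
     /*
        Minimal usage sketch (hypothetical sizes, error checking elided):

          Mat A;
          ierr = MatCreateMPIBAIJ(MPI_COMM_WORLD,2,PETSC_DECIDE,PETSC_DECIDE,
                                  100,100,5,PETSC_NULL,2,PETSC_NULL,&A); CHKERRQ(ierr);

        With both local sizes left as PETSC_DECIDE the block rows and columns
        are split across the communicator as computed above.
     */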
1449 
1450 #undef __FUNC__
1451 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
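     /*
        Note: the cpvalues argument is currently ignored; the
        MatConvert(...,MATSAME,...) calls below appear to copy the numerical
        values unconditionally.
     */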
1452 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1453 {
1454   Mat        mat;
1455   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1456   int        ierr, len=0, flg;
1457 
1458   *newmat       = 0;
1459   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1460   PLogObjectCreate(mat);
1461   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1462   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1463   mat->destroy    = MatDestroy_MPIBAIJ;
1464   mat->view       = MatView_MPIBAIJ;
1465   mat->factor     = matin->factor;
1466   mat->assembled  = PETSC_TRUE;
1467 
1468   a->m = mat->m   = oldmat->m;
1469   a->n = mat->n   = oldmat->n;
1470   a->M = mat->M   = oldmat->M;
1471   a->N = mat->N   = oldmat->N;
1472 
1473   a->bs  = oldmat->bs;
1474   a->bs2 = oldmat->bs2;
1475   a->mbs = oldmat->mbs;
1476   a->nbs = oldmat->nbs;
1477   a->Mbs = oldmat->Mbs;
1478   a->Nbs = oldmat->Nbs;
1479 
1480   a->rstart       = oldmat->rstart;
1481   a->rend         = oldmat->rend;
1482   a->cstart       = oldmat->cstart;
1483   a->cend         = oldmat->cend;
       a->rstart_bs    = oldmat->rstart_bs;
       a->rend_bs      = oldmat->rend_bs;
       a->cstart_bs    = oldmat->cstart_bs;
       a->cend_bs      = oldmat->cend_bs;
1484   a->size         = oldmat->size;
1485   a->rank         = oldmat->rank;
1486   a->insertmode   = NOT_SET_VALUES;
1487   a->rowvalues    = 0;
1488   a->getrowactive = PETSC_FALSE;
1489 
1490   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1491   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1492   a->cowners = a->rowners + a->size + 2;
1493   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1494   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1495   if (oldmat->colmap) {
1496     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1497     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1498     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1499   } else a->colmap = 0;
1500   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1501     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1502     PLogObjectMemory(mat,len*sizeof(int));
1503     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1504   } else a->garray = 0;
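       /*
          garray maps the compressed local block columns of B back to global
          block column indices; its length is the number of block columns
          actually present in the off-diagonal part, hence B->nbs above.
       */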
1505 
1506   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1507   PLogObjectParent(mat,a->lvec);
1508   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1509   PLogObjectParent(mat,a->Mvctx);
1510   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1511   PLogObjectParent(mat,a->A);
1512   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1513   PLogObjectParent(mat,a->B);
1514   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1515   if (flg) {
1516     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1517   }
1518   *newmat = mat;
1519   return 0;
1520 }
1521 
1522 #include "sys.h"
1523 
1524 #undef __FUNC__
1525 #define __FUNC__ "MatLoad_MPIBAIJ"
1526 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1527 {
1528   Mat          A;
1529   int          i, nz, ierr, j,rstart, rend, fd;
1530   Scalar       *vals,*buf;
1531   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1532   MPI_Status   status;
1533   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1534   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1535   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1536   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1537   int          dcount,kmax,k,nzcount,tmp;
1538 
1539 
1540   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1541   bs2  = bs*bs;
1542 
1543   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1544   if (!rank) {
1545     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1546     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1547     if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not a matrix object");
1548   }
1549 
1550   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1551   M = header[1]; N = header[2];
1552 
1553   if (M != N) SETERRQ(1,0,"Can only do square matrices");
1554 
1555   /*
1556      This code adds extra rows to make sure the number of rows is
1557      divisible by the blocksize
1558   */
1559   Mbs        = M/bs;
1560   extra_rows = bs - M + bs*(Mbs);
1561   if (extra_rows == bs) extra_rows = 0;
1562   else                  Mbs++;
1563   if (extra_rows && !rank) {
1564     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1565   }
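       /*
          Illustrative example: M=10, bs=4 gives Mbs=2 and
          extra_rows = 4 - 10 + 4*2 = 2, so Mbs becomes 3; the padding rows
          are later given a single unit entry on the diagonal (see the
          size==1 and last-proc branches below).
       */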
1566 
1567   /* determine ownership of all rows */
1568   mbs = Mbs/size + ((Mbs % size) > rank);
1569   m   = mbs * bs;
1570   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1571   browners = rowners + size + 1;
1572   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1573   rowners[0] = 0;
1574   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1575   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1576   rstart = rowners[rank];
1577   rend   = rowners[rank+1];
1578 
1579   /* distribute row lengths to all processors */
1580   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1581   if (!rank) {
1582     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1583     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1584     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1585     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1586     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1587     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1588     PetscFree(sndcounts);
1589   }
1590   else {
1591     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1592   }
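       /*
          Each process now holds the point-row lengths of its own rows: the
          root scatters bs*mbs entries to each rank, reusing browners (the
          point-row offsets) as the MPI_Scatterv displacement array.
       */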
1593 
1594   if (!rank) {
1595     /* calculate the number of nonzeros on each processor */
1596     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1597     PetscMemzero(procsnz,size*sizeof(int));
1598     for ( i=0; i<size; i++ ) {
1599       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1600         procsnz[i] += rowlengths[j];
1601       }
1602     }
1603     PetscFree(rowlengths);
1604 
1605     /* determine max buffer needed and allocate it */
1606     maxnz = 0;
1607     for ( i=0; i<size; i++ ) {
1608       maxnz = PetscMax(maxnz,procsnz[i]);
1609     }
1610     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1611 
1612     /* read in my part of the matrix column indices  */
1613     nz = procsnz[0];
1614     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1615     mycols = ibuf;
1616     if (size == 1)  nz -= extra_rows;
1617     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1618     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1619 
1620     /* read in everyone else's column indices (except the last proc's) and ship off */
1621     for ( i=1; i<size-1; i++ ) {
1622       nz = procsnz[i];
1623       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1624       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1625     }
1626     /* read in the column indices for the last proc */
1627     if ( size != 1 ) {
1628       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1629       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1630       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1631       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1632     }
1633     PetscFree(cols);
1634   }
1635   else {
1636     /* determine buffer space needed for message */
1637     nz = 0;
1638     for ( i=0; i<m; i++ ) {
1639       nz += locrowlens[i];
1640     }
1641     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1642     mycols = ibuf;
1643     /* receive message of column indices */
1644     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1645     MPI_Get_count(&status,MPI_INT,&maxnz);
1646     if (maxnz != nz) SETERRQ(1,0,"something is wrong with the file");
1647   }
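       /*
          Only the root reads the file: each processor's slice is streamed
          through a single buffer of at most maxnz entries, so the complete
          matrix is never stored on one process.
       */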
1648 
1649   /* loop over local block rows, counting diagonal and off-diagonal block columns */
1650   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1651   odlens = dlens + (rend-rstart);
1652   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1653   PetscMemzero(mask,3*Mbs*sizeof(int));
1654   masked1 = mask    + Mbs;
1655   masked2 = masked1 + Mbs;
1656   rowcount = 0; nzcount = 0;
1657   for ( i=0; i<mbs; i++ ) {
1658     dcount  = 0;
1659     odcount = 0;
1660     for ( j=0; j<bs; j++ ) {
1661       kmax = locrowlens[rowcount];
1662       for ( k=0; k<kmax; k++ ) {
1663         tmp = mycols[nzcount++]/bs;
1664         if (!mask[tmp]) {
1665           mask[tmp] = 1;
1666           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1667           else masked1[dcount++] = tmp;
1668         }
1669       }
1670       rowcount++;
1671     }
1672 
1673     dlens[i]  = dcount;
1674     odlens[i] = odcount;
1675 
1676     /* zero out the mask elements we set */
1677     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1678     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1679   }
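       /*
          mask marks the block columns already counted for the current block
          row; masked1/masked2 remember exactly which entries were set so
          only those are cleared again, keeping this pass proportional to the
          number of nonzeros rather than to Mbs per block row.
       */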
1680 
1681   /* create our matrix */
1682   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,
1683                           newmat);CHKERRQ(ierr);
1684   A = *newmat;
1685   MatSetOption(A,MAT_COLUMNS_SORTED);
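       /* the loader assumes the file stores each row's column indices in
          ascending order, so MatSetValues() can take its sorted-column fast
          path */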
1686 
1687   if (!rank) {
1688     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1689     /* read in my part of the matrix numerical values  */
1690     nz = procsnz[0];
1691     vals = buf;
1692     mycols = ibuf;
1693     if (size == 1)  nz -= extra_rows;
1694     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1695     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1696 
1697     /* insert into matrix */
1698     jj      = rstart*bs;
1699     for ( i=0; i<m; i++ ) {
1700       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1701       mycols += locrowlens[i];
1702       vals   += locrowlens[i];
1703       jj++;
1704     }
1705     /* read in other processors' values (except the last one's) and ship out */
1706     for ( i=1; i<size-1; i++ ) {
1707       nz = procsnz[i];
1708       vals = buf;
1709       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1710       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1711     }
1712     /* the last proc */
1713     if ( size != 1 ) {
1714       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1715       vals = buf;
1716       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1717       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1718       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1719     }
1720     PetscFree(procsnz);
1721   }
1722   else {
1723     /* receive numeric values */
1724     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1725 
1726     /* receive message of values */
1727     vals = buf;
1728     mycols = ibuf;
1729     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1730     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1731     if (maxnz != nz) SETERRQ(1,0,"something is wrong with the file");
1732 
1733     /* insert into matrix */
1734     jj      = rstart*bs;
1735     for ( i=0; i<m; i++ ) {
1736       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1737       mycols += locrowlens[i];
1738       vals   += locrowlens[i];
1739       jj++;
1740     }
1741   }
1742   PetscFree(locrowlens);
1743   PetscFree(buf);
1744   PetscFree(ibuf);
1745   PetscFree(rowners);
1746   PetscFree(dlens);
1747   PetscFree(mask);
1748   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1749   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1750   return 0;
1751 }
1752 
1753 
1754