xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 80c1aa95bd12b9d562ae9106f3faf74872834bb5)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.46 1997/01/06 20:25:27 balay Exp balay $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 
25 /*
26      Local utility routine that creates a mapping from the global column
27    number to the local number in the off-diagonal part of the local
28    storage of the matrix.  This is done in a non scable way since the
29    length of colmap equals the global matrix length.
30 */
31 #undef __FUNC__
32 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
33 static int CreateColmap_MPIBAIJ_Private(Mat mat)
34 {
35   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
36   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
37   int         nbs = B->nbs,i,bs=B->bs;;
38 
39   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
40   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
41   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
42   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
43   return 0;
44 }
45 
46 #undef __FUNC__
47 #define __FUNC__ "MatGetRowIJ_MPIBAIJ("
48 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
49                             PetscTruth *done)
50 {
51   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
52   int         ierr;
53   if (aij->size == 1) {
54     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
55   } else SETERRQ(1,0,"not supported in parallel");
56   return 0;
57 }
58 
59 #undef __FUNC__
60 #define __FUNC__ "MatRestoreRowIJ_MPIBAIJ"
61 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
62                                 PetscTruth *done)
63 {
64   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
65   int        ierr;
66   if (aij->size == 1) {
67     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
68   } else SETERRQ(1,0,"not supported in parallel");
69   return 0;
70 }
71 #define CHUNKSIZE  10
72 
73 #define  MatSetValues_SeqBAIJ_A_Private( new_A, row, col, value) \
74 { \
75  \
76     brow = row/bs;  \
77     rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
78     rmax = imax[brow]; nrow = ailen[brow]; \
79       bcol = col/bs; \
80       ridx = row % bs; cidx = col % bs; \
81       for ( _i=0; _i<nrow; _i++ ) { \
82         if (rp[_i] > bcol) break; \
83         if (rp[_i] == bcol) { \
84           bap  = ap +  bs2*_i + bs*cidx + ridx; \
85 	    *bap  = value; \
86           goto _noinsert; \
87         } \
88       } \
89       if (nrow >= rmax) { \
90         /* there is no extra room in row, therefore enlarge */ \
91         int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
92         Scalar *new_a; \
93  \
94         /* malloc new storage space */ \
95         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
96         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
97         new_j   = (int *) (new_a + bs2*new_nz); \
98         new_i   = new_j + new_nz; \
99  \
100         /* copy over old data into new slots */ \
101         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
102         for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
103         PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
104         len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
105         PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
106                                                            len*sizeof(int)); \
107         PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
108         PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
109         PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
110                     aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
111         /* free up old matrix storage */ \
112         PetscFree(a->a);  \
113         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
114         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
115         a->singlemalloc = 1; \
116  \
117         rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
118         rmax = imax[brow] = imax[brow] + CHUNKSIZE; \
119         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
120         a->maxnz += bs2*CHUNKSIZE; \
121         a->reallocs++; \
122         a->nz++; \
123       } \
124       N = nrow++ - 1;  \
125       /* shift up all the later entries in this row */ \
126       for ( ii=N; ii>=_i; ii-- ) { \
127         rp[ii+1] = rp[ii]; \
128         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
129       } \
130       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
131       rp[_i]                      = bcol;  \
132       ap[bs2*_i + bs*cidx + ridx] = value;  \
133       _noinsert:; \
134     ailen[brow] = nrow; \
135 }
136 
137 extern int MatSetValues_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
138 #undef __FUNC__
139 #define __FUNC__ "MatSetValues_MPIBAIJ"
140 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
141 {
142   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
143   Scalar      value;
144   int         ierr,i,j,row,col;
145   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
146   int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
147   int         cend_orig=baij->cend_bs,bs=baij->bs;
148 
149   Mat A = baij->A;
150   Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
151   int         *rp,ii,nrow,_i,rmax,N;
152   int         *imax=a->imax,*ai=a->i,*ailen=a->ilen;
153   int         *aj=a->j,brow,bcol;
154   int          ridx,cidx,bs2=a->bs2;
155   Scalar      *ap,*aa=a->a,*bap;
156 
157 #if defined(PETSC_BOPT_g)
158   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
159     SETERRQ(1,0,"Cannot mix inserts and adds");
160   }
161 #endif
162   baij->insertmode = addv;
163   for ( i=0; i<m; i++ ) {
164 #if defined(PETSC_BOPT_g)
165     if (im[i] < 0) SETERRQ(1,0,"Negative row");
166     if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
167 #endif
168     if (im[i] >= rstart_orig && im[i] < rend_orig) {
169       row = im[i] - rstart_orig;
170       for ( j=0; j<n; j++ ) {
171         if (in[j] >= cstart_orig && in[j] < cend_orig){
172           col = in[j] - cstart_orig;
173           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
174           MatSetValues_SeqBAIJ_A_Private(baij->A,row,col,value);
175           /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
176         }
177 #if defined(PETSC_BOPT_g)
178         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
179         else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
180 #endif
181         else {
182           if (mat->was_assembled) {
183             if (!baij->colmap) {
184               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
185             }
186             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
187             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
188               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
189               col =  in[j];
190             }
191           }
192           else col = in[j];
193           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
194           ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
195         }
196       }
197     }
198     else {
199       if (roworiented && !baij->donotstash) {
200         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
201       }
202       else {
203         if (!baij->donotstash) {
204           row = im[i];
205 	  for ( j=0; j<n; j++ ) {
206 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
207           }
208         }
209       }
210     }
211   }
212   return 0;
213 }
214 
215 #undef __FUNC__
216 #define __FUNC__ "MatGetValues_MPIBAIJ"
217 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
218 {
219   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
220   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
221   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
222 
223   for ( i=0; i<m; i++ ) {
224     if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
225     if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
226     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
227       row = idxm[i] - bsrstart;
228       for ( j=0; j<n; j++ ) {
229         if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
230         if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
231         if (idxn[j] >= bscstart && idxn[j] < bscend){
232           col = idxn[j] - bscstart;
233           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
234         }
235         else {
236           if (!baij->colmap) {
237             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
238           }
239           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
240              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
241           else {
242             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
243             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
244           }
245         }
246       }
247     }
248     else {
249       SETERRQ(1,0,"Only local values currently supported");
250     }
251   }
252   return 0;
253 }
254 
255 #undef __FUNC__
256 #define __FUNC__ "MatNorm_MPIBAIJ"
257 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
258 {
259   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
260   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
261   int        ierr, i,bs2=baij->bs2;
262   double     sum = 0.0;
263   Scalar     *v;
264 
265   if (baij->size == 1) {
266     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
267   } else {
268     if (type == NORM_FROBENIUS) {
269       v = amat->a;
270       for (i=0; i<amat->nz*bs2; i++ ) {
271 #if defined(PETSC_COMPLEX)
272         sum += real(conj(*v)*(*v)); v++;
273 #else
274         sum += (*v)*(*v); v++;
275 #endif
276       }
277       v = bmat->a;
278       for (i=0; i<bmat->nz*bs2; i++ ) {
279 #if defined(PETSC_COMPLEX)
280         sum += real(conj(*v)*(*v)); v++;
281 #else
282         sum += (*v)*(*v); v++;
283 #endif
284       }
285       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
286       *norm = sqrt(*norm);
287     }
288     else
289       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
290   }
291   return 0;
292 }
293 
294 #undef __FUNC__
295 #define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
296 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
297 {
298   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
299   MPI_Comm    comm = mat->comm;
300   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
301   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
302   MPI_Request *send_waits,*recv_waits;
303   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
304   InsertMode  addv;
305   Scalar      *rvalues,*svalues;
306 
307   /* make sure all processors are either in INSERTMODE or ADDMODE */
308   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
309   if (addv == (ADD_VALUES|INSERT_VALUES)) {
310     SETERRQ(1,0,"Some processors inserted others added");
311   }
312   baij->insertmode = addv; /* in case this processor had no cache */
313 
314   /*  first count number of contributors to each processor */
315   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
316   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
317   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
318   for ( i=0; i<baij->stash.n; i++ ) {
319     idx = baij->stash.idx[i];
320     for ( j=0; j<size; j++ ) {
321       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
322         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
323       }
324     }
325   }
326   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
327 
328   /* inform other processors of number of messages and max length*/
329   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
330   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
331   nreceives = work[rank];
332   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
333   nmax = work[rank];
334   PetscFree(work);
335 
336   /* post receives:
337        1) each message will consist of ordered pairs
338      (global index,value) we store the global index as a double
339      to simplify the message passing.
340        2) since we don't know how long each individual message is we
341      allocate the largest needed buffer for each receive. Potentially
342      this is a lot of wasted space.
343 
344 
345        This could be done better.
346   */
347   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
348   CHKPTRQ(rvalues);
349   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
350   CHKPTRQ(recv_waits);
351   for ( i=0; i<nreceives; i++ ) {
352     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
353               comm,recv_waits+i);
354   }
355 
356   /* do sends:
357       1) starts[i] gives the starting index in svalues for stuff going to
358          the ith processor
359   */
360   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
361   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
362   CHKPTRQ(send_waits);
363   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
364   starts[0] = 0;
365   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
366   for ( i=0; i<baij->stash.n; i++ ) {
367     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
368     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
369     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
370   }
371   PetscFree(owner);
372   starts[0] = 0;
373   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
374   count = 0;
375   for ( i=0; i<size; i++ ) {
376     if (procs[i]) {
377       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
378                 comm,send_waits+count++);
379     }
380   }
381   PetscFree(starts); PetscFree(nprocs);
382 
383   /* Free cache space */
384   PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
385   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
386 
387   baij->svalues    = svalues;    baij->rvalues    = rvalues;
388   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
389   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
390   baij->rmax       = nmax;
391 
392   return 0;
393 }
394 
395 
396 #undef __FUNC__
397 #define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
398 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
399 {
400   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
401   MPI_Status  *send_status,recv_status;
402   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
403   int         bs=baij->bs,row,col,other_disassembled;
404   Scalar      *values,val;
405   InsertMode  addv = baij->insertmode;
406 
407   /*  wait on receives */
408   while (count) {
409     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
410     /* unpack receives into our local space */
411     values = baij->rvalues + 3*imdex*baij->rmax;
412     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
413     n = n/3;
414     for ( i=0; i<n; i++ ) {
415       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
416       col = (int) PetscReal(values[3*i+1]);
417       val = values[3*i+2];
418       if (col >= baij->cstart*bs && col < baij->cend*bs) {
419         col -= baij->cstart*bs;
420         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
421       }
422       else {
423         if (mat->was_assembled) {
424           if (!baij->colmap) {
425             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
426           }
427           col = (baij->colmap[col/bs]-1)*bs + col%bs;
428           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
429             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
430             col = (int) PetscReal(values[3*i+1]);
431           }
432         }
433         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
434       }
435     }
436     count--;
437   }
438   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
439 
440   /* wait on sends */
441   if (baij->nsends) {
442     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
443     CHKPTRQ(send_status);
444     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
445     PetscFree(send_status);
446   }
447   PetscFree(baij->send_waits); PetscFree(baij->svalues);
448 
449   baij->insertmode = NOT_SET_VALUES;
450   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
451   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
452 
453   /* determine if any processor has disassembled, if so we must
454      also disassemble ourselfs, in order that we may reassemble. */
455   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
456   if (mat->was_assembled && !other_disassembled) {
457     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
458   }
459 
460   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
461     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
462   }
463   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
464   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
465 
466   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
467   return 0;
468 }
469 
470 #undef __FUNC__
471 #define __FUNC__ "MatView_MPIBAIJ_Binary"
472 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
473 {
474   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
475   int          ierr;
476 
477   if (baij->size == 1) {
478     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
479   }
480   else SETERRQ(1,0,"Only uniprocessor output supported");
481   return 0;
482 }
483 
484 #undef __FUNC__
485 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
486 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
487 {
488   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
489   int          ierr, format,rank,bs = baij->bs;
490   FILE         *fd;
491   ViewerType   vtype;
492 
493   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
494   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
495     ierr = ViewerGetFormat(viewer,&format);
496     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
497       MatInfo info;
498       MPI_Comm_rank(mat->comm,&rank);
499       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
500       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
501       PetscSequentialPhaseBegin(mat->comm,1);
502       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
503               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
504               baij->bs,(int)info.memory);
505       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
506       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
507       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
508       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
509       fflush(fd);
510       PetscSequentialPhaseEnd(mat->comm,1);
511       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
512       return 0;
513     }
514     else if (format == VIEWER_FORMAT_ASCII_INFO) {
515       PetscPrintf(mat->comm,"  block size is %d\n",bs);
516       return 0;
517     }
518   }
519 
520   if (vtype == DRAW_VIEWER) {
521     Draw       draw;
522     PetscTruth isnull;
523     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
524     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
525   }
526 
527   if (vtype == ASCII_FILE_VIEWER) {
528     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
529     PetscSequentialPhaseBegin(mat->comm,1);
530     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
531            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
532             baij->cstart*bs,baij->cend*bs);
533     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
534     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
535     fflush(fd);
536     PetscSequentialPhaseEnd(mat->comm,1);
537   }
538   else {
539     int size = baij->size;
540     rank = baij->rank;
541     if (size == 1) {
542       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
543     }
544     else {
545       /* assemble the entire matrix onto first processor. */
546       Mat         A;
547       Mat_SeqBAIJ *Aloc;
548       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
549       int         mbs=baij->mbs;
550       Scalar      *a;
551 
552       if (!rank) {
553         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
554         CHKERRQ(ierr);
555       }
556       else {
557         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
558         CHKERRQ(ierr);
559       }
560       PLogObjectParent(mat,A);
561 
562       /* copy over the A part */
563       Aloc = (Mat_SeqBAIJ*) baij->A->data;
564       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
565       row = baij->rstart;
566       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
567 
568       for ( i=0; i<mbs; i++ ) {
569         rvals[0] = bs*(baij->rstart + i);
570         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
571         for ( j=ai[i]; j<ai[i+1]; j++ ) {
572           col = (baij->cstart+aj[j])*bs;
573           for (k=0; k<bs; k++ ) {
574             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
575             col++; a += bs;
576           }
577         }
578       }
579       /* copy over the B part */
580       Aloc = (Mat_SeqBAIJ*) baij->B->data;
581       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
582       row = baij->rstart*bs;
583       for ( i=0; i<mbs; i++ ) {
584         rvals[0] = bs*(baij->rstart + i);
585         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
586         for ( j=ai[i]; j<ai[i+1]; j++ ) {
587           col = baij->garray[aj[j]]*bs;
588           for (k=0; k<bs; k++ ) {
589             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
590             col++; a += bs;
591           }
592         }
593       }
594       PetscFree(rvals);
595       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
596       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
597       if (!rank) {
598         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
599       }
600       ierr = MatDestroy(A); CHKERRQ(ierr);
601     }
602   }
603   return 0;
604 }
605 
606 
607 
608 #undef __FUNC__
609 #define __FUNC__ "MatView_MPIBAIJ"
610 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
611 {
612   Mat         mat = (Mat) obj;
613   int         ierr;
614   ViewerType  vtype;
615 
616   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
617   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
618       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
619     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
620   }
621   else if (vtype == BINARY_FILE_VIEWER) {
622     return MatView_MPIBAIJ_Binary(mat,viewer);
623   }
624   return 0;
625 }
626 
627 #undef __FUNC__
628 #define __FUNC__ "MatDestroy_MPIBAIJ"
629 static int MatDestroy_MPIBAIJ(PetscObject obj)
630 {
631   Mat         mat = (Mat) obj;
632   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
633   int         ierr;
634 
635 #if defined(PETSC_LOG)
636   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
637 #endif
638 
639   PetscFree(baij->rowners);
640   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
641   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
642   if (baij->colmap) PetscFree(baij->colmap);
643   if (baij->garray) PetscFree(baij->garray);
644   if (baij->lvec)   VecDestroy(baij->lvec);
645   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
646   if (baij->rowvalues) PetscFree(baij->rowvalues);
647   PetscFree(baij);
648   if (mat->mapping) {
649     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
650   }
651   PLogObjectDestroy(mat);
652   PetscHeaderDestroy(mat);
653   return 0;
654 }
655 
656 #undef __FUNC__
657 #define __FUNC__ "MatMult_MPIBAIJ"
658 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
659 {
660   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
661   int         ierr, nt;
662 
663   VecGetLocalSize_Fast(xx,nt);
664   if (nt != a->n) {
665     SETERRQ(1,0,"Incompatible parition of A and xx");
666   }
667   VecGetLocalSize_Fast(yy,nt);
668   if (nt != a->m) {
669     SETERRQ(1,0,"Incompatible parition of A and yy");
670   }
671   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
672   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
673   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
674   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
675   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
676   return 0;
677 }
678 
679 #undef __FUNC__
680 #define __FUNC__ "MatMultAdd_MPIBAIJ"
681 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
682 {
683   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
684   int        ierr;
685   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
686   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
687   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
688   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
689   return 0;
690 }
691 
692 #undef __FUNC__
693 #define __FUNC__ "MatMultTrans_MPIBAIJ"
694 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
695 {
696   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
697   int        ierr;
698 
699   /* do nondiagonal part */
700   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
701   /* send it on its way */
702   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
703   /* do local part */
704   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
705   /* receive remote parts: note this assumes the values are not actually */
706   /* inserted in yy until the next line, which is true for my implementation*/
707   /* but is not perhaps always true. */
708   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
709   return 0;
710 }
711 
712 #undef __FUNC__
713 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
714 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
715 {
716   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
717   int        ierr;
718 
719   /* do nondiagonal part */
720   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
721   /* send it on its way */
722   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
723   /* do local part */
724   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
725   /* receive remote parts: note this assumes the values are not actually */
726   /* inserted in yy until the next line, which is true for my implementation*/
727   /* but is not perhaps always true. */
728   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
729   return 0;
730 }
731 
732 /*
733   This only works correctly for square matrices where the subblock A->A is the
734    diagonal block
735 */
736 #undef __FUNC__
737 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
738 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
739 {
740   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
741   if (a->M != a->N)
742     SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
743   return MatGetDiagonal(a->A,v);
744 }
745 
746 #undef __FUNC__
747 #define __FUNC__ "MatScale_MPIBAIJ"
748 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
749 {
750   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
751   int        ierr;
752   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
753   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
754   return 0;
755 }
756 
757 #undef __FUNC__
758 #define __FUNC__ "MatGetSize_MPIBAIJ"
759 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
760 {
761   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
762   *m = mat->M; *n = mat->N;
763   return 0;
764 }
765 
766 #undef __FUNC__
767 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
768 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
769 {
770   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
771   *m = mat->m; *n = mat->N;
772   return 0;
773 }
774 
775 #undef __FUNC__
776 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
777 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
778 {
779   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
780   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
781   return 0;
782 }
783 
784 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
785 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
786 
787 #undef __FUNC__
788 #define __FUNC__ "MatGetRow_MPIBAIJ"
789 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
790 {
791   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
792   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
793   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
794   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
795   int        *cmap, *idx_p,cstart = mat->cstart;
796 
797   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
798   mat->getrowactive = PETSC_TRUE;
799 
800   if (!mat->rowvalues && (idx || v)) {
801     /*
802         allocate enough space to hold information from the longest row.
803     */
804     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
805     int     max = 1,mbs = mat->mbs,tmp;
806     for ( i=0; i<mbs; i++ ) {
807       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
808       if (max < tmp) { max = tmp; }
809     }
810     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
811     CHKPTRQ(mat->rowvalues);
812     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
813   }
814 
815 
816   if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows")
817   lrow = row - brstart;
818 
819   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
820   if (!v)   {pvA = 0; pvB = 0;}
821   if (!idx) {pcA = 0; if (!v) pcB = 0;}
822   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
823   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
824   nztot = nzA + nzB;
825 
826   cmap  = mat->garray;
827   if (v  || idx) {
828     if (nztot) {
829       /* Sort by increasing column numbers, assuming A and B already sorted */
830       int imark = -1;
831       if (v) {
832         *v = v_p = mat->rowvalues;
833         for ( i=0; i<nzB; i++ ) {
834           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
835           else break;
836         }
837         imark = i;
838         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
839         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
840       }
841       if (idx) {
842         *idx = idx_p = mat->rowindices;
843         if (imark > -1) {
844           for ( i=0; i<imark; i++ ) {
845             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
846           }
847         } else {
848           for ( i=0; i<nzB; i++ ) {
849             if (cmap[cworkB[i]/bs] < cstart)
850               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
851             else break;
852           }
853           imark = i;
854         }
855         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
856         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
857       }
858     }
859     else {
860       if (idx) *idx = 0;
861       if (v)   *v   = 0;
862     }
863   }
864   *nz = nztot;
865   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
866   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
867   return 0;
868 }
869 
870 #undef __FUNC__
871 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
872 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
873 {
874   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
875   if (baij->getrowactive == PETSC_FALSE) {
876     SETERRQ(1,0,"MatGetRow not called");
877   }
878   baij->getrowactive = PETSC_FALSE;
879   return 0;
880 }
881 
882 #undef __FUNC__
883 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
884 static int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
885 {
886   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
887   *bs = baij->bs;
888   return 0;
889 }
890 
891 #undef __FUNC__
892 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
893 static int MatZeroEntries_MPIBAIJ(Mat A)
894 {
895   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
896   int         ierr;
897   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
898   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
899   return 0;
900 }
901 
902 #undef __FUNC__
903 #define __FUNC__ "MatGetInfo_MPIBAIJ"
904 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
905 {
906   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
907   Mat         A = a->A, B = a->B;
908   int         ierr;
909   double      isend[5], irecv[5];
910 
911   info->rows_global    = (double)a->M;
912   info->columns_global = (double)a->N;
913   info->rows_local     = (double)a->m;
914   info->columns_local  = (double)a->N;
915   info->block_size     = (double)a->bs;
916   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
917   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->memory;
918   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
919   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->memory;
920   if (flag == MAT_LOCAL) {
921     info->nz_used      = isend[0];
922     info->nz_allocated = isend[1];
923     info->nz_unneeded  = isend[2];
924     info->memory       = isend[3];
925     info->mallocs      = isend[4];
926   } else if (flag == MAT_GLOBAL_MAX) {
927     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_MAX,matin->comm);
928     info->nz_used      = irecv[0];
929     info->nz_allocated = irecv[1];
930     info->nz_unneeded  = irecv[2];
931     info->memory       = irecv[3];
932     info->mallocs      = irecv[4];
933   } else if (flag == MAT_GLOBAL_SUM) {
934     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_SUM,matin->comm);
935     info->nz_used      = irecv[0];
936     info->nz_allocated = irecv[1];
937     info->nz_unneeded  = irecv[2];
938     info->memory       = irecv[3];
939     info->mallocs      = irecv[4];
940   }
941   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
942   info->fill_ratio_needed = 0;
943   info->factor_mallocs    = 0;
944   return 0;
945 }
946 
947 #undef __FUNC__
948 #define __FUNC__ "MatSetOption_MPIBAIJ"
949 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
950 {
951   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
952 
953   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
954       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
955       op == MAT_COLUMNS_UNSORTED ||
956       op == MAT_COLUMNS_SORTED) {
957         MatSetOption(a->A,op);
958         MatSetOption(a->B,op);
959   } else if (op == MAT_ROW_ORIENTED) {
960         a->roworiented = 1;
961         MatSetOption(a->A,op);
962         MatSetOption(a->B,op);
963   } else if (op == MAT_ROWS_SORTED ||
964              op == MAT_ROWS_UNSORTED ||
965              op == MAT_SYMMETRIC ||
966              op == MAT_STRUCTURALLY_SYMMETRIC ||
967              op == MAT_YES_NEW_DIAGONALS)
968     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
969   else if (op == MAT_COLUMN_ORIENTED) {
970     a->roworiented = 0;
971     MatSetOption(a->A,op);
972     MatSetOption(a->B,op);
973   } else if (op == MAT_IGNORE_OFF_PROCESSOR_ENTRIES) {
974     a->donotstash = 1;
975   } else if (op == MAT_NO_NEW_DIAGONALS)
976     {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
977   else
978     {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
979   return 0;
980 }
981 
982 #undef __FUNC__
983 #define __FUNC__ "MatTranspose_MPIBAIJ("
984 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
985 {
986   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
987   Mat_SeqBAIJ *Aloc;
988   Mat        B;
989   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
990   int        bs=baij->bs,mbs=baij->mbs;
991   Scalar     *a;
992 
993   if (matout == PETSC_NULL && M != N)
994     SETERRQ(1,0,"Square matrix only for in-place");
995   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
996   CHKERRQ(ierr);
997 
998   /* copy over the A part */
999   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1000   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1001   row = baij->rstart;
1002   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1003 
1004   for ( i=0; i<mbs; i++ ) {
1005     rvals[0] = bs*(baij->rstart + i);
1006     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1007     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1008       col = (baij->cstart+aj[j])*bs;
1009       for (k=0; k<bs; k++ ) {
1010         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1011         col++; a += bs;
1012       }
1013     }
1014   }
1015   /* copy over the B part */
1016   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1017   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1018   row = baij->rstart*bs;
1019   for ( i=0; i<mbs; i++ ) {
1020     rvals[0] = bs*(baij->rstart + i);
1021     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1022     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1023       col = baij->garray[aj[j]]*bs;
1024       for (k=0; k<bs; k++ ) {
1025         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1026         col++; a += bs;
1027       }
1028     }
1029   }
1030   PetscFree(rvals);
1031   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1032   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1033 
1034   if (matout != PETSC_NULL) {
1035     *matout = B;
1036   } else {
1037     /* This isn't really an in-place transpose .... but free data structures from baij */
1038     PetscFree(baij->rowners);
1039     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1040     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1041     if (baij->colmap) PetscFree(baij->colmap);
1042     if (baij->garray) PetscFree(baij->garray);
1043     if (baij->lvec) VecDestroy(baij->lvec);
1044     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1045     PetscFree(baij);
1046     PetscMemcpy(A,B,sizeof(struct _Mat));
1047     PetscHeaderDestroy(B);
1048   }
1049   return 0;
1050 }
1051 
1052 #undef __FUNC__
1053 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1054 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1055 {
1056   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1057   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1058   int ierr,s1,s2,s3;
1059 
1060   if (ll)  {
1061     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1062     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1063     if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
1064     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1065     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1066   }
1067   if (rr) SETERRQ(1,0,"not supported for right vector");
1068   return 0;
1069 }
1070 
1071 /* the code does not do the diagonal entries correctly unless the
1072    matrix is square and the column and row owerships are identical.
1073    This is a BUG. The only way to fix it seems to be to access
1074    baij->A and baij->B directly and not through the MatZeroRows()
1075    routine.
1076 */
1077 #undef __FUNC__
1078 #define __FUNC__ "MatZeroRows_MPIBAIJ"
1079 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
1080 {
1081   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
1082   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
1083   int            *procs,*nprocs,j,found,idx,nsends,*work;
1084   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
1085   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
1086   int            *lens,imdex,*lrows,*values,bs=l->bs;
1087   MPI_Comm       comm = A->comm;
1088   MPI_Request    *send_waits,*recv_waits;
1089   MPI_Status     recv_status,*send_status;
1090   IS             istmp;
1091 
1092   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
1093   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
1094 
1095   /*  first count number of contributors to each processor */
1096   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
1097   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
1098   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
1099   for ( i=0; i<N; i++ ) {
1100     idx = rows[i];
1101     found = 0;
1102     for ( j=0; j<size; j++ ) {
1103       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
1104         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
1105       }
1106     }
1107     if (!found) SETERRQ(1,0,"Index out of range");
1108   }
1109   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
1110 
1111   /* inform other processors of number of messages and max length*/
1112   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
1113   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
1114   nrecvs = work[rank];
1115   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
1116   nmax = work[rank];
1117   PetscFree(work);
1118 
1119   /* post receives:   */
1120   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
1121   CHKPTRQ(rvalues);
1122   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
1123   CHKPTRQ(recv_waits);
1124   for ( i=0; i<nrecvs; i++ ) {
1125     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
1126   }
1127 
1128   /* do sends:
1129       1) starts[i] gives the starting index in svalues for stuff going to
1130          the ith processor
1131   */
1132   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1133   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
1134   CHKPTRQ(send_waits);
1135   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1136   starts[0] = 0;
1137   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1138   for ( i=0; i<N; i++ ) {
1139     svalues[starts[owner[i]]++] = rows[i];
1140   }
1141   ISRestoreIndices(is,&rows);
1142 
1143   starts[0] = 0;
1144   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1145   count = 0;
1146   for ( i=0; i<size; i++ ) {
1147     if (procs[i]) {
1148       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
1149     }
1150   }
1151   PetscFree(starts);
1152 
1153   base = owners[rank]*bs;
1154 
1155   /*  wait on receives */
1156   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1157   source = lens + nrecvs;
1158   count  = nrecvs; slen = 0;
1159   while (count) {
1160     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
1161     /* unpack receives into our local space */
1162     MPI_Get_count(&recv_status,MPI_INT,&n);
1163     source[imdex]  = recv_status.MPI_SOURCE;
1164     lens[imdex]  = n;
1165     slen += n;
1166     count--;
1167   }
1168   PetscFree(recv_waits);
1169 
1170   /* move the data into the send scatter */
1171   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1172   count = 0;
1173   for ( i=0; i<nrecvs; i++ ) {
1174     values = rvalues + i*nmax;
1175     for ( j=0; j<lens[i]; j++ ) {
1176       lrows[count++] = values[j] - base;
1177     }
1178   }
1179   PetscFree(rvalues); PetscFree(lens);
1180   PetscFree(owner); PetscFree(nprocs);
1181 
1182   /* actually zap the local rows */
1183   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1184   PLogObjectParent(A,istmp);
1185   PetscFree(lrows);
1186   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1187   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1188   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1189 
1190   /* wait on sends */
1191   if (nsends) {
1192     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1193     CHKPTRQ(send_status);
1194     MPI_Waitall(nsends,send_waits,send_status);
1195     PetscFree(send_status);
1196   }
1197   PetscFree(send_waits); PetscFree(svalues);
1198 
1199   return 0;
1200 }
1201 extern int MatPrintHelp_SeqBAIJ(Mat);
1202 #undef __FUNC__
1203 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1204 static int MatPrintHelp_MPIBAIJ(Mat A)
1205 {
1206   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1207 
1208   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1209   else return 0;
1210 }
1211 
1212 #undef __FUNC__
1213 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1214 static int MatSetUnfactored_MPIBAIJ(Mat A)
1215 {
1216   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1217   int         ierr;
1218   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1219   return 0;
1220 }
1221 
1222 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1223 
1224 /* -------------------------------------------------------------------*/
1225 static struct _MatOps MatOps = {
1226   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1227   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1228   0,0,0,0,
1229   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1230   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1231   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1232   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1233   0,0,0,MatGetSize_MPIBAIJ,
1234   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1235   0,0,0,MatConvertSameType_MPIBAIJ,0,0,
1236   0,0,0,MatGetSubMatrices_MPIBAIJ,
1237   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1238   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
1239   0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ};
1240 
1241 
1242 #undef __FUNC__
1243 #define __FUNC__ "MatCreateMPIBAIJ"
1244 /*@C
1245    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1246    (block compressed row).  For good matrix assembly performance
1247    the user should preallocate the matrix storage by setting the parameters
1248    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1249    performance can be increased by more than a factor of 50.
1250 
1251    Input Parameters:
1252 .  comm - MPI communicator
1253 .  bs   - size of blockk
1254 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1255            This value should be the same as the local size used in creating the
1256            y vector for the matrix-vector product y = Ax.
1257 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1258            This value should be the same as the local size used in creating the
1259            x vector for the matrix-vector product y = Ax.
1260 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1261 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1262 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1263            submatrix  (same for all local rows)
1264 .  d_nzz - array containing the number of block nonzeros in the various block rows
1265            of the in diagonal portion of the local (possibly different for each block
1266            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1267            it is zero.
1268 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1269            submatrix (same for all local rows).
1270 .  o_nzz - array containing the number of nonzeros in the various block rows of the
1271            off-diagonal portion of the local submatrix (possibly different for
1272            each block row) or PETSC_NULL.
1273 
1274    Output Parameter:
1275 .  A - the matrix
1276 
1277    Notes:
1278    The user MUST specify either the local or global matrix dimensions
1279    (possibly both).
1280 
1281    Storage Information:
1282    For a square global matrix we define each processor's diagonal portion
1283    to be its local rows and the corresponding columns (a square submatrix);
1284    each processor's off-diagonal portion encompasses the remainder of the
1285    local matrix (a rectangular submatrix).
1286 
1287    The user can specify preallocated storage for the diagonal part of
1288    the local submatrix with either d_nz or d_nnz (not both).  Set
1289    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1290    memory allocation.  Likewise, specify preallocated storage for the
1291    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1292 
1293    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1294    the figure below we depict these three local rows and all columns (0-11).
1295 
1296 $          0 1 2 3 4 5 6 7 8 9 10 11
1297 $         -------------------
1298 $  row 3  |  o o o d d d o o o o o o
1299 $  row 4  |  o o o d d d o o o o o o
1300 $  row 5  |  o o o d d d o o o o o o
1301 $         -------------------
1302 $
1303 
1304    Thus, any entries in the d locations are stored in the d (diagonal)
1305    submatrix, and any entries in the o locations are stored in the
1306    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1307    stored simply in the MATSEQBAIJ format for compressed row storage.
1308 
1309    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1310    and o_nz should indicate the number of nonzeros per row in the o matrix.
1311    In general, for PDE problems in which most nonzeros are near the diagonal,
1312    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1313    or you will get TERRIBLE performance; see the users' manual chapter on
1314    matrices.
1315 
1316 .keywords: matrix, block, aij, compressed row, sparse, parallel
1317 
1318 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1319 @*/
1320 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1321                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1322 {
1323   Mat          B;
1324   Mat_MPIBAIJ  *b;
1325   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1326 
1327   if (bs < 1) SETERRQ(1,0,"invalid block size specified");
1328   *A = 0;
1329   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1330   PLogObjectCreate(B);
1331   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1332   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1333   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1334   MPI_Comm_size(comm,&size);
1335   if (size == 1) {
1336     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1337     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1338     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1339     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1340     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1341     B->ops.solve             = MatSolve_MPIBAIJ;
1342     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1343     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1344     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1345     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1346   }
1347 
1348   B->destroy    = MatDestroy_MPIBAIJ;
1349   B->view       = MatView_MPIBAIJ;
1350   B->mapping    = 0;
1351   B->factor     = 0;
1352   B->assembled  = PETSC_FALSE;
1353 
1354   b->insertmode = NOT_SET_VALUES;
1355   MPI_Comm_rank(comm,&b->rank);
1356   MPI_Comm_size(comm,&b->size);
1357 
1358   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1359     SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1360   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
1361   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,0,"either N or n should be specified");
1362   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1363   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1364 
1365   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1366     work[0] = m; work[1] = n;
1367     mbs = m/bs; nbs = n/bs;
1368     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1369     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1370     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1371   }
1372   if (m == PETSC_DECIDE) {
1373     Mbs = M/bs;
1374     if (Mbs*bs != M) SETERRQ(1,0,"No of global rows must be divisible by blocksize");
1375     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1376     m   = mbs*bs;
1377   }
1378   if (n == PETSC_DECIDE) {
1379     Nbs = N/bs;
1380     if (Nbs*bs != N) SETERRQ(1,0,"No of global cols must be divisible by blocksize");
1381     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1382     n   = nbs*bs;
1383   }
1384   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"No of local rows, cols must be divisible by blocksize");
1385 
1386   b->m = m; B->m = m;
1387   b->n = n; B->n = n;
1388   b->N = N; B->N = N;
1389   b->M = M; B->M = M;
1390   b->bs  = bs;
1391   b->bs2 = bs*bs;
1392   b->mbs = mbs;
1393   b->nbs = nbs;
1394   b->Mbs = Mbs;
1395   b->Nbs = Nbs;
1396 
1397   /* build local table of row and column ownerships */
1398   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1399   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1400   b->cowners = b->rowners + b->size + 2;
1401   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1402   b->rowners[0] = 0;
1403   for ( i=2; i<=b->size; i++ ) {
1404     b->rowners[i] += b->rowners[i-1];
1405   }
1406   b->rstart    = b->rowners[b->rank];
1407   b->rend      = b->rowners[b->rank+1];
1408   b->rstart_bs = b->rstart * bs;
1409   b->rend_bs   = b->rend * bs;
1410 
1411   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1412   b->cowners[0] = 0;
1413   for ( i=2; i<=b->size; i++ ) {
1414     b->cowners[i] += b->cowners[i-1];
1415   }
1416   b->cstart    = b->cowners[b->rank];
1417   b->cend      = b->cowners[b->rank+1];
1418   b->cstart_bs = b->cstart * bs;
1419   b->cend_bs   = b->cend * bs;
1420 
1421   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1422   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1423   PLogObjectParent(B,b->A);
1424   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1425   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1426   PLogObjectParent(B,b->B);
1427 
1428   /* build cache for off array entries formed */
1429   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1430   b->donotstash  = 0;
1431   b->colmap      = 0;
1432   b->garray      = 0;
1433   b->roworiented = 1;
1434 
1435   /* stuff used for matrix vector multiply */
1436   b->lvec      = 0;
1437   b->Mvctx     = 0;
1438 
1439   /* stuff for MatGetRow() */
1440   b->rowindices   = 0;
1441   b->rowvalues    = 0;
1442   b->getrowactive = PETSC_FALSE;
1443 
1444   *A = B;
1445   return 0;
1446 }
1447 
1448 #undef __FUNC__
1449 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
1450 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1451 {
1452   Mat        mat;
1453   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1454   int        ierr, len=0, flg;
1455 
1456   *newmat       = 0;
1457   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1458   PLogObjectCreate(mat);
1459   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1460   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1461   mat->destroy    = MatDestroy_MPIBAIJ;
1462   mat->view       = MatView_MPIBAIJ;
1463   mat->factor     = matin->factor;
1464   mat->assembled  = PETSC_TRUE;
1465 
1466   a->m = mat->m   = oldmat->m;
1467   a->n = mat->n   = oldmat->n;
1468   a->M = mat->M   = oldmat->M;
1469   a->N = mat->N   = oldmat->N;
1470 
1471   a->bs  = oldmat->bs;
1472   a->bs2 = oldmat->bs2;
1473   a->mbs = oldmat->mbs;
1474   a->nbs = oldmat->nbs;
1475   a->Mbs = oldmat->Mbs;
1476   a->Nbs = oldmat->Nbs;
1477 
1478   a->rstart       = oldmat->rstart;
1479   a->rend         = oldmat->rend;
1480   a->cstart       = oldmat->cstart;
1481   a->cend         = oldmat->cend;
1482   a->size         = oldmat->size;
1483   a->rank         = oldmat->rank;
1484   a->insertmode   = NOT_SET_VALUES;
1485   a->rowvalues    = 0;
1486   a->getrowactive = PETSC_FALSE;
1487 
1488   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1489   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1490   a->cowners = a->rowners + a->size + 2;
1491   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1492   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1493   if (oldmat->colmap) {
1494     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1495     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1496     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1497   } else a->colmap = 0;
1498   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1499     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1500     PLogObjectMemory(mat,len*sizeof(int));
1501     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1502   } else a->garray = 0;
1503 
1504   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1505   PLogObjectParent(mat,a->lvec);
1506   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1507   PLogObjectParent(mat,a->Mvctx);
1508   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1509   PLogObjectParent(mat,a->A);
1510   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1511   PLogObjectParent(mat,a->B);
1512   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1513   if (flg) {
1514     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1515   }
1516   *newmat = mat;
1517   return 0;
1518 }
1519 
1520 #include "sys.h"
1521 
1522 #undef __FUNC__
1523 #define __FUNC__ "MatLoad_MPIBAIJ"
1524 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1525 {
1526   Mat          A;
1527   int          i, nz, ierr, j,rstart, rend, fd;
1528   Scalar       *vals,*buf;
1529   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1530   MPI_Status   status;
1531   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1532   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1533   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1534   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1535   int          dcount,kmax,k,nzcount,tmp;
1536 
1537 
1538   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1539   bs2  = bs*bs;
1540 
1541   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1542   if (!rank) {
1543     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1544     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1545     if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not matrix object");
1546   }
1547 
1548   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1549   M = header[1]; N = header[2];
1550 
1551   if (M != N) SETERRQ(1,0,"Can only do square matrices");
1552 
1553   /*
1554      This code adds extra rows to make sure the number of rows is
1555      divisible by the blocksize
1556   */
1557   Mbs        = M/bs;
1558   extra_rows = bs - M + bs*(Mbs);
1559   if (extra_rows == bs) extra_rows = 0;
1560   else                  Mbs++;
1561   if (extra_rows &&!rank) {
1562     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1563   }
1564 
1565   /* determine ownership of all rows */
1566   mbs = Mbs/size + ((Mbs % size) > rank);
1567   m   = mbs * bs;
1568   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1569   browners = rowners + size + 1;
1570   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1571   rowners[0] = 0;
1572   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1573   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1574   rstart = rowners[rank];
1575   rend   = rowners[rank+1];
1576 
1577   /* distribute row lengths to all processors */
1578   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1579   if (!rank) {
1580     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1581     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1582     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1583     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1584     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1585     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1586     PetscFree(sndcounts);
1587   }
1588   else {
1589     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1590   }
1591 
1592   if (!rank) {
1593     /* calculate the number of nonzeros on each processor */
1594     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1595     PetscMemzero(procsnz,size*sizeof(int));
1596     for ( i=0; i<size; i++ ) {
1597       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1598         procsnz[i] += rowlengths[j];
1599       }
1600     }
1601     PetscFree(rowlengths);
1602 
1603     /* determine max buffer needed and allocate it */
1604     maxnz = 0;
1605     for ( i=0; i<size; i++ ) {
1606       maxnz = PetscMax(maxnz,procsnz[i]);
1607     }
1608     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1609 
1610     /* read in my part of the matrix column indices  */
1611     nz = procsnz[0];
1612     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1613     mycols = ibuf;
1614     if (size == 1)  nz -= extra_rows;
1615     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1616     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1617 
1618     /* read in every ones (except the last) and ship off */
1619     for ( i=1; i<size-1; i++ ) {
1620       nz = procsnz[i];
1621       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1622       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1623     }
1624     /* read in the stuff for the last proc */
1625     if ( size != 1 ) {
1626       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1627       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1628       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1629       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1630     }
1631     PetscFree(cols);
1632   }
1633   else {
1634     /* determine buffer space needed for message */
1635     nz = 0;
1636     for ( i=0; i<m; i++ ) {
1637       nz += locrowlens[i];
1638     }
1639     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1640     mycols = ibuf;
1641     /* receive message of column indices*/
1642     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1643     MPI_Get_count(&status,MPI_INT,&maxnz);
1644     if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
1645   }
1646 
1647   /* loop over local rows, determining number of off diagonal entries */
1648   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1649   odlens = dlens + (rend-rstart);
1650   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1651   PetscMemzero(mask,3*Mbs*sizeof(int));
1652   masked1 = mask    + Mbs;
1653   masked2 = masked1 + Mbs;
1654   rowcount = 0; nzcount = 0;
1655   for ( i=0; i<mbs; i++ ) {
1656     dcount  = 0;
1657     odcount = 0;
1658     for ( j=0; j<bs; j++ ) {
1659       kmax = locrowlens[rowcount];
1660       for ( k=0; k<kmax; k++ ) {
1661         tmp = mycols[nzcount++]/bs;
1662         if (!mask[tmp]) {
1663           mask[tmp] = 1;
1664           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1665           else masked1[dcount++] = tmp;
1666         }
1667       }
1668       rowcount++;
1669     }
1670 
1671     dlens[i]  = dcount;
1672     odlens[i] = odcount;
1673 
1674     /* zero out the mask elements we set */
1675     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1676     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1677   }
1678 
1679   /* create our matrix */
1680   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1681          CHKERRQ(ierr);
1682   A = *newmat;
1683   MatSetOption(A,MAT_COLUMNS_SORTED);
1684 
1685   if (!rank) {
1686     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1687     /* read in my part of the matrix numerical values  */
1688     nz = procsnz[0];
1689     vals = buf;
1690     mycols = ibuf;
1691     if (size == 1)  nz -= extra_rows;
1692     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1693     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1694 
1695     /* insert into matrix */
1696     jj      = rstart*bs;
1697     for ( i=0; i<m; i++ ) {
1698       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1699       mycols += locrowlens[i];
1700       vals   += locrowlens[i];
1701       jj++;
1702     }
1703     /* read in other processors (except the last one) and ship out */
1704     for ( i=1; i<size-1; i++ ) {
1705       nz = procsnz[i];
1706       vals = buf;
1707       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1708       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1709     }
1710     /* the last proc */
1711     if ( size != 1 ){
1712       nz = procsnz[i] - extra_rows;
1713       vals = buf;
1714       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1715       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1716       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1717     }
1718     PetscFree(procsnz);
1719   }
1720   else {
1721     /* receive numeric values */
1722     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1723 
1724     /* receive message of values*/
1725     vals = buf;
1726     mycols = ibuf;
1727     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1728     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1729     if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
1730 
1731     /* insert into matrix */
1732     jj      = rstart*bs;
1733     for ( i=0; i<m; i++ ) {
1734       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1735       mycols += locrowlens[i];
1736       vals   += locrowlens[i];
1737       jj++;
1738     }
1739   }
1740   PetscFree(locrowlens);
1741   PetscFree(buf);
1742   PetscFree(ibuf);
1743   PetscFree(rowners);
1744   PetscFree(dlens);
1745   PetscFree(mask);
1746   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1747   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1748   return 0;
1749 }
1750 
1751 
1752