#ifndef lint
static char vcid[] = "$Id: mpibaij.c,v 1.13 1996/07/11 04:06:08 balay Exp balay $";
#endif

#include "mpibaij.h"

#include "draw.h"
#include "pinclude/pviewer.h"

extern int MatSetUpMultiply_MPIBAIJ(Mat);
extern int DisAssemble_MPIBAIJ(Mat);
extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);

/* local utility routine that creates a mapping from the global column
   number to the local number in the off-diagonal part of the local
   storage of the matrix. This is done in a non-scalable way, since the
   length of colmap equals the global number of block columns.
*/
static int CreateColmap_MPIBAIJ_Private(Mat mat)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
  int         nbs = B->nbs,i,bs = baij->bs;

  baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
  PLogObjectMemory(mat,baij->Nbs*sizeof(int));
  PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
  /* store bs*(local block column)+1 so that a zero entry marks a global
     block column with no entries in the off-diagonal part B */
  for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
  return 0;
}
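
/*
   Illustration (hypothetical numbers): if this process's off-diagonal part B
   touches the global block columns garray[] = {2,7,9} and bs = 2, the loop
   above sets colmap[2] = 1, colmap[7] = 3 and colmap[9] = 5, leaving all
   other entries 0.  A global point column c then maps to local column
   colmap[c/bs] - 1 + c%bs of B, while colmap[c/bs] == 0 flags a block
   column that is absent from B.
*/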

static int MatGetReordering_MPIBAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;
  if (baij->size == 1) {
    ierr = MatGetReordering(baij->A,type,rperm,cperm); CHKERRQ(ierr);
  } else SETERRQ(1,"MatGetReordering_MPIBAIJ:not supported in parallel");
  return 0;
}

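/*
   All row and column indices passed to MatSetValues() here are global point
   (not block) indices.  Locally owned entries go directly into the diagonal
   block A or the off-diagonal block B; entries owned by other processes are
   cached in the stash and shipped during assembly.
*/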
static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
  int         cstart = baij->cstart, cend = baij->cend,row,col;
  int         roworiented = baij->roworiented,rstart_orig,rend_orig;
  int         cstart_orig,cend_orig,bs=baij->bs;

  if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
    SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
  }
  baij->insertmode = addv;
  rstart_orig = rstart*bs;
  rend_orig   = rend*bs;
  cstart_orig = cstart*bs;
  cend_orig   = cend*bs;
  for ( i=0; i<m; i++ ) {
    if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
    if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
    if (im[i] >= rstart_orig && im[i] < rend_orig) {
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");
        if (in[j] >= baij->N) SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");
        if (in[j] >= cstart_orig && in[j] < cend_orig){
          col = in[j] - cstart_orig;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          ierr = MatSetValues(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
        }
        else {
          if (mat->was_assembled) {
            if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);}
            /* colmap entries are shifted by one; zero marks a block column absent from B */
            if (!baij->colmap[in[j]/bs] && !((Mat_SeqBAIJ*)(baij->B->data))->nonew) {
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col  = in[j];
            }
            else col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
          }
          else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          ierr = MatSetValues(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
        }
      }
    }
    else {
      if (roworiented) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      }
      else {
        row = im[i];
        for ( j=0; j<n; j++ ) {
          ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
        }
      }
    }
  }
  return 0;
}

static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
  int         bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
    if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
    if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
      row = idxm[i] - bsrstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
        if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
        if (idxn[j] >= bscstart && idxn[j] < bscend){
          col  = idxn[j] - bscstart;
          ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        }
        else {
          if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);}
          /* a zero colmap entry means this block column has no entries in B */
          if (!baij->colmap[idxn[j]/bs]) *(v+i*n+j) = 0.0;
          else {
            col  = baij->colmap[idxn[j]/bs] - 1 + idxn[j]%bs;
            ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    }
    else {
      SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
    }
  }
  return 0;
}

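/*
   Frobenius norm: ||A||_F = sqrt( sum_{i,j} |a_ij|^2 ).  Each process sums
   |a_ij|^2 over the entries it stores (its A and B pieces); an
   MPI_Allreduce then forms the global sum before the square root is taken.
*/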
static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
  int         ierr, i,bs2=baij->bs2;
  double      sum = 0.0;
  Scalar      *v;

  if (baij->size == 1) {
    ierr = MatNorm(baij->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      v = amat->a;
      for (i=0; i<amat->nz*bs2; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      v = bmat->a;
      for (i=0; i<bmat->nz*bs2; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
      *norm = sqrt(*norm);
    }
    else
      SETERRQ(1,"MatNorm_MPIBAIJ:No support for this norm yet");
  }
  return 0;
}

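/*
   MatAssemblyBegin/End flush the stash of off-process entries to their
   owners.  Each entry travels as an ordered triple (global row, global
   column, value), with the indices encoded as Scalars so that a single
   MPIU_SCALAR message carries the whole triple.
*/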
static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are in either INSERT_VALUES or ADD_VALUES mode */
  MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
  }
  baij->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length */
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered triples
     (global row,global column,value); we store the global indices as
     Scalars to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.

       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  return 0;
}

static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
  int         bs=baij->bs,row,col,other_disassembled;
  Scalar      *values,val;
  InsertMode  addv = baij->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = baij->rvalues + 3*imdex*baij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;
    for ( i=0; i<n; i++ ) {
      row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= baij->cstart*bs && col < baij->cend*bs) {
        col -= baij->cstart*bs;
        MatSetValues(baij->A,1,&row,1,&col,&val,addv);
      }
      else {
        if (mat->was_assembled) {
          if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);}
          /* colmap entries are shifted by one; zero marks a block column absent from B */
          if (!baij->colmap[col/bs] && !((Mat_SeqBAIJ*)(baij->B->data))->nonew) {
            ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
            /* after disassembly B is indexed by global columns, so col stays as is */
          }
          else col = baij->colmap[col/bs] - 1 + col%bs;
        }
        MatSetValues(baij->B,1,&row,1,&col,&val,addv);
      }
    }
    count--;
  }
  PetscFree(baij->recv_waits); PetscFree(baij->rvalues);

  /* wait on sends */
  if (baij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(baij->nsends,baij->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(baij->send_waits); PetscFree(baij->svalues);

  baij->insertmode = NOT_SET_VALUES;
  ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled; if so we must
     also disassemble ourselves, in order that we may reassemble. */
  MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
  }

  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);

  if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
  return 0;
}

static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr;

  if (baij->size == 1) {
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
  }
  else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
  return 0;
}

static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr, format,rank,bs=baij->bs;
  FILE         *fd;
  ViewerType   vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerGetFormat(viewer,&format); CHKERRQ(ierr);
    if (format == ASCII_FORMAT_INFO_DETAILED) {
      int nz, nzalloc, mem;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem); CHKERRQ(ierr);
      PetscSequentialPhaseBegin(mat->comm,1);
      fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
              rank,baij->m,nz*bs,nzalloc*bs,baij->bs,mem);
      ierr = MatGetInfo(baij->A,MAT_LOCAL,&nz,&nzalloc,&mem); CHKERRQ(ierr);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz*bs);
      ierr = MatGetInfo(baij->B,MAT_LOCAL,&nz,&nzalloc,&mem); CHKERRQ(ierr);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz*bs);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == ASCII_FORMAT_INFO) {
      return 0;
    }
  }

  if (vtype == DRAW_VIEWER) {
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
  }

  if (vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
            baij->cstart*bs,baij->cend*bs);
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  }
  else {
    int size = baij->size;
    rank = baij->rank;
    if (size == 1) {
      ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat         A;
      Mat_SeqBAIJ *Aloc;
      int         M = baij->M, N = baij->N,*ai,*aj,col,i,j,k,*rvals;
      int         mbs=baij->mbs;
      Scalar      *a;

      if (!rank) {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqBAIJ*) baij->A->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = (baij->cstart+aj[j])*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      /* copy over the B part */
      Aloc = (Mat_SeqBAIJ*) baij->B->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = baij->garray[aj[j]]*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      PetscFree(rvals);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}

static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
{
  Mat         mat = (Mat) obj;
  int         ierr;
  ViewerType  vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
      vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
    ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
  }
  else if (vtype == BINARY_FILE_VIEWER) {
    return MatView_MPIBAIJ_Binary(mat,viewer);
  }
  return 0;
}

static int MatDestroy_MPIBAIJ(PetscObject obj)
{
  Mat         mat = (Mat) obj;
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;

#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
#endif

  PetscFree(baij->rowners);
  ierr = MatDestroy(baij->A); CHKERRQ(ierr);
  ierr = MatDestroy(baij->B); CHKERRQ(ierr);
  if (baij->colmap) PetscFree(baij->colmap);
  if (baij->garray) PetscFree(baij->garray);
  if (baij->lvec)   VecDestroy(baij->lvec);
  if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
  if (baij->rowvalues) PetscFree(baij->rowvalues);
  PetscFree(baij);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}

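/*
   Parallel multiply: writing x = (x_loc, x_ghost), where x_loc is the
   locally owned piece and x_ghost holds the off-process values gathered
   into a->lvec, each process computes
        y = A*x_loc + B*x_ghost.
   The scatter filling a->lvec is started before the local product so the
   communication overlaps the computation on the diagonal block.
*/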
static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr, nt;

  ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
  if (nt != a->n) {
    SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and xx");
  }
  ierr = VecGetLocalSize(yy,&nt);  CHKERRQ(ierr);
  if (nt != a->m) {
    SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and yy");
  }
  ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
  ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
  ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
  return 0;
}

static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;
  ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
  return 0;
}

static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
                (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in yy until the next line, which is true for my implementation*/
  /* but is not perhaps always true. */
  ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}

static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in zz until the next line, which is true for my implementation*/
  /* but is not perhaps always true. */
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}

/*
  This only works correctly for square matrices where the subblock A->A is the
  diagonal block
*/
static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  if (a->M != a->N)
    SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
  return MatGetDiagonal(a->A,v);
}

static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;
  ierr = MatScale(aa,a->A); CHKERRQ(ierr);
  ierr = MatScale(aa,a->B); CHKERRQ(ierr);
  return 0;
}

static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  *m = mat->M; *n = mat->N;
  return 0;
}

static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  *m = mat->m; *n = mat->n;
  return 0;
}

static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
  return 0;
}

extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);

int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar      *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int         bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int         nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
  int         *cmap, *idx_p,cstart = mat->cstart;

  if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int         max = 1,mbs = mat->mbs,tmp;
    for ( i=0; i<mbs; i++ ) {
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }

  if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows");
  lrow = row - brstart;

  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
          }
        } else {
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]/bs] < cstart)
              idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
            else break;
          }
          imark = i;
        }
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
      }
    }
    else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  return 0;
}

int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  if (baij->getrowactive == PETSC_FALSE) {
    SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
  }
  baij->getrowactive = PETSC_FALSE;
  return 0;
}

static int MatGetBlockSize_MPIBAIJ(Mat mat, int *bs)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  *bs = baij->bs;
  return 0;
}

static int MatZeroEntries_MPIBAIJ(Mat A)
{
  Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
  int         ierr;
  ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
  ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
  return 0;
}

static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;

  if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
      op == MAT_YES_NEW_NONZERO_LOCATIONS ||
      op == MAT_COLUMNS_SORTED ||
      op == MAT_ROW_ORIENTED) {
        MatSetOption(a->A,op);
        MatSetOption(a->B,op);
  }
  else if (op == MAT_ROWS_SORTED ||
           op == MAT_SYMMETRIC ||
           op == MAT_STRUCTURALLY_SYMMETRIC ||
           op == MAT_YES_NEW_DIAGONALS)
    PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
  else if (op == MAT_COLUMN_ORIENTED) {
    a->roworiented = 0;
    MatSetOption(a->A,op);
    MatSetOption(a->B,op);
  }
  else if (op == MAT_NO_NEW_DIAGONALS)
    {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:MAT_NO_NEW_DIAGONALS");}
  else
    {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:unknown option");}
  return 0;
}

/* -------------------------------------------------------------------*/
static struct _MatOps MatOps = {
  MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
  MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
  0,0,0,0,
  0,0,0,0,
  0,MatGetDiagonal_MPIBAIJ,0,MatNorm_MPIBAIJ,
  MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
  MatZeroEntries_MPIBAIJ,0,MatGetReordering_MPIBAIJ,0,
  0,0,0,MatGetSize_MPIBAIJ,
  MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
  0,0,0,0,
  0,0,0,0,
  0,0,0,MatGetSubMatrices_MPIBAIJ,
  MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,0,
  MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ};

/*@C
   MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
   (block compressed row).  For good matrix assembly performance
   the user should preallocate the matrix storage by setting the parameters
   d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
   performance can be increased by more than a factor of 50.

   Input Parameters:
.  comm - MPI communicator
.  bs   - size of block
.  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
           This value should be the same as the local size used in creating the
           y vector for the matrix-vector product y = Ax.
.  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
           This value should be the same as the local size used in creating the
           x vector for the matrix-vector product y = Ax.
.  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
.  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
.  d_nz  - number of block nonzeros per block row in the diagonal portion of the local
           submatrix (same for all local rows)
.  d_nnz - array containing the number of block nonzeros in the various block rows
           of the diagonal portion of the local submatrix (possibly different for each
           block row) or PETSC_NULL.  You must leave room for the diagonal entry even if
           it is zero.
.  o_nz  - number of block nonzeros per block row in the off-diagonal portion of the local
           submatrix (same for all local rows).
.  o_nnz - array containing the number of nonzeros in the various block rows of the
           off-diagonal portion of the local submatrix (possibly different for
           each block row) or PETSC_NULL.

   Output Parameter:
.  A - the matrix

   Notes:
   The user MUST specify either the local or global matrix dimensions
   (possibly both).

   Storage Information:
   For a square global matrix we define each processor's diagonal portion
   to be its local rows and the corresponding columns (a square submatrix);
   each processor's off-diagonal portion encompasses the remainder of the
   local matrix (a rectangular submatrix).

   The user can specify preallocated storage for the diagonal part of
   the local submatrix with either d_nz or d_nnz (not both).  Set
   d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
   memory allocation.  Likewise, specify preallocated storage for the
   off-diagonal part of the local submatrix with o_nz or o_nnz (not both).

   Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
   the figure below we depict these three local rows and all columns (0-11).

$          0 1 2 3 4 5 6 7 8 9 10 11
$         --------------------------
$  row 3  |  o o o d d d o o o o o o
$  row 4  |  o o o d d d o o o o o o
$  row 5  |  o o o d d d o o o o o o
$         --------------------------
$

   Thus, any entries in the d locations are stored in the d (diagonal)
   submatrix, and any entries in the o locations are stored in the
   o (off-diagonal) submatrix.  Note that the d and the o submatrices are
   stored simply in the MATSEQBAIJ format for compressed row storage.
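
   For example, with bs=3 the depicted rows form block row 1 (point rows
   3-5).  If every depicted location were nonzero, the d part would hold
   one block (block column 1, point columns 3-5) and the o part three
   blocks (block columns 0, 2 and 3), so d_nz=1 and o_nz=3 would suffice.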

   Now d_nz should indicate the number of block nonzeros per block row in the
   d matrix, and o_nz should indicate the number of block nonzeros per block
   row in the o matrix.
   In general, for PDE problems in which most nonzeros are near the diagonal,
   one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
   or you will get TERRIBLE performance; see the users' manual chapter on
   matrices and the file $(PETSC_DIR)/Performance.

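   Example:
   A sketch of typical usage (error checking abbreviated, with Mat A and
   int ierr declared by the caller): a matrix with block size 2 and 4
   locally owned rows and columns per processor, letting PETSc determine
   the global sizes and using default preallocation,
$      ierr = MatCreateMPIBAIJ(MPI_COMM_WORLD,2,4,4,PETSC_DECIDE,PETSC_DECIDE,
$                              PETSC_DEFAULT,PETSC_NULL,PETSC_DEFAULT,PETSC_NULL,&A);
   followed by MatSetValues(), MatAssemblyBegin() and MatAssemblyEnd().
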
.keywords: matrix, block, aij, compressed row, sparse, parallel

.seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
@*/
int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
                    int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  Mat          B;
  Mat_MPIBAIJ  *b;
  int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE;

  if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
  *A = 0;
  PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
  PLogObjectCreate(B);
  B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
  PetscMemzero(b,sizeof(Mat_MPIBAIJ));
  PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
  B->destroy    = MatDestroy_MPIBAIJ;
  B->view       = MatView_MPIBAIJ;
  B->factor     = 0;
  B->assembled  = PETSC_FALSE;

  b->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&b->rank);
  MPI_Comm_size(comm,&b->size);

  if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
    SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
  if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
  if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
  if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
  if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;

  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    mbs = m/bs; nbs = n/bs;
    MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
    if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
    if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
  }
  if (m == PETSC_DECIDE) {
    Mbs = M/bs;
    if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
    /* spread the block rows as evenly as possible: e.g. Mbs=10 on 4 processes gives 3,3,2,2 */
    mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
    m   = mbs*bs;
  }
  if (n == PETSC_DECIDE) {
    Nbs = N/bs;
    if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
    nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
    n   = nbs*bs;
  }
  if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");

  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;
  b->bs  = bs;
  b->bs2 = bs*bs;
  b->mbs = mbs;
  b->nbs = nbs;
  b->Mbs = Mbs;
  b->Nbs = Nbs;

  /* build local table of row and column ownerships */
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
  b->cowners = b->rowners + b->size + 1;
  MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];
  }
  b->rstart = b->rowners[b->rank];
  b->rend   = b->rowners[b->rank+1];
  MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart = b->cowners[b->rank];
  b->cend   = b->cowners[b->rank+1];

  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used for matrix vector multiply */
  b->lvec      = 0;
  b->Mvctx     = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  *A = B;
  return 0;
}

#include "sys.h"

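/*
   A sketch of the binary file layout assumed by the reads below: a 4-int
   header (MAT_COOKIE, M, N, total number of nonzeros), followed by the M
   row lengths, then the column indices of all rows, then the numerical
   values in the same order.
*/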
int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*buf;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
  int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
  int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
  int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
  int          dcount,kmax,k,nzcount,tmp;

  ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
  bs2  = bs*bs;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];

  if (M != N) SETERRQ(1,"MatLoad_MPIBAIJ:Can only do square matrices");

  /*
     This code adds extra rows to make sure the number of rows is
     divisible by the blocksize
  */
  Mbs        = M/bs;
  extra_rows = bs - M + bs*(Mbs);
  if (extra_rows == bs) extra_rows = 0;
  else                  Mbs++;
  if (extra_rows && !rank) {
    PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
  }
  /* determine ownership of all rows */
  mbs = Mbs/size + ((Mbs % size) > rank);
  m   = mbs * bs;
  rowners  = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
  browners = rowners + size + 1;
  MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
  for ( i=0; i<=size; i++ ) browners[i] = rowners[i]*bs;
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
    for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
    MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices  */
    nz = procsnz[0];
    ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }

    /* read in everyone else's part (except the last) and ship it off */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    /* read in the stuff for the last proc */
    if ( size != 1 ) {
      nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
      MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += locrowlens[i];
    }
    ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* receive message of column indices */
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries */
  dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
  odlens = dlens + (rend-rstart);
  mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
  PetscMemzero(mask,3*Mbs*sizeof(int));
  masked1 = mask    + Mbs;
  masked2 = masked1 + Mbs;
  rowcount = 0; nzcount = 0;
  for ( i=0; i<mbs; i++ ) {
    dcount  = 0;
    odcount = 0;
    for ( j=0; j<bs; j++ ) {
      kmax = locrowlens[rowcount];
      for ( k=0; k<kmax; k++ ) {
        tmp = mycols[nzcount++]/bs;
        if (!mask[tmp]) {
          mask[tmp] = 1;
          if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
          else masked1[dcount++] = tmp;
        }
      }
      rowcount++;
    }

    dlens[i]  = dcount;
    odlens[i] = odcount;

    /* zero out the mask elements we set */
    for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
    for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
  }

  /* create our matrix */
  ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);CHKERRQ(ierr);
  A = *newmat;
  MatSetOption(A,MAT_COLUMNS_SORTED);

  if (!rank) {
    buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    vals = buf;
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
    /* read in other processors (except the last one) and ship out */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    /* the last proc */
    if ( size != 1 ) {
      nz = procsnz[size-1] - extra_rows;
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
      MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);

    /* receive message of values */
    vals = buf;
    mycols = ibuf;
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
  }
  PetscFree(locrowlens);
  PetscFree(buf);
  PetscFree(ibuf);
  PetscFree(rowners);
  PetscFree(dlens);
  PetscFree(mask);
  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}