xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision bf39df4762e40954c1793e90cc3f6b988e16fb2a)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.12 1996/07/08 22:20:04 bsmith Exp balay $";
3 #endif
4 
5 #include "mpibaij.h"
6 
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 
16 /* local utility routine that creates a mapping from the global column
17 number to the local number in the off-diagonal part of the local
18 storage of the matrix.  This is done in a non scable way since the
19 length of colmap equals the global matrix length.
20 */
21 static int CreateColmap_MPIBAIJ_Private(Mat mat)
22 {
23   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
24   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
25   int        nbs = B->nbs,i;
26 
27   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
28   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
29   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
30   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i;
31   return 0;
32 }
33 
34 
35 static int MatGetReordering_MPIBAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
36 {
37   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
38   int         ierr;
39   if (baij->size == 1) {
40     ierr = MatGetReordering(baij->A,type,rperm,cperm); CHKERRQ(ierr);
41   } else SETERRQ(1,"MatGetReordering_MPIBAIJ:not supported in parallel");
42   return 0;
43 }
44 
45 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
46 {
47   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
48   Scalar      value;
49   int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
50   int         cstart = baij->cstart, cend = baij->cend,row,col;
51   int         roworiented = baij->roworiented,rstart_orig,rend_orig;
52   int         cstart_orig,cend_orig,bs=baij->bs;
53 
54   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
55     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
56   }
57   baij->insertmode = addv;
58   rstart_orig = rstart*bs;
59   rend_orig   = rend*bs;
60   cstart_orig = cstart*bs;
61   cend_orig   = cend*bs;
62   for ( i=0; i<m; i++ ) {
63     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
64     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
65     if (im[i] >= rstart_orig && im[i] < rend_orig) {
66       row = im[i] - rstart_orig;
67       for ( j=0; j<n; j++ ) {
68         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");
69         if (in[j] >= baij->N) SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");
70         if (in[j] >= cstart_orig && in[j] < cend_orig){
71           col = in[j] - cstart_orig;
72           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
73           ierr = MatSetValues(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
74         }
75         else {
76           if (mat->was_assembled) {
77             if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);}
78             col = baij->colmap[in[j]];
79             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
80               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
81               col =  in[j];
82             }
83           }
84           else col = in[j];
85           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
86           ierr = MatSetValues(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
87         }
88       }
89     }
90     else {
91       if (roworiented) {
92         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
93       }
94       else {
95         row = im[i];
96         for ( j=0; j<n; j++ ) {
97           ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
98         }
99       }
100     }
101   }
102   return 0;
103 }
104 
105 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
106 {
107   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
108   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
109   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
110 
111   for ( i=0; i<m; i++ ) {
112     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
113     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
114     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
115       row = idxm[i] - bsrstart;
116       for ( j=0; j<n; j++ ) {
117         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
118         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
119         if (idxn[j] >= bscstart && idxn[j] < bscend){
120           col = idxn[j] - bscstart;
121           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
122         }
123         else {
124           if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);}
125           if ( baij->garray[baij->colmap[idxn[j]/bs]] != idxn[j]/bs ) *(v+i*n+j) = 0.0;
126           else {
127             col = baij->colmap[idxn[j]/bs]*bs + idxn[j]%bs;
128             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
129           }
130         }
131       }
132     }
133     else {
134       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
135     }
136   }
137   return 0;
138 }
139 
140 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
141 {
142   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
143   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
144   int        ierr, i,bs2=baij->bs2;
145   double     sum = 0.0;
146   Scalar     *v;
147 
148   if (baij->size == 1) {
149     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
150   } else {
151     if (type == NORM_FROBENIUS) {
152       v = amat->a;
153       for (i=0; i<amat->nz*bs2; i++ ) {
154 #if defined(PETSC_COMPLEX)
155         sum += real(conj(*v)*(*v)); v++;
156 #else
157         sum += (*v)*(*v); v++;
158 #endif
159       }
160       v = bmat->a;
161       for (i=0; i<bmat->nz*bs2; i++ ) {
162 #if defined(PETSC_COMPLEX)
163         sum += real(conj(*v)*(*v)); v++;
164 #else
165         sum += (*v)*(*v); v++;
166 #endif
167       }
168       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
169       *norm = sqrt(*norm);
170     }
171     else
172       SETERRQ(1,"MatNorm_SeqBAIJ:No support for this norm yet");
173   }
174   return 0;
175 }
176 
177 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
178 {
179   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
180   MPI_Comm    comm = mat->comm;
181   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
182   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
183   MPI_Request *send_waits,*recv_waits;
184   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
185   InsertMode  addv;
186   Scalar      *rvalues,*svalues;
187 
188   /* make sure all processors are either in INSERTMODE or ADDMODE */
189   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
190   if (addv == (ADD_VALUES|INSERT_VALUES)) {
191     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
192   }
193   baij->insertmode = addv; /* in case this processor had no cache */
194 
195   /*  first count number of contributors to each processor */
196   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
197   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
198   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
199   for ( i=0; i<baij->stash.n; i++ ) {
200     idx = baij->stash.idx[i];
201     for ( j=0; j<size; j++ ) {
202       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
203         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
204       }
205     }
206   }
207   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
208 
209   /* inform other processors of number of messages and max length*/
210   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
211   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
212   nreceives = work[rank];
213   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
214   nmax = work[rank];
215   PetscFree(work);
216 
217   /* post receives:
218        1) each message will consist of ordered pairs
219      (global index,value) we store the global index as a double
220      to simplify the message passing.
221        2) since we don't know how long each individual message is we
222      allocate the largest needed buffer for each receive. Potentially
223      this is a lot of wasted space.
224 
225 
226        This could be done better.
227   */
228   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
229   CHKPTRQ(rvalues);
230   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
231   CHKPTRQ(recv_waits);
232   for ( i=0; i<nreceives; i++ ) {
233     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
234               comm,recv_waits+i);
235   }
236 
237   /* do sends:
238       1) starts[i] gives the starting index in svalues for stuff going to
239          the ith processor
240   */
241   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
242   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
243   CHKPTRQ(send_waits);
244   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
245   starts[0] = 0;
246   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
247   for ( i=0; i<baij->stash.n; i++ ) {
248     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
249     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
250     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
251   }
252   PetscFree(owner);
253   starts[0] = 0;
254   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
255   count = 0;
256   for ( i=0; i<size; i++ ) {
257     if (procs[i]) {
258       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
259                 comm,send_waits+count++);
260     }
261   }
262   PetscFree(starts); PetscFree(nprocs);
263 
264   /* Free cache space */
265   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
266   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
267 
268   baij->svalues    = svalues;    baij->rvalues    = rvalues;
269   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
270   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
271   baij->rmax       = nmax;
272 
273   return 0;
274 }
275 
276 
277 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
278 {
279   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
280   MPI_Status  *send_status,recv_status;
281   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
282   int         bs=baij->bs,row,col,other_disassembled;
283   Scalar      *values,val;
284   InsertMode  addv = baij->insertmode;
285 
286   /*  wait on receives */
287   while (count) {
288     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
289     /* unpack receives into our local space */
290     values = baij->rvalues + 3*imdex*baij->rmax;
291     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
292     n = n/3;
293     for ( i=0; i<n; i++ ) {
294       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
295       col = (int) PetscReal(values[3*i+1]);
296       val = values[3*i+2];
297       if (col >= baij->cstart*bs && col < baij->cend*bs) {
298         col -= baij->cstart*bs;
299         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
300       }
301       else {
302         if (mat->was_assembled) {
303           if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);}
304           col = baij->colmap[col/bs]*bs + col%bs;
305           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
306             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
307             col = (int) PetscReal(values[3*i+1]);
308           }
309         }
310         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
311       }
312     }
313     count--;
314   }
315   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
316 
317   /* wait on sends */
318   if (baij->nsends) {
319     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
320     CHKPTRQ(send_status);
321     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
322     PetscFree(send_status);
323   }
324   PetscFree(baij->send_waits); PetscFree(baij->svalues);
325 
326   baij->insertmode = NOT_SET_VALUES;
327   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
328   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
329 
330   /* determine if any processor has disassembled, if so we must
331      also disassemble ourselfs, in order that we may reassemble. */
332   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
333   if (mat->was_assembled && !other_disassembled) {
334     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
335   }
336 
337   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
338     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
339   }
340   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
341   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
342 
343   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
344   return 0;
345 }
346 
347 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
348 {
349   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
350   int          ierr;
351 
352   if (baij->size == 1) {
353     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
354   }
355   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
356   return 0;
357 }
358 
359 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
360 {
361   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
362   int          ierr, format,rank,bs=baij->bs;
363   FILE         *fd;
364   ViewerType   vtype;
365 
366   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
367   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
368     ierr = ViewerGetFormat(viewer,&format);
369     if (format == ASCII_FORMAT_INFO_DETAILED) {
370       int nz, nzalloc, mem;
371       MPI_Comm_rank(mat->comm,&rank);
372       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
373       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
374       PetscSequentialPhaseBegin(mat->comm,1);
375       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
376               rank,baij->m,nz*bs,nzalloc*bs,baij->bs,mem);
377       ierr = MatGetInfo(baij->A,MAT_LOCAL,&nz,&nzalloc,&mem);
378       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz*bs);
379       ierr = MatGetInfo(baij->B,MAT_LOCAL,&nz,&nzalloc,&mem);
380       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz*bs);
381       fflush(fd);
382       PetscSequentialPhaseEnd(mat->comm,1);
383       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
384       return 0;
385     }
386     else if (format == ASCII_FORMAT_INFO) {
387       return 0;
388     }
389   }
390 
391   if (vtype == DRAW_VIEWER) {
392     Draw       draw;
393     PetscTruth isnull;
394     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
395     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
396   }
397 
398   if (vtype == ASCII_FILE_VIEWER) {
399     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
400     PetscSequentialPhaseBegin(mat->comm,1);
401     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
402            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
403             baij->cstart*bs,baij->cend*bs);
404     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
405     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
406     fflush(fd);
407     PetscSequentialPhaseEnd(mat->comm,1);
408   }
409   else {
410     int size = baij->size;
411     rank = baij->rank;
412     if (size == 1) {
413       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
414     }
415     else {
416       /* assemble the entire matrix onto first processor. */
417       Mat         A;
418       Mat_SeqBAIJ *Aloc;
419       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
420       int         mbs=baij->mbs;
421       Scalar      *a;
422 
423       if (!rank) {
424         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
425         CHKERRQ(ierr);
426       }
427       else {
428         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
429         CHKERRQ(ierr);
430       }
431       PLogObjectParent(mat,A);
432 
433       /* copy over the A part */
434       Aloc = (Mat_SeqBAIJ*) baij->A->data;
435       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
436       row = baij->rstart;
437       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
438 
439       for ( i=0; i<mbs; i++ ) {
440         rvals[0] = bs*(baij->rstart + i);
441         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
442         for ( j=ai[i]; j<ai[i+1]; j++ ) {
443           col = (baij->cstart+aj[j])*bs;
444           for (k=0; k<bs; k++ ) {
445             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
446             col++; a += bs;
447           }
448         }
449       }
450       /* copy over the B part */
451       Aloc = (Mat_SeqBAIJ*) baij->B->data;
452       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
453       row = baij->rstart*bs;
454       for ( i=0; i<mbs; i++ ) {
455         rvals[0] = bs*(baij->rstart + i);
456         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
457         for ( j=ai[i]; j<ai[i+1]; j++ ) {
458           col = baij->garray[aj[j]]*bs;
459           for (k=0; k<bs; k++ ) {
460             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
461             col++; a += bs;
462           }
463         }
464       }
465       PetscFree(rvals);
466       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
467       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
468       if (!rank) {
469         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
470       }
471       ierr = MatDestroy(A); CHKERRQ(ierr);
472     }
473   }
474   return 0;
475 }
476 
477 
478 
479 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
480 {
481   Mat         mat = (Mat) obj;
482   int         ierr;
483   ViewerType  vtype;
484 
485   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
486   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
487       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
488     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
489   }
490   else if (vtype == BINARY_FILE_VIEWER) {
491     return MatView_MPIBAIJ_Binary(mat,viewer);
492   }
493   return 0;
494 }
495 
496 static int MatDestroy_MPIBAIJ(PetscObject obj)
497 {
498   Mat         mat = (Mat) obj;
499   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
500   int         ierr;
501 
502 #if defined(PETSC_LOG)
503   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
504 #endif
505 
506   PetscFree(baij->rowners);
507   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
508   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
509   if (baij->colmap) PetscFree(baij->colmap);
510   if (baij->garray) PetscFree(baij->garray);
511   if (baij->lvec)   VecDestroy(baij->lvec);
512   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
513   if (baij->rowvalues) PetscFree(baij->rowvalues);
514   PetscFree(baij);
515   PLogObjectDestroy(mat);
516   PetscHeaderDestroy(mat);
517   return 0;
518 }
519 
520 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
521 {
522   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
523   int         ierr, nt;
524 
525   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
526   if (nt != a->n) {
527     SETERRQ(1,"MatMult_MPIAIJ:Incompatible parition of A and xx");
528   }
529   ierr = VecGetLocalSize(yy,&nt);  CHKERRQ(ierr);
530   if (nt != a->m) {
531     SETERRQ(1,"MatMult_MPIAIJ:Incompatible parition of A and yy");
532   }
533   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
534   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
535   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
536   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
537   return 0;
538 }
539 
540 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
541 {
542   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
543   int        ierr;
544   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
545   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
546   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
547   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
548   return 0;
549 }
550 
551 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
552 {
553   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
554   int        ierr;
555 
556   /* do nondiagonal part */
557   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
558   /* send it on its way */
559   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
560                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
561   /* do local part */
562   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
563   /* receive remote parts: note this assumes the values are not actually */
564   /* inserted in yy until the next line, which is true for my implementation*/
565   /* but is not perhaps always true. */
566   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
567                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
568   return 0;
569 }
570 
571 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
572 {
573   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
574   int        ierr;
575 
576   /* do nondiagonal part */
577   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
578   /* send it on its way */
579   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
580                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
581   /* do local part */
582   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
583   /* receive remote parts: note this assumes the values are not actually */
584   /* inserted in yy until the next line, which is true for my implementation*/
585   /* but is not perhaps always true. */
586   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
587                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
588   return 0;
589 }
590 
591 /*
592   This only works correctly for square matrices where the subblock A->A is the
593    diagonal block
594 */
595 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
596 {
597   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
598   if (a->M != a->N)
599     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
600   return MatGetDiagonal(a->A,v);
601 }
602 
603 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
604 {
605   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
606   int        ierr;
607   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
608   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
609   return 0;
610 }
611 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
612 {
613   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
614   *m = mat->M; *n = mat->N;
615   return 0;
616 }
617 
618 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
619 {
620   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
621   *m = mat->m; *n = mat->N;
622   return 0;
623 }
624 
625 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
626 {
627   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
628   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
629   return 0;
630 }
631 
632 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
633 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
634 
635 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
636 {
637   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
638   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
639   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
640   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
641   int        *cmap, *idx_p,cstart = mat->cstart;
642 
643   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
644   mat->getrowactive = PETSC_TRUE;
645 
646   if (!mat->rowvalues && (idx || v)) {
647     /*
648         allocate enough space to hold information from the longest row.
649     */
650     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
651     int     max = 1,mbs = mat->mbs,tmp;
652     for ( i=0; i<mbs; i++ ) {
653       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
654       if (max < tmp) { max = tmp; }
655     }
656     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
657     CHKPTRQ(mat->rowvalues);
658     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
659   }
660 
661 
662   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows")
663   lrow = row - brstart;
664 
665   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
666   if (!v)   {pvA = 0; pvB = 0;}
667   if (!idx) {pcA = 0; if (!v) pcB = 0;}
668   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
669   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
670   nztot = nzA + nzB;
671 
672   cmap  = mat->garray;
673   if (v  || idx) {
674     if (nztot) {
675       /* Sort by increasing column numbers, assuming A and B already sorted */
676       int imark = -1;
677       if (v) {
678         *v = v_p = mat->rowvalues;
679         for ( i=0; i<nzB; i++ ) {
680           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
681           else break;
682         }
683         imark = i;
684         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
685         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
686       }
687       if (idx) {
688         *idx = idx_p = mat->rowindices;
689         if (imark > -1) {
690           for ( i=0; i<imark; i++ ) {
691             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
692           }
693         } else {
694           for ( i=0; i<nzB; i++ ) {
695             if (cmap[cworkB[i]/bs] < cstart)
696               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
697             else break;
698           }
699           imark = i;
700         }
701         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
702         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
703       }
704     }
705     else {
706       if (idx) *idx = 0;
707       if (v)   *v   = 0;
708     }
709   }
710   *nz = nztot;
711   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
712   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
713   return 0;
714 }
715 
716 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
717 {
718   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
719   if (baij->getrowactive == PETSC_FALSE) {
720     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
721   }
722   baij->getrowactive = PETSC_FALSE;
723   return 0;
724 }
725 
726 static int MatGetBlockSize_MPIBAIJ(Mat mat, int *bs)
727 {
728   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
729   *bs = baij->bs;
730   return 0;
731 }
732 
733 /* -------------------------------------------------------------------*/
734 static struct _MatOps MatOps = {
735   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
736   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
737   0,0,0,0,
738   0,0,0,0,
739   0,MatGetDiagonal_MPIBAIJ,0,MatNorm_MPIBAIJ,
740   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,0,
741   0,0,MatGetReordering_MPIBAIJ,0,
742   0,0,0,MatGetSize_MPIBAIJ,
743   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
744   0,0,0,0,
745   0,0,0,0,
746   0,0,0,MatGetSubMatrices_MPIBAIJ,
747   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,0,
748   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ};
749 
750 
751 /*@C
752    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
753    (block compressed row).  For good matrix assembly performance
754    the user should preallocate the matrix storage by setting the parameters
755    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
756    performance can be increased by more than a factor of 50.
757 
758    Input Parameters:
759 .  comm - MPI communicator
760 .  bs   - size of blockk
761 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
762            This value should be the same as the local size used in creating the
763            y vector for the matrix-vector product y = Ax.
764 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
765            This value should be the same as the local size used in creating the
766            x vector for the matrix-vector product y = Ax.
767 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
768 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
769 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
770            submatrix  (same for all local rows)
771 .  d_nzz - array containing the number of block nonzeros in the various block rows
772            of the in diagonal portion of the local (possibly different for each block
773            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
774            it is zero.
775 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
776            submatrix (same for all local rows).
777 .  o_nzz - array containing the number of nonzeros in the various block rows of the
778            off-diagonal portion of the local submatrix (possibly different for
779            each block row) or PETSC_NULL.
780 
781    Output Parameter:
782 .  A - the matrix
783 
784    Notes:
785    The user MUST specify either the local or global matrix dimensions
786    (possibly both).
787 
788    Storage Information:
789    For a square global matrix we define each processor's diagonal portion
790    to be its local rows and the corresponding columns (a square submatrix);
791    each processor's off-diagonal portion encompasses the remainder of the
792    local matrix (a rectangular submatrix).
793 
794    The user can specify preallocated storage for the diagonal part of
795    the local submatrix with either d_nz or d_nnz (not both).  Set
796    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
797    memory allocation.  Likewise, specify preallocated storage for the
798    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
799 
800    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
801    the figure below we depict these three local rows and all columns (0-11).
802 
803 $          0 1 2 3 4 5 6 7 8 9 10 11
804 $         -------------------
805 $  row 3  |  o o o d d d o o o o o o
806 $  row 4  |  o o o d d d o o o o o o
807 $  row 5  |  o o o d d d o o o o o o
808 $         -------------------
809 $
810 
811    Thus, any entries in the d locations are stored in the d (diagonal)
812    submatrix, and any entries in the o locations are stored in the
813    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
814    stored simply in the MATSEQBAIJ format for compressed row storage.
815 
816    Now d_nz should indicate the number of nonzeros per row in the d matrix,
817    and o_nz should indicate the number of nonzeros per row in the o matrix.
818    In general, for PDE problems in which most nonzeros are near the diagonal,
819    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
820    or you will get TERRIBLE performance; see the users' manual chapter on
821    matrices and the file $(PETSC_DIR)/Performance.
822 
823 .keywords: matrix, block, aij, compressed row, sparse, parallel
824 
825 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
826 @*/
827 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
828                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
829 {
830   Mat          B;
831   Mat_MPIBAIJ  *b;
832   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE;
833 
834   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
835   *A = 0;
836   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
837   PLogObjectCreate(B);
838   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
839   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
840   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
841   B->destroy    = MatDestroy_MPIBAIJ;
842   B->view       = MatView_MPIBAIJ;
843 
844   B->factor     = 0;
845   B->assembled  = PETSC_FALSE;
846 
847   b->insertmode = NOT_SET_VALUES;
848   MPI_Comm_rank(comm,&b->rank);
849   MPI_Comm_size(comm,&b->size);
850 
851   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
852     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
853   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
854   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
855   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
856   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
857 
858   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
859     work[0] = m; work[1] = n;
860     mbs = m/bs; nbs = n/bs;
861     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
862     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
863     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
864   }
865   if (m == PETSC_DECIDE) {
866     Mbs = M/bs;
867     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
868     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
869     m   = mbs*bs;
870   }
871   if (n == PETSC_DECIDE) {
872     Nbs = N/bs;
873     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
874     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
875     n   = nbs*bs;
876   }
877   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
878 
879   b->m = m; B->m = m;
880   b->n = n; B->n = n;
881   b->N = N; B->N = N;
882   b->M = M; B->M = M;
883   b->bs  = bs;
884   b->bs2 = bs*bs;
885   b->mbs = mbs;
886   b->nbs = nbs;
887   b->Mbs = Mbs;
888   b->Nbs = Nbs;
889 
890   /* build local table of row and column ownerships */
891   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
892   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
893   b->cowners = b->rowners + b->size + 1;
894   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
895   b->rowners[0] = 0;
896   for ( i=2; i<=b->size; i++ ) {
897     b->rowners[i] += b->rowners[i-1];
898   }
899   b->rstart = b->rowners[b->rank];
900   b->rend   = b->rowners[b->rank+1];
901   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
902   b->cowners[0] = 0;
903   for ( i=2; i<=b->size; i++ ) {
904     b->cowners[i] += b->cowners[i-1];
905   }
906   b->cstart = b->cowners[b->rank];
907   b->cend   = b->cowners[b->rank+1];
908 
909   if (d_nz == PETSC_DEFAULT) d_nz = 5;
910   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
911   PLogObjectParent(B,b->A);
912   if (o_nz == PETSC_DEFAULT) o_nz = 0;
913   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
914   PLogObjectParent(B,b->B);
915 
916   /* build cache for off array entries formed */
917   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
918   b->colmap      = 0;
919   b->garray      = 0;
920   b->roworiented = 1;
921 
922   /* stuff used for matrix vector multiply */
923   b->lvec      = 0;
924   b->Mvctx     = 0;
925 
926   /* stuff for MatGetRow() */
927   b->rowindices   = 0;
928   b->rowvalues    = 0;
929   b->getrowactive = PETSC_FALSE;
930 
931   *A = B;
932   return 0;
933 }
934 
935 #include "sys.h"
936 
937 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
938 {
939   Mat          A;
940   int          i, nz, ierr, j,rstart, rend, fd;
941   Scalar       *vals,*buf;
942   MPI_Comm     comm = ((PetscObject)viewer)->comm;
943   MPI_Status   status;
944   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
945   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
946   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
947   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
948   int          dcount,kmax,k,nzcount,tmp;
949 
950 
951   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
952   bs2  = bs*bs;
953 
954   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
955   if (!rank) {
956     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
957     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
958     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
959   }
960 
961   MPI_Bcast(header+1,3,MPI_INT,0,comm);
962   M = header[1]; N = header[2];
963 
964 
965   if (M != N) SETERRQ(1,"MatLoad_SeqBAIJ:Can only do square matrices");
966 
967   /*
968      This code adds extra rows to make sure the number of rows is
969      divisible by the blocksize
970   */
971   Mbs        = M/bs;
972   extra_rows = bs - M + bs*(Mbs);
973   if (extra_rows == bs) extra_rows = 0;
974   else                  Mbs++;
975   if (extra_rows &&!rank) {
976     PLogInfo(0,"MatLoad_SeqBAIJ:Padding loaded matrix to match blocksize");
977   }
978   /* determine ownership of all rows */
979   mbs = Mbs/size + ((Mbs % size) > rank);
980   m   = mbs * bs;
981   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
982   browners = rowners + size + 1;
983   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
984   rowners[0] = 0;
985   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
986   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
987   rstart = rowners[rank];
988   rend   = rowners[rank+1];
989 
990   /* distribute row lengths to all processors */
991   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
992   if (!rank) {
993     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
994     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
995     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
996     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
997     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
998     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
999     PetscFree(sndcounts);
1000   }
1001   else {
1002     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1003   }
1004 
1005   if (!rank) {
1006     /* calculate the number of nonzeros on each processor */
1007     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1008     PetscMemzero(procsnz,size*sizeof(int));
1009     for ( i=0; i<size; i++ ) {
1010       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1011         procsnz[i] += rowlengths[j];
1012       }
1013     }
1014     PetscFree(rowlengths);
1015 
1016     /* determine max buffer needed and allocate it */
1017     maxnz = 0;
1018     for ( i=0; i<size; i++ ) {
1019       maxnz = PetscMax(maxnz,procsnz[i]);
1020     }
1021     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1022 
1023     /* read in my part of the matrix column indices  */
1024     nz = procsnz[0];
1025     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1026     mycols = ibuf;
1027     if (size == 1)  nz -= extra_rows;
1028     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1029     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1030 
1031     /* read in every ones (except the last) and ship off */
1032     for ( i=1; i<size-1; i++ ) {
1033       nz = procsnz[i];
1034       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1035       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1036     }
1037     /* read in the stuff for the last proc */
1038     if ( size != 1 ) {
1039       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1040       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1041       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1042       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1043     }
1044     PetscFree(cols);
1045   }
1046   else {
1047     /* determine buffer space needed for message */
1048     nz = 0;
1049     for ( i=0; i<m; i++ ) {
1050       nz += locrowlens[i];
1051     }
1052     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1053     mycols = ibuf;
1054     /* receive message of column indices*/
1055     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1056     MPI_Get_count(&status,MPI_INT,&maxnz);
1057     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1058   }
1059 
1060   /* loop over local rows, determining number of off diagonal entries */
1061   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1062   odlens = dlens + (rend-rstart);
1063   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1064   PetscMemzero(mask,3*Mbs*sizeof(int));
1065   masked1 = mask    + Mbs;
1066   masked2 = masked1 + Mbs;
1067   rowcount = 0; nzcount = 0;
1068   for ( i=0; i<mbs; i++ ) {
1069     dcount  = 0;
1070     odcount = 0;
1071     for ( j=0; j<bs; j++ ) {
1072       kmax = locrowlens[rowcount];
1073       for ( k=0; k<kmax; k++ ) {
1074         tmp = mycols[nzcount++]/bs;
1075         if (!mask[tmp]) {
1076           mask[tmp] = 1;
1077           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1078           else masked1[dcount++] = tmp;
1079         }
1080       }
1081       rowcount++;
1082     }
1083 
1084     dlens[i]  = dcount;
1085     odlens[i] = odcount;
1086 
1087     /* zero out the mask elements we set */
1088     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1089     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1090   }
1091 
1092   /* create our matrix */
1093   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);CHKERRQ(ierr);
1094   A = *newmat;
1095   MatSetOption(A,MAT_COLUMNS_SORTED);
1096 
1097   if (!rank) {
1098     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1099     /* read in my part of the matrix numerical values  */
1100     nz = procsnz[0];
1101     vals = buf;
1102     mycols = ibuf;
1103     if (size == 1)  nz -= extra_rows;
1104     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1105     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1106     /* insert into matrix */
1107     jj      = rstart*bs;
1108     for ( i=0; i<m; i++ ) {
1109       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1110       mycols += locrowlens[i];
1111       vals   += locrowlens[i];
1112       jj++;
1113     }
1114     /* read in other processors( except the last one) and ship out */
1115     for ( i=1; i<size-1; i++ ) {
1116       nz = procsnz[i];
1117       vals = buf;
1118       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1119       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1120     }
1121     /* the last proc */
1122     if ( size != 1 ){
1123       nz = procsnz[i] - extra_rows;
1124       vals = buf;
1125       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1126       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1127       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1128     }
1129     PetscFree(procsnz);
1130   }
1131   else {
1132     /* receive numeric values */
1133     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1134 
1135     /* receive message of values*/
1136     vals = buf;
1137     mycols = ibuf;
1138     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1139     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1140     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1141 
1142     /* insert into matrix */
1143     jj      = rstart*bs;
1144     for ( i=0; i<m; i++ ) {
1145       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1146       mycols += locrowlens[i];
1147       vals   += locrowlens[i];
1148       jj++;
1149     }
1150   }
1151   PetscFree(locrowlens);
1152   PetscFree(buf);
1153   PetscFree(ibuf);
1154   PetscFree(rowners);
1155   PetscFree(dlens);
1156   PetscFree(mask);
1157   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1158   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1159   return 0;
1160 }
1161 
1162 
1163