xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 92e8d3218da1c4f7c7fd9235e696e26af8150cec)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.9 1996/07/05 19:55:05 balay Exp curfman $";
3 #endif
4 
5 #include "mpibaij.h"
6 
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 
16 /* local utility routine that creates a mapping from the global column
17 number to the local number in the off-diagonal part of the local
18 storage of the matrix.  This is done in a non scable way since the
19 length of colmap equals the global matrix length.
20 */
21 static int CreateColmap_MPIBAIJ_Private(Mat mat)
22 {
23   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
24   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
25   int        nbs = B->nbs,i;
26 
27   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
28   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
29   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
30   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i;
31   return 0;
32 }
33 
34 
35 static int MatGetReordering_MPIBAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
36 {
37   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
38   int         ierr;
39   if (baij->size == 1) {
40     ierr = MatGetReordering(baij->A,type,rperm,cperm); CHKERRQ(ierr);
41   } else SETERRQ(1,"MatGetReordering_MPIBAIJ:not supported in parallel");
42   return 0;
43 }
44 
45 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
46 {
47   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
48   Scalar      value;
49   int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
50   int         cstart = baij->cstart, cend = baij->cend,row,col;
51   int         roworiented = baij->roworiented,rstart_orig,rend_orig;
52   int         cstart_orig,cend_orig,bs=baij->bs;
53 
54   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
55     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
56   }
57   baij->insertmode = addv;
58   rstart_orig = rstart*bs;
59   rend_orig   = rend*bs;
60   cstart_orig = cstart*bs;
61   cend_orig   = cend*bs;
62   for ( i=0; i<m; i++ ) {
63     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
64     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
65     if (im[i] >= rstart_orig && im[i] < rend_orig) {
66       row = im[i] - rstart_orig;
67       for ( j=0; j<n; j++ ) {
68         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");
69         if (in[j] >= baij->N) SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");
70         if (in[j] >= cstart_orig && in[j] < cend_orig){
71           col = in[j] - cstart_orig;
72           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
73           ierr = MatSetValues(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
74         }
75         else {
76           if (mat->was_assembled) {
77             if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);}
78             col = baij->colmap[in[j]];
79             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
80               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
81               col =  in[j];
82             }
83           }
84           else col = in[j];
85           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
86           ierr = MatSetValues(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
87         }
88       }
89     }
90     else {
91       if (roworiented) {
92         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
93       }
94       else {
95         row = im[i];
96         for ( j=0; j<n; j++ ) {
97           ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
98         }
99       }
100     }
101   }
102   return 0;
103 }
104 
105 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
106 {
107   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
108   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
109   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
110 
111   for ( i=0; i<m; i++ ) {
112     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
113     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
114     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
115       row = idxm[i] - bsrstart;
116       for ( j=0; j<n; j++ ) {
117         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
118         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
119         if (idxn[j] >= bscstart && idxn[j] < bscend){
120           col = idxn[j] - bscstart;
121           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
122         }
123         else {
124           if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);}
125           if ( baij->garray[baij->colmap[idxn[j]/bs]] != idxn[j]/bs ) *(v+i*n+j) = 0.0;
126           else {
127             col = baij->colmap[idxn[j]/bs]*bs + idxn[j]%bs;
128             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
129           }
130         }
131       }
132     }
133     else {
134       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
135     }
136   }
137   return 0;
138 }
139 
140 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
141 {
142   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
143   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
144   int        ierr, i,bs2=baij->bs2;
145   double     sum = 0.0;
146   Scalar     *v;
147 
148   if (baij->size == 1) {
149     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
150   } else {
151     if (type == NORM_FROBENIUS) {
152       v = amat->a;
153       for (i=0; i<amat->nz*bs2; i++ ) {
154 #if defined(PETSC_COMPLEX)
155         sum += real(conj(*v)*(*v)); v++;
156 #else
157         sum += (*v)*(*v); v++;
158 #endif
159       }
160       v = bmat->a;
161       for (i=0; i<bmat->nz*bs2; i++ ) {
162 #if defined(PETSC_COMPLEX)
163         sum += real(conj(*v)*(*v)); v++;
164 #else
165         sum += (*v)*(*v); v++;
166 #endif
167       }
168       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
169       *norm = sqrt(*norm);
170     }
171     else
172       SETERRQ(1,"MatNorm_SeqBAIJ:No support for this norm yet");
173   }
174   return 0;
175 }
176 
177 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
178 {
179   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
180   MPI_Comm    comm = mat->comm;
181   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
182   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
183   MPI_Request *send_waits,*recv_waits;
184   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
185   InsertMode  addv;
186   Scalar      *rvalues,*svalues;
187 
188   /* make sure all processors are either in INSERTMODE or ADDMODE */
189   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
190   if (addv == (ADD_VALUES|INSERT_VALUES)) {
191     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
192   }
193   baij->insertmode = addv; /* in case this processor had no cache */
194 
195   /*  first count number of contributors to each processor */
196   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
197   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
198   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
199   for ( i=0; i<baij->stash.n; i++ ) {
200     idx = baij->stash.idx[i];
201     for ( j=0; j<size; j++ ) {
202       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
203         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
204       }
205     }
206   }
207   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
208 
209   /* inform other processors of number of messages and max length*/
210   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
211   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
212   nreceives = work[rank];
213   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
214   nmax = work[rank];
215   PetscFree(work);
216 
217   /* post receives:
218        1) each message will consist of ordered pairs
219      (global index,value) we store the global index as a double
220      to simplify the message passing.
221        2) since we don't know how long each individual message is we
222      allocate the largest needed buffer for each receive. Potentially
223      this is a lot of wasted space.
224 
225 
226        This could be done better.
227   */
228   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
229   CHKPTRQ(rvalues);
230   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
231   CHKPTRQ(recv_waits);
232   for ( i=0; i<nreceives; i++ ) {
233     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
234               comm,recv_waits+i);
235   }
236 
237   /* do sends:
238       1) starts[i] gives the starting index in svalues for stuff going to
239          the ith processor
240   */
241   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
242   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
243   CHKPTRQ(send_waits);
244   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
245   starts[0] = 0;
246   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
247   for ( i=0; i<baij->stash.n; i++ ) {
248     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
249     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
250     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
251   }
252   PetscFree(owner);
253   starts[0] = 0;
254   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
255   count = 0;
256   for ( i=0; i<size; i++ ) {
257     if (procs[i]) {
258       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
259                 comm,send_waits+count++);
260     }
261   }
262   PetscFree(starts); PetscFree(nprocs);
263 
264   /* Free cache space */
265   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
266   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
267 
268   baij->svalues    = svalues;    baij->rvalues    = rvalues;
269   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
270   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
271   baij->rmax       = nmax;
272 
273   return 0;
274 }
275 
276 
277 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
278 {
279   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
280   MPI_Status  *send_status,recv_status;
281   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
282   int         bs=baij->bs,row,col,other_disassembled;
283   Scalar      *values,val;
284   InsertMode  addv = baij->insertmode;
285 
286   /*  wait on receives */
287   while (count) {
288     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
289     /* unpack receives into our local space */
290     values = baij->rvalues + 3*imdex*baij->rmax;
291     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
292     n = n/3;
293     for ( i=0; i<n; i++ ) {
294       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
295       col = (int) PetscReal(values[3*i+1]);
296       val = values[3*i+2];
297       if (col >= baij->cstart*bs && col < baij->cend*bs) {
298         col -= baij->cstart*bs;
299         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
300       }
301       else {
302         if (mat->was_assembled) {
303           if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);}
304           col = baij->colmap[col/bs]*bs + col%bs;
305           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
306             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
307             col = (int) PetscReal(values[3*i+1]);
308           }
309         }
310         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
311       }
312     }
313     count--;
314   }
315   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
316 
317   /* wait on sends */
318   if (baij->nsends) {
319     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
320     CHKPTRQ(send_status);
321     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
322     PetscFree(send_status);
323   }
324   PetscFree(baij->send_waits); PetscFree(baij->svalues);
325 
326   baij->insertmode = NOT_SET_VALUES;
327   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
328   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
329 
330   /* determine if any processor has disassembled, if so we must
331      also disassemble ourselfs, in order that we may reassemble. */
332   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
333   if (mat->was_assembled && !other_disassembled) {
334     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
335   }
336 
337   if (!mat->was_assembled && mode == FINAL_ASSEMBLY) {
338     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
339   }
340   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
341   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
342 
343   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
344   return 0;
345 }
346 
347 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
348 {
349   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
350   int          ierr;
351 
352   if (baij->size == 1) {
353     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
354   }
355   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
356   return 0;
357 }
358 
359 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
360 {
361   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
362   int          ierr, format,rank,bs=baij->bs;
363   FILE         *fd;
364   ViewerType   vtype;
365 
366   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
367   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
368     ierr = ViewerGetFormat(viewer,&format);
369     if (format == ASCII_FORMAT_INFO_DETAILED) {
370       int nz, nzalloc, mem;
371       MPI_Comm_rank(mat->comm,&rank);
372       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
373       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
374       PetscSequentialPhaseBegin(mat->comm,1);
375       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
376               rank,baij->m,nz*bs,nzalloc*bs,baij->bs,mem);
377       ierr = MatGetInfo(baij->A,MAT_LOCAL,&nz,&nzalloc,&mem);
378       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz*bs);
379       ierr = MatGetInfo(baij->B,MAT_LOCAL,&nz,&nzalloc,&mem);
380       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz*bs);
381       fflush(fd);
382       PetscSequentialPhaseEnd(mat->comm,1);
383       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
384       return 0;
385     }
386     else if (format == ASCII_FORMAT_INFO) {
387       return 0;
388     }
389   }
390 
391   if (vtype == DRAW_VIEWER) {
392     Draw       draw;
393     PetscTruth isnull;
394     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
395     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
396   }
397 
398   if (vtype == ASCII_FILE_VIEWER) {
399     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
400     PetscSequentialPhaseBegin(mat->comm,1);
401     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
402            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
403             baij->cstart*bs,baij->cend*bs);
404     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
405     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
406     fflush(fd);
407     PetscSequentialPhaseEnd(mat->comm,1);
408   }
409   else {
410     int size = baij->size;
411     rank = baij->rank;
412     if (size == 1) {
413       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
414     }
415     else {
416       /* assemble the entire matrix onto first processor. */
417       Mat         A;
418       Mat_SeqBAIJ *Aloc;
419       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
420       int         mbs=baij->mbs;
421       Scalar      *a;
422 
423       if (!rank) {
424         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
425         CHKERRQ(ierr);
426       }
427       else {
428         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
429         CHKERRQ(ierr);
430       }
431       PLogObjectParent(mat,A);
432 
433       /* copy over the A part */
434       Aloc = (Mat_SeqBAIJ*) baij->A->data;
435       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
436       row = baij->rstart;
437       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
438 
439       for ( i=0; i<mbs; i++ ) {
440         rvals[0] = bs*(baij->rstart + i);
441         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
442         for ( j=ai[i]; j<ai[i+1]; j++ ) {
443           col = (baij->cstart+aj[j])*bs;
444           for (k=0; k<bs; k++ ) {
445             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
446             col++; a += bs;
447           }
448         }
449       }
450       /* copy over the B part */
451       Aloc = (Mat_SeqBAIJ*) baij->B->data;
452       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
453       row = baij->rstart*bs;
454       for ( i=0; i<mbs; i++ ) {
455         rvals[0] = bs*(baij->rstart + i);
456         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
457         for ( j=ai[i]; j<ai[i+1]; j++ ) {
458           col = baij->garray[aj[j]]*bs;
459           for (k=0; k<bs; k++ ) {
460             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
461             col++; a += bs;
462           }
463         }
464       }
465       PetscFree(rvals);
466       ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
467       ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
468       if (!rank) {
469         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
470       }
471       ierr = MatDestroy(A); CHKERRQ(ierr);
472     }
473   }
474   return 0;
475 }
476 
477 
478 
479 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
480 {
481   Mat         mat = (Mat) obj;
482   int         ierr;
483   ViewerType  vtype;
484 
485   if (!viewer) {
486     viewer = STDOUT_VIEWER_SELF;
487   }
488   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
489   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
490       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
491     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
492   }
493   else if (vtype == BINARY_FILE_VIEWER) {
494     return MatView_MPIBAIJ_Binary(mat,viewer);
495   }
496   return 0;
497 }
498 
499 static int MatDestroy_MPIBAIJ(PetscObject obj)
500 {
501   Mat         mat = (Mat) obj;
502   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
503   int         ierr;
504 
505 #if defined(PETSC_LOG)
506   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
507 #endif
508 
509   PetscFree(baij->rowners);
510   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
511   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
512   if (baij->colmap) PetscFree(baij->colmap);
513   if (baij->garray) PetscFree(baij->garray);
514   if (baij->lvec)   VecDestroy(baij->lvec);
515   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
516   if (baij->rowvalues) PetscFree(baij->rowvalues);
517   PetscFree(baij);
518   PLogObjectDestroy(mat);
519   PetscHeaderDestroy(mat);
520   return 0;
521 }
522 
523 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
524 {
525   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
526   int        ierr;
527 
528   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
529   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
530   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
531   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
532   return 0;
533 }
534 
535 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
536 {
537   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
538   int        ierr;
539   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
540   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
541   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
542   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
543   return 0;
544 }
545 
546 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
547 {
548   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
549   int        ierr;
550 
551   /* do nondiagonal part */
552   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
553   /* send it on its way */
554   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
555                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
556   /* do local part */
557   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
558   /* receive remote parts: note this assumes the values are not actually */
559   /* inserted in yy until the next line, which is true for my implementation*/
560   /* but is not perhaps always true. */
561   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
562                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
563   return 0;
564 }
565 
566 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
567 {
568   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
569   int        ierr;
570 
571   /* do nondiagonal part */
572   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
573   /* send it on its way */
574   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
575                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
576   /* do local part */
577   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
578   /* receive remote parts: note this assumes the values are not actually */
579   /* inserted in yy until the next line, which is true for my implementation*/
580   /* but is not perhaps always true. */
581   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
582                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
583   return 0;
584 }
585 
586 /*
587   This only works correctly for square matrices where the subblock A->A is the
588    diagonal block
589 */
590 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
591 {
592   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
593   if (a->M != a->N)
594     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
595   return MatGetDiagonal(a->A,v);
596 }
597 
598 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
599 {
600   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
601   int        ierr;
602   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
603   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
604   return 0;
605 }
606 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
607 {
608   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
609   *m = mat->M; *n = mat->N;
610   return 0;
611 }
612 
613 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
614 {
615   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
616   *m = mat->m; *n = mat->N;
617   return 0;
618 }
619 
620 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
621 {
622   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
623   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
624   return 0;
625 }
626 
627 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
628 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
629 
630 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
631 {
632   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
633   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
634   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
635   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
636   int        *cmap, *idx_p,cstart = mat->cstart;
637 
638   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
639   mat->getrowactive = PETSC_TRUE;
640 
641   if (!mat->rowvalues && (idx || v)) {
642     /*
643         allocate enough space to hold information from the longest row.
644     */
645     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
646     int     max = 1,mbs = mat->mbs,tmp;
647     for ( i=0; i<mbs; i++ ) {
648       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
649       if (max < tmp) { max = tmp; }
650     }
651     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
652     CHKPTRQ(mat->rowvalues);
653     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
654   }
655 
656 
657   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows")
658   lrow = row - brstart;
659 
660   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
661   if (!v)   {pvA = 0; pvB = 0;}
662   if (!idx) {pcA = 0; if (!v) pcB = 0;}
663   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
664   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
665   nztot = nzA + nzB;
666 
667   cmap  = mat->garray;
668   if (v  || idx) {
669     if (nztot) {
670       /* Sort by increasing column numbers, assuming A and B already sorted */
671       int imark = -1;
672       if (v) {
673         *v = v_p = mat->rowvalues;
674         for ( i=0; i<nzB; i++ ) {
675           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
676           else break;
677         }
678         imark = i;
679         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
680         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
681       }
682       if (idx) {
683         *idx = idx_p = mat->rowindices;
684         if (imark > -1) {
685           for ( i=0; i<imark; i++ ) {
686             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
687           }
688         } else {
689           for ( i=0; i<nzB; i++ ) {
690             if (cmap[cworkB[i]/bs] < cstart)
691               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
692             else break;
693           }
694           imark = i;
695         }
696         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
697         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
698       }
699     }
700     else {
701       if (idx) *idx = 0;
702       if (v)   *v   = 0;
703     }
704   }
705   *nz = nztot;
706   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
707   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
708   return 0;
709 }
710 
711 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
712 {
713   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
714   if (baij->getrowactive == PETSC_FALSE) {
715     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
716   }
717   baij->getrowactive = PETSC_FALSE;
718   return 0;
719 }
720 
721 /* -------------------------------------------------------------------*/
722 static struct _MatOps MatOps = {
723   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
724   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
725   0,0,0,0,
726   0,0,0,0,
727   0,MatGetDiagonal_MPIBAIJ,0,MatNorm_MPIBAIJ,
728   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,0,
729   0,0,MatGetReordering_MPIBAIJ,0,
730   0,0,0,MatGetSize_MPIBAIJ,
731   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
732   0,0,0,0,
733   0,0,0,0,
734   0,0,0,MatGetSubMatrices_MPIBAIJ,
735   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,0,
736   MatScale_MPIBAIJ,0,0};
737 
738 
739 /*@C
740    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
741    (block compressed row).  For good matrix assembly performance
742    the user should preallocate the matrix storage by setting the parameters
743    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
744    performance can be increased by more than a factor of 50.
745 
746    Input Parameters:
747 .  comm - MPI communicator
748 .  bs   - size of blockk
749 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
750            This value should be the same as the local size used in creating the
751            y vector for the matrix-vector product y = Ax.
752 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
753            This value should be the same as the local size used in creating the
754            x vector for the matrix-vector product y = Ax.
755 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
756 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
757 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
758            submatrix  (same for all local rows)
759 .  d_nzz - array containing the number of block nonzeros in the various block rows
760            of the in diagonal portion of the local (possibly different for each block
761            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
762            it is zero.
763 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
764            submatrix (same for all local rows).
765 .  o_nzz - array containing the number of nonzeros in the various block rows of the
766            off-diagonal portion of the local submatrix (possibly different for
767            each block row) or PETSC_NULL.
768 
769    Output Parameter:
770 .  A - the matrix
771 
772    Notes:
773    The user MUST specify either the local or global matrix dimensions
774    (possibly both).
775 
776    Storage Information:
777    For a square global matrix we define each processor's diagonal portion
778    to be its local rows and the corresponding columns (a square submatrix);
779    each processor's off-diagonal portion encompasses the remainder of the
780    local matrix (a rectangular submatrix).
781 
782    The user can specify preallocated storage for the diagonal part of
783    the local submatrix with either d_nz or d_nnz (not both).  Set
784    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
785    memory allocation.  Likewise, specify preallocated storage for the
786    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
787 
788    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
789    the figure below we depict these three local rows and all columns (0-11).
790 
791 $          0 1 2 3 4 5 6 7 8 9 10 11
792 $         -------------------
793 $  row 3  |  o o o d d d o o o o o o
794 $  row 4  |  o o o d d d o o o o o o
795 $  row 5  |  o o o d d d o o o o o o
796 $         -------------------
797 $
798 
799    Thus, any entries in the d locations are stored in the d (diagonal)
800    submatrix, and any entries in the o locations are stored in the
801    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
802    stored simply in the MATSEQBAIJ format for compressed row storage.
803 
804    Now d_nz should indicate the number of nonzeros per row in the d matrix,
805    and o_nz should indicate the number of nonzeros per row in the o matrix.
806    In general, for PDE problems in which most nonzeros are near the diagonal,
807    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
808    or you will get TERRIBLE performance; see the users' manual chapter on
809    matrices and the file $(PETSC_DIR)/Performance.
810 
811 .keywords: matrix, block, aij, compressed row, sparse, parallel
812 
813 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
814 @*/
815 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
816                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
817 {
818   Mat          B;
819   Mat_MPIBAIJ  *b;
820   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE;
821 
822   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
823   *A = 0;
824   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
825   PLogObjectCreate(B);
826   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
827   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
828   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
829   B->destroy    = MatDestroy_MPIBAIJ;
830   B->view       = MatView_MPIBAIJ;
831 
832   B->factor     = 0;
833   B->assembled  = PETSC_FALSE;
834 
835   b->insertmode = NOT_SET_VALUES;
836   MPI_Comm_rank(comm,&b->rank);
837   MPI_Comm_size(comm,&b->size);
838 
839   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
840     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
841   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
842   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
843   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
844   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
845 
846   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
847     work[0] = m; work[1] = n;
848     mbs = m/bs; nbs = n/bs;
849     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
850     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
851     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
852   }
853   if (m == PETSC_DECIDE) {
854     Mbs = M/bs;
855     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
856     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
857     m   = mbs*bs;
858   }
859   if (n == PETSC_DECIDE) {
860     Nbs = N/bs;
861     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
862     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
863     n   = nbs*bs;
864   }
865   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
866 
867   b->m = m; B->m = m;
868   b->n = n; B->n = n;
869   b->N = N; B->N = N;
870   b->M = M; B->M = M;
871   b->bs  = bs;
872   b->bs2 = bs*bs;
873   b->mbs = mbs;
874   b->nbs = nbs;
875   b->Mbs = Mbs;
876   b->Nbs = Nbs;
877 
878   /* build local table of row and column ownerships */
879   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
880   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
881   b->cowners = b->rowners + b->size + 1;
882   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
883   b->rowners[0] = 0;
884   for ( i=2; i<=b->size; i++ ) {
885     b->rowners[i] += b->rowners[i-1];
886   }
887   b->rstart = b->rowners[b->rank];
888   b->rend   = b->rowners[b->rank+1];
889   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
890   b->cowners[0] = 0;
891   for ( i=2; i<=b->size; i++ ) {
892     b->cowners[i] += b->cowners[i-1];
893   }
894   b->cstart = b->cowners[b->rank];
895   b->cend   = b->cowners[b->rank+1];
896 
897   if (d_nz == PETSC_DEFAULT) d_nz = 5;
898   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
899   PLogObjectParent(B,b->A);
900   if (o_nz == PETSC_DEFAULT) o_nz = 0;
901   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
902   PLogObjectParent(B,b->B);
903 
904   /* build cache for off array entries formed */
905   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
906   b->colmap      = 0;
907   b->garray      = 0;
908   b->roworiented = 1;
909 
910   /* stuff used for matrix vector multiply */
911   b->lvec      = 0;
912   b->Mvctx     = 0;
913 
914   /* stuff for MatGetRow() */
915   b->rowindices   = 0;
916   b->rowvalues    = 0;
917   b->getrowactive = PETSC_FALSE;
918 
919   *A = B;
920   return 0;
921 }
922 
923 #include "sys.h"
924 
925 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
926 {
927   Mat          A;
928   int          i, nz, ierr, j,rstart, rend, fd;
929   Scalar       *vals,*buf;
930   MPI_Comm     comm = ((PetscObject)viewer)->comm;
931   MPI_Status   status;
932   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
933   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
934   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
935   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
936   int          dcount,kmax,k,nzcount,tmp;
937 
938 
939   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
940   bs2  = bs*bs;
941 
942   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
943   if (!rank) {
944     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
945     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
946     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
947   }
948 
949   MPI_Bcast(header+1,3,MPI_INT,0,comm);
950   M = header[1]; N = header[2];
951 
952 
953   if (M != N) SETERRQ(1,"MatLoad_SeqBAIJ:Can only do square matrices");
954 
955   /*
956      This code adds extra rows to make sure the number of rows is
957      divisible by the blocksize
958   */
959   Mbs        = M/bs;
960   extra_rows = bs - M + bs*(Mbs);
961   if (extra_rows == bs) extra_rows = 0;
962   else                  Mbs++;
963   if (extra_rows &&!rank) {
964     PLogInfo(0,"MatLoad_SeqBAIJ:Padding loaded matrix to match blocksize");
965   }
966   /* determine ownership of all rows */
967   mbs = Mbs/size + ((Mbs % size) > rank);
968   m   = mbs * bs;
969   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
970   browners = rowners + size + 1;
971   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
972   rowners[0] = 0;
973   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
974   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
975   rstart = rowners[rank];
976   rend   = rowners[rank+1];
977 
978   /* distribute row lengths to all processors */
979   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
980   if (!rank) {
981     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
982     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
983     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
984     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
985     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
986     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
987     PetscFree(sndcounts);
988   }
989   else {
990     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
991   }
992 
993   if (!rank) {
994     /* calculate the number of nonzeros on each processor */
995     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
996     PetscMemzero(procsnz,size*sizeof(int));
997     for ( i=0; i<size; i++ ) {
998       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
999         procsnz[i] += rowlengths[j];
1000       }
1001     }
1002     PetscFree(rowlengths);
1003 
1004     /* determine max buffer needed and allocate it */
1005     maxnz = 0;
1006     for ( i=0; i<size; i++ ) {
1007       maxnz = PetscMax(maxnz,procsnz[i]);
1008     }
1009     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1010 
1011     /* read in my part of the matrix column indices  */
1012     nz = procsnz[0];
1013     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1014     mycols = ibuf;
1015     if (size == 1)  nz -= extra_rows;
1016     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1017     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1018 
1019     /* read in every ones (except the last) and ship off */
1020     for ( i=1; i<size-1; i++ ) {
1021       nz = procsnz[i];
1022       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1023       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1024     }
1025     /* read in the stuff for the last proc */
1026     if ( size != 1 ) {
1027       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1028       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1029       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1030       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1031     }
1032     PetscFree(cols);
1033   }
1034   else {
1035     /* determine buffer space needed for message */
1036     nz = 0;
1037     for ( i=0; i<m; i++ ) {
1038       nz += locrowlens[i];
1039     }
1040     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1041     mycols = ibuf;
1042     /* receive message of column indices*/
1043     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1044     MPI_Get_count(&status,MPI_INT,&maxnz);
1045     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1046   }
1047 
1048   /* loop over local rows, determining number of off diagonal entries */
1049   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1050   odlens = dlens + (rend-rstart);
1051   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1052   PetscMemzero(mask,3*Mbs*sizeof(int));
1053   masked1 = mask    + Mbs;
1054   masked2 = masked1 + Mbs;
1055   rowcount = 0; nzcount = 0;
1056   for ( i=0; i<mbs; i++ ) {
1057     dcount  = 0;
1058     odcount = 0;
1059     for ( j=0; j<bs; j++ ) {
1060       kmax = locrowlens[rowcount];
1061       for ( k=0; k<kmax; k++ ) {
1062         tmp = mycols[nzcount++]/bs;
1063         if (!mask[tmp]) {
1064           mask[tmp] = 1;
1065           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1066           else masked1[dcount++] = tmp;
1067         }
1068       }
1069       rowcount++;
1070     }
1071 
1072     dlens[i]  = dcount;
1073     odlens[i] = odcount;
1074 
1075     /* zero out the mask elements we set */
1076     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1077     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1078   }
1079 
1080   /* create our matrix */
1081   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);CHKERRQ(ierr);
1082   A = *newmat;
1083   MatSetOption(A,COLUMNS_SORTED);
1084 
1085   if (!rank) {
1086     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1087     /* read in my part of the matrix numerical values  */
1088     nz = procsnz[0];
1089     vals = buf;
1090     mycols = ibuf;
1091     if (size == 1)  nz -= extra_rows;
1092     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1093     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1094     /* insert into matrix */
1095     jj      = rstart*bs;
1096     for ( i=0; i<m; i++ ) {
1097       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1098       mycols += locrowlens[i];
1099       vals   += locrowlens[i];
1100       jj++;
1101     }
1102     /* read in other processors( except the last one) and ship out */
1103     for ( i=1; i<size-1; i++ ) {
1104       nz = procsnz[i];
1105       vals = buf;
1106       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1107       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1108     }
1109     /* the last proc */
1110     if ( size != 1 ){
1111       nz = procsnz[i] - extra_rows;
1112       vals = buf;
1113       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1114       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1115       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1116     }
1117     PetscFree(procsnz);
1118   }
1119   else {
1120     /* receive numeric values */
1121     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1122 
1123     /* receive message of values*/
1124     vals = buf;
1125     mycols = ibuf;
1126     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1127     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1128     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1129 
1130     /* insert into matrix */
1131     jj      = rstart*bs;
1132     for ( i=0; i<m; i++ ) {
1133       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1134       mycols += locrowlens[i];
1135       vals   += locrowlens[i];
1136       jj++;
1137     }
1138   }
1139   PetscFree(locrowlens);
1140   PetscFree(buf);
1141   PetscFree(ibuf);
1142   PetscFree(rowners);
1143   PetscFree(dlens);
1144   PetscFree(mask);
1145   ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1146   ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1147   return 0;
1148 }
1149 
1150 
1151