xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 5da98dde8008eff33126359474c4036a9818d3ed)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.8 1996/07/02 19:50:37 balay Exp balay $";
3 #endif
4 
5 #include "mpibaij.h"
6 
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 
16 /* local utility routine that creates a mapping from the global column
17 number to the local number in the off-diagonal part of the local
18 storage of the matrix.  This is done in a non scable way since the
19 length of colmap equals the global matrix length.
20 */
21 static int CreateColmap_MPIBAIJ_Private(Mat mat)
22 {
23   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
24   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
25   int        nbs = B->nbs,i;
26 
27   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
28   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
29   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
30   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i;
31   return 0;
32 }
33 
34 
35 static int MatGetReordering_MPIBAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
36 {
37   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
38   int         ierr;
39   if (baij->size == 1) {
40     ierr = MatGetReordering(baij->A,type,rperm,cperm); CHKERRQ(ierr);
41   } else SETERRQ(1,"MatGetReordering_MPIBAIJ:not supported in parallel");
42   return 0;
43 }
44 
45 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
46 {
47   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
48   Scalar      value;
49   int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
50   int         cstart = baij->cstart, cend = baij->cend,row,col;
51   int         roworiented = baij->roworiented,rstart_orig,rend_orig;
52   int         cstart_orig,cend_orig,bs=baij->bs;
53 
54   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
55     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
56   }
57   baij->insertmode = addv;
58   rstart_orig = rstart*bs;
59   rend_orig   = rend*bs;
60   cstart_orig = cstart*bs;
61   cend_orig   = cend*bs;
62   for ( i=0; i<m; i++ ) {
63     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
64     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
65     if (im[i] >= rstart_orig && im[i] < rend_orig) {
66       row = im[i] - rstart_orig;
67       for ( j=0; j<n; j++ ) {
68         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");
69         if (in[j] >= baij->N) SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");
70         if (in[j] >= cstart_orig && in[j] < cend_orig){
71           col = in[j] - cstart_orig;
72           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
73           ierr = MatSetValues(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
74         }
75         else {
76           if (mat->was_assembled) {
77             if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);}
78             col = baij->colmap[in[j]];
79             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
80               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
81               col =  in[j];
82             }
83           }
84           else col = in[j];
85           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
86           ierr = MatSetValues(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
87         }
88       }
89     }
90     else {
91       if (roworiented) {
92         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
93       }
94       else {
95         row = im[i];
96         for ( j=0; j<n; j++ ) {
97           ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
98         }
99       }
100     }
101   }
102   return 0;
103 }
104 
105 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
106 {
107   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
108   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
109   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
110 
111   for ( i=0; i<m; i++ ) {
112     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
113     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
114     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
115       row = idxm[i] - bsrstart;
116       for ( j=0; j<n; j++ ) {
117         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
118         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
119         if (idxn[j] >= bscstart && idxn[j] < bscend){
120           col = idxn[j] - bscstart;
121           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
122         }
123         else {
124           if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);}
125           if ( baij->garray[baij->colmap[idxn[j]/bs]] != idxn[j]/bs ) *(v+i*n+j) = 0.0;
126           else {
127             col = baij->colmap[idxn[j]/bs]*bs + idxn[j]%bs;
128             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
129           }
130         }
131       }
132     }
133     else {
134       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
135     }
136   }
137   return 0;
138 }
139 
140 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
141 {
142   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
143   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
144   int        ierr, i,bs2=baij->bs2;
145   double     sum = 0.0;
146   Scalar     *v;
147 
148   if (baij->size == 1) {
149     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
150   } else {
151     if (type == NORM_FROBENIUS) {
152       v = amat->a;
153       for (i=0; i<amat->nz*bs2; i++ ) {
154 #if defined(PETSC_COMPLEX)
155         sum += real(conj(*v)*(*v)); v++;
156 #else
157         sum += (*v)*(*v); v++;
158 #endif
159       }
160       v = bmat->a;
161       for (i=0; i<bmat->nz*bs2; i++ ) {
162 #if defined(PETSC_COMPLEX)
163         sum += real(conj(*v)*(*v)); v++;
164 #else
165         sum += (*v)*(*v); v++;
166 #endif
167       }
168       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
169       *norm = sqrt(*norm);
170     }
171     else
172       SETERRQ(1,"MatNorm_SeqBAIJ:No support for this norm yet");
173   }
174   return 0;
175 }
176 
177 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
178 {
179   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
180   MPI_Comm    comm = mat->comm;
181   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
182   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
183   MPI_Request *send_waits,*recv_waits;
184   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
185   InsertMode  addv;
186   Scalar      *rvalues,*svalues;
187 
188   /* make sure all processors are either in INSERTMODE or ADDMODE */
189   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
190   if (addv == (ADD_VALUES|INSERT_VALUES)) {
191     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
192   }
193   baij->insertmode = addv; /* in case this processor had no cache */
194 
195   /*  first count number of contributors to each processor */
196   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
197   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
198   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
199   for ( i=0; i<baij->stash.n; i++ ) {
200     idx = baij->stash.idx[i];
201     for ( j=0; j<size; j++ ) {
202       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
203         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
204       }
205     }
206   }
207   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
208 
209   /* inform other processors of number of messages and max length*/
210   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
211   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
212   nreceives = work[rank];
213   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
214   nmax = work[rank];
215   PetscFree(work);
216 
217   /* post receives:
218        1) each message will consist of ordered pairs
219      (global index,value) we store the global index as a double
220      to simplify the message passing.
221        2) since we don't know how long each individual message is we
222      allocate the largest needed buffer for each receive. Potentially
223      this is a lot of wasted space.
224 
225 
226        This could be done better.
227   */
228   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
229   CHKPTRQ(rvalues);
230   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
231   CHKPTRQ(recv_waits);
232   for ( i=0; i<nreceives; i++ ) {
233     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
234               comm,recv_waits+i);
235   }
236 
237   /* do sends:
238       1) starts[i] gives the starting index in svalues for stuff going to
239          the ith processor
240   */
241   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
242   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
243   CHKPTRQ(send_waits);
244   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
245   starts[0] = 0;
246   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
247   for ( i=0; i<baij->stash.n; i++ ) {
248     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
249     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
250     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
251   }
252   PetscFree(owner);
253   starts[0] = 0;
254   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
255   count = 0;
256   for ( i=0; i<size; i++ ) {
257     if (procs[i]) {
258       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
259                 comm,send_waits+count++);
260     }
261   }
262   PetscFree(starts); PetscFree(nprocs);
263 
264   /* Free cache space */
265   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
266   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
267 
268   baij->svalues    = svalues;    baij->rvalues    = rvalues;
269   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
270   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
271   baij->rmax       = nmax;
272 
273   return 0;
274 }
275 
276 
277 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
278 {
279   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
280   MPI_Status  *send_status,recv_status;
281   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
282   int         bs=baij->bs,row,col,other_disassembled;
283   Scalar      *values,val;
284   InsertMode  addv = baij->insertmode;
285 
286   /*  wait on receives */
287   while (count) {
288     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
289     /* unpack receives into our local space */
290     values = baij->rvalues + 3*imdex*baij->rmax;
291     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
292     n = n/3;
293     for ( i=0; i<n; i++ ) {
294       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
295       col = (int) PetscReal(values[3*i+1]);
296       val = values[3*i+2];
297       if (col >= baij->cstart*bs && col < baij->cend*bs) {
298         col -= baij->cstart*bs;
299         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
300       }
301       else {
302         if (mat->was_assembled) {
303           if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);}
304           col = baij->colmap[col/bs]*bs + col%bs;
305           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
306             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
307             col = (int) PetscReal(values[3*i+1]);
308           }
309         }
310         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
311       }
312     }
313     count--;
314   }
315   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
316 
317   /* wait on sends */
318   if (baij->nsends) {
319     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
320     CHKPTRQ(send_status);
321     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
322     PetscFree(send_status);
323   }
324   PetscFree(baij->send_waits); PetscFree(baij->svalues);
325 
326   baij->insertmode = NOT_SET_VALUES;
327   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
328   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
329 
330   /* determine if any processor has disassembled, if so we must
331      also disassemble ourselfs, in order that we may reassemble. */
332   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
333   if (mat->was_assembled && !other_disassembled) {
334     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
335   }
336 
337   if (!mat->was_assembled && mode == FINAL_ASSEMBLY) {
338     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
339   }
340   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
341   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
342 
343   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
344   return 0;
345 }
346 
347 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
348 {
349   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
350   int          ierr;
351 
352   if (baij->size == 1) {
353     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
354   }
355   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
356   return 0;
357 }
358 
359 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
360 {
361   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
362   int          ierr, format,rank,bs=baij->bs;
363   FILE         *fd;
364   ViewerType   vtype;
365 
366   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
367   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
368     ierr = ViewerGetFormat(viewer,&format);
369     if (format == ASCII_FORMAT_INFO_DETAILED) {
370       int nz, nzalloc, mem;
371       MPI_Comm_rank(mat->comm,&rank);
372       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
373       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
374       PetscSequentialPhaseBegin(mat->comm,1);
375       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
376               rank,baij->m,nz*bs,nzalloc*bs,baij->bs,mem);
377       ierr = MatGetInfo(baij->A,MAT_LOCAL,&nz,&nzalloc,&mem);
378       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz*bs);
379       ierr = MatGetInfo(baij->B,MAT_LOCAL,&nz,&nzalloc,&mem);
380       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz*bs);
381       fflush(fd);
382       PetscSequentialPhaseEnd(mat->comm,1);
383       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
384       return 0;
385     }
386     else if (format == ASCII_FORMAT_INFO) {
387       return 0;
388     }
389   }
390 
391   if (vtype == DRAW_VIEWER) {
392     Draw       draw;
393     PetscTruth isnull;
394     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
395     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
396   }
397 
398   if (vtype == ASCII_FILE_VIEWER) {
399     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
400     PetscSequentialPhaseBegin(mat->comm,1);
401     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
402            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
403             baij->cstart*bs,baij->cend*bs);
404     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
405     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
406     fflush(fd);
407     PetscSequentialPhaseEnd(mat->comm,1);
408   }
409   else {
410     int size = baij->size;
411     rank = baij->rank;
412     if (size == 1) {
413       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
414     }
415     else {
416       /* assemble the entire matrix onto first processor. */
417       Mat         A;
418       Mat_SeqBAIJ *Aloc;
419       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
420       int         mbs=baij->mbs;
421       Scalar      *a;
422 
423       if (!rank) {
424         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
425         CHKERRQ(ierr);
426       }
427       else {
428         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
429         CHKERRQ(ierr);
430       }
431       PLogObjectParent(mat,A);
432 
433       /* copy over the A part */
434       Aloc = (Mat_SeqBAIJ*) baij->A->data;
435       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
436       row = baij->rstart;
437       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
438 
439       for ( i=0; i<mbs; i++ ) {
440         rvals[0] = bs*(baij->rstart + i);
441         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
442         for ( j=ai[i]; j<ai[i+1]; j++ ) {
443           col = (baij->cstart+aj[j])*bs;
444           for (k=0; k<bs; k++ ) {
445             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
446             col++; a += bs;
447           }
448         }
449       }
450       /* copy over the B part */
451       Aloc = (Mat_SeqBAIJ*) baij->B->data;
452       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
453       row = baij->rstart*bs;
454       for ( i=0; i<mbs; i++ ) {
455         rvals[0] = bs*(baij->rstart + i);
456         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
457         for ( j=ai[i]; j<ai[i+1]; j++ ) {
458           col = baij->garray[aj[j]]*bs;
459           for (k=0; k<bs; k++ ) {
460             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
461             col++; a += bs;
462           }
463         }
464       }
465       PetscFree(rvals);
466       ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
467       ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
468       if (!rank) {
469         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
470       }
471       ierr = MatDestroy(A); CHKERRQ(ierr);
472     }
473   }
474   return 0;
475 }
476 
477 
478 
479 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
480 {
481   Mat         mat = (Mat) obj;
482   int         ierr;
483   ViewerType  vtype;
484 
485   if (!viewer) {
486     viewer = STDOUT_VIEWER_SELF;
487   }
488   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
489   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
490       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
491     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
492   }
493   else if (vtype == BINARY_FILE_VIEWER) {
494     return MatView_MPIBAIJ_Binary(mat,viewer);
495   }
496   return 0;
497 }
498 
499 static int MatDestroy_MPIBAIJ(PetscObject obj)
500 {
501   Mat         mat = (Mat) obj;
502   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
503   int         ierr;
504 
505 #if defined(PETSC_LOG)
506   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
507 #endif
508 
509   PetscFree(baij->rowners);
510   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
511   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
512   if (baij->colmap) PetscFree(baij->colmap);
513   if (baij->garray) PetscFree(baij->garray);
514   if (baij->lvec)   VecDestroy(baij->lvec);
515   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
516   if (baij->rowvalues) PetscFree(baij->rowvalues);
517   PetscFree(baij);
518   PLogObjectDestroy(mat);
519   PetscHeaderDestroy(mat);
520   return 0;
521 }
522 
523 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
524 {
525   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
526   int        ierr;
527 
528   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
529   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
530   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
531   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
532   return 0;
533 }
534 
535 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
536 {
537   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
538   int        ierr;
539   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
540   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
541   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
542   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
543   return 0;
544 }
545 
546 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
547 {
548   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
549   int        ierr;
550 
551   /* do nondiagonal part */
552   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
553   /* send it on its way */
554   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
555                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
556   /* do local part */
557   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
558   /* receive remote parts: note this assumes the values are not actually */
559   /* inserted in yy until the next line, which is true for my implementation*/
560   /* but is not perhaps always true. */
561   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
562                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
563   return 0;
564 }
565 
566 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
567 {
568   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
569   int        ierr;
570 
571   /* do nondiagonal part */
572   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
573   /* send it on its way */
574   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
575                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
576   /* do local part */
577   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
578   /* receive remote parts: note this assumes the values are not actually */
579   /* inserted in yy until the next line, which is true for my implementation*/
580   /* but is not perhaps always true. */
581   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
582                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
583   return 0;
584 }
585 
586 /*
587   This only works correctly for square matrices where the subblock A->A is the
588    diagonal block
589 */
590 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
591 {
592   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
593   if (a->M != a->N)
594     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
595   return MatGetDiagonal(a->A,v);
596 }
597 
598 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
599 {
600   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
601   int        ierr;
602   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
603   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
604   return 0;
605 }
606 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
607 {
608   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
609   *m = mat->M; *n = mat->N;
610   return 0;
611 }
612 
613 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
614 {
615   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
616   *m = mat->m; *n = mat->N;
617   return 0;
618 }
619 
620 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
621 {
622   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
623   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
624   return 0;
625 }
626 
627 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
628 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
629 
630 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
631 {
632   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
633   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
634   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
635   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
636   int        *cmap, *idx_p,cstart = mat->cstart;
637 
638   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
639   mat->getrowactive = PETSC_TRUE;
640 
641   if (!mat->rowvalues && (idx || v)) {
642     /*
643         allocate enough space to hold information from the longest row.
644     */
645     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
646     int     max = 1,mbs = mat->mbs,tmp;
647     for ( i=0; i<mbs; i++ ) {
648       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
649       if (max < tmp) { max = tmp; }
650     }
651     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
652     CHKPTRQ(mat->rowvalues);
653     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
654   }
655 
656 
657   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows")
658   lrow = row - brstart;
659 
660   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
661   if (!v)   {pvA = 0; pvB = 0;}
662   if (!idx) {pcA = 0; if (!v) pcB = 0;}
663   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
664   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
665   nztot = nzA + nzB;
666 
667   cmap  = mat->garray;
668   if (v  || idx) {
669     if (nztot) {
670       /* Sort by increasing column numbers, assuming A and B already sorted */
671       int imark = -1;
672       if (v) {
673         *v = v_p = mat->rowvalues;
674         for ( i=0; i<nzB; i++ ) {
675           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
676           else break;
677         }
678         imark = i;
679         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
680         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
681       }
682       if (idx) {
683         *idx = idx_p = mat->rowindices;
684         if (imark > -1) {
685           for ( i=0; i<imark; i++ ) {
686             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
687           }
688         } else {
689           for ( i=0; i<nzB; i++ ) {
690             if (cmap[cworkB[i]/bs] < cstart)
691               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
692             else break;
693           }
694           imark = i;
695         }
696         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
697         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
698       }
699     }
700     else {
701       if (idx) *idx = 0;
702       if (v)   *v   = 0;
703     }
704   }
705   *nz = nztot;
706   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
707   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
708   return 0;
709 }
710 
711 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
712 {
713   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
714   if (baij->getrowactive == PETSC_FALSE) {
715     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
716   }
717   baij->getrowactive = PETSC_FALSE;
718   return 0;
719 }
720 
721 /* -------------------------------------------------------------------*/
722 static struct _MatOps MatOps = {
723   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
724   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
725   0,0,0,0,
726   0,0,0,0,
727   0,MatGetDiagonal_MPIBAIJ,0,MatNorm_MPIBAIJ,
728   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,0,
729   0,0,MatGetReordering_MPIBAIJ,0,
730   0,0,0,MatGetSize_MPIBAIJ,
731   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
732   0,0,0,0,
733   0,0,0,0,
734   0,0,0,MatGetSubMatrices_MPIBAIJ,
735   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,0,
736   MatScale_MPIBAIJ,0,0};
737 
738 
739 /*@C
740    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
741    (block compressed row).  For good matrix assembly performance
742    the user should preallocate the matrix storage by setting the parameters
743    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
744    performance can be increased by more than a factor of 50.
745 
746    Input Parameters:
747 .  comm - MPI communicator
748 .  bs   - size of blockk
749 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
750 .  n - number of local columns (or PETSC_DECIDE to have calculated
751            if N is given)
752 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
753 .  N - number of global columns (or PETSC_DECIDE to have calculated
754            if n is given)
755 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
756            submatrix  (same for all local rows)
757 .  d_nzz - number of block nonzeros per block row in diagonal portion of local
758            submatrix or null (possibly different for each row).  You must leave
759            room for the diagonal entry even if it is zero.
760 .  o_nz  - number of block nonzeros per block row in off-diagonal portion of local
761            submatrix (same for all local rows).
762 .  o_nzz - number of block nonzeros per block row in off-diagonal portion of local
763            submatrix or null (possibly different for each row).
764 
765    Output Parameter:
766 .  A - the matrix
767 
768    Notes:
769    The user MUST specify either the local or global matrix dimensions
770    (possibly both).
771 
772    Storage Information:
773    For a square global matrix we define each processor's diagonal portion
774    to be its local rows and the corresponding columns (a square submatrix);
775    each processor's off-diagonal portion encompasses the remainder of the
776    local matrix (a rectangular submatrix).
777 
778    The user can specify preallocated storage for the diagonal part of
779    the local submatrix with either d_nz or d_nnz (not both).  Set
780    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
781    memory allocation.  Likewise, specify preallocated storage for the
782    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
783 
784    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
785    the figure below we depict these three local rows and all columns (0-11).
786 
787 $          0 1 2 3 4 5 6 7 8 9 10 11
788 $         -------------------
789 $  row 3  |  o o o d d d o o o o o o
790 $  row 4  |  o o o d d d o o o o o o
791 $  row 5  |  o o o d d d o o o o o o
792 $         -------------------
793 $
794 
795    Thus, any entries in the d locations are stored in the d (diagonal)
796    submatrix, and any entries in the o locations are stored in the
797    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
798    stored simply in the MATSEQBAIJ format for compressed row storage.
799 
800    Now d_nz should indicate the number of nonzeros per row in the d matrix,
801    and o_nz should indicate the number of nonzeros per row in the o matrix.
802    In general, for PDE problems in which most nonzeros are near the diagonal,
803    one expects d_nz >> o_nz.   For additional details, see the users manual
804    chapter on matrices and the file $(PETSC_DIR)/Performance.
805 
806 .keywords: matrix, aij, compressed row, sparse, parallel
807 
808 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
809 @*/
810 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
811                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
812 {
813   Mat          B;
814   Mat_MPIBAIJ  *b;
815   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE;
816 
817   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
818   *A = 0;
819   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
820   PLogObjectCreate(B);
821   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
822   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
823   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
824   B->destroy    = MatDestroy_MPIBAIJ;
825   B->view       = MatView_MPIBAIJ;
826 
827   B->factor     = 0;
828   B->assembled  = PETSC_FALSE;
829 
830   b->insertmode = NOT_SET_VALUES;
831   MPI_Comm_rank(comm,&b->rank);
832   MPI_Comm_size(comm,&b->size);
833 
834   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
835     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
836   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
837   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
838   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
839   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
840 
841   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
842     work[0] = m; work[1] = n;
843     mbs = m/bs; nbs = n/bs;
844     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
845     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
846     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
847   }
848   if (m == PETSC_DECIDE) {
849     Mbs = M/bs;
850     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
851     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
852     m   = mbs*bs;
853   }
854   if (n == PETSC_DECIDE) {
855     Nbs = N/bs;
856     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
857     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
858     n   = nbs*bs;
859   }
860   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
861 
862   b->m = m; B->m = m;
863   b->n = n; B->n = n;
864   b->N = N; B->N = N;
865   b->M = M; B->M = M;
866   b->bs  = bs;
867   b->bs2 = bs*bs;
868   b->mbs = mbs;
869   b->nbs = nbs;
870   b->Mbs = Mbs;
871   b->Nbs = Nbs;
872 
873   /* build local table of row and column ownerships */
874   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
875   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
876   b->cowners = b->rowners + b->size + 1;
877   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
878   b->rowners[0] = 0;
879   for ( i=2; i<=b->size; i++ ) {
880     b->rowners[i] += b->rowners[i-1];
881   }
882   b->rstart = b->rowners[b->rank];
883   b->rend   = b->rowners[b->rank+1];
884   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
885   b->cowners[0] = 0;
886   for ( i=2; i<=b->size; i++ ) {
887     b->cowners[i] += b->cowners[i-1];
888   }
889   b->cstart = b->cowners[b->rank];
890   b->cend   = b->cowners[b->rank+1];
891 
892   if (d_nz == PETSC_DEFAULT) d_nz = 5;
893   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
894   PLogObjectParent(B,b->A);
895   if (o_nz == PETSC_DEFAULT) o_nz = 0;
896   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
897   PLogObjectParent(B,b->B);
898 
899   /* build cache for off array entries formed */
900   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
901   b->colmap      = 0;
902   b->garray      = 0;
903   b->roworiented = 1;
904 
905   /* stuff used for matrix vector multiply */
906   b->lvec      = 0;
907   b->Mvctx     = 0;
908 
909   /* stuff for MatGetRow() */
910   b->rowindices   = 0;
911   b->rowvalues    = 0;
912   b->getrowactive = PETSC_FALSE;
913 
914   *A = B;
915   return 0;
916 }
917 
918 #include "sys.h"
919 
920 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
921 {
922   Mat          A;
923   int          i, nz, ierr, j,rstart, rend, fd;
924   Scalar       *vals,*buf;
925   MPI_Comm     comm = ((PetscObject)viewer)->comm;
926   MPI_Status   status;
927   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
928   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
929   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
930   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
931   int          dcount,kmax,k,nzcount,tmp;
932 
933 
934   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
935   bs2  = bs*bs;
936 
937   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
938   if (!rank) {
939     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
940     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
941     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
942   }
943 
944   MPI_Bcast(header+1,3,MPI_INT,0,comm);
945   M = header[1]; N = header[2];
946 
947 
948   if (M != N) SETERRQ(1,"MatLoad_SeqBAIJ:Can only do square matrices");
949 
950   /*
951      This code adds extra rows to make sure the number of rows is
952      divisible by the blocksize
953   */
954   Mbs        = M/bs;
955   extra_rows = bs - M + bs*(Mbs);
956   if (extra_rows == bs) extra_rows = 0;
957   else                  Mbs++;
958   if (extra_rows &&!rank) {
959     PLogInfo(0,"MatLoad_SeqBAIJ:Padding loaded matrix to match blocksize");
960   }
961   /* determine ownership of all rows */
962   mbs = Mbs/size + ((Mbs % size) > rank);
963   m   = mbs * bs;
964   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
965   browners = rowners + size + 1;
966   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
967   rowners[0] = 0;
968   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
969   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
970   rstart = rowners[rank];
971   rend   = rowners[rank+1];
972 
973   /* distribute row lengths to all processors */
974   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
975   if (!rank) {
976     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
977     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
978     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
979     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
980     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
981     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
982     PetscFree(sndcounts);
983   }
984   else {
985     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
986   }
987 
988   if (!rank) {
989     /* calculate the number of nonzeros on each processor */
990     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
991     PetscMemzero(procsnz,size*sizeof(int));
992     for ( i=0; i<size; i++ ) {
993       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
994         procsnz[i] += rowlengths[j];
995       }
996     }
997     PetscFree(rowlengths);
998 
999     /* determine max buffer needed and allocate it */
1000     maxnz = 0;
1001     for ( i=0; i<size; i++ ) {
1002       maxnz = PetscMax(maxnz,procsnz[i]);
1003     }
1004     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1005 
1006     /* read in my part of the matrix column indices  */
1007     nz = procsnz[0];
1008     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1009     mycols = ibuf;
1010     if (size == 1)  nz -= extra_rows;
1011     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1012     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1013 
1014     /* read in every ones (except the last) and ship off */
1015     for ( i=1; i<size-1; i++ ) {
1016       nz = procsnz[i];
1017       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1018       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1019     }
1020     /* read in the stuff for the last proc */
1021     if ( size != 1 ) {
1022       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1023       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1024       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1025       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1026     }
1027     PetscFree(cols);
1028   }
1029   else {
1030     /* determine buffer space needed for message */
1031     nz = 0;
1032     for ( i=0; i<m; i++ ) {
1033       nz += locrowlens[i];
1034     }
1035     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1036     mycols = ibuf;
1037     /* receive message of column indices*/
1038     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1039     MPI_Get_count(&status,MPI_INT,&maxnz);
1040     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1041   }
1042 
1043   /* loop over local rows, determining number of off diagonal entries */
1044   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1045   odlens = dlens + (rend-rstart);
1046   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1047   PetscMemzero(mask,3*Mbs*sizeof(int));
1048   masked1 = mask    + Mbs;
1049   masked2 = masked1 + Mbs;
1050   rowcount = 0; nzcount = 0;
1051   for ( i=0; i<mbs; i++ ) {
1052     dcount  = 0;
1053     odcount = 0;
1054     for ( j=0; j<bs; j++ ) {
1055       kmax = locrowlens[rowcount];
1056       for ( k=0; k<kmax; k++ ) {
1057         tmp = mycols[nzcount++]/bs;
1058         if (!mask[tmp]) {
1059           mask[tmp] = 1;
1060           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1061           else masked1[dcount++] = tmp;
1062         }
1063       }
1064       rowcount++;
1065     }
1066 
1067     dlens[i]  = dcount;
1068     odlens[i] = odcount;
1069 
1070     /* zero out the mask elements we set */
1071     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1072     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1073   }
1074 
1075   /* create our matrix */
1076   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);CHKERRQ(ierr);
1077   A = *newmat;
1078   MatSetOption(A,COLUMNS_SORTED);
1079 
1080   if (!rank) {
1081     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1082     /* read in my part of the matrix numerical values  */
1083     nz = procsnz[0];
1084     vals = buf;
1085     mycols = ibuf;
1086     if (size == 1)  nz -= extra_rows;
1087     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1088     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1089     /* insert into matrix */
1090     jj      = rstart*bs;
1091     for ( i=0; i<m; i++ ) {
1092       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1093       mycols += locrowlens[i];
1094       vals   += locrowlens[i];
1095       jj++;
1096     }
1097     /* read in other processors( except the last one) and ship out */
1098     for ( i=1; i<size-1; i++ ) {
1099       nz = procsnz[i];
1100       vals = buf;
1101       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1102       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1103     }
1104     /* the last proc */
1105     if ( size != 1 ){
1106       nz = procsnz[i] - extra_rows;
1107       vals = buf;
1108       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1109       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1110       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1111     }
1112     PetscFree(procsnz);
1113   }
1114   else {
1115     /* receive numeric values */
1116     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1117 
1118     /* receive message of values*/
1119     vals = buf;
1120     mycols = ibuf;
1121     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1122     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1123     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1124 
1125     /* insert into matrix */
1126     jj      = rstart*bs;
1127     for ( i=0; i<m; i++ ) {
1128       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1129       mycols += locrowlens[i];
1130       vals   += locrowlens[i];
1131       jj++;
1132     }
1133   }
1134   PetscFree(locrowlens);
1135   PetscFree(buf);
1136   PetscFree(ibuf);
1137   PetscFree(rowners);
1138   PetscFree(dlens);
1139   PetscFree(mask);
1140   ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1141   ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1142   return 0;
1143 }
1144 
1145 
1146