#ifndef lint
static char vcid[] = "$Id: mpibaij.c,v 1.5 1996/06/24 22:18:52 balay Exp balay $";
#endif

#include "mpibaij.h"

#include "draw.h"
#include "pinclude/pviewer.h"

extern int MatSetUpMultiply_MPIBAIJ(Mat);
extern int DisAssemble_MPIBAIJ(Mat);
/* local utility routine that creates a mapping from the global column
   number to the local number in the off-diagonal part of the local
   storage of the matrix.  This is done in a non-scalable way, since the
   length of colmap equals the number of block columns in the global matrix.
*/
static int CreateColmap_MPIBAIJ_Private(Mat mat)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
  int        nbs = B->nbs,i;

  baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
  PLogObjectMemory(mat,baij->Nbs*sizeof(int));
  PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
  for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i;
  return 0;
}
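
/*
   For illustration (the garray values here are assumed, not from any
   particular run): if the off-diagonal part B of this process references
   the global block columns garray[] = {3,7,9}, then after this routine
   colmap[3] = 0, colmap[7] = 1, colmap[9] = 2, and all other entries of
   colmap are 0.
*/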

static int MatGetReordering_MPIBAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;
  if (baij->size == 1) {
    ierr = MatGetReordering(baij->A,type,rperm,cperm); CHKERRQ(ierr);
  } else SETERRQ(1,"MatGetReordering_MPIBAIJ:not supported in parallel");
  return 0;
}

static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
  int         cstart = baij->cstart, cend = baij->cend,row,col;
  int         roworiented = baij->roworiented,rstart_orig,rend_orig;
  int         cstart_orig,cend_orig,bs=baij->bs;

  if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
    SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
  }
  baij->insertmode = addv;
  rstart_orig = rstart*bs;
  rend_orig   = rend*bs;
  cstart_orig = cstart*bs;
  cend_orig   = cend*bs;
  for ( i=0; i<m; i++ ) {
    if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
    if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
    if (im[i] >= rstart_orig && im[i] < rend_orig) {
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");
        if (in[j] >= baij->N) SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");
        if (in[j] >= cstart_orig && in[j] < cend_orig){
          col = in[j] - cstart_orig;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          ierr = MatSetValues(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
        }
        else {
          if (mat->was_assembled) {
            if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);}
            col = baij->colmap[in[j]];
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col = in[j];
            }
          }
          else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          ierr = MatSetValues(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
        }
      }
    }
    else {
      if (roworiented) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      }
      else {
        row = im[i];
        for ( j=0; j<n; j++ ) {
          ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
        }
      }
    }
  }
  return 0;
}

static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
  int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
    if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
    if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
      row = idxm[i] - bsrstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
        if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
        if (idxn[j] >= bscstart && idxn[j] < bscend){
          col = idxn[j] - bscstart;
          ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        }
        else {
          if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);}
          col = baij->colmap[idxn[j]/bs]*bs + idxn[j]%bs;
          ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        }
      }
    }
    else {
      SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
    }
  }
  return 0;
}

static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
  int        ierr, i,bs2=baij->bs2;
  double     sum = 0.0;
  Scalar     *v;

  if (baij->size == 1) {
    ierr = MatNorm(baij->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
146     if (type == NORM_FROBENIUS) {
147       v = amat->a;
148       for (i=0; i<amat->nz*bs2; i++ ) {
149 #if defined(PETSC_COMPLEX)
150         sum += real(conj(*v)*(*v)); v++;
151 #else
152         sum += (*v)*(*v); v++;
153 #endif
154       }
155       v = bmat->a;
156       for (i=0; i<bmat->nz*bs2; i++ ) {
157 #if defined(PETSC_COMPLEX)
158         sum += real(conj(*v)*(*v)); v++;
159 #else
160         sum += (*v)*(*v); v++;
161 #endif
162       }
163       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
164       *norm = sqrt(*norm);
165     }
    else
      SETERRQ(1,"MatNorm_MPIBAIJ:No support for this norm yet");
  }
  return 0;
}

static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are in either INSERT_VALUES or ADD_VALUES mode */
  MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
  }
  baij->insertmode = addv; /* in case this processor had no cache */

  /* first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length */
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);
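  /* nreceives now holds the number of processes that will send to this
     one (the MPI_SUM reduce added up each sender's 0/1 flag), and nmax
     holds the length of the longest incoming message (the MPI_MAX
     reduce); nmax is used below to size every receive buffer uniformly */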

  /* post receives:
       1) each message will consist of ordered triples
     (global row index,global column index,value); we store the indices
     as Scalars to simplify the message passing.
       2) since we don't know how long each individual message is, we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.

       This could be done better.
  */
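  /* Wire format of one stashed entry, as packed below (k-th triple):
       svalues[3*k]   = (Scalar) global row index
       svalues[3*k+1] = (Scalar) global column index
       svalues[3*k+2] = the numerical value                            */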
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  return 0;
}

static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
  int         bs=baij->bs,row,col,other_disassembled;
  Scalar      *values,val;
  InsertMode  addv = baij->insertmode;

  /* wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = baij->rvalues + 3*imdex*baij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;
    for ( i=0; i<n; i++ ) {
      row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= baij->cstart*bs && col < baij->cend*bs) {
        col -= baij->cstart*bs;
        MatSetValues(baij->A,1,&row,1,&col,&val,addv);
      }
      else {
        if (mat->was_assembled) {
          if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);}
          col = baij->colmap[col/bs]*bs + col%bs;
          if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
            ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        MatSetValues(baij->B,1,&row,1,&col,&val,addv);
      }
    }
    count--;
  }
  PetscFree(baij->recv_waits); PetscFree(baij->rvalues);

  /* wait on sends */
  if (baij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(baij->nsends,baij->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(baij->send_waits); PetscFree(baij->svalues);

  baij->insertmode = NOT_SET_VALUES;
  ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled; if so, we must
     also disassemble ourselves, in order that we may reassemble. */
  MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
  }

  if (!mat->was_assembled && mode == FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);

  if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
  return 0;
}

static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr;

  if (baij->size == 1) {
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
  }
  else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
  return 0;
}

static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr, format,rank,bs=baij->bs;
  FILE         *fd;
  ViewerType   vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerGetFormat(viewer,&format);
    if (format == ASCII_FORMAT_INFO_DETAILED) {
      int nz, nzalloc, mem;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
      PetscSequentialPhaseBegin(mat->comm,1);
      fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
              rank,baij->m,nz*bs,nzalloc*bs,baij->bs,mem);
      ierr = MatGetInfo(baij->A,MAT_LOCAL,&nz,&nzalloc,&mem);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz*bs);
      ierr = MatGetInfo(baij->B,MAT_LOCAL,&nz,&nzalloc,&mem);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz*bs);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == ASCII_FORMAT_INFO) {
      return 0;
    }
  }

  if (vtype == DRAW_VIEWER) {
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
  }

  if (vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
            baij->cstart*bs,baij->cend*bs);
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  }
  else {
    int size = baij->size;
    rank = baij->rank;
    if (size == 1) {
      ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat         A;
      Mat_SeqBAIJ *Aloc;
      int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
      int         mbs=baij->mbs;
      Scalar      *a;

      if (!rank) {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);
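
      /* rank 0 owns all M rows of the gathered copy, while every other
         rank creates a matrix with zero local rows; the MatSetValues()
         calls below therefore stash all off-rank entries, and assembly
         ships them to rank 0 */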

      /* copy over the A part */
      Aloc = (Mat_SeqBAIJ*) baij->A->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart;
      rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = (baij->cstart+aj[j])*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      /* copy over the B part */
      Aloc = (Mat_SeqBAIJ*) baij->B->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart*bs;
      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = baij->garray[aj[j]]*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      PetscFree(rvals);
      ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}

static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
{
  Mat         mat = (Mat) obj;
  int         ierr;
  ViewerType  vtype;

  if (!viewer) {
    viewer = STDOUT_VIEWER_SELF;
  }
  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
      vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
    ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
  }
  else if (vtype == BINARY_FILE_VIEWER) {
    return MatView_MPIBAIJ_Binary(mat,viewer);
  }
  return 0;
}

static int MatDestroy_MPIBAIJ(PetscObject obj)
{
  Mat         mat = (Mat) obj;
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;

#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
#endif

  PetscFree(baij->rowners);
  ierr = MatDestroy(baij->A); CHKERRQ(ierr);
  ierr = MatDestroy(baij->B); CHKERRQ(ierr);
  if (baij->colmap) PetscFree(baij->colmap);
  if (baij->garray) PetscFree(baij->garray);
  if (baij->lvec)   VecDestroy(baij->lvec);
  if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
  if (baij->rowvalues) PetscFree(baij->rowvalues);
  PetscFree(baij);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}

static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;

  ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
  ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
  ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
  return 0;
}

static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;
  ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
  return 0;
}

static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
                (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in yy until the next line, which is true for my implementation*/
  /* but is not perhaps always true. */
  ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}

static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in zz until the next line, which is true for my implementation*/
  /* but is not perhaps always true. */
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}

/*
  This only works correctly for square matrices where the subblock A->A is the
  diagonal block
*/
static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  if (a->M != a->N)
    SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
  return MatGetDiagonal(a->A,v);
}

static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;
  ierr = MatScale(aa,a->A); CHKERRQ(ierr);
  ierr = MatScale(aa,a->B); CHKERRQ(ierr);
  return 0;
}

static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  *m = mat->M; *n = mat->N;
  return 0;
}

static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  *m = mat->m; *n = mat->n;
  return 0;
}

static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
  return 0;
}

extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);

int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int        nztot, nzA, nzB, lrow, rstart = mat->rstart*bs, rend = mat->rend*bs;
  int        *cmap, *idx_p,cstart = mat->cstart*bs;

  if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int     max = 1,nbs = mat->nbs,tmp;
    for ( i=0; i<nbs; i++ ) {
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }

  if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows");
  lrow = row - rstart;

  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      int imark = -1;
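      /* the merged row is built in three pieces: the entries of B whose
         global column (cmap[]) lies left of the diagonal block, then all
         of A (shifted by cstart), then the remaining entries of B; imark
         records where B splits around the diagonal block */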
      if (v) {
        *v = v_p = mat->rowvalues;
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]];
          }
        } else {
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
            else break;
          }
          imark = i;
        }
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
      }
    }
    else {*idx = 0; *v=0;}
  }
  *nz = nztot;
  ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  return 0;
}

int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  if (baij->getrowactive == PETSC_FALSE) {
    SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
  }
  baij->getrowactive = PETSC_FALSE;
  return 0;
}

/* -------------------------------------------------------------------*/
static struct _MatOps MatOps = {
  MatSetValues_MPIBAIJ,0,0,MatMult_MPIBAIJ,
  MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
  0,0,0,0,
  0,0,0,0,
  0,MatGetDiagonal_MPIBAIJ,0,MatNorm_MPIBAIJ,
  MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,0,
  0,0,MatGetReordering_MPIBAIJ,0,
  0,0,0,MatGetSize_MPIBAIJ,
  MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
  0,0,0,0,
  0,0,0,0,
  0,0,0,0,
  0,MatGetValues_MPIBAIJ,0,0,
  MatScale_MPIBAIJ,0,0};

/*@C
   MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
   (block compressed row).  For good matrix assembly performance
   the user should preallocate the matrix storage by setting the parameters
   d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
   performance can be increased by more than a factor of 50.

   Input Parameters:
.  comm - MPI communicator
.  bs   - size of block
.  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
.  n - number of local columns (or PETSC_DECIDE to have calculated
           if N is given)
.  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
.  N - number of global columns (or PETSC_DECIDE to have calculated
           if n is given)
.  d_nz  - number of block nonzeros per block row in diagonal portion of local
           submatrix  (same for all local rows)
.  d_nnz - number of block nonzeros per block row in diagonal portion of local
           submatrix or null (possibly different for each block row).  You must
           leave room for the diagonal entry even if it is zero.
.  o_nz  - number of block nonzeros per block row in off-diagonal portion of local
           submatrix (same for all local rows).
.  o_nnz - number of block nonzeros per block row in off-diagonal portion of local
           submatrix or null (possibly different for each block row).

   Output Parameter:
.  A - the matrix

   Notes:
   The user MUST specify either the local or global matrix dimensions
   (possibly both).

   Storage Information:
   For a square global matrix we define each processor's diagonal portion
   to be its local rows and the corresponding columns (a square submatrix);
   each processor's off-diagonal portion encompasses the remainder of the
   local matrix (a rectangular submatrix).

   The user can specify preallocated storage for the diagonal part of
   the local submatrix with either d_nz or d_nnz (not both).  Set
   d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
   memory allocation.  Likewise, specify preallocated storage for the
   off-diagonal part of the local submatrix with o_nz or o_nnz (not both).

   Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
   the figure below we depict these three local rows and all columns (0-11).

$           0 1 2 3 4 5 6 7 8 9 10 11
$          --------------------------
$   row 3  | o o o d d d o o o o o o
$   row 4  | o o o d d d o o o o o o
$   row 5  | o o o d d d o o o o o o
$          --------------------------

   Thus, any entries in the d locations are stored in the d (diagonal)
   submatrix, and any entries in the o locations are stored in the
   o (off-diagonal) submatrix.  Note that the d and the o submatrices are
   stored simply in the MATSEQBAIJ format for compressed row storage.

   Now d_nz should indicate the number of nonzeros per row in the d matrix,
   and o_nz should indicate the number of nonzeros per row in the o matrix.
   In general, for PDE problems in which most nonzeros are near the diagonal,
   one expects d_nz >> o_nz.  For additional details, see the users manual
   chapter on matrices and the file $(PETSC_DIR)/Performance.
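
   Example:
   A minimal calling sketch (the sizes and preallocation figures here are
   illustrative assumptions, not recommendations):
$      Mat A;
$      int ierr;
$      /* 100x100 global matrix with 2x2 blocks, local sizes decided by
$         PETSc; guess 5 diagonal and 2 off-diagonal block nonzeros per
$         block row */
$      ierr = MatCreateMPIBAIJ(MPI_COMM_WORLD,2,PETSC_DECIDE,PETSC_DECIDE,
$                              100,100,5,PETSC_NULL,2,PETSC_NULL,&A);
$      CHKERRQ(ierr);
$      /* ... fill with MatSetValues(), then assemble ... */
$      ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
$      ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);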

.keywords: matrix, aij, compressed row, sparse, parallel

.seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
@*/
int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
                    int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  Mat          B;
  Mat_MPIBAIJ  *b;
  int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE;

  if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
  *A = 0;
  PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
  PLogObjectCreate(B);
  B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
  PetscMemzero(b,sizeof(Mat_MPIBAIJ));
  PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
  B->destroy    = MatDestroy_MPIBAIJ;
  B->view       = MatView_MPIBAIJ;

  B->factor     = 0;
  B->assembled  = PETSC_FALSE;

  b->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&b->rank);
  MPI_Comm_size(comm,&b->size);

  if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
    SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
  if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
  if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
  if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
  if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;

  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    mbs = m/bs; nbs = n/bs;
    MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
    if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
    if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
  }
  if (m == PETSC_DECIDE) {
    Mbs = M/bs;
    if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
    mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
    m   = mbs*bs;
  }
  if (n == PETSC_DECIDE) {
    Nbs = N/bs;
    if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
    nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
    n   = nbs*bs;
  }
  if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");

  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;
  b->bs  = bs;
  b->bs2 = bs*bs;
  b->mbs = mbs;
  b->nbs = nbs;
  b->Mbs = Mbs;
  b->Nbs = Nbs;

  /* build local table of row and column ownerships */
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
  b->cowners = b->rowners + b->size + 1;
  MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];
  }
  b->rstart = b->rowners[b->rank];
  b->rend   = b->rowners[b->rank+1];
  MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart = b->cowners[b->rank];
  b->cend   = b->cowners[b->rank+1];
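
  /* For illustration (counts assumed): with 3 processes owning 2, 3 and 2
     block rows respectively, the gathered-and-scanned table is
     rowners[] = {0,2,5,7}, so process 1 has rstart = 2 and rend = 5 */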

  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used for matrix vector multiply */
  b->lvec      = 0;
  b->Mvctx     = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  *A = B;
  return 0;
}

#include "sys.h"

int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*buf;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
  int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
  int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
  int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
  int          dcount,kmax,k,nzcount,tmp;

  ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
  bs2  = bs*bs;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];
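
  /* Assumed on-disk layout (the PETSc binary matrix format, as far as
     this loader is concerned; header[3] is read but not used here):
       int    header[4]      {MAT_COOKIE, M, N, nz}
       int    rowlengths[M]  nonzeros in each row
       int    cols[nz]       column indices, row by row
       Scalar vals[nz]       numerical values, row by row              */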

  if (M != N) SETERRQ(1,"MatLoad_MPIBAIJ:Can only do square matrices");

  /*
     This code adds extra rows to make sure the number of rows is
     divisible by the blocksize
  */
  Mbs        = M/bs;
  extra_rows = bs - M + bs*(Mbs);
  if (extra_rows == bs) extra_rows = 0;
  else                  Mbs++;
  if (extra_rows && !rank) {
    PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize");
  }
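  /* For illustration (sizes assumed): M=10, bs=4 gives Mbs=2 and
     extra_rows = 4-10+8 = 2, so Mbs becomes 3 and two padding rows
     (given unit diagonal entries when the values are read in below)
     round the matrix up to 12 rows */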
  /* determine ownership of all rows */
  mbs = Mbs/size + ((Mbs % size) > rank);
  m   = mbs * bs;
  rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
  browners = rowners + size + 1;
  MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
  for ( i=0; i<=size; i++ ) browners[i] = rowners[i]*bs;
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
    for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
    MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices */
    nz = procsnz[0];
    ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }

    /* read in everyone else's part (except the last process's) and ship it off */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    /* read in the stuff for the last proc */
    if ( size != 1 ) {
      nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
      MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += locrowlens[i];
    }
    ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* receive message of column indices */
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries */
  dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
  odlens = dlens + (rend-rstart);
  mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
  PetscMemzero(mask,3*Mbs*sizeof(int));
  masked1 = mask    + Mbs;
  masked2 = masked1 + Mbs;
  rowcount = 0; nzcount = 0;
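  /* mask[] flags the block columns already seen in the current block row,
     so each distinct block is counted once; masked1[] collects the blocks
     falling in this process's diagonal part and masked2[] those in the
     off-diagonal part, and both lists are used to re-zero mask[] cheaply
     before the next block row */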
  for ( i=0; i<mbs; i++ ) {
    dcount  = 0;
    odcount = 0;
    for ( j=0; j<bs; j++ ) {
      kmax = locrowlens[rowcount];
      for ( k=0; k<kmax; k++ ) {
        tmp = mycols[nzcount++]/bs;
        if (!mask[tmp]) {
          mask[tmp] = 1;
          if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
          else masked1[dcount++] = tmp;
        }
      }
      rowcount++;
    }

    dlens[i]  = dcount;
    odlens[i] = odcount;

    /* zero out the mask elements we set */
    for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
    for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
  }

  /* create our matrix */
  ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);CHKERRQ(ierr);
  A = *newmat;
  MatSetOption(A,COLUMNS_SORTED);

  if (!rank) {
    buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
    /* read in my part of the matrix numerical values */
    nz = procsnz[0];
    vals = buf;
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
    /* read in the other processes' values (except the last one's) and ship them out */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    /* the last proc */
    if ( size != 1 ){
      nz = procsnz[i] - extra_rows;
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
      MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);

    /* receive message of values */
    vals = buf;
    mycols = ibuf;
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
  }
  PetscFree(locrowlens);
  PetscFree(buf);
  PetscFree(ibuf);
  PetscFree(rowners);
  PetscFree(dlens);
  PetscFree(mask);
  ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}
1137