xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 6d4a857725049b046d9f1dbe01d47d0c391d790c)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.11 1996/07/08 12:45:27 curfman Exp bsmith $";
3 #endif
4 
5 #include "mpibaij.h"
6 
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 
16 /* local utility routine that creates a mapping from the global column
17 number to the local number in the off-diagonal part of the local
18 storage of the matrix.  This is done in a non scable way since the
19 length of colmap equals the global matrix length.
20 */
21 static int CreateColmap_MPIBAIJ_Private(Mat mat)
22 {
23   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
24   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
25   int        nbs = B->nbs,i;
26 
27   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
28   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
29   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
30   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i;
31   return 0;
32 }
33 
34 
35 static int MatGetReordering_MPIBAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
36 {
37   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
38   int         ierr;
39   if (baij->size == 1) {
40     ierr = MatGetReordering(baij->A,type,rperm,cperm); CHKERRQ(ierr);
41   } else SETERRQ(1,"MatGetReordering_MPIBAIJ:not supported in parallel");
42   return 0;
43 }
44 
45 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
46 {
47   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
48   Scalar      value;
49   int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
50   int         cstart = baij->cstart, cend = baij->cend,row,col;
51   int         roworiented = baij->roworiented,rstart_orig,rend_orig;
52   int         cstart_orig,cend_orig,bs=baij->bs;
53 
54   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
55     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
56   }
57   baij->insertmode = addv;
58   rstart_orig = rstart*bs;
59   rend_orig   = rend*bs;
60   cstart_orig = cstart*bs;
61   cend_orig   = cend*bs;
62   for ( i=0; i<m; i++ ) {
63     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
64     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
65     if (im[i] >= rstart_orig && im[i] < rend_orig) {
66       row = im[i] - rstart_orig;
67       for ( j=0; j<n; j++ ) {
68         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");
69         if (in[j] >= baij->N) SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");
70         if (in[j] >= cstart_orig && in[j] < cend_orig){
71           col = in[j] - cstart_orig;
72           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
73           ierr = MatSetValues(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
74         }
75         else {
76           if (mat->was_assembled) {
77             if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);}
78             col = baij->colmap[in[j]];
79             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
80               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
81               col =  in[j];
82             }
83           }
84           else col = in[j];
85           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
86           ierr = MatSetValues(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
87         }
88       }
89     }
90     else {
91       if (roworiented) {
92         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
93       }
94       else {
95         row = im[i];
96         for ( j=0; j<n; j++ ) {
97           ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
98         }
99       }
100     }
101   }
102   return 0;
103 }
104 
105 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
106 {
107   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
108   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
109   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
110 
111   for ( i=0; i<m; i++ ) {
112     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
113     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
114     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
115       row = idxm[i] - bsrstart;
116       for ( j=0; j<n; j++ ) {
117         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
118         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
119         if (idxn[j] >= bscstart && idxn[j] < bscend){
120           col = idxn[j] - bscstart;
121           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
122         }
123         else {
124           if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);}
125           if ( baij->garray[baij->colmap[idxn[j]/bs]] != idxn[j]/bs ) *(v+i*n+j) = 0.0;
126           else {
127             col = baij->colmap[idxn[j]/bs]*bs + idxn[j]%bs;
128             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
129           }
130         }
131       }
132     }
133     else {
134       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
135     }
136   }
137   return 0;
138 }
139 
140 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
141 {
142   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
143   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
144   int        ierr, i,bs2=baij->bs2;
145   double     sum = 0.0;
146   Scalar     *v;
147 
148   if (baij->size == 1) {
149     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
150   } else {
151     if (type == NORM_FROBENIUS) {
152       v = amat->a;
153       for (i=0; i<amat->nz*bs2; i++ ) {
154 #if defined(PETSC_COMPLEX)
155         sum += real(conj(*v)*(*v)); v++;
156 #else
157         sum += (*v)*(*v); v++;
158 #endif
159       }
160       v = bmat->a;
161       for (i=0; i<bmat->nz*bs2; i++ ) {
162 #if defined(PETSC_COMPLEX)
163         sum += real(conj(*v)*(*v)); v++;
164 #else
165         sum += (*v)*(*v); v++;
166 #endif
167       }
168       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
169       *norm = sqrt(*norm);
170     }
171     else
172       SETERRQ(1,"MatNorm_SeqBAIJ:No support for this norm yet");
173   }
174   return 0;
175 }
176 
177 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
178 {
179   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
180   MPI_Comm    comm = mat->comm;
181   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
182   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
183   MPI_Request *send_waits,*recv_waits;
184   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
185   InsertMode  addv;
186   Scalar      *rvalues,*svalues;
187 
188   /* make sure all processors are either in INSERTMODE or ADDMODE */
189   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
190   if (addv == (ADD_VALUES|INSERT_VALUES)) {
191     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
192   }
193   baij->insertmode = addv; /* in case this processor had no cache */
194 
195   /*  first count number of contributors to each processor */
196   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
197   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
198   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
199   for ( i=0; i<baij->stash.n; i++ ) {
200     idx = baij->stash.idx[i];
201     for ( j=0; j<size; j++ ) {
202       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
203         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
204       }
205     }
206   }
207   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
208 
209   /* inform other processors of number of messages and max length*/
210   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
211   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
212   nreceives = work[rank];
213   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
214   nmax = work[rank];
215   PetscFree(work);
216 
217   /* post receives:
218        1) each message will consist of ordered pairs
219      (global index,value) we store the global index as a double
220      to simplify the message passing.
221        2) since we don't know how long each individual message is we
222      allocate the largest needed buffer for each receive. Potentially
223      this is a lot of wasted space.
224 
225 
226        This could be done better.
227   */
228   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
229   CHKPTRQ(rvalues);
230   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
231   CHKPTRQ(recv_waits);
232   for ( i=0; i<nreceives; i++ ) {
233     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
234               comm,recv_waits+i);
235   }
236 
237   /* do sends:
238       1) starts[i] gives the starting index in svalues for stuff going to
239          the ith processor
240   */
241   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
242   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
243   CHKPTRQ(send_waits);
244   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
245   starts[0] = 0;
246   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
247   for ( i=0; i<baij->stash.n; i++ ) {
248     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
249     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
250     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
251   }
252   PetscFree(owner);
253   starts[0] = 0;
254   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
255   count = 0;
256   for ( i=0; i<size; i++ ) {
257     if (procs[i]) {
258       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
259                 comm,send_waits+count++);
260     }
261   }
262   PetscFree(starts); PetscFree(nprocs);
263 
264   /* Free cache space */
265   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
266   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
267 
268   baij->svalues    = svalues;    baij->rvalues    = rvalues;
269   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
270   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
271   baij->rmax       = nmax;
272 
273   return 0;
274 }
275 
276 
277 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
278 {
279   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
280   MPI_Status  *send_status,recv_status;
281   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
282   int         bs=baij->bs,row,col,other_disassembled;
283   Scalar      *values,val;
284   InsertMode  addv = baij->insertmode;
285 
286   /*  wait on receives */
287   while (count) {
288     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
289     /* unpack receives into our local space */
290     values = baij->rvalues + 3*imdex*baij->rmax;
291     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
292     n = n/3;
293     for ( i=0; i<n; i++ ) {
294       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
295       col = (int) PetscReal(values[3*i+1]);
296       val = values[3*i+2];
297       if (col >= baij->cstart*bs && col < baij->cend*bs) {
298         col -= baij->cstart*bs;
299         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
300       }
301       else {
302         if (mat->was_assembled) {
303           if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);}
304           col = baij->colmap[col/bs]*bs + col%bs;
305           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
306             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
307             col = (int) PetscReal(values[3*i+1]);
308           }
309         }
310         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
311       }
312     }
313     count--;
314   }
315   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
316 
317   /* wait on sends */
318   if (baij->nsends) {
319     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
320     CHKPTRQ(send_status);
321     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
322     PetscFree(send_status);
323   }
324   PetscFree(baij->send_waits); PetscFree(baij->svalues);
325 
326   baij->insertmode = NOT_SET_VALUES;
327   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
328   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
329 
330   /* determine if any processor has disassembled, if so we must
331      also disassemble ourselfs, in order that we may reassemble. */
332   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
333   if (mat->was_assembled && !other_disassembled) {
334     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
335   }
336 
337   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
338     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
339   }
340   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
341   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
342 
343   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
344   return 0;
345 }
346 
347 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
348 {
349   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
350   int          ierr;
351 
352   if (baij->size == 1) {
353     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
354   }
355   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
356   return 0;
357 }
358 
359 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
360 {
361   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
362   int          ierr, format,rank,bs=baij->bs;
363   FILE         *fd;
364   ViewerType   vtype;
365 
366   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
367   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
368     ierr = ViewerGetFormat(viewer,&format);
369     if (format == ASCII_FORMAT_INFO_DETAILED) {
370       int nz, nzalloc, mem;
371       MPI_Comm_rank(mat->comm,&rank);
372       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
373       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
374       PetscSequentialPhaseBegin(mat->comm,1);
375       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
376               rank,baij->m,nz*bs,nzalloc*bs,baij->bs,mem);
377       ierr = MatGetInfo(baij->A,MAT_LOCAL,&nz,&nzalloc,&mem);
378       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz*bs);
379       ierr = MatGetInfo(baij->B,MAT_LOCAL,&nz,&nzalloc,&mem);
380       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz*bs);
381       fflush(fd);
382       PetscSequentialPhaseEnd(mat->comm,1);
383       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
384       return 0;
385     }
386     else if (format == ASCII_FORMAT_INFO) {
387       return 0;
388     }
389   }
390 
391   if (vtype == DRAW_VIEWER) {
392     Draw       draw;
393     PetscTruth isnull;
394     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
395     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
396   }
397 
398   if (vtype == ASCII_FILE_VIEWER) {
399     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
400     PetscSequentialPhaseBegin(mat->comm,1);
401     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
402            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
403             baij->cstart*bs,baij->cend*bs);
404     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
405     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
406     fflush(fd);
407     PetscSequentialPhaseEnd(mat->comm,1);
408   }
409   else {
410     int size = baij->size;
411     rank = baij->rank;
412     if (size == 1) {
413       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
414     }
415     else {
416       /* assemble the entire matrix onto first processor. */
417       Mat         A;
418       Mat_SeqBAIJ *Aloc;
419       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
420       int         mbs=baij->mbs;
421       Scalar      *a;
422 
423       if (!rank) {
424         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
425         CHKERRQ(ierr);
426       }
427       else {
428         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
429         CHKERRQ(ierr);
430       }
431       PLogObjectParent(mat,A);
432 
433       /* copy over the A part */
434       Aloc = (Mat_SeqBAIJ*) baij->A->data;
435       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
436       row = baij->rstart;
437       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
438 
439       for ( i=0; i<mbs; i++ ) {
440         rvals[0] = bs*(baij->rstart + i);
441         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
442         for ( j=ai[i]; j<ai[i+1]; j++ ) {
443           col = (baij->cstart+aj[j])*bs;
444           for (k=0; k<bs; k++ ) {
445             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
446             col++; a += bs;
447           }
448         }
449       }
450       /* copy over the B part */
451       Aloc = (Mat_SeqBAIJ*) baij->B->data;
452       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
453       row = baij->rstart*bs;
454       for ( i=0; i<mbs; i++ ) {
455         rvals[0] = bs*(baij->rstart + i);
456         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
457         for ( j=ai[i]; j<ai[i+1]; j++ ) {
458           col = baij->garray[aj[j]]*bs;
459           for (k=0; k<bs; k++ ) {
460             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
461             col++; a += bs;
462           }
463         }
464       }
465       PetscFree(rvals);
466       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
467       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
468       if (!rank) {
469         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
470       }
471       ierr = MatDestroy(A); CHKERRQ(ierr);
472     }
473   }
474   return 0;
475 }
476 
477 
478 
479 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
480 {
481   Mat         mat = (Mat) obj;
482   int         ierr;
483   ViewerType  vtype;
484 
485   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
486   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
487       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
488     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
489   }
490   else if (vtype == BINARY_FILE_VIEWER) {
491     return MatView_MPIBAIJ_Binary(mat,viewer);
492   }
493   return 0;
494 }
495 
496 static int MatDestroy_MPIBAIJ(PetscObject obj)
497 {
498   Mat         mat = (Mat) obj;
499   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
500   int         ierr;
501 
502 #if defined(PETSC_LOG)
503   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
504 #endif
505 
506   PetscFree(baij->rowners);
507   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
508   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
509   if (baij->colmap) PetscFree(baij->colmap);
510   if (baij->garray) PetscFree(baij->garray);
511   if (baij->lvec)   VecDestroy(baij->lvec);
512   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
513   if (baij->rowvalues) PetscFree(baij->rowvalues);
514   PetscFree(baij);
515   PLogObjectDestroy(mat);
516   PetscHeaderDestroy(mat);
517   return 0;
518 }
519 
520 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
521 {
522   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
523   int         ierr, nt;
524 
525   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
526   if (nt != a->n) {
527     SETERRQ(1,"MatMult_MPIAIJ:Incompatible parition of A and xx");
528   }
529   ierr = VecGetLocalSize(yy,&nt);  CHKERRQ(ierr);
530   if (nt != a->m) {
531     SETERRQ(1,"MatMult_MPIAIJ:Incompatible parition of A and yy");
532   }
533   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
534   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
535   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
536   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
537   return 0;
538 }
539 
540 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
541 {
542   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
543   int        ierr;
544   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
545   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
546   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
547   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
548   return 0;
549 }
550 
551 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
552 {
553   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
554   int        ierr;
555 
556   /* do nondiagonal part */
557   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
558   /* send it on its way */
559   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
560                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
561   /* do local part */
562   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
563   /* receive remote parts: note this assumes the values are not actually */
564   /* inserted in yy until the next line, which is true for my implementation*/
565   /* but is not perhaps always true. */
566   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
567                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
568   return 0;
569 }
570 
571 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
572 {
573   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
574   int        ierr;
575 
576   /* do nondiagonal part */
577   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
578   /* send it on its way */
579   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
580                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
581   /* do local part */
582   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
583   /* receive remote parts: note this assumes the values are not actually */
584   /* inserted in yy until the next line, which is true for my implementation*/
585   /* but is not perhaps always true. */
586   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
587                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
588   return 0;
589 }
590 
591 /*
592   This only works correctly for square matrices where the subblock A->A is the
593    diagonal block
594 */
595 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
596 {
597   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
598   if (a->M != a->N)
599     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
600   return MatGetDiagonal(a->A,v);
601 }
602 
603 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
604 {
605   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
606   int        ierr;
607   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
608   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
609   return 0;
610 }
611 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
612 {
613   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
614   *m = mat->M; *n = mat->N;
615   return 0;
616 }
617 
618 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
619 {
620   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
621   *m = mat->m; *n = mat->N;
622   return 0;
623 }
624 
625 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
626 {
627   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
628   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
629   return 0;
630 }
631 
632 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
633 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
634 
635 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
636 {
637   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
638   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
639   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
640   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
641   int        *cmap, *idx_p,cstart = mat->cstart;
642 
643   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
644   mat->getrowactive = PETSC_TRUE;
645 
646   if (!mat->rowvalues && (idx || v)) {
647     /*
648         allocate enough space to hold information from the longest row.
649     */
650     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
651     int     max = 1,mbs = mat->mbs,tmp;
652     for ( i=0; i<mbs; i++ ) {
653       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
654       if (max < tmp) { max = tmp; }
655     }
656     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
657     CHKPTRQ(mat->rowvalues);
658     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
659   }
660 
661 
662   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows")
663   lrow = row - brstart;
664 
665   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
666   if (!v)   {pvA = 0; pvB = 0;}
667   if (!idx) {pcA = 0; if (!v) pcB = 0;}
668   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
669   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
670   nztot = nzA + nzB;
671 
672   cmap  = mat->garray;
673   if (v  || idx) {
674     if (nztot) {
675       /* Sort by increasing column numbers, assuming A and B already sorted */
676       int imark = -1;
677       if (v) {
678         *v = v_p = mat->rowvalues;
679         for ( i=0; i<nzB; i++ ) {
680           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
681           else break;
682         }
683         imark = i;
684         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
685         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
686       }
687       if (idx) {
688         *idx = idx_p = mat->rowindices;
689         if (imark > -1) {
690           for ( i=0; i<imark; i++ ) {
691             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
692           }
693         } else {
694           for ( i=0; i<nzB; i++ ) {
695             if (cmap[cworkB[i]/bs] < cstart)
696               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
697             else break;
698           }
699           imark = i;
700         }
701         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
702         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
703       }
704     }
705     else {
706       if (idx) *idx = 0;
707       if (v)   *v   = 0;
708     }
709   }
710   *nz = nztot;
711   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
712   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
713   return 0;
714 }
715 
716 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
717 {
718   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
719   if (baij->getrowactive == PETSC_FALSE) {
720     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
721   }
722   baij->getrowactive = PETSC_FALSE;
723   return 0;
724 }
725 
726 /* -------------------------------------------------------------------*/
727 static struct _MatOps MatOps = {
728   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
729   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
730   0,0,0,0,
731   0,0,0,0,
732   0,MatGetDiagonal_MPIBAIJ,0,MatNorm_MPIBAIJ,
733   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,0,
734   0,0,MatGetReordering_MPIBAIJ,0,
735   0,0,0,MatGetSize_MPIBAIJ,
736   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
737   0,0,0,0,
738   0,0,0,0,
739   0,0,0,MatGetSubMatrices_MPIBAIJ,
740   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,0,
741   MatScale_MPIBAIJ,0,0};
742 
743 
744 /*@C
745    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
746    (block compressed row).  For good matrix assembly performance
747    the user should preallocate the matrix storage by setting the parameters
748    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
749    performance can be increased by more than a factor of 50.
750 
751    Input Parameters:
752 .  comm - MPI communicator
753 .  bs   - size of blockk
754 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
755            This value should be the same as the local size used in creating the
756            y vector for the matrix-vector product y = Ax.
757 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
758            This value should be the same as the local size used in creating the
759            x vector for the matrix-vector product y = Ax.
760 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
761 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
762 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
763            submatrix  (same for all local rows)
764 .  d_nzz - array containing the number of block nonzeros in the various block rows
765            of the in diagonal portion of the local (possibly different for each block
766            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
767            it is zero.
768 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
769            submatrix (same for all local rows).
770 .  o_nzz - array containing the number of nonzeros in the various block rows of the
771            off-diagonal portion of the local submatrix (possibly different for
772            each block row) or PETSC_NULL.
773 
774    Output Parameter:
775 .  A - the matrix
776 
777    Notes:
778    The user MUST specify either the local or global matrix dimensions
779    (possibly both).
780 
781    Storage Information:
782    For a square global matrix we define each processor's diagonal portion
783    to be its local rows and the corresponding columns (a square submatrix);
784    each processor's off-diagonal portion encompasses the remainder of the
785    local matrix (a rectangular submatrix).
786 
787    The user can specify preallocated storage for the diagonal part of
788    the local submatrix with either d_nz or d_nnz (not both).  Set
789    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
790    memory allocation.  Likewise, specify preallocated storage for the
791    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
792 
793    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
794    the figure below we depict these three local rows and all columns (0-11).
795 
796 $          0 1 2 3 4 5 6 7 8 9 10 11
797 $         -------------------
798 $  row 3  |  o o o d d d o o o o o o
799 $  row 4  |  o o o d d d o o o o o o
800 $  row 5  |  o o o d d d o o o o o o
801 $         -------------------
802 $
803 
804    Thus, any entries in the d locations are stored in the d (diagonal)
805    submatrix, and any entries in the o locations are stored in the
806    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
807    stored simply in the MATSEQBAIJ format for compressed row storage.
808 
809    Now d_nz should indicate the number of nonzeros per row in the d matrix,
810    and o_nz should indicate the number of nonzeros per row in the o matrix.
811    In general, for PDE problems in which most nonzeros are near the diagonal,
812    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
813    or you will get TERRIBLE performance; see the users' manual chapter on
814    matrices and the file $(PETSC_DIR)/Performance.
815 
816 .keywords: matrix, block, aij, compressed row, sparse, parallel
817 
818 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
819 @*/
820 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
821                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
822 {
823   Mat          B;
824   Mat_MPIBAIJ  *b;
825   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE;
826 
827   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
828   *A = 0;
829   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
830   PLogObjectCreate(B);
831   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
832   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
833   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
834   B->destroy    = MatDestroy_MPIBAIJ;
835   B->view       = MatView_MPIBAIJ;
836 
837   B->factor     = 0;
838   B->assembled  = PETSC_FALSE;
839 
840   b->insertmode = NOT_SET_VALUES;
841   MPI_Comm_rank(comm,&b->rank);
842   MPI_Comm_size(comm,&b->size);
843 
844   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
845     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
846   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
847   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
848   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
849   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
850 
851   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
852     work[0] = m; work[1] = n;
853     mbs = m/bs; nbs = n/bs;
854     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
855     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
856     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
857   }
858   if (m == PETSC_DECIDE) {
859     Mbs = M/bs;
860     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
861     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
862     m   = mbs*bs;
863   }
864   if (n == PETSC_DECIDE) {
865     Nbs = N/bs;
866     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
867     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
868     n   = nbs*bs;
869   }
870   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
871 
872   b->m = m; B->m = m;
873   b->n = n; B->n = n;
874   b->N = N; B->N = N;
875   b->M = M; B->M = M;
876   b->bs  = bs;
877   b->bs2 = bs*bs;
878   b->mbs = mbs;
879   b->nbs = nbs;
880   b->Mbs = Mbs;
881   b->Nbs = Nbs;
882 
883   /* build local table of row and column ownerships */
884   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
885   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
886   b->cowners = b->rowners + b->size + 1;
887   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
888   b->rowners[0] = 0;
889   for ( i=2; i<=b->size; i++ ) {
890     b->rowners[i] += b->rowners[i-1];
891   }
892   b->rstart = b->rowners[b->rank];
893   b->rend   = b->rowners[b->rank+1];
894   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
895   b->cowners[0] = 0;
896   for ( i=2; i<=b->size; i++ ) {
897     b->cowners[i] += b->cowners[i-1];
898   }
899   b->cstart = b->cowners[b->rank];
900   b->cend   = b->cowners[b->rank+1];
901 
902   if (d_nz == PETSC_DEFAULT) d_nz = 5;
903   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
904   PLogObjectParent(B,b->A);
905   if (o_nz == PETSC_DEFAULT) o_nz = 0;
906   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
907   PLogObjectParent(B,b->B);
908 
909   /* build cache for off array entries formed */
910   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
911   b->colmap      = 0;
912   b->garray      = 0;
913   b->roworiented = 1;
914 
915   /* stuff used for matrix vector multiply */
916   b->lvec      = 0;
917   b->Mvctx     = 0;
918 
919   /* stuff for MatGetRow() */
920   b->rowindices   = 0;
921   b->rowvalues    = 0;
922   b->getrowactive = PETSC_FALSE;
923 
924   *A = B;
925   return 0;
926 }
927 
928 #include "sys.h"
929 
930 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
931 {
932   Mat          A;
933   int          i, nz, ierr, j,rstart, rend, fd;
934   Scalar       *vals,*buf;
935   MPI_Comm     comm = ((PetscObject)viewer)->comm;
936   MPI_Status   status;
937   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
938   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
939   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
940   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
941   int          dcount,kmax,k,nzcount,tmp;
942 
943 
944   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
945   bs2  = bs*bs;
946 
947   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
948   if (!rank) {
949     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
950     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
951     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
952   }
953 
954   MPI_Bcast(header+1,3,MPI_INT,0,comm);
955   M = header[1]; N = header[2];
956 
957 
958   if (M != N) SETERRQ(1,"MatLoad_SeqBAIJ:Can only do square matrices");
959 
960   /*
961      This code adds extra rows to make sure the number of rows is
962      divisible by the blocksize
963   */
964   Mbs        = M/bs;
965   extra_rows = bs - M + bs*(Mbs);
966   if (extra_rows == bs) extra_rows = 0;
967   else                  Mbs++;
968   if (extra_rows &&!rank) {
969     PLogInfo(0,"MatLoad_SeqBAIJ:Padding loaded matrix to match blocksize");
970   }
971   /* determine ownership of all rows */
972   mbs = Mbs/size + ((Mbs % size) > rank);
973   m   = mbs * bs;
974   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
975   browners = rowners + size + 1;
976   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
977   rowners[0] = 0;
978   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
979   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
980   rstart = rowners[rank];
981   rend   = rowners[rank+1];
982 
983   /* distribute row lengths to all processors */
984   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
985   if (!rank) {
986     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
987     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
988     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
989     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
990     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
991     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
992     PetscFree(sndcounts);
993   }
994   else {
995     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
996   }
997 
998   if (!rank) {
999     /* calculate the number of nonzeros on each processor */
1000     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1001     PetscMemzero(procsnz,size*sizeof(int));
1002     for ( i=0; i<size; i++ ) {
1003       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1004         procsnz[i] += rowlengths[j];
1005       }
1006     }
1007     PetscFree(rowlengths);
1008 
1009     /* determine max buffer needed and allocate it */
1010     maxnz = 0;
1011     for ( i=0; i<size; i++ ) {
1012       maxnz = PetscMax(maxnz,procsnz[i]);
1013     }
1014     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1015 
1016     /* read in my part of the matrix column indices  */
1017     nz = procsnz[0];
1018     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1019     mycols = ibuf;
1020     if (size == 1)  nz -= extra_rows;
1021     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1022     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1023 
1024     /* read in every ones (except the last) and ship off */
1025     for ( i=1; i<size-1; i++ ) {
1026       nz = procsnz[i];
1027       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1028       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1029     }
1030     /* read in the stuff for the last proc */
1031     if ( size != 1 ) {
1032       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1033       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1034       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1035       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1036     }
1037     PetscFree(cols);
1038   }
1039   else {
1040     /* determine buffer space needed for message */
1041     nz = 0;
1042     for ( i=0; i<m; i++ ) {
1043       nz += locrowlens[i];
1044     }
1045     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1046     mycols = ibuf;
1047     /* receive message of column indices*/
1048     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1049     MPI_Get_count(&status,MPI_INT,&maxnz);
1050     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1051   }
1052 
1053   /* loop over local rows, determining number of off diagonal entries */
1054   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1055   odlens = dlens + (rend-rstart);
1056   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1057   PetscMemzero(mask,3*Mbs*sizeof(int));
1058   masked1 = mask    + Mbs;
1059   masked2 = masked1 + Mbs;
1060   rowcount = 0; nzcount = 0;
1061   for ( i=0; i<mbs; i++ ) {
1062     dcount  = 0;
1063     odcount = 0;
1064     for ( j=0; j<bs; j++ ) {
1065       kmax = locrowlens[rowcount];
1066       for ( k=0; k<kmax; k++ ) {
1067         tmp = mycols[nzcount++]/bs;
1068         if (!mask[tmp]) {
1069           mask[tmp] = 1;
1070           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1071           else masked1[dcount++] = tmp;
1072         }
1073       }
1074       rowcount++;
1075     }
1076 
1077     dlens[i]  = dcount;
1078     odlens[i] = odcount;
1079 
1080     /* zero out the mask elements we set */
1081     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1082     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1083   }
1084 
1085   /* create our matrix */
1086   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);CHKERRQ(ierr);
1087   A = *newmat;
1088   MatSetOption(A,MAT_COLUMNS_SORTED);
1089 
1090   if (!rank) {
1091     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1092     /* read in my part of the matrix numerical values  */
1093     nz = procsnz[0];
1094     vals = buf;
1095     mycols = ibuf;
1096     if (size == 1)  nz -= extra_rows;
1097     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1098     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1099     /* insert into matrix */
1100     jj      = rstart*bs;
1101     for ( i=0; i<m; i++ ) {
1102       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1103       mycols += locrowlens[i];
1104       vals   += locrowlens[i];
1105       jj++;
1106     }
1107     /* read in other processors( except the last one) and ship out */
1108     for ( i=1; i<size-1; i++ ) {
1109       nz = procsnz[i];
1110       vals = buf;
1111       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1112       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1113     }
1114     /* the last proc */
1115     if ( size != 1 ){
1116       nz = procsnz[i] - extra_rows;
1117       vals = buf;
1118       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1119       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1120       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1121     }
1122     PetscFree(procsnz);
1123   }
1124   else {
1125     /* receive numeric values */
1126     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1127 
1128     /* receive message of values*/
1129     vals = buf;
1130     mycols = ibuf;
1131     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1132     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1133     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1134 
1135     /* insert into matrix */
1136     jj      = rstart*bs;
1137     for ( i=0; i<m; i++ ) {
1138       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1139       mycols += locrowlens[i];
1140       vals   += locrowlens[i];
1141       jj++;
1142     }
1143   }
1144   PetscFree(locrowlens);
1145   PetscFree(buf);
1146   PetscFree(ibuf);
1147   PetscFree(rowners);
1148   PetscFree(dlens);
1149   PetscFree(mask);
1150   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1151   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1152   return 0;
1153 }
1154 
1155 
1156