#ifndef lint
static char vcid[] = "$Id: mpibaij.c,v 1.7 1996/07/01 23:20:26 balay Exp balay $";
#endif

#include "mpibaij.h"

#include "draw.h"
#include "pinclude/pviewer.h"

extern int MatSetUpMultiply_MPIBAIJ(Mat);
extern int DisAssemble_MPIBAIJ(Mat);

/* local utility routine that creates a mapping from the global column
   block number to the local block number in the off-diagonal part (B) of
   the local storage of the matrix.  This is done in a non-scalable way,
   since the length of colmap equals the number of global block columns.
*/
static int CreateColmap_MPIBAIJ_Private(Mat mat)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
  int        nbs = B->nbs,i;

  baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
  PLogObjectMemory(mat,baij->Nbs*sizeof(int));
  PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
  for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i;
  return 0;
}

static int MatGetReordering_MPIBAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;
  if (baij->size == 1) {
    ierr = MatGetReordering(baij->A,type,rperm,cperm); CHKERRQ(ierr);
  } else SETERRQ(1,"MatGetReordering_MPIBAIJ:not supported in parallel");
  return 0;
}

static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
  int         cstart = baij->cstart, cend = baij->cend,row,col;
  int         roworiented = baij->roworiented,rstart_orig,rend_orig;
  int         cstart_orig,cend_orig,bs=baij->bs;

  if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
    SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
  }
  baij->insertmode = addv;
  rstart_orig = rstart*bs;
  rend_orig   = rend*bs;
  cstart_orig = cstart*bs;
  cend_orig   = cend*bs;
  for ( i=0; i<m; i++ ) {
    if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
    if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
    if (im[i] >= rstart_orig && im[i] < rend_orig) {
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");
        if (in[j] >= baij->N) SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");
        if (in[j] >= cstart_orig && in[j] < cend_orig){
          col = in[j] - cstart_orig;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          ierr = MatSetValues(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
        }
        else {
          if (mat->was_assembled) {
            if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);}
            col = baij->colmap[in[j]/bs]*bs + in[j]%bs;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
            }
          }
          else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          ierr = MatSetValues(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
        }
      }
    }
    else {
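      /* this row is owned by another processor: cache the entries in the
         stash; they are communicated during MatAssemblyBegin/End */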
      if (roworiented) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      }
      else {
        row = im[i];
        for ( j=0; j<n; j++ ) {
          ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
        }
      }
    }
  }
  return 0;
}

static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
  int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
    if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
    if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
      row = idxm[i] - bsrstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
        if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
        if (idxn[j] >= bscstart && idxn[j] < bscend){
          col = idxn[j] - bscstart;
          ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        }
        else {
          if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);}
          if ( baij->garray[baij->colmap[idxn[j]/bs]] != idxn[j]/bs ) *(v+i*n+j) = 0.0;
          else {
            col = baij->colmap[idxn[j]/bs]*bs + idxn[j]%bs;
            ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    }
    else {
      SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
    }
  }
  return 0;
}

static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
  int        ierr, i,bs2=baij->bs2;
  double     sum = 0.0;
  Scalar     *v;

  if (baij->size == 1) {
    ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
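      /* nz counts bs x bs blocks, so each part contributes nz*bs2 scalar
         entries to the sum */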
      v = amat->a;
      for (i=0; i<amat->nz*bs2; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      v = bmat->a;
      for (i=0; i<bmat->nz*bs2; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
      *norm = sqrt(*norm);
    }
    else
      SETERRQ(1,"MatNorm_MPIBAIJ:No support for this norm yet");
  }
  return 0;
}

static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are in the same mode - all inserting or all adding */
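  /* (the bitwise OR of the InsertMode values across processors equals
     ADD_VALUES|INSERT_VALUES exactly when the two modes are mixed; this
     assumes NOT_SET_VALUES is 0, as in the PETSc headers of this era, so
     processors with no cached values do not perturb the result) */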
  MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
  }
  baij->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message consists of ordered triples
     (global row index, global column index, value); the indices are
     stored as Scalars to simplify the message passing.
       2) since we don't know how long each individual message is, we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.

       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  return 0;
}

static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
  int         bs=baij->bs,row,col,other_disassembled;
  Scalar      *values,val;
  InsertMode  addv = baij->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = baij->rvalues + 3*imdex*baij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;
    for ( i=0; i<n; i++ ) {
      row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= baij->cstart*bs && col < baij->cend*bs) {
        col -= baij->cstart*bs;
        MatSetValues(baij->A,1,&row,1,&col,&val,addv);
      }
      else {
        if (mat->was_assembled) {
          if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);}
          col = baij->colmap[col/bs]*bs + col%bs;
          if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
            ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        MatSetValues(baij->B,1,&row,1,&col,&val,addv);
      }
    }
    count--;
  }
  PetscFree(baij->recv_waits); PetscFree(baij->rvalues);

  /* wait on sends */
  if (baij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(baij->nsends,baij->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(baij->send_waits); PetscFree(baij->svalues);

  baij->insertmode = NOT_SET_VALUES;
  ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled; if so we must
     also disassemble ourselves, in order that we may reassemble. */
  MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
  }

  if (!mat->was_assembled && mode == FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);

  if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
  return 0;
}

static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr;

  if (baij->size == 1) {
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
  }
  else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
  return 0;
}

static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr, format,rank,bs=baij->bs;
  FILE         *fd;
  ViewerType   vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerGetFormat(viewer,&format);
    if (format == ASCII_FORMAT_INFO_DETAILED) {
      int nz, nzalloc, mem;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
      PetscSequentialPhaseBegin(mat->comm,1);
      fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
              rank,baij->m,nz*bs,nzalloc*bs,baij->bs,mem);
      ierr = MatGetInfo(baij->A,MAT_LOCAL,&nz,&nzalloc,&mem);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz*bs);
      ierr = MatGetInfo(baij->B,MAT_LOCAL,&nz,&nzalloc,&mem);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz*bs);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == ASCII_FORMAT_INFO) {
      return 0;
    }
  }

  if (vtype == DRAW_VIEWER) {
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
  }

  if (vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
           baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
            baij->cstart*bs,baij->cend*bs);
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  }
  else {
    int size = baij->size;
    rank = baij->rank;
    if (size == 1) {
      ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat         A;
      Mat_SeqBAIJ *Aloc;
      int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
      int         mbs=baij->mbs;
      Scalar      *a;

      if (!rank) {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqBAIJ*) baij->A->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart;
      rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = (baij->cstart+aj[j])*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      /* copy over the B part */
      Aloc = (Mat_SeqBAIJ*) baij->B->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart*bs;
      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = baij->garray[aj[j]]*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      PetscFree(rvals);
      ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}

static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
{
  Mat         mat = (Mat) obj;
  int         ierr;
  ViewerType  vtype;

  if (!viewer) {
    viewer = STDOUT_VIEWER_SELF;
  }
  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
      vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
    ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
  }
  else if (vtype == BINARY_FILE_VIEWER) {
    return MatView_MPIBAIJ_Binary(mat,viewer);
  }
  return 0;
}

static int MatDestroy_MPIBAIJ(PetscObject obj)
{
  Mat         mat = (Mat) obj;
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;

#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
#endif

  PetscFree(baij->rowners);
  ierr = MatDestroy(baij->A); CHKERRQ(ierr);
  ierr = MatDestroy(baij->B); CHKERRQ(ierr);
  if (baij->colmap) PetscFree(baij->colmap);
  if (baij->garray) PetscFree(baij->garray);
  if (baij->lvec)   VecDestroy(baij->lvec);
  if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
  if (baij->rowvalues) PetscFree(baij->rowvalues);
  PetscFree(baij);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}

static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;

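  /* overlap communication with computation: begin scattering the ghost
     values of xx, multiply by the diagonal block A while the messages are
     in flight, then finish the scatter and apply the off-diagonal block B
     to the ghost values */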
  ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
  ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
  ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
  return 0;
}

static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;
  ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
  return 0;
}

static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
                (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in yy until the next line, which is true for my implementation*/
  /* but is not perhaps always true. */
  ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}

static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in zz until the next line, which is true for my implementation*/
  /* but is not perhaps always true. */
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}

/*
  This only works correctly for square matrices where the subblock A->A is the
   diagonal block
*/
static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  if (a->M != a->N)
    SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
  return MatGetDiagonal(a->A,v);
}

static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;
  ierr = MatScale(aa,a->A); CHKERRQ(ierr);
  ierr = MatScale(aa,a->B); CHKERRQ(ierr);
  return 0;
}

static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  *m = mat->M; *n = mat->N;
  return 0;
}

static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  *m = mat->m; *n = mat->n;
  return 0;
}

static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
  return 0;
}

extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);

int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
  int        *cmap, *idx_p,cstart = mat->cstart;

  if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int     max = 1,mbs = mat->mbs,tmp;
    for ( i=0; i<mbs; i++ ) {
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }

  if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows");
  lrow = row - brstart;

  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      int imark = -1;
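      /* the merged row has three segments: B's columns that map (via
         cmap/garray) to the left of the diagonal block, then all of A's
         columns, then B's remaining columns; imark records where the
         left B segment ends */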
      if (v) {
        *v = v_p = mat->rowvalues;
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
          }
        } else {
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]/bs] < cstart)
              idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
            else break;
          }
          imark = i;
        }
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
      }
    }
    else {if (idx) *idx = 0; if (v) *v = 0;}
  }
  *nz = nztot;
  ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  return 0;
}

int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  if (baij->getrowactive == PETSC_FALSE) {
    SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
  }
  baij->getrowactive = PETSC_FALSE;
  return 0;
}

/* -------------------------------------------------------------------*/
static struct _MatOps MatOps = {
  MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
  MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
  0,0,0,0,
  0,0,0,0,
  0,MatGetDiagonal_MPIBAIJ,0,MatNorm_MPIBAIJ,
  MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,0,
  0,0,MatGetReordering_MPIBAIJ,0,
  0,0,0,MatGetSize_MPIBAIJ,
  MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
  0,0,0,0,
  0,0,0,0,
  0,0,0,0,
  0,MatGetValues_MPIBAIJ,0,0,
  MatScale_MPIBAIJ,0,0};

/*@C
   MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
   (block compressed row).  For good matrix assembly performance
   the user should preallocate the matrix storage by setting the parameters
   d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
   performance can be increased by more than a factor of 50.

   Input Parameters:
.  comm - MPI communicator
.  bs   - size of each block
.  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
.  n - number of local columns (or PETSC_DECIDE to have it calculated
           if N is given)
.  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
.  N - number of global columns (or PETSC_DECIDE to have it calculated
           if n is given)
.  d_nz  - number of block nonzeros per block row in the diagonal portion of the
           local submatrix (same for all local rows)
.  d_nnz - number of block nonzeros per block row in the diagonal portion of the
           local submatrix or null (possibly different for each row).  You must
           leave room for the diagonal entry even if it is zero.
.  o_nz  - number of block nonzeros per block row in the off-diagonal portion of
           the local submatrix (same for all local rows).
.  o_nnz - number of block nonzeros per block row in the off-diagonal portion of
           the local submatrix or null (possibly different for each row).

   Output Parameter:
.  A - the matrix

   Notes:
   The user MUST specify either the local or global matrix dimensions
   (possibly both).

   Storage Information:
   For a square global matrix we define each processor's diagonal portion
   to be its local rows and the corresponding columns (a square submatrix);
   each processor's off-diagonal portion encompasses the remainder of the
   local matrix (a rectangular submatrix).

   The user can specify preallocated storage for the diagonal part of
   the local submatrix with either d_nz or d_nnz (not both).  Set
   d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
   memory allocation.  Likewise, specify preallocated storage for the
   off-diagonal part of the local submatrix with o_nz or o_nnz (not both).

   Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
   the figure below we depict these three local rows and all columns (0-11).

$          0 1 2 3 4 5 6 7 8 9 10 11
$         -------------------
$  row 3  |  o o o d d d o o o o o o
$  row 4  |  o o o d d d o o o o o o
$  row 5  |  o o o d d d o o o o o o
$         -------------------
$

   Thus, any entries in the d locations are stored in the d (diagonal)
   submatrix, and any entries in the o locations are stored in the
   o (off-diagonal) submatrix.  Note that the d and the o submatrices are
   stored simply in the MATSEQBAIJ format for compressed row storage.

   Now d_nz should indicate the number of nonzeros per row in the d matrix,
   and o_nz should indicate the number of nonzeros per row in the o matrix.
   In general, for PDE problems in which most nonzeros are near the diagonal,
   one expects d_nz >> o_nz.   For additional details, see the users manual
   chapter on matrices and the file $(PETSC_DIR)/Performance.

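   Example:
   The call below is an illustrative sketch that creates a matrix with
   2x2 blocks and lets PETSc choose the local sizes; the preallocation
   figures 5 and 2 are placeholders that should be chosen from the
   actual sparsity pattern.
$      ierr = MatCreateMPIBAIJ(MPI_COMM_WORLD,2,PETSC_DECIDE,PETSC_DECIDE,
$                              M,N,5,PETSC_NULL,2,PETSC_NULL,&A);
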
.keywords: matrix, aij, compressed row, sparse, parallel

.seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
@*/
int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
                    int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  Mat          B;
  Mat_MPIBAIJ  *b;
  int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE;

  if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
  *A = 0;
  PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
  PLogObjectCreate(B);
  B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
  PetscMemzero(b,sizeof(Mat_MPIBAIJ));
  PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
  B->destroy    = MatDestroy_MPIBAIJ;
  B->view       = MatView_MPIBAIJ;

  B->factor     = 0;
  B->assembled  = PETSC_FALSE;

  b->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&b->rank);
  MPI_Comm_size(comm,&b->size);

  if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
    SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
  if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
  if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
  if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
  if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;

  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    mbs = m/bs; nbs = n/bs;
    MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
    if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
    if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
  }
  if (m == PETSC_DECIDE) {
    Mbs = M/bs;
    if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
    mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
    m   = mbs*bs;
  }
  if (n == PETSC_DECIDE) {
    Nbs = N/bs;
    if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
    nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
    n   = nbs*bs;
  }
  if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");

  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;
  b->bs  = bs;
  b->bs2 = bs*bs;
  b->mbs = mbs;
  b->nbs = nbs;
  b->Mbs = Mbs;
  b->Nbs = Nbs;

  /* build local table of row and column ownerships */
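  /* e.g., with 3 processes owning 2, 3, and 2 block rows, the Allgather
     plus running sum below yield rowners = {0,2,5,7}, so process 1 owns
     block rows [2,5)  (illustrative values only) */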
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
  b->cowners = b->rowners + b->size + 1;
  MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];
  }
  b->rstart = b->rowners[b->rank];
  b->rend   = b->rowners[b->rank+1];
  MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart = b->cowners[b->rank];
  b->cend   = b->cowners[b->rank+1];

  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used for matrix vector multiply */
  b->lvec      = 0;
  b->Mvctx     = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  *A = B;
  return 0;
}

#include "sys.h"

int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*buf;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
  int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
  int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
  int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
  int          dcount,kmax,k,nzcount,tmp;

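  /* the block size comes from the command line, e.g. -matload_block_size 3
     loads a point matrix as 3x3 blocks; if the global size is not divisible
     by bs, the matrix is padded with identity rows below */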
  ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
  bs2  = bs*bs;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];

  if (M != N) SETERRQ(1,"MatLoad_MPIBAIJ:Can only do square matrices");

  /*
     This code adds extra rows to make sure the number of rows is
     divisible by the blocksize
  */
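  /* e.g., M = 10, bs = 3: Mbs = 3 and extra_rows = 3 - 10 + 9 = 2, so
     Mbs becomes 4 and the matrix is padded to 12 = 4*3 rows */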
  Mbs        = M/bs;
  extra_rows = bs - M + bs*(Mbs);
  if (extra_rows == bs) extra_rows = 0;
  else                  Mbs++;
  if (extra_rows && !rank) {
    PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
  }
  /* determine ownership of all rows */
  mbs = Mbs/size + ((Mbs % size) > rank);
  m   = mbs * bs;
  rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
  browners = rowners + size + 1;
  MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
  for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
    for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
    MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices  */
    nz = procsnz[0];
    ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }

    /* read in everyone else's part (except the last processor's) and ship it off */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    /* read in the stuff for the last proc */
    if ( size != 1 ) {
      nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
      MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += locrowlens[i];
    }
    ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* receive message of column indices*/
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries */
  dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
  odlens = dlens + (rend-rstart);
  mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
  PetscMemzero(mask,3*Mbs*sizeof(int));
  masked1 = mask    + Mbs;
  masked2 = masked1 + Mbs;
  rowcount = 0; nzcount = 0;
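  /* technique: for each block row, convert every point column index to a
     block column (tmp = col/bs) and use mask[] to count each block column
     once; masked1[]/masked2[] record which mask entries were set (diagonal
     vs. off-diagonal) so they can be cleared cheaply afterwards */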
  for ( i=0; i<mbs; i++ ) {
    dcount  = 0;
    odcount = 0;
    for ( j=0; j<bs; j++ ) {
      kmax = locrowlens[rowcount];
      for ( k=0; k<kmax; k++ ) {
        tmp = mycols[nzcount++]/bs;
        if (!mask[tmp]) {
          mask[tmp] = 1;
          if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
          else masked1[dcount++] = tmp;
        }
      }
      rowcount++;
    }

    dlens[i]  = dcount;
    odlens[i] = odcount;

    /* zero out the mask elements we set */
    for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
    for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
  }

  /* create our matrix */
  ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);CHKERRQ(ierr);
  A = *newmat;
  MatSetOption(A,COLUMNS_SORTED);

  if (!rank) {
    buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    vals = buf;
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
    /* read in the parts for all the other processors (except the last one) and ship them out */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    /* the last proc */
    if ( size != 1 ){
      nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
      MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);

    /* receive message of values*/
    vals = buf;
    mycols = ibuf;
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
  }
  PetscFree(locrowlens);
  PetscFree(buf);
  PetscFree(ibuf);
  PetscFree(rowners);
  PetscFree(dlens);
  PetscFree(mask);
  ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}
1141