#ifndef lint
static char vcid[] = "$Id: mpibaij.c,v 1.4 1996/06/21 23:12:01 balay Exp balay $";
#endif

#include "mpibaij.h"


#include "draw.h"
#include "pinclude/pviewer.h"

extern int MatSetUpMultiply_MPIBAIJ(Mat);
extern int DisAssemble_MPIBAIJ(Mat);

/* local utility routine that creates a mapping from the global column
   block number to the local block number in the off-diagonal part of the
   local storage of the matrix.  This is done in a non-scalable way since
   the length of colmap equals the global matrix block length.
*/
static int CreateColmap_MPIBAIJ_Private(Mat mat)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
  int         nbs = B->nbs,i;

  baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
  PLogObjectMemory(mat,baij->Nbs*sizeof(int));
  PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
  for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i;
  return 0;
}
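
/* (Illustration, hypothetical data) if the off-diagonal part references the
   global block columns garray = {3,7,9}, then after this routine
   colmap[3] = 0, colmap[7] = 1, colmap[9] = 2 and every other entry of
   colmap is 0.  Note that a zero entry is therefore ambiguous: it can mean
   either "local block 0" or "block column not present". */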


static int MatGetReordering_MPIBAIJ(Mat mat,MatOrdering type,IS *rperm,IS *cperm)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;
  if (baij->size == 1) {
    ierr = MatGetReordering(baij->A,type,rperm,cperm); CHKERRQ(ierr);
  } else SETERRQ(1,"MatGetReordering_MPIBAIJ:not supported in parallel");
  return 0;
}

static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
  int         cstart = baij->cstart, cend = baij->cend,row,col;
  int         roworiented = baij->roworiented,rstart_orig,rend_orig;
  int         cstart_orig,cend_orig,bs=baij->bs;

  if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
    SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
  }
  baij->insertmode = addv;
  rstart_orig = rstart*bs;
  rend_orig   = rend*bs;
  cstart_orig = cstart*bs;
  cend_orig   = cend*bs;
  for ( i=0; i<m; i++ ) {
    if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
    if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
    if (im[i] >= rstart_orig && im[i] < rend_orig) {
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");
        if (in[j] >= baij->N) SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");
        if (in[j] >= cstart_orig && in[j] < cend_orig){
          col = in[j] - cstart_orig;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          ierr = MatSetValues(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
        }
        else {
          if (mat->was_assembled) {
            if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);}
            /* colmap is indexed by block column; convert the point column */
            col = baij->colmap[in[j]/bs]*bs + in[j]%bs;
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
            }
          }
          else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          ierr = MatSetValues(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
        }
      }
    }
    else {
      if (roworiented) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      }
      else {
        row = im[i];
        for ( j=0; j<n; j++ ) {
          ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
        }
      }
    }
  }
  return 0;
}

static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
  int         bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
    if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
    if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
      row = idxm[i] - bsrstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
        if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
        if (idxn[j] >= bscstart && idxn[j] < bscend){
          col = idxn[j] - bscstart;
          ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        }
        else {
          /* colmap is indexed by block column; convert the point column */
          col = baij->colmap[idxn[j]/bs]*bs + idxn[j]%bs;
          ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        }
      }
    }
    else {
      SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
    }
  }
  return 0;
}

static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
  int         ierr, i, j, cstart = baij->cstart,bs2=baij->bs2;
  double      sum = 0.0;
  Scalar      *v;

  if (baij->size == 1) {
    ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      v = amat->a;
      for ( i=0; i<amat->nz*bs2; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      v = bmat->a;
      for ( i=0; i<bmat->nz*bs2; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
      *norm = sqrt(*norm);
    }
    else if (type == NORM_1) { /* max column norm */
      double *tmp, *tmp2;
      int    *jj, *garray = baij->garray;
      tmp  = (double *) PetscMalloc( baij->N*sizeof(double) ); CHKPTRQ(tmp);
      tmp2 = (double *) PetscMalloc( baij->N*sizeof(double) ); CHKPTRQ(tmp2);
      PetscMemzero(tmp,baij->N*sizeof(double));
      *norm = 0.0;
      v = amat->a; jj = amat->j;
      for ( j=0; j<amat->nz; j++ ) {
        tmp[cstart + *jj++ ] += PetscAbsScalar(*v);  v++;
      }
      v = bmat->a; jj = bmat->j;
      for ( j=0; j<bmat->nz; j++ ) {
        tmp[garray[*jj++ ]] += PetscAbsScalar(*v); v++;
      }
      MPI_Allreduce(tmp,tmp2,baij->N,MPI_DOUBLE,MPI_SUM,mat->comm);
      for ( j=0; j<baij->N; j++ ) {
        if (tmp2[j] > *norm) *norm = tmp2[j];
      }
      PetscFree(tmp); PetscFree(tmp2);
    }
    else if (type == NORM_INFINITY) { /* max row norm */
      double ntemp = 0.0;
      for ( j=0; j<amat->m; j++ ) {
        v = amat->a + amat->i[j];
        sum = 0.0;
        for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
          sum += PetscAbsScalar(*v); v++;
        }
        v = bmat->a + bmat->i[j];
        for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
          sum += PetscAbsScalar(*v); v++;
        }
        if (sum > ntemp) ntemp = sum;
      }
      MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);
    }
    else {
      SETERRQ(1,"MatNorm_MPIBAIJ:No support for two norm");
    }
  }
  return 0;
}

static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERT_VALUES or ADD_VALUES mode */
  MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
  }
  baij->insertmode = addv; /* in case this processor had no cache */

  /* first count the number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length */
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message consists of ordered triples
     (global row index,global column index,value); the indices are stored
     as Scalars to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }
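
  /* (Illustration) each incoming message is a flat run of Scalar triples,
     [ row_0, col_0, val_0, row_1, col_1, val_1, ... ], so message i is
     unpacked starting from rvalues+3*nmax*i in MatAssemblyEnd_MPIBAIJ(). */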

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  return 0;
}


static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
  int         bs=baij->bs,row,col,other_disassembled;
  Scalar      *values,val;
  InsertMode  addv = baij->insertmode;

  /* wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = baij->rvalues + 3*imdex*baij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;
    for ( i=0; i<n; i++ ) {
      row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= baij->cstart*bs && col < baij->cend*bs) {
        col -= baij->cstart*bs;
        MatSetValues(baij->A,1,&row,1,&col,&val,addv);
      }
      else {
        if (mat->was_assembled) {
          if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);}
          col = baij->colmap[col/bs]*bs + col%bs;
          if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
            ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        MatSetValues(baij->B,1,&row,1,&col,&val,addv);
      }
    }
    count--;
  }
  PetscFree(baij->recv_waits); PetscFree(baij->rvalues);

  /* wait on sends */
  if (baij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(baij->nsends,baij->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(baij->send_waits); PetscFree(baij->svalues);

  baij->insertmode = NOT_SET_VALUES;
  ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled; if so we must
     also disassemble ourselves, in order that we may reassemble.
     (was_assembled is 0 or 1, so the MPI_PROD reduction is 0 exactly
     when some processor has disassembled.) */
  MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
  }

  if (!mat->was_assembled && mode == FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);

  if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
  return 0;
}

static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr;

  if (baij->size == 1) {
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
  }
  else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
  return 0;
}

static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr, format,rank,bs=baij->bs;
  FILE         *fd;
  ViewerType   vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerGetFormat(viewer,&format);
    if (format == ASCII_FORMAT_INFO_DETAILED) {
      int nz, nzalloc, mem;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
      PetscSequentialPhaseBegin(mat->comm,1);
      fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
              rank,baij->m,nz*bs,nzalloc*bs,baij->bs,mem);
      ierr = MatGetInfo(baij->A,MAT_LOCAL,&nz,&nzalloc,&mem);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz*bs);
      ierr = MatGetInfo(baij->B,MAT_LOCAL,&nz,&nzalloc,&mem);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz*bs);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == ASCII_FORMAT_INFO) {
      return 0;
    }
  }

  if (vtype == DRAW_VIEWER) {
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
  }

  if (vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
            baij->cstart*bs,baij->cend*bs);
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  }
  else {
    int size = baij->size;
    rank = baij->rank;
    if (size == 1) {
      ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto the first processor */
      Mat         A;
      Mat_SeqBAIJ *Aloc;
      int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
      int         mbs=baij->mbs;
      Scalar      *a;

      if (!rank) {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqBAIJ*) baij->A->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart;
      rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = (baij->cstart+aj[j])*bs;
          for ( k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      /* copy over the B part */
      Aloc = (Mat_SeqBAIJ*) baij->B->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart*bs;
      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = baij->garray[aj[j]]*bs;
          for ( k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      PetscFree(rvals);
      ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}


static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
{
  Mat         mat = (Mat) obj;
  int         ierr;
  ViewerType  vtype;

  if (!viewer) {
    viewer = STDOUT_VIEWER_SELF;
  }
  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
      vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
    ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
  }
  else if (vtype == BINARY_FILE_VIEWER) {
    return MatView_MPIBAIJ_Binary(mat,viewer);
  }
  return 0;
}

static int MatDestroy_MPIBAIJ(PetscObject obj)
{
  Mat         mat = (Mat) obj;
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;

#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
#endif

  PetscFree(baij->rowners);
  ierr = MatDestroy(baij->A); CHKERRQ(ierr);
  ierr = MatDestroy(baij->B); CHKERRQ(ierr);
  if (baij->colmap) PetscFree(baij->colmap);
  if (baij->garray) PetscFree(baij->garray);
  if (baij->lvec)   VecDestroy(baij->lvec);
  if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
  if (baij->rowvalues) PetscFree(baij->rowvalues);
  PetscFree(baij);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}

static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;

  ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
  ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
  ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
  return 0;
}

static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;
  ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
  return 0;
}

static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
                (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in yy until the next line, which is true for my implementation */
  /* but is not perhaps always true. */
  ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}

static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in zz until the next line, which is true for my implementation */
  /* but is not perhaps always true. */
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}

/*
  This only works correctly for square matrices where the subblock A->A is the
  diagonal block
*/
static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  if (a->M != a->N)
    SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
  return MatGetDiagonal(a->A,v);
}

static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int         ierr;
  ierr = MatScale(aa,a->A); CHKERRQ(ierr);
  ierr = MatScale(aa,a->B); CHKERRQ(ierr);
  return 0;
}

static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  *m = mat->M; *n = mat->N;
  return 0;
}

static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  *m = mat->m; *n = mat->n; /* local rows and local columns */
  return 0;
}

static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
  return 0;
}
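
/* (Example, hypothetical sizes) with bs=2 and two processes each owning
   3 block rows, process 0 has rstart=0, rend=3 and MatGetOwnershipRange()
   reports point rows [0,6); process 1 has rstart=3, rend=6 and reports
   point rows [6,12). */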

/* -------------------------------------------------------------------*/
static struct _MatOps MatOps = {
  MatSetValues_MPIBAIJ,0,0,MatMult_MPIBAIJ,
  MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
  0,0,0,0,
  0,0,0,0,
  0,MatGetDiagonal_MPIBAIJ,0,0,
  MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,0,
  0,0,MatGetReordering_MPIBAIJ,0,
  0,0,0,MatGetSize_MPIBAIJ,
  MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
  0,0,0,0,
  0,0,0,0,
  0,0,0,0,
  0,0,0,0,
  MatScale_MPIBAIJ,0,0};


/*@C
   MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
   (block compressed row).  For good matrix assembly performance
   the user should preallocate the matrix storage by setting the parameters
   d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
   performance can be increased by more than a factor of 50.

   Input Parameters:
.  comm - MPI communicator
.  bs   - size of block
.  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
.  n - number of local columns (or PETSC_DECIDE to have calculated
           if N is given)
.  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
.  N - number of global columns (or PETSC_DECIDE to have calculated
           if n is given)
.  d_nz  - number of block nonzeros per block row in diagonal portion of local
           submatrix  (same for all local rows)
.  d_nnz - number of block nonzeros per block row in diagonal portion of local
           submatrix or null (possibly different for each row).  You must leave
           room for the diagonal entry even if it is zero.
.  o_nz  - number of block nonzeros per block row in off-diagonal portion of local
           submatrix (same for all local rows).
.  o_nnz - number of block nonzeros per block row in off-diagonal portion of local
           submatrix or null (possibly different for each row).

   Output Parameter:
.  A - the matrix

   Notes:
   The user MUST specify either the local or global matrix dimensions
   (possibly both).

   Storage Information:
   For a square global matrix we define each processor's diagonal portion
   to be its local rows and the corresponding columns (a square submatrix);
   each processor's off-diagonal portion encompasses the remainder of the
   local matrix (a rectangular submatrix).

   The user can specify preallocated storage for the diagonal part of
   the local submatrix with either d_nz or d_nnz (not both).  Set
   d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
   memory allocation.  Likewise, specify preallocated storage for the
   off-diagonal part of the local submatrix with o_nz or o_nnz (not both).

   Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
   the figure below we depict these three local rows and all columns (0-11).

$          0 1 2 3 4 5 6 7 8 9 10 11
$         --------------------------
$  row 3  |  o o o d d d o o o o o o
$  row 4  |  o o o d d d o o o o o o
$  row 5  |  o o o d d d o o o o o o
$         --------------------------
$

   Thus, any entries in the d locations are stored in the d (diagonal)
   submatrix, and any entries in the o locations are stored in the
   o (off-diagonal) submatrix.  Note that the d and the o submatrices are
   stored simply in the MATSEQBAIJ format for compressed row storage.

   Now d_nz should indicate the number of nonzeros per row in the d matrix,
   and o_nz should indicate the number of nonzeros per row in the o matrix.
   In general, for PDE problems in which most nonzeros are near the diagonal,
   one expects d_nz >> o_nz.   For additional details, see the users manual
   chapter on matrices and the file $(PETSC_DIR)/Performance.

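   Example:
   An illustrative call (hypothetical, not from the original source) matching
   the figure above, with bs=1, 12 global rows and columns, and 3 local rows:
$      ierr = MatCreateMPIBAIJ(comm,1,3,PETSC_DECIDE,12,12,3,PETSC_NULL,
$                              9,PETSC_NULL,&A); CHKERRQ(ierr);
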
.keywords: matrix, aij, compressed row, sparse, parallel

.seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
@*/
int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
                    int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  Mat          B;
  Mat_MPIBAIJ  *b;
  int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE;

  if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
  *A = 0;
  PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
  PLogObjectCreate(B);
  B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
  PetscMemzero(b,sizeof(Mat_MPIBAIJ));
  PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
  B->destroy    = MatDestroy_MPIBAIJ;
  B->view       = MatView_MPIBAIJ;

  B->factor     = 0;
  B->assembled  = PETSC_FALSE;

  b->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&b->rank);
  MPI_Comm_size(comm,&b->size);

  if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
    SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
  if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
  if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
  if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
  if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;

  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    mbs = m/bs; nbs = n/bs;
    MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
    if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
    if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
  }
  if (m == PETSC_DECIDE) {
    Mbs = M/bs;
    if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No. of global rows must be divisible by blocksize");
    mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
    m   = mbs*bs;
  }
  if (n == PETSC_DECIDE) {
    Nbs = N/bs;
    if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No. of global cols must be divisible by blocksize");
    nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
    n   = nbs*bs;
  }
  if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No. of local rows, cols must be divisible by blocksize");

  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;
  b->bs  = bs;
  b->bs2 = bs*bs;
  b->mbs = mbs;
  b->nbs = nbs;
  b->Mbs = Mbs;
  b->Nbs = Nbs;

  /* build local table of row and column ownerships */
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
  b->cowners = b->rowners + b->size + 1;
  MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];
  }
  b->rstart = b->rowners[b->rank];
  b->rend   = b->rowners[b->rank+1];
  MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart = b->cowners[b->rank];
  b->cend   = b->cowners[b->rank+1];

  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache for off-processor entries as they are set */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used for matrix vector multiply */
  b->lvec      = 0;
  b->Mvctx     = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  *A = B;
  return 0;
}

#include "sys.h"

int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*buf;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
  int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
  int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
  int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
  int          dcount,kmax,k,nzcount,tmp;


  ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
  bs2  = bs*bs;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];


  if (M != N) SETERRQ(1,"MatLoad_MPIBAIJ:Can only do square matrices");

  /*
     This code adds extra rows to make sure the number of rows is
     divisible by the blocksize
  */
  Mbs        = M/bs;
  extra_rows = bs - M + bs*(Mbs);
  if (extra_rows == bs) extra_rows = 0;
  else                  Mbs++;
  if (extra_rows && !rank) {
    PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize");
  }
  /* determine ownership of all rows */
  mbs = Mbs/size + ((Mbs % size) > rank);
  m   = mbs * bs;
  rowners  = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
  browners = rowners + size + 1;
  MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
  for ( i=0; i<=size; i++ ) browners[i] = rowners[i]*bs;
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
    for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
    MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices */
    nz = procsnz[0];
    ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }

    /* read in everyone else's part (except the last processor's) and ship it off */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    /* read in the stuff for the last proc */
    if ( size != 1 ) {
      nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
      MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += locrowlens[i];
    }
    ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* receive message of column indices */
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries */
  dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
  odlens = dlens + (rend-rstart);
  mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
  PetscMemzero(mask,3*Mbs*sizeof(int));
  masked1 = mask    + Mbs;
  masked2 = masked1 + Mbs;
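  /* mask[] flags the block columns already seen in the current block row;
     masked1[] / masked2[] record which flags were set (diagonal part and
     off-diagonal part, respectively) so that only those entries need to be
     zeroed again after each block row. */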
  rowcount = 0; nzcount = 0;
  for ( i=0; i<mbs; i++ ) {
    dcount  = 0;
    odcount = 0;
    for ( j=0; j<bs; j++ ) {
      kmax = locrowlens[rowcount];
      for ( k=0; k<kmax; k++ ) {
        tmp = mycols[nzcount++]/bs;
        if (!mask[tmp]) {
          mask[tmp] = 1;
          if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
          else masked1[dcount++] = tmp;
        }
      }
      rowcount++;
    }

    dlens[i]  = dcount;
    odlens[i] = odcount;

    /* zero out the mask elements we set */
    for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
    for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
  }

  /* create our matrix */
  ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);CHKERRQ(ierr);
  A = *newmat;
  MatSetOption(A,COLUMNS_SORTED);

  if (!rank) {
    buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
    /* read in my part of the matrix numerical values */
    nz = procsnz[0];
    vals = buf;
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
    /* read in the other processors' parts (except the last one's) and ship them out */
    for ( i=1; i<size-1; i++ ) {
      nz   = procsnz[i];
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    /* the last proc */
    if ( size != 1 ){
      nz   = procsnz[size-1] - extra_rows;
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
      MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);

    /* receive message of values */
    vals = buf;
    mycols = ibuf;
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
  }
  PetscFree(locrowlens);
  PetscFree(buf);
  PetscFree(ibuf);
  PetscFree(rowners);
  PetscFree(dlens);
  PetscFree(mask);
  ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}
1084