xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 4e220ebcc634eb0c8ccd78bed049803c9194c4d4)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.22 1996/08/15 12:48:12 bsmith Exp curfman $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 /*
25      Local utility routine that creates a mapping from the global block column number to
26    the local block column number (plus one) in the off-diagonal part of the local storage
27    of the matrix; e.g. garray = {3,7,9} gives colmap[3]=1, colmap[7]=2, colmap[9]=3, others 0.
28    This is done in a non-scalable way since the length of colmap equals the number of global block columns.
29 */
30 static int CreateColmap_MPIBAIJ_Private(Mat mat)
31 {
32   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
33   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
34   int         nbs = B->nbs,i;
35 
36   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
37   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
38   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
39   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i + 1;
40   return 0;
41 }
42 
43 
44 static int MatGetReordering_MPIBAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
45 {
46   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
47   int         ierr;
48   if (baij->size == 1) {
49     ierr = MatGetReordering(baij->A,type,rperm,cperm); CHKERRQ(ierr);
50   } else SETERRQ(1,"MatGetReordering_MPIBAIJ:not supported in parallel");
51   return 0;
52 }
53 
54 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
55 {
56   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
57   Scalar      value;
58   int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
59   int         cstart = baij->cstart, cend = baij->cend,row,col;
60   int         roworiented = baij->roworiented,rstart_orig,rend_orig;
61   int         cstart_orig,cend_orig,bs=baij->bs;
62 
63   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
64     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
65   }
66   baij->insertmode = addv;
67   rstart_orig = rstart*bs;
68   rend_orig   = rend*bs;
69   cstart_orig = cstart*bs;
70   cend_orig   = cend*bs;
71   for ( i=0; i<m; i++ ) {
72     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
73     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
74     if (im[i] >= rstart_orig && im[i] < rend_orig) {
75       row = im[i] - rstart_orig;
76       for ( j=0; j<n; j++ ) {
77         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");
78         if (in[j] >= baij->N) SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");
79         if (in[j] >= cstart_orig && in[j] < cend_orig){
80           col = in[j] - cstart_orig;
81           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
82           ierr = MatSetValues(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
83         }
84         else {
85           if (mat->was_assembled) {
86             if (!baij->colmap) {
87               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
88             }
89           col = (baij->colmap[in[j]/bs] - 1)*bs + in[j]%bs;
90             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
91               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
92               col =  in[j];
93             }
94           }
95           else col = in[j];
96           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
97           ierr = MatSetValues(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
98         }
99       }
100     }
101     else {
102       if (roworiented) {
103         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
104       }
105       else {
106         row = im[i];
107         for ( j=0; j<n; j++ ) {
108           ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
109         }
110       }
111     }
112   }
113   return 0;
114 }
115 
116 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
117 {
118   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
119   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
120   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
121 
122   for ( i=0; i<m; i++ ) {
123     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
124     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
125     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
126       row = idxm[i] - bsrstart;
127       for ( j=0; j<n; j++ ) {
128         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
129         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
130         if (idxn[j] >= bscstart && idxn[j] < bscend){
131           col = idxn[j] - bscstart;
132           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
133         }
134         else {
135           if (!baij->colmap) {
136             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
137           }
138           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
139              (baij->garray[baij->colmap[idxn[j]/bs]-1] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
140           else {
141             col  = (baij->colmap[idxn[j]/bs]-1)*bs + idxn[j]%bs;
142             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
143           }
144         }
145       }
146     }
147     else {
148       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
149     }
150   }
151   return 0;
152 }
153 
154 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
155 {
156   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
157   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
158   int        ierr, i,bs2=baij->bs2;
159   double     sum = 0.0;
160   Scalar     *v;
161 
162   if (baij->size == 1) {
163     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
164   } else {
165     if (type == NORM_FROBENIUS) {
166       v = amat->a;
167       for (i=0; i<amat->nz*bs2; i++ ) {
168 #if defined(PETSC_COMPLEX)
169         sum += real(conj(*v)*(*v)); v++;
170 #else
171         sum += (*v)*(*v); v++;
172 #endif
173       }
174       v = bmat->a;
175       for (i=0; i<bmat->nz*bs2; i++ ) {
176 #if defined(PETSC_COMPLEX)
177         sum += real(conj(*v)*(*v)); v++;
178 #else
179         sum += (*v)*(*v); v++;
180 #endif
181       }
182       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
183       *norm = sqrt(*norm);
184     }
185     else
186       SETERRQ(1,"MatNorm_MPIBAIJ:No support for this norm yet");
187   }
188   return 0;
189 }
190 
191 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
192 {
193   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
194   MPI_Comm    comm = mat->comm;
195   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
196   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
197   MPI_Request *send_waits,*recv_waits;
198   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
199   InsertMode  addv;
200   Scalar      *rvalues,*svalues;
201 
202   /* make sure all processors are either in INSERTMODE or ADDMODE */
203   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
204   if (addv == (ADD_VALUES|INSERT_VALUES)) {
205     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
206   }
207   baij->insertmode = addv; /* in case this processor had no cache */
208 
209   /*  first count number of contributors to each processor */
210   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
211   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
212   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
213   for ( i=0; i<baij->stash.n; i++ ) {
214     idx = baij->stash.idx[i];
215     for ( j=0; j<size; j++ ) {
216       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
217         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
218       }
219     }
220   }
221   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
222 
223   /* inform other processors of number of messages and max length*/
224   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
225   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
226   nreceives = work[rank];
227   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
228   nmax = work[rank];
229   PetscFree(work);
230 
231   /* post receives:
232        1) each message will consist of ordered triples
233      (global row index,global column index,value); we store the indices as
234      Scalars to simplify the message passing.
235        2) since we don't know how long each individual message is we
236      allocate the largest needed buffer for each receive. Potentially
237      this is a lot of wasted space.
238 
239 
240        This could be done better.
241   */
242   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
243   CHKPTRQ(rvalues);
244   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
245   CHKPTRQ(recv_waits);
246   for ( i=0; i<nreceives; i++ ) {
247     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
248               comm,recv_waits+i);
249   }
250 
251   /* do sends:
252       1) starts[i] gives the starting index in svalues for stuff going to
253          the ith processor
254   */
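  /* e.g. two stashed entries destined for process p are packed consecutively as
     [ row0, col0, val0, row1, col1, val1 ] starting at svalues+3*starts[p] */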
255   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
256   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
257   CHKPTRQ(send_waits);
258   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
259   starts[0] = 0;
260   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
261   for ( i=0; i<baij->stash.n; i++ ) {
262     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
263     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
264     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
265   }
266   PetscFree(owner);
267   starts[0] = 0;
268   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
269   count = 0;
270   for ( i=0; i<size; i++ ) {
271     if (procs[i]) {
272       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
273                 comm,send_waits+count++);
274     }
275   }
276   PetscFree(starts); PetscFree(nprocs);
277 
278   /* Free cache space */
279   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
280   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
281 
282   baij->svalues    = svalues;    baij->rvalues    = rvalues;
283   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
284   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
285   baij->rmax       = nmax;
286 
287   return 0;
288 }
289 
290 
291 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
292 {
293   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
294   MPI_Status  *send_status,recv_status;
295   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
296   int         bs=baij->bs,row,col,other_disassembled;
297   Scalar      *values,val;
298   InsertMode  addv = baij->insertmode;
299 
300   /*  wait on receives */
301   while (count) {
302     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
303     /* unpack receives into our local space */
304     values = baij->rvalues + 3*imdex*baij->rmax;
305     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
306     n = n/3;
307     for ( i=0; i<n; i++ ) {
308       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
309       col = (int) PetscReal(values[3*i+1]);
310       val = values[3*i+2];
311       if (col >= baij->cstart*bs && col < baij->cend*bs) {
312         col -= baij->cstart*bs;
313         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
314       }
315       else {
316         if (mat->was_assembled) {
317           if (!baij->colmap) {
318             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
319           }
320           col = (baij->colmap[col/bs]-1)*bs + col%bs;
321           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
322             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
323             col = (int) PetscReal(values[3*i+1]);
324           }
325         }
326         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
327       }
328     }
329     count--;
330   }
331   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
332 
333   /* wait on sends */
334   if (baij->nsends) {
335     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
336     CHKPTRQ(send_status);
337     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
338     PetscFree(send_status);
339   }
340   PetscFree(baij->send_waits); PetscFree(baij->svalues);
341 
342   baij->insertmode = NOT_SET_VALUES;
343   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
344   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
345 
346   /* determine if any processor has disassembled; if so we must
347      also disassemble ourselves, in order that we may reassemble. */
348   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
349   if (mat->was_assembled && !other_disassembled) {
350     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
351   }
352 
353   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
354     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
355   }
356   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
357   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
358 
359   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
360   return 0;
361 }
362 
363 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
364 {
365   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
366   int          ierr;
367 
368   if (baij->size == 1) {
369     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
370   }
371   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
372   return 0;
373 }
374 
375 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
376 {
377   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
378   int          ierr, format,rank,bs=baij->bs;
379   FILE         *fd;
380   ViewerType   vtype;
381 
382   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
383   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
384     ierr = ViewerGetFormat(viewer,&format);
385     if (format == ASCII_FORMAT_INFO_DETAILED) {
386       MatInfo info;
387       MPI_Comm_rank(mat->comm,&rank);
388       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
389       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
390       PetscSequentialPhaseBegin(mat->comm,1);
391       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
392               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
393               baij->bs,(int)info.memory);
394       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
395       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
396       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
397       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
398       fflush(fd);
399       PetscSequentialPhaseEnd(mat->comm,1);
400       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
401       return 0;
402     }
403     else if (format == ASCII_FORMAT_INFO) {
404       return 0;
405     }
406   }
407 
408   if (vtype == DRAW_VIEWER) {
409     Draw       draw;
410     PetscTruth isnull;
411     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
412     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
413   }
414 
415   if (vtype == ASCII_FILE_VIEWER) {
416     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
417     PetscSequentialPhaseBegin(mat->comm,1);
418     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
419            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
420             baij->cstart*bs,baij->cend*bs);
421     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
422     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
423     fflush(fd);
424     PetscSequentialPhaseEnd(mat->comm,1);
425   }
426   else {
427     int size = baij->size;
428     rank = baij->rank;
429     if (size == 1) {
430       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
431     }
432     else {
433       /* assemble the entire matrix onto first processor. */
434       Mat         A;
435       Mat_SeqBAIJ *Aloc;
436       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
437       int         mbs=baij->mbs;
438       Scalar      *a;
439 
440       if (!rank) {
441         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
442         CHKERRQ(ierr);
443       }
444       else {
445         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
446         CHKERRQ(ierr);
447       }
448       PLogObjectParent(mat,A);
449 
450       /* copy over the A part */
451       Aloc = (Mat_SeqBAIJ*) baij->A->data;
452       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
453       row = baij->rstart;
454       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
455 
456       for ( i=0; i<mbs; i++ ) {
457         rvals[0] = bs*(baij->rstart + i);
458         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
459         for ( j=ai[i]; j<ai[i+1]; j++ ) {
460           col = (baij->cstart+aj[j])*bs;
461           for (k=0; k<bs; k++ ) {
462             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
463             col++; a += bs;
464           }
465         }
466       }
467       /* copy over the B part */
468       Aloc = (Mat_SeqBAIJ*) baij->B->data;
469       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
470       row = baij->rstart*bs;
471       for ( i=0; i<mbs; i++ ) {
472         rvals[0] = bs*(baij->rstart + i);
473         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
474         for ( j=ai[i]; j<ai[i+1]; j++ ) {
475           col = baij->garray[aj[j]]*bs;
476           for (k=0; k<bs; k++ ) {
477             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
478             col++; a += bs;
479           }
480         }
481       }
482       PetscFree(rvals);
483       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
484       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
485       if (!rank) {
486         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
487       }
488       ierr = MatDestroy(A); CHKERRQ(ierr);
489     }
490   }
491   return 0;
492 }
493 
494 
495 
496 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
497 {
498   Mat         mat = (Mat) obj;
499   int         ierr;
500   ViewerType  vtype;
501 
502   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
503   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
504       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
505     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
506   }
507   else if (vtype == BINARY_FILE_VIEWER) {
508     return MatView_MPIBAIJ_Binary(mat,viewer);
509   }
510   return 0;
511 }
512 
513 static int MatDestroy_MPIBAIJ(PetscObject obj)
514 {
515   Mat         mat = (Mat) obj;
516   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
517   int         ierr;
518 
519 #if defined(PETSC_LOG)
520   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
521 #endif
522 
523   PetscFree(baij->rowners);
524   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
525   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
526   if (baij->colmap) PetscFree(baij->colmap);
527   if (baij->garray) PetscFree(baij->garray);
528   if (baij->lvec)   VecDestroy(baij->lvec);
529   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
530   if (baij->rowvalues) PetscFree(baij->rowvalues);
531   PetscFree(baij);
532   PLogObjectDestroy(mat);
533   PetscHeaderDestroy(mat);
534   return 0;
535 }
536 
537 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
538 {
539   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
540   int         ierr, nt;
541 
542   VecGetLocalSize_Fast(xx,nt);
543   if (nt != a->n) {
544     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and xx");
545   }
546   VecGetLocalSize_Fast(yy,nt);
547   if (nt != a->m) {
548     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and yy");
549   }
550   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
551   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
552   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
553   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
554   return 0;
555 }
556 
557 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
558 {
559   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
560   int        ierr;
561   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
562   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
563   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
564   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
565   return 0;
566 }
567 
568 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
569 {
570   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
571   int        ierr;
572 
573   /* do nondiagonal part */
574   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
575   /* send it on its way */
576   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
577   /* do local part */
578   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
579   /* receive remote parts: note this assumes the values are not actually */
580   /* inserted in yy until the next line, which is true for this implementation */
581   /* but may not always hold. */
582   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
583                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx);CHKERRQ(ierr);
584   return 0;
585 }
586 
587 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
588 {
589   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
590   int        ierr;
591 
592   /* do nondiagonal part */
593   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
594   /* send it on its way */
595   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
596   /* do local part */
597   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
598   /* receive remote parts: note this assumes the values are not actually */
599   /* inserted in zz until the next line, which is true for this implementation */
600   /* but may not always hold. */
601   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
602   return 0;
603 }
604 
605 /*
606   This only works correctly for square matrices where the subblock A->A is the
607    diagonal block
608 */
609 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
610 {
611   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
612   if (a->M != a->N)
613     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
614   return MatGetDiagonal(a->A,v);
615 }
616 
617 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
618 {
619   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
620   int        ierr;
621   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
622   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
623   return 0;
624 }
625 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
626 {
627   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
628   *m = mat->M; *n = mat->N;
629   return 0;
630 }
631 
632 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
633 {
634   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
635   *m = mat->m; *n = mat->N;
636   return 0;
637 }
638 
639 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
640 {
641   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
642   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
643   return 0;
644 }
645 
646 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
647 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
648 
649 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
650 {
651   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
652   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
653   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
654   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
655   int        *cmap, *idx_p,cstart = mat->cstart;
656 
657   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
658   mat->getrowactive = PETSC_TRUE;
659 
660   if (!mat->rowvalues && (idx || v)) {
661     /*
662         allocate enough space to hold information from the longest row.
663     */
664     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
665     int     max = 1,mbs = mat->mbs,tmp;
666     for ( i=0; i<mbs; i++ ) {
667       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
668       if (max < tmp) { max = tmp; }
669     }
670     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
671     CHKPTRQ(mat->rowvalues);
672     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
673   }
674 
675 
676   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows")
677   lrow = row - brstart;
678 
679   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
680   if (!v)   {pvA = 0; pvB = 0;}
681   if (!idx) {pcA = 0; if (!v) pcB = 0;}
682   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
683   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
684   nztot = nzA + nzB;
685 
686   cmap  = mat->garray;
687   if (v  || idx) {
688     if (nztot) {
689       /* Sort by increasing column numbers, assuming A and B already sorted */
690       int imark = -1;
691       if (v) {
692         *v = v_p = mat->rowvalues;
693         for ( i=0; i<nzB; i++ ) {
694           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
695           else break;
696         }
697         imark = i;
698         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
699         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
700       }
701       if (idx) {
702         *idx = idx_p = mat->rowindices;
703         if (imark > -1) {
704           for ( i=0; i<imark; i++ ) {
705             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
706           }
707         } else {
708           for ( i=0; i<nzB; i++ ) {
709             if (cmap[cworkB[i]/bs] < cstart)
710               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
711             else break;
712           }
713           imark = i;
714         }
715         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
716         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
717       }
718     }
719     else {
720       if (idx) *idx = 0;
721       if (v)   *v   = 0;
722     }
723   }
724   *nz = nztot;
725   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
726   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
727   return 0;
728 }
729 
730 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
731 {
732   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
733   if (baij->getrowactive == PETSC_FALSE) {
734     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
735   }
736   baij->getrowactive = PETSC_FALSE;
737   return 0;
738 }
739 
740 static int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
741 {
742   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
743   *bs = baij->bs;
744   return 0;
745 }
746 
747 static int MatZeroEntries_MPIBAIJ(Mat A)
748 {
749   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
750   int         ierr;
751   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
752   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
753   return 0;
754 }
755 
756 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
757 {
758   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
759   Mat         A = a->A, B = a->B;
760   int         ierr, isend[5], irecv[5];
761 
762   info->rows_global    = (double)a->M;
763   info->columns_global = (double)a->N;
764   info->rows_local     = (double)a->m;
765   info->columns_local  = (double)a->N;
766   info->block_size     = (double)a->bs;
767   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
768   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded; isend[3] = info->memory; isend[4] = info->mallocs;
769   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
770   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded; isend[3] += info->memory; isend[4] += info->mallocs;
771   if (flag == MAT_LOCAL) {
772     info->nz_used      = isend[0];
773     info->nz_allocated = isend[1];
774     info->nz_unneeded  = isend[2];
775     info->memory       = isend[3];
776     info->mallocs      = isend[4];
777   } else if (flag == MAT_GLOBAL_MAX) {
778     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_MAX,matin->comm);
779     info->nz_used      = irecv[0];
780     info->nz_allocated = irecv[1];
781     info->nz_unneeded  = irecv[2];
782     info->memory       = irecv[3];
783     info->mallocs      = irecv[4];
784   } else if (flag == MAT_GLOBAL_SUM) {
785     MPI_Allreduce(isend,irecv,5,MPI_INT,MPI_SUM,matin->comm);
786     info->nz_used      = irecv[0];
787     info->nz_allocated = irecv[1];
788     info->nz_unneeded  = irecv[2];
789     info->memory       = irecv[3];
790     info->mallocs      = irecv[4];
791   }
792   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
793   info->fill_ratio_needed = 0;
794   info->factor_mallocs    = 0;
795   return 0;
796 }
797 
798 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
799 {
800   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
801 
802   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
803       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
804       op == MAT_COLUMNS_SORTED ||
805       op == MAT_ROW_ORIENTED) {
806         MatSetOption(a->A,op);
807         MatSetOption(a->B,op);
808   }
809   else if (op == MAT_ROWS_SORTED ||
810            op == MAT_SYMMETRIC ||
811            op == MAT_STRUCTURALLY_SYMMETRIC ||
812            op == MAT_YES_NEW_DIAGONALS)
813     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
814   else if (op == MAT_COLUMN_ORIENTED) {
815     a->roworiented = 0;
816     MatSetOption(a->A,op);
817     MatSetOption(a->B,op);
818   }
819   else if (op == MAT_NO_NEW_DIAGONALS)
820     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:MAT_NO_NEW_DIAGONALS");}
821   else
822     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:unknown option");}
823   return 0;
824 }
825 
826 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
827 {
828   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
829   Mat_SeqBAIJ *Aloc;
830   Mat        B;
831   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
832   int        bs=baij->bs,mbs=baij->mbs;
833   Scalar     *a;
834 
835   if (matout == PETSC_NULL && M != N)
836     SETERRQ(1,"MatTranspose_MPIBAIJ:Square matrix only for in-place");
837   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
838   CHKERRQ(ierr);
839 
840   /* copy over the A part */
841   Aloc = (Mat_SeqBAIJ*) baij->A->data;
842   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
843   row = baij->rstart;
844   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
845 
846   for ( i=0; i<mbs; i++ ) {
847     rvals[0] = bs*(baij->rstart + i);
848     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
849     for ( j=ai[i]; j<ai[i+1]; j++ ) {
850       col = (baij->cstart+aj[j])*bs;
851       for (k=0; k<bs; k++ ) {
852         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
853         col++; a += bs;
854       }
855     }
856   }
857   /* copy over the B part */
858   Aloc = (Mat_SeqBAIJ*) baij->B->data;
859   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
860   row = baij->rstart*bs;
861   for ( i=0; i<mbs; i++ ) {
862     rvals[0] = bs*(baij->rstart + i);
863     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
864     for ( j=ai[i]; j<ai[i+1]; j++ ) {
865       col = baij->garray[aj[j]]*bs;
866       for (k=0; k<bs; k++ ) {
867         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
868         col++; a += bs;
869       }
870     }
871   }
872   PetscFree(rvals);
873   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
874   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
875 
876   if (matout != PETSC_NULL) {
877     *matout = B;
878   } else {
879     /* This isn't really an in-place transpose; we just free the old data structures from baij and copy B over A */
880     PetscFree(baij->rowners);
881     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
882     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
883     if (baij->colmap) PetscFree(baij->colmap);
884     if (baij->garray) PetscFree(baij->garray);
885     if (baij->lvec) VecDestroy(baij->lvec);
886     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
887     PetscFree(baij);
888     PetscMemcpy(A,B,sizeof(struct _Mat));
889     PetscHeaderDestroy(B);
890   }
891   return 0;
892 }
893 /* the code does not do the diagonal entries correctly unless the
894    matrix is square and the column and row ownerships are identical.
895    This is a BUG. The only way to fix it seems to be to access
896    baij->A and baij->B directly and not through the MatZeroRows()
897    routine.
898 */
899 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
900 {
901   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
902   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
903   int            *procs,*nprocs,j,found,idx,nsends,*work;
904   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
905   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
906   int            *lens,imdex,*lrows,*values,bs=l->bs;
907   MPI_Comm       comm = A->comm;
908   MPI_Request    *send_waits,*recv_waits;
909   MPI_Status     recv_status,*send_status;
910   IS             istmp;
911 
912   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
913   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
914 
915   /*  first count number of contributors to each processor */
916   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
917   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
918   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
919   for ( i=0; i<N; i++ ) {
920     idx = rows[i];
921     found = 0;
922     for ( j=0; j<size; j++ ) {
923       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
924         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
925       }
926     }
927     if (!found) SETERRQ(1,"MatZeroRows_MPIBAIJ:Index out of range");
928   }
929   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
930 
931   /* inform other processors of number of messages and max length*/
932   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
933   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
934   nrecvs = work[rank];
935   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
936   nmax = work[rank];
937   PetscFree(work);
938 
939   /* post receives:   */
940   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
941   CHKPTRQ(rvalues);
942   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
943   CHKPTRQ(recv_waits);
944   for ( i=0; i<nrecvs; i++ ) {
945     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
946   }
947 
948   /* do sends:
949       1) starts[i] gives the starting index in svalues for stuff going to
950          the ith processor
951   */
952   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
953   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
954   CHKPTRQ(send_waits);
955   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
956   starts[0] = 0;
957   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
958   for ( i=0; i<N; i++ ) {
959     svalues[starts[owner[i]]++] = rows[i];
960   }
961   ISRestoreIndices(is,&rows);
962 
963   starts[0] = 0;
964   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
965   count = 0;
966   for ( i=0; i<size; i++ ) {
967     if (procs[i]) {
968       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
969     }
970   }
971   PetscFree(starts);
972 
973   base = owners[rank]*bs;
974 
975   /*  wait on receives */
976   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
977   source = lens + nrecvs;
978   count  = nrecvs; slen = 0;
979   while (count) {
980     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
981     /* unpack receives into our local space */
982     MPI_Get_count(&recv_status,MPI_INT,&n);
983     source[imdex]  = recv_status.MPI_SOURCE;
984     lens[imdex]  = n;
985     slen += n;
986     count--;
987   }
988   PetscFree(recv_waits);
989 
990   /* move the data into the send scatter */
991   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
992   count = 0;
993   for ( i=0; i<nrecvs; i++ ) {
994     values = rvalues + i*nmax;
995     for ( j=0; j<lens[i]; j++ ) {
996       lrows[count++] = values[j] - base;
997     }
998   }
999   PetscFree(rvalues); PetscFree(lens);
1000   PetscFree(owner); PetscFree(nprocs);
1001 
1002   /* actually zap the local rows */
1003   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1004   PLogObjectParent(A,istmp);
1005   PetscFree(lrows);
1006   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1007   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1008   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1009 
1010   /* wait on sends */
1011   if (nsends) {
1012     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1013     CHKPTRQ(send_status);
1014     MPI_Waitall(nsends,send_waits,send_status);
1015     PetscFree(send_status);
1016   }
1017   PetscFree(send_waits); PetscFree(svalues);
1018 
1019   return 0;
1020 }
1021 extern int MatPrintHelp_SeqBAIJ(Mat);
1022 static int MatPrintHelp_MPIBAIJ(Mat A)
1023 {
1024   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1025 
1026   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1027   else return 0;
1028 }
1029 
1030 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1031 
1032 /* -------------------------------------------------------------------*/
1033 static struct _MatOps MatOps = {
1034   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1035   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,MatSolve_MPIBAIJ,
1036   MatSolveAdd_MPIBAIJ,MatSolveTrans_MPIBAIJ,MatSolveTransAdd_MPIBAIJ,MatLUFactor_MPIBAIJ,
1037   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1038   0,MatGetDiagonal_MPIBAIJ,0,MatNorm_MPIBAIJ,
1039   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1040   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,MatGetReordering_MPIBAIJ,MatLUFactorSymbolic_MPIBAIJ,
1041   MatLUFactorNumeric_MPIBAIJ,0,0,MatGetSize_MPIBAIJ,
1042   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,MatILUFactorSymbolic_MPIBAIJ,0,
1043   0,0,0,MatConvertSameType_MPIBAIJ,0,0,
1044   0,0,0,MatGetSubMatrices_MPIBAIJ,
1045   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1046   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ};
1047 
1048 
1049 /*@C
1050    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1051    (block compressed row).  For good matrix assembly performance
1052    the user should preallocate the matrix storage by setting the parameters
1053    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1054    performance can be increased by more than a factor of 50.
1055 
1056    Input Parameters:
1057 .  comm - MPI communicator
1058 .  bs   - size of block
1059 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1060            This value should be the same as the local size used in creating the
1061            y vector for the matrix-vector product y = Ax.
1062 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1063            This value should be the same as the local size used in creating the
1064            x vector for the matrix-vector product y = Ax.
1065 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1066 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1067 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1068            submatrix  (same for all local rows)
1069 .  d_nnz - array containing the number of block nonzeros in the various block rows
1070            of the diagonal portion of the local submatrix (possibly different for each
1071            block row) or PETSC_NULL.  You must leave room for the diagonal entry even
1072            if it is zero.
1073 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1074            submatrix (same for all local rows).
1075 .  o_nnz - array containing the number of block nonzeros in the various block rows of the
1076            off-diagonal portion of the local submatrix (possibly different for
1077            each block row) or PETSC_NULL.
1078 
1079    Output Parameter:
1080 .  A - the matrix
1081 
1082    Notes:
1083    The user MUST specify either the local or global matrix dimensions
1084    (possibly both).
1085 
1086    Storage Information:
1087    For a square global matrix we define each processor's diagonal portion
1088    to be its local rows and the corresponding columns (a square submatrix);
1089    each processor's off-diagonal portion encompasses the remainder of the
1090    local matrix (a rectangular submatrix).
1091 
1092    The user can specify preallocated storage for the diagonal part of
1093    the local submatrix with either d_nz or d_nnz (not both).  Set
1094    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1095    memory allocation.  Likewise, specify preallocated storage for the
1096    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1097 
1098    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1099    the figure below we depict these three local rows and all columns (0-11).
1100 
1101 $          0 1 2 3 4 5 6 7 8 9 10 11
1102 $         -------------------
1103 $  row 3  |  o o o d d d o o o o o o
1104 $  row 4  |  o o o d d d o o o o o o
1105 $  row 5  |  o o o d d d o o o o o o
1106 $         -------------------
1107 $
1108 
1109    Thus, any entries in the d locations are stored in the d (diagonal)
1110    submatrix, and any entries in the o locations are stored in the
1111    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1112    stored simply in the MATSEQBAIJ format for compressed row storage.
1113 
1114    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1115    and o_nz should indicate the number of nonzeros per row in the o matrix.
1116    In general, for PDE problems in which most nonzeros are near the diagonal,
1117    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1118    or you will get TERRIBLE performance; see the users' manual chapter on
1119    matrices and the file $(PETSC_DIR)/Performance.
1120 
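   Example usage (a minimal sketch; the block size, local dimensions, and
   preallocation values below are illustrative assumptions only):
$      Mat A;
$      int ierr, bs = 2, m = 4;
$      ierr = MatCreateMPIBAIJ(MPI_COMM_WORLD,bs,m,m,PETSC_DECIDE,PETSC_DECIDE,
$                              3,PETSC_NULL,1,PETSC_NULL,&A); CHKERRQ(ierr);
   followed by calls to MatSetValues() and then
$      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
$      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
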
1121 .keywords: matrix, block, aij, compressed row, sparse, parallel
1122 
1123 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1124 @*/
1125 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1126                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1127 {
1128   Mat          B;
1129   Mat_MPIBAIJ  *b;
1130   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE;
1131 
1132   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
1133   *A = 0;
1134   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1135   PLogObjectCreate(B);
1136   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1137   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1138   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1139   B->destroy    = MatDestroy_MPIBAIJ;
1140   B->view       = MatView_MPIBAIJ;
1141 
1142   B->factor     = 0;
1143   B->assembled  = PETSC_FALSE;
1144 
1145   b->insertmode = NOT_SET_VALUES;
1146   MPI_Comm_rank(comm,&b->rank);
1147   MPI_Comm_size(comm,&b->size);
1148 
1149   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1150     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1151   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
1152   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
1153   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1154   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1155 
1156   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1157     work[0] = m; work[1] = n;
1158     mbs = m/bs; nbs = n/bs;
1159     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1160     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1161     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1162   }
1163   if (m == PETSC_DECIDE) {
1164     Mbs = M/bs;
1165     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
1166     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1167     m   = mbs*bs;
1168   }
1169   if (n == PETSC_DECIDE) {
1170     Nbs = N/bs;
1171     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
1172     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1173     n   = nbs*bs;
1174   }
1175   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
1176 
1177   b->m = m; B->m = m;
1178   b->n = n; B->n = n;
1179   b->N = N; B->N = N;
1180   b->M = M; B->M = M;
1181   b->bs  = bs;
1182   b->bs2 = bs*bs;
1183   b->mbs = mbs;
1184   b->nbs = nbs;
1185   b->Mbs = Mbs;
1186   b->Nbs = Nbs;
1187 
1188   /* build local table of row and column ownerships */
1189   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1190   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1191   b->cowners = b->rowners + b->size + 2;
1192   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1193   b->rowners[0] = 0;
1194   for ( i=2; i<=b->size; i++ ) {
1195     b->rowners[i] += b->rowners[i-1];
1196   }
1197   b->rstart = b->rowners[b->rank];
1198   b->rend   = b->rowners[b->rank+1];
1199   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1200   b->cowners[0] = 0;
1201   for ( i=2; i<=b->size; i++ ) {
1202     b->cowners[i] += b->cowners[i-1];
1203   }
1204   b->cstart = b->cowners[b->rank];
1205   b->cend   = b->cowners[b->rank+1];
1206 
1207   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1208   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1209   PLogObjectParent(B,b->A);
1210   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1211   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1212   PLogObjectParent(B,b->B);
1213 
1214   /* build cache for off array entries formed */
1215   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1216   b->colmap      = 0;
1217   b->garray      = 0;
1218   b->roworiented = 1;
1219 
1220   /* stuff used for matrix vector multiply */
1221   b->lvec      = 0;
1222   b->Mvctx     = 0;
1223 
1224   /* stuff for MatGetRow() */
1225   b->rowindices   = 0;
1226   b->rowvalues    = 0;
1227   b->getrowactive = PETSC_FALSE;
1228 
1229   *A = B;
1230   return 0;
1231 }
1232 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1233 {
1234   Mat        mat;
1235   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1236   int        ierr, len=0, flg;
1237 
1238   *newmat       = 0;
1239   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1240   PLogObjectCreate(mat);
1241   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1242   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1243   mat->destroy    = MatDestroy_MPIBAIJ;
1244   mat->view       = MatView_MPIBAIJ;
1245   mat->factor     = matin->factor;
1246   mat->assembled  = PETSC_TRUE;
1247 
1248   a->m = mat->m   = oldmat->m;
1249   a->n = mat->n   = oldmat->n;
1250   a->M = mat->M   = oldmat->M;
1251   a->N = mat->N   = oldmat->N;
1252 
1253   a->bs  = oldmat->bs;
1254   a->bs2 = oldmat->bs2;
1255   a->mbs = oldmat->mbs;
1256   a->nbs = oldmat->nbs;
1257   a->Mbs = oldmat->Mbs;
1258   a->Nbs = oldmat->Nbs;
1259 
1260   a->rstart       = oldmat->rstart;
1261   a->rend         = oldmat->rend;
1262   a->cstart       = oldmat->cstart;
1263   a->cend         = oldmat->cend;
1264   a->size         = oldmat->size;
1265   a->rank         = oldmat->rank;
1266   a->insertmode   = NOT_SET_VALUES;
1267   a->rowvalues    = 0;
1268   a->getrowactive = PETSC_FALSE;
1269 
1270   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1271   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1272   a->cowners = a->rowners + a->size + 2;
1273   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1274   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1275   if (oldmat->colmap) {
1276     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1277     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1278     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1279   } else a->colmap = 0;
1280   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1281     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1282     PLogObjectMemory(mat,len*sizeof(int));
1283     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1284   } else a->garray = 0;
1285 
1286   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1287   PLogObjectParent(mat,a->lvec);
1288   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1289   PLogObjectParent(mat,a->Mvctx);
1290   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1291   PLogObjectParent(mat,a->A);
1292   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1293   PLogObjectParent(mat,a->B);
1294   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1295   if (flg) {
1296     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1297   }
1298   *newmat = mat;
1299   return 0;
1300 }
1301 
1302 #include "sys.h"
1303 
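/*
   Typical usage (a sketch; assumes a binary viewer opened elsewhere, e.g. with
   ViewerFileOpenBinary(), and the variable names below are illustrative):

       Mat A;
       int ierr;
       ierr = MatLoad(viewer,MATMPIBAIJ,&A); CHKERRQ(ierr);

   The block size is chosen at load time with the -matload_block_size option
   (default 1); the matrix is padded so its dimension is divisible by the block size.
*/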
1304 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1305 {
1306   Mat          A;
1307   int          i, nz, ierr, j,rstart, rend, fd;
1308   Scalar       *vals,*buf;
1309   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1310   MPI_Status   status;
1311   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1312   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1313   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1314   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1315   int          dcount,kmax,k,nzcount,tmp;
1316 
1317 
1318   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1319   bs2  = bs*bs;
1320 
1321   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1322   if (!rank) {
1323     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1324     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1325     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
1326   }
1327 
1328   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1329   M = header[1]; N = header[2];
1330 
1331   if (M != N) SETERRQ(1,"MatLoad_MPIBAIJ:Can only do square matrices");
1332 
1333   /*
1334      This code adds extra rows to make sure the number of rows is
1335      divisible by the blocksize
1336   */
1337   Mbs        = M/bs;
1338   extra_rows = bs - M + bs*(Mbs);
1339   if (extra_rows == bs) extra_rows = 0;
1340   else                  Mbs++;
1341   if (extra_rows &&!rank) {
1342     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1343   }
1344 
1345   /* determine ownership of all rows */
1346   mbs = Mbs/size + ((Mbs % size) > rank);
1347   m   = mbs * bs;
1348   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1349   browners = rowners + size + 1;
1350   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1351   rowners[0] = 0;
1352   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1353   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1354   rstart = rowners[rank];
1355   rend   = rowners[rank+1];
1356 
1357   /* distribute row lengths to all processors */
1358   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1359   if (!rank) {
1360     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1361     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1362     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1363     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1364     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1365     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1366     PetscFree(sndcounts);
1367   }
1368   else {
1369     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1370   }
1371 
1372   if (!rank) {
1373     /* calculate the number of nonzeros on each processor */
1374     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1375     PetscMemzero(procsnz,size*sizeof(int));
1376     for ( i=0; i<size; i++ ) {
1377       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1378         procsnz[i] += rowlengths[j];
1379       }
1380     }
1381     PetscFree(rowlengths);
1382 
1383     /* determine max buffer needed and allocate it */
1384     maxnz = 0;
1385     for ( i=0; i<size; i++ ) {
1386       maxnz = PetscMax(maxnz,procsnz[i]);
1387     }
1388     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1389 
1390     /* read in my part of the matrix column indices  */
1391     nz = procsnz[0];
1392     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1393     mycols = ibuf;
1394     if (size == 1)  nz -= extra_rows;
1395     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1396     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1397 
1398     /* read in the column indices for all other processors (except the last) and ship them off */
1399     for ( i=1; i<size-1; i++ ) {
1400       nz = procsnz[i];
1401       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1402       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1403     }
1404     /* read in the stuff for the last proc */
1405     if ( size != 1 ) {
1406       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1407       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1408       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1409       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1410     }
1411     PetscFree(cols);
1412   }
1413   else {
1414     /* determine buffer space needed for message */
1415     nz = 0;
1416     for ( i=0; i<m; i++ ) {
1417       nz += locrowlens[i];
1418     }
1419     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1420     mycols = ibuf;
1421     /* receive message of column indices*/
1422     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1423     MPI_Get_count(&status,MPI_INT,&maxnz);
1424     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1425   }
1426 
1427   /* loop over local rows, determining number of off diagonal entries */
1428   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1429   odlens = dlens + (rend-rstart);
1430   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1431   PetscMemzero(mask,3*Mbs*sizeof(int));
1432   masked1 = mask    + Mbs;
1433   masked2 = masked1 + Mbs;
1434   rowcount = 0; nzcount = 0;
1435   for ( i=0; i<mbs; i++ ) {
1436     dcount  = 0;
1437     odcount = 0;
1438     for ( j=0; j<bs; j++ ) {
1439       kmax = locrowlens[rowcount];
1440       for ( k=0; k<kmax; k++ ) {
1441         tmp = mycols[nzcount++]/bs;
1442         if (!mask[tmp]) {
1443           mask[tmp] = 1;
1444           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1445           else masked1[dcount++] = tmp;
1446         }
1447       }
1448       rowcount++;
1449     }
1450 
1451     dlens[i]  = dcount;
1452     odlens[i] = odcount;
1453 
1454     /* zero out the mask elements we set */
1455     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1456     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1457   }
1458 
1459   /* create our matrix */
1460   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1461          CHKERRQ(ierr);
1462   A = *newmat;
1463   MatSetOption(A,MAT_COLUMNS_SORTED);
1464 
1465   if (!rank) {
1466     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1467     /* read in my part of the matrix numerical values  */
1468     nz = procsnz[0];
1469     vals = buf;
1470     mycols = ibuf;
1471     if (size == 1)  nz -= extra_rows;
1472     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1473     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1474 
1475     /* insert into matrix */
1476     jj      = rstart*bs;
1477     for ( i=0; i<m; i++ ) {
1478       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1479       mycols += locrowlens[i];
1480       vals   += locrowlens[i];
1481       jj++;
1482     }
1483     /* read in other processors (except the last one) and ship out */
1484     for ( i=1; i<size-1; i++ ) {
1485       nz = procsnz[i];
1486       vals = buf;
1487       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1488       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1489     }
1490     /* the last proc */
1491     if ( size != 1 ){
1492       nz = procsnz[i] - extra_rows;
1493       vals = buf;
1494       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1495       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1496       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1497     }
1498     PetscFree(procsnz);
1499   }
1500   else {
1501     /* receive numeric values */
1502     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1503 
1504     /* receive message of values*/
1505     vals = buf;
1506     mycols = ibuf;
1507     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1508     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1509     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1510 
1511     /* insert into matrix */
1512     jj      = rstart*bs;
1513     for ( i=0; i<m; i++ ) {
1514       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1515       mycols += locrowlens[i];
1516       vals   += locrowlens[i];
1517       jj++;
1518     }
1519   }
1520   PetscFree(locrowlens);
1521   PetscFree(buf);
1522   PetscFree(ibuf);
1523   PetscFree(rowners);
1524   PetscFree(dlens);
1525   PetscFree(mask);
1526   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1527   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1528   return 0;
1529 }
1530 
1531 
1532