xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 6a67db7e1512b5d9ff9ba8f94e48d05f0a9eae3d)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.23 1996/08/22 19:53:50 curfman Exp curfman $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 /*
25      Local utility routine that creates a mapping from the global column
26    number to the local number in the off-diagonal part of the local
27    storage of the matrix.  This is done in a non-scalable way since the
28    length of colmap equals the number of global block columns.
29 */
30 static int CreateColmap_MPIBAIJ_Private(Mat mat)
31 {
32   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
33   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
34   int         nbs = B->nbs,i;
35 
36   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
37   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
38   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
39   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i + 1;
40   return 0;
41 }
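/*
   Editorial note (illustrative sketch, not part of the original source):
   colmap is the inverse of garray, shifted by one so that a zero entry can
   mean "this global block column is not present locally":

       garray[lc] = gc        compact list of off-diagonal global block columns
       colmap[gc] = lc + 1    (0 if gc has no local copy)

   A lookup of global block column gc therefore reads

       lc = baij->colmap[gc] - 1;     a negative lc means gc is absent
*/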
42 
43 
44 static int MatGetReordering_MPIBAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
45 {
46   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
47   int         ierr;
48   if (baij->size == 1) {
49     ierr = MatGetReordering(baij->A,type,rperm,cperm); CHKERRQ(ierr);
50   } else SETERRQ(1,"MatGetReordering_MPIBAIJ:not supported in parallel");
51   return 0;
52 }
53 
54 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
55 {
56   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
57   Scalar      value;
58   int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
59   int         cstart = baij->cstart, cend = baij->cend,row,col;
60   int         roworiented = baij->roworiented,rstart_orig,rend_orig;
61   int         cstart_orig,cend_orig,bs=baij->bs;
62 
63   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
64     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
65   }
66   baij->insertmode = addv;
67   rstart_orig = rstart*bs;
68   rend_orig   = rend*bs;
69   cstart_orig = cstart*bs;
70   cend_orig   = cend*bs;
71   for ( i=0; i<m; i++ ) {
72     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
73     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
74     if (im[i] >= rstart_orig && im[i] < rend_orig) {
75       row = im[i] - rstart_orig;
76       for ( j=0; j<n; j++ ) {
77         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");
78         if (in[j] >= baij->N) SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");
79         if (in[j] >= cstart_orig && in[j] < cend_orig){
80           col = in[j] - cstart_orig;
81           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
82           ierr = MatSetValues(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
83         }
84         else {
85           if (mat->was_assembled) {
86             if (!baij->colmap) {
87               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
88             }
89           col = (baij->colmap[in[j]/bs] - 1)*bs + in[j]%bs;
90             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
91               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
92               col =  in[j];
93             }
94           }
95           else col = in[j];
96           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
97           ierr = MatSetValues(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
98         }
99       }
100     }
101     else {
102       if (roworiented) {
103         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
104       }
105       else {
106         row = im[i];
107         for ( j=0; j<n; j++ ) {
108           ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
109         }
110       }
111     }
112   }
113   return 0;
114 }
115 
116 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
117 {
118   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
119   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
120   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
121 
122   for ( i=0; i<m; i++ ) {
123     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
124     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
125     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
126       row = idxm[i] - bsrstart;
127       for ( j=0; j<n; j++ ) {
128         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
129         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
130         if (idxn[j] >= bscstart && idxn[j] < bscend){
131           col = idxn[j] - bscstart;
132           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
133         }
134         else {
135           if (!baij->colmap) {
136             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
137           }
138           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
139              (baij->garray[baij->colmap[idxn[j]/bs]-1] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
140           else {
141             col  = (baij->colmap[idxn[j]/bs]-1)*bs + idxn[j]%bs;
142             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
143           }
144         }
145       }
146     }
147     else {
148       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
149     }
150   }
151   return 0;
152 }
153 
154 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
155 {
156   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
157   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
158   int        ierr, i,bs2=baij->bs2;
159   double     sum = 0.0;
160   Scalar     *v;
161 
162   if (baij->size == 1) {
163     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
164   } else {
165     if (type == NORM_FROBENIUS) {
166       v = amat->a;
167       for (i=0; i<amat->nz*bs2; i++ ) {
168 #if defined(PETSC_COMPLEX)
169         sum += real(conj(*v)*(*v)); v++;
170 #else
171         sum += (*v)*(*v); v++;
172 #endif
173       }
174       v = bmat->a;
175       for (i=0; i<bmat->nz*bs2; i++ ) {
176 #if defined(PETSC_COMPLEX)
177         sum += real(conj(*v)*(*v)); v++;
178 #else
179         sum += (*v)*(*v); v++;
180 #endif
181       }
182       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
183       *norm = sqrt(*norm);
184     }
185     else
186       SETERRQ(1,"MatNorm_MPIBAIJ:No support for this norm yet");
187   }
188   return 0;
189 }
190 
191 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
192 {
193   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
194   MPI_Comm    comm = mat->comm;
195   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
196   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
197   MPI_Request *send_waits,*recv_waits;
198   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
199   InsertMode  addv;
200   Scalar      *rvalues,*svalues;
201 
202   /* make sure all processors are either in INSERTMODE or ADDMODE */
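  /* (editorial note: the InsertMode values ADD_VALUES and INSERT_VALUES occupy
     distinct bits, so the bitwise OR below equals ADD_VALUES|INSERT_VALUES
     exactly when some processors inserted while others added) */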
203   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
204   if (addv == (ADD_VALUES|INSERT_VALUES)) {
205     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
206   }
207   baij->insertmode = addv; /* in case this processor had no cache */
208 
209   /*  first count number of contributors to each processor */
210   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
211   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
212   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
213   for ( i=0; i<baij->stash.n; i++ ) {
214     idx = baij->stash.idx[i];
215     for ( j=0; j<size; j++ ) {
216       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
217         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
218       }
219     }
220   }
221   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
222 
223   /* inform other processors of number of messages and max length*/
224   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
225   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
226   nreceives = work[rank];
227   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
228   nmax = work[rank];
229   PetscFree(work);
230 
231   /* post receives:
232        1) each message will consist of ordered triples
233      (global row index,global column index,value); the indices are stored
234      as Scalars to simplify the message passing.
235        2) since we don't know how long each individual message is we
236      allocate the largest needed buffer for each receive. Potentially
237      this is a lot of wasted space.
238 
239 
240        This could be done better.
241   */
242   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
243   CHKPTRQ(rvalues);
244   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
245   CHKPTRQ(recv_waits);
246   for ( i=0; i<nreceives; i++ ) {
247     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
248               comm,recv_waits+i);
249   }
250 
251   /* do sends:
252       1) starts[i] gives the starting index in svalues for stuff going to
253          the ith processor
254   */
255   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
256   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
257   CHKPTRQ(send_waits);
258   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
259   starts[0] = 0;
260   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
261   for ( i=0; i<baij->stash.n; i++ ) {
262     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
263     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
264     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
265   }
266   PetscFree(owner);
267   starts[0] = 0;
268   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
269   count = 0;
270   for ( i=0; i<size; i++ ) {
271     if (procs[i]) {
272       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
273                 comm,send_waits+count++);
274     }
275   }
276   PetscFree(starts); PetscFree(nprocs);
277 
278   /* Free cache space */
279   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
280   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
281 
282   baij->svalues    = svalues;    baij->rvalues    = rvalues;
283   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
284   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
285   baij->rmax       = nmax;
286 
287   return 0;
288 }
289 
290 
291 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
292 {
293   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
294   MPI_Status  *send_status,recv_status;
295   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
296   int         bs=baij->bs,row,col,other_disassembled;
297   Scalar      *values,val;
298   InsertMode  addv = baij->insertmode;
299 
300   /*  wait on receives */
301   while (count) {
302     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
303     /* unpack receives into our local space */
304     values = baij->rvalues + 3*imdex*baij->rmax;
305     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
306     n = n/3;
307     for ( i=0; i<n; i++ ) {
308       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
309       col = (int) PetscReal(values[3*i+1]);
310       val = values[3*i+2];
311       if (col >= baij->cstart*bs && col < baij->cend*bs) {
312         col -= baij->cstart*bs;
313         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
314       }
315       else {
316         if (mat->was_assembled) {
317           if (!baij->colmap) {
318             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
319           }
320           col = (baij->colmap[col/bs]-1)*bs + col%bs;
321           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
322             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
323             col = (int) PetscReal(values[3*i+1]);
324           }
325         }
326         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
327       }
328     }
329     count--;
330   }
331   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
332 
333   /* wait on sends */
334   if (baij->nsends) {
335     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
336     CHKPTRQ(send_status);
337     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
338     PetscFree(send_status);
339   }
340   PetscFree(baij->send_waits); PetscFree(baij->svalues);
341 
342   baij->insertmode = NOT_SET_VALUES;
343   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
344   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
345 
346   /* determine if any processor has disassembled, if so we must
347      also disassemble ourselves, in order that we may reassemble. */
348   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
349   if (mat->was_assembled && !other_disassembled) {
350     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
351   }
352 
353   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
354     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
355   }
356   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
357   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
358 
359   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
360   return 0;
361 }
362 
363 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
364 {
365   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
366   int          ierr;
367 
368   if (baij->size == 1) {
369     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
370   }
371   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
372   return 0;
373 }
374 
375 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
376 {
377   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
378   int          ierr, format,rank,bs=baij->bs;
379   FILE         *fd;
380   ViewerType   vtype;
381 
382   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
383   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
384     ierr = ViewerGetFormat(viewer,&format);
385     if (format == ASCII_FORMAT_INFO_DETAILED) {
386       MatInfo info;
387       MPI_Comm_rank(mat->comm,&rank);
388       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
389       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
390       PetscSequentialPhaseBegin(mat->comm,1);
391       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
392               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
393               baij->bs,(int)info.memory);
394       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
395       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
396       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
397       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
398       fflush(fd);
399       PetscSequentialPhaseEnd(mat->comm,1);
400       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
401       return 0;
402     }
403     else if (format == ASCII_FORMAT_INFO) {
404       return 0;
405     }
406   }
407 
408   if (vtype == DRAW_VIEWER) {
409     Draw       draw;
410     PetscTruth isnull;
411     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
412     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
413   }
414 
415   if (vtype == ASCII_FILE_VIEWER) {
416     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
417     PetscSequentialPhaseBegin(mat->comm,1);
418     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
419            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
420             baij->cstart*bs,baij->cend*bs);
421     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
422     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
423     fflush(fd);
424     PetscSequentialPhaseEnd(mat->comm,1);
425   }
426   else {
427     int size = baij->size;
428     rank = baij->rank;
429     if (size == 1) {
430       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
431     }
432     else {
433       /* assemble the entire matrix onto first processor. */
434       Mat         A;
435       Mat_SeqBAIJ *Aloc;
436       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
437       int         mbs=baij->mbs;
438       Scalar      *a;
439 
440       if (!rank) {
441         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
442         CHKERRQ(ierr);
443       }
444       else {
445         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
446         CHKERRQ(ierr);
447       }
448       PLogObjectParent(mat,A);
449 
450       /* copy over the A part */
451       Aloc = (Mat_SeqBAIJ*) baij->A->data;
452       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
453       row = baij->rstart;
454       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
455 
456       for ( i=0; i<mbs; i++ ) {
457         rvals[0] = bs*(baij->rstart + i);
458         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
459         for ( j=ai[i]; j<ai[i+1]; j++ ) {
460           col = (baij->cstart+aj[j])*bs;
461           for (k=0; k<bs; k++ ) {
462             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
463             col++; a += bs;
464           }
465         }
466       }
467       /* copy over the B part */
468       Aloc = (Mat_SeqBAIJ*) baij->B->data;
469       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
470       row = baij->rstart*bs;
471       for ( i=0; i<mbs; i++ ) {
472         rvals[0] = bs*(baij->rstart + i);
473         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
474         for ( j=ai[i]; j<ai[i+1]; j++ ) {
475           col = baij->garray[aj[j]]*bs;
476           for (k=0; k<bs; k++ ) {
477             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
478             col++; a += bs;
479           }
480         }
481       }
482       PetscFree(rvals);
483       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
484       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
485       if (!rank) {
486         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
487       }
488       ierr = MatDestroy(A); CHKERRQ(ierr);
489     }
490   }
491   return 0;
492 }
493 
494 
495 
496 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
497 {
498   Mat         mat = (Mat) obj;
499   int         ierr;
500   ViewerType  vtype;
501 
502   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
503   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
504       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
505     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
506   }
507   else if (vtype == BINARY_FILE_VIEWER) {
508     return MatView_MPIBAIJ_Binary(mat,viewer);
509   }
510   return 0;
511 }
512 
513 static int MatDestroy_MPIBAIJ(PetscObject obj)
514 {
515   Mat         mat = (Mat) obj;
516   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
517   int         ierr;
518 
519 #if defined(PETSC_LOG)
520   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
521 #endif
522 
523   PetscFree(baij->rowners);
524   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
525   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
526   if (baij->colmap) PetscFree(baij->colmap);
527   if (baij->garray) PetscFree(baij->garray);
528   if (baij->lvec)   VecDestroy(baij->lvec);
529   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
530   if (baij->rowvalues) PetscFree(baij->rowvalues);
531   PetscFree(baij);
532   PLogObjectDestroy(mat);
533   PetscHeaderDestroy(mat);
534   return 0;
535 }
536 
537 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
538 {
539   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
540   int         ierr, nt;
541 
542   VecGetLocalSize_Fast(xx,nt);
543   if (nt != a->n) {
544     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and xx");
545   }
546   VecGetLocalSize_Fast(yy,nt);
547   if (nt != a->m) {
548     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and yy");
549   }
550   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
551   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
552   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
553   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
554   return 0;
555 }
556 
557 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
558 {
559   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
560   int        ierr;
561   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
562   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
563   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
564   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
565   return 0;
566 }
567 
568 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
569 {
570   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
571   int        ierr;
572 
573   /* do nondiagonal part */
574   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
575   /* send it on its way */
576   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
577   /* do local part */
578   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
579   /* receive remote parts: note this assumes the values are not actually */
580   /* inserted in yy until the next line, which is true for my implementation*/
581   /* but is perhaps not always true. */
582   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
583                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx);CHKERRQ(ierr);
584   return 0;
585 }
586 
587 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
588 {
589   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
590   int        ierr;
591 
592   /* do nondiagonal part */
593   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
594   /* send it on its way */
595   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
596   /* do local part */
597   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
598   /* receive remote parts: note this assumes the values are not actually */
599   /* inserted in zz until the next line, which is true for my implementation*/
600   /* but is perhaps not always true. */
601   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
602   return 0;
603 }
604 
605 /*
606   This only works correctly for square matrices where the subblock A->A is the
607    diagonal block
608 */
609 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
610 {
611   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
612   if (a->M != a->N)
613     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
614   return MatGetDiagonal(a->A,v);
615 }
616 
617 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
618 {
619   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
620   int        ierr;
621   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
622   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
623   return 0;
624 }
625 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
626 {
627   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
628   *m = mat->M; *n = mat->N;
629   return 0;
630 }
631 
632 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
633 {
634   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
635   *m = mat->m; *n = mat->n;
636   return 0;
637 }
638 
639 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
640 {
641   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
642   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
643   return 0;
644 }
645 
646 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
647 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
648 
649 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
650 {
651   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
652   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
653   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
654   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
655   int        *cmap, *idx_p,cstart = mat->cstart;
656 
657   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
658   mat->getrowactive = PETSC_TRUE;
659 
660   if (!mat->rowvalues && (idx || v)) {
661     /*
662         allocate enough space to hold information from the longest row.
663     */
664     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
665     int     max = 1,mbs = mat->mbs,tmp;
666     for ( i=0; i<mbs; i++ ) {
667       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
668       if (max < tmp) { max = tmp; }
669     }
670     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
671     CHKPTRQ(mat->rowvalues);
672     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
673   }
674 
675 
676   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows")
677   lrow = row - brstart;
678 
679   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
680   if (!v)   {pvA = 0; pvB = 0;}
681   if (!idx) {pcA = 0; if (!v) pcB = 0;}
682   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
683   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
684   nztot = nzA + nzB;
685 
686   cmap  = mat->garray;
687   if (v  || idx) {
688     if (nztot) {
689       /* Sort by increasing column numbers, assuming A and B already sorted */
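      /* (editorial note: imark counts the leading entries of B whose global
         block column precedes the diagonal block; they are copied first,
         then all of A, then the remaining entries of B) */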
690       int imark = -1;
691       if (v) {
692         *v = v_p = mat->rowvalues;
693         for ( i=0; i<nzB; i++ ) {
694           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
695           else break;
696         }
697         imark = i;
698         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
699         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
700       }
701       if (idx) {
702         *idx = idx_p = mat->rowindices;
703         if (imark > -1) {
704           for ( i=0; i<imark; i++ ) {
705             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
706           }
707         } else {
708           for ( i=0; i<nzB; i++ ) {
709             if (cmap[cworkB[i]/bs] < cstart)
710               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
711             else break;
712           }
713           imark = i;
714         }
715         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
716         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
717       }
718     }
719     else {
720       if (idx) *idx = 0;
721       if (v)   *v   = 0;
722     }
723   }
724   *nz = nztot;
725   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
726   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
727   return 0;
728 }
729 
730 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
731 {
732   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
733   if (baij->getrowactive == PETSC_FALSE) {
734     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
735   }
736   baij->getrowactive = PETSC_FALSE;
737   return 0;
738 }
739 
740 static int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
741 {
742   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
743   *bs = baij->bs;
744   return 0;
745 }
746 
747 static int MatZeroEntries_MPIBAIJ(Mat A)
748 {
749   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
750   int         ierr;
751   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
752   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
753   return 0;
754 }
755 
756 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
757 {
758   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
759   Mat         A = a->A, B = a->B;
760   int         ierr;
761   double      isend[5], irecv[5];
762 
763   info->rows_global    = (double)a->M;
764   info->columns_global = (double)a->N;
765   info->rows_local     = (double)a->m;
766   info->columns_local  = (double)a->n;
767   info->block_size     = (double)a->bs;
768   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
769   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded; isend[3] = info->memory; isend[4] = info->mallocs;
770   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
771   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded; isend[3] += info->memory; isend[4] += info->mallocs;
772   if (flag == MAT_LOCAL) {
773     info->nz_used      = isend[0];
774     info->nz_allocated = isend[1];
775     info->nz_unneeded  = isend[2];
776     info->memory       = isend[3];
777     info->mallocs      = isend[4];
778   } else if (flag == MAT_GLOBAL_MAX) {
779     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
780     info->nz_used      = irecv[0];
781     info->nz_allocated = irecv[1];
782     info->nz_unneeded  = irecv[2];
783     info->memory       = irecv[3];
784     info->mallocs      = irecv[4];
785   } else if (flag == MAT_GLOBAL_SUM) {
786     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
787     info->nz_used      = irecv[0];
788     info->nz_allocated = irecv[1];
789     info->nz_unneeded  = irecv[2];
790     info->memory       = irecv[3];
791     info->mallocs      = irecv[4];
792   }
793   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
794   info->fill_ratio_needed = 0;
795   info->factor_mallocs    = 0;
796   return 0;
797 }
798 
799 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
800 {
801   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
802 
803   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
804       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
805       op == MAT_COLUMNS_SORTED ||
806       op == MAT_ROW_ORIENTED) {
807         MatSetOption(a->A,op);
808         MatSetOption(a->B,op);
809   }
810   else if (op == MAT_ROWS_SORTED ||
811            op == MAT_SYMMETRIC ||
812            op == MAT_STRUCTURALLY_SYMMETRIC ||
813            op == MAT_YES_NEW_DIAGONALS)
814     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
815   else if (op == MAT_COLUMN_ORIENTED) {
816     a->roworiented = 0;
817     MatSetOption(a->A,op);
818     MatSetOption(a->B,op);
819   }
820   else if (op == MAT_NO_NEW_DIAGONALS)
821     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:MAT_NO_NEW_DIAGONALS");}
822   else
823     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:unknown option");}
824   return 0;
825 }
826 
827 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
828 {
829   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
830   Mat_SeqBAIJ *Aloc;
831   Mat        B;
832   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
833   int        bs=baij->bs,mbs=baij->mbs;
834   Scalar     *a;
835 
836   if (matout == PETSC_NULL && M != N)
837     SETERRQ(1,"MatTranspose_MPIBAIJ:Square matrix only for in-place");
838   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
839   CHKERRQ(ierr);
840 
841   /* copy over the A part */
842   Aloc = (Mat_SeqBAIJ*) baij->A->data;
843   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
844   row = baij->rstart;
845   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
846 
847   for ( i=0; i<mbs; i++ ) {
848     rvals[0] = bs*(baij->rstart + i);
849     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
850     for ( j=ai[i]; j<ai[i+1]; j++ ) {
851       col = (baij->cstart+aj[j])*bs;
852       for (k=0; k<bs; k++ ) {
853         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
854         col++; a += bs;
855       }
856     }
857   }
858   /* copy over the B part */
859   Aloc = (Mat_SeqBAIJ*) baij->B->data;
860   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
861   row = baij->rstart*bs;
862   for ( i=0; i<mbs; i++ ) {
863     rvals[0] = bs*(baij->rstart + i);
864     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
865     for ( j=ai[i]; j<ai[i+1]; j++ ) {
866       col = baij->garray[aj[j]]*bs;
867       for (k=0; k<bs; k++ ) {
868         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
869         col++; a += bs;
870       }
871     }
872   }
873   PetscFree(rvals);
874   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
875   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
876 
877   if (matout != PETSC_NULL) {
878     *matout = B;
879   } else {
880     /* This isn't really an in-place transpose, but we free the data structures from baij */
881     PetscFree(baij->rowners);
882     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
883     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
884     if (baij->colmap) PetscFree(baij->colmap);
885     if (baij->garray) PetscFree(baij->garray);
886     if (baij->lvec) VecDestroy(baij->lvec);
887     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
888     PetscFree(baij);
889     PetscMemcpy(A,B,sizeof(struct _Mat));
890     PetscHeaderDestroy(B);
891   }
892   return 0;
893 }
894 /* the code does not do the diagonal entries correctly unless the
895    matrix is square and the column and row ownerships are identical.
896    This is a BUG. The only way to fix it seems to be to access
897    baij->A and baij->B directly and not through the MatZeroRows()
898    routine.
899 */
900 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
901 {
902   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
903   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
904   int            *procs,*nprocs,j,found,idx,nsends,*work;
905   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
906   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
907   int            *lens,imdex,*lrows,*values,bs=l->bs;
908   MPI_Comm       comm = A->comm;
909   MPI_Request    *send_waits,*recv_waits;
910   MPI_Status     recv_status,*send_status;
911   IS             istmp;
912 
913   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
914   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
915 
916   /*  first count number of contributors to each processor */
917   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
918   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
919   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
920   for ( i=0; i<N; i++ ) {
921     idx = rows[i];
922     found = 0;
923     for ( j=0; j<size; j++ ) {
924       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
925         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
926       }
927     }
928     if (!found) SETERRQ(1,"MatZeroRows_MPIBAIJ:Index out of range");
929   }
930   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
931 
932   /* inform other processors of number of messages and max length*/
933   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
934   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
935   nrecvs = work[rank];
936   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
937   nmax = work[rank];
938   PetscFree(work);
939 
940   /* post receives:   */
941   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
942   CHKPTRQ(rvalues);
943   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
944   CHKPTRQ(recv_waits);
945   for ( i=0; i<nrecvs; i++ ) {
946     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
947   }
948 
949   /* do sends:
950       1) starts[i] gives the starting index in svalues for stuff going to
951          the ith processor
952   */
953   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
954   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
955   CHKPTRQ(send_waits);
956   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
957   starts[0] = 0;
958   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
959   for ( i=0; i<N; i++ ) {
960     svalues[starts[owner[i]]++] = rows[i];
961   }
962   ISRestoreIndices(is,&rows);
963 
964   starts[0] = 0;
965   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
966   count = 0;
967   for ( i=0; i<size; i++ ) {
968     if (procs[i]) {
969       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
970     }
971   }
972   PetscFree(starts);
973 
974   base = owners[rank]*bs;
975 
976   /*  wait on receives */
977   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
978   source = lens + nrecvs;
979   count  = nrecvs; slen = 0;
980   while (count) {
981     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
982     /* unpack receives into our local space */
983     MPI_Get_count(&recv_status,MPI_INT,&n);
984     source[imdex]  = recv_status.MPI_SOURCE;
985     lens[imdex]  = n;
986     slen += n;
987     count--;
988   }
989   PetscFree(recv_waits);
990 
991   /* move the data into the send scatter */
992   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
993   count = 0;
994   for ( i=0; i<nrecvs; i++ ) {
995     values = rvalues + i*nmax;
996     for ( j=0; j<lens[i]; j++ ) {
997       lrows[count++] = values[j] - base;
998     }
999   }
1000   PetscFree(rvalues); PetscFree(lens);
1001   PetscFree(owner); PetscFree(nprocs);
1002 
1003   /* actually zap the local rows */
1004   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1005   PLogObjectParent(A,istmp);
1006   PetscFree(lrows);
1007   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1008   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1009   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1010 
1011   /* wait on sends */
1012   if (nsends) {
1013     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1014     CHKPTRQ(send_status);
1015     MPI_Waitall(nsends,send_waits,send_status);
1016     PetscFree(send_status);
1017   }
1018   PetscFree(send_waits); PetscFree(svalues);
1019 
1020   return 0;
1021 }
1022 extern int MatPrintHelp_SeqBAIJ(Mat);
1023 static int MatPrintHelp_MPIBAIJ(Mat A)
1024 {
1025   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1026 
1027   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1028   else return 0;
1029 }
1030 
1031 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1032 
1033 /* -------------------------------------------------------------------*/
1034 static struct _MatOps MatOps = {
1035   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1036   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,MatSolve_MPIBAIJ,
1037   MatSolveAdd_MPIBAIJ,MatSolveTrans_MPIBAIJ,MatSolveTransAdd_MPIBAIJ,MatLUFactor_MPIBAIJ,
1038   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1039   0,MatGetDiagonal_MPIBAIJ,0,MatNorm_MPIBAIJ,
1040   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1041   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,MatGetReordering_MPIBAIJ,MatLUFactorSymbolic_MPIBAIJ,
1042   MatLUFactorNumeric_MPIBAIJ,0,0,MatGetSize_MPIBAIJ,
1043   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,MatILUFactorSymbolic_MPIBAIJ,0,
1044   0,0,0,MatConvertSameType_MPIBAIJ,0,0,
1045   0,0,0,MatGetSubMatrices_MPIBAIJ,
1046   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1047   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ};
1048 
1049 
1050 /*@C
1051    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1052    (block compressed row).  For good matrix assembly performance
1053    the user should preallocate the matrix storage by setting the parameters
1054    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1055    performance can be increased by more than a factor of 50.
1056 
1057    Input Parameters:
1058 .  comm - MPI communicator
1059 .  bs   - size of each block
1060 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1061            This value should be the same as the local size used in creating the
1062            y vector for the matrix-vector product y = Ax.
1063 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1064            This value should be the same as the local size used in creating the
1065            x vector for the matrix-vector product y = Ax.
1066 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
1067 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
1068 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1069            submatrix  (same for all local rows)
1070 .  d_nnz - array containing the number of block nonzeros in the various block rows
1071            of the diagonal portion of the local submatrix (possibly different for each block
1072            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1073            it is zero.
1074 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1075            submatrix (same for all local rows).
1076 .  o_nnz - array containing the number of block nonzeros in the various block rows of the
1077            off-diagonal portion of the local submatrix (possibly different for
1078            each block row) or PETSC_NULL.
1079 
1080    Output Parameter:
1081 .  A - the matrix
1082 
1083    Notes:
1084    The user MUST specify either the local or global matrix dimensions
1085    (possibly both).
1086 
1087    Storage Information:
1088    For a square global matrix we define each processor's diagonal portion
1089    to be its local rows and the corresponding columns (a square submatrix);
1090    each processor's off-diagonal portion encompasses the remainder of the
1091    local matrix (a rectangular submatrix).
1092 
1093    The user can specify preallocated storage for the diagonal part of
1094    the local submatrix with either d_nz or d_nnz (not both).  Set
1095    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1096    memory allocation.  Likewise, specify preallocated storage for the
1097    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1098 
1099    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1100    the figure below we depict these three local rows and all columns (0-11).
1101 
1102 $          0 1 2 3 4 5 6 7 8 9 10 11
1103 $         -------------------
1104 $  row 3  |  o o o d d d o o o o o o
1105 $  row 4  |  o o o d d d o o o o o o
1106 $  row 5  |  o o o d d d o o o o o o
1107 $         -------------------
1108 $
1109 
1110    Thus, any entries in the d locations are stored in the d (diagonal)
1111    submatrix, and any entries in the o locations are stored in the
1112    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1113    stored simply in the MATSEQBAIJ format for compressed row storage.
1114 
1115    Now d_nz should indicate the number of block nonzeros per block row in the d matrix,
1116    and o_nz should indicate the number of block nonzeros per block row in the o matrix.
1117    In general, for PDE problems in which most nonzeros are near the diagonal,
1118    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1119    or you will get TERRIBLE performance; see the users' manual chapter on
1120    matrices and the file $(PETSC_DIR)/Performance.
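
   Example usage (an illustrative sketch added in editing, not taken from this
   file; the block preallocation figures 5 and 2 are assumptions, and m, idxm,
   n, idxn, values stand for the caller's own index and value arrays):
$      Mat A;
$      int  ierr, bs = 4, M = 400, N = 400;
$      ierr = MatCreateMPIBAIJ(MPI_COMM_WORLD,bs,PETSC_DECIDE,PETSC_DECIDE,
$                              M,N,5,PETSC_NULL,2,PETSC_NULL,&A);
$      ierr = MatSetValues(A,m,idxm,n,idxn,values,INSERT_VALUES);
$      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);
$      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);
$      ierr = MatDestroy(A);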
1121 
1122 .keywords: matrix, block, aij, compressed row, sparse, parallel
1123 
1124 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1125 @*/
1126 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1127                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1128 {
1129   Mat          B;
1130   Mat_MPIBAIJ  *b;
1131   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE;
1132 
1133   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
1134   *A = 0;
1135   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1136   PLogObjectCreate(B);
1137   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1138   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1139   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1140   B->destroy    = MatDestroy_MPIBAIJ;
1141   B->view       = MatView_MPIBAIJ;
1142 
1143   B->factor     = 0;
1144   B->assembled  = PETSC_FALSE;
1145 
1146   b->insertmode = NOT_SET_VALUES;
1147   MPI_Comm_rank(comm,&b->rank);
1148   MPI_Comm_size(comm,&b->size);
1149 
1150   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1151     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1152   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
1153   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
1154   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1155   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1156 
1157   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1158     work[0] = m; work[1] = n;
1159     mbs = m/bs; nbs = n/bs;
1160     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1161     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1162     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1163   }
1164   if (m == PETSC_DECIDE) {
1165     Mbs = M/bs;
1166     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
1167     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1168     m   = mbs*bs;
1169   }
1170   if (n == PETSC_DECIDE) {
1171     Nbs = N/bs;
1172     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
1173     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1174     n   = nbs*bs;
1175   }
1176   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
1177 
1178   b->m = m; B->m = m;
1179   b->n = n; B->n = n;
1180   b->N = N; B->N = N;
1181   b->M = M; B->M = M;
1182   b->bs  = bs;
1183   b->bs2 = bs*bs;
1184   b->mbs = mbs;
1185   b->nbs = nbs;
1186   b->Mbs = Mbs;
1187   b->Nbs = Nbs;
1188 
1189   /* build local table of row and column ownerships */
1190   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1191   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1192   b->cowners = b->rowners + b->size + 2;
1193   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1194   b->rowners[0] = 0;
1195   for ( i=2; i<=b->size; i++ ) {
1196     b->rowners[i] += b->rowners[i-1];
1197   }
1198   b->rstart = b->rowners[b->rank];
1199   b->rend   = b->rowners[b->rank+1];
1200   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1201   b->cowners[0] = 0;
1202   for ( i=2; i<=b->size; i++ ) {
1203     b->cowners[i] += b->cowners[i-1];
1204   }
1205   b->cstart = b->cowners[b->rank];
1206   b->cend   = b->cowners[b->rank+1];
1207 
1208   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1209   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1210   PLogObjectParent(B,b->A);
1211   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1212   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1213   PLogObjectParent(B,b->B);
1214 
1215   /* build cache for off array entries formed */
1216   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1217   b->colmap      = 0;
1218   b->garray      = 0;
1219   b->roworiented = 1;
1220 
1221   /* stuff used for matrix vector multiply */
1222   b->lvec      = 0;
1223   b->Mvctx     = 0;
1224 
1225   /* stuff for MatGetRow() */
1226   b->rowindices   = 0;
1227   b->rowvalues    = 0;
1228   b->getrowactive = PETSC_FALSE;
1229 
1230   *A = B;
1231   return 0;
1232 }
1233 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1234 {
1235   Mat        mat;
1236   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1237   int        ierr, len=0, flg;
1238 
1239   *newmat       = 0;
1240   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1241   PLogObjectCreate(mat);
1242   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1243   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1244   mat->destroy    = MatDestroy_MPIBAIJ;
1245   mat->view       = MatView_MPIBAIJ;
1246   mat->factor     = matin->factor;
1247   mat->assembled  = PETSC_TRUE;
1248 
1249   a->m = mat->m   = oldmat->m;
1250   a->n = mat->n   = oldmat->n;
1251   a->M = mat->M   = oldmat->M;
1252   a->N = mat->N   = oldmat->N;
1253 
1254   a->bs  = oldmat->bs;
1255   a->bs2 = oldmat->bs2;
1256   a->mbs = oldmat->mbs;
1257   a->nbs = oldmat->nbs;
1258   a->Mbs = oldmat->Mbs;
1259   a->Nbs = oldmat->Nbs;
1260 
1261   a->rstart       = oldmat->rstart;
1262   a->rend         = oldmat->rend;
1263   a->cstart       = oldmat->cstart;
1264   a->cend         = oldmat->cend;
1265   a->size         = oldmat->size;
1266   a->rank         = oldmat->rank;
1267   a->insertmode   = NOT_SET_VALUES;
1268   a->rowvalues    = 0;
1269   a->getrowactive = PETSC_FALSE;
1270 
1271   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1272   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1273   a->cowners = a->rowners + a->size + 2;
1274   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1275   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1276   if (oldmat->colmap) {
1277     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1278     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1279     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1280   } else a->colmap = 0;
1281   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1282     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1283     PLogObjectMemory(mat,len*sizeof(int));
1284     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1285   } else a->garray = 0;
1286 
1287   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1288   PLogObjectParent(mat,a->lvec);
1289   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1290   PLogObjectParent(mat,a->Mvctx);
1291   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1292   PLogObjectParent(mat,a->A);
1293   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1294   PLogObjectParent(mat,a->B);
1295   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1296   if (flg) {
1297     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1298   }
1299   *newmat = mat;
1300   return 0;
1301 }
1302 
1303 #include "sys.h"
1304 
1305 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1306 {
1307   Mat          A;
1308   int          i, nz, ierr, j,rstart, rend, fd;
1309   Scalar       *vals,*buf;
1310   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1311   MPI_Status   status;
1312   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1313   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1314   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1315   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1316   int          dcount,kmax,k,nzcount,tmp;
1317 
1318 
1319   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1320   bs2  = bs*bs;
1321 
1322   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1323   if (!rank) {
1324     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1325     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1326     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
1327   }
1328 
1329   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1330   M = header[1]; N = header[2];
1331 
1332   if (M != N) SETERRQ(1,"MatLoad_MPIBAIJ:Can only do square matrices");
1333 
1334   /*
1335      This code adds extra rows to make sure the number of rows is
1336      divisible by the blocksize
1337   */
1338   Mbs        = M/bs;
1339   extra_rows = bs - M + bs*(Mbs);
1340   if (extra_rows == bs) extra_rows = 0;
1341   else                  Mbs++;
1342   if (extra_rows &&!rank) {
1343     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1344   }
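  /* (editorial worked example: with M = 10 and bs = 3, Mbs = 3 and
     extra_rows = 3 - 10 + 9 = 2, so Mbs becomes 4 and the matrix is padded
     to 12 rows; if M is already divisible by bs, extra_rows stays 0) */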
1345 
1346   /* determine ownership of all rows */
1347   mbs = Mbs/size + ((Mbs % size) > rank);
1348   m   = mbs * bs;
1349   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1350   browners = rowners + size + 1;
1351   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1352   rowners[0] = 0;
1353   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1354   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1355   rstart = rowners[rank];
1356   rend   = rowners[rank+1];
1357 
1358   /* distribute row lengths to all processors */
1359   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1360   if (!rank) {
1361     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1362     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1363     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1364     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1365     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1366     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1367     PetscFree(sndcounts);
1368   }
1369   else {
1370     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1371   }
1372 
1373   if (!rank) {
1374     /* calculate the number of nonzeros on each processor */
1375     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1376     PetscMemzero(procsnz,size*sizeof(int));
1377     for ( i=0; i<size; i++ ) {
1378       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1379         procsnz[i] += rowlengths[j];
1380       }
1381     }
1382     PetscFree(rowlengths);
1383 
1384     /* determine max buffer needed and allocate it */
1385     maxnz = 0;
1386     for ( i=0; i<size; i++ ) {
1387       maxnz = PetscMax(maxnz,procsnz[i]);
1388     }
1389     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1390 
1391     /* read in my part of the matrix column indices  */
1392     nz = procsnz[0];
1393     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1394     mycols = ibuf;
1395     if (size == 1)  nz -= extra_rows;
1396     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1397     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1398 
1399     /* read in everyone else's (except the last) and ship it off */
1400     for ( i=1; i<size-1; i++ ) {
1401       nz = procsnz[i];
1402       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1403       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1404     }
1405     /* read in the stuff for the last proc */
1406     if ( size != 1 ) {
1407       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1408       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1409       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1410       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1411     }
1412     PetscFree(cols);
1413   }
1414   else {
1415     /* determine buffer space needed for message */
1416     nz = 0;
1417     for ( i=0; i<m; i++ ) {
1418       nz += locrowlens[i];
1419     }
1420     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1421     mycols = ibuf;
1422     /* receive message of column indices*/
1423     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1424     MPI_Get_count(&status,MPI_INT,&maxnz);
1425     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1426   }
1427 
1428   /* loop over local rows, determining number of off diagonal entries */
1429   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1430   odlens = dlens + (rend-rstart);
1431   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1432   PetscMemzero(mask,3*Mbs*sizeof(int));
1433   masked1 = mask    + Mbs;
1434   masked2 = masked1 + Mbs;
1435   rowcount = 0; nzcount = 0;
1436   for ( i=0; i<mbs; i++ ) {
1437     dcount  = 0;
1438     odcount = 0;
1439     for ( j=0; j<bs; j++ ) {
1440       kmax = locrowlens[rowcount];
1441       for ( k=0; k<kmax; k++ ) {
1442         tmp = mycols[nzcount++]/bs;
1443         if (!mask[tmp]) {
1444           mask[tmp] = 1;
1445           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1446           else masked1[dcount++] = tmp;
1447         }
1448       }
1449       rowcount++;
1450     }
1451 
1452     dlens[i]  = dcount;
1453     odlens[i] = odcount;
1454 
1455     /* zero out the mask elements we set */
1456     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1457     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1458   }
1459 
1460   /* create our matrix */
1461   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1462          CHKERRQ(ierr);
1463   A = *newmat;
1464   MatSetOption(A,MAT_COLUMNS_SORTED);
1465 
1466   if (!rank) {
1467     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1468     /* read in my part of the matrix numerical values  */
1469     nz = procsnz[0];
1470     vals = buf;
1471     mycols = ibuf;
1472     if (size == 1)  nz -= extra_rows;
1473     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1474     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1475 
1476     /* insert into matrix */
1477     jj      = rstart*bs;
1478     for ( i=0; i<m; i++ ) {
1479       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1480       mycols += locrowlens[i];
1481       vals   += locrowlens[i];
1482       jj++;
1483     }
1484     /* read in other processors (except the last one) and ship out */
1485     for ( i=1; i<size-1; i++ ) {
1486       nz = procsnz[i];
1487       vals = buf;
1488       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1489       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1490     }
1491     /* the last proc */
1492     if ( size != 1 ){
1493       nz = procsnz[size-1] - extra_rows;
1494       vals = buf;
1495       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1496       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1497       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1498     }
1499     PetscFree(procsnz);
1500   }
1501   else {
1502     /* receive numeric values */
1503     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1504 
1505     /* receive message of values*/
1506     vals = buf;
1507     mycols = ibuf;
1508     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1509     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1510     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1511 
1512     /* insert into matrix */
1513     jj      = rstart*bs;
1514     for ( i=0; i<m; i++ ) {
1515       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1516       mycols += locrowlens[i];
1517       vals   += locrowlens[i];
1518       jj++;
1519     }
1520   }
1521   PetscFree(locrowlens);
1522   PetscFree(buf);
1523   PetscFree(ibuf);
1524   PetscFree(rowners);
1525   PetscFree(dlens);
1526   PetscFree(mask);
1527   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1528   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1529   return 0;
1530 }
1531 
1532 
1533