xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 0d4b0b6c1a374e6007cd4b2fbb5567147f7b1605)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.21 1996/08/08 14:43:40 bsmith Exp bsmith $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 /*
25      Local utility routine that creates a mapping from the global column
26    number to the local number in the off-diagonal part of the local
27    storage of the matrix.  This is done in a non scable way since the
28    length of colmap equals the global matrix length.
29 */
30 static int CreateColmap_MPIBAIJ_Private(Mat mat)
31 {
32   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
33   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
34   int         nbs = B->nbs,i;
35 
36   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
37   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
38   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
39   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i + 1;
40   return 0;
41 }
42 
43 
44 static int MatGetReordering_MPIBAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
45 {
46   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
47   int         ierr;
48   if (baij->size == 1) {
49     ierr = MatGetReordering(baij->A,type,rperm,cperm); CHKERRQ(ierr);
50   } else SETERRQ(1,"MatGetReordering_MPIBAIJ:not supported in parallel");
51   return 0;
52 }
53 
54 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
55 {
56   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
57   Scalar      value;
58   int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
59   int         cstart = baij->cstart, cend = baij->cend,row,col;
60   int         roworiented = baij->roworiented,rstart_orig,rend_orig;
61   int         cstart_orig,cend_orig,bs=baij->bs;
62 
63   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
64     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
65   }
66   baij->insertmode = addv;
67   rstart_orig = rstart*bs;
68   rend_orig   = rend*bs;
69   cstart_orig = cstart*bs;
70   cend_orig   = cend*bs;
71   for ( i=0; i<m; i++ ) {
72     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
73     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
74     if (im[i] >= rstart_orig && im[i] < rend_orig) {
75       row = im[i] - rstart_orig;
76       for ( j=0; j<n; j++ ) {
77         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");
78         if (in[j] >= baij->N) SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");
79         if (in[j] >= cstart_orig && in[j] < cend_orig){
80           col = in[j] - cstart_orig;
81           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
82           ierr = MatSetValues(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
83         }
84         else {
85           if (mat->was_assembled) {
86             if (!baij->colmap) {
87               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
88             }
89             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
90             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
91               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
92               col =  in[j];
93             }
94           }
95           else col = in[j];
96           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
97           ierr = MatSetValues(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
98         }
99       }
100     }
101     else {
102       if (roworiented) {
103         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
104       }
105       else {
106         row = im[i];
107         for ( j=0; j<n; j++ ) {
108           ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
109         }
110       }
111     }
112   }
113   return 0;
114 }
115 
116 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
117 {
118   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
119   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
120   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
121 
122   for ( i=0; i<m; i++ ) {
123     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
124     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
125     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
126       row = idxm[i] - bsrstart;
127       for ( j=0; j<n; j++ ) {
128         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
129         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
130         if (idxn[j] >= bscstart && idxn[j] < bscend){
131           col = idxn[j] - bscstart;
132           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
133         }
134         else {
135           if (!baij->colmap) {
136             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
137           }
138           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
139              (baij->garray[baij->colmap[idxn[j]/bs]-1] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
140           else {
141             col  = (baij->colmap[idxn[j]/bs]-1)*bs + idxn[j]%bs;
142             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
143           }
144         }
145       }
146     }
147     else {
148       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
149     }
150   }
151   return 0;
152 }
153 
154 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
155 {
156   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
157   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
158   int        ierr, i,bs2=baij->bs2;
159   double     sum = 0.0;
160   Scalar     *v;
161 
162   if (baij->size == 1) {
163     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
164   } else {
165     if (type == NORM_FROBENIUS) {
166       v = amat->a;
167       for (i=0; i<amat->nz*bs2; i++ ) {
168 #if defined(PETSC_COMPLEX)
169         sum += real(conj(*v)*(*v)); v++;
170 #else
171         sum += (*v)*(*v); v++;
172 #endif
173       }
174       v = bmat->a;
175       for (i=0; i<bmat->nz*bs2; i++ ) {
176 #if defined(PETSC_COMPLEX)
177         sum += real(conj(*v)*(*v)); v++;
178 #else
179         sum += (*v)*(*v); v++;
180 #endif
181       }
182       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
183       *norm = sqrt(*norm);
184     }
185     else
186       SETERRQ(1,"MatNorm_SeqBAIJ:No support for this norm yet");
187   }
188   return 0;
189 }
190 
191 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
192 {
193   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
194   MPI_Comm    comm = mat->comm;
195   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
196   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
197   MPI_Request *send_waits,*recv_waits;
198   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
199   InsertMode  addv;
200   Scalar      *rvalues,*svalues;
201 
202   /* make sure all processors are either in INSERTMODE or ADDMODE */
203   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
204   if (addv == (ADD_VALUES|INSERT_VALUES)) {
205     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
206   }
207   baij->insertmode = addv; /* in case this processor had no cache */
208 
209   /*  first count number of contributors to each processor */
210   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
211   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
212   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
213   for ( i=0; i<baij->stash.n; i++ ) {
214     idx = baij->stash.idx[i];
215     for ( j=0; j<size; j++ ) {
216       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
217         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
218       }
219     }
220   }
221   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
222 
223   /* inform other processors of number of messages and max length*/
224   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
225   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
226   nreceives = work[rank];
227   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
228   nmax = work[rank];
229   PetscFree(work);
230 
231   /* post receives:
232        1) each message will consist of ordered pairs
233      (global index,value) we store the global index as a double
234      to simplify the message passing.
235        2) since we don't know how long each individual message is we
236      allocate the largest needed buffer for each receive. Potentially
237      this is a lot of wasted space.
238 
239 
240        This could be done better.
241   */
242   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
243   CHKPTRQ(rvalues);
244   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
245   CHKPTRQ(recv_waits);
246   for ( i=0; i<nreceives; i++ ) {
247     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
248               comm,recv_waits+i);
249   }
250 
251   /* do sends:
252       1) starts[i] gives the starting index in svalues for stuff going to
253          the ith processor
254   */
255   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
256   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
257   CHKPTRQ(send_waits);
258   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
259   starts[0] = 0;
260   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
261   for ( i=0; i<baij->stash.n; i++ ) {
262     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
263     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
264     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
265   }
266   PetscFree(owner);
267   starts[0] = 0;
268   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
269   count = 0;
270   for ( i=0; i<size; i++ ) {
271     if (procs[i]) {
272       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
273                 comm,send_waits+count++);
274     }
275   }
276   PetscFree(starts); PetscFree(nprocs);
277 
278   /* Free cache space */
279   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
280   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
281 
282   baij->svalues    = svalues;    baij->rvalues    = rvalues;
283   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
284   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
285   baij->rmax       = nmax;
286 
287   return 0;
288 }
289 
290 
291 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
292 {
293   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
294   MPI_Status  *send_status,recv_status;
295   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
296   int         bs=baij->bs,row,col,other_disassembled;
297   Scalar      *values,val;
298   InsertMode  addv = baij->insertmode;
299 
300   /*  wait on receives */
301   while (count) {
302     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
303     /* unpack receives into our local space */
304     values = baij->rvalues + 3*imdex*baij->rmax;
305     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
306     n = n/3;
307     for ( i=0; i<n; i++ ) {
308       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
309       col = (int) PetscReal(values[3*i+1]);
310       val = values[3*i+2];
311       if (col >= baij->cstart*bs && col < baij->cend*bs) {
312         col -= baij->cstart*bs;
313         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
314       }
315       else {
316         if (mat->was_assembled) {
317           if (!baij->colmap) {
318             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
319           }
320           col = (baij->colmap[col/bs]-1)*bs + col%bs;
321           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
322             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
323             col = (int) PetscReal(values[3*i+1]);
324           }
325         }
326         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
327       }
328     }
329     count--;
330   }
331   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
332 
333   /* wait on sends */
334   if (baij->nsends) {
335     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
336     CHKPTRQ(send_status);
337     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
338     PetscFree(send_status);
339   }
340   PetscFree(baij->send_waits); PetscFree(baij->svalues);
341 
342   baij->insertmode = NOT_SET_VALUES;
343   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
344   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
345 
346   /* determine if any processor has disassembled, if so we must
347      also disassemble ourselfs, in order that we may reassemble. */
348   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
349   if (mat->was_assembled && !other_disassembled) {
350     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
351   }
352 
353   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
354     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
355   }
356   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
357   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
358 
359   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
360   return 0;
361 }
362 
363 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
364 {
365   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
366   int          ierr;
367 
368   if (baij->size == 1) {
369     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
370   }
371   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
372   return 0;
373 }
374 
375 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
376 {
377   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
378   int          ierr, format,rank,bs=baij->bs;
379   FILE         *fd;
380   ViewerType   vtype;
381 
382   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
383   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
384     ierr = ViewerGetFormat(viewer,&format);
385     if (format == ASCII_FORMAT_INFO_DETAILED) {
386       int nz, nzalloc, mem;
387       MPI_Comm_rank(mat->comm,&rank);
388       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
389       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
390       PetscSequentialPhaseBegin(mat->comm,1);
391       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
392               rank,baij->m,nz*bs,nzalloc*bs,baij->bs,mem);
393       ierr = MatGetInfo(baij->A,MAT_LOCAL,&nz,&nzalloc,&mem);
394       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz*bs);
395       ierr = MatGetInfo(baij->B,MAT_LOCAL,&nz,&nzalloc,&mem);
396       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz*bs);
397       fflush(fd);
398       PetscSequentialPhaseEnd(mat->comm,1);
399       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
400       return 0;
401     }
402     else if (format == ASCII_FORMAT_INFO) {
403       return 0;
404     }
405   }
406 
407   if (vtype == DRAW_VIEWER) {
408     Draw       draw;
409     PetscTruth isnull;
410     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
411     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
412   }
413 
414   if (vtype == ASCII_FILE_VIEWER) {
415     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
416     PetscSequentialPhaseBegin(mat->comm,1);
417     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
418            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
419             baij->cstart*bs,baij->cend*bs);
420     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
421     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
422     fflush(fd);
423     PetscSequentialPhaseEnd(mat->comm,1);
424   }
425   else {
426     int size = baij->size;
427     rank = baij->rank;
428     if (size == 1) {
429       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
430     }
431     else {
432       /* assemble the entire matrix onto first processor. */
433       Mat         A;
434       Mat_SeqBAIJ *Aloc;
435       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
436       int         mbs=baij->mbs;
437       Scalar      *a;
438 
439       if (!rank) {
440         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
441         CHKERRQ(ierr);
442       }
443       else {
444         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
445         CHKERRQ(ierr);
446       }
447       PLogObjectParent(mat,A);
448 
449       /* copy over the A part */
450       Aloc = (Mat_SeqBAIJ*) baij->A->data;
451       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
452       row = baij->rstart;
453       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
454 
455       for ( i=0; i<mbs; i++ ) {
456         rvals[0] = bs*(baij->rstart + i);
457         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
458         for ( j=ai[i]; j<ai[i+1]; j++ ) {
459           col = (baij->cstart+aj[j])*bs;
460           for (k=0; k<bs; k++ ) {
461             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
462             col++; a += bs;
463           }
464         }
465       }
466       /* copy over the B part */
467       Aloc = (Mat_SeqBAIJ*) baij->B->data;
468       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
469       row = baij->rstart*bs;
470       for ( i=0; i<mbs; i++ ) {
471         rvals[0] = bs*(baij->rstart + i);
472         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
473         for ( j=ai[i]; j<ai[i+1]; j++ ) {
474           col = baij->garray[aj[j]]*bs;
475           for (k=0; k<bs; k++ ) {
476             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
477             col++; a += bs;
478           }
479         }
480       }
481       PetscFree(rvals);
482       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
483       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
484       if (!rank) {
485         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
486       }
487       ierr = MatDestroy(A); CHKERRQ(ierr);
488     }
489   }
490   return 0;
491 }
492 
493 
494 
495 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
496 {
497   Mat         mat = (Mat) obj;
498   int         ierr;
499   ViewerType  vtype;
500 
501   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
502   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
503       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
504     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
505   }
506   else if (vtype == BINARY_FILE_VIEWER) {
507     return MatView_MPIBAIJ_Binary(mat,viewer);
508   }
509   return 0;
510 }
511 
512 static int MatDestroy_MPIBAIJ(PetscObject obj)
513 {
514   Mat         mat = (Mat) obj;
515   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
516   int         ierr;
517 
518 #if defined(PETSC_LOG)
519   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
520 #endif
521 
522   PetscFree(baij->rowners);
523   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
524   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
525   if (baij->colmap) PetscFree(baij->colmap);
526   if (baij->garray) PetscFree(baij->garray);
527   if (baij->lvec)   VecDestroy(baij->lvec);
528   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
529   if (baij->rowvalues) PetscFree(baij->rowvalues);
530   PetscFree(baij);
531   PLogObjectDestroy(mat);
532   PetscHeaderDestroy(mat);
533   return 0;
534 }
535 
536 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
537 {
538   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
539   int         ierr, nt;
540 
541   VecGetLocalSize_Fast(xx,nt);
542   if (nt != a->n) {
543     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible parition of A and xx");
544   }
545   VecGetLocalSize_Fast(yy,nt);
546   if (nt != a->m) {
547     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible parition of A and yy");
548   }
549   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
550   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
551   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
552   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
553   return 0;
554 }
555 
556 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
557 {
558   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
559   int        ierr;
560   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
561   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
562   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
563   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
564   return 0;
565 }
566 
567 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
568 {
569   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
570   int        ierr;
571 
572   /* do nondiagonal part */
573   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
574   /* send it on its way */
575   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
576   /* do local part */
577   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
578   /* receive remote parts: note this assumes the values are not actually */
579   /* inserted in yy until the next line, which is true for my implementation*/
580   /* but is not perhaps always true. */
581   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
582                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx);CHKERRQ(ierr);
583   return 0;
584 }
585 
586 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
587 {
588   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
589   int        ierr;
590 
591   /* do nondiagonal part */
592   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
593   /* send it on its way */
594   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
595   /* do local part */
596   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
597   /* receive remote parts: note this assumes the values are not actually */
598   /* inserted in yy until the next line, which is true for my implementation*/
599   /* but is not perhaps always true. */
600   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
601   return 0;
602 }
603 
604 /*
605   This only works correctly for square matrices where the subblock A->A is the
606    diagonal block
607 */
608 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
609 {
610   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
611   if (a->M != a->N)
612     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
613   return MatGetDiagonal(a->A,v);
614 }
615 
616 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
617 {
618   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
619   int        ierr;
620   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
621   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
622   return 0;
623 }
624 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
625 {
626   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
627   *m = mat->M; *n = mat->N;
628   return 0;
629 }
630 
631 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
632 {
633   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
634   *m = mat->m; *n = mat->N;
635   return 0;
636 }
637 
638 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
639 {
640   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
641   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
642   return 0;
643 }
644 
645 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
646 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
647 
648 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
649 {
650   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
651   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
652   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
653   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
654   int        *cmap, *idx_p,cstart = mat->cstart;
655 
656   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
657   mat->getrowactive = PETSC_TRUE;
658 
659   if (!mat->rowvalues && (idx || v)) {
660     /*
661         allocate enough space to hold information from the longest row.
662     */
663     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
664     int     max = 1,mbs = mat->mbs,tmp;
665     for ( i=0; i<mbs; i++ ) {
666       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
667       if (max < tmp) { max = tmp; }
668     }
669     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
670     CHKPTRQ(mat->rowvalues);
671     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
672   }
673 
674 
675   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows")
676   lrow = row - brstart;
677 
678   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
679   if (!v)   {pvA = 0; pvB = 0;}
680   if (!idx) {pcA = 0; if (!v) pcB = 0;}
681   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
682   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
683   nztot = nzA + nzB;
684 
685   cmap  = mat->garray;
686   if (v  || idx) {
687     if (nztot) {
688       /* Sort by increasing column numbers, assuming A and B already sorted */
689       int imark = -1;
690       if (v) {
691         *v = v_p = mat->rowvalues;
692         for ( i=0; i<nzB; i++ ) {
693           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
694           else break;
695         }
696         imark = i;
697         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
698         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
699       }
700       if (idx) {
701         *idx = idx_p = mat->rowindices;
702         if (imark > -1) {
703           for ( i=0; i<imark; i++ ) {
704             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
705           }
706         } else {
707           for ( i=0; i<nzB; i++ ) {
708             if (cmap[cworkB[i]/bs] < cstart)
709               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
710             else break;
711           }
712           imark = i;
713         }
714         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
715         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
716       }
717     }
718     else {
719       if (idx) *idx = 0;
720       if (v)   *v   = 0;
721     }
722   }
723   *nz = nztot;
724   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
725   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
726   return 0;
727 }
728 
729 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
730 {
731   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
732   if (baij->getrowactive == PETSC_FALSE) {
733     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
734   }
735   baij->getrowactive = PETSC_FALSE;
736   return 0;
737 }
738 
739 static int MatGetBlockSize_MPIBAIJ(Mat mat, int *bs)
740 {
741   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
742   *bs = baij->bs;
743   return 0;
744 }
745 
746 static int MatZeroEntries_MPIBAIJ(Mat A)
747 {
748   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
749   int         ierr;
750   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
751   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
752   return 0;
753 }
754 
755 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,int *nz,
756                              int *nzalloc,int *mem)
757 {
758   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
759   Mat         A = mat->A, B = mat->B;
760   int         ierr, isend[3], irecv[3], nzA, nzallocA, memA;
761 
762   ierr = MatGetInfo(A,MAT_LOCAL,&nzA,&nzallocA,&memA); CHKERRQ(ierr);
763   ierr = MatGetInfo(B,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
764   isend[0] += nzA; isend[1] += nzallocA; isend[2] += memA;
765   if (flag == MAT_LOCAL) {
766     if (nz)       *nz      = isend[0];
767     if (nzalloc)  *nzalloc = isend[1];
768     if (mem)      *mem     = isend[2];
769   } else if (flag == MAT_GLOBAL_MAX) {
770     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_MAX,matin->comm);
771     if (nz)      *nz      = irecv[0];
772     if (nzalloc) *nzalloc = irecv[1];
773     if (mem)     *mem     = irecv[2];
774   } else if (flag == MAT_GLOBAL_SUM) {
775     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_SUM,matin->comm);
776     if (nz)      *nz      = irecv[0];
777     if (nzalloc) *nzalloc = irecv[1];
778     if (mem)     *mem     = irecv[2];
779   }
780   return 0;
781 }
782 
783 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
784 {
785   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
786 
787   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
788       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
789       op == MAT_COLUMNS_SORTED ||
790       op == MAT_ROW_ORIENTED) {
791         MatSetOption(a->A,op);
792         MatSetOption(a->B,op);
793   }
794   else if (op == MAT_ROWS_SORTED ||
795            op == MAT_SYMMETRIC ||
796            op == MAT_STRUCTURALLY_SYMMETRIC ||
797            op == MAT_YES_NEW_DIAGONALS)
798     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
799   else if (op == MAT_COLUMN_ORIENTED) {
800     a->roworiented = 0;
801     MatSetOption(a->A,op);
802     MatSetOption(a->B,op);
803   }
804   else if (op == MAT_NO_NEW_DIAGONALS)
805     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:MAT_NO_NEW_DIAGONALS");}
806   else
807     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:unknown option");}
808   return 0;
809 }
810 
811 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
812 {
813   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
814   Mat_SeqBAIJ *Aloc;
815   Mat        B;
816   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
817   int        bs=baij->bs,mbs=baij->mbs;
818   Scalar     *a;
819 
820   if (matout == PETSC_NULL && M != N)
821     SETERRQ(1,"MatTranspose_MPIBAIJ:Square matrix only for in-place");
822   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
823   CHKERRQ(ierr);
824 
825   /* copy over the A part */
826   Aloc = (Mat_SeqBAIJ*) baij->A->data;
827   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
828   row = baij->rstart;
829   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
830 
831   for ( i=0; i<mbs; i++ ) {
832     rvals[0] = bs*(baij->rstart + i);
833     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
834     for ( j=ai[i]; j<ai[i+1]; j++ ) {
835       col = (baij->cstart+aj[j])*bs;
836       for (k=0; k<bs; k++ ) {
837         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
838         col++; a += bs;
839       }
840     }
841   }
842   /* copy over the B part */
843   Aloc = (Mat_SeqBAIJ*) baij->B->data;
844   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
845   row = baij->rstart*bs;
846   for ( i=0; i<mbs; i++ ) {
847     rvals[0] = bs*(baij->rstart + i);
848     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
849     for ( j=ai[i]; j<ai[i+1]; j++ ) {
850       col = baij->garray[aj[j]]*bs;
851       for (k=0; k<bs; k++ ) {
852         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
853         col++; a += bs;
854       }
855     }
856   }
857   PetscFree(rvals);
858   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
859   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
860 
861   if (matout != PETSC_NULL) {
862     *matout = B;
863   } else {
864     /* This isn't really an in-place transpose .... but free data structures from baij */
865     PetscFree(baij->rowners);
866     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
867     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
868     if (baij->colmap) PetscFree(baij->colmap);
869     if (baij->garray) PetscFree(baij->garray);
870     if (baij->lvec) VecDestroy(baij->lvec);
871     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
872     PetscFree(baij);
873     PetscMemcpy(A,B,sizeof(struct _Mat));
874     PetscHeaderDestroy(B);
875   }
876   return 0;
877 }
878 /* the code does not do the diagonal entries correctly unless the
879    matrix is square and the column and row owerships are identical.
880    This is a BUG. The only way to fix it seems to be to access
881    baij->A and baij->B directly and not through the MatZeroRows()
882    routine.
883 */
884 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
885 {
886   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
887   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
888   int            *procs,*nprocs,j,found,idx,nsends,*work;
889   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
890   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
891   int            *lens,imdex,*lrows,*values,bs=l->bs;
892   MPI_Comm       comm = A->comm;
893   MPI_Request    *send_waits,*recv_waits;
894   MPI_Status     recv_status,*send_status;
895   IS             istmp;
896 
897   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
898   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
899 
900   /*  first count number of contributors to each processor */
901   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
902   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
903   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
904   for ( i=0; i<N; i++ ) {
905     idx = rows[i];
906     found = 0;
907     for ( j=0; j<size; j++ ) {
908       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
909         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
910       }
911     }
912     if (!found) SETERRQ(1,"MatZeroRows_MPIBAIJ:Index out of range");
913   }
914   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
915 
916   /* inform other processors of number of messages and max length*/
917   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
918   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
919   nrecvs = work[rank];
920   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
921   nmax = work[rank];
922   PetscFree(work);
923 
924   /* post receives:   */
925   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
926   CHKPTRQ(rvalues);
927   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
928   CHKPTRQ(recv_waits);
929   for ( i=0; i<nrecvs; i++ ) {
930     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
931   }
932 
933   /* do sends:
934       1) starts[i] gives the starting index in svalues for stuff going to
935          the ith processor
936   */
937   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
938   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
939   CHKPTRQ(send_waits);
940   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
941   starts[0] = 0;
942   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
943   for ( i=0; i<N; i++ ) {
944     svalues[starts[owner[i]]++] = rows[i];
945   }
946   ISRestoreIndices(is,&rows);
947 
948   starts[0] = 0;
949   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
950   count = 0;
951   for ( i=0; i<size; i++ ) {
952     if (procs[i]) {
953       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
954     }
955   }
956   PetscFree(starts);
957 
958   base = owners[rank]*bs;
959 
960   /*  wait on receives */
961   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
962   source = lens + nrecvs;
963   count  = nrecvs; slen = 0;
964   while (count) {
965     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
966     /* unpack receives into our local space */
967     MPI_Get_count(&recv_status,MPI_INT,&n);
968     source[imdex]  = recv_status.MPI_SOURCE;
969     lens[imdex]  = n;
970     slen += n;
971     count--;
972   }
973   PetscFree(recv_waits);
974 
975   /* move the data into the send scatter */
976   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
977   count = 0;
978   for ( i=0; i<nrecvs; i++ ) {
979     values = rvalues + i*nmax;
980     for ( j=0; j<lens[i]; j++ ) {
981       lrows[count++] = values[j] - base;
982     }
983   }
984   PetscFree(rvalues); PetscFree(lens);
985   PetscFree(owner); PetscFree(nprocs);
986 
987   /* actually zap the local rows */
988   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
989   PLogObjectParent(A,istmp);
990   PetscFree(lrows);
991   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
992   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
993   ierr = ISDestroy(istmp); CHKERRQ(ierr);
994 
995   /* wait on sends */
996   if (nsends) {
997     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
998     CHKPTRQ(send_status);
999     MPI_Waitall(nsends,send_waits,send_status);
1000     PetscFree(send_status);
1001   }
1002   PetscFree(send_waits); PetscFree(svalues);
1003 
1004   return 0;
1005 }
1006 extern int MatPrintHelp_SeqBAIJ(Mat);
1007 static int MatPrintHelp_MPIBAIJ(Mat A)
1008 {
1009   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1010 
1011   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1012   else return 0;
1013 }
1014 
1015 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1016 
1017 /* -------------------------------------------------------------------*/
1018 static struct _MatOps MatOps = {
1019   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1020   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,MatSolve_MPIBAIJ,
1021   MatSolveAdd_MPIBAIJ,MatSolveTrans_MPIBAIJ,MatSolveTransAdd_MPIBAIJ,MatLUFactor_MPIBAIJ,
1022   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1023   0,MatGetDiagonal_MPIBAIJ,0,MatNorm_MPIBAIJ,
1024   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1025   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,MatGetReordering_MPIBAIJ,MatLUFactorSymbolic_MPIBAIJ,
1026   MatLUFactorNumeric_MPIBAIJ,0,0,MatGetSize_MPIBAIJ,
1027   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,MatILUFactorSymbolic_MPIBAIJ,0,
1028   0,0,0,MatConvertSameType_MPIBAIJ,0,0,
1029   0,0,0,MatGetSubMatrices_MPIBAIJ,
1030   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1031   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ};
1032 
1033 
1034 /*@C
1035    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1036    (block compressed row).  For good matrix assembly performance
1037    the user should preallocate the matrix storage by setting the parameters
1038    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1039    performance can be increased by more than a factor of 50.
1040 
1041    Input Parameters:
1042 .  comm - MPI communicator
1043 .  bs   - size of blockk
1044 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1045            This value should be the same as the local size used in creating the
1046            y vector for the matrix-vector product y = Ax.
1047 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1048            This value should be the same as the local size used in creating the
1049            x vector for the matrix-vector product y = Ax.
1050 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1051 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1052 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1053            submatrix  (same for all local rows)
1054 .  d_nzz - array containing the number of block nonzeros in the various block rows
1055            of the in diagonal portion of the local (possibly different for each block
1056            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1057            it is zero.
1058 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1059            submatrix (same for all local rows).
1060 .  o_nzz - array containing the number of nonzeros in the various block rows of the
1061            off-diagonal portion of the local submatrix (possibly different for
1062            each block row) or PETSC_NULL.
1063 
1064    Output Parameter:
1065 .  A - the matrix
1066 
1067    Notes:
1068    The user MUST specify either the local or global matrix dimensions
1069    (possibly both).
1070 
1071    Storage Information:
1072    For a square global matrix we define each processor's diagonal portion
1073    to be its local rows and the corresponding columns (a square submatrix);
1074    each processor's off-diagonal portion encompasses the remainder of the
1075    local matrix (a rectangular submatrix).
1076 
1077    The user can specify preallocated storage for the diagonal part of
1078    the local submatrix with either d_nz or d_nnz (not both).  Set
1079    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1080    memory allocation.  Likewise, specify preallocated storage for the
1081    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1082 
1083    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1084    the figure below we depict these three local rows and all columns (0-11).
1085 
1086 $          0 1 2 3 4 5 6 7 8 9 10 11
1087 $         -------------------
1088 $  row 3  |  o o o d d d o o o o o o
1089 $  row 4  |  o o o d d d o o o o o o
1090 $  row 5  |  o o o d d d o o o o o o
1091 $         -------------------
1092 $
1093 
1094    Thus, any entries in the d locations are stored in the d (diagonal)
1095    submatrix, and any entries in the o locations are stored in the
1096    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1097    stored simply in the MATSEQBAIJ format for compressed row storage.
1098 
1099    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1100    and o_nz should indicate the number of nonzeros per row in the o matrix.
1101    In general, for PDE problems in which most nonzeros are near the diagonal,
1102    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1103    or you will get TERRIBLE performance; see the users' manual chapter on
1104    matrices and the file $(PETSC_DIR)/Performance.
1105 
1106 .keywords: matrix, block, aij, compressed row, sparse, parallel
1107 
1108 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1109 @*/
1110 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1111                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1112 {
1113   Mat          B;
1114   Mat_MPIBAIJ  *b;
1115   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE;
1116 
1117   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
1118   *A = 0;
1119   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1120   PLogObjectCreate(B);
1121   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1122   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1123   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1124   B->destroy    = MatDestroy_MPIBAIJ;
1125   B->view       = MatView_MPIBAIJ;
1126 
1127   B->factor     = 0;
1128   B->assembled  = PETSC_FALSE;
1129 
1130   b->insertmode = NOT_SET_VALUES;
1131   MPI_Comm_rank(comm,&b->rank);
1132   MPI_Comm_size(comm,&b->size);
1133 
1134   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1135     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1136   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
1137   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
1138   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1139   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1140 
1141   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1142     work[0] = m; work[1] = n;
1143     mbs = m/bs; nbs = n/bs;
1144     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1145     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1146     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1147   }
1148   if (m == PETSC_DECIDE) {
1149     Mbs = M/bs;
1150     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
1151     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1152     m   = mbs*bs;
1153   }
1154   if (n == PETSC_DECIDE) {
1155     Nbs = N/bs;
1156     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
1157     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1158     n   = nbs*bs;
1159   }
1160   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
1161 
1162   b->m = m; B->m = m;
1163   b->n = n; B->n = n;
1164   b->N = N; B->N = N;
1165   b->M = M; B->M = M;
1166   b->bs  = bs;
1167   b->bs2 = bs*bs;
1168   b->mbs = mbs;
1169   b->nbs = nbs;
1170   b->Mbs = Mbs;
1171   b->Nbs = Nbs;
1172 
1173   /* build local table of row and column ownerships */
1174   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1175   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1176   b->cowners = b->rowners + b->size + 2;
1177   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1178   b->rowners[0] = 0;
1179   for ( i=2; i<=b->size; i++ ) {
1180     b->rowners[i] += b->rowners[i-1];
1181   }
1182   b->rstart = b->rowners[b->rank];
1183   b->rend   = b->rowners[b->rank+1];
1184   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1185   b->cowners[0] = 0;
1186   for ( i=2; i<=b->size; i++ ) {
1187     b->cowners[i] += b->cowners[i-1];
1188   }
1189   b->cstart = b->cowners[b->rank];
1190   b->cend   = b->cowners[b->rank+1];
1191 
1192   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1193   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1194   PLogObjectParent(B,b->A);
1195   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1196   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1197   PLogObjectParent(B,b->B);
1198 
1199   /* build cache for off array entries formed */
1200   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1201   b->colmap      = 0;
1202   b->garray      = 0;
1203   b->roworiented = 1;
1204 
1205   /* stuff used for matrix vector multiply */
1206   b->lvec      = 0;
1207   b->Mvctx     = 0;
1208 
1209   /* stuff for MatGetRow() */
1210   b->rowindices   = 0;
1211   b->rowvalues    = 0;
1212   b->getrowactive = PETSC_FALSE;
1213 
1214   *A = B;
1215   return 0;
1216 }
1217 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1218 {
1219   Mat        mat;
1220   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1221   int        ierr, len=0, flg;
1222 
1223   *newmat       = 0;
1224   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1225   PLogObjectCreate(mat);
1226   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1227   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1228   mat->destroy    = MatDestroy_MPIBAIJ;
1229   mat->view       = MatView_MPIBAIJ;
1230   mat->factor     = matin->factor;
1231   mat->assembled  = PETSC_TRUE;
1232 
1233   a->m = mat->m   = oldmat->m;
1234   a->n = mat->n   = oldmat->n;
1235   a->M = mat->M   = oldmat->M;
1236   a->N = mat->N   = oldmat->N;
1237 
1238   a->bs  = oldmat->bs;
1239   a->bs2 = oldmat->bs2;
1240   a->mbs = oldmat->mbs;
1241   a->nbs = oldmat->nbs;
1242   a->Mbs = oldmat->Mbs;
1243   a->Nbs = oldmat->Nbs;
1244 
1245   a->rstart       = oldmat->rstart;
1246   a->rend         = oldmat->rend;
1247   a->cstart       = oldmat->cstart;
1248   a->cend         = oldmat->cend;
1249   a->size         = oldmat->size;
1250   a->rank         = oldmat->rank;
1251   a->insertmode   = NOT_SET_VALUES;
1252   a->rowvalues    = 0;
1253   a->getrowactive = PETSC_FALSE;
1254 
1255   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1256   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1257   a->cowners = a->rowners + a->size + 2;
1258   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1259   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1260   if (oldmat->colmap) {
1261     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1262     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1263     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1264   } else a->colmap = 0;
1265   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1266     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1267     PLogObjectMemory(mat,len*sizeof(int));
1268     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1269   } else a->garray = 0;
1270 
1271   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1272   PLogObjectParent(mat,a->lvec);
1273   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1274   PLogObjectParent(mat,a->Mvctx);
1275   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1276   PLogObjectParent(mat,a->A);
1277   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1278   PLogObjectParent(mat,a->B);
1279   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1280   if (flg) {
1281     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1282   }
1283   *newmat = mat;
1284   return 0;
1285 }
1286 
1287 #include "sys.h"
1288 
1289 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1290 {
1291   Mat          A;
1292   int          i, nz, ierr, j,rstart, rend, fd;
1293   Scalar       *vals,*buf;
1294   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1295   MPI_Status   status;
1296   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1297   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1298   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1299   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1300   int          dcount,kmax,k,nzcount,tmp;
1301 
1302 
1303   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1304   bs2  = bs*bs;
1305 
1306   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1307   if (!rank) {
1308     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1309     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1310     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
1311   }
1312 
1313   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1314   M = header[1]; N = header[2];
1315 
1316   if (M != N) SETERRQ(1,"MatLoad_SeqBAIJ:Can only do square matrices");
1317 
1318   /*
1319      This code adds extra rows to make sure the number of rows is
1320      divisible by the blocksize
1321   */
1322   Mbs        = M/bs;
1323   extra_rows = bs - M + bs*(Mbs);
1324   if (extra_rows == bs) extra_rows = 0;
1325   else                  Mbs++;
1326   if (extra_rows &&!rank) {
1327     PLogInfo(0,"MatLoad_SeqBAIJ:Padding loaded matrix to match blocksize\n");
1328   }
1329 
1330   /* determine ownership of all rows */
1331   mbs = Mbs/size + ((Mbs % size) > rank);
1332   m   = mbs * bs;
1333   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1334   browners = rowners + size + 1;
1335   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1336   rowners[0] = 0;
1337   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1338   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1339   rstart = rowners[rank];
1340   rend   = rowners[rank+1];
1341 
1342   /* distribute row lengths to all processors */
1343   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1344   if (!rank) {
1345     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1346     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1347     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1348     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1349     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1350     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1351     PetscFree(sndcounts);
1352   }
1353   else {
1354     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1355   }
1356 
1357   if (!rank) {
1358     /* calculate the number of nonzeros on each processor */
1359     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1360     PetscMemzero(procsnz,size*sizeof(int));
1361     for ( i=0; i<size; i++ ) {
1362       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1363         procsnz[i] += rowlengths[j];
1364       }
1365     }
1366     PetscFree(rowlengths);
1367 
1368     /* determine max buffer needed and allocate it */
1369     maxnz = 0;
1370     for ( i=0; i<size; i++ ) {
1371       maxnz = PetscMax(maxnz,procsnz[i]);
1372     }
1373     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1374 
1375     /* read in my part of the matrix column indices  */
1376     nz = procsnz[0];
1377     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1378     mycols = ibuf;
1379     if (size == 1)  nz -= extra_rows;
1380     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1381     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1382 
1383     /* read in every ones (except the last) and ship off */
1384     for ( i=1; i<size-1; i++ ) {
1385       nz = procsnz[i];
1386       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1387       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1388     }
1389     /* read in the stuff for the last proc */
1390     if ( size != 1 ) {
1391       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1392       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1393       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1394       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1395     }
1396     PetscFree(cols);
1397   }
1398   else {
1399     /* determine buffer space needed for message */
1400     nz = 0;
1401     for ( i=0; i<m; i++ ) {
1402       nz += locrowlens[i];
1403     }
1404     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1405     mycols = ibuf;
1406     /* receive message of column indices*/
1407     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1408     MPI_Get_count(&status,MPI_INT,&maxnz);
1409     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1410   }
1411 
1412   /* loop over local rows, determining number of off diagonal entries */
1413   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1414   odlens = dlens + (rend-rstart);
1415   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1416   PetscMemzero(mask,3*Mbs*sizeof(int));
1417   masked1 = mask    + Mbs;
1418   masked2 = masked1 + Mbs;
1419   rowcount = 0; nzcount = 0;
1420   for ( i=0; i<mbs; i++ ) {
1421     dcount  = 0;
1422     odcount = 0;
1423     for ( j=0; j<bs; j++ ) {
1424       kmax = locrowlens[rowcount];
1425       for ( k=0; k<kmax; k++ ) {
1426         tmp = mycols[nzcount++]/bs;
1427         if (!mask[tmp]) {
1428           mask[tmp] = 1;
1429           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1430           else masked1[dcount++] = tmp;
1431         }
1432       }
1433       rowcount++;
1434     }
1435 
1436     dlens[i]  = dcount;
1437     odlens[i] = odcount;
1438 
1439     /* zero out the mask elements we set */
1440     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1441     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1442   }
1443 
1444   /* create our matrix */
1445   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1446          CHKERRQ(ierr);
1447   A = *newmat;
1448   MatSetOption(A,MAT_COLUMNS_SORTED);
1449 
1450   if (!rank) {
1451     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1452     /* read in my part of the matrix numerical values  */
1453     nz = procsnz[0];
1454     vals = buf;
1455     mycols = ibuf;
1456     if (size == 1)  nz -= extra_rows;
1457     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1458     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1459 
1460     /* insert into matrix */
1461     jj      = rstart*bs;
1462     for ( i=0; i<m; i++ ) {
1463       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1464       mycols += locrowlens[i];
1465       vals   += locrowlens[i];
1466       jj++;
1467     }
1468     /* read in other processors (except the last one) and ship out */
1469     for ( i=1; i<size-1; i++ ) {
1470       nz = procsnz[i];
1471       vals = buf;
1472       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1473       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1474     }
1475     /* the last proc */
1476     if ( size != 1 ){
1477       nz = procsnz[i] - extra_rows;
1478       vals = buf;
1479       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1480       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1481       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1482     }
1483     PetscFree(procsnz);
1484   }
1485   else {
1486     /* receive numeric values */
1487     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1488 
1489     /* receive message of values*/
1490     vals = buf;
1491     mycols = ibuf;
1492     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1493     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1494     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1495 
1496     /* insert into matrix */
1497     jj      = rstart*bs;
1498     for ( i=0; i<m; i++ ) {
1499       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1500       mycols += locrowlens[i];
1501       vals   += locrowlens[i];
1502       jj++;
1503     }
1504   }
1505   PetscFree(locrowlens);
1506   PetscFree(buf);
1507   PetscFree(ibuf);
1508   PetscFree(rowners);
1509   PetscFree(dlens);
1510   PetscFree(mask);
1511   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1512   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1513   return 0;
1514 }
1515 
1516 
1517