xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 1942fdfc3363b1385e6dd6a4d0a1dabce71c8c08)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.24 1996/08/23 22:21:21 curfman Exp bsmith $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 
25 /*
26      Local utility routine that creates a mapping from the global column
27    number to the local number in the off-diagonal part of the local
28    storage of the matrix.  This is done in a non scable way since the
29    length of colmap equals the global matrix length.
30 */
31 static int CreateColmap_MPIBAIJ_Private(Mat mat)
32 {
33   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
34   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
35   int         nbs = B->nbs,i;
36 
37   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
38   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
39   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
40   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i + 1;
41   return 0;
42 }
43 
44 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
45                             PetscTruth *done)
46 {
47   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
48   int         ierr;
49   if (aij->size == 1) {
50     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
51   } else SETERRQ(1,"MatGetRowIJ_MPIBAIJ:not supported in parallel");
52   return 0;
53 }
54 
55 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
56                                 PetscTruth *done)
57 {
58   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
59   int        ierr;
60   if (aij->size == 1) {
61     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
62   } else SETERRQ(1,"MatRestoreRowIJ_MPIBAIJ:not supported in parallel");
63   return 0;
64 }
65 
66 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
67 {
68   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
69   Scalar      value;
70   int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
71   int         cstart = baij->cstart, cend = baij->cend,row,col;
72   int         roworiented = baij->roworiented,rstart_orig,rend_orig;
73   int         cstart_orig,cend_orig,bs=baij->bs;
74 
75   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
76     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
77   }
78   baij->insertmode = addv;
79   rstart_orig = rstart*bs;
80   rend_orig   = rend*bs;
81   cstart_orig = cstart*bs;
82   cend_orig   = cend*bs;
83   for ( i=0; i<m; i++ ) {
84     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
85     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
86     if (im[i] >= rstart_orig && im[i] < rend_orig) {
87       row = im[i] - rstart_orig;
88       for ( j=0; j<n; j++ ) {
89         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");
90         if (in[j] >= baij->N) SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");
91         if (in[j] >= cstart_orig && in[j] < cend_orig){
92           col = in[j] - cstart_orig;
93           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
94           ierr = MatSetValues(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
95         }
96         else {
97           if (mat->was_assembled) {
98             if (!baij->colmap) {
99               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
100             }
101             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
102             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
103               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
104               col =  in[j];
105             }
106           }
107           else col = in[j];
108           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
109           ierr = MatSetValues(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
110         }
111       }
112     }
113     else {
114       if (roworiented) {
115         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
116       }
117       else {
118         row = im[i];
119         for ( j=0; j<n; j++ ) {
120           ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
121         }
122       }
123     }
124   }
125   return 0;
126 }
127 
128 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
129 {
130   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
131   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
132   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
133 
134   for ( i=0; i<m; i++ ) {
135     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
136     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
137     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
138       row = idxm[i] - bsrstart;
139       for ( j=0; j<n; j++ ) {
140         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
141         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
142         if (idxn[j] >= bscstart && idxn[j] < bscend){
143           col = idxn[j] - bscstart;
144           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
145         }
146         else {
147           if (!baij->colmap) {
148             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
149           }
150           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
151              (baij->garray[baij->colmap[idxn[j]/bs]-1] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
152           else {
153             col  = (baij->colmap[idxn[j]/bs]-1)*bs + idxn[j]%bs;
154             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
155           }
156         }
157       }
158     }
159     else {
160       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
161     }
162   }
163   return 0;
164 }
165 
166 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
167 {
168   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
169   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
170   int        ierr, i,bs2=baij->bs2;
171   double     sum = 0.0;
172   Scalar     *v;
173 
174   if (baij->size == 1) {
175     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
176   } else {
177     if (type == NORM_FROBENIUS) {
178       v = amat->a;
179       for (i=0; i<amat->nz*bs2; i++ ) {
180 #if defined(PETSC_COMPLEX)
181         sum += real(conj(*v)*(*v)); v++;
182 #else
183         sum += (*v)*(*v); v++;
184 #endif
185       }
186       v = bmat->a;
187       for (i=0; i<bmat->nz*bs2; i++ ) {
188 #if defined(PETSC_COMPLEX)
189         sum += real(conj(*v)*(*v)); v++;
190 #else
191         sum += (*v)*(*v); v++;
192 #endif
193       }
194       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
195       *norm = sqrt(*norm);
196     }
197     else
198       SETERRQ(1,"MatNorm_SeqBAIJ:No support for this norm yet");
199   }
200   return 0;
201 }
202 
203 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
204 {
205   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
206   MPI_Comm    comm = mat->comm;
207   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
208   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
209   MPI_Request *send_waits,*recv_waits;
210   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
211   InsertMode  addv;
212   Scalar      *rvalues,*svalues;
213 
214   /* make sure all processors are either in INSERTMODE or ADDMODE */
215   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
216   if (addv == (ADD_VALUES|INSERT_VALUES)) {
217     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
218   }
219   baij->insertmode = addv; /* in case this processor had no cache */
220 
221   /*  first count number of contributors to each processor */
222   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
223   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
224   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
225   for ( i=0; i<baij->stash.n; i++ ) {
226     idx = baij->stash.idx[i];
227     for ( j=0; j<size; j++ ) {
228       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
229         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
230       }
231     }
232   }
233   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
234 
235   /* inform other processors of number of messages and max length*/
236   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
237   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
238   nreceives = work[rank];
239   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
240   nmax = work[rank];
241   PetscFree(work);
242 
243   /* post receives:
244        1) each message will consist of ordered pairs
245      (global index,value) we store the global index as a double
246      to simplify the message passing.
247        2) since we don't know how long each individual message is we
248      allocate the largest needed buffer for each receive. Potentially
249      this is a lot of wasted space.
250 
251 
252        This could be done better.
253   */
254   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
255   CHKPTRQ(rvalues);
256   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
257   CHKPTRQ(recv_waits);
258   for ( i=0; i<nreceives; i++ ) {
259     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
260               comm,recv_waits+i);
261   }
262 
263   /* do sends:
264       1) starts[i] gives the starting index in svalues for stuff going to
265          the ith processor
266   */
267   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
268   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
269   CHKPTRQ(send_waits);
270   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
271   starts[0] = 0;
272   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
273   for ( i=0; i<baij->stash.n; i++ ) {
274     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
275     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
276     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
277   }
278   PetscFree(owner);
279   starts[0] = 0;
280   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
281   count = 0;
282   for ( i=0; i<size; i++ ) {
283     if (procs[i]) {
284       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
285                 comm,send_waits+count++);
286     }
287   }
288   PetscFree(starts); PetscFree(nprocs);
289 
290   /* Free cache space */
291   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
292   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
293 
294   baij->svalues    = svalues;    baij->rvalues    = rvalues;
295   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
296   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
297   baij->rmax       = nmax;
298 
299   return 0;
300 }
301 
302 
303 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
304 {
305   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
306   MPI_Status  *send_status,recv_status;
307   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
308   int         bs=baij->bs,row,col,other_disassembled;
309   Scalar      *values,val;
310   InsertMode  addv = baij->insertmode;
311 
312   /*  wait on receives */
313   while (count) {
314     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
315     /* unpack receives into our local space */
316     values = baij->rvalues + 3*imdex*baij->rmax;
317     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
318     n = n/3;
319     for ( i=0; i<n; i++ ) {
320       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
321       col = (int) PetscReal(values[3*i+1]);
322       val = values[3*i+2];
323       if (col >= baij->cstart*bs && col < baij->cend*bs) {
324         col -= baij->cstart*bs;
325         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
326       }
327       else {
328         if (mat->was_assembled) {
329           if (!baij->colmap) {
330             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
331           }
332           col = (baij->colmap[col/bs]-1)*bs + col%bs;
333           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
334             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
335             col = (int) PetscReal(values[3*i+1]);
336           }
337         }
338         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
339       }
340     }
341     count--;
342   }
343   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
344 
345   /* wait on sends */
346   if (baij->nsends) {
347     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
348     CHKPTRQ(send_status);
349     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
350     PetscFree(send_status);
351   }
352   PetscFree(baij->send_waits); PetscFree(baij->svalues);
353 
354   baij->insertmode = NOT_SET_VALUES;
355   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
356   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
357 
358   /* determine if any processor has disassembled, if so we must
359      also disassemble ourselfs, in order that we may reassemble. */
360   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
361   if (mat->was_assembled && !other_disassembled) {
362     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
363   }
364 
365   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
366     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
367   }
368   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
369   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
370 
371   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
372   return 0;
373 }
374 
375 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
376 {
377   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
378   int          ierr;
379 
380   if (baij->size == 1) {
381     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
382   }
383   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
384   return 0;
385 }
386 
387 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
388 {
389   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
390   int          ierr, format,rank,bs=baij->bs;
391   FILE         *fd;
392   ViewerType   vtype;
393 
394   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
395   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
396     ierr = ViewerGetFormat(viewer,&format);
397     if (format == ASCII_FORMAT_INFO_DETAILED) {
398       MatInfo info;
399       MPI_Comm_rank(mat->comm,&rank);
400       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
401       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
402       PetscSequentialPhaseBegin(mat->comm,1);
403       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
404               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
405               baij->bs,(int)info.memory);
406       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
407       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
408       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
409       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
410       fflush(fd);
411       PetscSequentialPhaseEnd(mat->comm,1);
412       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
413       return 0;
414     }
415     else if (format == ASCII_FORMAT_INFO) {
416       return 0;
417     }
418   }
419 
420   if (vtype == DRAW_VIEWER) {
421     Draw       draw;
422     PetscTruth isnull;
423     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
424     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
425   }
426 
427   if (vtype == ASCII_FILE_VIEWER) {
428     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
429     PetscSequentialPhaseBegin(mat->comm,1);
430     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
431            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
432             baij->cstart*bs,baij->cend*bs);
433     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
434     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
435     fflush(fd);
436     PetscSequentialPhaseEnd(mat->comm,1);
437   }
438   else {
439     int size = baij->size;
440     rank = baij->rank;
441     if (size == 1) {
442       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
443     }
444     else {
445       /* assemble the entire matrix onto first processor. */
446       Mat         A;
447       Mat_SeqBAIJ *Aloc;
448       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
449       int         mbs=baij->mbs;
450       Scalar      *a;
451 
452       if (!rank) {
453         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
454         CHKERRQ(ierr);
455       }
456       else {
457         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
458         CHKERRQ(ierr);
459       }
460       PLogObjectParent(mat,A);
461 
462       /* copy over the A part */
463       Aloc = (Mat_SeqBAIJ*) baij->A->data;
464       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
465       row = baij->rstart;
466       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
467 
468       for ( i=0; i<mbs; i++ ) {
469         rvals[0] = bs*(baij->rstart + i);
470         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
471         for ( j=ai[i]; j<ai[i+1]; j++ ) {
472           col = (baij->cstart+aj[j])*bs;
473           for (k=0; k<bs; k++ ) {
474             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
475             col++; a += bs;
476           }
477         }
478       }
479       /* copy over the B part */
480       Aloc = (Mat_SeqBAIJ*) baij->B->data;
481       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
482       row = baij->rstart*bs;
483       for ( i=0; i<mbs; i++ ) {
484         rvals[0] = bs*(baij->rstart + i);
485         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
486         for ( j=ai[i]; j<ai[i+1]; j++ ) {
487           col = baij->garray[aj[j]]*bs;
488           for (k=0; k<bs; k++ ) {
489             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
490             col++; a += bs;
491           }
492         }
493       }
494       PetscFree(rvals);
495       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
496       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
497       if (!rank) {
498         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
499       }
500       ierr = MatDestroy(A); CHKERRQ(ierr);
501     }
502   }
503   return 0;
504 }
505 
506 
507 
508 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
509 {
510   Mat         mat = (Mat) obj;
511   int         ierr;
512   ViewerType  vtype;
513 
514   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
515   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
516       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
517     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
518   }
519   else if (vtype == BINARY_FILE_VIEWER) {
520     return MatView_MPIBAIJ_Binary(mat,viewer);
521   }
522   return 0;
523 }
524 
525 static int MatDestroy_MPIBAIJ(PetscObject obj)
526 {
527   Mat         mat = (Mat) obj;
528   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
529   int         ierr;
530 
531 #if defined(PETSC_LOG)
532   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
533 #endif
534 
535   PetscFree(baij->rowners);
536   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
537   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
538   if (baij->colmap) PetscFree(baij->colmap);
539   if (baij->garray) PetscFree(baij->garray);
540   if (baij->lvec)   VecDestroy(baij->lvec);
541   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
542   if (baij->rowvalues) PetscFree(baij->rowvalues);
543   PetscFree(baij);
544   PLogObjectDestroy(mat);
545   PetscHeaderDestroy(mat);
546   return 0;
547 }
548 
549 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
550 {
551   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
552   int         ierr, nt;
553 
554   VecGetLocalSize_Fast(xx,nt);
555   if (nt != a->n) {
556     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible parition of A and xx");
557   }
558   VecGetLocalSize_Fast(yy,nt);
559   if (nt != a->m) {
560     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible parition of A and yy");
561   }
562   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
563   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
564   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
565   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
566   return 0;
567 }
568 
569 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
570 {
571   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
572   int        ierr;
573   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
574   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
575   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
576   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
577   return 0;
578 }
579 
580 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
581 {
582   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
583   int        ierr;
584 
585   /* do nondiagonal part */
586   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
587   /* send it on its way */
588   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
589   /* do local part */
590   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
591   /* receive remote parts: note this assumes the values are not actually */
592   /* inserted in yy until the next line, which is true for my implementation*/
593   /* but is not perhaps always true. */
594   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
595                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx);CHKERRQ(ierr);
596   return 0;
597 }
598 
599 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
600 {
601   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
602   int        ierr;
603 
604   /* do nondiagonal part */
605   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
606   /* send it on its way */
607   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
608   /* do local part */
609   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
610   /* receive remote parts: note this assumes the values are not actually */
611   /* inserted in yy until the next line, which is true for my implementation*/
612   /* but is not perhaps always true. */
613   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
614   return 0;
615 }
616 
617 /*
618   This only works correctly for square matrices where the subblock A->A is the
619    diagonal block
620 */
621 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
622 {
623   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
624   if (a->M != a->N)
625     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
626   return MatGetDiagonal(a->A,v);
627 }
628 
629 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
630 {
631   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
632   int        ierr;
633   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
634   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
635   return 0;
636 }
637 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
638 {
639   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
640   *m = mat->M; *n = mat->N;
641   return 0;
642 }
643 
644 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
645 {
646   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
647   *m = mat->m; *n = mat->N;
648   return 0;
649 }
650 
651 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
652 {
653   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
654   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
655   return 0;
656 }
657 
658 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
659 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
660 
661 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
662 {
663   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
664   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
665   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
666   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
667   int        *cmap, *idx_p,cstart = mat->cstart;
668 
669   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
670   mat->getrowactive = PETSC_TRUE;
671 
672   if (!mat->rowvalues && (idx || v)) {
673     /*
674         allocate enough space to hold information from the longest row.
675     */
676     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
677     int     max = 1,mbs = mat->mbs,tmp;
678     for ( i=0; i<mbs; i++ ) {
679       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
680       if (max < tmp) { max = tmp; }
681     }
682     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
683     CHKPTRQ(mat->rowvalues);
684     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
685   }
686 
687 
688   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows")
689   lrow = row - brstart;
690 
691   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
692   if (!v)   {pvA = 0; pvB = 0;}
693   if (!idx) {pcA = 0; if (!v) pcB = 0;}
694   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
695   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
696   nztot = nzA + nzB;
697 
698   cmap  = mat->garray;
699   if (v  || idx) {
700     if (nztot) {
701       /* Sort by increasing column numbers, assuming A and B already sorted */
702       int imark = -1;
703       if (v) {
704         *v = v_p = mat->rowvalues;
705         for ( i=0; i<nzB; i++ ) {
706           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
707           else break;
708         }
709         imark = i;
710         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
711         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
712       }
713       if (idx) {
714         *idx = idx_p = mat->rowindices;
715         if (imark > -1) {
716           for ( i=0; i<imark; i++ ) {
717             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
718           }
719         } else {
720           for ( i=0; i<nzB; i++ ) {
721             if (cmap[cworkB[i]/bs] < cstart)
722               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
723             else break;
724           }
725           imark = i;
726         }
727         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
728         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
729       }
730     }
731     else {
732       if (idx) *idx = 0;
733       if (v)   *v   = 0;
734     }
735   }
736   *nz = nztot;
737   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
738   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
739   return 0;
740 }
741 
742 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
743 {
744   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
745   if (baij->getrowactive == PETSC_FALSE) {
746     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
747   }
748   baij->getrowactive = PETSC_FALSE;
749   return 0;
750 }
751 
752 static int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
753 {
754   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
755   *bs = baij->bs;
756   return 0;
757 }
758 
759 static int MatZeroEntries_MPIBAIJ(Mat A)
760 {
761   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
762   int         ierr;
763   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
764   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
765   return 0;
766 }
767 
768 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
769 {
770   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
771   Mat         A = a->A, B = a->B;
772   int         ierr;
773   double      isend[5], irecv[5];
774 
775   info->rows_global    = (double)a->M;
776   info->columns_global = (double)a->N;
777   info->rows_local     = (double)a->m;
778   info->columns_local  = (double)a->N;
779   info->block_size     = (double)a->bs;
780   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
781   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->memory;
782   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
783   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->memory;
784   if (flag == MAT_LOCAL) {
785     info->nz_used      = isend[0];
786     info->nz_allocated = isend[1];
787     info->nz_unneeded  = isend[2];
788     info->memory       = isend[3];
789     info->mallocs      = isend[4];
790   } else if (flag == MAT_GLOBAL_MAX) {
791     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_MAX,matin->comm);
792     info->nz_used      = irecv[0];
793     info->nz_allocated = irecv[1];
794     info->nz_unneeded  = irecv[2];
795     info->memory       = irecv[3];
796     info->mallocs      = irecv[4];
797   } else if (flag == MAT_GLOBAL_SUM) {
798     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_SUM,matin->comm);
799     info->nz_used      = irecv[0];
800     info->nz_allocated = irecv[1];
801     info->nz_unneeded  = irecv[2];
802     info->memory       = irecv[3];
803     info->mallocs      = irecv[4];
804   }
805   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
806   info->fill_ratio_needed = 0;
807   info->factor_mallocs    = 0;
808   return 0;
809 }
810 
811 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
812 {
813   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
814 
815   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
816       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
817       op == MAT_COLUMNS_SORTED ||
818       op == MAT_ROW_ORIENTED) {
819         MatSetOption(a->A,op);
820         MatSetOption(a->B,op);
821   }
822   else if (op == MAT_ROWS_SORTED ||
823            op == MAT_SYMMETRIC ||
824            op == MAT_STRUCTURALLY_SYMMETRIC ||
825            op == MAT_YES_NEW_DIAGONALS)
826     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
827   else if (op == MAT_COLUMN_ORIENTED) {
828     a->roworiented = 0;
829     MatSetOption(a->A,op);
830     MatSetOption(a->B,op);
831   }
832   else if (op == MAT_NO_NEW_DIAGONALS)
833     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:MAT_NO_NEW_DIAGONALS");}
834   else
835     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:unknown option");}
836   return 0;
837 }
838 
839 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
840 {
841   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
842   Mat_SeqBAIJ *Aloc;
843   Mat        B;
844   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
845   int        bs=baij->bs,mbs=baij->mbs;
846   Scalar     *a;
847 
848   if (matout == PETSC_NULL && M != N)
849     SETERRQ(1,"MatTranspose_MPIBAIJ:Square matrix only for in-place");
850   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
851   CHKERRQ(ierr);
852 
853   /* copy over the A part */
854   Aloc = (Mat_SeqBAIJ*) baij->A->data;
855   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
856   row = baij->rstart;
857   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
858 
859   for ( i=0; i<mbs; i++ ) {
860     rvals[0] = bs*(baij->rstart + i);
861     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
862     for ( j=ai[i]; j<ai[i+1]; j++ ) {
863       col = (baij->cstart+aj[j])*bs;
864       for (k=0; k<bs; k++ ) {
865         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
866         col++; a += bs;
867       }
868     }
869   }
870   /* copy over the B part */
871   Aloc = (Mat_SeqBAIJ*) baij->B->data;
872   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
873   row = baij->rstart*bs;
874   for ( i=0; i<mbs; i++ ) {
875     rvals[0] = bs*(baij->rstart + i);
876     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
877     for ( j=ai[i]; j<ai[i+1]; j++ ) {
878       col = baij->garray[aj[j]]*bs;
879       for (k=0; k<bs; k++ ) {
880         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
881         col++; a += bs;
882       }
883     }
884   }
885   PetscFree(rvals);
886   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
887   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
888 
889   if (matout != PETSC_NULL) {
890     *matout = B;
891   } else {
892     /* This isn't really an in-place transpose .... but free data structures from baij */
893     PetscFree(baij->rowners);
894     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
895     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
896     if (baij->colmap) PetscFree(baij->colmap);
897     if (baij->garray) PetscFree(baij->garray);
898     if (baij->lvec) VecDestroy(baij->lvec);
899     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
900     PetscFree(baij);
901     PetscMemcpy(A,B,sizeof(struct _Mat));
902     PetscHeaderDestroy(B);
903   }
904   return 0;
905 }
906 /* the code does not do the diagonal entries correctly unless the
907    matrix is square and the column and row owerships are identical.
908    This is a BUG. The only way to fix it seems to be to access
909    baij->A and baij->B directly and not through the MatZeroRows()
910    routine.
911 */
912 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
913 {
914   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
915   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
916   int            *procs,*nprocs,j,found,idx,nsends,*work;
917   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
918   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
919   int            *lens,imdex,*lrows,*values,bs=l->bs;
920   MPI_Comm       comm = A->comm;
921   MPI_Request    *send_waits,*recv_waits;
922   MPI_Status     recv_status,*send_status;
923   IS             istmp;
924 
925   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
926   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
927 
928   /*  first count number of contributors to each processor */
929   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
930   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
931   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
932   for ( i=0; i<N; i++ ) {
933     idx = rows[i];
934     found = 0;
935     for ( j=0; j<size; j++ ) {
936       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
937         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
938       }
939     }
940     if (!found) SETERRQ(1,"MatZeroRows_MPIBAIJ:Index out of range");
941   }
942   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
943 
944   /* inform other processors of number of messages and max length*/
945   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
946   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
947   nrecvs = work[rank];
948   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
949   nmax = work[rank];
950   PetscFree(work);
951 
952   /* post receives:   */
953   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
954   CHKPTRQ(rvalues);
955   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
956   CHKPTRQ(recv_waits);
957   for ( i=0; i<nrecvs; i++ ) {
958     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
959   }
960 
961   /* do sends:
962       1) starts[i] gives the starting index in svalues for stuff going to
963          the ith processor
964   */
965   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
966   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
967   CHKPTRQ(send_waits);
968   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
969   starts[0] = 0;
970   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
971   for ( i=0; i<N; i++ ) {
972     svalues[starts[owner[i]]++] = rows[i];
973   }
974   ISRestoreIndices(is,&rows);
975 
976   starts[0] = 0;
977   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
978   count = 0;
979   for ( i=0; i<size; i++ ) {
980     if (procs[i]) {
981       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
982     }
983   }
984   PetscFree(starts);
985 
986   base = owners[rank]*bs;
987 
988   /*  wait on receives */
989   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
990   source = lens + nrecvs;
991   count  = nrecvs; slen = 0;
992   while (count) {
993     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
994     /* unpack receives into our local space */
995     MPI_Get_count(&recv_status,MPI_INT,&n);
996     source[imdex]  = recv_status.MPI_SOURCE;
997     lens[imdex]  = n;
998     slen += n;
999     count--;
1000   }
1001   PetscFree(recv_waits);
1002 
1003   /* move the data into the send scatter */
1004   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1005   count = 0;
1006   for ( i=0; i<nrecvs; i++ ) {
1007     values = rvalues + i*nmax;
1008     for ( j=0; j<lens[i]; j++ ) {
1009       lrows[count++] = values[j] - base;
1010     }
1011   }
1012   PetscFree(rvalues); PetscFree(lens);
1013   PetscFree(owner); PetscFree(nprocs);
1014 
1015   /* actually zap the local rows */
1016   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1017   PLogObjectParent(A,istmp);
1018   PetscFree(lrows);
1019   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1020   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1021   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1022 
1023   /* wait on sends */
1024   if (nsends) {
1025     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1026     CHKPTRQ(send_status);
1027     MPI_Waitall(nsends,send_waits,send_status);
1028     PetscFree(send_status);
1029   }
1030   PetscFree(send_waits); PetscFree(svalues);
1031 
1032   return 0;
1033 }
1034 extern int MatPrintHelp_SeqBAIJ(Mat);
1035 static int MatPrintHelp_MPIBAIJ(Mat A)
1036 {
1037   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1038 
1039   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1040   else return 0;
1041 }
1042 
1043 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1044 
1045 /* -------------------------------------------------------------------*/
1046 static struct _MatOps MatOps = {
1047   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1048   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,MatSolve_MPIBAIJ,
1049   MatSolveAdd_MPIBAIJ,MatSolveTrans_MPIBAIJ,MatSolveTransAdd_MPIBAIJ,MatLUFactor_MPIBAIJ,
1050   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1051   0,MatGetDiagonal_MPIBAIJ,0,MatNorm_MPIBAIJ,
1052   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1053   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,MatLUFactorSymbolic_MPIBAIJ,
1054   MatLUFactorNumeric_MPIBAIJ,0,0,MatGetSize_MPIBAIJ,
1055   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,MatILUFactorSymbolic_MPIBAIJ,0,
1056   0,0,0,MatConvertSameType_MPIBAIJ,0,0,
1057   0,0,0,MatGetSubMatrices_MPIBAIJ,
1058   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1059   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
1060   MatGetRowIJ_MPIBAIJ,
1061   MatRestoreRowIJ_MPIBAIJ};
1062 
1063 
1064 /*@C
1065    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1066    (block compressed row).  For good matrix assembly performance
1067    the user should preallocate the matrix storage by setting the parameters
1068    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1069    performance can be increased by more than a factor of 50.
1070 
1071    Input Parameters:
1072 .  comm - MPI communicator
1073 .  bs   - size of blockk
1074 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1075            This value should be the same as the local size used in creating the
1076            y vector for the matrix-vector product y = Ax.
1077 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1078            This value should be the same as the local size used in creating the
1079            x vector for the matrix-vector product y = Ax.
1080 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1081 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1082 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1083            submatrix  (same for all local rows)
1084 .  d_nzz - array containing the number of block nonzeros in the various block rows
1085            of the in diagonal portion of the local (possibly different for each block
1086            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1087            it is zero.
1088 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1089            submatrix (same for all local rows).
1090 .  o_nzz - array containing the number of nonzeros in the various block rows of the
1091            off-diagonal portion of the local submatrix (possibly different for
1092            each block row) or PETSC_NULL.
1093 
1094    Output Parameter:
1095 .  A - the matrix
1096 
1097    Notes:
1098    The user MUST specify either the local or global matrix dimensions
1099    (possibly both).
1100 
1101    Storage Information:
1102    For a square global matrix we define each processor's diagonal portion
1103    to be its local rows and the corresponding columns (a square submatrix);
1104    each processor's off-diagonal portion encompasses the remainder of the
1105    local matrix (a rectangular submatrix).
1106 
1107    The user can specify preallocated storage for the diagonal part of
1108    the local submatrix with either d_nz or d_nnz (not both).  Set
1109    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1110    memory allocation.  Likewise, specify preallocated storage for the
1111    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1112 
1113    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1114    the figure below we depict these three local rows and all columns (0-11).
1115 
1116 $          0 1 2 3 4 5 6 7 8 9 10 11
1117 $         -------------------
1118 $  row 3  |  o o o d d d o o o o o o
1119 $  row 4  |  o o o d d d o o o o o o
1120 $  row 5  |  o o o d d d o o o o o o
1121 $         -------------------
1122 $
1123 
1124    Thus, any entries in the d locations are stored in the d (diagonal)
1125    submatrix, and any entries in the o locations are stored in the
1126    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1127    stored simply in the MATSEQBAIJ format for compressed row storage.
1128 
1129    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1130    and o_nz should indicate the number of nonzeros per row in the o matrix.
1131    In general, for PDE problems in which most nonzeros are near the diagonal,
1132    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1133    or you will get TERRIBLE performance; see the users' manual chapter on
1134    matrices and the file $(PETSC_DIR)/Performance.
1135 
1136 .keywords: matrix, block, aij, compressed row, sparse, parallel
1137 
1138 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1139 @*/
1140 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1141                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1142 {
1143   Mat          B;
1144   Mat_MPIBAIJ  *b;
1145   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE;
1146 
1147   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
1148   *A = 0;
1149   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1150   PLogObjectCreate(B);
1151   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1152   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1153   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1154   B->destroy    = MatDestroy_MPIBAIJ;
1155   B->view       = MatView_MPIBAIJ;
1156 
1157   B->factor     = 0;
1158   B->assembled  = PETSC_FALSE;
1159 
1160   b->insertmode = NOT_SET_VALUES;
1161   MPI_Comm_rank(comm,&b->rank);
1162   MPI_Comm_size(comm,&b->size);
1163 
1164   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1165     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1166   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
1167   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
1168   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1169   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1170 
1171   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1172     work[0] = m; work[1] = n;
1173     mbs = m/bs; nbs = n/bs;
1174     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1175     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1176     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1177   }
1178   if (m == PETSC_DECIDE) {
1179     Mbs = M/bs;
1180     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
1181     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1182     m   = mbs*bs;
1183   }
1184   if (n == PETSC_DECIDE) {
1185     Nbs = N/bs;
1186     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
1187     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1188     n   = nbs*bs;
1189   }
1190   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
1191 
1192   b->m = m; B->m = m;
1193   b->n = n; B->n = n;
1194   b->N = N; B->N = N;
1195   b->M = M; B->M = M;
1196   b->bs  = bs;
1197   b->bs2 = bs*bs;
1198   b->mbs = mbs;
1199   b->nbs = nbs;
1200   b->Mbs = Mbs;
1201   b->Nbs = Nbs;
1202 
1203   /* build local table of row and column ownerships */
1204   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1205   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1206   b->cowners = b->rowners + b->size + 2;
1207   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1208   b->rowners[0] = 0;
1209   for ( i=2; i<=b->size; i++ ) {
1210     b->rowners[i] += b->rowners[i-1];
1211   }
1212   b->rstart = b->rowners[b->rank];
1213   b->rend   = b->rowners[b->rank+1];
1214   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1215   b->cowners[0] = 0;
1216   for ( i=2; i<=b->size; i++ ) {
1217     b->cowners[i] += b->cowners[i-1];
1218   }
1219   b->cstart = b->cowners[b->rank];
1220   b->cend   = b->cowners[b->rank+1];
1221 
1222   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1223   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1224   PLogObjectParent(B,b->A);
1225   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1226   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1227   PLogObjectParent(B,b->B);
1228 
1229   /* build cache for off array entries formed */
1230   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1231   b->colmap      = 0;
1232   b->garray      = 0;
1233   b->roworiented = 1;
1234 
1235   /* stuff used for matrix vector multiply */
1236   b->lvec      = 0;
1237   b->Mvctx     = 0;
1238 
1239   /* stuff for MatGetRow() */
1240   b->rowindices   = 0;
1241   b->rowvalues    = 0;
1242   b->getrowactive = PETSC_FALSE;
1243 
1244   *A = B;
1245   return 0;
1246 }
1247 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1248 {
1249   Mat        mat;
1250   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1251   int        ierr, len=0, flg;
1252 
1253   *newmat       = 0;
1254   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1255   PLogObjectCreate(mat);
1256   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1257   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1258   mat->destroy    = MatDestroy_MPIBAIJ;
1259   mat->view       = MatView_MPIBAIJ;
1260   mat->factor     = matin->factor;
1261   mat->assembled  = PETSC_TRUE;
1262 
1263   a->m = mat->m   = oldmat->m;
1264   a->n = mat->n   = oldmat->n;
1265   a->M = mat->M   = oldmat->M;
1266   a->N = mat->N   = oldmat->N;
1267 
1268   a->bs  = oldmat->bs;
1269   a->bs2 = oldmat->bs2;
1270   a->mbs = oldmat->mbs;
1271   a->nbs = oldmat->nbs;
1272   a->Mbs = oldmat->Mbs;
1273   a->Nbs = oldmat->Nbs;
1274 
1275   a->rstart       = oldmat->rstart;
1276   a->rend         = oldmat->rend;
1277   a->cstart       = oldmat->cstart;
1278   a->cend         = oldmat->cend;
1279   a->size         = oldmat->size;
1280   a->rank         = oldmat->rank;
1281   a->insertmode   = NOT_SET_VALUES;
1282   a->rowvalues    = 0;
1283   a->getrowactive = PETSC_FALSE;
1284 
1285   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1286   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1287   a->cowners = a->rowners + a->size + 2;
1288   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1289   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1290   if (oldmat->colmap) {
1291     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1292     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1293     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1294   } else a->colmap = 0;
1295   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1296     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1297     PLogObjectMemory(mat,len*sizeof(int));
1298     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1299   } else a->garray = 0;
1300 
1301   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1302   PLogObjectParent(mat,a->lvec);
1303   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1304   PLogObjectParent(mat,a->Mvctx);
1305   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1306   PLogObjectParent(mat,a->A);
1307   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1308   PLogObjectParent(mat,a->B);
1309   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1310   if (flg) {
1311     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1312   }
1313   *newmat = mat;
1314   return 0;
1315 }
1316 
1317 #include "sys.h"
1318 
1319 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1320 {
1321   Mat          A;
1322   int          i, nz, ierr, j,rstart, rend, fd;
1323   Scalar       *vals,*buf;
1324   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1325   MPI_Status   status;
1326   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1327   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1328   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1329   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1330   int          dcount,kmax,k,nzcount,tmp;
1331 
1332 
1333   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1334   bs2  = bs*bs;
1335 
1336   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1337   if (!rank) {
1338     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1339     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1340     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
1341   }
1342 
1343   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1344   M = header[1]; N = header[2];
1345 
1346   if (M != N) SETERRQ(1,"MatLoad_SeqBAIJ:Can only do square matrices");
1347 
1348   /*
1349      This code adds extra rows to make sure the number of rows is
1350      divisible by the blocksize
1351   */
1352   Mbs        = M/bs;
1353   extra_rows = bs - M + bs*(Mbs);
1354   if (extra_rows == bs) extra_rows = 0;
1355   else                  Mbs++;
1356   if (extra_rows &&!rank) {
1357     PLogInfo(0,"MatLoad_SeqBAIJ:Padding loaded matrix to match blocksize\n");
1358   }
1359 
1360   /* determine ownership of all rows */
1361   mbs = Mbs/size + ((Mbs % size) > rank);
1362   m   = mbs * bs;
1363   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1364   browners = rowners + size + 1;
1365   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1366   rowners[0] = 0;
1367   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1368   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1369   rstart = rowners[rank];
1370   rend   = rowners[rank+1];
1371 
1372   /* distribute row lengths to all processors */
1373   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1374   if (!rank) {
1375     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1376     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1377     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1378     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1379     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1380     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1381     PetscFree(sndcounts);
1382   }
1383   else {
1384     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1385   }
1386 
1387   if (!rank) {
1388     /* calculate the number of nonzeros on each processor */
1389     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1390     PetscMemzero(procsnz,size*sizeof(int));
1391     for ( i=0; i<size; i++ ) {
1392       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1393         procsnz[i] += rowlengths[j];
1394       }
1395     }
1396     PetscFree(rowlengths);
1397 
1398     /* determine max buffer needed and allocate it */
1399     maxnz = 0;
1400     for ( i=0; i<size; i++ ) {
1401       maxnz = PetscMax(maxnz,procsnz[i]);
1402     }
1403     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1404 
1405     /* read in my part of the matrix column indices  */
1406     nz = procsnz[0];
1407     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1408     mycols = ibuf;
1409     if (size == 1)  nz -= extra_rows;
1410     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1411     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1412 
1413     /* read in every ones (except the last) and ship off */
1414     for ( i=1; i<size-1; i++ ) {
1415       nz = procsnz[i];
1416       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1417       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1418     }
1419     /* read in the stuff for the last proc */
1420     if ( size != 1 ) {
1421       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1422       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1423       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1424       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1425     }
1426     PetscFree(cols);
1427   }
1428   else {
1429     /* determine buffer space needed for message */
1430     nz = 0;
1431     for ( i=0; i<m; i++ ) {
1432       nz += locrowlens[i];
1433     }
1434     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1435     mycols = ibuf;
1436     /* receive message of column indices*/
1437     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1438     MPI_Get_count(&status,MPI_INT,&maxnz);
1439     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1440   }
1441 
1442   /* loop over local rows, determining number of off diagonal entries */
1443   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1444   odlens = dlens + (rend-rstart);
1445   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1446   PetscMemzero(mask,3*Mbs*sizeof(int));
1447   masked1 = mask    + Mbs;
1448   masked2 = masked1 + Mbs;
1449   rowcount = 0; nzcount = 0;
1450   for ( i=0; i<mbs; i++ ) {
1451     dcount  = 0;
1452     odcount = 0;
1453     for ( j=0; j<bs; j++ ) {
1454       kmax = locrowlens[rowcount];
1455       for ( k=0; k<kmax; k++ ) {
1456         tmp = mycols[nzcount++]/bs;
1457         if (!mask[tmp]) {
1458           mask[tmp] = 1;
1459           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1460           else masked1[dcount++] = tmp;
1461         }
1462       }
1463       rowcount++;
1464     }
1465 
1466     dlens[i]  = dcount;
1467     odlens[i] = odcount;
1468 
1469     /* zero out the mask elements we set */
1470     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1471     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1472   }
1473 
1474   /* create our matrix */
1475   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1476          CHKERRQ(ierr);
1477   A = *newmat;
1478   MatSetOption(A,MAT_COLUMNS_SORTED);
1479 
1480   if (!rank) {
1481     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1482     /* read in my part of the matrix numerical values  */
1483     nz = procsnz[0];
1484     vals = buf;
1485     mycols = ibuf;
1486     if (size == 1)  nz -= extra_rows;
1487     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1488     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1489 
1490     /* insert into matrix */
1491     jj      = rstart*bs;
1492     for ( i=0; i<m; i++ ) {
1493       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1494       mycols += locrowlens[i];
1495       vals   += locrowlens[i];
1496       jj++;
1497     }
1498     /* read in other processors (except the last one) and ship out */
1499     for ( i=1; i<size-1; i++ ) {
1500       nz = procsnz[i];
1501       vals = buf;
1502       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1503       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1504     }
1505     /* the last proc */
1506     if ( size != 1 ){
1507       nz = procsnz[i] - extra_rows;
1508       vals = buf;
1509       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1510       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1511       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1512     }
1513     PetscFree(procsnz);
1514   }
1515   else {
1516     /* receive numeric values */
1517     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1518 
1519     /* receive message of values*/
1520     vals = buf;
1521     mycols = ibuf;
1522     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1523     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1524     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1525 
1526     /* insert into matrix */
1527     jj      = rstart*bs;
1528     for ( i=0; i<m; i++ ) {
1529       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1530       mycols += locrowlens[i];
1531       vals   += locrowlens[i];
1532       jj++;
1533     }
1534   }
1535   PetscFree(locrowlens);
1536   PetscFree(buf);
1537   PetscFree(ibuf);
1538   PetscFree(rowners);
1539   PetscFree(dlens);
1540   PetscFree(mask);
1541   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1542   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1543   return 0;
1544 }
1545 
1546 
1547