xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision fea6b2d80d8ab4080338562a5cbef6dcdf5bb470)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.25 1996/09/14 03:08:37 bsmith Exp balay $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 
25 /*
26      Local utility routine that creates a mapping from the global column
27    number to the local number in the off-diagonal part of the local
   storage of the matrix.  This is done in a non-scalable way, since the
   length of colmap equals the number of global block columns.
30 */
31 static int CreateColmap_MPIBAIJ_Private(Mat mat)
32 {
33   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
34   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
35   int         nbs = B->nbs,i;
36 
37   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
38   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
39   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
40   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i + 1;
41   return 0;
42 }
43 
44 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
45                             PetscTruth *done)
46 {
47   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
48   int         ierr;
49   if (aij->size == 1) {
50     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
51   } else SETERRQ(1,"MatGetRowIJ_MPIBAIJ:not supported in parallel");
52   return 0;
53 }
54 
55 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
56                                 PetscTruth *done)
57 {
58   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
59   int        ierr;
60   if (aij->size == 1) {
61     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
62   } else SETERRQ(1,"MatRestoreRowIJ_MPIBAIJ:not supported in parallel");
63   return 0;
64 }
65 
/*
   MatSetValues_MPIBAIJ - Inserts or adds an m-by-n logically dense block of
   values, addressed by global point (not block) indices im[]/in[].

   Locally owned rows are split between the diagonal part (baij->A, columns in
   the local range) and the off-diagonal part (baij->B); off-processor rows are
   stashed and communicated later by MatAssemblyBegin/End.  Mixing INSERT_VALUES
   and ADD_VALUES between assemblies is an error.
*/
static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  Scalar      value;
  int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
  int         cstart = baij->cstart, cend = baij->cend,row,col;
  int         roworiented = baij->roworiented,rstart_orig,rend_orig;
  int         cstart_orig,cend_orig,bs=baij->bs;

  if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
    SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
  }
  baij->insertmode = addv;
  /* ownership ranges are stored in block units; scale to point indices */
  rstart_orig = rstart*bs;
  rend_orig   = rend*bs;
  cstart_orig = cstart*bs;
  cend_orig   = cend*bs;
  for ( i=0; i<m; i++ ) {
    if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
    if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
    if (im[i] >= rstart_orig && im[i] < rend_orig) {
      /* row is owned locally */
      row = im[i] - rstart_orig;
      for ( j=0; j<n; j++ ) {
        if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");
        if (in[j] >= baij->N) SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");
        if (in[j] >= cstart_orig && in[j] < cend_orig){
          /* column in the local (diagonal) block */
          col = in[j] - cstart_orig;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          ierr = MatSetValues(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
        }
        else {
          if (mat->was_assembled) {
            /* after first assembly B uses compressed (local) column numbering;
               translate via colmap (built lazily) */
            if (!baij->colmap) {
              ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
            }
            col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
            /* colmap miss (col < 0) means a brand-new nonzero block column:
               disassemble so B reverts to global numbering */
            if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
              ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
            }
          }
          else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          ierr = MatSetValues(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
        }
      }
    }
    else {
      /* off-processor row: cache the values for assembly-time communication */
      if (roworiented) {
        ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      }
      else {
        row = im[i];
        for ( j=0; j<n; j++ ) {
          ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
        }
      }
    }
  }
  return 0;
}
127 
/*
   MatGetValues_MPIBAIJ - Retrieves an m-by-n block of values, addressed by
   global point indices; stored row-oriented into v.  Only locally owned rows
   may be requested.  Columns outside the off-diagonal part's column map are
   returned as 0.0 (they are structurally zero).
*/
static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
  int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
    if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
    if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
      row = idxm[i] - bsrstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
        if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
        if (idxn[j] >= bscstart && idxn[j] < bscend){
          /* column lives in the diagonal block */
          col = idxn[j] - bscstart;
          ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        }
        else {
          /* off-diagonal block: translate global block column via colmap */
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
          }
          /* colmap miss or stale entry => block column not present locally */
          if((baij->colmap[idxn[j]/bs]-1 < 0) ||
             (baij->garray[baij->colmap[idxn[j]/bs]-1] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
          else {
            col  = (baij->colmap[idxn[j]/bs]-1)*bs + idxn[j]%bs;
            ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    }
    else {
      SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
    }
  }
  return 0;
}
165 
166 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
167 {
168   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
169   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
170   int        ierr, i,bs2=baij->bs2;
171   double     sum = 0.0;
172   Scalar     *v;
173 
174   if (baij->size == 1) {
175     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
176   } else {
177     if (type == NORM_FROBENIUS) {
178       v = amat->a;
179       for (i=0; i<amat->nz*bs2; i++ ) {
180 #if defined(PETSC_COMPLEX)
181         sum += real(conj(*v)*(*v)); v++;
182 #else
183         sum += (*v)*(*v); v++;
184 #endif
185       }
186       v = bmat->a;
187       for (i=0; i<bmat->nz*bs2; i++ ) {
188 #if defined(PETSC_COMPLEX)
189         sum += real(conj(*v)*(*v)); v++;
190 #else
191         sum += (*v)*(*v); v++;
192 #endif
193       }
194       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
195       *norm = sqrt(*norm);
196     }
197     else
198       SETERRQ(1,"MatNorm_SeqBAIJ:No support for this norm yet");
199   }
200   return 0;
201 }
202 
/*
   MatAssemblyBegin_MPIBAIJ - Begins assembly: ships every stashed
   (off-processor) value to its owning process.  Each stashed entry travels as
   a triple of Scalars (row index, column index, value); indices are stored as
   Scalars to keep the message a single MPI datatype.  The matching receives
   are drained in MatAssemblyEnd_MPIBAIJ.
*/
static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
  }
  baij->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  /* nprocs[j] = #stashed entries for proc j; procs[j] = 1 iff any go to j */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    /* rowners[] is in block units; stash indices are point indices */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack (row, col, value) triples, grouped by destination process;
     note starts[] is consumed (incremented) during the packing loop */
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  /* rebuild starts[] (it was destroyed above) before issuing the sends */
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  /* stash the communication state for MatAssemblyEnd_MPIBAIJ */
  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  return 0;
}
301 
302 
/*
   MatAssemblyEnd_MPIBAIJ - Completes assembly: drains the receives posted by
   MatAssemblyBegin_MPIBAIJ, inserts the received (row, col, value) triples
   locally, waits for the sends, and then assembles the two sequential parts.
   Also synchronizes disassembly state across all processes.
*/
static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
  int         bs=baij->bs,row,col,other_disassembled;
  Scalar      *values,val;
  InsertMode  addv = baij->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = baij->rvalues + 3*imdex*baij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;  /* messages are (row, col, value) triples */
    for ( i=0; i<n; i++ ) {
      row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= baij->cstart*bs && col < baij->cend*bs) {
        /* column in local range: goes into the diagonal part */
        col -= baij->cstart*bs;
        MatSetValues(baij->A,1,&row,1,&col,&val,addv);
      }
      else {
        if (mat->was_assembled) {
          /* B uses compressed column numbering after first assembly */
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
          }
          col = (baij->colmap[col/bs]-1)*bs + col%bs;
          /* new nonzero block column: revert B to global numbering */
          if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
            ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        MatSetValues(baij->B,1,&row,1,&col,&val,addv);
      }
    }
    count--;
  }
  PetscFree(baij->recv_waits); PetscFree(baij->rvalues);

  /* wait on sends */
  if (baij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(baij->nsends,baij->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(baij->send_waits); PetscFree(baij->svalues);

  baij->insertmode = NOT_SET_VALUES;
  ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselves, in order that we may reassemble. */
  MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
  }

  /* first final assembly builds the scatter context for MatMult etc. */
  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);

  /* row-workspace from MatGetRow is invalidated by new assembly */
  if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
  return 0;
}
374 
375 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
376 {
377   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
378   int          ierr;
379 
380   if (baij->size == 1) {
381     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
382   }
383   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
384   return 0;
385 }
386 
/*
   MatView_MPIBAIJ_ASCIIorDraworMatlab - Viewer back-end for ASCII, Draw and
   Matlab viewers.  Info formats print per-process statistics; the ASCII_FILE
   viewer prints each process's parts in sequence; all other cases gather the
   entire matrix onto process 0 and view it there.
*/
static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr, format,rank,bs=baij->bs;
  FILE         *fd;
  ViewerType   vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerGetFormat(viewer,&format);
    if (format == ASCII_FORMAT_INFO_DETAILED) {
      /* per-process nonzero/memory statistics, printed one process at a time */
      MatInfo info;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&info);
      PetscSequentialPhaseBegin(mat->comm,1);
      fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
              rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
              baij->bs,(int)info.memory);
      ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == ASCII_FORMAT_INFO) {
      /* nothing extra to print at this level */
      return 0;
    }
  }

  if (vtype == DRAW_VIEWER) {
    /* a null draw context means graphics are disabled; nothing to do */
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
  }

  if (vtype == ASCII_FILE_VIEWER) {
    /* each process prints its own A and B parts in rank order */
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
           baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
            baij->cstart*bs,baij->cend*bs);
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  }
  else {
    int size = baij->size;
    rank = baij->rank;
    if (size == 1) {
      ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat         A;
      Mat_SeqBAIJ *Aloc;
      int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
      int         mbs=baij->mbs;
      Scalar      *a;

      /* process 0 holds all M x N rows/cols; the others hold none */
      if (!rank) {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqBAIJ*) baij->A->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart;
      rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

      for ( i=0; i<mbs; i++ ) {
        /* rvals[] = the bs consecutive global point rows of block row i */
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          col = (baij->cstart+aj[j])*bs;
          for (k=0; k<bs; k++ ) {
            /* block values are stored column by column within each block */
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      /* copy over the B part */
      Aloc = (Mat_SeqBAIJ*) baij->B->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart*bs;
      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          /* garray[] translates B's compressed block column to global */
          col = baij->garray[aj[j]]*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      PetscFree(rvals);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        /* everything now lives in the diagonal part of A on process 0 */
        ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}
505 
506 
507 
508 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
509 {
510   Mat         mat = (Mat) obj;
511   int         ierr;
512   ViewerType  vtype;
513 
514   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
515   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
516       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
517     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
518   }
519   else if (vtype == BINARY_FILE_VIEWER) {
520     return MatView_MPIBAIJ_Binary(mat,viewer);
521   }
522   return 0;
523 }
524 
/*
   MatDestroy_MPIBAIJ - Releases everything owned by the parallel matrix:
   the two sequential parts, the ownership table, the optional column map,
   the scatter context and work vectors, and finally the header itself.
*/
static int MatDestroy_MPIBAIJ(PetscObject obj)
{
  Mat         mat = (Mat) obj;
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  int         ierr;

#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
#endif

  PetscFree(baij->rowners);
  ierr = MatDestroy(baij->A); CHKERRQ(ierr);
  ierr = MatDestroy(baij->B); CHKERRQ(ierr);
  /* these are created lazily, so they may never have been allocated */
  if (baij->colmap) PetscFree(baij->colmap);
  if (baij->garray) PetscFree(baij->garray);
  if (baij->lvec)   VecDestroy(baij->lvec);
  if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
  if (baij->rowvalues) PetscFree(baij->rowvalues);
  PetscFree(baij);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}
548 
549 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
550 {
551   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
552   int         ierr, nt;
553 
554   VecGetLocalSize_Fast(xx,nt);
555   if (nt != a->n) {
556     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible parition of A and xx");
557   }
558   VecGetLocalSize_Fast(yy,nt);
559   if (nt != a->m) {
560     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible parition of A and yy");
561   }
562   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
563   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
564   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
565   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
566   return 0;
567 }
568 
/*
   MatMultAdd_MPIBAIJ - Computes zz = yy + A*xx, overlapping the diagonal-part
   multiply-add with the ghost-value scatter needed by the off-diagonal part.
*/
static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;
  ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
  ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
  return 0;
}
579 
/*
   MatMultTrans_MPIBAIJ - Computes yy = A^T * xx.  The off-diagonal transpose
   product is computed into the local work vector and reverse-scattered (with
   addition) into yy while the diagonal transpose product proceeds locally.
*/
static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in yy until the next line, which is true for my implementation*/
  /* but is not perhaps always true. */
  ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx);CHKERRQ(ierr);
  return 0;
}
598 
/*
   MatMultTransAdd_MPIBAIJ - Computes zz = yy + A^T * xx, using the same
   overlap of reverse scatter and local transpose product as MatMultTrans.
*/
static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
  int        ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in yy until the next line, which is true for my implementation*/
  /* but is not perhaps always true. */
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  return 0;
}
616 
617 /*
618   This only works correctly for square matrices where the subblock A->A is the
619    diagonal block
620 */
621 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
622 {
623   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
624   if (a->M != a->N)
625     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
626   return MatGetDiagonal(a->A,v);
627 }
628 
629 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
630 {
631   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
632   int        ierr;
633   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
634   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
635   return 0;
636 }
637 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
638 {
639   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
640   *m = mat->M; *n = mat->N;
641   return 0;
642 }
643 
644 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
645 {
646   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
647   *m = mat->m; *n = mat->N;
648   return 0;
649 }
650 
651 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
652 {
653   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
654   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
655   return 0;
656 }
657 
658 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
659 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
660 
/*
   MatGetRow_MPIBAIJ - Returns one locally owned row (global point index) as a
   merged, column-sorted list drawn from the diagonal (A) and off-diagonal (B)
   parts.  Workspace for the merged row is allocated lazily, sized for the
   longest local block row, and kept until the next assembly.  Only one row
   may be "active" at a time; MatRestoreRow_MPIBAIJ clears the flag.
*/
int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
  int        *cmap, *idx_p,cstart = mat->cstart;

  if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int     max = 1,mbs = mat->mbs,tmp;
    for ( i=0; i<mbs; i++ ) {
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    /* single allocation holds both the values and the index workspace */
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }


  if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows")
  lrow = row - brstart;

  /* pass NULL for pieces the caller did not request */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      /* imark = #B entries whose global column precedes the diagonal block */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          /* imark already found while copying the values */
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
          }
        } else {
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]/bs] < cstart)
              idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
            else break;
          }
          imark = i;
        }
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
      }
    }
    else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  return 0;
}
741 
742 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
743 {
744   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
745   if (baij->getrowactive == PETSC_FALSE) {
746     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
747   }
748   baij->getrowactive = PETSC_FALSE;
749   return 0;
750 }
751 
752 static int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
753 {
754   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
755   *bs = baij->bs;
756   return 0;
757 }
758 
759 static int MatZeroEntries_MPIBAIJ(Mat A)
760 {
761   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
762   int         ierr;
763   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
764   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
765   return 0;
766 }
767 
768 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
769 {
770   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
771   Mat         A = a->A, B = a->B;
772   int         ierr;
773   double      isend[5], irecv[5];
774 
775   info->rows_global    = (double)a->M;
776   info->columns_global = (double)a->N;
777   info->rows_local     = (double)a->m;
778   info->columns_local  = (double)a->N;
779   info->block_size     = (double)a->bs;
780   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
781   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->memory;
782   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
783   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->memory;
784   if (flag == MAT_LOCAL) {
785     info->nz_used      = isend[0];
786     info->nz_allocated = isend[1];
787     info->nz_unneeded  = isend[2];
788     info->memory       = isend[3];
789     info->mallocs      = isend[4];
790   } else if (flag == MAT_GLOBAL_MAX) {
791     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_MAX,matin->comm);
792     info->nz_used      = irecv[0];
793     info->nz_allocated = irecv[1];
794     info->nz_unneeded  = irecv[2];
795     info->memory       = irecv[3];
796     info->mallocs      = irecv[4];
797   } else if (flag == MAT_GLOBAL_SUM) {
798     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_SUM,matin->comm);
799     info->nz_used      = irecv[0];
800     info->nz_allocated = irecv[1];
801     info->nz_unneeded  = irecv[2];
802     info->memory       = irecv[3];
803     info->mallocs      = irecv[4];
804   }
805   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
806   info->fill_ratio_needed = 0;
807   info->factor_mallocs    = 0;
808   return 0;
809 }
810 
/*
   MatSetOption_MPIBAIJ - Forwards structural options to both sequential
   parts; ignores hints with no parallel consequence; errors on unsupported
   options.
*/
static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
{
  Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;

  if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
      op == MAT_YES_NEW_NONZERO_LOCATIONS ||
      op == MAT_COLUMNS_SORTED ||
      op == MAT_ROW_ORIENTED) {
        /* NOTE(review): MAT_ROW_ORIENTED is forwarded but does not set
           a->roworiented = 1, while MAT_COLUMN_ORIENTED below clears it;
           looks asymmetric - confirm intended */
        MatSetOption(a->A,op);
        MatSetOption(a->B,op);
  }
  else if (op == MAT_ROWS_SORTED ||
           op == MAT_SYMMETRIC ||
           op == MAT_STRUCTURALLY_SYMMETRIC ||
           op == MAT_YES_NEW_DIAGONALS)
    PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
  else if (op == MAT_COLUMN_ORIENTED) {
    /* affects how MatSetValues_MPIBAIJ indexes the dense value array */
    a->roworiented = 0;
    MatSetOption(a->A,op);
    MatSetOption(a->B,op);
  }
  else if (op == MAT_NO_NEW_DIAGONALS)
    {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:MAT_NO_NEW_DIAGONALS");}
  else
    {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:unknown option");}
  return 0;
}
838 
static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
{
  /*
     Forms the transpose of the parallel block matrix A.  If matout is
     non-null the transpose is returned as a new matrix in *matout;
     otherwise A itself is replaced (allowed only when A is square).

     The transpose is built by inserting each bs x bs block of A
     column-by-column (so each block column of A becomes a block row of
     the result) into a new N x M MPIBAIJ matrix, letting the assembly
     phase route entries to their owning processes.
  */
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
  Mat_SeqBAIJ *Aloc;
  Mat        B;
  int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
  int        bs=baij->bs,mbs=baij->mbs;
  Scalar     *a;

  if (matout == PETSC_NULL && M != N)
    SETERRQ(1,"MatTranspose_MPIBAIJ:Square matrix only for in-place");
  /* note the swapped global dimensions: B is N x M */
  ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
  CHKERRQ(ierr);

  /* copy over the A part */
  Aloc = (Mat_SeqBAIJ*) baij->A->data;
  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
  /* NOTE(review): row is assigned here and below but never read -- confirm */
  row = baij->rstart;
  rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

  for ( i=0; i<mbs; i++ ) {
    /* rvals[] = the bs consecutive global (point) row indices of block row i */
    rvals[0] = bs*(baij->rstart + i);
    for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
    for ( j=ai[i]; j<ai[i+1]; j++ ) {
      /* diagonal-part column indices are stored local to the block;
         shift by cstart to obtain the global block column */
      col = (baij->cstart+aj[j])*bs;
      for (k=0; k<bs; k++ ) {
        /* insert one column of the bs x bs block as one row of B;
           block values are stored contiguously, bs entries per column */
        ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
        col++; a += bs;
      }
    }
  }
  /* copy over the B part */
  Aloc = (Mat_SeqBAIJ*) baij->B->data;
  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
  row = baij->rstart*bs;
  for ( i=0; i<mbs; i++ ) {
    rvals[0] = bs*(baij->rstart + i);
    for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
    for ( j=ai[i]; j<ai[i+1]; j++ ) {
      /* off-diagonal column indices are compressed; garray maps the
         compressed index back to the global block column */
      col = baij->garray[aj[j]]*bs;
      for (k=0; k<bs; k++ ) {
        ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
        col++; a += bs;
      }
    }
  }
  PetscFree(rvals);
  ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);

  if (matout != PETSC_NULL) {
    *matout = B;
  } else {
    /* This isn't really an in-place transpose .... but free data structures from baij */
    PetscFree(baij->rowners);
    ierr = MatDestroy(baij->A); CHKERRQ(ierr);
    ierr = MatDestroy(baij->B); CHKERRQ(ierr);
    if (baij->colmap) PetscFree(baij->colmap);
    if (baij->garray) PetscFree(baij->garray);
    if (baij->lvec) VecDestroy(baij->lvec);
    if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
    PetscFree(baij);
    /* steal B's body into A's header, then discard B's shell */
    PetscMemcpy(A,B,sizeof(struct _Mat));
    PetscHeaderDestroy(B);
  }
  return 0;
}
906 
907 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
908 {
909   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
910   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
911   int ierr,s1,s2,s3;
912 
913   if (ll)  {
914     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
915     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
916     if (s1!=s2) SETERRQ(1,"MatDiagonalScale_MPIBAIJ: non-conforming local sizes");
917     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
918     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
919   }
920   if (rr) SETERRQ(1,"MatDiagonalScale_MPIBAIJ:not supported for right vector");
921   return 0;
922 }
923 
924 /* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
926    This is a BUG. The only way to fix it seems to be to access
927    baij->A and baij->B directly and not through the MatZeroRows()
928    routine.
929 */
static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
{
  /*
     Zeros the (global, point-indexed) rows of A listed in is; if diag
     is non-null the diagonal entry of each zeroed row is set to *diag.

     Any process may list rows owned by any other process, so the
     routine first routes every requested row index to its owner via
     point-to-point messages, then each process zaps its own rows by
     calling MatZeroRows() on the local diagonal (l->A) and
     off-diagonal (l->B) blocks.

     NOTE(review): per the comment above this routine, the diagonal
     handling is only correct for square matrices with identical row
     and column ownership.
  */
  Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values,bs=l->bs;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    /* owners[] holds block-row boundaries, hence the scaling by bs */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,"MatZeroRows_MPIBAIJ:Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  /* work[rank] = number of processes that will send to me */
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  /* work[rank] = length of the longest message I will receive */
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* bucket the row indices by destination (this pass advances starts[]) */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  /* rebuild starts[] since the packing pass above destroyed it */
  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  /* first point-row index owned by this process */
  base = owners[rank]*bs;

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      /* convert global row index to a local one */
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  /* the diagonal value goes only into the diagonal block */
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}
1052 extern int MatPrintHelp_SeqBAIJ(Mat);
1053 static int MatPrintHelp_MPIBAIJ(Mat A)
1054 {
1055   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1056 
1057   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1058   else return 0;
1059 }
1060 
1061 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1062 
1063 /* -------------------------------------------------------------------*/
/*
   Operation ("virtual function") table for the MPIBAIJ format.
   Entries are positional and must stay in the slot order defined by
   struct _MatOps; a 0 marks an operation this format does not support.
*/
static struct _MatOps MatOps = {
  MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
  MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,MatSolve_MPIBAIJ,
  MatSolveAdd_MPIBAIJ,MatSolveTrans_MPIBAIJ,MatSolveTransAdd_MPIBAIJ,MatLUFactor_MPIBAIJ,
  0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
  0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
  MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
  MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,MatLUFactorSymbolic_MPIBAIJ,
  MatLUFactorNumeric_MPIBAIJ,0,0,MatGetSize_MPIBAIJ,
  MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,MatILUFactorSymbolic_MPIBAIJ,0,
  0,0,0,MatConvertSameType_MPIBAIJ,0,0,
  0,0,0,MatGetSubMatrices_MPIBAIJ,
  MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
  MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
  MatGetRowIJ_MPIBAIJ,
  MatRestoreRowIJ_MPIBAIJ};
1080 
1081 
1082 /*@C
1083    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1084    (block compressed row).  For good matrix assembly performance
1085    the user should preallocate the matrix storage by setting the parameters
1086    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1087    performance can be increased by more than a factor of 50.
1088 
1089    Input Parameters:
1090 .  comm - MPI communicator
.  bs   - size of block
1092 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1093            This value should be the same as the local size used in creating the
1094            y vector for the matrix-vector product y = Ax.
1095 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1096            This value should be the same as the local size used in creating the
1097            x vector for the matrix-vector product y = Ax.
1098 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1099 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1100 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1101            submatrix  (same for all local rows)
.  d_nnz - array containing the number of block nonzeros in the various block rows
           in the diagonal portion of the local submatrix (possibly different for
           each block row) or PETSC_NULL.  You must leave room for the diagonal
           entry even if it is zero.
1106 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1107            submatrix (same for all local rows).
.  o_nnz - array containing the number of nonzeros in the various block rows of the
           off-diagonal portion of the local submatrix (possibly different for
           each block row) or PETSC_NULL.
1111 
1112    Output Parameter:
1113 .  A - the matrix
1114 
1115    Notes:
1116    The user MUST specify either the local or global matrix dimensions
1117    (possibly both).
1118 
1119    Storage Information:
1120    For a square global matrix we define each processor's diagonal portion
1121    to be its local rows and the corresponding columns (a square submatrix);
1122    each processor's off-diagonal portion encompasses the remainder of the
1123    local matrix (a rectangular submatrix).
1124 
1125    The user can specify preallocated storage for the diagonal part of
1126    the local submatrix with either d_nz or d_nnz (not both).  Set
1127    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1128    memory allocation.  Likewise, specify preallocated storage for the
1129    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1130 
1131    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1132    the figure below we depict these three local rows and all columns (0-11).
1133 
1134 $          0 1 2 3 4 5 6 7 8 9 10 11
1135 $         -------------------
1136 $  row 3  |  o o o d d d o o o o o o
1137 $  row 4  |  o o o d d d o o o o o o
1138 $  row 5  |  o o o d d d o o o o o o
1139 $         -------------------
1140 $
1141 
1142    Thus, any entries in the d locations are stored in the d (diagonal)
1143    submatrix, and any entries in the o locations are stored in the
1144    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1145    stored simply in the MATSEQBAIJ format for compressed row storage.
1146 
1147    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1148    and o_nz should indicate the number of nonzeros per row in the o matrix.
1149    In general, for PDE problems in which most nonzeros are near the diagonal,
1150    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1151    or you will get TERRIBLE performance; see the users' manual chapter on
1152    matrices and the file $(PETSC_DIR)/Performance.
1153 
1154 .keywords: matrix, block, aij, compressed row, sparse, parallel
1155 
1156 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1157 @*/
1158 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1159                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1160 {
1161   Mat          B;
1162   Mat_MPIBAIJ  *b;
1163   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE;
1164 
1165   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
1166   *A = 0;
1167   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1168   PLogObjectCreate(B);
1169   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1170   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1171   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1172   B->destroy    = MatDestroy_MPIBAIJ;
1173   B->view       = MatView_MPIBAIJ;
1174 
1175   B->factor     = 0;
1176   B->assembled  = PETSC_FALSE;
1177 
1178   b->insertmode = NOT_SET_VALUES;
1179   MPI_Comm_rank(comm,&b->rank);
1180   MPI_Comm_size(comm,&b->size);
1181 
1182   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1183     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1184   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
1185   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
1186   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1187   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1188 
1189   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1190     work[0] = m; work[1] = n;
1191     mbs = m/bs; nbs = n/bs;
1192     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1193     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1194     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1195   }
1196   if (m == PETSC_DECIDE) {
1197     Mbs = M/bs;
1198     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
1199     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1200     m   = mbs*bs;
1201   }
1202   if (n == PETSC_DECIDE) {
1203     Nbs = N/bs;
1204     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
1205     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1206     n   = nbs*bs;
1207   }
1208   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
1209 
1210   b->m = m; B->m = m;
1211   b->n = n; B->n = n;
1212   b->N = N; B->N = N;
1213   b->M = M; B->M = M;
1214   b->bs  = bs;
1215   b->bs2 = bs*bs;
1216   b->mbs = mbs;
1217   b->nbs = nbs;
1218   b->Mbs = Mbs;
1219   b->Nbs = Nbs;
1220 
1221   /* build local table of row and column ownerships */
1222   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1223   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1224   b->cowners = b->rowners + b->size + 2;
1225   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1226   b->rowners[0] = 0;
1227   for ( i=2; i<=b->size; i++ ) {
1228     b->rowners[i] += b->rowners[i-1];
1229   }
1230   b->rstart = b->rowners[b->rank];
1231   b->rend   = b->rowners[b->rank+1];
1232   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1233   b->cowners[0] = 0;
1234   for ( i=2; i<=b->size; i++ ) {
1235     b->cowners[i] += b->cowners[i-1];
1236   }
1237   b->cstart = b->cowners[b->rank];
1238   b->cend   = b->cowners[b->rank+1];
1239 
1240   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1241   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1242   PLogObjectParent(B,b->A);
1243   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1244   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1245   PLogObjectParent(B,b->B);
1246 
1247   /* build cache for off array entries formed */
1248   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1249   b->colmap      = 0;
1250   b->garray      = 0;
1251   b->roworiented = 1;
1252 
1253   /* stuff used for matrix vector multiply */
1254   b->lvec      = 0;
1255   b->Mvctx     = 0;
1256 
1257   /* stuff for MatGetRow() */
1258   b->rowindices   = 0;
1259   b->rowvalues    = 0;
1260   b->getrowactive = PETSC_FALSE;
1261 
1262   *A = B;
1263   return 0;
1264 }
static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
{
  /*
     Duplicates an assembled MPIBAIJ matrix: builds a new header, copies
     all size/ownership information, deep-copies the colmap and garray
     tables, duplicates the multiply work vector and scatter context,
     and copies both sequential blocks via MatConvert(...,MATSAME,...).

     NOTE(review): cpvalues is not examined in this routine; the values
     appear to always be copied by the MatConvert calls -- confirm
     whether a values-not-copied mode was intended.
  */
  Mat        mat;
  Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
  int        ierr, len=0, flg;

  *newmat       = 0;
  PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy    = MatDestroy_MPIBAIJ;
  mat->view       = MatView_MPIBAIJ;
  mat->factor     = matin->factor;
  mat->assembled  = PETSC_TRUE;

  /* copy the point and block dimensions */
  a->m = mat->m   = oldmat->m;
  a->n = mat->n   = oldmat->n;
  a->M = mat->M   = oldmat->M;
  a->N = mat->N   = oldmat->N;

  a->bs  = oldmat->bs;
  a->bs2 = oldmat->bs2;
  a->mbs = oldmat->mbs;
  a->nbs = oldmat->nbs;
  a->Mbs = oldmat->Mbs;
  a->Nbs = oldmat->Nbs;

  a->rstart       = oldmat->rstart;
  a->rend         = oldmat->rend;
  a->cstart       = oldmat->cstart;
  a->cend         = oldmat->cend;
  a->size         = oldmat->size;
  a->rank         = oldmat->rank;
  a->insertmode   = NOT_SET_VALUES;
  a->rowvalues    = 0;
  a->getrowactive = PETSC_FALSE;

  /* rowners and cowners live in one allocation; copy both at once */
  a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
  PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
  a->cowners = a->rowners + a->size + 2;
  PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
  ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
  if (oldmat->colmap) {
    a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
    PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
    PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
  } else a->colmap = 0;
  /* garray length = number of block columns in the off-diagonal part */
  if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
    a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
    PLogObjectMemory(mat,len*sizeof(int));
    PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
  } else a->garray = 0;

  ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
  PLogObjectParent(mat,a->lvec);
  ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
  PLogObjectParent(mat,a->Mvctx);
  ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);
  ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
  PLogObjectParent(mat,a->B);
  ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
  if (flg) {
    ierr = MatPrintHelp(mat); CHKERRQ(ierr);
  }
  *newmat = mat;
  return 0;
}
1334 
1335 #include "sys.h"
1336 
int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  /*
     Loads a matrix from a PETSc binary viewer into a parallel MPIBAIJ
     matrix.  Process 0 reads the header, row lengths, column indices
     and numerical values and ships each process its share; every
     process then preallocates with exact diagonal/off-diagonal block
     counts and inserts its own rows.

     The block size is taken from the -matload_block_size option
     (default 1).  If the global size is not divisible by the block
     size the matrix is padded with extra rows/columns carrying 1.0 on
     the diagonal.
  */
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*buf;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
  int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
  int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
  int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
  int          dcount,kmax,k,nzcount,tmp;


  ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
  bs2  = bs*bs;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  /* only process 0 touches the file; everything else arrives by message */
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];

  /* NOTE(review): message names MatLoad_SeqBAIJ; presumably a copy/paste slip */
  if (M != N) SETERRQ(1,"MatLoad_SeqBAIJ:Can only do square matrices");

  /*
     This code adds extra rows to make sure the number of rows is
     divisible by the blocksize
  */
  Mbs        = M/bs;
  extra_rows = bs - M + bs*(Mbs);
  if (extra_rows == bs) extra_rows = 0;
  else                  Mbs++;
  if (extra_rows &&!rank) {
    PLogInfo(0,"MatLoad_SeqBAIJ:Padding loaded matrix to match blocksize\n");
  }

  /* determine ownership of all rows */
  mbs = Mbs/size + ((Mbs % size) > rank);
  m   = mbs * bs;
  /* rowners[]  = block-row boundaries; browners[] = the same in point rows */
  rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
  browners = rowners + size + 1;
  MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
  for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
    /* each padding row holds a single (diagonal) entry */
    for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
    MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices  */
    nz = procsnz[0];
    ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* on one process the padding entries are not in the file; read fewer
       and synthesize the padding columns afterwards */
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
    if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }

    /* read in every ones (except the last) and ship off */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    /* read in the stuff for the last proc */
    if ( size != 1 ) {
      nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
      MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += locrowlens[i];
    }
    ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
    mycols = ibuf;
    /* receive message of column indices*/
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries */
  dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
  odlens = dlens + (rend-rstart);
  /* mask[] marks block columns already seen in the current block row;
     masked1[]/masked2[] remember which entries to clear afterwards */
  mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
  PetscMemzero(mask,3*Mbs*sizeof(int));
  masked1 = mask    + Mbs;
  masked2 = masked1 + Mbs;
  rowcount = 0; nzcount = 0;
  for ( i=0; i<mbs; i++ ) {
    dcount  = 0;
    odcount = 0;
    for ( j=0; j<bs; j++ ) {
      kmax = locrowlens[rowcount];
      for ( k=0; k<kmax; k++ ) {
        /* convert point column to block column */
        tmp = mycols[nzcount++]/bs;
        if (!mask[tmp]) {
          mask[tmp] = 1;
          if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
          else masked1[dcount++] = tmp;
        }
      }
      rowcount++;
    }

    dlens[i]  = dcount;
    odlens[i] = odcount;

    /* zero out the mask elements we set */
    for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
    for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
  }

  /* create our matrix with exact preallocation */
  ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
         CHKERRQ(ierr);
  A = *newmat;
  MatSetOption(A,MAT_COLUMNS_SORTED);

  if (!rank) {
    buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    vals = buf;
    mycols = ibuf;
    if (size == 1)  nz -= extra_rows;
    ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
    /* padding rows get 1.0 on the diagonal */
    if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
    /* read in other processors (except the last one) and ship out */
    for ( i=1; i<size-1; i++ ) {
      nz = procsnz[i];
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    /* the last proc (note: i == size-1 here, left over from the loop) */
    if ( size != 1 ){
      nz = procsnz[i] - extra_rows;
      vals = buf;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
      MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);

    /* receive message of values*/
    vals = buf;
    mycols = ibuf;
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");

    /* insert into matrix */
    jj      = rstart*bs;
    for ( i=0; i<m; i++ ) {
      ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
      mycols += locrowlens[i];
      vals   += locrowlens[i];
      jj++;
    }
  }
  PetscFree(locrowlens);
  PetscFree(buf);
  PetscFree(ibuf);
  PetscFree(rowners);
  PetscFree(dlens);
  PetscFree(mask);
  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}
1563 
1564 
1565