xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 70f55243aafb320636e2a54ff30cab5d1e8d3d7b)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.20 1996/08/06 16:51:30 balay Exp bsmith $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 /* local utility routine that creates a mapping from the global column
25 number to the local number in the off-diagonal part of the local
26 storage of the matrix.  This is done in a non-scalable way, since the
27 length of colmap equals the number of global block columns.
28 */
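/* In other words (a reading of the routine below, not a separate contract):
   after it runs, colmap[g] == 0 means global block column g has no entry in
   the off-diagonal part on this process, while a nonzero value means the
   local block column index is colmap[g] - 1 (the +1 shift lets 0 double as
   the "not present" marker). */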
29 static int CreateColmap_MPIBAIJ_Private(Mat mat)
30 {
31   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
32   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
33   int        nbs = B->nbs,i;
34 
35   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
36   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
37   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
38   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i + 1;
39   return 0;
40 }
41 
42 
43 static int MatGetReordering_MPIBAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
44 {
45   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
46   int         ierr;
47   if (baij->size == 1) {
48     ierr = MatGetReordering(baij->A,type,rperm,cperm); CHKERRQ(ierr);
49   } else SETERRQ(1,"MatGetReordering_MPIBAIJ:not supported in parallel");
50   return 0;
51 }
52 
53 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
54 {
55   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
56   Scalar      value;
57   int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
58   int         cstart = baij->cstart, cend = baij->cend,row,col;
59   int         roworiented = baij->roworiented,rstart_orig,rend_orig;
60   int         cstart_orig,cend_orig,bs=baij->bs;
61 
62   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
63     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
64   }
65   baij->insertmode = addv;
66   rstart_orig = rstart*bs;
67   rend_orig   = rend*bs;
68   cstart_orig = cstart*bs;
69   cend_orig   = cend*bs;
70   for ( i=0; i<m; i++ ) {
71     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
72     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
73     if (im[i] >= rstart_orig && im[i] < rend_orig) {
74       row = im[i] - rstart_orig;
75       for ( j=0; j<n; j++ ) {
76         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");
77         if (in[j] >= baij->N) SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");
78         if (in[j] >= cstart_orig && in[j] < cend_orig){
79           col = in[j] - cstart_orig;
80           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
81           ierr = MatSetValues(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
82         }
83         else {
84           if (mat->was_assembled) {
85             if (!baij->colmap) {
86               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
87             }
88           col = (baij->colmap[in[j]/bs] - 1)*bs + in[j]%bs;
89             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
90               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
91               col =  in[j];
92             }
93           }
94           else col = in[j];
95           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
96           ierr = MatSetValues(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
97         }
98       }
99     }
100     else {
101       if (roworiented) {
102         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
103       }
104       else {
105         row = im[i];
106         for ( j=0; j<n; j++ ) {
107           ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
108         }
109       }
110     }
111   }
112   return 0;
113 }
114 
115 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
116 {
117   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
118   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
119   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
120 
121   for ( i=0; i<m; i++ ) {
122     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
123     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
124     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
125       row = idxm[i] - bsrstart;
126       for ( j=0; j<n; j++ ) {
127         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
128         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
129         if (idxn[j] >= bscstart && idxn[j] < bscend){
130           col = idxn[j] - bscstart;
131           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
132         }
133         else {
134           if (!baij->colmap) {
135             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
136           }
137           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
138              (baij->garray[baij->colmap[idxn[j]/bs]-1] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
139           else {
140             col  = (baij->colmap[idxn[j]/bs]-1)*bs + idxn[j]%bs;
141             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
142           }
143         }
144       }
145     }
146     else {
147       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
148     }
149   }
150   return 0;
151 }
152 
153 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
154 {
155   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
156   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
157   int        ierr, i,bs2=baij->bs2;
158   double     sum = 0.0;
159   Scalar     *v;
160 
161   if (baij->size == 1) {
162     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
163   } else {
164     if (type == NORM_FROBENIUS) {
165       v = amat->a;
166       for (i=0; i<amat->nz*bs2; i++ ) {
167 #if defined(PETSC_COMPLEX)
168         sum += real(conj(*v)*(*v)); v++;
169 #else
170         sum += (*v)*(*v); v++;
171 #endif
172       }
173       v = bmat->a;
174       for (i=0; i<bmat->nz*bs2; i++ ) {
175 #if defined(PETSC_COMPLEX)
176         sum += real(conj(*v)*(*v)); v++;
177 #else
178         sum += (*v)*(*v); v++;
179 #endif
180       }
181       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
182       *norm = sqrt(*norm);
183     }
184     else
185       SETERRQ(1,"MatNorm_MPIBAIJ:No support for this norm yet");
186   }
187   return 0;
188 }
189 
190 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
191 {
192   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
193   MPI_Comm    comm = mat->comm;
194   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
195   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
196   MPI_Request *send_waits,*recv_waits;
197   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
198   InsertMode  addv;
199   Scalar      *rvalues,*svalues;
200 
201   /* make sure all processors are in the same mode (INSERT_VALUES or ADD_VALUES) */
202   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
203   if (addv == (ADD_VALUES|INSERT_VALUES)) {
204     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
205   }
206   baij->insertmode = addv; /* in case this processor had no cache */
207 
208   /*  first count number of contributors to each processor */
209   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
210   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
211   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
212   for ( i=0; i<baij->stash.n; i++ ) {
213     idx = baij->stash.idx[i];
214     for ( j=0; j<size; j++ ) {
215       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
216         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
217       }
218     }
219   }
220   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
221 
222   /* inform other processors of number of messages and max length*/
223   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
224   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
225   nreceives = work[rank];
226   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
227   nmax = work[rank];
228   PetscFree(work);
229 
230   /* post receives:
231        1) each message will consist of triples
232      (global row index, global column index, value); we store the indices
233      as Scalars to simplify the message passing.
234        2) since we don't know how long each individual message is, we
235      allocate the largest needed buffer for each receive. Potentially
236      this is a lot of wasted space.
237 
238 
239        This could be done better.
240   */
241   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
242   CHKPTRQ(rvalues);
243   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
244   CHKPTRQ(recv_waits);
245   for ( i=0; i<nreceives; i++ ) {
246     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
247               comm,recv_waits+i);
248   }
249 
250   /* do sends:
251       1) starts[i] gives the starting index in svalues for stuff going to
252          the ith processor
253   */
254   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
255   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
256   CHKPTRQ(send_waits);
257   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
258   starts[0] = 0;
259   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
260   for ( i=0; i<baij->stash.n; i++ ) {
261     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
262     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
263     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
264   }
265   PetscFree(owner);
266   starts[0] = 0;
267   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
268   count = 0;
269   for ( i=0; i<size; i++ ) {
270     if (procs[i]) {
271       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
272                 comm,send_waits+count++);
273     }
274   }
275   PetscFree(starts); PetscFree(nprocs);
276 
277   /* Free cache space */
278   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
279   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
280 
281   baij->svalues    = svalues;    baij->rvalues    = rvalues;
282   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
283   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
284   baij->rmax       = nmax;
285 
286   return 0;
287 }
288 
289 
290 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
291 {
292   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
293   MPI_Status  *send_status,recv_status;
294   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
295   int         bs=baij->bs,row,col,other_disassembled;
296   Scalar      *values,val;
297   InsertMode  addv = baij->insertmode;
298 
299   /*  wait on receives */
300   while (count) {
301     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
302     /* unpack receives into our local space */
303     values = baij->rvalues + 3*imdex*baij->rmax;
304     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
305     n = n/3;
306     for ( i=0; i<n; i++ ) {
307       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
308       col = (int) PetscReal(values[3*i+1]);
309       val = values[3*i+2];
310       if (col >= baij->cstart*bs && col < baij->cend*bs) {
311         col -= baij->cstart*bs;
312         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
313       }
314       else {
315         if (mat->was_assembled) {
316           if (!baij->colmap) {
317             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
318           }
319           col = (baij->colmap[col/bs]-1)*bs + col%bs;
320           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
321             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
322             col = (int) PetscReal(values[3*i+1]);
323           }
324         }
325         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
326       }
327     }
328     count--;
329   }
330   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
331 
332   /* wait on sends */
333   if (baij->nsends) {
334     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
335     CHKPTRQ(send_status);
336     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
337     PetscFree(send_status);
338   }
339   PetscFree(baij->send_waits); PetscFree(baij->svalues);
340 
341   baij->insertmode = NOT_SET_VALUES;
342   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
343   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
344 
345   /* determine if any processor has disassembled; if so, we must
346      also disassemble ourselves, in order that we may reassemble. */
347   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
348   if (mat->was_assembled && !other_disassembled) {
349     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
350   }
351 
352   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
353     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
354   }
355   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
356   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
357 
358   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
359   return 0;
360 }
361 
362 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
363 {
364   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
365   int          ierr;
366 
367   if (baij->size == 1) {
368     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
369   }
370   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
371   return 0;
372 }
373 
374 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
375 {
376   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
377   int          ierr, format,rank,bs=baij->bs;
378   FILE         *fd;
379   ViewerType   vtype;
380 
381   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
382   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
383     ierr = ViewerGetFormat(viewer,&format);
384     if (format == ASCII_FORMAT_INFO_DETAILED) {
385       int nz, nzalloc, mem;
386       MPI_Comm_rank(mat->comm,&rank);
387       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
388       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
389       PetscSequentialPhaseBegin(mat->comm,1);
390       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
391               rank,baij->m,nz*bs,nzalloc*bs,baij->bs,mem);
392       ierr = MatGetInfo(baij->A,MAT_LOCAL,&nz,&nzalloc,&mem);
393       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz*bs);
394       ierr = MatGetInfo(baij->B,MAT_LOCAL,&nz,&nzalloc,&mem);
395       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz*bs);
396       fflush(fd);
397       PetscSequentialPhaseEnd(mat->comm,1);
398       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
399       return 0;
400     }
401     else if (format == ASCII_FORMAT_INFO) {
402       return 0;
403     }
404   }
405 
406   if (vtype == DRAW_VIEWER) {
407     Draw       draw;
408     PetscTruth isnull;
409     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
410     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
411   }
412 
413   if (vtype == ASCII_FILE_VIEWER) {
414     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
415     PetscSequentialPhaseBegin(mat->comm,1);
416     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
417            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
418             baij->cstart*bs,baij->cend*bs);
419     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
420     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
421     fflush(fd);
422     PetscSequentialPhaseEnd(mat->comm,1);
423   }
424   else {
425     int size = baij->size;
426     rank = baij->rank;
427     if (size == 1) {
428       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
429     }
430     else {
431       /* assemble the entire matrix onto first processor. */
432       Mat         A;
433       Mat_SeqBAIJ *Aloc;
434       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
435       int         mbs=baij->mbs;
436       Scalar      *a;
437 
438       if (!rank) {
439         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
440         CHKERRQ(ierr);
441       }
442       else {
443         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
444         CHKERRQ(ierr);
445       }
446       PLogObjectParent(mat,A);
447 
448       /* copy over the A part */
449       Aloc = (Mat_SeqBAIJ*) baij->A->data;
450       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
451       row = baij->rstart;
452       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
453 
454       for ( i=0; i<mbs; i++ ) {
455         rvals[0] = bs*(baij->rstart + i);
456         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
457         for ( j=ai[i]; j<ai[i+1]; j++ ) {
458           col = (baij->cstart+aj[j])*bs;
459           for (k=0; k<bs; k++ ) {
460             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
461             col++; a += bs;
462           }
463         }
464       }
465       /* copy over the B part */
466       Aloc = (Mat_SeqBAIJ*) baij->B->data;
467       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
468       row = baij->rstart*bs;
469       for ( i=0; i<mbs; i++ ) {
470         rvals[0] = bs*(baij->rstart + i);
471         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
472         for ( j=ai[i]; j<ai[i+1]; j++ ) {
473           col = baij->garray[aj[j]]*bs;
474           for (k=0; k<bs; k++ ) {
475             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
476             col++; a += bs;
477           }
478         }
479       }
480       PetscFree(rvals);
481       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
482       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
483       if (!rank) {
484         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
485       }
486       ierr = MatDestroy(A); CHKERRQ(ierr);
487     }
488   }
489   return 0;
490 }
491 
492 
493 
494 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
495 {
496   Mat         mat = (Mat) obj;
497   int         ierr;
498   ViewerType  vtype;
499 
500   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
501   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
502       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
503     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
504   }
505   else if (vtype == BINARY_FILE_VIEWER) {
506     return MatView_MPIBAIJ_Binary(mat,viewer);
507   }
508   return 0;
509 }
510 
511 static int MatDestroy_MPIBAIJ(PetscObject obj)
512 {
513   Mat         mat = (Mat) obj;
514   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
515   int         ierr;
516 
517 #if defined(PETSC_LOG)
518   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
519 #endif
520 
521   PetscFree(baij->rowners);
522   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
523   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
524   if (baij->colmap) PetscFree(baij->colmap);
525   if (baij->garray) PetscFree(baij->garray);
526   if (baij->lvec)   VecDestroy(baij->lvec);
527   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
528   if (baij->rowvalues) PetscFree(baij->rowvalues);
529   PetscFree(baij);
530   PLogObjectDestroy(mat);
531   PetscHeaderDestroy(mat);
532   return 0;
533 }
534 
535 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
536 {
537   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
538   int         ierr, nt;
539 
540   VecGetLocalSize_Fast(xx,nt);
541   if (nt != a->n) {
542     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and xx");
543   }
544   VecGetLocalSize_Fast(yy,nt);
545   if (nt != a->m) {
546     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and yy");
547   }
548   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
549   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
550   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
551   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
552   return 0;
553 }
554 
555 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
556 {
557   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
558   int        ierr;
559   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
560   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
561   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
562   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
563   return 0;
564 }
565 
566 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
567 {
568   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
569   int        ierr;
570 
571   /* do nondiagonal part */
572   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
573   /* send it on its way */
574   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
575                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
576   /* do local part */
577   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
578   /* receive remote parts: note this assumes the values are not actually */
579   /* inserted in yy until the next line, which is true for my implementation */
580   /* but perhaps not always true. */
581   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
582                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
583   return 0;
584 }
585 
586 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
587 {
588   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
589   int        ierr;
590 
591   /* do nondiagonal part */
592   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
593   /* send it on its way */
594   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
595                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
596   /* do local part */
597   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
598   /* receive remote parts: note this assumes the values are not actually */
599   /* inserted in zz until the next line, which is true for my implementation */
600   /* but perhaps not always true. */
601   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
602                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
603   return 0;
604 }
605 
606 /*
607   This only works correctly for square matrices where the subblock A->A is the
608    diagonal block
609 */
610 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
611 {
612   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
613   if (a->M != a->N)
614     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
615   return MatGetDiagonal(a->A,v);
616 }
617 
618 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
619 {
620   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
621   int        ierr;
622   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
623   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
624   return 0;
625 }
626 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
627 {
628   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
629   *m = mat->M; *n = mat->N;
630   return 0;
631 }
632 
633 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
634 {
635   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
636   *m = mat->m; *n = mat->n;
637   return 0;
638 }
639 
640 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
641 {
642   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
643   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
644   return 0;
645 }
646 
647 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
648 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
649 
650 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
651 {
652   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
653   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
654   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
655   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
656   int        *cmap, *idx_p,cstart = mat->cstart;
657 
658   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
659   mat->getrowactive = PETSC_TRUE;
660 
661   if (!mat->rowvalues && (idx || v)) {
662     /*
663         allocate enough space to hold information from the longest row.
664     */
665     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
666     int     max = 1,mbs = mat->mbs,tmp;
667     for ( i=0; i<mbs; i++ ) {
668       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
669       if (max < tmp) { max = tmp; }
670     }
671     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
672     CHKPTRQ(mat->rowvalues);
673     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
674   }
675 
676 
677   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows")
678   lrow = row - brstart;
679 
680   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
681   if (!v)   {pvA = 0; pvB = 0;}
682   if (!idx) {pcA = 0; if (!v) pcB = 0;}
683   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
684   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
685   nztot = nzA + nzB;
686 
687   cmap  = mat->garray;
688   if (v  || idx) {
689     if (nztot) {
690       /* Sort by increasing column numbers, assuming A and B already sorted */
691       int imark = -1;
692       if (v) {
693         *v = v_p = mat->rowvalues;
694         for ( i=0; i<nzB; i++ ) {
695           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
696           else break;
697         }
698         imark = i;
699         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
700         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
701       }
702       if (idx) {
703         *idx = idx_p = mat->rowindices;
704         if (imark > -1) {
705           for ( i=0; i<imark; i++ ) {
706             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
707           }
708         } else {
709           for ( i=0; i<nzB; i++ ) {
710             if (cmap[cworkB[i]/bs] < cstart)
711               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
712             else break;
713           }
714           imark = i;
715         }
716         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
717         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
718       }
719     }
720     else {
721       if (idx) *idx = 0;
722       if (v)   *v   = 0;
723     }
724   }
725   *nz = nztot;
726   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
727   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
728   return 0;
729 }
730 
731 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
732 {
733   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
734   if (baij->getrowactive == PETSC_FALSE) {
735     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
736   }
737   baij->getrowactive = PETSC_FALSE;
738   return 0;
739 }
740 
741 static int MatGetBlockSize_MPIBAIJ(Mat mat, int *bs)
742 {
743   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
744   *bs = baij->bs;
745   return 0;
746 }
747 
748 static int MatZeroEntries_MPIBAIJ(Mat A)
749 {
750   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
751   int         ierr;
752   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
753   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
754   return 0;
755 }
756 
757 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,int *nz,
758                              int *nzalloc,int *mem)
759 {
760   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
761   Mat         A = mat->A, B = mat->B;
762   int         ierr, isend[3], irecv[3], nzA, nzallocA, memA;
763 
764   ierr = MatGetInfo(A,MAT_LOCAL,&nzA,&nzallocA,&memA); CHKERRQ(ierr);
765   ierr = MatGetInfo(B,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
766   isend[0] += nzA; isend[1] += nzallocA; isend[2] += memA;
767   if (flag == MAT_LOCAL) {
768     if (nz)       *nz      = isend[0];
769     if (nzalloc)  *nzalloc = isend[1];
770     if (mem)      *mem     = isend[2];
771   } else if (flag == MAT_GLOBAL_MAX) {
772     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_MAX,matin->comm);
773     if (nz)      *nz      = irecv[0];
774     if (nzalloc) *nzalloc = irecv[1];
775     if (mem)     *mem     = irecv[2];
776   } else if (flag == MAT_GLOBAL_SUM) {
777     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_SUM,matin->comm);
778     if (nz)      *nz      = irecv[0];
779     if (nzalloc) *nzalloc = irecv[1];
780     if (mem)     *mem     = irecv[2];
781   }
782   return 0;
783 }
784 
785 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
786 {
787   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
788 
789   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
790       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
791       op == MAT_COLUMNS_SORTED ||
792       op == MAT_ROW_ORIENTED) {
793         MatSetOption(a->A,op);
794         MatSetOption(a->B,op);
795   }
796   else if (op == MAT_ROWS_SORTED ||
797            op == MAT_SYMMETRIC ||
798            op == MAT_STRUCTURALLY_SYMMETRIC ||
799            op == MAT_YES_NEW_DIAGONALS)
800     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
801   else if (op == MAT_COLUMN_ORIENTED) {
802     a->roworiented = 0;
803     MatSetOption(a->A,op);
804     MatSetOption(a->B,op);
805   }
806   else if (op == MAT_NO_NEW_DIAGONALS)
807     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:MAT_NO_NEW_DIAGONALS");}
808   else
809     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:unknown option");}
810   return 0;
811 }
812 
813 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
814 {
815   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
816   Mat_SeqBAIJ *Aloc;
817   Mat        B;
818   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
819   int        bs=baij->bs,mbs=baij->mbs;
820   Scalar     *a;
821 
822   if (matout == PETSC_NULL && M != N)
823     SETERRQ(1,"MatTranspose_MPIBAIJ:Square matrix only for in-place");
824   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
825   CHKERRQ(ierr);
826 
827   /* copy over the A part */
828   Aloc = (Mat_SeqBAIJ*) baij->A->data;
829   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
830   row = baij->rstart;
831   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
832 
833   for ( i=0; i<mbs; i++ ) {
834     rvals[0] = bs*(baij->rstart + i);
835     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
836     for ( j=ai[i]; j<ai[i+1]; j++ ) {
837       col = (baij->cstart+aj[j])*bs;
838       for (k=0; k<bs; k++ ) {
839         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
840         col++; a += bs;
841       }
842     }
843   }
844   /* copy over the B part */
845   Aloc = (Mat_SeqBAIJ*) baij->B->data;
846   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
847   row = baij->rstart*bs;
848   for ( i=0; i<mbs; i++ ) {
849     rvals[0] = bs*(baij->rstart + i);
850     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
851     for ( j=ai[i]; j<ai[i+1]; j++ ) {
852       col = baij->garray[aj[j]]*bs;
853       for (k=0; k<bs; k++ ) {
854         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
855         col++; a += bs;
856       }
857     }
858   }
859   PetscFree(rvals);
860   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
861   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
862 
863   if (matout != PETSC_NULL) {
864     *matout = B;
865   } else {
866     /* This isn't really an in-place transpose .... but free data structures from baij */
867     PetscFree(baij->rowners);
868     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
869     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
870     if (baij->colmap) PetscFree(baij->colmap);
871     if (baij->garray) PetscFree(baij->garray);
872     if (baij->lvec) VecDestroy(baij->lvec);
873     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
874     PetscFree(baij);
875     PetscMemcpy(A,B,sizeof(struct _Mat));
876     PetscHeaderDestroy(B);
877   }
878   return 0;
879 }
880 /* the code does not do the diagonal entries correctly unless the
881    matrix is square and the column and row ownerships are identical.
882    This is a BUG. The only way to fix it seems to be to access
883    baij->A and baij->B directly and not through the MatZeroRows()
884    routine.
885 */
886 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
887 {
888   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
889   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
890   int            *procs,*nprocs,j,found,idx,nsends,*work;
891   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
892   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
893   int            *lens,imdex,*lrows,*values,bs=l->bs;
894   MPI_Comm       comm = A->comm;
895   MPI_Request    *send_waits,*recv_waits;
896   MPI_Status     recv_status,*send_status;
897   IS             istmp;
898 
899   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
900   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
901 
902   /*  first count number of contributors to each processor */
903   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
904   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
905   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
906   for ( i=0; i<N; i++ ) {
907     idx = rows[i];
908     found = 0;
909     for ( j=0; j<size; j++ ) {
910       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
911         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
912       }
913     }
914     if (!found) SETERRQ(1,"MatZeroRows_MPIBAIJ:Index out of range");
915   }
916   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
917 
918   /* inform other processors of number of messages and max length*/
919   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
920   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
921   nrecvs = work[rank];
922   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
923   nmax = work[rank];
924   PetscFree(work);
925 
926   /* post receives:   */
927   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
928   CHKPTRQ(rvalues);
929   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
930   CHKPTRQ(recv_waits);
931   for ( i=0; i<nrecvs; i++ ) {
932     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
933   }
934 
935   /* do sends:
936       1) starts[i] gives the starting index in svalues for stuff going to
937          the ith processor
938   */
939   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
940   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
941   CHKPTRQ(send_waits);
942   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
943   starts[0] = 0;
944   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
945   for ( i=0; i<N; i++ ) {
946     svalues[starts[owner[i]]++] = rows[i];
947   }
948   ISRestoreIndices(is,&rows);
949 
950   starts[0] = 0;
951   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
952   count = 0;
953   for ( i=0; i<size; i++ ) {
954     if (procs[i]) {
955       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
956     }
957   }
958   PetscFree(starts);
959 
960   base = owners[rank]*bs;
961 
962   /*  wait on receives */
963   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
964   source = lens + nrecvs;
965   count  = nrecvs; slen = 0;
966   while (count) {
967     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
968     /* unpack receives into our local space */
969     MPI_Get_count(&recv_status,MPI_INT,&n);
970     source[imdex]  = recv_status.MPI_SOURCE;
971     lens[imdex]  = n;
972     slen += n;
973     count--;
974   }
975   PetscFree(recv_waits);
976 
977   /* move the data into the send scatter */
978   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
979   count = 0;
980   for ( i=0; i<nrecvs; i++ ) {
981     values = rvalues + i*nmax;
982     for ( j=0; j<lens[i]; j++ ) {
983       lrows[count++] = values[j] - base;
984     }
985   }
986   PetscFree(rvalues); PetscFree(lens);
987   PetscFree(owner); PetscFree(nprocs);
988 
989   /* actually zap the local rows */
990   ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
991   PLogObjectParent(A,istmp);
992   PetscFree(lrows);
993   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
994   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
995   ierr = ISDestroy(istmp); CHKERRQ(ierr);
996 
997   /* wait on sends */
998   if (nsends) {
999     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1000     CHKPTRQ(send_status);
1001     MPI_Waitall(nsends,send_waits,send_status);
1002     PetscFree(send_status);
1003   }
1004   PetscFree(send_waits); PetscFree(svalues);
1005 
1006   return 0;
1007 }
1008 extern int MatPrintHelp_SeqBAIJ(Mat);
1009 static int MatPrintHelp_MPIBAIJ(Mat A)
1010 {
1011   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1012 
1013   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1014   else return 0;
1015 }
1016 
1017 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1018 
1019 /* -------------------------------------------------------------------*/
1020 static struct _MatOps MatOps = {
1021   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1022   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,MatSolve_MPIBAIJ,
1023   MatSolveAdd_MPIBAIJ,MatSolveTrans_MPIBAIJ,MatSolveTransAdd_MPIBAIJ,MatLUFactor_MPIBAIJ,
1024   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1025   0,MatGetDiagonal_MPIBAIJ,0,MatNorm_MPIBAIJ,
1026   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1027   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,MatGetReordering_MPIBAIJ,MatLUFactorSymbolic_MPIBAIJ,
1028   MatLUFactorNumeric_MPIBAIJ,0,0,MatGetSize_MPIBAIJ,
1029   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,MatILUFactorSymbolic_MPIBAIJ,0,
1030   0,0,0,MatConvertSameType_MPIBAIJ,0,0,
1031   0,0,0,MatGetSubMatrices_MPIBAIJ,
1032   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1033   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ};
1034 
1035 
1036 /*@C
1037    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1038    (block compressed row).  For good matrix assembly performance
1039    the user should preallocate the matrix storage by setting the parameters
1040    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1041    performance can be increased by more than a factor of 50.
1042 
1043    Input Parameters:
1044 .  comm - MPI communicator
1045 .  bs   - size of block
1046 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1047            This value should be the same as the local size used in creating the
1048            y vector for the matrix-vector product y = Ax.
1049 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1050            This value should be the same as the local size used in creating the
1051            x vector for the matrix-vector product y = Ax.
1052 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1053 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1054 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1055            submatrix  (same for all local rows)
1056 .  d_nnz - array containing the number of block nonzeros in the various block rows
1057            of the diagonal portion of the local submatrix (possibly different for each block
1058            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1059            it is zero.
1060 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1061            submatrix (same for all local rows).
1062 .  o_nnz - array containing the number of block nonzeros in the various block rows of the
1063            off-diagonal portion of the local submatrix (possibly different for
1064            each block row) or PETSC_NULL.
1065 
1066    Output Parameter:
1067 .  A - the matrix
1068 
1069    Notes:
1070    The user MUST specify either the local or global matrix dimensions
1071    (possibly both).
1072 
1073    Storage Information:
1074    For a square global matrix we define each processor's diagonal portion
1075    to be its local rows and the corresponding columns (a square submatrix);
1076    each processor's off-diagonal portion encompasses the remainder of the
1077    local matrix (a rectangular submatrix).
1078 
1079    The user can specify preallocated storage for the diagonal part of
1080    the local submatrix with either d_nz or d_nnz (not both).  Set
1081    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1082    memory allocation.  Likewise, specify preallocated storage for the
1083    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1084 
1085    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1086    the figure below we depict these three local rows and all columns (0-11).
1087 
1088 $          0 1 2 3 4 5 6 7 8 9 10 11
1089 $         -------------------
1090 $  row 3  |  o o o d d d o o o o o o
1091 $  row 4  |  o o o d d d o o o o o o
1092 $  row 5  |  o o o d d d o o o o o o
1093 $         -------------------
1094 $
1095 
1096    Thus, any entries in the d locations are stored in the d (diagonal)
1097    submatrix, and any entries in the o locations are stored in the
1098    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1099    stored simply in the MATSEQBAIJ format for compressed row storage.
1100 
1101    Now d_nz should indicate the number of block nonzeros per block row in the d matrix,
1102    and o_nz should indicate the number of block nonzeros per block row in the o matrix.
1103    In general, for PDE problems in which most nonzeros are near the diagonal,
1104    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1105    or you will get TERRIBLE performance; see the users' manual chapter on
1106    matrices and the file $(PETSC_DIR)/Performance.
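
   Example (an illustrative sketch only; the communicator, sizes, and
   preallocation figures below are arbitrary and not taken from an actual
   application):
$      Mat A;
$      int ierr;
$      /* 2x2 blocks, 100x100 global, local sizes chosen by PETSc, with an
$         assumed estimate of 5 diagonal and 2 off-diagonal block nonzeros
$         per block row */
$      ierr = MatCreateMPIBAIJ(MPI_COMM_WORLD,2,PETSC_DECIDE,PETSC_DECIDE,100,100,
$                              5,PETSC_NULL,2,PETSC_NULL,&A); CHKERRQ(ierr);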
1107 
1108 .keywords: matrix, block, aij, compressed row, sparse, parallel
1109 
1110 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1111 @*/
1112 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1113                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1114 {
1115   Mat          B;
1116   Mat_MPIBAIJ  *b;
1117   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE;
1118 
1119   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
1120   *A = 0;
1121   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1122   PLogObjectCreate(B);
1123   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1124   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1125   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1126   B->destroy    = MatDestroy_MPIBAIJ;
1127   B->view       = MatView_MPIBAIJ;
1128 
1129   B->factor     = 0;
1130   B->assembled  = PETSC_FALSE;
1131 
1132   b->insertmode = NOT_SET_VALUES;
1133   MPI_Comm_rank(comm,&b->rank);
1134   MPI_Comm_size(comm,&b->size);
1135 
1136   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1137     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1138   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
1139   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
1140   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1141   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1142 
1143   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1144     work[0] = m; work[1] = n;
1145     mbs = m/bs; nbs = n/bs;
1146     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1147     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1148     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1149   }
1150   if (m == PETSC_DECIDE) {
1151     Mbs = M/bs;
1152     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
1153     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1154     m   = mbs*bs;
1155   }
1156   if (n == PETSC_DECIDE) {
1157     Nbs = N/bs;
1158     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
1159     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1160     n   = nbs*bs;
1161   }
1162   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
1163 
1164   b->m = m; B->m = m;
1165   b->n = n; B->n = n;
1166   b->N = N; B->N = N;
1167   b->M = M; B->M = M;
1168   b->bs  = bs;
1169   b->bs2 = bs*bs;
1170   b->mbs = mbs;
1171   b->nbs = nbs;
1172   b->Mbs = Mbs;
1173   b->Nbs = Nbs;
1174 
1175   /* build local table of row and column ownerships */
1176   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1177   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1178   b->cowners = b->rowners + b->size + 2;
1179   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1180   b->rowners[0] = 0;
1181   for ( i=2; i<=b->size; i++ ) {
1182     b->rowners[i] += b->rowners[i-1];
1183   }
1184   b->rstart = b->rowners[b->rank];
1185   b->rend   = b->rowners[b->rank+1];
1186   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1187   b->cowners[0] = 0;
1188   for ( i=2; i<=b->size; i++ ) {
1189     b->cowners[i] += b->cowners[i-1];
1190   }
1191   b->cstart = b->cowners[b->rank];
1192   b->cend   = b->cowners[b->rank+1];
1193 
1194   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1195   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1196   PLogObjectParent(B,b->A);
1197   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1198   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1199   PLogObjectParent(B,b->B);
1200 
1201   /* build cache for off array entries formed */
1202   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1203   b->colmap      = 0;
1204   b->garray      = 0;
1205   b->roworiented = 1;
1206 
1207   /* stuff used for matrix vector multiply */
1208   b->lvec      = 0;
1209   b->Mvctx     = 0;
1210 
1211   /* stuff for MatGetRow() */
1212   b->rowindices   = 0;
1213   b->rowvalues    = 0;
1214   b->getrowactive = PETSC_FALSE;
1215 
1216   *A = B;
1217   return 0;
1218 }
1219 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1220 {
1221   Mat        mat;
1222   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1223   int        ierr, len=0, flg;
1224 
1225   *newmat       = 0;
1226   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1227   PLogObjectCreate(mat);
1228   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1229   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1230   mat->destroy    = MatDestroy_MPIBAIJ;
1231   mat->view       = MatView_MPIBAIJ;
1232   mat->factor     = matin->factor;
1233   mat->assembled  = PETSC_TRUE;
1234 
1235   a->m = mat->m   = oldmat->m;
1236   a->n = mat->n   = oldmat->n;
1237   a->M = mat->M   = oldmat->M;
1238   a->N = mat->N   = oldmat->N;
1239 
1240   a->bs  = oldmat->bs;
1241   a->bs2 = oldmat->bs2;
1242   a->mbs = oldmat->mbs;
1243   a->nbs = oldmat->nbs;
1244   a->Mbs = oldmat->Mbs;
1245   a->Nbs = oldmat->Nbs;
1246 
1247   a->rstart       = oldmat->rstart;
1248   a->rend         = oldmat->rend;
1249   a->cstart       = oldmat->cstart;
1250   a->cend         = oldmat->cend;
1251   a->size         = oldmat->size;
1252   a->rank         = oldmat->rank;
1253   a->insertmode   = NOT_SET_VALUES;
1254   a->rowvalues    = 0;
1255   a->getrowactive = PETSC_FALSE;
1256 
1257   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1258   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1259   a->cowners = a->rowners + a->size + 2;
1260   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1261   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1262   if (oldmat->colmap) {
1263     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1264     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1265     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1266   } else a->colmap = 0;
1267   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1268     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1269     PLogObjectMemory(mat,len*sizeof(int));
1270     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1271   } else a->garray = 0;
1272 
1273   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1274   PLogObjectParent(mat,a->lvec);
1275   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1276   PLogObjectParent(mat,a->Mvctx);
1277   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1278   PLogObjectParent(mat,a->A);
1279   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1280   PLogObjectParent(mat,a->B);
1281   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1282   if (flg) {
1283     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1284   }
1285   *newmat = mat;
1286   return 0;
1287 }
1288 
1289 #include "sys.h"
1290 
1291 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1292 {
1293   Mat          A;
1294   int          i, nz, ierr, j,rstart, rend, fd;
1295   Scalar       *vals,*buf;
1296   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1297   MPI_Status   status;
1298   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1299   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1300   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1301   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1302   int          dcount,kmax,k,nzcount,tmp;
1303 
1304 
1305   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1306   bs2  = bs*bs;
1307 
1308   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1309   if (!rank) {
1310     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1311     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1312     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
1313   }
1314 
1315   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1316   M = header[1]; N = header[2];
1317 
1318 
1319   if (M != N) SETERRQ(1,"MatLoad_MPIBAIJ:Can only do square matrices");
1320 
1321   /*
1322      This code adds extra rows to make sure the number of rows is
1323      divisible by the blocksize
1324   */
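  /* For example (numbers purely illustrative): M = 10 with bs = 4 gives
     Mbs = 2 and extra_rows = 4 - 10 + 8 = 2, so Mbs is bumped to 3 and the
     matrix is padded to 12 rows; if M were already divisible by bs,
     extra_rows would come out equal to bs and is then reset to 0. */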
1325   Mbs        = M/bs;
1326   extra_rows = bs - M + bs*(Mbs);
1327   if (extra_rows == bs) extra_rows = 0;
1328   else                  Mbs++;
1329   if (extra_rows && !rank) {
1330     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1331   }
1332   /* determine ownership of all rows */
1333   mbs = Mbs/size + ((Mbs % size) > rank);
1334   m   = mbs * bs;
1335   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1336   browners = rowners + size + 1;
1337   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1338   rowners[0] = 0;
1339   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1340   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1341   rstart = rowners[rank];
1342   rend   = rowners[rank+1];
1343 
1344   /* distribute row lengths to all processors */
1345   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1346   if (!rank) {
1347     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1348     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1349     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1350     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1351     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1352     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1353     PetscFree(sndcounts);
1354   }
1355   else {
1356     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1357   }
1358 
1359   if (!rank) {
1360     /* calculate the number of nonzeros on each processor */
1361     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1362     PetscMemzero(procsnz,size*sizeof(int));
1363     for ( i=0; i<size; i++ ) {
1364       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1365         procsnz[i] += rowlengths[j];
1366       }
1367     }
1368     PetscFree(rowlengths);
1369 
1370     /* determine max buffer needed and allocate it */
1371     maxnz = 0;
1372     for ( i=0; i<size; i++ ) {
1373       maxnz = PetscMax(maxnz,procsnz[i]);
1374     }
1375     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1376 
1377     /* read in my part of the matrix column indices  */
1378     nz = procsnz[0];
1379     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1380     mycols = ibuf;
1381     if (size == 1)  nz -= extra_rows;
1382     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1383     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1384 
1385     /* read in the parts for the other processors (except the last one) and ship them off */
1386     for ( i=1; i<size-1; i++ ) {
1387       nz = procsnz[i];
1388       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1389       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1390     }
1391     /* read in the stuff for the last proc */
1392     if ( size != 1 ) {
1393       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1394       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1395       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1396       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1397     }
1398     PetscFree(cols);
1399   }
1400   else {
1401     /* determine buffer space needed for message */
1402     nz = 0;
1403     for ( i=0; i<m; i++ ) {
1404       nz += locrowlens[i];
1405     }
1406     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1407     mycols = ibuf;
1408     /* receive message of column indices*/
1409     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1410     MPI_Get_count(&status,MPI_INT,&maxnz);
1411     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1412   }
1413 
1414   /* loop over local rows, determining number of off diagonal entries */
1415   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1416   odlens = dlens + (rend-rstart);
1417   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1418   PetscMemzero(mask,3*Mbs*sizeof(int));
1419   masked1 = mask    + Mbs;
1420   masked2 = masked1 + Mbs;
1421   rowcount = 0; nzcount = 0;
1422   for ( i=0; i<mbs; i++ ) {
1423     dcount  = 0;
1424     odcount = 0;
1425     for ( j=0; j<bs; j++ ) {
1426       kmax = locrowlens[rowcount];
1427       for ( k=0; k<kmax; k++ ) {
1428         tmp = mycols[nzcount++]/bs;
1429         if (!mask[tmp]) {
1430           mask[tmp] = 1;
1431           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1432           else masked1[dcount++] = tmp;
1433         }
1434       }
1435       rowcount++;
1436     }
1437 
1438     dlens[i]  = dcount;
1439     odlens[i] = odcount;
1440 
1441     /* zero out the mask elements we set */
1442     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1443     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1444   }
1445 
1446   /* create our matrix */
1447   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);CHKERRQ(ierr);
1448   A = *newmat;
1449   MatSetOption(A,MAT_COLUMNS_SORTED);
1450 
1451   if (!rank) {
1452     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1453     /* read in my part of the matrix numerical values  */
1454     nz = procsnz[0];
1455     vals = buf;
1456     mycols = ibuf;
1457     if (size == 1)  nz -= extra_rows;
1458     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1459     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1460     /* insert into matrix */
1461     jj      = rstart*bs;
1462     for ( i=0; i<m; i++ ) {
1463       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1464       mycols += locrowlens[i];
1465       vals   += locrowlens[i];
1466       jj++;
1467     }
1468     /* read in the other processors' parts (except the last one) and ship them out */
1469     for ( i=1; i<size-1; i++ ) {
1470       nz = procsnz[i];
1471       vals = buf;
1472       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1473       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1474     }
1475     /* the last proc */
1476     if ( size != 1 ){
1477       nz = procsnz[i] - extra_rows;
1478       vals = buf;
1479       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1480       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1481       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1482     }
1483     PetscFree(procsnz);
1484   }
1485   else {
1486     /* receive numeric values */
1487     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1488 
1489     /* receive message of values*/
1490     vals = buf;
1491     mycols = ibuf;
1492     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1493     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1494     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1495 
1496     /* insert into matrix */
1497     jj      = rstart*bs;
1498     for ( i=0; i<m; i++ ) {
1499       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1500       mycols += locrowlens[i];
1501       vals   += locrowlens[i];
1502       jj++;
1503     }
1504   }
1505   PetscFree(locrowlens);
1506   PetscFree(buf);
1507   PetscFree(ibuf);
1508   PetscFree(rowners);
1509   PetscFree(dlens);
1510   PetscFree(mask);
1511   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1512   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1513   return 0;
1514 }
1515 
1516 
1517