xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 905e6a2f0f49659307fca631a274f7b99e9427ee)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.18 1996/08/04 23:12:57 bsmith Exp bsmith $";
3 #endif
4 
5 #include "mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 /* local utility routine that creates a mapping from the global column
25 number to the local number in the off-diagonal part of the local
26 storage of the matrix.  This is done in a non scable way since the
27 length of colmap equals the global matrix length.
28 */
29 static int CreateColmap_MPIBAIJ_Private(Mat mat)
30 {
31   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
32   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
33   int        nbs = B->nbs,i;
34 
35   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
36   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
37   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
38   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i + 1;
39   return 0;
40 }
41 
42 
43 static int MatGetReordering_MPIBAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
44 {
45   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
46   int         ierr;
47   if (baij->size == 1) {
48     ierr = MatGetReordering(baij->A,type,rperm,cperm); CHKERRQ(ierr);
49   } else SETERRQ(1,"MatGetReordering_MPIBAIJ:not supported in parallel");
50   return 0;
51 }
52 
53 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
54 {
55   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
56   Scalar      value;
57   int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
58   int         cstart = baij->cstart, cend = baij->cend,row,col;
59   int         roworiented = baij->roworiented,rstart_orig,rend_orig;
60   int         cstart_orig,cend_orig,bs=baij->bs;
61 
62   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
63     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
64   }
65   baij->insertmode = addv;
66   rstart_orig = rstart*bs;
67   rend_orig   = rend*bs;
68   cstart_orig = cstart*bs;
69   cend_orig   = cend*bs;
70   for ( i=0; i<m; i++ ) {
71     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
72     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
73     if (im[i] >= rstart_orig && im[i] < rend_orig) {
74       row = im[i] - rstart_orig;
75       for ( j=0; j<n; j++ ) {
76         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");
77         if (in[j] >= baij->N) SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");
78         if (in[j] >= cstart_orig && in[j] < cend_orig){
79           col = in[j] - cstart_orig;
80           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
81           ierr = MatSetValues(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
82         }
83         else {
84           if (mat->was_assembled) {
85             if (!baij->colmap) {
86               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
87             }
88             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
89             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
90               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
91               col =  in[j];
92             }
93           }
94           else col = in[j];
95           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
96           ierr = MatSetValues(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
97         }
98       }
99     }
100     else {
101       if (roworiented) {
102         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
103       }
104       else {
105         row = im[i];
106         for ( j=0; j<n; j++ ) {
107           ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
108         }
109       }
110     }
111   }
112   return 0;
113 }
114 
115 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
116 {
117   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
118   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
119   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
120 
121   for ( i=0; i<m; i++ ) {
122     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
123     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
124     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
125       row = idxm[i] - bsrstart;
126       for ( j=0; j<n; j++ ) {
127         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
128         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
129         if (idxn[j] >= bscstart && idxn[j] < bscend){
130           col = idxn[j] - bscstart;
131           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
132         }
133         else {
134           if (!baij->colmap) {
135             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
136           }
137           if (baij->garray[baij->colmap[idxn[j]/bs]-1] != idxn[j]/bs) *(v+i*n+j) = 0.0;
138           else {
139             col  = (baij->colmap[idxn[j]/bs]-1)*bs + idxn[j]%bs;
140             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
141           }
142         }
143       }
144     }
145     else {
146       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
147     }
148   }
149   return 0;
150 }
151 
152 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
153 {
154   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
155   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
156   int        ierr, i,bs2=baij->bs2;
157   double     sum = 0.0;
158   Scalar     *v;
159 
160   if (baij->size == 1) {
161     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
162   } else {
163     if (type == NORM_FROBENIUS) {
164       v = amat->a;
165       for (i=0; i<amat->nz*bs2; i++ ) {
166 #if defined(PETSC_COMPLEX)
167         sum += real(conj(*v)*(*v)); v++;
168 #else
169         sum += (*v)*(*v); v++;
170 #endif
171       }
172       v = bmat->a;
173       for (i=0; i<bmat->nz*bs2; i++ ) {
174 #if defined(PETSC_COMPLEX)
175         sum += real(conj(*v)*(*v)); v++;
176 #else
177         sum += (*v)*(*v); v++;
178 #endif
179       }
180       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
181       *norm = sqrt(*norm);
182     }
183     else
184       SETERRQ(1,"MatNorm_SeqBAIJ:No support for this norm yet");
185   }
186   return 0;
187 }
188 
189 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
190 {
191   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
192   MPI_Comm    comm = mat->comm;
193   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
194   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
195   MPI_Request *send_waits,*recv_waits;
196   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
197   InsertMode  addv;
198   Scalar      *rvalues,*svalues;
199 
200   /* make sure all processors are either in INSERTMODE or ADDMODE */
201   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
202   if (addv == (ADD_VALUES|INSERT_VALUES)) {
203     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
204   }
205   baij->insertmode = addv; /* in case this processor had no cache */
206 
207   /*  first count number of contributors to each processor */
208   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
209   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
210   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
211   for ( i=0; i<baij->stash.n; i++ ) {
212     idx = baij->stash.idx[i];
213     for ( j=0; j<size; j++ ) {
214       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
215         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
216       }
217     }
218   }
219   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
220 
221   /* inform other processors of number of messages and max length*/
222   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
223   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
224   nreceives = work[rank];
225   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
226   nmax = work[rank];
227   PetscFree(work);
228 
229   /* post receives:
230        1) each message will consist of ordered pairs
231      (global index,value) we store the global index as a double
232      to simplify the message passing.
233        2) since we don't know how long each individual message is we
234      allocate the largest needed buffer for each receive. Potentially
235      this is a lot of wasted space.
236 
237 
238        This could be done better.
239   */
240   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
241   CHKPTRQ(rvalues);
242   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
243   CHKPTRQ(recv_waits);
244   for ( i=0; i<nreceives; i++ ) {
245     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
246               comm,recv_waits+i);
247   }
248 
249   /* do sends:
250       1) starts[i] gives the starting index in svalues for stuff going to
251          the ith processor
252   */
253   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
254   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
255   CHKPTRQ(send_waits);
256   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
257   starts[0] = 0;
258   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
259   for ( i=0; i<baij->stash.n; i++ ) {
260     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
261     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
262     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
263   }
264   PetscFree(owner);
265   starts[0] = 0;
266   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
267   count = 0;
268   for ( i=0; i<size; i++ ) {
269     if (procs[i]) {
270       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
271                 comm,send_waits+count++);
272     }
273   }
274   PetscFree(starts); PetscFree(nprocs);
275 
276   /* Free cache space */
277   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
278   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
279 
280   baij->svalues    = svalues;    baij->rvalues    = rvalues;
281   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
282   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
283   baij->rmax       = nmax;
284 
285   return 0;
286 }
287 
288 
289 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
290 {
291   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
292   MPI_Status  *send_status,recv_status;
293   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
294   int         bs=baij->bs,row,col,other_disassembled;
295   Scalar      *values,val;
296   InsertMode  addv = baij->insertmode;
297 
298   /*  wait on receives */
299   while (count) {
300     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
301     /* unpack receives into our local space */
302     values = baij->rvalues + 3*imdex*baij->rmax;
303     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
304     n = n/3;
305     for ( i=0; i<n; i++ ) {
306       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
307       col = (int) PetscReal(values[3*i+1]);
308       val = values[3*i+2];
309       if (col >= baij->cstart*bs && col < baij->cend*bs) {
310         col -= baij->cstart*bs;
311         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
312       }
313       else {
314         if (mat->was_assembled) {
315           if (!baij->colmap) {
316             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
317           }
318           col = (baij->colmap[col/bs]-1)*bs + col%bs;
319           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
320             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
321             col = (int) PetscReal(values[3*i+1]);
322           }
323         }
324         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
325       }
326     }
327     count--;
328   }
329   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
330 
331   /* wait on sends */
332   if (baij->nsends) {
333     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
334     CHKPTRQ(send_status);
335     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
336     PetscFree(send_status);
337   }
338   PetscFree(baij->send_waits); PetscFree(baij->svalues);
339 
340   baij->insertmode = NOT_SET_VALUES;
341   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
342   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
343 
344   /* determine if any processor has disassembled, if so we must
345      also disassemble ourselfs, in order that we may reassemble. */
346   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
347   if (mat->was_assembled && !other_disassembled) {
348     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
349   }
350 
351   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
352     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
353   }
354   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
355   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
356 
357   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
358   return 0;
359 }
360 
361 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
362 {
363   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
364   int          ierr;
365 
366   if (baij->size == 1) {
367     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
368   }
369   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
370   return 0;
371 }
372 
373 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
374 {
375   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
376   int          ierr, format,rank,bs=baij->bs;
377   FILE         *fd;
378   ViewerType   vtype;
379 
380   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
381   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
382     ierr = ViewerGetFormat(viewer,&format);
383     if (format == ASCII_FORMAT_INFO_DETAILED) {
384       int nz, nzalloc, mem;
385       MPI_Comm_rank(mat->comm,&rank);
386       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
387       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
388       PetscSequentialPhaseBegin(mat->comm,1);
389       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
390               rank,baij->m,nz*bs,nzalloc*bs,baij->bs,mem);
391       ierr = MatGetInfo(baij->A,MAT_LOCAL,&nz,&nzalloc,&mem);
392       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz*bs);
393       ierr = MatGetInfo(baij->B,MAT_LOCAL,&nz,&nzalloc,&mem);
394       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz*bs);
395       fflush(fd);
396       PetscSequentialPhaseEnd(mat->comm,1);
397       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
398       return 0;
399     }
400     else if (format == ASCII_FORMAT_INFO) {
401       return 0;
402     }
403   }
404 
405   if (vtype == DRAW_VIEWER) {
406     Draw       draw;
407     PetscTruth isnull;
408     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
409     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
410   }
411 
412   if (vtype == ASCII_FILE_VIEWER) {
413     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
414     PetscSequentialPhaseBegin(mat->comm,1);
415     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
416            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
417             baij->cstart*bs,baij->cend*bs);
418     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
419     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
420     fflush(fd);
421     PetscSequentialPhaseEnd(mat->comm,1);
422   }
423   else {
424     int size = baij->size;
425     rank = baij->rank;
426     if (size == 1) {
427       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
428     }
429     else {
430       /* assemble the entire matrix onto first processor. */
431       Mat         A;
432       Mat_SeqBAIJ *Aloc;
433       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
434       int         mbs=baij->mbs;
435       Scalar      *a;
436 
437       if (!rank) {
438         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
439         CHKERRQ(ierr);
440       }
441       else {
442         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
443         CHKERRQ(ierr);
444       }
445       PLogObjectParent(mat,A);
446 
447       /* copy over the A part */
448       Aloc = (Mat_SeqBAIJ*) baij->A->data;
449       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
450       row = baij->rstart;
451       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
452 
453       for ( i=0; i<mbs; i++ ) {
454         rvals[0] = bs*(baij->rstart + i);
455         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
456         for ( j=ai[i]; j<ai[i+1]; j++ ) {
457           col = (baij->cstart+aj[j])*bs;
458           for (k=0; k<bs; k++ ) {
459             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
460             col++; a += bs;
461           }
462         }
463       }
464       /* copy over the B part */
465       Aloc = (Mat_SeqBAIJ*) baij->B->data;
466       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
467       row = baij->rstart*bs;
468       for ( i=0; i<mbs; i++ ) {
469         rvals[0] = bs*(baij->rstart + i);
470         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
471         for ( j=ai[i]; j<ai[i+1]; j++ ) {
472           col = baij->garray[aj[j]]*bs;
473           for (k=0; k<bs; k++ ) {
474             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
475             col++; a += bs;
476           }
477         }
478       }
479       PetscFree(rvals);
480       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
481       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
482       if (!rank) {
483         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
484       }
485       ierr = MatDestroy(A); CHKERRQ(ierr);
486     }
487   }
488   return 0;
489 }
490 
491 
492 
493 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
494 {
495   Mat         mat = (Mat) obj;
496   int         ierr;
497   ViewerType  vtype;
498 
499   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
500   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
501       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
502     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
503   }
504   else if (vtype == BINARY_FILE_VIEWER) {
505     return MatView_MPIBAIJ_Binary(mat,viewer);
506   }
507   return 0;
508 }
509 
510 static int MatDestroy_MPIBAIJ(PetscObject obj)
511 {
512   Mat         mat = (Mat) obj;
513   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
514   int         ierr;
515 
516 #if defined(PETSC_LOG)
517   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
518 #endif
519 
520   PetscFree(baij->rowners);
521   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
522   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
523   if (baij->colmap) PetscFree(baij->colmap);
524   if (baij->garray) PetscFree(baij->garray);
525   if (baij->lvec)   VecDestroy(baij->lvec);
526   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
527   if (baij->rowvalues) PetscFree(baij->rowvalues);
528   PetscFree(baij);
529   PLogObjectDestroy(mat);
530   PetscHeaderDestroy(mat);
531   return 0;
532 }
533 
534 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
535 {
536   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
537   int         ierr, nt;
538 
539   VecGetLocalSize_Fast(xx,nt);
540   if (nt != a->n) {
541     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible parition of A and xx");
542   }
543   VecGetLocalSize_Fast(yy,nt);
544   if (nt != a->m) {
545     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible parition of A and yy");
546   }
547   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
548   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
549   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
550   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
551   return 0;
552 }
553 
554 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
555 {
556   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
557   int        ierr;
558   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
559   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
560   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
561   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
562   return 0;
563 }
564 
565 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
566 {
567   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
568   int        ierr;
569 
570   /* do nondiagonal part */
571   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
572   /* send it on its way */
573   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
574                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
575   /* do local part */
576   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
577   /* receive remote parts: note this assumes the values are not actually */
578   /* inserted in yy until the next line, which is true for my implementation*/
579   /* but is not perhaps always true. */
580   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
581                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
582   return 0;
583 }
584 
585 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
586 {
587   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
588   int        ierr;
589 
590   /* do nondiagonal part */
591   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
592   /* send it on its way */
593   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
594                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
595   /* do local part */
596   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
597   /* receive remote parts: note this assumes the values are not actually */
598   /* inserted in yy until the next line, which is true for my implementation*/
599   /* but is not perhaps always true. */
600   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
601                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
602   return 0;
603 }
604 
605 /*
606   This only works correctly for square matrices where the subblock A->A is the
607    diagonal block
608 */
609 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
610 {
611   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
612   if (a->M != a->N)
613     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
614   return MatGetDiagonal(a->A,v);
615 }
616 
617 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
618 {
619   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
620   int        ierr;
621   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
622   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
623   return 0;
624 }
625 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
626 {
627   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
628   *m = mat->M; *n = mat->N;
629   return 0;
630 }
631 
632 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
633 {
634   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
635   *m = mat->m; *n = mat->N;
636   return 0;
637 }
638 
639 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
640 {
641   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
642   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
643   return 0;
644 }
645 
646 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
647 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
648 
649 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
650 {
651   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
652   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
653   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
654   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
655   int        *cmap, *idx_p,cstart = mat->cstart;
656 
657   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
658   mat->getrowactive = PETSC_TRUE;
659 
660   if (!mat->rowvalues && (idx || v)) {
661     /*
662         allocate enough space to hold information from the longest row.
663     */
664     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
665     int     max = 1,mbs = mat->mbs,tmp;
666     for ( i=0; i<mbs; i++ ) {
667       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
668       if (max < tmp) { max = tmp; }
669     }
670     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
671     CHKPTRQ(mat->rowvalues);
672     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
673   }
674 
675 
676   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows")
677   lrow = row - brstart;
678 
679   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
680   if (!v)   {pvA = 0; pvB = 0;}
681   if (!idx) {pcA = 0; if (!v) pcB = 0;}
682   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
683   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
684   nztot = nzA + nzB;
685 
686   cmap  = mat->garray;
687   if (v  || idx) {
688     if (nztot) {
689       /* Sort by increasing column numbers, assuming A and B already sorted */
690       int imark = -1;
691       if (v) {
692         *v = v_p = mat->rowvalues;
693         for ( i=0; i<nzB; i++ ) {
694           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
695           else break;
696         }
697         imark = i;
698         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
699         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
700       }
701       if (idx) {
702         *idx = idx_p = mat->rowindices;
703         if (imark > -1) {
704           for ( i=0; i<imark; i++ ) {
705             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
706           }
707         } else {
708           for ( i=0; i<nzB; i++ ) {
709             if (cmap[cworkB[i]/bs] < cstart)
710               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
711             else break;
712           }
713           imark = i;
714         }
715         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
716         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
717       }
718     }
719     else {
720       if (idx) *idx = 0;
721       if (v)   *v   = 0;
722     }
723   }
724   *nz = nztot;
725   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
726   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
727   return 0;
728 }
729 
730 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
731 {
732   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
733   if (baij->getrowactive == PETSC_FALSE) {
734     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
735   }
736   baij->getrowactive = PETSC_FALSE;
737   return 0;
738 }
739 
740 static int MatGetBlockSize_MPIBAIJ(Mat mat, int *bs)
741 {
742   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
743   *bs = baij->bs;
744   return 0;
745 }
746 
747 static int MatZeroEntries_MPIBAIJ(Mat A)
748 {
749   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
750   int         ierr;
751   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
752   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
753   return 0;
754 }
755 
756 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,int *nz,
757                              int *nzalloc,int *mem)
758 {
759   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
760   Mat         A = mat->A, B = mat->B;
761   int         ierr, isend[3], irecv[3], nzA, nzallocA, memA;
762 
763   ierr = MatGetInfo(A,MAT_LOCAL,&nzA,&nzallocA,&memA); CHKERRQ(ierr);
764   ierr = MatGetInfo(B,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
765   isend[0] += nzA; isend[1] += nzallocA; isend[2] += memA;
766   if (flag == MAT_LOCAL) {
767     if (nz)       *nz      = isend[0];
768     if (nzalloc)  *nzalloc = isend[1];
769     if (mem)      *mem     = isend[2];
770   } else if (flag == MAT_GLOBAL_MAX) {
771     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_MAX,matin->comm);
772     if (nz)      *nz      = irecv[0];
773     if (nzalloc) *nzalloc = irecv[1];
774     if (mem)     *mem     = irecv[2];
775   } else if (flag == MAT_GLOBAL_SUM) {
776     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_SUM,matin->comm);
777     if (nz)      *nz      = irecv[0];
778     if (nzalloc) *nzalloc = irecv[1];
779     if (mem)     *mem     = irecv[2];
780   }
781   return 0;
782 }
783 
784 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
785 {
786   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
787 
788   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
789       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
790       op == MAT_COLUMNS_SORTED ||
791       op == MAT_ROW_ORIENTED) {
792         MatSetOption(a->A,op);
793         MatSetOption(a->B,op);
794   }
795   else if (op == MAT_ROWS_SORTED ||
796            op == MAT_SYMMETRIC ||
797            op == MAT_STRUCTURALLY_SYMMETRIC ||
798            op == MAT_YES_NEW_DIAGONALS)
799     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
800   else if (op == MAT_COLUMN_ORIENTED) {
801     a->roworiented = 0;
802     MatSetOption(a->A,op);
803     MatSetOption(a->B,op);
804   }
805   else if (op == MAT_NO_NEW_DIAGONALS)
806     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:MAT_NO_NEW_DIAGONALS");}
807   else
808     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:unknown option");}
809   return 0;
810 }
811 
812 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
813 {
814   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
815   Mat_SeqBAIJ *Aloc;
816   Mat        B;
817   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
818   int        bs=baij->bs,mbs=baij->mbs;
819   Scalar     *a;
820 
821   if (matout == PETSC_NULL && M != N)
822     SETERRQ(1,"MatTranspose_MPIBAIJ:Square matrix only for in-place");
823   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
824   CHKERRQ(ierr);
825 
826   /* copy over the A part */
827   Aloc = (Mat_SeqBAIJ*) baij->A->data;
828   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
829   row = baij->rstart;
830   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
831 
832   for ( i=0; i<mbs; i++ ) {
833     rvals[0] = bs*(baij->rstart + i);
834     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
835     for ( j=ai[i]; j<ai[i+1]; j++ ) {
836       col = (baij->cstart+aj[j])*bs;
837       for (k=0; k<bs; k++ ) {
838         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
839         col++; a += bs;
840       }
841     }
842   }
843   /* copy over the B part */
844   Aloc = (Mat_SeqBAIJ*) baij->B->data;
845   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
846   row = baij->rstart*bs;
847   for ( i=0; i<mbs; i++ ) {
848     rvals[0] = bs*(baij->rstart + i);
849     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
850     for ( j=ai[i]; j<ai[i+1]; j++ ) {
851       col = baij->garray[aj[j]]*bs;
852       for (k=0; k<bs; k++ ) {
853         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
854         col++; a += bs;
855       }
856     }
857   }
858   PetscFree(rvals);
859   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
860   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
861 
862   if (matout != PETSC_NULL) {
863     *matout = B;
864   } else {
865     /* This isn't really an in-place transpose .... but free data structures from baij */
866     PetscFree(baij->rowners);
867     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
868     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
869     if (baij->colmap) PetscFree(baij->colmap);
870     if (baij->garray) PetscFree(baij->garray);
871     if (baij->lvec) VecDestroy(baij->lvec);
872     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
873     PetscFree(baij);
874     PetscMemcpy(A,B,sizeof(struct _Mat));
875     PetscHeaderDestroy(B);
876   }
877   return 0;
878 }
879 /* the code does not do the diagonal entries correctly unless the
880    matrix is square and the column and row owerships are identical.
881    This is a BUG. The only way to fix it seems to be to access
882    baij->A and baij->B directly and not through the MatZeroRows()
883    routine.
884 */
885 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
886 {
887   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
888   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
889   int            *procs,*nprocs,j,found,idx,nsends,*work;
890   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
891   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
892   int            *lens,imdex,*lrows,*values,bs=l->bs;
893   MPI_Comm       comm = A->comm;
894   MPI_Request    *send_waits,*recv_waits;
895   MPI_Status     recv_status,*send_status;
896   IS             istmp;
897 
898   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
899   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
900 
901   /*  first count number of contributors to each processor */
902   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
903   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
904   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
905   for ( i=0; i<N; i++ ) {
906     idx = rows[i];
907     found = 0;
908     for ( j=0; j<size; j++ ) {
909       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
910         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
911       }
912     }
913     if (!found) SETERRQ(1,"MatZeroRows_MPIBAIJ:Index out of range");
914   }
915   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
916 
917   /* inform other processors of number of messages and max length*/
918   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
919   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
920   nrecvs = work[rank];
921   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
922   nmax = work[rank];
923   PetscFree(work);
924 
925   /* post receives:   */
926   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
927   CHKPTRQ(rvalues);
928   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
929   CHKPTRQ(recv_waits);
930   for ( i=0; i<nrecvs; i++ ) {
931     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
932   }
933 
934   /* do sends:
935       1) starts[i] gives the starting index in svalues for stuff going to
936          the ith processor
937   */
938   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
939   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
940   CHKPTRQ(send_waits);
941   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
942   starts[0] = 0;
943   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
944   for ( i=0; i<N; i++ ) {
945     svalues[starts[owner[i]]++] = rows[i];
946   }
947   ISRestoreIndices(is,&rows);
948 
949   starts[0] = 0;
950   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
951   count = 0;
952   for ( i=0; i<size; i++ ) {
953     if (procs[i]) {
954       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
955     }
956   }
957   PetscFree(starts);
958 
959   base = owners[rank]*bs;
960 
961   /*  wait on receives */
962   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
963   source = lens + nrecvs;
964   count  = nrecvs; slen = 0;
965   while (count) {
966     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
967     /* unpack receives into our local space */
968     MPI_Get_count(&recv_status,MPI_INT,&n);
969     source[imdex]  = recv_status.MPI_SOURCE;
970     lens[imdex]  = n;
971     slen += n;
972     count--;
973   }
974   PetscFree(recv_waits);
975 
976   /* move the data into the send scatter */
977   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
978   count = 0;
979   for ( i=0; i<nrecvs; i++ ) {
980     values = rvalues + i*nmax;
981     for ( j=0; j<lens[i]; j++ ) {
982       lrows[count++] = values[j] - base;
983     }
984   }
985   PetscFree(rvalues); PetscFree(lens);
986   PetscFree(owner); PetscFree(nprocs);
987 
988   /* actually zap the local rows */
989   ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
990   PLogObjectParent(A,istmp);
991   PetscFree(lrows);
992   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
993   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
994   ierr = ISDestroy(istmp); CHKERRQ(ierr);
995 
996   /* wait on sends */
997   if (nsends) {
998     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
999     CHKPTRQ(send_status);
1000     MPI_Waitall(nsends,send_waits,send_status);
1001     PetscFree(send_status);
1002   }
1003   PetscFree(send_waits); PetscFree(svalues);
1004 
1005   return 0;
1006 }
1007 extern int MatPrintHelp_SeqBAIJ(Mat);
1008 static int MatPrintHelp_MPIBAIJ(Mat A)
1009 {
1010   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1011 
1012   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1013   else return 0;
1014 }
1015 
1016 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1017 
1018 /* -------------------------------------------------------------------*/
1019 static struct _MatOps MatOps = {
1020   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1021   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,MatSolve_MPIBAIJ,
1022   MatSolveAdd_MPIBAIJ,MatSolveTrans_MPIBAIJ,MatSolveTransAdd_MPIBAIJ,MatLUFactor_MPIBAIJ,
1023   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1024   0,MatGetDiagonal_MPIBAIJ,0,MatNorm_MPIBAIJ,
1025   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1026   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,MatGetReordering_MPIBAIJ,MatLUFactorSymbolic_MPIBAIJ,
1027   MatLUFactorNumeric_MPIBAIJ,0,0,MatGetSize_MPIBAIJ,
1028   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,MatILUFactorSymbolic_MPIBAIJ,0,
1029   0,0,0,MatConvertSameType_MPIBAIJ,0,0,
1030   0,0,0,MatGetSubMatrices_MPIBAIJ,
1031   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1032   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ};
1033 
1034 
1035 /*@C
1036    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1037    (block compressed row).  For good matrix assembly performance
1038    the user should preallocate the matrix storage by setting the parameters
1039    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1040    performance can be increased by more than a factor of 50.
1041 
1042    Input Parameters:
1043 .  comm - MPI communicator
1044 .  bs   - size of blockk
1045 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1046            This value should be the same as the local size used in creating the
1047            y vector for the matrix-vector product y = Ax.
1048 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1049            This value should be the same as the local size used in creating the
1050            x vector for the matrix-vector product y = Ax.
1051 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1052 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1053 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1054            submatrix  (same for all local rows)
1055 .  d_nzz - array containing the number of block nonzeros in the various block rows
1056            of the in diagonal portion of the local (possibly different for each block
1057            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1058            it is zero.
1059 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1060            submatrix (same for all local rows).
1061 .  o_nzz - array containing the number of nonzeros in the various block rows of the
1062            off-diagonal portion of the local submatrix (possibly different for
1063            each block row) or PETSC_NULL.
1064 
1065    Output Parameter:
1066 .  A - the matrix
1067 
1068    Notes:
1069    The user MUST specify either the local or global matrix dimensions
1070    (possibly both).
1071 
1072    Storage Information:
1073    For a square global matrix we define each processor's diagonal portion
1074    to be its local rows and the corresponding columns (a square submatrix);
1075    each processor's off-diagonal portion encompasses the remainder of the
1076    local matrix (a rectangular submatrix).
1077 
1078    The user can specify preallocated storage for the diagonal part of
1079    the local submatrix with either d_nz or d_nnz (not both).  Set
1080    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1081    memory allocation.  Likewise, specify preallocated storage for the
1082    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1083 
1084    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1085    the figure below we depict these three local rows and all columns (0-11).
1086 
1087 $          0 1 2 3 4 5 6 7 8 9 10 11
1088 $         -------------------
1089 $  row 3  |  o o o d d d o o o o o o
1090 $  row 4  |  o o o d d d o o o o o o
1091 $  row 5  |  o o o d d d o o o o o o
1092 $         -------------------
1093 $
1094 
1095    Thus, any entries in the d locations are stored in the d (diagonal)
1096    submatrix, and any entries in the o locations are stored in the
1097    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1098    stored simply in the MATSEQBAIJ format for compressed row storage.
1099 
1100    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1101    and o_nz should indicate the number of nonzeros per row in the o matrix.
1102    In general, for PDE problems in which most nonzeros are near the diagonal,
1103    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1104    or you will get TERRIBLE performance; see the users' manual chapter on
1105    matrices and the file $(PETSC_DIR)/Performance.
1106 
1107 .keywords: matrix, block, aij, compressed row, sparse, parallel
1108 
1109 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1110 @*/
1111 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1112                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1113 {
1114   Mat          B;
1115   Mat_MPIBAIJ  *b;
1116   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE;
1117 
1118   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
1119   *A = 0;
1120   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1121   PLogObjectCreate(B);
1122   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1123   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1124   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1125   B->destroy    = MatDestroy_MPIBAIJ;
1126   B->view       = MatView_MPIBAIJ;
1127 
1128   B->factor     = 0;
1129   B->assembled  = PETSC_FALSE;
1130 
1131   b->insertmode = NOT_SET_VALUES;
1132   MPI_Comm_rank(comm,&b->rank);
1133   MPI_Comm_size(comm,&b->size);
1134 
1135   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1136     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1137   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
1138   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
1139   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1140   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1141 
1142   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1143     work[0] = m; work[1] = n;
1144     mbs = m/bs; nbs = n/bs;
1145     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1146     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1147     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1148   }
1149   if (m == PETSC_DECIDE) {
1150     Mbs = M/bs;
1151     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
1152     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1153     m   = mbs*bs;
1154   }
1155   if (n == PETSC_DECIDE) {
1156     Nbs = N/bs;
1157     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
1158     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1159     n   = nbs*bs;
1160   }
1161   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
1162 
1163   b->m = m; B->m = m;
1164   b->n = n; B->n = n;
1165   b->N = N; B->N = N;
1166   b->M = M; B->M = M;
1167   b->bs  = bs;
1168   b->bs2 = bs*bs;
1169   b->mbs = mbs;
1170   b->nbs = nbs;
1171   b->Mbs = Mbs;
1172   b->Nbs = Nbs;
1173 
1174   /* build local table of row and column ownerships */
1175   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1176   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1177   b->cowners = b->rowners + b->size + 2;
1178   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1179   b->rowners[0] = 0;
1180   for ( i=2; i<=b->size; i++ ) {
1181     b->rowners[i] += b->rowners[i-1];
1182   }
1183   b->rstart = b->rowners[b->rank];
1184   b->rend   = b->rowners[b->rank+1];
1185   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1186   b->cowners[0] = 0;
1187   for ( i=2; i<=b->size; i++ ) {
1188     b->cowners[i] += b->cowners[i-1];
1189   }
1190   b->cstart = b->cowners[b->rank];
1191   b->cend   = b->cowners[b->rank+1];
1192 
1193   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1194   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1195   PLogObjectParent(B,b->A);
1196   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1197   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1198   PLogObjectParent(B,b->B);
1199 
1200   /* build cache for off array entries formed */
1201   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1202   b->colmap      = 0;
1203   b->garray      = 0;
1204   b->roworiented = 1;
1205 
1206   /* stuff used for matrix vector multiply */
1207   b->lvec      = 0;
1208   b->Mvctx     = 0;
1209 
1210   /* stuff for MatGetRow() */
1211   b->rowindices   = 0;
1212   b->rowvalues    = 0;
1213   b->getrowactive = PETSC_FALSE;
1214 
1215   *A = B;
1216   return 0;
1217 }
1218 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1219 {
1220   Mat        mat;
1221   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1222   int        ierr, len=0, flg;
1223 
1224   *newmat       = 0;
1225   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1226   PLogObjectCreate(mat);
1227   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1228   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1229   mat->destroy    = MatDestroy_MPIBAIJ;
1230   mat->view       = MatView_MPIBAIJ;
1231   mat->factor     = matin->factor;
1232   mat->assembled  = PETSC_TRUE;
1233 
1234   a->m = mat->m   = oldmat->m;
1235   a->n = mat->n   = oldmat->n;
1236   a->M = mat->M   = oldmat->M;
1237   a->N = mat->N   = oldmat->N;
1238 
1239   a->bs  = oldmat->bs;
1240   a->bs2 = oldmat->bs2;
1241   a->mbs = oldmat->mbs;
1242   a->nbs = oldmat->nbs;
1243   a->Mbs = oldmat->Mbs;
1244   a->Nbs = oldmat->Nbs;
1245 
1246   a->rstart       = oldmat->rstart;
1247   a->rend         = oldmat->rend;
1248   a->cstart       = oldmat->cstart;
1249   a->cend         = oldmat->cend;
1250   a->size         = oldmat->size;
1251   a->rank         = oldmat->rank;
1252   a->insertmode   = NOT_SET_VALUES;
1253   a->rowvalues    = 0;
1254   a->getrowactive = PETSC_FALSE;
1255 
1256   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1257   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1258   a->cowners = a->rowners + a->size + 2;
1259   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1260   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1261   if (oldmat->colmap) {
1262     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1263     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1264     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1265   } else a->colmap = 0;
1266   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1267     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1268     PLogObjectMemory(mat,len*sizeof(int));
1269     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1270   } else a->garray = 0;
1271 
1272   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1273   PLogObjectParent(mat,a->lvec);
1274   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1275   PLogObjectParent(mat,a->Mvctx);
1276   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1277   PLogObjectParent(mat,a->A);
1278   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1279   PLogObjectParent(mat,a->B);
1280   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1281   if (flg) {
1282     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1283   }
1284   *newmat = mat;
1285   return 0;
1286 }
1287 
1288 #include "sys.h"
1289 
1290 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1291 {
1292   Mat          A;
1293   int          i, nz, ierr, j,rstart, rend, fd;
1294   Scalar       *vals,*buf;
1295   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1296   MPI_Status   status;
1297   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1298   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1299   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1300   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1301   int          dcount,kmax,k,nzcount,tmp;
1302 
1303 
1304   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1305   bs2  = bs*bs;
1306 
1307   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1308   if (!rank) {
1309     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1310     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1311     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
1312   }
1313 
1314   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1315   M = header[1]; N = header[2];
1316 
1317 
1318   if (M != N) SETERRQ(1,"MatLoad_SeqBAIJ:Can only do square matrices");
1319 
1320   /*
1321      This code adds extra rows to make sure the number of rows is
1322      divisible by the blocksize
1323   */
1324   Mbs        = M/bs;
1325   extra_rows = bs - M + bs*(Mbs);
1326   if (extra_rows == bs) extra_rows = 0;
1327   else                  Mbs++;
1328   if (extra_rows &&!rank) {
1329     PLogInfo(0,"MatLoad_SeqBAIJ:Padding loaded matrix to match blocksize");
1330   }
1331   /* determine ownership of all rows */
1332   mbs = Mbs/size + ((Mbs % size) > rank);
1333   m   = mbs * bs;
1334   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1335   browners = rowners + size + 1;
1336   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1337   rowners[0] = 0;
1338   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1339   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1340   rstart = rowners[rank];
1341   rend   = rowners[rank+1];
1342 
1343   /* distribute row lengths to all processors */
1344   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1345   if (!rank) {
1346     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1347     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1348     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1349     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1350     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1351     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1352     PetscFree(sndcounts);
1353   }
1354   else {
1355     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1356   }
1357 
1358   if (!rank) {
1359     /* calculate the number of nonzeros on each processor */
1360     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1361     PetscMemzero(procsnz,size*sizeof(int));
1362     for ( i=0; i<size; i++ ) {
1363       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1364         procsnz[i] += rowlengths[j];
1365       }
1366     }
1367     PetscFree(rowlengths);
1368 
1369     /* determine max buffer needed and allocate it */
1370     maxnz = 0;
1371     for ( i=0; i<size; i++ ) {
1372       maxnz = PetscMax(maxnz,procsnz[i]);
1373     }
1374     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1375 
1376     /* read in my part of the matrix column indices  */
1377     nz = procsnz[0];
1378     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1379     mycols = ibuf;
1380     if (size == 1)  nz -= extra_rows;
1381     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1382     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1383 
1384     /* read in every ones (except the last) and ship off */
1385     for ( i=1; i<size-1; i++ ) {
1386       nz = procsnz[i];
1387       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1388       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1389     }
1390     /* read in the stuff for the last proc */
1391     if ( size != 1 ) {
1392       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1393       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1394       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1395       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1396     }
1397     PetscFree(cols);
1398   }
1399   else {
1400     /* determine buffer space needed for message */
1401     nz = 0;
1402     for ( i=0; i<m; i++ ) {
1403       nz += locrowlens[i];
1404     }
1405     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1406     mycols = ibuf;
1407     /* receive message of column indices*/
1408     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1409     MPI_Get_count(&status,MPI_INT,&maxnz);
1410     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1411   }
1412 
1413   /* loop over local rows, determining number of off diagonal entries */
1414   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1415   odlens = dlens + (rend-rstart);
1416   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1417   PetscMemzero(mask,3*Mbs*sizeof(int));
1418   masked1 = mask    + Mbs;
1419   masked2 = masked1 + Mbs;
1420   rowcount = 0; nzcount = 0;
1421   for ( i=0; i<mbs; i++ ) {
1422     dcount  = 0;
1423     odcount = 0;
1424     for ( j=0; j<bs; j++ ) {
1425       kmax = locrowlens[rowcount];
1426       for ( k=0; k<kmax; k++ ) {
1427         tmp = mycols[nzcount++]/bs;
1428         if (!mask[tmp]) {
1429           mask[tmp] = 1;
1430           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1431           else masked1[dcount++] = tmp;
1432         }
1433       }
1434       rowcount++;
1435     }
1436 
1437     dlens[i]  = dcount;
1438     odlens[i] = odcount;
1439 
1440     /* zero out the mask elements we set */
1441     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1442     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1443   }
1444 
1445   /* create our matrix */
1446   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);CHKERRQ(ierr);
1447   A = *newmat;
1448   MatSetOption(A,MAT_COLUMNS_SORTED);
1449 
1450   if (!rank) {
1451     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1452     /* read in my part of the matrix numerical values  */
1453     nz = procsnz[0];
1454     vals = buf;
1455     mycols = ibuf;
1456     if (size == 1)  nz -= extra_rows;
1457     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1458     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1459     /* insert into matrix */
1460     jj      = rstart*bs;
1461     for ( i=0; i<m; i++ ) {
1462       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1463       mycols += locrowlens[i];
1464       vals   += locrowlens[i];
1465       jj++;
1466     }
1467     /* read in other processors( except the last one) and ship out */
1468     for ( i=1; i<size-1; i++ ) {
1469       nz = procsnz[i];
1470       vals = buf;
1471       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1472       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1473     }
1474     /* the last proc */
1475     if ( size != 1 ){
1476       nz = procsnz[i] - extra_rows;
1477       vals = buf;
1478       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1479       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1480       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1481     }
1482     PetscFree(procsnz);
1483   }
1484   else {
1485     /* receive numeric values */
1486     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1487 
1488     /* receive message of values*/
1489     vals = buf;
1490     mycols = ibuf;
1491     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1492     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1493     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1494 
1495     /* insert into matrix */
1496     jj      = rstart*bs;
1497     for ( i=0; i<m; i++ ) {
1498       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1499       mycols += locrowlens[i];
1500       vals   += locrowlens[i];
1501       jj++;
1502     }
1503   }
1504   PetscFree(locrowlens);
1505   PetscFree(buf);
1506   PetscFree(ibuf);
1507   PetscFree(rowners);
1508   PetscFree(dlens);
1509   PetscFree(mask);
1510   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1511   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1512   return 0;
1513 }
1514 
1515 
1516