xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 0ac07820caad201d9f56837b0224a85c76e3c049)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.15 1996/07/12 19:09:33 balay Exp balay $";
3 #endif
4 
5 #include "mpibaij.h"
6 
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 /* local utility routine that creates a mapping from the global column
25 number to the local number in the off-diagonal part of the local
26 storage of the matrix.  This is done in a non scable way since the
27 length of colmap equals the global matrix length.
28 */
29 static int CreateColmap_MPIBAIJ_Private(Mat mat)
30 {
31   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
32   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
33   int        nbs = B->nbs,i;
34 
35   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
36   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
37   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
38   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i;
39   return 0;
40 }
41 
42 
43 static int MatGetReordering_MPIBAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
44 {
45   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
46   int         ierr;
47   if (baij->size == 1) {
48     ierr = MatGetReordering(baij->A,type,rperm,cperm); CHKERRQ(ierr);
49   } else SETERRQ(1,"MatGetReordering_MPIBAIJ:not supported in parallel");
50   return 0;
51 }
52 
53 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
54 {
55   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
56   Scalar      value;
57   int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
58   int         cstart = baij->cstart, cend = baij->cend,row,col;
59   int         roworiented = baij->roworiented,rstart_orig,rend_orig;
60   int         cstart_orig,cend_orig,bs=baij->bs;
61 
62   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
63     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
64   }
65   baij->insertmode = addv;
66   rstart_orig = rstart*bs;
67   rend_orig   = rend*bs;
68   cstart_orig = cstart*bs;
69   cend_orig   = cend*bs;
70   for ( i=0; i<m; i++ ) {
71     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
72     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
73     if (im[i] >= rstart_orig && im[i] < rend_orig) {
74       row = im[i] - rstart_orig;
75       for ( j=0; j<n; j++ ) {
76         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");
77         if (in[j] >= baij->N) SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");
78         if (in[j] >= cstart_orig && in[j] < cend_orig){
79           col = in[j] - cstart_orig;
80           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
81           ierr = MatSetValues(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
82         }
83         else {
84           if (mat->was_assembled) {
85             if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);}
86             col = baij->colmap[in[j]/bs] +in[j]%bs;
87             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
88               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
89               col =  in[j];
90             }
91           }
92           else col = in[j];
93           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
94           ierr = MatSetValues(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
95         }
96       }
97     }
98     else {
99       if (roworiented) {
100         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
101       }
102       else {
103         row = im[i];
104         for ( j=0; j<n; j++ ) {
105           ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
106         }
107       }
108     }
109   }
110   return 0;
111 }
112 
113 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
114 {
115   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
116   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
117   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
118 
119   for ( i=0; i<m; i++ ) {
120     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
121     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
122     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
123       row = idxm[i] - bsrstart;
124       for ( j=0; j<n; j++ ) {
125         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
126         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
127         if (idxn[j] >= bscstart && idxn[j] < bscend){
128           col = idxn[j] - bscstart;
129           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
130         }
131         else {
132           if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);}
133           if ( baij->garray[baij->colmap[idxn[j]/bs]] != idxn[j]/bs ) *(v+i*n+j) = 0.0;
134           else {
135             col = baij->colmap[idxn[j]/bs]*bs + idxn[j]%bs;
136             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
137           }
138         }
139       }
140     }
141     else {
142       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
143     }
144   }
145   return 0;
146 }
147 
148 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
149 {
150   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
151   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
152   int        ierr, i,bs2=baij->bs2;
153   double     sum = 0.0;
154   Scalar     *v;
155 
156   if (baij->size == 1) {
157     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
158   } else {
159     if (type == NORM_FROBENIUS) {
160       v = amat->a;
161       for (i=0; i<amat->nz*bs2; i++ ) {
162 #if defined(PETSC_COMPLEX)
163         sum += real(conj(*v)*(*v)); v++;
164 #else
165         sum += (*v)*(*v); v++;
166 #endif
167       }
168       v = bmat->a;
169       for (i=0; i<bmat->nz*bs2; i++ ) {
170 #if defined(PETSC_COMPLEX)
171         sum += real(conj(*v)*(*v)); v++;
172 #else
173         sum += (*v)*(*v); v++;
174 #endif
175       }
176       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
177       *norm = sqrt(*norm);
178     }
179     else
180       SETERRQ(1,"MatNorm_SeqBAIJ:No support for this norm yet");
181   }
182   return 0;
183 }
184 
185 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
186 {
187   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
188   MPI_Comm    comm = mat->comm;
189   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
190   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
191   MPI_Request *send_waits,*recv_waits;
192   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
193   InsertMode  addv;
194   Scalar      *rvalues,*svalues;
195 
196   /* make sure all processors are either in INSERTMODE or ADDMODE */
197   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
198   if (addv == (ADD_VALUES|INSERT_VALUES)) {
199     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
200   }
201   baij->insertmode = addv; /* in case this processor had no cache */
202 
203   /*  first count number of contributors to each processor */
204   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
205   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
206   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
207   for ( i=0; i<baij->stash.n; i++ ) {
208     idx = baij->stash.idx[i];
209     for ( j=0; j<size; j++ ) {
210       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
211         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
212       }
213     }
214   }
215   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
216 
217   /* inform other processors of number of messages and max length*/
218   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
219   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
220   nreceives = work[rank];
221   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
222   nmax = work[rank];
223   PetscFree(work);
224 
225   /* post receives:
226        1) each message will consist of ordered pairs
227      (global index,value) we store the global index as a double
228      to simplify the message passing.
229        2) since we don't know how long each individual message is we
230      allocate the largest needed buffer for each receive. Potentially
231      this is a lot of wasted space.
232 
233 
234        This could be done better.
235   */
236   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
237   CHKPTRQ(rvalues);
238   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
239   CHKPTRQ(recv_waits);
240   for ( i=0; i<nreceives; i++ ) {
241     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
242               comm,recv_waits+i);
243   }
244 
245   /* do sends:
246       1) starts[i] gives the starting index in svalues for stuff going to
247          the ith processor
248   */
249   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
250   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
251   CHKPTRQ(send_waits);
252   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
253   starts[0] = 0;
254   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
255   for ( i=0; i<baij->stash.n; i++ ) {
256     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
257     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
258     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
259   }
260   PetscFree(owner);
261   starts[0] = 0;
262   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
263   count = 0;
264   for ( i=0; i<size; i++ ) {
265     if (procs[i]) {
266       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
267                 comm,send_waits+count++);
268     }
269   }
270   PetscFree(starts); PetscFree(nprocs);
271 
272   /* Free cache space */
273   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
274   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
275 
276   baij->svalues    = svalues;    baij->rvalues    = rvalues;
277   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
278   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
279   baij->rmax       = nmax;
280 
281   return 0;
282 }
283 
284 
285 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
286 {
287   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
288   MPI_Status  *send_status,recv_status;
289   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
290   int         bs=baij->bs,row,col,other_disassembled;
291   Scalar      *values,val;
292   InsertMode  addv = baij->insertmode;
293 
294   /*  wait on receives */
295   while (count) {
296     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
297     /* unpack receives into our local space */
298     values = baij->rvalues + 3*imdex*baij->rmax;
299     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
300     n = n/3;
301     for ( i=0; i<n; i++ ) {
302       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
303       col = (int) PetscReal(values[3*i+1]);
304       val = values[3*i+2];
305       if (col >= baij->cstart*bs && col < baij->cend*bs) {
306         col -= baij->cstart*bs;
307         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
308       }
309       else {
310         if (mat->was_assembled) {
311           if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);}
312           col = baij->colmap[col/bs]*bs + col%bs;
313           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
314             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
315             col = (int) PetscReal(values[3*i+1]);
316           }
317         }
318         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
319       }
320     }
321     count--;
322   }
323   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
324 
325   /* wait on sends */
326   if (baij->nsends) {
327     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
328     CHKPTRQ(send_status);
329     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
330     PetscFree(send_status);
331   }
332   PetscFree(baij->send_waits); PetscFree(baij->svalues);
333 
334   baij->insertmode = NOT_SET_VALUES;
335   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
336   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
337 
338   /* determine if any processor has disassembled, if so we must
339      also disassemble ourselfs, in order that we may reassemble. */
340   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
341   if (mat->was_assembled && !other_disassembled) {
342     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
343   }
344 
345   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
346     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
347   }
348   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
349   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
350 
351   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
352   return 0;
353 }
354 
355 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
356 {
357   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
358   int          ierr;
359 
360   if (baij->size == 1) {
361     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
362   }
363   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
364   return 0;
365 }
366 
367 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
368 {
369   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
370   int          ierr, format,rank,bs=baij->bs;
371   FILE         *fd;
372   ViewerType   vtype;
373 
374   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
375   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
376     ierr = ViewerGetFormat(viewer,&format);
377     if (format == ASCII_FORMAT_INFO_DETAILED) {
378       int nz, nzalloc, mem;
379       MPI_Comm_rank(mat->comm,&rank);
380       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
381       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
382       PetscSequentialPhaseBegin(mat->comm,1);
383       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
384               rank,baij->m,nz*bs,nzalloc*bs,baij->bs,mem);
385       ierr = MatGetInfo(baij->A,MAT_LOCAL,&nz,&nzalloc,&mem);
386       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz*bs);
387       ierr = MatGetInfo(baij->B,MAT_LOCAL,&nz,&nzalloc,&mem);
388       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz*bs);
389       fflush(fd);
390       PetscSequentialPhaseEnd(mat->comm,1);
391       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
392       return 0;
393     }
394     else if (format == ASCII_FORMAT_INFO) {
395       return 0;
396     }
397   }
398 
399   if (vtype == DRAW_VIEWER) {
400     Draw       draw;
401     PetscTruth isnull;
402     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
403     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
404   }
405 
406   if (vtype == ASCII_FILE_VIEWER) {
407     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
408     PetscSequentialPhaseBegin(mat->comm,1);
409     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
410            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
411             baij->cstart*bs,baij->cend*bs);
412     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
413     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
414     fflush(fd);
415     PetscSequentialPhaseEnd(mat->comm,1);
416   }
417   else {
418     int size = baij->size;
419     rank = baij->rank;
420     if (size == 1) {
421       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
422     }
423     else {
424       /* assemble the entire matrix onto first processor. */
425       Mat         A;
426       Mat_SeqBAIJ *Aloc;
427       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
428       int         mbs=baij->mbs;
429       Scalar      *a;
430 
431       if (!rank) {
432         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
433         CHKERRQ(ierr);
434       }
435       else {
436         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
437         CHKERRQ(ierr);
438       }
439       PLogObjectParent(mat,A);
440 
441       /* copy over the A part */
442       Aloc = (Mat_SeqBAIJ*) baij->A->data;
443       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
444       row = baij->rstart;
445       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
446 
447       for ( i=0; i<mbs; i++ ) {
448         rvals[0] = bs*(baij->rstart + i);
449         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
450         for ( j=ai[i]; j<ai[i+1]; j++ ) {
451           col = (baij->cstart+aj[j])*bs;
452           for (k=0; k<bs; k++ ) {
453             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
454             col++; a += bs;
455           }
456         }
457       }
458       /* copy over the B part */
459       Aloc = (Mat_SeqBAIJ*) baij->B->data;
460       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
461       row = baij->rstart*bs;
462       for ( i=0; i<mbs; i++ ) {
463         rvals[0] = bs*(baij->rstart + i);
464         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
465         for ( j=ai[i]; j<ai[i+1]; j++ ) {
466           col = baij->garray[aj[j]]*bs;
467           for (k=0; k<bs; k++ ) {
468             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
469             col++; a += bs;
470           }
471         }
472       }
473       PetscFree(rvals);
474       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
475       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
476       if (!rank) {
477         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
478       }
479       ierr = MatDestroy(A); CHKERRQ(ierr);
480     }
481   }
482   return 0;
483 }
484 
485 
486 
487 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
488 {
489   Mat         mat = (Mat) obj;
490   int         ierr;
491   ViewerType  vtype;
492 
493   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
494   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
495       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
496     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
497   }
498   else if (vtype == BINARY_FILE_VIEWER) {
499     return MatView_MPIBAIJ_Binary(mat,viewer);
500   }
501   return 0;
502 }
503 
504 static int MatDestroy_MPIBAIJ(PetscObject obj)
505 {
506   Mat         mat = (Mat) obj;
507   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
508   int         ierr;
509 
510 #if defined(PETSC_LOG)
511   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
512 #endif
513 
514   PetscFree(baij->rowners);
515   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
516   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
517   if (baij->colmap) PetscFree(baij->colmap);
518   if (baij->garray) PetscFree(baij->garray);
519   if (baij->lvec)   VecDestroy(baij->lvec);
520   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
521   if (baij->rowvalues) PetscFree(baij->rowvalues);
522   PetscFree(baij);
523   PLogObjectDestroy(mat);
524   PetscHeaderDestroy(mat);
525   return 0;
526 }
527 
528 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
529 {
530   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
531   int         ierr, nt;
532 
533   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
534   if (nt != a->n) {
535     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible parition of A and xx");
536   }
537   ierr = VecGetLocalSize(yy,&nt);  CHKERRQ(ierr);
538   if (nt != a->m) {
539     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible parition of A and yy");
540   }
541   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
542   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
543   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
544   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
545   return 0;
546 }
547 
548 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
549 {
550   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
551   int        ierr;
552   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
553   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
554   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
555   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
556   return 0;
557 }
558 
559 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
560 {
561   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
562   int        ierr;
563 
564   /* do nondiagonal part */
565   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
566   /* send it on its way */
567   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
568                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
569   /* do local part */
570   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
571   /* receive remote parts: note this assumes the values are not actually */
572   /* inserted in yy until the next line, which is true for my implementation*/
573   /* but is not perhaps always true. */
574   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
575                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
576   return 0;
577 }
578 
579 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
580 {
581   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
582   int        ierr;
583 
584   /* do nondiagonal part */
585   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
586   /* send it on its way */
587   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
588                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
589   /* do local part */
590   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
591   /* receive remote parts: note this assumes the values are not actually */
592   /* inserted in yy until the next line, which is true for my implementation*/
593   /* but is not perhaps always true. */
594   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
595                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
596   return 0;
597 }
598 
599 /*
600   This only works correctly for square matrices where the subblock A->A is the
601    diagonal block
602 */
603 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
604 {
605   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
606   if (a->M != a->N)
607     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
608   return MatGetDiagonal(a->A,v);
609 }
610 
611 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
612 {
613   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
614   int        ierr;
615   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
616   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
617   return 0;
618 }
619 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
620 {
621   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
622   *m = mat->M; *n = mat->N;
623   return 0;
624 }
625 
626 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
627 {
628   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
629   *m = mat->m; *n = mat->N;
630   return 0;
631 }
632 
633 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
634 {
635   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
636   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
637   return 0;
638 }
639 
640 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
641 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
642 
643 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
644 {
645   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
646   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
647   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
648   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
649   int        *cmap, *idx_p,cstart = mat->cstart;
650 
651   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
652   mat->getrowactive = PETSC_TRUE;
653 
654   if (!mat->rowvalues && (idx || v)) {
655     /*
656         allocate enough space to hold information from the longest row.
657     */
658     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
659     int     max = 1,mbs = mat->mbs,tmp;
660     for ( i=0; i<mbs; i++ ) {
661       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
662       if (max < tmp) { max = tmp; }
663     }
664     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
665     CHKPTRQ(mat->rowvalues);
666     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
667   }
668 
669 
670   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows")
671   lrow = row - brstart;
672 
673   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
674   if (!v)   {pvA = 0; pvB = 0;}
675   if (!idx) {pcA = 0; if (!v) pcB = 0;}
676   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
677   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
678   nztot = nzA + nzB;
679 
680   cmap  = mat->garray;
681   if (v  || idx) {
682     if (nztot) {
683       /* Sort by increasing column numbers, assuming A and B already sorted */
684       int imark = -1;
685       if (v) {
686         *v = v_p = mat->rowvalues;
687         for ( i=0; i<nzB; i++ ) {
688           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
689           else break;
690         }
691         imark = i;
692         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
693         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
694       }
695       if (idx) {
696         *idx = idx_p = mat->rowindices;
697         if (imark > -1) {
698           for ( i=0; i<imark; i++ ) {
699             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
700           }
701         } else {
702           for ( i=0; i<nzB; i++ ) {
703             if (cmap[cworkB[i]/bs] < cstart)
704               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
705             else break;
706           }
707           imark = i;
708         }
709         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
710         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
711       }
712     }
713     else {
714       if (idx) *idx = 0;
715       if (v)   *v   = 0;
716     }
717   }
718   *nz = nztot;
719   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
720   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
721   return 0;
722 }
723 
724 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
725 {
726   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
727   if (baij->getrowactive == PETSC_FALSE) {
728     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
729   }
730   baij->getrowactive = PETSC_FALSE;
731   return 0;
732 }
733 
734 static int MatGetBlockSize_MPIBAIJ(Mat mat, int *bs)
735 {
736   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
737   *bs = baij->bs;
738   return 0;
739 }
740 
741 static int MatZeroEntries_MPIBAIJ(Mat A)
742 {
743   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
744   int         ierr;
745   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
746   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
747   return 0;
748 }
749 
750 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,int *nz,
751                              int *nzalloc,int *mem)
752 {
753   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
754   Mat         A = mat->A, B = mat->B;
755   int         ierr, isend[3], irecv[3], nzA, nzallocA, memA;
756 
757   ierr = MatGetInfo(A,MAT_LOCAL,&nzA,&nzallocA,&memA); CHKERRQ(ierr);
758   ierr = MatGetInfo(B,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
759   isend[0] += nzA; isend[1] += nzallocA; isend[2] += memA;
760   if (flag == MAT_LOCAL) {
761     if (nz)       *nz      = isend[0];
762     if (nzalloc)  *nzalloc = isend[1];
763     if (mem)      *mem     = isend[2];
764   } else if (flag == MAT_GLOBAL_MAX) {
765     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_MAX,matin->comm);
766     if (nz)      *nz      = irecv[0];
767     if (nzalloc) *nzalloc = irecv[1];
768     if (mem)     *mem     = irecv[2];
769   } else if (flag == MAT_GLOBAL_SUM) {
770     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_SUM,matin->comm);
771     if (nz)      *nz      = irecv[0];
772     if (nzalloc) *nzalloc = irecv[1];
773     if (mem)     *mem     = irecv[2];
774   }
775   return 0;
776 }
777 
778 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
779 {
780   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
781 
782   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
783       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
784       op == MAT_COLUMNS_SORTED ||
785       op == MAT_ROW_ORIENTED) {
786         MatSetOption(a->A,op);
787         MatSetOption(a->B,op);
788   }
789   else if (op == MAT_ROWS_SORTED ||
790            op == MAT_SYMMETRIC ||
791            op == MAT_STRUCTURALLY_SYMMETRIC ||
792            op == MAT_YES_NEW_DIAGONALS)
793     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
794   else if (op == MAT_COLUMN_ORIENTED) {
795     a->roworiented = 0;
796     MatSetOption(a->A,op);
797     MatSetOption(a->B,op);
798   }
799   else if (op == MAT_NO_NEW_DIAGONALS)
800     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:MAT_NO_NEW_DIAGONALS");}
801   else
802     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:unknown option");}
803   return 0;
804 }
805 
806 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
807 {
808   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
809   Mat_SeqBAIJ *Aloc;
810   Mat        B;
811   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
812   int        bs=baij->bs,mbs=baij->mbs;
813   Scalar     *a;
814 
815   if (matout == PETSC_NULL && M != N)
816     SETERRQ(1,"MatTranspose_MPIBAIJ:Square matrix only for in-place");
817   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
818   CHKERRQ(ierr);
819 
820   /* copy over the A part */
821   Aloc = (Mat_SeqBAIJ*) baij->A->data;
822   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
823   row = baij->rstart;
824   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
825 
826   for ( i=0; i<mbs; i++ ) {
827     rvals[0] = bs*(baij->rstart + i);
828     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
829     for ( j=ai[i]; j<ai[i+1]; j++ ) {
830       col = (baij->cstart+aj[j])*bs;
831       for (k=0; k<bs; k++ ) {
832         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
833         col++; a += bs;
834       }
835     }
836   }
837   /* copy over the B part */
838   Aloc = (Mat_SeqBAIJ*) baij->B->data;
839   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
840   row = baij->rstart*bs;
841   for ( i=0; i<mbs; i++ ) {
842     rvals[0] = bs*(baij->rstart + i);
843     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
844     for ( j=ai[i]; j<ai[i+1]; j++ ) {
845       col = baij->garray[aj[j]]*bs;
846       for (k=0; k<bs; k++ ) {
847         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
848         col++; a += bs;
849       }
850     }
851   }
852   PetscFree(rvals);
853   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
854   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
855 
856   if (matout != PETSC_NULL) {
857     *matout = B;
858   } else {
859     /* This isn't really an in-place transpose .... but free data structures from baij */
860     PetscFree(baij->rowners);
861     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
862     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
863     if (baij->colmap) PetscFree(baij->colmap);
864     if (baij->garray) PetscFree(baij->garray);
865     if (baij->lvec) VecDestroy(baij->lvec);
866     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
867     PetscFree(baij);
868     PetscMemcpy(A,B,sizeof(struct _Mat));
869     PetscHeaderDestroy(B);
870   }
871   return 0;
872 }
873 /* the code does not do the diagonal entries correctly unless the
874    matrix is square and the column and row owerships are identical.
875    This is a BUG. The only way to fix it seems to be to access
876    baij->A and baij->B directly and not through the MatZeroRows()
877    routine.
878 */
879 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
880 {
881   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
882   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
883   int            *procs,*nprocs,j,found,idx,nsends,*work;
884   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
885   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
886   int            *lens,imdex,*lrows,*values,bs=l->bs;
887   MPI_Comm       comm = A->comm;
888   MPI_Request    *send_waits,*recv_waits;
889   MPI_Status     recv_status,*send_status;
890   IS             istmp;
891 
892   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
893   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
894 
895   /*  first count number of contributors to each processor */
896   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
897   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
898   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
899   for ( i=0; i<N; i++ ) {
900     idx = rows[i];
901     found = 0;
902     for ( j=0; j<size; j++ ) {
903       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
904         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
905       }
906     }
907     if (!found) SETERRQ(1,"MatZeroRows_MPIBAIJ:Index out of range");
908   }
909   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
910 
911   /* inform other processors of number of messages and max length*/
912   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
913   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
914   nrecvs = work[rank];
915   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
916   nmax = work[rank];
917   PetscFree(work);
918 
919   /* post receives:   */
920   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
921   CHKPTRQ(rvalues);
922   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
923   CHKPTRQ(recv_waits);
924   for ( i=0; i<nrecvs; i++ ) {
925     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
926   }
927 
928   /* do sends:
929       1) starts[i] gives the starting index in svalues for stuff going to
930          the ith processor
931   */
932   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
933   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
934   CHKPTRQ(send_waits);
935   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
936   starts[0] = 0;
937   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
938   for ( i=0; i<N; i++ ) {
939     svalues[starts[owner[i]]++] = rows[i];
940   }
941   ISRestoreIndices(is,&rows);
942 
943   starts[0] = 0;
944   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
945   count = 0;
946   for ( i=0; i<size; i++ ) {
947     if (procs[i]) {
948       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
949     }
950   }
951   PetscFree(starts);
952 
953   base = owners[rank]*bs;
954 
955   /*  wait on receives */
956   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
957   source = lens + nrecvs;
958   count  = nrecvs; slen = 0;
959   while (count) {
960     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
961     /* unpack receives into our local space */
962     MPI_Get_count(&recv_status,MPI_INT,&n);
963     source[imdex]  = recv_status.MPI_SOURCE;
964     lens[imdex]  = n;
965     slen += n;
966     count--;
967   }
968   PetscFree(recv_waits);
969 
970   /* move the data into the send scatter */
971   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
972   count = 0;
973   for ( i=0; i<nrecvs; i++ ) {
974     values = rvalues + i*nmax;
975     for ( j=0; j<lens[i]; j++ ) {
976       lrows[count++] = values[j] - base;
977     }
978   }
979   PetscFree(rvalues); PetscFree(lens);
980   PetscFree(owner); PetscFree(nprocs);
981 
982   /* actually zap the local rows */
983   ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
984   PLogObjectParent(A,istmp);
985   PetscFree(lrows);
986   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
987   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
988   ierr = ISDestroy(istmp); CHKERRQ(ierr);
989 
990   /* wait on sends */
991   if (nsends) {
992     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
993     CHKPTRQ(send_status);
994     MPI_Waitall(nsends,send_waits,send_status);
995     PetscFree(send_status);
996   }
997   PetscFree(send_waits); PetscFree(svalues);
998 
999   return 0;
1000 }
1001 
1002 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1003 
1004 /* -------------------------------------------------------------------*/
1005 static struct _MatOps MatOps = {
1006   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1007   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,MatSolve_MPIBAIJ,
1008   MatSolveAdd_MPIBAIJ,MatSolveTrans_MPIBAIJ,MatSolveTransAdd_MPIBAIJ,MatLUFactor_MPIBAIJ,
1009   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1010   0,MatGetDiagonal_MPIBAIJ,0,MatNorm_MPIBAIJ,
1011   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1012   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,MatGetReordering_MPIBAIJ,MatLUFactorSymbolic_MPIBAIJ,
1013   MatLUFactorNumeric_MPIBAIJ,0,0,MatGetSize_MPIBAIJ,
1014   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,MatILUFactorSymbolic_MPIBAIJ,0,
1015   0,0,0,0,
1016   0,MatConvertSameType_MPIBAIJ,0,0,
1017   0,0,0,MatGetSubMatrices_MPIBAIJ,
1018   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,0,
1019   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ};
1020 
1021 
1022 /*@C
1023    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1024    (block compressed row).  For good matrix assembly performance
1025    the user should preallocate the matrix storage by setting the parameters
1026    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1027    performance can be increased by more than a factor of 50.
1028 
1029    Input Parameters:
1030 .  comm - MPI communicator
1031 .  bs   - size of blockk
1032 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1033            This value should be the same as the local size used in creating the
1034            y vector for the matrix-vector product y = Ax.
1035 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1036            This value should be the same as the local size used in creating the
1037            x vector for the matrix-vector product y = Ax.
1038 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1039 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1040 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1041            submatrix  (same for all local rows)
1042 .  d_nzz - array containing the number of block nonzeros in the various block rows
1043            of the in diagonal portion of the local (possibly different for each block
1044            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1045            it is zero.
1046 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1047            submatrix (same for all local rows).
1048 .  o_nzz - array containing the number of nonzeros in the various block rows of the
1049            off-diagonal portion of the local submatrix (possibly different for
1050            each block row) or PETSC_NULL.
1051 
1052    Output Parameter:
1053 .  A - the matrix
1054 
1055    Notes:
1056    The user MUST specify either the local or global matrix dimensions
1057    (possibly both).
1058 
1059    Storage Information:
1060    For a square global matrix we define each processor's diagonal portion
1061    to be its local rows and the corresponding columns (a square submatrix);
1062    each processor's off-diagonal portion encompasses the remainder of the
1063    local matrix (a rectangular submatrix).
1064 
1065    The user can specify preallocated storage for the diagonal part of
1066    the local submatrix with either d_nz or d_nnz (not both).  Set
1067    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1068    memory allocation.  Likewise, specify preallocated storage for the
1069    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1070 
1071    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1072    the figure below we depict these three local rows and all columns (0-11).
1073 
1074 $          0 1 2 3 4 5 6 7 8 9 10 11
1075 $         -------------------
1076 $  row 3  |  o o o d d d o o o o o o
1077 $  row 4  |  o o o d d d o o o o o o
1078 $  row 5  |  o o o d d d o o o o o o
1079 $         -------------------
1080 $
1081 
1082    Thus, any entries in the d locations are stored in the d (diagonal)
1083    submatrix, and any entries in the o locations are stored in the
1084    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1085    stored simply in the MATSEQBAIJ format for compressed row storage.
1086 
1087    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1088    and o_nz should indicate the number of nonzeros per row in the o matrix.
1089    In general, for PDE problems in which most nonzeros are near the diagonal,
1090    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1091    or you will get TERRIBLE performance; see the users' manual chapter on
1092    matrices and the file $(PETSC_DIR)/Performance.
1093 
1094 .keywords: matrix, block, aij, compressed row, sparse, parallel
1095 
1096 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1097 @*/
1098 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1099                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1100 {
1101   Mat          B;
1102   Mat_MPIBAIJ  *b;
1103   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE;
1104 
1105   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
1106   *A = 0;
1107   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1108   PLogObjectCreate(B);
1109   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1110   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1111   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1112   B->destroy    = MatDestroy_MPIBAIJ;
1113   B->view       = MatView_MPIBAIJ;
1114 
1115   B->factor     = 0;
1116   B->assembled  = PETSC_FALSE;
1117 
1118   b->insertmode = NOT_SET_VALUES;
1119   MPI_Comm_rank(comm,&b->rank);
1120   MPI_Comm_size(comm,&b->size);
1121 
1122   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1123     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1124   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
1125   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
1126   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1127   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1128 
1129   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1130     work[0] = m; work[1] = n;
1131     mbs = m/bs; nbs = n/bs;
1132     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1133     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1134     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1135   }
1136   if (m == PETSC_DECIDE) {
1137     Mbs = M/bs;
1138     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
1139     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1140     m   = mbs*bs;
1141   }
1142   if (n == PETSC_DECIDE) {
1143     Nbs = N/bs;
1144     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
1145     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1146     n   = nbs*bs;
1147   }
1148   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
1149 
1150   b->m = m; B->m = m;
1151   b->n = n; B->n = n;
1152   b->N = N; B->N = N;
1153   b->M = M; B->M = M;
1154   b->bs  = bs;
1155   b->bs2 = bs*bs;
1156   b->mbs = mbs;
1157   b->nbs = nbs;
1158   b->Mbs = Mbs;
1159   b->Nbs = Nbs;
1160 
1161   /* build local table of row and column ownerships */
1162   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1163   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1164   b->cowners = b->rowners + b->size + 2;
1165   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1166   b->rowners[0] = 0;
1167   for ( i=2; i<=b->size; i++ ) {
1168     b->rowners[i] += b->rowners[i-1];
1169   }
1170   b->rstart = b->rowners[b->rank];
1171   b->rend   = b->rowners[b->rank+1];
1172   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1173   b->cowners[0] = 0;
1174   for ( i=2; i<=b->size; i++ ) {
1175     b->cowners[i] += b->cowners[i-1];
1176   }
1177   b->cstart = b->cowners[b->rank];
1178   b->cend   = b->cowners[b->rank+1];
1179 
1180   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1181   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1182   PLogObjectParent(B,b->A);
1183   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1184   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1185   PLogObjectParent(B,b->B);
1186 
1187   /* build cache for off array entries formed */
1188   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1189   b->colmap      = 0;
1190   b->garray      = 0;
1191   b->roworiented = 1;
1192 
1193   /* stuff used for matrix vector multiply */
1194   b->lvec      = 0;
1195   b->Mvctx     = 0;
1196 
1197   /* stuff for MatGetRow() */
1198   b->rowindices   = 0;
1199   b->rowvalues    = 0;
1200   b->getrowactive = PETSC_FALSE;
1201 
1202   *A = B;
1203   return 0;
1204 }
1205 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1206 {
1207   Mat        mat;
1208   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1209   int        ierr, len=0, flg;
1210 
1211   *newmat       = 0;
1212   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1213   PLogObjectCreate(mat);
1214   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1215   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1216   mat->destroy    = MatDestroy_MPIBAIJ;
1217   mat->view       = MatView_MPIBAIJ;
1218   mat->factor     = matin->factor;
1219   mat->assembled  = PETSC_TRUE;
1220 
1221   a->m = mat->m   = oldmat->m;
1222   a->n = mat->n   = oldmat->n;
1223   a->M = mat->M   = oldmat->M;
1224   a->N = mat->N   = oldmat->N;
1225 
1226   a->bs  = oldmat->bs;
1227   a->bs2 = oldmat->bs2;
1228   a->mbs = oldmat->mbs;
1229   a->nbs = oldmat->nbs;
1230   a->Mbs = oldmat->Mbs;
1231   a->Nbs = oldmat->Nbs;
1232 
1233   a->rstart       = oldmat->rstart;
1234   a->rend         = oldmat->rend;
1235   a->cstart       = oldmat->cstart;
1236   a->cend         = oldmat->cend;
1237   a->size         = oldmat->size;
1238   a->rank         = oldmat->rank;
1239   a->insertmode   = NOT_SET_VALUES;
1240   a->rowvalues    = 0;
1241   a->getrowactive = PETSC_FALSE;
1242 
1243   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1244   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1245   a->cowners = a->rowners + a->size + 2;
1246   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1247   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1248   if (oldmat->colmap) {
1249     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1250     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1251     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1252   } else a->colmap = 0;
1253   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1254     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1255     PLogObjectMemory(mat,len*sizeof(int));
1256     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1257   } else a->garray = 0;
1258 
1259   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1260   PLogObjectParent(mat,a->lvec);
1261   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1262   PLogObjectParent(mat,a->Mvctx);
1263   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1264   PLogObjectParent(mat,a->A);
1265   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1266   PLogObjectParent(mat,a->B);
1267   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1268   if (flg) {
1269     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1270   }
1271   *newmat = mat;
1272   return 0;
1273 }
1274 
1275 #include "sys.h"
1276 
1277 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1278 {
1279   Mat          A;
1280   int          i, nz, ierr, j,rstart, rend, fd;
1281   Scalar       *vals,*buf;
1282   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1283   MPI_Status   status;
1284   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1285   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1286   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1287   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1288   int          dcount,kmax,k,nzcount,tmp;
1289 
1290 
1291   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1292   bs2  = bs*bs;
1293 
1294   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1295   if (!rank) {
1296     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1297     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1298     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
1299   }
1300 
1301   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1302   M = header[1]; N = header[2];
1303 
1304 
1305   if (M != N) SETERRQ(1,"MatLoad_SeqBAIJ:Can only do square matrices");
1306 
1307   /*
1308      This code adds extra rows to make sure the number of rows is
1309      divisible by the blocksize
1310   */
1311   Mbs        = M/bs;
1312   extra_rows = bs - M + bs*(Mbs);
1313   if (extra_rows == bs) extra_rows = 0;
1314   else                  Mbs++;
1315   if (extra_rows &&!rank) {
1316     PLogInfo(0,"MatLoad_SeqBAIJ:Padding loaded matrix to match blocksize");
1317   }
1318   /* determine ownership of all rows */
1319   mbs = Mbs/size + ((Mbs % size) > rank);
1320   m   = mbs * bs;
1321   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1322   browners = rowners + size + 1;
1323   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1324   rowners[0] = 0;
1325   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1326   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1327   rstart = rowners[rank];
1328   rend   = rowners[rank+1];
1329 
1330   /* distribute row lengths to all processors */
1331   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1332   if (!rank) {
1333     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1334     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1335     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1336     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1337     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1338     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1339     PetscFree(sndcounts);
1340   }
1341   else {
1342     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1343   }
1344 
1345   if (!rank) {
1346     /* calculate the number of nonzeros on each processor */
1347     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1348     PetscMemzero(procsnz,size*sizeof(int));
1349     for ( i=0; i<size; i++ ) {
1350       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1351         procsnz[i] += rowlengths[j];
1352       }
1353     }
1354     PetscFree(rowlengths);
1355 
1356     /* determine max buffer needed and allocate it */
1357     maxnz = 0;
1358     for ( i=0; i<size; i++ ) {
1359       maxnz = PetscMax(maxnz,procsnz[i]);
1360     }
1361     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1362 
1363     /* read in my part of the matrix column indices  */
1364     nz = procsnz[0];
1365     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1366     mycols = ibuf;
1367     if (size == 1)  nz -= extra_rows;
1368     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1369     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1370 
1371     /* read in every ones (except the last) and ship off */
1372     for ( i=1; i<size-1; i++ ) {
1373       nz = procsnz[i];
1374       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1375       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1376     }
1377     /* read in the stuff for the last proc */
1378     if ( size != 1 ) {
1379       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1380       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1381       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1382       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1383     }
1384     PetscFree(cols);
1385   }
1386   else {
1387     /* determine buffer space needed for message */
1388     nz = 0;
1389     for ( i=0; i<m; i++ ) {
1390       nz += locrowlens[i];
1391     }
1392     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1393     mycols = ibuf;
1394     /* receive message of column indices*/
1395     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1396     MPI_Get_count(&status,MPI_INT,&maxnz);
1397     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1398   }
1399 
1400   /* loop over local rows, determining number of off diagonal entries */
1401   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1402   odlens = dlens + (rend-rstart);
1403   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1404   PetscMemzero(mask,3*Mbs*sizeof(int));
1405   masked1 = mask    + Mbs;
1406   masked2 = masked1 + Mbs;
1407   rowcount = 0; nzcount = 0;
1408   for ( i=0; i<mbs; i++ ) {
1409     dcount  = 0;
1410     odcount = 0;
1411     for ( j=0; j<bs; j++ ) {
1412       kmax = locrowlens[rowcount];
1413       for ( k=0; k<kmax; k++ ) {
1414         tmp = mycols[nzcount++]/bs;
1415         if (!mask[tmp]) {
1416           mask[tmp] = 1;
1417           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1418           else masked1[dcount++] = tmp;
1419         }
1420       }
1421       rowcount++;
1422     }
1423 
1424     dlens[i]  = dcount;
1425     odlens[i] = odcount;
1426 
1427     /* zero out the mask elements we set */
1428     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1429     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1430   }
1431 
1432   /* create our matrix */
1433   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);CHKERRQ(ierr);
1434   A = *newmat;
1435   MatSetOption(A,MAT_COLUMNS_SORTED);
1436 
1437   if (!rank) {
1438     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1439     /* read in my part of the matrix numerical values  */
1440     nz = procsnz[0];
1441     vals = buf;
1442     mycols = ibuf;
1443     if (size == 1)  nz -= extra_rows;
1444     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1445     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1446     /* insert into matrix */
1447     jj      = rstart*bs;
1448     for ( i=0; i<m; i++ ) {
1449       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1450       mycols += locrowlens[i];
1451       vals   += locrowlens[i];
1452       jj++;
1453     }
1454     /* read in other processors( except the last one) and ship out */
1455     for ( i=1; i<size-1; i++ ) {
1456       nz = procsnz[i];
1457       vals = buf;
1458       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1459       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1460     }
1461     /* the last proc */
1462     if ( size != 1 ){
1463       nz = procsnz[i] - extra_rows;
1464       vals = buf;
1465       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1466       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1467       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1468     }
1469     PetscFree(procsnz);
1470   }
1471   else {
1472     /* receive numeric values */
1473     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1474 
1475     /* receive message of values*/
1476     vals = buf;
1477     mycols = ibuf;
1478     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1479     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1480     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1481 
1482     /* insert into matrix */
1483     jj      = rstart*bs;
1484     for ( i=0; i<m; i++ ) {
1485       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1486       mycols += locrowlens[i];
1487       vals   += locrowlens[i];
1488       jj++;
1489     }
1490   }
1491   PetscFree(locrowlens);
1492   PetscFree(buf);
1493   PetscFree(ibuf);
1494   PetscFree(rowners);
1495   PetscFree(dlens);
1496   PetscFree(mask);
1497   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1498   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1499   return 0;
1500 }
1501 
1502 
1503