xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision c16cb8f2de640b3af1673495cb6d811eff24116a)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.17 1996/07/31 20:25:15 balay Exp bsmith $";
3 #endif
4 
5 #include "mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 /* local utility routine that creates a mapping from the global column
25 number to the local number in the off-diagonal part of the local
26 storage of the matrix.  This is done in a non-scalable way since the
27 length of colmap equals the global matrix length.
28 */
29 static int CreateColmap_MPIBAIJ_Private(Mat mat)
30 {
31   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
32   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
33   int        nbs = B->nbs,i;
34 
35   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
36   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
37   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
38   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i;
39   return 0;
40 }
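/*
   Worked illustration (hypothetical values, not taken from any particular run):
   with bs = 2, if this process's off-diagonal part references global block
   columns garray = {3,7}, then colmap[3] = 0 and colmap[7] = 1.  A global
   point column such as 15 (block column 15/2 = 7, offset 15%2 = 1) then maps
   to local point column colmap[7]*bs + 15%bs = 3.  This is how the routines
   below translate global columns into columns of the compacted off-diagonal
   matrix baij->B.
*/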
41 
42 
43 static int MatGetReordering_MPIBAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
44 {
45   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
46   int         ierr;
47   if (baij->size == 1) {
48     ierr = MatGetReordering(baij->A,type,rperm,cperm); CHKERRQ(ierr);
49   } else SETERRQ(1,"MatGetReordering_MPIBAIJ:not supported in parallel");
50   return 0;
51 }
52 
53 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
54 {
55   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
56   Scalar      value;
57   int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
58   int         cstart = baij->cstart, cend = baij->cend,row,col;
59   int         roworiented = baij->roworiented,rstart_orig,rend_orig;
60   int         cstart_orig,cend_orig,bs=baij->bs;
61 
62   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
63     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
64   }
65   baij->insertmode = addv;
66   rstart_orig = rstart*bs;
67   rend_orig   = rend*bs;
68   cstart_orig = cstart*bs;
69   cend_orig   = cend*bs;
70   for ( i=0; i<m; i++ ) {
71     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
72     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
73     if (im[i] >= rstart_orig && im[i] < rend_orig) {
74       row = im[i] - rstart_orig;
75       for ( j=0; j<n; j++ ) {
76         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");
77         if (in[j] >= baij->N) SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");
78         if (in[j] >= cstart_orig && in[j] < cend_orig){
79           col = in[j] - cstart_orig;
80           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
81           ierr = MatSetValues(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
82         }
83         else {
84           if (mat->was_assembled) {
85             if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);}
86           col = baij->colmap[in[j]/bs]*bs + in[j]%bs;
87             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
88               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
89               col =  in[j];
90             }
91           }
92           else col = in[j];
93           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
94           ierr = MatSetValues(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
95         }
96       }
97     }
98     else {
99       if (roworiented) {
100         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
101       }
102       else {
103         row = im[i];
104         for ( j=0; j<n; j++ ) {
105           ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
106         }
107       }
108     }
109   }
110   return 0;
111 }
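/*
   Caller-side sketch (hypothetical indices and values; assumes the default
   row-oriented ordering and INSERT_VALUES):

      int    rows[1] = {3}, cols[2] = {4,5};
      Scalar vals[2] = {1.0, 2.0};
      ierr = MatSetValues(mat,1,rows,2,cols,vals,INSERT_VALUES); CHKERRQ(ierr);

   Entries destined for rows owned by other processes are stashed above and
   only communicated during MatAssemblyBegin()/MatAssemblyEnd().
*/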
112 
113 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
114 {
115   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
116   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
117   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
118 
119   for ( i=0; i<m; i++ ) {
120     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
121     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
122     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
123       row = idxm[i] - bsrstart;
124       for ( j=0; j<n; j++ ) {
125         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
126         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
127         if (idxn[j] >= bscstart && idxn[j] < bscend){
128           col = idxn[j] - bscstart;
129           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
130         }
131         else {
132           if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);}
133           if ( baij->garray[baij->colmap[idxn[j]/bs]] != idxn[j]/bs ) *(v+i*n+j) = 0.0;
134           else {
135             col = baij->colmap[idxn[j]/bs]*bs + idxn[j]%bs;
136             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
137           }
138         }
139       }
140     }
141     else {
142       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
143     }
144   }
145   return 0;
146 }
147 
148 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
149 {
150   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
151   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
152   int        ierr, i,bs2=baij->bs2;
153   double     sum = 0.0;
154   Scalar     *v;
155 
156   if (baij->size == 1) {
157     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
158   } else {
159     if (type == NORM_FROBENIUS) {
160       v = amat->a;
161       for (i=0; i<amat->nz*bs2; i++ ) {
162 #if defined(PETSC_COMPLEX)
163         sum += real(conj(*v)*(*v)); v++;
164 #else
165         sum += (*v)*(*v); v++;
166 #endif
167       }
168       v = bmat->a;
169       for (i=0; i<bmat->nz*bs2; i++ ) {
170 #if defined(PETSC_COMPLEX)
171         sum += real(conj(*v)*(*v)); v++;
172 #else
173         sum += (*v)*(*v); v++;
174 #endif
175       }
176       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
177       *norm = sqrt(*norm);
178     }
179     else
180       SETERRQ(1,"MatNorm_SeqBAIJ:No support for this norm yet");
181   }
182   return 0;
183 }
184 
185 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
186 {
187   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
188   MPI_Comm    comm = mat->comm;
189   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
190   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
191   MPI_Request *send_waits,*recv_waits;
192   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
193   InsertMode  addv;
194   Scalar      *rvalues,*svalues;
195 
196   /* make sure all processors are either in INSERTMODE or ADDMODE */
197   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
198   if (addv == (ADD_VALUES|INSERT_VALUES)) {
199     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
200   }
201   baij->insertmode = addv; /* in case this processor had no cache */
202 
203   /*  first count number of contributors to each processor */
204   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
205   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
206   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
207   for ( i=0; i<baij->stash.n; i++ ) {
208     idx = baij->stash.idx[i];
209     for ( j=0; j<size; j++ ) {
210       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
211         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
212       }
213     }
214   }
215   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
216 
217   /* inform other processors of number of messages and max length*/
218   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
219   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
220   nreceives = work[rank];
221   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
222   nmax = work[rank];
223   PetscFree(work);
224 
225   /* post receives:
226        1) each message will consist of ordered triples
227      (global row, global column, value); we store the global indices
228      as doubles to simplify the message passing.
229        2) since we don't know how long each individual message is we
230      allocate the largest needed buffer for each receive. Potentially
231      this is a lot of wasted space.
232 
233 
234        This could be done better.
235   */
236   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
237   CHKPTRQ(rvalues);
238   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
239   CHKPTRQ(recv_waits);
240   for ( i=0; i<nreceives; i++ ) {
241     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
242               comm,recv_waits+i);
243   }
244 
245   /* do sends:
246       1) starts[i] gives the starting index in svalues for stuff going to
247          the ith processor
248   */
249   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
250   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
251   CHKPTRQ(send_waits);
252   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
253   starts[0] = 0;
254   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
255   for ( i=0; i<baij->stash.n; i++ ) {
256     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
257     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
258     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
259   }
260   PetscFree(owner);
261   starts[0] = 0;
262   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
263   count = 0;
264   for ( i=0; i<size; i++ ) {
265     if (procs[i]) {
266       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
267                 comm,send_waits+count++);
268     }
269   }
270   PetscFree(starts); PetscFree(nprocs);
271 
272   /* Free cache space */
273   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
274   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
275 
276   baij->svalues    = svalues;    baij->rvalues    = rvalues;
277   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
278   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
279   baij->rmax       = nmax;
280 
281   return 0;
282 }
283 
284 
285 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
286 {
287   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
288   MPI_Status  *send_status,recv_status;
289   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
290   int         bs=baij->bs,row,col,other_disassembled;
291   Scalar      *values,val;
292   InsertMode  addv = baij->insertmode;
293 
294   /*  wait on receives */
295   while (count) {
296     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
297     /* unpack receives into our local space */
298     values = baij->rvalues + 3*imdex*baij->rmax;
299     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
300     n = n/3;
301     for ( i=0; i<n; i++ ) {
302       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
303       col = (int) PetscReal(values[3*i+1]);
304       val = values[3*i+2];
305       if (col >= baij->cstart*bs && col < baij->cend*bs) {
306         col -= baij->cstart*bs;
307         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
308       }
309       else {
310         if (mat->was_assembled) {
311           if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);}
312           col = baij->colmap[col/bs]*bs + col%bs;
313           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
314             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
315             col = (int) PetscReal(values[3*i+1]);
316           }
317         }
318         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
319       }
320     }
321     count--;
322   }
323   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
324 
325   /* wait on sends */
326   if (baij->nsends) {
327     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
328     CHKPTRQ(send_status);
329     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
330     PetscFree(send_status);
331   }
332   PetscFree(baij->send_waits); PetscFree(baij->svalues);
333 
334   baij->insertmode = NOT_SET_VALUES;
335   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
336   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
337 
338   /* determine if any processor has disassembled, if so we must
339      also disassemble ourselves, in order that we may reassemble. */
340   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
341   if (mat->was_assembled && !other_disassembled) {
342     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
343   }
344 
345   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
346     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
347   }
348   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
349   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
350 
351   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
352   return 0;
353 }
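/*
   Typical assembly sequence in user code (a sketch; assumes the calls are
   made collectively on every process of the matrix's communicator):

      ierr = MatSetValues(mat,...);                     CHKERRQ(ierr);
      ierr = MatAssemblyBegin(mat,MAT_FINAL_ASSEMBLY);  CHKERRQ(ierr);
      ierr = MatAssemblyEnd(mat,MAT_FINAL_ASSEMBLY);    CHKERRQ(ierr);

   MatAssemblyBegin() above posts the sends and receives for the stashed
   off-processor values; MatAssemblyEnd() unpacks them into baij->A and baij->B.
*/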
354 
355 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
356 {
357   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
358   int          ierr;
359 
360   if (baij->size == 1) {
361     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
362   }
363   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
364   return 0;
365 }
366 
367 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
368 {
369   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
370   int          ierr, format,rank,bs=baij->bs;
371   FILE         *fd;
372   ViewerType   vtype;
373 
374   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
375   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
376     ierr = ViewerGetFormat(viewer,&format);
377     if (format == ASCII_FORMAT_INFO_DETAILED) {
378       int nz, nzalloc, mem;
379       MPI_Comm_rank(mat->comm,&rank);
380       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
381       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
382       PetscSequentialPhaseBegin(mat->comm,1);
383       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
384               rank,baij->m,nz*bs,nzalloc*bs,baij->bs,mem);
385       ierr = MatGetInfo(baij->A,MAT_LOCAL,&nz,&nzalloc,&mem);
386       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz*bs);
387       ierr = MatGetInfo(baij->B,MAT_LOCAL,&nz,&nzalloc,&mem);
388       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz*bs);
389       fflush(fd);
390       PetscSequentialPhaseEnd(mat->comm,1);
391       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
392       return 0;
393     }
394     else if (format == ASCII_FORMAT_INFO) {
395       return 0;
396     }
397   }
398 
399   if (vtype == DRAW_VIEWER) {
400     Draw       draw;
401     PetscTruth isnull;
402     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
403     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
404   }
405 
406   if (vtype == ASCII_FILE_VIEWER) {
407     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
408     PetscSequentialPhaseBegin(mat->comm,1);
409     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
410            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
411             baij->cstart*bs,baij->cend*bs);
412     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
413     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
414     fflush(fd);
415     PetscSequentialPhaseEnd(mat->comm,1);
416   }
417   else {
418     int size = baij->size;
419     rank = baij->rank;
420     if (size == 1) {
421       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
422     }
423     else {
424       /* assemble the entire matrix onto first processor. */
425       Mat         A;
426       Mat_SeqBAIJ *Aloc;
427       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
428       int         mbs=baij->mbs;
429       Scalar      *a;
430 
431       if (!rank) {
432         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
433         CHKERRQ(ierr);
434       }
435       else {
436         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
437         CHKERRQ(ierr);
438       }
439       PLogObjectParent(mat,A);
440 
441       /* copy over the A part */
442       Aloc = (Mat_SeqBAIJ*) baij->A->data;
443       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
444       row = baij->rstart;
445       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
446 
447       for ( i=0; i<mbs; i++ ) {
448         rvals[0] = bs*(baij->rstart + i);
449         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
450         for ( j=ai[i]; j<ai[i+1]; j++ ) {
451           col = (baij->cstart+aj[j])*bs;
452           for (k=0; k<bs; k++ ) {
453             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
454             col++; a += bs;
455           }
456         }
457       }
458       /* copy over the B part */
459       Aloc = (Mat_SeqBAIJ*) baij->B->data;
460       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
461       row = baij->rstart*bs;
462       for ( i=0; i<mbs; i++ ) {
463         rvals[0] = bs*(baij->rstart + i);
464         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
465         for ( j=ai[i]; j<ai[i+1]; j++ ) {
466           col = baij->garray[aj[j]]*bs;
467           for (k=0; k<bs; k++ ) {
468             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
469             col++; a += bs;
470           }
471         }
472       }
473       PetscFree(rvals);
474       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
475       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
476       if (!rank) {
477         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
478       }
479       ierr = MatDestroy(A); CHKERRQ(ierr);
480     }
481   }
482   return 0;
483 }
484 
485 
486 
487 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
488 {
489   Mat         mat = (Mat) obj;
490   int         ierr;
491   ViewerType  vtype;
492 
493   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
494   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
495       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
496     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
497   }
498   else if (vtype == BINARY_FILE_VIEWER) {
499     return MatView_MPIBAIJ_Binary(mat,viewer);
500   }
501   return 0;
502 }
503 
504 static int MatDestroy_MPIBAIJ(PetscObject obj)
505 {
506   Mat         mat = (Mat) obj;
507   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
508   int         ierr;
509 
510 #if defined(PETSC_LOG)
511   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
512 #endif
513 
514   PetscFree(baij->rowners);
515   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
516   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
517   if (baij->colmap) PetscFree(baij->colmap);
518   if (baij->garray) PetscFree(baij->garray);
519   if (baij->lvec)   VecDestroy(baij->lvec);
520   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
521   if (baij->rowvalues) PetscFree(baij->rowvalues);
522   PetscFree(baij);
523   PLogObjectDestroy(mat);
524   PetscHeaderDestroy(mat);
525   return 0;
526 }
527 
528 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
529 {
530   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
531   int         ierr, nt;
532 
533   VecGetLocalSize_Fast(xx,nt);
534   if (nt != a->n) {
535     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible parition of A and xx");
536   }
537   VecGetLocalSize_Fast(yy,nt);
538   if (nt != a->m) {
539     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible parition of A and yy");
540   }
541   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
542   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
543   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
544   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
545   return 0;
546 }
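/*
   Usage sketch (hypothetical caller code; xx and yy must be parallel vectors
   whose local sizes match the matrix partitioning checked above):

      ierr = MatMult(A,xx,yy); CHKERRQ(ierr);
*/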
547 
548 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
549 {
550   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
551   int        ierr;
552   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
553   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
554   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
555   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
556   return 0;
557 }
558 
559 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
560 {
561   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
562   int        ierr;
563 
564   /* do nondiagonal part */
565   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
566   /* send it on its way */
567   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
568                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
569   /* do local part */
570   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
571   /* receive remote parts: note this assumes the values are not actually */
572   /* inserted in yy until the next line, which is true for my implementation*/
573   /* but is not perhaps always true. */
574   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
575                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
576   return 0;
577 }
578 
579 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
580 {
581   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
582   int        ierr;
583 
584   /* do nondiagonal part */
585   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
586   /* send it on its way */
587   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
588                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
589   /* do local part */
590   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
591   /* receive remote parts: note this assumes the values are not actually */
592   /* inserted in zz until the next line, which is true for my implementation*/
593   /* but is not perhaps always true. */
594   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
595                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
596   return 0;
597 }
598 
599 /*
600   This only works correctly for square matrices where the subblock A->A is the
601    diagonal block
602 */
603 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
604 {
605   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
606   if (a->M != a->N)
607     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
608   return MatGetDiagonal(a->A,v);
609 }
610 
611 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
612 {
613   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
614   int        ierr;
615   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
616   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
617   return 0;
618 }
619 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
620 {
621   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
622   *m = mat->M; *n = mat->N;
623   return 0;
624 }
625 
626 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
627 {
628   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
629   *m = mat->m; *n = mat->n;
630   return 0;
631 }
632 
633 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
634 {
635   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
636   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
637   return 0;
638 }
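/*
   Usage sketch (hypothetical caller code); the returned values are point
   indices, already scaled by the block size as computed above:

      int rstart, rend;
      ierr = MatGetOwnershipRange(A,&rstart,&rend); CHKERRQ(ierr);
      (rows rstart through rend-1 are stored on this process)
*/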
639 
640 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
641 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
642 
643 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
644 {
645   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
646   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
647   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
648   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
649   int        *cmap, *idx_p,cstart = mat->cstart;
650 
651   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
652   mat->getrowactive = PETSC_TRUE;
653 
654   if (!mat->rowvalues && (idx || v)) {
655     /*
656         allocate enough space to hold information from the longest row.
657     */
658     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
659     int     max = 1,mbs = mat->mbs,tmp;
660     for ( i=0; i<mbs; i++ ) {
661       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
662       if (max < tmp) { max = tmp; }
663     }
664     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
665     CHKPTRQ(mat->rowvalues);
666     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
667   }
668 
669 
670   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows")
671   lrow = row - brstart;
672 
673   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
674   if (!v)   {pvA = 0; pvB = 0;}
675   if (!idx) {pcA = 0; if (!v) pcB = 0;}
676   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
677   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
678   nztot = nzA + nzB;
679 
680   cmap  = mat->garray;
681   if (v  || idx) {
682     if (nztot) {
683       /* Sort by increasing column numbers, assuming A and B already sorted */
684       int imark = -1;
685       if (v) {
686         *v = v_p = mat->rowvalues;
687         for ( i=0; i<nzB; i++ ) {
688           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
689           else break;
690         }
691         imark = i;
692         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
693         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
694       }
695       if (idx) {
696         *idx = idx_p = mat->rowindices;
697         if (imark > -1) {
698           for ( i=0; i<imark; i++ ) {
699             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
700           }
701         } else {
702           for ( i=0; i<nzB; i++ ) {
703             if (cmap[cworkB[i]/bs] < cstart)
704               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
705             else break;
706           }
707           imark = i;
708         }
709         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
710         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
711       }
712     }
713     else {
714       if (idx) *idx = 0;
715       if (v)   *v   = 0;
716     }
717   }
718   *nz = nztot;
719   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
720   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
721   return 0;
722 }
723 
724 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
725 {
726   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
727   if (baij->getrowactive == PETSC_FALSE) {
728     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
729   }
730   baij->getrowactive = PETSC_FALSE;
731   return 0;
732 }
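/*
   Usage sketch (hypothetical caller code; only rows owned by this process may
   be requested, and each MatGetRow() must be paired with MatRestoreRow()):

      int    nz, *cols;
      Scalar *vals;
      ierr = MatGetRow(mat,row,&nz,&cols,&vals);     CHKERRQ(ierr);
      ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
*/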
733 
734 static int MatGetBlockSize_MPIBAIJ(Mat mat, int *bs)
735 {
736   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
737   *bs = baij->bs;
738   return 0;
739 }
740 
741 static int MatZeroEntries_MPIBAIJ(Mat A)
742 {
743   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
744   int         ierr;
745   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
746   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
747   return 0;
748 }
749 
750 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,int *nz,
751                              int *nzalloc,int *mem)
752 {
753   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
754   Mat         A = mat->A, B = mat->B;
755   int         ierr, isend[3], irecv[3], nzA, nzallocA, memA;
756 
757   ierr = MatGetInfo(A,MAT_LOCAL,&nzA,&nzallocA,&memA); CHKERRQ(ierr);
758   ierr = MatGetInfo(B,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
759   isend[0] += nzA; isend[1] += nzallocA; isend[2] += memA;
760   if (flag == MAT_LOCAL) {
761     if (nz)       *nz      = isend[0];
762     if (nzalloc)  *nzalloc = isend[1];
763     if (mem)      *mem     = isend[2];
764   } else if (flag == MAT_GLOBAL_MAX) {
765     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_MAX,matin->comm);
766     if (nz)      *nz      = irecv[0];
767     if (nzalloc) *nzalloc = irecv[1];
768     if (mem)     *mem     = irecv[2];
769   } else if (flag == MAT_GLOBAL_SUM) {
770     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_SUM,matin->comm);
771     if (nz)      *nz      = irecv[0];
772     if (nzalloc) *nzalloc = irecv[1];
773     if (mem)     *mem     = irecv[2];
774   }
775   return 0;
776 }
777 
778 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
779 {
780   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
781 
782   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
783       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
784       op == MAT_COLUMNS_SORTED ||
785       op == MAT_ROW_ORIENTED) {
786         MatSetOption(a->A,op);
787         MatSetOption(a->B,op);
788   }
789   else if (op == MAT_ROWS_SORTED ||
790            op == MAT_SYMMETRIC ||
791            op == MAT_STRUCTURALLY_SYMMETRIC ||
792            op == MAT_YES_NEW_DIAGONALS)
793     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
794   else if (op == MAT_COLUMN_ORIENTED) {
795     a->roworiented = 0;
796     MatSetOption(a->A,op);
797     MatSetOption(a->B,op);
798   }
799   else if (op == MAT_NO_NEW_DIAGONALS)
800     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:MAT_NO_NEW_DIAGONALS");}
801   else
802     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:unknown option");}
803   return 0;
804 }
805 
806 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
807 {
808   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
809   Mat_SeqBAIJ *Aloc;
810   Mat        B;
811   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
812   int        bs=baij->bs,mbs=baij->mbs;
813   Scalar     *a;
814 
815   if (matout == PETSC_NULL && M != N)
816     SETERRQ(1,"MatTranspose_MPIBAIJ:Square matrix only for in-place");
817   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
818   CHKERRQ(ierr);
819 
820   /* copy over the A part */
821   Aloc = (Mat_SeqBAIJ*) baij->A->data;
822   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
823   row = baij->rstart;
824   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
825 
826   for ( i=0; i<mbs; i++ ) {
827     rvals[0] = bs*(baij->rstart + i);
828     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
829     for ( j=ai[i]; j<ai[i+1]; j++ ) {
830       col = (baij->cstart+aj[j])*bs;
831       for (k=0; k<bs; k++ ) {
832         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
833         col++; a += bs;
834       }
835     }
836   }
837   /* copy over the B part */
838   Aloc = (Mat_SeqBAIJ*) baij->B->data;
839   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
840   row = baij->rstart*bs;
841   for ( i=0; i<mbs; i++ ) {
842     rvals[0] = bs*(baij->rstart + i);
843     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
844     for ( j=ai[i]; j<ai[i+1]; j++ ) {
845       col = baij->garray[aj[j]]*bs;
846       for (k=0; k<bs; k++ ) {
847         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
848         col++; a += bs;
849       }
850     }
851   }
852   PetscFree(rvals);
853   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
854   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
855 
856   if (matout != PETSC_NULL) {
857     *matout = B;
858   } else {
859     /* This isn't really an in-place transpose .... but free data structures from baij */
860     PetscFree(baij->rowners);
861     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
862     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
863     if (baij->colmap) PetscFree(baij->colmap);
864     if (baij->garray) PetscFree(baij->garray);
865     if (baij->lvec) VecDestroy(baij->lvec);
866     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
867     PetscFree(baij);
868     PetscMemcpy(A,B,sizeof(struct _Mat));
869     PetscHeaderDestroy(B);
870   }
871   return 0;
872 }
873 /* the code does not do the diagonal entries correctly unless the
874    matrix is square and the column and row ownerships are identical.
875    This is a BUG. The only way to fix it seems to be to access
876    baij->A and baij->B directly and not through the MatZeroRows()
877    routine.
878 */
879 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
880 {
881   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
882   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
883   int            *procs,*nprocs,j,found,idx,nsends,*work;
884   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
885   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
886   int            *lens,imdex,*lrows,*values,bs=l->bs;
887   MPI_Comm       comm = A->comm;
888   MPI_Request    *send_waits,*recv_waits;
889   MPI_Status     recv_status,*send_status;
890   IS             istmp;
891 
892   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
893   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
894 
895   /*  first count number of contributors to each processor */
896   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
897   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
898   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
899   for ( i=0; i<N; i++ ) {
900     idx = rows[i];
901     found = 0;
902     for ( j=0; j<size; j++ ) {
903       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
904         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
905       }
906     }
907     if (!found) SETERRQ(1,"MatZeroRows_MPIBAIJ:Index out of range");
908   }
909   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
910 
911   /* inform other processors of number of messages and max length*/
912   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
913   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
914   nrecvs = work[rank];
915   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
916   nmax = work[rank];
917   PetscFree(work);
918 
919   /* post receives:   */
920   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
921   CHKPTRQ(rvalues);
922   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
923   CHKPTRQ(recv_waits);
924   for ( i=0; i<nrecvs; i++ ) {
925     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
926   }
927 
928   /* do sends:
929       1) starts[i] gives the starting index in svalues for stuff going to
930          the ith processor
931   */
932   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
933   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
934   CHKPTRQ(send_waits);
935   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
936   starts[0] = 0;
937   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
938   for ( i=0; i<N; i++ ) {
939     svalues[starts[owner[i]]++] = rows[i];
940   }
941   ISRestoreIndices(is,&rows);
942 
943   starts[0] = 0;
944   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
945   count = 0;
946   for ( i=0; i<size; i++ ) {
947     if (procs[i]) {
948       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
949     }
950   }
951   PetscFree(starts);
952 
953   base = owners[rank]*bs;
954 
955   /*  wait on receives */
956   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
957   source = lens + nrecvs;
958   count  = nrecvs; slen = 0;
959   while (count) {
960     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
961     /* unpack receives into our local space */
962     MPI_Get_count(&recv_status,MPI_INT,&n);
963     source[imdex]  = recv_status.MPI_SOURCE;
964     lens[imdex]  = n;
965     slen += n;
966     count--;
967   }
968   PetscFree(recv_waits);
969 
970   /* move the data into the send scatter */
971   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
972   count = 0;
973   for ( i=0; i<nrecvs; i++ ) {
974     values = rvalues + i*nmax;
975     for ( j=0; j<lens[i]; j++ ) {
976       lrows[count++] = values[j] - base;
977     }
978   }
979   PetscFree(rvalues); PetscFree(lens);
980   PetscFree(owner); PetscFree(nprocs);
981 
982   /* actually zap the local rows */
983   ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
984   PLogObjectParent(A,istmp);
985   PetscFree(lrows);
986   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
987   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
988   ierr = ISDestroy(istmp); CHKERRQ(ierr);
989 
990   /* wait on sends */
991   if (nsends) {
992     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
993     CHKPTRQ(send_status);
994     MPI_Waitall(nsends,send_waits,send_status);
995     PetscFree(send_status);
996   }
997   PetscFree(send_waits); PetscFree(svalues);
998 
999   return 0;
1000 }
1001 extern int MatPrintHelp_SeqBAIJ(Mat);
1002 static int MatPrintHelp_MPIBAIJ(Mat A)
1003 {
1004   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1005 
1006   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1007   else return 0;
1008 }
1009 
1010 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1011 
1012 /* -------------------------------------------------------------------*/
1013 static struct _MatOps MatOps = {
1014   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1015   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,MatSolve_MPIBAIJ,
1016   MatSolveAdd_MPIBAIJ,MatSolveTrans_MPIBAIJ,MatSolveTransAdd_MPIBAIJ,MatLUFactor_MPIBAIJ,
1017   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1018   0,MatGetDiagonal_MPIBAIJ,0,MatNorm_MPIBAIJ,
1019   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1020   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,MatGetReordering_MPIBAIJ,MatLUFactorSymbolic_MPIBAIJ,
1021   MatLUFactorNumeric_MPIBAIJ,0,0,MatGetSize_MPIBAIJ,
1022   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,MatILUFactorSymbolic_MPIBAIJ,0,
1023   0,0,0,0,
1024   0,MatConvertSameType_MPIBAIJ,0,0,
1025   0,0,0,MatGetSubMatrices_MPIBAIJ,
1026   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1027   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ};
1028 
1029 
1030 /*@C
1031    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1032    (block compressed row).  For good matrix assembly performance
1033    the user should preallocate the matrix storage by setting the parameters
1034    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1035    performance can be increased by more than a factor of 50.
1036 
1037    Input Parameters:
1038 .  comm - MPI communicator
1039 .  bs   - size of block
1040 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1041            This value should be the same as the local size used in creating the
1042            y vector for the matrix-vector product y = Ax.
1043 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1044            This value should be the same as the local size used in creating the
1045            x vector for the matrix-vector product y = Ax.
1046 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1047 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1048 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1049            submatrix  (same for all local rows)
1050 .  d_nnz - array containing the number of block nonzeros in the various block rows
1051            of the diagonal portion of the local submatrix (possibly different for each block
1052            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1053            it is zero.
1054 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1055            submatrix (same for all local rows).
1056 .  o_nnz - array containing the number of block nonzeros in the various block rows of the
1057            off-diagonal portion of the local submatrix (possibly different for
1058            each block row) or PETSC_NULL.
1059 
1060    Output Parameter:
1061 .  A - the matrix
1062 
1063    Notes:
1064    The user MUST specify either the local or global matrix dimensions
1065    (possibly both).
1066 
1067    Storage Information:
1068    For a square global matrix we define each processor's diagonal portion
1069    to be its local rows and the corresponding columns (a square submatrix);
1070    each processor's off-diagonal portion encompasses the remainder of the
1071    local matrix (a rectangular submatrix).
1072 
1073    The user can specify preallocated storage for the diagonal part of
1074    the local submatrix with either d_nz or d_nnz (not both).  Set
1075    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1076    memory allocation.  Likewise, specify preallocated storage for the
1077    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1078 
1079    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1080    the figure below we depict these three local rows and all columns (0-11).
1081 
1082 $          0 1 2 3 4 5 6 7 8 9 10 11
1083 $         -------------------
1084 $  row 3  |  o o o d d d o o o o o o
1085 $  row 4  |  o o o d d d o o o o o o
1086 $  row 5  |  o o o d d d o o o o o o
1087 $         -------------------
1088 $
1089 
1090    Thus, any entries in the d locations are stored in the d (diagonal)
1091    submatrix, and any entries in the o locations are stored in the
1092    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1093    stored simply in the MATSEQBAIJ format for compressed row storage.
1094 
1095    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1096    and o_nz should indicate the number of nonzeros per row in the o matrix.
1097    In general, for PDE problems in which most nonzeros are near the diagonal,
1098    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1099    or you will get TERRIBLE performance; see the users' manual chapter on
1100    matrices and the file $(PETSC_DIR)/Performance.
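
   Example:
   A sketch of a typical call (hypothetical sizes: a square matrix of global
   dimension 100 with 2x2 blocks, letting PETSc choose the local sizes):

$      Mat A;
$      int ierr;
$      ierr = MatCreateMPIBAIJ(MPI_COMM_WORLD,2,PETSC_DECIDE,PETSC_DECIDE,
$                              100,100,5,PETSC_NULL,2,PETSC_NULL,&A); CHKERRQ(ierr);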
1101 
1102 .keywords: matrix, block, aij, compressed row, sparse, parallel
1103 
1104 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1105 @*/
1106 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1107                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1108 {
1109   Mat          B;
1110   Mat_MPIBAIJ  *b;
1111   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE;
1112 
1113   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
1114   *A = 0;
1115   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1116   PLogObjectCreate(B);
1117   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1118   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1119   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1120   B->destroy    = MatDestroy_MPIBAIJ;
1121   B->view       = MatView_MPIBAIJ;
1122 
1123   B->factor     = 0;
1124   B->assembled  = PETSC_FALSE;
1125 
1126   b->insertmode = NOT_SET_VALUES;
1127   MPI_Comm_rank(comm,&b->rank);
1128   MPI_Comm_size(comm,&b->size);
1129 
1130   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1131     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1132   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
1133   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
1134   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1135   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1136 
1137   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1138     work[0] = m; work[1] = n;
1139     mbs = m/bs; nbs = n/bs;
1140     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1141     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1142     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1143   }
1144   if (m == PETSC_DECIDE) {
1145     Mbs = M/bs;
1146     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
1147     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1148     m   = mbs*bs;
1149   }
1150   if (n == PETSC_DECIDE) {
1151     Nbs = N/bs;
1152     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
1153     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1154     n   = nbs*bs;
1155   }
1156   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
1157 
1158   b->m = m; B->m = m;
1159   b->n = n; B->n = n;
1160   b->N = N; B->N = N;
1161   b->M = M; B->M = M;
1162   b->bs  = bs;
1163   b->bs2 = bs*bs;
1164   b->mbs = mbs;
1165   b->nbs = nbs;
1166   b->Mbs = Mbs;
1167   b->Nbs = Nbs;
1168 
1169   /* build local table of row and column ownerships */
1170   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1171   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1172   b->cowners = b->rowners + b->size + 2;
1173   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1174   b->rowners[0] = 0;
1175   for ( i=2; i<=b->size; i++ ) {
1176     b->rowners[i] += b->rowners[i-1];
1177   }
1178   b->rstart = b->rowners[b->rank];
1179   b->rend   = b->rowners[b->rank+1];
1180   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1181   b->cowners[0] = 0;
1182   for ( i=2; i<=b->size; i++ ) {
1183     b->cowners[i] += b->cowners[i-1];
1184   }
1185   b->cstart = b->cowners[b->rank];
1186   b->cend   = b->cowners[b->rank+1];
1187 
1188   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1189   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1190   PLogObjectParent(B,b->A);
1191   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1192   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1193   PLogObjectParent(B,b->B);
1194 
1195   /* build cache for off array entries formed */
1196   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1197   b->colmap      = 0;
1198   b->garray      = 0;
1199   b->roworiented = 1;
1200 
1201   /* stuff used for matrix vector multiply */
1202   b->lvec      = 0;
1203   b->Mvctx     = 0;
1204 
1205   /* stuff for MatGetRow() */
1206   b->rowindices   = 0;
1207   b->rowvalues    = 0;
1208   b->getrowactive = PETSC_FALSE;
1209 
1210   *A = B;
1211   return 0;
1212 }
1213 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1214 {
1215   Mat        mat;
1216   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1217   int        ierr, len=0, flg;
1218 
1219   *newmat       = 0;
1220   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1221   PLogObjectCreate(mat);
1222   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1223   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1224   mat->destroy    = MatDestroy_MPIBAIJ;
1225   mat->view       = MatView_MPIBAIJ;
1226   mat->factor     = matin->factor;
1227   mat->assembled  = PETSC_TRUE;
1228 
1229   a->m = mat->m   = oldmat->m;
1230   a->n = mat->n   = oldmat->n;
1231   a->M = mat->M   = oldmat->M;
1232   a->N = mat->N   = oldmat->N;
1233 
1234   a->bs  = oldmat->bs;
1235   a->bs2 = oldmat->bs2;
1236   a->mbs = oldmat->mbs;
1237   a->nbs = oldmat->nbs;
1238   a->Mbs = oldmat->Mbs;
1239   a->Nbs = oldmat->Nbs;
1240 
1241   a->rstart       = oldmat->rstart;
1242   a->rend         = oldmat->rend;
1243   a->cstart       = oldmat->cstart;
1244   a->cend         = oldmat->cend;
1245   a->size         = oldmat->size;
1246   a->rank         = oldmat->rank;
1247   a->insertmode   = NOT_SET_VALUES;
1248   a->rowvalues    = 0;
1249   a->getrowactive = PETSC_FALSE;
1250 
1251   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1252   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1253   a->cowners = a->rowners + a->size + 2;
1254   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1255   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1256   if (oldmat->colmap) {
1257     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1258     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1259     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1260   } else a->colmap = 0;
1261   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1262     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1263     PLogObjectMemory(mat,len*sizeof(int));
1264     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1265   } else a->garray = 0;
1266 
1267   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1268   PLogObjectParent(mat,a->lvec);
1269   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1270   PLogObjectParent(mat,a->Mvctx);
1271   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1272   PLogObjectParent(mat,a->A);
1273   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1274   PLogObjectParent(mat,a->B);
1275   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1276   if (flg) {
1277     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1278   }
1279   *newmat = mat;
1280   return 0;
1281 }
1282 
1283 #include "sys.h"
1284 
1285 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1286 {
1287   Mat          A;
1288   int          i, nz, ierr, j,rstart, rend, fd;
1289   Scalar       *vals,*buf;
1290   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1291   MPI_Status   status;
1292   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1293   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1294   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1295   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1296   int          dcount,kmax,k,nzcount,tmp;
1297 
1298 
1299   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1300   bs2  = bs*bs;
1301 
1302   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1303   if (!rank) {
1304     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1305     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1306     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
1307   }
1308 
1309   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1310   M = header[1]; N = header[2];
1311 
1312 
1313   if (M != N) SETERRQ(1,"MatLoad_MPIBAIJ:Can only do square matrices");
1314 
1315   /*
1316      This code adds extra rows to make sure the number of rows is
1317      divisible by the blocksize
1318   */
1319   Mbs        = M/bs;
1320   extra_rows = bs - M + bs*(Mbs);
1321   if (extra_rows == bs) extra_rows = 0;
1322   else                  Mbs++;
1323   if (extra_rows && !rank) {
1324     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize");
1325   }
1326   /* determine ownership of all rows */
1327   mbs = Mbs/size + ((Mbs % size) > rank);
1328   m   = mbs * bs;
1329   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1330   browners = rowners + size + 1;
1331   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1332   rowners[0] = 0;
1333   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1334   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1335   rstart = rowners[rank];
1336   rend   = rowners[rank+1];
1337 
1338   /* distribute row lengths to all processors */
1339   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1340   if (!rank) {
1341     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1342     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1343     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1344     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1345     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1346     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1347     PetscFree(sndcounts);
1348   }
1349   else {
1350     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1351   }
1352 
1353   if (!rank) {
1354     /* calculate the number of nonzeros on each processor */
1355     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1356     PetscMemzero(procsnz,size*sizeof(int));
1357     for ( i=0; i<size; i++ ) {
1358       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1359         procsnz[i] += rowlengths[j];
1360       }
1361     }
1362     PetscFree(rowlengths);
1363 
1364     /* determine max buffer needed and allocate it */
1365     maxnz = 0;
1366     for ( i=0; i<size; i++ ) {
1367       maxnz = PetscMax(maxnz,procsnz[i]);
1368     }
1369     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1370 
1371     /* read in my part of the matrix column indices  */
1372     nz = procsnz[0];
1373     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1374     mycols = ibuf;
1375     if (size == 1)  nz -= extra_rows;
1376     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1377     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1378 
1379     /* read in everyone else's part (except the last proc) and ship it off */
1380     for ( i=1; i<size-1; i++ ) {
1381       nz = procsnz[i];
1382       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1383       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1384     }
1385     /* read in the stuff for the last proc */
1386     if ( size != 1 ) {
1387       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1388       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1389       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1390       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1391     }
1392     PetscFree(cols);
1393   }
1394   else {
1395     /* determine buffer space needed for message */
1396     nz = 0;
1397     for ( i=0; i<m; i++ ) {
1398       nz += locrowlens[i];
1399     }
1400     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1401     mycols = ibuf;
1402     /* receive message of column indices*/
1403     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1404     MPI_Get_count(&status,MPI_INT,&maxnz);
1405     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1406   }
1407 
1408   /* loop over local rows, determining number of off diagonal entries */
1409   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1410   odlens = dlens + (rend-rstart);
1411   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1412   PetscMemzero(mask,3*Mbs*sizeof(int));
1413   masked1 = mask    + Mbs;
1414   masked2 = masked1 + Mbs;
1415   rowcount = 0; nzcount = 0;
1416   for ( i=0; i<mbs; i++ ) {
1417     dcount  = 0;
1418     odcount = 0;
1419     for ( j=0; j<bs; j++ ) {
1420       kmax = locrowlens[rowcount];
1421       for ( k=0; k<kmax; k++ ) {
1422         tmp = mycols[nzcount++]/bs;
1423         if (!mask[tmp]) {
1424           mask[tmp] = 1;
1425           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1426           else masked1[dcount++] = tmp;
1427         }
1428       }
1429       rowcount++;
1430     }
1431 
1432     dlens[i]  = dcount;
1433     odlens[i] = odcount;
1434 
1435     /* zero out the mask elements we set */
1436     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1437     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1438   }
1439 
1440   /* create our matrix */
1441   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);CHKERRQ(ierr);
1442   A = *newmat;
1443   MatSetOption(A,MAT_COLUMNS_SORTED);
1444 
1445   if (!rank) {
1446     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1447     /* read in my part of the matrix numerical values  */
1448     nz = procsnz[0];
1449     vals = buf;
1450     mycols = ibuf;
1451     if (size == 1)  nz -= extra_rows;
1452     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1453     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1454     /* insert into matrix */
1455     jj      = rstart*bs;
1456     for ( i=0; i<m; i++ ) {
1457       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1458       mycols += locrowlens[i];
1459       vals   += locrowlens[i];
1460       jj++;
1461     }
1462     /* read in other processors (except the last one) and ship out */
1463     for ( i=1; i<size-1; i++ ) {
1464       nz = procsnz[i];
1465       vals = buf;
1466       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1467       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1468     }
1469     /* the last proc */
1470     if ( size != 1 ){
1471       nz = procsnz[i] - extra_rows;
1472       vals = buf;
1473       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1474       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1475       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1476     }
1477     PetscFree(procsnz);
1478   }
1479   else {
1480     /* receive numeric values */
1481     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1482 
1483     /* receive message of values*/
1484     vals = buf;
1485     mycols = ibuf;
1486     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1487     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1488     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1489 
1490     /* insert into matrix */
1491     jj      = rstart*bs;
1492     for ( i=0; i<m; i++ ) {
1493       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1494       mycols += locrowlens[i];
1495       vals   += locrowlens[i];
1496       jj++;
1497     }
1498   }
1499   PetscFree(locrowlens);
1500   PetscFree(buf);
1501   PetscFree(ibuf);
1502   PetscFree(rowners);
1503   PetscFree(dlens);
1504   PetscFree(mask);
1505   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1506   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1507   return 0;
1508 }
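/*
   Usage sketch (hypothetical caller code; assumes `viewer` is a binary viewer
   already opened for reading, and that the block size has been chosen with the
   -matload_block_size option read at the top of this routine):

      Mat A;
      ierr = MatLoad(viewer,MATMPIBAIJ,&A); CHKERRQ(ierr);
*/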
1509 
1510 
1511