xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision bbdbd9a052d0f3015c70f8781ea47bd69446daae)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.28 1996/11/07 15:09:44 bsmith Exp bsmith $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 
25 /*
26      Local utility routine that creates a mapping from the global column
27    number to the local number in the off-diagonal part of the local
28    storage of the matrix.  This is done in a non-scalable way since
29    colmap has one entry for every global block column.
30 */
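/*
     Illustrative note (not in the original source): garray lists the global block
   columns that actually occur in the off-diagonal part B.  If, for example,
   garray = {2,5,9}, then this routine sets colmap[2] = 1, colmap[5] = 2,
   colmap[9] = 3 and leaves every other entry 0.  A global point column c is later
   converted to a point column local to B with

        col = (colmap[c/bs] - 1)*bs + c%bs;

   which is negative exactly when block column c/bs does not occur in B.
*/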
31 static int CreateColmap_MPIBAIJ_Private(Mat mat)
32 {
33   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
34   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
35   int         nbs = B->nbs,i;
36 
37   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
38   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
39   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
40   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i + 1;
41   return 0;
42 }
43 
44 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
45                             PetscTruth *done)
46 {
47   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
48   int         ierr;
49   if (aij->size == 1) {
50     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
51   } else SETERRQ(1,"MatGetRowIJ_MPIBAIJ:not supported in parallel");
52   return 0;
53 }
54 
55 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
56                                 PetscTruth *done)
57 {
58   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
59   int        ierr;
60   if (aij->size == 1) {
61     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
62   } else SETERRQ(1,"MatRestoreRowIJ_MPIBAIJ:not supported in parallel");
63   return 0;
64 }
65 
66 extern int MatSetValues_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
67 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
68 {
69   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
70   Scalar      value;
71   int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
72   int         cstart = baij->cstart, cend = baij->cend,row,col;
73   int         roworiented = baij->roworiented,rstart_orig,rend_orig;
74   int         cstart_orig,cend_orig,bs=baij->bs;
75 
76   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
77     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
78   }
79   baij->insertmode = addv;
80   rstart_orig = rstart*bs;
81   rend_orig   = rend*bs;
82   cstart_orig = cstart*bs;
83   cend_orig   = cend*bs;
84   for ( i=0; i<m; i++ ) {
85 #if defined(PETSC_BOPT_g)
86     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
87     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
88 #endif
89     if (im[i] >= rstart_orig && im[i] < rend_orig) {
90       row = im[i] - rstart_orig;
91       for ( j=0; j<n; j++ ) {
92         if (in[j] >= cstart_orig && in[j] < cend_orig){
93           col = in[j] - cstart_orig;
94           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
95           ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
96         }
97 #if defined(PETSC_BOPT_g)
98         else if (in[j] < 0) {SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");}
99         else if (in[j] >= baij->N) {SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");}
100 #endif
101         else {
102           if (mat->was_assembled) {
103             if (!baij->colmap) {
104               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
105             }
106             col = (baij->colmap[in[j]/bs] - 1)*bs + in[j]%bs;
107             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
108               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
109               col =  in[j];
110             }
111           }
112           else col = in[j];
113           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
114           ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
115         }
116       }
117     }
118     else {
119       if (roworiented) {
120         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
121       }
122       else {
123         row = im[i];
124         for ( j=0; j<n; j++ ) {
125           ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
126         }
127       }
128     }
129   }
130   return 0;
131 }
132 
133 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
134 {
135   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
136   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
137   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
138 
139   for ( i=0; i<m; i++ ) {
140     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
141     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
142     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
143       row = idxm[i] - bsrstart;
144       for ( j=0; j<n; j++ ) {
145         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
146         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
147         if (idxn[j] >= bscstart && idxn[j] < bscend){
148           col = idxn[j] - bscstart;
149           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
150         }
151         else {
152           if (!baij->colmap) {
153             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
154           }
155           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
156              (baij->garray[baij->colmap[idxn[j]/bs]-1] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
157           else {
158             col  = (baij->colmap[idxn[j]/bs]-1)*bs + idxn[j]%bs;
159             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
160           }
161         }
162       }
163     }
164     else {
165       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
166     }
167   }
168   return 0;
169 }
170 
171 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
172 {
173   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
174   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
175   int        ierr, i,bs2=baij->bs2;
176   double     sum = 0.0;
177   Scalar     *v;
178 
179   if (baij->size == 1) {
180     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
181   } else {
182     if (type == NORM_FROBENIUS) {
183       v = amat->a;
184       for (i=0; i<amat->nz*bs2; i++ ) {
185 #if defined(PETSC_COMPLEX)
186         sum += real(conj(*v)*(*v)); v++;
187 #else
188         sum += (*v)*(*v); v++;
189 #endif
190       }
191       v = bmat->a;
192       for (i=0; i<bmat->nz*bs2; i++ ) {
193 #if defined(PETSC_COMPLEX)
194         sum += real(conj(*v)*(*v)); v++;
195 #else
196         sum += (*v)*(*v); v++;
197 #endif
198       }
199       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
200       *norm = sqrt(*norm);
201     }
202     else
203       SETERRQ(1,"MatNorm_MPIBAIJ:No support for this norm yet");
204   }
205   return 0;
206 }
207 
208 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
209 {
210   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
211   MPI_Comm    comm = mat->comm;
212   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
213   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
214   MPI_Request *send_waits,*recv_waits;
215   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
216   InsertMode  addv;
217   Scalar      *rvalues,*svalues;
218 
219   /* make sure all processors are in the same mode (INSERT_VALUES or ADD_VALUES) */
220   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
221   if (addv == (ADD_VALUES|INSERT_VALUES)) {
222     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
223   }
224   baij->insertmode = addv; /* in case this processor had no cache */
225 
226   /*  first count number of contributors to each processor */
227   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
228   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
229   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
230   for ( i=0; i<baij->stash.n; i++ ) {
231     idx = baij->stash.idx[i];
232     for ( j=0; j<size; j++ ) {
233       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
234         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
235       }
236     }
237   }
238   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
239 
240   /* inform other processors of number of messages and max length*/
241   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
242   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
243   nreceives = work[rank];
244   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
245   nmax = work[rank];
246   PetscFree(work);
247 
248   /* post receives:
249        1) each message consists of triples (global row index, global
250      column index, value); the indices are stored as Scalars (doubles)
251      to simplify the message passing.
252        2) since we don't know how long each individual message is, we
253      allocate the largest needed buffer for each receive. Potentially
254      this is a lot of wasted space.
255 
256 
257        This could be done better.
258   */
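  /*
     Added illustration: with the triple layout described above, the portion of the
     send buffer destined for one processor is packed as

         [ row_0, col_0, val_0, row_1, col_1, val_1, ... ]

     where row_i and col_i are global point (unblocked) indices cast to Scalar.
  */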
259   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
260   CHKPTRQ(rvalues);
261   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
262   CHKPTRQ(recv_waits);
263   for ( i=0; i<nreceives; i++ ) {
264     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
265               comm,recv_waits+i);
266   }
267 
268   /* do sends:
269       1) starts[i] gives the starting index in svalues for stuff going to
270          the ith processor
271   */
272   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
273   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
274   CHKPTRQ(send_waits);
275   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
276   starts[0] = 0;
277   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
278   for ( i=0; i<baij->stash.n; i++ ) {
279     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
280     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
281     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
282   }
283   PetscFree(owner);
284   starts[0] = 0;
285   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
286   count = 0;
287   for ( i=0; i<size; i++ ) {
288     if (procs[i]) {
289       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
290                 comm,send_waits+count++);
291     }
292   }
293   PetscFree(starts); PetscFree(nprocs);
294 
295   /* Free cache space */
296   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
297   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
298 
299   baij->svalues    = svalues;    baij->rvalues    = rvalues;
300   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
301   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
302   baij->rmax       = nmax;
303 
304   return 0;
305 }
306 
307 
308 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
309 {
310   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
311   MPI_Status  *send_status,recv_status;
312   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
313   int         bs=baij->bs,row,col,other_disassembled;
314   Scalar      *values,val;
315   InsertMode  addv = baij->insertmode;
316 
317   /*  wait on receives */
318   while (count) {
319     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
320     /* unpack receives into our local space */
321     values = baij->rvalues + 3*imdex*baij->rmax;
322     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
323     n = n/3;
324     for ( i=0; i<n; i++ ) {
325       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
326       col = (int) PetscReal(values[3*i+1]);
327       val = values[3*i+2];
328       if (col >= baij->cstart*bs && col < baij->cend*bs) {
329         col -= baij->cstart*bs;
330         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
331       }
332       else {
333         if (mat->was_assembled) {
334           if (!baij->colmap) {
335             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
336           }
337           col = (baij->colmap[col/bs]-1)*bs + col%bs;
338           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
339             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
340             col = (int) PetscReal(values[3*i+1]);
341           }
342         }
343         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
344       }
345     }
346     count--;
347   }
348   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
349 
350   /* wait on sends */
351   if (baij->nsends) {
352     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
353     CHKPTRQ(send_status);
354     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
355     PetscFree(send_status);
356   }
357   PetscFree(baij->send_waits); PetscFree(baij->svalues);
358 
359   baij->insertmode = NOT_SET_VALUES;
360   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
361   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
362 
363   /* determine if any processor has disassembled; if so, we must also
364      disassemble ourselves so that we can all reassemble consistently. */
365   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
366   if (mat->was_assembled && !other_disassembled) {
367     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
368   }
369 
370   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
371     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
372   }
373   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
374   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
375 
376   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
377   return 0;
378 }
379 
380 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
381 {
382   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
383   int          ierr;
384 
385   if (baij->size == 1) {
386     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
387   }
388   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
389   return 0;
390 }
391 
392 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
393 {
394   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
395   int          ierr, format,rank,bs = baij->bs;
396   FILE         *fd;
397   ViewerType   vtype;
398 
399   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
400   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
401     ierr = ViewerGetFormat(viewer,&format);
402     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
403       MatInfo info;
404       MPI_Comm_rank(mat->comm,&rank);
405       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
406       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
407       PetscSequentialPhaseBegin(mat->comm,1);
408       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
409               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
410               baij->bs,(int)info.memory);
411       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
412       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
413       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
414       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
415       fflush(fd);
416       PetscSequentialPhaseEnd(mat->comm,1);
417       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
418       return 0;
419     }
420     else if (format == VIEWER_FORMAT_ASCII_INFO) {
421       PetscPrintf(mat->comm,"  block size is %d\n",bs);
422       return 0;
423     }
424   }
425 
426   if (vtype == DRAW_VIEWER) {
427     Draw       draw;
428     PetscTruth isnull;
429     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
430     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
431   }
432 
433   if (vtype == ASCII_FILE_VIEWER) {
434     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
435     PetscSequentialPhaseBegin(mat->comm,1);
436     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
437            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
438             baij->cstart*bs,baij->cend*bs);
439     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
440     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
441     fflush(fd);
442     PetscSequentialPhaseEnd(mat->comm,1);
443   }
444   else {
445     int size = baij->size;
446     rank = baij->rank;
447     if (size == 1) {
448       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
449     }
450     else {
451       /* assemble the entire matrix onto first processor. */
452       Mat         A;
453       Mat_SeqBAIJ *Aloc;
454       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
455       int         mbs=baij->mbs;
456       Scalar      *a;
457 
458       if (!rank) {
459         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
460         CHKERRQ(ierr);
461       }
462       else {
463         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
464         CHKERRQ(ierr);
465       }
466       PLogObjectParent(mat,A);
467 
468       /* copy over the A part */
469       Aloc = (Mat_SeqBAIJ*) baij->A->data;
470       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
471       row = baij->rstart;
472       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
473 
474       for ( i=0; i<mbs; i++ ) {
475         rvals[0] = bs*(baij->rstart + i);
476         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
477         for ( j=ai[i]; j<ai[i+1]; j++ ) {
478           col = (baij->cstart+aj[j])*bs;
479           for (k=0; k<bs; k++ ) {
480             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
481             col++; a += bs;
482           }
483         }
484       }
485       /* copy over the B part */
486       Aloc = (Mat_SeqBAIJ*) baij->B->data;
487       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
488       row = baij->rstart*bs;
489       for ( i=0; i<mbs; i++ ) {
490         rvals[0] = bs*(baij->rstart + i);
491         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
492         for ( j=ai[i]; j<ai[i+1]; j++ ) {
493           col = baij->garray[aj[j]]*bs;
494           for (k=0; k<bs; k++ ) {
495             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
496             col++; a += bs;
497           }
498         }
499       }
500       PetscFree(rvals);
501       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
502       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
503       if (!rank) {
504         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
505       }
506       ierr = MatDestroy(A); CHKERRQ(ierr);
507     }
508   }
509   return 0;
510 }
511 
512 
513 
514 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
515 {
516   Mat         mat = (Mat) obj;
517   int         ierr;
518   ViewerType  vtype;
519 
520   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
521   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
522       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
523     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
524   }
525   else if (vtype == BINARY_FILE_VIEWER) {
526     return MatView_MPIBAIJ_Binary(mat,viewer);
527   }
528   return 0;
529 }
530 
531 static int MatDestroy_MPIBAIJ(PetscObject obj)
532 {
533   Mat         mat = (Mat) obj;
534   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
535   int         ierr;
536 
537 #if defined(PETSC_LOG)
538   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
539 #endif
540 
541   PetscFree(baij->rowners);
542   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
543   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
544   if (baij->colmap) PetscFree(baij->colmap);
545   if (baij->garray) PetscFree(baij->garray);
546   if (baij->lvec)   VecDestroy(baij->lvec);
547   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
548   if (baij->rowvalues) PetscFree(baij->rowvalues);
549   PetscFree(baij);
550   PLogObjectDestroy(mat);
551   PetscHeaderDestroy(mat);
552   return 0;
553 }
554 
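/*
   Added note: the parallel matrix is stored as two sequential BAIJ matrices;
   a->A holds the "diagonal" block (the columns owned by this processor) and
   a->B holds the off-diagonal part (all remaining columns, compressed through
   garray).  A matrix-vector product is therefore formed as

        y = a->A * x_local  +  a->B * x_ghost

   where the ghost values x_ghost (stored in a->lvec) are gathered from the other
   processors with the scatter a->Mvctx; the local multiply below is overlapped
   with that communication.
*/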
555 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
556 {
557   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
558   int         ierr, nt;
559 
560   VecGetLocalSize_Fast(xx,nt);
561   if (nt != a->n) {
562     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and xx");
563   }
564   VecGetLocalSize_Fast(yy,nt);
565   if (nt != a->m) {
566     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and yy");
567   }
568   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
569   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
570   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
571   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
572   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
573   return 0;
574 }
575 
576 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
577 {
578   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
579   int        ierr;
580   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
581   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
582   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
583   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
584   return 0;
585 }
586 
587 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
588 {
589   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
590   int        ierr;
591 
592   /* do nondiagonal part */
593   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
594   /* send it on its way */
595   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
596   /* do local part */
597   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
598   /* receive remote parts: note this assumes the values are not actually */
599   /* inserted in yy until the next line, which is true for my implementation*/
600   /* but is perhaps not always true. */
601   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
602   return 0;
603 }
604 
605 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
606 {
607   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
608   int        ierr;
609 
610   /* do nondiagonal part */
611   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
612   /* send it on its way */
613   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
614   /* do local part */
615   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
616   /* receive remote parts: note this assumes the values are not actually */
617   /* inserted in zz until the next line, which is true for my implementation*/
618   /* but is perhaps not always true. */
619   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
620   return 0;
621 }
622 
623 /*
624   This only works correctly for square matrices where the subblock A->A is the
625    diagonal block
626 */
627 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
628 {
629   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
630   if (a->M != a->N)
631     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
632   return MatGetDiagonal(a->A,v);
633 }
634 
635 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
636 {
637   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
638   int        ierr;
639   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
640   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
641   return 0;
642 }
643 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
644 {
645   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
646   *m = mat->M; *n = mat->N;
647   return 0;
648 }
649 
650 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
651 {
652   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
653   *m = mat->m; *n = mat->n;
654   return 0;
655 }
656 
657 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
658 {
659   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
660   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
661   return 0;
662 }
663 
664 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
665 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
666 
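/*
   Added note: the row is returned in global column order by merging the two local
   pieces: first the off-diagonal (B) entries whose global block column lies before
   cstart, then the diagonal (A) entries, then the remaining off-diagonal entries.
   Column indices coming from B are mapped back to global numbering through garray.
*/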
667 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
668 {
669   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
670   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
671   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
672   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
673   int        *cmap, *idx_p,cstart = mat->cstart;
674 
675   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
676   mat->getrowactive = PETSC_TRUE;
677 
678   if (!mat->rowvalues && (idx || v)) {
679     /*
680         allocate enough space to hold information from the longest row.
681     */
682     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
683     int     max = 1,mbs = mat->mbs,tmp;
684     for ( i=0; i<mbs; i++ ) {
685       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
686       if (max < tmp) { max = tmp; }
687     }
688     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
689     CHKPTRQ(mat->rowvalues);
690     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
691   }
692 
693 
694   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows")
695   lrow = row - brstart;
696 
697   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
698   if (!v)   {pvA = 0; pvB = 0;}
699   if (!idx) {pcA = 0; if (!v) pcB = 0;}
700   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
701   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
702   nztot = nzA + nzB;
703 
704   cmap  = mat->garray;
705   if (v  || idx) {
706     if (nztot) {
707       /* Sort by increasing column numbers, assuming A and B already sorted */
708       int imark = -1;
709       if (v) {
710         *v = v_p = mat->rowvalues;
711         for ( i=0; i<nzB; i++ ) {
712           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
713           else break;
714         }
715         imark = i;
716         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
717         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
718       }
719       if (idx) {
720         *idx = idx_p = mat->rowindices;
721         if (imark > -1) {
722           for ( i=0; i<imark; i++ ) {
723             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
724           }
725         } else {
726           for ( i=0; i<nzB; i++ ) {
727             if (cmap[cworkB[i]/bs] < cstart)
728               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
729             else break;
730           }
731           imark = i;
732         }
733         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
734         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
735       }
736     }
737     else {
738       if (idx) *idx = 0;
739       if (v)   *v   = 0;
740     }
741   }
742   *nz = nztot;
743   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
744   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
745   return 0;
746 }
747 
748 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
749 {
750   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
751   if (baij->getrowactive == PETSC_FALSE) {
752     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
753   }
754   baij->getrowactive = PETSC_FALSE;
755   return 0;
756 }
757 
758 static int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
759 {
760   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
761   *bs = baij->bs;
762   return 0;
763 }
764 
765 static int MatZeroEntries_MPIBAIJ(Mat A)
766 {
767   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
768   int         ierr;
769   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
770   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
771   return 0;
772 }
773 
774 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
775 {
776   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
777   Mat         A = a->A, B = a->B;
778   int         ierr;
779   double      isend[5], irecv[5];
780 
781   info->rows_global    = (double)a->M;
782   info->columns_global = (double)a->N;
783   info->rows_local     = (double)a->m;
784   info->columns_local  = (double)a->N;
785   info->block_size     = (double)a->bs;
786   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
787   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded; isend[3] = info->memory; isend[4] = info->mallocs;
788   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
789   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded; isend[3] += info->memory; isend[4] += info->mallocs;
790   if (flag == MAT_LOCAL) {
791     info->nz_used      = isend[0];
792     info->nz_allocated = isend[1];
793     info->nz_unneeded  = isend[2];
794     info->memory       = isend[3];
795     info->mallocs      = isend[4];
796   } else if (flag == MAT_GLOBAL_MAX) {
797     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
798     info->nz_used      = irecv[0];
799     info->nz_allocated = irecv[1];
800     info->nz_unneeded  = irecv[2];
801     info->memory       = irecv[3];
802     info->mallocs      = irecv[4];
803   } else if (flag == MAT_GLOBAL_SUM) {
804     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
805     info->nz_used      = irecv[0];
806     info->nz_allocated = irecv[1];
807     info->nz_unneeded  = irecv[2];
808     info->memory       = irecv[3];
809     info->mallocs      = irecv[4];
810   }
811   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
812   info->fill_ratio_needed = 0;
813   info->factor_mallocs    = 0;
814   return 0;
815 }
816 
817 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
818 {
819   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
820 
821   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
822       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
823       op == MAT_COLUMNS_SORTED ||
824       op == MAT_ROW_ORIENTED) {
825         MatSetOption(a->A,op);
826         MatSetOption(a->B,op);
827   }
828   else if (op == MAT_ROWS_SORTED ||
829            op == MAT_SYMMETRIC ||
830            op == MAT_STRUCTURALLY_SYMMETRIC ||
831            op == MAT_YES_NEW_DIAGONALS)
832     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
833   else if (op == MAT_COLUMN_ORIENTED) {
834     a->roworiented = 0;
835     MatSetOption(a->A,op);
836     MatSetOption(a->B,op);
837   }
838   else if (op == MAT_NO_NEW_DIAGONALS)
839     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:MAT_NO_NEW_DIAGONALS");}
840   else
841     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:unknown option");}
842   return 0;
843 }
844 
845 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
846 {
847   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
848   Mat_SeqBAIJ *Aloc;
849   Mat        B;
850   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
851   int        bs=baij->bs,mbs=baij->mbs;
852   Scalar     *a;
853 
854   if (matout == PETSC_NULL && M != N)
855     SETERRQ(1,"MatTranspose_MPIBAIJ:Square matrix only for in-place");
856   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
857   CHKERRQ(ierr);
858 
859   /* copy over the A part */
860   Aloc = (Mat_SeqBAIJ*) baij->A->data;
861   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
862   row = baij->rstart;
863   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
864 
865   for ( i=0; i<mbs; i++ ) {
866     rvals[0] = bs*(baij->rstart + i);
867     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
868     for ( j=ai[i]; j<ai[i+1]; j++ ) {
869       col = (baij->cstart+aj[j])*bs;
870       for (k=0; k<bs; k++ ) {
871         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
872         col++; a += bs;
873       }
874     }
875   }
876   /* copy over the B part */
877   Aloc = (Mat_SeqBAIJ*) baij->B->data;
878   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
879   row = baij->rstart*bs;
880   for ( i=0; i<mbs; i++ ) {
881     rvals[0] = bs*(baij->rstart + i);
882     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
883     for ( j=ai[i]; j<ai[i+1]; j++ ) {
884       col = baij->garray[aj[j]]*bs;
885       for (k=0; k<bs; k++ ) {
886         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
887         col++; a += bs;
888       }
889     }
890   }
891   PetscFree(rvals);
892   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
893   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
894 
895   if (matout != PETSC_NULL) {
896     *matout = B;
897   } else {
898     /* This isn't really an in-place transpose: we free baij's data structures and copy B's header over A */
899     PetscFree(baij->rowners);
900     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
901     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
902     if (baij->colmap) PetscFree(baij->colmap);
903     if (baij->garray) PetscFree(baij->garray);
904     if (baij->lvec) VecDestroy(baij->lvec);
905     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
906     PetscFree(baij);
907     PetscMemcpy(A,B,sizeof(struct _Mat));
908     PetscHeaderDestroy(B);
909   }
910   return 0;
911 }
912 
913 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
914 {
915   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
916   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
917   int ierr,s1,s2,s3;
918 
919   if (ll)  {
920     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
921     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
922     if (s1!=s2) SETERRQ(1,"MatDiagonalScale_MPIBAIJ: non-conforming local sizes");
923     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
924     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
925   }
926   if (rr) SETERRQ(1,"MatDiagonalScale_MPIBAIJ:not supported for right vector");
927   return 0;
928 }
929 
930 /* the code does not do the diagonal entries correctly unless the
931    matrix is square and the column and row ownerships are identical.
932    This is a BUG. The only way to fix it seems to be to access
933    baij->A and baij->B directly and not through the MatZeroRows()
934    routine.
935 */
936 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
937 {
938   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
939   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
940   int            *procs,*nprocs,j,found,idx,nsends,*work;
941   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
942   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
943   int            *lens,imdex,*lrows,*values,bs=l->bs;
944   MPI_Comm       comm = A->comm;
945   MPI_Request    *send_waits,*recv_waits;
946   MPI_Status     recv_status,*send_status;
947   IS             istmp;
948 
949   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
950   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
951 
952   /*  first count number of contributors to each processor */
953   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
954   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
955   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
956   for ( i=0; i<N; i++ ) {
957     idx = rows[i];
958     found = 0;
959     for ( j=0; j<size; j++ ) {
960       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
961         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
962       }
963     }
964     if (!found) SETERRQ(1,"MatZeroRows_MPIBAIJ:Index out of range");
965   }
966   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
967 
968   /* inform other processors of number of messages and max length*/
969   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
970   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
971   nrecvs = work[rank];
972   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
973   nmax = work[rank];
974   PetscFree(work);
975 
976   /* post receives:   */
977   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
978   CHKPTRQ(rvalues);
979   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
980   CHKPTRQ(recv_waits);
981   for ( i=0; i<nrecvs; i++ ) {
982     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
983   }
984 
985   /* do sends:
986       1) starts[i] gives the starting index in svalues for stuff going to
987          the ith processor
988   */
989   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
990   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
991   CHKPTRQ(send_waits);
992   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
993   starts[0] = 0;
994   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
995   for ( i=0; i<N; i++ ) {
996     svalues[starts[owner[i]]++] = rows[i];
997   }
998   ISRestoreIndices(is,&rows);
999 
1000   starts[0] = 0;
1001   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1002   count = 0;
1003   for ( i=0; i<size; i++ ) {
1004     if (procs[i]) {
1005       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
1006     }
1007   }
1008   PetscFree(starts);
1009 
1010   base = owners[rank]*bs;
1011 
1012   /*  wait on receives */
1013   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1014   source = lens + nrecvs;
1015   count  = nrecvs; slen = 0;
1016   while (count) {
1017     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
1018     /* unpack receives into our local space */
1019     MPI_Get_count(&recv_status,MPI_INT,&n);
1020     source[imdex]  = recv_status.MPI_SOURCE;
1021     lens[imdex]  = n;
1022     slen += n;
1023     count--;
1024   }
1025   PetscFree(recv_waits);
1026 
1027   /* move the data into the send scatter */
1028   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1029   count = 0;
1030   for ( i=0; i<nrecvs; i++ ) {
1031     values = rvalues + i*nmax;
1032     for ( j=0; j<lens[i]; j++ ) {
1033       lrows[count++] = values[j] - base;
1034     }
1035   }
1036   PetscFree(rvalues); PetscFree(lens);
1037   PetscFree(owner); PetscFree(nprocs);
1038 
1039   /* actually zap the local rows */
1040   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1041   PLogObjectParent(A,istmp);
1042   PetscFree(lrows);
1043   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1044   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1045   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1046 
1047   /* wait on sends */
1048   if (nsends) {
1049     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1050     CHKPTRQ(send_status);
1051     MPI_Waitall(nsends,send_waits,send_status);
1052     PetscFree(send_status);
1053   }
1054   PetscFree(send_waits); PetscFree(svalues);
1055 
1056   return 0;
1057 }
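/*
   A minimal usage sketch (illustrative only; the row numbers are placeholders):
   zero global rows 0 and 5 of an assembled MPIBAIJ matrix A and place 1.0 on their
   diagonal, subject to the restriction noted above the routine.

      IS     is;
      int    rows[2] = {0,5}, ierr;
      Scalar one = 1.0;
      ierr = ISCreateGeneral(MPI_COMM_SELF,2,rows,&is); CHKERRQ(ierr);
      ierr = MatZeroRows(A,is,&one); CHKERRQ(ierr);
      ierr = ISDestroy(is); CHKERRQ(ierr);
*/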
1058 extern int MatPrintHelp_SeqBAIJ(Mat);
1059 static int MatPrintHelp_MPIBAIJ(Mat A)
1060 {
1061   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1062 
1063   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1064   else return 0;
1065 }
1066 
1067 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1068 
1069 /* -------------------------------------------------------------------*/
1070 static struct _MatOps MatOps = {
1071   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1072   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1073   0,0,0,0,
1074   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1075   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1076   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1077   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1078   0,0,0,MatGetSize_MPIBAIJ,
1079   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1080   0,0,0,MatConvertSameType_MPIBAIJ,0,0,
1081   0,0,0,MatGetSubMatrices_MPIBAIJ,
1082   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1083   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ};
1084 
1085 
1086 /*@C
1087    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1088    (block compressed row).  For good matrix assembly performance
1089    the user should preallocate the matrix storage by setting the parameters
1090    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1091    performance can be increased by more than a factor of 50.
1092 
1093    Input Parameters:
1094 .  comm - MPI communicator
1095 .  bs   - size of block
1096 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1097            This value should be the same as the local size used in creating the
1098            y vector for the matrix-vector product y = Ax.
1099 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1100            This value should be the same as the local size used in creating the
1101            x vector for the matrix-vector product y = Ax.
1102 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1103 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1104 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1105            submatrix  (same for all local rows)
1106 .  d_nnz - array containing the number of block nonzeros in the various block rows
1107            of the diagonal portion of the local submatrix (possibly different for each
1108            block row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1109            it is zero.
1110 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1111            submatrix (same for all local rows).
1112 .  o_nnz - array containing the number of block nonzeros in the various block rows of the
1113            off-diagonal portion of the local submatrix (possibly different for
1114            each block row) or PETSC_NULL.
1115 
1116    Output Parameter:
1117 .  A - the matrix
1118 
1119    Notes:
1120    The user MUST specify either the local or global matrix dimensions
1121    (possibly both).
1122 
1123    Storage Information:
1124    For a square global matrix we define each processor's diagonal portion
1125    to be its local rows and the corresponding columns (a square submatrix);
1126    each processor's off-diagonal portion encompasses the remainder of the
1127    local matrix (a rectangular submatrix).
1128 
1129    The user can specify preallocated storage for the diagonal part of
1130    the local submatrix with either d_nz or d_nnz (not both).  Set
1131    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1132    memory allocation.  Likewise, specify preallocated storage for the
1133    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1134 
1135    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1136    the figure below we depict these three local rows and all columns (0-11).
1137 
1138 $          0 1 2 3 4 5 6 7 8 9 10 11
1139 $         -------------------
1140 $  row 3  |  o o o d d d o o o o o o
1141 $  row 4  |  o o o d d d o o o o o o
1142 $  row 5  |  o o o d d d o o o o o o
1143 $         -------------------
1144 $
1145 
1146    Thus, any entries in the d locations are stored in the d (diagonal)
1147    submatrix, and any entries in the o locations are stored in the
1148    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1149    stored simply in the MATSEQBAIJ format for compressed row storage.
1150 
1151    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1152    and o_nz should indicate the number of nonzeros per row in the o matrix.
1153    In general, for PDE problems in which most nonzeros are near the diagonal,
1154    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1155    or you will get TERRIBLE performance; see the users' manual chapter on
1156    matrices and the file $(PETSC_DIR)/Performance.
1157 
1158 .keywords: matrix, block, aij, compressed row, sparse, parallel
1159 
1160 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1161 @*/
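/*
   A minimal usage sketch (illustrative only; the block size and preallocation
   figures are placeholders):  each processor contributes four block rows of a
   2x2-blocked matrix and lets PETSc determine the global sizes.

      Mat A;
      int ierr, bs = 2, m = 4*bs;
      ierr = MatCreateMPIBAIJ(MPI_COMM_WORLD,bs,m,m,PETSC_DECIDE,PETSC_DECIDE,
                              5,PETSC_NULL,2,PETSC_NULL,&A); CHKERRQ(ierr);
      ... insert entries with MatSetValues() ...
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
*/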
1162 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1163                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1164 {
1165   Mat          B;
1166   Mat_MPIBAIJ  *b;
1167   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1168 
1169   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
1170   *A = 0;
1171   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1172   PLogObjectCreate(B);
1173   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1174   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1175   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1176   MPI_Comm_size(comm,&size);
1177   if (size == 1) {
1178     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1179     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1180     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1181     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1182     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1183     B->ops.solve             = MatSolve_MPIBAIJ;
1184     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1185     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1186     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1187     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1188   }
1189 
1190   B->destroy    = MatDestroy_MPIBAIJ;
1191   B->view       = MatView_MPIBAIJ;
1192 
1193   B->factor     = 0;
1194   B->assembled  = PETSC_FALSE;
1195 
1196   b->insertmode = NOT_SET_VALUES;
1197   MPI_Comm_rank(comm,&b->rank);
1198   MPI_Comm_size(comm,&b->size);
1199 
1200   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1201     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1202   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
1203   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
1204   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1205   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1206 
1207   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1208     work[0] = m; work[1] = n;
1209     mbs = m/bs; nbs = n/bs;
1210     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1211     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1212     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1213   }
1214   if (m == PETSC_DECIDE) {
1215     Mbs = M/bs;
1216     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
1217     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1218     m   = mbs*bs;
1219   }
1220   if (n == PETSC_DECIDE) {
1221     Nbs = N/bs;
1222     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
1223     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1224     n   = nbs*bs;
1225   }
1226   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
1227 
1228   b->m = m; B->m = m;
1229   b->n = n; B->n = n;
1230   b->N = N; B->N = N;
1231   b->M = M; B->M = M;
1232   b->bs  = bs;
1233   b->bs2 = bs*bs;
1234   b->mbs = mbs;
1235   b->nbs = nbs;
1236   b->Mbs = Mbs;
1237   b->Nbs = Nbs;
1238 
1239   /* build local table of row and column ownerships */
1240   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1241   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1242   b->cowners = b->rowners + b->size + 2;
1243   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1244   b->rowners[0] = 0;
1245   for ( i=2; i<=b->size; i++ ) {
1246     b->rowners[i] += b->rowners[i-1];
1247   }
1248   b->rstart = b->rowners[b->rank];
1249   b->rend   = b->rowners[b->rank+1];
1250   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1251   b->cowners[0] = 0;
1252   for ( i=2; i<=b->size; i++ ) {
1253     b->cowners[i] += b->cowners[i-1];
1254   }
1255   b->cstart = b->cowners[b->rank];
1256   b->cend   = b->cowners[b->rank+1];
1257 
1258   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1259   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1260   PLogObjectParent(B,b->A);
1261   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1262   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1263   PLogObjectParent(B,b->B);
1264 
1265   /* build cache for off array entries formed */
1266   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1267   b->colmap      = 0;
1268   b->garray      = 0;
1269   b->roworiented = 1;
1270 
1271   /* stuff used for matrix vector multiply */
1272   b->lvec      = 0;
1273   b->Mvctx     = 0;
1274 
1275   /* stuff for MatGetRow() */
1276   b->rowindices   = 0;
1277   b->rowvalues    = 0;
1278   b->getrowactive = PETSC_FALSE;
1279 
1280   *A = B;
1281   return 0;
1282 }
1283 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1284 {
1285   Mat        mat;
1286   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1287   int        ierr, len=0, flg;
1288 
1289   *newmat       = 0;
1290   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1291   PLogObjectCreate(mat);
1292   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1293   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1294   mat->destroy    = MatDestroy_MPIBAIJ;
1295   mat->view       = MatView_MPIBAIJ;
1296   mat->factor     = matin->factor;
1297   mat->assembled  = PETSC_TRUE;
1298 
1299   a->m = mat->m   = oldmat->m;
1300   a->n = mat->n   = oldmat->n;
1301   a->M = mat->M   = oldmat->M;
1302   a->N = mat->N   = oldmat->N;
1303 
1304   a->bs  = oldmat->bs;
1305   a->bs2 = oldmat->bs2;
1306   a->mbs = oldmat->mbs;
1307   a->nbs = oldmat->nbs;
1308   a->Mbs = oldmat->Mbs;
1309   a->Nbs = oldmat->Nbs;
1310 
1311   a->rstart       = oldmat->rstart;
1312   a->rend         = oldmat->rend;
1313   a->cstart       = oldmat->cstart;
1314   a->cend         = oldmat->cend;
1315   a->size         = oldmat->size;
1316   a->rank         = oldmat->rank;
1317   a->insertmode   = NOT_SET_VALUES;
1318   a->rowvalues    = 0;
1319   a->getrowactive = PETSC_FALSE;
1320 
1321   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1322   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1323   a->cowners = a->rowners + a->size + 2;
1324   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1325   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1326   if (oldmat->colmap) {
1327     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1328     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1329     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1330   } else a->colmap = 0;
1331   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1332     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1333     PLogObjectMemory(mat,len*sizeof(int));
1334     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1335   } else a->garray = 0;
1336 
1337   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1338   PLogObjectParent(mat,a->lvec);
1339   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1340   PLogObjectParent(mat,a->Mvctx);
1341   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1342   PLogObjectParent(mat,a->A);
1343   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1344   PLogObjectParent(mat,a->B);
1345   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1346   if (flg) {
1347     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1348   }
1349   *newmat = mat;
1350   return 0;
1351 }
1352 
1353 #include "sys.h"
1354 
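/*
   A minimal usage sketch for MatLoad() with this implementation (illustrative only;
   the file name is a placeholder and ViewerFileOpenBinary() is assumed to be the
   binary-viewer creation routine of this PETSc version):

      Mat    A;
      Viewer viewer;
      int    ierr;
      ierr = ViewerFileOpenBinary(MPI_COMM_WORLD,"matrix.dat",BINARY_RDONLY,&viewer); CHKERRQ(ierr);
      ierr = MatLoad(viewer,MATMPIBAIJ,&A); CHKERRQ(ierr);
      ierr = ViewerDestroy(viewer); CHKERRQ(ierr);

   The block size is taken from the -matload_block_size option read below; e.g. run
   with "-matload_block_size 3".
*/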
1355 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1356 {
1357   Mat          A;
1358   int          i, nz, ierr, j,rstart, rend, fd;
1359   Scalar       *vals,*buf;
1360   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1361   MPI_Status   status;
1362   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1363   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1364   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1365   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1366   int          dcount,kmax,k,nzcount,tmp;
1367 
1368 
1369   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1370   bs2  = bs*bs;
1371 
1372   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1373   if (!rank) {
1374     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1375     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1376     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
1377   }
1378 
1379   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1380   M = header[1]; N = header[2];
1381 
1382   if (M != N) SETERRQ(1,"MatLoad_MPIBAIJ:Can only do square matrices");
1383 
1384   /*
1385      This code adds extra rows to make sure the number of rows is
1386      divisible by the blocksize
1387   */
1388   Mbs        = M/bs;
1389   extra_rows = bs - M + bs*(Mbs);
1390   if (extra_rows == bs) extra_rows = 0;
1391   else                  Mbs++;
1392   if (extra_rows &&!rank) {
1393     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1394   }
1395 
1396   /* determine ownership of all rows */
1397   mbs = Mbs/size + ((Mbs % size) > rank);
1398   m   = mbs * bs;
1399   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1400   browners = rowners + size + 1;
1401   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1402   rowners[0] = 0;
1403   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1404   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1405   rstart = rowners[rank];
1406   rend   = rowners[rank+1];
1407 
1408   /* distribute row lengths to all processors */
1409   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1410   if (!rank) {
1411     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1412     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1413     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1414     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1415     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1416     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1417     PetscFree(sndcounts);
1418   }
1419   else {
1420     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1421   }
1422 
1423   if (!rank) {
1424     /* calculate the number of nonzeros on each processor */
1425     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1426     PetscMemzero(procsnz,size*sizeof(int));
1427     for ( i=0; i<size; i++ ) {
1428       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1429         procsnz[i] += rowlengths[j];
1430       }
1431     }
1432     PetscFree(rowlengths);
1433 
1434     /* determine max buffer needed and allocate it */
1435     maxnz = 0;
1436     for ( i=0; i<size; i++ ) {
1437       maxnz = PetscMax(maxnz,procsnz[i]);
1438     }
1439     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1440 
1441     /* read in my part of the matrix column indices  */
1442     nz = procsnz[0];
1443     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1444     mycols = ibuf;
1445     if (size == 1)  nz -= extra_rows;
1446     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1447     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1448 
1449     /* read in each remaining processor's part (except the last) and ship it off */
1450     for ( i=1; i<size-1; i++ ) {
1451       nz = procsnz[i];
1452       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1453       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1454     }
1455     /* read in the stuff for the last proc */
1456     if ( size != 1 ) {
1457       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1458       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1459       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1460       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1461     }
1462     PetscFree(cols);
1463   }
1464   else {
1465     /* determine buffer space needed for message */
1466     nz = 0;
1467     for ( i=0; i<m; i++ ) {
1468       nz += locrowlens[i];
1469     }
1470     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1471     mycols = ibuf;
1472     /* receive message of column indices*/
1473     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1474     MPI_Get_count(&status,MPI_INT,&maxnz);
1475     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1476   }
1477 
1478   /* loop over local rows, determining number of off diagonal entries */
1479   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1480   odlens = dlens + (rend-rstart);
1481   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1482   PetscMemzero(mask,3*Mbs*sizeof(int));
1483   masked1 = mask    + Mbs;
1484   masked2 = masked1 + Mbs;
1485   rowcount = 0; nzcount = 0;
1486   for ( i=0; i<mbs; i++ ) {
1487     dcount  = 0;
1488     odcount = 0;
1489     for ( j=0; j<bs; j++ ) {
1490       kmax = locrowlens[rowcount];
1491       for ( k=0; k<kmax; k++ ) {
1492         tmp = mycols[nzcount++]/bs;
1493         if (!mask[tmp]) {
1494           mask[tmp] = 1;
1495           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1496           else masked1[dcount++] = tmp;
1497         }
1498       }
1499       rowcount++;
1500     }
1501 
1502     dlens[i]  = dcount;
1503     odlens[i] = odcount;
1504 
1505     /* zero out the mask elements we set */
1506     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1507     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1508   }
1509 
1510   /* create our matrix */
1511   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1512          CHKERRQ(ierr);
1513   A = *newmat;
1514   MatSetOption(A,MAT_COLUMNS_SORTED);
1515 
1516   if (!rank) {
1517     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1518     /* read in my part of the matrix numerical values  */
1519     nz = procsnz[0];
1520     vals = buf;
1521     mycols = ibuf;
1522     if (size == 1)  nz -= extra_rows;
1523     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1524     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1525 
1526     /* insert into matrix */
1527     jj      = rstart*bs;
1528     for ( i=0; i<m; i++ ) {
1529       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1530       mycols += locrowlens[i];
1531       vals   += locrowlens[i];
1532       jj++;
1533     }
1534     /* read in other processors (except the last one) and ship out */
1535     for ( i=1; i<size-1; i++ ) {
1536       nz = procsnz[i];
1537       vals = buf;
1538       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1539       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1540     }
1541     /* the last proc */
1542     if ( size != 1 ){
1543       nz = procsnz[i] - extra_rows;
1544       vals = buf;
1545       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1546       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1547       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1548     }
1549     PetscFree(procsnz);
1550   }
1551   else {
1552     /* receive numeric values */
1553     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1554 
1555     /* receive message of values*/
1556     vals = buf;
1557     mycols = ibuf;
1558     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1559     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1560     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1561 
1562     /* insert into matrix */
1563     jj      = rstart*bs;
1564     for ( i=0; i<m; i++ ) {
1565       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1566       mycols += locrowlens[i];
1567       vals   += locrowlens[i];
1568       jj++;
1569     }
1570   }
1571   PetscFree(locrowlens);
1572   PetscFree(buf);
1573   PetscFree(ibuf);
1574   PetscFree(rowners);
1575   PetscFree(dlens);
1576   PetscFree(mask);
1577   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1578   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1579   return 0;
1580 }
1581 
1582 
1583