xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 082acdae65a2be6366aba0e8cc025c52f06633cb)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.40 1996/12/03 15:49:52 balay Exp balay $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 
25 /*
26      Local utility routine that creates a mapping from the global column
27    number to the local number in the off-diagonal part of the local
28    storage of the matrix.  This is done in a non-scalable way since the
29    length of colmap equals the number of block columns of the global matrix.
30 */
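/*
   A small illustrative example (values are made up, not taken from any run):
   with bs = 2 and garray = {3,7} this sets colmap[3] = 1 and colmap[7] = 3,
   i.e. 1 plus the first local (scalar) column of that block in B; entries
   left at 0 mean the global block column is not present in the off-diagonal part.
*/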
31 static int CreateColmap_MPIBAIJ_Private(Mat mat)
32 {
33   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
34   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
35   int         nbs = B->nbs,i,bs=B->bs;
36 
37   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
38   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
39   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
40   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
41   return 0;
42 }
43 
44 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
45                             PetscTruth *done)
46 {
47   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
48   int         ierr;
49   if (aij->size == 1) {
50     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
51   } else SETERRQ(1,"MatGetRowIJ_MPIBAIJ:not supported in parallel");
52   return 0;
53 }
54 
55 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
56                                 PetscTruth *done)
57 {
58   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
59   int        ierr;
60   if (aij->size == 1) {
61     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
62   } else SETERRQ(1,"MatRestoreRowIJ_MPIBAIJ:not supported in parallel");
63   return 0;
64 }
65 
66 extern int MatSetValues_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
67 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
68 {
69   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
70   Scalar      value;
71   int         ierr,i,j,row,col;
72   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
73   int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
74   int         cend_orig=baij->cend_bs,bs=baij->bs;
75 
76 #if defined(PETSC_BOPT_g)
77   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
78     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
79   }
80 #endif
81   baij->insertmode = addv;
82   for ( i=0; i<m; i++ ) {
83 #if defined(PETSC_BOPT_g)
84     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
85     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
86 #endif
87     if (im[i] >= rstart_orig && im[i] < rend_orig) {
88       row = im[i] - rstart_orig;
89       for ( j=0; j<n; j++ ) {
90         if (in[j] >= cstart_orig && in[j] < cend_orig){
91           col = in[j] - cstart_orig;
92           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
93           ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
94         }
95 #if defined(PETSC_BOPT_g)
96         else if (in[j] < 0) {SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");}
97         else if (in[j] >= baij->N) {SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");}
98 #endif
99         else {
100           if (mat->was_assembled) {
101             if (!baij->colmap) {
102               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
103             }
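            /* colmap[] holds 1 + the first local column in B of each global
               block column already present there (0 if absent), so this
               recovers B's local column for the scalar entry */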
104             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
105             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
106               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
107               col =  in[j];
108             }
109           }
110           else col = in[j];
111           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
112           ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
113         }
114       }
115     }
116     else {
117       if (roworiented && !baij->donotstash) {
118         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
119       }
120       else {
121         if (!baij->donotstash) {
122           row = im[i];
123 	  for ( j=0; j<n; j++ ) {
124 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
125           }
126         }
127       }
128     }
129   }
130   return 0;
131 }
132 
133 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
134 {
135   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
136   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
137   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
138 
139   for ( i=0; i<m; i++ ) {
140     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
141     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
142     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
143       row = idxm[i] - bsrstart;
144       for ( j=0; j<n; j++ ) {
145         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
146         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
147         if (idxn[j] >= bscstart && idxn[j] < bscend){
148           col = idxn[j] - bscstart;
149           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
150         }
151         else {
152           if (!baij->colmap) {
153             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
154           }
155           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
156              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
157           else {
158             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
159             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
160           }
161         }
162       }
163     }
164     else {
165       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
166     }
167   }
168   return 0;
169 }
170 
171 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
172 {
173   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
174   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
175   int        ierr, i,bs2=baij->bs2;
176   double     sum = 0.0;
177   Scalar     *v;
178 
179   if (baij->size == 1) {
180     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
181   } else {
182     if (type == NORM_FROBENIUS) {
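      /* ||A||_F = sqrt( sum over all processes of sum_ij |a_ij|^2 ):
         accumulate the local squares from both the diagonal (A) and
         off-diagonal (B) blocks, then reduce and take the square root */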
183       v = amat->a;
184       for (i=0; i<amat->nz*bs2; i++ ) {
185 #if defined(PETSC_COMPLEX)
186         sum += real(conj(*v)*(*v)); v++;
187 #else
188         sum += (*v)*(*v); v++;
189 #endif
190       }
191       v = bmat->a;
192       for (i=0; i<bmat->nz*bs2; i++ ) {
193 #if defined(PETSC_COMPLEX)
194         sum += real(conj(*v)*(*v)); v++;
195 #else
196         sum += (*v)*(*v); v++;
197 #endif
198       }
199       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
200       *norm = sqrt(*norm);
201     }
202     else
203       SETERRQ(PETSC_ERR_SUP,"MatNorm_MPIBAIJ:No support for this norm yet");
204   }
205   return 0;
206 }
207 
208 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
209 {
210   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
211   MPI_Comm    comm = mat->comm;
212   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
213   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
214   MPI_Request *send_waits,*recv_waits;
215   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
216   InsertMode  addv;
217   Scalar      *rvalues,*svalues;
218 
219   /* make sure all processors are using either INSERT_VALUES or ADD_VALUES */
220   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
221   if (addv == (ADD_VALUES|INSERT_VALUES)) {
222     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
223   }
224   baij->insertmode = addv; /* in case this processor had no cache */
225 
226   /*  first count number of contributors to each processor */
227   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
228   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
229   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
230   for ( i=0; i<baij->stash.n; i++ ) {
231     idx = baij->stash.idx[i];
232     for ( j=0; j<size; j++ ) {
233       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
234         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
235       }
236     }
237   }
238   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
239 
240   /* inform other processors of number of messages and max length*/
241   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
242   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
243   nreceives = work[rank];
244   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
245   nmax = work[rank];
246   PetscFree(work);
247 
248   /* post receives: 
249        1) each message will consist of ordered triples 
250      (global row index, global column index, value); the indices are stored 
251      as Scalars to simplify the message passing. 
252        2) since we don't know how long each individual message is we 
253      allocate the largest needed buffer for each receive. Potentially 
254      this is a lot of wasted space. 
255 
256 
257        This could be done better.
258   */
259   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
260   CHKPTRQ(rvalues);
261   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
262   CHKPTRQ(recv_waits);
263   for ( i=0; i<nreceives; i++ ) {
264     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
265               comm,recv_waits+i);
266   }
267 
268   /* do sends:
269       1) starts[i] gives the starting index in svalues for stuff going to
270          the ith processor
271   */
272   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
273   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
274   CHKPTRQ(send_waits);
275   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
276   starts[0] = 0;
277   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
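  /* pack each stashed entry as a (global row, global column, value) triple
     of Scalars; starts[owner[i]] is advanced as entries are placed so that
     each destination's entries end up contiguous in svalues */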
278   for ( i=0; i<baij->stash.n; i++ ) {
279     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
280     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
281     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
282   }
283   PetscFree(owner);
284   starts[0] = 0;
285   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
286   count = 0;
287   for ( i=0; i<size; i++ ) {
288     if (procs[i]) {
289       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
290                 comm,send_waits+count++);
291     }
292   }
293   PetscFree(starts); PetscFree(nprocs);
294 
295   /* Free cache space */
296   PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
297   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
298 
299   baij->svalues    = svalues;    baij->rvalues    = rvalues;
300   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
301   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
302   baij->rmax       = nmax;
303 
304   return 0;
305 }
306 
307 
308 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
309 {
310   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
311   MPI_Status  *send_status,recv_status;
312   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
313   int         bs=baij->bs,row,col,other_disassembled;
314   Scalar      *values,val;
315   InsertMode  addv = baij->insertmode;
316 
317   /*  wait on receives */
318   while (count) {
319     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
320     /* unpack receives into our local space */
321     values = baij->rvalues + 3*imdex*baij->rmax;
322     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
323     n = n/3;
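    /* each received entry is a (global row, global column, value) triple;
       the row is shifted to this process's local numbering below */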
324     for ( i=0; i<n; i++ ) {
325       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
326       col = (int) PetscReal(values[3*i+1]);
327       val = values[3*i+2];
328       if (col >= baij->cstart*bs && col < baij->cend*bs) {
329         col -= baij->cstart*bs;
330         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
331       }
332       else {
333         if (mat->was_assembled) {
334           if (!baij->colmap) {
335             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
336           }
337           col = (baij->colmap[col/bs]-1) + col%bs;
338           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
339             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
340             col = (int) PetscReal(values[3*i+1]);
341           }
342         }
343         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
344       }
345     }
346     count--;
347   }
348   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
349 
350   /* wait on sends */
351   if (baij->nsends) {
352     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
353     CHKPTRQ(send_status);
354     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
355     PetscFree(send_status);
356   }
357   PetscFree(baij->send_waits); PetscFree(baij->svalues);
358 
359   baij->insertmode = NOT_SET_VALUES;
360   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
361   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
362 
363   /* determine if any processor has disassembled; if so we must 
364      also disassemble ourselves, in order that we may reassemble. */
365   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
366   if (mat->was_assembled && !other_disassembled) {
367     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
368   }
369 
370   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
371     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
372   }
373   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
374   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
375 
376   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
377   return 0;
378 }
379 
380 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
381 {
382   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
383   int          ierr;
384 
385   if (baij->size == 1) {
386     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
387   }
388   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
389   return 0;
390 }
391 
392 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
393 {
394   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
395   int          ierr, format,rank,bs = baij->bs;
396   FILE         *fd;
397   ViewerType   vtype;
398 
399   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
400   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
401     ierr = ViewerGetFormat(viewer,&format);
402     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
403       MatInfo info;
404       MPI_Comm_rank(mat->comm,&rank);
405       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
406       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
407       PetscSequentialPhaseBegin(mat->comm,1);
408       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
409               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
410               baij->bs,(int)info.memory);
411       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
412       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
413       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
414       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
415       fflush(fd);
416       PetscSequentialPhaseEnd(mat->comm,1);
417       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
418       return 0;
419     }
420     else if (format == VIEWER_FORMAT_ASCII_INFO) {
421       PetscPrintf(mat->comm,"  block size is %d\n",bs);
422       return 0;
423     }
424   }
425 
426   if (vtype == DRAW_VIEWER) {
427     Draw       draw;
428     PetscTruth isnull;
429     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
430     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
431   }
432 
433   if (vtype == ASCII_FILE_VIEWER) {
434     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
435     PetscSequentialPhaseBegin(mat->comm,1);
436     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
437            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
438             baij->cstart*bs,baij->cend*bs);
439     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
440     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
441     fflush(fd);
442     PetscSequentialPhaseEnd(mat->comm,1);
443   }
444   else {
445     int size = baij->size;
446     rank = baij->rank;
447     if (size == 1) {
448       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
449     }
450     else {
451       /* assemble the entire matrix onto first processor. */
452       Mat         A;
453       Mat_SeqBAIJ *Aloc;
454       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
455       int         mbs=baij->mbs;
456       Scalar      *a;
457 
458       if (!rank) {
459         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
460         CHKERRQ(ierr);
461       }
462       else {
463         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
464         CHKERRQ(ierr);
465       }
466       PLogObjectParent(mat,A);
467 
468       /* copy over the A part */
469       Aloc = (Mat_SeqBAIJ*) baij->A->data;
470       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
471       row = baij->rstart;
472       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
473 
474       for ( i=0; i<mbs; i++ ) {
475         rvals[0] = bs*(baij->rstart + i);
476         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
477         for ( j=ai[i]; j<ai[i+1]; j++ ) {
478           col = (baij->cstart+aj[j])*bs;
479           for (k=0; k<bs; k++ ) {
480             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
481             col++; a += bs;
482           }
483         }
484       }
485       /* copy over the B part */
486       Aloc = (Mat_SeqBAIJ*) baij->B->data;
487       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
488       row = baij->rstart*bs;
489       for ( i=0; i<mbs; i++ ) {
490         rvals[0] = bs*(baij->rstart + i);
491         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
492         for ( j=ai[i]; j<ai[i+1]; j++ ) {
493           col = baij->garray[aj[j]]*bs;
494           for (k=0; k<bs; k++ ) {
495             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
496             col++; a += bs;
497           }
498         }
499       }
500       PetscFree(rvals);
501       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
502       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
503       if (!rank) {
504         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
505       }
506       ierr = MatDestroy(A); CHKERRQ(ierr);
507     }
508   }
509   return 0;
510 }
511 
512 
513 
514 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
515 {
516   Mat         mat = (Mat) obj;
517   int         ierr;
518   ViewerType  vtype;
519 
520   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
521   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
522       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
523     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
524   }
525   else if (vtype == BINARY_FILE_VIEWER) {
526     return MatView_MPIBAIJ_Binary(mat,viewer);
527   }
528   return 0;
529 }
530 
531 static int MatDestroy_MPIBAIJ(PetscObject obj)
532 {
533   Mat         mat = (Mat) obj;
534   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
535   int         ierr;
536 
537 #if defined(PETSC_LOG)
538   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
539 #endif
540 
541   PetscFree(baij->rowners);
542   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
543   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
544   if (baij->colmap) PetscFree(baij->colmap);
545   if (baij->garray) PetscFree(baij->garray);
546   if (baij->lvec)   VecDestroy(baij->lvec);
547   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
548   if (baij->rowvalues) PetscFree(baij->rowvalues);
549   PetscFree(baij);
550   if (mat->mapping) {
551     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
552   }
553   PLogObjectDestroy(mat);
554   PetscHeaderDestroy(mat);
555   return 0;
556 }
557 
558 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
559 {
560   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
561   int         ierr, nt;
562 
563   VecGetLocalSize_Fast(xx,nt);
564   if (nt != a->n) {
565     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and xx");
566   }
567   VecGetLocalSize_Fast(yy,nt);
568   if (nt != a->m) {
569     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and yy");
570   }
571   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
572   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
573   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
574   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
575   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
576   return 0;
577 }
578 
579 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
580 {
581   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
582   int        ierr;
583   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
584   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
585   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
586   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
587   return 0;
588 }
589 
590 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
591 {
592   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
593   int        ierr;
594 
595   /* do nondiagonal part */
596   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
597   /* send it on its way */
598   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
599   /* do local part */
600   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
601   /* receive remote parts: note this assumes the values are not actually */
602   /* inserted in yy until the next line, which is true for my implementation*/
603   /* but may not always be true. */
604   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
605   return 0;
606 }
607 
608 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
609 {
610   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
611   int        ierr;
612 
613   /* do nondiagonal part */
614   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
615   /* send it on its way */
616   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
617   /* do local part */
618   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
619   /* receive remote parts: note this assumes the values are not actually */
620   /* inserted in zz until the next line, which is true for my implementation */
621   /* but may not always be true. */
622   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
623   return 0;
624 }
625 
626 /*
627   This only works correctly for square matrices where the subblock A->A is the
628    diagonal block
629 */
630 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
631 {
632   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
633   if (a->M != a->N)
634     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
635   return MatGetDiagonal(a->A,v);
636 }
637 
638 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
639 {
640   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
641   int        ierr;
642   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
643   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
644   return 0;
645 }
646 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
647 {
648   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
649   *m = mat->M; *n = mat->N;
650   return 0;
651 }
652 
653 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
654 {
655   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
656   *m = mat->m; *n = mat->n;
657   return 0;
658 }
659 
660 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
661 {
662   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
663   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
664   return 0;
665 }
666 
667 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
668 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
669 
670 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
671 {
672   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
673   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
674   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
675   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
676   int        *cmap, *idx_p,cstart = mat->cstart;
677 
678   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
679   mat->getrowactive = PETSC_TRUE;
680 
681   if (!mat->rowvalues && (idx || v)) {
682     /*
683         allocate enough space to hold information from the longest row.
684     */
685     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
686     int     max = 1,mbs = mat->mbs,tmp;
687     for ( i=0; i<mbs; i++ ) {
688       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
689       if (max < tmp) { max = tmp; }
690     }
691     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
692     CHKPTRQ(mat->rowvalues);
693     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
694   }
695 
696 
697   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows");
698   lrow = row - brstart;
699 
700   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
701   if (!v)   {pvA = 0; pvB = 0;}
702   if (!idx) {pcA = 0; if (!v) pcB = 0;}
703   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
704   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
705   nztot = nzA + nzB;
706 
707   cmap  = mat->garray;
708   if (v  || idx) {
709     if (nztot) {
710       /* Sort by increasing column numbers, assuming A and B already sorted */
711       int imark = -1;
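      /* imark counts how many off-diagonal (B) columns fall before this
         process's diagonal block; B's entries are split around the A entries
         so the merged row comes out in increasing global column order */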
712       if (v) {
713         *v = v_p = mat->rowvalues;
714         for ( i=0; i<nzB; i++ ) {
715           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
716           else break;
717         }
718         imark = i;
719         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
720         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
721       }
722       if (idx) {
723         *idx = idx_p = mat->rowindices;
724         if (imark > -1) {
725           for ( i=0; i<imark; i++ ) {
726             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
727           }
728         } else {
729           for ( i=0; i<nzB; i++ ) {
730             if (cmap[cworkB[i]/bs] < cstart)
731               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
732             else break;
733           }
734           imark = i;
735         }
736         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
737         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
738       }
739     }
740     else {
741       if (idx) *idx = 0;
742       if (v)   *v   = 0;
743     }
744   }
745   *nz = nztot;
746   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
747   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
748   return 0;
749 }
750 
751 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
752 {
753   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
754   if (baij->getrowactive == PETSC_FALSE) {
755     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
756   }
757   baij->getrowactive = PETSC_FALSE;
758   return 0;
759 }
760 
761 static int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
762 {
763   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
764   *bs = baij->bs;
765   return 0;
766 }
767 
768 static int MatZeroEntries_MPIBAIJ(Mat A)
769 {
770   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
771   int         ierr;
772   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
773   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
774   return 0;
775 }
776 
777 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
778 {
779   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
780   Mat         A = a->A, B = a->B;
781   int         ierr;
782   double      isend[5], irecv[5];
783 
784   info->rows_global    = (double)a->M;
785   info->columns_global = (double)a->N;
786   info->rows_local     = (double)a->m;
787   info->columns_local  = (double)a->N;
788   info->block_size     = (double)a->bs;
789   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
790   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded; isend[3] = info->memory; isend[4] = info->mallocs;
791   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
792   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded; isend[3] += info->memory; isend[4] += info->mallocs;
793   if (flag == MAT_LOCAL) {
794     info->nz_used      = isend[0];
795     info->nz_allocated = isend[1];
796     info->nz_unneeded  = isend[2];
797     info->memory       = isend[3];
798     info->mallocs      = isend[4];
799   } else if (flag == MAT_GLOBAL_MAX) {
800     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
801     info->nz_used      = irecv[0];
802     info->nz_allocated = irecv[1];
803     info->nz_unneeded  = irecv[2];
804     info->memory       = irecv[3];
805     info->mallocs      = irecv[4];
806   } else if (flag == MAT_GLOBAL_SUM) {
807     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
808     info->nz_used      = irecv[0];
809     info->nz_allocated = irecv[1];
810     info->nz_unneeded  = irecv[2];
811     info->memory       = irecv[3];
812     info->mallocs      = irecv[4];
813   }
814   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
815   info->fill_ratio_needed = 0;
816   info->factor_mallocs    = 0;
817   return 0;
818 }
819 
820 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
821 {
822   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
823 
824   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
825       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
826       op == MAT_COLUMNS_UNSORTED ||
827       op == MAT_COLUMNS_SORTED) {
828         MatSetOption(a->A,op);
829         MatSetOption(a->B,op);
830   } else if (op == MAT_ROW_ORIENTED) {
831         a->roworiented = 1;
832         MatSetOption(a->A,op);
833         MatSetOption(a->B,op);
834   } else if (op == MAT_ROWS_SORTED ||
835              op == MAT_ROWS_UNSORTED ||
836              op == MAT_SYMMETRIC ||
837              op == MAT_STRUCTURALLY_SYMMETRIC ||
838              op == MAT_YES_NEW_DIAGONALS)
839     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
840   else if (op == MAT_COLUMN_ORIENTED) {
841     a->roworiented = 0;
842     MatSetOption(a->A,op);
843     MatSetOption(a->B,op);
844   } else if (op == MAT_IGNORE_OFF_PROCESSOR_ENTRIES) {
845     a->donotstash = 1;
846   } else if (op == MAT_NO_NEW_DIAGONALS)
847     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:MAT_NO_NEW_DIAGONALS");}
848   else
849     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:unknown option");}
850   return 0;
851 }
852 
853 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
854 {
855   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
856   Mat_SeqBAIJ *Aloc;
857   Mat        B;
858   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
859   int        bs=baij->bs,mbs=baij->mbs;
860   Scalar     *a;
861 
862   if (matout == PETSC_NULL && M != N)
863     SETERRQ(1,"MatTranspose_MPIBAIJ:Square matrix only for in-place");
864   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
865   CHKERRQ(ierr);
866 
867   /* copy over the A part */
868   Aloc = (Mat_SeqBAIJ*) baij->A->data;
869   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
870   row = baij->rstart;
871   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
872 
873   for ( i=0; i<mbs; i++ ) {
874     rvals[0] = bs*(baij->rstart + i);
875     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
876     for ( j=ai[i]; j<ai[i+1]; j++ ) {
877       col = (baij->cstart+aj[j])*bs;
878       for (k=0; k<bs; k++ ) {
879         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
880         col++; a += bs;
881       }
882     }
883   }
884   /* copy over the B part */
885   Aloc = (Mat_SeqBAIJ*) baij->B->data;
886   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
887   row = baij->rstart*bs;
888   for ( i=0; i<mbs; i++ ) {
889     rvals[0] = bs*(baij->rstart + i);
890     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
891     for ( j=ai[i]; j<ai[i+1]; j++ ) {
892       col = baij->garray[aj[j]]*bs;
893       for (k=0; k<bs; k++ ) {
894         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
895         col++; a += bs;
896       }
897     }
898   }
899   PetscFree(rvals);
900   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
901   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
902 
903   if (matout != PETSC_NULL) {
904     *matout = B;
905   } else {
906     /* This isn't really an in-place transpose .... but free data structures from baij */
907     PetscFree(baij->rowners);
908     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
909     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
910     if (baij->colmap) PetscFree(baij->colmap);
911     if (baij->garray) PetscFree(baij->garray);
912     if (baij->lvec) VecDestroy(baij->lvec);
913     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
914     PetscFree(baij);
915     PetscMemcpy(A,B,sizeof(struct _Mat));
916     PetscHeaderDestroy(B);
917   }
918   return 0;
919 }
920 
921 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
922 {
923   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
924   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
925   int ierr,s1,s2,s3;
926 
927   if (ll)  {
928     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
929     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
930     if (s1!=s2) SETERRQ(1,"MatDiagonalScale_MPIBAIJ: non-conforming local sizes");
931     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
932     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
933   }
934   if (rr) SETERRQ(1,"MatDiagonalScale_MPIBAIJ:not supported for right vector");
935   return 0;
936 }
937 
938 /* the code does not do the diagonal entries correctly unless the
939    matrix is square and the column and row ownerships are identical.
940    This is a BUG. The only way to fix it seems to be to access
941    baij->A and baij->B directly and not through the MatZeroRows()
942    routine.
943 */
944 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
945 {
946   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
947   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
948   int            *procs,*nprocs,j,found,idx,nsends,*work;
949   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
950   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
951   int            *lens,imdex,*lrows,*values,bs=l->bs;
952   MPI_Comm       comm = A->comm;
953   MPI_Request    *send_waits,*recv_waits;
954   MPI_Status     recv_status,*send_status;
955   IS             istmp;
956 
957   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
958   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
959 
960   /*  first count number of contributors to each processor */
961   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
962   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
963   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
964   for ( i=0; i<N; i++ ) {
965     idx = rows[i];
966     found = 0;
967     for ( j=0; j<size; j++ ) {
968       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
969         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
970       }
971     }
972     if (!found) SETERRQ(1,"MatZeroRows_MPIBAIJ:Index out of range");
973   }
974   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
975 
976   /* inform other processors of number of messages and max length*/
977   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
978   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
979   nrecvs = work[rank];
980   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
981   nmax = work[rank];
982   PetscFree(work);
983 
984   /* post receives:   */
985   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
986   CHKPTRQ(rvalues);
987   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
988   CHKPTRQ(recv_waits);
989   for ( i=0; i<nrecvs; i++ ) {
990     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
991   }
992 
993   /* do sends:
994       1) starts[i] gives the starting index in svalues for stuff going to
995          the ith processor
996   */
997   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
998   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
999   CHKPTRQ(send_waits);
1000   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1001   starts[0] = 0;
1002   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1003   for ( i=0; i<N; i++ ) {
1004     svalues[starts[owner[i]]++] = rows[i];
1005   }
1006   ISRestoreIndices(is,&rows);
1007 
1008   starts[0] = 0;
1009   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1010   count = 0;
1011   for ( i=0; i<size; i++ ) {
1012     if (procs[i]) {
1013       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
1014     }
1015   }
1016   PetscFree(starts);
1017 
1018   base = owners[rank]*bs;
1019 
1020   /*  wait on receives */
1021   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1022   source = lens + nrecvs;
1023   count  = nrecvs; slen = 0;
1024   while (count) {
1025     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
1026     /* unpack receives into our local space */
1027     MPI_Get_count(&recv_status,MPI_INT,&n);
1028     source[imdex]  = recv_status.MPI_SOURCE;
1029     lens[imdex]  = n;
1030     slen += n;
1031     count--;
1032   }
1033   PetscFree(recv_waits);
1034 
1035   /* move the received row indices into the local list of rows to zero */
1036   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1037   count = 0;
1038   for ( i=0; i<nrecvs; i++ ) {
1039     values = rvalues + i*nmax;
1040     for ( j=0; j<lens[i]; j++ ) {
1041       lrows[count++] = values[j] - base;
1042     }
1043   }
1044   PetscFree(rvalues); PetscFree(lens);
1045   PetscFree(owner); PetscFree(nprocs);
1046 
1047   /* actually zap the local rows */
1048   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1049   PLogObjectParent(A,istmp);
1050   PetscFree(lrows);
1051   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1052   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1053   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1054 
1055   /* wait on sends */
1056   if (nsends) {
1057     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1058     CHKPTRQ(send_status);
1059     MPI_Waitall(nsends,send_waits,send_status);
1060     PetscFree(send_status);
1061   }
1062   PetscFree(send_waits); PetscFree(svalues);
1063 
1064   return 0;
1065 }
1066 extern int MatPrintHelp_SeqBAIJ(Mat);
1067 static int MatPrintHelp_MPIBAIJ(Mat A)
1068 {
1069   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1070 
1071   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1072   else return 0;
1073 }
1074 
1075 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1076 
1077 /* -------------------------------------------------------------------*/
1078 static struct _MatOps MatOps = {
1079   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1080   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1081   0,0,0,0,
1082   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1083   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1084   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1085   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1086   0,0,0,MatGetSize_MPIBAIJ,
1087   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1088   0,0,0,MatConvertSameType_MPIBAIJ,0,0,
1089   0,0,0,MatGetSubMatrices_MPIBAIJ,
1090   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1091   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ};
1092 
1093 
1094 /*@C
1095    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1096    (block compressed row).  For good matrix assembly performance
1097    the user should preallocate the matrix storage by setting the parameters
1098    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1099    performance can be increased by more than a factor of 50.
1100 
1101    Input Parameters:
1102 .  comm - MPI communicator
1103 .  bs   - size of block
1104 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1105            This value should be the same as the local size used in creating the
1106            y vector for the matrix-vector product y = Ax.
1107 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1108            This value should be the same as the local size used in creating the
1109            x vector for the matrix-vector product y = Ax.
1110 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1111 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1112 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1113            submatrix  (same for all local rows)
1114 .  d_nnz - array containing the number of block nonzeros in the various block rows 
1115            in the diagonal portion of the local submatrix (possibly different for each block
1116            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1117            it is zero.
1118 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1119            submatrix (same for all local rows).
1120 .  o_nnz - array containing the number of block nonzeros in the various block rows of the
1121            off-diagonal portion of the local submatrix (possibly different for
1122            each block row) or PETSC_NULL.
1123 
1124    Output Parameter:
1125 .  A - the matrix
1126 
1127    Notes:
1128    The user MUST specify either the local or global matrix dimensions
1129    (possibly both).
1130 
1131    Storage Information:
1132    For a square global matrix we define each processor's diagonal portion
1133    to be its local rows and the corresponding columns (a square submatrix);
1134    each processor's off-diagonal portion encompasses the remainder of the
1135    local matrix (a rectangular submatrix).
1136 
1137    The user can specify preallocated storage for the diagonal part of
1138    the local submatrix with either d_nz or d_nnz (not both).  Set
1139    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1140    memory allocation.  Likewise, specify preallocated storage for the
1141    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1142 
1143    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1144    the figure below we depict these three local rows and all columns (0-11).
1145 
1146 $          0 1 2 3 4 5 6 7 8 9 10 11
1147 $         -------------------
1148 $  row 3  |  o o o d d d o o o o o o
1149 $  row 4  |  o o o d d d o o o o o o
1150 $  row 5  |  o o o d d d o o o o o o
1151 $         -------------------
1152 $
1153 
1154    Thus, any entries in the d locations are stored in the d (diagonal)
1155    submatrix, and any entries in the o locations are stored in the
1156    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1157    stored simply in the MATSEQBAIJ format for compressed row storage.
1158 
1159    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1160    and o_nz should indicate the number of nonzeros per row in the o matrix.
1161    In general, for PDE problems in which most nonzeros are near the diagonal,
1162    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1163    or you will get TERRIBLE performance; see the users' manual chapter on
1164    matrices.
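   A minimal usage sketch (the sizes and preallocation values below are
   illustrative only):
$    Mat A;
$    int bs = 2, m = 10, n = 10;    /* local sizes; must be multiples of bs */
$    ierr = MatCreateMPIBAIJ(MPI_COMM_WORLD,bs,m,n,PETSC_DECIDE,PETSC_DECIDE,
$                            5,PETSC_NULL,2,PETSC_NULL,&A); CHKERRQ(ierr);
$    /* ... MatSetValues(), MatAssemblyBegin()/MatAssemblyEnd(), use A ... */
$    ierr = MatDestroy(A); CHKERRQ(ierr);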
1165 
1166 .keywords: matrix, block, aij, compressed row, sparse, parallel
1167 
1168 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1169 @*/
1170 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1171                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1172 {
1173   Mat          B;
1174   Mat_MPIBAIJ  *b;
1175   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1176 
1177   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
1178   *A = 0;
1179   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1180   PLogObjectCreate(B);
1181   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1182   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1183   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1184   MPI_Comm_size(comm,&size);
1185   if (size == 1) {
1186     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1187     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1188     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1189     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1190     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1191     B->ops.solve             = MatSolve_MPIBAIJ;
1192     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1193     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1194     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1195     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1196   }
1197 
1198   B->destroy    = MatDestroy_MPIBAIJ;
1199   B->view       = MatView_MPIBAIJ;
1200   B->mapping    = 0;
1201   B->factor     = 0;
1202   B->assembled  = PETSC_FALSE;
1203 
1204   b->insertmode = NOT_SET_VALUES;
1205   MPI_Comm_rank(comm,&b->rank);
1206   MPI_Comm_size(comm,&b->size);
1207 
1208   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1209     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1210   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
1211   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
1212   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1213   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1214 
1215   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1216     work[0] = m; work[1] = n;
1217     mbs = m/bs; nbs = n/bs;
1218     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1219     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1220     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1221   }
1222   if (m == PETSC_DECIDE) {
1223     Mbs = M/bs;
1224     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
1225     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1226     m   = mbs*bs;
1227   }
1228   if (n == PETSC_DECIDE) {
1229     Nbs = N/bs;
1230     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
1231     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1232     n   = nbs*bs;
1233   }
1234   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
1235 
1236   b->m = m; B->m = m;
1237   b->n = n; B->n = n;
1238   b->N = N; B->N = N;
1239   b->M = M; B->M = M;
1240   b->bs  = bs;
1241   b->bs2 = bs*bs;
1242   b->mbs = mbs;
1243   b->nbs = nbs;
1244   b->Mbs = Mbs;
1245   b->Nbs = Nbs;
1246 
1247   /* build local table of row and column ownerships */
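  /* process p owns block rows rowners[p] through rowners[p+1]-1 (and block
     columns cowners[p] through cowners[p+1]-1); rstart/rend and the *_bs
     variants cache this process's own range in block and scalar units */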
1248   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1249   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1250   b->cowners = b->rowners + b->size + 2;
1251   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1252   b->rowners[0] = 0;
1253   for ( i=2; i<=b->size; i++ ) {
1254     b->rowners[i] += b->rowners[i-1];
1255   }
1256   b->rstart    = b->rowners[b->rank];
1257   b->rend      = b->rowners[b->rank+1];
1258   b->rstart_bs = b->rstart * bs;
1259   b->rend_bs   = b->rend * bs;
1260 
1261   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1262   b->cowners[0] = 0;
1263   for ( i=2; i<=b->size; i++ ) {
1264     b->cowners[i] += b->cowners[i-1];
1265   }
1266   b->cstart    = b->cowners[b->rank];
1267   b->cend      = b->cowners[b->rank+1];
1268   b->cstart_bs = b->cstart * bs;
1269   b->cend_bs   = b->cend * bs;
1270 
1271   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1272   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1273   PLogObjectParent(B,b->A);
1274   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1275   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1276   PLogObjectParent(B,b->B);
1277 
1278   /* build cache for off array entries formed */
1279   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1280   b->donotstash  = 0;
1281   b->colmap      = 0;
1282   b->garray      = 0;
1283   b->roworiented = 1;
1284 
1285   /* stuff used for matrix vector multiply */
1286   b->lvec      = 0;
1287   b->Mvctx     = 0;
1288 
1289   /* stuff for MatGetRow() */
1290   b->rowindices   = 0;
1291   b->rowvalues    = 0;
1292   b->getrowactive = PETSC_FALSE;
1293 
1294   *A = B;
1295   return 0;
1296 }
1297 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1298 {
1299   Mat        mat;
1300   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1301   int        ierr, len=0, flg;
1302 
1303   *newmat       = 0;
1304   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1305   PLogObjectCreate(mat);
1306   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1307   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1308   mat->destroy    = MatDestroy_MPIBAIJ;
1309   mat->view       = MatView_MPIBAIJ;
1310   mat->factor     = matin->factor;
1311   mat->assembled  = PETSC_TRUE;
1312 
1313   a->m = mat->m   = oldmat->m;
1314   a->n = mat->n   = oldmat->n;
1315   a->M = mat->M   = oldmat->M;
1316   a->N = mat->N   = oldmat->N;
1317 
1318   a->bs  = oldmat->bs;
1319   a->bs2 = oldmat->bs2;
1320   a->mbs = oldmat->mbs;
1321   a->nbs = oldmat->nbs;
1322   a->Mbs = oldmat->Mbs;
1323   a->Nbs = oldmat->Nbs;
1324 
1325   a->rstart       = oldmat->rstart;
1326   a->rend         = oldmat->rend;
1327   a->cstart       = oldmat->cstart;
1328   a->cend         = oldmat->cend;
1329   a->size         = oldmat->size;
1330   a->rank         = oldmat->rank;
1331   a->insertmode   = NOT_SET_VALUES;
1332   a->rowvalues    = 0;
1333   a->getrowactive = PETSC_FALSE;
1334 
1335   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1336   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1337   a->cowners = a->rowners + a->size + 2;
1338   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1339   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1340   if (oldmat->colmap) {
1341     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1342     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1343     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1344   } else a->colmap = 0;
1345   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1346     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1347     PLogObjectMemory(mat,len*sizeof(int));
1348     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1349   } else a->garray = 0;
1350 
1351   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1352   PLogObjectParent(mat,a->lvec);
1353   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1354   PLogObjectParent(mat,a->Mvctx);
1355   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1356   PLogObjectParent(mat,a->A);
1357   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1358   PLogObjectParent(mat,a->B);
1359   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1360   if (flg) {
1361     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1362   }
1363   *newmat = mat;
1364   return 0;
1365 }
1366 
1367 #include "sys.h"
1368 
1369 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1370 {
1371   Mat          A;
1372   int          i, nz, ierr, j,rstart, rend, fd;
1373   Scalar       *vals,*buf;
1374   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1375   MPI_Status   status;
1376   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1377   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1378   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1379   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1380   int          dcount,kmax,k,nzcount,tmp;
1381 
1382 
1383   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1384   bs2  = bs*bs;
1385 
1386   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1387   if (!rank) {
1388     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1389     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1390     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
1391   }
1392 
1393   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1394   M = header[1]; N = header[2];
1395 
1396   if (M != N) SETERRQ(1,"MatLoad_MPIBAIJ:Can only do square matrices");
1397 
1398   /*
1399      This code adds extra rows to make sure the number of rows is
1400      divisible by the blocksize
1401   */
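  /* e.g. (illustrative values only): M = 10 with bs = 4 gives Mbs = 3 and
     extra_rows = 2, so the matrix is padded to 12 rows */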
1402   Mbs        = M/bs;
1403   extra_rows = bs - M + bs*(Mbs);
1404   if (extra_rows == bs) extra_rows = 0;
1405   else                  Mbs++;
1406   if (extra_rows && !rank) {
1407     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1408   }
1409 
1410   /* determine ownership of all rows */
1411   mbs = Mbs/size + ((Mbs % size) > rank);
1412   m   = mbs * bs;
1413   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1414   browners = rowners + size + 1;
1415   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1416   rowners[0] = 0;
1417   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1418   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1419   rstart = rowners[rank];
1420   rend   = rowners[rank+1];
1421 
1422   /* distribute row lengths to all processors */
1423   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1424   if (!rank) {
1425     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1426     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1427     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1428     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1429     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1430     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1431     PetscFree(sndcounts);
1432   }
1433   else {
1434     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1435   }
1436 
1437   if (!rank) {
1438     /* calculate the number of nonzeros on each processor */
1439     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1440     PetscMemzero(procsnz,size*sizeof(int));
1441     for ( i=0; i<size; i++ ) {
1442       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1443         procsnz[i] += rowlengths[j];
1444       }
1445     }
1446     PetscFree(rowlengths);
1447 
1448     /* determine max buffer needed and allocate it */
1449     maxnz = 0;
1450     for ( i=0; i<size; i++ ) {
1451       maxnz = PetscMax(maxnz,procsnz[i]);
1452     }
1453     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1454 
1455     /* read in my part of the matrix column indices  */
1456     nz = procsnz[0];
1457     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1458     mycols = ibuf;
1459     if (size == 1)  nz -= extra_rows;
1460     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1461     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1462 
1463     /* read in the parts for the other processors (except the last) and ship them off */
1464     for ( i=1; i<size-1; i++ ) {
1465       nz = procsnz[i];
1466       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1467       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1468     }
1469     /* read in the stuff for the last proc */
1470     if ( size != 1 ) {
1471       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1472       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1473       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1474       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1475     }
1476     PetscFree(cols);
1477   }
1478   else {
1479     /* determine buffer space needed for message */
1480     nz = 0;
1481     for ( i=0; i<m; i++ ) {
1482       nz += locrowlens[i];
1483     }
1484     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1485     mycols = ibuf;
1486     /* receive message of column indices*/
1487     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1488     MPI_Get_count(&status,MPI_INT,&maxnz);
1489     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1490   }
1491 
1492   /* loop over local block rows, determining the number of diagonal and off-diagonal block entries */
1493   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1494   odlens = dlens + (rend-rstart);
1495   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1496   PetscMemzero(mask,3*Mbs*sizeof(int));
1497   masked1 = mask    + Mbs;
1498   masked2 = masked1 + Mbs;
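  /* mask[] flags block columns already seen in the current block row so each
     block is counted only once; masked1/masked2 remember which entries to
     reset before the next block row */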
1499   rowcount = 0; nzcount = 0;
1500   for ( i=0; i<mbs; i++ ) {
1501     dcount  = 0;
1502     odcount = 0;
1503     for ( j=0; j<bs; j++ ) {
1504       kmax = locrowlens[rowcount];
1505       for ( k=0; k<kmax; k++ ) {
1506         tmp = mycols[nzcount++]/bs;
1507         if (!mask[tmp]) {
1508           mask[tmp] = 1;
1509           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1510           else masked1[dcount++] = tmp;
1511         }
1512       }
1513       rowcount++;
1514     }
1515 
1516     dlens[i]  = dcount;
1517     odlens[i] = odcount;
1518 
1519     /* zero out the mask elements we set */
1520     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1521     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1522   }
1523 
1524   /* create our matrix */
1525   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1526          CHKERRQ(ierr);
1527   A = *newmat;
1528   MatSetOption(A,MAT_COLUMNS_SORTED);
1529 
1530   if (!rank) {
1531     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1532     /* read in my part of the matrix numerical values  */
1533     nz = procsnz[0];
1534     vals = buf;
1535     mycols = ibuf;
1536     if (size == 1)  nz -= extra_rows;
1537     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1538     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1539 
1540     /* insert into matrix */
1541     jj      = rstart*bs;
1542     for ( i=0; i<m; i++ ) {
1543       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1544       mycols += locrowlens[i];
1545       vals   += locrowlens[i];
1546       jj++;
1547     }
1548     /* read in the parts for the other processors (except the last one) and ship them out */
1549     for ( i=1; i<size-1; i++ ) {
1550       nz = procsnz[i];
1551       vals = buf;
1552       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1553       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1554     }
1555     /* the last proc */
1556     if ( size != 1 ){
1557       nz = procsnz[i] - extra_rows;
1558       vals = buf;
1559       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1560       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1561       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1562     }
1563     PetscFree(procsnz);
1564   }
1565   else {
1566     /* receive numeric values */
1567     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1568 
1569     /* receive message of values*/
1570     vals = buf;
1571     mycols = ibuf;
1572     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1573     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1574     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1575 
1576     /* insert into matrix */
1577     jj      = rstart*bs;
1578     for ( i=0; i<m; i++ ) {
1579       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1580       mycols += locrowlens[i];
1581       vals   += locrowlens[i];
1582       jj++;
1583     }
1584   }
1585   PetscFree(locrowlens);
1586   PetscFree(buf);
1587   PetscFree(ibuf);
1588   PetscFree(rowners);
1589   PetscFree(dlens);
1590   PetscFree(mask);
1591   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1592   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1593   return 0;
1594 }
1595 
1596 
1597