xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision bbf0e169f9fbcb77d596644d33cfb942ebffc0f0)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.26 1996/09/19 20:51:42 balay Exp bsmith $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 
25 /*
26      Local utility routine that creates a mapping from the global column
27    number to the local number in the off-diagonal part of the local
28    storage of the matrix.  This is done in a non-scalable way since the
29    length of colmap equals the number of global block columns.
30 */
31 static int CreateColmap_MPIBAIJ_Private(Mat mat)
32 {
33   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
34   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
35   int         nbs = B->nbs,i;
36 
37   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
38   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
39   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
40   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i + 1;
41   return 0;
42 }
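/*
   Note on the convention used by colmap[]: an entry maps a global block-column
   number to 1 + the matching block column of baij->B, and an entry of 0 means
   that block column does not occur in the off-diagonal part.  A minimal sketch
   of the lookup pattern used later in this file (gcol and bs are assumed to be
   a global point column index and the block size):

      int bcol = baij->colmap[gcol/bs];
      if (bcol) {
        int lcol = (bcol - 1)*bs + gcol%bs;     <-- point column local to baij->B
      }
*/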
43 
44 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
45                             PetscTruth *done)
46 {
47   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
48   int         ierr;
49   if (aij->size == 1) {
50     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
51   } else SETERRQ(1,"MatGetRowIJ_MPIBAIJ:not supported in parallel");
52   return 0;
53 }
54 
55 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
56                                 PetscTruth *done)
57 {
58   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
59   int        ierr;
60   if (aij->size == 1) {
61     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
62   } else SETERRQ(1,"MatRestoreRowIJ_MPIBAIJ:not supported in parallel");
63   return 0;
64 }
65 
66 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
67 {
68   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
69   Scalar      value;
70   int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
71   int         cstart = baij->cstart, cend = baij->cend,row,col;
72   int         roworiented = baij->roworiented,rstart_orig,rend_orig;
73   int         cstart_orig,cend_orig,bs=baij->bs;
74 
75   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
76     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
77   }
78   baij->insertmode = addv;
79   rstart_orig = rstart*bs;
80   rend_orig   = rend*bs;
81   cstart_orig = cstart*bs;
82   cend_orig   = cend*bs;
83   for ( i=0; i<m; i++ ) {
84     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
85     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
86     if (im[i] >= rstart_orig && im[i] < rend_orig) {
87       row = im[i] - rstart_orig;
88       for ( j=0; j<n; j++ ) {
89         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");
90         if (in[j] >= baij->N) SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");
91         if (in[j] >= cstart_orig && in[j] < cend_orig){
92           col = in[j] - cstart_orig;
93           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
94           ierr = MatSetValues(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
95         }
96         else {
97           if (mat->was_assembled) {
98             if (!baij->colmap) {
99               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
100             }
101             col = (baij->colmap[in[j]/bs] - 1)*bs + in[j]%bs;
102             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
103               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
104               col =  in[j];
105             }
106           }
107           else col = in[j];
108           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
109           ierr = MatSetValues(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
110         }
111       }
112     }
113     else {
114       if (roworiented) {
115         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
116       }
117       else {
118         row = im[i];
119         for ( j=0; j<n; j++ ) {
120           ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
121         }
122       }
123     }
124   }
125   return 0;
126 }
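/*
   Usage sketch (the names A, ierr, etc. are assumed for illustration): values
   are set with GLOBAL point indices; entries destined for rows owned by another
   processor are stashed above and only communicated during assembly.

      int    row = 7, cols[2] = {7,8};
      Scalar vals[2] = {4.0,-1.0};
      ierr = MatSetValues(A,1,&row,2,cols,vals,ADD_VALUES); CHKERRQ(ierr);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
*/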
127 
128 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
129 {
130   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
131   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
132   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
133 
134   for ( i=0; i<m; i++ ) {
135     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
136     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
137     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
138       row = idxm[i] - bsrstart;
139       for ( j=0; j<n; j++ ) {
140         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
141         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
142         if (idxn[j] >= bscstart && idxn[j] < bscend){
143           col = idxn[j] - bscstart;
144           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
145         }
146         else {
147           if (!baij->colmap) {
148             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
149           }
150           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
151              (baij->garray[baij->colmap[idxn[j]/bs]-1] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
152           else {
153             col  = (baij->colmap[idxn[j]/bs]-1)*bs + idxn[j]%bs;
154             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
155           }
156         }
157       }
158     }
159     else {
160       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
161     }
162   }
163   return 0;
164 }
165 
166 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
167 {
168   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
169   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
170   int        ierr, i,bs2=baij->bs2;
171   double     sum = 0.0;
172   Scalar     *v;
173 
174   if (baij->size == 1) {
175     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
176   } else {
177     if (type == NORM_FROBENIUS) {
178       v = amat->a;
179       for (i=0; i<amat->nz*bs2; i++ ) {
180 #if defined(PETSC_COMPLEX)
181         sum += real(conj(*v)*(*v)); v++;
182 #else
183         sum += (*v)*(*v); v++;
184 #endif
185       }
186       v = bmat->a;
187       for (i=0; i<bmat->nz*bs2; i++ ) {
188 #if defined(PETSC_COMPLEX)
189         sum += real(conj(*v)*(*v)); v++;
190 #else
191         sum += (*v)*(*v); v++;
192 #endif
193       }
194       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
195       *norm = sqrt(*norm);
196     }
197     else
198       SETERRQ(1,"MatNorm_MPIBAIJ:No support for this norm yet");
199   }
200   return 0;
201 }
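/*
   Note: the Frobenius norm computed above is

       ||A||_F = sqrt( sum_ij |a_ij|^2 )

   The squares are accumulated locally over the diagonal (A) and off-diagonal (B)
   blocks, summed across processors with MPI_Allreduce, and the square root is
   taken of the global sum.
*/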
202 
203 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
204 {
205   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
206   MPI_Comm    comm = mat->comm;
207   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
208   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
209   MPI_Request *send_waits,*recv_waits;
210   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
211   InsertMode  addv;
212   Scalar      *rvalues,*svalues;
213 
214   /* make sure all processors are in either INSERT_VALUES or ADD_VALUES mode */
215   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
216   if (addv == (ADD_VALUES|INSERT_VALUES)) {
217     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
218   }
219   baij->insertmode = addv; /* in case this processor had no cache */
220 
221   /*  first count number of contributors to each processor */
222   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
223   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
224   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
225   for ( i=0; i<baij->stash.n; i++ ) {
226     idx = baij->stash.idx[i];
227     for ( j=0; j<size; j++ ) {
228       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
229         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
230       }
231     }
232   }
233   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
234 
235   /* inform other processors of number of messages and max length*/
236   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
237   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
238   nreceives = work[rank];
239   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
240   nmax = work[rank];
241   PetscFree(work);
242 
243   /* post receives:
244        1) each message will consist of ordered triples
245      (global row index, global column index, value); the indices are
246      stored as Scalars to simplify the message passing.
247        2) since we don't know how long each individual message is we
248      allocate the largest needed buffer for each receive. Potentially
249      this is a lot of wasted space.
250 
251 
252        This could be done better.
253   */
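/*
   With the packing loop below, the message bound for processor p consists of
   3*nprocs[p] Scalars laid out as

       [ row_0, col_0, val_0, row_1, col_1, val_1, ... ]

   where the global point row and column indices are cast to Scalar.
*/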
254   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
255   CHKPTRQ(rvalues);
256   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
257   CHKPTRQ(recv_waits);
258   for ( i=0; i<nreceives; i++ ) {
259     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
260               comm,recv_waits+i);
261   }
262 
263   /* do sends:
264       1) starts[i] gives the starting index in svalues for stuff going to
265          the ith processor
266   */
267   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
268   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
269   CHKPTRQ(send_waits);
270   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
271   starts[0] = 0;
272   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
273   for ( i=0; i<baij->stash.n; i++ ) {
274     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
275     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
276     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
277   }
278   PetscFree(owner);
279   starts[0] = 0;
280   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
281   count = 0;
282   for ( i=0; i<size; i++ ) {
283     if (procs[i]) {
284       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
285                 comm,send_waits+count++);
286     }
287   }
288   PetscFree(starts); PetscFree(nprocs);
289 
290   /* Free cache space */
291   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
292   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
293 
294   baij->svalues    = svalues;    baij->rvalues    = rvalues;
295   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
296   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
297   baij->rmax       = nmax;
298 
299   return 0;
300 }
301 
302 
303 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
304 {
305   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
306   MPI_Status  *send_status,recv_status;
307   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
308   int         bs=baij->bs,row,col,other_disassembled;
309   Scalar      *values,val;
310   InsertMode  addv = baij->insertmode;
311 
312   /*  wait on receives */
313   while (count) {
314     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
315     /* unpack receives into our local space */
316     values = baij->rvalues + 3*imdex*baij->rmax;
317     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
318     n = n/3;
319     for ( i=0; i<n; i++ ) {
320       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
321       col = (int) PetscReal(values[3*i+1]);
322       val = values[3*i+2];
323       if (col >= baij->cstart*bs && col < baij->cend*bs) {
324         col -= baij->cstart*bs;
325         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
326       }
327       else {
328         if (mat->was_assembled) {
329           if (!baij->colmap) {
330             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
331           }
332           col = (baij->colmap[col/bs]-1)*bs + col%bs;
333           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
334             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
335             col = (int) PetscReal(values[3*i+1]);
336           }
337         }
338         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
339       }
340     }
341     count--;
342   }
343   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
344 
345   /* wait on sends */
346   if (baij->nsends) {
347     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
348     CHKPTRQ(send_status);
349     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
350     PetscFree(send_status);
351   }
352   PetscFree(baij->send_waits); PetscFree(baij->svalues);
353 
354   baij->insertmode = NOT_SET_VALUES;
355   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
356   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
357 
358   /* determine if any processor has disassembled, if so we must
359      also disassemble ourselves, in order that we may reassemble. */
360   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
361   if (mat->was_assembled && !other_disassembled) {
362     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
363   }
364 
365   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
366     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
367   }
368   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
369   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
370 
371   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
372   return 0;
373 }
374 
375 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
376 {
377   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
378   int          ierr;
379 
380   if (baij->size == 1) {
381     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
382   }
383   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
384   return 0;
385 }
386 
387 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
388 {
389   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
390   int          ierr, format,rank,bs=baij->bs;
391   FILE         *fd;
392   ViewerType   vtype;
393 
394   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
395   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
396     ierr = ViewerGetFormat(viewer,&format);
397     if (format == ASCII_FORMAT_INFO_DETAILED) {
398       MatInfo info;
399       MPI_Comm_rank(mat->comm,&rank);
400       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
401       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
402       PetscSequentialPhaseBegin(mat->comm,1);
403       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
404               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
405               baij->bs,(int)info.memory);
406       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
407       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
408       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
409       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
410       fflush(fd);
411       PetscSequentialPhaseEnd(mat->comm,1);
412       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
413       return 0;
414     }
415     else if (format == ASCII_FORMAT_INFO) {
416       return 0;
417     }
418   }
419 
420   if (vtype == DRAW_VIEWER) {
421     Draw       draw;
422     PetscTruth isnull;
423     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
424     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
425   }
426 
427   if (vtype == ASCII_FILE_VIEWER) {
428     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
429     PetscSequentialPhaseBegin(mat->comm,1);
430     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
431            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
432             baij->cstart*bs,baij->cend*bs);
433     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
434     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
435     fflush(fd);
436     PetscSequentialPhaseEnd(mat->comm,1);
437   }
438   else {
439     int size = baij->size;
440     rank = baij->rank;
441     if (size == 1) {
442       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
443     }
444     else {
445       /* assemble the entire matrix onto first processor. */
446       Mat         A;
447       Mat_SeqBAIJ *Aloc;
448       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
449       int         mbs=baij->mbs;
450       Scalar      *a;
451 
452       if (!rank) {
453         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
454         CHKERRQ(ierr);
455       }
456       else {
457         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
458         CHKERRQ(ierr);
459       }
460       PLogObjectParent(mat,A);
461 
462       /* copy over the A part */
463       Aloc = (Mat_SeqBAIJ*) baij->A->data;
464       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
465       row = baij->rstart;
466       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
467 
468       for ( i=0; i<mbs; i++ ) {
469         rvals[0] = bs*(baij->rstart + i);
470         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
471         for ( j=ai[i]; j<ai[i+1]; j++ ) {
472           col = (baij->cstart+aj[j])*bs;
473           for (k=0; k<bs; k++ ) {
474             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
475             col++; a += bs;
476           }
477         }
478       }
479       /* copy over the B part */
480       Aloc = (Mat_SeqBAIJ*) baij->B->data;
481       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
482       row = baij->rstart*bs;
483       for ( i=0; i<mbs; i++ ) {
484         rvals[0] = bs*(baij->rstart + i);
485         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
486         for ( j=ai[i]; j<ai[i+1]; j++ ) {
487           col = baij->garray[aj[j]]*bs;
488           for (k=0; k<bs; k++ ) {
489             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
490             col++; a += bs;
491           }
492         }
493       }
494       PetscFree(rvals);
495       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
496       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
497       if (!rank) {
498         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
499       }
500       ierr = MatDestroy(A); CHKERRQ(ierr);
501     }
502   }
503   return 0;
504 }
505 
506 
507 
508 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
509 {
510   Mat         mat = (Mat) obj;
511   int         ierr;
512   ViewerType  vtype;
513 
514   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
515   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
516       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
517     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
518   }
519   else if (vtype == BINARY_FILE_VIEWER) {
520     return MatView_MPIBAIJ_Binary(mat,viewer);
521   }
522   return 0;
523 }
524 
525 static int MatDestroy_MPIBAIJ(PetscObject obj)
526 {
527   Mat         mat = (Mat) obj;
528   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
529   int         ierr;
530 
531 #if defined(PETSC_LOG)
532   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
533 #endif
534 
535   PetscFree(baij->rowners);
536   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
537   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
538   if (baij->colmap) PetscFree(baij->colmap);
539   if (baij->garray) PetscFree(baij->garray);
540   if (baij->lvec)   VecDestroy(baij->lvec);
541   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
542   if (baij->rowvalues) PetscFree(baij->rowvalues);
543   PetscFree(baij);
544   PLogObjectDestroy(mat);
545   PetscHeaderDestroy(mat);
546   return 0;
547 }
548 
549 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
550 {
551   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
552   int         ierr, nt;
553 
554   VecGetLocalSize_Fast(xx,nt);
555   if (nt != a->n) {
556     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and xx");
557   }
558   VecGetLocalSize_Fast(yy,nt);
559   if (nt != a->m) {
560     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and yy");
561   }
562   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
563   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
564   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
565   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
566   return 0;
567 }
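/*
   Note: MatMult overlaps communication with computation.  The scatter of the
   needed off-processor entries of xx into a->lvec is started, the diagonal-block
   product yy = A*xx is formed locally, the scatter is completed, and finally
   yy += B*lvec adds the off-diagonal contribution.  MatMultAdd below follows the
   same pattern.
*/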
568 
569 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
570 {
571   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
572   int        ierr;
573   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
574   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
575   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
576   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
577   return 0;
578 }
579 
580 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
581 {
582   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
583   int        ierr;
584 
585   /* do nondiagonal part */
586   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
587   /* send it on its way */
588   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
589   /* do local part */
590   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
591   /* receive remote parts: note this assumes the values are not actually */
592   /* inserted into yy until the next line, which is true for this implementation */
593   /* but may not always be. */
594   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
595                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx);CHKERRQ(ierr);
596   return 0;
597 }
598 
599 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
600 {
601   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
602   int        ierr;
603 
604   /* do nondiagonal part */
605   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
606   /* send it on its way */
607   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
608   /* do local part */
609   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
610   /* receive remote parts: note this assumes the values are not actually */
611   /* inserted into zz until the next line, which is true for this implementation */
612   /* but may not always be. */
613   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
614   return 0;
615 }
616 
617 /*
618   This only works correctly for square matrices where the subblock A->A is the
619    diagonal block
620 */
621 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
622 {
623   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
624   if (a->M != a->N)
625     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
626   return MatGetDiagonal(a->A,v);
627 }
628 
629 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
630 {
631   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
632   int        ierr;
633   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
634   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
635   return 0;
636 }
637 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
638 {
639   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
640   *m = mat->M; *n = mat->N;
641   return 0;
642 }
643 
644 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
645 {
646   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
647   *m = mat->m; *n = mat->N;
648   return 0;
649 }
650 
651 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
652 {
653   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
654   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
655   return 0;
656 }
657 
658 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
659 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
660 
661 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
662 {
663   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
664   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
665   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
666   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
667   int        *cmap, *idx_p,cstart = mat->cstart;
668 
669   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
670   mat->getrowactive = PETSC_TRUE;
671 
672   if (!mat->rowvalues && (idx || v)) {
673     /*
674         allocate enough space to hold information from the longest row.
675     */
676     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
677     int     max = 1,mbs = mat->mbs,tmp;
678     for ( i=0; i<mbs; i++ ) {
679       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
680       if (max < tmp) { max = tmp; }
681     }
682     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
683     CHKPTRQ(mat->rowvalues);
684     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
685   }
686 
687 
688   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows");
689   lrow = row - brstart;
690 
691   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
692   if (!v)   {pvA = 0; pvB = 0;}
693   if (!idx) {pcA = 0; if (!v) pcB = 0;}
694   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
695   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
696   nztot = nzA + nzB;
697 
698   cmap  = mat->garray;
699   if (v  || idx) {
700     if (nztot) {
701       /* Sort by increasing column numbers, assuming A and B already sorted */
702       int imark = -1;
703       if (v) {
704         *v = v_p = mat->rowvalues;
705         for ( i=0; i<nzB; i++ ) {
706           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
707           else break;
708         }
709         imark = i;
710         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
711         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
712       }
713       if (idx) {
714         *idx = idx_p = mat->rowindices;
715         if (imark > -1) {
716           for ( i=0; i<imark; i++ ) {
717             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
718           }
719         } else {
720           for ( i=0; i<nzB; i++ ) {
721             if (cmap[cworkB[i]/bs] < cstart)
722               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
723             else break;
724           }
725           imark = i;
726         }
727         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
728         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
729       }
730     }
731     else {
732       if (idx) *idx = 0;
733       if (v)   *v   = 0;
734     }
735   }
736   *nz = nztot;
737   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
738   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
739   return 0;
740 }
741 
742 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
743 {
744   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
745   if (baij->getrowactive == PETSC_FALSE) {
746     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
747   }
748   baij->getrowactive = PETSC_FALSE;
749   return 0;
750 }
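/*
   Usage sketch (variable names are assumed): MatGetRow()/MatRestoreRow() may only
   be called for locally owned rows, one row at a time, and each row must be
   restored before the next is fetched.

      int    rstart,rend,row,nz,*cols;
      Scalar *vals;
      ierr = MatGetOwnershipRange(A,&rstart,&rend); CHKERRQ(ierr);
      for ( row=rstart; row<rend; row++ ) {
        ierr = MatGetRow(A,row,&nz,&cols,&vals); CHKERRQ(ierr);
        ierr = MatRestoreRow(A,row,&nz,&cols,&vals); CHKERRQ(ierr);
      }
*/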
751 
752 static int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
753 {
754   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
755   *bs = baij->bs;
756   return 0;
757 }
758 
759 static int MatZeroEntries_MPIBAIJ(Mat A)
760 {
761   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
762   int         ierr;
763   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
764   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
765   return 0;
766 }
767 
768 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
769 {
770   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
771   Mat         A = a->A, B = a->B;
772   int         ierr;
773   double      isend[5], irecv[5];
774 
775   info->rows_global    = (double)a->M;
776   info->columns_global = (double)a->N;
777   info->rows_local     = (double)a->m;
778   info->columns_local  = (double)a->N;
779   info->block_size     = (double)a->bs;
780   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
781   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded; isend[3] = info->memory; isend[4] = info->mallocs;
782   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
783   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded; isend[3] += info->memory; isend[4] += info->mallocs;
784   if (flag == MAT_LOCAL) {
785     info->nz_used      = isend[0];
786     info->nz_allocated = isend[1];
787     info->nz_unneeded  = isend[2];
788     info->memory       = isend[3];
789     info->mallocs      = isend[4];
790   } else if (flag == MAT_GLOBAL_MAX) {
791     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
792     info->nz_used      = irecv[0];
793     info->nz_allocated = irecv[1];
794     info->nz_unneeded  = irecv[2];
795     info->memory       = irecv[3];
796     info->mallocs      = irecv[4];
797   } else if (flag == MAT_GLOBAL_SUM) {
798     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
799     info->nz_used      = irecv[0];
800     info->nz_allocated = irecv[1];
801     info->nz_unneeded  = irecv[2];
802     info->memory       = irecv[3];
803     info->mallocs      = irecv[4];
804   }
805   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
806   info->fill_ratio_needed = 0;
807   info->factor_mallocs    = 0;
808   return 0;
809 }
810 
811 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
812 {
813   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
814 
815   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
816       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
817       op == MAT_COLUMNS_SORTED ||
818       op == MAT_ROW_ORIENTED) {
819         MatSetOption(a->A,op);
820         MatSetOption(a->B,op);
821   }
822   else if (op == MAT_ROWS_SORTED ||
823            op == MAT_SYMMETRIC ||
824            op == MAT_STRUCTURALLY_SYMMETRIC ||
825            op == MAT_YES_NEW_DIAGONALS)
826     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
827   else if (op == MAT_COLUMN_ORIENTED) {
828     a->roworiented = 0;
829     MatSetOption(a->A,op);
830     MatSetOption(a->B,op);
831   }
832   else if (op == MAT_NO_NEW_DIAGONALS)
833     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:MAT_NO_NEW_DIAGONALS");}
834   else
835     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:unknown option");}
836   return 0;
837 }
838 
839 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
840 {
841   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
842   Mat_SeqBAIJ *Aloc;
843   Mat        B;
844   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
845   int        bs=baij->bs,mbs=baij->mbs;
846   Scalar     *a;
847 
848   if (matout == PETSC_NULL && M != N)
849     SETERRQ(1,"MatTranspose_MPIBAIJ:Square matrix only for in-place");
850   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
851   CHKERRQ(ierr);
852 
853   /* copy over the A part */
854   Aloc = (Mat_SeqBAIJ*) baij->A->data;
855   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
856   row = baij->rstart;
857   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
858 
859   for ( i=0; i<mbs; i++ ) {
860     rvals[0] = bs*(baij->rstart + i);
861     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
862     for ( j=ai[i]; j<ai[i+1]; j++ ) {
863       col = (baij->cstart+aj[j])*bs;
864       for (k=0; k<bs; k++ ) {
865         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
866         col++; a += bs;
867       }
868     }
869   }
870   /* copy over the B part */
871   Aloc = (Mat_SeqBAIJ*) baij->B->data;
872   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
873   row = baij->rstart*bs;
874   for ( i=0; i<mbs; i++ ) {
875     rvals[0] = bs*(baij->rstart + i);
876     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
877     for ( j=ai[i]; j<ai[i+1]; j++ ) {
878       col = baij->garray[aj[j]]*bs;
879       for (k=0; k<bs; k++ ) {
880         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
881         col++; a += bs;
882       }
883     }
884   }
885   PetscFree(rvals);
886   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
887   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
888 
889   if (matout != PETSC_NULL) {
890     *matout = B;
891   } else {
892     /* This isn't really an in-place transpose .... but free data structures from baij */
893     PetscFree(baij->rowners);
894     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
895     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
896     if (baij->colmap) PetscFree(baij->colmap);
897     if (baij->garray) PetscFree(baij->garray);
898     if (baij->lvec) VecDestroy(baij->lvec);
899     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
900     PetscFree(baij);
901     PetscMemcpy(A,B,sizeof(struct _Mat));
902     PetscHeaderDestroy(B);
903   }
904   return 0;
905 }
906 
907 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
908 {
909   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
910   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
911   int ierr,s1,s2,s3;
912 
913   if (ll)  {
914     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
915     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
916     if (s1!=s2) SETERRQ(1,"MatDiagonalScale_MPIBAIJ: non-conforming local sizes");
917     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
918     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
919   }
920   if (rr) SETERRQ(1,"MatDiagonalScale_MPIBAIJ:not supported for right vector");
921   return 0;
922 }
923 
924 /* the code does not do the diagonal entries correctly unless the
925    matrix is square and the column and row ownerships are identical.
926    This is a BUG. The only way to fix it seems to be to access
927    baij->A and baij->B directly and not through the MatZeroRows()
928    routine.
929 */
930 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
931 {
932   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
933   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
934   int            *procs,*nprocs,j,found,idx,nsends,*work;
935   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
936   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
937   int            *lens,imdex,*lrows,*values,bs=l->bs;
938   MPI_Comm       comm = A->comm;
939   MPI_Request    *send_waits,*recv_waits;
940   MPI_Status     recv_status,*send_status;
941   IS             istmp;
942 
943   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
944   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
945 
946   /*  first count number of contributors to each processor */
947   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
948   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
949   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
950   for ( i=0; i<N; i++ ) {
951     idx = rows[i];
952     found = 0;
953     for ( j=0; j<size; j++ ) {
954       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
955         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
956       }
957     }
958     if (!found) SETERRQ(1,"MatZeroRows_MPIBAIJ:Index out of range");
959   }
960   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
961 
962   /* inform other processors of number of messages and max length*/
963   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
964   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
965   nrecvs = work[rank];
966   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
967   nmax = work[rank];
968   PetscFree(work);
969 
970   /* post receives:   */
971   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
972   CHKPTRQ(rvalues);
973   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
974   CHKPTRQ(recv_waits);
975   for ( i=0; i<nrecvs; i++ ) {
976     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
977   }
978 
979   /* do sends:
980       1) starts[i] gives the starting index in svalues for stuff going to
981          the ith processor
982   */
983   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
984   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
985   CHKPTRQ(send_waits);
986   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
987   starts[0] = 0;
988   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
989   for ( i=0; i<N; i++ ) {
990     svalues[starts[owner[i]]++] = rows[i];
991   }
992   ISRestoreIndices(is,&rows);
993 
994   starts[0] = 0;
995   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
996   count = 0;
997   for ( i=0; i<size; i++ ) {
998     if (procs[i]) {
999       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
1000     }
1001   }
1002   PetscFree(starts);
1003 
1004   base = owners[rank]*bs;
1005 
1006   /*  wait on receives */
1007   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1008   source = lens + nrecvs;
1009   count  = nrecvs; slen = 0;
1010   while (count) {
1011     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
1012     /* unpack receives into our local space */
1013     MPI_Get_count(&recv_status,MPI_INT,&n);
1014     source[imdex]  = recv_status.MPI_SOURCE;
1015     lens[imdex]  = n;
1016     slen += n;
1017     count--;
1018   }
1019   PetscFree(recv_waits);
1020 
1021   /* move the data into the send scatter */
1022   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1023   count = 0;
1024   for ( i=0; i<nrecvs; i++ ) {
1025     values = rvalues + i*nmax;
1026     for ( j=0; j<lens[i]; j++ ) {
1027       lrows[count++] = values[j] - base;
1028     }
1029   }
1030   PetscFree(rvalues); PetscFree(lens);
1031   PetscFree(owner); PetscFree(nprocs);
1032 
1033   /* actually zap the local rows */
1034   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1035   PLogObjectParent(A,istmp);
1036   PetscFree(lrows);
1037   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1038   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1039   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1040 
1041   /* wait on sends */
1042   if (nsends) {
1043     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1044     CHKPTRQ(send_status);
1045     MPI_Waitall(nsends,send_waits,send_status);
1046     PetscFree(send_status);
1047   }
1048   PetscFree(send_waits); PetscFree(svalues);
1049 
1050   return 0;
1051 }
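/*
   Usage sketch (variable names are assumed): each processor may pass GLOBAL row
   numbers, which need not be locally owned; the rows are routed to their owners
   above and zeroed there, optionally placing *diag on the diagonal (subject to
   the diagonal limitation noted in the comment before this routine).

      IS     is;
      int    rows[2] = {0,5};
      Scalar one = 1.0;
      ierr = ISCreateGeneral(MPI_COMM_SELF,2,rows,&is); CHKERRQ(ierr);
      ierr = MatZeroRows(A,is,&one); CHKERRQ(ierr);
      ierr = ISDestroy(is); CHKERRQ(ierr);
*/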
1052 extern int MatPrintHelp_SeqBAIJ(Mat);
1053 static int MatPrintHelp_MPIBAIJ(Mat A)
1054 {
1055   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1056 
1057   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1058   else return 0;
1059 }
1060 
1061 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1062 
1063 /* -------------------------------------------------------------------*/
1064 static struct _MatOps MatOps = {
1065   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1066   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1067   0,0,0,0,
1068   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1069   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1070   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1071   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1072   0,0,0,MatGetSize_MPIBAIJ,
1073   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1074   0,0,0,MatConvertSameType_MPIBAIJ,0,0,
1075   0,0,0,MatGetSubMatrices_MPIBAIJ,
1076   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1077   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ};
1078 
1079 
1080 /*@C
1081    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1082    (block compressed row).  For good matrix assembly performance
1083    the user should preallocate the matrix storage by setting the parameters
1084    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1085    performance can be increased by more than a factor of 50.
1086 
1087    Input Parameters:
1088 .  comm - MPI communicator
1089 .  bs   - size of block
1090 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1091            This value should be the same as the local size used in creating the
1092            y vector for the matrix-vector product y = Ax.
1093 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1094            This value should be the same as the local size used in creating the
1095            x vector for the matrix-vector product y = Ax.
1096 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1097 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1098 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1099            submatrix  (same for all local rows)
1100 .  d_nnz - array containing the number of block nonzeros in the various block rows
1101            in the diagonal portion of the local submatrix (possibly different for each block
1102            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1103            it is zero.
1104 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1105            submatrix (same for all local rows).
1106 .  o_nnz - array containing the number of block nonzeros in the various block rows of the
1107            off-diagonal portion of the local submatrix (possibly different for
1108            each block row) or PETSC_NULL.
1109 
1110    Output Parameter:
1111 .  A - the matrix
1112 
1113    Notes:
1114    The user MUST specify either the local or global matrix dimensions
1115    (possibly both).
1116 
1117    Storage Information:
1118    For a square global matrix we define each processor's diagonal portion
1119    to be its local rows and the corresponding columns (a square submatrix);
1120    each processor's off-diagonal portion encompasses the remainder of the
1121    local matrix (a rectangular submatrix).
1122 
1123    The user can specify preallocated storage for the diagonal part of
1124    the local submatrix with either d_nz or d_nnz (not both).  Set
1125    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1126    memory allocation.  Likewise, specify preallocated storage for the
1127    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1128 
1129    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1130    the figure below we depict these three local rows and all columns (0-11).
1131 
1132 $          0 1 2 3 4 5 6 7 8 9 10 11
1133 $         -------------------
1134 $  row 3  |  o o o d d d o o o o o o
1135 $  row 4  |  o o o d d d o o o o o o
1136 $  row 5  |  o o o d d d o o o o o o
1137 $         -------------------
1138 $
1139 
1140    Thus, any entries in the d locations are stored in the d (diagonal)
1141    submatrix, and any entries in the o locations are stored in the
1142    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1143    stored simply in the MATSEQBAIJ format for compressed row storage.
1144 
1145    Now d_nz should indicate the number of block nonzeros per block row in the d matrix,
1146    and o_nz should indicate the number of block nonzeros per block row in the o matrix.
1147    In general, for PDE problems in which most nonzeros are near the diagonal,
1148    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1149    or you will get TERRIBLE performance; see the users' manual chapter on
1150    matrices and the file $(PETSC_DIR)/Performance.
1151 
1152 .keywords: matrix, block, aij, compressed row, sparse, parallel
1153 
1154 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1155 @*/
1156 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1157                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1158 {
1159   Mat          B;
1160   Mat_MPIBAIJ  *b;
1161   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1162 
1163   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
1164   *A = 0;
1165   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1166   PLogObjectCreate(B);
1167   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1168   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1169   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1170   MPI_Comm_size(comm,&size);
1171   if (size == 1) {
1172     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1173     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1174     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1175     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1176     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1177     B->ops.solve             = MatSolve_MPIBAIJ;
1178     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1179     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1180     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1181     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1182   }
1183 
1184   B->destroy    = MatDestroy_MPIBAIJ;
1185   B->view       = MatView_MPIBAIJ;
1186 
1187   B->factor     = 0;
1188   B->assembled  = PETSC_FALSE;
1189 
1190   b->insertmode = NOT_SET_VALUES;
1191   MPI_Comm_rank(comm,&b->rank);
1192   MPI_Comm_size(comm,&b->size);
1193 
1194   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1195     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1196   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
1197   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
1198   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1199   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1200 
1201   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1202     work[0] = m; work[1] = n;
1203     mbs = m/bs; nbs = n/bs;
1204     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1205     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1206     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1207   }
1208   if (m == PETSC_DECIDE) {
1209     Mbs = M/bs;
1210     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
1211     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1212     m   = mbs*bs;
1213   }
1214   if (n == PETSC_DECIDE) {
1215     Nbs = N/bs;
1216     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
1217     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1218     n   = nbs*bs;
1219   }
1220   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
1221 
1222   b->m = m; B->m = m;
1223   b->n = n; B->n = n;
1224   b->N = N; B->N = N;
1225   b->M = M; B->M = M;
1226   b->bs  = bs;
1227   b->bs2 = bs*bs;
1228   b->mbs = mbs;
1229   b->nbs = nbs;
1230   b->Mbs = Mbs;
1231   b->Nbs = Nbs;
1232 
1233   /* build local table of row and column ownerships */
1234   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1235   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1236   b->cowners = b->rowners + b->size + 2;
1237   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1238   b->rowners[0] = 0;
1239   for ( i=2; i<=b->size; i++ ) {
1240     b->rowners[i] += b->rowners[i-1];
1241   }
1242   b->rstart = b->rowners[b->rank];
1243   b->rend   = b->rowners[b->rank+1];
1244   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1245   b->cowners[0] = 0;
1246   for ( i=2; i<=b->size; i++ ) {
1247     b->cowners[i] += b->cowners[i-1];
1248   }
1249   b->cstart = b->cowners[b->rank];
1250   b->cend   = b->cowners[b->rank+1];
1251 
1252   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1253   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1254   PLogObjectParent(B,b->A);
1255   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1256   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1257   PLogObjectParent(B,b->B);
1258 
1259   /* build cache for off array entries formed */
1260   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1261   b->colmap      = 0;
1262   b->garray      = 0;
1263   b->roworiented = 1;
1264 
1265   /* stuff used for matrix vector multiply */
1266   b->lvec      = 0;
1267   b->Mvctx     = 0;
1268 
1269   /* stuff for MatGetRow() */
1270   b->rowindices   = 0;
1271   b->rowvalues    = 0;
1272   b->getrowactive = PETSC_FALSE;
1273 
1274   *A = B;
1275   return 0;
1276 }
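/*
   Usage sketch: creating and filling a block AIJ matrix with block size 2,
   letting PETSc choose the row and column distribution (M and N must be
   divisible by the block size).  The preallocation figures and the names M, N,
   nrows, rows, ncols, cols, vals and ierr are assumed for illustration only.

      Mat A;
      ierr = MatCreateMPIBAIJ(MPI_COMM_WORLD,2,PETSC_DECIDE,PETSC_DECIDE,M,N,
                              5,PETSC_NULL,2,PETSC_NULL,&A); CHKERRQ(ierr);
      ierr = MatSetValues(A,nrows,rows,ncols,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
*/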
1277 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1278 {
1279   Mat        mat;
1280   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1281   int        ierr, len=0, flg;
1282 
1283   *newmat       = 0;
1284   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1285   PLogObjectCreate(mat);
1286   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1287   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1288   mat->destroy    = MatDestroy_MPIBAIJ;
1289   mat->view       = MatView_MPIBAIJ;
1290   mat->factor     = matin->factor;
1291   mat->assembled  = PETSC_TRUE;
1292 
1293   a->m = mat->m   = oldmat->m;
1294   a->n = mat->n   = oldmat->n;
1295   a->M = mat->M   = oldmat->M;
1296   a->N = mat->N   = oldmat->N;
1297 
1298   a->bs  = oldmat->bs;
1299   a->bs2 = oldmat->bs2;
1300   a->mbs = oldmat->mbs;
1301   a->nbs = oldmat->nbs;
1302   a->Mbs = oldmat->Mbs;
1303   a->Nbs = oldmat->Nbs;
1304 
1305   a->rstart       = oldmat->rstart;
1306   a->rend         = oldmat->rend;
1307   a->cstart       = oldmat->cstart;
1308   a->cend         = oldmat->cend;
1309   a->size         = oldmat->size;
1310   a->rank         = oldmat->rank;
1311   a->insertmode   = NOT_SET_VALUES;
1312   a->rowvalues    = 0;
1313   a->getrowactive = PETSC_FALSE;
1314 
1315   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1316   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1317   a->cowners = a->rowners + a->size + 2;
1318   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1319   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1320   if (oldmat->colmap) {
1321     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1322     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1323     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1324   } else a->colmap = 0;
1325   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1326     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1327     PLogObjectMemory(mat,len*sizeof(int));
1328     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1329   } else a->garray = 0;
1330 
1331   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1332   PLogObjectParent(mat,a->lvec);
1333   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1334   PLogObjectParent(mat,a->Mvctx);
1335   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1336   PLogObjectParent(mat,a->A);
1337   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1338   PLogObjectParent(mat,a->B);
1339   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1340   if (flg) {
1341     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1342   }
1343   *newmat = mat;
1344   return 0;
1345 }
1346 
1347 #include "sys.h"
1348 
1349 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1350 {
1351   Mat          A;
1352   int          i, nz, ierr, j,rstart, rend, fd;
1353   Scalar       *vals,*buf;
1354   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1355   MPI_Status   status;
1356   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1357   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1358   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1359   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1360   int          dcount,kmax,k,nzcount,tmp;
1361 
1362 
1363   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1364   bs2  = bs*bs;
1365 
1366   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1367   if (!rank) {
1368     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1369     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1370     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
1371   }
1372 
1373   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1374   M = header[1]; N = header[2];
1375 
1376   if (M != N) SETERRQ(1,"MatLoad_MPIBAIJ:Can only do square matrices");
1377 
1378   /*
1379      This code adds extra rows to make sure the number of rows is
1380      divisible by the blocksize
1381   */
1382   Mbs        = M/bs;
1383   extra_rows = bs - M + bs*(Mbs);
1384   if (extra_rows == bs) extra_rows = 0;
1385   else                  Mbs++;
1386   if (extra_rows &&!rank) {
1387     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1388   }
1389 
1390   /* determine ownership of all rows */
1391   mbs = Mbs/size + ((Mbs % size) > rank);
1392   m   = mbs * bs;
1393   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1394   browners = rowners + size + 1;
1395   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1396   rowners[0] = 0;
1397   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1398   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1399   rstart = rowners[rank];
1400   rend   = rowners[rank+1];
1401 
1402   /* distribute row lengths to all processors */
1403   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1404   if (!rank) {
1405     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1406     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1407     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1408     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1409     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1410     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1411     PetscFree(sndcounts);
1412   }
1413   else {
1414     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1415   }
1416 
1417   if (!rank) {
1418     /* calculate the number of nonzeros on each processor */
1419     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1420     PetscMemzero(procsnz,size*sizeof(int));
1421     for ( i=0; i<size; i++ ) {
1422       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1423         procsnz[i] += rowlengths[j];
1424       }
1425     }
1426     PetscFree(rowlengths);
1427 
1428     /* determine max buffer needed and allocate it */
1429     maxnz = 0;
1430     for ( i=0; i<size; i++ ) {
1431       maxnz = PetscMax(maxnz,procsnz[i]);
1432     }
1433     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1434 
1435     /* read in my part of the matrix column indices  */
1436     nz = procsnz[0];
1437     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1438     mycols = ibuf;
1439     if (size == 1)  nz -= extra_rows;
1440     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1441     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1442 
1443     /* read in the other processors' column indices (except the last) and ship them off */
1444     for ( i=1; i<size-1; i++ ) {
1445       nz = procsnz[i];
1446       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1447       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1448     }
1449     /* read in the stuff for the last proc */
1450     if ( size != 1 ) {
1451       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1452       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1453       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1454       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1455     }
1456     PetscFree(cols);
1457   }
1458   else {
1459     /* determine buffer space needed for message */
1460     nz = 0;
1461     for ( i=0; i<m; i++ ) {
1462       nz += locrowlens[i];
1463     }
1464     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1465     mycols = ibuf;
1466     /* receive message of column indices*/
1467     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1468     MPI_Get_count(&status,MPI_INT,&maxnz);
1469     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1470   }
1471 
1472   /* loop over local rows, determining number of off diagonal entries */
1473   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1474   odlens = dlens + (rend-rstart);
1475   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1476   PetscMemzero(mask,3*Mbs*sizeof(int));
1477   masked1 = mask    + Mbs;
1478   masked2 = masked1 + Mbs;
1479   rowcount = 0; nzcount = 0;
1480   for ( i=0; i<mbs; i++ ) {
1481     dcount  = 0;
1482     odcount = 0;
1483     for ( j=0; j<bs; j++ ) {
1484       kmax = locrowlens[rowcount];
1485       for ( k=0; k<kmax; k++ ) {
1486         tmp = mycols[nzcount++]/bs;
1487         if (!mask[tmp]) {
1488           mask[tmp] = 1;
1489           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1490           else masked1[dcount++] = tmp;
1491         }
1492       }
1493       rowcount++;
1494     }
1495 
1496     dlens[i]  = dcount;
1497     odlens[i] = odcount;
1498 
1499     /* zero out the mask elements we set */
1500     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1501     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1502   }
1503 
1504   /* create our matrix */
1505   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1506          CHKERRQ(ierr);
1507   A = *newmat;
1508   MatSetOption(A,MAT_COLUMNS_SORTED);
1509 
1510   if (!rank) {
1511     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1512     /* read in my part of the matrix numerical values  */
1513     nz = procsnz[0];
1514     vals = buf;
1515     mycols = ibuf;
1516     if (size == 1)  nz -= extra_rows;
1517     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1518     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1519 
1520     /* insert into matrix */
1521     jj      = rstart*bs;
1522     for ( i=0; i<m; i++ ) {
1523       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1524       mycols += locrowlens[i];
1525       vals   += locrowlens[i];
1526       jj++;
1527     }
1528     /* read in the other processors' values (except the last one) and ship them out */
1529     for ( i=1; i<size-1; i++ ) {
1530       nz = procsnz[i];
1531       vals = buf;
1532       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1533       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1534     }
1535     /* the last proc */
1536     if ( size != 1 ){
1537       nz = procsnz[i] - extra_rows;
1538       vals = buf;
1539       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1540       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1541       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1542     }
1543     PetscFree(procsnz);
1544   }
1545   else {
1546     /* receive numeric values */
1547     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1548 
1549     /* receive message of values*/
1550     vals = buf;
1551     mycols = ibuf;
1552     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1553     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1554     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1555 
1556     /* insert into matrix */
1557     jj      = rstart*bs;
1558     for ( i=0; i<m; i++ ) {
1559       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1560       mycols += locrowlens[i];
1561       vals   += locrowlens[i];
1562       jj++;
1563     }
1564   }
1565   PetscFree(locrowlens);
1566   PetscFree(buf);
1567   PetscFree(ibuf);
1568   PetscFree(rowners);
1569   PetscFree(dlens);
1570   PetscFree(mask);
1571   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1572   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1573   return 0;
1574 }
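/*
   Usage sketch: loading a matrix stored in PETSc binary format as an MPIBAIJ
   matrix; the block size is taken from the command line option
   -matload_block_size (default 1).  Assumes 'viewer' was opened elsewhere on a
   binary file and that the public MatLoad() entry point with this release's
   (viewer,type,newmat) calling sequence is used.

      Mat A;
      ierr = MatLoad(viewer,MATMPIBAIJ,&A); CHKERRQ(ierr);
*/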
1575 
1576 
1577