xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 2c3acbe9ddcb1424b0c1a2c5c7ff7d317d17c150)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.38 1996/12/01 17:55:24 curfman Exp bsmith $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 
25 /*
26      Local utility routine that creates a mapping from the global column
27    number to the local number in the off-diagonal part of the local
28    storage of the matrix.  This is done in a non-scalable way since the
29    length of colmap equals the global number of block columns.
30 */
31 static int CreateColmap_MPIBAIJ_Private(Mat mat)
32 {
33   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
34   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
35   int         nbs = B->nbs,i,bs=B->bs;
36 
37   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
38   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
39   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
40   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
41   return 0;
42 }
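/*
   A minimal sketch (not part of the original source) of how the 1-based colmap
   built above is intended to be used; the helper name is hypothetical and the
   numbers in the comment are made-up illustrations.
*/
#if 0
static int ColmapLookupExample(Mat_MPIBAIJ *baij,int gcol,int bs)
{
  /* colmap[] is indexed by the global block column and stores i*bs+1, where i is
     the local block column in baij->B (so i*bs is its first point column); 0 means
     the block column is absent.  Example: bs = 2 and garray[] = {1,4} gives
     colmap[1] = 1 and colmap[4] = 3, so global point column 9 (block column 4)
     maps to local column 3 - 1 + 9%2 = 3. */
  int entry = baij->colmap[gcol/bs];
  if (!entry) return -1;            /* block column not present in off-diagonal part */
  return entry - 1 + gcol % bs;     /* local point column inside baij->B */
}
#endif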
43 
44 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
45                             PetscTruth *done)
46 {
47   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
48   int         ierr;
49   if (aij->size == 1) {
50     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
51   } else SETERRQ(1,"MatGetRowIJ_MPIBAIJ:not supported in parallel");
52   return 0;
53 }
54 
55 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
56                                 PetscTruth *done)
57 {
58   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
59   int        ierr;
60   if (aij->size == 1) {
61     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
62   } else SETERRQ(1,"MatRestoreRowIJ_MPIBAIJ:not supported in parallel");
63   return 0;
64 }
65 
66 extern int MatSetValues_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
67 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
68 {
69   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
70   Scalar      value;
71   int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
72   int         cstart = baij->cstart, cend = baij->cend,row,col;
73   int         roworiented = baij->roworiented,rstart_orig,rend_orig;
74   int         cstart_orig,cend_orig,bs=baij->bs;
75 
76 #if defined(PETSC_BOPT_g)
77   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
78     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
79   }
80 #endif
81   baij->insertmode = addv;
82   rstart_orig      = rstart*bs;
83   rend_orig        = rend*bs;
84   cstart_orig      = cstart*bs;
85   cend_orig        = cend*bs;
86   for ( i=0; i<m; i++ ) {
87 #if defined(PETSC_BOPT_g)
88     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
89     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
90 #endif
91     if (im[i] >= rstart_orig && im[i] < rend_orig) {
92       row = im[i] - rstart_orig;
93       for ( j=0; j<n; j++ ) {
94         if (in[j] >= cstart_orig && in[j] < cend_orig){
95           col = in[j] - cstart_orig;
96           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
97           ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
98         }
99 #if defined(PETSC_BOPT_g)
100         else if (in[j] < 0) {SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");}
101         else if (in[j] >= baij->N) {SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");}
102 #endif
103         else {
104           if (mat->was_assembled) {
105             if (!baij->colmap) {
106               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
107             }
108             if (baij->colmap[in[j]/bs]) col = baij->colmap[in[j]/bs] - 1 + in[j]%bs; else col = -1;
109             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
110               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
111               col =  in[j];
112             }
113           }
114           else col = in[j];
115           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
116           ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
117         }
118       }
119     }
120     else {
121       if (roworiented && !baij->donotstash) {
122         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
123       }
124       else {
125         if (!baij->donotstash) {
126           row = im[i];
127 	  for ( j=0; j<n; j++ ) {
128 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
129           }
130         }
131       }
132     }
133   }
134   return 0;
135 }
136 
137 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
138 {
139   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
140   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
141   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
142 
143   for ( i=0; i<m; i++ ) {
144     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
145     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
146     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
147       row = idxm[i] - bsrstart;
148       for ( j=0; j<n; j++ ) {
149         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
150         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
151         if (idxn[j] >= bscstart && idxn[j] < bscend){
152           col = idxn[j] - bscstart;
153           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
154         }
155         else {
156           if (!baij->colmap) {
157             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
158           }
159           if ((baij->colmap[idxn[j]/bs] == 0) ||
160              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
161           else {
162             col  = baij->colmap[idxn[j]/bs] - 1 + idxn[j]%bs;
163             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
164           }
165         }
166       }
167     }
168     else {
169       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
170     }
171   }
172   return 0;
173 }
174 
175 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
176 {
177   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
178   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
179   int        ierr, i,bs2=baij->bs2;
180   double     sum = 0.0;
181   Scalar     *v;
182 
183   if (baij->size == 1) {
184     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
185   } else {
186     if (type == NORM_FROBENIUS) {
187       v = amat->a;
188       for (i=0; i<amat->nz*bs2; i++ ) {
189 #if defined(PETSC_COMPLEX)
190         sum += real(conj(*v)*(*v)); v++;
191 #else
192         sum += (*v)*(*v); v++;
193 #endif
194       }
195       v = bmat->a;
196       for (i=0; i<bmat->nz*bs2; i++ ) {
197 #if defined(PETSC_COMPLEX)
198         sum += real(conj(*v)*(*v)); v++;
199 #else
200         sum += (*v)*(*v); v++;
201 #endif
202       }
203       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
204       *norm = sqrt(*norm);
205     }
206     else
207       SETERRQ(PETSC_ERR_SUP,"MatNorm_MPIBAIJ:No support for this norm yet");
208   }
209   return 0;
210 }
211 
212 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
213 {
214   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
215   MPI_Comm    comm = mat->comm;
216   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
217   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
218   MPI_Request *send_waits,*recv_waits;
219   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
220   InsertMode  addv;
221   Scalar      *rvalues,*svalues;
222 
223   /* make sure all processors are using either INSERT_VALUES or ADD_VALUES mode */
224   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
225   if (addv == (ADD_VALUES|INSERT_VALUES)) {
226     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted, others added");
227   }
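  /* (Illustrative note, not in the original source: assuming the usual encoding
     NOT_SET_VALUES = 0, INSERT_VALUES = 1, ADD_VALUES = 2, the bitwise-OR
     reduction gives back 0, 1, or 2 when all contributing processors agree and
     3 == (ADD_VALUES|INSERT_VALUES) exactly when the modes were mixed.) */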
228   baij->insertmode = addv; /* in case this processor had no cache */
229 
230   /*  first count number of contributors to each processor */
231   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
232   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
233   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
234   for ( i=0; i<baij->stash.n; i++ ) {
235     idx = baij->stash.idx[i];
236     for ( j=0; j<size; j++ ) {
237       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
238         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
239       }
240     }
241   }
242   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
243 
244   /* inform other processors of number of messages and max length*/
245   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
246   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
247   nreceives = work[rank];
248   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
249   nmax = work[rank];
250   PetscFree(work);
251 
252   /* post receives:
253        1) each message will consist of ordered triples
254      (global row index, global column index, value); the indices are stored
255      as Scalars to simplify the message passing.
256        2) since we don't know how long each individual message is we
257      allocate the largest needed buffer for each receive. Potentially
258      this is a lot of wasted space.
259 
260 
261        This could be done better.
262   */
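  /* (Illustrative sketch, not in the original source: each stashed entry is packed
     into svalues/rvalues as three consecutive Scalars,
         [ row_0, col_0, val_0, row_1, col_1, val_1, ... ],
     which is why every count, offset, and buffer size below is scaled by 3.) */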
263   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
264   CHKPTRQ(rvalues);
265   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
266   CHKPTRQ(recv_waits);
267   for ( i=0; i<nreceives; i++ ) {
268     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
269               comm,recv_waits+i);
270   }
271 
272   /* do sends:
273       1) starts[i] gives the starting index in svalues for stuff going to
274          the ith processor
275   */
276   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
277   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
278   CHKPTRQ(send_waits);
279   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
280   starts[0] = 0;
281   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
282   for ( i=0; i<baij->stash.n; i++ ) {
283     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
284     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
285     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
286   }
287   PetscFree(owner);
288   starts[0] = 0;
289   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
290   count = 0;
291   for ( i=0; i<size; i++ ) {
292     if (procs[i]) {
293       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
294                 comm,send_waits+count++);
295     }
296   }
297   PetscFree(starts); PetscFree(nprocs);
298 
299   /* Free cache space */
300   PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
301   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
302 
303   baij->svalues    = svalues;    baij->rvalues    = rvalues;
304   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
305   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
306   baij->rmax       = nmax;
307 
308   return 0;
309 }
310 
311 
312 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
313 {
314   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
315   MPI_Status  *send_status,recv_status;
316   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
317   int         bs=baij->bs,row,col,other_disassembled;
318   Scalar      *values,val;
319   InsertMode  addv = baij->insertmode;
320 
321   /*  wait on receives */
322   while (count) {
323     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
324     /* unpack receives into our local space */
325     values = baij->rvalues + 3*imdex*baij->rmax;
326     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
327     n = n/3;
328     for ( i=0; i<n; i++ ) {
329       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
330       col = (int) PetscReal(values[3*i+1]);
331       val = values[3*i+2];
332       if (col >= baij->cstart*bs && col < baij->cend*bs) {
333         col -= baij->cstart*bs;
334         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
335       }
336       else {
337         if (mat->was_assembled) {
338           if (!baij->colmap) {
339             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
340           }
341           if (baij->colmap[col/bs]) col = baij->colmap[col/bs] - 1 + col%bs; else col = -1;
342           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
343             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
344             col = (int) PetscReal(values[3*i+1]);
345           }
346         }
347         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
348       }
349     }
350     count--;
351   }
352   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
353 
354   /* wait on sends */
355   if (baij->nsends) {
356     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
357     CHKPTRQ(send_status);
358     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
359     PetscFree(send_status);
360   }
361   PetscFree(baij->send_waits); PetscFree(baij->svalues);
362 
363   baij->insertmode = NOT_SET_VALUES;
364   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
365   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
366 
367   /* determine if any processor has disassembled, if so we must
368      also disassemble ourselves, in order that we may reassemble. */
369   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
370   if (mat->was_assembled && !other_disassembled) {
371     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
372   }
373 
374   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
375     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
376   }
377   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
378   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
379 
380   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
381   return 0;
382 }
383 
384 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
385 {
386   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
387   int          ierr;
388 
389   if (baij->size == 1) {
390     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
391   }
392   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
393   return 0;
394 }
395 
396 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
397 {
398   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
399   int          ierr, format,rank,bs = baij->bs;
400   FILE         *fd;
401   ViewerType   vtype;
402 
403   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
404   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
405     ierr = ViewerGetFormat(viewer,&format);
406     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
407       MatInfo info;
408       MPI_Comm_rank(mat->comm,&rank);
409       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
410       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
411       PetscSequentialPhaseBegin(mat->comm,1);
412       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
413               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
414               baij->bs,(int)info.memory);
415       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
416       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
417       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
418       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
419       fflush(fd);
420       PetscSequentialPhaseEnd(mat->comm,1);
421       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
422       return 0;
423     }
424     else if (format == VIEWER_FORMAT_ASCII_INFO) {
425       PetscPrintf(mat->comm,"  block size is %d\n",bs);
426       return 0;
427     }
428   }
429 
430   if (vtype == DRAW_VIEWER) {
431     Draw       draw;
432     PetscTruth isnull;
433     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
434     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
435   }
436 
437   if (vtype == ASCII_FILE_VIEWER) {
438     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
439     PetscSequentialPhaseBegin(mat->comm,1);
440     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
441            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
442             baij->cstart*bs,baij->cend*bs);
443     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
444     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
445     fflush(fd);
446     PetscSequentialPhaseEnd(mat->comm,1);
447   }
448   else {
449     int size = baij->size;
450     rank = baij->rank;
451     if (size == 1) {
452       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
453     }
454     else {
455       /* assemble the entire matrix onto the first processor. */
456       Mat         A;
457       Mat_SeqBAIJ *Aloc;
458       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
459       int         mbs=baij->mbs;
460       Scalar      *a;
461 
462       if (!rank) {
463         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
464         CHKERRQ(ierr);
465       }
466       else {
467         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
468         CHKERRQ(ierr);
469       }
470       PLogObjectParent(mat,A);
471 
472       /* copy over the A part */
473       Aloc = (Mat_SeqBAIJ*) baij->A->data;
474       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
475       row = baij->rstart;
476       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
477 
478       for ( i=0; i<mbs; i++ ) {
479         rvals[0] = bs*(baij->rstart + i);
480         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
481         for ( j=ai[i]; j<ai[i+1]; j++ ) {
482           col = (baij->cstart+aj[j])*bs;
483           for (k=0; k<bs; k++ ) {
484             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
485             col++; a += bs;
486           }
487         }
488       }
489       /* copy over the B part */
490       Aloc = (Mat_SeqBAIJ*) baij->B->data;
491       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
492       row = baij->rstart*bs;
493       for ( i=0; i<mbs; i++ ) {
494         rvals[0] = bs*(baij->rstart + i);
495         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
496         for ( j=ai[i]; j<ai[i+1]; j++ ) {
497           col = baij->garray[aj[j]]*bs;
498           for (k=0; k<bs; k++ ) {
499             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
500             col++; a += bs;
501           }
502         }
503       }
504       PetscFree(rvals);
505       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
506       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
507       if (!rank) {
508         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
509       }
510       ierr = MatDestroy(A); CHKERRQ(ierr);
511     }
512   }
513   return 0;
514 }
515 
516 
517 
518 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
519 {
520   Mat         mat = (Mat) obj;
521   int         ierr;
522   ViewerType  vtype;
523 
524   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
525   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
526       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
527     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
528   }
529   else if (vtype == BINARY_FILE_VIEWER) {
530     return MatView_MPIBAIJ_Binary(mat,viewer);
531   }
532   return 0;
533 }
534 
535 static int MatDestroy_MPIBAIJ(PetscObject obj)
536 {
537   Mat         mat = (Mat) obj;
538   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
539   int         ierr;
540 
541 #if defined(PETSC_LOG)
542   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
543 #endif
544 
545   PetscFree(baij->rowners);
546   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
547   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
548   if (baij->colmap) PetscFree(baij->colmap);
549   if (baij->garray) PetscFree(baij->garray);
550   if (baij->lvec)   VecDestroy(baij->lvec);
551   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
552   if (baij->rowvalues) PetscFree(baij->rowvalues);
553   PetscFree(baij);
554   if (mat->mapping) {
555     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
556   }
557   PLogObjectDestroy(mat);
558   PetscHeaderDestroy(mat);
559   return 0;
560 }
561 
562 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
563 {
564   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
565   int         ierr, nt;
566 
567   VecGetLocalSize_Fast(xx,nt);
568   if (nt != a->n) {
569     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and xx");
570   }
571   VecGetLocalSize_Fast(yy,nt);
572   if (nt != a->m) {
573     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and yy");
574   }
575   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
576   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
577   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
578   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
579   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
580   return 0;
581 }
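/*
   (Illustrative note, not in the original source.)  MatMult_MPIBAIJ() above computes

       y = A_d x_local + B_o x_ghost

   where A_d = a->A is the diagonal block over locally owned columns, B_o = a->B is
   the off-diagonal block, and x_ghost = a->lvec is filled by the VecScatter; the
   scatter is started before and completed after the local product, overlapping
   communication with computation, and receives for the next multiply are pre-posted.
*/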
582 
583 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
584 {
585   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
586   int        ierr;
587   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
588   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
589   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
590   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
591   return 0;
592 }
593 
594 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
595 {
596   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
597   int        ierr;
598 
599   /* do nondiagonal part */
600   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
601   /* send it on its way */
602   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
603   /* do local part */
604   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
605   /* receive remote parts: note this assumes the values are not actually */
606   /* inserted in yy until the next line, which is true for my implementation */
607   /* but may not always be true. */
608   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
609   return 0;
610 }
611 
612 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
613 {
614   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
615   int        ierr;
616 
617   /* do nondiagonal part */
618   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
619   /* send it on its way */
620   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
621   /* do local part */
622   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
623   /* receive remote parts: note this assumes the values are not actually */
624   /* inserted in zz until the next line, which is true for my implementation */
625   /* but may not always be true. */
626   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
627   return 0;
628 }
629 
630 /*
631   This only works correctly for square matrices where the subblock A->A is the
632    diagonal block
633 */
634 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
635 {
636   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
637   if (a->M != a->N)
638     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
639   return MatGetDiagonal(a->A,v);
640 }
641 
642 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
643 {
644   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
645   int        ierr;
646   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
647   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
648   return 0;
649 }
650 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
651 {
652   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
653   *m = mat->M; *n = mat->N;
654   return 0;
655 }
656 
657 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
658 {
659   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
660   *m = mat->m; *n = mat->n;
661   return 0;
662 }
663 
664 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
665 {
666   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
667   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
668   return 0;
669 }
670 
671 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
672 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
673 
674 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
675 {
676   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
677   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
678   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
679   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
680   int        *cmap, *idx_p,cstart = mat->cstart;
681 
682   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
683   mat->getrowactive = PETSC_TRUE;
684 
685   if (!mat->rowvalues && (idx || v)) {
686     /*
687         allocate enough space to hold information from the longest row.
688     */
689     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
690     int     max = 1,mbs = mat->mbs,tmp;
691     for ( i=0; i<mbs; i++ ) {
692       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
693       if (max < tmp) { max = tmp; }
694     }
695     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
696     CHKPTRQ(mat->rowvalues);
697     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
698   }
699 
700 
701   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows");
702   lrow = row - brstart;
703 
704   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
705   if (!v)   {pvA = 0; pvB = 0;}
706   if (!idx) {pcA = 0; if (!v) pcB = 0;}
707   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
708   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
709   nztot = nzA + nzB;
710 
711   cmap  = mat->garray;
712   if (v  || idx) {
713     if (nztot) {
714       /* Sort by increasing column numbers, assuming A and B already sorted */
715       int imark = -1;
716       if (v) {
717         *v = v_p = mat->rowvalues;
718         for ( i=0; i<nzB; i++ ) {
719           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
720           else break;
721         }
722         imark = i;
723         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
724         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
725       }
726       if (idx) {
727         *idx = idx_p = mat->rowindices;
728         if (imark > -1) {
729           for ( i=0; i<imark; i++ ) {
730             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
731           }
732         } else {
733           for ( i=0; i<nzB; i++ ) {
734             if (cmap[cworkB[i]/bs] < cstart)
735               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
736             else break;
737           }
738           imark = i;
739         }
740         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
741         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
742       }
743     }
744     else {
745       if (idx) *idx = 0;
746       if (v)   *v   = 0;
747     }
748   }
749   *nz = nztot;
750   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
751   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
752   return 0;
753 }
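/*
   (Illustrative note, not in the original source.)  The merge in MatGetRow_MPIBAIJ()
   above relies on the columns of both mat->A and mat->B being sorted: off-diagonal
   (B) entries whose global block column lies below cstart are copied first, then all
   diagonal (A) entries shifted by cstart*bs, then the remaining B entries, so the
   returned row is sorted by global column number.
*/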
754 
755 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
756 {
757   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
758   if (baij->getrowactive == PETSC_FALSE) {
759     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
760   }
761   baij->getrowactive = PETSC_FALSE;
762   return 0;
763 }
764 
765 static int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
766 {
767   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
768   *bs = baij->bs;
769   return 0;
770 }
771 
772 static int MatZeroEntries_MPIBAIJ(Mat A)
773 {
774   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
775   int         ierr;
776   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
777   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
778   return 0;
779 }
780 
781 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
782 {
783   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
784   Mat         A = a->A, B = a->B;
785   int         ierr;
786   double      isend[5], irecv[5];
787 
788   info->rows_global    = (double)a->M;
789   info->columns_global = (double)a->N;
790   info->rows_local     = (double)a->m;
791   info->columns_local  = (double)a->N;
792   info->block_size     = (double)a->bs;
793   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
794   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded; isend[3] = info->memory; isend[4] = info->mallocs;
795   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
796   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded; isend[3] += info->memory; isend[4] += info->mallocs;
797   if (flag == MAT_LOCAL) {
798     info->nz_used      = isend[0];
799     info->nz_allocated = isend[1];
800     info->nz_unneeded  = isend[2];
801     info->memory       = isend[3];
802     info->mallocs      = isend[4];
803   } else if (flag == MAT_GLOBAL_MAX) {
804     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
805     info->nz_used      = irecv[0];
806     info->nz_allocated = irecv[1];
807     info->nz_unneeded  = irecv[2];
808     info->memory       = irecv[3];
809     info->mallocs      = irecv[4];
810   } else if (flag == MAT_GLOBAL_SUM) {
811     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
812     info->nz_used      = irecv[0];
813     info->nz_allocated = irecv[1];
814     info->nz_unneeded  = irecv[2];
815     info->memory       = irecv[3];
816     info->mallocs      = irecv[4];
817   }
818   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
819   info->fill_ratio_needed = 0;
820   info->factor_mallocs    = 0;
821   return 0;
822 }
823 
824 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
825 {
826   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
827 
828   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
829       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
830       op == MAT_COLUMNS_UNSORTED ||
831       op == MAT_COLUMNS_SORTED) {
832         MatSetOption(a->A,op);
833         MatSetOption(a->B,op);
834   } else if (op == MAT_ROW_ORIENTED) {
835         a->roworiented = 1;
836         MatSetOption(a->A,op);
837         MatSetOption(a->B,op);
838   } else if (op == MAT_ROWS_SORTED ||
839              op == MAT_ROWS_UNSORTED ||
840              op == MAT_SYMMETRIC ||
841              op == MAT_STRUCTURALLY_SYMMETRIC ||
842              op == MAT_YES_NEW_DIAGONALS)
843     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
844   else if (op == MAT_COLUMN_ORIENTED) {
845     a->roworiented = 0;
846     MatSetOption(a->A,op);
847     MatSetOption(a->B,op);
848   } else if (op == MAT_IGNORE_OFF_PROCESSOR_ENTRIES) {
849     a->donotstash = 1;
850   } else if (op == MAT_NO_NEW_DIAGONALS)
851     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:MAT_NO_NEW_DIAGONALS");}
852   else
853     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:unknown option");}
854   return 0;
855 }
856 
857 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
858 {
859   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
860   Mat_SeqBAIJ *Aloc;
861   Mat        B;
862   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
863   int        bs=baij->bs,mbs=baij->mbs;
864   Scalar     *a;
865 
866   if (matout == PETSC_NULL && M != N)
867     SETERRQ(1,"MatTranspose_MPIBAIJ:Square matrix only for in-place");
868   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
869   CHKERRQ(ierr);
870 
871   /* copy over the A part */
872   Aloc = (Mat_SeqBAIJ*) baij->A->data;
873   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
874   row = baij->rstart;
875   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
876 
877   for ( i=0; i<mbs; i++ ) {
878     rvals[0] = bs*(baij->rstart + i);
879     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
880     for ( j=ai[i]; j<ai[i+1]; j++ ) {
881       col = (baij->cstart+aj[j])*bs;
882       for (k=0; k<bs; k++ ) {
883         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
884         col++; a += bs;
885       }
886     }
887   }
888   /* copy over the B part */
889   Aloc = (Mat_SeqBAIJ*) baij->B->data;
890   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
891   row = baij->rstart*bs;
892   for ( i=0; i<mbs; i++ ) {
893     rvals[0] = bs*(baij->rstart + i);
894     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
895     for ( j=ai[i]; j<ai[i+1]; j++ ) {
896       col = baij->garray[aj[j]]*bs;
897       for (k=0; k<bs; k++ ) {
898         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
899         col++; a += bs;
900       }
901     }
902   }
903   PetscFree(rvals);
904   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
905   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
906 
907   if (matout != PETSC_NULL) {
908     *matout = B;
909   } else {
910     /* This isn't really an in-place transpose, but we free the data structures from baij */
911     PetscFree(baij->rowners);
912     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
913     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
914     if (baij->colmap) PetscFree(baij->colmap);
915     if (baij->garray) PetscFree(baij->garray);
916     if (baij->lvec) VecDestroy(baij->lvec);
917     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
918     PetscFree(baij);
919     PetscMemcpy(A,B,sizeof(struct _Mat));
920     PetscHeaderDestroy(B);
921   }
922   return 0;
923 }
924 
925 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
926 {
927   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
928   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
929   int ierr,s1,s2,s3;
930 
931   if (ll)  {
932     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
933     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
934     if (s1!=s2) SETERRQ(1,"MatDiagonalScale_MPIBAIJ: non-conforming local sizes");
935     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
936     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
937   }
938   if (rr) SETERRQ(1,"MatDiagonalScale_MPIBAIJ:not supported for right vector");
939   return 0;
940 }
941 
942 /* the code does not do the diagonal entries correctly unless the
943    matrix is square and the column and row ownerships are identical.
944    This is a BUG. The only way to fix it seems to be to access
945    baij->A and baij->B directly and not through the MatZeroRows()
946    routine.
947 */
948 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
949 {
950   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
951   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
952   int            *procs,*nprocs,j,found,idx,nsends,*work;
953   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
954   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
955   int            *lens,imdex,*lrows,*values,bs=l->bs;
956   MPI_Comm       comm = A->comm;
957   MPI_Request    *send_waits,*recv_waits;
958   MPI_Status     recv_status,*send_status;
959   IS             istmp;
960 
961   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
962   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
963 
964   /*  first count number of contributors to each processor */
965   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
966   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
967   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
968   for ( i=0; i<N; i++ ) {
969     idx = rows[i];
970     found = 0;
971     for ( j=0; j<size; j++ ) {
972       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
973         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
974       }
975     }
976     if (!found) SETERRQ(1,"MatZeroRows_MPIBAIJ:Index out of range");
977   }
978   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
979 
980   /* inform other processors of number of messages and max length*/
981   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
982   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
983   nrecvs = work[rank];
984   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
985   nmax = work[rank];
986   PetscFree(work);
987 
988   /* post receives:   */
989   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
990   CHKPTRQ(rvalues);
991   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
992   CHKPTRQ(recv_waits);
993   for ( i=0; i<nrecvs; i++ ) {
994     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
995   }
996 
997   /* do sends:
998       1) starts[i] gives the starting index in svalues for stuff going to
999          the ith processor
1000   */
1001   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1002   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
1003   CHKPTRQ(send_waits);
1004   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1005   starts[0] = 0;
1006   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1007   for ( i=0; i<N; i++ ) {
1008     svalues[starts[owner[i]]++] = rows[i];
1009   }
1010   ISRestoreIndices(is,&rows);
1011 
1012   starts[0] = 0;
1013   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1014   count = 0;
1015   for ( i=0; i<size; i++ ) {
1016     if (procs[i]) {
1017       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
1018     }
1019   }
1020   PetscFree(starts);
1021 
1022   base = owners[rank]*bs;
1023 
1024   /*  wait on receives */
1025   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1026   source = lens + nrecvs;
1027   count  = nrecvs; slen = 0;
1028   while (count) {
1029     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
1030     /* unpack receives into our local space */
1031     MPI_Get_count(&recv_status,MPI_INT,&n);
1032     source[imdex]  = recv_status.MPI_SOURCE;
1033     lens[imdex]  = n;
1034     slen += n;
1035     count--;
1036   }
1037   PetscFree(recv_waits);
1038 
1039   /* move the data into the send scatter */
1040   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1041   count = 0;
1042   for ( i=0; i<nrecvs; i++ ) {
1043     values = rvalues + i*nmax;
1044     for ( j=0; j<lens[i]; j++ ) {
1045       lrows[count++] = values[j] - base;
1046     }
1047   }
1048   PetscFree(rvalues); PetscFree(lens);
1049   PetscFree(owner); PetscFree(nprocs);
1050 
1051   /* actually zap the local rows */
1052   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1053   PLogObjectParent(A,istmp);
1054   PetscFree(lrows);
1055   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1056   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1057   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1058 
1059   /* wait on sends */
1060   if (nsends) {
1061     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1062     CHKPTRQ(send_status);
1063     MPI_Waitall(nsends,send_waits,send_status);
1064     PetscFree(send_status);
1065   }
1066   PetscFree(send_waits); PetscFree(svalues);
1067 
1068   return 0;
1069 }
1070 extern int MatPrintHelp_SeqBAIJ(Mat);
1071 static int MatPrintHelp_MPIBAIJ(Mat A)
1072 {
1073   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1074 
1075   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1076   else return 0;
1077 }
1078 
1079 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1080 
1081 /* -------------------------------------------------------------------*/
1082 static struct _MatOps MatOps = {
1083   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1084   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1085   0,0,0,0,
1086   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1087   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1088   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1089   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1090   0,0,0,MatGetSize_MPIBAIJ,
1091   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1092   0,0,0,MatConvertSameType_MPIBAIJ,0,0,
1093   0,0,0,MatGetSubMatrices_MPIBAIJ,
1094   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1095   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ};
1096 
1097 
1098 /*@C
1099    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1100    (block compressed row).  For good matrix assembly performance
1101    the user should preallocate the matrix storage by setting the parameters
1102    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1103    performance can be increased by more than a factor of 50.
1104 
1105    Input Parameters:
1106 .  comm - MPI communicator
1107 .  bs   - size of block
1108 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1109            This value should be the same as the local size used in creating the
1110            y vector for the matrix-vector product y = Ax.
1111 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1112            This value should be the same as the local size used in creating the
1113            x vector for the matrix-vector product y = Ax.
1114 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
1115 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
1116 .  d_nz  - number of block nonzeros per block row in the diagonal portion of the local
1117            submatrix (same for all local block rows)
1118 .  d_nnz - array containing the number of block nonzeros in the various block rows
1119            of the diagonal portion of the local submatrix (possibly different for each
1120            block row) or PETSC_NULL.  You must leave room for the diagonal entry even
1121            if it is zero.
1122 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of the
1123            local submatrix (same for all local block rows).
1124 .  o_nnz - array containing the number of block nonzeros in the various block rows of
1125            the off-diagonal portion of the local submatrix (possibly different for
1126            each block row) or PETSC_NULL.
1127 
1128    Output Parameter:
1129 .  A - the matrix
1130 
1131    Notes:
1132    The user MUST specify either the local or global matrix dimensions
1133    (possibly both).
1134 
1135    Storage Information:
1136    For a square global matrix we define each processor's diagonal portion
1137    to be its local rows and the corresponding columns (a square submatrix);
1138    each processor's off-diagonal portion encompasses the remainder of the
1139    local matrix (a rectangular submatrix).
1140 
1141    The user can specify preallocated storage for the diagonal part of
1142    the local submatrix with either d_nz or d_nnz (not both).  Set
1143    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1144    memory allocation.  Likewise, specify preallocated storage for the
1145    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1146 
1147    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1148    the figure below we depict these three local rows and all columns (0-11).
1149 
1150 $          0 1 2 3 4 5 6 7 8 9 10 11
1151 $         -------------------
1152 $  row 3  |  o o o d d d o o o o o o
1153 $  row 4  |  o o o d d d o o o o o o
1154 $  row 5  |  o o o d d d o o o o o o
1155 $         -------------------
1156 $
1157 
1158    Thus, any entries in the d locations are stored in the d (diagonal)
1159    submatrix, and any entries in the o locations are stored in the
1160    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1161    stored simply in the MATSEQBAIJ format for compressed row storage.
1162 
1163    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1164    and o_nz should indicate the number of nonzeros per row in the o matrix.
1165    In general, for PDE problems in which most nonzeros are near the diagonal,
1166    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1167    or you will get TERRIBLE performance; see the users' manual chapter on
1168    matrices.
1169 
1170 .keywords: matrix, block, aij, compressed row, sparse, parallel
1171 
1172 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1173 @*/
1174 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1175                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1176 {
1177   Mat          B;
1178   Mat_MPIBAIJ  *b;
1179   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1180 
1181   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
1182   *A = 0;
1183   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1184   PLogObjectCreate(B);
1185   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1186   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1187   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1188   MPI_Comm_size(comm,&size);
1189   if (size == 1) {
1190     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1191     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1192     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1193     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1194     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1195     B->ops.solve             = MatSolve_MPIBAIJ;
1196     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1197     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1198     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1199     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1200   }
1201 
1202   B->destroy    = MatDestroy_MPIBAIJ;
1203   B->view       = MatView_MPIBAIJ;
1204   B->mapping    = 0;
1205   B->factor     = 0;
1206   B->assembled  = PETSC_FALSE;
1207 
1208   b->insertmode = NOT_SET_VALUES;
1209   MPI_Comm_rank(comm,&b->rank);
1210   MPI_Comm_size(comm,&b->size);
1211 
1212   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1213     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1214   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
1215   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
1216   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1217   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1218 
1219   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1220     work[0] = m; work[1] = n;
1221     mbs = m/bs; nbs = n/bs;
1222     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1223     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1224     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1225   }
1226   if (m == PETSC_DECIDE) {
1227     Mbs = M/bs;
1228     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
1229     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1230     m   = mbs*bs;
1231   }
1232   if (n == PETSC_DECIDE) {
1233     Nbs = N/bs;
1234     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
1235     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1236     n   = nbs*bs;
1237   }
1238   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
1239 
1240   b->m = m; B->m = m;
1241   b->n = n; B->n = n;
1242   b->N = N; B->N = N;
1243   b->M = M; B->M = M;
1244   b->bs  = bs;
1245   b->bs2 = bs*bs;
1246   b->mbs = mbs;
1247   b->nbs = nbs;
1248   b->Mbs = Mbs;
1249   b->Nbs = Nbs;
1250 
1251   /* build local table of row and column ownerships */
1252   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1253   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1254   b->cowners = b->rowners + b->size + 2;
1255   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1256   b->rowners[0] = 0;
1257   for ( i=2; i<=b->size; i++ ) {
1258     b->rowners[i] += b->rowners[i-1];
1259   }
1260   b->rstart = b->rowners[b->rank];
1261   b->rend   = b->rowners[b->rank+1];
1262   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1263   b->cowners[0] = 0;
1264   for ( i=2; i<=b->size; i++ ) {
1265     b->cowners[i] += b->cowners[i-1];
1266   }
1267   b->cstart = b->cowners[b->rank];
1268   b->cend   = b->cowners[b->rank+1];
1269 
1270   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1271   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1272   PLogObjectParent(B,b->A);
1273   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1274   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1275   PLogObjectParent(B,b->B);
1276 
1277   /* build cache for off array entries formed */
1278   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1279   b->donotstash  = 0;
1280   b->colmap      = 0;
1281   b->garray      = 0;
1282   b->roworiented = 1;
1283 
1284   /* stuff used for matrix vector multiply */
1285   b->lvec      = 0;
1286   b->Mvctx     = 0;
1287 
1288   /* stuff for MatGetRow() */
1289   b->rowindices   = 0;
1290   b->rowvalues    = 0;
1291   b->getrowactive = PETSC_FALSE;
1292 
1293   *A = B;
1294   return 0;
1295 }
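/*
   A hedged usage sketch (not part of the original source) of MatCreateMPIBAIJ()
   above.  The communicator, block size, local size, preallocation numbers, and the
   example function name are illustrative assumptions only.
*/
#if 0
static int ExampleCreateMPIBAIJ(void)
{
  Mat    A;
  Scalar one = 1.0;
  int    ierr,bs = 2,m = 10,rstart,rend,row;

  /* 5 block rows per processor; the global sizes are derived from the local ones */
  ierr = MatCreateMPIBAIJ(MPI_COMM_WORLD,bs,m,m,PETSC_DECIDE,PETSC_DECIDE,
                          3,PETSC_NULL,2,PETSC_NULL,&A); CHKERRQ(ierr);
  ierr = MatGetOwnershipRange(A,&rstart,&rend); CHKERRQ(ierr);
  row  = rstart;                               /* first locally owned point row */
  ierr = MatSetValues(A,1,&row,1,&row,&one,INSERT_VALUES); CHKERRQ(ierr);
  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatDestroy(A); CHKERRQ(ierr);
  return 0;
}
#endif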
1296 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1297 {
1298   Mat        mat;
1299   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1300   int        ierr, len=0, flg;
1301 
1302   *newmat       = 0;
1303   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1304   PLogObjectCreate(mat);
1305   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1306   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1307   mat->destroy    = MatDestroy_MPIBAIJ;
1308   mat->view       = MatView_MPIBAIJ;
1309   mat->factor     = matin->factor;
1310   mat->assembled  = PETSC_TRUE;
1311 
1312   a->m = mat->m   = oldmat->m;
1313   a->n = mat->n   = oldmat->n;
1314   a->M = mat->M   = oldmat->M;
1315   a->N = mat->N   = oldmat->N;
1316 
1317   a->bs  = oldmat->bs;
1318   a->bs2 = oldmat->bs2;
1319   a->mbs = oldmat->mbs;
1320   a->nbs = oldmat->nbs;
1321   a->Mbs = oldmat->Mbs;
1322   a->Nbs = oldmat->Nbs;
1323 
1324   a->rstart       = oldmat->rstart;
1325   a->rend         = oldmat->rend;
1326   a->cstart       = oldmat->cstart;
1327   a->cend         = oldmat->cend;
1328   a->size         = oldmat->size;
1329   a->rank         = oldmat->rank;
1330   a->insertmode   = NOT_SET_VALUES;
1331   a->rowvalues    = 0;
1332   a->getrowactive = PETSC_FALSE;
1333 
1334   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1335   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1336   a->cowners = a->rowners + a->size + 2;
1337   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1338   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1339   if (oldmat->colmap) {
1340     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1341     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1342     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1343   } else a->colmap = 0;
1344   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1345     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1346     PLogObjectMemory(mat,len*sizeof(int));
1347     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1348   } else a->garray = 0;
1349 
1350   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1351   PLogObjectParent(mat,a->lvec);
1352   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1353   PLogObjectParent(mat,a->Mvctx);
1354   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1355   PLogObjectParent(mat,a->A);
1356   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1357   PLogObjectParent(mat,a->B);
1358   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1359   if (flg) {
1360     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1361   }
1362   *newmat = mat;
1363   return 0;
1364 }
1365 
1366 #include "sys.h"
1367 
1368 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1369 {
1370   Mat          A;
1371   int          i, nz, ierr, j,rstart, rend, fd;
1372   Scalar       *vals,*buf;
1373   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1374   MPI_Status   status;
1375   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1376   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1377   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1378   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1379   int          dcount,kmax,k,nzcount,tmp;
1380 
1381 
1382   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1383   bs2  = bs*bs;
1384 
1385   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1386   if (!rank) {
1387     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1388     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1389     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
1390   }
1391 
1392   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1393   M = header[1]; N = header[2];
1394 
1395   if (M != N) SETERRQ(1,"MatLoad_MPIBAIJ:Can only do square matrices");
1396 
1397   /*
1398      This code adds extra rows to make sure the number of rows is
1399      divisible by the blocksize
1400   */
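  /* (Worked example, not in the original source: with bs = 4 and M = 10, Mbs = 10/4 = 2
     and extra_rows = 4 - 10 + 4*2 = 2, so Mbs is bumped to 3 and the matrix is padded
     to 12 rows; each padded row receives a single unit entry on the diagonal.) */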
1401   Mbs        = M/bs;
1402   extra_rows = bs - M + bs*(Mbs);
1403   if (extra_rows == bs) extra_rows = 0;
1404   else                  Mbs++;
1405   if (extra_rows &&!rank) {
1406     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1407   }
1408 
1409   /* determine ownership of all rows */
1410   mbs = Mbs/size + ((Mbs % size) > rank);
1411   m   = mbs * bs;
1412   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1413   browners = rowners + size + 1;
1414   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1415   rowners[0] = 0;
1416   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1417   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1418   rstart = rowners[rank];
1419   rend   = rowners[rank+1];
1420 
1421   /* distribute row lengths to all processors */
1422   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1423   if (!rank) {
1424     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1425     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1426     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1427     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1428     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1429     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1430     PetscFree(sndcounts);
1431   }
1432   else {
1433     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1434   }
1435 
1436   if (!rank) {
1437     /* calculate the number of nonzeros on each processor */
1438     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1439     PetscMemzero(procsnz,size*sizeof(int));
1440     for ( i=0; i<size; i++ ) {
1441       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1442         procsnz[i] += rowlengths[j];
1443       }
1444     }
1445     PetscFree(rowlengths);
1446 
1447     /* determine max buffer needed and allocate it */
1448     maxnz = 0;
1449     for ( i=0; i<size; i++ ) {
1450       maxnz = PetscMax(maxnz,procsnz[i]);
1451     }
1452     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1453 
1454     /* read in my part of the matrix column indices  */
1455     nz = procsnz[0];
1456     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1457     mycols = ibuf;
1458     if (size == 1)  nz -= extra_rows;
1459     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1460     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1461 
1462     /* read in the other processors' parts (except the last) and ship them off */
1463     for ( i=1; i<size-1; i++ ) {
1464       nz = procsnz[i];
1465       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1466       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1467     }
1468     /* read in the stuff for the last proc */
1469     if ( size != 1 ) {
1470       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1471       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1472       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1473       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1474     }
1475     PetscFree(cols);
1476   }
1477   else {
1478     /* determine buffer space needed for message */
1479     nz = 0;
1480     for ( i=0; i<m; i++ ) {
1481       nz += locrowlens[i];
1482     }
1483     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1484     mycols = ibuf;
1485     /* receive message of column indices*/
1486     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1487     MPI_Get_count(&status,MPI_INT,&maxnz);
1488     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1489   }
1490 
1491   /* loop over local rows, determining number of off diagonal entries */
1492   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1493   odlens = dlens + (rend-rstart);
1494   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1495   PetscMemzero(mask,3*Mbs*sizeof(int));
1496   masked1 = mask    + Mbs;
1497   masked2 = masked1 + Mbs;
1498   rowcount = 0; nzcount = 0;
1499   for ( i=0; i<mbs; i++ ) {
1500     dcount  = 0;
1501     odcount = 0;
1502     for ( j=0; j<bs; j++ ) {
1503       kmax = locrowlens[rowcount];
1504       for ( k=0; k<kmax; k++ ) {
1505         tmp = mycols[nzcount++]/bs;
1506         if (!mask[tmp]) {
1507           mask[tmp] = 1;
1508           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1509           else masked1[dcount++] = tmp;
1510         }
1511       }
1512       rowcount++;
1513     }
1514 
1515     dlens[i]  = dcount;
1516     odlens[i] = odcount;
1517 
1518     /* zero out the mask elements we set */
1519     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1520     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1521   }
1522 
1523   /* create our matrix */
1524   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1525          CHKERRQ(ierr);
1526   A = *newmat;
1527   MatSetOption(A,MAT_COLUMNS_SORTED);
1528 
1529   if (!rank) {
1530     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1531     /* read in my part of the matrix numerical values  */
1532     nz = procsnz[0];
1533     vals = buf;
1534     mycols = ibuf;
1535     if (size == 1)  nz -= extra_rows;
1536     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1537     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1538 
1539     /* insert into matrix */
1540     jj      = rstart*bs;
1541     for ( i=0; i<m; i++ ) {
1542       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1543       mycols += locrowlens[i];
1544       vals   += locrowlens[i];
1545       jj++;
1546     }
1547     /* read in other processors (except the last one) and ship out */
1548     for ( i=1; i<size-1; i++ ) {
1549       nz = procsnz[i];
1550       vals = buf;
1551       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1552       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1553     }
1554     /* the last proc */
1555     if ( size != 1 ){
1556       nz = procsnz[i] - extra_rows;
1557       vals = buf;
1558       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1559       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1560       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1561     }
1562     PetscFree(procsnz);
1563   }
1564   else {
1565     /* receive numeric values */
1566     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1567 
1568     /* receive message of values*/
1569     vals = buf;
1570     mycols = ibuf;
1571     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1572     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1573     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1574 
1575     /* insert into matrix */
1576     jj      = rstart*bs;
1577     for ( i=0; i<m; i++ ) {
1578       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1579       mycols += locrowlens[i];
1580       vals   += locrowlens[i];
1581       jj++;
1582     }
1583   }
1584   PetscFree(locrowlens);
1585   PetscFree(buf);
1586   PetscFree(ibuf);
1587   PetscFree(rowners);
1588   PetscFree(dlens);
1589   PetscFree(mask);
1590   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1591   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1592   return 0;
1593 }
1594 
1595 
1596