xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision eec97ae04bdeb15ffcc1e3552cfd57ac3a3045c1)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.36 1996/11/29 22:26:37 curfman Exp curfman $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 
25 /*
26      Local utility routine that creates a mapping from the global column
27    number to the local number in the off-diagonal part of the local
28    storage of the matrix.  This is done in a non-scalable way since the
29    length of colmap equals the number of block columns in the global matrix.
30 */
31 static int CreateColmap_MPIBAIJ_Private(Mat mat)
32 {
33   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
34   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
35   int         nbs = B->nbs,i,bs=B->bs;
36 
37   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
38   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
39   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
40   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
41   return 0;
42 }
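
/*
   Added illustration (not part of the original source): with bs = 2 and
   garray = {3,7}, the loop above yields colmap[3] = 1 and colmap[7] = 3,
   while every other entry stays 0, meaning "block column not present".
   Callers such as MatSetValues_MPIBAIJ recover the local point column of a
   global column gcol in the off-diagonal part via

       col = baij->colmap[gcol/bs] - 1 + gcol%bs;

   so the +1 stored here is what lets a zero entry signal an absent block.
*/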
43 
44 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
45                             PetscTruth *done)
46 {
47   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
48   int         ierr;
49   if (aij->size == 1) {
50     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
51   } else SETERRQ(1,"MatGetRowIJ_MPIBAIJ:not supported in parallel");
52   return 0;
53 }
54 
55 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
56                                 PetscTruth *done)
57 {
58   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
59   int        ierr;
60   if (aij->size == 1) {
61     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
62   } else SETERRQ(1,"MatRestoreRowIJ_MPIBAIJ:not supported in parallel");
63   return 0;
64 }
65 
66 extern int MatSetValues_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
67 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
68 {
69   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
70   Scalar      value;
71   int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
72   int         cstart = baij->cstart, cend = baij->cend,row,col;
73   int         roworiented = baij->roworiented,rstart_orig,rend_orig;
74   int         cstart_orig,cend_orig,bs=baij->bs;
75 
76   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
77     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
78   }
79   baij->insertmode = addv;
80   rstart_orig = rstart*bs;
81   rend_orig   = rend*bs;
82   cstart_orig = cstart*bs;
83   cend_orig   = cend*bs;
84   for ( i=0; i<m; i++ ) {
85 #if defined(PETSC_BOPT_g)
86     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
87     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
88 #endif
89     if (im[i] >= rstart_orig && im[i] < rend_orig) {
90       row = im[i] - rstart_orig;
91       for ( j=0; j<n; j++ ) {
92         if (in[j] >= cstart_orig && in[j] < cend_orig){
93           col = in[j] - cstart_orig;
94           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
95           ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
96         }
97 #if defined(PETSC_BOPT_g)
98         else if (in[j] < 0) {SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");}
99         else if (in[j] >= baij->N) {SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");}
100 #endif
101         else {
102           if (mat->was_assembled) {
103             if (!baij->colmap) {
104               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
105             }
106             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
107             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
108               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
109               col =  in[j];
110             }
111           }
112           else col = in[j];
113           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
114           ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
115         }
116       }
117     }
118     else {
119       if (roworiented && !baij->donotstash) {
120         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
121       }
122       else {
123         if (!baij->donotstash) {
124           row = im[i];
125 	  for ( j=0; j<n; j++ ) {
126 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
127           }
128         }
129       }
130     }
131   }
132   return 0;
133 }
134 
135 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
136 {
137   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
138   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
139   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
140 
141   for ( i=0; i<m; i++ ) {
142     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
143     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
144     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
145       row = idxm[i] - bsrstart;
146       for ( j=0; j<n; j++ ) {
147         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
148         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
149         if (idxn[j] >= bscstart && idxn[j] < bscend){
150           col = idxn[j] - bscstart;
151           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
152         }
153         else {
154           if (!baij->colmap) {
155             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
156           }
157           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
158              (baij->garray[baij->colmap[idxn[j]/bs]-1] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
159           else {
160             col  = (baij->colmap[idxn[j]/bs]-1)*bs + idxn[j]%bs;
161             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
162           }
163         }
164       }
165     }
166     else {
167       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
168     }
169   }
170   return 0;
171 }
172 
173 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
174 {
175   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
176   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
177   int        ierr, i,bs2=baij->bs2;
178   double     sum = 0.0;
179   Scalar     *v;
180 
181   if (baij->size == 1) {
182     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
183   } else {
184     if (type == NORM_FROBENIUS) {
185       v = amat->a;
186       for (i=0; i<amat->nz*bs2; i++ ) {
187 #if defined(PETSC_COMPLEX)
188         sum += real(conj(*v)*(*v)); v++;
189 #else
190         sum += (*v)*(*v); v++;
191 #endif
192       }
193       v = bmat->a;
194       for (i=0; i<bmat->nz*bs2; i++ ) {
195 #if defined(PETSC_COMPLEX)
196         sum += real(conj(*v)*(*v)); v++;
197 #else
198         sum += (*v)*(*v); v++;
199 #endif
200       }
201       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
202       *norm = sqrt(*norm);
203     }
204     else
205       SETERRQ(PETSC_ERR_SUP,"MatNorm_MPIBAIJ:No support for this norm yet");
206   }
207   return 0;
208 }
209 
210 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
211 {
212   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
213   MPI_Comm    comm = mat->comm;
214   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
215   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
216   MPI_Request *send_waits,*recv_waits;
217   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
218   InsertMode  addv;
219   Scalar      *rvalues,*svalues;
220 
221   /* make sure all processors are in the same mode (INSERT_VALUES or ADD_VALUES) */
222   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
223   if (addv == (ADD_VALUES|INSERT_VALUES)) {
224     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
225   }
226   baij->insertmode = addv; /* in case this processor had no cache */
227 
228   /*  first count number of contributors to each processor */
229   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
230   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
231   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
232   for ( i=0; i<baij->stash.n; i++ ) {
233     idx = baij->stash.idx[i];
234     for ( j=0; j<size; j++ ) {
235       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
236         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
237       }
238     }
239   }
240   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
241 
242   /* inform other processors of number of messages and max length*/
243   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
244   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
245   nreceives = work[rank];
246   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
247   nmax = work[rank];
248   PetscFree(work);
249 
250   /* post receives:
251        1) each message will consist of ordered triples
252      (global row index, global column index, value); the indices are stored
253      as Scalars to simplify the message passing.
254        2) since we don't know how long each individual message is we
255      allocate the largest needed buffer for each receive. Potentially
256      this is a lot of wasted space.
257 
258 
259        This could be done better.
260   */
261   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
262   CHKPTRQ(rvalues);
263   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
264   CHKPTRQ(recv_waits);
265   for ( i=0; i<nreceives; i++ ) {
266     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
267               comm,recv_waits+i);
268   }
269 
270   /* do sends:
271       1) starts[i] gives the starting index in svalues for stuff going to
272          the ith processor
273   */
274   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
275   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
276   CHKPTRQ(send_waits);
277   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
278   starts[0] = 0;
279   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
280   for ( i=0; i<baij->stash.n; i++ ) {
281     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
282     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
283     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
284   }
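  /*
     Added layout sketch: after this loop svalues is grouped by destination
     processor, each stashed entry occupying three consecutive Scalars,

         ... | row_k | col_k | value_k | row_{k+1} | col_{k+1} | ...

     with the global row/column indices cast to Scalar so that a single
     MPIU_SCALAR message per destination suffices.
  */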
285   PetscFree(owner);
286   starts[0] = 0;
287   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
288   count = 0;
289   for ( i=0; i<size; i++ ) {
290     if (procs[i]) {
291       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
292                 comm,send_waits+count++);
293     }
294   }
295   PetscFree(starts); PetscFree(nprocs);
296 
297   /* Free cache space */
298   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",rank,baij->stash.n);
299   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
300 
301   baij->svalues    = svalues;    baij->rvalues    = rvalues;
302   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
303   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
304   baij->rmax       = nmax;
305 
306   return 0;
307 }
308 
309 
310 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
311 {
312   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
313   MPI_Status  *send_status,recv_status;
314   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
315   int         bs=baij->bs,row,col,other_disassembled;
316   Scalar      *values,val;
317   InsertMode  addv = baij->insertmode;
318 
319   /*  wait on receives */
320   while (count) {
321     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
322     /* unpack receives into our local space */
323     values = baij->rvalues + 3*imdex*baij->rmax;
324     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
325     n = n/3;
326     for ( i=0; i<n; i++ ) {
327       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
328       col = (int) PetscReal(values[3*i+1]);
329       val = values[3*i+2];
330       if (col >= baij->cstart*bs && col < baij->cend*bs) {
331         col -= baij->cstart*bs;
332         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
333       }
334       else {
335         if (mat->was_assembled) {
336           if (!baij->colmap) {
337             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
338           }
339           col = (baij->colmap[col/bs]-1)*bs + col%bs;
340           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
341             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
342             col = (int) PetscReal(values[3*i+1]);
343           }
344         }
345         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
346       }
347     }
348     count--;
349   }
350   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
351 
352   /* wait on sends */
353   if (baij->nsends) {
354     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
355     CHKPTRQ(send_status);
356     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
357     PetscFree(send_status);
358   }
359   PetscFree(baij->send_waits); PetscFree(baij->svalues);
360 
361   baij->insertmode = NOT_SET_VALUES;
362   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
363   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
364 
365   /* determine if any processor has disassembled; if so, we must
366      also disassemble ourselves so that we may reassemble. */
367   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
368   if (mat->was_assembled && !other_disassembled) {
369     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
370   }
371 
372   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
373     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
374   }
375   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
376   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
377 
378   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
379   return 0;
380 }
381 
382 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
383 {
384   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
385   int          ierr;
386 
387   if (baij->size == 1) {
388     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
389   }
390   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
391   return 0;
392 }
393 
394 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
395 {
396   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
397   int          ierr, format,rank,bs = baij->bs;
398   FILE         *fd;
399   ViewerType   vtype;
400 
401   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
402   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
403     ierr = ViewerGetFormat(viewer,&format);
404     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
405       MatInfo info;
406       MPI_Comm_rank(mat->comm,&rank);
407       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
408       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
409       PetscSequentialPhaseBegin(mat->comm,1);
410       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
411               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
412               baij->bs,(int)info.memory);
413       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
414       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
415       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
416       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
417       fflush(fd);
418       PetscSequentialPhaseEnd(mat->comm,1);
419       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
420       return 0;
421     }
422     else if (format == VIEWER_FORMAT_ASCII_INFO) {
423       PetscPrintf(mat->comm,"  block size is %d\n",bs);
424       return 0;
425     }
426   }
427 
428   if (vtype == DRAW_VIEWER) {
429     Draw       draw;
430     PetscTruth isnull;
431     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
432     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
433   }
434 
435   if (vtype == ASCII_FILE_VIEWER) {
436     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
437     PetscSequentialPhaseBegin(mat->comm,1);
438     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
439            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
440             baij->cstart*bs,baij->cend*bs);
441     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
442     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
443     fflush(fd);
444     PetscSequentialPhaseEnd(mat->comm,1);
445   }
446   else {
447     int size = baij->size;
448     rank = baij->rank;
449     if (size == 1) {
450       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
451     }
452     else {
453       /* assemble the entire matrix onto first processor. */
454       Mat         A;
455       Mat_SeqBAIJ *Aloc;
456       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
457       int         mbs=baij->mbs;
458       Scalar      *a;
459 
460       if (!rank) {
461         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
462         CHKERRQ(ierr);
463       }
464       else {
465         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
466         CHKERRQ(ierr);
467       }
468       PLogObjectParent(mat,A);
469 
470       /* copy over the A part */
471       Aloc = (Mat_SeqBAIJ*) baij->A->data;
472       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
473       row = baij->rstart;
474       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
475 
476       for ( i=0; i<mbs; i++ ) {
477         rvals[0] = bs*(baij->rstart + i);
478         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
479         for ( j=ai[i]; j<ai[i+1]; j++ ) {
480           col = (baij->cstart+aj[j])*bs;
481           for (k=0; k<bs; k++ ) {
482             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
483             col++; a += bs;
484           }
485         }
486       }
487       /* copy over the B part */
488       Aloc = (Mat_SeqBAIJ*) baij->B->data;
489       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
490       row = baij->rstart*bs;
491       for ( i=0; i<mbs; i++ ) {
492         rvals[0] = bs*(baij->rstart + i);
493         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
494         for ( j=ai[i]; j<ai[i+1]; j++ ) {
495           col = baij->garray[aj[j]]*bs;
496           for (k=0; k<bs; k++ ) {
497             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
498             col++; a += bs;
499           }
500         }
501       }
502       PetscFree(rvals);
503       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
504       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
505       if (!rank) {
506         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
507       }
508       ierr = MatDestroy(A); CHKERRQ(ierr);
509     }
510   }
511   return 0;
512 }
513 
514 
515 
516 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
517 {
518   Mat         mat = (Mat) obj;
519   int         ierr;
520   ViewerType  vtype;
521 
522   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
523   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
524       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
525     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
526   }
527   else if (vtype == BINARY_FILE_VIEWER) {
528     return MatView_MPIBAIJ_Binary(mat,viewer);
529   }
530   return 0;
531 }
532 
533 static int MatDestroy_MPIBAIJ(PetscObject obj)
534 {
535   Mat         mat = (Mat) obj;
536   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
537   int         ierr;
538 
539 #if defined(PETSC_LOG)
540   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
541 #endif
542 
543   PetscFree(baij->rowners);
544   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
545   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
546   if (baij->colmap) PetscFree(baij->colmap);
547   if (baij->garray) PetscFree(baij->garray);
548   if (baij->lvec)   VecDestroy(baij->lvec);
549   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
550   if (baij->rowvalues) PetscFree(baij->rowvalues);
551   PetscFree(baij);
552   if (mat->mapping) {
553     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
554   }
555   PLogObjectDestroy(mat);
556   PetscHeaderDestroy(mat);
557   return 0;
558 }
559 
560 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
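/*
   Added note: MatMult_MPIBAIJ below computes yy = A*xx + B*lvec, where A
   holds the diagonal (on-processor) block portion, B holds the off-diagonal
   portion, and lvec receives the needed ghost values of xx through the
   Mvctx scatter; the scatter is started before the local product so that
   communication overlaps with computation.
*/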
561 {
562   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
563   int         ierr, nt;
564 
565   VecGetLocalSize_Fast(xx,nt);
566   if (nt != a->n) {
567     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and xx");
568   }
569   VecGetLocalSize_Fast(yy,nt);
570   if (nt != a->m) {
571     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and yy");
572   }
573   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
574   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
575   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
576   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
577   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
578   return 0;
579 }
580 
581 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
582 {
583   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
584   int        ierr;
585   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
586   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
587   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
588   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
589   return 0;
590 }
591 
592 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
593 {
594   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
595   int        ierr;
596 
597   /* do nondiagonal part */
598   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
599   /* send it on its way */
600   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
601   /* do local part */
602   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
603   /* receive remote parts: note this assumes the values are not actually */
604   /* inserted in yy until the next line, which is true for this implementation */
605   /* but may not always be the case. */
606   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
607   return 0;
608 }
609 
610 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
611 {
612   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
613   int        ierr;
614 
615   /* do nondiagonal part */
616   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
617   /* send it on its way */
618   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
619   /* do local part */
620   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
621   /* receive remote parts: note this assumes the values are not actually */
622   /* inserted in zz until the next line, which is true for this implementation */
623   /* but may not always be the case. */
624   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
625   return 0;
626 }
627 
628 /*
629   This only works correctly for square matrices where the subblock A->A is the
630    diagonal block
631 */
632 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
633 {
634   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
635   if (a->M != a->N)
636     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
637   return MatGetDiagonal(a->A,v);
638 }
639 
640 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
641 {
642   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
643   int        ierr;
644   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
645   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
646   return 0;
647 }
648 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
649 {
650   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
651   *m = mat->M; *n = mat->N;
652   return 0;
653 }
654 
655 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
656 {
657   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
658   *m = mat->m; *n = mat->n;
659   return 0;
660 }
661 
662 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
663 {
664   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
665   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
666   return 0;
667 }
668 
669 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
670 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
671 
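/*
   Added note: MatGetRow_MPIBAIJ below returns a row in increasing global
   column order by concatenating (i) the off-diagonal (B) entries whose global
   block column lies before cstart, (ii) all diagonal-block (A) entries, and
   (iii) the remaining B entries, relying on A and B each keeping their columns
   sorted.
*/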
672 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
673 {
674   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
675   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
676   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
677   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
678   int        *cmap, *idx_p,cstart = mat->cstart;
679 
680   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
681   mat->getrowactive = PETSC_TRUE;
682 
683   if (!mat->rowvalues && (idx || v)) {
684     /*
685         allocate enough space to hold information from the longest row.
686     */
687     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
688     int     max = 1,mbs = mat->mbs,tmp;
689     for ( i=0; i<mbs; i++ ) {
690       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
691       if (max < tmp) { max = tmp; }
692     }
693     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
694     CHKPTRQ(mat->rowvalues);
695     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
696   }
697 
698 
699   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows");
700   lrow = row - brstart;
701 
702   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
703   if (!v)   {pvA = 0; pvB = 0;}
704   if (!idx) {pcA = 0; if (!v) pcB = 0;}
705   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
706   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
707   nztot = nzA + nzB;
708 
709   cmap  = mat->garray;
710   if (v  || idx) {
711     if (nztot) {
712       /* Sort by increasing column numbers, assuming A and B already sorted */
713       int imark = -1;
714       if (v) {
715         *v = v_p = mat->rowvalues;
716         for ( i=0; i<nzB; i++ ) {
717           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
718           else break;
719         }
720         imark = i;
721         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
722         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
723       }
724       if (idx) {
725         *idx = idx_p = mat->rowindices;
726         if (imark > -1) {
727           for ( i=0; i<imark; i++ ) {
728             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
729           }
730         } else {
731           for ( i=0; i<nzB; i++ ) {
732             if (cmap[cworkB[i]/bs] < cstart)
733               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
734             else break;
735           }
736           imark = i;
737         }
738         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
739         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
740       }
741     }
742     else {
743       if (idx) *idx = 0;
744       if (v)   *v   = 0;
745     }
746   }
747   *nz = nztot;
748   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
749   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
750   return 0;
751 }
752 
753 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
754 {
755   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
756   if (baij->getrowactive == PETSC_FALSE) {
757     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
758   }
759   baij->getrowactive = PETSC_FALSE;
760   return 0;
761 }
762 
763 static int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
764 {
765   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
766   *bs = baij->bs;
767   return 0;
768 }
769 
770 static int MatZeroEntries_MPIBAIJ(Mat A)
771 {
772   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
773   int         ierr;
774   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
775   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
776   return 0;
777 }
778 
779 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
780 {
781   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
782   Mat         A = a->A, B = a->B;
783   int         ierr;
784   double      isend[5], irecv[5];
785 
786   info->rows_global    = (double)a->M;
787   info->columns_global = (double)a->N;
788   info->rows_local     = (double)a->m;
789   info->columns_local  = (double)a->N;
790   info->block_size     = (double)a->bs;
791   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
792   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded; isend[3] = info->memory; isend[4] = info->mallocs;
793   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
794   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded; isend[3] += info->memory; isend[4] += info->mallocs;
795   if (flag == MAT_LOCAL) {
796     info->nz_used      = isend[0];
797     info->nz_allocated = isend[1];
798     info->nz_unneeded  = isend[2];
799     info->memory       = isend[3];
800     info->mallocs      = isend[4];
801   } else if (flag == MAT_GLOBAL_MAX) {
802     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
803     info->nz_used      = irecv[0];
804     info->nz_allocated = irecv[1];
805     info->nz_unneeded  = irecv[2];
806     info->memory       = irecv[3];
807     info->mallocs      = irecv[4];
808   } else if (flag == MAT_GLOBAL_SUM) {
809     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
810     info->nz_used      = irecv[0];
811     info->nz_allocated = irecv[1];
812     info->nz_unneeded  = irecv[2];
813     info->memory       = irecv[3];
814     info->mallocs      = irecv[4];
815   }
816   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
817   info->fill_ratio_needed = 0;
818   info->factor_mallocs    = 0;
819   return 0;
820 }
821 
822 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
823 {
824   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
825 
826   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
827       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
828       op == MAT_COLUMNS_UNSORTED ||
829       op == MAT_COLUMNS_SORTED) {
830         MatSetOption(a->A,op);
831         MatSetOption(a->B,op);
832   } else if (op == MAT_ROW_ORIENTED) {
833         a->roworiented = 1;
834         MatSetOption(a->A,op);
835         MatSetOption(a->B,op);
836   } else if (op == MAT_ROWS_SORTED ||
837              op == MAT_ROWS_UNSORTED ||
838              op == MAT_SYMMETRIC ||
839              op == MAT_STRUCTURALLY_SYMMETRIC ||
840              op == MAT_YES_NEW_DIAGONALS)
841     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
842   else if (op == MAT_COLUMN_ORIENTED) {
843     a->roworiented = 0;
844     MatSetOption(a->A,op);
845     MatSetOption(a->B,op);
846   } else if (op == MAT_IGNORE_OFF_PROCESSOR_ENTRIES) {
847     a->donotstash = 1;
848   } else if (op == MAT_NO_NEW_DIAGONALS)
849     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:MAT_NO_NEW_DIAGONALS");}
850   else
851     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:unknown option");}
852   return 0;
853 }
854 
855 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
856 {
857   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
858   Mat_SeqBAIJ *Aloc;
859   Mat        B;
860   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
861   int        bs=baij->bs,mbs=baij->mbs;
862   Scalar     *a;
863 
864   if (matout == PETSC_NULL && M != N)
865     SETERRQ(1,"MatTranspose_MPIBAIJ:Square matrix only for in-place");
866   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
867   CHKERRQ(ierr);
868 
869   /* copy over the A part */
870   Aloc = (Mat_SeqBAIJ*) baij->A->data;
871   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
872   row = baij->rstart;
873   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
874 
875   for ( i=0; i<mbs; i++ ) {
876     rvals[0] = bs*(baij->rstart + i);
877     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
878     for ( j=ai[i]; j<ai[i+1]; j++ ) {
879       col = (baij->cstart+aj[j])*bs;
880       for (k=0; k<bs; k++ ) {
881         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
882         col++; a += bs;
883       }
884     }
885   }
886   /* copy over the B part */
887   Aloc = (Mat_SeqBAIJ*) baij->B->data;
888   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
889   row = baij->rstart*bs;
890   for ( i=0; i<mbs; i++ ) {
891     rvals[0] = bs*(baij->rstart + i);
892     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
893     for ( j=ai[i]; j<ai[i+1]; j++ ) {
894       col = baij->garray[aj[j]]*bs;
895       for (k=0; k<bs; k++ ) {
896         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
897         col++; a += bs;
898       }
899     }
900   }
901   PetscFree(rvals);
902   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
903   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
904 
905   if (matout != PETSC_NULL) {
906     *matout = B;
907   } else {
908     /* This isn't really an in-place transpose, but we free the data structures from baij */
909     PetscFree(baij->rowners);
910     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
911     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
912     if (baij->colmap) PetscFree(baij->colmap);
913     if (baij->garray) PetscFree(baij->garray);
914     if (baij->lvec) VecDestroy(baij->lvec);
915     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
916     PetscFree(baij);
917     PetscMemcpy(A,B,sizeof(struct _Mat));
918     PetscHeaderDestroy(B);
919   }
920   return 0;
921 }
922 
923 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
924 {
925   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
926   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
927   int ierr,s1,s2,s3;
928 
929   if (ll)  {
930     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
931     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
932     if (s1!=s2) SETERRQ(1,"MatDiagonalScale_MPIBAIJ: non-conforming local sizes");
933     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
934     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
935   }
936   if (rr) SETERRQ(1,"MatDiagonalScale_MPIBAIJ:not supported for right vector");
937   return 0;
938 }
939 
940 /* the code does not do the diagonal entries correctly unless the
941    matrix is square and the column and row ownerships are identical.
942    This is a BUG. The only way to fix it seems to be to access
943    baij->A and baij->B directly and not through the MatZeroRows()
944    routine.
945 */
946 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
947 {
948   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
949   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
950   int            *procs,*nprocs,j,found,idx,nsends,*work;
951   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
952   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
953   int            *lens,imdex,*lrows,*values,bs=l->bs;
954   MPI_Comm       comm = A->comm;
955   MPI_Request    *send_waits,*recv_waits;
956   MPI_Status     recv_status,*send_status;
957   IS             istmp;
958 
959   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
960   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
961 
962   /*  first count number of contributors to each processor */
963   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
964   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
965   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
966   for ( i=0; i<N; i++ ) {
967     idx = rows[i];
968     found = 0;
969     for ( j=0; j<size; j++ ) {
970       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
971         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
972       }
973     }
974     if (!found) SETERRQ(1,"MatZeroRows_MPIBAIJ:Index out of range");
975   }
976   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
977 
978   /* inform other processors of number of messages and max length*/
979   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
980   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
981   nrecvs = work[rank];
982   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
983   nmax = work[rank];
984   PetscFree(work);
985 
986   /* post receives:   */
987   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
988   CHKPTRQ(rvalues);
989   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
990   CHKPTRQ(recv_waits);
991   for ( i=0; i<nrecvs; i++ ) {
992     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
993   }
994 
995   /* do sends:
996       1) starts[i] gives the starting index in svalues for stuff going to
997          the ith processor
998   */
999   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1000   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
1001   CHKPTRQ(send_waits);
1002   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1003   starts[0] = 0;
1004   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1005   for ( i=0; i<N; i++ ) {
1006     svalues[starts[owner[i]]++] = rows[i];
1007   }
1008   ISRestoreIndices(is,&rows);
1009 
1010   starts[0] = 0;
1011   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1012   count = 0;
1013   for ( i=0; i<size; i++ ) {
1014     if (procs[i]) {
1015       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
1016     }
1017   }
1018   PetscFree(starts);
1019 
1020   base = owners[rank]*bs;
1021 
1022   /*  wait on receives */
1023   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1024   source = lens + nrecvs;
1025   count  = nrecvs; slen = 0;
1026   while (count) {
1027     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
1028     /* unpack receives into our local space */
1029     MPI_Get_count(&recv_status,MPI_INT,&n);
1030     source[imdex]  = recv_status.MPI_SOURCE;
1031     lens[imdex]  = n;
1032     slen += n;
1033     count--;
1034   }
1035   PetscFree(recv_waits);
1036 
1037   /* move the data into the send scatter */
1038   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1039   count = 0;
1040   for ( i=0; i<nrecvs; i++ ) {
1041     values = rvalues + i*nmax;
1042     for ( j=0; j<lens[i]; j++ ) {
1043       lrows[count++] = values[j] - base;
1044     }
1045   }
1046   PetscFree(rvalues); PetscFree(lens);
1047   PetscFree(owner); PetscFree(nprocs);
1048 
1049   /* actually zap the local rows */
1050   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1051   PLogObjectParent(A,istmp);
1052   PetscFree(lrows);
1053   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1054   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1055   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1056 
1057   /* wait on sends */
1058   if (nsends) {
1059     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1060     CHKPTRQ(send_status);
1061     MPI_Waitall(nsends,send_waits,send_status);
1062     PetscFree(send_status);
1063   }
1064   PetscFree(send_waits); PetscFree(svalues);
1065 
1066   return 0;
1067 }
1068 extern int MatPrintHelp_SeqBAIJ(Mat);
1069 static int MatPrintHelp_MPIBAIJ(Mat A)
1070 {
1071   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1072 
1073   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1074   else return 0;
1075 }
1076 
1077 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1078 
1079 /* -------------------------------------------------------------------*/
1080 static struct _MatOps MatOps = {
1081   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1082   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1083   0,0,0,0,
1084   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1085   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1086   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1087   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1088   0,0,0,MatGetSize_MPIBAIJ,
1089   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1090   0,0,0,MatConvertSameType_MPIBAIJ,0,0,
1091   0,0,0,MatGetSubMatrices_MPIBAIJ,
1092   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1093   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ};
1094 
1095 
1096 /*@C
1097    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1098    (block compressed row).  For good matrix assembly performance
1099    the user should preallocate the matrix storage by setting the parameters
1100    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1101    performance can be increased by more than a factor of 50.
1102 
1103    Input Parameters:
1104 .  comm - MPI communicator
105 .  bs   - size of block
1106 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1107            This value should be the same as the local size used in creating the
1108            y vector for the matrix-vector product y = Ax.
1109 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1110            This value should be the same as the local size used in creating the
1111            x vector for the matrix-vector product y = Ax.
1112 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1113 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1114 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1115            submatrix  (same for all local rows)
116 .  d_nnz - array containing the number of block nonzeros in the various block rows
117            of the diagonal portion of the local submatrix (possibly different for each
118            block row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1119            it is zero.
1120 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1121            submatrix (same for all local rows).
122 .  o_nnz - array containing the number of block nonzeros in the various block rows of
123            the off-diagonal portion of the local submatrix (possibly different for
1124            each block row) or PETSC_NULL.
1125 
1126    Output Parameter:
1127 .  A - the matrix
1128 
1129    Notes:
1130    The user MUST specify either the local or global matrix dimensions
1131    (possibly both).
1132 
1133    Storage Information:
1134    For a square global matrix we define each processor's diagonal portion
1135    to be its local rows and the corresponding columns (a square submatrix);
1136    each processor's off-diagonal portion encompasses the remainder of the
1137    local matrix (a rectangular submatrix).
1138 
1139    The user can specify preallocated storage for the diagonal part of
1140    the local submatrix with either d_nz or d_nnz (not both).  Set
1141    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1142    memory allocation.  Likewise, specify preallocated storage for the
1143    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1144 
1145    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1146    the figure below we depict these three local rows and all columns (0-11).
1147 
1148 $          0 1 2 3 4 5 6 7 8 9 10 11
1149 $         -------------------
1150 $  row 3  |  o o o d d d o o o o o o
1151 $  row 4  |  o o o d d d o o o o o o
1152 $  row 5  |  o o o d d d o o o o o o
1153 $         -------------------
1154 $
1155 
1156    Thus, any entries in the d locations are stored in the d (diagonal)
1157    submatrix, and any entries in the o locations are stored in the
1158    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1159    stored simply in the MATSEQBAIJ format for compressed row storage.
1160 
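   For instance, if every entry depicted above were nonzero and bs were 1,
   an exact preallocation for this processor would be d_nz = 3 and o_nz = 9
   (equivalently, d_nnz = {3,3,3} and o_nnz = {9,9,9}); this example is only
   illustrative.
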
1161    Now d_nz should indicate the number of block nonzeros per block row in the d matrix,
1162    and o_nz should indicate the number of block nonzeros per block row in the o matrix.
1163    In general, for PDE problems in which most nonzeros are near the diagonal,
1164    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1165    or you will get TERRIBLE performance; see the users' manual chapter on
1166    matrices.
1167 
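   Example usage (an illustrative sketch added here; the sizes and
   preallocation values are invented, not taken from a PETSc example):
$      Mat A;
$      int ierr, bs = 2, m = 4, n = 4;    /* local sizes, multiples of bs */
$      ierr = MatCreateMPIBAIJ(MPI_COMM_WORLD,bs,m,n,PETSC_DECIDE,PETSC_DECIDE,
$                              5,PETSC_NULL,2,PETSC_NULL,&A); CHKERRQ(ierr);
$      /* ... MatSetValues(), MatAssemblyBegin(), MatAssemblyEnd() ... */
$      ierr = MatDestroy(A); CHKERRQ(ierr);
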
1168 .keywords: matrix, block, aij, compressed row, sparse, parallel
1169 
1170 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1171 @*/
1172 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1173                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1174 {
1175   Mat          B;
1176   Mat_MPIBAIJ  *b;
1177   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1178 
1179   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
1180   *A = 0;
1181   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1182   PLogObjectCreate(B);
1183   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1184   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1185   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1186   MPI_Comm_size(comm,&size);
1187   if (size == 1) {
1188     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1189     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1190     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1191     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1192     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1193     B->ops.solve             = MatSolve_MPIBAIJ;
1194     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1195     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1196     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1197     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1198   }
1199 
1200   B->destroy    = MatDestroy_MPIBAIJ;
1201   B->view       = MatView_MPIBAIJ;
1202   B->mapping    = 0;
1203   B->factor     = 0;
1204   B->assembled  = PETSC_FALSE;
1205 
1206   b->insertmode = NOT_SET_VALUES;
1207   MPI_Comm_rank(comm,&b->rank);
1208   MPI_Comm_size(comm,&b->size);
1209 
1210   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1211     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1212   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
1213   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
1214   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1215   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1216 
1217   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1218     work[0] = m; work[1] = n;
1219     mbs = m/bs; nbs = n/bs;
1220     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1221     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1222     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1223   }
1224   if (m == PETSC_DECIDE) {
1225     Mbs = M/bs;
1226     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
1227     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1228     m   = mbs*bs;
1229   }
1230   if (n == PETSC_DECIDE) {
1231     Nbs = N/bs;
1232     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
1233     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1234     n   = nbs*bs;
1235   }
1236   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
1237 
1238   b->m = m; B->m = m;
1239   b->n = n; B->n = n;
1240   b->N = N; B->N = N;
1241   b->M = M; B->M = M;
1242   b->bs  = bs;
1243   b->bs2 = bs*bs;
1244   b->mbs = mbs;
1245   b->nbs = nbs;
1246   b->Mbs = Mbs;
1247   b->Nbs = Nbs;
1248 
1249   /* build local table of row and column ownerships */
1250   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1251   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1252   b->cowners = b->rowners + b->size + 2;
1253   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1254   b->rowners[0] = 0;
1255   for ( i=2; i<=b->size; i++ ) {
1256     b->rowners[i] += b->rowners[i-1];
1257   }
1258   b->rstart = b->rowners[b->rank];
1259   b->rend   = b->rowners[b->rank+1];
1260   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1261   b->cowners[0] = 0;
1262   for ( i=2; i<=b->size; i++ ) {
1263     b->cowners[i] += b->cowners[i-1];
1264   }
1265   b->cstart = b->cowners[b->rank];
1266   b->cend   = b->cowners[b->rank+1];
1267 
1268   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1269   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1270   PLogObjectParent(B,b->A);
1271   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1272   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1273   PLogObjectParent(B,b->B);
1274 
1275   /* build cache for off array entries formed */
1276   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1277   b->donotstash  = 0;
1278   b->colmap      = 0;
1279   b->garray      = 0;
1280   b->roworiented = 1;
1281 
1282   /* stuff used for matrix vector multiply */
1283   b->lvec      = 0;
1284   b->Mvctx     = 0;
1285 
1286   /* stuff for MatGetRow() */
1287   b->rowindices   = 0;
1288   b->rowvalues    = 0;
1289   b->getrowactive = PETSC_FALSE;
1290 
1291   *A = B;
1292   return 0;
1293 }
1294 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1295 {
1296   Mat        mat;
1297   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1298   int        ierr, len=0, flg;
1299 
1300   *newmat       = 0;
1301   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1302   PLogObjectCreate(mat);
1303   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1304   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1305   mat->destroy    = MatDestroy_MPIBAIJ;
1306   mat->view       = MatView_MPIBAIJ;
1307   mat->factor     = matin->factor;
1308   mat->assembled  = PETSC_TRUE;
1309 
1310   a->m = mat->m   = oldmat->m;
1311   a->n = mat->n   = oldmat->n;
1312   a->M = mat->M   = oldmat->M;
1313   a->N = mat->N   = oldmat->N;
1314 
1315   a->bs  = oldmat->bs;
1316   a->bs2 = oldmat->bs2;
1317   a->mbs = oldmat->mbs;
1318   a->nbs = oldmat->nbs;
1319   a->Mbs = oldmat->Mbs;
1320   a->Nbs = oldmat->Nbs;
1321 
1322   a->rstart       = oldmat->rstart;
1323   a->rend         = oldmat->rend;
1324   a->cstart       = oldmat->cstart;
1325   a->cend         = oldmat->cend;
1326   a->size         = oldmat->size;
1327   a->rank         = oldmat->rank;
1328   a->insertmode   = NOT_SET_VALUES;
1329   a->rowvalues    = 0;
1330   a->getrowactive = PETSC_FALSE;
1331 
1332   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1333   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1334   a->cowners = a->rowners + a->size + 2;
1335   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1336   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1337   if (oldmat->colmap) {
1338     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1339     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1340     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1341   } else a->colmap = 0;
1342   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1343     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1344     PLogObjectMemory(mat,len*sizeof(int));
1345     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1346   } else a->garray = 0;
1347 
1348   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1349   PLogObjectParent(mat,a->lvec);
1350   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1351   PLogObjectParent(mat,a->Mvctx);
1352   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1353   PLogObjectParent(mat,a->A);
1354   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1355   PLogObjectParent(mat,a->B);
1356   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1357   if (flg) {
1358     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1359   }
1360   *newmat = mat;
1361   return 0;
1362 }
1363 
1364 #include "sys.h"
1365 
1366 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1367 {
1368   Mat          A;
1369   int          i, nz, ierr, j,rstart, rend, fd;
1370   Scalar       *vals,*buf;
1371   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1372   MPI_Status   status;
1373   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1374   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1375   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1376   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1377   int          dcount,kmax,k,nzcount,tmp;
1378 
1379 
1380   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1381   bs2  = bs*bs;
1382 
1383   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1384   if (!rank) {
1385     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1386     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1387     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
1388   }
1389 
1390   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1391   M = header[1]; N = header[2];
1392 
1393   if (M != N) SETERRQ(1,"MatLoad_MPIBAIJ:Can only do square matrices");
1394 
1395   /*
1396      This code adds extra rows to make sure the number of rows is
1397      divisible by the blocksize
1398   */
1399   Mbs        = M/bs;
1400   extra_rows = bs - M + bs*(Mbs);
1401   if (extra_rows == bs) extra_rows = 0;
1402   else                  Mbs++;
1403   if (extra_rows &&!rank) {
1404     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1405   }
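  /*
     Added worked example: for M = 10 and bs = 4 the code above gives
     Mbs = 10/4 = 2 and extra_rows = 4 - 10 + 4*2 = 2; since extra_rows != bs,
     Mbs is incremented to 3, so the padded matrix has 3*4 = 12 rows, the last
     2 of which are padding rows (given a unit diagonal further below).
  */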
1406 
1407   /* determine ownership of all rows */
1408   mbs = Mbs/size + ((Mbs % size) > rank);
1409   m   = mbs * bs;
1410   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1411   browners = rowners + size + 1;
1412   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1413   rowners[0] = 0;
1414   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1415   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1416   rstart = rowners[rank];
1417   rend   = rowners[rank+1];
1418 
1419   /* distribute row lengths to all processors */
1420   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1421   if (!rank) {
1422     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1423     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1424     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1425     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1426     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1427     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1428     PetscFree(sndcounts);
1429   }
1430   else {
1431     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1432   }
1433 
1434   if (!rank) {
1435     /* calculate the number of nonzeros on each processor */
1436     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1437     PetscMemzero(procsnz,size*sizeof(int));
1438     for ( i=0; i<size; i++ ) {
1439       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1440         procsnz[i] += rowlengths[j];
1441       }
1442     }
1443     PetscFree(rowlengths);
1444 
1445     /* determine max buffer needed and allocate it */
1446     maxnz = 0;
1447     for ( i=0; i<size; i++ ) {
1448       maxnz = PetscMax(maxnz,procsnz[i]);
1449     }
1450     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1451 
1452     /* read in my part of the matrix column indices  */
1453     nz = procsnz[0];
1454     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1455     mycols = ibuf;
1456     if (size == 1)  nz -= extra_rows;
1457     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1458     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1459 
1460     /* read in everyone else's (except the last processor's) and ship them off */
1461     for ( i=1; i<size-1; i++ ) {
1462       nz = procsnz[i];
1463       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1464       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1465     }
1466     /* read in the stuff for the last proc */
1467     if ( size != 1 ) {
1468       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1469       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1470       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1471       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1472     }
1473     PetscFree(cols);
1474   }
1475   else {
1476     /* determine buffer space needed for message */
1477     nz = 0;
1478     for ( i=0; i<m; i++ ) {
1479       nz += locrowlens[i];
1480     }
1481     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1482     mycols = ibuf;
1483     /* receive message of column indices*/
1484     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1485     MPI_Get_count(&status,MPI_INT,&maxnz);
1486     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with the file");
1487   }
1488 
1489   /* loop over local rows, determining number of off diagonal entries */
1490   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1491   odlens = dlens + (rend-rstart);
1492   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1493   PetscMemzero(mask,3*Mbs*sizeof(int));
1494   masked1 = mask    + Mbs;
1495   masked2 = masked1 + Mbs;
1496   rowcount = 0; nzcount = 0;
1497   for ( i=0; i<mbs; i++ ) {
1498     dcount  = 0;
1499     odcount = 0;
1500     for ( j=0; j<bs; j++ ) {
1501       kmax = locrowlens[rowcount];
1502       for ( k=0; k<kmax; k++ ) {
1503         tmp = mycols[nzcount++]/bs;
1504         if (!mask[tmp]) {
1505           mask[tmp] = 1;
1506           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1507           else masked1[dcount++] = tmp;
1508         }
1509       }
1510       rowcount++;
1511     }
1512 
1513     dlens[i]  = dcount;
1514     odlens[i] = odcount;
1515 
1516     /* zero out the mask elements we set */
1517     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1518     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1519   }
1520 
1521   /* create our matrix */
1522   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1523          CHKERRQ(ierr);
1524   A = *newmat;
1525   MatSetOption(A,MAT_COLUMNS_SORTED);
1526 
1527   if (!rank) {
1528     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1529     /* read in my part of the matrix numerical values  */
1530     nz = procsnz[0];
1531     vals = buf;
1532     mycols = ibuf;
1533     if (size == 1)  nz -= extra_rows;
1534     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1535     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1536 
1537     /* insert into matrix */
1538     jj      = rstart*bs;
1539     for ( i=0; i<m; i++ ) {
1540       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1541       mycols += locrowlens[i];
1542       vals   += locrowlens[i];
1543       jj++;
1544     }
1545     /* read in other processors (except the last one) and ship out */
1546     for ( i=1; i<size-1; i++ ) {
1547       nz = procsnz[i];
1548       vals = buf;
1549       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1550       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1551     }
1552     /* the last proc */
1553     if ( size != 1 ){
1554       nz = procsnz[i] - extra_rows;
1555       vals = buf;
1556       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1557       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1558       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1559     }
1560     PetscFree(procsnz);
1561   }
1562   else {
1563     /* receive numeric values */
1564     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1565 
1566     /* receive message of values*/
1567     vals = buf;
1568     mycols = ibuf;
1569     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1570     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1571     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with the file");
1572 
1573     /* insert into matrix */
1574     jj      = rstart*bs;
1575     for ( i=0; i<m; i++ ) {
1576       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1577       mycols += locrowlens[i];
1578       vals   += locrowlens[i];
1579       jj++;
1580     }
1581   }
1582   PetscFree(locrowlens);
1583   PetscFree(buf);
1584   PetscFree(ibuf);
1585   PetscFree(rowners);
1586   PetscFree(dlens);
1587   PetscFree(mask);
1588   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1589   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1590   return 0;
1591 }
1592 
1593 
1594