xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 90f02eec332fcca4c33b4e7b21cabed76bf0614e)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.29 1996/11/13 21:28:41 bsmith Exp bsmith $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 
25 /*
26      Local utility routine that creates a mapping from the global column
27    number to the local number in the off-diagonal part of the local
28    storage of the matrix.  This is done in a non-scalable way since the
29    length of colmap equals the global matrix length.
30 */
31 static int CreateColmap_MPIBAIJ_Private(Mat mat)
32 {
33   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
34   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
35   int         nbs = B->nbs,i;
36 
37   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
38   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
39   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
40   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i + 1;
41   return 0;
42 }
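/*
   Illustrative sketch (hedged; the numbers are made up): with block size bs and
   off-diagonal block columns garray = {2,7,9}, the loop above yields
   colmap[2] = 1, colmap[7] = 2, colmap[9] = 3 and zero everywhere else.  A
   global point column gc is then translated to a local point column in B by

       lc = (baij->colmap[gc/bs] - 1)*bs + gc%bs;

   whenever colmap[gc/bs] is nonzero, as done in MatSetValues_MPIBAIJ() and
   MatGetValues_MPIBAIJ() below.
*/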
43 
44 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
45                             PetscTruth *done)
46 {
47   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
48   int         ierr;
49   if (aij->size == 1) {
50     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
51   } else SETERRQ(1,"MatGetRowIJ_MPIBAIJ:not supported in parallel");
52   return 0;
53 }
54 
55 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
56                                 PetscTruth *done)
57 {
58   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
59   int        ierr;
60   if (aij->size == 1) {
61     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
62   } else SETERRQ(1,"MatRestoreRowIJ_MPIBAIJ:not supported in parallel");
63   return 0;
64 }
65 
66 extern int MatSetValues_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
67 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
68 {
69   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
70   Scalar      value;
71   int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
72   int         cstart = baij->cstart, cend = baij->cend,row,col;
73   int         roworiented = baij->roworiented,rstart_orig,rend_orig;
74   int         cstart_orig,cend_orig,bs=baij->bs;
75 
76   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
77     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
78   }
79   baij->insertmode = addv;
80   rstart_orig = rstart*bs;
81   rend_orig   = rend*bs;
82   cstart_orig = cstart*bs;
83   cend_orig   = cend*bs;
84   for ( i=0; i<m; i++ ) {
85 #if defined(PETSC_BOPT_g)
86     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
87     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
88 #endif
89     if (im[i] >= rstart_orig && im[i] < rend_orig) {
90       row = im[i] - rstart_orig;
91       for ( j=0; j<n; j++ ) {
92         if (in[j] >= cstart_orig && in[j] < cend_orig){
93           col = in[j] - cstart_orig;
94           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
95           ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
96         }
97 #if defined(PETSC_BOPT_g)
98         else if (in[j] < 0) {SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");}
99         else if (in[j] >= baij->N) {SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");}
100 #endif
101         else {
102           if (mat->was_assembled) {
103             if (!baij->colmap) {
104               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
105             }
106             col = (baij->colmap[in[j]/bs] - 1)*bs + in[j]%bs;
107             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
108               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
109               col =  in[j];
110             }
111           }
112           else col = in[j];
113           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
114           ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
115         }
116       }
117     }
118     else {
119       if (roworiented && !baij->donotstash) {
120         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
121       }
122       else {
123         if (!baij->donotstash) {
124           row = im[i];
125 	  for ( j=0; j<n; j++ ) {
126 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
127           }
128         }
129       }
130     }
131   }
132   return 0;
133 }
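/*
   Caller-side sketch (hedged; the row/column numbers are illustrative only):
   values are always passed with global point indices, and entries whose rows
   belong to another processor are stashed above and communicated at assembly
   time.

       int    row = 5, cols[2] = {4, 11};
       Scalar vals[2] = {1.0, 2.0};
       ierr = MatSetValues(A,1,&row,2,cols,vals,ADD_VALUES); CHKERRQ(ierr);
       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
*/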
134 
135 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
136 {
137   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
138   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
139   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
140 
141   for ( i=0; i<m; i++ ) {
142     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
143     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
144     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
145       row = idxm[i] - bsrstart;
146       for ( j=0; j<n; j++ ) {
147         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
148         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
149         if (idxn[j] >= bscstart && idxn[j] < bscend){
150           col = idxn[j] - bscstart;
151           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
152         }
153         else {
154           if (!baij->colmap) {
155             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
156           }
157           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
158              (baij->garray[baij->colmap[idxn[j]/bs]-1] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
159           else {
160             col  = (baij->colmap[idxn[j]/bs]-1)*bs + idxn[j]%bs;
161             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
162           }
163         }
164       }
165     }
166     else {
167       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
168     }
169   }
170   return 0;
171 }
172 
173 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
174 {
175   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
176   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
177   int        ierr, i,bs2=baij->bs2;
178   double     sum = 0.0;
179   Scalar     *v;
180 
181   if (baij->size == 1) {
182     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
183   } else {
184     if (type == NORM_FROBENIUS) {
185       v = amat->a;
186       for (i=0; i<amat->nz*bs2; i++ ) {
187 #if defined(PETSC_COMPLEX)
188         sum += real(conj(*v)*(*v)); v++;
189 #else
190         sum += (*v)*(*v); v++;
191 #endif
192       }
193       v = bmat->a;
194       for (i=0; i<bmat->nz*bs2; i++ ) {
195 #if defined(PETSC_COMPLEX)
196         sum += real(conj(*v)*(*v)); v++;
197 #else
198         sum += (*v)*(*v); v++;
199 #endif
200       }
201       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
202       *norm = sqrt(*norm);
203     }
204     else
205       SETERRQ(1,"MatNorm_MPIBAIJ:No support for this norm yet");
206   }
207   return 0;
208 }
209 
210 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
211 {
212   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
213   MPI_Comm    comm = mat->comm;
214   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
215   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
216   MPI_Request *send_waits,*recv_waits;
217   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
218   InsertMode  addv;
219   Scalar      *rvalues,*svalues;
220 
221   /* make sure all processors are using either INSERT_VALUES or ADD_VALUES */
222   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
223   if (addv == (ADD_VALUES|INSERT_VALUES)) {
224     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
225   }
226   baij->insertmode = addv; /* in case this processor had no cache */
227 
228   /*  first count number of contributors to each processor */
229   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
230   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
231   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
232   for ( i=0; i<baij->stash.n; i++ ) {
233     idx = baij->stash.idx[i];
234     for ( j=0; j<size; j++ ) {
235       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
236         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
237       }
238     }
239   }
240   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
241 
242   /* inform other processors of number of messages and max length*/
243   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
244   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
245   nreceives = work[rank];
246   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
247   nmax = work[rank];
248   PetscFree(work);
249 
250   /* post receives:
251        1) each message will consist of triples
252      (global row index,global column index,value); we store the global indices
253      as Scalars (doubles) to simplify the message passing.
254        2) since we don't know how long each individual message is we
255      allocate the largest needed buffer for each receive. Potentially
256      this is a lot of wasted space.
257 
258 
259        This could be done better.
260   */
261   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
262   CHKPTRQ(rvalues);
263   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
264   CHKPTRQ(recv_waits);
265   for ( i=0; i<nreceives; i++ ) {
266     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
267               comm,recv_waits+i);
268   }
269 
270   /* do sends:
271       1) starts[i] gives the starting index in svalues for stuff going to
272          the ith processor
273   */
274   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
275   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
276   CHKPTRQ(send_waits);
277   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
278   starts[0] = 0;
279   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
280   for ( i=0; i<baij->stash.n; i++ ) {
281     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
282     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
283     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
284   }
285   PetscFree(owner);
286   starts[0] = 0;
287   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
288   count = 0;
289   for ( i=0; i<size; i++ ) {
290     if (procs[i]) {
291       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
292                 comm,send_waits+count++);
293     }
294   }
295   PetscFree(starts); PetscFree(nprocs);
296 
297   /* Free cache space */
298   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
299   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
300 
301   baij->svalues    = svalues;    baij->rvalues    = rvalues;
302   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
303   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
304   baij->rmax       = nmax;
305 
306   return 0;
307 }
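/*
   Sketch of the message layout produced above (numbers illustrative only): a
   stashed entry (global row 17, global column 42, value 3.0) destined for the
   owner of row 17 occupies three consecutive Scalars of svalues,

       svalues[3*k]   = 17.0;    global row index
       svalues[3*k+1] = 42.0;    global column index
       svalues[3*k+2] = 3.0;     the value itself

   and is decoded back into (row,col,value) in MatAssemblyEnd_MPIBAIJ().
*/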
308 
309 
310 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
311 {
312   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
313   MPI_Status  *send_status,recv_status;
314   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
315   int         bs=baij->bs,row,col,other_disassembled;
316   Scalar      *values,val;
317   InsertMode  addv = baij->insertmode;
318 
319   /*  wait on receives */
320   while (count) {
321     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
322     /* unpack receives into our local space */
323     values = baij->rvalues + 3*imdex*baij->rmax;
324     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
325     n = n/3;
326     for ( i=0; i<n; i++ ) {
327       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
328       col = (int) PetscReal(values[3*i+1]);
329       val = values[3*i+2];
330       if (col >= baij->cstart*bs && col < baij->cend*bs) {
331         col -= baij->cstart*bs;
332         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
333       }
334       else {
335         if (mat->was_assembled) {
336           if (!baij->colmap) {
337             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
338           }
339           col = (baij->colmap[col/bs]-1)*bs + col%bs;
340           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
341             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
342             col = (int) PetscReal(values[3*i+1]);
343           }
344         }
345         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
346       }
347     }
348     count--;
349   }
350   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
351 
352   /* wait on sends */
353   if (baij->nsends) {
354     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
355     CHKPTRQ(send_status);
356     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
357     PetscFree(send_status);
358   }
359   PetscFree(baij->send_waits); PetscFree(baij->svalues);
360 
361   baij->insertmode = NOT_SET_VALUES;
362   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
363   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
364 
365   /* determine if any processor has disassembled, if so we must
366      also disassemble ourselves, in order that we may reassemble. */
367   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
368   if (mat->was_assembled && !other_disassembled) {
369     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
370   }
371 
372   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
373     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
374   }
375   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
376   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
377 
378   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
379   return 0;
380 }
381 
382 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
383 {
384   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
385   int          ierr;
386 
387   if (baij->size == 1) {
388     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
389   }
390   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
391   return 0;
392 }
393 
394 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
395 {
396   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
397   int          ierr, format,rank,bs = baij->bs;
398   FILE         *fd;
399   ViewerType   vtype;
400 
401   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
402   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
403     ierr = ViewerGetFormat(viewer,&format);
404     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
405       MatInfo info;
406       MPI_Comm_rank(mat->comm,&rank);
407       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
408       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
409       PetscSequentialPhaseBegin(mat->comm,1);
410       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
411               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
412               baij->bs,(int)info.memory);
413       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
414       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
415       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
416       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
417       fflush(fd);
418       PetscSequentialPhaseEnd(mat->comm,1);
419       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
420       return 0;
421     }
422     else if (format == VIEWER_FORMAT_ASCII_INFO) {
423       PetscPrintf(mat->comm,"  block size is %d\n",bs);
424       return 0;
425     }
426   }
427 
428   if (vtype == DRAW_VIEWER) {
429     Draw       draw;
430     PetscTruth isnull;
431     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
432     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
433   }
434 
435   if (vtype == ASCII_FILE_VIEWER) {
436     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
437     PetscSequentialPhaseBegin(mat->comm,1);
438     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
439            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
440             baij->cstart*bs,baij->cend*bs);
441     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
442     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
443     fflush(fd);
444     PetscSequentialPhaseEnd(mat->comm,1);
445   }
446   else {
447     int size = baij->size;
448     rank = baij->rank;
449     if (size == 1) {
450       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
451     }
452     else {
453       /* assemble the entire matrix onto first processor. */
454       Mat         A;
455       Mat_SeqBAIJ *Aloc;
456       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
457       int         mbs=baij->mbs;
458       Scalar      *a;
459 
460       if (!rank) {
461         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
462         CHKERRQ(ierr);
463       }
464       else {
465         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
466         CHKERRQ(ierr);
467       }
468       PLogObjectParent(mat,A);
469 
470       /* copy over the A part */
471       Aloc = (Mat_SeqBAIJ*) baij->A->data;
472       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
473       row = baij->rstart;
474       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
475 
476       for ( i=0; i<mbs; i++ ) {
477         rvals[0] = bs*(baij->rstart + i);
478         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
479         for ( j=ai[i]; j<ai[i+1]; j++ ) {
480           col = (baij->cstart+aj[j])*bs;
481           for (k=0; k<bs; k++ ) {
482             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
483             col++; a += bs;
484           }
485         }
486       }
487       /* copy over the B part */
488       Aloc = (Mat_SeqBAIJ*) baij->B->data;
489       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
490       row = baij->rstart*bs;
491       for ( i=0; i<mbs; i++ ) {
492         rvals[0] = bs*(baij->rstart + i);
493         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
494         for ( j=ai[i]; j<ai[i+1]; j++ ) {
495           col = baij->garray[aj[j]]*bs;
496           for (k=0; k<bs; k++ ) {
497             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
498             col++; a += bs;
499           }
500         }
501       }
502       PetscFree(rvals);
503       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
504       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
505       if (!rank) {
506         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
507       }
508       ierr = MatDestroy(A); CHKERRQ(ierr);
509     }
510   }
511   return 0;
512 }
513 
514 
515 
516 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
517 {
518   Mat         mat = (Mat) obj;
519   int         ierr;
520   ViewerType  vtype;
521 
522   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
523   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
524       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
525     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
526   }
527   else if (vtype == BINARY_FILE_VIEWER) {
528     return MatView_MPIBAIJ_Binary(mat,viewer);
529   }
530   return 0;
531 }
532 
533 static int MatDestroy_MPIBAIJ(PetscObject obj)
534 {
535   Mat         mat = (Mat) obj;
536   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
537   int         ierr;
538 
539 #if defined(PETSC_LOG)
540   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
541 #endif
542 
543   PetscFree(baij->rowners);
544   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
545   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
546   if (baij->colmap) PetscFree(baij->colmap);
547   if (baij->garray) PetscFree(baij->garray);
548   if (baij->lvec)   VecDestroy(baij->lvec);
549   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
550   if (baij->rowvalues) PetscFree(baij->rowvalues);
551   PetscFree(baij);
552   if (mat->mapping) {
553     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
554   }
555   PLogObjectDestroy(mat);
556   PetscHeaderDestroy(mat);
557   return 0;
558 }
559 
560 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
561 {
562   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
563   int         ierr, nt;
564 
565   VecGetLocalSize_Fast(xx,nt);
566   if (nt != a->n) {
567     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and xx");
568   }
569   VecGetLocalSize_Fast(yy,nt);
570   if (nt != a->m) {
571     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and yy");
572   }
573   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
574   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
575   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
576   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
577   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
578   return 0;
579 }
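/*
   Caller-side sketch (hedged, names illustrative): the checks above require xx
   to be distributed like the matrix columns and yy like the matrix rows, e.g.
   created with the same local sizes m and n given to MatCreateMPIBAIJ().

       Vec x,y;
       ierr = VecCreateMPI(comm,n,PETSC_DECIDE,&x); CHKERRQ(ierr);
       ierr = VecCreateMPI(comm,m,PETSC_DECIDE,&y); CHKERRQ(ierr);
       ierr = MatMult(A,x,y); CHKERRQ(ierr);
*/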
580 
581 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
582 {
583   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
584   int        ierr;
585   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
586   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
587   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
588   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
589   return 0;
590 }
591 
592 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
593 {
594   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
595   int        ierr;
596 
597   /* do nondiagonal part */
598   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
599   /* send it on its way */
600   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
601   /* do local part */
602   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
603   /* receive remote parts: note this assumes the values are not actually */
604   /* inserted in yy until the next line, which is true for this implementation */
605   /* but may not always be true. */
606   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
607   return 0;
608 }
609 
610 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
611 {
612   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
613   int        ierr;
614 
615   /* do nondiagonal part */
616   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
617   /* send it on its way */
618   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
619   /* do local part */
620   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
621   /* receive remote parts: note this assumes the values are not actually */
622   /* inserted in zz until the next line, which is true for this implementation */
623   /* but may not always be true. */
624   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
625   return 0;
626 }
627 
628 /*
629   This only works correctly for square matrices where the subblock A->A is the
630    diagonal block
631 */
632 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
633 {
634   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
635   if (a->M != a->N)
636     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
637   return MatGetDiagonal(a->A,v);
638 }
639 
640 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
641 {
642   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
643   int        ierr;
644   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
645   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
646   return 0;
647 }
648 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
649 {
650   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
651   *m = mat->M; *n = mat->N;
652   return 0;
653 }
654 
655 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
656 {
657   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
658   *m = mat->m; *n = mat->N;
659   return 0;
660 }
661 
662 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
663 {
664   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
665   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
666   return 0;
667 }
668 
669 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
670 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
671 
672 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
673 {
674   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
675   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
676   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
677   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
678   int        *cmap, *idx_p,cstart = mat->cstart;
679 
680   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
681   mat->getrowactive = PETSC_TRUE;
682 
683   if (!mat->rowvalues && (idx || v)) {
684     /*
685         allocate enough space to hold information from the longest row.
686     */
687     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
688     int     max = 1,mbs = mat->mbs,tmp;
689     for ( i=0; i<mbs; i++ ) {
690       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
691       if (max < tmp) { max = tmp; }
692     }
693     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
694     CHKPTRQ(mat->rowvalues);
695     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
696   }
697 
698 
699   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows")
700   lrow = row - brstart;
701 
702   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
703   if (!v)   {pvA = 0; pvB = 0;}
704   if (!idx) {pcA = 0; if (!v) pcB = 0;}
705   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
706   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
707   nztot = nzA + nzB;
708 
709   cmap  = mat->garray;
710   if (v  || idx) {
711     if (nztot) {
712       /* Sort by increasing column numbers, assuming A and B already sorted */
713       int imark = -1;
714       if (v) {
715         *v = v_p = mat->rowvalues;
716         for ( i=0; i<nzB; i++ ) {
717           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
718           else break;
719         }
720         imark = i;
721         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
722         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
723       }
724       if (idx) {
725         *idx = idx_p = mat->rowindices;
726         if (imark > -1) {
727           for ( i=0; i<imark; i++ ) {
728             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
729           }
730         } else {
731           for ( i=0; i<nzB; i++ ) {
732             if (cmap[cworkB[i]/bs] < cstart)
733               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
734             else break;
735           }
736           imark = i;
737         }
738         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
739         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
740       }
741     }
742     else {
743       if (idx) *idx = 0;
744       if (v)   *v   = 0;
745     }
746   }
747   *nz = nztot;
748   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
749   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
750   return 0;
751 }
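/*
   Caller-side sketch (hedged, names illustrative): MatGetRow()/MatRestoreRow()
   may only be called for rows owned by this processor, and only one row may be
   active at a time (enforced through getrowactive above).

       int    rstart,rend,row,nz,*cols;
       Scalar *vals;
       ierr = MatGetOwnershipRange(A,&rstart,&rend); CHKERRQ(ierr);
       for ( row=rstart; row<rend; row++ ) {
         ierr = MatGetRow(A,row,&nz,&cols,&vals); CHKERRQ(ierr);
         ... examine the nz entries of this global row ...
         ierr = MatRestoreRow(A,row,&nz,&cols,&vals); CHKERRQ(ierr);
       }
*/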
752 
753 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
754 {
755   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
756   if (baij->getrowactive == PETSC_FALSE) {
757     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
758   }
759   baij->getrowactive = PETSC_FALSE;
760   return 0;
761 }
762 
763 static int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
764 {
765   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
766   *bs = baij->bs;
767   return 0;
768 }
769 
770 static int MatZeroEntries_MPIBAIJ(Mat A)
771 {
772   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
773   int         ierr;
774   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
775   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
776   return 0;
777 }
778 
779 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
780 {
781   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
782   Mat         A = a->A, B = a->B;
783   int         ierr;
784   double      isend[5], irecv[5];
785 
786   info->rows_global    = (double)a->M;
787   info->columns_global = (double)a->N;
788   info->rows_local     = (double)a->m;
789   info->columns_local  = (double)a->N;
790   info->block_size     = (double)a->bs;
791   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
792   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
793   isend[3] = info->memory; isend[4] = info->mallocs; ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
794   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded; isend[3] += info->memory; isend[4] += info->mallocs;
795   if (flag == MAT_LOCAL) {
796     info->nz_used      = isend[0];
797     info->nz_allocated = isend[1];
798     info->nz_unneeded  = isend[2];
799     info->memory       = isend[3];
800     info->mallocs      = isend[4];
801   } else if (flag == MAT_GLOBAL_MAX) {
802     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
803     info->nz_used      = irecv[0];
804     info->nz_allocated = irecv[1];
805     info->nz_unneeded  = irecv[2];
806     info->memory       = irecv[3];
807     info->mallocs      = irecv[4];
808   } else if (flag == MAT_GLOBAL_SUM) {
809     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
810     info->nz_used      = irecv[0];
811     info->nz_allocated = irecv[1];
812     info->nz_unneeded  = irecv[2];
813     info->memory       = irecv[3];
814     info->mallocs      = irecv[4];
815   }
816   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
817   info->fill_ratio_needed = 0;
818   info->factor_mallocs    = 0;
819   return 0;
820 }
821 
822 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
823 {
824   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
825 
826   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
827       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
828       op == MAT_COLUMNS_SORTED ||
829       op == MAT_ROW_ORIENTED) {
830         MatSetOption(a->A,op);
831         MatSetOption(a->B,op);
832   }
833   else if (op == MAT_ROWS_SORTED ||
834            op == MAT_SYMMETRIC ||
835            op == MAT_STRUCTURALLY_SYMMETRIC ||
836            op == MAT_YES_NEW_DIAGONALS)
837     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
838   else if (op == MAT_COLUMN_ORIENTED) {
839     a->roworiented = 0;
840     MatSetOption(a->A,op);
841     MatSetOption(a->B,op);
842   } else if (op == MAT_IGNORE_OFF_PROCESSOR_ENTRIES) {
843     a->donotstash = 1;
844   } else if (op == MAT_NO_NEW_DIAGONALS)
845     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:MAT_NO_NEW_DIAGONALS");}
846   else
847     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:unknown option");}
848   return 0;
849 }
850 
851 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
852 {
853   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
854   Mat_SeqBAIJ *Aloc;
855   Mat        B;
856   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
857   int        bs=baij->bs,mbs=baij->mbs;
858   Scalar     *a;
859 
860   if (matout == PETSC_NULL && M != N)
861     SETERRQ(1,"MatTranspose_MPIBAIJ:Square matrix only for in-place");
862   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
863   CHKERRQ(ierr);
864 
865   /* copy over the A part */
866   Aloc = (Mat_SeqBAIJ*) baij->A->data;
867   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
868   row = baij->rstart;
869   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
870 
871   for ( i=0; i<mbs; i++ ) {
872     rvals[0] = bs*(baij->rstart + i);
873     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
874     for ( j=ai[i]; j<ai[i+1]; j++ ) {
875       col = (baij->cstart+aj[j])*bs;
876       for (k=0; k<bs; k++ ) {
877         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
878         col++; a += bs;
879       }
880     }
881   }
882   /* copy over the B part */
883   Aloc = (Mat_SeqBAIJ*) baij->B->data;
884   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
885   row = baij->rstart*bs;
886   for ( i=0; i<mbs; i++ ) {
887     rvals[0] = bs*(baij->rstart + i);
888     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
889     for ( j=ai[i]; j<ai[i+1]; j++ ) {
890       col = baij->garray[aj[j]]*bs;
891       for (k=0; k<bs; k++ ) {
892         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
893         col++; a += bs;
894       }
895     }
896   }
897   PetscFree(rvals);
898   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
899   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
900 
901   if (matout != PETSC_NULL) {
902     *matout = B;
903   } else {
904     /* This isn't really an in-place transpose .... but free data structures from baij */
905     PetscFree(baij->rowners);
906     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
907     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
908     if (baij->colmap) PetscFree(baij->colmap);
909     if (baij->garray) PetscFree(baij->garray);
910     if (baij->lvec) VecDestroy(baij->lvec);
911     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
912     PetscFree(baij);
913     PetscMemcpy(A,B,sizeof(struct _Mat));
914     PetscHeaderDestroy(B);
915   }
916   return 0;
917 }
918 
919 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
920 {
921   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
922   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
923   int ierr,s1,s2,s3;
924 
925   if (ll)  {
926     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
927     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
928     if (s1!=s2) SETERRQ(1,"MatDiagonalScale_MPIBAIJ: non-conforming local sizes");
929     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
930     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
931   }
932   if (rr) SETERRQ(1,"MatDiagonalScale_MPIBAIJ:not supported for right vector");
933   return 0;
934 }
935 
936 /* the code does not do the diagonal entries correctly unless the
937    matrix is square and the column and row ownerships are identical.
938    This is a BUG. The only way to fix it seems to be to access
939    baij->A and baij->B directly and not through the MatZeroRows()
940    routine.
941 */
942 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
943 {
944   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
945   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
946   int            *procs,*nprocs,j,found,idx,nsends,*work;
947   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
948   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
949   int            *lens,imdex,*lrows,*values,bs=l->bs;
950   MPI_Comm       comm = A->comm;
951   MPI_Request    *send_waits,*recv_waits;
952   MPI_Status     recv_status,*send_status;
953   IS             istmp;
954 
955   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
956   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
957 
958   /*  first count number of contributors to each processor */
959   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
960   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
961   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
962   for ( i=0; i<N; i++ ) {
963     idx = rows[i];
964     found = 0;
965     for ( j=0; j<size; j++ ) {
966       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
967         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
968       }
969     }
970     if (!found) SETERRQ(1,"MatZeroRows_MPIBAIJ:Index out of range");
971   }
972   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
973 
974   /* inform other processors of number of messages and max length*/
975   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
976   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
977   nrecvs = work[rank];
978   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
979   nmax = work[rank];
980   PetscFree(work);
981 
982   /* post receives:   */
983   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
984   CHKPTRQ(rvalues);
985   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
986   CHKPTRQ(recv_waits);
987   for ( i=0; i<nrecvs; i++ ) {
988     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
989   }
990 
991   /* do sends:
992       1) starts[i] gives the starting index in svalues for stuff going to
993          the ith processor
994   */
995   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
996   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
997   CHKPTRQ(send_waits);
998   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
999   starts[0] = 0;
1000   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1001   for ( i=0; i<N; i++ ) {
1002     svalues[starts[owner[i]]++] = rows[i];
1003   }
1004   ISRestoreIndices(is,&rows);
1005 
1006   starts[0] = 0;
1007   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1008   count = 0;
1009   for ( i=0; i<size; i++ ) {
1010     if (procs[i]) {
1011       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
1012     }
1013   }
1014   PetscFree(starts);
1015 
1016   base = owners[rank]*bs;
1017 
1018   /*  wait on receives */
1019   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1020   source = lens + nrecvs;
1021   count  = nrecvs; slen = 0;
1022   while (count) {
1023     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
1024     /* unpack receives into our local space */
1025     MPI_Get_count(&recv_status,MPI_INT,&n);
1026     source[imdex]  = recv_status.MPI_SOURCE;
1027     lens[imdex]  = n;
1028     slen += n;
1029     count--;
1030   }
1031   PetscFree(recv_waits);
1032 
1033   /* move the data into the send scatter */
1034   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1035   count = 0;
1036   for ( i=0; i<nrecvs; i++ ) {
1037     values = rvalues + i*nmax;
1038     for ( j=0; j<lens[i]; j++ ) {
1039       lrows[count++] = values[j] - base;
1040     }
1041   }
1042   PetscFree(rvalues); PetscFree(lens);
1043   PetscFree(owner); PetscFree(nprocs);
1044 
1045   /* actually zap the local rows */
1046   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1047   PLogObjectParent(A,istmp);
1048   PetscFree(lrows);
1049   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1050   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1051   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1052 
1053   /* wait on sends */
1054   if (nsends) {
1055     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1056     CHKPTRQ(send_status);
1057     MPI_Waitall(nsends,send_waits,send_status);
1058     PetscFree(send_status);
1059   }
1060   PetscFree(send_waits); PetscFree(svalues);
1061 
1062   return 0;
1063 }
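/*
   Caller-side sketch (hedged, row numbers illustrative): each processor may
   list arbitrary global rows; they are routed to their owners above.  A nonzero
   diag puts that value on the diagonal of every zeroed row, subject to the
   caveat in the comment preceding MatZeroRows_MPIBAIJ().

       IS     is;
       int    rows[2] = {0, 7};
       Scalar one = 1.0;
       ierr = ISCreateGeneral(comm,2,rows,&is); CHKERRQ(ierr);
       ierr = MatZeroRows(A,is,&one); CHKERRQ(ierr);
       ierr = ISDestroy(is); CHKERRQ(ierr);
*/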
1064 extern int MatPrintHelp_SeqBAIJ(Mat);
1065 static int MatPrintHelp_MPIBAIJ(Mat A)
1066 {
1067   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1068 
1069   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1070   else return 0;
1071 }
1072 
1073 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1074 
1075 /* -------------------------------------------------------------------*/
1076 static struct _MatOps MatOps = {
1077   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1078   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1079   0,0,0,0,
1080   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1081   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1082   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1083   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1084   0,0,0,MatGetSize_MPIBAIJ,
1085   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1086   0,0,0,MatConvertSameType_MPIBAIJ,0,0,
1087   0,0,0,MatGetSubMatrices_MPIBAIJ,
1088   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1089   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ};
1090 
1091 
1092 /*@C
1093    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1094    (block compressed row).  For good matrix assembly performance
1095    the user should preallocate the matrix storage by setting the parameters
1096    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1097    performance can be increased by more than a factor of 50.
1098 
1099    Input Parameters:
1100 .  comm - MPI communicator
1101 .  bs   - size of block
1102 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1103            This value should be the same as the local size used in creating the
1104            y vector for the matrix-vector product y = Ax.
1105 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1106            This value should be the same as the local size used in creating the
1107            x vector for the matrix-vector product y = Ax.
1108 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1109 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1110 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1111            submatrix  (same for all local rows)
1112 .  d_nnz - array containing the number of block nonzeros in the various block rows
1113            in the diagonal portion of the local submatrix (possibly different for each block
1114            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1115            it is zero.
1116 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1117            submatrix (same for all local rows).
1118 .  o_nnz - array containing the number of block nonzeros in the various block rows of the
1119            off-diagonal portion of the local submatrix (possibly different for
1120            each block row) or PETSC_NULL.
1121 
1122    Output Parameter:
1123 .  A - the matrix
1124 
1125    Notes:
1126    The user MUST specify either the local or global matrix dimensions
1127    (possibly both).
1128 
1129    Storage Information:
1130    For a square global matrix we define each processor's diagonal portion
1131    to be its local rows and the corresponding columns (a square submatrix);
1132    each processor's off-diagonal portion encompasses the remainder of the
1133    local matrix (a rectangular submatrix).
1134 
1135    The user can specify preallocated storage for the diagonal part of
1136    the local submatrix with either d_nz or d_nnz (not both).  Set
1137    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1138    memory allocation.  Likewise, specify preallocated storage for the
1139    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1140 
1141    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1142    the figure below we depict these three local rows and all columns (0-11).
1143 
1144 $          0 1 2 3 4 5 6 7 8 9 10 11
1145 $         -------------------
1146 $  row 3  |  o o o d d d o o o o o o
1147 $  row 4  |  o o o d d d o o o o o o
1148 $  row 5  |  o o o d d d o o o o o o
1149 $         -------------------
1150 $
1151 
1152    Thus, any entries in the d locations are stored in the d (diagonal)
1153    submatrix, and any entries in the o locations are stored in the
1154    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1155    stored simply in the MATSEQBAIJ format for compressed row storage.
1156 
1157    Now d_nz should indicate the number of block nonzeros per block row in the d matrix,
1158    and o_nz should indicate the number of block nonzeros per block row in the o matrix.
1159    In general, for PDE problems in which most nonzeros are near the diagonal,
1160    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1161    or you will get TERRIBLE performance; see the users' manual chapter on
1162    matrices and the file $(PETSC_DIR)/Performance.
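
   Example:
   A rough sketch of creating a matrix with block size 2 and 6 local rows per
   processor (the preallocation numbers and names below are illustrative only):
$
$     Mat A;
$     int bs = 2, m = 6;
$     ierr = MatCreateMPIBAIJ(comm,bs,m,m,PETSC_DECIDE,PETSC_DECIDE,
$                             3,PETSC_NULL,1,PETSC_NULL,&A); CHKERRQ(ierr);
$     ... set entries with MatSetValues(), then ...
$     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
$     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
$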
1163 
1164 .keywords: matrix, block, aij, compressed row, sparse, parallel
1165 
1166 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1167 @*/
1168 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1169                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1170 {
1171   Mat          B;
1172   Mat_MPIBAIJ  *b;
1173   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1174 
1175   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
1176   *A = 0;
1177   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1178   PLogObjectCreate(B);
1179   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1180   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1181   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1182   MPI_Comm_size(comm,&size);
1183   if (size == 1) {
1184     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1185     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1186     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1187     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1188     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1189     B->ops.solve             = MatSolve_MPIBAIJ;
1190     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1191     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1192     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1193     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1194   }
1195 
1196   B->destroy    = MatDestroy_MPIBAIJ;
1197   B->view       = MatView_MPIBAIJ;
1198   B->mapping    = 0;
1199   B->factor     = 0;
1200   B->assembled  = PETSC_FALSE;
1201 
1202   b->insertmode = NOT_SET_VALUES;
1203   MPI_Comm_rank(comm,&b->rank);
1204   MPI_Comm_size(comm,&b->size);
1205 
1206   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1207     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1208   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
1209   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
1210   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1211   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1212 
1213   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1214     work[0] = m; work[1] = n;
1215     mbs = m/bs; nbs = n/bs;
1216     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1217     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1218     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1219   }
1220   if (m == PETSC_DECIDE) {
1221     Mbs = M/bs;
1222     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
1223     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1224     m   = mbs*bs;
1225   }
1226   if (n == PETSC_DECIDE) {
1227     Nbs = N/bs;
1228     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
1229     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1230     n   = nbs*bs;
1231   }
1232   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
1233 
1234   b->m = m; B->m = m;
1235   b->n = n; B->n = n;
1236   b->N = N; B->N = N;
1237   b->M = M; B->M = M;
1238   b->bs  = bs;
1239   b->bs2 = bs*bs;
1240   b->mbs = mbs;
1241   b->nbs = nbs;
1242   b->Mbs = Mbs;
1243   b->Nbs = Nbs;
1244 
1245   /* build local table of row and column ownerships */
1246   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1247   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1248   b->cowners = b->rowners + b->size + 2;
1249   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1250   b->rowners[0] = 0;
1251   for ( i=2; i<=b->size; i++ ) {
1252     b->rowners[i] += b->rowners[i-1];
1253   }
1254   b->rstart = b->rowners[b->rank];
1255   b->rend   = b->rowners[b->rank+1];
1256   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1257   b->cowners[0] = 0;
1258   for ( i=2; i<=b->size; i++ ) {
1259     b->cowners[i] += b->cowners[i-1];
1260   }
1261   b->cstart = b->cowners[b->rank];
1262   b->cend   = b->cowners[b->rank+1];
1263 
1264   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1265   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1266   PLogObjectParent(B,b->A);
1267   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1268   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1269   PLogObjectParent(B,b->B);
1270 
1271   /* build cache for off array entries formed */
1272   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1273   b->donotstash  = 0;
1274   b->colmap      = 0;
1275   b->garray      = 0;
1276   b->roworiented = 1;
1277 
1278   /* stuff used for matrix vector multiply */
1279   b->lvec      = 0;
1280   b->Mvctx     = 0;
1281 
1282   /* stuff for MatGetRow() */
1283   b->rowindices   = 0;
1284   b->rowvalues    = 0;
1285   b->getrowactive = PETSC_FALSE;
1286 
1287   *A = B;
1288   return 0;
1289 }
1290 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1291 {
1292   Mat        mat;
1293   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1294   int        ierr, len=0, flg;
1295 
1296   *newmat       = 0;
1297   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1298   PLogObjectCreate(mat);
1299   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1300   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1301   mat->destroy    = MatDestroy_MPIBAIJ;
1302   mat->view       = MatView_MPIBAIJ;
1303   mat->factor     = matin->factor;
1304   mat->assembled  = PETSC_TRUE;
1305 
1306   a->m = mat->m   = oldmat->m;
1307   a->n = mat->n   = oldmat->n;
1308   a->M = mat->M   = oldmat->M;
1309   a->N = mat->N   = oldmat->N;
1310 
1311   a->bs  = oldmat->bs;
1312   a->bs2 = oldmat->bs2;
1313   a->mbs = oldmat->mbs;
1314   a->nbs = oldmat->nbs;
1315   a->Mbs = oldmat->Mbs;
1316   a->Nbs = oldmat->Nbs;
1317 
1318   a->rstart       = oldmat->rstart;
1319   a->rend         = oldmat->rend;
1320   a->cstart       = oldmat->cstart;
1321   a->cend         = oldmat->cend;
1322   a->size         = oldmat->size;
1323   a->rank         = oldmat->rank;
1324   a->insertmode   = NOT_SET_VALUES;
1325   a->rowvalues    = 0;
1326   a->getrowactive = PETSC_FALSE;
1327 
1328   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1329   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1330   a->cowners = a->rowners + a->size + 2;
1331   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1332   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1333   if (oldmat->colmap) {
1334     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1335     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1336     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1337   } else a->colmap = 0;
1338   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1339     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1340     PLogObjectMemory(mat,len*sizeof(int));
1341     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1342   } else a->garray = 0;
1343 
1344   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1345   PLogObjectParent(mat,a->lvec);
1346   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1347   PLogObjectParent(mat,a->Mvctx);
1348   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1349   PLogObjectParent(mat,a->A);
1350   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1351   PLogObjectParent(mat,a->B);
1352   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1353   if (flg) {
1354     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1355   }
1356   *newmat = mat;
1357   return 0;
1358 }
1359 
1360 #include "sys.h"
1361 
1362 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1363 {
1364   Mat          A;
1365   int          i, nz, ierr, j,rstart, rend, fd;
1366   Scalar       *vals,*buf;
1367   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1368   MPI_Status   status;
1369   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1370   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1371   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1372   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1373   int          dcount,kmax,k,nzcount,tmp;
1374 
1375 
1376   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1377   bs2  = bs*bs;
1378 
1379   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1380   if (!rank) {
1381     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1382     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1383     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
1384   }
1385 
1386   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1387   M = header[1]; N = header[2];
1388 
1389   if (M != N) SETERRQ(1,"MatLoad_MPIBAIJ:Can only do square matrices");
1390 
1391   /*
1392      This code adds extra rows to make sure the number of rows is
1393      divisible by the blocksize
1394   */
1395   Mbs        = M/bs;
1396   extra_rows = bs - M + bs*(Mbs);
1397   if (extra_rows == bs) extra_rows = 0;
1398   else                  Mbs++;
1399   if (extra_rows &&!rank) {
1400     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1401   }
1402 
1403   /* determine ownership of all rows */
1404   mbs = Mbs/size + ((Mbs % size) > rank);
1405   m   = mbs * bs;
1406   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1407   browners = rowners + size + 1;
1408   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1409   rowners[0] = 0;
1410   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1411   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1412   rstart = rowners[rank];
1413   rend   = rowners[rank+1];
1414 
1415   /* distribute row lengths to all processors */
1416   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1417   if (!rank) {
1418     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1419     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1420     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1421     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1422     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1423     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1424     PetscFree(sndcounts);
1425   }
1426   else {
1427     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1428   }
1429 
1430   if (!rank) {
1431     /* calculate the number of nonzeros on each processor */
1432     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1433     PetscMemzero(procsnz,size*sizeof(int));
1434     for ( i=0; i<size; i++ ) {
1435       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1436         procsnz[i] += rowlengths[j];
1437       }
1438     }
1439     PetscFree(rowlengths);
1440 
1441     /* determine max buffer needed and allocate it */
1442     maxnz = 0;
1443     for ( i=0; i<size; i++ ) {
1444       maxnz = PetscMax(maxnz,procsnz[i]);
1445     }
1446     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1447 
1448     /* read in my part of the matrix column indices  */
1449     nz = procsnz[0];
1450     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1451     mycols = ibuf;
1452     if (size == 1)  nz -= extra_rows;
1453     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1454     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1455 
1456     /* read in everyone else's (except the last processor's) and ship off */
1457     for ( i=1; i<size-1; i++ ) {
1458       nz = procsnz[i];
1459       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1460       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1461     }
1462     /* read in the stuff for the last proc */
1463     if ( size != 1 ) {
1464       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1465       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1466       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1467       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1468     }
1469     PetscFree(cols);
1470   }
1471   else {
1472     /* determine buffer space needed for message */
1473     nz = 0;
1474     for ( i=0; i<m; i++ ) {
1475       nz += locrowlens[i];
1476     }
1477     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1478     mycols = ibuf;
1479     /* receive message of column indices*/
1480     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1481     MPI_Get_count(&status,MPI_INT,&maxnz);
1482     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1483   }
1484 
1485   /* loop over local rows, determining number of off diagonal entries */
1486   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1487   odlens = dlens + (rend-rstart);
1488   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1489   PetscMemzero(mask,3*Mbs*sizeof(int));
1490   masked1 = mask    + Mbs;
1491   masked2 = masked1 + Mbs;
1492   rowcount = 0; nzcount = 0;
1493   for ( i=0; i<mbs; i++ ) {
1494     dcount  = 0;
1495     odcount = 0;
1496     for ( j=0; j<bs; j++ ) {
1497       kmax = locrowlens[rowcount];
1498       for ( k=0; k<kmax; k++ ) {
1499         tmp = mycols[nzcount++]/bs;
1500         if (!mask[tmp]) {
1501           mask[tmp] = 1;
1502           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1503           else masked1[dcount++] = tmp;
1504         }
1505       }
1506       rowcount++;
1507     }
1508 
1509     dlens[i]  = dcount;
1510     odlens[i] = odcount;
1511 
1512     /* zero out the mask elements we set */
1513     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1514     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1515   }
1516 
1517   /* create our matrix */
1518   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1519          CHKERRQ(ierr);
1520   A = *newmat;
1521   MatSetOption(A,MAT_COLUMNS_SORTED);
1522 
1523   if (!rank) {
1524     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1525     /* read in my part of the matrix numerical values  */
1526     nz = procsnz[0];
1527     vals = buf;
1528     mycols = ibuf;
1529     if (size == 1)  nz -= extra_rows;
1530     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1531     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1532 
1533     /* insert into matrix */
1534     jj      = rstart*bs;
1535     for ( i=0; i<m; i++ ) {
1536       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1537       mycols += locrowlens[i];
1538       vals   += locrowlens[i];
1539       jj++;
1540     }
1541     /* read in other processors (except the last one) and ship out */
1542     for ( i=1; i<size-1; i++ ) {
1543       nz = procsnz[i];
1544       vals = buf;
1545       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1546       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1547     }
1548     /* the last proc */
1549     if ( size != 1 ){
1550       nz = procsnz[i] - extra_rows;
1551       vals = buf;
1552       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1553       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1554       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1555     }
1556     PetscFree(procsnz);
1557   }
1558   else {
1559     /* receive numeric values */
1560     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1561 
1562     /* receive message of values*/
1563     vals = buf;
1564     mycols = ibuf;
1565     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1566     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1567     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1568 
1569     /* insert into matrix */
1570     jj      = rstart*bs;
1571     for ( i=0; i<m; i++ ) {
1572       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1573       mycols += locrowlens[i];
1574       vals   += locrowlens[i];
1575       jj++;
1576     }
1577   }
1578   PetscFree(locrowlens);
1579   PetscFree(buf);
1580   PetscFree(ibuf);
1581   PetscFree(rowners);
1582   PetscFree(dlens);
1583   PetscFree(mask);
1584   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1585   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1586   return 0;
1587 }
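/*
   Caller-side sketch (hedged): the viewer creation call below is an assumption
   about the binary viewer API of this PETSc version; the MatLoad() call and the
   -matload_block_size option follow directly from the routine above.

       Mat    A;
       Viewer viewer;
       ierr = ViewerFileOpenBinary(comm,"matrix.dat",BINARY_RDONLY,&viewer); CHKERRQ(ierr);
       ierr = MatLoad(viewer,MATMPIBAIJ,&A); CHKERRQ(ierr);
       ierr = ViewerDestroy(viewer); CHKERRQ(ierr);

   Run with -matload_block_size <bs> to select the block size (default 1).
*/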
1588 
1589 
1590