1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.31 1996/11/20 05:04:27 curfman Exp curfman $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 
25 /*
26      Local utility routine that creates a mapping from the global column
27    number to the local number in the off-diagonal part of the local
28    storage of the matrix.  This is done in a non-scalable way since the
29    length of colmap equals the global matrix length.
30 */
31 static int CreateColmap_MPIBAIJ_Private(Mat mat)
32 {
33   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
34   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
35   int         nbs = B->nbs,i;
36 
37   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
38   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
39   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
40   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i + 1;
41   return 0;
42 }
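
/*
   Illustrative sketch (the numbers are hypothetical, not from this source): if
   the off-diagonal part B references the global block columns garray = {2,5,9},
   then after CreateColmap_MPIBAIJ_Private() we have colmap[2] = 1, colmap[5] = 2,
   colmap[9] = 3 and every other entry is 0.  Thus colmap[g]-1 recovers the local
   block column of global block column g in B, while a negative result signals
   that g is not (yet) present in the off-diagonal part.
*/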
43 
44 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
45                             PetscTruth *done)
46 {
47   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
48   int         ierr;
49   if (aij->size == 1) {
50     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
51   } else SETERRQ(1,"MatGetRowIJ_MPIBAIJ:not supported in parallel");
52   return 0;
53 }
54 
55 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
56                                 PetscTruth *done)
57 {
58   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
59   int        ierr;
60   if (aij->size == 1) {
61     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
62   } else SETERRQ(1,"MatRestoreRowIJ_MPIBAIJ:not supported in parallel");
63   return 0;
64 }
65 
66 extern int MatSetValues_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
67 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
68 {
69   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
70   Scalar      value;
71   int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
72   int         cstart = baij->cstart, cend = baij->cend,row,col;
73   int         roworiented = baij->roworiented,rstart_orig,rend_orig;
74   int         cstart_orig,cend_orig,bs=baij->bs;
75 
76   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
77     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
78   }
79   baij->insertmode = addv;
80   rstart_orig = rstart*bs;
81   rend_orig   = rend*bs;
82   cstart_orig = cstart*bs;
83   cend_orig   = cend*bs;
84   for ( i=0; i<m; i++ ) {
85 #if defined(PETSC_BOPT_g)
86     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
87     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
88 #endif
89     if (im[i] >= rstart_orig && im[i] < rend_orig) {
90       row = im[i] - rstart_orig;
91       for ( j=0; j<n; j++ ) {
92         if (in[j] >= cstart_orig && in[j] < cend_orig){
93           col = in[j] - cstart_orig;
94           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
95           ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
96         }
97 #if defined(PETSC_BOPT_g)
98         else if (in[j] < 0) {SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");}
99         else if (in[j] >= baij->N) {SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");}
100 #endif
101         else {
102           if (mat->was_assembled) {
103             if (!baij->colmap) {
104               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
105             }
106             col = (baij->colmap[in[j]/bs] - 1)*bs + in[j]%bs;
107             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
108               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
109               col =  in[j];
110             }
111           }
112           else col = in[j];
113           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
114           ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
115         }
116       }
117     }
118     else {
119       if (roworiented && !baij->donotstash) {
120         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
121       }
122       else {
123         if (!baij->donotstash) {
124           row = im[i];
125           for ( j=0; j<n; j++ ) {
126             ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
127           }
128         }
129       }
130     }
131   }
132   return 0;
133 }
134 
135 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
136 {
137   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
138   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
139   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
140 
141   for ( i=0; i<m; i++ ) {
142     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
143     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
144     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
145       row = idxm[i] - bsrstart;
146       for ( j=0; j<n; j++ ) {
147         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
148         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
149         if (idxn[j] >= bscstart && idxn[j] < bscend){
150           col = idxn[j] - bscstart;
151           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
152         }
153         else {
154           if (!baij->colmap) {
155             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
156           }
157           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
158              (baij->garray[baij->colmap[idxn[j]/bs]-1] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
159           else {
160             col  = (baij->colmap[idxn[j]/bs]-1)*bs + idxn[j]%bs;
161             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
162           }
163         }
164       }
165     }
166     else {
167       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
168     }
169   }
170   return 0;
171 }
172 
173 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
174 {
175   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
176   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
177   int        ierr, i,bs2=baij->bs2;
178   double     sum = 0.0;
179   Scalar     *v;
180 
181   if (baij->size == 1) {
182     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
183   } else {
184     if (type == NORM_FROBENIUS) {
185       v = amat->a;
186       for (i=0; i<amat->nz*bs2; i++ ) {
187 #if defined(PETSC_COMPLEX)
188         sum += real(conj(*v)*(*v)); v++;
189 #else
190         sum += (*v)*(*v); v++;
191 #endif
192       }
193       v = bmat->a;
194       for (i=0; i<bmat->nz*bs2; i++ ) {
195 #if defined(PETSC_COMPLEX)
196         sum += real(conj(*v)*(*v)); v++;
197 #else
198         sum += (*v)*(*v); v++;
199 #endif
200       }
201       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
202       *norm = sqrt(*norm);
203     }
204     else
205       SETERRQ(1,"MatNorm_MPIBAIJ:No support for this norm yet");
206   }
207   return 0;
208 }
209 
210 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
211 {
212   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
213   MPI_Comm    comm = mat->comm;
214   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
215   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
216   MPI_Request *send_waits,*recv_waits;
217   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
218   InsertMode  addv;
219   Scalar      *rvalues,*svalues;
220 
221   /* make sure all processors are either in INSERTMODE or ADDMODE */
222   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
223   if (addv == (ADD_VALUES|INSERT_VALUES)) {
224     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
225   }
226   baij->insertmode = addv; /* in case this processor had no cache */
227 
228   /*  first count number of contributors to each processor */
229   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
230   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
231   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
232   for ( i=0; i<baij->stash.n; i++ ) {
233     idx = baij->stash.idx[i];
234     for ( j=0; j<size; j++ ) {
235       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
236         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
237       }
238     }
239   }
240   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
241 
242   /* inform other processors of number of messages and max length*/
243   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
244   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
245   nreceives = work[rank];
246   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
247   nmax = work[rank];
248   PetscFree(work);
249 
250   /* post receives:
251        1) each message will consist of ordered triples
252      (global row index, global column index, value); we store the indices
253      as Scalars to simplify the message passing.
254        2) since we don't know how long each individual message is we
255      allocate the largest needed buffer for each receive. Potentially
256      this is a lot of wasted space.
257 
258 
259        This could be done better.
260   */
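  /*
     Layout sketch (hypothetical values, not from this source): if this process
     stashed the two off-processor entries (row 7, col 12, value 3.5) and
     (row 7, col 40, value -1.0) for the same destination, the corresponding
     piece of svalues holds the six Scalars { 7, 12, 3.5, 7, 40, -1.0 },
     i.e. 3*nprocs[dest] Scalars per send.
  */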
261   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
262   CHKPTRQ(rvalues);
263   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
264   CHKPTRQ(recv_waits);
265   for ( i=0; i<nreceives; i++ ) {
266     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
267               comm,recv_waits+i);
268   }
269 
270   /* do sends:
271       1) starts[i] gives the starting index in svalues for stuff going to
272          the ith processor
273   */
274   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
275   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
276   CHKPTRQ(send_waits);
277   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
278   starts[0] = 0;
279   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
280   for ( i=0; i<baij->stash.n; i++ ) {
281     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
282     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
283     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
284   }
285   PetscFree(owner);
286   starts[0] = 0;
287   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
288   count = 0;
289   for ( i=0; i<size; i++ ) {
290     if (procs[i]) {
291       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
292                 comm,send_waits+count++);
293     }
294   }
295   PetscFree(starts); PetscFree(nprocs);
296 
297   /* Free cache space */
298   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
299   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
300 
301   baij->svalues    = svalues;    baij->rvalues    = rvalues;
302   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
303   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
304   baij->rmax       = nmax;
305 
306   return 0;
307 }
308 
309 
310 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
311 {
312   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
313   MPI_Status  *send_status,recv_status;
314   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
315   int         bs=baij->bs,row,col,other_disassembled;
316   Scalar      *values,val;
317   InsertMode  addv = baij->insertmode;
318 
319   /*  wait on receives */
320   while (count) {
321     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
322     /* unpack receives into our local space */
323     values = baij->rvalues + 3*imdex*baij->rmax;
324     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
325     n = n/3;
326     for ( i=0; i<n; i++ ) {
327       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
328       col = (int) PetscReal(values[3*i+1]);
329       val = values[3*i+2];
330       if (col >= baij->cstart*bs && col < baij->cend*bs) {
331         col -= baij->cstart*bs;
332         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
333       }
334       else {
335         if (mat->was_assembled) {
336           if (!baij->colmap) {
337             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
338           }
339           col = (baij->colmap[col/bs]-1)*bs + col%bs;
340           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
341             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
342             col = (int) PetscReal(values[3*i+1]);
343           }
344         }
345         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
346       }
347     }
348     count--;
349   }
350   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
351 
352   /* wait on sends */
353   if (baij->nsends) {
354     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
355     CHKPTRQ(send_status);
356     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
357     PetscFree(send_status);
358   }
359   PetscFree(baij->send_waits); PetscFree(baij->svalues);
360 
361   baij->insertmode = NOT_SET_VALUES;
362   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
363   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
364 
365   /* determine if any processor has disassembled, if so we must
366      also disassemble ourselves, in order that we may reassemble. */
367   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
368   if (mat->was_assembled && !other_disassembled) {
369     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
370   }
371 
372   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
373     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
374   }
375   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
376   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
377 
378   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
379   return 0;
380 }
381 
382 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
383 {
384   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
385   int          ierr;
386 
387   if (baij->size == 1) {
388     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
389   }
390   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
391   return 0;
392 }
393 
394 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
395 {
396   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
397   int          ierr, format,rank,bs = baij->bs;
398   FILE         *fd;
399   ViewerType   vtype;
400 
401   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
402   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
403     ierr = ViewerGetFormat(viewer,&format);
404     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
405       MatInfo info;
406       MPI_Comm_rank(mat->comm,&rank);
407       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
408       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
409       PetscSequentialPhaseBegin(mat->comm,1);
410       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
411               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
412               baij->bs,(int)info.memory);
413       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
414       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
415       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
416       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
417       fflush(fd);
418       PetscSequentialPhaseEnd(mat->comm,1);
419       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
420       return 0;
421     }
422     else if (format == VIEWER_FORMAT_ASCII_INFO) {
423       PetscPrintf(mat->comm,"  block size is %d\n",bs);
424       return 0;
425     }
426   }
427 
428   if (vtype == DRAW_VIEWER) {
429     Draw       draw;
430     PetscTruth isnull;
431     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
432     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
433   }
434 
435   if (vtype == ASCII_FILE_VIEWER) {
436     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
437     PetscSequentialPhaseBegin(mat->comm,1);
438     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
439            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
440             baij->cstart*bs,baij->cend*bs);
441     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
442     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
443     fflush(fd);
444     PetscSequentialPhaseEnd(mat->comm,1);
445   }
446   else {
447     int size = baij->size;
448     rank = baij->rank;
449     if (size == 1) {
450       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
451     }
452     else {
453       /* assemble the entire matrix onto first processor. */
454       Mat         A;
455       Mat_SeqBAIJ *Aloc;
456       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
457       int         mbs=baij->mbs;
458       Scalar      *a;
459 
460       if (!rank) {
461         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
462         CHKERRQ(ierr);
463       }
464       else {
465         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
466         CHKERRQ(ierr);
467       }
468       PLogObjectParent(mat,A);
469 
470       /* copy over the A part */
471       Aloc = (Mat_SeqBAIJ*) baij->A->data;
472       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
473       row = baij->rstart;
474       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
475 
476       for ( i=0; i<mbs; i++ ) {
477         rvals[0] = bs*(baij->rstart + i);
478         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
479         for ( j=ai[i]; j<ai[i+1]; j++ ) {
480           col = (baij->cstart+aj[j])*bs;
481           for (k=0; k<bs; k++ ) {
482             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
483             col++; a += bs;
484           }
485         }
486       }
487       /* copy over the B part */
488       Aloc = (Mat_SeqBAIJ*) baij->B->data;
489       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
490       row = baij->rstart*bs;
491       for ( i=0; i<mbs; i++ ) {
492         rvals[0] = bs*(baij->rstart + i);
493         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
494         for ( j=ai[i]; j<ai[i+1]; j++ ) {
495           col = baij->garray[aj[j]]*bs;
496           for (k=0; k<bs; k++ ) {
497             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
498             col++; a += bs;
499           }
500         }
501       }
502       PetscFree(rvals);
503       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
504       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
505       if (!rank) {
506         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
507       }
508       ierr = MatDestroy(A); CHKERRQ(ierr);
509     }
510   }
511   return 0;
512 }
513 
514 
515 
516 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
517 {
518   Mat         mat = (Mat) obj;
519   int         ierr;
520   ViewerType  vtype;
521 
522   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
523   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
524       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
525     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
526   }
527   else if (vtype == BINARY_FILE_VIEWER) {
528     return MatView_MPIBAIJ_Binary(mat,viewer);
529   }
530   return 0;
531 }
532 
533 static int MatDestroy_MPIBAIJ(PetscObject obj)
534 {
535   Mat         mat = (Mat) obj;
536   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
537   int         ierr;
538 
539 #if defined(PETSC_LOG)
540   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
541 #endif
542 
543   PetscFree(baij->rowners);
544   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
545   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
546   if (baij->colmap) PetscFree(baij->colmap);
547   if (baij->garray) PetscFree(baij->garray);
548   if (baij->lvec)   VecDestroy(baij->lvec);
549   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
550   if (baij->rowvalues) PetscFree(baij->rowvalues);
551   PetscFree(baij);
552   if (mat->mapping) {
553     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
554   }
555   PLogObjectDestroy(mat);
556   PetscHeaderDestroy(mat);
557   return 0;
558 }
559 
560 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
561 {
562   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
563   int         ierr, nt;
564 
565   VecGetLocalSize_Fast(xx,nt);
566   if (nt != a->n) {
567     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and xx");
568   }
569   VecGetLocalSize_Fast(yy,nt);
570   if (nt != a->m) {
571     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and yy");
572   }
573   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
574   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
575   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
576   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
577   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
578   return 0;
579 }
580 
581 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
582 {
583   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
584   int        ierr;
585   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
586   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
587   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
588   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
589   return 0;
590 }
591 
592 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
593 {
594   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
595   int        ierr;
596 
597   /* do nondiagonal part */
598   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
599   /* send it on its way */
600   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
601   /* do local part */
602   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
603   /* receive remote parts: note this assumes the values are not actually */
604   /* inserted in yy until the next line, which is true for my implementation*/
605   /* but is not perhaps always true. */
606   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
607   return 0;
608 }
609 
610 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
611 {
612   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
613   int        ierr;
614 
615   /* do nondiagonal part */
616   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
617   /* send it on its way */
618   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
619   /* do local part */
620   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
621   /* receive remote parts: note this assumes the values are not actually */
622   /* inserted in zz until the next line, which is true for my implementation*/
623   /* but is not perhaps always true. */
624   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
625   return 0;
626 }
627 
628 /*
629   This only works correctly for square matrices where the subblock A->A is the
630    diagonal block
631 */
632 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
633 {
634   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
635   if (a->M != a->N)
636     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
637   return MatGetDiagonal(a->A,v);
638 }
639 
640 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
641 {
642   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
643   int        ierr;
644   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
645   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
646   return 0;
647 }
648 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
649 {
650   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
651   *m = mat->M; *n = mat->N;
652   return 0;
653 }
654 
655 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
656 {
657   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
658   *m = mat->m; *n = mat->n;
659   return 0;
660 }
661 
662 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
663 {
664   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
665   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
666   return 0;
667 }
668 
669 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
670 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
671 
672 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
673 {
674   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
675   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
676   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
677   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
678   int        *cmap, *idx_p,cstart = mat->cstart;
679 
680   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
681   mat->getrowactive = PETSC_TRUE;
682 
683   if (!mat->rowvalues && (idx || v)) {
684     /*
685         allocate enough space to hold information from the longest row.
686     */
687     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
688     int     max = 1,mbs = mat->mbs,tmp;
689     for ( i=0; i<mbs; i++ ) {
690       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
691       if (max < tmp) { max = tmp; }
692     }
693     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
694     CHKPTRQ(mat->rowvalues);
695     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
696   }
697 
698 
699   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows");
700   lrow = row - brstart;
701 
702   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
703   if (!v)   {pvA = 0; pvB = 0;}
704   if (!idx) {pcA = 0; if (!v) pcB = 0;}
705   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
706   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
707   nztot = nzA + nzB;
708 
709   cmap  = mat->garray;
710   if (v  || idx) {
711     if (nztot) {
712       /* Sort by increasing column numbers, assuming A and B already sorted */
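      /*
         Merge sketch (hypothetical data, not from this source): with bs=1 and
         cstart=4, suppose the diagonal part returns local columns {0,2}
         (global columns 4 and 6) and the off-diagonal part returns entries
         whose global columns, recovered through cmap, are {1,3,9}.  The first
         loop copies the B entries with global column below cstart (1 and 3, so
         imark = 2), then all A entries (4 and 6) are copied, then the
         remaining B entries (9), yielding the row in globally increasing
         column order.
      */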
713       int imark = -1;
714       if (v) {
715         *v = v_p = mat->rowvalues;
716         for ( i=0; i<nzB; i++ ) {
717           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
718           else break;
719         }
720         imark = i;
721         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
722         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
723       }
724       if (idx) {
725         *idx = idx_p = mat->rowindices;
726         if (imark > -1) {
727           for ( i=0; i<imark; i++ ) {
728             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
729           }
730         } else {
731           for ( i=0; i<nzB; i++ ) {
732             if (cmap[cworkB[i]/bs] < cstart)
733               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
734             else break;
735           }
736           imark = i;
737         }
738         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
739         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
740       }
741     }
742     else {
743       if (idx) *idx = 0;
744       if (v)   *v   = 0;
745     }
746   }
747   *nz = nztot;
748   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
749   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
750   return 0;
751 }
752 
753 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
754 {
755   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
756   if (baij->getrowactive == PETSC_FALSE) {
757     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
758   }
759   baij->getrowactive = PETSC_FALSE;
760   return 0;
761 }
762 
763 static int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
764 {
765   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
766   *bs = baij->bs;
767   return 0;
768 }
769 
770 static int MatZeroEntries_MPIBAIJ(Mat A)
771 {
772   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
773   int         ierr;
774   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
775   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
776   return 0;
777 }
778 
779 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
780 {
781   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
782   Mat         A = a->A, B = a->B;
783   int         ierr;
784   double      isend[5], irecv[5];
785 
786   info->rows_global    = (double)a->M;
787   info->columns_global = (double)a->N;
788   info->rows_local     = (double)a->m;
789   info->columns_local  = (double)a->N;
790   info->block_size     = (double)a->bs;
791   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
792   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
      isend[3] = info->memory;  isend[4] = info->mallocs;
793   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
794   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
      isend[3] += info->memory;  isend[4] += info->mallocs;
795   if (flag == MAT_LOCAL) {
796     info->nz_used      = isend[0];
797     info->nz_allocated = isend[1];
798     info->nz_unneeded  = isend[2];
799     info->memory       = isend[3];
800     info->mallocs      = isend[4];
801   } else if (flag == MAT_GLOBAL_MAX) {
802     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
803     info->nz_used      = irecv[0];
804     info->nz_allocated = irecv[1];
805     info->nz_unneeded  = irecv[2];
806     info->memory       = irecv[3];
807     info->mallocs      = irecv[4];
808   } else if (flag == MAT_GLOBAL_SUM) {
809     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
810     info->nz_used      = irecv[0];
811     info->nz_allocated = irecv[1];
812     info->nz_unneeded  = irecv[2];
813     info->memory       = irecv[3];
814     info->mallocs      = irecv[4];
815   }
816   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
817   info->fill_ratio_needed = 0;
818   info->factor_mallocs    = 0;
819   return 0;
820 }
821 
822 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
823 {
824   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
825 
826   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
827       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
828       op == MAT_COLUMNS_SORTED) {
829         MatSetOption(a->A,op);
830         MatSetOption(a->B,op);
831   } else if (op == MAT_ROW_ORIENTED) {
832         a->roworiented = 1;
833         MatSetOption(a->A,op);
834         MatSetOption(a->B,op);
835   } else if (op == MAT_ROWS_SORTED ||
836              op == MAT_SYMMETRIC ||
837              op == MAT_STRUCTURALLY_SYMMETRIC ||
838              op == MAT_YES_NEW_DIAGONALS)
839     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
840   else if (op == MAT_COLUMN_ORIENTED) {
841     a->roworiented = 0;
842     MatSetOption(a->A,op);
843     MatSetOption(a->B,op);
844   } else if (op == MAT_IGNORE_OFF_PROCESSOR_ENTRIES) {
845     a->donotstash = 1;
846   } else if (op == MAT_NO_NEW_DIAGONALS)
847     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:MAT_NO_NEW_DIAGONALS");}
848   else
849     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:unknown option");}
850   return 0;
851 }
852 
853 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
854 {
855   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
856   Mat_SeqBAIJ *Aloc;
857   Mat        B;
858   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
859   int        bs=baij->bs,mbs=baij->mbs;
860   Scalar     *a;
861 
862   if (matout == PETSC_NULL && M != N)
863     SETERRQ(1,"MatTranspose_MPIBAIJ:Square matrix only for in-place");
864   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
865   CHKERRQ(ierr);
866 
867   /* copy over the A part */
868   Aloc = (Mat_SeqBAIJ*) baij->A->data;
869   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
870   row = baij->rstart;
871   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
872 
873   for ( i=0; i<mbs; i++ ) {
874     rvals[0] = bs*(baij->rstart + i);
875     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
876     for ( j=ai[i]; j<ai[i+1]; j++ ) {
877       col = (baij->cstart+aj[j])*bs;
878       for (k=0; k<bs; k++ ) {
879         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
880         col++; a += bs;
881       }
882     }
883   }
884   /* copy over the B part */
885   Aloc = (Mat_SeqBAIJ*) baij->B->data;
886   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
887   row = baij->rstart*bs;
888   for ( i=0; i<mbs; i++ ) {
889     rvals[0] = bs*(baij->rstart + i);
890     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
891     for ( j=ai[i]; j<ai[i+1]; j++ ) {
892       col = baij->garray[aj[j]]*bs;
893       for (k=0; k<bs; k++ ) {
894         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
895         col++; a += bs;
896       }
897     }
898   }
899   PetscFree(rvals);
900   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
901   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
902 
903   if (matout != PETSC_NULL) {
904     *matout = B;
905   } else {
906     /* This isn't really an in-place transpose, but we free the data structures from baij */
907     PetscFree(baij->rowners);
908     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
909     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
910     if (baij->colmap) PetscFree(baij->colmap);
911     if (baij->garray) PetscFree(baij->garray);
912     if (baij->lvec) VecDestroy(baij->lvec);
913     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
914     PetscFree(baij);
915     PetscMemcpy(A,B,sizeof(struct _Mat));
916     PetscHeaderDestroy(B);
917   }
918   return 0;
919 }
920 
921 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
922 {
923   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
924   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
925   int ierr,s1,s2,s3;
926 
927   if (ll)  {
928     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
929     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
930     if (s1!=s2) SETERRQ(1,"MatDiagonalScale_MPIBAIJ: non-conforming local sizes");
931     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
932     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
933   }
934   if (rr) SETERRQ(1,"MatDiagonalScale_MPIBAIJ:not supported for right vector");
935   return 0;
936 }
937 
938 /* the code does not do the diagonal entries correctly unless the
939    matrix is square and the column and row ownerships are identical.
940    This is a BUG. The only way to fix it seems to be to access
941    baij->A and baij->B directly and not through the MatZeroRows()
942    routine.
943 */
944 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
945 {
946   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
947   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
948   int            *procs,*nprocs,j,found,idx,nsends,*work;
949   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
950   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
951   int            *lens,imdex,*lrows,*values,bs=l->bs;
952   MPI_Comm       comm = A->comm;
953   MPI_Request    *send_waits,*recv_waits;
954   MPI_Status     recv_status,*send_status;
955   IS             istmp;
956 
957   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
958   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
959 
960   /*  first count number of contributors to each processor */
961   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
962   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
963   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
964   for ( i=0; i<N; i++ ) {
965     idx = rows[i];
966     found = 0;
967     for ( j=0; j<size; j++ ) {
968       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
969         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
970       }
971     }
972     if (!found) SETERRQ(1,"MatZeroRows_MPIBAIJ:Index out of range");
973   }
974   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
975 
976   /* inform other processors of number of messages and max length*/
977   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
978   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
979   nrecvs = work[rank];
980   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
981   nmax = work[rank];
982   PetscFree(work);
983 
984   /* post receives:   */
985   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
986   CHKPTRQ(rvalues);
987   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
988   CHKPTRQ(recv_waits);
989   for ( i=0; i<nrecvs; i++ ) {
990     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
991   }
992 
993   /* do sends:
994       1) starts[i] gives the starting index in svalues for stuff going to
995          the ith processor
996   */
997   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
998   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
999   CHKPTRQ(send_waits);
1000   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1001   starts[0] = 0;
1002   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1003   for ( i=0; i<N; i++ ) {
1004     svalues[starts[owner[i]]++] = rows[i];
1005   }
1006   ISRestoreIndices(is,&rows);
1007 
1008   starts[0] = 0;
1009   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1010   count = 0;
1011   for ( i=0; i<size; i++ ) {
1012     if (procs[i]) {
1013       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
1014     }
1015   }
1016   PetscFree(starts);
1017 
1018   base = owners[rank]*bs;
1019 
1020   /*  wait on receives */
1021   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1022   source = lens + nrecvs;
1023   count  = nrecvs; slen = 0;
1024   while (count) {
1025     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
1026     /* unpack receives into our local space */
1027     MPI_Get_count(&recv_status,MPI_INT,&n);
1028     source[imdex]  = recv_status.MPI_SOURCE;
1029     lens[imdex]  = n;
1030     slen += n;
1031     count--;
1032   }
1033   PetscFree(recv_waits);
1034 
1035   /* move the data into the send scatter */
1036   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1037   count = 0;
1038   for ( i=0; i<nrecvs; i++ ) {
1039     values = rvalues + i*nmax;
1040     for ( j=0; j<lens[i]; j++ ) {
1041       lrows[count++] = values[j] - base;
1042     }
1043   }
1044   PetscFree(rvalues); PetscFree(lens);
1045   PetscFree(owner); PetscFree(nprocs);
1046 
1047   /* actually zap the local rows */
1048   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1049   PLogObjectParent(A,istmp);
1050   PetscFree(lrows);
1051   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1052   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1053   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1054 
1055   /* wait on sends */
1056   if (nsends) {
1057     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1058     CHKPTRQ(send_status);
1059     MPI_Waitall(nsends,send_waits,send_status);
1060     PetscFree(send_status);
1061   }
1062   PetscFree(send_waits); PetscFree(svalues);
1063 
1064   return 0;
1065 }
1066 extern int MatPrintHelp_SeqBAIJ(Mat);
1067 static int MatPrintHelp_MPIBAIJ(Mat A)
1068 {
1069   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1070 
1071   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1072   else return 0;
1073 }
1074 
1075 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1076 
1077 /* -------------------------------------------------------------------*/
1078 static struct _MatOps MatOps = {
1079   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1080   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1081   0,0,0,0,
1082   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1083   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1084   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1085   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1086   0,0,0,MatGetSize_MPIBAIJ,
1087   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1088   0,0,0,MatConvertSameType_MPIBAIJ,0,0,
1089   0,0,0,MatGetSubMatrices_MPIBAIJ,
1090   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1091   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ};
1092 
1093 
1094 /*@C
1095    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1096    (block compressed row).  For good matrix assembly performance
1097    the user should preallocate the matrix storage by setting the parameters
1098    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1099    performance can be increased by more than a factor of 50.
1100 
1101    Input Parameters:
1102 .  comm - MPI communicator
1103 .  bs   - size of block
1104 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1105            This value should be the same as the local size used in creating the
1106            y vector for the matrix-vector product y = Ax.
1107 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1108            This value should be the same as the local size used in creating the
1109            x vector for the matrix-vector product y = Ax.
1110 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1111 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1112 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1113            submatrix  (same for all local rows)
1114 .  d_nnz - array containing the number of block nonzeros in the various block rows
1115            of the diagonal portion of the local submatrix (possibly different for each
1116            block row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1117            it is zero.
1118 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1119            submatrix (same for all local rows).
1120 .  o_nnz - array containing the number of block nonzeros in the various block rows of the
1121            off-diagonal portion of the local submatrix (possibly different for
1122            each block row) or PETSC_NULL.
1123 
1124    Output Parameter:
1125 .  A - the matrix
1126 
1127    Notes:
1128    The user MUST specify either the local or global matrix dimensions
1129    (possibly both).
1130 
1131    Storage Information:
1132    For a square global matrix we define each processor's diagonal portion
1133    to be its local rows and the corresponding columns (a square submatrix);
1134    each processor's off-diagonal portion encompasses the remainder of the
1135    local matrix (a rectangular submatrix).
1136 
1137    The user can specify preallocated storage for the diagonal part of
1138    the local submatrix with either d_nz or d_nnz (not both).  Set
1139    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1140    memory allocation.  Likewise, specify preallocated storage for the
1141    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1142 
1143    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1144    the figure below we depict these three local rows and all columns (0-11).
1145 
1146 $          0 1 2 3 4 5 6 7 8 9 10 11
1147 $         -------------------
1148 $  row 3  |  o o o d d d o o o o o o
1149 $  row 4  |  o o o d d d o o o o o o
1150 $  row 5  |  o o o d d d o o o o o o
1151 $         -------------------
1152 $
1153 
1154    Thus, any entries in the d locations are stored in the d (diagonal)
1155    submatrix, and any entries in the o locations are stored in the
1156    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1157    stored simply in the MATSEQBAIJ format for compressed row storage.
1158 
1159    Now d_nz should indicate the number of block nonzeros per block row in the d matrix,
1160    and o_nz should indicate the number of block nonzeros per block row in the o matrix.
1161    In general, for PDE problems in which most nonzeros are near the diagonal,
1162    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1163    or you will get TERRIBLE performance; see the users' manual chapter on
1164    matrices and the file $(PETSC_DIR)/Performance.
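
   Example (a minimal usage sketch; the communicator, sizes, and preallocation
   counts below are illustrative assumptions only):
$      Mat A;
$      int ierr, bs = 2, mlocal = 6;   /* three 2x2 block rows per process */
$      ierr = MatCreateMPIBAIJ(MPI_COMM_WORLD,bs,mlocal,mlocal,PETSC_DECIDE,
$                              PETSC_DECIDE,3,PETSC_NULL,1,PETSC_NULL,&A); CHKERRQ(ierr);
$      /* set entries with MatSetValues() using global (point) indices ... */
$      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
$      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);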
1165 
1166 .keywords: matrix, block, aij, compressed row, sparse, parallel
1167 
1168 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1169 @*/
1170 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1171                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1172 {
1173   Mat          B;
1174   Mat_MPIBAIJ  *b;
1175   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1176 
1177   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
1178   *A = 0;
1179   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1180   PLogObjectCreate(B);
1181   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1182   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1183   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1184   MPI_Comm_size(comm,&size);
1185   if (size == 1) {
1186     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1187     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1188     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1189     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1190     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1191     B->ops.solve             = MatSolve_MPIBAIJ;
1192     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1193     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1194     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1195     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1196   }
1197 
1198   B->destroy    = MatDestroy_MPIBAIJ;
1199   B->view       = MatView_MPIBAIJ;
1200   B->mapping    = 0;
1201   B->factor     = 0;
1202   B->assembled  = PETSC_FALSE;
1203 
1204   b->insertmode = NOT_SET_VALUES;
1205   MPI_Comm_rank(comm,&b->rank);
1206   MPI_Comm_size(comm,&b->size);
1207 
1208   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1209     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1210   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
1211   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
1212   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1213   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1214 
1215   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1216     work[0] = m; work[1] = n;
1217     mbs = m/bs; nbs = n/bs;
1218     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1219     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1220     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1221   }
1222   if (m == PETSC_DECIDE) {
1223     Mbs = M/bs;
1224     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
1225     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1226     m   = mbs*bs;
1227   }
1228   if (n == PETSC_DECIDE) {
1229     Nbs = N/bs;
1230     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
1231     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1232     n   = nbs*bs;
1233   }
1234   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
1235 
1236   b->m = m; B->m = m;
1237   b->n = n; B->n = n;
1238   b->N = N; B->N = N;
1239   b->M = M; B->M = M;
1240   b->bs  = bs;
1241   b->bs2 = bs*bs;
1242   b->mbs = mbs;
1243   b->nbs = nbs;
1244   b->Mbs = Mbs;
1245   b->Nbs = Nbs;
1246 
1247   /* build local table of row and column ownerships */
1248   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1249   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1250   b->cowners = b->rowners + b->size + 2;
1251   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1252   b->rowners[0] = 0;
1253   for ( i=2; i<=b->size; i++ ) {
1254     b->rowners[i] += b->rowners[i-1];
1255   }
1256   b->rstart = b->rowners[b->rank];
1257   b->rend   = b->rowners[b->rank+1];
1258   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1259   b->cowners[0] = 0;
1260   for ( i=2; i<=b->size; i++ ) {
1261     b->cowners[i] += b->cowners[i-1];
1262   }
1263   b->cstart = b->cowners[b->rank];
1264   b->cend   = b->cowners[b->rank+1];
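
  /*
     Ownership sketch (hypothetical sizes, not from this source): with three
     processes owning 4, 3 and 5 block rows respectively, the Allgather above
     fills rowners[1..3] with {4,3,5}; after the prefix sum rowners = {0,4,7,12},
     so process 1 has rstart = 4 and rend = 7 (block rows 4, 5 and 6).
  */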
1265 
1266   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1267   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1268   PLogObjectParent(B,b->A);
1269   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1270   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1271   PLogObjectParent(B,b->B);
1272 
1273   /* build cache for off array entries formed */
1274   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1275   b->donotstash  = 0;
1276   b->colmap      = 0;
1277   b->garray      = 0;
1278   b->roworiented = 1;
1279 
1280   /* stuff used for matrix vector multiply */
1281   b->lvec      = 0;
1282   b->Mvctx     = 0;
1283 
1284   /* stuff for MatGetRow() */
1285   b->rowindices   = 0;
1286   b->rowvalues    = 0;
1287   b->getrowactive = PETSC_FALSE;
1288 
1289   *A = B;
1290   return 0;
1291 }
1292 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1293 {
1294   Mat        mat;
1295   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1296   int        ierr, len=0, flg;
1297 
1298   *newmat       = 0;
1299   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1300   PLogObjectCreate(mat);
1301   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1302   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1303   mat->destroy    = MatDestroy_MPIBAIJ;
1304   mat->view       = MatView_MPIBAIJ;
1305   mat->factor     = matin->factor;
1306   mat->assembled  = PETSC_TRUE;
1307 
1308   a->m = mat->m   = oldmat->m;
1309   a->n = mat->n   = oldmat->n;
1310   a->M = mat->M   = oldmat->M;
1311   a->N = mat->N   = oldmat->N;
1312 
1313   a->bs  = oldmat->bs;
1314   a->bs2 = oldmat->bs2;
1315   a->mbs = oldmat->mbs;
1316   a->nbs = oldmat->nbs;
1317   a->Mbs = oldmat->Mbs;
1318   a->Nbs = oldmat->Nbs;
1319 
1320   a->rstart       = oldmat->rstart;
1321   a->rend         = oldmat->rend;
1322   a->cstart       = oldmat->cstart;
1323   a->cend         = oldmat->cend;
1324   a->size         = oldmat->size;
1325   a->rank         = oldmat->rank;
1326   a->insertmode   = NOT_SET_VALUES;
1327   a->rowvalues    = 0;
1328   a->getrowactive = PETSC_FALSE;
1329 
1330   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1331   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1332   a->cowners = a->rowners + a->size + 2;
1333   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1334   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1335   if (oldmat->colmap) {
1336     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1337     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1338     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1339   } else a->colmap = 0;
1340   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1341     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1342     PLogObjectMemory(mat,len*sizeof(int));
1343     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1344   } else a->garray = 0;
1345 
1346   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1347   PLogObjectParent(mat,a->lvec);
1348   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1349   PLogObjectParent(mat,a->Mvctx);
1350   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1351   PLogObjectParent(mat,a->A);
1352   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1353   PLogObjectParent(mat,a->B);
1354   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1355   if (flg) {
1356     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1357   }
1358   *newmat = mat;
1359   return 0;
1360 }
1361 
1362 #include "sys.h"
1363 
1364 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1365 {
1366   Mat          A;
1367   int          i, nz, ierr, j,rstart, rend, fd;
1368   Scalar       *vals,*buf;
1369   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1370   MPI_Status   status;
1371   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1372   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1373   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1374   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1375   int          dcount,kmax,k,nzcount,tmp;
1376 
1377 
1378   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1379   bs2  = bs*bs;
1380 
1381   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1382   if (!rank) {
1383     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1384     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1385     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
1386   }
1387 
1388   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1389   M = header[1]; N = header[2];
1390 
1391   if (M != N) SETERRQ(1,"MatLoad_MPIBAIJ:Can only do square matrices");
1392 
1393   /*
1394      This code adds extra rows to make sure the number of rows is
1395      divisible by the blocksize
1396   */
1397   Mbs        = M/bs;
1398   extra_rows = bs - M + bs*(Mbs);
1399   if (extra_rows == bs) extra_rows = 0;
1400   else                  Mbs++;
1401   if (extra_rows &&!rank) {
1402     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1403   }
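  /*
     Padding sketch (hypothetical sizes, not from this source): with M = 10 and
     bs = 3, Mbs = 3 and extra_rows = 3 - 10 + 3*3 = 2, so Mbs is bumped to 4
     and two padding rows (each given a single unit diagonal entry below) are
     appended; with M = 12 and bs = 3, extra_rows is first computed as 3 and
     then reset to 0.
  */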
1404 
1405   /* determine ownership of all rows */
1406   mbs = Mbs/size + ((Mbs % size) > rank);
1407   m   = mbs * bs;
1408   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1409   browners = rowners + size + 1;
1410   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1411   rowners[0] = 0;
1412   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1413   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1414   rstart = rowners[rank];
1415   rend   = rowners[rank+1];
1416 
1417   /* distribute row lengths to all processors */
1418   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1419   if (!rank) {
1420     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1421     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1422     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1423     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1424     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1425     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1426     PetscFree(sndcounts);
1427   }
1428   else {
1429     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1430   }
1431 
1432   if (!rank) {
1433     /* calculate the number of nonzeros on each processor */
1434     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1435     PetscMemzero(procsnz,size*sizeof(int));
1436     for ( i=0; i<size; i++ ) {
1437       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1438         procsnz[i] += rowlengths[j];
1439       }
1440     }
1441     PetscFree(rowlengths);
1442 
1443     /* determine max buffer needed and allocate it */
1444     maxnz = 0;
1445     for ( i=0; i<size; i++ ) {
1446       maxnz = PetscMax(maxnz,procsnz[i]);
1447     }
1448     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1449 
1450     /* read in my part of the matrix column indices  */
1451     nz = procsnz[0];
1452     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1453     mycols = ibuf;
1454     if (size == 1)  nz -= extra_rows;
1455     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1456     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1457 
1458     /* read in everyone else's (except the last processor's) and ship them off */
1459     for ( i=1; i<size-1; i++ ) {
1460       nz = procsnz[i];
1461       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1462       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1463     }
1464     /* read in the stuff for the last proc */
1465     if ( size != 1 ) {
1466       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1467       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1468       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1469       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1470     }
1471     PetscFree(cols);
1472   }
1473   else {
1474     /* determine buffer space needed for message */
1475     nz = 0;
1476     for ( i=0; i<m; i++ ) {
1477       nz += locrowlens[i];
1478     }
1479     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1480     mycols = ibuf;
1481     /* receive message of column indices*/
1482     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1483     MPI_Get_count(&status,MPI_INT,&maxnz);
1484     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1485   }
1486 
1487   /* loop over local rows, determining number of off diagonal entries */
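  /*
     Counting sketch (hypothetical data, not from this source): with bs = 2, if
     the two point rows of a block row reference point columns {0,1,5,8} and
     {1,9}, the block columns touched are {0,2,4}.  If this process owns block
     rows (and hence block columns) 0..2, that block row contributes dlens[] = 2
     (block columns 0 and 2) and odlens[] = 1 (block column 4); mask[] ensures
     each block column is counted only once per block row.
  */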
1488   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1489   odlens = dlens + (rend-rstart);
1490   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1491   PetscMemzero(mask,3*Mbs*sizeof(int));
1492   masked1 = mask    + Mbs;
1493   masked2 = masked1 + Mbs;
1494   rowcount = 0; nzcount = 0;
1495   for ( i=0; i<mbs; i++ ) {
1496     dcount  = 0;
1497     odcount = 0;
1498     for ( j=0; j<bs; j++ ) {
1499       kmax = locrowlens[rowcount];
1500       for ( k=0; k<kmax; k++ ) {
1501         tmp = mycols[nzcount++]/bs;
1502         if (!mask[tmp]) {
1503           mask[tmp] = 1;
1504           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1505           else masked1[dcount++] = tmp;
1506         }
1507       }
1508       rowcount++;
1509     }
1510 
1511     dlens[i]  = dcount;
1512     odlens[i] = odcount;
1513 
1514     /* zero out the mask elements we set */
1515     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1516     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1517   }
1518 
1519   /* create our matrix */
1520   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1521          CHKERRQ(ierr);
1522   A = *newmat;
1523   MatSetOption(A,MAT_COLUMNS_SORTED);
1524 
1525   if (!rank) {
1526     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1527     /* read in my part of the matrix numerical values  */
1528     nz = procsnz[0];
1529     vals = buf;
1530     mycols = ibuf;
1531     if (size == 1)  nz -= extra_rows;
1532     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1533     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1534 
1535     /* insert into matrix */
1536     jj      = rstart*bs;
1537     for ( i=0; i<m; i++ ) {
1538       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1539       mycols += locrowlens[i];
1540       vals   += locrowlens[i];
1541       jj++;
1542     }
1543     /* read in other processors (except the last one) and ship out */
1544     for ( i=1; i<size-1; i++ ) {
1545       nz = procsnz[i];
1546       vals = buf;
1547       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1548       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1549     }
1550     /* the last proc */
1551     if ( size != 1 ){
1552       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1553       vals = buf;
1554       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1555       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1556       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1557     }
1558     PetscFree(procsnz);
1559   }
1560   else {
1561     /* receive numeric values */
1562     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1563 
1564     /* receive message of values*/
1565     vals = buf;
1566     mycols = ibuf;
1567     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1568     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1569     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1570 
1571     /* insert into matrix */
1572     jj      = rstart*bs;
1573     for ( i=0; i<m; i++ ) {
1574       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1575       mycols += locrowlens[i];
1576       vals   += locrowlens[i];
1577       jj++;
1578     }
1579   }
1580   PetscFree(locrowlens);
1581   PetscFree(buf);
1582   PetscFree(ibuf);
1583   PetscFree(rowners);
1584   PetscFree(dlens);
1585   PetscFree(mask);
1586   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1587   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1588   return 0;
1589 }
1590 
1591 
1592