xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 683d4f989ec3a1de4e8db9c711574474e7b1792b)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.14 1996/07/12 17:50:29 balay Exp balay $";
3 #endif
4 
5 #include "mpibaij.h"
6 
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 /* local utility routine that creates a mapping from the global column
25 number to the local column number in the off-diagonal part (B) of the local
26 storage of the matrix.  This is done in a non-scalable way since the
27 length of colmap equals the number of block columns of the global matrix.
28 */
29 static int CreateColmap_MPIBAIJ_Private(Mat mat)
30 {
31   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
32   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
33   int        nbs = B->nbs,i;
34 
35   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
36   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
37   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
38   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i;
39   return 0;
40 }
41 
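/*
   Illustrative sketch (comment only, not library code): if the off-diagonal
   part B of this process references the global block columns garray = {2,7,9},
   then CreateColmap_MPIBAIJ_Private() sets colmap[2] = 0, colmap[7] = 1 and
   colmap[9] = 2 (all other entries stay 0 from PetscMemzero).  A global point
   column c is later translated to a local point column in B as
   colmap[c/bs]*bs + c%bs, as done in MatSetValues_MPIBAIJ, MatGetValues_MPIBAIJ
   and MatAssemblyEnd_MPIBAIJ below.
*/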
42 
43 static int MatGetReordering_MPIBAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
44 {
45   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
46   int         ierr;
47   if (baij->size == 1) {
48     ierr = MatGetReordering(baij->A,type,rperm,cperm); CHKERRQ(ierr);
49   } else SETERRQ(1,"MatGetReordering_MPIBAIJ:not supported in parallel");
50   return 0;
51 }
52 
53 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
54 {
55   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
56   Scalar      value;
57   int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
58   int         cstart = baij->cstart, cend = baij->cend,row,col;
59   int         roworiented = baij->roworiented,rstart_orig,rend_orig;
60   int         cstart_orig,cend_orig,bs=baij->bs;
61 
62   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
63     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
64   }
65   baij->insertmode = addv;
66   rstart_orig = rstart*bs;
67   rend_orig   = rend*bs;
68   cstart_orig = cstart*bs;
69   cend_orig   = cend*bs;
70   for ( i=0; i<m; i++ ) {
71     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
72     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
73     if (im[i] >= rstart_orig && im[i] < rend_orig) {
74       row = im[i] - rstart_orig;
75       for ( j=0; j<n; j++ ) {
76         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");
77         if (in[j] >= baij->N) SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");
78         if (in[j] >= cstart_orig && in[j] < cend_orig){
79           col = in[j] - cstart_orig;
80           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
81           ierr = MatSetValues(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
82         }
83         else {
84           if (mat->was_assembled) {
85             if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);}
86           col = baij->colmap[in[j]/bs]*bs + in[j]%bs;
87             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
88               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
89               col =  in[j];
90             }
91           }
92           else col = in[j];
93           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
94           ierr = MatSetValues(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
95         }
96       }
97     }
98     else {
99       if (roworiented) {
100         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
101       }
102       else {
103         row = im[i];
104         for ( j=0; j<n; j++ ) {
105           ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
106         }
107       }
108     }
109   }
110   return 0;
111 }
112 
113 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
114 {
115   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
116   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
117   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
118 
119   for ( i=0; i<m; i++ ) {
120     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
121     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
122     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
123       row = idxm[i] - bsrstart;
124       for ( j=0; j<n; j++ ) {
125         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
126         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
127         if (idxn[j] >= bscstart && idxn[j] < bscend){
128           col = idxn[j] - bscstart;
129           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
130         }
131         else {
132           if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);}
133           if ( baij->garray[baij->colmap[idxn[j]/bs]] != idxn[j]/bs ) *(v+i*n+j) = 0.0;
134           else {
135             col = baij->colmap[idxn[j]/bs]*bs + idxn[j]%bs;
136             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
137           }
138         }
139       }
140     }
141     else {
142       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
143     }
144   }
145   return 0;
146 }
147 
148 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
149 {
150   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
151   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
152   int        ierr, i,bs2=baij->bs2;
153   double     sum = 0.0;
154   Scalar     *v;
155 
156   if (baij->size == 1) {
157     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
158   } else {
159     if (type == NORM_FROBENIUS) {
160       v = amat->a;
161       for (i=0; i<amat->nz*bs2; i++ ) {
162 #if defined(PETSC_COMPLEX)
163         sum += real(conj(*v)*(*v)); v++;
164 #else
165         sum += (*v)*(*v); v++;
166 #endif
167       }
168       v = bmat->a;
169       for (i=0; i<bmat->nz*bs2; i++ ) {
170 #if defined(PETSC_COMPLEX)
171         sum += real(conj(*v)*(*v)); v++;
172 #else
173         sum += (*v)*(*v); v++;
174 #endif
175       }
176       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
177       *norm = sqrt(*norm);
178     }
179     else
180       SETERRQ(1,"MatNorm_MPIBAIJ:No support for this norm yet");
181   }
182   return 0;
183 }
184 
185 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
186 {
187   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
188   MPI_Comm    comm = mat->comm;
189   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
190   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
191   MPI_Request *send_waits,*recv_waits;
192   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
193   InsertMode  addv;
194   Scalar      *rvalues,*svalues;
195 
196   /* make sure all processors are using either INSERT_VALUES or ADD_VALUES */
197   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
198   if (addv == (ADD_VALUES|INSERT_VALUES)) {
199     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
200   }
201   baij->insertmode = addv; /* in case this processor had no cache */
202 
203   /*  first count number of contributors to each processor */
204   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
205   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
206   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
207   for ( i=0; i<baij->stash.n; i++ ) {
208     idx = baij->stash.idx[i];
209     for ( j=0; j<size; j++ ) {
210       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
211         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
212       }
213     }
214   }
215   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
216 
217   /* inform other processors of number of messages and max length*/
218   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
219   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
220   nreceives = work[rank];
221   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
222   nmax = work[rank];
223   PetscFree(work);
224 
225   /* post receives:
226        1) each message will consist of ordered triples
227      (global row index,global column index,value); the indices are stored
228      as Scalars to simplify the message passing, so a message of n entries
229      occupies 3*n Scalars: [row0,col0,val0,row1,col1,val1,...].
230        2) since we don't know how long each individual message is we
231      allocate the largest needed buffer for each receive. Potentially
232      this is a lot of wasted space.
233 
234        This could be done better.
235   */
236   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
237   CHKPTRQ(rvalues);
238   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
239   CHKPTRQ(recv_waits);
240   for ( i=0; i<nreceives; i++ ) {
241     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
242               comm,recv_waits+i);
243   }
244 
245   /* do sends:
246       1) starts[i] gives the starting index in svalues for stuff going to
247          the ith processor
248   */
249   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
250   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
251   CHKPTRQ(send_waits);
252   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
253   starts[0] = 0;
254   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
255   for ( i=0; i<baij->stash.n; i++ ) {
256     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
257     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
258     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
259   }
260   PetscFree(owner);
261   starts[0] = 0;
262   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
263   count = 0;
264   for ( i=0; i<size; i++ ) {
265     if (procs[i]) {
266       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
267                 comm,send_waits+count++);
268     }
269   }
270   PetscFree(starts); PetscFree(nprocs);
271 
272   /* Free cache space */
273   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
274   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
275 
276   baij->svalues    = svalues;    baij->rvalues    = rvalues;
277   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
278   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
279   baij->rmax       = nmax;
280 
281   return 0;
282 }
283 
284 
285 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
286 {
287   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
288   MPI_Status  *send_status,recv_status;
289   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
290   int         bs=baij->bs,row,col,other_disassembled;
291   Scalar      *values,val;
292   InsertMode  addv = baij->insertmode;
293 
294   /*  wait on receives */
295   while (count) {
296     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
297     /* unpack receives into our local space */
298     values = baij->rvalues + 3*imdex*baij->rmax;
299     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
300     n = n/3;
301     for ( i=0; i<n; i++ ) {
302       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
303       col = (int) PetscReal(values[3*i+1]);
304       val = values[3*i+2];
305       if (col >= baij->cstart*bs && col < baij->cend*bs) {
306         col -= baij->cstart*bs;
307         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
308       }
309       else {
310         if (mat->was_assembled) {
311           if (!baij->colmap) {ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);}
312           col = baij->colmap[col/bs]*bs + col%bs;
313           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
314             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
315             col = (int) PetscReal(values[3*i+1]);
316           }
317         }
318         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
319       }
320     }
321     count--;
322   }
323   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
324 
325   /* wait on sends */
326   if (baij->nsends) {
327     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
328     CHKPTRQ(send_status);
329     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
330     PetscFree(send_status);
331   }
332   PetscFree(baij->send_waits); PetscFree(baij->svalues);
333 
334   baij->insertmode = NOT_SET_VALUES;
335   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
336   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
337 
338   /* determine if any processor has disassembled, if so we must
339      also disassemble ourselves, in order that we may reassemble. */
340   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
341   if (mat->was_assembled && !other_disassembled) {
342     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
343   }
344 
345   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
346     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
347   }
348   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
349   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
350 
351   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
352   return 0;
353 }
354 
355 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
356 {
357   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
358   int          ierr;
359 
360   if (baij->size == 1) {
361     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
362   }
363   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
364   return 0;
365 }
366 
367 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
368 {
369   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
370   int          ierr, format,rank,bs=baij->bs;
371   FILE         *fd;
372   ViewerType   vtype;
373 
374   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
375   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
376     ierr = ViewerGetFormat(viewer,&format);
377     if (format == ASCII_FORMAT_INFO_DETAILED) {
378       int nz, nzalloc, mem;
379       MPI_Comm_rank(mat->comm,&rank);
380       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
381       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
382       PetscSequentialPhaseBegin(mat->comm,1);
383       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
384               rank,baij->m,nz*bs,nzalloc*bs,baij->bs,mem);
385       ierr = MatGetInfo(baij->A,MAT_LOCAL,&nz,&nzalloc,&mem);
386       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz*bs);
387       ierr = MatGetInfo(baij->B,MAT_LOCAL,&nz,&nzalloc,&mem);
388       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz*bs);
389       fflush(fd);
390       PetscSequentialPhaseEnd(mat->comm,1);
391       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
392       return 0;
393     }
394     else if (format == ASCII_FORMAT_INFO) {
395       return 0;
396     }
397   }
398 
399   if (vtype == DRAW_VIEWER) {
400     Draw       draw;
401     PetscTruth isnull;
402     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
403     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
404   }
405 
406   if (vtype == ASCII_FILE_VIEWER) {
407     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
408     PetscSequentialPhaseBegin(mat->comm,1);
409     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
410            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
411             baij->cstart*bs,baij->cend*bs);
412     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
413     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
414     fflush(fd);
415     PetscSequentialPhaseEnd(mat->comm,1);
416   }
417   else {
418     int size = baij->size;
419     rank = baij->rank;
420     if (size == 1) {
421       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
422     }
423     else {
424       /* assemble the entire matrix onto first processor. */
425       Mat         A;
426       Mat_SeqBAIJ *Aloc;
427       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
428       int         mbs=baij->mbs;
429       Scalar      *a;
430 
431       if (!rank) {
432         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
433         CHKERRQ(ierr);
434       }
435       else {
436         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
437         CHKERRQ(ierr);
438       }
439       PLogObjectParent(mat,A);
440 
441       /* copy over the A part */
442       Aloc = (Mat_SeqBAIJ*) baij->A->data;
443       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
444       row = baij->rstart;
445       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
446 
447       for ( i=0; i<mbs; i++ ) {
448         rvals[0] = bs*(baij->rstart + i);
449         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
450         for ( j=ai[i]; j<ai[i+1]; j++ ) {
451           col = (baij->cstart+aj[j])*bs;
452           for (k=0; k<bs; k++ ) {
453             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
454             col++; a += bs;
455           }
456         }
457       }
458       /* copy over the B part */
459       Aloc = (Mat_SeqBAIJ*) baij->B->data;
460       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
461       row = baij->rstart*bs;
462       for ( i=0; i<mbs; i++ ) {
463         rvals[0] = bs*(baij->rstart + i);
464         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
465         for ( j=ai[i]; j<ai[i+1]; j++ ) {
466           col = baij->garray[aj[j]]*bs;
467           for (k=0; k<bs; k++ ) {
468             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
469             col++; a += bs;
470           }
471         }
472       }
473       PetscFree(rvals);
474       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
475       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
476       if (!rank) {
477         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
478       }
479       ierr = MatDestroy(A); CHKERRQ(ierr);
480     }
481   }
482   return 0;
483 }
484 
485 
486 
487 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
488 {
489   Mat         mat = (Mat) obj;
490   int         ierr;
491   ViewerType  vtype;
492 
493   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
494   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
495       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
496     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
497   }
498   else if (vtype == BINARY_FILE_VIEWER) {
499     return MatView_MPIBAIJ_Binary(mat,viewer);
500   }
501   return 0;
502 }
503 
504 static int MatDestroy_MPIBAIJ(PetscObject obj)
505 {
506   Mat         mat = (Mat) obj;
507   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
508   int         ierr;
509 
510 #if defined(PETSC_LOG)
511   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
512 #endif
513 
514   PetscFree(baij->rowners);
515   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
516   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
517   if (baij->colmap) PetscFree(baij->colmap);
518   if (baij->garray) PetscFree(baij->garray);
519   if (baij->lvec)   VecDestroy(baij->lvec);
520   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
521   if (baij->rowvalues) PetscFree(baij->rowvalues);
522   PetscFree(baij);
523   PLogObjectDestroy(mat);
524   PetscHeaderDestroy(mat);
525   return 0;
526 }
527 
528 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
529 {
530   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
531   int         ierr, nt;
532 
533   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
534   if (nt != a->n) {
535     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and xx");
536   }
537   ierr = VecGetLocalSize(yy,&nt);  CHKERRQ(ierr);
538   if (nt != a->m) {
539     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and yy");
540   }
541   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
542   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
543   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
544   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
545   return 0;
546 }
547 
548 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
549 {
550   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
551   int        ierr;
552   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
553   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
554   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
555   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
556   return 0;
557 }
558 
559 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
560 {
561   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
562   int        ierr;
563 
564   /* do nondiagonal part */
565   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
566   /* send it on its way */
567   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
568                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
569   /* do local part */
570   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
571   /* receive remote parts: note this assumes the values are not actually */
572   /* inserted in yy until the next line, which is true for my implementation*/
573   /* but is not perhaps always true. */
574   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
575                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
576   return 0;
577 }
578 
579 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
580 {
581   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
582   int        ierr;
583 
584   /* do nondiagonal part */
585   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
586   /* send it on its way */
587   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
588                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
589   /* do local part */
590   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
591   /* receive remote parts: note this assumes the values are not actually */
592   /* inserted in zz until the next line, which is true for my implementation*/
593   /* but is not perhaps always true. */
594   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
595                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
596   return 0;
597 }
598 
599 /*
600   This only works correctly for square matrices where the subblock A->A is the
601    diagonal block
602 */
603 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
604 {
605   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
606   if (a->M != a->N)
607     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
608   return MatGetDiagonal(a->A,v);
609 }
610 
611 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
612 {
613   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
614   int        ierr;
615   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
616   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
617   return 0;
618 }
619 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
620 {
621   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
622   *m = mat->M; *n = mat->N;
623   return 0;
624 }
625 
626 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
627 {
628   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
629   *m = mat->m; *n = mat->n;
630   return 0;
631 }
632 
633 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
634 {
635   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
636   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
637   return 0;
638 }
639 
640 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
641 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
642 
643 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
644 {
645   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
646   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
647   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
648   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
649   int        *cmap, *idx_p,cstart = mat->cstart;
650 
651   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
652   mat->getrowactive = PETSC_TRUE;
653 
654   if (!mat->rowvalues && (idx || v)) {
655     /*
656         allocate enough space to hold information from the longest row.
657     */
658     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
659     int     max = 1,mbs = mat->mbs,tmp;
660     for ( i=0; i<mbs; i++ ) {
661       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
662       if (max < tmp) { max = tmp; }
663     }
664     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
665     CHKPTRQ(mat->rowvalues);
666     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
667   }
668 
669 
670   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows");
671   lrow = row - brstart;
672 
673   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
674   if (!v)   {pvA = 0; pvB = 0;}
675   if (!idx) {pcA = 0; if (!v) pcB = 0;}
676   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
677   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
678   nztot = nzA + nzB;
679 
680   cmap  = mat->garray;
681   if (v  || idx) {
682     if (nztot) {
683       /* Sort by increasing column numbers, assuming A and B already sorted */
684       int imark = -1;
685       if (v) {
686         *v = v_p = mat->rowvalues;
687         for ( i=0; i<nzB; i++ ) {
688           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
689           else break;
690         }
691         imark = i;
692         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
693         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
694       }
695       if (idx) {
696         *idx = idx_p = mat->rowindices;
697         if (imark > -1) {
698           for ( i=0; i<imark; i++ ) {
699             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
700           }
701         } else {
702           for ( i=0; i<nzB; i++ ) {
703             if (cmap[cworkB[i]/bs] < cstart)
704               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
705             else break;
706           }
707           imark = i;
708         }
709         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
710         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
711       }
712     }
713     else {
714       if (idx) *idx = 0;
715       if (v)   *v   = 0;
716     }
717   }
718   *nz = nztot;
719   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
720   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
721   return 0;
722 }
723 
724 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
725 {
726   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
727   if (baij->getrowactive == PETSC_FALSE) {
728     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
729   }
730   baij->getrowactive = PETSC_FALSE;
731   return 0;
732 }
733 
734 static int MatGetBlockSize_MPIBAIJ(Mat mat, int *bs)
735 {
736   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
737   *bs = baij->bs;
738   return 0;
739 }
740 
741 static int MatZeroEntries_MPIBAIJ(Mat A)
742 {
743   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
744   int         ierr;
745   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
746   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
747   return 0;
748 }
749 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
750 {
751   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
752 
753   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
754       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
755       op == MAT_COLUMNS_SORTED ||
756       op == MAT_ROW_ORIENTED) {
757         MatSetOption(a->A,op);
758         MatSetOption(a->B,op);
759   }
760   else if (op == MAT_ROWS_SORTED ||
761            op == MAT_SYMMETRIC ||
762            op == MAT_STRUCTURALLY_SYMMETRIC ||
763            op == MAT_YES_NEW_DIAGONALS)
764     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
765   else if (op == MAT_COLUMN_ORIENTED) {
766     a->roworiented = 0;
767     MatSetOption(a->A,op);
768     MatSetOption(a->B,op);
769   }
770   else if (op == MAT_NO_NEW_DIAGONALS)
771     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:MAT_NO_NEW_DIAGONALS");}
772   else
773     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:unknown option");}
774   return 0;
775 }
776 
777 /* -------------------------------------------------------------------*/
778 static struct _MatOps MatOps = {
779   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
780   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,MatSolve_MPIBAIJ,
781   MatSolveAdd_MPIBAIJ,MatSolveTrans_MPIBAIJ,MatSolveTransAdd_MPIBAIJ,MatLUFactor_MPIBAIJ,
782   0,0,0,0,
783   0,MatGetDiagonal_MPIBAIJ,0,MatNorm_MPIBAIJ,
784   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
785   MatZeroEntries_MPIBAIJ,0,MatGetReordering_MPIBAIJ,MatLUFactorSymbolic_MPIBAIJ,
786   MatLUFactorNumeric_MPIBAIJ,0,0,MatGetSize_MPIBAIJ,
787   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,MatILUFactorSymbolic_MPIBAIJ,0,
788   0,0,0,0,
789   0,0,0,0,
790   0,0,0,MatGetSubMatrices_MPIBAIJ,
791   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,0,
792   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ};
793 
794 
795 /*@C
796    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
797    (block compressed row).  For good matrix assembly performance
798    the user should preallocate the matrix storage by setting the parameters
799    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
800    performance can be increased by more than a factor of 50.
801 
802    Input Parameters:
803 .  comm - MPI communicator
804 .  bs   - size of block
805 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
806            This value should be the same as the local size used in creating the
807            y vector for the matrix-vector product y = Ax.
808 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
809            This value should be the same as the local size used in creating the
810            x vector for the matrix-vector product y = Ax.
811 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
812 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
813 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
814            submatrix  (same for all local rows)
815 .  d_nnz - array containing the number of block nonzeros in the various block rows
816            of the diagonal portion of the local submatrix (possibly different for each block
817            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
818            it is zero.
819 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
820            submatrix (same for all local rows).
821 .  o_nnz - array containing the number of block nonzeros in the various block rows of the
822            off-diagonal portion of the local submatrix (possibly different for
823            each block row) or PETSC_NULL.
824 
825    Output Parameter:
826 .  A - the matrix
827 
828    Notes:
829    The user MUST specify either the local or global matrix dimensions
830    (possibly both).
831 
832    Storage Information:
833    For a square global matrix we define each processor's diagonal portion
834    to be its local rows and the corresponding columns (a square submatrix);
835    each processor's off-diagonal portion encompasses the remainder of the
836    local matrix (a rectangular submatrix).
837 
838    The user can specify preallocated storage for the diagonal part of
839    the local submatrix with either d_nz or d_nnz (not both).  Set
840    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
841    memory allocation.  Likewise, specify preallocated storage for the
842    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
843 
844    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
845    the figure below we depict these three local rows and all columns (0-11).
846 
847 $          0 1 2 3 4 5 6 7 8 9 10 11
848 $         -------------------
849 $  row 3  |  o o o d d d o o o o o o
850 $  row 4  |  o o o d d d o o o o o o
851 $  row 5  |  o o o d d d o o o o o o
852 $         -------------------
853 $
854 
855    Thus, any entries in the d locations are stored in the d (diagonal)
856    submatrix, and any entries in the o locations are stored in the
857    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
858    stored simply in the MATSEQBAIJ format for compressed row storage.
859 
860    Now d_nz should indicate the number of nonzeros per row in the d matrix,
861    and o_nz should indicate the number of nonzeros per row in the o matrix.
862    In general, for PDE problems in which most nonzeros are near the diagonal,
863    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
864    or you will get TERRIBLE performance; see the users' manual chapter on
865    matrices and the file $(PETSC_DIR)/Performance.
866 
867 .keywords: matrix, block, aij, compressed row, sparse, parallel
868 
869 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
870 @*/
871 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
872                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
873 {
874   Mat          B;
875   Mat_MPIBAIJ  *b;
876   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE;
877 
878   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
879   *A = 0;
880   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
881   PLogObjectCreate(B);
882   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
883   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
884   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
885   B->destroy    = MatDestroy_MPIBAIJ;
886   B->view       = MatView_MPIBAIJ;
887 
888   B->factor     = 0;
889   B->assembled  = PETSC_FALSE;
890 
891   b->insertmode = NOT_SET_VALUES;
892   MPI_Comm_rank(comm,&b->rank);
893   MPI_Comm_size(comm,&b->size);
894 
895   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
896     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
897   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
898   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
899   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
900   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
901 
902   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
903     work[0] = m; work[1] = n;
904     mbs = m/bs; nbs = n/bs;
905     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
906     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
907     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
908   }
909   if (m == PETSC_DECIDE) {
910     Mbs = M/bs;
911     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
912     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
913     m   = mbs*bs;
914   }
915   if (n == PETSC_DECIDE) {
916     Nbs = N/bs;
917     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
918     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
919     n   = nbs*bs;
920   }
921   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
922 
923   b->m = m; B->m = m;
924   b->n = n; B->n = n;
925   b->N = N; B->N = N;
926   b->M = M; B->M = M;
927   b->bs  = bs;
928   b->bs2 = bs*bs;
929   b->mbs = mbs;
930   b->nbs = nbs;
931   b->Mbs = Mbs;
932   b->Nbs = Nbs;
933 
934   /* build local table of row and column ownerships */
935   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
936   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
937   b->cowners = b->rowners + b->size + 1;
938   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
939   b->rowners[0] = 0;
940   for ( i=2; i<=b->size; i++ ) {
941     b->rowners[i] += b->rowners[i-1];
942   }
943   b->rstart = b->rowners[b->rank];
944   b->rend   = b->rowners[b->rank+1];
945   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
946   b->cowners[0] = 0;
947   for ( i=2; i<=b->size; i++ ) {
948     b->cowners[i] += b->cowners[i-1];
949   }
950   b->cstart = b->cowners[b->rank];
951   b->cend   = b->cowners[b->rank+1];
952 
953   if (d_nz == PETSC_DEFAULT) d_nz = 5;
954   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
955   PLogObjectParent(B,b->A);
956   if (o_nz == PETSC_DEFAULT) o_nz = 0;
957   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
958   PLogObjectParent(B,b->B);
959 
960   /* build cache for off array entries formed */
961   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
962   b->colmap      = 0;
963   b->garray      = 0;
964   b->roworiented = 1;
965 
966   /* stuff used for matrix vector multiply */
967   b->lvec      = 0;
968   b->Mvctx     = 0;
969 
970   /* stuff for MatGetRow() */
971   b->rowindices   = 0;
972   b->rowvalues    = 0;
973   b->getrowactive = PETSC_FALSE;
974 
975   *A = B;
976   return 0;
977 }
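
/*
   Example usage (an illustrative sketch only, not part of the library source;
   the block size, local sizes and preallocation figures below are made up and
   error checking with CHKERRQ() is omitted for brevity).  The local sizes m
   and n must be multiples of the block size bs:

      Mat    A;
      int    bs = 3, m = 60, n = 60, row, col;
      Scalar value;

      MatCreateMPIBAIJ(MPI_COMM_WORLD,bs,m,n,PETSC_DECIDE,PETSC_DECIDE,
                       5,PETSC_NULL,2,PETSC_NULL,&A);
      .... set row, col and value (global point indices) ....
      MatSetValues(A,1,&row,1,&col,&value,INSERT_VALUES);
      MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);
      MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);
      MatDestroy(A);

   Here 5 and 2 are guesses for the number of block nonzeros per block row in
   the diagonal and off-diagonal portions of the local submatrix; see the
   Storage Information section of the manual page above.
*/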
978 
979 #include "sys.h"
980 
981 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
982 {
983   Mat          A;
984   int          i, nz, ierr, j,rstart, rend, fd;
985   Scalar       *vals,*buf;
986   MPI_Comm     comm = ((PetscObject)viewer)->comm;
987   MPI_Status   status;
988   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
989   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
990   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
991   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
992   int          dcount,kmax,k,nzcount,tmp;
993 
994 
995   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
996   bs2  = bs*bs;
997 
998   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
999   if (!rank) {
1000     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1001     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1002     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
1003   }
1004 
1005   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1006   M = header[1]; N = header[2];
1007 
1008 
1009   if (M != N) SETERRQ(1,"MatLoad_MPIBAIJ:Can only do square matrices");
1010 
1011   /*
1012      This code adds extra rows to make sure the number of rows is
1013      divisible by the blocksize
1014   */
1015   Mbs        = M/bs;
1016   extra_rows = bs - M + bs*(Mbs);
1017   if (extra_rows == bs) extra_rows = 0;
1018   else                  Mbs++;
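  /* For illustration (numbers made up): with bs = 3 and M = 10, Mbs = 10/3 = 3
     and extra_rows = 3 - 10 + 9 = 2, so Mbs becomes 4 and two padding rows
     (each given a single unit diagonal entry below) bring the size to 12. */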
1019   if (extra_rows && !rank) {
1020     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1021   }
1022   /* determine ownership of all rows */
1023   mbs = Mbs/size + ((Mbs % size) > rank);
1024   m   = mbs * bs;
1025   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1026   browners = rowners + size + 1;
1027   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1028   rowners[0] = 0;
1029   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1030   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1031   rstart = rowners[rank];
1032   rend   = rowners[rank+1];
1033 
1034   /* distribute row lengths to all processors */
1035   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1036   if (!rank) {
1037     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1038     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1039     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1040     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1041     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1042     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1043     PetscFree(sndcounts);
1044   }
1045   else {
1046     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1047   }
1048 
1049   if (!rank) {
1050     /* calculate the number of nonzeros on each processor */
1051     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1052     PetscMemzero(procsnz,size*sizeof(int));
1053     for ( i=0; i<size; i++ ) {
1054       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1055         procsnz[i] += rowlengths[j];
1056       }
1057     }
1058     PetscFree(rowlengths);
1059 
1060     /* determine max buffer needed and allocate it */
1061     maxnz = 0;
1062     for ( i=0; i<size; i++ ) {
1063       maxnz = PetscMax(maxnz,procsnz[i]);
1064     }
1065     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1066 
1067     /* read in my part of the matrix column indices  */
1068     nz = procsnz[0];
1069     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1070     mycols = ibuf;
1071     if (size == 1)  nz -= extra_rows;
1072     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1073     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1074 
1075     /* read in everyone else's (except the last proc's) and ship it off */
1076     for ( i=1; i<size-1; i++ ) {
1077       nz = procsnz[i];
1078       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1079       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1080     }
1081     /* read in the stuff for the last proc */
1082     if ( size != 1 ) {
1083       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1084       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1085       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1086       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1087     }
1088     PetscFree(cols);
1089   }
1090   else {
1091     /* determine buffer space needed for message */
1092     nz = 0;
1093     for ( i=0; i<m; i++ ) {
1094       nz += locrowlens[i];
1095     }
1096     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1097     mycols = ibuf;
1098     /* receive message of column indices*/
1099     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1100     MPI_Get_count(&status,MPI_INT,&maxnz);
1101     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1102   }
1103 
1104   /* loop over local rows, determining number of off diagonal entries */
1105   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1106   odlens = dlens + (rend-rstart);
1107   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1108   PetscMemzero(mask,3*Mbs*sizeof(int));
1109   masked1 = mask    + Mbs;
1110   masked2 = masked1 + Mbs;
1111   rowcount = 0; nzcount = 0;
1112   for ( i=0; i<mbs; i++ ) {
1113     dcount  = 0;
1114     odcount = 0;
1115     for ( j=0; j<bs; j++ ) {
1116       kmax = locrowlens[rowcount];
1117       for ( k=0; k<kmax; k++ ) {
1118         tmp = mycols[nzcount++]/bs;
1119         if (!mask[tmp]) {
1120           mask[tmp] = 1;
1121           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1122           else masked1[dcount++] = tmp;
1123         }
1124       }
1125       rowcount++;
1126     }
1127 
1128     dlens[i]  = dcount;
1129     odlens[i] = odcount;
1130 
1131     /* zero out the mask elements we set */
1132     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1133     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1134   }
1135 
1136   /* create our matrix */
1137   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);CHKERRQ(ierr);
1138   A = *newmat;
1139   MatSetOption(A,MAT_COLUMNS_SORTED);
1140 
1141   if (!rank) {
1142     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1143     /* read in my part of the matrix numerical values  */
1144     nz = procsnz[0];
1145     vals = buf;
1146     mycols = ibuf;
1147     if (size == 1)  nz -= extra_rows;
1148     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1149     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1150     /* insert into matrix */
1151     jj      = rstart*bs;
1152     for ( i=0; i<m; i++ ) {
1153       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1154       mycols += locrowlens[i];
1155       vals   += locrowlens[i];
1156       jj++;
1157     }
1158     /* read in the other processors' values (except the last one) and ship them out */
1159     for ( i=1; i<size-1; i++ ) {
1160       nz = procsnz[i];
1161       vals = buf;
1162       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1163       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1164     }
1165     /* the last proc */
1166     if ( size != 1 ){
1167       nz = procsnz[size-1] - extra_rows;
1168       vals = buf;
1169       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1170       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1171       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1172     }
1173     PetscFree(procsnz);
1174   }
1175   else {
1176     /* receive numeric values */
1177     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1178 
1179     /* receive message of values*/
1180     vals = buf;
1181     mycols = ibuf;
1182     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1183     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1184     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1185 
1186     /* insert into matrix */
1187     jj      = rstart*bs;
1188     for ( i=0; i<m; i++ ) {
1189       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1190       mycols += locrowlens[i];
1191       vals   += locrowlens[i];
1192       jj++;
1193     }
1194   }
1195   PetscFree(locrowlens);
1196   PetscFree(buf);
1197   PetscFree(ibuf);
1198   PetscFree(rowners);
1199   PetscFree(dlens);
1200   PetscFree(mask);
1201   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1202   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1203   return 0;
1204 }
1205 
1206 
1207