xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision c5263f90b5a72a07097a78361c4f8084181073fb)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.52 1997/01/27 18:17:11 bsmith Exp balay $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 
25 /*
26      Local utility routine that creates a mapping from the global column
27    number to the local number in the off-diagonal part of the local
28    storage of the matrix.  This is done in a non scable way since the
29    length of colmap equals the global matrix length.
30 */
31 #undef __FUNC__
32 #define __FUNC__ "CreateColmap_MPIBAIJ_Private"
33 static int CreateColmap_MPIBAIJ_Private(Mat mat)
34 {
35   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
36   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
37   int         nbs = B->nbs,i,bs=B->bs;;
38 
39   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
40   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
41   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
42   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
43   return 0;
44 }
45 
46 #undef __FUNC__
47 #define __FUNC__ "MatGetRowIJ_MPIBAIJ("
48 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
49                             PetscTruth *done)
50 {
51   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
52   int         ierr;
53   if (aij->size == 1) {
54     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
55   } else SETERRQ(1,0,"not supported in parallel");
56   return 0;
57 }
58 
59 #undef __FUNC__
60 #define __FUNC__ "MatRestoreRowIJ_MPIBAIJ"
61 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
62                                 PetscTruth *done)
63 {
64   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
65   int        ierr;
66   if (aij->size == 1) {
67     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
68   } else SETERRQ(1,0,"not supported in parallel");
69   return 0;
70 }
71 #define CHUNKSIZE  10
72 
73 #define  MatSetValues_SeqBAIJ_A_Private(row,col,value,addv) \
74 { \
75  \
76     brow = row/bs;  \
77     rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
78     rmax = imax[brow]; nrow = ailen[brow]; \
79       bcol = col/bs; \
80       ridx = row % bs; cidx = col % bs; \
81       low = 0; high = nrow; \
82       while (high-low > 3) { \
83         t = (low+high)/2; \
84         if (rp[t] > bcol) high = t; \
85         else              low  = t; \
86       } \
87       for ( _i=low; _i<high; _i++ ) { \
88         if (rp[_i] > bcol) break; \
89         if (rp[_i] == bcol) { \
90           bap  = ap +  bs2*_i + bs*cidx + ridx; \
91           if (addv == ADD_VALUES) *bap += value;  \
92           else                    *bap  = value;  \
93           goto _noinsert; \
94         } \
95       } \
96       if (a->nonew) goto _noinsert; \
97       if (nrow >= rmax) { \
98         /* there is no extra room in row, therefore enlarge */ \
99         int    new_nz = ai[a->mbs] + CHUNKSIZE,len,*new_i,*new_j; \
100         Scalar *new_a; \
101  \
102         /* malloc new storage space */ \
103         len     = new_nz*(sizeof(int)+bs2*sizeof(Scalar))+(a->mbs+1)*sizeof(int); \
104         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
105         new_j   = (int *) (new_a + bs2*new_nz); \
106         new_i   = new_j + new_nz; \
107  \
108         /* copy over old data into new slots */ \
109         for ( ii=0; ii<brow+1; ii++ ) {new_i[ii] = ai[ii];} \
110         for ( ii=brow+1; ii<a->mbs+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
111         PetscMemcpy(new_j,aj,(ai[brow]+nrow)*sizeof(int)); \
112         len = (new_nz - CHUNKSIZE - ai[brow] - nrow); \
113         PetscMemcpy(new_j+ai[brow]+nrow+CHUNKSIZE,aj+ai[brow]+nrow, \
114                                                            len*sizeof(int)); \
115         PetscMemcpy(new_a,aa,(ai[brow]+nrow)*bs2*sizeof(Scalar)); \
116         PetscMemzero(new_a+bs2*(ai[brow]+nrow),bs2*CHUNKSIZE*sizeof(Scalar)); \
117         PetscMemcpy(new_a+bs2*(ai[brow]+nrow+CHUNKSIZE), \
118                     aa+bs2*(ai[brow]+nrow),bs2*len*sizeof(Scalar));  \
119         /* free up old matrix storage */ \
120         PetscFree(a->a);  \
121         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
122         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
123         a->singlemalloc = 1; \
124  \
125         rp   = aj + ai[brow]; ap = aa + bs2*ai[brow]; \
126         rmax = imax[brow] = imax[brow] + CHUNKSIZE; \
127         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + bs2*sizeof(Scalar))); \
128         a->maxnz += bs2*CHUNKSIZE; \
129         a->reallocs++; \
130         a->nz++; \
131       } \
132       N = nrow++ - 1;  \
133       /* shift up all the later entries in this row */ \
134       for ( ii=N; ii>=_i; ii-- ) { \
135         rp[ii+1] = rp[ii]; \
136         PetscMemcpy(ap+bs2*(ii+1),ap+bs2*(ii),bs2*sizeof(Scalar)); \
137       } \
138       if (N>=_i) PetscMemzero(ap+bs2*_i,bs2*sizeof(Scalar));  \
139       rp[_i]                      = bcol;  \
140       ap[bs2*_i + bs*cidx + ridx] = value;  \
141       _noinsert:; \
142     ailen[brow] = nrow; \
143 }
144 
145 extern int MatSetValues_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
146 #undef __FUNC__
147 #define __FUNC__ "MatSetValues_MPIBAIJ"
148 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
149 {
150   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
151   Scalar      value;
152   int         ierr,i,j,row,col;
153   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
154   int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
155   int         cend_orig=baij->cend_bs,bs=baij->bs;
156 
157   /* Some Variables required in the macro */
158   Mat         A = baij->A;
159   Mat_SeqBAIJ *a = (Mat_SeqBAIJ *) (A)->data;
160   int         *rp,ii,nrow,_i,rmax,N;
161   int         *imax=a->imax,*ai=a->i,*ailen=a->ilen;
162   int         *aj=a->j,brow,bcol;
163   int         low,high,t,ridx,cidx,bs2=a->bs2;
164   Scalar      *ap,*aa=a->a,*bap;
165 
166 #if defined(PETSC_BOPT_g)
167   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
168     SETERRQ(1,0,"Cannot mix inserts and adds");
169   }
170 #endif
171   baij->insertmode = addv;
172   for ( i=0; i<m; i++ ) {
173 #if defined(PETSC_BOPT_g)
174     if (im[i] < 0) SETERRQ(1,0,"Negative row");
175     if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
176 #endif
177     if (im[i] >= rstart_orig && im[i] < rend_orig) {
178       row = im[i] - rstart_orig;
179       for ( j=0; j<n; j++ ) {
180         if (in[j] >= cstart_orig && in[j] < cend_orig){
181           col = in[j] - cstart_orig;
182           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
183           MatSetValues_SeqBAIJ_A_Private(row,col,value,addv);
184           /* ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
185         }
186 #if defined(PETSC_BOPT_g)
187         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
188         else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
189 #endif
190         else {
191           if (mat->was_assembled) {
192             if (!baij->colmap) {
193               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
194             }
195             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
196             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
197               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
198               col =  in[j];
199             }
200           }
201           else col = in[j];
202           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
203           ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
204         }
205       }
206     }
207     else {
208       if (roworiented && !baij->donotstash) {
209         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
210       }
211       else {
212         if (!baij->donotstash) {
213           row = im[i];
214 	  for ( j=0; j<n; j++ ) {
215 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
216           }
217         }
218       }
219     }
220   }
221   return 0;
222 }
223 
224 extern int MatSetValuesBlocked_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
225 #undef __FUNC__
226 #define __FUNC__ "MatSetValuesBlocked_MPIBAIJ"
227 static int MatSetValuesBlocked_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
228 {
229   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
230   Scalar      *value,*tmp;
231   int         ierr,i,j,ii,jj,row,col;
232   int         roworiented = baij->roworiented,rstart=baij->rstart ;
233   int         rend=baij->rend,cstart=baij->cstart,stepval;
234   int         cend=baij->cend,bs=baij->bs,bs2=baij->bs2;
235 
236 #if defined(PETSC_BOPT_g)
237   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
238     SETERRQ(1,0,"Cannot mix inserts and adds");
239   }
240 #endif
241   /* Should be stashed somewhere to avoid multiple mallocs */
242   tmp = (Scalar*) PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(tmp);
243   if (roworiented) {
244     stepval = (n-1)*bs;
245   } else {
246     stepval = (m-1)*bs;
247   }
248   value = (Scalar *)PetscMalloc(bs2*sizeof(Scalar)); CHKPTRQ(value);
249   baij->insertmode = addv;
250   for ( i=0; i<m; i++ ) {
251 #if defined(PETSC_BOPT_g)
252     if (im[i] < 0) SETERRQ(1,0,"Negative row");
253     if (im[i] >= baij->Mbs) SETERRQ(1,0,"Row too large");
254 #endif
255     if (im[i] >= rstart && im[i] < rend) {
256       row = im[i] - rstart;
257       for ( j=0; j<n; j++ ) {
258         if (in[j] >= cstart && in[j] < cend){
259           col = in[j] - cstart;
260           if (roworiented) {
261             value = v + i*(stepval+bs)*bs + j*bs;
262             for ( ii=0; ii<bs; ii++,value+=stepval )
263               for (jj=ii; jj<bs2; jj+=bs )
264                 tmp[jj] = *value++;
265           } else {
266             value = v + j*(stepval+bs)*bs + i*bs;
267             for ( ii=0; ii<bs; ii++,value+=stepval )
268               for (jj=0; jj<bs; jj++ )
269                 *tmp++  = *value++;
270             tmp -=bs2;
271           }
272           ierr = MatSetValuesBlocked_SeqBAIJ(baij->A,1,&row,1,&col,tmp,addv);CHKERRQ(ierr);
273         }
274 #if defined(PETSC_BOPT_g)
275         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
276         else if (in[j] >= baij->Nbs) {SETERRQ(1,0,"Col too large");}
277 #endif
278         else {
279           if (mat->was_assembled) {
280             if (!baij->colmap) {
281               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
282             }
283             col = baij->colmap[in[j]] - 1;
284             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
285               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
286               col =  in[j];
287             }
288           }
289           else col = in[j];
290           if (roworiented) {
291             value = v + i*(stepval+bs)*bs + j*bs;
292             for ( ii=0; ii<bs; ii++,value+=stepval )
293               for (jj=ii; jj<bs2; jj+=bs )
294                 tmp[jj] = *value++;
295           } else {
296             value = v + j*(stepval+bs)*bs + i*bs;
297             for ( ii=0; ii<bs; ii++,value+=stepval )
298               for (jj=0; jj<bs; jj++ )
299                 *tmp++  = *value++;
300             tmp -=bs2;
301           }
302           ierr = MatSetValuesBlocked_SeqBAIJ(baij->B,1,&row,1,&col,tmp,addv);CHKERRQ(ierr);
303         }
304       }
305     }
306     else {
307       if (!baij->donotstash) {
308         row = im[i]*bs;
309         if (roworiented ) {
310           for ( ii=0; ii<bs; i++,row++ ) {
311             for ( jj=0; jj<bs; jj++ ) {
312               ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
313             }
314           }
315         }
316         else {
317           row = im[i];
318 	  for ( j=0; j<n; j++ ) {
319 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
320           }
321         }
322       }
323     }
324   }
325   PetscFree(tmp);
326   return 0;
327 }
328 
329 #undef __FUNC__
330 #define __FUNC__ "MatGetValues_MPIBAIJ"
331 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
332 {
333   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
334   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
335   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
336 
337   for ( i=0; i<m; i++ ) {
338     if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
339     if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
340     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
341       row = idxm[i] - bsrstart;
342       for ( j=0; j<n; j++ ) {
343         if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
344         if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
345         if (idxn[j] >= bscstart && idxn[j] < bscend){
346           col = idxn[j] - bscstart;
347           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
348         }
349         else {
350           if (!baij->colmap) {
351             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
352           }
353           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
354              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
355           else {
356             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
357             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
358           }
359         }
360       }
361     }
362     else {
363       SETERRQ(1,0,"Only local values currently supported");
364     }
365   }
366   return 0;
367 }
368 
369 #undef __FUNC__
370 #define __FUNC__ "MatNorm_MPIBAIJ"
371 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
372 {
373   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
374   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
375   int        ierr, i,bs2=baij->bs2;
376   double     sum = 0.0;
377   Scalar     *v;
378 
379   if (baij->size == 1) {
380     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
381   } else {
382     if (type == NORM_FROBENIUS) {
383       v = amat->a;
384       for (i=0; i<amat->nz*bs2; i++ ) {
385 #if defined(PETSC_COMPLEX)
386         sum += real(conj(*v)*(*v)); v++;
387 #else
388         sum += (*v)*(*v); v++;
389 #endif
390       }
391       v = bmat->a;
392       for (i=0; i<bmat->nz*bs2; i++ ) {
393 #if defined(PETSC_COMPLEX)
394         sum += real(conj(*v)*(*v)); v++;
395 #else
396         sum += (*v)*(*v); v++;
397 #endif
398       }
399       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
400       *norm = sqrt(*norm);
401     }
402     else
403       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
404   }
405   return 0;
406 }
407 
408 #undef __FUNC__
409 #define __FUNC__ "MatAssemblyBegin_MPIBAIJ"
410 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
411 {
412   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
413   MPI_Comm    comm = mat->comm;
414   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
415   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
416   MPI_Request *send_waits,*recv_waits;
417   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
418   InsertMode  addv;
419   Scalar      *rvalues,*svalues;
420 
421   /* make sure all processors are either in INSERTMODE or ADDMODE */
422   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
423   if (addv == (ADD_VALUES|INSERT_VALUES)) {
424     SETERRQ(1,0,"Some processors inserted others added");
425   }
426   baij->insertmode = addv; /* in case this processor had no cache */
427 
428   /*  first count number of contributors to each processor */
429   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
430   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
431   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
432   for ( i=0; i<baij->stash.n; i++ ) {
433     idx = baij->stash.idx[i];
434     for ( j=0; j<size; j++ ) {
435       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
436         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
437       }
438     }
439   }
440   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
441 
442   /* inform other processors of number of messages and max length*/
443   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
444   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
445   nreceives = work[rank];
446   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
447   nmax = work[rank];
448   PetscFree(work);
449 
450   /* post receives:
451        1) each message will consist of ordered pairs
452      (global index,value) we store the global index as a double
453      to simplify the message passing.
454        2) since we don't know how long each individual message is we
455      allocate the largest needed buffer for each receive. Potentially
456      this is a lot of wasted space.
457 
458 
459        This could be done better.
460   */
461   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
462   CHKPTRQ(rvalues);
463   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
464   CHKPTRQ(recv_waits);
465   for ( i=0; i<nreceives; i++ ) {
466     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
467               comm,recv_waits+i);
468   }
469 
470   /* do sends:
471       1) starts[i] gives the starting index in svalues for stuff going to
472          the ith processor
473   */
474   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
475   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
476   CHKPTRQ(send_waits);
477   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
478   starts[0] = 0;
479   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
480   for ( i=0; i<baij->stash.n; i++ ) {
481     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
482     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
483     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
484   }
485   PetscFree(owner);
486   starts[0] = 0;
487   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
488   count = 0;
489   for ( i=0; i<size; i++ ) {
490     if (procs[i]) {
491       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
492                 comm,send_waits+count++);
493     }
494   }
495   PetscFree(starts); PetscFree(nprocs);
496 
497   /* Free cache space */
498   PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
499   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
500 
501   baij->svalues    = svalues;    baij->rvalues    = rvalues;
502   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
503   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
504   baij->rmax       = nmax;
505 
506   return 0;
507 }
508 
509 
510 #undef __FUNC__
511 #define __FUNC__ "MatAssemblyEnd_MPIBAIJ"
512 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
513 {
514   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
515   MPI_Status  *send_status,recv_status;
516   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
517   int         bs=baij->bs,row,col,other_disassembled;
518   Scalar      *values,val;
519   InsertMode  addv = baij->insertmode;
520 
521   /*  wait on receives */
522   while (count) {
523     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
524     /* unpack receives into our local space */
525     values = baij->rvalues + 3*imdex*baij->rmax;
526     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
527     n = n/3;
528     for ( i=0; i<n; i++ ) {
529       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
530       col = (int) PetscReal(values[3*i+1]);
531       val = values[3*i+2];
532       if (col >= baij->cstart*bs && col < baij->cend*bs) {
533         col -= baij->cstart*bs;
534         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
535       }
536       else {
537         if (mat->was_assembled) {
538           if (!baij->colmap) {
539             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
540           }
541           col = (baij->colmap[col/bs]-1)*bs + col%bs;
542           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
543             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
544             col = (int) PetscReal(values[3*i+1]);
545           }
546         }
547         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
548       }
549     }
550     count--;
551   }
552   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
553 
554   /* wait on sends */
555   if (baij->nsends) {
556     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
557     CHKPTRQ(send_status);
558     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
559     PetscFree(send_status);
560   }
561   PetscFree(baij->send_waits); PetscFree(baij->svalues);
562 
563   baij->insertmode = NOT_SET_VALUES;
564   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
565   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
566 
567   /* determine if any processor has disassembled, if so we must
568      also disassemble ourselfs, in order that we may reassemble. */
569   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
570   if (mat->was_assembled && !other_disassembled) {
571     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
572   }
573 
574   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
575     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
576   }
577   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
578   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
579 
580   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
581   return 0;
582 }
583 
584 #undef __FUNC__
585 #define __FUNC__ "MatView_MPIBAIJ_Binary"
586 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
587 {
588   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
589   int          ierr;
590 
591   if (baij->size == 1) {
592     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
593   }
594   else SETERRQ(1,0,"Only uniprocessor output supported");
595   return 0;
596 }
597 
598 #undef __FUNC__
599 #define __FUNC__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
600 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
601 {
602   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
603   int          ierr, format,rank,bs = baij->bs;
604   FILE         *fd;
605   ViewerType   vtype;
606 
607   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
608   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
609     ierr = ViewerGetFormat(viewer,&format);
610     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
611       MatInfo info;
612       MPI_Comm_rank(mat->comm,&rank);
613       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
614       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
615       PetscSequentialPhaseBegin(mat->comm,1);
616       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
617               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
618               baij->bs,(int)info.memory);
619       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
620       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
621       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
622       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
623       fflush(fd);
624       PetscSequentialPhaseEnd(mat->comm,1);
625       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
626       return 0;
627     }
628     else if (format == VIEWER_FORMAT_ASCII_INFO) {
629       PetscPrintf(mat->comm,"  block size is %d\n",bs);
630       return 0;
631     }
632   }
633 
634   if (vtype == DRAW_VIEWER) {
635     Draw       draw;
636     PetscTruth isnull;
637     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
638     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
639   }
640 
641   if (vtype == ASCII_FILE_VIEWER) {
642     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
643     PetscSequentialPhaseBegin(mat->comm,1);
644     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
645            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
646             baij->cstart*bs,baij->cend*bs);
647     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
648     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
649     fflush(fd);
650     PetscSequentialPhaseEnd(mat->comm,1);
651   }
652   else {
653     int size = baij->size;
654     rank = baij->rank;
655     if (size == 1) {
656       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
657     }
658     else {
659       /* assemble the entire matrix onto first processor. */
660       Mat         A;
661       Mat_SeqBAIJ *Aloc;
662       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
663       int         mbs=baij->mbs;
664       Scalar      *a;
665 
666       if (!rank) {
667         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
668         CHKERRQ(ierr);
669       }
670       else {
671         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
672         CHKERRQ(ierr);
673       }
674       PLogObjectParent(mat,A);
675 
676       /* copy over the A part */
677       Aloc = (Mat_SeqBAIJ*) baij->A->data;
678       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
679       row = baij->rstart;
680       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
681 
682       for ( i=0; i<mbs; i++ ) {
683         rvals[0] = bs*(baij->rstart + i);
684         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
685         for ( j=ai[i]; j<ai[i+1]; j++ ) {
686           col = (baij->cstart+aj[j])*bs;
687           for (k=0; k<bs; k++ ) {
688             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
689             col++; a += bs;
690           }
691         }
692       }
693       /* copy over the B part */
694       Aloc = (Mat_SeqBAIJ*) baij->B->data;
695       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
696       row = baij->rstart*bs;
697       for ( i=0; i<mbs; i++ ) {
698         rvals[0] = bs*(baij->rstart + i);
699         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
700         for ( j=ai[i]; j<ai[i+1]; j++ ) {
701           col = baij->garray[aj[j]]*bs;
702           for (k=0; k<bs; k++ ) {
703             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
704             col++; a += bs;
705           }
706         }
707       }
708       PetscFree(rvals);
709       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
710       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
711       if (!rank) {
712         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
713       }
714       ierr = MatDestroy(A); CHKERRQ(ierr);
715     }
716   }
717   return 0;
718 }
719 
720 
721 
722 #undef __FUNC__
723 #define __FUNC__ "MatView_MPIBAIJ"
724 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
725 {
726   Mat         mat = (Mat) obj;
727   int         ierr;
728   ViewerType  vtype;
729 
730   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
731   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
732       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
733     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
734   }
735   else if (vtype == BINARY_FILE_VIEWER) {
736     return MatView_MPIBAIJ_Binary(mat,viewer);
737   }
738   return 0;
739 }
740 
741 #undef __FUNC__
742 #define __FUNC__ "MatDestroy_MPIBAIJ"
743 static int MatDestroy_MPIBAIJ(PetscObject obj)
744 {
745   Mat         mat = (Mat) obj;
746   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
747   int         ierr;
748 
749 #if defined(PETSC_LOG)
750   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
751 #endif
752 
753   PetscFree(baij->rowners);
754   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
755   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
756   if (baij->colmap) PetscFree(baij->colmap);
757   if (baij->garray) PetscFree(baij->garray);
758   if (baij->lvec)   VecDestroy(baij->lvec);
759   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
760   if (baij->rowvalues) PetscFree(baij->rowvalues);
761   PetscFree(baij);
762   if (mat->mapping) {
763     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
764   }
765   PLogObjectDestroy(mat);
766   PetscHeaderDestroy(mat);
767   return 0;
768 }
769 
770 #undef __FUNC__
771 #define __FUNC__ "MatMult_MPIBAIJ"
772 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
773 {
774   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
775   int         ierr, nt;
776 
777   VecGetLocalSize_Fast(xx,nt);
778   if (nt != a->n) {
779     SETERRQ(1,0,"Incompatible partition of A and xx");
780   }
781   VecGetLocalSize_Fast(yy,nt);
782   if (nt != a->m) {
783     SETERRQ(1,0,"Incompatible parition of A and yy");
784   }
785   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
786   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
787   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
788   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
789   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
790   return 0;
791 }
792 
793 #undef __FUNC__
794 #define __FUNC__ "MatMultAdd_MPIBAIJ"
795 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
796 {
797   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
798   int        ierr;
799   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
800   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
801   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
802   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
803   return 0;
804 }
805 
806 #undef __FUNC__
807 #define __FUNC__ "MatMultTrans_MPIBAIJ"
808 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
809 {
810   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
811   int        ierr;
812 
813   /* do nondiagonal part */
814   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
815   /* send it on its way */
816   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
817   /* do local part */
818   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
819   /* receive remote parts: note this assumes the values are not actually */
820   /* inserted in yy until the next line, which is true for my implementation*/
821   /* but is not perhaps always true. */
822   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
823   return 0;
824 }
825 
826 #undef __FUNC__
827 #define __FUNC__ "MatMultTransAdd_MPIBAIJ"
828 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
829 {
830   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
831   int        ierr;
832 
833   /* do nondiagonal part */
834   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
835   /* send it on its way */
836   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
837   /* do local part */
838   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
839   /* receive remote parts: note this assumes the values are not actually */
840   /* inserted in yy until the next line, which is true for my implementation*/
841   /* but is not perhaps always true. */
842   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
843   return 0;
844 }
845 
846 /*
847   This only works correctly for square matrices where the subblock A->A is the
848    diagonal block
849 */
850 #undef __FUNC__
851 #define __FUNC__ "MatGetDiagonal_MPIBAIJ"
852 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
853 {
854   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
855   if (a->M != a->N)
856     SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
857   return MatGetDiagonal(a->A,v);
858 }
859 
860 #undef __FUNC__
861 #define __FUNC__ "MatScale_MPIBAIJ"
862 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
863 {
864   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
865   int        ierr;
866   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
867   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
868   return 0;
869 }
870 
871 #undef __FUNC__
872 #define __FUNC__ "MatGetSize_MPIBAIJ"
873 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
874 {
875   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
876   *m = mat->M; *n = mat->N;
877   return 0;
878 }
879 
880 #undef __FUNC__
881 #define __FUNC__ "MatGetLocalSize_MPIBAIJ"
882 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
883 {
884   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
885   *m = mat->m; *n = mat->N;
886   return 0;
887 }
888 
889 #undef __FUNC__
890 #define __FUNC__ "MatGetOwnershipRange_MPIBAIJ"
891 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
892 {
893   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
894   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
895   return 0;
896 }
897 
898 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
899 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
900 
901 #undef __FUNC__
902 #define __FUNC__ "MatGetRow_MPIBAIJ"
903 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
904 {
905   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
906   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
907   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
908   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
909   int        *cmap, *idx_p,cstart = mat->cstart;
910 
911   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
912   mat->getrowactive = PETSC_TRUE;
913 
914   if (!mat->rowvalues && (idx || v)) {
915     /*
916         allocate enough space to hold information from the longest row.
917     */
918     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
919     int     max = 1,mbs = mat->mbs,tmp;
920     for ( i=0; i<mbs; i++ ) {
921       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
922       if (max < tmp) { max = tmp; }
923     }
924     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
925     CHKPTRQ(mat->rowvalues);
926     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
927   }
928 
929 
930   if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows")
931   lrow = row - brstart;
932 
933   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
934   if (!v)   {pvA = 0; pvB = 0;}
935   if (!idx) {pcA = 0; if (!v) pcB = 0;}
936   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
937   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
938   nztot = nzA + nzB;
939 
940   cmap  = mat->garray;
941   if (v  || idx) {
942     if (nztot) {
943       /* Sort by increasing column numbers, assuming A and B already sorted */
944       int imark = -1;
945       if (v) {
946         *v = v_p = mat->rowvalues;
947         for ( i=0; i<nzB; i++ ) {
948           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
949           else break;
950         }
951         imark = i;
952         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
953         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
954       }
955       if (idx) {
956         *idx = idx_p = mat->rowindices;
957         if (imark > -1) {
958           for ( i=0; i<imark; i++ ) {
959             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
960           }
961         } else {
962           for ( i=0; i<nzB; i++ ) {
963             if (cmap[cworkB[i]/bs] < cstart)
964               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
965             else break;
966           }
967           imark = i;
968         }
969         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
970         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
971       }
972     }
973     else {
974       if (idx) *idx = 0;
975       if (v)   *v   = 0;
976     }
977   }
978   *nz = nztot;
979   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
980   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
981   return 0;
982 }
983 
984 #undef __FUNC__
985 #define __FUNC__ "MatRestoreRow_MPIBAIJ"
986 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
987 {
988   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
989   if (baij->getrowactive == PETSC_FALSE) {
990     SETERRQ(1,0,"MatGetRow not called");
991   }
992   baij->getrowactive = PETSC_FALSE;
993   return 0;
994 }
995 
996 #undef __FUNC__
997 #define __FUNC__ "MatGetBlockSize_MPIBAIJ"
998 static int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
999 {
1000   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
1001   *bs = baij->bs;
1002   return 0;
1003 }
1004 
1005 #undef __FUNC__
1006 #define __FUNC__ "MatZeroEntries_MPIBAIJ"
1007 static int MatZeroEntries_MPIBAIJ(Mat A)
1008 {
1009   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
1010   int         ierr;
1011   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
1012   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
1013   return 0;
1014 }
1015 
1016 #undef __FUNC__
1017 #define __FUNC__ "MatGetInfo_MPIBAIJ"
1018 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1019 {
1020   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
1021   Mat         A = a->A, B = a->B;
1022   int         ierr;
1023   double      isend[5], irecv[5];
1024 
1025   info->rows_global    = (double)a->M;
1026   info->columns_global = (double)a->N;
1027   info->rows_local     = (double)a->m;
1028   info->columns_local  = (double)a->N;
1029   info->block_size     = (double)a->bs;
1030   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1031   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->memory;
1032   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1033   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->memory;
1034   if (flag == MAT_LOCAL) {
1035     info->nz_used      = isend[0];
1036     info->nz_allocated = isend[1];
1037     info->nz_unneeded  = isend[2];
1038     info->memory       = isend[3];
1039     info->mallocs      = isend[4];
1040   } else if (flag == MAT_GLOBAL_MAX) {
1041     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_MAX,matin->comm);
1042     info->nz_used      = irecv[0];
1043     info->nz_allocated = irecv[1];
1044     info->nz_unneeded  = irecv[2];
1045     info->memory       = irecv[3];
1046     info->mallocs      = irecv[4];
1047   } else if (flag == MAT_GLOBAL_SUM) {
1048     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_SUM,matin->comm);
1049     info->nz_used      = irecv[0];
1050     info->nz_allocated = irecv[1];
1051     info->nz_unneeded  = irecv[2];
1052     info->memory       = irecv[3];
1053     info->mallocs      = irecv[4];
1054   }
1055   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1056   info->fill_ratio_needed = 0;
1057   info->factor_mallocs    = 0;
1058   return 0;
1059 }
1060 
1061 #undef __FUNC__
1062 #define __FUNC__ "MatSetOption_MPIBAIJ"
1063 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
1064 {
1065   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
1066 
1067   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1068       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1069       op == MAT_COLUMNS_UNSORTED ||
1070       op == MAT_COLUMNS_SORTED) {
1071         MatSetOption(a->A,op);
1072         MatSetOption(a->B,op);
1073   } else if (op == MAT_ROW_ORIENTED) {
1074         a->roworiented = 1;
1075         MatSetOption(a->A,op);
1076         MatSetOption(a->B,op);
1077   } else if (op == MAT_ROWS_SORTED ||
1078              op == MAT_ROWS_UNSORTED ||
1079              op == MAT_SYMMETRIC ||
1080              op == MAT_STRUCTURALLY_SYMMETRIC ||
1081              op == MAT_YES_NEW_DIAGONALS)
1082     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
1083   else if (op == MAT_COLUMN_ORIENTED) {
1084     a->roworiented = 0;
1085     MatSetOption(a->A,op);
1086     MatSetOption(a->B,op);
1087   } else if (op == MAT_IGNORE_OFF_PROCESSOR_ENTRIES) {
1088     a->donotstash = 1;
1089   } else if (op == MAT_NO_NEW_DIAGONALS)
1090     {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
1091   else
1092     {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
1093   return 0;
1094 }
1095 
1096 #undef __FUNC__
1097 #define __FUNC__ "MatTranspose_MPIBAIJ("
1098 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
1099 {
1100   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
1101   Mat_SeqBAIJ *Aloc;
1102   Mat        B;
1103   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
1104   int        bs=baij->bs,mbs=baij->mbs;
1105   Scalar     *a;
1106 
1107   if (matout == PETSC_NULL && M != N)
1108     SETERRQ(1,0,"Square matrix only for in-place");
1109   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
1110   CHKERRQ(ierr);
1111 
1112   /* copy over the A part */
1113   Aloc = (Mat_SeqBAIJ*) baij->A->data;
1114   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1115   row = baij->rstart;
1116   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
1117 
1118   for ( i=0; i<mbs; i++ ) {
1119     rvals[0] = bs*(baij->rstart + i);
1120     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1121     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1122       col = (baij->cstart+aj[j])*bs;
1123       for (k=0; k<bs; k++ ) {
1124         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1125         col++; a += bs;
1126       }
1127     }
1128   }
1129   /* copy over the B part */
1130   Aloc = (Mat_SeqBAIJ*) baij->B->data;
1131   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
1132   row = baij->rstart*bs;
1133   for ( i=0; i<mbs; i++ ) {
1134     rvals[0] = bs*(baij->rstart + i);
1135     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
1136     for ( j=ai[i]; j<ai[i+1]; j++ ) {
1137       col = baij->garray[aj[j]]*bs;
1138       for (k=0; k<bs; k++ ) {
1139         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
1140         col++; a += bs;
1141       }
1142     }
1143   }
1144   PetscFree(rvals);
1145   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1146   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1147 
1148   if (matout != PETSC_NULL) {
1149     *matout = B;
1150   } else {
1151     /* This isn't really an in-place transpose .... but free data structures from baij */
1152     PetscFree(baij->rowners);
1153     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
1154     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
1155     if (baij->colmap) PetscFree(baij->colmap);
1156     if (baij->garray) PetscFree(baij->garray);
1157     if (baij->lvec) VecDestroy(baij->lvec);
1158     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
1159     PetscFree(baij);
1160     PetscMemcpy(A,B,sizeof(struct _Mat));
1161     PetscHeaderDestroy(B);
1162   }
1163   return 0;
1164 }
1165 
1166 #undef __FUNC__
1167 #define __FUNC__ "MatDiagonalScale_MPIBAIJ"
1168 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
1169 {
1170   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
1171   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
1172   int ierr,s1,s2,s3;
1173 
1174   if (ll)  {
1175     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1176     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1177     if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
1178     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1179     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1180   }
1181   if (rr) SETERRQ(1,0,"not supported for right vector");
1182   return 0;
1183 }
1184 
1185 /* the code does not do the diagonal entries correctly unless the
1186    matrix is square and the column and row owerships are identical.
1187    This is a BUG. The only way to fix it seems to be to access
1188    baij->A and baij->B directly and not through the MatZeroRows()
1189    routine.
1190 */
1191 #undef __FUNC__
1192 #define __FUNC__ "MatZeroRows_MPIBAIJ"
1193 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
1194 {
1195   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
1196   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
1197   int            *procs,*nprocs,j,found,idx,nsends,*work;
1198   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
1199   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
1200   int            *lens,imdex,*lrows,*values,bs=l->bs;
1201   MPI_Comm       comm = A->comm;
1202   MPI_Request    *send_waits,*recv_waits;
1203   MPI_Status     recv_status,*send_status;
1204   IS             istmp;
1205 
1206   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
1207   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
1208 
1209   /*  first count number of contributors to each processor */
1210   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
1211   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
1212   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
1213   for ( i=0; i<N; i++ ) {
1214     idx = rows[i];
1215     found = 0;
1216     for ( j=0; j<size; j++ ) {
1217       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
1218         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
1219       }
1220     }
1221     if (!found) SETERRQ(1,0,"Index out of range");
1222   }
1223   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
1224 
1225   /* inform other processors of number of messages and max length*/
1226   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
1227   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
1228   nrecvs = work[rank];
1229   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
1230   nmax = work[rank];
1231   PetscFree(work);
1232 
1233   /* post receives:   */
1234   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
1235   CHKPTRQ(rvalues);
1236   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
1237   CHKPTRQ(recv_waits);
1238   for ( i=0; i<nrecvs; i++ ) {
1239     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
1240   }
1241 
1242   /* do sends:
1243       1) starts[i] gives the starting index in svalues for stuff going to
1244          the ith processor
1245   */
1246   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1247   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
1248   CHKPTRQ(send_waits);
1249   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1250   starts[0] = 0;
1251   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1252   for ( i=0; i<N; i++ ) {
1253     svalues[starts[owner[i]]++] = rows[i];
1254   }
1255   ISRestoreIndices(is,&rows);
1256 
1257   starts[0] = 0;
1258   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1259   count = 0;
1260   for ( i=0; i<size; i++ ) {
1261     if (procs[i]) {
1262       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
1263     }
1264   }
1265   PetscFree(starts);
1266 
1267   base = owners[rank]*bs;
1268 
1269   /*  wait on receives */
1270   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1271   source = lens + nrecvs;
1272   count  = nrecvs; slen = 0;
1273   while (count) {
1274     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
1275     /* unpack receives into our local space */
1276     MPI_Get_count(&recv_status,MPI_INT,&n);
1277     source[imdex]  = recv_status.MPI_SOURCE;
1278     lens[imdex]  = n;
1279     slen += n;
1280     count--;
1281   }
1282   PetscFree(recv_waits);
1283 
1284   /* move the data into the send scatter */
1285   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1286   count = 0;
1287   for ( i=0; i<nrecvs; i++ ) {
1288     values = rvalues + i*nmax;
1289     for ( j=0; j<lens[i]; j++ ) {
1290       lrows[count++] = values[j] - base;
1291     }
1292   }
1293   PetscFree(rvalues); PetscFree(lens);
1294   PetscFree(owner); PetscFree(nprocs);
1295 
1296   /* actually zap the local rows */
1297   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1298   PLogObjectParent(A,istmp);
1299   PetscFree(lrows);
1300   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1301   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1302   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1303 
1304   /* wait on sends */
1305   if (nsends) {
1306     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1307     CHKPTRQ(send_status);
1308     MPI_Waitall(nsends,send_waits,send_status);
1309     PetscFree(send_status);
1310   }
1311   PetscFree(send_waits); PetscFree(svalues);
1312 
1313   return 0;
1314 }
1315 extern int MatPrintHelp_SeqBAIJ(Mat);
1316 #undef __FUNC__
1317 #define __FUNC__ "MatPrintHelp_MPIBAIJ"
1318 static int MatPrintHelp_MPIBAIJ(Mat A)
1319 {
1320   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1321 
1322   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1323   else return 0;
1324 }
1325 
1326 #undef __FUNC__
1327 #define __FUNC__ "MatSetUnfactored_MPIBAIJ"
1328 static int MatSetUnfactored_MPIBAIJ(Mat A)
1329 {
1330   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1331   int         ierr;
1332   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1333   return 0;
1334 }
1335 
1336 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1337 
1338 /* -------------------------------------------------------------------*/
1339 static struct _MatOps MatOps = {
1340   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1341   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1342   0,0,0,0,
1343   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1344   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1345   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1346   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1347   0,0,0,MatGetSize_MPIBAIJ,
1348   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1349   0,0,MatConvertSameType_MPIBAIJ,0,0,
1350   0,0,0,MatGetSubMatrices_MPIBAIJ,
1351   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1352   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
1353   0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ,0,MatSetValuesBlocked_MPIBAIJ};
1354 
1355 
1356 #undef __FUNC__
1357 #define __FUNC__ "MatCreateMPIBAIJ"
1358 /*@C
1359    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1360    (block compressed row).  For good matrix assembly performance
1361    the user should preallocate the matrix storage by setting the parameters
1362    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1363    performance can be increased by more than a factor of 50.
1364 
1365    Input Parameters:
1366 .  comm - MPI communicator
1367 .  bs   - size of blockk
1368 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1369            This value should be the same as the local size used in creating the
1370            y vector for the matrix-vector product y = Ax.
1371 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1372            This value should be the same as the local size used in creating the
1373            x vector for the matrix-vector product y = Ax.
1374 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1375 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1376 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1377            submatrix  (same for all local rows)
1378 .  d_nzz - array containing the number of block nonzeros in the various block rows
1379            of the in diagonal portion of the local (possibly different for each block
1380            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1381            it is zero.
1382 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1383            submatrix (same for all local rows).
1384 .  o_nzz - array containing the number of nonzeros in the various block rows of the
1385            off-diagonal portion of the local submatrix (possibly different for
1386            each block row) or PETSC_NULL.
1387 
1388    Output Parameter:
1389 .  A - the matrix
1390 
1391    Notes:
1392    The user MUST specify either the local or global matrix dimensions
1393    (possibly both).
1394 
1395    Storage Information:
1396    For a square global matrix we define each processor's diagonal portion
1397    to be its local rows and the corresponding columns (a square submatrix);
1398    each processor's off-diagonal portion encompasses the remainder of the
1399    local matrix (a rectangular submatrix).
1400 
1401    The user can specify preallocated storage for the diagonal part of
1402    the local submatrix with either d_nz or d_nnz (not both).  Set
1403    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1404    memory allocation.  Likewise, specify preallocated storage for the
1405    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1406 
1407    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1408    the figure below we depict these three local rows and all columns (0-11).
1409 
1410 $          0 1 2 3 4 5 6 7 8 9 10 11
1411 $         -------------------
1412 $  row 3  |  o o o d d d o o o o o o
1413 $  row 4  |  o o o d d d o o o o o o
1414 $  row 5  |  o o o d d d o o o o o o
1415 $         -------------------
1416 $
1417 
1418    Thus, any entries in the d locations are stored in the d (diagonal)
1419    submatrix, and any entries in the o locations are stored in the
1420    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1421    stored simply in the MATSEQBAIJ format for compressed row storage.
1422 
1423    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1424    and o_nz should indicate the number of nonzeros per row in the o matrix.
1425    In general, for PDE problems in which most nonzeros are near the diagonal,
1426    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1427    or you will get TERRIBLE performance; see the users' manual chapter on
1428    matrices.
1429 
1430 .keywords: matrix, block, aij, compressed row, sparse, parallel
1431 
1432 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1433 @*/
1434 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1435                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1436 {
1437   Mat          B;
1438   Mat_MPIBAIJ  *b;
1439   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1440 
1441   if (bs < 1) SETERRQ(1,0,"invalid block size specified");
1442   *A = 0;
1443   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1444   PLogObjectCreate(B);
1445   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1446   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1447   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1448   MPI_Comm_size(comm,&size);
1449   if (size == 1) {
1450     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1451     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1452     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1453     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1454     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1455     B->ops.solve             = MatSolve_MPIBAIJ;
1456     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1457     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1458     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1459     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1460   }
1461 
1462   B->destroy    = MatDestroy_MPIBAIJ;
1463   B->view       = MatView_MPIBAIJ;
1464   B->mapping    = 0;
1465   B->factor     = 0;
1466   B->assembled  = PETSC_FALSE;
1467 
1468   b->insertmode = NOT_SET_VALUES;
1469   MPI_Comm_rank(comm,&b->rank);
1470   MPI_Comm_size(comm,&b->size);
1471 
1472   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1473     SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1474   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
1475   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,0,"either N or n should be specified");
1476   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1477   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1478 
1479   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1480     work[0] = m; work[1] = n;
1481     mbs = m/bs; nbs = n/bs;
1482     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1483     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1484     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1485   }
1486   if (m == PETSC_DECIDE) {
1487     Mbs = M/bs;
1488     if (Mbs*bs != M) SETERRQ(1,0,"No of global rows must be divisible by blocksize");
1489     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1490     m   = mbs*bs;
1491   }
1492   if (n == PETSC_DECIDE) {
1493     Nbs = N/bs;
1494     if (Nbs*bs != N) SETERRQ(1,0,"No of global cols must be divisible by blocksize");
1495     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1496     n   = nbs*bs;
1497   }
1498   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"No of local rows, cols must be divisible by blocksize");
1499 
1500   b->m = m; B->m = m;
1501   b->n = n; B->n = n;
1502   b->N = N; B->N = N;
1503   b->M = M; B->M = M;
1504   b->bs  = bs;
1505   b->bs2 = bs*bs;
1506   b->mbs = mbs;
1507   b->nbs = nbs;
1508   b->Mbs = Mbs;
1509   b->Nbs = Nbs;
1510 
1511   /* build local table of row and column ownerships */
1512   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1513   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1514   b->cowners = b->rowners + b->size + 2;
1515   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1516   b->rowners[0] = 0;
1517   for ( i=2; i<=b->size; i++ ) {
1518     b->rowners[i] += b->rowners[i-1];
1519   }
1520   b->rstart    = b->rowners[b->rank];
1521   b->rend      = b->rowners[b->rank+1];
1522   b->rstart_bs = b->rstart * bs;
1523   b->rend_bs   = b->rend * bs;
1524 
1525   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1526   b->cowners[0] = 0;
1527   for ( i=2; i<=b->size; i++ ) {
1528     b->cowners[i] += b->cowners[i-1];
1529   }
1530   b->cstart    = b->cowners[b->rank];
1531   b->cend      = b->cowners[b->rank+1];
1532   b->cstart_bs = b->cstart * bs;
1533   b->cend_bs   = b->cend * bs;
1534 
1535   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1536   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1537   PLogObjectParent(B,b->A);
1538   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1539   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1540   PLogObjectParent(B,b->B);
1541 
1542   /* build cache for off array entries formed */
1543   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1544   b->donotstash  = 0;
1545   b->colmap      = 0;
1546   b->garray      = 0;
1547   b->roworiented = 1;
1548 
1549   /* stuff used for matrix vector multiply */
1550   b->lvec      = 0;
1551   b->Mvctx     = 0;
1552 
1553   /* stuff for MatGetRow() */
1554   b->rowindices   = 0;
1555   b->rowvalues    = 0;
1556   b->getrowactive = PETSC_FALSE;
1557 
1558   *A = B;
1559   return 0;
1560 }
1561 
1562 #undef __FUNC__
1563 #define __FUNC__ "MatConvertSameType_MPIBAIJ"
1564 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1565 {
1566   Mat        mat;
1567   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1568   int        ierr, len=0, flg;
1569 
1570   *newmat       = 0;
1571   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1572   PLogObjectCreate(mat);
1573   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1574   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1575   mat->destroy    = MatDestroy_MPIBAIJ;
1576   mat->view       = MatView_MPIBAIJ;
1577   mat->factor     = matin->factor;
1578   mat->assembled  = PETSC_TRUE;
1579 
1580   a->m = mat->m   = oldmat->m;
1581   a->n = mat->n   = oldmat->n;
1582   a->M = mat->M   = oldmat->M;
1583   a->N = mat->N   = oldmat->N;
1584 
1585   a->bs  = oldmat->bs;
1586   a->bs2 = oldmat->bs2;
1587   a->mbs = oldmat->mbs;
1588   a->nbs = oldmat->nbs;
1589   a->Mbs = oldmat->Mbs;
1590   a->Nbs = oldmat->Nbs;
1591 
1592   a->rstart       = oldmat->rstart;
1593   a->rend         = oldmat->rend;
1594   a->cstart       = oldmat->cstart;
1595   a->cend         = oldmat->cend;
1596   a->size         = oldmat->size;
1597   a->rank         = oldmat->rank;
1598   a->insertmode   = NOT_SET_VALUES;
1599   a->rowvalues    = 0;
1600   a->getrowactive = PETSC_FALSE;
1601 
1602   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1603   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1604   a->cowners = a->rowners + a->size + 2;
1605   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1606   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1607   if (oldmat->colmap) {
1608     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1609     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1610     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1611   } else a->colmap = 0;
1612   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1613     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1614     PLogObjectMemory(mat,len*sizeof(int));
1615     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1616   } else a->garray = 0;
1617 
1618   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1619   PLogObjectParent(mat,a->lvec);
1620   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1621   PLogObjectParent(mat,a->Mvctx);
1622   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1623   PLogObjectParent(mat,a->A);
1624   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1625   PLogObjectParent(mat,a->B);
1626   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1627   if (flg) {
1628     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1629   }
1630   *newmat = mat;
1631   return 0;
1632 }
1633 
1634 #include "sys.h"
1635 
1636 #undef __FUNC__
1637 #define __FUNC__ "MatLoad_MPIBAIJ"
1638 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1639 {
1640   Mat          A;
1641   int          i, nz, ierr, j,rstart, rend, fd;
1642   Scalar       *vals,*buf;
1643   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1644   MPI_Status   status;
1645   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1646   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1647   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1648   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1649   int          dcount,kmax,k,nzcount,tmp;
1650 
1651 
1652   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1653   bs2  = bs*bs;
1654 
1655   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1656   if (!rank) {
1657     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1658     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1659     if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not matrix object");
1660   }
1661 
1662   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1663   M = header[1]; N = header[2];
1664 
1665   if (M != N) SETERRQ(1,0,"Can only do square matrices");
1666 
1667   /*
1668      This code adds extra rows to make sure the number of rows is
1669      divisible by the blocksize
1670   */
1671   Mbs        = M/bs;
1672   extra_rows = bs - M + bs*(Mbs);
1673   if (extra_rows == bs) extra_rows = 0;
1674   else                  Mbs++;
1675   if (extra_rows &&!rank) {
1676     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1677   }
1678 
1679   /* determine ownership of all rows */
1680   mbs = Mbs/size + ((Mbs % size) > rank);
1681   m   = mbs * bs;
1682   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1683   browners = rowners + size + 1;
1684   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1685   rowners[0] = 0;
1686   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1687   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1688   rstart = rowners[rank];
1689   rend   = rowners[rank+1];
1690 
1691   /* distribute row lengths to all processors */
1692   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1693   if (!rank) {
1694     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1695     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1696     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1697     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1698     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1699     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1700     PetscFree(sndcounts);
1701   }
1702   else {
1703     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1704   }
1705 
1706   if (!rank) {
1707     /* calculate the number of nonzeros on each processor */
1708     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1709     PetscMemzero(procsnz,size*sizeof(int));
1710     for ( i=0; i<size; i++ ) {
1711       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1712         procsnz[i] += rowlengths[j];
1713       }
1714     }
1715     PetscFree(rowlengths);
1716 
1717     /* determine max buffer needed and allocate it */
1718     maxnz = 0;
1719     for ( i=0; i<size; i++ ) {
1720       maxnz = PetscMax(maxnz,procsnz[i]);
1721     }
1722     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1723 
1724     /* read in my part of the matrix column indices  */
1725     nz = procsnz[0];
1726     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1727     mycols = ibuf;
1728     if (size == 1)  nz -= extra_rows;
1729     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1730     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1731 
1732     /* read in every ones (except the last) and ship off */
1733     for ( i=1; i<size-1; i++ ) {
1734       nz = procsnz[i];
1735       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1736       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1737     }
1738     /* read in the stuff for the last proc */
1739     if ( size != 1 ) {
1740       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1741       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1742       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1743       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1744     }
1745     PetscFree(cols);
1746   }
1747   else {
1748     /* determine buffer space needed for message */
1749     nz = 0;
1750     for ( i=0; i<m; i++ ) {
1751       nz += locrowlens[i];
1752     }
1753     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1754     mycols = ibuf;
1755     /* receive message of column indices*/
1756     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1757     MPI_Get_count(&status,MPI_INT,&maxnz);
1758     if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
1759   }
1760 
1761   /* loop over local rows, determining number of off diagonal entries */
1762   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1763   odlens = dlens + (rend-rstart);
1764   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1765   PetscMemzero(mask,3*Mbs*sizeof(int));
1766   masked1 = mask    + Mbs;
1767   masked2 = masked1 + Mbs;
1768   rowcount = 0; nzcount = 0;
1769   for ( i=0; i<mbs; i++ ) {
1770     dcount  = 0;
1771     odcount = 0;
1772     for ( j=0; j<bs; j++ ) {
1773       kmax = locrowlens[rowcount];
1774       for ( k=0; k<kmax; k++ ) {
1775         tmp = mycols[nzcount++]/bs;
1776         if (!mask[tmp]) {
1777           mask[tmp] = 1;
1778           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1779           else masked1[dcount++] = tmp;
1780         }
1781       }
1782       rowcount++;
1783     }
1784 
1785     dlens[i]  = dcount;
1786     odlens[i] = odcount;
1787 
1788     /* zero out the mask elements we set */
1789     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1790     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1791   }
1792 
1793   /* create our matrix */
1794   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1795          CHKERRQ(ierr);
1796   A = *newmat;
1797   MatSetOption(A,MAT_COLUMNS_SORTED);
1798 
1799   if (!rank) {
1800     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1801     /* read in my part of the matrix numerical values  */
1802     nz = procsnz[0];
1803     vals = buf;
1804     mycols = ibuf;
1805     if (size == 1)  nz -= extra_rows;
1806     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1807     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1808 
1809     /* insert into matrix */
1810     jj      = rstart*bs;
1811     for ( i=0; i<m; i++ ) {
1812       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1813       mycols += locrowlens[i];
1814       vals   += locrowlens[i];
1815       jj++;
1816     }
1817     /* read in other processors (except the last one) and ship out */
1818     for ( i=1; i<size-1; i++ ) {
1819       nz = procsnz[i];
1820       vals = buf;
1821       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1822       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1823     }
1824     /* the last proc */
1825     if ( size != 1 ){
1826       nz = procsnz[i] - extra_rows;
1827       vals = buf;
1828       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1829       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1830       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1831     }
1832     PetscFree(procsnz);
1833   }
1834   else {
1835     /* receive numeric values */
1836     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1837 
1838     /* receive message of values*/
1839     vals = buf;
1840     mycols = ibuf;
1841     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1842     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1843     if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
1844 
1845     /* insert into matrix */
1846     jj      = rstart*bs;
1847     for ( i=0; i<m; i++ ) {
1848       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1849       mycols += locrowlens[i];
1850       vals   += locrowlens[i];
1851       jj++;
1852     }
1853   }
1854   PetscFree(locrowlens);
1855   PetscFree(buf);
1856   PetscFree(ibuf);
1857   PetscFree(rowners);
1858   PetscFree(dlens);
1859   PetscFree(mask);
1860   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1861   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1862   return 0;
1863 }
1864 
1865 
1866