xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision bb5a7306ba2b79be99f6d9dff5d00348384195e8)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.44 1997/01/01 03:38:25 bsmith Exp bsmith $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 
25 /*
26      Local utility routine that creates a mapping from the global column
27    number to the local number in the off-diagonal part of the local
28   storage of the matrix.  This is done in a non-scalable way since the
29    length of colmap equals the number of block columns in the global matrix.
30 */
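/*
     Lookup sketch (an illustration consistent with the code below, not part of
   the original file): colmap[] holds 1 + bs*(local block column in B) for a
   global block column that is present in the off-diagonal part, and 0 otherwise,
   so

       lcol = baij->colmap[gcol/bs] - 1 + gcol%bs;

   recovers the local point column for a present global point column gcol, while
   a zero colmap entry flags a block column that is not (yet) stored in B.
*/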
31 #undef __FUNCTION__
32 #define __FUNCTION__ "CreateColmap_MPIBAIJ_Private"
33 static int CreateColmap_MPIBAIJ_Private(Mat mat)
34 {
35   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
36   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
37   int         nbs = B->nbs,i,bs=B->bs;
38 
39   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
40   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
41   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
42   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
43   return 0;
44 }
45 
46 #undef __FUNCTION__
47 #define __FUNCTION__ "MatGetRowIJ_MPIBAIJ"
48 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
49                             PetscTruth *done)
50 {
51   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
52   int         ierr;
53   if (aij->size == 1) {
54     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
55   } else SETERRQ(1,0,"not supported in parallel");
56   return 0;
57 }
58 
59 #undef __FUNCTION__
60 #define __FUNCTION__ "MatRestoreRowIJ_MPIBAIJ"
61 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
62                                 PetscTruth *done)
63 {
64   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
65   int        ierr;
66   if (aij->size == 1) {
67     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
68   } else SETERRQ(1,0,"not supported in parallel");
69   return 0;
70 }
71 
72 extern int MatSetValues_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
73 #undef __FUNCTION__
74 #define __FUNCTION__ "MatSetValues_MPIBAIJ"
75 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
76 {
77   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
78   Scalar      value;
79   int         ierr,i,j,row,col;
80   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
81   int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
82   int         cend_orig=baij->cend_bs,bs=baij->bs;
83 
84 #if defined(PETSC_BOPT_g)
85   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
86     SETERRQ(1,0,"Cannot mix inserts and adds");
87   }
88 #endif
89   baij->insertmode = addv;
90   for ( i=0; i<m; i++ ) {
91 #if defined(PETSC_BOPT_g)
92     if (im[i] < 0) SETERRQ(1,0,"Negative row");
93     if (im[i] >= baij->M) SETERRQ(1,0,"Row too large");
94 #endif
95     if (im[i] >= rstart_orig && im[i] < rend_orig) {
96       row = im[i] - rstart_orig;
97       for ( j=0; j<n; j++ ) {
98         if (in[j] >= cstart_orig && in[j] < cend_orig){
99           col = in[j] - cstart_orig;
100           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
101           ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
102         }
103 #if defined(PETSC_BOPT_g)
104         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
105         else if (in[j] >= baij->N) {SETERRQ(1,0,"Col too large");}
106 #endif
107         else {
108           if (mat->was_assembled) {
109             if (!baij->colmap) {
110               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
111             }
112             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
113             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
114               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
115               col =  in[j];
116             }
117           }
118           else col = in[j];
119           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
120           ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
121         }
122       }
123     }
124     else {
125       if (roworiented && !baij->donotstash) {
126         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
127       }
128       else {
129         if (!baij->donotstash) {
130           row = im[i];
131 	  for ( j=0; j<n; j++ ) {
132 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
133           }
134         }
135       }
136     }
137   }
138   return 0;
139 }
140 
141 #undef __FUNCTION__
142 #define __FUNCTION__ "MatGetValues_MPIBAIJ"
143 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
144 {
145   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
146   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
147   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
148 
149   for ( i=0; i<m; i++ ) {
150     if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
151     if (idxm[i] >= baij->M) SETERRQ(1,0,"Row too large");
152     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
153       row = idxm[i] - bsrstart;
154       for ( j=0; j<n; j++ ) {
155         if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
156         if (idxn[j] >= baij->N) SETERRQ(1,0,"Col too large");
157         if (idxn[j] >= bscstart && idxn[j] < bscend){
158           col = idxn[j] - bscstart;
159           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
160         }
161         else {
162           if (!baij->colmap) {
163             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
164           }
165           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
166              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
167           else {
168             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
169             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
170           }
171         }
172       }
173     }
174     else {
175       SETERRQ(1,0,"Only local values currently supported");
176     }
177   }
178   return 0;
179 }
180 
181 #undef __FUNCTION__
182 #define __FUNCTION__ "MatNorm_MPIBAIJ"
183 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
184 {
185   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
186   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
187   int        ierr, i,bs2=baij->bs2;
188   double     sum = 0.0;
189   Scalar     *v;
190 
191   if (baij->size == 1) {
192     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
193   } else {
194     if (type == NORM_FROBENIUS) {
195       v = amat->a;
196       for (i=0; i<amat->nz*bs2; i++ ) {
197 #if defined(PETSC_COMPLEX)
198         sum += real(conj(*v)*(*v)); v++;
199 #else
200         sum += (*v)*(*v); v++;
201 #endif
202       }
203       v = bmat->a;
204       for (i=0; i<bmat->nz*bs2; i++ ) {
205 #if defined(PETSC_COMPLEX)
206         sum += real(conj(*v)*(*v)); v++;
207 #else
208         sum += (*v)*(*v); v++;
209 #endif
210       }
211       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
212       *norm = sqrt(*norm);
213     }
214     else
215       SETERRQ(PETSC_ERR_SUP,0,"No support for this norm yet");
216   }
217   return 0;
218 }
219 
220 #undef __FUNCTION__
221 #define __FUNCTION__ "MatAssemblyBegin_MPIBAIJ"
222 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
223 {
224   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
225   MPI_Comm    comm = mat->comm;
226   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
227   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
228   MPI_Request *send_waits,*recv_waits;
229   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
230   InsertMode  addv;
231   Scalar      *rvalues,*svalues;
232 
233   /* make sure all processors are either in INSERTMODE or ADDMODE */
234   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
235   if (addv == (ADD_VALUES|INSERT_VALUES)) {
236     SETERRQ(1,0,"Some processors inserted, others added");
237   }
238   baij->insertmode = addv; /* in case this processor had no cache */
239 
240   /*  first count number of contributors to each processor */
241   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
242   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
243   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
244   for ( i=0; i<baij->stash.n; i++ ) {
245     idx = baij->stash.idx[i];
246     for ( j=0; j<size; j++ ) {
247       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
248         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
249       }
250     }
251   }
252   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
253 
254   /* inform other processors of number of messages and max length*/
255   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
256   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
257   nreceives = work[rank];
258   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
259   nmax = work[rank];
260   PetscFree(work);
261 
262   /* post receives:
263        1) each message will consist of ordered triples
264      (global row index,global column index,value); we store the indices as
265      Scalars to simplify the message passing.
266        2) since we don't know how long each individual message is we
267      allocate the largest needed buffer for each receive. Potentially
268      this is a lot of wasted space.
269 
270 
271        This could be done better.
272   */
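  /* packing sketch (consistent with the code below, not part of the original
     file): for the k-th stashed entry
        svalues[3*k]   = global row index    (stored as a Scalar)
        svalues[3*k+1] = global column index (stored as a Scalar)
        svalues[3*k+2] = the value itself                                    */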
273   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
274   CHKPTRQ(rvalues);
275   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
276   CHKPTRQ(recv_waits);
277   for ( i=0; i<nreceives; i++ ) {
278     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
279               comm,recv_waits+i);
280   }
281 
282   /* do sends:
283       1) starts[i] gives the starting index in svalues for stuff going to
284          the ith processor
285   */
286   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
287   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
288   CHKPTRQ(send_waits);
289   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
290   starts[0] = 0;
291   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
292   for ( i=0; i<baij->stash.n; i++ ) {
293     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
294     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
295     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
296   }
297   PetscFree(owner);
298   starts[0] = 0;
299   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
300   count = 0;
301   for ( i=0; i<size; i++ ) {
302     if (procs[i]) {
303       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
304                 comm,send_waits+count++);
305     }
306   }
307   PetscFree(starts); PetscFree(nprocs);
308 
309   /* Free cache space */
310   PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
311   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
312 
313   baij->svalues    = svalues;    baij->rvalues    = rvalues;
314   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
315   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
316   baij->rmax       = nmax;
317 
318   return 0;
319 }
320 
321 
322 #undef __FUNCTION__
323 #define __FUNCTION__ "MatAssemblyEnd_MPIBAIJ"
324 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
325 {
326   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
327   MPI_Status  *send_status,recv_status;
328   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
329   int         bs=baij->bs,row,col,other_disassembled;
330   Scalar      *values,val;
331   InsertMode  addv = baij->insertmode;
332 
333   /*  wait on receives */
334   while (count) {
335     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
336     /* unpack receives into our local space */
337     values = baij->rvalues + 3*imdex*baij->rmax;
338     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
339     n = n/3;
340     for ( i=0; i<n; i++ ) {
341       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
342       col = (int) PetscReal(values[3*i+1]);
343       val = values[3*i+2];
344       if (col >= baij->cstart*bs && col < baij->cend*bs) {
345         col -= baij->cstart*bs;
346         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
347       }
348       else {
349         if (mat->was_assembled) {
350           if (!baij->colmap) {
351             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
352           }
353           col = (baij->colmap[col/bs]-1)*bs + col%bs;
354           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
355             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
356             col = (int) PetscReal(values[3*i+1]);
357           }
358         }
359         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
360       }
361     }
362     count--;
363   }
364   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
365 
366   /* wait on sends */
367   if (baij->nsends) {
368     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
369     CHKPTRQ(send_status);
370     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
371     PetscFree(send_status);
372   }
373   PetscFree(baij->send_waits); PetscFree(baij->svalues);
374 
375   baij->insertmode = NOT_SET_VALUES;
376   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
377   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
378 
379   /* determine if any processor has disassembled, if so we must
380      also disassemble ourselves, in order that we may reassemble. */
381   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
382   if (mat->was_assembled && !other_disassembled) {
383     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
384   }
385 
386   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
387     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
388   }
389   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
390   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
391 
392   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
393   return 0;
394 }
395 
396 #undef __FUNCTION__
397 #define __FUNCTION__ "MatView_MPIBAIJ_Binary"
398 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
399 {
400   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
401   int          ierr;
402 
403   if (baij->size == 1) {
404     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
405   }
406   else SETERRQ(1,0,"Only uniprocessor output supported");
407   return 0;
408 }
409 
410 #undef __FUNCTION__
411 #define __FUNCTION__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
412 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
413 {
414   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
415   int          ierr, format,rank,bs = baij->bs;
416   FILE         *fd;
417   ViewerType   vtype;
418 
419   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
420   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
421     ierr = ViewerGetFormat(viewer,&format);
422     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
423       MatInfo info;
424       MPI_Comm_rank(mat->comm,&rank);
425       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
426       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
427       PetscSequentialPhaseBegin(mat->comm,1);
428       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
429               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
430               baij->bs,(int)info.memory);
431       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
432       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
433       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
434       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
435       fflush(fd);
436       PetscSequentialPhaseEnd(mat->comm,1);
437       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
438       return 0;
439     }
440     else if (format == VIEWER_FORMAT_ASCII_INFO) {
441       PetscPrintf(mat->comm,"  block size is %d\n",bs);
442       return 0;
443     }
444   }
445 
446   if (vtype == DRAW_VIEWER) {
447     Draw       draw;
448     PetscTruth isnull;
449     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
450     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
451   }
452 
453   if (vtype == ASCII_FILE_VIEWER) {
454     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
455     PetscSequentialPhaseBegin(mat->comm,1);
456     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
457            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
458             baij->cstart*bs,baij->cend*bs);
459     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
460     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
461     fflush(fd);
462     PetscSequentialPhaseEnd(mat->comm,1);
463   }
464   else {
465     int size = baij->size;
466     rank = baij->rank;
467     if (size == 1) {
468       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
469     }
470     else {
471       /* assemble the entire matrix onto first processor. */
472       Mat         A;
473       Mat_SeqBAIJ *Aloc;
474       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
475       int         mbs=baij->mbs;
476       Scalar      *a;
477 
478       if (!rank) {
479         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
480         CHKERRQ(ierr);
481       }
482       else {
483         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
484         CHKERRQ(ierr);
485       }
486       PLogObjectParent(mat,A);
487 
488       /* copy over the A part */
489       Aloc = (Mat_SeqBAIJ*) baij->A->data;
490       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
491       row = baij->rstart;
492       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
493 
494       for ( i=0; i<mbs; i++ ) {
495         rvals[0] = bs*(baij->rstart + i);
496         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
497         for ( j=ai[i]; j<ai[i+1]; j++ ) {
498           col = (baij->cstart+aj[j])*bs;
499           for (k=0; k<bs; k++ ) {
500             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
501             col++; a += bs;
502           }
503         }
504       }
505       /* copy over the B part */
506       Aloc = (Mat_SeqBAIJ*) baij->B->data;
507       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
508       row = baij->rstart*bs;
509       for ( i=0; i<mbs; i++ ) {
510         rvals[0] = bs*(baij->rstart + i);
511         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
512         for ( j=ai[i]; j<ai[i+1]; j++ ) {
513           col = baij->garray[aj[j]]*bs;
514           for (k=0; k<bs; k++ ) {
515             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
516             col++; a += bs;
517           }
518         }
519       }
520       PetscFree(rvals);
521       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
522       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
523       if (!rank) {
524         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
525       }
526       ierr = MatDestroy(A); CHKERRQ(ierr);
527     }
528   }
529   return 0;
530 }
531 
532 
533 
534 #undef __FUNCTION__
535 #define __FUNCTION__ "MatView_MPIBAIJ"
536 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
537 {
538   Mat         mat = (Mat) obj;
539   int         ierr;
540   ViewerType  vtype;
541 
542   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
543   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
544       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
545     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
546   }
547   else if (vtype == BINARY_FILE_VIEWER) {
548     return MatView_MPIBAIJ_Binary(mat,viewer);
549   }
550   return 0;
551 }
552 
553 #undef __FUNCTION__
554 #define __FUNCTION__ "MatDestroy_MPIBAIJ"
555 static int MatDestroy_MPIBAIJ(PetscObject obj)
556 {
557   Mat         mat = (Mat) obj;
558   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
559   int         ierr;
560 
561 #if defined(PETSC_LOG)
562   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
563 #endif
564 
565   PetscFree(baij->rowners);
566   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
567   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
568   if (baij->colmap) PetscFree(baij->colmap);
569   if (baij->garray) PetscFree(baij->garray);
570   if (baij->lvec)   VecDestroy(baij->lvec);
571   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
572   if (baij->rowvalues) PetscFree(baij->rowvalues);
573   PetscFree(baij);
574   if (mat->mapping) {
575     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
576   }
577   PLogObjectDestroy(mat);
578   PetscHeaderDestroy(mat);
579   return 0;
580 }
581 
582 #undef __FUNCTION__
583 #define __FUNCTION__ "MatMult_MPIBAIJ"
584 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
585 {
586   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
587   int         ierr, nt;
588 
589   VecGetLocalSize_Fast(xx,nt);
590   if (nt != a->n) {
591     SETERRQ(1,0,"Incompatible partition of A and xx");
592   }
593   VecGetLocalSize_Fast(yy,nt);
594   if (nt != a->m) {
595     SETERRQ(1,0,"Incompatible partition of A and yy");
596   }
597   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
598   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
599   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
600   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
601   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
602   return 0;
603 }
604 
605 #undef __FUNCTION__
606 #define __FUNCTION__ "MatMultAdd_MPIBAIJ"
607 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
608 {
609   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
610   int        ierr;
611   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
612   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
613   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
614   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
615   return 0;
616 }
617 
618 #undef __FUNCTION__
619 #define __FUNCTION__ "MatMultTrans_MPIBAIJ"
620 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
621 {
622   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
623   int        ierr;
624 
625   /* do nondiagonal part */
626   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
627   /* send it on its way */
628   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
629   /* do local part */
630   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
631   /* receive remote parts: note this assumes the values are not actually */
632   /* inserted into yy until VecScatterEnd() below, which is true for this */
633   /* implementation but is perhaps not always true. */
634   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
635   return 0;
636 }
637 
638 #undef __FUNCTION__
639 #define __FUNCTION__ "MatMultTransAdd_MPIBAIJ"
640 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
641 {
642   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
643   int        ierr;
644 
645   /* do nondiagonal part */
646   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
647   /* send it on its way */
648   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
649   /* do local part */
650   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
651   /* receive remote parts: note this assumes the values are not actually */
652   /* inserted into zz until VecScatterEnd() below, which is true for this */
653   /* implementation but is perhaps not always true. */
654   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
655   return 0;
656 }
657 
658 /*
659   This only works correctly for square matrices where the subblock A->A is the
660    diagonal block
661 */
662 #undef __FUNCTION__
663 #define __FUNCTION__ "MatGetDiagonal_MPIBAIJ"
664 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
665 {
666   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
667   if (a->M != a->N)
668     SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
669   return MatGetDiagonal(a->A,v);
670 }
671 
672 #undef __FUNCTION__
673 #define __FUNCTION__ "MatScale_MPIBAIJ"
674 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
675 {
676   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
677   int        ierr;
678   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
679   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
680   return 0;
681 }
682 
683 #undef __FUNCTION__
684 #define __FUNCTION__ "MatGetSize_MPIBAIJ"
685 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
686 {
687   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
688   *m = mat->M; *n = mat->N;
689   return 0;
690 }
691 
692 #undef __FUNCTION__
693 #define __FUNCTION__ "MatGetLocalSize_MPIBAIJ"
694 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
695 {
696   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
697   *m = mat->m; *n = mat->n;
698   return 0;
699 }
700 
701 #undef __FUNCTION__
702 #define __FUNCTION__ "MatGetOwnershipRange_MPIBAIJ"
703 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
704 {
705   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
706   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
707   return 0;
708 }
709 
710 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
711 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
712 
713 #undef __FUNCTION__
714 #define __FUNCTION__ "MatGetRow_MPIBAIJ"
715 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
716 {
717   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
718   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
719   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
720   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
721   int        *cmap, *idx_p,cstart = mat->cstart;
722 
723   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
724   mat->getrowactive = PETSC_TRUE;
725 
726   if (!mat->rowvalues && (idx || v)) {
727     /*
728         allocate enough space to hold information from the longest row.
729     */
730     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
731     int     max = 1,mbs = mat->mbs,tmp;
732     for ( i=0; i<mbs; i++ ) {
733       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
734       if (max < tmp) { max = tmp; }
735     }
736     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
737     CHKPTRQ(mat->rowvalues);
738     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
739   }
740 
741 
742   if (row < brstart || row >= brend) SETERRQ(1,0,"Only local rows")
743   lrow = row - brstart;
744 
745   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
746   if (!v)   {pvA = 0; pvB = 0;}
747   if (!idx) {pcA = 0; if (!v) pcB = 0;}
748   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
749   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
750   nztot = nzA + nzB;
751 
752   cmap  = mat->garray;
753   if (v  || idx) {
754     if (nztot) {
755       /* Sort by increasing column numbers, assuming A and B already sorted */
756       int imark = -1;
757       if (v) {
758         *v = v_p = mat->rowvalues;
759         for ( i=0; i<nzB; i++ ) {
760           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
761           else break;
762         }
763         imark = i;
764         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
765         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
766       }
767       if (idx) {
768         *idx = idx_p = mat->rowindices;
769         if (imark > -1) {
770           for ( i=0; i<imark; i++ ) {
771             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
772           }
773         } else {
774           for ( i=0; i<nzB; i++ ) {
775             if (cmap[cworkB[i]/bs] < cstart)
776               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
777             else break;
778           }
779           imark = i;
780         }
781         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
782         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
783       }
784     }
785     else {
786       if (idx) *idx = 0;
787       if (v)   *v   = 0;
788     }
789   }
790   *nz = nztot;
791   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
792   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
793   return 0;
794 }
795 
796 #undef __FUNCTION__
797 #define __FUNCTION__ "MatRestoreRow_MPIBAIJ"
798 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
799 {
800   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
801   if (baij->getrowactive == PETSC_FALSE) {
802     SETERRQ(1,0,"MatGetRow not called");
803   }
804   baij->getrowactive = PETSC_FALSE;
805   return 0;
806 }
807 
808 #undef __FUNCTION__
809 #define __FUNCTION__ "MatGetBlockSize_MPIBAIJ"
810 static int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
811 {
812   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
813   *bs = baij->bs;
814   return 0;
815 }
816 
817 #undef __FUNCTION__
818 #define __FUNCTION__ "MatZeroEntries_MPIBAIJ"
819 static int MatZeroEntries_MPIBAIJ(Mat A)
820 {
821   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
822   int         ierr;
823   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
824   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
825   return 0;
826 }
827 
828 #undef __FUNCTION__
829 #define __FUNCTION__ "MatGetInfo_MPIBAIJ"
830 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
831 {
832   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
833   Mat         A = a->A, B = a->B;
834   int         ierr;
835   double      isend[5], irecv[5];
836 
837   info->rows_global    = (double)a->M;
838   info->columns_global = (double)a->N;
839   info->rows_local     = (double)a->m;
840   info->columns_local  = (double)a->N;
841   info->block_size     = (double)a->bs;
842   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
843   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded; isend[3] = info->memory; isend[4] = info->mallocs;
844   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
845   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded; isend[3] += info->memory; isend[4] += info->mallocs;
846   if (flag == MAT_LOCAL) {
847     info->nz_used      = isend[0];
848     info->nz_allocated = isend[1];
849     info->nz_unneeded  = isend[2];
850     info->memory       = isend[3];
851     info->mallocs      = isend[4];
852   } else if (flag == MAT_GLOBAL_MAX) {
853     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
854     info->nz_used      = irecv[0];
855     info->nz_allocated = irecv[1];
856     info->nz_unneeded  = irecv[2];
857     info->memory       = irecv[3];
858     info->mallocs      = irecv[4];
859   } else if (flag == MAT_GLOBAL_SUM) {
860     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
861     info->nz_used      = irecv[0];
862     info->nz_allocated = irecv[1];
863     info->nz_unneeded  = irecv[2];
864     info->memory       = irecv[3];
865     info->mallocs      = irecv[4];
866   }
867   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
868   info->fill_ratio_needed = 0;
869   info->factor_mallocs    = 0;
870   return 0;
871 }
872 
873 #undef __FUNCTION__
874 #define __FUNCTION__ "MatSetOption_MPIBAIJ"
875 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
876 {
877   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
878 
879   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
880       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
881       op == MAT_COLUMNS_UNSORTED ||
882       op == MAT_COLUMNS_SORTED) {
883         MatSetOption(a->A,op);
884         MatSetOption(a->B,op);
885   } else if (op == MAT_ROW_ORIENTED) {
886         a->roworiented = 1;
887         MatSetOption(a->A,op);
888         MatSetOption(a->B,op);
889   } else if (op == MAT_ROWS_SORTED ||
890              op == MAT_ROWS_UNSORTED ||
891              op == MAT_SYMMETRIC ||
892              op == MAT_STRUCTURALLY_SYMMETRIC ||
893              op == MAT_YES_NEW_DIAGONALS)
894     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
895   else if (op == MAT_COLUMN_ORIENTED) {
896     a->roworiented = 0;
897     MatSetOption(a->A,op);
898     MatSetOption(a->B,op);
899   } else if (op == MAT_IGNORE_OFF_PROCESSOR_ENTRIES) {
900     a->donotstash = 1;
901   } else if (op == MAT_NO_NEW_DIAGONALS)
902     {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
903   else
904     {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
905   return 0;
906 }
907 
908 #undef __FUNCTION__
909 #define __FUNCTION__ "MatTranspose_MPIBAIJ"
910 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
911 {
912   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
913   Mat_SeqBAIJ *Aloc;
914   Mat        B;
915   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
916   int        bs=baij->bs,mbs=baij->mbs;
917   Scalar     *a;
918 
919   if (matout == PETSC_NULL && M != N)
920     SETERRQ(1,0,"Square matrix only for in-place");
921   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
922   CHKERRQ(ierr);
923 
924   /* copy over the A part */
925   Aloc = (Mat_SeqBAIJ*) baij->A->data;
926   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
927   row = baij->rstart;
928   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
929 
930   for ( i=0; i<mbs; i++ ) {
931     rvals[0] = bs*(baij->rstart + i);
932     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
933     for ( j=ai[i]; j<ai[i+1]; j++ ) {
934       col = (baij->cstart+aj[j])*bs;
935       for (k=0; k<bs; k++ ) {
936         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
937         col++; a += bs;
938       }
939     }
940   }
941   /* copy over the B part */
942   Aloc = (Mat_SeqBAIJ*) baij->B->data;
943   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
944   row = baij->rstart*bs;
945   for ( i=0; i<mbs; i++ ) {
946     rvals[0] = bs*(baij->rstart + i);
947     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
948     for ( j=ai[i]; j<ai[i+1]; j++ ) {
949       col = baij->garray[aj[j]]*bs;
950       for (k=0; k<bs; k++ ) {
951         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
952         col++; a += bs;
953       }
954     }
955   }
956   PetscFree(rvals);
957   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
958   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
959 
960   if (matout != PETSC_NULL) {
961     *matout = B;
962   } else {
963     /* This isn't really an in-place transpose, but we free baij's data structures and let A take over B's */
964     PetscFree(baij->rowners);
965     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
966     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
967     if (baij->colmap) PetscFree(baij->colmap);
968     if (baij->garray) PetscFree(baij->garray);
969     if (baij->lvec) VecDestroy(baij->lvec);
970     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
971     PetscFree(baij);
972     PetscMemcpy(A,B,sizeof(struct _Mat));
973     PetscHeaderDestroy(B);
974   }
975   return 0;
976 }
977 
978 #undef __FUNCTION__
979 #define __FUNCTION__ "MatDiagonalScale_MPIBAIJ"
980 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
981 {
982   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
983   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
984   int ierr,s1,s2,s3;
985 
986   if (ll)  {
987     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
988     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
989     if (s1!=s2) SETERRQ(1,0,"non-conforming local sizes");
990     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
991     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
992   }
993   if (rr) SETERRQ(1,0,"not supported for right vector");
994   return 0;
995 }
996 
997 /* the code does not do the diagonal entries correctly unless the
998    matrix is square and the column and row ownerships are identical.
999    This is a BUG. The only way to fix it seems to be to access
1000    baij->A and baij->B directly and not through the MatZeroRows()
1001    routine.
1002 */
1003 #undef __FUNCTION__
1004 #define __FUNCTION__ "MatZeroRows_MPIBAIJ"
1005 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
1006 {
1007   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
1008   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
1009   int            *procs,*nprocs,j,found,idx,nsends,*work;
1010   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
1011   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
1012   int            *lens,imdex,*lrows,*values,bs=l->bs;
1013   MPI_Comm       comm = A->comm;
1014   MPI_Request    *send_waits,*recv_waits;
1015   MPI_Status     recv_status,*send_status;
1016   IS             istmp;
1017 
1018   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
1019   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
1020 
1021   /*  first count number of contributors to each processor */
1022   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
1023   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
1024   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
1025   for ( i=0; i<N; i++ ) {
1026     idx = rows[i];
1027     found = 0;
1028     for ( j=0; j<size; j++ ) {
1029       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
1030         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
1031       }
1032     }
1033     if (!found) SETERRQ(1,0,"Index out of range");
1034   }
1035   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
1036 
1037   /* inform other processors of number of messages and max length*/
1038   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
1039   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
1040   nrecvs = work[rank];
1041   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
1042   nmax = work[rank];
1043   PetscFree(work);
1044 
1045   /* post receives:   */
1046   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
1047   CHKPTRQ(rvalues);
1048   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
1049   CHKPTRQ(recv_waits);
1050   for ( i=0; i<nrecvs; i++ ) {
1051     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
1052   }
1053 
1054   /* do sends:
1055       1) starts[i] gives the starting index in svalues for stuff going to
1056          the ith processor
1057   */
1058   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1059   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
1060   CHKPTRQ(send_waits);
1061   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1062   starts[0] = 0;
1063   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1064   for ( i=0; i<N; i++ ) {
1065     svalues[starts[owner[i]]++] = rows[i];
1066   }
1067   ISRestoreIndices(is,&rows);
1068 
1069   starts[0] = 0;
1070   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1071   count = 0;
1072   for ( i=0; i<size; i++ ) {
1073     if (procs[i]) {
1074       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
1075     }
1076   }
1077   PetscFree(starts);
1078 
1079   base = owners[rank]*bs;
1080 
1081   /*  wait on receives */
1082   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1083   source = lens + nrecvs;
1084   count  = nrecvs; slen = 0;
1085   while (count) {
1086     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
1087     /* unpack receives into our local space */
1088     MPI_Get_count(&recv_status,MPI_INT,&n);
1089     source[imdex]  = recv_status.MPI_SOURCE;
1090     lens[imdex]  = n;
1091     slen += n;
1092     count--;
1093   }
1094   PetscFree(recv_waits);
1095 
1096   /* move the data into the send scatter */
1097   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1098   count = 0;
1099   for ( i=0; i<nrecvs; i++ ) {
1100     values = rvalues + i*nmax;
1101     for ( j=0; j<lens[i]; j++ ) {
1102       lrows[count++] = values[j] - base;
1103     }
1104   }
1105   PetscFree(rvalues); PetscFree(lens);
1106   PetscFree(owner); PetscFree(nprocs);
1107 
1108   /* actually zap the local rows */
1109   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1110   PLogObjectParent(A,istmp);
1111   PetscFree(lrows);
1112   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1113   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1114   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1115 
1116   /* wait on sends */
1117   if (nsends) {
1118     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1119     CHKPTRQ(send_status);
1120     MPI_Waitall(nsends,send_waits,send_status);
1121     PetscFree(send_status);
1122   }
1123   PetscFree(send_waits); PetscFree(svalues);
1124 
1125   return 0;
1126 }
1127 extern int MatPrintHelp_SeqBAIJ(Mat);
1128 #undef __FUNCTION__
1129 #define __FUNCTION__ "MatPrintHelp_MPIBAIJ"
1130 static int MatPrintHelp_MPIBAIJ(Mat A)
1131 {
1132   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1133 
1134   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1135   else return 0;
1136 }
1137 
1138 #undef __FUNCTION__
1139 #define __FUNCTION__ "MatSetUnfactored_MPIBAIJ"
1140 static int MatSetUnfactored_MPIBAIJ(Mat A)
1141 {
1142   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1143   int         ierr;
1144   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1145   return 0;
1146 }
1147 
1148 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1149 
1150 /* -------------------------------------------------------------------*/
1151 static struct _MatOps MatOps = {
1152   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1153   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1154   0,0,0,0,
1155   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1156   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1157   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1158   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1159   0,0,0,MatGetSize_MPIBAIJ,
1160   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1161   0,0,0,MatConvertSameType_MPIBAIJ,0,0,
1162   0,0,0,MatGetSubMatrices_MPIBAIJ,
1163   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1164   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ,
1165   0,0,0,0,0,0,MatSetUnfactored_MPIBAIJ};
1166 
1167 
1168 #undef __FUNCTION__
1169 #define __FUNCTION__ "MatCreateMPIBAIJ"
1170 /*@C
1171    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1172    (block compressed row).  For good matrix assembly performance
1173    the user should preallocate the matrix storage by setting the parameters
1174    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1175    performance can be increased by more than a factor of 50.
1176 
1177    Input Parameters:
1178 .  comm - MPI communicator
1179 .  bs   - size of block
1180 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1181            This value should be the same as the local size used in creating the
1182            y vector for the matrix-vector product y = Ax.
1183 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1184            This value should be the same as the local size used in creating the
1185            x vector for the matrix-vector product y = Ax.
1186 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
1187 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
1188 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1189            submatrix  (same for all local rows)
1190 .  d_nnz - array containing the number of block nonzeros in the various block rows
1191            in the diagonal portion of the local submatrix (possibly different for each block
1192            row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1193            it is zero.
1194 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1195            submatrix (same for all local rows).
1196 .  o_nnz - array containing the number of block nonzeros in the various block rows of the
1197            off-diagonal portion of the local submatrix (possibly different for
1198            each block row) or PETSC_NULL.
1199 
1200    Output Parameter:
1201 .  A - the matrix
1202 
1203    Notes:
1204    The user MUST specify either the local or global matrix dimensions
1205    (possibly both).
1206 
1207    Storage Information:
1208    For a square global matrix we define each processor's diagonal portion
1209    to be its local rows and the corresponding columns (a square submatrix);
1210    each processor's off-diagonal portion encompasses the remainder of the
1211    local matrix (a rectangular submatrix).
1212 
1213    The user can specify preallocated storage for the diagonal part of
1214    the local submatrix with either d_nz or d_nnz (not both).  Set
1215    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1216    memory allocation.  Likewise, specify preallocated storage for the
1217    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1218 
1219    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1220    the figure below we depict these three local rows and all columns (0-11).
1221 
1222 $          0 1 2 3 4 5 6 7 8 9 10 11
1223 $         -------------------
1224 $  row 3  |  o o o d d d o o o o o o
1225 $  row 4  |  o o o d d d o o o o o o
1226 $  row 5  |  o o o d d d o o o o o o
1227 $         -------------------
1228 $
1229 
1230    Thus, any entries in the d locations are stored in the d (diagonal)
1231    submatrix, and any entries in the o locations are stored in the
1232    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1233    stored simply in the MATSEQBAIJ format for compressed row storage.
1234 
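   For instance (an illustrative count, assuming bs=1 in the figure above), each
   of rows 3, 4 and 5 has 3 entries in the d locations and 9 in the o locations,
   so d_nz=3 and o_nz=9 would preallocate that example exactly.
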
1235    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1236    and o_nz should indicate the number of nonzeros per row in the o matrix.
1237    In general, for PDE problems in which most nonzeros are near the diagonal,
1238    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1239    or you will get TERRIBLE performance; see the users' manual chapter on
1240    matrices.
1241 
1242 .keywords: matrix, block, aij, compressed row, sparse, parallel
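   Example Usage (a sketch only, not part of the original manual page; the
   sizes and preallocation values below are purely illustrative):
$      Mat A;
$      int ierr, bs = 2, m = 100, n = 100;
$      ierr = MatCreateMPIBAIJ(PETSC_COMM_WORLD,bs,m,n,PETSC_DECIDE,PETSC_DECIDE,
$                              5,PETSC_NULL,2,PETSC_NULL,&A); CHKERRQ(ierr);
$      /* ... MatSetValues(), MatAssemblyBegin(), MatAssemblyEnd() ... */
$      ierr = MatDestroy(A); CHKERRQ(ierr);
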
1243 
1244 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1245 @*/
1246 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1247                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1248 {
1249   Mat          B;
1250   Mat_MPIBAIJ  *b;
1251   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1252 
1253   if (bs < 1) SETERRQ(1,0,"invalid block size specified");
1254   *A = 0;
1255   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1256   PLogObjectCreate(B);
1257   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1258   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1259   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1260   MPI_Comm_size(comm,&size);
1261   if (size == 1) {
1262     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1263     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1264     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1265     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1266     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1267     B->ops.solve             = MatSolve_MPIBAIJ;
1268     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1269     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1270     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1271     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1272   }
1273 
1274   B->destroy    = MatDestroy_MPIBAIJ;
1275   B->view       = MatView_MPIBAIJ;
1276   B->mapping    = 0;
1277   B->factor     = 0;
1278   B->assembled  = PETSC_FALSE;
1279 
1280   b->insertmode = NOT_SET_VALUES;
1281   MPI_Comm_rank(comm,&b->rank);
1282   MPI_Comm_size(comm,&b->size);
1283 
1284   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1285     SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1286   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,0,"either M or m should be specified");
1287   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,0,"either N or n should be specified");
1288   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1289   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1290 
1291   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1292     work[0] = m; work[1] = n;
1293     mbs = m/bs; nbs = n/bs;
1294     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1295     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1296     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1297   }
1298   if (m == PETSC_DECIDE) {
1299     Mbs = M/bs;
1300     if (Mbs*bs != M) SETERRQ(1,0,"No of global rows must be divisible by blocksize");
1301     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1302     m   = mbs*bs;
1303   }
1304   if (n == PETSC_DECIDE) {
1305     Nbs = N/bs;
1306     if (Nbs*bs != N) SETERRQ(1,0,"No of global cols must be divisible by blocksize");
1307     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1308     n   = nbs*bs;
1309   }
1310   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,0,"No of local rows, cols must be divisible by blocksize");
1311 
1312   b->m = m; B->m = m;
1313   b->n = n; B->n = n;
1314   b->N = N; B->N = N;
1315   b->M = M; B->M = M;
1316   b->bs  = bs;
1317   b->bs2 = bs*bs;
1318   b->mbs = mbs;
1319   b->nbs = nbs;
1320   b->Mbs = Mbs;
1321   b->Nbs = Nbs;
1322 
1323   /* build local table of row and column ownerships */
1324   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1325   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1326   b->cowners = b->rowners + b->size + 2;
1327   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1328   b->rowners[0] = 0;
1329   for ( i=2; i<=b->size; i++ ) {
1330     b->rowners[i] += b->rowners[i-1];
1331   }
1332   b->rstart    = b->rowners[b->rank];
1333   b->rend      = b->rowners[b->rank+1];
1334   b->rstart_bs = b->rstart * bs;
1335   b->rend_bs   = b->rend * bs;
1336 
1337   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1338   b->cowners[0] = 0;
1339   for ( i=2; i<=b->size; i++ ) {
1340     b->cowners[i] += b->cowners[i-1];
1341   }
1342   b->cstart    = b->cowners[b->rank];
1343   b->cend      = b->cowners[b->rank+1];
1344   b->cstart_bs = b->cstart * bs;
1345   b->cend_bs   = b->cend * bs;
1346 
1347   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1348   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1349   PLogObjectParent(B,b->A);
1350   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1351   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1352   PLogObjectParent(B,b->B);
1353 
1354   /* build cache for off array entries formed */
1355   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1356   b->donotstash  = 0;
1357   b->colmap      = 0;
1358   b->garray      = 0;
1359   b->roworiented = 1;
1360 
1361   /* stuff used for matrix vector multiply */
1362   b->lvec      = 0;
1363   b->Mvctx     = 0;
1364 
1365   /* stuff for MatGetRow() */
1366   b->rowindices   = 0;
1367   b->rowvalues    = 0;
1368   b->getrowactive = PETSC_FALSE;
1369 
1370   *A = B;
1371   return 0;
1372 }
1373 
1374 #undef __FUNCTION__
1375 #define __FUNCTION__ "MatConvertSameType_MPIBAIJ"
1376 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1377 {
1378   Mat        mat;
1379   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1380   int        ierr, len=0, flg;
1381 
1382   *newmat       = 0;
1383   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1384   PLogObjectCreate(mat);
1385   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1386   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1387   mat->destroy    = MatDestroy_MPIBAIJ;
1388   mat->view       = MatView_MPIBAIJ;
1389   mat->factor     = matin->factor;
1390   mat->assembled  = PETSC_TRUE;
1391 
1392   a->m = mat->m   = oldmat->m;
1393   a->n = mat->n   = oldmat->n;
1394   a->M = mat->M   = oldmat->M;
1395   a->N = mat->N   = oldmat->N;
1396 
1397   a->bs  = oldmat->bs;
1398   a->bs2 = oldmat->bs2;
1399   a->mbs = oldmat->mbs;
1400   a->nbs = oldmat->nbs;
1401   a->Mbs = oldmat->Mbs;
1402   a->Nbs = oldmat->Nbs;
1403 
1404   a->rstart       = oldmat->rstart;
1405   a->rend         = oldmat->rend;
1406   a->cstart       = oldmat->cstart;
1407   a->cend         = oldmat->cend;
1408   a->size         = oldmat->size;
1409   a->rank         = oldmat->rank;
1410   a->insertmode   = NOT_SET_VALUES;
1411   a->rowvalues    = 0;
1412   a->getrowactive = PETSC_FALSE;
1413 
1414   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1415   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1416   a->cowners = a->rowners + a->size + 2;
1417   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1418   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1419   if (oldmat->colmap) {
1420     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1421     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1422     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1423   } else a->colmap = 0;
1424   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1425     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1426     PLogObjectMemory(mat,len*sizeof(int));
1427     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1428   } else a->garray = 0;
1429 
1430   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1431   PLogObjectParent(mat,a->lvec);
1432   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1433   PLogObjectParent(mat,a->Mvctx);
1434   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1435   PLogObjectParent(mat,a->A);
1436   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1437   PLogObjectParent(mat,a->B);
1438   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1439   if (flg) {
1440     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1441   }
1442   *newmat = mat;
1443   return 0;
1444 }
1445 
1446 #include "sys.h"
1447 
1448 #undef __FUNCTION__
1449 #define __FUNCTION__ "MatLoad_MPIBAIJ"
1450 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1451 {
1452   Mat          A;
1453   int          i, nz, ierr, j,rstart, rend, fd;
1454   Scalar       *vals,*buf;
1455   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1456   MPI_Status   status;
1457   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1458   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1459   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1460   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1461   int          dcount,kmax,k,nzcount,tmp;
1462 
1463 
1464   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1465   bs2  = bs*bs;
1466 
1467   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1468   if (!rank) {
1469     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1470     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1471     if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not a matrix object");
1472   }
1473 
1474   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1475   M = header[1]; N = header[2];
1476 
1477   if (M != N) SETERRQ(1,0,"Can only load square matrices");
1478 
1479   /*
1480      This code adds extra rows to make sure the number of rows is
1481      divisible by the blocksize
1482   */
1483   Mbs        = M/bs;
1484   extra_rows = bs - M + bs*(Mbs);
1485   if (extra_rows == bs) extra_rows = 0;
1486   else                  Mbs++;
1487   if (extra_rows && !rank) {
1488     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1489   }
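  /*
     Example with hypothetical sizes: M = 10 and bs = 4 give Mbs = 3 and
     extra_rows = 2; the two padded rows receive a single 1.0 on the diagonal
     (see the column/value padding further below).
  */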
1490 
1491   /* determine ownership of all rows */
1492   mbs = Mbs/size + ((Mbs % size) > rank);
1493   m   = mbs * bs;
1494   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1495   browners = rowners + size + 1;
1496   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1497   rowners[0] = 0;
1498   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1499   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1500   rstart = rowners[rank];
1501   rend   = rowners[rank+1];
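  /*
     Example with hypothetical sizes: Mbs = 10 block rows on size = 4
     processes gives mbs = 3,3,2,2 and rowners = {0,3,6,8,10}; browners holds
     the same ranges scaled by bs, i.e. the ownership in point rows.
  */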
1502 
1503   /* distribute row lengths to all processors */
1504   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1505   if (!rank) {
1506     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1507     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1508     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1509     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1510     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1511     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1512     PetscFree(sndcounts);
1513   }
1514   else {
1515     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1516   }
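  /* every process now holds locrowlens[], the point-row lengths (padded rows
     included) of the (rend-rstart)*bs rows it owns */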
1517 
1518   if (!rank) {
1519     /* calculate the number of nonzeros on each processor */
1520     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1521     PetscMemzero(procsnz,size*sizeof(int));
1522     for ( i=0; i<size; i++ ) {
1523       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1524         procsnz[i] += rowlengths[j];
1525       }
1526     }
1527     PetscFree(rowlengths);
1528 
1529     /* determine max buffer needed and allocate it */
1530     maxnz = 0;
1531     for ( i=0; i<size; i++ ) {
1532       maxnz = PetscMax(maxnz,procsnz[i]);
1533     }
1534     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1535 
1536     /* read in my part of the matrix column indices  */
1537     nz = procsnz[0];
1538     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1539     mycols = ibuf;
1540     if (size == 1)  nz -= extra_rows;
1541     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1542     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1543 
1544     /* read in the column indices of the other processes (all but the last) and ship them off */
1545     for ( i=1; i<size-1; i++ ) {
1546       nz = procsnz[i];
1547       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1548       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1549     }
1550     /* read in the column indices for the last process */
1551     if ( size != 1 ) {
1552       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1553       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1554       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1555       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1556     }
1557     PetscFree(cols);
1558   }
1559   else {
1560     /* determine buffer space needed for message */
1561     nz = 0;
1562     for ( i=0; i<m; i++ ) {
1563       nz += locrowlens[i];
1564     }
1565     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1566     mycols = ibuf;
1567     /* receive the message of column indices */
1568     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1569     MPI_Get_count(&status,MPI_INT,&maxnz);
1570     if (maxnz != nz) SETERRQ(1,0,"message length does not match the expected number of column indices");
1571   }
1572 
1573   /* loop over the local block rows, counting the diagonal and off-diagonal block entries needed for preallocation */
1574   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1575   odlens = dlens + (rend-rstart);
1576   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1577   PetscMemzero(mask,3*Mbs*sizeof(int));
1578   masked1 = mask    + Mbs;
1579   masked2 = masked1 + Mbs;
1580   rowcount = 0; nzcount = 0;
1581   for ( i=0; i<mbs; i++ ) {
1582     dcount  = 0;
1583     odcount = 0;
1584     for ( j=0; j<bs; j++ ) {
1585       kmax = locrowlens[rowcount];
1586       for ( k=0; k<kmax; k++ ) {
1587         tmp = mycols[nzcount++]/bs;
1588         if (!mask[tmp]) {
1589           mask[tmp] = 1;
1590           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1591           else masked1[dcount++] = tmp;
1592         }
1593       }
1594       rowcount++;
1595     }
1596 
1597     dlens[i]  = dcount;
1598     odlens[i] = odcount;
1599 
1600     /* zero out the mask elements we set */
1601     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1602     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1603   }
1604 
1605   /* create our matrix */
1606   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1607          CHKERRQ(ierr);
1608   A = *newmat;
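  /* the column indices of each row are presumably stored in ascending order
     in the file, hence the MAT_COLUMNS_SORTED hint for MatSetValues() */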
1609   ierr = MatSetOption(A,MAT_COLUMNS_SORTED); CHKERRQ(ierr);
1610 
1611   if (!rank) {
1612     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1613     /* read in my part of the matrix numerical values  */
1614     nz = procsnz[0];
1615     vals = buf;
1616     mycols = ibuf;
1617     if (size == 1)  nz -= extra_rows;
1618     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1619     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1620 
1621     /* insert into matrix */
1622     jj      = rstart*bs;
1623     for ( i=0; i<m; i++ ) {
1624       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1625       mycols += locrowlens[i];
1626       vals   += locrowlens[i];
1627       jj++;
1628     }
1629     /* read in the values for the other processes (all but the last) and ship them out */
1630     for ( i=1; i<size-1; i++ ) {
1631       nz = procsnz[i];
1632       vals = buf;
1633       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1634       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1635     }
1636     /* read in the values for the last process */
1637     if ( size != 1 ){
1638       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1639       vals = buf;
1640       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1641       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1642       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1643     }
1644     PetscFree(procsnz);
1645   }
1646   else {
1647     /* receive numeric values */
1648     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1649 
1650     /* receive the message of values */
1651     vals = buf;
1652     mycols = ibuf;
1653     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1654     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1655     if (maxnz != nz) SETERRQ(1,0,"message length does not match the expected number of values");
1656 
1657     /* insert into matrix */
1658     jj      = rstart*bs;
1659     for ( i=0; i<m; i++ ) {
1660       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1661       mycols += locrowlens[i];
1662       vals   += locrowlens[i];
1663       jj++;
1664     }
1665   }
1666   PetscFree(locrowlens);
1667   PetscFree(buf);
1668   PetscFree(ibuf);
1669   PetscFree(rowners);
1670   PetscFree(dlens);
1671   PetscFree(mask);
1672   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1673   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1674   return 0;
1675 }
1676 
1677 
1678