xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 026e39d04bcf2a445c0a9332c845c324154b2e8f)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.41 1996/12/07 00:50:45 balay Exp balay $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 
25 /*
26      Local utility routine that creates a mapping from the global column
27    number to the local number in the off-diagonal part of the local
28    storage of the matrix.  This is done in a non-scalable way since the
29    length of colmap equals the global number of block columns.
30 */
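/*
     A small worked example of the convention (hypothetical values): with
   block size bs = 2 and garray = {3,7}, the loop below sets

        colmap[3] = 0*bs + 1 = 1,    colmap[7] = 1*bs + 1 = 3,

   and all other entries remain 0.  Global column 15 (block column 7) is then
   recovered locally as colmap[15/bs] - 1 + 15%bs = 3 - 1 + 1 = 3, which is
   the lookup used in MatSetValues_MPIBAIJ() and MatGetValues_MPIBAIJ() below.
*/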
31 #undef __FUNCTION__
32 #define __FUNCTION__ "CreateColmap_MPIBAIJ_Private"
33 static int CreateColmap_MPIBAIJ_Private(Mat mat)
34 {
35   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
36   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
37   int         nbs = B->nbs,i,bs=B->bs;
38 
39   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
40   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
41   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
42   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i*bs+1;
43   return 0;
44 }
45 
46 #undef __FUNCTION__
47 #define __FUNCTION__ "MatGetRowIJ_MPIBAIJ"
48 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
49                             PetscTruth *done)
50 {
51   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
52   int         ierr;
53   if (aij->size == 1) {
54     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
55   } else SETERRQ(1,"MatGetRowIJ_MPIBAIJ:not supported in parallel");
56   return 0;
57 }
58 
59 #undef __FUNCTION__
60 #define __FUNCTION__ "MatRestoreRowIJ_MPIBAIJ"
61 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
62                                 PetscTruth *done)
63 {
64   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
65   int        ierr;
66   if (aij->size == 1) {
67     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
68   } else SETERRQ(1,"MatRestoreRowIJ_MPIBAIJ:not supported in parallel");
69   return 0;
70 }
71 
72 extern int MatSetValues_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
73 #undef __FUNCTION__
74 #define __FUNCTION__ "MatSetValues_MPIBAIJ"
75 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
76 {
77   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
78   Scalar      value;
79   int         ierr,i,j,row,col;
80   int         roworiented = baij->roworiented,rstart_orig=baij->rstart_bs ;
81   int         rend_orig=baij->rend_bs,cstart_orig=baij->cstart_bs;
82   int         cend_orig=baij->cend_bs,bs=baij->bs;
83 
84 #if defined(PETSC_BOPT_g)
85   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
86     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
87   }
88 #endif
89   baij->insertmode = addv;
90   for ( i=0; i<m; i++ ) {
91 #if defined(PETSC_BOPT_g)
92     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
93     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
94 #endif
95     if (im[i] >= rstart_orig && im[i] < rend_orig) {
96       row = im[i] - rstart_orig;
97       for ( j=0; j<n; j++ ) {
98         if (in[j] >= cstart_orig && in[j] < cend_orig){
99           col = in[j] - cstart_orig;
100           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
101           ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
102         }
103 #if defined(PETSC_BOPT_g)
104         else if (in[j] < 0) {SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");}
105         else if (in[j] >= baij->N) {SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");}
106 #endif
107         else {
108           if (mat->was_assembled) {
109             if (!baij->colmap) {
110               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
111             }
112             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
113             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
114               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
115               col =  in[j];
116             }
117           }
118           else col = in[j];
119           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
120           ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
121         }
122       }
123     }
124     else {
125       if (roworiented && !baij->donotstash) {
126         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
127       }
128       else {
129         if (!baij->donotstash) {
130           row = im[i];
131 	  for ( j=0; j<n; j++ ) {
132 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
133           }
134         }
135       }
136     }
137   }
138   return 0;
139 }
140 
141 #undef __FUNCTION__
142 #define __FUNCTION__ "MatGetValues_MPIBAIJ"
143 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
144 {
145   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
146   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
147   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
148 
149   for ( i=0; i<m; i++ ) {
150     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
151     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
152     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
153       row = idxm[i] - bsrstart;
154       for ( j=0; j<n; j++ ) {
155         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
156         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
157         if (idxn[j] >= bscstart && idxn[j] < bscend){
158           col = idxn[j] - bscstart;
159           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
160         }
161         else {
162           if (!baij->colmap) {
163             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
164           }
165           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
166              (baij->garray[(baij->colmap[idxn[j]/bs]-1)/bs] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
167           else {
168             col  = (baij->colmap[idxn[j]/bs]-1) + idxn[j]%bs;
169             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
170           }
171         }
172       }
173     }
174     else {
175       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
176     }
177   }
178   return 0;
179 }
180 
181 #undef __FUNCTION__
182 #define __FUNCTION__ "MatNorm_MPIBAIJ"
183 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
184 {
185   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
186   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
187   int        ierr, i,bs2=baij->bs2;
188   double     sum = 0.0;
189   Scalar     *v;
190 
191   if (baij->size == 1) {
192     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
193   } else {
194     if (type == NORM_FROBENIUS) {
195       v = amat->a;
196       for (i=0; i<amat->nz*bs2; i++ ) {
197 #if defined(PETSC_COMPLEX)
198         sum += real(conj(*v)*(*v)); v++;
199 #else
200         sum += (*v)*(*v); v++;
201 #endif
202       }
203       v = bmat->a;
204       for (i=0; i<bmat->nz*bs2; i++ ) {
205 #if defined(PETSC_COMPLEX)
206         sum += real(conj(*v)*(*v)); v++;
207 #else
208         sum += (*v)*(*v); v++;
209 #endif
210       }
211       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
212       *norm = sqrt(*norm);
213     }
214     else
215       SETERRQ(PETSC_ERR_SUP,"MatNorm_MPIBAIJ:No support for this norm yet");
216   }
217   return 0;
218 }
219 
220 #undef __FUNCTION__
221 #define __FUNCTION__ "MatAssemblyBegin_MPIBAIJ"
222 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
223 {
224   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
225   MPI_Comm    comm = mat->comm;
226   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
227   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
228   MPI_Request *send_waits,*recv_waits;
229   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
230   InsertMode  addv;
231   Scalar      *rvalues,*svalues;
232 
233   /* make sure all processors are using the same insert mode (INSERT_VALUES or ADD_VALUES) */
234   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
235   if (addv == (ADD_VALUES|INSERT_VALUES)) {
236     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
237   }
238   baij->insertmode = addv; /* in case this processor had no cache */
239 
240   /*  first count number of contributors to each processor */
241   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
242   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
243   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
244   for ( i=0; i<baij->stash.n; i++ ) {
245     idx = baij->stash.idx[i];
246     for ( j=0; j<size; j++ ) {
247       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
248         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
249       }
250     }
251   }
252   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
253 
254   /* inform other processors of number of messages and max length*/
255   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
256   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
257   nreceives = work[rank];
258   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
259   nmax = work[rank];
260   PetscFree(work);
261 
262   /* post receives:
263        1) each message will consist of ordered triples
264      (global row index, global column index, value); we store the indices
265      as Scalars to simplify the message passing.
266        2) since we don't know how long each individual message is we
267      allocate the largest needed buffer for each receive. Potentially
268      this is a lot of wasted space.
269 
270 
271        This could be done better.
272   */
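  /*
      For illustration (hypothetical sizes): with nmax = 2 each receive buffer
      holds up to 3*nmax = 6 Scalars, laid out as

          [ row0, col0, val0, row1, col1, val1 ]

      so message i is unpacked in MatAssemblyEnd_MPIBAIJ() starting at
      rvalues + 3*i*nmax.
  */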
273   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
274   CHKPTRQ(rvalues);
275   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
276   CHKPTRQ(recv_waits);
277   for ( i=0; i<nreceives; i++ ) {
278     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
279               comm,recv_waits+i);
280   }
281 
282   /* do sends:
283       1) starts[i] gives the starting index in svalues for stuff going to
284          the ith processor
285   */
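  /*
      A small example of the bookkeeping (hypothetical counts): with
      nprocs = {2,0,3} the prefix sum gives starts = {0,2,2}, so the two
      triples destined for processor 0 occupy svalues[0..5] and the three
      triples for processor 2 occupy svalues[6..14].
  */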
286   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
287   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
288   CHKPTRQ(send_waits);
289   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
290   starts[0] = 0;
291   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
292   for ( i=0; i<baij->stash.n; i++ ) {
293     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
294     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
295     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
296   }
297   PetscFree(owner);
298   starts[0] = 0;
299   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
300   count = 0;
301   for ( i=0; i<size; i++ ) {
302     if (procs[i]) {
303       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
304                 comm,send_waits+count++);
305     }
306   }
307   PetscFree(starts); PetscFree(nprocs);
308 
309   /* Free cache space */
310   PLogInfo(mat,"MatAssemblyBegin_MPIBAIJ:Number of off-processor values %d\n",baij->stash.n);
311   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
312 
313   baij->svalues    = svalues;    baij->rvalues    = rvalues;
314   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
315   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
316   baij->rmax       = nmax;
317 
318   return 0;
319 }
320 
321 
322 #undef __FUNCTION__
323 #define __FUNCTION__ "MatAssemblyEnd_MPIBAIJ"
324 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
325 {
326   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
327   MPI_Status  *send_status,recv_status;
328   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
329   int         bs=baij->bs,row,col,other_disassembled;
330   Scalar      *values,val;
331   InsertMode  addv = baij->insertmode;
332 
333   /*  wait on receives */
334   while (count) {
335     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
336     /* unpack receives into our local space */
337     values = baij->rvalues + 3*imdex*baij->rmax;
338     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
339     n = n/3;
340     for ( i=0; i<n; i++ ) {
341       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
342       col = (int) PetscReal(values[3*i+1]);
343       val = values[3*i+2];
344       if (col >= baij->cstart*bs && col < baij->cend*bs) {
345         col -= baij->cstart*bs;
346         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
347       }
348       else {
349         if (mat->was_assembled) {
350           if (!baij->colmap) {
351             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
352           }
353           col = baij->colmap[col/bs] - 1 + col%bs;
354           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
355             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
356             col = (int) PetscReal(values[3*i+1]);
357           }
358         }
359         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
360       }
361     }
362     count--;
363   }
364   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
365 
366   /* wait on sends */
367   if (baij->nsends) {
368     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
369     CHKPTRQ(send_status);
370     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
371     PetscFree(send_status);
372   }
373   PetscFree(baij->send_waits); PetscFree(baij->svalues);
374 
375   baij->insertmode = NOT_SET_VALUES;
376   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
377   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
378 
379   /* determine if any processor has disassembled, if so we must
380      also disassemble ourselves, in order that we may reassemble. */
381   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
382   if (mat->was_assembled && !other_disassembled) {
383     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
384   }
385 
386   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
387     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
388   }
389   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
390   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
391 
392   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
393   return 0;
394 }
395 
396 #undef __FUNCTION__
397 #define __FUNCTION__ "MatView_MPIBAIJ_Binary"
398 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
399 {
400   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
401   int          ierr;
402 
403   if (baij->size == 1) {
404     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
405   }
406   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
407   return 0;
408 }
409 
410 #undef __FUNCTION__
411 #define __FUNCTION__ "MatView_MPIBAIJ_ASCIIorDraworMatlab"
412 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
413 {
414   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
415   int          ierr, format,rank,bs = baij->bs;
416   FILE         *fd;
417   ViewerType   vtype;
418 
419   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
420   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
421     ierr = ViewerGetFormat(viewer,&format);
422     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
423       MatInfo info;
424       MPI_Comm_rank(mat->comm,&rank);
425       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
426       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
427       PetscSequentialPhaseBegin(mat->comm,1);
428       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
429               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
430               baij->bs,(int)info.memory);
431       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
432       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
433       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
434       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
435       fflush(fd);
436       PetscSequentialPhaseEnd(mat->comm,1);
437       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
438       return 0;
439     }
440     else if (format == VIEWER_FORMAT_ASCII_INFO) {
441       PetscPrintf(mat->comm,"  block size is %d\n",bs);
442       return 0;
443     }
444   }
445 
446   if (vtype == DRAW_VIEWER) {
447     Draw       draw;
448     PetscTruth isnull;
449     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
450     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
451   }
452 
453   if (vtype == ASCII_FILE_VIEWER) {
454     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
455     PetscSequentialPhaseBegin(mat->comm,1);
456     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
457            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
458             baij->cstart*bs,baij->cend*bs);
459     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
460     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
461     fflush(fd);
462     PetscSequentialPhaseEnd(mat->comm,1);
463   }
464   else {
465     int size = baij->size;
466     rank = baij->rank;
467     if (size == 1) {
468       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
469     }
470     else {
471       /* assemble the entire matrix onto first processor. */
472       Mat         A;
473       Mat_SeqBAIJ *Aloc;
474       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
475       int         mbs=baij->mbs;
476       Scalar      *a;
477 
478       if (!rank) {
479         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
480         CHKERRQ(ierr);
481       }
482       else {
483         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
484         CHKERRQ(ierr);
485       }
486       PLogObjectParent(mat,A);
487 
488       /* copy over the A part */
489       Aloc = (Mat_SeqBAIJ*) baij->A->data;
490       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
491       row = baij->rstart;
492       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
493 
494       for ( i=0; i<mbs; i++ ) {
495         rvals[0] = bs*(baij->rstart + i);
496         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
497         for ( j=ai[i]; j<ai[i+1]; j++ ) {
498           col = (baij->cstart+aj[j])*bs;
499           for (k=0; k<bs; k++ ) {
500             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
501             col++; a += bs;
502           }
503         }
504       }
505       /* copy over the B part */
506       Aloc = (Mat_SeqBAIJ*) baij->B->data;
507       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
508       row = baij->rstart*bs;
509       for ( i=0; i<mbs; i++ ) {
510         rvals[0] = bs*(baij->rstart + i);
511         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
512         for ( j=ai[i]; j<ai[i+1]; j++ ) {
513           col = baij->garray[aj[j]]*bs;
514           for (k=0; k<bs; k++ ) {
515             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
516             col++; a += bs;
517           }
518         }
519       }
520       PetscFree(rvals);
521       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
522       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
523       if (!rank) {
524         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
525       }
526       ierr = MatDestroy(A); CHKERRQ(ierr);
527     }
528   }
529   return 0;
530 }
531 
532 
533 
534 #undef __FUNCTION__
535 #define __FUNCTION__ "MatView_MPIBAIJ"
536 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
537 {
538   Mat         mat = (Mat) obj;
539   int         ierr;
540   ViewerType  vtype;
541 
542   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
543   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
544       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
545     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
546   }
547   else if (vtype == BINARY_FILE_VIEWER) {
548     return MatView_MPIBAIJ_Binary(mat,viewer);
549   }
550   return 0;
551 }
552 
553 #undef __FUNCTION__
554 #define __FUNCTION__ "MatDestroy_MPIBAIJ"
555 static int MatDestroy_MPIBAIJ(PetscObject obj)
556 {
557   Mat         mat = (Mat) obj;
558   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
559   int         ierr;
560 
561 #if defined(PETSC_LOG)
562   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
563 #endif
564 
565   PetscFree(baij->rowners);
566   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
567   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
568   if (baij->colmap) PetscFree(baij->colmap);
569   if (baij->garray) PetscFree(baij->garray);
570   if (baij->lvec)   VecDestroy(baij->lvec);
571   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
572   if (baij->rowvalues) PetscFree(baij->rowvalues);
573   PetscFree(baij);
574   if (mat->mapping) {
575     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
576   }
577   PLogObjectDestroy(mat);
578   PetscHeaderDestroy(mat);
579   return 0;
580 }
581 
582 #undef __FUNCTION__
583 #define __FUNCTION__ "MatMult_MPIBAIJ"
584 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
585 {
586   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
587   int         ierr, nt;
588 
589   VecGetLocalSize_Fast(xx,nt);
590   if (nt != a->n) {
591     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible parition of A and xx");
592   }
593   VecGetLocalSize_Fast(yy,nt);
594   if (nt != a->m) {
595     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible parition of A and yy");
596   }
597   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
598   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
599   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
600   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
601   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
602   return 0;
603 }
604 
605 #undef __FUNCTION__
606 #define __FUNCTION__ "MatMultAdd_MPIBAIJ"
607 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
608 {
609   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
610   int        ierr;
611   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
612   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
613   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
614   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
615   return 0;
616 }
617 
618 #undef __FUNCTION__
619 #define __FUNCTION__ "MatMultTrans_MPIBAIJ"
620 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
621 {
622   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
623   int        ierr;
624 
625   /* do nondiagonal part */
626   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
627   /* send it on its way */
628   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
629   /* do local part */
630   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
631   /* receive remote parts: note this assumes the values are not actually */
632   /* inserted in yy until the next line, which is true for my implementation*/
633   /* but is not perhaps always true. */
634   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
635   return 0;
636 }
637 
638 #undef __FUNCTION__
639 #define __FUNCTION__ "MatMultTransAdd_MPIBAIJ"
640 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
641 {
642   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
643   int        ierr;
644 
645   /* do nondiagonal part */
646   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
647   /* send it on its way */
648   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
649   /* do local part */
650   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
651   /* receive remote parts: note this assumes the values are not actually */
652   /* inserted in zz until the next line, which is true for my implementation */
653   /* but is not perhaps always true. */
654   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
655   return 0;
656 }
657 
658 /*
659   This only works correctly for square matrices where the subblock A->A is the
660    diagonal block
661 */
662 #undef __FUNCTION__
663 #define __FUNCTION__ "MatGetDiagonal_MPIBAIJ"
664 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
665 {
666   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
667   if (a->M != a->N)
668     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
669   return MatGetDiagonal(a->A,v);
670 }
671 
672 #undef __FUNCTION__
673 #define __FUNCTION__ "MatScale_MPIBAIJ"
674 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
675 {
676   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
677   int        ierr;
678   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
679   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
680   return 0;
681 }
682 
683 #undef __FUNCTION__
684 #define __FUNCTION__ "MatGetSize_MPIBAIJ"
685 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
686 {
687   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
688   *m = mat->M; *n = mat->N;
689   return 0;
690 }
691 
692 #undef __FUNCTION__
693 #define __FUNCTION__ "MatGetLocalSize_MPIBAIJ"
694 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
695 {
696   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
697   *m = mat->m; *n = mat->n;
698   return 0;
699 }
700 
701 #undef __FUNCTION__
702 #define __FUNCTION__ "MatGetOwnershipRange_MPIBAIJ"
703 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
704 {
705   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
706   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
707   return 0;
708 }
709 
710 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
711 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
712 
713 #undef __FUNCTION__
714 #define __FUNCTION__ "MatGetRow_MPIBAIJ"
715 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
716 {
717   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
718   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
719   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
720   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
721   int        *cmap, *idx_p,cstart = mat->cstart;
722 
723   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
724   mat->getrowactive = PETSC_TRUE;
725 
726   if (!mat->rowvalues && (idx || v)) {
727     /*
728         allocate enough space to hold information from the longest row.
729     */
730     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
731     int     max = 1,mbs = mat->mbs,tmp;
732     for ( i=0; i<mbs; i++ ) {
733       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
734       if (max < tmp) { max = tmp; }
735     }
736     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
737     CHKPTRQ(mat->rowvalues);
738     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
739   }
740 
741 
742   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows");
743   lrow = row - brstart;
744 
745   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
746   if (!v)   {pvA = 0; pvB = 0;}
747   if (!idx) {pcA = 0; if (!v) pcB = 0;}
748   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
749   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
750   nztot = nzA + nzB;
751 
752   cmap  = mat->garray;
753   if (v  || idx) {
754     if (nztot) {
755       /* Sort by increasing column numbers, assuming A and B already sorted */
756       int imark = -1;
757       if (v) {
758         *v = v_p = mat->rowvalues;
759         for ( i=0; i<nzB; i++ ) {
760           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
761           else break;
762         }
763         imark = i;
764         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
765         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
766       }
767       if (idx) {
768         *idx = idx_p = mat->rowindices;
769         if (imark > -1) {
770           for ( i=0; i<imark; i++ ) {
771             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
772           }
773         } else {
774           for ( i=0; i<nzB; i++ ) {
775             if (cmap[cworkB[i]/bs] < cstart)
776               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
777             else break;
778           }
779           imark = i;
780         }
781         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
782         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
783       }
784     }
785     else {
786       if (idx) *idx = 0;
787       if (v)   *v   = 0;
788     }
789   }
790   *nz = nztot;
791   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
792   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
793   return 0;
794 }
795 
796 #undef __FUNCTION__
797 #define __FUNCTION__ "MatRestoreRow_MPIBAIJ"
798 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
799 {
800   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
801   if (baij->getrowactive == PETSC_FALSE) {
802     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
803   }
804   baij->getrowactive = PETSC_FALSE;
805   return 0;
806 }
807 
808 #undef __FUNCTION__
809 #define __FUNCTION__ "MatGetBlockSize_MPIBAIJ"
810 static int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
811 {
812   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
813   *bs = baij->bs;
814   return 0;
815 }
816 
817 #undef __FUNCTION__
818 #define __FUNCTION__ "MatZeroEntries_MPIBAIJ"
819 static int MatZeroEntries_MPIBAIJ(Mat A)
820 {
821   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
822   int         ierr;
823   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
824   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
825   return 0;
826 }
827 
828 #undef __FUNCTION__
829 #define __FUNCTION__ "MatGetInfo_MPIBAIJ"
830 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
831 {
832   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
833   Mat         A = a->A, B = a->B;
834   int         ierr;
835   double      isend[5], irecv[5];
836 
837   info->rows_global    = (double)a->M;
838   info->columns_global = (double)a->N;
839   info->rows_local     = (double)a->m;
840   info->columns_local  = (double)a->N;
841   info->block_size     = (double)a->bs;
842   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
843   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded; isend[3] = info->memory; isend[4] = info->mallocs;
844   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
845   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded; isend[3] += info->memory; isend[4] += info->mallocs;
846   if (flag == MAT_LOCAL) {
847     info->nz_used      = isend[0];
848     info->nz_allocated = isend[1];
849     info->nz_unneeded  = isend[2];
850     info->memory       = isend[3];
851     info->mallocs      = isend[4];
852   } else if (flag == MAT_GLOBAL_MAX) {
853     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
854     info->nz_used      = irecv[0];
855     info->nz_allocated = irecv[1];
856     info->nz_unneeded  = irecv[2];
857     info->memory       = irecv[3];
858     info->mallocs      = irecv[4];
859   } else if (flag == MAT_GLOBAL_SUM) {
860     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
861     info->nz_used      = irecv[0];
862     info->nz_allocated = irecv[1];
863     info->nz_unneeded  = irecv[2];
864     info->memory       = irecv[3];
865     info->mallocs      = irecv[4];
866   }
867   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
868   info->fill_ratio_needed = 0;
869   info->factor_mallocs    = 0;
870   return 0;
871 }
872 
873 #undef __FUNCTION__
874 #define __FUNCTION__ "MatSetOption_MPIBAIJ"
875 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
876 {
877   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
878 
879   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
880       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
881       op == MAT_COLUMNS_UNSORTED ||
882       op == MAT_COLUMNS_SORTED) {
883         MatSetOption(a->A,op);
884         MatSetOption(a->B,op);
885   } else if (op == MAT_ROW_ORIENTED) {
886         a->roworiented = 1;
887         MatSetOption(a->A,op);
888         MatSetOption(a->B,op);
889   } else if (op == MAT_ROWS_SORTED ||
890              op == MAT_ROWS_UNSORTED ||
891              op == MAT_SYMMETRIC ||
892              op == MAT_STRUCTURALLY_SYMMETRIC ||
893              op == MAT_YES_NEW_DIAGONALS)
894     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
895   else if (op == MAT_COLUMN_ORIENTED) {
896     a->roworiented = 0;
897     MatSetOption(a->A,op);
898     MatSetOption(a->B,op);
899   } else if (op == MAT_IGNORE_OFF_PROCESSOR_ENTRIES) {
900     a->donotstash = 1;
901   } else if (op == MAT_NO_NEW_DIAGONALS)
902     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:MAT_NO_NEW_DIAGONALS");}
903   else
904     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:unknown option");}
905   return 0;
906 }
907 
908 #undef __FUNCTION__
909 #define __FUNCTION__ "MatTranspose_MPIBAIJ"
910 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
911 {
912   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
913   Mat_SeqBAIJ *Aloc;
914   Mat        B;
915   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
916   int        bs=baij->bs,mbs=baij->mbs;
917   Scalar     *a;
918 
919   if (matout == PETSC_NULL && M != N)
920     SETERRQ(1,"MatTranspose_MPIBAIJ:Square matrix only for in-place");
921   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
922   CHKERRQ(ierr);
923 
924   /* copy over the A part */
925   Aloc = (Mat_SeqBAIJ*) baij->A->data;
926   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
927   row = baij->rstart;
928   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
929 
930   for ( i=0; i<mbs; i++ ) {
931     rvals[0] = bs*(baij->rstart + i);
932     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
933     for ( j=ai[i]; j<ai[i+1]; j++ ) {
934       col = (baij->cstart+aj[j])*bs;
935       for (k=0; k<bs; k++ ) {
936         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
937         col++; a += bs;
938       }
939     }
940   }
941   /* copy over the B part */
942   Aloc = (Mat_SeqBAIJ*) baij->B->data;
943   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
944   row = baij->rstart*bs;
945   for ( i=0; i<mbs; i++ ) {
946     rvals[0] = bs*(baij->rstart + i);
947     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
948     for ( j=ai[i]; j<ai[i+1]; j++ ) {
949       col = baij->garray[aj[j]]*bs;
950       for (k=0; k<bs; k++ ) {
951         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
952         col++; a += bs;
953       }
954     }
955   }
956   PetscFree(rvals);
957   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
958   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
959 
960   if (matout != PETSC_NULL) {
961     *matout = B;
962   } else {
963     /* This isn't really an in-place transpose, but we free the data structures of baij */
964     PetscFree(baij->rowners);
965     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
966     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
967     if (baij->colmap) PetscFree(baij->colmap);
968     if (baij->garray) PetscFree(baij->garray);
969     if (baij->lvec) VecDestroy(baij->lvec);
970     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
971     PetscFree(baij);
972     PetscMemcpy(A,B,sizeof(struct _Mat));
973     PetscHeaderDestroy(B);
974   }
975   return 0;
976 }
977 
978 #undef __FUNCTION__
979 #define __FUNCTION__ "MatDiagonalScale_MPIBAIJ"
980 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
981 {
982   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
983   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
984   int ierr,s1,s2,s3;
985 
986   if (ll)  {
987     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
988     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
989     if (s1!=s2) SETERRQ(1,"MatDiagonalScale_MPIBAIJ: non-conforming local sizes");
990     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
991     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
992   }
993   if (rr) SETERRQ(1,"MatDiagonalScale_MPIBAIJ:not supported for right vector");
994   return 0;
995 }
996 
997 /* the code does not do the diagonal entries correctly unless the
998    matrix is square and the column and row ownerships are identical.
999    This is a BUG. The only way to fix it seems to be to access
1000    baij->A and baij->B directly and not through the MatZeroRows()
1001    routine.
1002 */
1003 #undef __FUNCTION__
1004 #define __FUNCTION__ "MatZeroRows_MPIBAIJ"
1005 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
1006 {
1007   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
1008   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
1009   int            *procs,*nprocs,j,found,idx,nsends,*work;
1010   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
1011   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
1012   int            *lens,imdex,*lrows,*values,bs=l->bs;
1013   MPI_Comm       comm = A->comm;
1014   MPI_Request    *send_waits,*recv_waits;
1015   MPI_Status     recv_status,*send_status;
1016   IS             istmp;
1017 
1018   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
1019   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
1020 
1021   /*  first count number of contributors to each processor */
1022   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
1023   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
1024   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
1025   for ( i=0; i<N; i++ ) {
1026     idx = rows[i];
1027     found = 0;
1028     for ( j=0; j<size; j++ ) {
1029       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
1030         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
1031       }
1032     }
1033     if (!found) SETERRQ(1,"MatZeroRows_MPIBAIJ:Index out of range");
1034   }
1035   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
1036 
1037   /* inform other processors of number of messages and max length*/
1038   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
1039   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
1040   nrecvs = work[rank];
1041   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
1042   nmax = work[rank];
1043   PetscFree(work);
1044 
1045   /* post receives:   */
1046   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
1047   CHKPTRQ(rvalues);
1048   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
1049   CHKPTRQ(recv_waits);
1050   for ( i=0; i<nrecvs; i++ ) {
1051     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
1052   }
1053 
1054   /* do sends:
1055       1) starts[i] gives the starting index in svalues for stuff going to
1056          the ith processor
1057   */
1058   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
1059   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
1060   CHKPTRQ(send_waits);
1061   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1062   starts[0] = 0;
1063   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1064   for ( i=0; i<N; i++ ) {
1065     svalues[starts[owner[i]]++] = rows[i];
1066   }
1067   ISRestoreIndices(is,&rows);
1068 
1069   starts[0] = 0;
1070   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1071   count = 0;
1072   for ( i=0; i<size; i++ ) {
1073     if (procs[i]) {
1074       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
1075     }
1076   }
1077   PetscFree(starts);
1078 
1079   base = owners[rank]*bs;
1080 
1081   /*  wait on receives */
1082   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1083   source = lens + nrecvs;
1084   count  = nrecvs; slen = 0;
1085   while (count) {
1086     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
1087     /* unpack receives into our local space */
1088     MPI_Get_count(&recv_status,MPI_INT,&n);
1089     source[imdex]  = recv_status.MPI_SOURCE;
1090     lens[imdex]  = n;
1091     slen += n;
1092     count--;
1093   }
1094   PetscFree(recv_waits);
1095 
1096   /* move the data into the send scatter */
1097   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1098   count = 0;
1099   for ( i=0; i<nrecvs; i++ ) {
1100     values = rvalues + i*nmax;
1101     for ( j=0; j<lens[i]; j++ ) {
1102       lrows[count++] = values[j] - base;
1103     }
1104   }
1105   PetscFree(rvalues); PetscFree(lens);
1106   PetscFree(owner); PetscFree(nprocs);
1107 
1108   /* actually zap the local rows */
1109   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1110   PLogObjectParent(A,istmp);
1111   PetscFree(lrows);
1112   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1113   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1114   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1115 
1116   /* wait on sends */
1117   if (nsends) {
1118     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1119     CHKPTRQ(send_status);
1120     MPI_Waitall(nsends,send_waits,send_status);
1121     PetscFree(send_status);
1122   }
1123   PetscFree(send_waits); PetscFree(svalues);
1124 
1125   return 0;
1126 }
1127 extern int MatPrintHelp_SeqBAIJ(Mat);
1128 #undef __FUNCTION__
1129 #define __FUNCTION__ "MatPrintHelp_MPIBAIJ"
1130 static int MatPrintHelp_MPIBAIJ(Mat A)
1131 {
1132   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1133 
1134   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1135   else return 0;
1136 }
1137 
1138 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1139 
1140 /* -------------------------------------------------------------------*/
1141 static struct _MatOps MatOps = {
1142   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1143   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1144   0,0,0,0,
1145   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1146   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1147   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1148   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1149   0,0,0,MatGetSize_MPIBAIJ,
1150   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1151   0,0,0,MatConvertSameType_MPIBAIJ,0,0,
1152   0,0,0,MatGetSubMatrices_MPIBAIJ,
1153   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1154   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ};
1155 
1156 
1157 #undef __FUNCTION__
1158 #define __FUNCTION__ "MatCreateMPIBAIJ"
1159 /*@C
1160    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1161    (block compressed row).  For good matrix assembly performance
1162    the user should preallocate the matrix storage by setting the parameters
1163    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1164    performance can be increased by more than a factor of 50.
1165 
1166    Input Parameters:
1167 .  comm - MPI communicator
1168 .  bs   - size of block
1169 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1170            This value should be the same as the local size used in creating the
1171            y vector for the matrix-vector product y = Ax.
1172 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1173            This value should be the same as the local size used in creating the
1174            x vector for the matrix-vector product y = Ax.
1175 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
1176 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
1177 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1178            submatrix  (same for all local rows)
1179 .  d_nnz - array containing the number of block nonzeros in the various block rows
1180            in the diagonal portion of the local submatrix (possibly different for each
1181            block row) or PETSC_NULL.  You must leave room for the diagonal entry even
1182            if it is zero.
1183 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1184            submatrix (same for all local rows).
1185 .  o_nnz - array containing the number of block nonzeros in the various block rows of the
1186            off-diagonal portion of the local submatrix (possibly different for
1187            each block row) or PETSC_NULL.
1188 
1189    Output Parameter:
1190 .  A - the matrix
1191 
1192    Notes:
1193    The user MUST specify either the local or global matrix dimensions
1194    (possibly both).
1195 
1196    Storage Information:
1197    For a square global matrix we define each processor's diagonal portion
1198    to be its local rows and the corresponding columns (a square submatrix);
1199    each processor's off-diagonal portion encompasses the remainder of the
1200    local matrix (a rectangular submatrix).
1201 
1202    The user can specify preallocated storage for the diagonal part of
1203    the local submatrix with either d_nz or d_nnz (not both).  Set
1204    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1205    memory allocation.  Likewise, specify preallocated storage for the
1206    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1207 
1208    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1209    the figure below we depict these three local rows and all columns (0-11).
1210 
1211 $          0 1 2 3 4 5 6 7 8 9 10 11
1212 $         -------------------
1213 $  row 3  |  o o o d d d o o o o o o
1214 $  row 4  |  o o o d d d o o o o o o
1215 $  row 5  |  o o o d d d o o o o o o
1216 $         -------------------
1217 $
1218 
1219    Thus, any entries in the d locations are stored in the d (diagonal)
1220    submatrix, and any entries in the o locations are stored in the
1221    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1222    stored simply in the MATSEQBAIJ format for compressed row storage.
1223 
1224    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1225    and o_nz should indicate the number of nonzeros per row in the o matrix.
1226    In general, for PDE problems in which most nonzeros are near the diagonal,
1227    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1228    or you will get TERRIBLE performance; see the users' manual chapter on
1229    matrices.
1230 
1231 .keywords: matrix, block, aij, compressed row, sparse, parallel
1232 
1233 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1234 @*/
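/*
   A minimal calling sketch (all sizes are only illustrative): a square matrix
   with block size 2 and 8 local rows per processor, letting PETSc compute the
   global sizes and preallocating 5 diagonal and 2 off-diagonal block nonzeros
   per block row:

       Mat A;
       int ierr;
       ierr = MatCreateMPIBAIJ(MPI_COMM_WORLD,2,8,8,PETSC_DECIDE,PETSC_DECIDE,
                               5,PETSC_NULL,2,PETSC_NULL,&A); CHKERRQ(ierr);
*/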
1235 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1236                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1237 {
1238   Mat          B;
1239   Mat_MPIBAIJ  *b;
1240   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1241 
1242   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
1243   *A = 0;
1244   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1245   PLogObjectCreate(B);
1246   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1247   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1248   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1249   MPI_Comm_size(comm,&size);
1250   if (size == 1) {
1251     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1252     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1253     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1254     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1255     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1256     B->ops.solve             = MatSolve_MPIBAIJ;
1257     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1258     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1259     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1260     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1261   }
1262 
1263   B->destroy    = MatDestroy_MPIBAIJ;
1264   B->view       = MatView_MPIBAIJ;
1265   B->mapping    = 0;
1266   B->factor     = 0;
1267   B->assembled  = PETSC_FALSE;
1268 
1269   b->insertmode = NOT_SET_VALUES;
1270   MPI_Comm_rank(comm,&b->rank);
1271   MPI_Comm_size(comm,&b->size);
1272 
1273   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1274     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1275   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
1276   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
1277   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1278   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1279 
1280   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1281     work[0] = m; work[1] = n;
1282     mbs = m/bs; nbs = n/bs;
1283     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1284     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1285     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1286   }
1287   if (m == PETSC_DECIDE) {
1288     Mbs = M/bs;
1289     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
1290     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1291     m   = mbs*bs;
1292   }
1293   if (n == PETSC_DECIDE) {
1294     Nbs = N/bs;
1295     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
1296     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1297     n   = nbs*bs;
1298   }
1299   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
1300 
1301   b->m = m; B->m = m;
1302   b->n = n; B->n = n;
1303   b->N = N; B->N = N;
1304   b->M = M; B->M = M;
1305   b->bs  = bs;
1306   b->bs2 = bs*bs;
1307   b->mbs = mbs;
1308   b->nbs = nbs;
1309   b->Mbs = Mbs;
1310   b->Nbs = Nbs;
1311 
1312   /* build local table of row and column ownerships */
1313   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1314   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1315   b->cowners = b->rowners + b->size + 2;
1316   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1317   b->rowners[0] = 0;
1318   for ( i=2; i<=b->size; i++ ) {
1319     b->rowners[i] += b->rowners[i-1];
1320   }
1321   b->rstart    = b->rowners[b->rank];
1322   b->rend      = b->rowners[b->rank+1];
1323   b->rstart_bs = b->rstart * bs;
1324   b->rend_bs   = b->rend * bs;
1325 
1326   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1327   b->cowners[0] = 0;
1328   for ( i=2; i<=b->size; i++ ) {
1329     b->cowners[i] += b->cowners[i-1];
1330   }
1331   b->cstart    = b->cowners[b->rank];
1332   b->cend      = b->cowners[b->rank+1];
1333   b->cstart_bs = b->cstart * bs;
1334   b->cend_bs   = b->cend * bs;
1335 
1336   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1337   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1338   PLogObjectParent(B,b->A);
1339   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1340   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1341   PLogObjectParent(B,b->B);
1342 
1343   /* build cache for off array entries formed */
1344   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1345   b->donotstash  = 0;
1346   b->colmap      = 0;
1347   b->garray      = 0;
1348   b->roworiented = 1;
1349 
1350   /* stuff used for matrix vector multiply */
1351   b->lvec      = 0;
1352   b->Mvctx     = 0;
1353 
1354   /* stuff for MatGetRow() */
1355   b->rowindices   = 0;
1356   b->rowvalues    = 0;
1357   b->getrowactive = PETSC_FALSE;
1358 
1359   *A = B;
1360   return 0;
1361 }
1362 
1363 #undef __FUNCTION__
1364 #define __FUNCTION__ "MatConvertSameType_MPIBAIJ"
1365 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1366 {
1367   Mat        mat;
1368   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1369   int        ierr, len=0, flg;
1370 
1371   *newmat       = 0;
1372   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1373   PLogObjectCreate(mat);
1374   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1375   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1376   mat->destroy    = MatDestroy_MPIBAIJ;
1377   mat->view       = MatView_MPIBAIJ;
1378   mat->factor     = matin->factor;
1379   mat->assembled  = PETSC_TRUE;
1380 
1381   a->m = mat->m   = oldmat->m;
1382   a->n = mat->n   = oldmat->n;
1383   a->M = mat->M   = oldmat->M;
1384   a->N = mat->N   = oldmat->N;
1385 
1386   a->bs  = oldmat->bs;
1387   a->bs2 = oldmat->bs2;
1388   a->mbs = oldmat->mbs;
1389   a->nbs = oldmat->nbs;
1390   a->Mbs = oldmat->Mbs;
1391   a->Nbs = oldmat->Nbs;
1392 
1393   a->rstart       = oldmat->rstart;
1394   a->rend         = oldmat->rend;
1395   a->cstart       = oldmat->cstart;
1396   a->cend         = oldmat->cend;
1397   a->size         = oldmat->size;
1398   a->rank         = oldmat->rank;
1399   a->insertmode   = NOT_SET_VALUES;
1400   a->rowvalues    = 0;
1401   a->getrowactive = PETSC_FALSE;
1402 
1403   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1404   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1405   a->cowners = a->rowners + a->size + 2;
1406   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1407   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1408   if (oldmat->colmap) {
1409     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1410     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1411     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1412   } else a->colmap = 0;
1413   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1414     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1415     PLogObjectMemory(mat,len*sizeof(int));
1416     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1417   } else a->garray = 0;
1418 
1419   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1420   PLogObjectParent(mat,a->lvec);
1421   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1422   PLogObjectParent(mat,a->Mvctx);
1423   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1424   PLogObjectParent(mat,a->A);
1425   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1426   PLogObjectParent(mat,a->B);
1427   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1428   if (flg) {
1429     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1430   }
1431   *newmat = mat;
1432   return 0;
1433 }
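
/*
   Usage sketch (an assumption about the surrounding dispatch, not part of the
   original source): this routine is presumably reached through MatConvert()
   when the target type is MATSAME (or MATMPIBAIJ again), e.g.

       Mat copy;
       ierr = MatConvert(mat,MATSAME,&copy); CHKERRQ(ierr);

   which produces a new MATMPIBAIJ matrix with the same parallel layout and,
   since the sub-matrices above are converted with MATSAME, the same numerical
   values.
*/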
1434 
1435 #include "sys.h"
1436 
1437 #undef __FUNCTION__
1438 #define __FUNCTION__ "MatLoad_MPIBAIJ"
1439 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1440 {
1441   Mat          A;
1442   int          i, nz, ierr, j,rstart, rend, fd;
1443   Scalar       *vals,*buf;
1444   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1445   MPI_Status   status;
1446   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1447   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1448   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1449   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1450   int          dcount,kmax,k,nzcount,tmp;
1451 
1452 
1453   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1454   bs2  = bs*bs;
1455 
1456   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1457   if (!rank) {
1458     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1459     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1460     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
1461   }
1462 
1463   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1464   M = header[1]; N = header[2];
1465 
1466   if (M != N) SETERRQ(1,"MatLoad_MPIBAIJ:Can only do square matrices");
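
  /*
     Layout of the binary file as read below: a 4-int header (cookie, M, N and a
     fourth entry that is not used here -- presumably the total nonzero count),
     followed by M row lengths (ints), then all column indices (ints), then all
     numerical values (Scalars).
  */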
1467 
1468   /*
1469      This code adds extra rows to make sure the number of rows is
1470      divisible by the blocksize
1471   */
1472   Mbs        = M/bs;
1473   extra_rows = bs - M + bs*(Mbs);
1474   if (extra_rows == bs) extra_rows = 0;
1475   else                  Mbs++;
1476   if (extra_rows && !rank) {
1477     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1478   }
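  /*
     For example, with M = 10 and bs = 3: Mbs = 10/3 = 3 and
     extra_rows = 3 - 10 + 3*3 = 2, so Mbs is bumped to 4 and two padded rows
     (each later given a single unit diagonal entry) bring the row count to 12.
  */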
1479 
1480   /* determine ownership of all rows */
1481   mbs = Mbs/size + ((Mbs % size) > rank);
1482   m   = mbs * bs;
1483   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1484   browners = rowners + size + 1;
1485   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1486   rowners[0] = 0;
1487   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1488   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1489   rstart = rowners[rank];
1490   rend   = rowners[rank+1];
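  /*
     For example, with Mbs = 10 block rows and size = 3 processes the local
     block counts are 4,3,3, so rowners = {0,4,7,10} and, for bs = 2,
     browners = {0,8,14,20}; rstart and rend are in block-row units.
  */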
1491 
1492   /* distribute row lengths to all processors */
1493   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1494   if (!rank) {
1495     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1496     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1497     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1498     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1499     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1500     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1501     PetscFree(sndcounts);
1502   }
1503   else {
1504     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1505   }
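  /*
     Note on the MPI_Scatterv calls above: only the root's send arguments are
     significant, so the other processes pass zeros; sndcounts[i] is the number
     of point rows owned by process i and browners[] doubles as the displacement
     array, while every process receives its own (rend-rstart)*bs row lengths
     into locrowlens.
  */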
1506 
1507   if (!rank) {
1508     /* calculate the number of nonzeros on each processor */
1509     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1510     PetscMemzero(procsnz,size*sizeof(int));
1511     for ( i=0; i<size; i++ ) {
1512       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1513         procsnz[i] += rowlengths[j];
1514       }
1515     }
1516     PetscFree(rowlengths);
1517 
1518     /* determine max buffer needed and allocate it */
1519     maxnz = 0;
1520     for ( i=0; i<size; i++ ) {
1521       maxnz = PetscMax(maxnz,procsnz[i]);
1522     }
1523     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1524 
1525     /* read in my part of the matrix column indices  */
1526     nz = procsnz[0];
1527     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1528     mycols = ibuf;
1529     if (size == 1)  nz -= extra_rows;
1530     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1531     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1532 
1533     /* read in the column indices for the other processors (except the last) and ship them off */
1534     for ( i=1; i<size-1; i++ ) {
1535       nz = procsnz[i];
1536       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1537       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1538     }
1539     /* read in the column indices for the last processor */
1540     if ( size != 1 ) {
1541       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1542       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1543       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1544       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1545     }
1546     PetscFree(cols);
1547   }
1548   else {
1549     /* determine buffer space needed for message */
1550     nz = 0;
1551     for ( i=0; i<m; i++ ) {
1552       nz += locrowlens[i];
1553     }
1554     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1555     mycols = ibuf;
1556     /* receive message of column indices */
1557     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1558     MPI_Get_count(&status,MPI_INT,&maxnz);
1559     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1560   }
1561 
1562   /* loop over local rows, determining number of off diagonal entries */
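  /*
     mask[] flags the block columns already seen in the current block row;
     masked1[] records the columns that fall in the diagonal block (owned block
     rows) and masked2[] the off-diagonal ones, so dlens[i] and odlens[i] end up
     holding the number of distinct nonzero blocks per block row for the
     preallocation below.
  */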
1563   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1564   odlens = dlens + (rend-rstart);
1565   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1566   PetscMemzero(mask,3*Mbs*sizeof(int));
1567   masked1 = mask    + Mbs;
1568   masked2 = masked1 + Mbs;
1569   rowcount = 0; nzcount = 0;
1570   for ( i=0; i<mbs; i++ ) {
1571     dcount  = 0;
1572     odcount = 0;
1573     for ( j=0; j<bs; j++ ) {
1574       kmax = locrowlens[rowcount];
1575       for ( k=0; k<kmax; k++ ) {
1576         tmp = mycols[nzcount++]/bs;
1577         if (!mask[tmp]) {
1578           mask[tmp] = 1;
1579           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1580           else masked1[dcount++] = tmp;
1581         }
1582       }
1583       rowcount++;
1584     }
1585 
1586     dlens[i]  = dcount;
1587     odlens[i] = odcount;
1588 
1589     /* zero out the mask elements we set */
1590     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1591     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1592   }
1593 
1594   /* create our matrix */
1595   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1596   CHKERRQ(ierr);
1597   A = *newmat;
1598   ierr = MatSetOption(A,MAT_COLUMNS_SORTED); CHKERRQ(ierr);
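  /*
     dlens[] and odlens[] give, for each local block row, the number of nonzero
     blocks in the diagonal and off-diagonal portions respectively; when these
     per-row arrays are supplied the scalar preallocation counts (passed as 0
     above) are ignored.
  */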
1599 
1600   if (!rank) {
1601     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1602     /* read in my part of the matrix numerical values  */
1603     nz = procsnz[0];
1604     vals = buf;
1605     mycols = ibuf;
1606     if (size == 1)  nz -= extra_rows;
1607     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1608     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1609 
1610     /* insert into matrix */
1611     jj      = rstart*bs;
1612     for ( i=0; i<m; i++ ) {
1613       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1614       mycols += locrowlens[i];
1615       vals   += locrowlens[i];
1616       jj++;
1617     }
1618     /* read in the values for the other processors (except the last one) and ship them out */
1619     for ( i=1; i<size-1; i++ ) {
1620       nz = procsnz[i];
1621       vals = buf;
1622       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1623       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1624     }
1625     /* read in the values for the last processor; the extra rows are not on the disk */
1626     if ( size != 1 ) {
1627       nz = procsnz[size-1] - extra_rows;
1628       vals = buf;
1629       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1630       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1631       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1632     }
1633     PetscFree(procsnz);
1634   }
1635   else {
1636     /* receive numeric values */
1637     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1638 
1639     /* receive message of values */
1640     vals = buf;
1641     mycols = ibuf;
1642     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1643     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1644     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1645 
1646     /* insert into matrix */
1647     jj      = rstart*bs;
1648     for ( i=0; i<m; i++ ) {
1649       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1650       mycols += locrowlens[i];
1651       vals   += locrowlens[i];
1652       jj++;
1653     }
1654   }
1655   PetscFree(locrowlens);
1656   PetscFree(buf);
1657   PetscFree(ibuf);
1658   PetscFree(rowners);
1659   PetscFree(dlens);
1660   PetscFree(mask);
1661   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1662   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1663   return 0;
1664 }
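
/*
   Usage sketch (the viewer calls are assumptions about the 2.0-era interface,
   not part of the original source): a matrix written with MatView() to a binary
   viewer would typically be read back with something like

       Viewer viewer;
       Mat    A;
       ierr = ViewerFileOpenBinary(MPI_COMM_WORLD,"matrix.dat",BINARY_RDONLY,&viewer); CHKERRQ(ierr);
       ierr = MatLoad(viewer,MATMPIBAIJ,&A); CHKERRQ(ierr);
       ierr = ViewerDestroy(viewer); CHKERRQ(ierr);

   with the block size selected at load time via -matload_block_size <bs>.
*/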
1665 
1666 
1667