1 #ifndef lint
2 static char vcid[] = "$Id: mpiaij.c,v 1.185 1997/01/06 20:24:32 balay Exp balay $";
3 #endif
4 
5 #include "src/mat/impls/aij/mpi/mpiaij.h"
6 #include "src/vec/vecimpl.h"
7 #include "src/inline/spops.h"
8 
9 /* local utility routine that creates a mapping from the global column
10 number to the local number in the off-diagonal part of the local
11 storage of the matrix.  This is done in a non-scalable way since the
12 length of colmap equals the number of global columns of the matrix.
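   (For example, if garray = {3,8} then colmap[3] = 1 and colmap[8] = 2, while every
   other entry of colmap is 0, i.e. that global column has no local off-diagonal entry.)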
13 */
14 #undef __FUNC__
15 #define __FUNC__ "CreateColmap_MPIAIJ_Private"
16 int CreateColmap_MPIAIJ_Private(Mat mat)
17 {
18   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
19   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
20   int        n = B->n,i;
21 
22   aij->colmap = (int *) PetscMalloc(aij->N*sizeof(int));CHKPTRQ(aij->colmap);
23   PLogObjectMemory(mat,aij->N*sizeof(int));
24   PetscMemzero(aij->colmap,aij->N*sizeof(int));
25   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
26   return 0;
27 }
28 
29 extern int DisAssemble_MPIAIJ(Mat);
30 
31 #undef __FUNC__
32 #define __FUNC__ "MatGetRowIJ_MPIAIJ"
33 static int MatGetRowIJ_MPIAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
34                            PetscTruth *done)
35 {
36   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
37   int        ierr;
38   if (aij->size == 1) {
39     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
40   } else SETERRQ(1,0,"not supported in parallel");
41   return 0;
42 }
43 
44 #undef __FUNC__
45 #define __FUNC__ "MatRestoreRowIJ_MPIAIJ"
46 static int MatRestoreRowIJ_MPIAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
47                                PetscTruth *done)
48 {
49   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
50   int        ierr;
51   if (aij->size == 1) {
52     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
53   } else SETERRQ(1,0,"not supported in parallel");
54   return 0;
55 }
56 
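/* The macro below is an inlined version of the MatSetValues_SeqAIJ() insertion logic
   for a single (row,col,value) entry of the local diagonal block aij->A; it is used in
   MatSetValues_MPIAIJ() in place of the commented-out function call shown there. */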
57 #define CHUNKSIZE   15
58 #define MatSetValues_SeqAIJ_A_Private(row,col,value) \
59 { \
60  \
61     rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
62     rmax = imax[row]; nrow = ailen[row];  \
63       for ( _i=0; _i<nrow; _i++ ) { \
64         if (rp[_i] > col) break; \
65         if (rp[_i] == col) { \
66           if (addv == ADD_VALUES) ap[_i] += value;   \
67           else                  ap[_i] = value; \
68           goto _noinsert; \
69         } \
70       }  \
71       if (a->nonew) goto _noinsert; \
72       if (nrow >= rmax) { \
73         /* there is no extra room in row, therefore enlarge */ \
74         int    new_nz = ai[a->m] + CHUNKSIZE,len,*new_i,*new_j; \
75         Scalar *new_a; \
76  \
77         /* malloc new storage space */ \
78         len     = new_nz*(sizeof(int)+sizeof(Scalar))+(a->m+1)*sizeof(int); \
79         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
80         new_j   = (int *) (new_a + new_nz); \
81         new_i   = new_j + new_nz; \
82  \
83         /* copy over old data into new slots */ \
84         for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = ai[ii];} \
85         for ( ii=row+1; ii<a->m+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
86         PetscMemcpy(new_j,aj,(ai[row]+nrow+shift)*sizeof(int)); \
87         len = (new_nz - CHUNKSIZE - ai[row] - nrow - shift); \
88         PetscMemcpy(new_j+ai[row]+shift+nrow+CHUNKSIZE,aj+ai[row]+shift+nrow, \
89                                                            len*sizeof(int)); \
90         PetscMemcpy(new_a,aa,(ai[row]+nrow+shift)*sizeof(Scalar)); \
91         PetscMemcpy(new_a+ai[row]+shift+nrow+CHUNKSIZE,aa+ai[row]+shift+nrow, \
92                                                            len*sizeof(Scalar));  \
93         /* free up old matrix storage */ \
94         PetscFree(a->a);  \
95         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
96         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
97         a->singlemalloc = 1; \
98  \
99         rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
100         rmax = imax[row] = imax[row] + CHUNKSIZE; \
101         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
102         a->maxnz += CHUNKSIZE; \
103         a->reallocs++; \
104       } \
105       N = nrow++ - 1; a->nz++; \
106       /* shift up all the later entries in this row */ \
107       for ( ii=N; ii>=_i; ii-- ) { \
108         rp[ii+1] = rp[ii]; \
109         ap[ii+1] = ap[ii]; \
110       } \
111       rp[_i] = col;  \
112       ap[_i] = value;  \
113       _noinsert:; \
114     ailen[row] = nrow; \
115   return 0; \
116 }
117 
118 extern int MatSetValues_SeqAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
119 #undef __FUNC__
120 #define __FUNC__ "MatSetValues_MPIAIJ"
121 static int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
122 {
123   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
124   Scalar     value;
125   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
126   int        cstart = aij->cstart, cend = aij->cend,row,col;
127   int        roworiented = aij->roworiented;
128 
129   /* Some Variables required in the macro */
130   Mat        A = aij->A;
131   Mat_SeqAIJ *a = (Mat_SeqAIJ *) (A)->data;
132   int        *rp,ii,nrow,_i,rmax,N;
133   int        *imax=a->imax,*ai=a->i,*ailen=a->ilen;
134   int        *aj=a->j,shift=a->indexshift;
135   Scalar     *ap,*aa=a->a;
136 
137 #if defined(PETSC_BOPT_g)
138   if (aij->insertmode != NOT_SET_VALUES && aij->insertmode != addv) {
139     SETERRQ(1,0,"Cannot mix inserts and adds");
140   }
141 #endif
142   aij->insertmode = addv;
143   for ( i=0; i<m; i++ ) {
144 #if defined(PETSC_BOPT_g)
145     if (im[i] < 0) SETERRQ(1,0,"Negative row");
146     if (im[i] >= aij->M) SETERRQ(1,0,"Row too large");
147 #endif
148     if (im[i] >= rstart && im[i] < rend) {
149       row = im[i] - rstart;
150       for ( j=0; j<n; j++ ) {
151         if (in[j] >= cstart && in[j] < cend){
152           col = in[j] - cstart;
153           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
154           MatSetValues_SeqAIJ_A_Private(row,col,value);
155           /* ierr = MatSetValues_SeqAIJ(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
156         }
157 #if defined(PETSC_BOPT_g)
158         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
159         else if (in[j] >= aij->N) {SETERRQ(1,0,"Col too large");}
160 #endif
161         else {
162           if (mat->was_assembled) {
163             if (!aij->colmap) {
164               ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
165             }
166             col = aij->colmap[in[j]] - 1;
167             if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
168               ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
169               col =  in[j];
170             }
171           }
172           else col = in[j];
173           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
174           ierr = MatSetValues_SeqAIJ(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
175         }
176       }
177     }
178     else {
179       if (roworiented && !aij->donotstash) {
180         ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
181       }
182       else {
183         if (!aij->donotstash) {
184           row = im[i];
185           for ( j=0; j<n; j++ ) {
186             ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
187           }
188         }
189       }
190     }
191   }
192   return 0;
193 }
194 
195 #undef __FUNC__
196 #define __FUNC__ "MatGetValues_MPIAIJ"
197 static int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
198 {
199   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
200   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
201   int        cstart = aij->cstart, cend = aij->cend,row,col;
202 
203   for ( i=0; i<m; i++ ) {
204     if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
205     if (idxm[i] >= aij->M) SETERRQ(1,0,"Row too large");
206     if (idxm[i] >= rstart && idxm[i] < rend) {
207       row = idxm[i] - rstart;
208       for ( j=0; j<n; j++ ) {
209         if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
210         if (idxn[j] >= aij->N) SETERRQ(1,0,"Col too large");
211         if (idxn[j] >= cstart && idxn[j] < cend){
212           col = idxn[j] - cstart;
213           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
214         }
215         else {
216           if (!aij->colmap) {
217             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
218           }
219           col = aij->colmap[idxn[j]] - 1;
220           if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
221           else {
222             ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
223           }
224         }
225       }
226     }
227     else {
228       SETERRQ(1,0,"Only local values currently supported");
229     }
230   }
231   return 0;
232 }
233 
234 #undef __FUNC__
235 #define __FUNC__ "MatAssemblyBegin_MPIAIJ"
236 static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
237 {
238   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
239   MPI_Comm    comm = mat->comm;
240   int         size = aij->size, *owners = aij->rowners;
241   int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
242   MPI_Request *send_waits,*recv_waits;
243   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
244   InsertMode  addv;
245   Scalar      *rvalues,*svalues;
246 
247   /* make sure all processors are in either INSERT_VALUES or ADD_VALUES mode */
248   MPI_Allreduce(&aij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
249   if (addv == (ADD_VALUES|INSERT_VALUES)) {
250     SETERRQ(1,0,"Some processors inserted, others added");
251   }
252   aij->insertmode = addv; /* in case this processor had no cache */
253 
254   /*  first count number of contributors to each processor */
255   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
256   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
257   owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
258   for ( i=0; i<aij->stash.n; i++ ) {
259     idx = aij->stash.idx[i];
260     for ( j=0; j<size; j++ ) {
261       if (idx >= owners[j] && idx < owners[j+1]) {
262         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
263       }
264     }
265   }
266   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
267 
268   /* inform other processors of number of messages and max length*/
269   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
270   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
271   nreceives = work[rank];
272   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
273   nmax = work[rank];
274   PetscFree(work);
275 
276   /* post receives:
277        1) each message will consist of ordered triples
278      (global row index, global column index, value); we store the indices
279      as Scalars to simplify the message passing.
280        2) since we don't know how long each individual message is we
281      allocate the largest needed buffer for each receive. Potentially
282      this is a lot of wasted space.
283 
284 
285        This could be done better.
286   */
287   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
288   CHKPTRQ(rvalues);
289   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
290   CHKPTRQ(recv_waits);
291   for ( i=0; i<nreceives; i++ ) {
292     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
293               comm,recv_waits+i);
294   }
295 
296   /* do sends:
297       1) starts[i] gives the starting index in svalues for stuff going to
298          the ith processor
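         (for example, with nprocs = {2,0,1} we get starts = {0,2,2}, so the first
          two triples in svalues go to processor 0 and the last one to processor 2)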
299   */
300   svalues = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
301   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
302   CHKPTRQ(send_waits);
303   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
304   starts[0] = 0;
305   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
306   for ( i=0; i<aij->stash.n; i++ ) {
307     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
308     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
309     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
310   }
311   PetscFree(owner);
312   starts[0] = 0;
313   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
314   count = 0;
315   for ( i=0; i<size; i++ ) {
316     if (procs[i]) {
317       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
318                 comm,send_waits+count++);
319     }
320   }
321   PetscFree(starts); PetscFree(nprocs);
322 
323   /* Free cache space */
324   PLogInfo(mat,"MatAssemblyBegin_MPIAIJ:Number of off-processor values %d\n",aij->stash.n);
325   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
326 
327   aij->svalues    = svalues;    aij->rvalues    = rvalues;
328   aij->nsends     = nsends;     aij->nrecvs     = nreceives;
329   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
330   aij->rmax       = nmax;
331 
332   return 0;
333 }
334 extern int MatSetUpMultiply_MPIAIJ(Mat);
335 
336 #undef __FUNC__
337 #define __FUNC__ "MatAssemblyEnd_MPIAIJ"
338 static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
339 {
340   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
341   MPI_Status  *send_status,recv_status;
342   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
343   int         row,col,other_disassembled;
344   Scalar      *values,val;
345   InsertMode  addv = aij->insertmode;
346 
347   /*  wait on receives */
348   while (count) {
349     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
350     /* unpack receives into our local space */
351     values = aij->rvalues + 3*imdex*aij->rmax;
352     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
353     n = n/3;
354     for ( i=0; i<n; i++ ) {
355       row = (int) PetscReal(values[3*i]) - aij->rstart;
356       col = (int) PetscReal(values[3*i+1]);
357       val = values[3*i+2];
358       if (col >= aij->cstart && col < aij->cend) {
359         col -= aij->cstart;
360         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
361       }
362       else {
363         if (mat->was_assembled || mat->assembled) {
364           if (!aij->colmap) {
365             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
366           }
367           col = aij->colmap[col] - 1;
368           if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
369             ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
370             col = (int) PetscReal(values[3*i+1]);
371           }
372         }
373         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
374       }
375     }
376     count--;
377   }
378   PetscFree(aij->recv_waits); PetscFree(aij->rvalues);
379 
380   /* wait on sends */
381   if (aij->nsends) {
382     send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
383     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
384     PetscFree(send_status);
385   }
386   PetscFree(aij->send_waits); PetscFree(aij->svalues);
387 
388   aij->insertmode = NOT_SET_VALUES;
389   ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
390   ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);
391 
392   /* determine if any processor has disassembled, if so we must
393      also disassemble ourselves, in order that we may reassemble. */
394   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
395   if (mat->was_assembled && !other_disassembled) {
396     ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
397   }
398 
399   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
400     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
401   }
402   ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
403   ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);
404 
405   if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
406   return 0;
407 }
408 
409 #undef __FUNC__
410 #define __FUNC__ "MatZeroEntries_MPIAIJ"
411 static int MatZeroEntries_MPIAIJ(Mat A)
412 {
413   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
414   int        ierr;
415   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
416   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
417   return 0;
418 }
419 
420 /* the code does not do the diagonal entries correctly unless the
421    matrix is square and the column and row ownerships are identical.
422    This is a BUG. The only way to fix it seems to be to access
423    aij->A and aij->B directly and not through the MatZeroRows()
424    routine.
425 */
426 #undef __FUNC__
427 #define __FUNC__ "MatZeroRows_MPIAIJ"
428 static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
429 {
430   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
431   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
432   int            *procs,*nprocs,j,found,idx,nsends,*work;
433   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
434   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
435   int            *lens,imdex,*lrows,*values;
436   MPI_Comm       comm = A->comm;
437   MPI_Request    *send_waits,*recv_waits;
438   MPI_Status     recv_status,*send_status;
439   IS             istmp;
440 
441   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
442   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
443 
444   /*  first count number of contributors to each processor */
445   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
446   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
447   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
448   for ( i=0; i<N; i++ ) {
449     idx = rows[i];
450     found = 0;
451     for ( j=0; j<size; j++ ) {
452       if (idx >= owners[j] && idx < owners[j+1]) {
453         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
454       }
455     }
456     if (!found) SETERRQ(1,0,"Index out of range");
457   }
458   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
459 
460   /* inform other processors of number of messages and max length*/
461   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
462   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
463   nrecvs = work[rank];
464   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
465   nmax = work[rank];
466   PetscFree(work);
467 
468   /* post receives:   */
469   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
470   CHKPTRQ(rvalues);
471   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
472   CHKPTRQ(recv_waits);
473   for ( i=0; i<nrecvs; i++ ) {
474     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
475   }
476 
477   /* do sends:
478       1) starts[i] gives the starting index in svalues for stuff going to
479          the ith processor
480   */
481   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
482   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
483   CHKPTRQ(send_waits);
484   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
485   starts[0] = 0;
486   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
487   for ( i=0; i<N; i++ ) {
488     svalues[starts[owner[i]]++] = rows[i];
489   }
490   ISRestoreIndices(is,&rows);
491 
492   starts[0] = 0;
493   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
494   count = 0;
495   for ( i=0; i<size; i++ ) {
496     if (procs[i]) {
497       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
498     }
499   }
500   PetscFree(starts);
501 
502   base = owners[rank];
503 
504   /*  wait on receives */
505   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
506   source = lens + nrecvs;
507   count  = nrecvs; slen = 0;
508   while (count) {
509     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
510     /* unpack receives into our local space */
511     MPI_Get_count(&recv_status,MPI_INT,&n);
512     source[imdex]  = recv_status.MPI_SOURCE;
513     lens[imdex]  = n;
514     slen += n;
515     count--;
516   }
517   PetscFree(recv_waits);
518 
519   /* move the received row indices into the local row numbering */
520   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
521   count = 0;
522   for ( i=0; i<nrecvs; i++ ) {
523     values = rvalues + i*nmax;
524     for ( j=0; j<lens[i]; j++ ) {
525       lrows[count++] = values[j] - base;
526     }
527   }
528   PetscFree(rvalues); PetscFree(lens);
529   PetscFree(owner); PetscFree(nprocs);
530 
531   /* actually zap the local rows */
532   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
533   PLogObjectParent(A,istmp);
534   PetscFree(lrows);
535   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
536   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
537   ierr = ISDestroy(istmp); CHKERRQ(ierr);
538 
539   /* wait on sends */
540   if (nsends) {
541     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
542     CHKPTRQ(send_status);
543     MPI_Waitall(nsends,send_waits,send_status);
544     PetscFree(send_status);
545   }
546   PetscFree(send_waits); PetscFree(svalues);
547 
548   return 0;
549 }
550 
551 #undef __FUNC__
552 #define __FUNC__ "MatMult_MPIAIJ"
553 static int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
554 {
555   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
556   int        ierr,nt;
557 
558   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
559   if (nt != a->n) {
560     SETERRQ(1,0,"Incompatible partition of A and xx");
561   }
562   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
563   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
564   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
565   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
566   return 0;
567 }
568 
569 #undef __FUNC__
570 #define __FUNC__ "MatMultAdd_MPIAIJ"
571 static int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
572 {
573   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
574   int        ierr;
575   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
576   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
577   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
578   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
579   return 0;
580 }
581 
582 #undef __FUNC__
583 #define __FUNC__ "MatMultTrans_MPIAIJ"
584 static int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
585 {
586   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
587   int        ierr;
588 
589   /* do nondiagonal part */
590   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
591   /* send it on its way */
592   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
593   /* do local part */
594   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
595   /* receive remote parts: note this assumes the values are not actually */
596   /* inserted in yy until the next line, which is true for my implementation*/
597   /* but may not always be true. */
598   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
599   return 0;
600 }
601 
602 #undef __FUNC__
603 #define __FUNC__ "MatMultTransAdd_MPIAIJ"
604 static int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
605 {
606   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
607   int        ierr;
608 
609   /* do nondiagonal part */
610   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
611   /* send it on its way */
612   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
613   /* do local part */
614   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
615   /* receive remote parts: note this assumes the values are not actually */
616   /* inserted in zz until the next line, which is true for my implementation */
617   /* but may not always be true. */
618   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
619   return 0;
620 }
621 
622 /*
623   This only works correctly for square matrices where the subblock A->A is the
624    diagonal block
625 */
626 #undef __FUNC__
627 #define __FUNC__ "MatGetDiagonal_MPIAIJ"
628 static int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
629 {
630   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
631   if (a->M != a->N)
632     SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
633   if (a->rstart != a->cstart || a->rend != a->cend) {
634     SETERRQ(1,0,"row partition must equal col partition");  }
635   return MatGetDiagonal(a->A,v);
636 }
637 
638 #undef __FUNC__
639 #define __FUNC__ "MatScale_MPIAIJ"
640 static int MatScale_MPIAIJ(Scalar *aa,Mat A)
641 {
642   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
643   int        ierr;
644   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
645   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
646   return 0;
647 }
648 
649 #undef __FUNC__
650 #define __FUNC__ "MatDestroy_MPIAIJ"
651 static int MatDestroy_MPIAIJ(PetscObject obj)
652 {
653   Mat        mat = (Mat) obj;
654   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
655   int        ierr;
656 #if defined(PETSC_LOG)
657   PLogObjectState(obj,"Rows=%d, Cols=%d",aij->M,aij->N);
658 #endif
659   PetscFree(aij->rowners);
660   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
661   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
662   if (aij->colmap) PetscFree(aij->colmap);
663   if (aij->garray) PetscFree(aij->garray);
664   if (aij->lvec)   VecDestroy(aij->lvec);
665   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
666   if (aij->rowvalues) PetscFree(aij->rowvalues);
667   PetscFree(aij);
668   if (mat->mapping) {
669     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
670   }
671   PLogObjectDestroy(mat);
672   PetscHeaderDestroy(mat);
673   return 0;
674 }
675 #include "draw.h"
676 #include "pinclude/pviewer.h"
677 
678 #undef __FUNC__
679 #define __FUNC__ "MatView_MPIAIJ_Binary"
680 static int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
681 {
682   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
683   int         ierr;
684 
685   if (aij->size == 1) {
686     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
687   }
688   else SETERRQ(1,0,"Only uniprocessor output supported");
689   return 0;
690 }
691 
692 #undef __FUNC__
693 #define __FUNC__ "MatView_MPIAIJ_ASCIIorDraworMatlab"
694 static int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
695 {
696   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
697   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
698   int         ierr, format,shift = C->indexshift,rank;
699   FILE        *fd;
700   ViewerType  vtype;
701 
702   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
703   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
704     ierr = ViewerGetFormat(viewer,&format);
705     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
706       MatInfo info;
707       int     flg;
708       MPI_Comm_rank(mat->comm,&rank);
709       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
710       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
711       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
712       PetscSequentialPhaseBegin(mat->comm,1);
713       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
714          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
715       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
716          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
717       ierr = MatGetInfo(aij->A,MAT_LOCAL,&info);
718       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used);
719       ierr = MatGetInfo(aij->B,MAT_LOCAL,&info);
720       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used);
721       fflush(fd);
722       PetscSequentialPhaseEnd(mat->comm,1);
723       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
724       return 0;
725     }
726     else if (format == VIEWER_FORMAT_ASCII_INFO) {
727       return 0;
728     }
729   }
730 
731   if (vtype == DRAW_VIEWER) {
732     Draw       draw;
733     PetscTruth isnull;
734     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
735     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
736   }
737 
738   if (vtype == ASCII_FILE_VIEWER) {
739     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
740     PetscSequentialPhaseBegin(mat->comm,1);
741     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
742            aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
743            aij->cend);
744     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
745     ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
746     fflush(fd);
747     PetscSequentialPhaseEnd(mat->comm,1);
748   }
749   else {
750     int size = aij->size;
751     rank = aij->rank;
752     if (size == 1) {
753       ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
754     }
755     else {
756       /* assemble the entire matrix onto first processor. */
757       Mat         A;
758       Mat_SeqAIJ *Aloc;
759       int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
760       Scalar      *a;
761 
762       if (!rank) {
763         ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
764                CHKERRQ(ierr);
765       }
766       else {
767         ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
768                CHKERRQ(ierr);
769       }
770       PLogObjectParent(mat,A);
771 
772       /* copy over the A part */
773       Aloc = (Mat_SeqAIJ*) aij->A->data;
774       m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
775       row = aij->rstart;
776       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
777       for ( i=0; i<m; i++ ) {
778         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
779         row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
780       }
781       aj = Aloc->j;
782       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
783 
784       /* copy over the B part */
785       Aloc = (Mat_SeqAIJ*) aij->B->data;
786       m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
787       row = aij->rstart;
788       ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
789       for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
790       for ( i=0; i<m; i++ ) {
791         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
792         row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
793       }
794       PetscFree(ct);
795       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
796       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
797       if (!rank) {
798         ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
799       }
800       ierr = MatDestroy(A); CHKERRQ(ierr);
801     }
802   }
803   return 0;
804 }
805 
806 #undef __FUNC__
807 #define __FUNC__ "MatView_MPIAIJ"
808 static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
809 {
810   Mat         mat = (Mat) obj;
811   int         ierr;
812   ViewerType  vtype;
813 
814   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
815   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
816       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
817     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
818   }
819   else if (vtype == BINARY_FILE_VIEWER) {
820     return MatView_MPIAIJ_Binary(mat,viewer);
821   }
822   return 0;
823 }
824 
825 /*
826     This has to provide several versions.
827 
828      2) a) use only local smoothing updating outer values only once.
829         b) local smoothing updating outer values each inner iteration
830      3) color updating outer values between colors.
831 */
832 #undef __FUNC__
833 #define __FUNC__ "MatRelax_MPIAIJ"
834 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
835                            double fshift,int its,Vec xx)
836 {
837   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
838   Mat        AA = mat->A, BB = mat->B;
839   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
840   Scalar     *b,*x,*xs,*ls,d,*v,sum;
841   int        ierr,*idx, *diag;
842   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
843 
844   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
845   xs = x + shift; /* shift by one for index start of 1 */
846   ls = ls + shift;
847   if (!A->diag) {if ((ierr = MatMarkDiag_SeqAIJ(AA))) return ierr;}
848   diag = A->diag;
849   if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
850     if (flag & SOR_ZERO_INITIAL_GUESS) {
851       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
852     }
853     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);
854     CHKERRQ(ierr);
855     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);
856     CHKERRQ(ierr);
857     while (its--) {
858       /* go down through the rows */
859       for ( i=0; i<m; i++ ) {
860         n    = A->i[i+1] - A->i[i];
861         idx  = A->j + A->i[i] + shift;
862         v    = A->a + A->i[i] + shift;
863         sum  = b[i];
864         SPARSEDENSEMDOT(sum,xs,v,idx,n);
865         d    = fshift + A->a[diag[i]+shift];
866         n    = B->i[i+1] - B->i[i];
867         idx  = B->j + B->i[i] + shift;
868         v    = B->a + B->i[i] + shift;
869         SPARSEDENSEMDOT(sum,ls,v,idx,n);
870         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
871       }
872       /* come up through the rows */
873       for ( i=m-1; i>-1; i-- ) {
874         n    = A->i[i+1] - A->i[i];
875         idx  = A->j + A->i[i] + shift;
876         v    = A->a + A->i[i] + shift;
877         sum  = b[i];
878         SPARSEDENSEMDOT(sum,xs,v,idx,n);
879         d    = fshift + A->a[diag[i]+shift];
880         n    = B->i[i+1] - B->i[i];
881         idx  = B->j + B->i[i] + shift;
882         v    = B->a + B->i[i] + shift;
883         SPARSEDENSEMDOT(sum,ls,v,idx,n);
884         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
885       }
886     }
887   }
888   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
889     if (flag & SOR_ZERO_INITIAL_GUESS) {
890       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
891     }
892     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);
893     CHKERRQ(ierr);
894     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);
895     CHKERRQ(ierr);
896     while (its--) {
897       for ( i=0; i<m; i++ ) {
898         n    = A->i[i+1] - A->i[i];
899         idx  = A->j + A->i[i] + shift;
900         v    = A->a + A->i[i] + shift;
901         sum  = b[i];
902         SPARSEDENSEMDOT(sum,xs,v,idx,n);
903         d    = fshift + A->a[diag[i]+shift];
904         n    = B->i[i+1] - B->i[i];
905         idx  = B->j + B->i[i] + shift;
906         v    = B->a + B->i[i] + shift;
907         SPARSEDENSEMDOT(sum,ls,v,idx,n);
908         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
909       }
910     }
911   }
912   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
913     if (flag & SOR_ZERO_INITIAL_GUESS) {
914       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
915     }
916     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,
917                             mat->Mvctx); CHKERRQ(ierr);
918     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,
919                             mat->Mvctx); CHKERRQ(ierr);
920     while (its--) {
921       for ( i=m-1; i>-1; i-- ) {
922         n    = A->i[i+1] - A->i[i];
923         idx  = A->j + A->i[i] + shift;
924         v    = A->a + A->i[i] + shift;
925         sum  = b[i];
926         SPARSEDENSEMDOT(sum,xs,v,idx,n);
927         d    = fshift + A->a[diag[i]+shift];
928         n    = B->i[i+1] - B->i[i];
929         idx  = B->j + B->i[i] + shift;
930         v    = B->a + B->i[i] + shift;
931         SPARSEDENSEMDOT(sum,ls,v,idx,n);
932         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
933       }
934     }
935   }
936   else {
937     SETERRQ(1,0,"Option not supported");
938   }
939   return 0;
940 }
941 
942 #undef __FUNC__
943 #define __FUNC__ "MatGetInfo_MPIAIJ"
944 static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,MatInfo *info)
945 {
946   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
947   Mat        A = mat->A, B = mat->B;
948   int        ierr;
949   double     isend[5], irecv[5];
950 
951   info->rows_global    = (double)mat->M;
952   info->columns_global = (double)mat->N;
953   info->rows_local     = (double)mat->m;
954   info->columns_local  = (double)mat->N;
955   info->block_size     = 1.0;
956   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
957   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
958   isend[3] = info->memory;  isend[4] = info->mallocs;
959   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
960   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
961   isend[3] += info->memory;  isend[4] += info->mallocs;
962   if (flag == MAT_LOCAL) {
963     info->nz_used      = isend[0];
964     info->nz_allocated = isend[1];
965     info->nz_unneeded  = isend[2];
966     info->memory       = isend[3];
967     info->mallocs      = isend[4];
968   } else if (flag == MAT_GLOBAL_MAX) {
969     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
970     info->nz_used      = irecv[0];
971     info->nz_allocated = irecv[1];
972     info->nz_unneeded  = irecv[2];
973     info->memory       = irecv[3];
974     info->mallocs      = irecv[4];
975   } else if (flag == MAT_GLOBAL_SUM) {
976     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
977     info->nz_used      = irecv[0];
978     info->nz_allocated = irecv[1];
979     info->nz_unneeded  = irecv[2];
980     info->memory       = irecv[3];
981     info->mallocs      = irecv[4];
982   }
983   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
984   info->fill_ratio_needed = 0;
985   info->factor_mallocs    = 0;
986 
987   return 0;
988 }
989 
990 extern int MatLUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,Mat*);
991 extern int MatLUFactorNumeric_MPIAIJ(Mat,Mat*);
992 extern int MatLUFactor_MPIAIJ(Mat,IS,IS,double);
993 extern int MatILUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,int,Mat *);
994 extern int MatSolve_MPIAIJ(Mat,Vec,Vec);
995 extern int MatSolveAdd_MPIAIJ(Mat,Vec,Vec,Vec);
996 extern int MatSolveTrans_MPIAIJ(Mat,Vec,Vec);
997 extern int MatSolveTransAdd_MPIAIJ(Mat,Vec,Vec,Vec);
998 
999 #undef __FUNC__
1000 #define __FUNC__ "MatSetOption_MPIAIJ"
1001 static int MatSetOption_MPIAIJ(Mat A,MatOption op)
1002 {
1003   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1004 
1005   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1006       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1007       op == MAT_COLUMNS_UNSORTED ||
1008       op == MAT_COLUMNS_SORTED) {
1009         MatSetOption(a->A,op);
1010         MatSetOption(a->B,op);
1011   } else if (op == MAT_ROW_ORIENTED) {
1012     a->roworiented = 1;
1013     MatSetOption(a->A,op);
1014     MatSetOption(a->B,op);
1015   } else if (op == MAT_ROWS_SORTED ||
1016              op == MAT_ROWS_UNSORTED ||
1017              op == MAT_SYMMETRIC ||
1018              op == MAT_STRUCTURALLY_SYMMETRIC ||
1019              op == MAT_YES_NEW_DIAGONALS)
1020     PLogInfo(A,"Info:MatSetOption_MPIAIJ:Option ignored\n");
1021   else if (op == MAT_COLUMN_ORIENTED) {
1022     a->roworiented = 0;
1023     MatSetOption(a->A,op);
1024     MatSetOption(a->B,op);
1025   } else if (op == MAT_IGNORE_OFF_PROCESSOR_ENTRIES) {
1026     a->donotstash = 1;
1027   } else if (op == MAT_NO_NEW_DIAGONALS)
1028     {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
1029   else
1030     {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
1031   return 0;
1032 }
1033 
1034 #undef __FUNC__
1035 #define __FUNC__ "MatGetSize_MPIAIJ"
1036 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
1037 {
1038   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1039   *m = mat->M; *n = mat->N;
1040   return 0;
1041 }
1042 
1043 #undef __FUNC__
1044 #define __FUNC__ "MatGetLocalSize_MPIAIJ"
1045 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
1046 {
1047   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1048   *m = mat->m; *n = mat->N;
1049   return 0;
1050 }
1051 
1052 #undef __FUNC__
1053 #define __FUNC__ "MatGetOwnershipRange_MPIAIJ"
1054 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
1055 {
1056   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1057   *m = mat->rstart; *n = mat->rend;
1058   return 0;
1059 }
1060 
1061 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1062 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1063 
1064 #undef __FUNC__
1065 #define __FUNC__ "MatGetRow_MPIAIJ"
1066 int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1067 {
1068   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1069   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
1070   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
1071   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
1072   int        *cmap, *idx_p;
1073 
1074   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
1075   mat->getrowactive = PETSC_TRUE;
1076 
1077   if (!mat->rowvalues && (idx || v)) {
1078     /*
1079         allocate enough space to hold information from the longest row.
1080     */
1081     Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
1082     int     max = 1,m = mat->m,tmp;
1083     for ( i=0; i<m; i++ ) {
1084       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1085       if (max < tmp) { max = tmp; }
1086     }
1087     mat->rowvalues = (Scalar *) PetscMalloc( max*(sizeof(int)+sizeof(Scalar)));
1088     CHKPTRQ(mat->rowvalues);
1089     mat->rowindices = (int *) (mat->rowvalues + max);
1090   }
1091 
1092   if (row < rstart || row >= rend) SETERRQ(1,0,"Only local rows")
1093   lrow = row - rstart;
1094 
1095   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1096   if (!v)   {pvA = 0; pvB = 0;}
1097   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1098   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1099   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1100   nztot = nzA + nzB;
1101 
1102   cmap  = mat->garray;
1103   if (v  || idx) {
1104     if (nztot) {
1105       /* Sort by increasing column numbers, assuming A and B already sorted */
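      /* The merged row is laid out as: [off-diagonal cols < cstart][diagonal cols][off-diagonal cols >= cend] */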
1106       int imark = -1;
1107       if (v) {
1108         *v = v_p = mat->rowvalues;
1109         for ( i=0; i<nzB; i++ ) {
1110           if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
1111           else break;
1112         }
1113         imark = i;
1114         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1115         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1116       }
1117       if (idx) {
1118         *idx = idx_p = mat->rowindices;
1119         if (imark > -1) {
1120           for ( i=0; i<imark; i++ ) {
1121             idx_p[i] = cmap[cworkB[i]];
1122           }
1123         } else {
1124           for ( i=0; i<nzB; i++ ) {
1125             if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
1126             else break;
1127           }
1128           imark = i;
1129         }
1130         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
1131         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
1132       }
1133     }
1134     else {
1135       if (idx) *idx = 0;
1136       if (v)   *v   = 0;
1137     }
1138   }
1139   *nz = nztot;
1140   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1141   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1142   return 0;
1143 }
1144 
1145 #undef __FUNC__
1146 #define __FUNC__ "MatRestoreRow_MPIAIJ"
1147 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1148 {
1149   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1150   if (aij->getrowactive == PETSC_FALSE) {
1151     SETERRQ(1,0,"MatGetRow not called");
1152   }
1153   aij->getrowactive = PETSC_FALSE;
1154   return 0;
1155 }
1156 
1157 #undef __FUNC__
1158 #define __FUNC__ "MatNorm_MPIAIJ"
1159 static int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
1160 {
1161   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1162   Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
1163   int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
1164   double     sum = 0.0;
1165   Scalar     *v;
1166 
1167   if (aij->size == 1) {
1168     ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
1169   } else {
1170     if (type == NORM_FROBENIUS) {
1171       v = amat->a;
1172       for (i=0; i<amat->nz; i++ ) {
1173 #if defined(PETSC_COMPLEX)
1174         sum += real(conj(*v)*(*v)); v++;
1175 #else
1176         sum += (*v)*(*v); v++;
1177 #endif
1178       }
1179       v = bmat->a;
1180       for (i=0; i<bmat->nz; i++ ) {
1181 #if defined(PETSC_COMPLEX)
1182         sum += real(conj(*v)*(*v)); v++;
1183 #else
1184         sum += (*v)*(*v); v++;
1185 #endif
1186       }
1187       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
1188       *norm = sqrt(*norm);
1189     }
1190     else if (type == NORM_1) { /* max column norm */
1191       double *tmp, *tmp2;
1192       int    *jj, *garray = aij->garray;
1193       tmp  = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp);
1194       tmp2 = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp2);
1195       PetscMemzero(tmp,aij->N*sizeof(double));
1196       *norm = 0.0;
1197       v = amat->a; jj = amat->j;
1198       for ( j=0; j<amat->nz; j++ ) {
1199         tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
1200       }
1201       v = bmat->a; jj = bmat->j;
1202       for ( j=0; j<bmat->nz; j++ ) {
1203         tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
1204       }
1205       MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);
1206       for ( j=0; j<aij->N; j++ ) {
1207         if (tmp2[j] > *norm) *norm = tmp2[j];
1208       }
1209       PetscFree(tmp); PetscFree(tmp2);
1210     }
1211     else if (type == NORM_INFINITY) { /* max row norm */
1212       double ntemp = 0.0;
1213       for ( j=0; j<amat->m; j++ ) {
1214         v = amat->a + amat->i[j] + shift;
1215         sum = 0.0;
1216         for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
1217           sum += PetscAbsScalar(*v); v++;
1218         }
1219         v = bmat->a + bmat->i[j] + shift;
1220         for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
1221           sum += PetscAbsScalar(*v); v++;
1222         }
1223         if (sum > ntemp) ntemp = sum;
1224       }
1225       MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);
1226     }
1227     else {
1228       SETERRQ(1,0,"No support for two norm");
1229     }
1230   }
1231   return 0;
1232 }
1233 
1234 #undef __FUNC__
1235 #define __FUNC__ "MatTranspose_MPIAIJ"
1236 static int MatTranspose_MPIAIJ(Mat A,Mat *matout)
1237 {
1238   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1239   Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
1240   int        ierr,shift = Aloc->indexshift;
1241   Mat        B;
1242   int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
1243   Scalar     *array;
1244 
1245   if (matout == PETSC_NULL && M != N)
1246     SETERRQ(1,0,"Square matrix only for in-place");
1247   ierr = MatCreateMPIAIJ(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,
1248          PETSC_NULL,&B); CHKERRQ(ierr);
1249 
1250   /* copy over the A part */
1251   Aloc = (Mat_SeqAIJ*) a->A->data;
1252   m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1253   row = a->rstart;
1254   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
1255   for ( i=0; i<m; i++ ) {
1256     ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1257     row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
1258   }
1259   aj = Aloc->j;
1260   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}
1261 
1262   /* copy over the B part */
1263   Aloc = (Mat_SeqAIJ*) a->B->data;
1264   m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1265   row = a->rstart;
1266   ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
1267   for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
1268   for ( i=0; i<m; i++ ) {
1269     ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1270     row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
1271   }
1272   PetscFree(ct);
1273   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1274   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1275   if (matout != PETSC_NULL) {
1276     *matout = B;
1277   } else {
1278     /* This isn't really an in-place transpose, but we free the data structures from a */
1279     PetscFree(a->rowners);
1280     ierr = MatDestroy(a->A); CHKERRQ(ierr);
1281     ierr = MatDestroy(a->B); CHKERRQ(ierr);
1282     if (a->colmap) PetscFree(a->colmap);
1283     if (a->garray) PetscFree(a->garray);
1284     if (a->lvec) VecDestroy(a->lvec);
1285     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
1286     PetscFree(a);
1287     PetscMemcpy(A,B,sizeof(struct _Mat));
1288     PetscHeaderDestroy(B);
1289   }
1290   return 0;
1291 }
1292 
1293 #undef __FUNC__
1294 #define __FUNC__ "MatDiagonalScale_MPIAIJ"
1295 int MatDiagonalScale_MPIAIJ(Mat mat,Vec ll,Vec rr)
1296 {
1297   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1298   Mat a = aij->A, b = aij->B;
1299   int ierr,s1,s2,s3;
1300 
1301   ierr = MatGetLocalSize(mat,&s2,&s3); CHKERRQ(ierr);
1302   if (rr) {
1303     s3 = aij->n;
1304     VecGetLocalSize_Fast(rr,s1);
1305     if (s1!=s3) SETERRQ(1,0,"right vector non-conforming local size");
1306     /* Overlap communication with computation. */
1307     ierr = VecScatterBegin(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1308   }
1309   if (ll) {
1310     VecGetLocalSize_Fast(ll,s1);
1311     if (s1!=s2) SETERRQ(1,0,"left vector non-conforming local size");
1312     ierr = (*b->ops.diagonalscale)(b,ll,0); CHKERRQ(ierr);
1313   }
1314   /* scale  the diagonal block */
1315   ierr = (*a->ops.diagonalscale)(a,ll,rr); CHKERRQ(ierr);
1316 
1317   if (rr) {
1318     /* Do a scatter end and then right scale the off-diagonal block */
1319     ierr = VecScatterEnd(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1320     ierr = (*b->ops.diagonalscale)(b,0,aij->lvec); CHKERRQ(ierr);
1321   }
1322 
1323   return 0;
1324 }
1325 
1326 
1327 extern int MatPrintHelp_SeqAIJ(Mat);
1328 #undef __FUNC__
1329 #define __FUNC__ "MatPrintHelp_MPIAIJ"
1330 static int MatPrintHelp_MPIAIJ(Mat A)
1331 {
1332   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1333 
1334   if (!a->rank) return MatPrintHelp_SeqAIJ(a->A);
1335   else return 0;
1336 }
1337 
1338 #undef __FUNC__
1339 #define __FUNC__ "MatGetBlockSize_MPIAIJ"
1340 static int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1341 {
1342   *bs = 1;
1343   return 0;
1344 }
1345 #undef __FUNC__
1346 #define __FUNC__ "MatSetUnfactored_MPIAIJ"
1347 static int MatSetUnfactored_MPIAIJ(Mat A)
1348 {
1349   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1350   int        ierr;
1351   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1352   return 0;
1353 }
1354 
1355 
1356 extern int MatConvert_MPIAIJ(Mat,MatType,Mat *);
1357 static int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1358 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1359 extern int MatFDColoringCreate_MPIAIJ(Mat,ISColoring,MatFDColoring);
1360 extern int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatGetSubMatrixCall,Mat **);
1361 /* -------------------------------------------------------------------*/
1362 static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
1363        MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
1364        MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
1365        MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
1366        0,0,
1367        0,0,
1368        0,0,
1369        MatRelax_MPIAIJ,
1370        MatTranspose_MPIAIJ,
1371        MatGetInfo_MPIAIJ,0,
1372        MatGetDiagonal_MPIAIJ,MatDiagonalScale_MPIAIJ,MatNorm_MPIAIJ,
1373        MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
1374        0,
1375        MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
1376        0,0,0,0,
1377        MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
1378        0,0,
1379        0,0,MatConvert_MPIAIJ,MatConvertSameType_MPIAIJ,0,0,
1380        0,0,0,
1381        MatGetSubMatrices_MPIAIJ,MatIncreaseOverlap_MPIAIJ,MatGetValues_MPIAIJ,0,
1382        MatPrintHelp_MPIAIJ,
1383        MatScale_MPIAIJ,0,0,0,
1384        MatGetBlockSize_MPIAIJ,0,0,0,0,
1385        MatFDColoringCreate_MPIAIJ,0,MatSetUnfactored_MPIAIJ};
1386 
1387 
1388 #undef __FUNC__
1389 #define __FUNC__ "MatCreateMPIAIJ"
1390 /*@C
1391    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1392    (the default parallel PETSc format).  For good matrix assembly performance
1393    the user should preallocate the matrix storage by setting the parameters
1394    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1395    performance can be increased by more than a factor of 50.
1396 
1397    Input Parameters:
1398 .  comm - MPI communicator
1399 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1400            This value should be the same as the local size used in creating the
1401            y vector for the matrix-vector product y = Ax.
1402 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1403            This value should be the same as the local size used in creating the
1404            x vector for the matrix-vector product y = Ax.
1405 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1406 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1407 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1408            (same for all local rows)
1409 .  d_nnz - array containing the number of nonzeros in the various rows of the
1410            diagonal portion of the local submatrix (possibly different for each row)
1411            or PETSC_NULL. You must leave room for the diagonal entry even if
1412            it is zero.
1413 .  o_nz - number of nonzeros per row in the off-diagonal portion of local
1414            submatrix (same for all local rows).
1415 .  o_nnz - array containing the number of nonzeros in the various rows of the
1416            off-diagonal portion of the local submatrix (possibly different for
1417            each row) or PETSC_NULL.
1418 
1419    Output Parameter:
1420 .  A - the matrix
1421 
1422    Notes:
1423    The AIJ format (also called the Yale sparse matrix format or
1424    compressed row storage) is fully compatible with standard Fortran 77
1425    storage.  That is, the stored row and column indices can begin at
1426    either one (as in Fortran) or zero.  See the users manual for details.
1427 
1428    The user MUST specify either the local or global matrix dimensions
1429    (possibly both).
1430 
1431    By default, this format uses inodes (identical nodes) when possible.
1432    We search for consecutive rows with the same nonzero structure, thereby
1433    reusing matrix information to achieve increased efficiency.
1434 
1435    Options Database Keys:
1436 $    -mat_aij_no_inode  - Do not use inodes
1437 $    -mat_aij_inode_limit <limit> - Set inode limit.
1438 $        (max limit=5)
1439 $    -mat_aij_oneindex - Internally use indexing starting at 1
1440 $        rather than 0.  Note: When calling MatSetValues(),
1441 $        the user still MUST index entries starting at 0!
1442 
1443    Storage Information:
1444    For a square global matrix we define each processor's diagonal portion
1445    to be its local rows and the corresponding columns (a square submatrix);
1446    each processor's off-diagonal portion encompasses the remainder of the
1447    local matrix (a rectangular submatrix).
1448 
1449    The user can specify preallocated storage for the diagonal part of
1450    the local submatrix with either d_nz or d_nnz (not both).  Set
1451    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1452    memory allocation.  Likewise, specify preallocated storage for the
1453    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1454 
1455    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1456    the figure below we depict these three local rows and all columns (0-11).
1457 
1458 $          0 1 2 3 4 5 6 7 8 9 10 11
1459 $         -------------------
1460 $  row 3  |  o o o d d d o o o o o o
1461 $  row 4  |  o o o d d d o o o o o o
1462 $  row 5  |  o o o d d d o o o o o o
1463 $         -------------------
1464 $
1465 
1466    Thus, any entries in the d locations are stored in the d (diagonal)
1467    submatrix, and any entries in the o locations are stored in the
1468    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1469    stored simply in the MATSEQAIJ format for compressed row storage.
1470 
1471    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1472    and o_nz should indicate the number of nonzeros per row in the o matrix.
1473    In general, for PDE problems in which most nonzeros are near the diagonal,
1474    one expects d_nz >> o_nz. For large problems you MUST preallocate memory
1475    or you will get TERRIBLE performance; see the users' manual chapter on
1476    matrices.
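
   Example Usage:
   The call below is a minimal sketch (not taken from an existing PETSc
   example): it lets PETSc choose the local sizes and preallocates a guessed
   5 nonzeros per row in the diagonal block and 2 per row in the off-diagonal
   block; replace these estimates with values appropriate for your problem.

$      Mat A;
$      int ierr;
$      ierr = MatCreateMPIAIJ(MPI_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,M,N,
$                             5,PETSC_NULL,2,PETSC_NULL,&A); CHKERRQ(ierr);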
1477 
1478 .keywords: matrix, aij, compressed row, sparse, parallel
1479 
1480 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1481 @*/
1482 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
1483                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1484 {
1485   Mat          B;
1486   Mat_MPIAIJ   *b;
1487   int          ierr, i,sum[2],work[2],size;
1488 
1489   *A = 0;
1490   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
1491   PLogObjectCreate(B);
1492   B->data       = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
1493   PetscMemzero(b,sizeof(Mat_MPIAIJ));
1494   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1495   MPI_Comm_size(comm,&size);
1496   if (size == 1) {
1497     B->ops.getrowij          = MatGetRowIJ_MPIAIJ;
1498     B->ops.restorerowij      = MatRestoreRowIJ_MPIAIJ;
1499     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIAIJ;
1500     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIAIJ;
1501     B->ops.lufactor          = MatLUFactor_MPIAIJ;
1502     B->ops.solve             = MatSolve_MPIAIJ;
1503     B->ops.solveadd          = MatSolveAdd_MPIAIJ;
1504     B->ops.solvetrans        = MatSolveTrans_MPIAIJ;
1505     B->ops.solvetransadd     = MatSolveTransAdd_MPIAIJ;
1506     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIAIJ;
1507   }
1508   B->destroy    = MatDestroy_MPIAIJ;
1509   B->view       = MatView_MPIAIJ;
1510   B->factor     = 0;
1511   B->assembled  = PETSC_FALSE;
1512   B->mapping    = 0;
1513 
1514   b->insertmode = NOT_SET_VALUES;
1515   MPI_Comm_rank(comm,&b->rank);
1516   MPI_Comm_size(comm,&b->size);
1517 
1518   if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1519     SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1520 
1521   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1522     work[0] = m; work[1] = n;
1523     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1524     if (M == PETSC_DECIDE) M = sum[0];
1525     if (N == PETSC_DECIDE) N = sum[1];
1526   }
1527   if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
1528   if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
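  /* example: M=10 on 3 processes gives M/size=3 with remainder 1, so ranks 0,1,2 own m=4,3,3 rows */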
1529   b->m = m; B->m = m;
1530   b->n = n; B->n = n;
1531   b->N = N; B->N = N;
1532   b->M = M; B->M = M;
1533 
1534   /* build local table of row and column ownerships */
1535   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1536   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1537   b->cowners = b->rowners + b->size + 2;
1538   MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1539   b->rowners[0] = 0;
1540   for ( i=2; i<=b->size; i++ ) {
1541     b->rowners[i] += b->rowners[i-1];
1542   }
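  /* rowners[] now holds prefix sums: process r owns global rows rowners[r] through rowners[r+1]-1 */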
1543   b->rstart = b->rowners[b->rank];
1544   b->rend   = b->rowners[b->rank+1];
1545   MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1546   b->cowners[0] = 0;
1547   for ( i=2; i<=b->size; i++ ) {
1548     b->cowners[i] += b->cowners[i-1];
1549   }
1550   b->cstart = b->cowners[b->rank];
1551   b->cend   = b->cowners[b->rank+1];
1552 
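  /* b->A is the local m x n diagonal block; b->B holds the off-diagonal part and is
     created with the full global column width N (its columns are later compressed to
     a compact local numbering, recorded in garray, during assembly) */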
1553   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1554   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1555   PLogObjectParent(B,b->A);
1556   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1557   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1558   PLogObjectParent(B,b->B);
1559 
1560   /* build cache (stash) for off-processor entries generated during MatSetValues() */
1561   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1562   b->donotstash  = 0;
1563   b->colmap      = 0;
1564   b->garray      = 0;
1565   b->roworiented = 1;
1566 
1567   /* stuff used for matrix vector multiply */
1568   b->lvec      = 0;
1569   b->Mvctx     = 0;
1570 
1571   /* stuff for MatGetRow() */
1572   b->rowindices   = 0;
1573   b->rowvalues    = 0;
1574   b->getrowactive = PETSC_FALSE;
1575 
1576   *A = B;
1577   return 0;
1578 }
1579 
1580 #undef __FUNC__
1581 #define __FUNC__ "MatConvertSameType_MPIAIJ"
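/* Duplicate an MPIAIJ matrix: the new matrix gets a copy of the parallel layout
   (ownership ranges, colmap, garray), the scatter context, and the local diagonal
   (A) and off-diagonal (B) blocks of the original. */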
1582 static int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
1583 {
1584   Mat        mat;
1585   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1586   int        ierr, len=0, flg;
1587 
1588   *newmat       = 0;
1589   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1590   PLogObjectCreate(mat);
1591   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1592   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1593   mat->destroy    = MatDestroy_MPIAIJ;
1594   mat->view       = MatView_MPIAIJ;
1595   mat->factor     = matin->factor;
1596   mat->assembled  = PETSC_TRUE;
1597 
1598   a->m = mat->m   = oldmat->m;
1599   a->n = mat->n   = oldmat->n;
1600   a->M = mat->M   = oldmat->M;
1601   a->N = mat->N   = oldmat->N;
1602 
1603   a->rstart       = oldmat->rstart;
1604   a->rend         = oldmat->rend;
1605   a->cstart       = oldmat->cstart;
1606   a->cend         = oldmat->cend;
1607   a->size         = oldmat->size;
1608   a->rank         = oldmat->rank;
1609   a->insertmode   = NOT_SET_VALUES;
1610   a->rowvalues    = 0;
1611   a->getrowactive = PETSC_FALSE;
1612 
1613   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1614   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1615   a->cowners = a->rowners + a->size + 2;
1616   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1617   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1618   if (oldmat->colmap) {
1619     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1620     PLogObjectMemory(mat,(a->N)*sizeof(int));
1621     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1622   } else a->colmap = 0;
1623   if (oldmat->garray) {
1624     len = ((Mat_SeqAIJ *) (oldmat->B->data))->n;
1625     a->garray = (int *) PetscMalloc((len+1)*sizeof(int)); CHKPTRQ(a->garray);
1626     PLogObjectMemory(mat,len*sizeof(int));
1627     if (len) PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1628   } else a->garray = 0;
1629 
1630   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1631   PLogObjectParent(mat,a->lvec);
1632   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1633   PLogObjectParent(mat,a->Mvctx);
1634   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1635   PLogObjectParent(mat,a->A);
1636   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1637   PLogObjectParent(mat,a->B);
1638   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1639   if (flg) {
1640     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1641   }
1642   *newmat = mat;
1643   return 0;
1644 }
1645 
1646 #include "sys.h"
1647 
1648 #undef __FUNC__
1649 #define __FUNC__ "MatLoad_MPIAIJ"
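/* Load a parallel AIJ matrix from a PETSc binary viewer: process 0 reads the
   header, row lengths, column indices, and values, ships each piece to the
   process that owns the corresponding rows, and every process then inserts
   its rows and assembles the matrix. */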
1650 int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
1651 {
1652   Mat          A;
1653   int          i, nz, ierr, j,rstart, rend, fd;
1654   Scalar       *vals,*svals;
1655   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1656   MPI_Status   status;
1657   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1658   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1659   int          tag = ((PetscObject)viewer)->tag;
1660 
1661   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1662   if (!rank) {
1663     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1664     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1665     if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not matrix object");
1666   }
1667 
1668   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1669   M = header[1]; N = header[2];
1670   /* determine ownership of all rows */
1671   m = M/size + ((M % size) > rank);
1672   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1673   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1674   rowners[0] = 0;
1675   for ( i=2; i<=size; i++ ) {
1676     rowners[i] += rowners[i-1];
1677   }
1678   rstart = rowners[rank];
1679   rend   = rowners[rank+1];
1680 
1681   /* distribute row lengths to all processors */
1682   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1683   offlens = ourlens + (rend-rstart);
1684   if (!rank) {
1685     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1686     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1687     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1688     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1689     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
1690     PetscFree(sndcounts);
1691   }
1692   else {
1693     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
1694   }
1695 
1696   if (!rank) {
1697     /* calculate the number of nonzeros on each processor */
1698     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1699     PetscMemzero(procsnz,size*sizeof(int));
1700     for ( i=0; i<size; i++ ) {
1701       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1702         procsnz[i] += rowlengths[j];
1703       }
1704     }
1705     PetscFree(rowlengths);
1706 
1707     /* determine max buffer needed and allocate it */
1708     maxnz = 0;
1709     for ( i=0; i<size; i++ ) {
1710       maxnz = PetscMax(maxnz,procsnz[i]);
1711     }
1712     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1713 
1714     /* read in my part of the matrix column indices  */
1715     nz = procsnz[0];
1716     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1717     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1718 
1719     /* read in everyone else's column indices and ship them off */
1720     for ( i=1; i<size; i++ ) {
1721       nz = procsnz[i];
1722       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1723       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1724     }
1725     PetscFree(cols);
1726   }
1727   else {
1728     /* determine buffer space needed for message */
1729     nz = 0;
1730     for ( i=0; i<m; i++ ) {
1731       nz += ourlens[i];
1732     }
1733     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1734 
1735     /* receive message of column indices */
1736     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1737     MPI_Get_count(&status,MPI_INT,&maxnz);
1738     if (maxnz != nz) SETERRQ(1,0,"something is wrong with the file");
1739   }
1740 
1741   /* loop over local rows, determining number of off-diagonal entries */
1742   PetscMemzero(offlens,m*sizeof(int));
1743   jj = 0;
1744   for ( i=0; i<m; i++ ) {
1745     for ( j=0; j<ourlens[i]; j++ ) {
1746       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1747       jj++;
1748     }
1749   }
1750 
1751   /* create our matrix */
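  /* ourlens[i] - offlens[i] is the diagonal-block nonzero count of row i, which is
     what MatCreateMPIAIJ expects for the d_nnz preallocation argument */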
1752   for ( i=0; i<m; i++ ) {
1753     ourlens[i] -= offlens[i];
1754   }
1755   ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
1756   A = *newmat;
1757   MatSetOption(A,MAT_COLUMNS_SORTED);
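  /* restore ourlens to the full row lengths; they are reused below when inserting values */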
1758   for ( i=0; i<m; i++ ) {
1759     ourlens[i] += offlens[i];
1760   }
1761 
1762   if (!rank) {
1763     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1764 
1765     /* read in my part of the matrix numerical values  */
1766     nz = procsnz[0];
1767     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1768 
1769     /* insert into matrix */
1770     jj      = rstart;
1771     smycols = mycols;
1772     svals   = vals;
1773     for ( i=0; i<m; i++ ) {
1774       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1775       smycols += ourlens[i];
1776       svals   += ourlens[i];
1777       jj++;
1778     }
1779 
1780     /* read in the other processors' values and ship them out */
1781     for ( i=1; i<size; i++ ) {
1782       nz = procsnz[i];
1783       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1784       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1785     }
1786     PetscFree(procsnz);
1787   }
1788   else {
1789     /* receive numeric values */
1790     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1791 
1792     /* receive message of values */
1793     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1794     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1795     if (maxnz != nz) SETERRQ(1,0,"something is wrong with the file");
1796 
1797     /* insert into matrix */
1798     jj      = rstart;
1799     smycols = mycols;
1800     svals   = vals;
1801     for ( i=0; i<m; i++ ) {
1802       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1803       smycols += ourlens[i];
1804       svals   += ourlens[i];
1805       jj++;
1806     }
1807   }
1808   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1809 
1810   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1811   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1812   return 0;
1813 }
1814