xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision 0bbee1111da17c17490e1dbef69beeec19afa96b)
1 #ifndef lint
2 static char vcid[] = "$Id: mpiaij.c,v 1.187 1997/01/14 20:34:14 balay Exp balay $";
3 #endif
4 
5 #include "src/mat/impls/aij/mpi/mpiaij.h"
6 #include "src/vec/vecimpl.h"
7 #include "src/inline/spops.h"
8 
9 /* local utility routine that creates a mapping from the global column
10 number to the local number in the off-diagonal part of the local
11 storage of the matrix.  This is done in a non-scalable way since the
12 length of colmap equals the global matrix length.
13 */
14 #undef __FUNC__
15 #define __FUNC__ "CreateColmap_MPIAIJ_Private"
16 int CreateColmap_MPIAIJ_Private(Mat mat)
17 {
18   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
19   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
20   int        n = B->n,i;
21 
22   aij->colmap = (int *) PetscMalloc(aij->N*sizeof(int));CHKPTRQ(aij->colmap);
23   PLogObjectMemory(mat,aij->N*sizeof(int));
24   PetscMemzero(aij->colmap,aij->N*sizeof(int));
25   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
26   return 0;
27 }
28 
29 extern int DisAssemble_MPIAIJ(Mat);
30 
31 #undef __FUNC__
32 #define __FUNC__ "MatGetRowIJ_MPIAIJ"
33 static int MatGetRowIJ_MPIAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
34                            PetscTruth *done)
35 {
36   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
37   int        ierr;
38   if (aij->size == 1) {
39     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
40   } else SETERRQ(1,0,"not supported in parallel");
41   return 0;
42 }
43 
44 #undef __FUNC__
45 #define __FUNC__ "MatRestoreRowIJ_MPIAIJ"
46 static int MatRestoreRowIJ_MPIAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
47                                PetscTruth *done)
48 {
49   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
50   int        ierr;
51   if (aij->size == 1) {
52     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
53   } else SETERRQ(1,0,"not supported in parallel");
54   return 0;
55 }
56 
#define CHUNKSIZE   15
/*
   MatSetValues_SeqAIJ_A_Private - Macro that inserts or adds a single
   (row,col,value) entry into the sequential "A" (diagonal-block) matrix.
   Written as a macro rather than a routine for speed; it relies on these
   variables being declared in the enclosing scope: rp, ap, rmax, nrow,
   col1, _i, ii, N, aa, ai, aj, imax, ailen, nonew, shift, and a
   (the Mat_SeqAIJ data of A).  If the row is full and a->nonew is not
   set, the row storage is enlarged by CHUNKSIZE entries (reallocating
   the entire matrix into a single malloc'd region).
*/
#define MatSetValues_SeqAIJ_A_Private(A,row,col,value,addv) \
{ \
 \
    rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
    rmax = imax[row]; nrow = ailen[row];  \
    col1 = col - shift; \
     \
      /* scan the (sorted) row for an existing entry at this column */ \
      for ( _i=0; _i<nrow; _i++ ) { \
        if (rp[_i] > col1) break; \
        if (rp[_i] == col1) { \
          if (addv == ADD_VALUES) ap[_i] += value;   \
          else                  ap[_i] = value; \
          goto _noinsert; \
        } \
      }  \
      /* new nonzero location; silently dropped if nonew is set */ \
      if (nonew)  goto _noinsert; \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = ai[a->m] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+sizeof(Scalar))+(a->m+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = ai[ii];} \
        for ( ii=row+1; ii<a->m+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,aj,(ai[row]+nrow+shift)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - ai[row] - nrow - shift); \
        PetscMemcpy(new_j+ai[row]+shift+nrow+CHUNKSIZE,aj+ai[row]+shift+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,aa,(ai[row]+nrow+shift)*sizeof(Scalar)); \
        PetscMemcpy(new_a+ai[row]+shift+nrow+CHUNKSIZE,aa+ai[row]+shift+nrow, \
                                                           len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
 \
        PetscFree(a->a);  \
        if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
        aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
        a->singlemalloc = 1; \
 \
        rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
        rmax = imax[row] = imax[row] + CHUNKSIZE; \
        PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
        a->maxnz += CHUNKSIZE; \
        a->reallocs++; \
      } \
      N = nrow++ - 1; a->nz++; \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        ap[ii+1] = ap[ii]; \
      } \
      rp[_i] = col1;  \
      ap[_i] = value;  \
      _noinsert: ; \
      ailen[row] = nrow; \
}
119 
120 extern int MatSetValues_SeqAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIAIJ"
/*
   MatSetValues_MPIAIJ - Inserts (or adds, depending on addv) an m-by-n
   logically dense block of values into the parallel matrix.

   Locally owned rows go directly into local storage: columns inside
   [cstart,cend) into the diagonal block aij->A (via the inlined macro,
   for speed), all other columns into the off-diagonal block aij->B.
   Rows owned by other processors are stashed for communication during
   MatAssemblyBegin/End, unless donotstash is set.
*/
static int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Scalar     value;
  int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
  int        cstart = aij->cstart, cend = aij->cend,row,col;
  int        roworiented = aij->roworiented;

  /* Some Variables required in the macro */
  Mat        A = aij->A;
  Mat_SeqAIJ *a = (Mat_SeqAIJ *) A->data;
  int        *rp,ii,nrow,_i,rmax, N, col1;
  int        *imax = a->imax, *ai = a->i, *ailen = a->ilen;
  int        *aj = a->j, nonew = a->nonew,shift = a->indexshift;
  Scalar     *ap, *aa = a->a;

#if defined(PETSC_BOPT_g)
  if (aij->insertmode != NOT_SET_VALUES && aij->insertmode != addv) {
    SETERRQ(1,0,"Cannot mix inserts and adds");
  }
#endif
  aij->insertmode = addv;
  for ( i=0; i<m; i++ ) {
#if defined(PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(1,0,"Negative row");
    if (im[i] >= aij->M) SETERRQ(1,0,"Row too large");
#endif
    if (im[i] >= rstart && im[i] < rend) {
      /* locally owned row */
      row = im[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (in[j] >= cstart && in[j] < cend){
          /* column falls in the diagonal block */
          col = in[j] - cstart;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqAIJ_A_Private(aij->A,row,col,value,addv);
          /* ierr = MatSetValues_SeqAIJ(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
#if defined(PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
        else if (in[j] >= aij->N) {SETERRQ(1,0,"Col too large");}
#endif
        else {
          if (mat->was_assembled) {
            /* after an assembly B uses LOCAL column numbers; translate
               via colmap (stored as local index + 1, 0 == absent) */
            if (!aij->colmap) {
              ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
            }
            col = aij->colmap[in[j]] - 1;
            if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
              /* brand-new nonzero location: revert B to global column
                 numbering so the entry can be accepted */
              ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
            }
          }
          else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          ierr = MatSetValues_SeqAIJ(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
        }
      }
    }
    else {
      /* row owned by another processor: cache for assembly-time send */
      if (roworiented && !aij->donotstash) {
        ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      }
      else {
        if (!aij->donotstash) {
          row = im[i];
          for ( j=0; j<n; j++ ) {
            ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
  return 0;
}
196 
#undef __FUNC__
#define __FUNC__ "MatGetValues_MPIAIJ"
/*
   MatGetValues_MPIAIJ - Retrieves an m-by-n block of values, stored
   row-oriented in v.  Only rows owned by this processor may be
   requested; diagonal-block columns come from aij->A, all others are
   looked up in aij->B through the colmap (entries absent from the
   local nonzero structure are returned as 0.0).
*/
static int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
  int        cstart = aij->cstart, cend = aij->cend,row,col;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
    if (idxm[i] >= aij->M) SETERRQ(1,0,"Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      row = idxm[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
        if (idxn[j] >= aij->N) SETERRQ(1,0,"Col too large");
        if (idxn[j] >= cstart && idxn[j] < cend){
          /* column in the diagonal block */
          col = idxn[j] - cstart;
          ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        }
        else {
          if (!aij->colmap) {
            ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
          }
          /* colmap stores local index + 1; 0 means the column is not
             in the local off-diagonal structure */
          col = aij->colmap[idxn[j]] - 1;
          if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
          else {
            ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    }
    else {
      SETERRQ(1,0,"Only local values currently supported");
    }
  }
  return 0;
}
235 
#undef __FUNC__
#define __FUNC__ "MatAssemblyBegin_MPIAIJ"
/*
   MatAssemblyBegin_MPIAIJ - Starts shipping the stashed off-processor
   values to their owning processors.

   Steps: (1) verify all processors agree on INSERT vs ADD mode;
   (2) count how many stashed values go to each processor; (3) exchange
   message counts and maximum lengths via MPI_Allreduce; (4) post the
   nonblocking receives; (5) pack (row,col,value) triples as Scalars and
   start the nonblocking sends.  MatAssemblyEnd_MPIAIJ completes the
   exchange and frees the buffers saved in aij.
*/
static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = aij->size, *owners = aij->rowners;
  int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  MPI_Allreduce(&aij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,0,"Some processors inserted others added");
  }
  aij->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<aij->stash.n; i++ ) {
    idx = aij->stash.idx[i];
    /* linear search of the ownership ranges for the owning processor */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack each stashed value as a (row,col,value) Scalar triple;
     note starts[] is consumed (incremented) by this loop */
  for ( i=0; i<aij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
  }
  PetscFree(owner);
  /* rebuild starts[] (destroyed above) before launching the sends */
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(mat,"MatAssemblyBegin_MPIAIJ:Number of off-processor values %d\n",aij->stash.n);
  ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);

  /* save the in-flight communication state for MatAssemblyEnd_MPIAIJ */
  aij->svalues    = svalues;    aij->rvalues    = rvalues;
  aij->nsends     = nsends;     aij->nrecvs     = nreceives;
  aij->send_waits = send_waits; aij->recv_waits = recv_waits;
  aij->rmax       = nmax;

  return 0;
}
336 extern int MatSetUpMultiply_MPIAIJ(Mat);
337 
#undef __FUNC__
#define __FUNC__ "MatAssemblyEnd_MPIAIJ"
/*
   MatAssemblyEnd_MPIAIJ - Completes the exchange started by
   MatAssemblyBegin_MPIAIJ: waits on the receives, unpacks the
   (row,col,value) triples into the local A/B blocks, waits on the
   sends, then assembles the two sequential blocks and (on final
   assembly) builds the scatter needed for MatMult.
*/
static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
  int         row,col,other_disassembled;
  Scalar      *values,val;
  InsertMode  addv = aij->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = aij->rvalues + 3*imdex*aij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3; /* each entry was sent as a (row,col,value) triple */
    for ( i=0; i<n; i++ ) {
      row = (int) PetscReal(values[3*i]) - aij->rstart;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= aij->cstart && col < aij->cend) {
        col -= aij->cstart;
        MatSetValues(aij->A,1,&row,1,&col,&val,addv);
      }
      else {
        /* off-diagonal entry: translate to B's local column numbering
           when already assembled (colmap stores local index + 1) */
        if (mat->was_assembled || mat->assembled) {
          if (!aij->colmap) {
            ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
          }
          col = aij->colmap[col] - 1;
          if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
            /* new nonzero location: fall back to global numbering */
            ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        MatSetValues(aij->B,1,&row,1,&col,&val,addv);
      }
    }
    count--;
  }
  PetscFree(aij->recv_waits); PetscFree(aij->rvalues);

  /* wait on sends */
  if (aij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
    MPI_Waitall(aij->nsends,aij->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(aij->send_waits); PetscFree(aij->svalues);

  aij->insertmode = NOT_SET_VALUES;
  ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselves, in order that we may reassemble. */
  MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
  }

  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);

  /* cached MatGetRow workspace is stale after assembly */
  if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
  return 0;
}
410 
411 #undef __FUNC__
412 #define __FUNC__ "MatZeroEntries_MPIAIJ"
413 static int MatZeroEntries_MPIAIJ(Mat A)
414 {
415   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
416   int        ierr;
417   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
418   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
419   return 0;
420 }
421 
422 /* the code does not do the diagonal entries correctly unless the
423    matrix is square and the column and row ownerships are identical.
424    This is a BUG. The only way to fix it seems to be to access
425    aij->A and aij->B directly and not through the MatZeroRows()
426    routine.
427 */
#undef __FUNC__
#define __FUNC__ "MatZeroRows_MPIAIJ"
/*
   MatZeroRows_MPIAIJ - Zeros the (globally numbered, possibly
   off-processor) rows listed in is; diag, if non-null, is placed on
   the diagonal of each zeroed row of the diagonal block.

   Each processor forwards the row indices it does not own to the
   owning processor (same count/allreduce/Irecv/Isend pattern as
   MatAssemblyBegin_MPIAIJ), gathers the indices it does own into
   lrows, and applies MatZeroRows to its local A and B blocks.
*/
static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    /* linear search of the ownership ranges for the owning processor */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,0,"Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack the row indices; this loop consumes (increments) starts[] */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  /* rebuild starts[] before launching the sends */
  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  base = owners[rank];

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      /* convert received global row numbers to local numbering */
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  /* diag goes only into the diagonal block; B rows are zeroed outright */
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}
552 
553 #undef __FUNC__
554 #define __FUNC__ "MatMult_MPIAIJ"
555 static int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
556 {
557   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
558   int        ierr,nt;
559 
560   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
561   if (nt != a->n) {
562     SETERRQ(1,0,"Incompatible parition of A and xx");
563   }
564   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
565   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
566   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
567   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
568   return 0;
569 }
570 
571 #undef __FUNC__
572 #define __FUNC__ "MatMultAdd_MPIAIJ"
573 static int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
574 {
575   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
576   int        ierr;
577   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
578   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
579   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
580   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
581   return 0;
582 }
583 
584 #undef __FUNC__
585 #define __FUNC__ "MatMultTrans_MPIAIJ"
586 static int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
587 {
588   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
589   int        ierr;
590 
591   /* do nondiagonal part */
592   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
593   /* send it on its way */
594   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
595   /* do local part */
596   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
597   /* receive remote parts: note this assumes the values are not actually */
598   /* inserted in yy until the next line, which is true for my implementation*/
599   /* but is not perhaps always true. */
600   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
601   return 0;
602 }
603 
604 #undef __FUNC__
605 #define __FUNC__ "MatMultTransAdd_MPIAIJ"
606 static int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
607 {
608   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
609   int        ierr;
610 
611   /* do nondiagonal part */
612   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
613   /* send it on its way */
614   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
615   /* do local part */
616   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
617   /* receive remote parts: note this assumes the values are not actually */
618   /* inserted in yy until the next line, which is true for my implementation*/
619   /* but is not perhaps always true. */
620   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
621   return 0;
622 }
623 
624 /*
625   This only works correctly for square matrices where the subblock A->A is the
626    diagonal block
627 */
628 #undef __FUNC__
629 #define __FUNC__ "MatGetDiagonal_MPIAIJ"
630 static int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
631 {
632   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
633   if (a->M != a->N)
634     SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
635   if (a->rstart != a->cstart || a->rend != a->cend) {
636     SETERRQ(1,0,"row partition must equal col partition");  }
637   return MatGetDiagonal(a->A,v);
638 }
639 
640 #undef __FUNC__
641 #define __FUNC__ "MatScale_MPIAIJ"
642 static int MatScale_MPIAIJ(Scalar *aa,Mat A)
643 {
644   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
645   int        ierr;
646   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
647   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
648   return 0;
649 }
650 
#undef __FUNC__
#define __FUNC__ "MatDestroy_MPIAIJ"
/*
   MatDestroy_MPIAIJ - Frees all storage for the parallel AIJ matrix:
   the two sequential blocks, the communication scatter, and the
   various work arrays (which are freed only if they were ever built).
*/
static int MatDestroy_MPIAIJ(PetscObject obj)
{
  Mat        mat = (Mat) obj;
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  int        ierr;
#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",aij->M,aij->N);
#endif
  PetscFree(aij->rowners);
  ierr = MatDestroy(aij->A); CHKERRQ(ierr);
  ierr = MatDestroy(aij->B); CHKERRQ(ierr);
  /* optional structures created lazily during assembly/multiply setup */
  if (aij->colmap) PetscFree(aij->colmap);
  if (aij->garray) PetscFree(aij->garray);
  if (aij->lvec)   VecDestroy(aij->lvec);
  if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
  if (aij->rowvalues) PetscFree(aij->rowvalues);
  PetscFree(aij);
  if (mat->mapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
  }
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}
677 #include "draw.h"
678 #include "pinclude/pviewer.h"
679 
680 #undef __FUNC__
681 #define __FUNC__ "MatView_MPIAIJ_Binary"
682 static int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
683 {
684   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
685   int         ierr;
686 
687   if (aij->size == 1) {
688     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
689   }
690   else SETERRQ(1,0,"Only uniprocessor output supported");
691   return 0;
692 }
693 
#undef __FUNC__
#define __FUNC__ "MatView_MPIAIJ_ASCIIorDraworMatlab"
/*
   MatView_MPIAIJ_ASCIIorDraworMatlab - Views the parallel matrix on an
   ASCII, draw, or Matlab viewer.  For the INFO formats it prints
   per-processor statistics; for ASCII_FILE it prints each processor's
   blocks in sequence; otherwise (draw/Matlab/stdout) the entire matrix
   is gathered onto processor 0 and viewed there.
*/
static int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
  int         ierr, format,shift = C->indexshift,rank;
  FILE        *fd;
  ViewerType  vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerGetFormat(viewer,&format);
    if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
      /* per-processor nonzero/memory statistics, printed in rank order */
      MatInfo info;
      int     flg;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&info);
      ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
      PetscSequentialPhaseBegin(mat->comm,1);
      if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
         rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
      else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
         rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
      ierr = MatGetInfo(aij->A,MAT_LOCAL,&info);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used);
      ierr = MatGetInfo(aij->B,MAT_LOCAL,&info);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == VIEWER_FORMAT_ASCII_INFO) {
      return 0;
    }
  }

  if (vtype == DRAW_VIEWER) {
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
  }

  if (vtype == ASCII_FILE_VIEWER) {
    /* each processor prints its own ownership ranges and blocks in turn */
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
           aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
           aij->cend);
    ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  }
  else {
    int size = aij->size;
    rank = aij->rank;
    if (size == 1) {
      ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat         A;
      Mat_SeqAIJ *Aloc;
      int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
      Scalar      *a;

      /* processor 0 gets all M rows; the others contribute 0 rows */
      if (!rank) {
        ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
               CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
               CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqAIJ*) aij->A->data;
      m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = aij->rstart;
      /* temporarily shift A's column indices to GLOBAL numbering ... */
      for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
      for ( i=0; i<m; i++ ) {
        ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
        row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
      }
      aj = Aloc->j;
      /* ... and restore them afterwards */
      for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}

      /* copy over the B part */
      Aloc = (Mat_SeqAIJ*) aij->B->data;
      m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = aij->rstart;
      /* B's local columns are mapped to global numbers via garray */
      ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
      for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
      for ( i=0; i<m; i++ ) {
        ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
        row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
      }
      PetscFree(ct);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}
807 
808 #undef __FUNC__
809 #define __FUNC__ "MatView_MPIAIJ"
810 static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
811 {
812   Mat         mat = (Mat) obj;
813   int         ierr;
814   ViewerType  vtype;
815 
816   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
817   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
818       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
819     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
820   }
821   else if (vtype == BINARY_FILE_VIEWER) {
822     return MatView_MPIAIJ_Binary(mat,viewer);
823   }
824   return 0;
825 }
826 
827 /*
828     This has to provide several versions.
829 
830      2) a) use only local smoothing updating outer values only once.
831         b) local smoothing updating outer values each inner iteration
     3) color updating outer values between colors.
833 */
#undef __FUNC__
#define __FUNC__ "MatRelax_MPIAIJ"
/*
   MatRelax_MPIAIJ - Processor-local SOR/SSOR sweeps for a parallel AIJ matrix.

   Only the SOR_LOCAL_* variants are supported: off-processor values of xx
   are gathered ONCE into mat->lvec before the sweeps start and treated as
   fixed for all 'its' iterations; only the locally owned entries of xx are
   updated.  With SOR_ZERO_INITIAL_GUESS the off-diagonal contribution is
   zero, so the work is delegated entirely to the sequential kernel on the
   diagonal block.

   Input:  matin - the matrix; bb - right-hand side; omega - relaxation
           factor; flag - sweep selection; fshift - shift added to each
           diagonal pivot; its - number of sweeps.
   Output: xx - iterate, updated in place.
*/
static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
                           double fshift,int its,Vec xx)
{
  Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
  Mat        AA = mat->A, BB = mat->B;
  Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
  Scalar     *b,*x,*xs,*ls,d,*v,sum;
  int        ierr,*idx, *diag;
  int        n = mat->n, m = mat->m, i,shift = A->indexshift;

  VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
  xs = x + shift; /* shift by one for index start of 1 */
  ls = ls + shift;
  /* the array of diagonal-entry positions is built lazily on first use */
  if (!A->diag) {if ((ierr = MatMarkDiag_SeqAIJ(AA))) return ierr;}
  diag = A->diag;
  if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
    }
    /* gather off-processor values of xx needed by the B (off-diagonal) block */
    ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);
    CHKERRQ(ierr);
    ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);
    CHKERRQ(ierr);
    while (its--) {
      /* go down through the rows */
      for ( i=0; i<m; i++ ) {
        /* sum accumulates the row residual via SPARSEDENSEMDOT (spops.h);
           the diagonal term is added back in the update below */
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
      }
      /* come up through the rows */
      for ( i=m-1; i>-1; i-- ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
      }
    }
  }
  else if (flag & SOR_LOCAL_FORWARD_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
    }
    ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);
    CHKERRQ(ierr);
    ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);
    CHKERRQ(ierr);
    while (its--) {
      /* forward sweep only: rows in increasing order */
      for ( i=0; i<m; i++ ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
      }
    }
  }
  else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
    }
    ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,
                            mat->Mvctx); CHKERRQ(ierr);
    ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,
                            mat->Mvctx); CHKERRQ(ierr);
    while (its--) {
      /* backward sweep only: rows in decreasing order */
      for ( i=m-1; i>-1; i-- ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
      }
    }
  }
  else {
    SETERRQ(1,0,"Option not supported");
  }
  return 0;
}
943 
944 #undef __FUNC__
945 #define __FUNC__ "MatGetInfo_MPIAIJ"
946 static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,MatInfo *info)
947 {
948   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
949   Mat        A = mat->A, B = mat->B;
950   int        ierr;
951   double     isend[5], irecv[5];
952 
953   info->rows_global    = (double)mat->M;
954   info->columns_global = (double)mat->N;
955   info->rows_local     = (double)mat->m;
956   info->columns_local  = (double)mat->N;
957   info->block_size     = 1.0;
958   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
959   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
960   isend[3] = info->memory;  isend[4] = info->mallocs;
961   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
962   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
963   isend[3] += info->memory;  isend[4] += info->mallocs;
964   if (flag == MAT_LOCAL) {
965     info->nz_used      = isend[0];
966     info->nz_allocated = isend[1];
967     info->nz_unneeded  = isend[2];
968     info->memory       = isend[3];
969     info->mallocs      = isend[4];
970   } else if (flag == MAT_GLOBAL_MAX) {
971     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
972     info->nz_used      = irecv[0];
973     info->nz_allocated = irecv[1];
974     info->nz_unneeded  = irecv[2];
975     info->memory       = irecv[3];
976     info->mallocs      = irecv[4];
977   } else if (flag == MAT_GLOBAL_SUM) {
978     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
979     info->nz_used      = irecv[0];
980     info->nz_allocated = irecv[1];
981     info->nz_unneeded  = irecv[2];
982     info->memory       = irecv[3];
983     info->mallocs      = irecv[4];
984   }
985   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
986   info->fill_ratio_needed = 0;
987   info->factor_mallocs    = 0;
988 
989   return 0;
990 }
991 
992 extern int MatLUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,Mat*);
993 extern int MatLUFactorNumeric_MPIAIJ(Mat,Mat*);
994 extern int MatLUFactor_MPIAIJ(Mat,IS,IS,double);
995 extern int MatILUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,int,Mat *);
996 extern int MatSolve_MPIAIJ(Mat,Vec,Vec);
997 extern int MatSolveAdd_MPIAIJ(Mat,Vec,Vec,Vec);
998 extern int MatSolveTrans_MPIAIJ(Mat,Vec,Vec);
999 extern int MatSolveTransAdd_MPIAIJ(Mat,Vec,Vec,Vec);
1000 
1001 #undef __FUNC__
1002 #define __FUNC__ "MatSetOption_MPIAIJ"
1003 static int MatSetOption_MPIAIJ(Mat A,MatOption op)
1004 {
1005   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1006 
1007   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1008       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1009       op == MAT_COLUMNS_UNSORTED ||
1010       op == MAT_COLUMNS_SORTED) {
1011         MatSetOption(a->A,op);
1012         MatSetOption(a->B,op);
1013   } else if (op == MAT_ROW_ORIENTED) {
1014     a->roworiented = 1;
1015     MatSetOption(a->A,op);
1016     MatSetOption(a->B,op);
1017   } else if (op == MAT_ROWS_SORTED ||
1018              op == MAT_ROWS_UNSORTED ||
1019              op == MAT_SYMMETRIC ||
1020              op == MAT_STRUCTURALLY_SYMMETRIC ||
1021              op == MAT_YES_NEW_DIAGONALS)
1022     PLogInfo(A,"Info:MatSetOption_MPIAIJ:Option ignored\n");
1023   else if (op == MAT_COLUMN_ORIENTED) {
1024     a->roworiented = 0;
1025     MatSetOption(a->A,op);
1026     MatSetOption(a->B,op);
1027   } else if (op == MAT_IGNORE_OFF_PROCESSOR_ENTRIES) {
1028     a->donotstash = 1;
1029   } else if (op == MAT_NO_NEW_DIAGONALS)
1030     {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
1031   else
1032     {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
1033   return 0;
1034 }
1035 
1036 #undef __FUNC__
1037 #define __FUNC__ "MatGetSize_MPIAIJ"
1038 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
1039 {
1040   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1041   *m = mat->M; *n = mat->N;
1042   return 0;
1043 }
1044 
1045 #undef __FUNC__
1046 #define __FUNC__ "MatGetLocalSize_MPIAIJ"
1047 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
1048 {
1049   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1050   *m = mat->m; *n = mat->N;
1051   return 0;
1052 }
1053 
1054 #undef __FUNC__
1055 #define __FUNC__ "MatGetOwnershipRange_MPIAIJ"
1056 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
1057 {
1058   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1059   *m = mat->rstart; *n = mat->rend;
1060   return 0;
1061 }
1062 
1063 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1064 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1065 
#undef __FUNC__
#define __FUNC__ "MatGetRow_MPIAIJ"
/*
   MatGetRow_MPIAIJ - Returns one locally owned row (global row number
   'row') with global column indices, merging the diagonal (A) and
   off-diagonal (B) blocks into a single array sorted by increasing column.

   The returned idx/v arrays point into buffers owned by the matrix
   (mat->rowindices / mat->rowvalues); MatRestoreRow_MPIAIJ must be called
   before the next MatGetRow.  Either idx or v may be PETSC_NULL if that
   part is not wanted.
*/
int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
  int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
  int        *cmap, *idx_p;

  /* only one row may be "checked out" at a time */
  if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
    int     max = 1,m = mat->m,tmp;
    for ( i=0; i<m; i++ ) {
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    /* single allocation: Scalars first, ints aliased after them */
    mat->rowvalues = (Scalar *) PetscMalloc( max*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max);
  }

  if (row < rstart || row >= rend) SETERRQ(1,0,"Only local rows")
  lrow = row - rstart;

  /* request only the pieces (values/columns) the caller asked for */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  /* cmap translates B's compact column numbers to global column numbers */
  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      int imark = -1;
      if (v) {
        /* B entries with global column < cstart come first, then all of A,
           then the remaining B entries (columns >= cend) */
        *v = v_p = mat->rowvalues;
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          /* split point already found while copying values */
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]];
          }
        } else {
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
            else break;
          }
          imark = i;
        }
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
      }
    }
    else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  return 0;
}
1146 
1147 #undef __FUNC__
1148 #define __FUNC__ "MatRestoreRow_MPIAIJ"
1149 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1150 {
1151   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1152   if (aij->getrowactive == PETSC_FALSE) {
1153     SETERRQ(1,0,"MatGetRow not called");
1154   }
1155   aij->getrowactive = PETSC_FALSE;
1156   return 0;
1157 }
1158 
#undef __FUNC__
#define __FUNC__ "MatNorm_MPIAIJ"
/*
   MatNorm_MPIAIJ - Computes a matrix norm across all processes.

   Supports NORM_FROBENIUS, NORM_1 (max column sum — uses a length-N
   work array per process, i.e. not scalable in N) and NORM_INFINITY
   (max row sum).  The two-norm is not supported.  On one process the
   work is delegated to the sequential routine.
*/
static int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
  int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
  double     sum = 0.0;
  Scalar     *v;

  if (aij->size == 1) {
    ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      /* sum of squared moduli over both local blocks, then global sum */
      v = amat->a;
      for (i=0; i<amat->nz; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      v = bmat->a;
      for (i=0; i<bmat->nz; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
      *norm = sqrt(*norm);
    }
    else if (type == NORM_1) { /* max column norm */
      /* tmp accumulates this process's |a_ij| per GLOBAL column; columns of
         the B block are translated through garray */
      double *tmp, *tmp2;
      int    *jj, *garray = aij->garray;
      tmp  = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp);
      tmp2 = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp2);
      PetscMemzero(tmp,aij->N*sizeof(double));
      *norm = 0.0;
      v = amat->a; jj = amat->j;
      for ( j=0; j<amat->nz; j++ ) {
        tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
      }
      v = bmat->a; jj = bmat->j;
      for ( j=0; j<bmat->nz; j++ ) {
        tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
      }
      MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);
      for ( j=0; j<aij->N; j++ ) {
        if (tmp2[j] > *norm) *norm = tmp2[j];
      }
      PetscFree(tmp); PetscFree(tmp2);
    }
    else if (type == NORM_INFINITY) { /* max row norm */
      /* rows are wholly local, so only a global max is needed at the end */
      double ntemp = 0.0;
      for ( j=0; j<amat->m; j++ ) {
        v = amat->a + amat->i[j] + shift;
        sum = 0.0;
        for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
          sum += PetscAbsScalar(*v); v++;
        }
        v = bmat->a + bmat->i[j] + shift;
        for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
          sum += PetscAbsScalar(*v); v++;
        }
        if (sum > ntemp) ntemp = sum;
      }
      MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);
    }
    else {
      SETERRQ(1,0,"No support for two norm");
    }
  }
  return 0;
}
1235 
#undef __FUNC__
#define __FUNC__ "MatTranspose_MPIAIJ"
/*
   MatTranspose_MPIAIJ - Forms the transpose of a parallel AIJ matrix.

   A new N x M matrix is assembled by inserting each local row as a column
   (MatSetValues with swapped row/column arguments).  If matout is
   PETSC_NULL the result replaces A in place, which requires M == N.

   Implementation notes:
   - The column indices of the diagonal block are temporarily shifted to
     global numbering IN PLACE and restored afterwards; do not error-return
     between the two shift loops.
   - The "in-place" case is a swap hack: A's body is overwritten with B's
     header via PetscMemcpy and B's header is then destroyed.
*/
static int MatTranspose_MPIAIJ(Mat A,Mat *matout)
{
  Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
  Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
  int        ierr,shift = Aloc->indexshift;
  Mat        B;
  int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
  Scalar     *array;

  if (matout == PETSC_NULL && M != N)
    SETERRQ(1,0,"Square matrix only for in-place");
  ierr = MatCreateMPIAIJ(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,
         PETSC_NULL,&B); CHKERRQ(ierr);

  /* copy over the A part */
  Aloc = (Mat_SeqAIJ*) a->A->data;
  m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
  row = a->rstart;
  /* shift local column numbers to global numbering (in place) */
  for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
  for ( i=0; i<m; i++ ) {
    ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
    row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
  }
  /* undo the in-place shift */
  aj = Aloc->j;
  for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}

  /* copy over the B part */
  Aloc = (Mat_SeqAIJ*) a->B->data;
  m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
  row = a->rstart;
  /* B's compact column numbers are mapped to global numbers via garray
     into a scratch array; ct keeps the base pointer for the free */
  ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
  for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
  for ( i=0; i<m; i++ ) {
    ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
    row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
  }
  PetscFree(ct);
  ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  if (matout != PETSC_NULL) {
    *matout = B;
  } else {
    /* This isn't really an in-place transpose .... but free data structures from a */
    PetscFree(a->rowners);
    ierr = MatDestroy(a->A); CHKERRQ(ierr);
    ierr = MatDestroy(a->B); CHKERRQ(ierr);
    if (a->colmap) PetscFree(a->colmap);
    if (a->garray) PetscFree(a->garray);
    if (a->lvec) VecDestroy(a->lvec);
    if (a->Mvctx) VecScatterDestroy(a->Mvctx);
    PetscFree(a);
    PetscMemcpy(A,B,sizeof(struct _Mat));
    PetscHeaderDestroy(B);
  }
  return 0;
}
1294 
#undef __FUNC__
#define __FUNC__ "MatDiagonalScale_MPIAIJ"
/*
   MatDiagonalScale_MPIAIJ - Computes mat = diag(ll) * mat * diag(rr).
   Either ll or rr may be null to skip that side.

   The right-scaling values needed for the off-diagonal block live on
   other processes, so they are scattered into aij->lvec while the left
   scaling and the diagonal block are processed (communication overlapped
   with computation).
*/
int MatDiagonalScale_MPIAIJ(Mat mat,Vec ll,Vec rr)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Mat a = aij->A, b = aij->B;
  int ierr,s1,s2,s3;

  ierr = MatGetLocalSize(mat,&s2,&s3); CHKERRQ(ierr);
  if (rr) {
    /* MatGetLocalSize returns N for columns; the true local size is aij->n */
    s3 = aij->n;
    VecGetLocalSize_Fast(rr,s1);
    if (s1!=s3) SETERRQ(1,0,"right vector non-conforming local size");
    /* Overlap communication with computation. */
    ierr = VecScatterBegin(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
  }
  if (ll) {
    VecGetLocalSize_Fast(ll,s1);
    if (s1!=s2) SETERRQ(1,0,"left vector non-conforming local size");
    /* left scaling of the off-diagonal block uses only local values */
    ierr = (*b->ops.diagonalscale)(b,ll,0); CHKERRQ(ierr);
  }
  /* scale  the diagonal block */
  ierr = (*a->ops.diagonalscale)(a,ll,rr); CHKERRQ(ierr);

  if (rr) {
    /* Do a scatter end and then right scale the off-diagonal block */
    ierr = VecScatterEnd(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
    ierr = (*b->ops.diagonalscale)(b,0,aij->lvec); CHKERRQ(ierr);
  }

  return 0;
}
1327 
1328 
1329 extern int MatPrintHelp_SeqAIJ(Mat);
1330 #undef __FUNC__
1331 #define __FUNC__ "MatPrintHelp_MPIAIJ"
1332 static int MatPrintHelp_MPIAIJ(Mat A)
1333 {
1334   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1335 
1336   if (!a->rank) return MatPrintHelp_SeqAIJ(a->A);
1337   else return 0;
1338 }
1339 
1340 #undef __FUNC__
1341 #define __FUNC__ "MatGetBlockSize_MPIAIJ"
1342 static int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1343 {
1344   *bs = 1;
1345   return 0;
1346 }
1347 #undef __FUNC__
1348 #define __FUNC__ "MatSetUnfactored_MPIAIJ"
1349 static int MatSetUnfactored_MPIAIJ(Mat A)
1350 {
1351   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1352   int        ierr;
1353   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1354   return 0;
1355 }
1356 
1357 
1358 extern int MatConvert_MPIAIJ(Mat,MatType,Mat *);
1359 static int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1360 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1361 extern int MatFDColoringCreate_MPIAIJ(Mat,ISColoring,MatFDColoring);
1362 extern int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatGetSubMatrixCall,Mat **);
1363 /* -------------------------------------------------------------------*/
/*
   Function table for the MPIAIJ matrix type.  The initializer is
   POSITIONAL against struct _MatOps: the zeros are operations this type
   does not implement, so entries must not be reordered.  Some slots
   (getrowij, lufactor*, solve*, ilufactorsymbolic) are patched in at
   creation time for the uniprocessor case — see MatCreateMPIAIJ.
*/
static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
       MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
       MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
       MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
       0,0,
       0,0,
       0,0,
       MatRelax_MPIAIJ,
       MatTranspose_MPIAIJ,
       MatGetInfo_MPIAIJ,0,
       MatGetDiagonal_MPIAIJ,MatDiagonalScale_MPIAIJ,MatNorm_MPIAIJ,
       MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
       0,
       MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
       0,0,0,0,
       MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
       0,0,
       0,0,MatConvert_MPIAIJ,MatConvertSameType_MPIAIJ,0,0,
       0,0,0,
       MatGetSubMatrices_MPIAIJ,MatIncreaseOverlap_MPIAIJ,MatGetValues_MPIAIJ,0,
       MatPrintHelp_MPIAIJ,
       MatScale_MPIAIJ,0,0,0,
       MatGetBlockSize_MPIAIJ,0,0,0,0,
       MatFDColoringCreate_MPIAIJ,0,MatSetUnfactored_MPIAIJ};
1388 
1389 
#undef __FUNC__
#define __FUNC__ "MatCreateMPIAIJ"
/*@C
   MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
   (the default parallel PETSc format).  For good matrix assembly performance
   the user should preallocate the matrix storage by setting the parameters
   d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
   performance can be increased by more than a factor of 50.

   Input Parameters:
.  comm - MPI communicator
.  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
           This value should be the same as the local size used in creating the
           y vector for the matrix-vector product y = Ax.
.  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
           This value should be the same as the local size used in creating the
           x vector for the matrix-vector product y = Ax.
.  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
.  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
.  d_nz - number of nonzeros per row in diagonal portion of local submatrix
           (same for all local rows)
.  d_nnz - array containing the number of nonzeros in the various rows of the
           diagonal portion of the local submatrix (possibly different for each row)
           or PETSC_NULL. You must leave room for the diagonal entry even if
           it is zero.
.  o_nz - number of nonzeros per row in the off-diagonal portion of local
           submatrix (same for all local rows).
.  o_nnz - array containing the number of nonzeros in the various rows of the
           off-diagonal portion of the local submatrix (possibly different for
           each row) or PETSC_NULL.

   Output Parameter:
.  A - the matrix

   Notes:
   The AIJ format (also called the Yale sparse matrix format or
   compressed row storage), is fully compatible with standard Fortran 77
   storage.  That is, the stored row and column indices can begin at
   either one (as in Fortran) or zero.  See the users manual for details.

   The user MUST specify either the local or global matrix dimensions
   (possibly both).

   By default, this format uses inodes (identical nodes) when possible.
   We search for consecutive rows with the same nonzero structure, thereby
   reusing matrix information to achieve increased efficiency.

   Options Database Keys:
$    -mat_aij_no_inode  - Do not use inodes
$    -mat_aij_inode_limit <limit> - Set inode limit.
$        (max limit=5)
$    -mat_aij_oneindex - Internally use indexing starting at 1
$        rather than 0.  Note: When calling MatSetValues(),
$        the user still MUST index entries starting at 0!

   Storage Information:
   For a square global matrix we define each processor's diagonal portion
   to be its local rows and the corresponding columns (a square submatrix);
   each processor's off-diagonal portion encompasses the remainder of the
   local matrix (a rectangular submatrix).

   The user can specify preallocated storage for the diagonal part of
   the local submatrix with either d_nz or d_nnz (not both).  Set
   d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
   memory allocation.  Likewise, specify preallocated storage for the
   off-diagonal part of the local submatrix with o_nz or o_nnz (not both).

   Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
   the figure below we depict these three local rows and all columns (0-11).

$          0 1 2 3 4 5 6 7 8 9 10 11
$         -------------------
$  row 3  |  o o o d d d o o o o o o
$  row 4  |  o o o d d d o o o o o o
$  row 5  |  o o o d d d o o o o o o
$         -------------------
$

   Thus, any entries in the d locations are stored in the d (diagonal)
   submatrix, and any entries in the o locations are stored in the
   o (off-diagonal) submatrix.  Note that the d and the o submatrices are
   stored simply in the MATSEQAIJ format for compressed row storage.

   Now d_nz should indicate the number of nonzeros per row in the d matrix,
   and o_nz should indicate the number of nonzeros per row in the o matrix.
   In general, for PDE problems in which most nonzeros are near the diagonal,
   one expects d_nz >> o_nz. For large problems you MUST preallocate memory
   or you will get TERRIBLE performance; see the users' manual chapter on
   matrices.

.keywords: matrix, aij, compressed row, sparse, parallel

.seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
@*/
int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
                    int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  Mat          B;
  Mat_MPIAIJ   *b;
  int          ierr, i,sum[2],work[2],size;

  *A = 0;
  PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
  PLogObjectCreate(B);
  B->data       = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
  PetscMemzero(b,sizeof(Mat_MPIAIJ));
  PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
  MPI_Comm_size(comm,&size);
  /* on one process the sequential factorization/solve wrappers can be used */
  if (size == 1) {
    B->ops.getrowij          = MatGetRowIJ_MPIAIJ;
    B->ops.restorerowij      = MatRestoreRowIJ_MPIAIJ;
    B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIAIJ;
    B->ops.lufactornumeric   = MatLUFactorNumeric_MPIAIJ;
    B->ops.lufactor          = MatLUFactor_MPIAIJ;
    B->ops.solve             = MatSolve_MPIAIJ;
    B->ops.solveadd          = MatSolveAdd_MPIAIJ;
    B->ops.solvetrans        = MatSolveTrans_MPIAIJ;
    B->ops.solvetransadd     = MatSolveTransAdd_MPIAIJ;
    B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIAIJ;
  }
  B->destroy    = MatDestroy_MPIAIJ;
  B->view       = MatView_MPIAIJ;
  B->factor     = 0;
  B->assembled  = PETSC_FALSE;
  B->mapping    = 0;

  b->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&b->rank);
  MPI_Comm_size(comm,&b->size);

  if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
    SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");

  /* determine any PETSC_DECIDE dimensions: global sizes from a global sum
     of the local ones, local sizes by even division of the global ones */
  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
    if (M == PETSC_DECIDE) M = sum[0];
    if (N == PETSC_DECIDE) N = sum[1];
  }
  if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
  if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;

  /* build local table of row and column ownerships */
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
  b->cowners = b->rowners + b->size + 2;
  /* rowners[i] becomes the first global row of process i (prefix sum) */
  MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];
  }
  b->rstart = b->rowners[b->rank];
  b->rend   = b->rowners[b->rank+1];
  MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart = b->cowners[b->rank];
  b->cend   = b->cowners[b->rank+1];

  /* diagonal block is m x n; off-diagonal block is stored m x N (global
     column dimension) */
  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->donotstash  = 0;
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used for matrix vector multiply */
  b->lvec      = 0;
  b->Mvctx     = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  *A = B;
  return 0;
}
1581 
1582 #undef __FUNC__
1583 #define __FUNC__ "MatConvertSameType_MPIAIJ"
/*
   Duplicates an MPIAIJ matrix: creates a fresh Mat header, copies the
   parallel layout (sizes, row/column ownership ranges), the optional
   colmap/garray off-diagonal lookup data, duplicates the local work
   vector and scatter context, and deep-copies the diagonal (A) and
   off-diagonal (B) sequential blocks via MatConvert(...,MATSAME,...).

   Input:
     matin    - the assembled MPIAIJ matrix to duplicate
     cpvalues - copy-values flag; NOTE(review): accepted but never examined
                in this body, so numerical values are always copied -
                confirm intended semantics against MatConvert callers.
   Output:
     newmat   - the duplicate (set only on success; zeroed on entry)

   Errors propagate through CHKERRQ/CHKPTRQ, which return early from
   this function, so statement order doubles as cleanup order.
*/
1584 static int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
1585 {
1586   Mat        mat;
1587   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1588   int        ierr, len=0, flg;
1589 
1590   *newmat       = 0;
  /* allocate the new Mat header and install the shared MPIAIJ ops table */
1591   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1592   PLogObjectCreate(mat);
1593   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1594   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1595   mat->destroy    = MatDestroy_MPIAIJ;
1596   mat->view       = MatView_MPIAIJ;
1597   mat->factor     = matin->factor;
  /* only assembled matrices are duplicated; mark the copy assembled too */
1598   mat->assembled  = PETSC_TRUE;
1599 
  /* local and global dimensions, mirrored into both Mat and Mat_MPIAIJ */
1600   a->m = mat->m   = oldmat->m;
1601   a->n = mat->n   = oldmat->n;
1602   a->M = mat->M   = oldmat->M;
1603   a->N = mat->N   = oldmat->N;
1604 
  /* ownership ranges and communicator topology are identical to matin's */
1605   a->rstart       = oldmat->rstart;
1606   a->rend         = oldmat->rend;
1607   a->cstart       = oldmat->cstart;
1608   a->cend         = oldmat->cend;
1609   a->size         = oldmat->size;
1610   a->rank         = oldmat->rank;
1611   a->insertmode   = NOT_SET_VALUES;
1612   a->rowvalues    = 0;
1613   a->getrowactive = PETSC_FALSE;
1614 
  /* one allocation holds both rowners and cowners (size+2 ints each);
     cowners is aliased into the second half, so free only rowners */
1615   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1616   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1617   a->cowners = a->rowners + a->size + 2;
1618   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1619   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
  /* colmap (global col -> local off-diag col, length N) exists only if
     the source matrix had already built it */
1620   if (oldmat->colmap) {
1621     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1622     PLogObjectMemory(mat,(a->N)*sizeof(int));
1623     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1624   } else a->colmap = 0;
  /* garray (local off-diag col -> global col); len+1 alloc keeps the
     pointer non-null even when B has zero columns */
1625   if (oldmat->garray) {
1626     len = ((Mat_SeqAIJ *) (oldmat->B->data))->n;
1627     a->garray = (int *) PetscMalloc((len+1)*sizeof(int)); CHKPTRQ(a->garray);
  /* NOTE(review): logs len*sizeof(int) though (len+1)*sizeof(int) was
     allocated above - harmless accounting mismatch */
1628     PLogObjectMemory(mat,len*sizeof(int));
1629     if (len) PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1630   } else a->garray = 0;
1631 
  /* duplicate the matrix-vector-multiply machinery and the two blocks */
1632   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1633   PLogObjectParent(mat,a->lvec);
1634   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1635   PLogObjectParent(mat,a->Mvctx);
1636   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1637   PLogObjectParent(mat,a->A);
1638   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1639   PLogObjectParent(mat,a->B);
1640   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1641   if (flg) {
1642     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1643   }
1644   *newmat = mat;
1645   return 0;
1646 }
1647 
1648 #include "sys.h"
1649 
1650 #undef __FUNC__
1651 #define __FUNC__ "MatLoad_MPIAIJ"
/*
   Loads an MPIAIJ matrix from a binary viewer.  Process 0 reads the
   whole file and ships each process its slice; all processes then
   insert their local rows with MatSetValues() and assemble.

   Assumed file layout (as read below): a 4-int header whose first entry
   is MAT_COOKIE, second is M (global rows), third is N (global cols)
   (header[3] is read but never used here - presumably the total nonzero
   count; confirm against the writer), followed by M row lengths, then
   all column indices, then all numerical values.

   Collective on the viewer's communicator: the Bcast/Scatterv and the
   rank-0 Send loops are matched by the corresponding Recv branches on
   the other ranks, so the root/non-root branch structure must stay
   symmetric.
*/
1652 int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
1653 {
1654   Mat          A;
1655   int          i, nz, ierr, j,rstart, rend, fd;
1656   Scalar       *vals,*svals;
1657   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1658   MPI_Status   status;
1659   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1660   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1661   int          tag = ((PetscObject)viewer)->tag;
1662 
1663   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  /* only rank 0 touches the file; it validates the cookie up front */
1664   if (!rank) {
1665     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1666     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1667     if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not matrix object");
1668   }
1669 
  /* everyone learns M and N (header[1..3]); the cookie stays on rank 0 */
1670   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1671   M = header[1]; N = header[2];
  /* determine ownership of all rows */
  /* block-distribute rows: the first M%size ranks get one extra row */
1672   m = M/size + ((M % size) > rank);
1673   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
  /* gather every rank's row count, then prefix-sum into ownership
     offsets: rowners[r] = first global row owned by rank r */
1674   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1675   rowners[0] = 0;
1676   for ( i=2; i<=size; i++ ) {
1677     rowners[i] += rowners[i-1];
1678   }
1679   rstart = rowners[rank];
1680   rend   = rowners[rank+1];
1681 
  /* distribute row lengths to all processors */
  /* one allocation: ourlens = local row lengths, offlens = second half,
     later used for the off-diagonal counts per local row */
1682   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1683   offlens = ourlens + (rend-rstart);
1684   if (!rank) {
1685     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1686     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1687     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1688     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
  /* rowners doubles as the Scatterv displacement array (element units) */
1689     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
1690     PetscFree(sndcounts);
1691   }
1692   else {
1693     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
1694   }
1695 
1696   if (!rank) {
  /* calculate the number of nonzeros on each processor */
1697     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1698     PetscMemzero(procsnz,size*sizeof(int));
1699     for ( i=0; i<size; i++ ) {
1700       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1701         procsnz[i] += rowlengths[j];
1702       }
1703     }
1704     PetscFree(rowlengths);
1705 
  /* determine max buffer needed and allocate it (reused for every
     destination rank in the Send loops below) */
1706     maxnz = 0;
1707     for ( i=0; i<size; i++ ) {
1708       maxnz = PetscMax(maxnz,procsnz[i]);
1709     }
1710     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1711 
  /* read in my part of the matrix column indices  */
1712     nz = procsnz[0];
1713     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1714     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1715 
  /* read in every one elses and ship off; the file stores rank i's
     indices contiguously after rank i-1's, so this loop order matters */
1716     for ( i=1; i<size; i++ ) {
1717       nz = procsnz[i];
1718       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1719       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1720     }
1721     PetscFree(cols);
1722   }
1723   else {
  /* determine buffer space needed for message */
1724     nz = 0;
1725     for ( i=0; i<m; i++ ) {
1726       nz += ourlens[i];
1727     }
1728     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1729 
  /* receive message of column indices; cross-check the count against
     what the scattered row lengths promised */
1730     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1731     MPI_Get_count(&status,MPI_INT,&maxnz);
1732     if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
1733   }
1734 
  /* loop over local rows, determining number of off diagonal entries
     (columns outside [rstart,rend); assumes the diagonal block spans
     exactly the owned rows, as set up by MatCreateMPIAIJ below) */
1735   PetscMemzero(offlens,m*sizeof(int));
1736   jj = 0;
1737   for ( i=0; i<m; i++ ) {
1738     for ( j=0; j<ourlens[i]; j++ ) {
1739       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1740       jj++;
1741     }
1742   }
1743 
  /* create our matrix: temporarily split ourlens into diagonal-only
     counts for preallocation, then restore the totals for insertion */
1744   for ( i=0; i<m; i++ ) {
1745     ourlens[i] -= offlens[i];
1746   }
1747   ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
1748   A = *newmat;
  /* file stores each row's columns sorted, so enable the fast path */
1749   MatSetOption(A,MAT_COLUMNS_SORTED);
1750   for ( i=0; i<m; i++ ) {
1751     ourlens[i] += offlens[i];
1752   }
1753 
1754   if (!rank) {
1755     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1756 
  /* read in my part of the matrix numerical values  */
1757     nz = procsnz[0];
1758     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1759 
  /* insert into matrix, one global row (jj) at a time */
1760     jj      = rstart;
1761     smycols = mycols;
1762     svals   = vals;
1763     for ( i=0; i<m; i++ ) {
1764       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1765       smycols += ourlens[i];
1766       svals   += ourlens[i];
1767       jj++;
1768     }
1769 
  /* read in other processors and ship out; NOTE(review): values use
     A->tag while the index messages above used the viewer's tag -
     consistent within each phase, but worth keeping in sync */
1770     for ( i=1; i<size; i++ ) {
1771       nz = procsnz[i];
1772       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1773       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1774     }
1775     PetscFree(procsnz);
1776   }
1777   else {
  /* receive numeric values (nz still holds this rank's nonzero count) */
1778     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1779 
  /* receive message of values*/
1780     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1781     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1782     if (maxnz != nz) SETERRQ(1,0,"something is wrong with file");
1783 
  /* insert into matrix */
1784     jj      = rstart;
1785     smycols = mycols;
1786     svals   = vals;
1787     for ( i=0; i<m; i++ ) {
1788       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1789       smycols += ourlens[i];
1790       svals   += ourlens[i];
1791       jj++;
1792     }
1793   }
1794   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1795 
1796   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1797   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1798   return 0;
1799 }
1816