xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision 8ba3a721c9105e9240dbd90dceddbf8833cabcd1)
1 #ifndef lint
2 static char vcid[] = "$Id: mpiaij.c,v 1.189 1997/01/27 18:16:42 bsmith Exp curfman $";
3 #endif
4 
5 #include "src/mat/impls/aij/mpi/mpiaij.h"
6 #include "src/vec/vecimpl.h"
7 #include "src/inline/spops.h"
8 
9 /* local utility routine that creates a mapping from the global column
10 number to the local number in the off-diagonal part of the local
11 storage of the matrix.  This is done in a non-scalable way since the
12 length of colmap equals the number of columns in the global matrix.
13 */
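/*
   A minimal usage sketch (illustrative only, not code from this file): once
   colmap has been built, a global column index gcol is translated to the
   local column index of the off-diagonal block B with

       lcol = aij->colmap[gcol] - 1;

   where a result of -1 (a stored value of 0) means that global column does
   not appear in the local off-diagonal part.
*/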
14 #undef __FUNC__
15 #define __FUNC__ "CreateColmap_MPIAIJ_Private"
16 int CreateColmap_MPIAIJ_Private(Mat mat)
17 {
18   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
19   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
20   int        n = B->n,i;
21 
22   aij->colmap = (int *) PetscMalloc(aij->N*sizeof(int));CHKPTRQ(aij->colmap);
23   PLogObjectMemory(mat,aij->N*sizeof(int));
24   PetscMemzero(aij->colmap,aij->N*sizeof(int));
25   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
26   return 0;
27 }
28 
29 extern int DisAssemble_MPIAIJ(Mat);
30 
31 #undef __FUNC__
32 #define __FUNC__ "MatGetRowIJ_MPIAIJ"
33 static int MatGetRowIJ_MPIAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
34                            PetscTruth *done)
35 {
36   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
37   int        ierr;
38   if (aij->size == 1) {
39     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
40   } else SETERRQ(1,0,"not supported in parallel");
41   return 0;
42 }
43 
44 #undef __FUNC__
45 #define __FUNC__ "MatRestoreRowIJ_MPIAIJ"
46 static int MatRestoreRowIJ_MPIAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
47                                PetscTruth *done)
48 {
49   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
50   int        ierr;
51   if (aij->size == 1) {
52     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
53   } else SETERRQ(1,0,"not supported in parallel");
54   return 0;
55 }
56 
57 #define CHUNKSIZE   15
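/*
   Inserts (row,col,value) into the CSR arrays of the local diagonal block,
   reallocating the matrix storage to give the row CHUNKSIZE extra entries
   when it is full.  The macro relies on the locals rp, ap, rmax, nrow, col1,
   _i, ii, N and the aliases aa, ai, aj, ailen, imax, nonew, shift being
   declared by the caller, as is done in MatSetValues_MPIAIJ() below.
*/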
58 #define MatSetValues_SeqAIJ_A_Private(A,row,col,value,addv) \
59 { \
60  \
61     rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
62     rmax = imax[row]; nrow = ailen[row];  \
63     col1 = col - shift; \
64      \
65       for ( _i=0; _i<nrow; _i++ ) { \
66         if (rp[_i] > col1) break; \
67         if (rp[_i] == col1) { \
68           if (addv == ADD_VALUES) ap[_i] += value;   \
69           else                  ap[_i] = value; \
70           goto _noinsert; \
71         } \
72       }  \
73       if (nonew)  goto _noinsert; \
74       if (nrow >= rmax) { \
75         /* there is no extra room in row, therefore enlarge */ \
76         int    new_nz = ai[a->m] + CHUNKSIZE,len,*new_i,*new_j; \
77         Scalar *new_a; \
78  \
79         /* malloc new storage space */ \
80         len     = new_nz*(sizeof(int)+sizeof(Scalar))+(a->m+1)*sizeof(int); \
81         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
82         new_j   = (int *) (new_a + new_nz); \
83         new_i   = new_j + new_nz; \
84  \
85         /* copy over old data into new slots */ \
86         for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = ai[ii];} \
87         for ( ii=row+1; ii<a->m+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
88         PetscMemcpy(new_j,aj,(ai[row]+nrow+shift)*sizeof(int)); \
89         len = (new_nz - CHUNKSIZE - ai[row] - nrow - shift); \
90         PetscMemcpy(new_j+ai[row]+shift+nrow+CHUNKSIZE,aj+ai[row]+shift+nrow, \
91                                                            len*sizeof(int)); \
92         PetscMemcpy(new_a,aa,(ai[row]+nrow+shift)*sizeof(Scalar)); \
93         PetscMemcpy(new_a+ai[row]+shift+nrow+CHUNKSIZE,aa+ai[row]+shift+nrow, \
94                                                            len*sizeof(Scalar));  \
95         /* free up old matrix storage */ \
96  \
97         PetscFree(a->a);  \
98         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
99         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
100         a->singlemalloc = 1; \
101  \
102         rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
103         rmax = imax[row] = imax[row] + CHUNKSIZE; \
104         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
105         a->maxnz += CHUNKSIZE; \
106         a->reallocs++; \
107       } \
108       N = nrow++ - 1; a->nz++; \
109       /* shift up all the later entries in this row */ \
110       for ( ii=N; ii>=_i; ii-- ) { \
111         rp[ii+1] = rp[ii]; \
112         ap[ii+1] = ap[ii]; \
113       } \
114       rp[_i] = col1;  \
115       ap[_i] = value;  \
116       _noinsert: ; \
117       ailen[row] = nrow; \
118 }
119 
120 extern int MatSetValues_SeqAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
121 #undef __FUNC__
122 #define __FUNC__ "MatSetValues_MPIAIJ"
123 static int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
124 {
125   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
126   Scalar     value;
127   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
128   int        cstart = aij->cstart, cend = aij->cend,row,col;
129   int        roworiented = aij->roworiented;
130 
131   /* Some variables required by the MatSetValues_SeqAIJ_A_Private() macro */
132   Mat        A = aij->A;
133   Mat_SeqAIJ *a = (Mat_SeqAIJ *) A->data;
134   int        *rp,ii,nrow,_i,rmax, N, col1;
135   int        *imax = a->imax, *ai = a->i, *ailen = a->ilen;
136   int        *aj = a->j, nonew = a->nonew,shift = a->indexshift;
137   Scalar     *ap, *aa = a->a;
138 
139 #if defined(PETSC_BOPT_g)
140   if (aij->insertmode != NOT_SET_VALUES && aij->insertmode != addv) {
141     SETERRQ(1,0,"Cannot mix inserts and adds");
142   }
143 #endif
144   aij->insertmode = addv;
145   for ( i=0; i<m; i++ ) {
146 #if defined(PETSC_BOPT_g)
147     if (im[i] < 0) SETERRQ(1,0,"Negative row");
148     if (im[i] >= aij->M) SETERRQ(1,0,"Row too large");
149 #endif
150     if (im[i] >= rstart && im[i] < rend) {
151       row = im[i] - rstart;
152       for ( j=0; j<n; j++ ) {
153         if (in[j] >= cstart && in[j] < cend){
154           col = in[j] - cstart;
155           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
156           MatSetValues_SeqAIJ_A_Private(aij->A,row,col,value,addv);
157           /* ierr = MatSetValues_SeqAIJ(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
158         }
159 #if defined(PETSC_BOPT_g)
160         else if (in[j] < 0) {SETERRQ(1,0,"Negative column");}
161         else if (in[j] >= aij->N) {SETERRQ(1,0,"Col too large");}
162 #endif
163         else {
164           if (mat->was_assembled) {
165             if (!aij->colmap) {
166               ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
167             }
168             col = aij->colmap[in[j]] - 1;
169             if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
170               ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
171               col =  in[j];
172             }
173           }
174           else col = in[j];
175           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
176           ierr = MatSetValues_SeqAIJ(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
177         }
178       }
179     }
180     else {
181       if (roworiented && !aij->donotstash) {
182         ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
183       }
184       else {
185         if (!aij->donotstash) {
186           row = im[i];
187           for ( j=0; j<n; j++ ) {
188             ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
189           }
190         }
191       }
192     }
193   }
194   return 0;
195 }
196 
197 #undef __FUNC__
198 #define __FUNC__ "MatGetValues_MPIAIJ"
199 static int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
200 {
201   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
202   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
203   int        cstart = aij->cstart, cend = aij->cend,row,col;
204 
205   for ( i=0; i<m; i++ ) {
206     if (idxm[i] < 0) SETERRQ(1,0,"Negative row");
207     if (idxm[i] >= aij->M) SETERRQ(1,0,"Row too large");
208     if (idxm[i] >= rstart && idxm[i] < rend) {
209       row = idxm[i] - rstart;
210       for ( j=0; j<n; j++ ) {
211         if (idxn[j] < 0) SETERRQ(1,0,"Negative column");
212         if (idxn[j] >= aij->N) SETERRQ(1,0,"Col too large");
213         if (idxn[j] >= cstart && idxn[j] < cend){
214           col = idxn[j] - cstart;
215           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
216         }
217         else {
218           if (!aij->colmap) {
219             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
220           }
221           col = aij->colmap[idxn[j]] - 1;
222           if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
223           else {
224             ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
225           }
226         }
227       }
228     }
229     else {
230       SETERRQ(1,0,"Only local values currently supported");
231     }
232   }
233   return 0;
234 }
235 
236 #undef __FUNC__
237 #define __FUNC__ "MatAssemblyBegin_MPIAIJ"
238 static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
239 {
240   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
241   MPI_Comm    comm = mat->comm;
242   int         size = aij->size, *owners = aij->rowners;
243   int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
244   MPI_Request *send_waits,*recv_waits;
245   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
246   InsertMode  addv;
247   Scalar      *rvalues,*svalues;
248 
249   /* make sure all processors are in either INSERT_VALUES or ADD_VALUES mode */
250   MPI_Allreduce(&aij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
251   if (addv == (ADD_VALUES|INSERT_VALUES)) {
252     SETERRQ(1,0,"Some processors inserted others added");
253   }
254   aij->insertmode = addv; /* in case this processor had no cache */
255 
256   /*  first count number of contributors to each processor */
257   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
258   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
259   owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
260   for ( i=0; i<aij->stash.n; i++ ) {
261     idx = aij->stash.idx[i];
262     for ( j=0; j<size; j++ ) {
263       if (idx >= owners[j] && idx < owners[j+1]) {
264         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
265       }
266     }
267   }
268   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
269 
270   /* inform other processors of number of messages and max length*/
271   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
272   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
273   nreceives = work[rank];
274   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
275   nmax = work[rank];
276   PetscFree(work);
277 
278   /* post receives:
279        1) each message will consist of ordered triples
280      (global row index,global column index,value); the indices are stored
281      as Scalars to simplify the message passing.
282        2) since we don't know how long each individual message is we
283      allocate the largest needed buffer for each receive. Potentially
284      this is a lot of wasted space.
285 
286 
287        This could be done better.
288   */
289   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
290   CHKPTRQ(rvalues);
291   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
292   CHKPTRQ(recv_waits);
293   for ( i=0; i<nreceives; i++ ) {
294     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
295               comm,recv_waits+i);
296   }
297 
298   /* do sends:
299       1) starts[i] gives the starting index in svalues for stuff going to
300          the ith processor
301   */
302   svalues = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
303   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
304   CHKPTRQ(send_waits);
305   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
306   starts[0] = 0;
307   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
308   for ( i=0; i<aij->stash.n; i++ ) {
309     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
310     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
311     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
312   }
313   PetscFree(owner);
314   starts[0] = 0;
315   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
316   count = 0;
317   for ( i=0; i<size; i++ ) {
318     if (procs[i]) {
319       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
320                 comm,send_waits+count++);
321     }
322   }
323   PetscFree(starts); PetscFree(nprocs);
324 
325   /* Free cache space */
326   PLogInfo(mat,"MatAssemblyBegin_MPIAIJ:Number of off-processor values %d\n",aij->stash.n);
327   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
328 
329   aij->svalues    = svalues;    aij->rvalues    = rvalues;
330   aij->nsends     = nsends;     aij->nrecvs     = nreceives;
331   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
332   aij->rmax       = nmax;
333 
334   return 0;
335 }
336 extern int MatSetUpMultiply_MPIAIJ(Mat);
337 
338 #undef __FUNC__
339 #define __FUNC__ "MatAssemblyEnd_MPIAIJ"
340 static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
341 {
342   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
343   MPI_Status  *send_status,recv_status;
344   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
345   int         row,col,other_disassembled;
346   Scalar      *values,val;
347   InsertMode  addv = aij->insertmode;
348 
349   /*  wait on receives */
350   while (count) {
351     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
352     /* unpack receives into our local space */
353     values = aij->rvalues + 3*imdex*aij->rmax;
354     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
355     n = n/3;
356     for ( i=0; i<n; i++ ) {
357       row = (int) PetscReal(values[3*i]) - aij->rstart;
358       col = (int) PetscReal(values[3*i+1]);
359       val = values[3*i+2];
360       if (col >= aij->cstart && col < aij->cend) {
361         col -= aij->cstart;
362         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
363       }
364       else {
365         if (mat->was_assembled || mat->assembled) {
366           if (!aij->colmap) {
367             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
368           }
369           col = aij->colmap[col] - 1;
370           if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
371             ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
372             col = (int) PetscReal(values[3*i+1]);
373           }
374         }
375         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
376       }
377     }
378     count--;
379   }
380   PetscFree(aij->recv_waits); PetscFree(aij->rvalues);
381 
382   /* wait on sends */
383   if (aij->nsends) {
384     send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
385     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
386     PetscFree(send_status);
387   }
388   PetscFree(aij->send_waits); PetscFree(aij->svalues);
389 
390   aij->insertmode = NOT_SET_VALUES;
391   ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
392   ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);
393 
394   /* determine if any processor has disassembled; if so, we must
395      also disassemble ourselves so that we may reassemble. */
396   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
397   if (mat->was_assembled && !other_disassembled) {
398     ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
399   }
400 
401   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
402     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
403   }
404   ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
405   ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);
406 
407   if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
408   return 0;
409 }
410 
411 #undef __FUNC__
412 #define __FUNC__ "MatZeroEntries_MPIAIJ"
413 static int MatZeroEntries_MPIAIJ(Mat A)
414 {
415   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
416   int        ierr;
417   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
418   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
419   return 0;
420 }
421 
422 /* the code does not do the diagonal entries correctly unless the
423    matrix is square and the column and row ownerships are identical.
424    This is a BUG. The only way to fix it seems to be to access
425    aij->A and aij->B directly and not through the MatZeroRows()
426    routine.
427 */
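/*
   Each processor first ships the global row numbers it wants zeroed to the
   owning processors; every processor then zeros the rows it owns in both its
   diagonal (l->A) and off-diagonal (l->B) blocks.
*/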
428 #undef __FUNC__
429 #define __FUNC__ "MatZeroRows_MPIAIJ"
430 static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
431 {
432   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
433   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
434   int            *procs,*nprocs,j,found,idx,nsends,*work;
435   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
436   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
437   int            *lens,imdex,*lrows,*values;
438   MPI_Comm       comm = A->comm;
439   MPI_Request    *send_waits,*recv_waits;
440   MPI_Status     recv_status,*send_status;
441   IS             istmp;
442 
443   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
444   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
445 
446   /*  first count number of contributors to each processor */
447   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
448   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
449   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
450   for ( i=0; i<N; i++ ) {
451     idx = rows[i];
452     found = 0;
453     for ( j=0; j<size; j++ ) {
454       if (idx >= owners[j] && idx < owners[j+1]) {
455         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
456       }
457     }
458     if (!found) SETERRQ(1,0,"Index out of range");
459   }
460   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
461 
462   /* inform other processors of number of messages and max length*/
463   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
464   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
465   nrecvs = work[rank];
466   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
467   nmax = work[rank];
468   PetscFree(work);
469 
470   /* post receives:   */
471   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
472   CHKPTRQ(rvalues);
473   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
474   CHKPTRQ(recv_waits);
475   for ( i=0; i<nrecvs; i++ ) {
476     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
477   }
478 
479   /* do sends:
480       1) starts[i] gives the starting index in svalues for stuff going to
481          the ith processor
482   */
483   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
484   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
485   CHKPTRQ(send_waits);
486   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
487   starts[0] = 0;
488   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
489   for ( i=0; i<N; i++ ) {
490     svalues[starts[owner[i]]++] = rows[i];
491   }
492   ISRestoreIndices(is,&rows);
493 
494   starts[0] = 0;
495   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
496   count = 0;
497   for ( i=0; i<size; i++ ) {
498     if (procs[i]) {
499       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
500     }
501   }
502   PetscFree(starts);
503 
504   base = owners[rank];
505 
506   /*  wait on receives */
507   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
508   source = lens + nrecvs;
509   count  = nrecvs; slen = 0;
510   while (count) {
511     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
512     /* unpack receives into our local space */
513     MPI_Get_count(&recv_status,MPI_INT,&n);
514     source[imdex]  = recv_status.MPI_SOURCE;
515     lens[imdex]  = n;
516     slen += n;
517     count--;
518   }
519   PetscFree(recv_waits);
520 
521   /* move the data into the send scatter */
522   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
523   count = 0;
524   for ( i=0; i<nrecvs; i++ ) {
525     values = rvalues + i*nmax;
526     for ( j=0; j<lens[i]; j++ ) {
527       lrows[count++] = values[j] - base;
528     }
529   }
530   PetscFree(rvalues); PetscFree(lens);
531   PetscFree(owner); PetscFree(nprocs);
532 
533   /* actually zap the local rows */
534   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
535   PLogObjectParent(A,istmp);
536   PetscFree(lrows);
537   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
538   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
539   ierr = ISDestroy(istmp); CHKERRQ(ierr);
540 
541   /* wait on sends */
542   if (nsends) {
543     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
544     CHKPTRQ(send_status);
545     MPI_Waitall(nsends,send_waits,send_status);
546     PetscFree(send_status);
547   }
548   PetscFree(send_waits); PetscFree(svalues);
549 
550   return 0;
551 }
552 
553 #undef __FUNC__
554 #define __FUNC__ "MatMult_MPIAIJ"
555 static int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
556 {
557   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
558   int        ierr,nt;
559 
560   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
561   if (nt != a->n) {
562     SETERRQ(1,0,"Incompatible partition of A and xx");
563   }
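  /* overlap the scatter of off-processor entries of xx with the diagonal-block multiply */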
564   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
565   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
566   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
567   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
568   return 0;
569 }
570 
571 #undef __FUNC__
572 #define __FUNC__ "MatMultAdd_MPIAIJ"
573 static int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
574 {
575   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
576   int        ierr;
577   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
578   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
579   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
580   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
581   return 0;
582 }
583 
584 #undef __FUNC__
585 #define __FUNC__ "MatMultTrans_MPIAIJ"
586 static int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
587 {
588   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
589   int        ierr;
590 
591   /* do nondiagonal part */
592   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
593   /* send it on its way */
594   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
595   /* do local part */
596   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
597   /* receive remote parts: note this assumes the values are not actually */
598   /* inserted in yy until the next line, which is true for my implementation*/
599   /* but may not always be true. */
600   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
601   return 0;
602 }
603 
604 #undef __FUNC__
605 #define __FUNC__ "MatMultTransAdd_MPIAIJ"
606 static int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
607 {
608   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
609   int        ierr;
610 
611   /* do nondiagonal part */
612   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
613   /* send it on its way */
614   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
615   /* do local part */
616   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
617   /* receive remote parts: note this assumes the values are not actually */
618   /* inserted in zz until the next line, which is true for my implementation*/
619   /* but may not always be true. */
620   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
621   return 0;
622 }
623 
624 /*
625   This only works correctly for square matrices where the subblock A->A is the
626    diagonal block
627 */
628 #undef __FUNC__
629 #define __FUNC__ "MatGetDiagonal_MPIAIJ"
630 static int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
631 {
632   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
633   if (a->M != a->N)
634     SETERRQ(1,0,"Supports only square matrix where A->A is diag block");
635   if (a->rstart != a->cstart || a->rend != a->cend) {
636     SETERRQ(1,0,"row partition must equal col partition");  }
637   return MatGetDiagonal(a->A,v);
638 }
639 
640 #undef __FUNC__
641 #define __FUNC__ "MatScale_MPIAIJ"
642 static int MatScale_MPIAIJ(Scalar *aa,Mat A)
643 {
644   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
645   int        ierr;
646   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
647   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
648   return 0;
649 }
650 
651 #undef __FUNC__
652 #define __FUNC__ "MatDestroy_MPIAIJ"
653 static int MatDestroy_MPIAIJ(PetscObject obj)
654 {
655   Mat        mat = (Mat) obj;
656   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
657   int        ierr;
658 #if defined(PETSC_LOG)
659   PLogObjectState(obj,"Rows=%d, Cols=%d",aij->M,aij->N);
660 #endif
661   PetscFree(aij->rowners);
662   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
663   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
664   if (aij->colmap) PetscFree(aij->colmap);
665   if (aij->garray) PetscFree(aij->garray);
666   if (aij->lvec)   VecDestroy(aij->lvec);
667   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
668   if (aij->rowvalues) PetscFree(aij->rowvalues);
669   PetscFree(aij);
670   if (mat->mapping) {
671     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
672   }
673   PLogObjectDestroy(mat);
674   PetscHeaderDestroy(mat);
675   return 0;
676 }
677 #include "draw.h"
678 #include "pinclude/pviewer.h"
679 
680 #undef __FUNC__
681 #define __FUNC__ "MatView_MPIAIJ_Binary"
682 static int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
683 {
684   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
685   int         ierr;
686 
687   if (aij->size == 1) {
688     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
689   }
690   else SETERRQ(1,0,"Only uniprocessor output supported");
691   return 0;
692 }
693 
694 #undef __FUNC__
695 #define __FUNC__ "MatView_MPIAIJ_ASCIIorDraworMatlab"
696 static int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
697 {
698   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
699   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
700   int         ierr, format,shift = C->indexshift,rank;
701   FILE        *fd;
702   ViewerType  vtype;
703 
704   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
705   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
706     ierr = ViewerGetFormat(viewer,&format);
707     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
708       MatInfo info;
709       int     flg;
710       MPI_Comm_rank(mat->comm,&rank);
711       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
712       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
713       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
714       PetscSequentialPhaseBegin(mat->comm,1);
715       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
716          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
717       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
718          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
719       ierr = MatGetInfo(aij->A,MAT_LOCAL,&info);
720       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used);
721       ierr = MatGetInfo(aij->B,MAT_LOCAL,&info);
722       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used);
723       fflush(fd);
724       PetscSequentialPhaseEnd(mat->comm,1);
725       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
726       return 0;
727     }
728     else if (format == VIEWER_FORMAT_ASCII_INFO) {
729       return 0;
730     }
731   }
732 
733   if (vtype == DRAW_VIEWER) {
734     Draw       draw;
735     PetscTruth isnull;
736     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
737     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
738   }
739 
740   if (vtype == ASCII_FILE_VIEWER) {
741     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
742     PetscSequentialPhaseBegin(mat->comm,1);
743     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
744            aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
745            aij->cend);
746     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
747     ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
748     fflush(fd);
749     PetscSequentialPhaseEnd(mat->comm,1);
750   }
751   else {
752     int size = aij->size;
753     rank = aij->rank;
754     if (size == 1) {
755       ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
756     }
757     else {
758       /* assemble the entire matrix onto first processor. */
759       Mat         A;
760       Mat_SeqAIJ *Aloc;
761       int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
762       Scalar      *a;
763 
764       if (!rank) {
765         ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
766                CHKERRQ(ierr);
767       }
768       else {
769         ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
770                CHKERRQ(ierr);
771       }
772       PLogObjectParent(mat,A);
773 
774       /* copy over the A part */
775       Aloc = (Mat_SeqAIJ*) aij->A->data;
776       m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
777       row = aij->rstart;
778       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
779       for ( i=0; i<m; i++ ) {
780         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
781         row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
782       }
783       aj = Aloc->j;
784       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
785 
786       /* copy over the B part */
787       Aloc = (Mat_SeqAIJ*) aij->B->data;
788       m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
789       row = aij->rstart;
790       ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
791       for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
792       for ( i=0; i<m; i++ ) {
793         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
794         row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
795       }
796       PetscFree(ct);
797       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
798       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
799       if (!rank) {
800         ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
801       }
802       ierr = MatDestroy(A); CHKERRQ(ierr);
803     }
804   }
805   return 0;
806 }
807 
808 #undef __FUNC__
809 #define __FUNC__ "MatView_MPIAIJ"
810 static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
811 {
812   Mat         mat = (Mat) obj;
813   int         ierr;
814   ViewerType  vtype;
815 
816   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
817   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
818       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
819     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
820   }
821   else if (vtype == BINARY_FILE_VIEWER) {
822     return MatView_MPIAIJ_Binary(mat,viewer);
823   }
824   return 0;
825 }
826 
827 /*
828     This has to provide several versions.
829 
830      2) a) use only local smoothing, updating outer values only once.
831         b) local smoothing, updating outer values each inner iteration.
832      3) coloring, updating outer values between colors.
833 */
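/*
   In the local variants below the off-processor values of xx are scattered
   once into mat->lvec and then held fixed for all the inner iterations of
   the sweep.
*/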
834 #undef __FUNC__
835 #define __FUNC__ "MatRelax_MPIAIJ"
836 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
837                            double fshift,int its,Vec xx)
838 {
839   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
840   Mat        AA = mat->A, BB = mat->B;
841   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
842   Scalar     *b,*x,*xs,*ls,d,*v,sum;
843   int        ierr,*idx, *diag;
844   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
845 
846   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
847   xs = x + shift; /* shift by one for index start of 1 */
848   ls = ls + shift;
849   if (!A->diag) {if ((ierr = MatMarkDiag_SeqAIJ(AA))) return ierr;}
850   diag = A->diag;
851   if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
852     if (flag & SOR_ZERO_INITIAL_GUESS) {
853       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
854     }
855     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);
856     CHKERRQ(ierr);
857     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);
858     CHKERRQ(ierr);
859     while (its--) {
860       /* go down through the rows */
861       for ( i=0; i<m; i++ ) {
862         n    = A->i[i+1] - A->i[i];
863         idx  = A->j + A->i[i] + shift;
864         v    = A->a + A->i[i] + shift;
865         sum  = b[i];
866         SPARSEDENSEMDOT(sum,xs,v,idx,n);
867         d    = fshift + A->a[diag[i]+shift];
868         n    = B->i[i+1] - B->i[i];
869         idx  = B->j + B->i[i] + shift;
870         v    = B->a + B->i[i] + shift;
871         SPARSEDENSEMDOT(sum,ls,v,idx,n);
872         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
873       }
874       /* come up through the rows */
875       for ( i=m-1; i>-1; i-- ) {
876         n    = A->i[i+1] - A->i[i];
877         idx  = A->j + A->i[i] + shift;
878         v    = A->a + A->i[i] + shift;
879         sum  = b[i];
880         SPARSEDENSEMDOT(sum,xs,v,idx,n);
881         d    = fshift + A->a[diag[i]+shift];
882         n    = B->i[i+1] - B->i[i];
883         idx  = B->j + B->i[i] + shift;
884         v    = B->a + B->i[i] + shift;
885         SPARSEDENSEMDOT(sum,ls,v,idx,n);
886         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
887       }
888     }
889   }
890   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
891     if (flag & SOR_ZERO_INITIAL_GUESS) {
892       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
893     }
894     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);
895     CHKERRQ(ierr);
896     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);
897     CHKERRQ(ierr);
898     while (its--) {
899       for ( i=0; i<m; i++ ) {
900         n    = A->i[i+1] - A->i[i];
901         idx  = A->j + A->i[i] + shift;
902         v    = A->a + A->i[i] + shift;
903         sum  = b[i];
904         SPARSEDENSEMDOT(sum,xs,v,idx,n);
905         d    = fshift + A->a[diag[i]+shift];
906         n    = B->i[i+1] - B->i[i];
907         idx  = B->j + B->i[i] + shift;
908         v    = B->a + B->i[i] + shift;
909         SPARSEDENSEMDOT(sum,ls,v,idx,n);
910         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
911       }
912     }
913   }
914   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
915     if (flag & SOR_ZERO_INITIAL_GUESS) {
916       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
917     }
918     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,
919                             mat->Mvctx); CHKERRQ(ierr);
920     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,
921                             mat->Mvctx); CHKERRQ(ierr);
922     while (its--) {
923       for ( i=m-1; i>-1; i-- ) {
924         n    = A->i[i+1] - A->i[i];
925         idx  = A->j + A->i[i] + shift;
926         v    = A->a + A->i[i] + shift;
927         sum  = b[i];
928         SPARSEDENSEMDOT(sum,xs,v,idx,n);
929         d    = fshift + A->a[diag[i]+shift];
930         n    = B->i[i+1] - B->i[i];
931         idx  = B->j + B->i[i] + shift;
932         v    = B->a + B->i[i] + shift;
933         SPARSEDENSEMDOT(sum,ls,v,idx,n);
934         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
935       }
936     }
937   }
938   else {
939     SETERRQ(1,0,"Option not supported");
940   }
941   return 0;
942 }
943 
944 #undef __FUNC__
945 #define __FUNC__ "MatGetInfo_MPIAIJ"
946 static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,MatInfo *info)
947 {
948   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
949   Mat        A = mat->A, B = mat->B;
950   int        ierr;
951   double     isend[5], irecv[5];
952 
953   info->rows_global    = (double)mat->M;
954   info->columns_global = (double)mat->N;
955   info->rows_local     = (double)mat->m;
956   info->columns_local  = (double)mat->N;
957   info->block_size     = 1.0;
958   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
959   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
960   isend[3] = info->memory;  isend[4] = info->mallocs;
961   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
962   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
963   isend[3] += info->memory;  isend[4] += info->mallocs;
964   if (flag == MAT_LOCAL) {
965     info->nz_used      = isend[0];
966     info->nz_allocated = isend[1];
967     info->nz_unneeded  = isend[2];
968     info->memory       = isend[3];
969     info->mallocs      = isend[4];
970   } else if (flag == MAT_GLOBAL_MAX) {
971     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
972     info->nz_used      = irecv[0];
973     info->nz_allocated = irecv[1];
974     info->nz_unneeded  = irecv[2];
975     info->memory       = irecv[3];
976     info->mallocs      = irecv[4];
977   } else if (flag == MAT_GLOBAL_SUM) {
978     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
979     info->nz_used      = irecv[0];
980     info->nz_allocated = irecv[1];
981     info->nz_unneeded  = irecv[2];
982     info->memory       = irecv[3];
983     info->mallocs      = irecv[4];
984   }
985   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
986   info->fill_ratio_needed = 0;
987   info->factor_mallocs    = 0;
988 
989   return 0;
990 }
991 
992 extern int MatLUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,Mat*);
993 extern int MatLUFactorNumeric_MPIAIJ(Mat,Mat*);
994 extern int MatLUFactor_MPIAIJ(Mat,IS,IS,double);
995 extern int MatILUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,int,Mat *);
996 extern int MatSolve_MPIAIJ(Mat,Vec,Vec);
997 extern int MatSolveAdd_MPIAIJ(Mat,Vec,Vec,Vec);
998 extern int MatSolveTrans_MPIAIJ(Mat,Vec,Vec);
999 extern int MatSolveTransAdd_MPIAIJ(Mat,Vec,Vec,Vec);
1000 
1001 #undef __FUNC__
1002 #define __FUNC__ "MatSetOption_MPIAIJ"
1003 static int MatSetOption_MPIAIJ(Mat A,MatOption op)
1004 {
1005   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1006 
1007   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1008       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1009       op == MAT_COLUMNS_UNSORTED ||
1010       op == MAT_COLUMNS_SORTED) {
1011         MatSetOption(a->A,op);
1012         MatSetOption(a->B,op);
1013   } else if (op == MAT_ROW_ORIENTED) {
1014     a->roworiented = 1;
1015     MatSetOption(a->A,op);
1016     MatSetOption(a->B,op);
1017   } else if (op == MAT_ROWS_SORTED ||
1018              op == MAT_ROWS_UNSORTED ||
1019              op == MAT_SYMMETRIC ||
1020              op == MAT_STRUCTURALLY_SYMMETRIC ||
1021              op == MAT_YES_NEW_DIAGONALS)
1022     PLogInfo(A,"Info:MatSetOption_MPIAIJ:Option ignored\n");
1023   else if (op == MAT_COLUMN_ORIENTED) {
1024     a->roworiented = 0;
1025     MatSetOption(a->A,op);
1026     MatSetOption(a->B,op);
1027   } else if (op == MAT_IGNORE_OFF_PROCESSOR_ENTRIES) {
1028     a->donotstash = 1;
1029   } else if (op == MAT_NO_NEW_DIAGONALS)
1030     {SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");}
1031   else
1032     {SETERRQ(PETSC_ERR_SUP,0,"unknown option");}
1033   return 0;
1034 }
1035 
1036 #undef __FUNC__
1037 #define __FUNC__ "MatGetSize_MPIAIJ"
1038 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
1039 {
1040   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1041   *m = mat->M; *n = mat->N;
1042   return 0;
1043 }
1044 
1045 #undef __FUNC__
1046 #define __FUNC__ "MatGetLocalSize_MPIAIJ"
1047 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
1048 {
1049   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1050   *m = mat->m; *n = mat->N;
1051   return 0;
1052 }
1053 
1054 #undef __FUNC__
1055 #define __FUNC__ "MatGetOwnershipRange_MPIAIJ"
1056 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
1057 {
1058   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1059   *m = mat->rstart; *n = mat->rend;
1060   return 0;
1061 }
1062 
1063 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1064 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1065 
1066 #undef __FUNC__
1067 #define __FUNC__ "MatGetRow_MPIAIJ"
1068 int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1069 {
1070   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1071   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
1072   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
1073   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
1074   int        *cmap, *idx_p;
1075 
1076   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,0,"Already active");
1077   mat->getrowactive = PETSC_TRUE;
1078 
1079   if (!mat->rowvalues && (idx || v)) {
1080     /*
1081         allocate enough space to hold information from the longest row.
1082     */
1083     Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
1084     int     max = 1,m = mat->m,tmp;
1085     for ( i=0; i<m; i++ ) {
1086       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1087       if (max < tmp) { max = tmp; }
1088     }
1089     mat->rowvalues = (Scalar *) PetscMalloc( max*(sizeof(int)+sizeof(Scalar)));
1090     CHKPTRQ(mat->rowvalues);
1091     mat->rowindices = (int *) (mat->rowvalues + max);
1092   }
1093 
1094   if (row < rstart || row >= rend) SETERRQ(1,0,"Only local rows");
1095   lrow = row - rstart;
1096 
1097   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1098   if (!v)   {pvA = 0; pvB = 0;}
1099   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1100   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1101   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1102   nztot = nzA + nzB;
1103 
1104   cmap  = mat->garray;
1105   if (v  || idx) {
1106     if (nztot) {
1107       /* Sort by increasing column numbers, assuming A and B already sorted */
1108       int imark = -1;
1109       if (v) {
1110         *v = v_p = mat->rowvalues;
1111         for ( i=0; i<nzB; i++ ) {
1112           if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
1113           else break;
1114         }
1115         imark = i;
1116         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1117         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1118       }
1119       if (idx) {
1120         *idx = idx_p = mat->rowindices;
1121         if (imark > -1) {
1122           for ( i=0; i<imark; i++ ) {
1123             idx_p[i] = cmap[cworkB[i]];
1124           }
1125         } else {
1126           for ( i=0; i<nzB; i++ ) {
1127             if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
1128             else break;
1129           }
1130           imark = i;
1131         }
1132         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
1133         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
1134       }
1135     }
1136     else {
1137       if (idx) *idx = 0;
1138       if (v)   *v   = 0;
1139     }
1140   }
1141   *nz = nztot;
1142   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1143   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1144   return 0;
1145 }
1146 
1147 #undef __FUNC__
1148 #define __FUNC__ "MatRestoreRow_MPIAIJ"
1149 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1150 {
1151   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1152   if (aij->getrowactive == PETSC_FALSE) {
1153     SETERRQ(1,0,"MatGetRow not called");
1154   }
1155   aij->getrowactive = PETSC_FALSE;
1156   return 0;
1157 }
1158 
1159 #undef __FUNC__
1160 #define __FUNC__ "MatNorm_MPIAIJ"
1161 static int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
1162 {
1163   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1164   Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
1165   int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
1166   double     sum = 0.0;
1167   Scalar     *v;
1168 
1169   if (aij->size == 1) {
1170     ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
1171   } else {
1172     if (type == NORM_FROBENIUS) {
1173       v = amat->a;
1174       for (i=0; i<amat->nz; i++ ) {
1175 #if defined(PETSC_COMPLEX)
1176         sum += real(conj(*v)*(*v)); v++;
1177 #else
1178         sum += (*v)*(*v); v++;
1179 #endif
1180       }
1181       v = bmat->a;
1182       for (i=0; i<bmat->nz; i++ ) {
1183 #if defined(PETSC_COMPLEX)
1184         sum += real(conj(*v)*(*v)); v++;
1185 #else
1186         sum += (*v)*(*v); v++;
1187 #endif
1188       }
1189       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
1190       *norm = sqrt(*norm);
1191     }
1192     else if (type == NORM_1) { /* max column norm */
1193       double *tmp, *tmp2;
1194       int    *jj, *garray = aij->garray;
1195       tmp  = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp);
1196       tmp2 = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp2);
1197       PetscMemzero(tmp,aij->N*sizeof(double));
1198       *norm = 0.0;
1199       v = amat->a; jj = amat->j;
1200       for ( j=0; j<amat->nz; j++ ) {
1201         tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
1202       }
1203       v = bmat->a; jj = bmat->j;
1204       for ( j=0; j<bmat->nz; j++ ) {
1205         tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
1206       }
1207       MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);
1208       for ( j=0; j<aij->N; j++ ) {
1209         if (tmp2[j] > *norm) *norm = tmp2[j];
1210       }
1211       PetscFree(tmp); PetscFree(tmp2);
1212     }
1213     else if (type == NORM_INFINITY) { /* max row norm */
1214       double ntemp = 0.0;
1215       for ( j=0; j<amat->m; j++ ) {
1216         v = amat->a + amat->i[j] + shift;
1217         sum = 0.0;
1218         for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
1219           sum += PetscAbsScalar(*v); v++;
1220         }
1221         v = bmat->a + bmat->i[j] + shift;
1222         for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
1223           sum += PetscAbsScalar(*v); v++;
1224         }
1225         if (sum > ntemp) ntemp = sum;
1226       }
1227       MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);
1228     }
1229     else {
1230       SETERRQ(1,0,"No support for two norm");
1231     }
1232   }
1233   return 0;
1234 }
1235 
1236 #undef __FUNC__
1237 #define __FUNC__ "MatTranspose_MPIAIJ"
1238 static int MatTranspose_MPIAIJ(Mat A,Mat *matout)
1239 {
1240   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1241   Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
1242   int        ierr,shift = Aloc->indexshift;
1243   Mat        B;
1244   int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
1245   Scalar     *array;
1246 
1247   if (matout == PETSC_NULL && M != N)
1248     SETERRQ(1,0,"Square matrix only for in-place");
1249   ierr = MatCreateMPIAIJ(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,
1250          PETSC_NULL,&B); CHKERRQ(ierr);
1251 
1252   /* copy over the A part */
1253   Aloc = (Mat_SeqAIJ*) a->A->data;
1254   m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1255   row = a->rstart;
1256   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
1257   for ( i=0; i<m; i++ ) {
1258     ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1259     row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
1260   }
1261   aj = Aloc->j;
1262   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}
1263 
1264   /* copy over the B part */
1265   Aloc = (Mat_SeqAIJ*) a->B->data;
1266   m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1267   row = a->rstart;
1268   ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
1269   for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
1270   for ( i=0; i<m; i++ ) {
1271     ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1272     row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
1273   }
1274   PetscFree(ct);
1275   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1276   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1277   if (matout != PETSC_NULL) {
1278     *matout = B;
1279   } else {
1280     /* This isn't really an in-place transpose ... but it frees the data structures from a */
1281     PetscFree(a->rowners);
1282     ierr = MatDestroy(a->A); CHKERRQ(ierr);
1283     ierr = MatDestroy(a->B); CHKERRQ(ierr);
1284     if (a->colmap) PetscFree(a->colmap);
1285     if (a->garray) PetscFree(a->garray);
1286     if (a->lvec) VecDestroy(a->lvec);
1287     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
1288     PetscFree(a);
1289     PetscMemcpy(A,B,sizeof(struct _Mat));
1290     PetscHeaderDestroy(B);
1291   }
1292   return 0;
1293 }
1294 
1295 #undef __FUNC__
1296 #define __FUNC__ "MatDiagonalScale_MPIAIJ"
1297 int MatDiagonalScale_MPIAIJ(Mat mat,Vec ll,Vec rr)
1298 {
1299   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1300   Mat a = aij->A, b = aij->B;
1301   int ierr,s1,s2,s3;
1302 
1303   ierr = MatGetLocalSize(mat,&s2,&s3); CHKERRQ(ierr);
1304   if (rr) {
1305     s3 = aij->n;
1306     VecGetLocalSize_Fast(rr,s1);
1307     if (s1!=s3) SETERRQ(1,0,"right vector non-conforming local size");
1308     /* Overlap communication with computation. */
1309     ierr = VecScatterBegin(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1310   }
1311   if (ll) {
1312     VecGetLocalSize_Fast(ll,s1);
1313     if (s1!=s2) SETERRQ(1,0,"left vector non-conforming local size");
1314     ierr = (*b->ops.diagonalscale)(b,ll,0); CHKERRQ(ierr);
1315   }
1316   /* scale the diagonal block */
1317   ierr = (*a->ops.diagonalscale)(a,ll,rr); CHKERRQ(ierr);
1318 
1319   if (rr) {
1320     /* Do a scatter end and then right scale the off-diagonal block */
1321     ierr = VecScatterEnd(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1322     ierr = (*b->ops.diagonalscale)(b,0,aij->lvec); CHKERRQ(ierr);
1323   }
1324 
1325   return 0;
1326 }
1327 
1328 
1329 extern int MatPrintHelp_SeqAIJ(Mat);
1330 #undef __FUNC__
1331 #define __FUNC__ "MatPrintHelp_MPIAIJ"
1332 static int MatPrintHelp_MPIAIJ(Mat A)
1333 {
1334   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1335 
1336   if (!a->rank) return MatPrintHelp_SeqAIJ(a->A);
1337   else return 0;
1338 }
1339 
1340 #undef __FUNC__
1341 #define __FUNC__ "MatGetBlockSize_MPIAIJ"
1342 static int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1343 {
1344   *bs = 1;
1345   return 0;
1346 }
1347 #undef __FUNC__
1348 #define __FUNC__ "MatSetUnfactored_MPIAIJ"
1349 static int MatSetUnfactored_MPIAIJ(Mat A)
1350 {
1351   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1352   int        ierr;
1353   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1354   return 0;
1355 }
1356 
1357 static int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1358 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1359 extern int MatFDColoringCreate_MPIAIJ(Mat,ISColoring,MatFDColoring);
1360 extern int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatGetSubMatrixCall,Mat **);
1361 /* -------------------------------------------------------------------*/
1362 static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
1363        MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
1364        MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
1365        MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
1366        0,0,
1367        0,0,
1368        0,0,
1369        MatRelax_MPIAIJ,
1370        MatTranspose_MPIAIJ,
1371        MatGetInfo_MPIAIJ,0,
1372        MatGetDiagonal_MPIAIJ,MatDiagonalScale_MPIAIJ,MatNorm_MPIAIJ,
1373        MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
1374        0,
1375        MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
1376        0,0,0,0,
1377        MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
1378        0,0,
1379        0,0,MatConvertSameType_MPIAIJ,0,0,
1380        0,0,0,
1381        MatGetSubMatrices_MPIAIJ,MatIncreaseOverlap_MPIAIJ,MatGetValues_MPIAIJ,0,
1382        MatPrintHelp_MPIAIJ,
1383        MatScale_MPIAIJ,0,0,0,
1384        MatGetBlockSize_MPIAIJ,0,0,0,0,
1385        MatFDColoringCreate_MPIAIJ,0,MatSetUnfactored_MPIAIJ};
1386 
1387 
1388 #undef __FUNC__
1389 #define __FUNC__ "MatCreateMPIAIJ"
1390 /*@C
1391    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1392    (the default parallel PETSc format).  For good matrix assembly performance
1393    the user should preallocate the matrix storage by setting the parameters
1394    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1395    performance can be increased by more than a factor of 50.
1396 
1397    Input Parameters:
1398 .  comm - MPI communicator
1399 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given).
1400            This value should be the same as the local size used in creating the
1401            y vector for the matrix-vector product y = Ax.
1402 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given).
1403            This value should be the same as the local size used in creating the
1404            x vector for the matrix-vector product y = Ax.
1405 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
1406 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
1407 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1408            (same for all local rows)
1409 .  d_nnz - array containing the number of nonzeros in the various rows of the
1410            diagonal portion of the local submatrix (possibly different for each row)
1411            or PETSC_NULL. You must leave room for the diagonal entry even if
1412            it is zero.
1413 .  o_nz - number of nonzeros per row in the off-diagonal portion of local
1414            submatrix (same for all local rows).
1415 .  o_nnz - array containing the number of nonzeros in the various rows of the
1416            off-diagonal portion of the local submatrix (possibly different for
1417            each row) or PETSC_NULL.
1418 
1419    Output Parameter:
1420 .  A - the matrix
1421 
1422    Notes:
1423    The AIJ format (also called the Yale sparse matrix format or
1424    compressed row storage), is fully compatible with standard Fortran 77
1425    storage.  That is, the stored row and column indices can begin at
1426    either one (as in Fortran) or zero.  See the users manual for details.
1427 
1428    The user MUST specify either the local or global matrix dimensions
1429    (possibly both).
1430 
1431    By default, this format uses inodes (identical nodes) when possible.
1432    We search for consecutive rows with the same nonzero structure, thereby
1433    reusing matrix information to achieve increased efficiency.
1434 
1435    Options Database Keys:
1436 $    -mat_aij_no_inode  - Do not use inodes
1437 $    -mat_aij_inode_limit <limit> - Set inode limit.
1438 $        (max limit=5)
1439 $    -mat_aij_oneindex - Internally use indexing starting at 1
1440 $        rather than 0.  Note: When calling MatSetValues(),
1441 $        the user still MUST index entries starting at 0!
1442 
1443    Storage Information:
1444    For a square global matrix we define each processor's diagonal portion
1445    to be its local rows and the corresponding columns (a square submatrix);
1446    each processor's off-diagonal portion encompasses the remainder of the
1447    local matrix (a rectangular submatrix).
1448 
1449    The user can specify preallocated storage for the diagonal part of
1450    the local submatrix with either d_nz or d_nnz (not both).  Set
1451    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1452    memory allocation.  Likewise, specify preallocated storage for the
1453    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1454 
1455    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1456    the figure below we depict these three local rows and all columns (0-11).
1457 
1458 $          0 1 2 3 4 5 6 7 8 9 10 11
1459 $         -------------------
1460 $  row 3  |  o o o d d d o o o o o o
1461 $  row 4  |  o o o d d d o o o o o o
1462 $  row 5  |  o o o d d d o o o o o o
1463 $         -------------------
1464 $
1465 
1466    Thus, any entries in the d locations are stored in the d (diagonal)
1467    submatrix, and any entries in the o locations are stored in the
1468    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1469    stored simply in the MATSEQAIJ format for compressed row storage.
1470 
1471    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1472    and o_nz should indicate the number of nonzeros per row in the o matrix.
1473    In general, for PDE problems in which most nonzeros are near the diagonal,
1474    one expects d_nz >> o_nz. For large problems you MUST preallocate memory
1475    or you will get TERRIBLE performance; see the users' manual chapter on
1476    matrices.
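
   Example:
   A minimal sketch of the call made by the process depicted above, assuming
   the global matrix is 12 by 12 (four processes with three rows apiece) and
   that every d and o location is actually nonzero on this process:
$      int  d_nnz[3] = {3,3,3}, o_nnz[3] = {9,9,9};
$      Mat  A;
$      ierr = MatCreateMPIAIJ(comm,3,3,12,12,0,d_nnz,0,o_nnz,&A); CHKERRQ(ierr);
   Since all three rows have identical counts here, the scalar form
$      ierr = MatCreateMPIAIJ(comm,3,3,12,12,3,PETSC_NULL,9,PETSC_NULL,&A); CHKERRQ(ierr);
   preallocates the same storage.  Every process must make a matching call
   with its own local sizes and nonzero counts.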
1477 
1478 .keywords: matrix, aij, compressed row, sparse, parallel
1479 
1480 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1481 @*/
1482 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
1483                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1484 {
1485   Mat          B;
1486   Mat_MPIAIJ   *b;
1487   int          ierr, i,sum[2],work[2],size;
1488 
1489   *A = 0;
1490   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
1491   PLogObjectCreate(B);
1492   B->data       = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
1493   PetscMemzero(b,sizeof(Mat_MPIAIJ));
1494   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1495   MPI_Comm_size(comm,&size);
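  /*
     The factorization and triangular solve operations below are installed
     only in the uniprocessor case; with more than one process they are left
     unset, since no parallel LU/ILU is provided by this matrix type.
  */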
1496   if (size == 1) {
1497     B->ops.getrowij          = MatGetRowIJ_MPIAIJ;
1498     B->ops.restorerowij      = MatRestoreRowIJ_MPIAIJ;
1499     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIAIJ;
1500     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIAIJ;
1501     B->ops.lufactor          = MatLUFactor_MPIAIJ;
1502     B->ops.solve             = MatSolve_MPIAIJ;
1503     B->ops.solveadd          = MatSolveAdd_MPIAIJ;
1504     B->ops.solvetrans        = MatSolveTrans_MPIAIJ;
1505     B->ops.solvetransadd     = MatSolveTransAdd_MPIAIJ;
1506     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIAIJ;
1507   }
1508   B->destroy    = MatDestroy_MPIAIJ;
1509   B->view       = MatView_MPIAIJ;
1510   B->factor     = 0;
1511   B->assembled  = PETSC_FALSE;
1512   B->mapping    = 0;
1513 
1514   b->insertmode = NOT_SET_VALUES;
1515   MPI_Comm_rank(comm,&b->rank);
1516   MPI_Comm_size(comm,&b->size);
1517 
1518   if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1519     SETERRQ(1,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1520 
1521   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1522     work[0] = m; work[1] = n;
1523     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1524     if (M == PETSC_DECIDE) M = sum[0];
1525     if (N == PETSC_DECIDE) N = sum[1];
1526   }
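  /* default decomposition: each process gets M/size rows, with the first
     M % size processes taking one extra (and similarly for the columns) */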
1527   if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
1528   if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
1529   b->m = m; B->m = m;
1530   b->n = n; B->n = n;
1531   b->N = N; B->N = N;
1532   b->M = M; B->M = M;
1533 
1534   /* build local table of row and column ownerships */
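  /*
     rowners and cowners share one allocation of size+2 entries apiece.
     After the Allgather and the running sum below, rowners[i] is the first
     global row owned by process i (so rowners[size] == M) and this process
     owns rows rstart through rend-1; cowners describes the column
     distribution in the same way.
  */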
1535   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1536   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1537   b->cowners = b->rowners + b->size + 2;
1538   MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1539   b->rowners[0] = 0;
1540   for ( i=2; i<=b->size; i++ ) {
1541     b->rowners[i] += b->rowners[i-1];
1542   }
1543   b->rstart = b->rowners[b->rank];
1544   b->rend   = b->rowners[b->rank+1];
1545   MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1546   b->cowners[0] = 0;
1547   for ( i=2; i<=b->size; i++ ) {
1548     b->cowners[i] += b->cowners[i-1];
1549   }
1550   b->cstart = b->cowners[b->rank];
1551   b->cend   = b->cowners[b->rank+1];
1552 
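  /*
     b->A is the "diagonal" block (the local rows restricted to the locally
     owned columns) and b->B holds the off-diagonal part of the local rows.
     B is created with the full global column width N here; the columns it
     actually uses are mapped to a compact local numbering (the garray and
     colmap fields below) when the matrix is assembled.  When the caller
     passes PETSC_DEFAULT, the defaults are 5 preallocated nonzeros per row
     for A and none for B, which then grows dynamically as values are set.
  */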
1553   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1554   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1555   PLogObjectParent(B,b->A);
1556   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1557   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1558   PLogObjectParent(B,b->B);
1559 
1560   /* build stash to cache entries destined for rows owned by other processors */
1561   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1562   b->donotstash  = 0;
1563   b->colmap      = 0;
1564   b->garray      = 0;
1565   b->roworiented = 1;
1566 
1567   /* stuff used for matrix vector multiply */
1568   b->lvec      = 0;
1569   b->Mvctx     = 0;
1570 
1571   /* stuff for MatGetRow() */
1572   b->rowindices   = 0;
1573   b->rowvalues    = 0;
1574   b->getrowactive = PETSC_FALSE;
1575 
1576   *A = B;
1577   return 0;
1578 }
1579 
1580 #undef __FUNC__
1581 #define __FUNC__ "MatConvertSameType_MPIAIJ"
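/*
   Produces a complete copy of an assembled MPIAIJ matrix: the parallel
   layout, ownership tables, scatter context (lvec and Mvctx) and the two
   sequential blocks A and B are all duplicated.  Note that the values are
   always copied by the MatConvert(...,MATSAME,...) calls below, regardless
   of the cpvalues argument.
*/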
1582 static int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
1583 {
1584   Mat        mat;
1585   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1586   int        ierr, len=0, flg;
1587 
1588   *newmat       = 0;
1589   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1590   PLogObjectCreate(mat);
1591   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1592   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1593   mat->destroy    = MatDestroy_MPIAIJ;
1594   mat->view       = MatView_MPIAIJ;
1595   mat->factor     = matin->factor;
1596   mat->assembled  = PETSC_TRUE;
1597 
1598   a->m = mat->m   = oldmat->m;
1599   a->n = mat->n   = oldmat->n;
1600   a->M = mat->M   = oldmat->M;
1601   a->N = mat->N   = oldmat->N;
1602 
1603   a->rstart       = oldmat->rstart;
1604   a->rend         = oldmat->rend;
1605   a->cstart       = oldmat->cstart;
1606   a->cend         = oldmat->cend;
1607   a->size         = oldmat->size;
1608   a->rank         = oldmat->rank;
1609   a->insertmode   = NOT_SET_VALUES;
1610   a->rowvalues    = 0;
1611   a->getrowactive = PETSC_FALSE;
  /* PetscNew() does not zero the structure, so set the remaining fields explicitly */
  a->rowindices   = 0;
  a->donotstash   = oldmat->donotstash;
  a->roworiented  = oldmat->roworiented;
1612 
1613   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1614   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1615   a->cowners = a->rowners + a->size + 2;
1616   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1617   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1618   if (oldmat->colmap) {
1619     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1620     PLogObjectMemory(mat,(a->N)*sizeof(int));
1621     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1622   } else a->colmap = 0;
1623   if (oldmat->garray) {
1624     len = ((Mat_SeqAIJ *) (oldmat->B->data))->n;
1625     a->garray = (int *) PetscMalloc((len+1)*sizeof(int)); CHKPTRQ(a->garray);
1626     PLogObjectMemory(mat,len*sizeof(int));
1627     if (len) PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1628   } else a->garray = 0;
1629 
1630   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1631   PLogObjectParent(mat,a->lvec);
1632   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1633   PLogObjectParent(mat,a->Mvctx);
1634   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1635   PLogObjectParent(mat,a->A);
1636   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1637   PLogObjectParent(mat,a->B);
1638   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1639   if (flg) {
1640     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1641   }
1642   *newmat = mat;
1643   return 0;
1644 }
1645 
1646 #include "sys.h"
1647 
1648 #undef __FUNC__
1649 #define __FUNC__ "MatLoad_MPIAIJ"
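/*
   Loads an MPIAIJ matrix from a PETSc binary viewer.  As read below, the
   file holds a four-integer header (MAT_COOKIE, M, N and a fourth entry not
   used here), then the M row lengths, then all column indices, then all
   numerical values.  Process 0 reads the file and ships each process its
   contiguous block of rows; every process counts its off-diagonal entries
   so the new matrix can be preallocated before the MatSetValues() calls.
   The column indices within each row are assumed to be sorted, which is why
   MAT_COLUMNS_SORTED is set on the new matrix.
*/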
1650 int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
1651 {
1652   Mat          A;
1653   int          i, nz, ierr, j,rstart, rend, fd;
1654   Scalar       *vals,*svals;
1655   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1656   MPI_Status   status;
1657   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1658   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1659   int          tag = ((PetscObject)viewer)->tag;
1660 
1661   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1662   if (!rank) {
1663     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1664     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1665     if (header[0] != MAT_COOKIE) SETERRQ(1,0,"not a matrix object");
1666   }
1667 
1668   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1669   M = header[1]; N = header[2];
1670   /* determine ownership of all rows */
1671   m = M/size + ((M % size) > rank);
1672   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1673   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1674   rowners[0] = 0;
1675   for ( i=2; i<=size; i++ ) {
1676     rowners[i] += rowners[i-1];
1677   }
1678   rstart = rowners[rank];
1679   rend   = rowners[rank+1];
1680 
1681   /* distribute row lengths to all processors */
1682   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1683   offlens = ourlens + (rend-rstart);
1684   if (!rank) {
1685     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1686     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1687     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1688     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1689     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
1690     PetscFree(sndcounts);
1691   }
1692   else {
1693     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
1694   }
1695 
1696   if (!rank) {
1697     /* calculate the number of nonzeros on each processor */
1698     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1699     PetscMemzero(procsnz,size*sizeof(int));
1700     for ( i=0; i<size; i++ ) {
1701       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1702         procsnz[i] += rowlengths[j];
1703       }
1704     }
1705     PetscFree(rowlengths);
1706 
1707     /* determine max buffer needed and allocate it */
1708     maxnz = 0;
1709     for ( i=0; i<size; i++ ) {
1710       maxnz = PetscMax(maxnz,procsnz[i]);
1711     }
1712     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1713 
1714     /* read in my part of the matrix column indices  */
1715     nz = procsnz[0];
1716     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1717     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1718 
1719     /* read in everyone else's column indices and ship them off */
1720     for ( i=1; i<size; i++ ) {
1721       nz = procsnz[i];
1722       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1723       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1724     }
1725     PetscFree(cols);
1726   }
1727   else {
1728     /* determine buffer space needed for message */
1729     nz = 0;
1730     for ( i=0; i<m; i++ ) {
1731       nz += ourlens[i];
1732     }
1733     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1734 
1735     /* receive message of column indices */
1736     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1737     MPI_Get_count(&status,MPI_INT,&maxnz);
1738     if (maxnz != nz) SETERRQ(1,0,"message length does not match expected nonzero count; file may be corrupt");
1739   }
1740 
1741   /* loop over local rows, determining number of off-diagonal entries */
1742   PetscMemzero(offlens,m*sizeof(int));
1743   jj = 0;
1744   for ( i=0; i<m; i++ ) {
1745     for ( j=0; j<ourlens[i]; j++ ) {
1746       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1747       jj++;
1748     }
1749   }
1750 
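  /*
     ourlens[] holds the total length of each local row.  For preallocation
     it is temporarily reduced to the diagonal-block count (total minus
     off-diagonal), passed as d_nnz with offlens[] as o_nnz, and then
     restored so it can again serve as the per-row count for MatSetValues()
     below.  Note the diagonal/off-diagonal split above uses the row
     ownership range [rstart,rend); this matches the column ownership only
     when the row and column partitionings coincide, and any mismatch only
     degrades the preallocation, not correctness.
  */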
1751   /* create our matrix */
1752   for ( i=0; i<m; i++ ) {
1753     ourlens[i] -= offlens[i];
1754   }
1755   ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
1756   A = *newmat;
1757   MatSetOption(A,MAT_COLUMNS_SORTED);
1758   for ( i=0; i<m; i++ ) {
1759     ourlens[i] += offlens[i];
1760   }
1761 
1762   if (!rank) {
1763     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1764 
1765     /* read in my part of the matrix numerical values  */
1766     nz = procsnz[0];
1767     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1768 
1769     /* insert into matrix */
1770     jj      = rstart;
1771     smycols = mycols;
1772     svals   = vals;
1773     for ( i=0; i<m; i++ ) {
1774       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1775       smycols += ourlens[i];
1776       svals   += ourlens[i];
1777       jj++;
1778     }
1779 
1780     /* read in the other processors' values and ship them out */
1781     for ( i=1; i<size; i++ ) {
1782       nz = procsnz[i];
1783       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1784       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1785     }
1786     PetscFree(procsnz);
1787   }
1788   else {
1789     /* receive numeric values */
1790     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1791 
1792     /* receive message of values */
1793     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1794     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1795     if (maxnz != nz) SETERRQ(1,0,"message length does not match expected nonzero count; file may be corrupt");
1796 
1797     /* insert into matrix */
1798     jj      = rstart;
1799     smycols = mycols;
1800     svals   = vals;
1801     for ( i=0; i<m; i++ ) {
1802       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1803       smycols += ourlens[i];
1804       svals   += ourlens[i];
1805       jj++;
1806     }
1807   }
1808   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1809 
1810   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1811   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1812   return 0;
1813 }
1814