xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision 6525c446fdebc33f8f2914de551b2af5a320bf84)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpiaij.c,v 1.281 1999/02/17 17:34:43 balay Exp balay $";
3 #endif
4 
5 #include "src/mat/impls/aij/mpi/mpiaij.h"
6 #include "src/vec/vecimpl.h"
7 #include "src/inline/spops.h"
8 
9 /* local utility routine that creates a mapping from the global column
10 number to the local number in the off-diagonal part of the local
11 storage of the matrix.  This is done in a non-scalable way since the
12 length of colmap equals the global number of columns.
13 */
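/*
   Illustration with hypothetical values: if the off-diagonal block B of this
   process references global columns garray[] = {3,7,12}, then after this
   routine colmap[3] = 1, colmap[7] = 2, colmap[12] = 3 (entries are stored
   1-based so that 0 can mean "column not present on this process"); every
   other entry of colmap stays 0.  With USE_CTABLE the same
   (global+1) -> (local+1) pairs go into a hash table instead of a dense
   length-N array.
*/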
14 #undef __FUNC__
15 #define __FUNC__ "CreateColmap_MPIAIJ_Private"
16 int CreateColmap_MPIAIJ_Private(Mat mat)
17 {
18   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
19   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
20   int        n = B->n,i;
21 #if defined (USE_CTABLE)
22   int        ierr;
23 #endif
24 
25   PetscFunctionBegin;
26 #if defined (USE_CTABLE)
27   ierr = TableCreate( &aij->colmap, aij->n/5 ); CHKERRQ(ierr);
28   for ( i=0; i<n; i++ ){
29     ierr = TableAdd( aij->colmap, aij->garray[i] + 1, i+1 );CHKERRQ(ierr);
30   }
31 #else
32   aij->colmap = (int *) PetscMalloc((aij->N+1)*sizeof(int));CHKPTRQ(aij->colmap);
33   PLogObjectMemory(mat,aij->N*sizeof(int));
34   PetscMemzero(aij->colmap,aij->N*sizeof(int));
35   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
36 #endif
37   PetscFunctionReturn(0);
38 }
39 
40 extern int DisAssemble_MPIAIJ(Mat);
41 
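/*
   The two macros below insert a single (row,col,value) entry into the local
   diagonal block (A) or off-diagonal block (B), both stored in SeqAIJ format.
   Each macro searches the row for an existing entry with that column; if none
   is found and new nonzeros are permitted, the later entries of the row are
   shifted up to make room, and if the row is already full the i/j/a arrays
   are reallocated with CHUNKSIZE extra slots.  They expand inline (rather
   than calling MatSetValues_SeqAIJ(), see the commented-out calls below),
   presumably to avoid per-entry function-call overhead.
*/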
42 #define CHUNKSIZE   15
43 #define MatSetValues_SeqAIJ_A_Private(row,col,value,addv) \
44 { \
45  \
46     rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
47     rmax = aimax[row]; nrow = ailen[row];  \
48     col1 = col - shift; \
49      \
50     low = 0; high = nrow; \
51     while (high-low > 5) { \
52       t = (low+high)/2; \
53       if (rp[t] > col1) high = t; \
54       else             low  = t; \
55     } \
56       for ( _i=0; _i<nrow; _i++ ) { \
57         if (rp[_i] > col1) break; \
58         if (rp[_i] == col1) { \
59           if (addv == ADD_VALUES) ap[_i] += value;   \
60           else                  ap[_i] = value; \
61           goto a_noinsert; \
62         } \
63       }  \
64       if (nonew == 1) goto a_noinsert; \
65       else if (nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
66       if (nrow >= rmax) { \
67         /* there is no extra room in row, therefore enlarge */ \
68         int    new_nz = ai[a->m] + CHUNKSIZE,len,*new_i,*new_j; \
69         Scalar *new_a; \
70  \
71         if (nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
72  \
73         /* malloc new storage space */ \
74         len     = new_nz*(sizeof(int)+sizeof(Scalar))+(a->m+1)*sizeof(int); \
75         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
76         new_j   = (int *) (new_a + new_nz); \
77         new_i   = new_j + new_nz; \
78  \
79         /* copy over old data into new slots */ \
80         for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = ai[ii];} \
81         for ( ii=row+1; ii<a->m+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
82         PetscMemcpy(new_j,aj,(ai[row]+nrow+shift)*sizeof(int)); \
83         len = (new_nz - CHUNKSIZE - ai[row] - nrow - shift); \
84         PetscMemcpy(new_j+ai[row]+shift+nrow+CHUNKSIZE,aj+ai[row]+shift+nrow, \
85                                                            len*sizeof(int)); \
86         PetscMemcpy(new_a,aa,(ai[row]+nrow+shift)*sizeof(Scalar)); \
87         PetscMemcpy(new_a+ai[row]+shift+nrow+CHUNKSIZE,aa+ai[row]+shift+nrow, \
88                                                            len*sizeof(Scalar));  \
89         /* free up old matrix storage */ \
90  \
91         PetscFree(a->a);  \
92         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
93         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
94         a->singlemalloc = 1; \
95  \
96         rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
97         rmax = aimax[row] = aimax[row] + CHUNKSIZE; \
98         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
99         a->maxnz += CHUNKSIZE; \
100         a->reallocs++; \
101       } \
102       N = nrow++ - 1; a->nz++; \
103       /* shift up all the later entries in this row */ \
104       for ( ii=N; ii>=_i; ii-- ) { \
105         rp[ii+1] = rp[ii]; \
106         ap[ii+1] = ap[ii]; \
107       } \
108       rp[_i] = col1;  \
109       ap[_i] = value;  \
110       a_noinsert: ; \
111       ailen[row] = nrow; \
112 }
113 
114 #define MatSetValues_SeqAIJ_B_Private(row,col,value,addv) \
115 { \
116  \
117     rp   = bj + bi[row] + shift; ap = ba + bi[row] + shift; \
118     rmax = bimax[row]; nrow = bilen[row];  \
119     col1 = col - shift; \
120      \
121     low = 0; high = nrow; \
122     while (high-low > 5) { \
123       t = (low+high)/2; \
124       if (rp[t] > col1) high = t; \
125       else             low  = t; \
126     } \
127        for ( _i=0; _i<nrow; _i++ ) { \
128         if (rp[_i] > col1) break; \
129         if (rp[_i] == col1) { \
130           if (addv == ADD_VALUES) ap[_i] += value;   \
131           else                  ap[_i] = value; \
132           goto b_noinsert; \
133         } \
134       }  \
135       if (nonew == 1) goto b_noinsert; \
136       else if (nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
137       if (nrow >= rmax) { \
138         /* there is no extra room in row, therefore enlarge */ \
139         int    new_nz = bi[b->m] + CHUNKSIZE,len,*new_i,*new_j; \
140         Scalar *new_a; \
141  \
142         if (nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
143  \
144         /* malloc new storage space */ \
145         len     = new_nz*(sizeof(int)+sizeof(Scalar))+(b->m+1)*sizeof(int); \
146         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
147         new_j   = (int *) (new_a + new_nz); \
148         new_i   = new_j + new_nz; \
149  \
150         /* copy over old data into new slots */ \
151         for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = bi[ii];} \
152         for ( ii=row+1; ii<b->m+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
153         PetscMemcpy(new_j,bj,(bi[row]+nrow+shift)*sizeof(int)); \
154         len = (new_nz - CHUNKSIZE - bi[row] - nrow - shift); \
155         PetscMemcpy(new_j+bi[row]+shift+nrow+CHUNKSIZE,bj+bi[row]+shift+nrow, \
156                                                            len*sizeof(int)); \
157         PetscMemcpy(new_a,ba,(bi[row]+nrow+shift)*sizeof(Scalar)); \
158         PetscMemcpy(new_a+bi[row]+shift+nrow+CHUNKSIZE,ba+bi[row]+shift+nrow, \
159                                                            len*sizeof(Scalar));  \
160         /* free up old matrix storage */ \
161  \
162         PetscFree(b->a);  \
163         if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
164         ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
165         b->singlemalloc = 1; \
166  \
167         rp   = bj + bi[row] + shift; ap = ba + bi[row] + shift; \
168         rmax = bimax[row] = bimax[row] + CHUNKSIZE; \
169         PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
170         b->maxnz += CHUNKSIZE; \
171         b->reallocs++; \
172       } \
173       N = nrow++ - 1; b->nz++; \
174       /* shift up all the later entries in this row */ \
175       for ( ii=N; ii>=_i; ii-- ) { \
176         rp[ii+1] = rp[ii]; \
177         ap[ii+1] = ap[ii]; \
178       } \
179       rp[_i] = col1;  \
180       ap[_i] = value;  \
181       b_noinsert: ; \
182       bilen[row] = nrow; \
183 }
184 
185 extern int MatSetValues_SeqAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
186 #undef __FUNC__
187 #define __FUNC__ "MatSetValues_MPIAIJ"
188 int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
189 {
190   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
191   Scalar     value;
192   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
193   int        cstart = aij->cstart, cend = aij->cend,row,col;
194   int        roworiented = aij->roworiented;
195 
196   /* Some Variables required in the macro */
197   Mat        A = aij->A;
198   Mat_SeqAIJ *a = (Mat_SeqAIJ *) A->data;
199   int        *aimax = a->imax, *ai = a->i, *ailen = a->ilen,*aj = a->j;
200   Scalar     *aa = a->a;
201 
202   Mat        B = aij->B;
203   Mat_SeqAIJ *b = (Mat_SeqAIJ *) B->data;
204   int        *bimax = b->imax, *bi = b->i, *bilen = b->ilen,*bj = b->j;
205   Scalar     *ba = b->a;
206 
207   int        *rp,ii,nrow,_i,rmax, N, col1,low,high,t;
208   int        nonew = a->nonew,shift = a->indexshift;
209   Scalar     *ap;
210 
211   PetscFunctionBegin;
212   for ( i=0; i<m; i++ ) {
213     if (im[i] < 0) continue;
214 #if defined(USE_PETSC_BOPT_g)
215     if (im[i] >= aij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
216 #endif
217     if (im[i] >= rstart && im[i] < rend) {
218       row = im[i] - rstart;
219       for ( j=0; j<n; j++ ) {
220         if (in[j] >= cstart && in[j] < cend){
221           col = in[j] - cstart;
222           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
223           MatSetValues_SeqAIJ_A_Private(row,col,value,addv);
224           /* ierr = MatSetValues_SeqAIJ(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
225         }
226         else if (in[j] < 0) continue;
227 #if defined(USE_PETSC_BOPT_g)
228         else if (in[j] >= aij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");}
229 #endif
230         else {
231           if (mat->was_assembled) {
232             if (!aij->colmap) {
233               ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
234             }
235 #if defined (USE_CTABLE)
236 	    col = TableFind( aij->colmap, in[j] + 1 ) - 1;
237 #else
238             col = aij->colmap[in[j]] - 1;
239 #endif
240             if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
241               ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
242               col =  in[j];
243               /* Reinitialize the variables required by MatSetValues_SeqAIJ_B_Private() */
244               B = aij->B;
245               b = (Mat_SeqAIJ *) B->data;
246               bimax = b->imax; bi = b->i; bilen = b->ilen; bj = b->j;
247               ba = b->a;
248             }
249           } else col = in[j];
250           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
251           MatSetValues_SeqAIJ_B_Private(row,col,value,addv);
252           /* ierr = MatSetValues_SeqAIJ(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
253         }
254       }
255     } else {
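      /* this row is owned by another process: cache the values in the stash;
         they are sent to the owning process during MatAssemblyBegin/End
         (unless the user set MAT_IGNORE_OFF_PROC_ENTRIES, i.e. donotstash) */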
256       if (roworiented && !aij->donotstash) {
257         ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
258       } else {
259         if (!aij->donotstash) {
260           row = im[i];
261           for ( j=0; j<n; j++ ) {
262             ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
263           }
264         }
265       }
266     }
267   }
268   PetscFunctionReturn(0);
269 }
270 
271 #undef __FUNC__
272 #define __FUNC__ "MatGetValues_MPIAIJ"
273 int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
274 {
275   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
276   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
277   int        cstart = aij->cstart, cend = aij->cend,row,col;
278 
279   PetscFunctionBegin;
280   for ( i=0; i<m; i++ ) {
281     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
282     if (idxm[i] >= aij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
283     if (idxm[i] >= rstart && idxm[i] < rend) {
284       row = idxm[i] - rstart;
285       for ( j=0; j<n; j++ ) {
286         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
287         if (idxn[j] >= aij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
288         if (idxn[j] >= cstart && idxn[j] < cend){
289           col = idxn[j] - cstart;
290           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
291         } else {
292           if (!aij->colmap) {
293             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
294           }
295 #if defined (USE_CTABLE)
296 	  col = TableFind( aij->colmap, idxn[j] + 1 ) - 1;
297 #else
298           col = aij->colmap[idxn[j]] - 1;
299 #endif
300           if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
301           else {
302             ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
303           }
304         }
305       }
306     } else {
307       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
308     }
309   }
310   PetscFunctionReturn(0);
311 }
312 
313 #undef __FUNC__
314 #define __FUNC__ "MatAssemblyBegin_MPIAIJ"
315 int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
316 {
317   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
318   MPI_Comm    comm = mat->comm;
319   int         size = aij->size, *owners = aij->rowners;
320   int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
321   MPI_Request *send_waits,*recv_waits;
322   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
323   InsertMode  addv;
324   Scalar      *rvalues,*svalues;
325 
326   PetscFunctionBegin;
327   if (aij->donotstash) {
328     aij->svalues    = 0; aij->rvalues    = 0;
329     aij->nsends     = 0; aij->nrecvs     = 0;
330     aij->send_waits = 0; aij->recv_waits = 0;
331     aij->rmax       = 0;
332     PetscFunctionReturn(0);
333   }
334 
335   /* make sure all processors are either in INSERT_VALUES or ADD_VALUES mode */
336   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
337   if (addv == (ADD_VALUES|INSERT_VALUES)) {
338     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
339   }
340   mat->insertmode = addv; /* in case this processor had no cache */
341 
342   /*  first count number of contributors to each processor */
343   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
344   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
345   owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
346   for ( i=0; i<aij->stash.n; i++ ) {
347     idx = aij->stash.idx[i];
348     for ( j=0; j<size; j++ ) {
349       if (idx >= owners[j] && idx < owners[j+1]) {
350         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
351       }
352     }
353   }
354   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
355 
356   /* inform other processors of number of messages and max length*/
357   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
358   ierr = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
359   nreceives = work[rank];
360   ierr = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
361   nmax = work[rank];
362   PetscFree(work);
363 
364   /* post receives:
365        1) each message will consist of ordered triples
366      (global row index,global column index,value); we store the global
367      indices as Scalars to simplify the message passing.
368        2) since we don't know how long each individual message is, we
369      allocate the largest needed buffer for each receive. Potentially
370      this is a lot of wasted space.
371 
372 
373        This could be done better.
374   */
375   rvalues    = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
376   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
377   for ( i=0; i<nreceives; i++ ) {
378     ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
379               comm,recv_waits+i);CHKERRQ(ierr);
380   }
381 
382   /* do sends:
383       1) starts[i] gives the starting index in svalues for stuff going to
384          the ith processor
385   */
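  /* Packing illustration with hypothetical values: if this process has stash
     entries destined for processors 0 and 2, then svalues is laid out as
     [row,col,val, row,col,val, ...] with all triples for processor 0 first,
     followed by those for processor 2; starts[] gives the index of the first
     triple for each destination processor. */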
386   svalues    = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
387   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
388   starts     = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
389   starts[0]  = 0;
390   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
391   for ( i=0; i<aij->stash.n; i++ ) {
392     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
393     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
394     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
395   }
396   PetscFree(owner);
397   starts[0] = 0;
398   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
399   count = 0;
400   for ( i=0; i<size; i++ ) {
401     if (procs[i]) {
402       ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
403     }
404   }
405   PetscFree(starts);
406   PetscFree(nprocs);
407 
408   /* Free cache space */
409   PLogInfo(aij->A,"MatAssemblyBegin_MPIAIJ:Number of off-processor values %d\n",aij->stash.n);
410   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
411 
412   aij->svalues    = svalues;    aij->rvalues    = rvalues;
413   aij->nsends     = nsends;     aij->nrecvs     = nreceives;
414   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
415   aij->rmax       = nmax;
416 
417   PetscFunctionReturn(0);
418 }
419 extern int MatSetUpMultiply_MPIAIJ(Mat);
420 
421 #undef __FUNC__
422 #define __FUNC__ "MatAssemblyEnd_MPIAIJ"
423 int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
424 {
425   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
426   MPI_Status  *send_status,recv_status;
427   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
428   int         row,col,other_disassembled;
429   Scalar      *values,val;
430   InsertMode  addv = mat->insertmode;
431 
432   PetscFunctionBegin;
433 
434   /*  wait on receives */
435   while (count) {
436     ierr = MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
437     /* unpack receives into our local space */
438     values = aij->rvalues + 3*imdex*aij->rmax;
439     ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
440     n = n/3;
441     for ( i=0; i<n; i++ ) {
442       row = (int) PetscReal(values[3*i]) - aij->rstart;
443       col = (int) PetscReal(values[3*i+1]);
444       val = values[3*i+2];
445       if (col >= aij->cstart && col < aij->cend) {
446         col -= aij->cstart;
447         ierr = MatSetValues(aij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
448       } else {
449         if (mat->was_assembled || mat->assembled) {
450           if (!aij->colmap) {
451             ierr = CreateColmap_MPIAIJ_Private(mat); CHKERRQ(ierr);
452           }
453 #if defined (USE_CTABLE)
454           col = TableFind( aij->colmap, col + 1 ) - 1;
455 #else
456           col = aij->colmap[col] - 1;
457 #endif
458           if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
459             ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
460             col = (int) PetscReal(values[3*i+1]);
461           }
462         }
463         ierr = MatSetValues(aij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
464       }
465     }
466     count--;
467   }
468   if (aij->recv_waits) PetscFree(aij->recv_waits);
469   if (aij->rvalues)    PetscFree(aij->rvalues);
470 
471   /* wait on sends */
472   if (aij->nsends) {
473     send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
474     ierr        = MPI_Waitall(aij->nsends,aij->send_waits,send_status);CHKERRQ(ierr);
475     PetscFree(send_status);
476   }
477   if (aij->send_waits) PetscFree(aij->send_waits);
478   if (aij->svalues)    PetscFree(aij->svalues);
479 
480   ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
481   ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);
482 
483   /* determine if any processor has disassembled; if so, we must
484      also disassemble ourselves, in order that we may reassemble. */
485   /*
486      if the nonzero structure of submatrix B cannot change, then we know that
487      no processor disassembled and thus we can skip this step
488   */
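  /* the MPI_PROD reduction of the was_assembled flags is 0 exactly when at
     least one process has cleared its flag (which DisAssemble_MPIAIJ is
     assumed to do); any process that is still assembled then disassembles as
     well, so that all processes reassemble from a common state */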
489   if (!((Mat_SeqAIJ*) aij->B->data)->nonew)  {
490     ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
491     if (mat->was_assembled && !other_disassembled) {
492       ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
493     }
494   }
495 
496   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
497     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
498   }
499   ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
500   ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);
501 
502   if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
503   PetscFunctionReturn(0);
504 }
505 
506 #undef __FUNC__
507 #define __FUNC__ "MatZeroEntries_MPIAIJ"
508 int MatZeroEntries_MPIAIJ(Mat A)
509 {
510   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
511   int        ierr;
512 
513   PetscFunctionBegin;
514   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
515   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
516   PetscFunctionReturn(0);
517 }
518 
519 #undef __FUNC__
520 #define __FUNC__ "MatZeroRows_MPIAIJ"
521 int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
522 {
523   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
524   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
525   int            *procs,*nprocs,j,found,idx,nsends,*work,row;
526   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
527   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
528   int            *lens,imdex,*lrows,*values,rstart=l->rstart;
529   MPI_Comm       comm = A->comm;
530   MPI_Request    *send_waits,*recv_waits;
531   MPI_Status     recv_status,*send_status;
532   IS             istmp;
533 
534   PetscFunctionBegin;
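  /* Each process contributes the global indices of the rows it wants zeroed;
     the indices are routed to their owning processes using the same
     count/send/receive pattern as MatAssemblyBegin_MPIAIJ, converted to local
     row numbers, and the local rows are then zeroed in both the diagonal (A)
     and off-diagonal (B) blocks. */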
535   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
536   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
537 
538   /*  first count number of contributors to each processor */
539   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
540   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
541   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
542   for ( i=0; i<N; i++ ) {
543     idx = rows[i];
544     found = 0;
545     for ( j=0; j<size; j++ ) {
546       if (idx >= owners[j] && idx < owners[j+1]) {
547         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
548       }
549     }
550     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
551   }
552   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
553 
554   /* inform other processors of number of messages and max length*/
555   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
556   ierr = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
557   nrecvs = work[rank];
558   ierr = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
559   nmax = work[rank];
560   PetscFree(work);
561 
562   /* post receives:   */
563   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int));CHKPTRQ(rvalues);
564   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
565   for ( i=0; i<nrecvs; i++ ) {
566     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
567   }
568 
569   /* do sends:
570       1) starts[i] gives the starting index in svalues for stuff going to
571          the ith processor
572   */
573   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
574   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
575   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
576   starts[0] = 0;
577   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
578   for ( i=0; i<N; i++ ) {
579     svalues[starts[owner[i]]++] = rows[i];
580   }
581   ISRestoreIndices(is,&rows);
582 
583   starts[0] = 0;
584   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
585   count = 0;
586   for ( i=0; i<size; i++ ) {
587     if (procs[i]) {
588       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
589     }
590   }
591   PetscFree(starts);
592 
593   base = owners[rank];
594 
595   /*  wait on receives */
596   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
597   source = lens + nrecvs;
598   count  = nrecvs; slen = 0;
599   while (count) {
600     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
601     /* unpack receives into our local space */
602     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
603     source[imdex]  = recv_status.MPI_SOURCE;
604     lens[imdex]  = n;
605     slen += n;
606     count--;
607   }
608   PetscFree(recv_waits);
609 
610   /* move the received row indices, converted to local numbering, into lrows */
611   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
612   count = 0;
613   for ( i=0; i<nrecvs; i++ ) {
614     values = rvalues + i*nmax;
615     for ( j=0; j<lens[i]; j++ ) {
616       lrows[count++] = values[j] - base;
617     }
618   }
619   PetscFree(rvalues); PetscFree(lens);
620   PetscFree(owner); PetscFree(nprocs);
621 
622   /* actually zap the local rows */
623   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
624   PLogObjectParent(A,istmp);
625 
626   /*
627         Zero the required rows. If the "diagonal block" of the matrix
628      is square and the user wishes to set the diagonal, we use separate
629      code so that MatSetValues() is not called for each diagonal entry,
630      which would allocate new memory and trigger many mallocs, slowing things down.
631 
632        Contributed by: Mathew Knepley
633   */
634   /* must zero l->B before l->A because the (diag) case below may put values into l->B*/
635   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
636   if (diag && (l->A->M == l->A->N)) {
637     ierr      = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
638   } else if (diag) {
639     ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
640     if (!((Mat_SeqAIJ*)l->A->data)->nonew) {
641       SETERRQ(PETSC_ERR_SUP,0,"MatZeroRows() on rectangular matrices cannot be used with \n\
642 the Mat options MAT_NO_NEW_NONZERO_LOCATIONS, MAT_NEW_NONZERO_LOCATION_ERR");
643     }
644     for ( i = 0; i < slen; i++ ) {
645       row  = lrows[i] + rstart;
646       ierr = MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES);CHKERRQ(ierr);
647     }
648     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
649     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
650   } else {
651     ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
652   }
653   ierr = ISDestroy(istmp); CHKERRQ(ierr);
654   PetscFree(lrows);
655 
656   /* wait on sends */
657   if (nsends) {
658     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
659     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
660     PetscFree(send_status);
661   }
662   PetscFree(send_waits); PetscFree(svalues);
663 
664   PetscFunctionReturn(0);
665 }
666 
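/*
   MatMult for this format computes  y = A_d x_local + B_o x_ghost,  where A_d
   is the local diagonal block and B_o the off-diagonal block.  The scatter
   that gathers the ghost entries of x into a->lvec is started before the
   diagonal-block multiply and completed just before the off-diagonal
   multiply-add, so the communication overlaps with local computation.
*/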
667 #undef __FUNC__
668 #define __FUNC__ "MatMult_MPIAIJ"
669 int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
670 {
671   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
672   int        ierr,nt;
673 
674   PetscFunctionBegin;
675   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
676   if (nt != a->n) {
677     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
678   }
679   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
680   ierr = (*a->A->ops->mult)(a->A,xx,yy); CHKERRQ(ierr);
681   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
682   ierr = (*a->B->ops->multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
683   PetscFunctionReturn(0);
684 }
685 
686 #undef __FUNC__
687 #define __FUNC__ "MatMultAdd_MPIAIJ"
688 int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
689 {
690   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
691   int        ierr;
692 
693   PetscFunctionBegin;
694   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
695   ierr = (*a->A->ops->multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
696   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
697   ierr = (*a->B->ops->multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
698   PetscFunctionReturn(0);
699 }
700 
701 #undef __FUNC__
702 #define __FUNC__ "MatMultTrans_MPIAIJ"
703 int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
704 {
705   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
706   int        ierr;
707 
708   PetscFunctionBegin;
709   /* do nondiagonal part */
710   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
711   /* send it on its way */
712   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
713   /* do local part */
714   ierr = (*a->A->ops->multtrans)(a->A,xx,yy); CHKERRQ(ierr);
715   /* receive remote parts: note this assumes the values are not actually */
716   /* inserted in yy until the next line, which is true for my implementation*/
717   /* but may not always be true. */
718   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
719   PetscFunctionReturn(0);
720 }
721 
722 #undef __FUNC__
723 #define __FUNC__ "MatMultTransAdd_MPIAIJ"
724 int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
725 {
726   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
727   int        ierr;
728 
729   PetscFunctionBegin;
730   /* do nondiagonal part */
731   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
732   /* send it on its way */
733   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
734   /* do local part */
735   ierr = (*a->A->ops->multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
736   /* receive remote parts: note this assumes the values are not actually */
737   /* inserted in zz until the next line, which is true for my implementation*/
738   /* but may not always be true. */
739   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
740   PetscFunctionReturn(0);
741 }
742 
743 /*
744   This only works correctly for square matrices where the subblock A->A is the
745    diagonal block
746 */
747 #undef __FUNC__
748 #define __FUNC__ "MatGetDiagonal_MPIAIJ"
749 int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
750 {
751   int        ierr;
752   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
753 
754   PetscFunctionBegin;
755   if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
756   if (a->rstart != a->cstart || a->rend != a->cend) {
757     SETERRQ(PETSC_ERR_ARG_SIZ,0,"row partition must equal col partition");
758   }
759   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
760   PetscFunctionReturn(0);
761 }
762 
763 #undef __FUNC__
764 #define __FUNC__ "MatScale_MPIAIJ"
765 int MatScale_MPIAIJ(Scalar *aa,Mat A)
766 {
767   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
768   int        ierr;
769 
770   PetscFunctionBegin;
771   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
772   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
773   PetscFunctionReturn(0);
774 }
775 
776 #undef __FUNC__
777 #define __FUNC__ "MatDestroy_MPIAIJ"
778 int MatDestroy_MPIAIJ(Mat mat)
779 {
780   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
781   int        ierr;
782 
783   PetscFunctionBegin;
784   if (--mat->refct > 0) PetscFunctionReturn(0);
785 
786   if (mat->mapping) {
787     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
788   }
789   if (mat->bmapping) {
790     ierr = ISLocalToGlobalMappingDestroy(mat->bmapping); CHKERRQ(ierr);
791   }
792   if (mat->rmap) {
793     ierr = MapDestroy(mat->rmap);CHKERRQ(ierr);
794   }
795   if (mat->cmap) {
796     ierr = MapDestroy(mat->cmap);CHKERRQ(ierr);
797   }
798 #if defined(USE_PETSC_LOG)
799   PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",aij->M,aij->N);
800 #endif
801   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
802   PetscFree(aij->rowners);
803   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
804   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
805 #if defined (USE_CTABLE)
806   if (aij->colmap) TableDelete(aij->colmap);
807 #else
808   if (aij->colmap) PetscFree(aij->colmap);
809 #endif
810   if (aij->garray) PetscFree(aij->garray);
811   if (aij->lvec)   VecDestroy(aij->lvec);
812   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
813   if (aij->rowvalues) PetscFree(aij->rowvalues);
814   PetscFree(aij);
815   PLogObjectDestroy(mat);
816   PetscHeaderDestroy(mat);
817   PetscFunctionReturn(0);
818 }
819 
820 #undef __FUNC__
821 #define __FUNC__ "MatView_MPIAIJ_Binary"
822 extern int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
823 {
824   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
825   int         ierr;
826 
827   PetscFunctionBegin;
828   if (aij->size == 1) {
829     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
830   }
831   else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
832   PetscFunctionReturn(0);
833 }
834 
835 #undef __FUNC__
836 #define __FUNC__ "MatView_MPIAIJ_ASCIIorDraworSocket"
837 extern int MatView_MPIAIJ_ASCIIorDraworSocket(Mat mat,Viewer viewer)
838 {
839   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
840   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
841   int         ierr, format,shift = C->indexshift,rank = aij->rank ;
842   int         size = aij->size;
843   FILE        *fd;
844   ViewerType  vtype;
845 
846   PetscFunctionBegin;
847   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
848   if (PetscTypeCompare(vtype,ASCII_VIEWER)) {
849     ierr = ViewerGetFormat(viewer,&format);
850     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
851       MatInfo info;
852       int     flg;
853       MPI_Comm_rank(mat->comm,&rank);
854       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
855       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
856       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
857       PetscSequentialPhaseBegin(mat->comm,1);
858       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
859          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
860       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
861          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
862       ierr = MatGetInfo(aij->A,MAT_LOCAL,&info);
863       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used);
864       ierr = MatGetInfo(aij->B,MAT_LOCAL,&info);
865       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used);
866       fflush(fd);
867       PetscSequentialPhaseEnd(mat->comm,1);
868       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
869       PetscFunctionReturn(0);
870     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
871       PetscFunctionReturn(0);
872     }
873   } else if (PetscTypeCompare(vtype,DRAW_VIEWER)) {
874     Draw       draw;
875     PetscTruth isnull;
876     ierr = ViewerDrawGetDraw(viewer,0,&draw); CHKERRQ(ierr);
877     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
878   }
879 
880   if (size == 1) {
881     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
882   } else {
883     /* assemble the entire matrix onto first processor. */
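    /* Every process inserts its local rows (with the column indices of its B
       part mapped back to global numbering through garray) into a new MPIAIJ
       matrix whose rows all reside on process 0; that process then views the
       sequential diagonal block of the new matrix, which now holds the entire
       matrix. */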
884     Mat         A;
885     Mat_SeqAIJ *Aloc;
886     int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
887     Scalar      *a;
888 
889     if (!rank) {
890       ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
891     } else {
892       ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
893     }
894     PLogObjectParent(mat,A);
895 
896     /* copy over the A part */
897     Aloc = (Mat_SeqAIJ*) aij->A->data;
898     m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
899     row = aij->rstart;
900     for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
901     for ( i=0; i<m; i++ ) {
902       ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
903       row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
904     }
905     aj = Aloc->j;
906     for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
907 
908     /* copy over the B part */
909     Aloc = (Mat_SeqAIJ*) aij->B->data;
910     m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
911     row = aij->rstart;
912     ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
913     for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
914     for ( i=0; i<m; i++ ) {
915       ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
916       row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
917     }
918     PetscFree(ct);
919     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
920     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
921     /*
922        Everyone has to call MatView() to draw the matrix since the graphics waits are
923        synchronized across all processors that share the Draw object
924     */
925     if (!rank || PetscTypeCompare(vtype,DRAW_VIEWER)) {
926       ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
927     }
928     ierr = MatDestroy(A); CHKERRQ(ierr);
929   }
930   PetscFunctionReturn(0);
931 }
932 
933 #undef __FUNC__
934 #define __FUNC__ "MatView_MPIAIJ"
935 int MatView_MPIAIJ(Mat mat,Viewer viewer)
936 {
937   int         ierr;
938   ViewerType  vtype;
939 
940   PetscFunctionBegin;
941   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
942   if (PetscTypeCompare(vtype,ASCII_VIEWER) || PetscTypeCompare(vtype,DRAW_VIEWER) ||
943       PetscTypeCompare(vtype,SOCKET_VIEWER)) {
944     ierr = MatView_MPIAIJ_ASCIIorDraworSocket(mat,viewer); CHKERRQ(ierr);
945   } else if (PetscTypeCompare(vtype,BINARY_VIEWER)) {
946     ierr = MatView_MPIAIJ_Binary(mat,viewer);CHKERRQ(ierr);
947   } else {
948     SETERRQ(1,1,"Viewer type not supported by PETSc object");
949   }
950   PetscFunctionReturn(0);
951 }
952 
953 /*
954     This has to provide several versions.
955 
956      2) a) use only local smoothing, updating outer values only once.
957         b) local smoothing, updating outer values each inner iteration.
958      3) color updating, with outer values updated between colors.
959 */
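/*
   Note on the local sweeps below: SPARSEDENSEMDOT is assumed to accumulate by
   subtraction (sum -= v[k]*x[idx[k]]), so after the two calls for row i
       sum = b_i - sum_j a_ij x_j
   taken over both the diagonal-block and off-diagonal-block entries of the
   row (diagonal entry included, ghost values of x read from lvec).  Adding
   back a_ii x_i and dividing by d = fshift + a_ii then gives the usual SOR
   update
       x_i <- (1 - omega) x_i + omega (b_i - sum_{j != i} a_ij x_j) / (fshift + a_ii)
*/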
960 #undef __FUNC__
961 #define __FUNC__ "MatRelax_MPIAIJ"
962 int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
963                            double fshift,int its,Vec xx)
964 {
965   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
966   Mat        AA = mat->A, BB = mat->B;
967   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
968   Scalar     *b,*x,*xs,*ls,d,*v,sum;
969   int        ierr,*idx, *diag;
970   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
971 
972   PetscFunctionBegin;
973   ierr = VecGetArray(xx,&x); CHKERRQ(ierr);
974   ierr = VecGetArray(bb,&b); CHKERRQ(ierr);
975   ierr = VecGetArray(mat->lvec,&ls);CHKERRQ(ierr);
976   xs = x + shift; /* shift by one for index start of 1 */
977   ls = ls + shift;
978   if (!A->diag) {ierr = MatMarkDiag_SeqAIJ(AA); CHKERRQ(ierr);}
979   diag = A->diag;
980   if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
981     if (flag & SOR_ZERO_INITIAL_GUESS) {
982       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
983       ierr = VecRestoreArray(xx,&x); CHKERRQ(ierr);
984       ierr = VecRestoreArray(bb,&b); CHKERRQ(ierr);
985       ierr = VecRestoreArray(mat->lvec,&ls);CHKERRQ(ierr);
986       PetscFunctionReturn(0);
987     }
988     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
989     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
990     while (its--) {
991       /* go down through the rows */
992       for ( i=0; i<m; i++ ) {
993         n    = A->i[i+1] - A->i[i];
994 	PLogFlops(4*n+3);
995         idx  = A->j + A->i[i] + shift;
996         v    = A->a + A->i[i] + shift;
997         sum  = b[i];
998         SPARSEDENSEMDOT(sum,xs,v,idx,n);
999         d    = fshift + A->a[diag[i]+shift];
1000         n    = B->i[i+1] - B->i[i];
1001         idx  = B->j + B->i[i] + shift;
1002         v    = B->a + B->i[i] + shift;
1003         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1004         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1005       }
1006       /* come up through the rows */
1007       for ( i=m-1; i>-1; i-- ) {
1008         n    = A->i[i+1] - A->i[i];
1009 	PLogFlops(4*n+3);
1010         idx  = A->j + A->i[i] + shift;
1011         v    = A->a + A->i[i] + shift;
1012         sum  = b[i];
1013         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1014         d    = fshift + A->a[diag[i]+shift];
1015         n    = B->i[i+1] - B->i[i];
1016         idx  = B->j + B->i[i] + shift;
1017         v    = B->a + B->i[i] + shift;
1018         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1019         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1020       }
1021     }
1022   } else if (flag & SOR_LOCAL_FORWARD_SWEEP){
1023     if (flag & SOR_ZERO_INITIAL_GUESS) {
1024       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
1025       ierr = VecRestoreArray(xx,&x); CHKERRQ(ierr);
1026       ierr = VecRestoreArray(bb,&b); CHKERRQ(ierr);
1027       ierr = VecRestoreArray(mat->lvec,&ls);CHKERRQ(ierr);
1028       PetscFunctionReturn(0);
1029     }
1030     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1031     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1032     while (its--) {
1033       for ( i=0; i<m; i++ ) {
1034         n    = A->i[i+1] - A->i[i];
1035 	PLogFlops(4*n+3);
1036         idx  = A->j + A->i[i] + shift;
1037         v    = A->a + A->i[i] + shift;
1038         sum  = b[i];
1039         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1040         d    = fshift + A->a[diag[i]+shift];
1041         n    = B->i[i+1] - B->i[i];
1042         idx  = B->j + B->i[i] + shift;
1043         v    = B->a + B->i[i] + shift;
1044         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1045         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1046       }
1047     }
1048   } else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
1049     if (flag & SOR_ZERO_INITIAL_GUESS) {
1050       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
1051       ierr = VecRestoreArray(xx,&x); CHKERRQ(ierr);
1052       ierr = VecRestoreArray(bb,&b); CHKERRQ(ierr);
1053       ierr = VecRestoreArray(mat->lvec,&ls);CHKERRQ(ierr);
1054       PetscFunctionReturn(0);
1055     }
1056     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1057     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1058     while (its--) {
1059       for ( i=m-1; i>-1; i-- ) {
1060         n    = A->i[i+1] - A->i[i];
1061 	PLogFlops(4*n+3);
1062         idx  = A->j + A->i[i] + shift;
1063         v    = A->a + A->i[i] + shift;
1064         sum  = b[i];
1065         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1066         d    = fshift + A->a[diag[i]+shift];
1067         n    = B->i[i+1] - B->i[i];
1068         idx  = B->j + B->i[i] + shift;
1069         v    = B->a + B->i[i] + shift;
1070         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1071         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1072       }
1073     }
1074   } else {
1075     SETERRQ(PETSC_ERR_SUP,0,"Parallel SOR not supported");
1076   }
1077   ierr = VecRestoreArray(xx,&x); CHKERRQ(ierr);
1078   ierr = VecRestoreArray(bb,&b); CHKERRQ(ierr);
1079   ierr = VecRestoreArray(mat->lvec,&ls);CHKERRQ(ierr);
1080   PetscFunctionReturn(0);
1081 }
1082 
1083 #undef __FUNC__
1084 #define __FUNC__ "MatGetInfo_MPIAIJ"
1085 int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1086 {
1087   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1088   Mat        A = mat->A, B = mat->B;
1089   int        ierr;
1090   double     isend[5], irecv[5];
1091 
1092   PetscFunctionBegin;
1093   info->block_size     = 1.0;
1094   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1095   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
1096   isend[3] = info->memory;  isend[4] = info->mallocs;
1097   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1098   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
1099   isend[3] += info->memory;  isend[4] += info->mallocs;
1100   if (flag == MAT_LOCAL) {
1101     info->nz_used      = isend[0];
1102     info->nz_allocated = isend[1];
1103     info->nz_unneeded  = isend[2];
1104     info->memory       = isend[3];
1105     info->mallocs      = isend[4];
1106   } else if (flag == MAT_GLOBAL_MAX) {
1107     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);CHKERRQ(ierr);
1108     info->nz_used      = irecv[0];
1109     info->nz_allocated = irecv[1];
1110     info->nz_unneeded  = irecv[2];
1111     info->memory       = irecv[3];
1112     info->mallocs      = irecv[4];
1113   } else if (flag == MAT_GLOBAL_SUM) {
1114     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);CHKERRQ(ierr);
1115     info->nz_used      = irecv[0];
1116     info->nz_allocated = irecv[1];
1117     info->nz_unneeded  = irecv[2];
1118     info->memory       = irecv[3];
1119     info->mallocs      = irecv[4];
1120   }
1121   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1122   info->fill_ratio_needed = 0;
1123   info->factor_mallocs    = 0;
1124   info->rows_global       = (double)mat->M;
1125   info->columns_global    = (double)mat->N;
1126   info->rows_local        = (double)mat->m;
1127   info->columns_local     = (double)mat->N;
1128 
1129   PetscFunctionReturn(0);
1130 }
1131 
1132 #undef __FUNC__
1133 #define __FUNC__ "MatSetOption_MPIAIJ"
1134 int MatSetOption_MPIAIJ(Mat A,MatOption op)
1135 {
1136   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1137   int        ierr;
1138 
1139   PetscFunctionBegin;
1140   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1141       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1142       op == MAT_COLUMNS_UNSORTED ||
1143       op == MAT_COLUMNS_SORTED ||
1144       op == MAT_NEW_NONZERO_ALLOCATION_ERR ||
1145       op == MAT_NEW_NONZERO_LOCATION_ERR) {
1146         ierr = MatSetOption(a->A,op);CHKERRQ(ierr);
1147         ierr = MatSetOption(a->B,op);CHKERRQ(ierr);
1148   } else if (op == MAT_ROW_ORIENTED) {
1149     a->roworiented = 1;
1150     ierr = MatSetOption(a->A,op);CHKERRQ(ierr);
1151     ierr = MatSetOption(a->B,op);CHKERRQ(ierr);
1152   } else if (op == MAT_ROWS_SORTED ||
1153              op == MAT_ROWS_UNSORTED ||
1154              op == MAT_SYMMETRIC ||
1155              op == MAT_STRUCTURALLY_SYMMETRIC ||
1156              op == MAT_YES_NEW_DIAGONALS)
1157     PLogInfo(A,"MatSetOption_MPIAIJ:Option ignored\n");
1158   else if (op == MAT_COLUMN_ORIENTED) {
1159     a->roworiented = 0;
1160     ierr = MatSetOption(a->A,op);CHKERRQ(ierr);
1161     ierr = MatSetOption(a->B,op);CHKERRQ(ierr);
1162   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1163     a->donotstash = 1;
1164   } else if (op == MAT_NO_NEW_DIAGONALS){
1165     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1166   } else {
1167     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1168   }
1169   PetscFunctionReturn(0);
1170 }
1171 
1172 #undef __FUNC__
1173 #define __FUNC__ "MatGetSize_MPIAIJ"
1174 int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
1175 {
1176   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1177 
1178   PetscFunctionBegin;
1179   if (m) *m = mat->M;
1180   if (n) *n = mat->N;
1181   PetscFunctionReturn(0);
1182 }
1183 
1184 #undef __FUNC__
1185 #define __FUNC__ "MatGetLocalSize_MPIAIJ"
1186 int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
1187 {
1188   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1189 
1190   PetscFunctionBegin;
1191   if (m) *m = mat->m;
1192   if (n) *n = mat->n;
1193   PetscFunctionReturn(0);
1194 }
1195 
1196 #undef __FUNC__
1197 #define __FUNC__ "MatGetOwnershipRange_MPIAIJ"
1198 int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
1199 {
1200   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1201 
1202   PetscFunctionBegin;
1203   *m = mat->rstart; *n = mat->rend;
1204   PetscFunctionReturn(0);
1205 }
1206 
1207 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1208 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1209 
1210 #undef __FUNC__
1211 #define __FUNC__ "MatGetRow_MPIAIJ"
1212 int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1213 {
1214   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1215   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
1216   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
1217   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
1218   int        *cmap, *idx_p;
1219 
1220   PetscFunctionBegin;
1221   if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
1222   mat->getrowactive = PETSC_TRUE;
1223 
1224   if (!mat->rowvalues && (idx || v)) {
1225     /*
1226         allocate enough space to hold information from the longest row.
1227     */
1228     Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
1229     int     max = 1,m = mat->m,tmp;
1230     for ( i=0; i<m; i++ ) {
1231       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1232       if (max < tmp) { max = tmp; }
1233     }
1234     mat->rowvalues  = (Scalar *) PetscMalloc(max*(sizeof(int)+sizeof(Scalar)));CHKPTRQ(mat->rowvalues);
1235     mat->rowindices = (int *) (mat->rowvalues + max);
1236   }
1237 
1238   if (row < rstart || row >= rend) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Only local rows");
1239   lrow = row - rstart;
1240 
1241   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1242   if (!v)   {pvA = 0; pvB = 0;}
1243   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1244   ierr = (*mat->A->ops->getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1245   ierr = (*mat->B->ops->getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1246   nztot = nzA + nzB;
1247 
1248   cmap  = mat->garray;
1249   if (v  || idx) {
1250     if (nztot) {
1251       /* Sort by increasing column numbers, assuming A and B already sorted */
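      /* the off-diagonal block B holds the columns with global index < cstart
         and those >= cend, while A holds the band [cstart,cend); imark counts
         how many B entries fall below cstart, so the merged row is: the B
         entries below cstart, then all A entries (offset by cstart), then the
         remaining B entries */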
1252       int imark = -1;
1253       if (v) {
1254         *v = v_p = mat->rowvalues;
1255         for ( i=0; i<nzB; i++ ) {
1256           if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
1257           else break;
1258         }
1259         imark = i;
1260         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1261         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1262       }
1263       if (idx) {
1264         *idx = idx_p = mat->rowindices;
1265         if (imark > -1) {
1266           for ( i=0; i<imark; i++ ) {
1267             idx_p[i] = cmap[cworkB[i]];
1268           }
1269         } else {
1270           for ( i=0; i<nzB; i++ ) {
1271             if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
1272             else break;
1273           }
1274           imark = i;
1275         }
1276         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
1277         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
1278       }
1279     } else {
1280       if (idx) *idx = 0;
1281       if (v)   *v   = 0;
1282     }
1283   }
1284   *nz = nztot;
1285   ierr = (*mat->A->ops->restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1286   ierr = (*mat->B->ops->restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1287   PetscFunctionReturn(0);
1288 }
1289 
1290 #undef __FUNC__
1291 #define __FUNC__ "MatRestoreRow_MPIAIJ"
1292 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1293 {
1294   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1295 
1296   PetscFunctionBegin;
1297   if (aij->getrowactive == PETSC_FALSE) {
1298     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
1299   }
1300   aij->getrowactive = PETSC_FALSE;
1301   PetscFunctionReturn(0);
1302 }
1303 
1304 #undef __FUNC__
1305 #define __FUNC__ "MatNorm_MPIAIJ"
1306 int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
1307 {
1308   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1309   Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
1310   int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
1311   double     sum = 0.0;
1312   Scalar     *v;
1313 
1314   PetscFunctionBegin;
1315   if (aij->size == 1) {
1316     ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
1317   } else {
1318     if (type == NORM_FROBENIUS) {
1319       v = amat->a;
1320       for (i=0; i<amat->nz; i++ ) {
1321 #if defined(USE_PETSC_COMPLEX)
1322         sum += PetscReal(PetscConj(*v)*(*v)); v++;
1323 #else
1324         sum += (*v)*(*v); v++;
1325 #endif
1326       }
1327       v = bmat->a;
1328       for (i=0; i<bmat->nz; i++ ) {
1329 #if defined(USE_PETSC_COMPLEX)
1330         sum += PetscReal(PetscConj(*v)*(*v)); v++;
1331 #else
1332         sum += (*v)*(*v); v++;
1333 #endif
1334       }
1335       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
1336       *norm = sqrt(*norm);
1337     } else if (type == NORM_1) { /* max column norm */
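      /* each process accumulates |a_ij| into a work array of length N indexed
         by global column (offset by cstart for the A part, mapped through
         garray for the B part); the arrays are summed across processes and
         the largest column sum is the 1-norm.  Note the O(N) storage per
         process. */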
1338       double *tmp, *tmp2;
1339       int    *jj, *garray = aij->garray;
1340       tmp  = (double *) PetscMalloc( (aij->N+1)*sizeof(double) ); CHKPTRQ(tmp);
1341       tmp2 = (double *) PetscMalloc( (aij->N+1)*sizeof(double) ); CHKPTRQ(tmp2);
1342       PetscMemzero(tmp,aij->N*sizeof(double));
1343       *norm = 0.0;
1344       v = amat->a; jj = amat->j;
1345       for ( j=0; j<amat->nz; j++ ) {
1346         tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
1347       }
1348       v = bmat->a; jj = bmat->j;
1349       for ( j=0; j<bmat->nz; j++ ) {
1350         tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
1351       }
1352       ierr = MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
1353       for ( j=0; j<aij->N; j++ ) {
1354         if (tmp2[j] > *norm) *norm = tmp2[j];
1355       }
1356       PetscFree(tmp); PetscFree(tmp2);
1357     } else if (type == NORM_INFINITY) { /* max row norm */
1358       double ntemp = 0.0;
1359       for ( j=0; j<amat->m; j++ ) {
1360         v = amat->a + amat->i[j] + shift;
1361         sum = 0.0;
1362         for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
1363           sum += PetscAbsScalar(*v); v++;
1364         }
1365         v = bmat->a + bmat->i[j] + shift;
1366         for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
1367           sum += PetscAbsScalar(*v); v++;
1368         }
1369         if (sum > ntemp) ntemp = sum;
1370       }
1371       ierr = MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);CHKERRQ(ierr);
1372     } else {
1373       SETERRQ(PETSC_ERR_SUP,0,"No support for two norm");
1374     }
1375   }
1376   PetscFunctionReturn(0);
1377 }
1378 
1379 #undef __FUNC__
1380 #define __FUNC__ "MatTranspose_MPIAIJ"
1381 int MatTranspose_MPIAIJ(Mat A,Mat *matout)
1382 {
1383   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1384   Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
1385   int        ierr,shift = Aloc->indexshift;
1386   int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
1387   Mat        B;
1388   Scalar     *array;
1389 
1390   PetscFunctionBegin;
1391   if (matout == PETSC_NULL && M != N) {
1392     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
1393   }
1394 
1395   ierr = MatCreateMPIAIJ(A->comm,a->n,a->m,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);CHKERRQ(ierr);
1396 
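  /* each local row becomes a column of the transpose: the column indices of
     row `row' are first converted to global numbering, and the row's values
     are then inserted into B with the row and column arguments of
     MatSetValues() swapped, i.e. as column `row' of B */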
1397   /* copy over the A part */
1398   Aloc = (Mat_SeqAIJ*) a->A->data;
1399   m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1400   row = a->rstart;
1401   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
1402   for ( i=0; i<m; i++ ) {
1403     ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1404     row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
1405   }
1406   aj = Aloc->j;
1407   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}
1408 
1409   /* copy over the B part */
1410   Aloc = (Mat_SeqAIJ*) a->B->data;
1411   m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1412   row = a->rstart;
1413   ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
1414   for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
1415   for ( i=0; i<m; i++ ) {
1416     ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1417     row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
1418   }
1419   PetscFree(ct);
1420   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1421   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1422   if (matout != PETSC_NULL) {
1423     *matout = B;
1424   } else {
1425     PetscOps       *Abops;
1426     struct _MatOps *Aops;
1427 
1428     /* This isn't really an in-place transpose, but we do free the data structures of a */
1429     PetscFree(a->rowners);
1430     ierr = MatDestroy(a->A); CHKERRQ(ierr);
1431     ierr = MatDestroy(a->B); CHKERRQ(ierr);
1432 #if defined (USE_CTABLE)
1433     if (a->colmap) TableDelete(a->colmap);
1434 #else
1435     if (a->colmap) PetscFree(a->colmap);
1436 #endif
1437     if (a->garray) PetscFree(a->garray);
1438     if (a->lvec) VecDestroy(a->lvec);
1439     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
1440     PetscFree(a);
1441 
1442     /*
1443        This is horrible, horrible code. We need to keep the
1444       A pointers for the bops and ops but copy everything
1445       else from B.
1446     */
1447     Abops = A->bops;
1448     Aops  = A->ops;
1449     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1450     A->bops = Abops;
1451     A->ops  = Aops;
1452     PetscHeaderDestroy(B);
1453   }
1454   PetscFunctionReturn(0);
1455 }
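
/*
   Illustrative usage sketch (not part of the original source): the two ways
   MatTranspose_MPIAIJ() above is reached through MatTranspose().  Passing a
   matrix pointer creates a new transposed matrix; passing PETSC_NULL requests
   the (square-only) pseudo in-place variant.  `A' is assumed assembled.
*/
static int MatTransposeUsage_Sketch(Mat A)
{
  int ierr;
  Mat At;

  PetscFunctionBegin;
  ierr = MatTranspose(A,&At);CHKERRQ(ierr);        /* out-of-place: At = A^T            */
  ierr = MatDestroy(At);CHKERRQ(ierr);
  ierr = MatTranspose(A,PETSC_NULL);CHKERRQ(ierr); /* in-place: only legal for square A */
  PetscFunctionReturn(0);
}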
1456 
1457 #undef __FUNC__
1458 #define __FUNC__ "MatDiagonalScale_MPIAIJ"
1459 int MatDiagonalScale_MPIAIJ(Mat mat,Vec ll,Vec rr)
1460 {
1461   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1462   Mat a = aij->A, b = aij->B;
1463   int ierr,s1,s2,s3;
1464 
1465   PetscFunctionBegin;
1466   ierr = MatGetLocalSize(mat,&s2,&s3); CHKERRQ(ierr);
1467   if (rr) {
1468     ierr = VecGetLocalSize(rr,&s1);CHKERRQ(ierr);
1469     if (s1!=s3) SETERRQ(PETSC_ERR_ARG_SIZ,0,"right vector non-conforming local size");
1470     /* Overlap communication with computation. */
1471     ierr = VecScatterBegin(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1472   }
1473   if (ll) {
1474     ierr = VecGetLocalSize(ll,&s1);CHKERRQ(ierr);
1475     if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"left vector non-conforming local size");
1476     ierr = (*b->ops->diagonalscale)(b,ll,0); CHKERRQ(ierr);
1477   }
1478   /* scale the diagonal block */
1479   ierr = (*a->ops->diagonalscale)(a,ll,rr); CHKERRQ(ierr);
1480 
1481   if (rr) {
1482     /* Do a scatter end and then right scale the off-diagonal block */
1483     ierr = VecScatterEnd(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1484     ierr = (*b->ops->diagonalscale)(b,0,aij->lvec); CHKERRQ(ierr);
1485   }
1486 
1487   PetscFunctionReturn(0);
1488 }
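
/*
   Illustrative usage sketch (not part of the original source): scaling
   A <- diag(l) A diag(r) through MatDiagonalScale().  As checked above, the
   left vector must conform to the local row layout and the right vector to
   the local column layout of the matrix.
*/
static int MatDiagonalScaleUsage_Sketch(Mat A)
{
  int ierr,m,n,M,N;
  Vec l,r;

  PetscFunctionBegin;
  ierr = MatGetLocalSize(A,&m,&n);CHKERRQ(ierr);
  ierr = MatGetSize(A,&M,&N);CHKERRQ(ierr);
  ierr = VecCreateMPI(A->comm,m,M,&l);CHKERRQ(ierr);  /* conforms to the row layout    */
  ierr = VecCreateMPI(A->comm,n,N,&r);CHKERRQ(ierr);  /* conforms to the column layout */
  /* ... fill l and r with the desired scaling factors here ... */
  ierr = MatDiagonalScale(A,l,r);CHKERRQ(ierr);
  ierr = VecDestroy(l);CHKERRQ(ierr);
  ierr = VecDestroy(r);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}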
1489 
1490 
1491 extern int MatPrintHelp_SeqAIJ(Mat);
1492 #undef __FUNC__
1493 #define __FUNC__ "MatPrintHelp_MPIAIJ"
1494 int MatPrintHelp_MPIAIJ(Mat A)
1495 {
1496   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1497   int        ierr;
1498 
1499   PetscFunctionBegin;
1500   if (!a->rank) {
1501     ierr = MatPrintHelp_SeqAIJ(a->A);CHKERRQ(ierr);
1502   }
1503   PetscFunctionReturn(0);
1504 }
1505 
1506 #undef __FUNC__
1507 #define __FUNC__ "MatGetBlockSize_MPIAIJ"
1508 int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1509 {
1510   PetscFunctionBegin;
1511   *bs = 1;
1512   PetscFunctionReturn(0);
1513 }
1514 #undef __FUNC__
1515 #define __FUNC__ "MatSetUnfactored_MPIAIJ"
1516 int MatSetUnfactored_MPIAIJ(Mat A)
1517 {
1518   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1519   int        ierr;
1520 
1521   PetscFunctionBegin;
1522   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1523   PetscFunctionReturn(0);
1524 }
1525 
1526 #undef __FUNC__
1527 #define __FUNC__ "MatEqual_MPIAIJ"
1528 int MatEqual_MPIAIJ(Mat A, Mat B, PetscTruth *flag)
1529 {
1530   Mat_MPIAIJ *matB = (Mat_MPIAIJ *) B->data,*matA = (Mat_MPIAIJ *) A->data;
1531   Mat        a, b, c, d;
1532   PetscTruth flg;
1533   int        ierr;
1534 
1535   PetscFunctionBegin;
1536   if (B->type != MATMPIAIJ) SETERRQ(PETSC_ERR_ARG_INCOMP,0,"Matrices must be same type");
1537   a = matA->A; b = matA->B;
1538   c = matB->A; d = matB->B;
1539 
1540   ierr = MatEqual(a, c, &flg); CHKERRQ(ierr);
1541   if (flg == PETSC_TRUE) {
1542     ierr = MatEqual(b, d, &flg); CHKERRQ(ierr);
1543   }
1544   ierr = MPI_Allreduce(&flg, flag, 1, MPI_INT, MPI_LAND, A->comm);CHKERRQ(ierr);
1545   PetscFunctionReturn(0);
1546 }
1547 
1548 #undef __FUNC__
1549 #define __FUNC__ "MatCopy_MPIAIJ"
1550 int MatCopy_MPIAIJ(Mat A,Mat B,MatStructure str)
1551 {
1552   int        ierr;
1553   Mat_MPIAIJ *a = (Mat_MPIAIJ *)A->data;
1554   Mat_MPIAIJ *b = (Mat_MPIAIJ *)B->data;
1555 
1556   PetscFunctionBegin;
1557   if (str != SAME_NONZERO_PATTERN || B->type != MATMPIAIJ) {
1558     /* Because of the column compression in the off-processor part of the matrix a->B,
1559        the number of columns in a->B and b->B may be different; hence we cannot call
1560        MatCopy() directly on the two parts. If need be, we could provide a copy more
1561        efficient than MatCopy_Basic() by first uncompressing the a->B matrices and
1562        then copying the submatrices */
1563     ierr = MatCopy_Basic(A,B,str);CHKERRQ(ierr);
1564   } else {
1565     ierr = MatCopy(a->A,b->A,str);CHKERRQ(ierr);
1566     ierr = MatCopy(a->B,b->B,str);CHKERRQ(ierr);
1567   }
1568   PetscFunctionReturn(0);
1569 }
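
/*
   Illustrative usage sketch (not part of the original source): copying one
   assembled MPIAIJ matrix into another with the same layout.  With
   SAME_NONZERO_PATTERN the routine above copies the diagonal and off-diagonal
   blocks directly; any other structure flag falls back to MatCopy_Basic().
*/
static int MatCopyUsage_Sketch(Mat A,Mat B)
{
  int        ierr;
  PetscTruth eq;

  PetscFunctionBegin;
  ierr = MatCopy(A,B,SAME_NONZERO_PATTERN);CHKERRQ(ierr);
  ierr = MatEqual(A,B,&eq);CHKERRQ(ierr);  /* collective comparison, as implemented above */
  PetscFunctionReturn(0);
}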
1570 
1571 extern int MatDuplicate_MPIAIJ(Mat,MatDuplicateOption,Mat *);
1572 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1573 extern int MatFDColoringCreate_MPIAIJ(Mat,ISColoring,MatFDColoring);
1574 extern int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatReuse,Mat **);
1575 extern int MatGetSubMatrix_MPIAIJ (Mat ,IS,IS,int,MatReuse,Mat *);
1576 
1577 /* -------------------------------------------------------------------*/
1578 static struct _MatOps MatOps_Values = {MatSetValues_MPIAIJ,
1579        MatGetRow_MPIAIJ,
1580        MatRestoreRow_MPIAIJ,
1581        MatMult_MPIAIJ,
1582        MatMultAdd_MPIAIJ,
1583        MatMultTrans_MPIAIJ,
1584        MatMultTransAdd_MPIAIJ,
1585        0,
1586        0,
1587        0,
1588        0,
1589        0,
1590        0,
1591        MatRelax_MPIAIJ,
1592        MatTranspose_MPIAIJ,
1593        MatGetInfo_MPIAIJ,
1594        MatEqual_MPIAIJ,
1595        MatGetDiagonal_MPIAIJ,
1596        MatDiagonalScale_MPIAIJ,
1597        MatNorm_MPIAIJ,
1598        MatAssemblyBegin_MPIAIJ,
1599        MatAssemblyEnd_MPIAIJ,
1600        0,
1601        MatSetOption_MPIAIJ,
1602        MatZeroEntries_MPIAIJ,
1603        MatZeroRows_MPIAIJ,
1604        0,
1605        0,
1606        0,
1607        0,
1608        MatGetSize_MPIAIJ,
1609        MatGetLocalSize_MPIAIJ,
1610        MatGetOwnershipRange_MPIAIJ,
1611        0,
1612        0,
1613        0,
1614        0,
1615        MatDuplicate_MPIAIJ,
1616        0,
1617        0,
1618        0,
1619        0,
1620        0,
1621        MatGetSubMatrices_MPIAIJ,
1622        MatIncreaseOverlap_MPIAIJ,
1623        MatGetValues_MPIAIJ,
1624        MatCopy_MPIAIJ,
1625        MatPrintHelp_MPIAIJ,
1626        MatScale_MPIAIJ,
1627        0,
1628        0,
1629        0,
1630        MatGetBlockSize_MPIAIJ,
1631        0,
1632        0,
1633        0,
1634        0,
1635        MatFDColoringCreate_MPIAIJ,
1636        0,
1637        MatSetUnfactored_MPIAIJ,
1638        0,
1639        0,
1640        MatGetSubMatrix_MPIAIJ,
1641        0,
1642        0,
1643        MatGetMaps_Petsc};
1644 
1645 /* ----------------------------------------------------------------------------------------*/
1646 
1647 EXTERN_C_BEGIN
1648 #undef __FUNC__
1649 #define __FUNC__ "MatStoreValues_MPIAIJ"
1650 int MatStoreValues_MPIAIJ(Mat mat)
1651 {
1652   Mat_MPIAIJ *aij = (Mat_MPIAIJ *)mat->data;
1653   int        ierr;
1654 
1655   PetscFunctionBegin;
1656   ierr = MatStoreValues(aij->A);CHKERRQ(ierr);
1657   ierr = MatStoreValues(aij->B);CHKERRQ(ierr);
1658   PetscFunctionReturn(0);
1659 }
1660 EXTERN_C_END
1661 
1662 EXTERN_C_BEGIN
1663 #undef __FUNC__
1664 #define __FUNC__ "MatRetrieveValues_MPIAIJ"
1665 int MatRetrieveValues_MPIAIJ(Mat mat)
1666 {
1667   Mat_MPIAIJ *aij = (Mat_MPIAIJ *)mat->data;
1668   int        ierr;
1669 
1670   PetscFunctionBegin;
1671   ierr = MatRetrieveValues(aij->A);CHKERRQ(ierr);
1672   ierr = MatRetrieveValues(aij->B);CHKERRQ(ierr);
1673   PetscFunctionReturn(0);
1674 }
1675 EXTERN_C_END
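
/*
   Illustrative usage sketch (not part of the original source) for the pair
   registered below with PetscObjectComposeFunction(): freeze the nonzero
   pattern, stash the assembled values, modify the matrix, and later restore
   the stashed values without changing the structure.  The option name used to
   freeze the pattern is an assumption for this PETSc generation.
*/
static int MatStoreRetrieveUsage_Sketch(Mat mat)
{
  int ierr;

  PetscFunctionBegin;
  ierr = MatSetOption(mat,MAT_NO_NEW_NONZERO_LOCATIONS);CHKERRQ(ierr); /* assumed prerequisite       */
  ierr = MatStoreValues(mat);CHKERRQ(ierr);                            /* saves both A and B parts   */
  /* ... alter entries with MatSetValues()/MatAssemblyBegin()/MatAssemblyEnd() ... */
  ierr = MatRetrieveValues(mat);CHKERRQ(ierr);                         /* restores the saved values  */
  PetscFunctionReturn(0);
}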
1676 
1677 #include "pc.h"
1678 EXTERN_C_BEGIN
1679 extern int MatGetDiagonalBlock_MPIAIJ(Mat,PetscTruth *,MatReuse,Mat *);
1680 EXTERN_C_END
1681 
1682 #undef __FUNC__
1683 #define __FUNC__ "MatCreateMPIAIJ"
1684 /*@C
1685    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1686    (the default parallel PETSc format).  For good matrix assembly performance
1687    the user should preallocate the matrix storage by setting the parameters
1688    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1689    performance can be increased by more than a factor of 50.
1690 
1691    Collective on MPI_Comm
1692 
1693    Input Parameters:
1694 +  comm - MPI communicator
1695 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1696            This value should be the same as the local size used in creating the
1697            y vector for the matrix-vector product y = Ax.
1698 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1699        This value should be the same as the local size used in creating the
1700        x vector for the matrix-vector product y = Ax. For square matrices n is almost always m.
1701 .  M - number of global rows (or PETSC_DETERMINE to have it calculated if m is given)
1702 .  N - number of global columns (or PETSC_DETERMINE to have it calculated if n is given)
1703 .  d_nz  - number of nonzeros per row in DIAGONAL portion of local submatrix
1704            (same value is used for all local rows)
1705 .  d_nnz - array containing the number of nonzeros in the various rows of the
1706            DIAGONAL portion of the local submatrix (possibly different for each row)
1707            or PETSC_NULL, if d_nz is used to specify the nonzero structure.
1708            The size of this array is equal to the number of local rows, i.e. 'm'.
1709            You must leave room for the diagonal entry even if it is zero.
1710 .  o_nz  - number of nonzeros per row in the OFF-DIAGONAL portion of local
1711            submatrix (same value is used for all local rows).
1712 -  o_nnz - array containing the number of nonzeros in the various rows of the
1713            OFF-DIAGONAL portion of the local submatrix (possibly different for
1714            each row) or PETSC_NULL, if o_nz is used to specify the nonzero
1715            structure. The size of this array is equal to the number
1716            of local rows, i.e. 'm'.
1717 
1718    Output Parameter:
1719 .  A - the matrix
1720 
1721    Notes:
1722    m,n,M,N parameters specify the size of the matrix, and its partitioning across
1723    processors, while d_nz,d_nnz,o_nz,o_nnz parameters specify the approximate
1724    storage requirements for this matrix.
1725 
1726    If PETSC_DECIDE or PETSC_DETERMINE is used for a particular argument on one
1727    processor then it must be used on all processors that share the object for
1728    that argument.
1729 
1730    The AIJ format (also called the Yale sparse matrix format or
1731    compressed row storage), is fully compatible with standard Fortran 77
1732    storage.  That is, the stored row and column indices can begin at
1733    either one (as in Fortran) or zero.  See the users manual for details.
1734 
1735    The user MUST specify either the local or global matrix dimensions
1736    (possibly both).
1737 
1738    The parallel matrix is partitioned such that the first m0 rows belong to
1739    process 0, the next m1 rows belong to process 1, the next m2 rows belong
1740    to process 2, etc., where m0,m1,m2,... are the input parameter 'm'.
1741 
1742    The DIAGONAL portion of the local submatrix of a processor can be defined
1743    as the submatrix which is obtained by extracting the part corresponding
1744    to the rows r1-r2 and columns r1-r2 of the global matrix, where r1 is the
1745    first row that belongs to the processor, and r2 is the last row belonging
1746    to this processor. This is a square mxm matrix. The remaining portion
1747    of the local submatrix (mxN) constitutes the OFF-DIAGONAL portion.
1748 
1749    If o_nnz, d_nnz are specified, then o_nz, and d_nz are ignored.
1750    If d_nnz and o_nnz are specified, then d_nz and o_nz are ignored.
1751    By default, this format uses inodes (identical nodes) when possible.
1752    We search for consecutive rows with the same nonzero structure, thereby
1753    reusing matrix information to achieve increased efficiency.
1754 
1755    Options Database Keys:
1756 +  -mat_aij_no_inode  - Do not use inodes
1757 .  -mat_aij_inode_limit <limit> - Sets inode limit (max limit=5)
1758 .  -mat_aij_oneindex - Internally use indexing starting at 1
1759         rather than 0.  Note that when calling MatSetValues(),
1760         the user still MUST index entries starting at 0!
1761 -  -mat_mpi - use the parallel matrix data structures even on one processor
1762                (defaults to using SeqAIJ format on one processor)
1765 
1766 
1767    Example usage:
1768 
1769    Consider the following 8x8 matrix with 34 non-zero values, that is
1770    assembled across 3 processors. Let's assume that proc0 owns 3 rows,
1771    proc1 owns 3 rows, proc2 owns 2 rows. This division can be shown
1772    as follows:
1773 
1774 .vb
1775             1  2  0  |  0  3  0  |  0  4
1776     Proc0   0  5  6  |  7  0  0  |  8  0
1777             9  0 10  | 11  0  0  | 12  0
1778     -------------------------------------
1779            13  0 14  | 15 16 17  |  0  0
1780     Proc1   0 18  0  | 19 20 21  |  0  0
1781             0  0  0  | 22 23  0  | 24  0
1782     -------------------------------------
1783     Proc2  25 26 27  |  0  0 28  | 29  0
1784            30  0  0  | 31 32 33  |  0 34
1785 .ve
1786 
1787    This can be represented as a collection of submatrices as:
1788 
1789 .vb
1790       A B C
1791       D E F
1792       G H I
1793 .ve
1794 
1795    Where the submatrices A,B,C are owned by proc0, D,E,F are
1796    owned by proc1, G,H,I are owned by proc2.
1797 
1798    The 'm' parameters for proc0,proc1,proc2 are 3,3,2 respectively.
1799    The 'n' parameters for proc0,proc1,proc2 are 3,3,2 respectively.
1800    The 'M','N' parameters are 8,8, and have the same values on all procs.
1801 
1802    The DIAGONAL submatrices corresponding to proc0,proc1,proc2 are
1803    submatrices [A], [E], [I] respectively. The OFF-DIAGONAL submatrices
1804    corresponding to proc0,proc1,proc2 are [BC], [DF], [GH] respectively.
1805    Internally, each processor stores the DIAGONAL part, and the OFF-DIAGONAL
1806    part as SeqAIJ matrices. for eg: proc1 will store [E] as a SeqAIJ
1807    part as SeqAIJ matrices. For example, proc1 will store [E] as a SeqAIJ
1808    matrix, and [DF] as another SeqAIJ matrix.
1809    When d_nz, o_nz parameters are specified, d_nz storage elements are
1810    allocated for every row of the local diagonal submatrix, and o_nz
1811    storage locations are allocated for every row of the OFF-DIAGONAL submat.
1812    storage locations are allocated for every row of the OFF-DIAGONAL submatrix.
1813    One way to choose d_nz and o_nz is to use the maximum number of nonzeros over
1814    the local rows of the DIAGONAL and OFF-DIAGONAL submatrices, respectively.
1815 .vb
1816      proc0 : d_nz = 2, o_nz = 2
1817      proc1 : d_nz = 3, o_nz = 2
1818      proc2 : d_nz = 1, o_nz = 4
1819 .ve
1820    We are allocating m*(d_nz+o_nz) storage locations for every proc. This
1821    translates to 3*(2+2)=12 for proc0, 3*(3+2)=15 for proc1, 2*(1+4)=10
1822    for proc2, i.e. we are using 12+15+10=37 storage locations to store
1823    34 values.
1824 
1825    When d_nnz, o_nnz parameters are specified, the storage is specified
1826    for every row, corresponding to both DIAGONAL and OFF-DIAGONAL submatrices.
1827    In the above case the values for d_nnz,o_nnz are:
1828 .vb
1829      proc0: d_nnz = [2,2,2] and o_nnz = [2,2,2]
1830      proc1: d_nnz = [3,3,2] and o_nnz = [2,1,1]
1831      proc2: d_nnz = [1,1]   and o_nnz = [4,4]
1832 .ve
1833    Here the space allocated is the sum of all the above values, i.e. 34, and
1834    hence the preallocation is perfect.
1835 
1836    Level: intermediate
1837 
1838 .keywords: matrix, aij, compressed row, sparse, parallel
1839 
1840 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1841 @*/
1842 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1843 {
1844   Mat          B;
1845   Mat_MPIAIJ   *b;
1846   int          ierr, i,size,flag1 = 0, flag2 = 0;
1847 
1848   PetscFunctionBegin;
1849   MPI_Comm_size(comm,&size);
1850   ierr = OptionsHasName(PETSC_NULL,"-mat_mpiaij",&flag1); CHKERRQ(ierr);
1851   ierr = OptionsHasName(PETSC_NULL,"-mat_mpi",&flag2); CHKERRQ(ierr);
1852   if (!flag1 && !flag2 && size == 1) {
1853     if (M == PETSC_DECIDE) M = m;
1854     if (N == PETSC_DECIDE) N = n;
1855     ierr = MatCreateSeqAIJ(comm,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
1856     PetscFunctionReturn(0);
1857   }
1858 
1859   *A = 0;
1860   PetscHeaderCreate(B,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIAIJ,"Mat",comm,MatDestroy,MatView);
1861   PLogObjectCreate(B);
1862   B->data            = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
1863   PetscMemzero(b,sizeof(Mat_MPIAIJ));
1864   PetscMemcpy(B->ops,&MatOps_Values,sizeof(struct _MatOps));
1865   B->ops->destroy    = MatDestroy_MPIAIJ;
1866   B->ops->view       = MatView_MPIAIJ;
1867   B->factor          = 0;
1868   B->assembled       = PETSC_FALSE;
1869   B->mapping         = 0;
1870 
1871   B->insertmode      = NOT_SET_VALUES;
1872   b->size            = size;
1873   MPI_Comm_rank(comm,&b->rank);
1874 
1875   if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
1876     SETERRQ(PETSC_ERR_ARG_WRONG,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1877   }
1878 
1879   ierr = PetscSplitOwnership(comm,&m,&M);CHKERRQ(ierr);
1880   ierr = PetscSplitOwnership(comm,&n,&N);CHKERRQ(ierr);
1881   b->m = m; B->m = m;
1882   b->n = n; B->n = n;
1883   b->N = N; B->N = N;
1884   b->M = M; B->M = M;
1885 
1886   /* the information in the maps duplicates the information computed below, eventually
1887      we should remove the duplicate information that is not contained in the maps */
1888   ierr = MapCreateMPI(comm,m,M,&B->rmap);CHKERRQ(ierr);
1889   ierr = MapCreateMPI(comm,n,N,&B->cmap);CHKERRQ(ierr);
1890 
1891   /* build local table of row and column ownerships */
1892   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1893   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIAIJ));
1894   b->cowners = b->rowners + b->size + 2;
1895   ierr = MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1896   b->rowners[0] = 0;
1897   for ( i=2; i<=b->size; i++ ) {
1898     b->rowners[i] += b->rowners[i-1];
1899   }
1900   b->rstart = b->rowners[b->rank];
1901   b->rend   = b->rowners[b->rank+1];
1902   ierr = MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1903   b->cowners[0] = 0;
1904   for ( i=2; i<=b->size; i++ ) {
1905     b->cowners[i] += b->cowners[i-1];
1906   }
1907   b->cstart = b->cowners[b->rank];
1908   b->cend   = b->cowners[b->rank+1];
1909 
1910   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1911   ierr = MatCreateSeqAIJ(PETSC_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1912   PLogObjectParent(B,b->A);
1913   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1914   ierr = MatCreateSeqAIJ(PETSC_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1915   PLogObjectParent(B,b->B);
1916 
1917   /* build cache for off array entries formed */
1918   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1919   b->donotstash  = 0;
1920   b->colmap      = 0;
1921   b->garray      = 0;
1922   b->roworiented = 1;
1923 
1924   /* stuff used for matrix vector multiply */
1925   b->lvec      = 0;
1926   b->Mvctx     = 0;
1927 
1928   /* stuff for MatGetRow() */
1929   b->rowindices   = 0;
1930   b->rowvalues    = 0;
1931   b->getrowactive = PETSC_FALSE;
1932 
1933   ierr = PetscObjectComposeFunction((PetscObject)B,"MatStoreValues_C",
1934                                      "MatStoreValues_MPIAIJ",
1935                                      (void*)MatStoreValues_MPIAIJ);CHKERRQ(ierr);
1936   ierr = PetscObjectComposeFunction((PetscObject)B,"MatRetrieveValues_C",
1937                                      "MatRetrieveValues_MPIAIJ",
1938                                      (void*)MatRetrieveValues_MPIAIJ);CHKERRQ(ierr);
1939   ierr = PetscObjectComposeFunction((PetscObject)B,"MatGetDiagonalBlock_C",
1940                                      "MatGetDiagonalBlock_MPIAIJ",
1941                                      (void*)MatGetDiagonalBlock_MPIAIJ);CHKERRQ(ierr);
1942   *A = B;
1943   PetscFunctionReturn(0);
1944 }
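
/*
   Illustrative usage sketch (not part of the original source) of the
   creation/assembly sequence described in the manual page above, using
   per-row preallocation.  The sizes and nonzero counts are placeholders,
   not the 8x8 example matrix itself.
*/
static int MatCreateMPIAIJUsage_Sketch(MPI_Comm comm,Mat *A)
{
  int    ierr,i,rstart,rend,col,d_nnz[3] = {2,3,2},o_nnz[3] = {2,2,2};
  Scalar v;

  PetscFunctionBegin;
  /* three local rows and columns per process; global sizes computed from the local ones */
  ierr = MatCreateMPIAIJ(comm,3,3,PETSC_DETERMINE,PETSC_DETERMINE,0,d_nnz,0,o_nnz,A);CHKERRQ(ierr);
  ierr = MatGetOwnershipRange(*A,&rstart,&rend);CHKERRQ(ierr);
  for ( i=rstart; i<rend; i++ ) {
    col = i; v = 1.0;   /* diagonal entry; off-diagonal entries would be set the same way */
    ierr = MatSetValues(*A,1,&i,1,&col,&v,INSERT_VALUES);CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(*A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
  ierr = MatAssemblyEnd(*A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}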
1945 
1946 #undef __FUNC__
1947 #define __FUNC__ "MatDuplicate_MPIAIJ"
1948 int MatDuplicate_MPIAIJ(Mat matin,MatDuplicateOption cpvalues,Mat *newmat)
1949 {
1950   Mat        mat;
1951   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1952   int        ierr, len=0, flg;
1953 
1954   PetscFunctionBegin;
1955   *newmat       = 0;
1956   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIAIJ,"Mat",matin->comm,MatDestroy,MatView);
1957   PLogObjectCreate(mat);
1958   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1959   PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));
1960   mat->ops->destroy = MatDestroy_MPIAIJ;
1961   mat->ops->view    = MatView_MPIAIJ;
1962   mat->factor       = matin->factor;
1963   mat->assembled    = PETSC_TRUE;
1964   mat->insertmode   = NOT_SET_VALUES;
1965 
1966   a->m = mat->m   = oldmat->m;
1967   a->n = mat->n   = oldmat->n;
1968   a->M = mat->M   = oldmat->M;
1969   a->N = mat->N   = oldmat->N;
1970 
1971   a->rstart       = oldmat->rstart;
1972   a->rend         = oldmat->rend;
1973   a->cstart       = oldmat->cstart;
1974   a->cend         = oldmat->cend;
1975   a->size         = oldmat->size;
1976   a->rank         = oldmat->rank;
1977   a->donotstash   = oldmat->donotstash;
1978   a->roworiented  = oldmat->roworiented;
1979   a->rowindices   = 0;
1980   a->rowvalues    = 0;
1981   a->getrowactive = PETSC_FALSE;
1982 
1983   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1984   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIAIJ));
1985   a->cowners = a->rowners + a->size + 2;
1986   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1987   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1988   if (oldmat->colmap) {
1989 #if defined (USE_CTABLE)
1990     ierr = TableCreateCopy( &a->colmap, oldmat->colmap ); CHKERRQ(ierr);
1991 #else
1992     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1993     PLogObjectMemory(mat,(a->N)*sizeof(int));
1994     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1995 #endif
1996   } else a->colmap = 0;
1997   if (oldmat->garray) {
1998     len = ((Mat_SeqAIJ *) (oldmat->B->data))->n;
1999     a->garray = (int *) PetscMalloc((len+1)*sizeof(int)); CHKPTRQ(a->garray);
2000     PLogObjectMemory(mat,len*sizeof(int));
2001     if (len) PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
2002   } else a->garray = 0;
2003 
2004   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
2005   PLogObjectParent(mat,a->lvec);
2006   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
2007   PLogObjectParent(mat,a->Mvctx);
2008   ierr =  MatDuplicate(oldmat->A,cpvalues,&a->A); CHKERRQ(ierr);
2009   PLogObjectParent(mat,a->A);
2010   ierr =  MatDuplicate(oldmat->B,cpvalues,&a->B); CHKERRQ(ierr);
2011   PLogObjectParent(mat,a->B);
2012   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
2013   if (flg) {
2014     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
2015   }
2016   ierr = FListDuplicate(matin->qlist,&mat->qlist);CHKERRQ(ierr);
2017   *newmat = mat;
2018   PetscFunctionReturn(0);
2019 }
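
/*
   Illustrative usage sketch (not part of the original source): duplicating a
   parallel AIJ matrix with and without its numerical values through
   MatDuplicate(), which dispatches to the routine above.  The
   MatDuplicateOption names are assumed to be the standard ones.
*/
static int MatDuplicateUsage_Sketch(Mat A)
{
  int ierr;
  Mat Acopy,Astruct;

  PetscFunctionBegin;
  ierr = MatDuplicate(A,MAT_COPY_VALUES,&Acopy);CHKERRQ(ierr);          /* structure and values */
  ierr = MatDuplicate(A,MAT_DO_NOT_COPY_VALUES,&Astruct);CHKERRQ(ierr); /* structure only       */
  ierr = MatDestroy(Acopy);CHKERRQ(ierr);
  ierr = MatDestroy(Astruct);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}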
2020 
2021 #include "sys.h"
2022 
2023 #undef __FUNC__
2024 #define __FUNC__ "MatLoad_MPIAIJ"
2025 int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
2026 {
2027   Mat          A;
2028   Scalar       *vals,*svals;
2029   MPI_Comm     comm = ((PetscObject)viewer)->comm;
2030   MPI_Status   status;
2031   int          i, nz, ierr, j,rstart, rend, fd;
2032   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
2033   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
2034   int          tag = ((PetscObject)viewer)->tag,cend,cstart,n;
2035 
2036   PetscFunctionBegin;
2037   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
2038   if (!rank) {
2039     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
2040     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
2041     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
2042     if (header[3] < 0) {
2043       SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix in special format on disk, cannot load as MPIAIJ");
2044     }
2045   }
2046 
2047   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
2048   M = header[1]; N = header[2];
2049   /* determine ownership of all rows */
2050   m = M/size + ((M % size) > rank);
2051   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
2052   ierr = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
2053   rowners[0] = 0;
2054   for ( i=2; i<=size; i++ ) {
2055     rowners[i] += rowners[i-1];
2056   }
2057   rstart = rowners[rank];
2058   rend   = rowners[rank+1];
2059 
2060   /* distribute row lengths to all processors */
2061   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
2062   offlens = ourlens + (rend-rstart);
2063   if (!rank) {
2064     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
2065     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
2066     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
2067     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
2068     ierr = MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);CHKERRQ(ierr);
2069     PetscFree(sndcounts);
2070   } else {
2071     ierr = MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);CHKERRQ(ierr);
2072   }
2073 
2074   if (!rank) {
2075     /* calculate the number of nonzeros on each processor */
2076     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
2077     PetscMemzero(procsnz,size*sizeof(int));
2078     for ( i=0; i<size; i++ ) {
2079       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
2080         procsnz[i] += rowlengths[j];
2081       }
2082     }
2083     PetscFree(rowlengths);
2084 
2085     /* determine max buffer needed and allocate it */
2086     maxnz = 0;
2087     for ( i=0; i<size; i++ ) {
2088       maxnz = PetscMax(maxnz,procsnz[i]);
2089     }
2090     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
2091 
2092     /* read in my part of the matrix column indices  */
2093     nz     = procsnz[0];
2094     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
2095     ierr   = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
2096 
2097     /* read in everyone else's part of the matrix and ship it off */
2098     for ( i=1; i<size; i++ ) {
2099       nz   = procsnz[i];
2100       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
2101       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
2102     }
2103     PetscFree(cols);
2104   } else {
2105     /* determine buffer space needed for message */
2106     nz = 0;
2107     for ( i=0; i<m; i++ ) {
2108       nz += ourlens[i];
2109     }
2110     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
2111 
2112     /* receive message of column indices*/
2113     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
2114     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
2115     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2116   }
2117 
2118   /* determine column ownership if matrix is not square */
2119   if (N != M) {
2120     n      = N/size + ((N % size) > rank);
2121     ierr   = MPI_Scan(&n,&cend,1,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
2122     cstart = cend - n;
2123   } else {
2124     cstart = rstart;
2125     cend   = rend;
2126     n      = cend - cstart;
2127   }
2128 
2129   /* loop over local rows, determining number of off diagonal entries */
2130   PetscMemzero(offlens,m*sizeof(int));
2131   jj = 0;
2132   for ( i=0; i<m; i++ ) {
2133     for ( j=0; j<ourlens[i]; j++ ) {
2134       if (mycols[jj] < cstart || mycols[jj] >= cend) offlens[i]++;
2135       jj++;
2136     }
2137   }
2138 
2139   /* create our matrix */
2140   for ( i=0; i<m; i++ ) {
2141     ourlens[i] -= offlens[i];
2142   }
2143   ierr = MatCreateMPIAIJ(comm,m,n,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
2144   A = *newmat;
2145   ierr = MatSetOption(A,MAT_COLUMNS_SORTED); CHKERRQ(ierr);
2146   for ( i=0; i<m; i++ ) {
2147     ourlens[i] += offlens[i];
2148   }
2149 
2150   if (!rank) {
2151     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
2152 
2153     /* read in my part of the matrix numerical values  */
2154     nz   = procsnz[0];
2155     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2156 
2157     /* insert into matrix */
2158     jj      = rstart;
2159     smycols = mycols;
2160     svals   = vals;
2161     for ( i=0; i<m; i++ ) {
2162       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
2163       smycols += ourlens[i];
2164       svals   += ourlens[i];
2165       jj++;
2166     }
2167 
2168     /* read in other processors and ship out */
2169     for ( i=1; i<size; i++ ) {
2170       nz   = procsnz[i];
2171       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2172       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
2173     }
2174     PetscFree(procsnz);
2175   } else {
2176     /* receive numeric values */
2177     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
2178 
2179     /* receive message of values*/
2180     ierr = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
2181     ierr = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
2182     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2183 
2184     /* insert into matrix */
2185     jj      = rstart;
2186     smycols = mycols;
2187     svals   = vals;
2188     for ( i=0; i<m; i++ ) {
2189       ierr     = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
2190       smycols += ourlens[i];
2191       svals   += ourlens[i];
2192       jj++;
2193     }
2194   }
2195   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
2196 
2197   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2198   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2199   PetscFunctionReturn(0);
2200 }
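
/*
   Illustrative usage sketch (not part of the original source): loading an
   MPIAIJ matrix from a PETSc binary file.  The binary viewer constructor name
   and open flag are assumptions for this PETSc generation; only MatLoad()
   itself is taken from the routine above.
*/
static int MatLoadUsage_Sketch(MPI_Comm comm,char *file,Mat *A)
{
  int    ierr;
  Viewer viewer;

  PetscFunctionBegin;
  ierr = ViewerFileOpenBinary(comm,file,BINARY_RDONLY,&viewer);CHKERRQ(ierr); /* assumed constructor    */
  ierr = MatLoad(viewer,MATMPIAIJ,A);CHKERRQ(ierr);                           /* calls MatLoad_MPIAIJ() */
  ierr = ViewerDestroy(viewer);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}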
2201 
2202 #undef __FUNC__
2203 #define __FUNC__ "MatGetSubMatrix_MPIAIJ"
2204 /*
2205     Not great since it makes two copies of the submatrix: first a SeqAIJ copy
2206   on each process, and then the end result obtained by concatenating the local
2207   matrices. Writing it directly would be much like MatGetSubMatrices_MPIAIJ().
2208 */
2209 int MatGetSubMatrix_MPIAIJ(Mat mat,IS isrow,IS iscol,int csize,MatReuse call,Mat *newmat)
2210 {
2211   int        ierr, i, m,n,rstart,row,rend,nz,*cwork,size,rank,j;
2212   Mat        *local,M, Mreuse;
2213   Scalar     *vwork,*aa;
2214   MPI_Comm   comm = mat->comm;
2215   Mat_SeqAIJ *aij;
2216   int        *ii, *jj,nlocal,*dlens,*olens,dlen,olen,jend;
2217 
2218   PetscFunctionBegin;
2219   MPI_Comm_rank(comm,&rank);
2220   MPI_Comm_size(comm,&size);
2221 
2222   if (call ==  MAT_REUSE_MATRIX) {
2223     ierr = PetscObjectQuery((PetscObject)*newmat,"SubMatrix",(PetscObject *)&Mreuse);CHKERRQ(ierr);
2224     if (!Mreuse) SETERRQ(1,1,"Submatrix passed in was not used before, cannot reuse");
2225     local = &Mreuse;
2226     ierr  = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_REUSE_MATRIX,&local);CHKERRQ(ierr);
2227   } else {
2228     ierr = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_INITIAL_MATRIX,&local);CHKERRQ(ierr);
2229     Mreuse = *local;
2230     PetscFree(local);
2231   }
2232 
2233   /*
2234       m - number of local rows
2235       n - number of columns (same on all processors)
2236       rstart - first row in new global matrix generated
2237   */
2238   ierr = MatGetSize(Mreuse,&m,&n);CHKERRQ(ierr);
2239   if (call == MAT_INITIAL_MATRIX) {
2240     aij = (Mat_SeqAIJ *) (Mreuse)->data;
2241     if (aij->indexshift) SETERRQ(PETSC_ERR_SUP,1,"No support for index shifted matrix");
2242     ii  = aij->i;
2243     jj  = aij->j;
2244 
2245     /*
2246         Determine the number of non-zeros in the diagonal and off-diagonal
2247         portions of the matrix in order to do correct preallocation
2248     */
2249 
2250     /* first get start and end of "diagonal" columns */
2251     if (csize == PETSC_DECIDE) {
2252       nlocal = n/size + ((n % size) > rank);
2253     } else {
2254       nlocal = csize;
2255     }
2256     ierr   = MPI_Scan(&nlocal,&rend,1,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
2257     rstart = rend - nlocal;
2258     if (rank == size - 1 && rend != n) {
2259       SETERRQ(1,1,"Local column sizes do not add up to total number of columns");
2260     }
2261 
2262     /* next, compute all the lengths */
2263     dlens = (int *) PetscMalloc( (2*m+1)*sizeof(int) );CHKPTRQ(dlens);
2264     olens = dlens + m;
2265     for ( i=0; i<m; i++ ) {
2266       jend = ii[i+1] - ii[i];
2267       olen = 0;
2268       dlen = 0;
2269       for ( j=0; j<jend; j++ ) {
2270         if ( *jj < rstart || *jj >= rend) olen++;
2271         else dlen++;
2272         jj++;
2273       }
2274       olens[i] = olen;
2275       dlens[i] = dlen;
2276     }
2277     ierr = MatCreateMPIAIJ(comm,m,nlocal,PETSC_DECIDE,n,0,dlens,0,olens,&M);CHKERRQ(ierr);
2278     PetscFree(dlens);
2279   } else {
2280     int ml,nl;
2281 
2282     M = *newmat;
2283     ierr = MatGetLocalSize(M,&ml,&nl);CHKERRQ(ierr);
2284     if (ml != m) SETERRQ(PETSC_ERR_ARG_SIZ,1,"Previous matrix must be same size/layout as request");
2285     ierr = MatZeroEntries(M);CHKERRQ(ierr);
2286     /*
2287          The next two lines are needed so we may call MatSetValues_MPIAIJ() below directly,
2288        rather than the slower MatSetValues().
2289     */
2290     M->was_assembled = PETSC_TRUE;
2291     M->assembled     = PETSC_FALSE;
2292   }
2293   ierr = MatGetOwnershipRange(M,&rstart,&rend); CHKERRQ(ierr);
2294   aij = (Mat_SeqAIJ *) (Mreuse)->data;
2295   if (aij->indexshift) SETERRQ(PETSC_ERR_SUP,1,"No support for index shifted matrix");
2296   ii  = aij->i;
2297   jj  = aij->j;
2298   aa  = aij->a;
2299   for (i=0; i<m; i++) {
2300     row   = rstart + i;
2301     nz    = ii[i+1] - ii[i];
2302     cwork = jj;     jj += nz;
2303     vwork = aa;     aa += nz;
2304     ierr = MatSetValues_MPIAIJ(M,1,&row,nz,cwork,vwork,INSERT_VALUES); CHKERRQ(ierr);
2305   }
2306 
2307   ierr = MatAssemblyBegin(M,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2308   ierr = MatAssemblyEnd(M,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2309   *newmat = M;
2310 
2311   /* save submatrix used in processor for next request */
2312   if (call ==  MAT_INITIAL_MATRIX) {
2313     ierr = PetscObjectCompose((PetscObject)M,"SubMatrix",(PetscObject)Mreuse);CHKERRQ(ierr);
2314     ierr = PetscObjectDereference((PetscObject)Mreuse);CHKERRQ(ierr);
2315   }
2316 
2317   PetscFunctionReturn(0);
2318 }
2319