xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision 5ef9f2a5cf905ed65136deff0c9e7fca368161b7)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpiaij.c,v 1.274 1999/01/12 23:15:12 bsmith Exp bsmith $";
3 #endif
4 
5 #include "src/mat/impls/aij/mpi/mpiaij.h"
6 #include "src/vec/vecimpl.h"
7 #include "src/inline/spops.h"
8 
9 /* local utility routine that creates a mapping from the global column
10 number to the local number in the off-diagonal part of the local
11 storage of the matrix.  This is done in a non-scalable way since the
12 length of colmap equals the global number of columns.
13 */
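/*
   Sketch of the mapping this routine maintains (dense-array version): for
   each local column i of the off-diagonal block B,

       garray[i]         = global column number of local column i
       colmap[garray[i]] = i + 1          (0 means "column not present")

   With USE_CTABLE the pair (garray[i]+1, i+1) is stored in a hash table
   instead of the dense colmap array.
*/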
14 #undef __FUNC__
15 #define __FUNC__ "CreateColmap_MPIAIJ_Private"
16 int CreateColmap_MPIAIJ_Private(Mat mat)
17 {
18   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
19   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
20   int        n = B->n,i;
21 #if defined (USE_CTABLE)
22   int        ierr;
23 #endif
24 
25   PetscFunctionBegin;
26 #if defined (USE_CTABLE)
27   ierr = TableCreate( &aij->colmap, aij->n/5 ); CHKERRQ(ierr);
28   for ( i=0; i<n; i++ ){
29     ierr = TableAdd( aij->colmap, aij->garray[i] + 1, i+1 );CHKERRQ(ierr);
30   }
31 #else
32   aij->colmap = (int *) PetscMalloc((aij->N+1)*sizeof(int));CHKPTRQ(aij->colmap);
33   PLogObjectMemory(mat,aij->N*sizeof(int));
34   PetscMemzero(aij->colmap,aij->N*sizeof(int));
35   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
36 #endif
37   PetscFunctionReturn(0);
38 }
39 
40 extern int DisAssemble_MPIAIJ(Mat);
41 
42 #define CHUNKSIZE   15
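/*
   MatSetValues_SeqAIJ_A_Private(row,col,value,addv) inserts or adds one value
   into the "diagonal" block A: a short binary search narrows the (sorted) row,
   a linear scan locates the column or the insertion point, later entries in
   the row are shifted up, and when the row is full the whole storage of A is
   reallocated with room for CHUNKSIZE additional entries.
*/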
43 #define MatSetValues_SeqAIJ_A_Private(row,col,value,addv) \
44 { \
45  \
46     rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
47     rmax = aimax[row]; nrow = ailen[row];  \
48     col1 = col - shift; \
49      \
50     low = 0; high = nrow; \
51     while (high-low > 5) { \
52       t = (low+high)/2; \
53       if (rp[t] > col1) high = t; \
54       else             low  = t; \
55     } \
56       for ( _i=low; _i<nrow; _i++ ) { \
57         if (rp[_i] > col1) break; \
58         if (rp[_i] == col1) { \
59           if (addv == ADD_VALUES) ap[_i] += value;   \
60           else                  ap[_i] = value; \
61           goto a_noinsert; \
62         } \
63       }  \
64       if (nonew == 1) goto a_noinsert; \
65       else if (nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
66       if (nrow >= rmax) { \
67         /* there is no extra room in row, therefore enlarge */ \
68         int    new_nz = ai[a->m] + CHUNKSIZE,len,*new_i,*new_j; \
69         Scalar *new_a; \
70  \
71         if (nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
72  \
73         /* malloc new storage space */ \
74         len     = new_nz*(sizeof(int)+sizeof(Scalar))+(a->m+1)*sizeof(int); \
75         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
76         new_j   = (int *) (new_a + new_nz); \
77         new_i   = new_j + new_nz; \
78  \
79         /* copy over old data into new slots */ \
80         for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = ai[ii];} \
81         for ( ii=row+1; ii<a->m+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
82         PetscMemcpy(new_j,aj,(ai[row]+nrow+shift)*sizeof(int)); \
83         len = (new_nz - CHUNKSIZE - ai[row] - nrow - shift); \
84         PetscMemcpy(new_j+ai[row]+shift+nrow+CHUNKSIZE,aj+ai[row]+shift+nrow, \
85                                                            len*sizeof(int)); \
86         PetscMemcpy(new_a,aa,(ai[row]+nrow+shift)*sizeof(Scalar)); \
87         PetscMemcpy(new_a+ai[row]+shift+nrow+CHUNKSIZE,aa+ai[row]+shift+nrow, \
88                                                            len*sizeof(Scalar));  \
89         /* free up old matrix storage */ \
90  \
91         PetscFree(a->a);  \
92         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
93         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
94         a->singlemalloc = 1; \
95  \
96         rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
97         rmax = aimax[row] = aimax[row] + CHUNKSIZE; \
98         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
99         a->maxnz += CHUNKSIZE; \
100         a->reallocs++; \
101       } \
102       N = nrow++ - 1; a->nz++; \
103       /* shift up all the later entries in this row */ \
104       for ( ii=N; ii>=_i; ii-- ) { \
105         rp[ii+1] = rp[ii]; \
106         ap[ii+1] = ap[ii]; \
107       } \
108       rp[_i] = col1;  \
109       ap[_i] = value;  \
110       a_noinsert: ; \
111       ailen[row] = nrow; \
112 }
113 
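/*
   Same as MatSetValues_SeqAIJ_A_Private() above, but operating on the
   off-diagonal block B.
*/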
114 #define MatSetValues_SeqAIJ_B_Private(row,col,value,addv) \
115 { \
116  \
117     rp   = bj + bi[row] + shift; ap = ba + bi[row] + shift; \
118     rmax = bimax[row]; nrow = bilen[row];  \
119     col1 = col - shift; \
120      \
121     low = 0; high = nrow; \
122     while (high-low > 5) { \
123       t = (low+high)/2; \
124       if (rp[t] > col1) high = t; \
125       else             low  = t; \
126     } \
127        for ( _i=low; _i<nrow; _i++ ) { \
128         if (rp[_i] > col1) break; \
129         if (rp[_i] == col1) { \
130           if (addv == ADD_VALUES) ap[_i] += value;   \
131           else                  ap[_i] = value; \
132           goto b_noinsert; \
133         } \
134       }  \
135       if (nonew == 1) goto b_noinsert; \
136       else if (nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
137       if (nrow >= rmax) { \
138         /* there is no extra room in row, therefore enlarge */ \
139         int    new_nz = bi[b->m] + CHUNKSIZE,len,*new_i,*new_j; \
140         Scalar *new_a; \
141  \
142         if (nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
143  \
144         /* malloc new storage space */ \
145         len     = new_nz*(sizeof(int)+sizeof(Scalar))+(b->m+1)*sizeof(int); \
146         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
147         new_j   = (int *) (new_a + new_nz); \
148         new_i   = new_j + new_nz; \
149  \
150         /* copy over old data into new slots */ \
151         for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = bi[ii];} \
152         for ( ii=row+1; ii<b->m+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
153         PetscMemcpy(new_j,bj,(bi[row]+nrow+shift)*sizeof(int)); \
154         len = (new_nz - CHUNKSIZE - bi[row] - nrow - shift); \
155         PetscMemcpy(new_j+bi[row]+shift+nrow+CHUNKSIZE,bj+bi[row]+shift+nrow, \
156                                                            len*sizeof(int)); \
157         PetscMemcpy(new_a,ba,(bi[row]+nrow+shift)*sizeof(Scalar)); \
158         PetscMemcpy(new_a+bi[row]+shift+nrow+CHUNKSIZE,ba+bi[row]+shift+nrow, \
159                                                            len*sizeof(Scalar));  \
160         /* free up old matrix storage */ \
161  \
162         PetscFree(b->a);  \
163         if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
164         ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
165         b->singlemalloc = 1; \
166  \
167         rp   = bj + bi[row] + shift; ap = ba + bi[row] + shift; \
168         rmax = bimax[row] = bimax[row] + CHUNKSIZE; \
169         PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
170         b->maxnz += CHUNKSIZE; \
171         b->reallocs++; \
172       } \
173       N = nrow++ - 1; b->nz++; \
174       /* shift up all the later entries in this row */ \
175       for ( ii=N; ii>=_i; ii-- ) { \
176         rp[ii+1] = rp[ii]; \
177         ap[ii+1] = ap[ii]; \
178       } \
179       rp[_i] = col1;  \
180       ap[_i] = value;  \
181       b_noinsert: ; \
182       bilen[row] = nrow; \
183 }
184 
185 extern int MatSetValues_SeqAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
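/*
   MatSetValues_MPIAIJ() routes each (row,column,value) entry:
     - locally owned row, column in [cstart,cend)  -> diagonal block A
     - locally owned row, any other column         -> off-diagonal block B
       (the global column translated to B's local numbering via colmap)
     - row owned by another processor              -> stash, shipped during
       MatAssemblyBegin/End unless MAT_IGNORE_OFF_PROC_ENTRIES was set.

   Typical usage from application code (illustrative sketch only):

       ierr = MatSetValues(mat,1,&i,1,&j,&v,ADD_VALUES);CHKERRQ(ierr);
       ierr = MatAssemblyBegin(mat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
       ierr = MatAssemblyEnd(mat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
*/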
186 #undef __FUNC__
187 #define __FUNC__ "MatSetValues_MPIAIJ"
188 int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
189 {
190   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
191   Scalar     value;
192   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
193   int        cstart = aij->cstart, cend = aij->cend,row,col;
194   int        roworiented = aij->roworiented;
195 
196   /* Some Variables required in the macro */
197   Mat        A = aij->A;
198   Mat_SeqAIJ *a = (Mat_SeqAIJ *) A->data;
199   int        *aimax = a->imax, *ai = a->i, *ailen = a->ilen,*aj = a->j;
200   Scalar     *aa = a->a;
201 
202   Mat        B = aij->B;
203   Mat_SeqAIJ *b = (Mat_SeqAIJ *) B->data;
204   int        *bimax = b->imax, *bi = b->i, *bilen = b->ilen,*bj = b->j;
205   Scalar     *ba = b->a;
206 
207   int        *rp,ii,nrow,_i,rmax, N, col1,low,high,t;
208   int        nonew = a->nonew,shift = a->indexshift;
209   Scalar     *ap;
210 
211   PetscFunctionBegin;
212   for ( i=0; i<m; i++ ) {
213     if (im[i] < 0) continue;
214 #if defined(USE_PETSC_BOPT_g)
215     if (im[i] >= aij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
216 #endif
217     if (im[i] >= rstart && im[i] < rend) {
218       row = im[i] - rstart;
219       for ( j=0; j<n; j++ ) {
220         if (in[j] >= cstart && in[j] < cend){
221           col = in[j] - cstart;
222           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
223           MatSetValues_SeqAIJ_A_Private(row,col,value,addv);
224           /* ierr = MatSetValues_SeqAIJ(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
225         }
226         else if (in[j] < 0) continue;
227 #if defined(USE_PETSC_BOPT_g)
228         else if (in[j] >= aij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");}
229 #endif
230         else {
231           if (mat->was_assembled) {
232             if (!aij->colmap) {
233               ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
234             }
235 #if defined (USE_CTABLE)
236 	    col = TableFind( aij->colmap, in[j] + 1 ) - 1;
237 #else
238             col = aij->colmap[in[j]] - 1;
239 #endif
240             if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
241               ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
242               col =  in[j];
243               /* Reinitialize the variables required by MatSetValues_SeqAIJ_B_Private() */
244               B = aij->B;
245               b = (Mat_SeqAIJ *) B->data;
246               bimax = b->imax; bi = b->i; bilen = b->ilen; bj = b->j;
247               ba = b->a;
248             }
249           } else col = in[j];
250           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
251           MatSetValues_SeqAIJ_B_Private(row,col,value,addv);
252           /* ierr = MatSetValues_SeqAIJ(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
253         }
254       }
255     } else {
256       if (roworiented && !aij->donotstash) {
257         ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
258       } else {
259         if (!aij->donotstash) {
260           row = im[i];
261           for ( j=0; j<n; j++ ) {
262             ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
263           }
264         }
265       }
266     }
267   }
268   PetscFunctionReturn(0);
269 }
270 
271 #undef __FUNC__
272 #define __FUNC__ "MatGetValues_MPIAIJ"
273 int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
274 {
275   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
276   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
277   int        cstart = aij->cstart, cend = aij->cend,row,col;
278 
279   PetscFunctionBegin;
280   for ( i=0; i<m; i++ ) {
281     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
282     if (idxm[i] >= aij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
283     if (idxm[i] >= rstart && idxm[i] < rend) {
284       row = idxm[i] - rstart;
285       for ( j=0; j<n; j++ ) {
286         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
287         if (idxn[j] >= aij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
288         if (idxn[j] >= cstart && idxn[j] < cend){
289           col = idxn[j] - cstart;
290           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
291         } else {
292           if (!aij->colmap) {
293             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
294           }
295 #if defined (USE_CTABLE)
296 	  col = TableFind( aij->colmap, idxn[j] + 1 ) - 1;
297 #else
298           col = aij->colmap[idxn[j]] - 1;
299 #endif
300           if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
301           else {
302             ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
303           }
304         }
305       }
306     } else {
307       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
308     }
309   }
310   PetscFunctionReturn(0);
311 }
312 
313 #undef __FUNC__
314 #define __FUNC__ "MatAssemblyBegin_MPIAIJ"
315 int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
316 {
317   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
318   MPI_Comm    comm = mat->comm;
319   int         size = aij->size, *owners = aij->rowners;
320   int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
321   MPI_Request *send_waits,*recv_waits;
322   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
323   InsertMode  addv;
324   Scalar      *rvalues,*svalues;
325 
326   PetscFunctionBegin;
327   if (aij->donotstash) {
328     aij->svalues    = 0; aij->rvalues    = 0;
329     aij->nsends     = 0; aij->nrecvs     = 0;
330     aij->send_waits = 0; aij->recv_waits = 0;
331     aij->rmax       = 0;
332     PetscFunctionReturn(0);
333   }
334 
335   /* make sure all processors are either in INSERTMODE or ADDMODE */
336   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
337   if (addv == (ADD_VALUES|INSERT_VALUES)) {
338     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
339   }
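  /*
     The reduction above relies on the bitwise OR of the insert modes: the
     result can equal (ADD_VALUES|INSERT_VALUES) only if some processors used
     INSERT_VALUES and others ADD_VALUES (assuming, as this test does, that
     the two enum values occupy distinct bits).
  */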
340   mat->insertmode = addv; /* in case this processor had no cache */
341 
342   /*  first count number of contributors to each processor */
343   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
344   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
345   owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
346   for ( i=0; i<aij->stash.n; i++ ) {
347     idx = aij->stash.idx[i];
348     for ( j=0; j<size; j++ ) {
349       if (idx >= owners[j] && idx < owners[j+1]) {
350         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
351       }
352     }
353   }
354   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
355 
356   /* inform other processors of number of messages and max length*/
357   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
358   ierr = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
359   nreceives = work[rank];
360   ierr = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
361   nmax = work[rank];
362   PetscFree(work);
363 
364   /* post receives:
365        1) each message will consist of ordered triples
366      (global row, global column, value); we store the indices as Scalars
367      to simplify the message passing.
368        2) since we don't know how long each individual message is we
369      allocate the largest needed buffer for each receive. Potentially
370      this is a lot of wasted space.
371 
372 
373        This could be done better.
374   */
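  /*
     Packing used below (sketch): every stashed entry becomes three Scalars,

         svalues[3*k]   = global row index
         svalues[3*k+1] = global column index
         svalues[3*k+2] = value

     so the message destined for processor i is 3*nprocs[i] Scalars long.
  */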
375   rvalues    = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
376   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
377   for ( i=0; i<nreceives; i++ ) {
378     ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
379               comm,recv_waits+i);CHKERRQ(ierr);
380   }
381 
382   /* do sends:
383       1) starts[i] gives the starting index in svalues for stuff going to
384          the ith processor
385   */
386   svalues    = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
387   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
388   starts     = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
389   starts[0]  = 0;
390   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
391   for ( i=0; i<aij->stash.n; i++ ) {
392     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
393     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
394     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
395   }
396   PetscFree(owner);
397   starts[0] = 0;
398   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
399   count = 0;
400   for ( i=0; i<size; i++ ) {
401     if (procs[i]) {
402       ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
403     }
404   }
405   PetscFree(starts);
406   PetscFree(nprocs);
407 
408   /* Free cache space */
409   PLogInfo(aij->A,"MatAssemblyBegin_MPIAIJ:Number of off-processor values %d\n",aij->stash.n);
410   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
411 
412   aij->svalues    = svalues;    aij->rvalues    = rvalues;
413   aij->nsends     = nsends;     aij->nrecvs     = nreceives;
414   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
415   aij->rmax       = nmax;
416 
417   PetscFunctionReturn(0);
418 }
419 extern int MatSetUpMultiply_MPIAIJ(Mat);
420 
421 #undef __FUNC__
422 #define __FUNC__ "MatAssemblyEnd_MPIAIJ"
423 int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
424 {
425   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
426   MPI_Status  *send_status,recv_status;
427   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
428   int         row,col,other_disassembled;
429   Scalar      *values,val;
430   InsertMode  addv = mat->insertmode;
431 
432   PetscFunctionBegin;
433 
434   /*  wait on receives */
435   while (count) {
436     ierr = MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
437     /* unpack receives into our local space */
438     values = aij->rvalues + 3*imdex*aij->rmax;
439     ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
440     n = n/3;
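    /* each message is a run of (global row, global column, value) triplets
       packed by MatAssemblyBegin_MPIAIJ() */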
441     for ( i=0; i<n; i++ ) {
442       row = (int) PetscReal(values[3*i]) - aij->rstart;
443       col = (int) PetscReal(values[3*i+1]);
444       val = values[3*i+2];
445       if (col >= aij->cstart && col < aij->cend) {
446         col -= aij->cstart;
447         ierr = MatSetValues(aij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
448       } else {
449         if (mat->was_assembled || mat->assembled) {
450           if (!aij->colmap) {
451             ierr = CreateColmap_MPIAIJ_Private(mat); CHKERRQ(ierr);
452           }
453 #if defined (USE_CTABLE)
454           col = TableFind( aij->colmap, col + 1 ) - 1;
455 #else
456           col = aij->colmap[col] - 1;
457 #endif
458           if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
459             ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
460             col = (int) PetscReal(values[3*i+1]);
461           }
462         }
463         ierr = MatSetValues(aij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
464       }
465     }
466     count--;
467   }
468   if (aij->recv_waits) PetscFree(aij->recv_waits);
469   if (aij->rvalues)    PetscFree(aij->rvalues);
470 
471   /* wait on sends */
472   if (aij->nsends) {
473     send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
474     ierr        = MPI_Waitall(aij->nsends,aij->send_waits,send_status);CHKERRQ(ierr);
475     PetscFree(send_status);
476   }
477   if (aij->send_waits) PetscFree(aij->send_waits);
478   if (aij->svalues)    PetscFree(aij->svalues);
479 
480   ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
481   ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);
482 
483   /* determine if any processor has disassembled; if so we must
484      also disassemble ourselves, in order that we may reassemble. */
485   /*
486      if nonzero structure of submatrix B cannot change then we know that
487      no processor disassembled, so we can skip this step
488   */
489   if (!((Mat_SeqAIJ*) aij->B->data)->nonew)  {
490     ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
491     if (mat->was_assembled && !other_disassembled) {
492       ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
493     }
494   }
495 
496   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
497     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
498   }
499   ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
500   ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);
501 
502   if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
503   PetscFunctionReturn(0);
504 }
505 
506 #undef __FUNC__
507 #define __FUNC__ "MatZeroEntries_MPIAIJ"
508 int MatZeroEntries_MPIAIJ(Mat A)
509 {
510   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
511   int        ierr;
512 
513   PetscFunctionBegin;
514   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
515   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
516   PetscFunctionReturn(0);
517 }
518 
519 #undef __FUNC__
520 #define __FUNC__ "MatZeroRows_MPIAIJ"
521 int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
522 {
523   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
524   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
525   int            *procs,*nprocs,j,found,idx,nsends,*work,row;
526   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
527   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
528   int            *lens,imdex,*lrows,*values,rstart=l->rstart;
529   MPI_Comm       comm = A->comm;
530   MPI_Request    *send_waits,*recv_waits;
531   MPI_Status     recv_status,*send_status;
532   IS             istmp;
533   PetscTruth     localdiag;
534 
535   PetscFunctionBegin;
536   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
537   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
538 
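  /*
     Overview (sketch): the index set may name rows owned by any processor.
     Every listed row (including locally owned ones) is routed to its owning
     processor with point-to-point messages; each owner then zeros the rows
     it receives through the local MatZeroRows() calls below.
  */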
539   /*  first count number of contributors to each processor */
540   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
541   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
542   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
543   for ( i=0; i<N; i++ ) {
544     idx = rows[i];
545     found = 0;
546     for ( j=0; j<size; j++ ) {
547       if (idx >= owners[j] && idx < owners[j+1]) {
548         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
549       }
550     }
551     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
552   }
553   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
554 
555   /* inform other processors of number of messages and max length*/
556   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
557   ierr = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
558   nrecvs = work[rank];
559   ierr = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
560   nmax = work[rank];
561   PetscFree(work);
562 
563   /* post receives:   */
564   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int));CHKPTRQ(rvalues);
565   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
566   for ( i=0; i<nrecvs; i++ ) {
567     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
568   }
569 
570   /* do sends:
571       1) starts[i] gives the starting index in svalues for stuff going to
572          the ith processor
573   */
574   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
575   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
576   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
577   starts[0] = 0;
578   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
579   for ( i=0; i<N; i++ ) {
580     svalues[starts[owner[i]]++] = rows[i];
581   }
582   ISRestoreIndices(is,&rows);
583 
584   starts[0] = 0;
585   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
586   count = 0;
587   for ( i=0; i<size; i++ ) {
588     if (procs[i]) {
589       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
590     }
591   }
592   PetscFree(starts);
593 
594   base = owners[rank];
595 
596   /*  wait on receives */
597   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
598   source = lens + nrecvs;
599   count  = nrecvs; slen = 0;
600   while (count) {
601     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
602     /* unpack receives into our local space */
603     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
604     source[imdex]  = recv_status.MPI_SOURCE;
605     lens[imdex]  = n;
606     slen += n;
607     count--;
608   }
609   PetscFree(recv_waits);
610 
611   /* move the data into the send scatter */
612   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
613   count = 0;
614   for ( i=0; i<nrecvs; i++ ) {
615     values = rvalues + i*nmax;
616     for ( j=0; j<lens[i]; j++ ) {
617       lrows[count++] = values[j] - base;
618     }
619   }
620   PetscFree(rvalues); PetscFree(lens);
621   PetscFree(owner); PetscFree(nprocs);
622 
623   /* actually zap the local rows */
624   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
625   PLogObjectParent(A,istmp);
626 
627   /*
628         Zero the required rows. If the "diagonal block" of the matrix
629      is square and the user wishes to set the diagonal, we use separate
630      code so that MatSetValues() is not called for each diagonal entry;
631      that would allocate new memory, causing many mallocs and slowing things down.
632 
633        Contributed by: Mathew Knepley
634   */
635   localdiag = PETSC_FALSE;
636   if (diag && (l->A->M == l->A->N)) {
637     localdiag = PETSC_TRUE;
638     ierr      = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
639   } else {
640     ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
641   }
642   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
643   ierr = ISDestroy(istmp); CHKERRQ(ierr);
644 
645   if (diag && (localdiag == PETSC_FALSE)) {
646     for ( i = 0; i < slen; i++ ) {
647       row = lrows[i] + rstart;
648       MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES);
649     }
650     MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);
651     MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);
652   }
653 
662   PetscFree(lrows);
663 
664   /* wait on sends */
665   if (nsends) {
666     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
667     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
668     PetscFree(send_status);
669   }
670   PetscFree(send_waits); PetscFree(svalues);
671 
672   PetscFunctionReturn(0);
673 }
674 
675 #undef __FUNC__
676 #define __FUNC__ "MatMult_MPIAIJ"
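/*
   y = A x is formed as y = A_d x_local + A_o x_ghost, where A_d (a->A) is the
   "diagonal" block and A_o (a->B) the off-diagonal block.  The scatter Mvctx
   gathers the needed ghost values of x into a->lvec, and that communication
   is overlapped with the multiply by the diagonal block.
*/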
677 int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
678 {
679   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
680   int        ierr,nt;
681 
682   PetscFunctionBegin;
683   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
684   if (nt != a->n) {
685     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
686   }
687   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
688   ierr = (*a->A->ops->mult)(a->A,xx,yy); CHKERRQ(ierr);
689   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
690   ierr = (*a->B->ops->multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
691   PetscFunctionReturn(0);
692 }
693 
694 #undef __FUNC__
695 #define __FUNC__ "MatMultAdd_MPIAIJ"
696 int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
697 {
698   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
699   int        ierr;
700 
701   PetscFunctionBegin;
702   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
703   ierr = (*a->A->ops->multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
704   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
705   ierr = (*a->B->ops->multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
706   PetscFunctionReturn(0);
707 }
708 
709 #undef __FUNC__
710 #define __FUNC__ "MatMultTrans_MPIAIJ"
711 int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
712 {
713   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
714   int        ierr;
715 
716   PetscFunctionBegin;
717   /* do nondiagonal part */
718   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
719   /* send it on its way */
720   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
721   /* do local part */
722   ierr = (*a->A->ops->multtrans)(a->A,xx,yy); CHKERRQ(ierr);
723   /* receive remote parts: note this assumes the values are not actually */
724   /* inserted in yy until the next line, which is true for this implementation */
725   /* but may not always be true. */
726   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
727   PetscFunctionReturn(0);
728 }
729 
730 #undef __FUNC__
731 #define __FUNC__ "MatMultTransAdd_MPIAIJ"
732 int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
733 {
734   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
735   int        ierr;
736 
737   PetscFunctionBegin;
738   /* do nondiagonal part */
739   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
740   /* send it on its way */
741   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
742   /* do local part */
743   ierr = (*a->A->ops->multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
744   /* receive remote parts: note this assumes the values are not actually */
745   /* inserted in zz until the next line, which is true for this implementation */
746   /* but may not always be true. */
747   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
748   PetscFunctionReturn(0);
749 }
750 
751 /*
752   This only works correctly for square matrices where the subblock A->A is the
753    diagonal block
754 */
755 #undef __FUNC__
756 #define __FUNC__ "MatGetDiagonal_MPIAIJ"
757 int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
758 {
759   int        ierr;
760   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
761 
762   PetscFunctionBegin;
763   if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
764   if (a->rstart != a->cstart || a->rend != a->cend) {
765     SETERRQ(PETSC_ERR_ARG_SIZ,0,"row partition must equal col partition");
766   }
767   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
768   PetscFunctionReturn(0);
769 }
770 
771 #undef __FUNC__
772 #define __FUNC__ "MatScale_MPIAIJ"
773 int MatScale_MPIAIJ(Scalar *aa,Mat A)
774 {
775   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
776   int        ierr;
777 
778   PetscFunctionBegin;
779   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
780   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
781   PetscFunctionReturn(0);
782 }
783 
784 #undef __FUNC__
785 #define __FUNC__ "MatDestroy_MPIAIJ"
786 int MatDestroy_MPIAIJ(Mat mat)
787 {
788   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
789   int        ierr;
790 
791   PetscFunctionBegin;
792   if (--mat->refct > 0) PetscFunctionReturn(0);
793 
794   if (mat->mapping) {
795     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
796   }
797   if (mat->bmapping) {
798     ierr = ISLocalToGlobalMappingDestroy(mat->bmapping); CHKERRQ(ierr);
799   }
800   if (mat->rmap) {
801     ierr = MapDestroy(mat->rmap);CHKERRQ(ierr);
802   }
803   if (mat->cmap) {
804     ierr = MapDestroy(mat->cmap);CHKERRQ(ierr);
805   }
806 #if defined(USE_PETSC_LOG)
807   PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",aij->M,aij->N);
808 #endif
809   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
810   PetscFree(aij->rowners);
811   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
812   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
813 #if defined (USE_CTABLE)
814   if (aij->colmap) TableDelete(aij->colmap);
815 #else
816   if (aij->colmap) PetscFree(aij->colmap);
817 #endif
818   if (aij->garray) PetscFree(aij->garray);
819   if (aij->lvec)   VecDestroy(aij->lvec);
820   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
821   if (aij->rowvalues) PetscFree(aij->rowvalues);
822   PetscFree(aij);
823   PLogObjectDestroy(mat);
824   PetscHeaderDestroy(mat);
825   PetscFunctionReturn(0);
826 }
827 
828 #undef __FUNC__
829 #define __FUNC__ "MatView_MPIAIJ_Binary"
830 extern int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
831 {
832   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
833   int         ierr;
834 
835   PetscFunctionBegin;
836   if (aij->size == 1) {
837     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
838   }
839   else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
840   PetscFunctionReturn(0);
841 }
842 
843 #undef __FUNC__
844 #define __FUNC__ "MatView_MPIAIJ_ASCIIorDraworSocket"
845 extern int MatView_MPIAIJ_ASCIIorDraworSocket(Mat mat,Viewer viewer)
846 {
847   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
848   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
849   int         ierr, format,shift = C->indexshift,rank = aij->rank ;
850   int         size = aij->size;
851   FILE        *fd;
852   ViewerType  vtype;
853 
854   PetscFunctionBegin;
855   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
856   if (PetscTypeCompare(vtype,ASCII_VIEWER)) {
857     ierr = ViewerGetFormat(viewer,&format);
858     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
859       MatInfo info;
860       int     flg;
861       MPI_Comm_rank(mat->comm,&rank);
862       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
863       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
864       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
865       PetscSequentialPhaseBegin(mat->comm,1);
866       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
867          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
868       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
869          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
870       ierr = MatGetInfo(aij->A,MAT_LOCAL,&info);
871       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used);
872       ierr = MatGetInfo(aij->B,MAT_LOCAL,&info);
873       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used);
874       fflush(fd);
875       PetscSequentialPhaseEnd(mat->comm,1);
876       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
877       PetscFunctionReturn(0);
878     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
879       PetscFunctionReturn(0);
880     }
881   } else if (PetscTypeCompare(vtype,DRAW_VIEWER)) {
882     Draw       draw;
883     PetscTruth isnull;
884     ierr = ViewerDrawGetDraw(viewer,0,&draw); CHKERRQ(ierr);
885     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
886   }
887 
888   if (size == 1) {
889     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
890   } else {
891     /* assemble the entire matrix onto first processor. */
892     Mat         A;
893     Mat_SeqAIJ *Aloc;
894     int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
895     Scalar      *a;
896 
897     if (!rank) {
898       ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
899     } else {
900       ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
901     }
902     PLogObjectParent(mat,A);
903 
904     /* copy over the A part */
905     Aloc = (Mat_SeqAIJ*) aij->A->data;
906     m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
907     row = aij->rstart;
908     for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
909     for ( i=0; i<m; i++ ) {
910       ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
911       row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
912     }
913     aj = Aloc->j;
914     for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
915 
916     /* copy over the B part */
917     Aloc = (Mat_SeqAIJ*) aij->B->data;
918     m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
919     row = aij->rstart;
920     ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
921     for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
922     for ( i=0; i<m; i++ ) {
923       ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
924       row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
925     }
926     PetscFree(ct);
927     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
928     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
929     /*
930        Everyone has to call to draw the matrix since the graphics waits are
931        synchronized across all processors that share the Draw object
932     */
933     if (!rank || PetscTypeCompare(vtype,DRAW_VIEWER)) {
934       ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
935     }
936     ierr = MatDestroy(A); CHKERRQ(ierr);
937   }
938   PetscFunctionReturn(0);
939 }
940 
941 #undef __FUNC__
942 #define __FUNC__ "MatView_MPIAIJ"
943 int MatView_MPIAIJ(Mat mat,Viewer viewer)
944 {
945   int         ierr;
946   ViewerType  vtype;
947 
948   PetscFunctionBegin;
949   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
950   if (PetscTypeCompare(vtype,ASCII_VIEWER) || PetscTypeCompare(vtype,DRAW_VIEWER) ||
951       PetscTypeCompare(vtype,SOCKET_VIEWER)) {
952     ierr = MatView_MPIAIJ_ASCIIorDraworSocket(mat,viewer); CHKERRQ(ierr);
953   } else if (PetscTypeCompare(vtype,BINARY_VIEWER)) {
954     ierr = MatView_MPIAIJ_Binary(mat,viewer);CHKERRQ(ierr);
955   } else {
956     SETERRQ(1,1,"Viewer type not supported by PETSc object");
957   }
958   PetscFunctionReturn(0);
959 }
960 
961 /*
962     This has to provide several versions.
963 
964      2) a) use only local smoothing, updating outer values only once.
965         b) local smoothing, updating outer values each inner iteration.
966      3) coloring, updating outer values between colors.
967 */
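/*
   Local update performed in each sweep below (sketch; this assumes, as the
   code relies on, that SPARSEDENSEMDOT subtracts the sparse dot product
   from sum):

       sum = b_i - sum_j A_ij x_j - sum_j B_ij ls_j
       x_i = (1-omega) x_i + omega (sum + A_ii x_i) / (fshift + A_ii)

   where ls holds the ghost values of x gathered once before the sweeps.
*/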
968 #undef __FUNC__
969 #define __FUNC__ "MatRelax_MPIAIJ"
970 int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
971                            double fshift,int its,Vec xx)
972 {
973   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
974   Mat        AA = mat->A, BB = mat->B;
975   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
976   Scalar     *b,*x,*xs,*ls,d,*v,sum;
977   int        ierr,*idx, *diag;
978   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
979 
980   PetscFunctionBegin;
981   ierr = VecGetArray(xx,&x); CHKERRQ(ierr);
982   ierr = VecGetArray(bb,&b); CHKERRQ(ierr);
983   ierr = VecGetArray(mat->lvec,&ls);CHKERRQ(ierr);
984   xs = x + shift; /* shift by one for index start of 1 */
985   ls = ls + shift;
986   if (!A->diag) {ierr = MatMarkDiag_SeqAIJ(AA); CHKERRQ(ierr);}
987   diag = A->diag;
988   if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
989     if (flag & SOR_ZERO_INITIAL_GUESS) {
990       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
991       ierr = VecRestoreArray(xx,&x); CHKERRQ(ierr);
992       ierr = VecRestoreArray(bb,&b); CHKERRQ(ierr);
993       ierr = VecRestoreArray(mat->lvec,&ls);CHKERRQ(ierr);
994       PetscFunctionReturn(0);
995     }
996     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
997     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
998     while (its--) {
999       /* go down through the rows */
1000       for ( i=0; i<m; i++ ) {
1001         n    = A->i[i+1] - A->i[i];
1002 	PLogFlops(4*n+3);
1003         idx  = A->j + A->i[i] + shift;
1004         v    = A->a + A->i[i] + shift;
1005         sum  = b[i];
1006         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1007         d    = fshift + A->a[diag[i]+shift];
1008         n    = B->i[i+1] - B->i[i];
1009         idx  = B->j + B->i[i] + shift;
1010         v    = B->a + B->i[i] + shift;
1011         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1012         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1013       }
1014       /* come up through the rows */
1015       for ( i=m-1; i>-1; i-- ) {
1016         n    = A->i[i+1] - A->i[i];
1017         PLogFlops(4*n+3);
1018         idx  = A->j + A->i[i] + shift;
1019         v    = A->a + A->i[i] + shift;
1020         sum  = b[i];
1021         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1022         d    = fshift + A->a[diag[i]+shift];
1023         n    = B->i[i+1] - B->i[i];
1024         idx  = B->j + B->i[i] + shift;
1025         v    = B->a + B->i[i] + shift;
1026         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1027         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1028       }
1029     }
1030   } else if (flag & SOR_LOCAL_FORWARD_SWEEP){
1031     if (flag & SOR_ZERO_INITIAL_GUESS) {
1032       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
1033       ierr = VecRestoreArray(xx,&x); CHKERRQ(ierr);
1034       ierr = VecRestoreArray(bb,&b); CHKERRQ(ierr);
1035       ierr = VecRestoreArray(mat->lvec,&ls);CHKERRQ(ierr);
1036       PetscFunctionReturn(0);
1037     }
1038     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1039     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1040     while (its--) {
1041       for ( i=0; i<m; i++ ) {
1042         n    = A->i[i+1] - A->i[i];
1043 	PLogFlops(4*n+3);
1044         idx  = A->j + A->i[i] + shift;
1045         v    = A->a + A->i[i] + shift;
1046         sum  = b[i];
1047         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1048         d    = fshift + A->a[diag[i]+shift];
1049         n    = B->i[i+1] - B->i[i];
1050         idx  = B->j + B->i[i] + shift;
1051         v    = B->a + B->i[i] + shift;
1052         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1053         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1054       }
1055     }
1056   } else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
1057     if (flag & SOR_ZERO_INITIAL_GUESS) {
1058       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
1059       ierr = VecRestoreArray(xx,&x); CHKERRQ(ierr);
1060       ierr = VecRestoreArray(bb,&b); CHKERRQ(ierr);
1061       ierr = VecRestoreArray(mat->lvec,&ls);CHKERRQ(ierr);
1062       PetscFunctionReturn(0);
1063     }
1064     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1065     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1066     while (its--) {
1067       for ( i=m-1; i>-1; i-- ) {
1068         n    = A->i[i+1] - A->i[i];
1069 	PLogFlops(4*n+3);
1070         idx  = A->j + A->i[i] + shift;
1071         v    = A->a + A->i[i] + shift;
1072         sum  = b[i];
1073         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1074         d    = fshift + A->a[diag[i]+shift];
1075         n    = B->i[i+1] - B->i[i];
1076         idx  = B->j + B->i[i] + shift;
1077         v    = B->a + B->i[i] + shift;
1078         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1079         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1080       }
1081     }
1082   } else {
1083     SETERRQ(PETSC_ERR_SUP,0,"Parallel SOR not supported");
1084   }
1085   ierr = VecRestoreArray(xx,&x); CHKERRQ(ierr);
1086   ierr = VecRestoreArray(bb,&b); CHKERRQ(ierr);
1087   ierr = VecRestoreArray(mat->lvec,&ls);CHKERRQ(ierr);
1088   PetscFunctionReturn(0);
1089 }
1090 
1091 #undef __FUNC__
1092 #define __FUNC__ "MatGetInfo_MPIAIJ"
1093 int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1094 {
1095   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1096   Mat        A = mat->A, B = mat->B;
1097   int        ierr;
1098   double     isend[5], irecv[5];
1099 
1100   PetscFunctionBegin;
1101   info->block_size     = 1.0;
1102   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1103   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
1104   isend[3] = info->memory;  isend[4] = info->mallocs;
1105   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1106   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
1107   isend[3] += info->memory;  isend[4] += info->mallocs;
1108   if (flag == MAT_LOCAL) {
1109     info->nz_used      = isend[0];
1110     info->nz_allocated = isend[1];
1111     info->nz_unneeded  = isend[2];
1112     info->memory       = isend[3];
1113     info->mallocs      = isend[4];
1114   } else if (flag == MAT_GLOBAL_MAX) {
1115     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);CHKERRQ(ierr);
1116     info->nz_used      = irecv[0];
1117     info->nz_allocated = irecv[1];
1118     info->nz_unneeded  = irecv[2];
1119     info->memory       = irecv[3];
1120     info->mallocs      = irecv[4];
1121   } else if (flag == MAT_GLOBAL_SUM) {
1122     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);CHKERRQ(ierr);
1123     info->nz_used      = irecv[0];
1124     info->nz_allocated = irecv[1];
1125     info->nz_unneeded  = irecv[2];
1126     info->memory       = irecv[3];
1127     info->mallocs      = irecv[4];
1128   }
1129   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1130   info->fill_ratio_needed = 0;
1131   info->factor_mallocs    = 0;
1132   info->rows_global       = (double)mat->M;
1133   info->columns_global    = (double)mat->N;
1134   info->rows_local        = (double)mat->m;
1135   info->columns_local     = (double)mat->N;
1136 
1137   PetscFunctionReturn(0);
1138 }
1139 
1140 #undef __FUNC__
1141 #define __FUNC__ "MatSetOption_MPIAIJ"
1142 int MatSetOption_MPIAIJ(Mat A,MatOption op)
1143 {
1144   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1145 
1146   PetscFunctionBegin;
1147   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1148       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1149       op == MAT_COLUMNS_UNSORTED ||
1150       op == MAT_COLUMNS_SORTED ||
1151       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1152       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1153         MatSetOption(a->A,op);
1154         MatSetOption(a->B,op);
1155   } else if (op == MAT_ROW_ORIENTED) {
1156     a->roworiented = 1;
1157     MatSetOption(a->A,op);
1158     MatSetOption(a->B,op);
1159   } else if (op == MAT_ROWS_SORTED ||
1160              op == MAT_ROWS_UNSORTED ||
1161              op == MAT_SYMMETRIC ||
1162              op == MAT_STRUCTURALLY_SYMMETRIC ||
1163              op == MAT_YES_NEW_DIAGONALS)
1164     PLogInfo(A,"MatSetOption_MPIAIJ:Option ignored\n");
1165   else if (op == MAT_COLUMN_ORIENTED) {
1166     a->roworiented = 0;
1167     MatSetOption(a->A,op);
1168     MatSetOption(a->B,op);
1169   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1170     a->donotstash = 1;
1171   } else if (op == MAT_NO_NEW_DIAGONALS){
1172     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1173   } else {
1174     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1175   }
1176   PetscFunctionReturn(0);
1177 }
1178 
1179 #undef __FUNC__
1180 #define __FUNC__ "MatGetSize_MPIAIJ"
1181 int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
1182 {
1183   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1184 
1185   PetscFunctionBegin;
1186   if (m) *m = mat->M;
1187   if (n) *n = mat->N;
1188   PetscFunctionReturn(0);
1189 }
1190 
1191 #undef __FUNC__
1192 #define __FUNC__ "MatGetLocalSize_MPIAIJ"
1193 int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
1194 {
1195   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1196 
1197   PetscFunctionBegin;
1198   if (m) *m = mat->m;
1199   if (n) *n = mat->n;
1200   PetscFunctionReturn(0);
1201 }
1202 
1203 #undef __FUNC__
1204 #define __FUNC__ "MatGetOwnershipRange_MPIAIJ"
1205 int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
1206 {
1207   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1208 
1209   PetscFunctionBegin;
1210   *m = mat->rstart; *n = mat->rend;
1211   PetscFunctionReturn(0);
1212 }
1213 
1214 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1215 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1216 
1217 #undef __FUNC__
1218 #define __FUNC__ "MatGetRow_MPIAIJ"
1219 int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1220 {
1221   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1222   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
1223   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
1224   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
1225   int        *cmap, *idx_p;
1226 
1227   PetscFunctionBegin;
1228   if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
1229   mat->getrowactive = PETSC_TRUE;
1230 
1231   if (!mat->rowvalues && (idx || v)) {
1232     /*
1233         allocate enough space to hold information from the longest row.
1234     */
1235     Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
1236     int     max = 1,m = mat->m,tmp;
1237     for ( i=0; i<m; i++ ) {
1238       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1239       if (max < tmp) { max = tmp; }
1240     }
1241     mat->rowvalues  = (Scalar *) PetscMalloc(max*(sizeof(int)+sizeof(Scalar)));CHKPTRQ(mat->rowvalues);
1242     mat->rowindices = (int *) (mat->rowvalues + max);
1243   }
1244 
1245   if (row < rstart || row >= rend) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Only local rows");
1246   lrow = row - rstart;
1247 
1248   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1249   if (!v)   {pvA = 0; pvB = 0;}
1250   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1251   ierr = (*mat->A->ops->getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1252   ierr = (*mat->B->ops->getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1253   nztot = nzA + nzB;
1254 
1255   cmap  = mat->garray;
1256   if (v  || idx) {
1257     if (nztot) {
1258       /* Sort by increasing column numbers, assuming A and B already sorted */
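      /*
         The merged row has three segments: B entries whose global column
         (cmap[]) is below cstart, then all A entries shifted to global
         numbering by cstart, then the remaining B entries (columns >= cend);
         imark records where the B part splits.
      */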
1259       int imark = -1;
1260       if (v) {
1261         *v = v_p = mat->rowvalues;
1262         for ( i=0; i<nzB; i++ ) {
1263           if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
1264           else break;
1265         }
1266         imark = i;
1267         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1268         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1269       }
1270       if (idx) {
1271         *idx = idx_p = mat->rowindices;
1272         if (imark > -1) {
1273           for ( i=0; i<imark; i++ ) {
1274             idx_p[i] = cmap[cworkB[i]];
1275           }
1276         } else {
1277           for ( i=0; i<nzB; i++ ) {
1278             if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
1279             else break;
1280           }
1281           imark = i;
1282         }
1283         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
1284         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
1285       }
1286     } else {
1287       if (idx) *idx = 0;
1288       if (v)   *v   = 0;
1289     }
1290   }
1291   *nz = nztot;
1292   ierr = (*mat->A->ops->restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1293   ierr = (*mat->B->ops->restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1294   PetscFunctionReturn(0);
1295 }
1296 
1297 #undef __FUNC__
1298 #define __FUNC__ "MatRestoreRow_MPIAIJ"
1299 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1300 {
1301   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1302 
1303   PetscFunctionBegin;
1304   if (aij->getrowactive == PETSC_FALSE) {
1305     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
1306   }
1307   aij->getrowactive = PETSC_FALSE;
1308   PetscFunctionReturn(0);
1309 }
1310 
1311 #undef __FUNC__
1312 #define __FUNC__ "MatNorm_MPIAIJ"
1313 int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
1314 {
1315   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1316   Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
1317   int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
1318   double     sum = 0.0;
1319   Scalar     *v;
1320 
1321   PetscFunctionBegin;
1322   if (aij->size == 1) {
1323     ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
1324   } else {
1325     if (type == NORM_FROBENIUS) {
1326       v = amat->a;
1327       for (i=0; i<amat->nz; i++ ) {
1328 #if defined(USE_PETSC_COMPLEX)
1329         sum += PetscReal(PetscConj(*v)*(*v)); v++;
1330 #else
1331         sum += (*v)*(*v); v++;
1332 #endif
1333       }
1334       v = bmat->a;
1335       for (i=0; i<bmat->nz; i++ ) {
1336 #if defined(USE_PETSC_COMPLEX)
1337         sum += PetscReal(PetscConj(*v)*(*v)); v++;
1338 #else
1339         sum += (*v)*(*v); v++;
1340 #endif
1341       }
1342       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
1343       *norm = sqrt(*norm);
1344     } else if (type == NORM_1) { /* max column norm */
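      /*
         Accumulate sum_i |a_ij| for every global column j: A entries map to
         column cstart + local column, B entries map through garray[]; the
         per-column sums are then summed across processors and the maximum
         taken.
      */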
1345       double *tmp, *tmp2;
1346       int    *jj, *garray = aij->garray;
1347       tmp  = (double *) PetscMalloc( (aij->N+1)*sizeof(double) ); CHKPTRQ(tmp);
1348       tmp2 = (double *) PetscMalloc( (aij->N+1)*sizeof(double) ); CHKPTRQ(tmp2);
1349       PetscMemzero(tmp,aij->N*sizeof(double));
1350       *norm = 0.0;
1351       v = amat->a; jj = amat->j;
1352       for ( j=0; j<amat->nz; j++ ) {
1353         tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
1354       }
1355       v = bmat->a; jj = bmat->j;
1356       for ( j=0; j<bmat->nz; j++ ) {
1357         tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
1358       }
1359       ierr = MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
1360       for ( j=0; j<aij->N; j++ ) {
1361         if (tmp2[j] > *norm) *norm = tmp2[j];
1362       }
1363       PetscFree(tmp); PetscFree(tmp2);
1364     } else if (type == NORM_INFINITY) { /* max row norm */
1365       double ntemp = 0.0;
1366       for ( j=0; j<amat->m; j++ ) {
1367         v = amat->a + amat->i[j] + shift;
1368         sum = 0.0;
1369         for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
1370           sum += PetscAbsScalar(*v); v++;
1371         }
1372         v = bmat->a + bmat->i[j] + shift;
1373         for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
1374           sum += PetscAbsScalar(*v); v++;
1375         }
1376         if (sum > ntemp) ntemp = sum;
1377       }
1378       ierr = MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);CHKERRQ(ierr);
1379     } else {
1380       SETERRQ(PETSC_ERR_SUP,0,"No support for two norm");
1381     }
1382   }
1383   PetscFunctionReturn(0);
1384 }
1385 
1386 #undef __FUNC__
1387 #define __FUNC__ "MatTranspose_MPIAIJ"
1388 int MatTranspose_MPIAIJ(Mat A,Mat *matout)
1389 {
1390   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1391   Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
1392   int        ierr,shift = Aloc->indexshift;
1393   int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
1394   Mat        B;
1395   Scalar     *array;
1396 
1397   PetscFunctionBegin;
1398   if (matout == PETSC_NULL && M != N) {
1399     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
1400   }
1401 
1402   ierr = MatCreateMPIAIJ(A->comm,a->n,a->m,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);CHKERRQ(ierr);
1403 
1404   /* copy over the A part */
1405   Aloc = (Mat_SeqAIJ*) a->A->data;
1406   m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1407   row = a->rstart;
1408   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
1409   for ( i=0; i<m; i++ ) {
1410     ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1411     row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
1412   }
1413   aj = Aloc->j;
1414   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}
1415 
1416   /* copy over the B part */
1417   Aloc = (Mat_SeqAIJ*) a->B->data;
1418   m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1419   row = a->rstart;
1420   ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
1421   for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
1422   for ( i=0; i<m; i++ ) {
1423     ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1424     row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
1425   }
1426   PetscFree(ct);
1427   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1428   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1429   if (matout != PETSC_NULL) {
1430     *matout = B;
1431   } else {
1432     PetscOps       *Abops;
1433     struct _MatOps *Aops;
1434 
1435     /* This isn't really an in-place transpose, but we free the data structures from a */
1436     PetscFree(a->rowners);
1437     ierr = MatDestroy(a->A); CHKERRQ(ierr);
1438     ierr = MatDestroy(a->B); CHKERRQ(ierr);
1439 #if defined (USE_CTABLE)
1440     if (a->colmap) TableDelete(a->colmap);
1441 #else
1442     if (a->colmap) PetscFree(a->colmap);
1443 #endif
1444     if (a->garray) PetscFree(a->garray);
1445     if (a->lvec) VecDestroy(a->lvec);
1446     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
1447     PetscFree(a);
1448 
1449     /*
1450        This is horrible, horrible code. We need to keep
1451       A's pointers for the bops and ops but copy everything
1452       else over from B.
1453     */
1454     Abops = A->bops;
1455     Aops  = A->ops;
1456     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1457     A->bops = Abops;
1458     A->ops  = Aops;
1459     PetscHeaderDestroy(B);
1460   }
1461   PetscFunctionReturn(0);
1462 }
1463 
1464 #undef __FUNC__
1465 #define __FUNC__ "MatDiagonalScale_MPIAIJ"
1466 int MatDiagonalScale_MPIAIJ(Mat mat,Vec ll,Vec rr)
1467 {
1468   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1469   Mat a = aij->A, b = aij->B;
1470   int ierr,s1,s2,s3;
1471 
1472   PetscFunctionBegin;
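  /* left scaling (ll) acts on local rows, so it is applied directly to both local blocks;
     right scaling (rr) acts on columns, so the off-diagonal block needs the ghosted entries
     of rr, which the scatter below gathers into aij->lvec */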
1473   ierr = MatGetLocalSize(mat,&s2,&s3); CHKERRQ(ierr);
1474   if (rr) {
1475     ierr = VecGetLocalSize(rr,&s1);CHKERRQ(ierr);
1476     if (s1!=s3) SETERRQ(PETSC_ERR_ARG_SIZ,0,"right vector non-conforming local size");
1477     /* Overlap communication with computation. */
1478     ierr = VecScatterBegin(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1479   }
1480   if (ll) {
1481     ierr = VecGetLocalSize(ll,&s1);CHKERRQ(ierr);
1482     if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"left vector non-conforming local size");
1483     ierr = (*b->ops->diagonalscale)(b,ll,0); CHKERRQ(ierr);
1484   }
1485   /* scale the diagonal block */
1486   ierr = (*a->ops->diagonalscale)(a,ll,rr); CHKERRQ(ierr);
1487 
1488   if (rr) {
1489     /* Do a scatter end and then right scale the off-diagonal block */
1490     ierr = VecScatterEnd(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1491     ierr = (*b->ops->diagonalscale)(b,0,aij->lvec); CHKERRQ(ierr);
1492   }
1493 
1494   PetscFunctionReturn(0);
1495 }
1496 
1497 
1498 extern int MatPrintHelp_SeqAIJ(Mat);
1499 #undef __FUNC__
1500 #define __FUNC__ "MatPrintHelp_MPIAIJ"
1501 int MatPrintHelp_MPIAIJ(Mat A)
1502 {
1503   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1504   int        ierr;
1505 
1506   PetscFunctionBegin;
1507   if (!a->rank) {
1508     ierr = MatPrintHelp_SeqAIJ(a->A);CHKERRQ(ierr);
1509   }
1510   PetscFunctionReturn(0);
1511 }
1512 
1513 #undef __FUNC__
1514 #define __FUNC__ "MatGetBlockSize_MPIAIJ"
1515 int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1516 {
1517   PetscFunctionBegin;
1518   *bs = 1;
1519   PetscFunctionReturn(0);
1520 }
1521 #undef __FUNC__
1522 #define __FUNC__ "MatSetUnfactored_MPIAIJ"
1523 int MatSetUnfactored_MPIAIJ(Mat A)
1524 {
1525   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1526   int        ierr;
1527 
1528   PetscFunctionBegin;
1529   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1530   PetscFunctionReturn(0);
1531 }
1532 
1533 #undef __FUNC__
1534 #define __FUNC__ "MatEqual_MPIAIJ"
1535 int MatEqual_MPIAIJ(Mat A, Mat B, PetscTruth *flag)
1536 {
1537   Mat_MPIAIJ *matB = (Mat_MPIAIJ *) B->data,*matA = (Mat_MPIAIJ *) A->data;
1538   Mat        a, b, c, d;
1539   PetscTruth flg;
1540   int        ierr;
1541 
1542   PetscFunctionBegin;
1543   if (B->type != MATMPIAIJ) SETERRQ(PETSC_ERR_ARG_INCOMP,0,"Matrices must be same type");
1544   a = matA->A; b = matA->B;
1545   c = matB->A; d = matB->B;
1546 
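  /* compare the diagonal and off-diagonal blocks locally, then combine the results
     with a logical AND across all processes */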
1547   ierr = MatEqual(a, c, &flg); CHKERRQ(ierr);
1548   if (flg == PETSC_TRUE) {
1549     ierr = MatEqual(b, d, &flg); CHKERRQ(ierr);
1550   }
1551   ierr = MPI_Allreduce(&flg, flag, 1, MPI_INT, MPI_LAND, A->comm);CHKERRQ(ierr);
1552   PetscFunctionReturn(0);
1553 }
1554 
1555 #undef __FUNC__
1556 #define __FUNC__ "MatCopy_MPIAIJ"
1557 int MatCopy_MPIAIJ(Mat A,Mat B,MatStructure str)
1558 {
1559   int        ierr;
1560   Mat_MPIAIJ *a = (Mat_MPIAIJ *)A->data;
1561   Mat_MPIAIJ *b = (Mat_MPIAIJ *)B->data;
1562 
1563   PetscFunctionBegin;
1564   if (str != SAME_NONZERO_PATTERN || B->type != MATMPIAIJ) {
1565     /* because of the column compression in the off-processor part of the matrix a->B,
1566        the number of columns in a->B and b->B may be different; hence we cannot call
1567        MatCopy() directly on the two parts. If need be, we could provide a more
1568        efficient copy than MatCopy_Basic() by first uncompressing the a->B matrices
1569        and then copying the submatrices */
1570     ierr = MatCopy_Basic(A,B,str);CHKERRQ(ierr);
1571   } else {
1572     ierr = MatCopy(a->A,b->A,str);CHKERRQ(ierr);
1573     ierr = MatCopy(a->B,b->B,str);CHKERRQ(ierr);
1574   }
1575   PetscFunctionReturn(0);
1576 }
1577 
1578 extern int MatDuplicate_MPIAIJ(Mat,MatDuplicateOption,Mat *);
1579 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1580 extern int MatFDColoringCreate_MPIAIJ(Mat,ISColoring,MatFDColoring);
1581 extern int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatReuse,Mat **);
1582 extern int MatGetSubMatrix_MPIAIJ (Mat ,IS,IS,int,MatReuse,Mat *);
1583 
1584 /* -------------------------------------------------------------------*/
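/* function table for the MPIAIJ matrix type; a zero entry marks an operation that is
   not implemented for this type */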
1585 static struct _MatOps MatOps_Values = {MatSetValues_MPIAIJ,
1586        MatGetRow_MPIAIJ,
1587        MatRestoreRow_MPIAIJ,
1588        MatMult_MPIAIJ,
1589        MatMultAdd_MPIAIJ,
1590        MatMultTrans_MPIAIJ,
1591        MatMultTransAdd_MPIAIJ,
1592        0,
1593        0,
1594        0,
1595        0,
1596        0,
1597        0,
1598        MatRelax_MPIAIJ,
1599        MatTranspose_MPIAIJ,
1600        MatGetInfo_MPIAIJ,
1601        MatEqual_MPIAIJ,
1602        MatGetDiagonal_MPIAIJ,
1603        MatDiagonalScale_MPIAIJ,
1604        MatNorm_MPIAIJ,
1605        MatAssemblyBegin_MPIAIJ,
1606        MatAssemblyEnd_MPIAIJ,
1607        0,
1608        MatSetOption_MPIAIJ,
1609        MatZeroEntries_MPIAIJ,
1610        MatZeroRows_MPIAIJ,
1611        0,
1612        0,
1613        0,
1614        0,
1615        MatGetSize_MPIAIJ,
1616        MatGetLocalSize_MPIAIJ,
1617        MatGetOwnershipRange_MPIAIJ,
1618        0,
1619        0,
1620        0,
1621        0,
1622        MatDuplicate_MPIAIJ,
1623        0,
1624        0,
1625        0,
1626        0,
1627        0,
1628        MatGetSubMatrices_MPIAIJ,
1629        MatIncreaseOverlap_MPIAIJ,
1630        MatGetValues_MPIAIJ,
1631        MatCopy_MPIAIJ,
1632        MatPrintHelp_MPIAIJ,
1633        MatScale_MPIAIJ,
1634        0,
1635        0,
1636        0,
1637        MatGetBlockSize_MPIAIJ,
1638        0,
1639        0,
1640        0,
1641        0,
1642        MatFDColoringCreate_MPIAIJ,
1643        0,
1644        MatSetUnfactored_MPIAIJ,
1645        0,
1646        0,
1647        MatGetSubMatrix_MPIAIJ,
1648        0,
1649        0,
1650        MatGetMaps_Petsc};
1651 
1652 /* ----------------------------------------------------------------------------------------*/
1653 
1654 EXTERN_C_BEGIN
1655 #undef __FUNC__
1656 #define __FUNC__ "MatStoreValues_MPIAIJ"
1657 int MatStoreValues_MPIAIJ(Mat mat)
1658 {
1659   Mat_MPIAIJ *aij = (Mat_MPIAIJ *)mat->data;
1660   int        ierr;
1661 
1662   PetscFunctionBegin;
1663   ierr = MatStoreValues(aij->A);CHKERRQ(ierr);
1664   ierr = MatStoreValues(aij->B);CHKERRQ(ierr);
1665   PetscFunctionReturn(0);
1666 }
1667 EXTERN_C_END
1668 
1669 EXTERN_C_BEGIN
1670 #undef __FUNC__
1671 #define __FUNC__ "MatRetrieveValues_MPIAIJ"
1672 int MatRetrieveValues_MPIAIJ(Mat mat)
1673 {
1674   Mat_MPIAIJ *aij = (Mat_MPIAIJ *)mat->data;
1675   int        ierr;
1676 
1677   PetscFunctionBegin;
1678   ierr = MatRetrieveValues(aij->A);CHKERRQ(ierr);
1679   ierr = MatRetrieveValues(aij->B);CHKERRQ(ierr);
1680   PetscFunctionReturn(0);
1681 }
1682 EXTERN_C_END
1683 
1684 #include "pc.h"
1685 EXTERN_C_BEGIN
1686 extern int MatGetDiagonalBlock_MPIAIJ(Mat,PetscTruth *,MatReuse,Mat *);
1687 EXTERN_C_END
1688 
1689 #undef __FUNC__
1690 #define __FUNC__ "MatCreateMPIAIJ"
1691 /*@C
1692    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1693    (the default parallel PETSc format).  For good matrix assembly performance
1694    the user should preallocate the matrix storage by setting the parameters
1695    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1696    performance can be increased by more than a factor of 50.
1697 
1698    Collective on MPI_Comm
1699 
1700    Input Parameters:
1701 +  comm - MPI communicator
1702 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given).
1703            This value should be the same as the local size used in creating the
1704            y vector for the matrix-vector product y = Ax.
1705 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given).
1706        This value should be the same as the local size used in creating the
1707        x vector for the matrix-vector product y = Ax. For square matrices n is almost always m.
1708 .  M - number of global rows (or PETSC_DETERMINE to have calculated if m is given)
1709 .  N - number of global columns (or PETSC_DETERMINE to have calculated if n is given)
1710 .  d_nz  - number of nonzeros per row in DIAGONAL portion of local submatrix
1711            (same value is used for all local rows)
1712 .  d_nnz - array containing the number of nonzeros in the various rows of the
1713            DIAGONAL portion of the local submatrix (possibly different for each row)
1714            or PETSC_NULL, if d_nz is used to specify the nonzero structure.
1715            The size of this array is equal to the number of local rows, i.e 'm'.
1716            You must leave room for the diagonal entry even if it is zero.
1717 .  o_nz  - number of nonzeros per row in the OFF-DIAGONAL portion of local
1718            submatrix (same value is used for all local rows).
1719 -  o_nnz - array containing the number of nonzeros in the various rows of the
1720            OFF-DIAGONAL portion of the local submatrix (possibly different for
1721            each row) or PETSC_NULL, if o_nz is used to specify the nonzero
1722            structure. The size of this array is equal to the number
1723            of local rows, i.e., 'm'.
1724 
1725    Output Parameter:
1726 .  A - the matrix
1727 
1728    Notes:
1729    m,n,M,N parameters specify the size of the matrix, and its partitioning across
1730    processors, while d_nz,d_nnz,o_nz,o_nnz parameters specify the approximate
1731    storage requirements for this matrix.
1732 
1733    If PETSC_DECIDE or PETSC_DETERMINE is used for a particular argument on one
1734    processor then it must be used on all processors that share the object for
1735    that argument.
1736 
1737    The AIJ format (also called the Yale sparse matrix format or
1738    compressed row storage), is fully compatible with standard Fortran 77
1739    storage.  That is, the stored row and column indices can begin at
1740    either one (as in Fortran) or zero.  See the users manual for details.
1741 
1742    The user MUST specify either the local or global matrix dimensions
1743    (possibly both).
1744 
1745    The parallel matrix is partitioned such that the first m0 rows belong to
1746    process 0, the next m1 rows belong to process 1, the next m2 rows belong
1747    to process 2, etc., where m0,m1,m2,... are the input parameter 'm'.
1748 
1749    The DIAGONAL portion of the local submatrix of a processor can be defined
1750    as the submatrix which is obtained by extracting the part corresponding
1751    to the rows r1-r2 and columns r1-r2 of the global matrix, where r1 is the
1752    first row that belongs to the processor, and r2 is the last row belonging
1753    to this processor. This is a square mxm matrix. The remaining portion
1754    of the local submatrix (m x (N-m)) constitutes the OFF-DIAGONAL portion.
1755 
1756    If d_nnz and o_nnz are specified, then d_nz and o_nz are ignored.
1757 
1758    By default, this format uses inodes (identical nodes) when possible.
1759    We search for consecutive rows with the same nonzero structure, thereby
1760    reusing matrix information to achieve increased efficiency.
1761 
1762    Options Database Keys:
1763 +  -mat_aij_no_inode  - Do not use inodes
1764 .  -mat_aij_inode_limit <limit> - Sets inode limit (max limit=5)
1765 .  -mat_aij_oneindex - Internally use indexing starting at 1
1766         rather than 0.  Note that when calling MatSetValues(),
1767         the user still MUST index entries starting at 0!
1770 -   -mat_mpi - use the parallel matrix data structures even on one processor
1771                (defaults to using SeqAIJ format on one processor)
1772 
1773 
1774    Example usage:
1775 
1776    Consider the following 8x8 matrix with 34 non-zero values, that is
1777    assembled across 3 processors. Let's assume that proc0 owns 3 rows,
1778    proc1 owns 3 rows, proc2 owns 2 rows. This division can be shown
1779    as follows:
1780 
1781 .vb
1782             1  2  0  |  0  3  0  |  0  4
1783     Proc0   0  5  6  |  7  0  0  |  8  0
1784             9  0 10  | 11  0  0  | 12  0
1785     -------------------------------------
1786            13  0 14  | 15 16 17  |  0  0
1787     Proc1   0 18  0  | 19 20 21  |  0  0
1788             0  0  0  | 22 23  0  | 24  0
1789     -------------------------------------
1790     Proc2  25 26 27  |  0  0 28  | 29  0
1791            30  0  0  | 31 32 33  |  0 34
1792 .ve
1793 
1794    This can be represented as a collection of submatrices as:
1795 
1796 .vb
1797       A B C
1798       D E F
1799       G H I
1800 .ve
1801 
1802    Where the submatrices A,B,C are owned by proc0, D,E,F are
1803    owned by proc1, G,H,I are owned by proc2.
1804 
1805    The 'm' parameters for proc0,proc1,proc2 are 3,3,2 respectively.
1806    The 'n' parameters for proc0,proc1,proc2 are 3,3,2 respectively.
1807    The 'M','N' parameters are 8,8, and have the same values on all procs.
1808 
1809    The DIAGONAL submatrices corresponding to proc0,proc1,proc2 are
1810    submatrices [A], [E], [I] respectively. The OFF-DIAGONAL submatrices
1811    corresponding to proc0,proc1,proc2 are [BC], [DF], [GH] respectively.
1812    Internally, each processor stores the DIAGONAL part, and the OFF-DIAGONAL
1813    part as SeqAIJ matrices. For example, proc1 will store [E] as a SeqAIJ
1814    matrix, and [DF] as another SeqAIJ matrix.
1815 
1816    When d_nz, o_nz parameters are specified, d_nz storage elements are
1817    allocated for every row of the local diagonal submatrix, and o_nz
1818    storage locations are allocated for every row of the OFF-DIAGONAL submatrix.
1819    One way to choose d_nz and o_nz is to use the maximum number of nonzeros per local
1820    row in each of the local DIAGONAL and OFF-DIAGONAL submatrices.
1821    In this case, the values of d_nz,o_nz are:
1822 .vb
1823      proc0 : d_nz = 2, o_nz = 2
1824      proc1 : d_nz = 3, o_nz = 2
1825      proc2 : d_nz = 1, o_nz = 4
1826 .ve
1827    We are allocating m*(d_nz+o_nz) storage locations for every proc. This
1828    translates to 3*(2+2)=12 for proc0, 3*(3+2)=15 for proc1, 2*(1+4)=10
1829    for proc2, i.e., we are using 12+15+10=37 storage locations to store
1830    34 values.
1831 
1832    When d_nnz, o_nnz parameters are specified, the storage is specified
1833    for every row, corresponding to both the DIAGONAL and OFF-DIAGONAL submatrices.
1834    In the above case the values for d_nnz,o_nnz are:
1835 .vb
1836      proc0: d_nnz = [2,2,2] and o_nnz = [2,2,2]
1837      proc1: d_nnz = [3,3,2] and o_nnz = [2,1,1]
1838      proc2: d_nnz = [1,1]   and o_nnz = [4,4]
1839 .ve
1840    Here the space allocated is sum of all the above values i.e 34, and
1841    hence pre-allocation is perfect.
1842 
1843 .keywords: matrix, aij, compressed row, sparse, parallel
1844 
1845 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1846 @*/
1847 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1848 {
1849   Mat          B;
1850   Mat_MPIAIJ   *b;
1851   int          ierr, i,sum[2],work[2],size,flag1 = 0, flag2 = 0;
1852 
1853   PetscFunctionBegin;
1854   MPI_Comm_size(comm,&size);
1855   ierr = OptionsHasName(PETSC_NULL,"-mat_mpiaij",&flag1); CHKERRQ(ierr);
1856   ierr = OptionsHasName(PETSC_NULL,"-mat_mpi",&flag2); CHKERRQ(ierr);
1857   if (!flag1 && !flag2 && size == 1) {
1858     if (M == PETSC_DECIDE) M = m;
1859     if (N == PETSC_DECIDE) N = n;
1860     ierr = MatCreateSeqAIJ(comm,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
1861     PetscFunctionReturn(0);
1862   }
1863 
1864   *A = 0;
1865   PetscHeaderCreate(B,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIAIJ,"Mat",comm,MatDestroy,MatView);
1866   PLogObjectCreate(B);
1867   B->data            = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
1868   PetscMemzero(b,sizeof(Mat_MPIAIJ));
1869   PetscMemcpy(B->ops,&MatOps_Values,sizeof(struct _MatOps));
1870   B->ops->destroy    = MatDestroy_MPIAIJ;
1871   B->ops->view       = MatView_MPIAIJ;
1872   B->factor          = 0;
1873   B->assembled       = PETSC_FALSE;
1874   B->mapping         = 0;
1875 
1876   B->insertmode      = NOT_SET_VALUES;
1877   b->size            = size;
1878   MPI_Comm_rank(comm,&b->rank);
1879 
1880   if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
1881     SETERRQ(PETSC_ERR_ARG_WRONG,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1882   }
1883 
1884   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1885     work[0] = m; work[1] = n;
1886     ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
1887     if (M == PETSC_DECIDE) M = sum[0];
1888     if (N == PETSC_DECIDE) N = sum[1];
1889   }
1890   if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
1891   if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
1892   b->m = m; B->m = m;
1893   b->n = n; B->n = n;
1894   b->N = N; B->N = N;
1895   b->M = M; B->M = M;
1896 
1897   /* the information in the maps duplicates the information computed below, eventually
1898      we should remove the duplicate information that is not contained in the maps */
1899   ierr = MapCreateMPI(comm,m,M,&B->rmap);CHKERRQ(ierr);
1900   ierr = MapCreateMPI(comm,n,N,&B->cmap);CHKERRQ(ierr);
1901 
1902   /* build local table of row and column ownerships */
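  /* after the MPI_Allgather below, rowners[i+1] holds the number of local rows on process i;
     the running sum turns rowners[] into starting global row numbers, so rows
     [rowners[rank],rowners[rank+1]) belong to this process (cowners[] is built the same way) */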
1903   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1904   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIAIJ));
1905   b->cowners = b->rowners + b->size + 2;
1906   ierr = MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1907   b->rowners[0] = 0;
1908   for ( i=2; i<=b->size; i++ ) {
1909     b->rowners[i] += b->rowners[i-1];
1910   }
1911   b->rstart = b->rowners[b->rank];
1912   b->rend   = b->rowners[b->rank+1];
1913   ierr = MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1914   b->cowners[0] = 0;
1915   for ( i=2; i<=b->size; i++ ) {
1916     b->cowners[i] += b->cowners[i-1];
1917   }
1918   b->cstart = b->cowners[b->rank];
1919   b->cend   = b->cowners[b->rank+1];
1920 
1921   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1922   ierr = MatCreateSeqAIJ(PETSC_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1923   PLogObjectParent(B,b->A);
1924   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1925   ierr = MatCreateSeqAIJ(PETSC_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1926   PLogObjectParent(B,b->B);
1927 
1928   /* build cache for off array entries formed */
1929   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1930   b->donotstash  = 0;
1931   b->colmap      = 0;
1932   b->garray      = 0;
1933   b->roworiented = 1;
1934 
1935   /* stuff used for matrix vector multiply */
1936   b->lvec      = 0;
1937   b->Mvctx     = 0;
1938 
1939   /* stuff for MatGetRow() */
1940   b->rowindices   = 0;
1941   b->rowvalues    = 0;
1942   b->getrowactive = PETSC_FALSE;
1943 
1944   ierr = PetscObjectComposeFunction((PetscObject)B,"MatStoreValues_C",
1945                                      "MatStoreValues_MPIAIJ",
1946                                      (void*)MatStoreValues_MPIAIJ);CHKERRQ(ierr);
1947   ierr = PetscObjectComposeFunction((PetscObject)B,"MatRetrieveValues_C",
1948                                      "MatRetrieveValues_MPIAIJ",
1949                                      (void*)MatRetrieveValues_MPIAIJ);CHKERRQ(ierr);
1950   ierr = PetscObjectComposeFunction((PetscObject)B,"MatGetDiagonalBlock_C",
1951                                      "MatGetDiagonalBlock_MPIAIJ",
1952                                      (void*)MatGetDiagonalBlock_MPIAIJ);CHKERRQ(ierr);
1953   *A = B;
1954   PetscFunctionReturn(0);
1955 }
1956 
1957 #undef __FUNC__
1958 #define __FUNC__ "MatDuplicate_MPIAIJ"
1959 int MatDuplicate_MPIAIJ(Mat matin,MatDuplicateOption cpvalues,Mat *newmat)
1960 {
1961   Mat        mat;
1962   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1963   int        ierr, len=0, flg;
1964 
1965   PetscFunctionBegin;
1966   *newmat       = 0;
1967   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIAIJ,"Mat",matin->comm,MatDestroy,MatView);
1968   PLogObjectCreate(mat);
1969   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1970   PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));
1971   mat->ops->destroy    = MatDestroy_MPIAIJ;
1972   mat->ops->view       = MatView_MPIAIJ;
1973   mat->factor     = matin->factor;
1974   mat->assembled  = PETSC_TRUE;
1975 
1976   a->m = mat->m   = oldmat->m;
1977   a->n = mat->n   = oldmat->n;
1978   a->M = mat->M   = oldmat->M;
1979   a->N = mat->N   = oldmat->N;
1980 
1981   a->rstart       = oldmat->rstart;
1982   a->rend         = oldmat->rend;
1983   a->cstart       = oldmat->cstart;
1984   a->cend         = oldmat->cend;
1985   a->size         = oldmat->size;
1986   a->rank         = oldmat->rank;
1987   mat->insertmode = NOT_SET_VALUES;
1988   a->rowvalues    = 0;
1989   a->getrowactive = PETSC_FALSE;
1990 
1991   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1992   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIAIJ));
1993   a->cowners = a->rowners + a->size + 2;
1994   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1995   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1996   if (oldmat->colmap) {
1997 #if defined (USE_CTABLE)
1998     ierr = TableCreateCopy( &a->colmap, oldmat->colmap ); CHKERRQ(ierr);
1999 #else
2000     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
2001     PLogObjectMemory(mat,(a->N)*sizeof(int));
2002     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
2003 #endif
2004   } else a->colmap = 0;
2005   if (oldmat->garray) {
2006     len = ((Mat_SeqAIJ *) (oldmat->B->data))->n;
2007     a->garray = (int *) PetscMalloc((len+1)*sizeof(int)); CHKPTRQ(a->garray);
2008     PLogObjectMemory(mat,len*sizeof(int));
2009     if (len) PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
2010   } else a->garray = 0;
2011 
2012   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
2013   PLogObjectParent(mat,a->lvec);
2014   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
2015   PLogObjectParent(mat,a->Mvctx);
2016   ierr =  MatDuplicate(oldmat->A,cpvalues,&a->A); CHKERRQ(ierr);
2017   PLogObjectParent(mat,a->A);
2018   ierr =  MatDuplicate(oldmat->B,cpvalues,&a->B); CHKERRQ(ierr);
2019   PLogObjectParent(mat,a->B);
2020   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
2021   if (flg) {
2022     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
2023   }
2024   ierr = FListDuplicate(matin->qlist,&mat->qlist);CHKERRQ(ierr);
2025   *newmat = mat;
2026   PetscFunctionReturn(0);
2027 }
2028 
2029 #include "sys.h"
2030 
2031 #undef __FUNC__
2032 #define __FUNC__ "MatLoad_MPIAIJ"
2033 int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
2034 {
2035   Mat          A;
2036   Scalar       *vals,*svals;
2037   MPI_Comm     comm = ((PetscObject)viewer)->comm;
2038   MPI_Status   status;
2039   int          i, nz, ierr, j,rstart, rend, fd;
2040   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
2041   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
2042   int          tag = ((PetscObject)viewer)->tag,cend,cstart,n;
2043 
2044   PetscFunctionBegin;
2045   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
2046   if (!rank) {
2047     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
2048     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
2049     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
2050     if (header[3] < 0) {
2051       SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix in special format on disk, cannot load as MPIAIJ");
2052     }
2053   }
2054 
2055   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
2056   M = header[1]; N = header[2];
2057   /* determine ownership of all rows */
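  /* each process gets M/size rows; the first M % size processes get one extra row */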
2058   m = M/size + ((M % size) > rank);
2059   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
2060   ierr = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
2061   rowners[0] = 0;
2062   for ( i=2; i<=size; i++ ) {
2063     rowners[i] += rowners[i-1];
2064   }
2065   rstart = rowners[rank];
2066   rend   = rowners[rank+1];
2067 
2068   /* distribute row lengths to all processors */
2069   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
2070   offlens = ourlens + (rend-rstart);
2071   if (!rank) {
2072     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
2073     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
2074     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
2075     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
2076     ierr = MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);CHKERRQ(ierr);
2077     PetscFree(sndcounts);
2078   } else {
2079     ierr = MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);CHKERRQ(ierr);
2080   }
2081 
2082   if (!rank) {
2083     /* calculate the number of nonzeros on each processor */
2084     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
2085     PetscMemzero(procsnz,size*sizeof(int));
2086     for ( i=0; i<size; i++ ) {
2087       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
2088         procsnz[i] += rowlengths[j];
2089       }
2090     }
2091     PetscFree(rowlengths);
2092 
2093     /* determine max buffer needed and allocate it */
2094     maxnz = 0;
2095     for ( i=0; i<size; i++ ) {
2096       maxnz = PetscMax(maxnz,procsnz[i]);
2097     }
2098     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
2099 
2100     /* read in my part of the matrix column indices  */
2101     nz     = procsnz[0];
2102     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
2103     ierr   = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
2104 
2105     /* read in everyone else's part and ship it off */
2106     for ( i=1; i<size; i++ ) {
2107       nz   = procsnz[i];
2108       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
2109       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
2110     }
2111     PetscFree(cols);
2112   } else {
2113     /* determine buffer space needed for message */
2114     nz = 0;
2115     for ( i=0; i<m; i++ ) {
2116       nz += ourlens[i];
2117     }
2118     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
2119 
2120     /* receive message of column indices*/
2121     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
2122     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
2123     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2124   }
2125 
2126   /* determine column ownership if matrix is not square */
2127   if (N != M) {
2128     n      = N/size + ((N % size) > rank);
2129     ierr   = MPI_Scan(&n,&cend,1,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
2130     cstart = cend - n;
2131   } else {
2132     cstart = rstart;
2133     cend   = rend;
2134     n      = cend - cstart;
2135   }
2136 
2137   /* loop over local rows, determining number of off diagonal entries */
2138   PetscMemzero(offlens,m*sizeof(int));
2139   jj = 0;
2140   for ( i=0; i<m; i++ ) {
2141     for ( j=0; j<ourlens[i]; j++ ) {
2142       if (mycols[jj] < cstart || mycols[jj] >= cend) offlens[i]++;
2143       jj++;
2144     }
2145   }
2146 
2147   /* create our matrix */
2148   for ( i=0; i<m; i++ ) {
2149     ourlens[i] -= offlens[i];
2150   }
2151   ierr = MatCreateMPIAIJ(comm,m,n,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
2152   A = *newmat;
2153   ierr = MatSetOption(A,MAT_COLUMNS_SORTED); CHKERRQ(ierr);
2154   for ( i=0; i<m; i++ ) {
2155     ourlens[i] += offlens[i];
2156   }
2157 
2158   if (!rank) {
2159     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
2160 
2161     /* read in my part of the matrix numerical values  */
2162     nz   = procsnz[0];
2163     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2164 
2165     /* insert into matrix */
2166     jj      = rstart;
2167     smycols = mycols;
2168     svals   = vals;
2169     for ( i=0; i<m; i++ ) {
2170       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
2171       smycols += ourlens[i];
2172       svals   += ourlens[i];
2173       jj++;
2174     }
2175 
2176     /* read in other processors and ship out */
2177     for ( i=1; i<size; i++ ) {
2178       nz   = procsnz[i];
2179       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2180       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
2181     }
2182     PetscFree(procsnz);
2183   } else {
2184     /* receive numeric values */
2185     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
2186 
2187     /* receive message of values*/
2188     ierr = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
2189     ierr = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
2190     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2191 
2192     /* insert into matrix */
2193     jj      = rstart;
2194     smycols = mycols;
2195     svals   = vals;
2196     for ( i=0; i<m; i++ ) {
2197       ierr     = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
2198       smycols += ourlens[i];
2199       svals   += ourlens[i];
2200       jj++;
2201     }
2202   }
2203   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
2204 
2205   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2206   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2207   PetscFunctionReturn(0);
2208 }
2209 
2210 #undef __FUNC__
2211 #define __FUNC__ "MatGetSubMatrix_MPIAIJ"
2212 /*
2213     Not great since it makes two copies of the submatrix: first a sequential SeqAIJ
2214   matrix on each process, and then the final result formed by concatenating the
2215   local matrices. Writing it directly would be much like MatGetSubMatrices_MPIAIJ().
2216 */
2217 int MatGetSubMatrix_MPIAIJ(Mat mat,IS isrow,IS iscol,int csize,MatReuse call,Mat *newmat)
2218 {
2219   int        ierr, i, m,n,rstart,row,rend,nz,*cwork,size,rank,j;
2220   Mat        *local,M, Mreuse;
2221   Scalar     *vwork,*aa;
2222   MPI_Comm   comm = mat->comm;
2223   Mat_SeqAIJ *aij;
2224   int        *ii, *jj,nlocal,*dlens,*olens,dlen,olen,jend;
2225 
2226   PetscFunctionBegin;
2227   MPI_Comm_rank(comm,&rank);
2228   MPI_Comm_size(comm,&size);
2229 
2230   if (call ==  MAT_REUSE_MATRIX) {
2231     ierr = PetscObjectQuery((PetscObject)*newmat,"SubMatrix",(PetscObject *)&Mreuse);CHKERRQ(ierr);
2232     if (!Mreuse) SETERRQ(1,1,"Submatrix passed in was not used before, cannot reuse");
2233     local = &Mreuse;
2234     ierr  = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_REUSE_MATRIX,&local);CHKERRQ(ierr);
2235   } else {
2236     ierr = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_INITIAL_MATRIX,&local);CHKERRQ(ierr);
2237     Mreuse = *local;
2238     PetscFree(local);
2239   }
2240 
2241   /*
2242       m - number of local rows
2243       n - number of columns (same on all processors)
2244       rstart - first row in new global matrix generated
2245   */
2246   ierr = MatGetSize(Mreuse,&m,&n);CHKERRQ(ierr);
2247   if (call == MAT_INITIAL_MATRIX) {
2248     aij = (Mat_SeqAIJ *) (Mreuse)->data;
2249     if (aij->indexshift) SETERRQ(PETSC_ERR_SUP,1,"No support for index shifted matrix");
2250     ii  = aij->i;
2251     jj  = aij->j;
2252 
2253     /*
2254         Determine the number of non-zeros in the diagonal and off-diagonal
2255         portions of the matrix in order to do correct preallocation
2256     */
2257 
2258     /* first get start and end of "diagonal" columns */
2259     if (csize == PETSC_DECIDE) {
2260       nlocal = n/size + ((n % size) > rank);
2261     } else {
2262       nlocal = csize;
2263     }
2264     ierr   = MPI_Scan(&nlocal,&rend,1,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
2265     rstart = rend - nlocal;
2266     if (rank == size - 1 && rend != n) {
2267       SETERRQ(1,1,"Local column sizes do not add up to total number of columns");
2268     }
2269 
2270     /* next, compute all the lengths */
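    /* for each local row of the extracted sequential matrix, count the columns that fall in
       [rstart,rend) (the diagonal block of the new parallel matrix) and those that fall
       outside it (the off-diagonal block) */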
2271     dlens = (int *) PetscMalloc( (2*m+1)*sizeof(int) );CHKPTRQ(dlens);
2272     olens = dlens + m;
2273     for ( i=0; i<m; i++ ) {
2274       jend = ii[i+1] - ii[i];
2275       olen = 0;
2276       dlen = 0;
2277       for ( j=0; j<jend; j++ ) {
2278         if ( *jj < rstart || *jj >= rend) olen++;
2279         else dlen++;
2280         jj++;
2281       }
2282       olens[i] = olen;
2283       dlens[i] = dlen;
2284     }
2285     ierr = MatCreateMPIAIJ(comm,m,nlocal,PETSC_DECIDE,n,0,dlens,0,olens,&M);CHKERRQ(ierr);
2286     PetscFree(dlens);
2287   } else {
2288     int ml,nl;
2289 
2290     M = *newmat;
2291     ierr = MatGetLocalSize(M,&ml,&nl);CHKERRQ(ierr);
2292     if (ml != m) SETERRQ(PETSC_ERR_ARG_SIZ,1,"Previous matrix must be same size/layout as request");
2293     ierr = MatZeroEntries(M);CHKERRQ(ierr);
2294     /*
2295          The next two lines are needed so we may call MatSetValues_MPIAIJ() below directly,
2296        rather than the slower MatSetValues().
2297     */
2298     M->was_assembled = PETSC_TRUE;
2299     M->assembled     = PETSC_FALSE;
2300   }
2301   ierr = MatGetOwnershipRange(M,&rstart,&rend); CHKERRQ(ierr);
2302   aij = (Mat_SeqAIJ *) (Mreuse)->data;
2303   if (aij->indexshift) SETERRQ(PETSC_ERR_SUP,1,"No support for index shifted matrix");
2304   ii  = aij->i;
2305   jj  = aij->j;
2306   aa  = aij->a;
2307   for (i=0; i<m; i++) {
2308     row   = rstart + i;
2309     nz    = ii[i+1] - ii[i];
2310     cwork = jj;     jj += nz;
2311     vwork = aa;     aa += nz;
2312     ierr = MatSetValues_MPIAIJ(M,1,&row,nz,cwork,vwork,INSERT_VALUES); CHKERRQ(ierr);
2313   }
2314 
2315   ierr = MatAssemblyBegin(M,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2316   ierr = MatAssemblyEnd(M,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2317   *newmat = M;
2318 
2319   /* save submatrix used in processor for next request */
2320   if (call ==  MAT_INITIAL_MATRIX) {
2321     ierr = PetscObjectCompose((PetscObject)M,"SubMatrix",(PetscObject)Mreuse);CHKERRQ(ierr);
2322     ierr = PetscObjectDereference((PetscObject)Mreuse);CHKERRQ(ierr);
2323   }
2324 
2325   PetscFunctionReturn(0);
2326 }
2327