1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpiaij.c,v 1.275 1999/01/27 19:47:24 bsmith Exp curfman $";
3 #endif
4 
5 #include "src/mat/impls/aij/mpi/mpiaij.h"
6 #include "src/vec/vecimpl.h"
7 #include "src/inline/spops.h"
8 
9 /* local utility routine that creates a mapping from the global column
10 number to the local column number in the off-diagonal part of the local
11 storage of the matrix.  This is done in a non-scalable way, since the
12 length of colmap equals the global number of columns.
13 */
14 #undef __FUNC__
15 #define __FUNC__ "CreateColmap_MPIAIJ_Private"
16 int CreateColmap_MPIAIJ_Private(Mat mat)
17 {
18   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
19   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
20   int        n = B->n,i;
21 #if defined (USE_CTABLE)
22   int        ierr;
23 #endif
24 
25   PetscFunctionBegin;
26 #if defined (USE_CTABLE)
27   ierr = TableCreate( &aij->colmap, aij->n/5 ); CHKERRQ(ierr);
28   for ( i=0; i<n; i++ ){
29     ierr = TableAdd( aij->colmap, aij->garray[i] + 1, i+1 );CHKERRQ(ierr);
30   }
31 #else
32   aij->colmap = (int *) PetscMalloc((aij->N+1)*sizeof(int));CHKPTRQ(aij->colmap);
33   PLogObjectMemory(mat,aij->N*sizeof(int));
34   PetscMemzero(aij->colmap,aij->N*sizeof(int));
35   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
36 #endif
37   PetscFunctionReturn(0);
38 }
39 
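/*
   How the colmap built above is used elsewhere in this file (a minimal
   sketch; gcol and lcol are illustrative names only): colmap translates a
   global column number into a local column number of the off-diagonal part
   aij->B.  Entries are stored shifted up by one so that zero can mean
   "this global column is not present in the off-diagonal part":

      int gcol = ...;                      some global column number
      int lcol;
   #if defined (USE_CTABLE)
      lcol = TableFind( aij->colmap, gcol + 1 ) - 1;
   #else
      lcol = aij->colmap[gcol] - 1;
   #endif
      if (lcol < 0) {
        ...  gcol is not (yet) stored in the off-diagonal part  ...
      }
*/
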
40 extern int DisAssemble_MPIAIJ(Mat);
41 
42 #define CHUNKSIZE   15
43 #define MatSetValues_SeqAIJ_A_Private(row,col,value,addv) \
44 { \
45  \
46     rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
47     rmax = aimax[row]; nrow = ailen[row];  \
48     col1 = col - shift; \
49      \
50     low = 0; high = nrow; \
51     while (high-low > 5) { \
52       t = (low+high)/2; \
53       if (rp[t] > col) high = t; \
54       else             low  = t; \
55     } \
56       for ( _i=0; _i<nrow; _i++ ) { \
57         if (rp[_i] > col1) break; \
58         if (rp[_i] == col1) { \
59           if (addv == ADD_VALUES) ap[_i] += value;   \
60           else                  ap[_i] = value; \
61           goto a_noinsert; \
62         } \
63       }  \
64       if (nonew == 1) goto a_noinsert; \
65       else if (nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
66       if (nrow >= rmax) { \
67         /* there is no extra room in row, therefore enlarge */ \
68         int    new_nz = ai[a->m] + CHUNKSIZE,len,*new_i,*new_j; \
69         Scalar *new_a; \
70  \
71         if (nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
72  \
73         /* malloc new storage space */ \
74         len     = new_nz*(sizeof(int)+sizeof(Scalar))+(a->m+1)*sizeof(int); \
75         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
76         new_j   = (int *) (new_a + new_nz); \
77         new_i   = new_j + new_nz; \
78  \
79         /* copy over old data into new slots */ \
80         for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = ai[ii];} \
81         for ( ii=row+1; ii<a->m+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
82         PetscMemcpy(new_j,aj,(ai[row]+nrow+shift)*sizeof(int)); \
83         len = (new_nz - CHUNKSIZE - ai[row] - nrow - shift); \
84         PetscMemcpy(new_j+ai[row]+shift+nrow+CHUNKSIZE,aj+ai[row]+shift+nrow, \
85                                                            len*sizeof(int)); \
86         PetscMemcpy(new_a,aa,(ai[row]+nrow+shift)*sizeof(Scalar)); \
87         PetscMemcpy(new_a+ai[row]+shift+nrow+CHUNKSIZE,aa+ai[row]+shift+nrow, \
88                                                            len*sizeof(Scalar));  \
89         /* free up old matrix storage */ \
90  \
91         PetscFree(a->a);  \
92         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
93         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
94         a->singlemalloc = 1; \
95  \
96         rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
97         rmax = aimax[row] = aimax[row] + CHUNKSIZE; \
98         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
99         a->maxnz += CHUNKSIZE; \
100         a->reallocs++; \
101       } \
102       N = nrow++ - 1; a->nz++; \
103       /* shift up all the later entries in this row */ \
104       for ( ii=N; ii>=_i; ii-- ) { \
105         rp[ii+1] = rp[ii]; \
106         ap[ii+1] = ap[ii]; \
107       } \
108       rp[_i] = col1;  \
109       ap[_i] = value;  \
110       a_noinsert: ; \
111       ailen[row] = nrow; \
112 }
113 
114 #define MatSetValues_SeqAIJ_B_Private(row,col,value,addv) \
115 { \
116  \
117     rp   = bj + bi[row] + shift; ap = ba + bi[row] + shift; \
118     rmax = bimax[row]; nrow = bilen[row];  \
119     col1 = col - shift; \
120      \
121     low = 0; high = nrow; \
122     while (high-low > 5) { \
123       t = (low+high)/2; \
124       if (rp[t] > col) high = t; \
125       else             low  = t; \
126     } \
127        for ( _i=0; _i<nrow; _i++ ) { \
128         if (rp[_i] > col1) break; \
129         if (rp[_i] == col1) { \
130           if (addv == ADD_VALUES) ap[_i] += value;   \
131           else                  ap[_i] = value; \
132           goto b_noinsert; \
133         } \
134       }  \
135       if (nonew == 1) goto b_noinsert; \
136       else if (nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
137       if (nrow >= rmax) { \
138         /* there is no extra room in row, therefore enlarge */ \
139         int    new_nz = bi[b->m] + CHUNKSIZE,len,*new_i,*new_j; \
140         Scalar *new_a; \
141  \
142         if (nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
143  \
144         /* malloc new storage space */ \
145         len     = new_nz*(sizeof(int)+sizeof(Scalar))+(b->m+1)*sizeof(int); \
146         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
147         new_j   = (int *) (new_a + new_nz); \
148         new_i   = new_j + new_nz; \
149  \
150         /* copy over old data into new slots */ \
151         for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = bi[ii];} \
152         for ( ii=row+1; ii<b->m+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
153         PetscMemcpy(new_j,bj,(bi[row]+nrow+shift)*sizeof(int)); \
154         len = (new_nz - CHUNKSIZE - bi[row] - nrow - shift); \
155         PetscMemcpy(new_j+bi[row]+shift+nrow+CHUNKSIZE,bj+bi[row]+shift+nrow, \
156                                                            len*sizeof(int)); \
157         PetscMemcpy(new_a,ba,(bi[row]+nrow+shift)*sizeof(Scalar)); \
158         PetscMemcpy(new_a+bi[row]+shift+nrow+CHUNKSIZE,ba+bi[row]+shift+nrow, \
159                                                            len*sizeof(Scalar));  \
160         /* free up old matrix storage */ \
161  \
162         PetscFree(b->a);  \
163         if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
164         ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
165         b->singlemalloc = 1; \
166  \
167         rp   = bj + bi[row] + shift; ap = ba + bi[row] + shift; \
168         rmax = bimax[row] = bimax[row] + CHUNKSIZE; \
169         PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
170         b->maxnz += CHUNKSIZE; \
171         b->reallocs++; \
172       } \
173       N = nrow++ - 1; b->nz++; \
174       /* shift up all the later entries in this row */ \
175       for ( ii=N; ii>=_i; ii-- ) { \
176         rp[ii+1] = rp[ii]; \
177         ap[ii+1] = ap[ii]; \
178       } \
179       rp[_i] = col1;  \
180       ap[_i] = value;  \
181       b_noinsert: ; \
182       bilen[row] = nrow; \
183 }
184 
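/*
   A rough sketch of what the two insertion macros above do (not a definitive
   description): value is placed at (row,col) directly in the compressed-row
   storage of the diagonal (A) or off-diagonal (B) sequential matrix.  For a
   single row, with rp[] the sorted column indices and ap[] the values,

      for ( _i=0; _i<nrow; _i++ ) {
        if (rp[_i] >  col1) break;                        insertion point found
        if (rp[_i] == col1) { add to or overwrite ap[_i]; done }
      }
      shift rp[_i..nrow-1] and ap[_i..nrow-1] up one slot;
      rp[_i] = col1; ap[_i] = value; nrow++;

   If the row has no free slot left (nrow >= rmax) the i/j/a arrays are
   reallocated with CHUNKSIZE extra entries for this row; if nonew is set,
   new nonzero locations are either silently dropped (nonew == 1) or
   generate an error (nonew == -1 or -2).
*/
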
185 extern int MatSetValues_SeqAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
186 #undef __FUNC__
187 #define __FUNC__ "MatSetValues_MPIAIJ"
188 int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
189 {
190   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
191   Scalar     value;
192   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
193   int        cstart = aij->cstart, cend = aij->cend,row,col;
194   int        roworiented = aij->roworiented;
195 
196   /* Some variables required by the MatSetValues_SeqAIJ_*_Private() macros */
197   Mat        A = aij->A;
198   Mat_SeqAIJ *a = (Mat_SeqAIJ *) A->data;
199   int        *aimax = a->imax, *ai = a->i, *ailen = a->ilen,*aj = a->j;
200   Scalar     *aa = a->a;
201 
202   Mat        B = aij->B;
203   Mat_SeqAIJ *b = (Mat_SeqAIJ *) B->data;
204   int        *bimax = b->imax, *bi = b->i, *bilen = b->ilen,*bj = b->j;
205   Scalar     *ba = b->a;
206 
207   int        *rp,ii,nrow,_i,rmax, N, col1,low,high,t;
208   int        nonew = a->nonew,shift = a->indexshift;
209   Scalar     *ap;
210 
211   PetscFunctionBegin;
212   for ( i=0; i<m; i++ ) {
213     if (im[i] < 0) continue;
214 #if defined(USE_PETSC_BOPT_g)
215     if (im[i] >= aij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
216 #endif
217     if (im[i] >= rstart && im[i] < rend) {
218       row = im[i] - rstart;
219       for ( j=0; j<n; j++ ) {
220         if (in[j] >= cstart && in[j] < cend){
221           col = in[j] - cstart;
222           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
223           MatSetValues_SeqAIJ_A_Private(row,col,value,addv);
224           /* ierr = MatSetValues_SeqAIJ(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
225         }
226         else if (in[j] < 0) continue;
227 #if defined(USE_PETSC_BOPT_g)
228         else if (in[j] >= aij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");}
229 #endif
230         else {
231           if (mat->was_assembled) {
232             if (!aij->colmap) {
233               ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
234             }
235 #if defined (USE_CTABLE)
236 	    col = TableFind( aij->colmap, in[j] + 1 ) - 1;
237 #else
238             col = aij->colmap[in[j]] - 1;
239 #endif
240             if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
241               ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
242               col =  in[j];
243               /* Reinitialize the variables required by MatSetValues_SeqAIJ_B_Private() */
244               B = aij->B;
245               b = (Mat_SeqAIJ *) B->data;
246               bimax = b->imax; bi = b->i; bilen = b->ilen; bj = b->j;
247               ba = b->a;
248             }
249           } else col = in[j];
250           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
251           MatSetValues_SeqAIJ_B_Private(row,col,value,addv);
252           /* ierr = MatSetValues_SeqAIJ(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
253         }
254       }
255     } else {
256       if (roworiented && !aij->donotstash) {
257         ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
258       } else {
259         if (!aij->donotstash) {
260           row = im[i];
261           for ( j=0; j<n; j++ ) {
262             ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
263           }
264         }
265       }
266     }
267   }
268   PetscFunctionReturn(0);
269 }
270 
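/*
   A typical calling sequence from application code (a minimal sketch,
   assuming the PETSc 2.0-style creation and assembly interfaces used
   throughout this file; the sizes and preallocation numbers are
   illustrative only):

      Mat      A;
      MPI_Comm comm = PETSC_COMM_WORLD;
      int      ierr,row,col,rstart,rend,M = 100,N = 100;
      Scalar   one = 1.0;

      ierr = MatCreateMPIAIJ(comm,PETSC_DECIDE,PETSC_DECIDE,M,N,5,PETSC_NULL,
                             5,PETSC_NULL,&A); CHKERRQ(ierr);
      ierr = MatGetOwnershipRange(A,&rstart,&rend); CHKERRQ(ierr);
      for ( row=rstart; row<rend; row++ ) {
        col  = row;                             locally owned row: handled right away
        ierr = MatSetValues(A,1,&row,1,&col,&one,ADD_VALUES); CHKERRQ(ierr);
      }
      row  = 0; col = N-1;                      possibly off-processor row: stashed,
      ierr = MatSetValues(A,1,&row,1,&col,&one,ADD_VALUES); CHKERRQ(ierr);
                                                then communicated during assembly
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatDestroy(A); CHKERRQ(ierr);
*/
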
271 #undef __FUNC__
272 #define __FUNC__ "MatGetValues_MPIAIJ"
273 int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
274 {
275   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
276   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
277   int        cstart = aij->cstart, cend = aij->cend,row,col;
278 
279   PetscFunctionBegin;
280   for ( i=0; i<m; i++ ) {
281     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
282     if (idxm[i] >= aij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
283     if (idxm[i] >= rstart && idxm[i] < rend) {
284       row = idxm[i] - rstart;
285       for ( j=0; j<n; j++ ) {
286         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
287         if (idxn[j] >= aij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
288         if (idxn[j] >= cstart && idxn[j] < cend){
289           col = idxn[j] - cstart;
290           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
291         } else {
292           if (!aij->colmap) {
293             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
294           }
295 #if defined (USE_CTABLE)
296 	  col = TableFind( aij->colmap, idxn[j] + 1 ) - 1;
297 #else
298           col = aij->colmap[idxn[j]] - 1;
299 #endif
300           if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
301           else {
302             ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
303           }
304         }
305       }
306     } else {
307       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
308     }
309   }
310   PetscFunctionReturn(0);
311 }
312 
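/*
   Note that MatGetValues() on an MPIAIJ matrix may only request rows owned
   by the calling processor (see the SETERRQ above), and should be called
   only after the matrix has been assembled.  A minimal sketch (row and
   column numbers are global; off-diagonal columns that are not stored are
   returned as 0.0):

      int    row = rstart,col = 0,ierr;
      Scalar value;
      ierr = MatGetValues(mat,1,&row,1,&col,&value); CHKERRQ(ierr);
*/
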
313 #undef __FUNC__
314 #define __FUNC__ "MatAssemblyBegin_MPIAIJ"
315 int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
316 {
317   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
318   MPI_Comm    comm = mat->comm;
319   int         size = aij->size, *owners = aij->rowners;
320   int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
321   MPI_Request *send_waits,*recv_waits;
322   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
323   InsertMode  addv;
324   Scalar      *rvalues,*svalues;
325 
326   PetscFunctionBegin;
327   if (aij->donotstash) {
328     aij->svalues    = 0; aij->rvalues    = 0;
329     aij->nsends     = 0; aij->nrecvs     = 0;
330     aij->send_waits = 0; aij->recv_waits = 0;
331     aij->rmax       = 0;
332     PetscFunctionReturn(0);
333   }
334 
335   /* make sure all processors are using either INSERT_VALUES or ADD_VALUES */
336   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
337   if (addv == (ADD_VALUES|INSERT_VALUES)) {
338     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
339   }
340   mat->insertmode = addv; /* in case this processor had no cache */
341 
342   /*  first count number of contributors to each processor */
343   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
344   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
345   owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
346   for ( i=0; i<aij->stash.n; i++ ) {
347     idx = aij->stash.idx[i];
348     for ( j=0; j<size; j++ ) {
349       if (idx >= owners[j] && idx < owners[j+1]) {
350         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
351       }
352     }
353   }
354   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
355 
356   /* inform other processors of number of messages and max length*/
357   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
358   ierr = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
359   nreceives = work[rank];
360   ierr = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
361   nmax = work[rank];
362   PetscFree(work);
363 
364   /* post receives:
365        1) each message will consist of ordered triples
366      (global row index, global column index, value); we store the indices
367      as Scalars to simplify the message passing.
368        2) since we don't know how long each individual message is we
369      allocate the largest needed buffer for each receive. Potentially
370      this is a lot of wasted space.
371 
372 
373        This could be done better.
374   */
375   rvalues    = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
376   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
377   for ( i=0; i<nreceives; i++ ) {
378     ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
379               comm,recv_waits+i);CHKERRQ(ierr);
380   }
381 
382   /* do sends:
383       1) starts[i] gives the starting index in svalues for stuff going to
384          the ith processor
385   */
386   svalues    = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
387   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
388   starts     = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
389   starts[0]  = 0;
390   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
391   for ( i=0; i<aij->stash.n; i++ ) {
392     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
393     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
394     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
395   }
396   PetscFree(owner);
397   starts[0] = 0;
398   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
399   count = 0;
400   for ( i=0; i<size; i++ ) {
401     if (procs[i]) {
402       ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
403     }
404   }
405   PetscFree(starts);
406   PetscFree(nprocs);
407 
408   /* Free cache space */
409   PLogInfo(aij->A,"MatAssemblyBegin_MPIAIJ:Number of off-processor values %d\n",aij->stash.n);
410   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
411 
412   aij->svalues    = svalues;    aij->rvalues    = rvalues;
413   aij->nsends     = nsends;     aij->nrecvs     = nreceives;
414   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
415   aij->rmax       = nmax;
416 
417   PetscFunctionReturn(0);
418 }
419 extern int MatSetUpMultiply_MPIAIJ(Mat);
420 
421 #undef __FUNC__
422 #define __FUNC__ "MatAssemblyEnd_MPIAIJ"
423 int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
424 {
425   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
426   MPI_Status  *send_status,recv_status;
427   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
428   int         row,col,other_disassembled;
429   Scalar      *values,val;
430   InsertMode  addv = mat->insertmode;
431 
432   PetscFunctionBegin;
433 
434   /*  wait on receives */
435   while (count) {
436     ierr = MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
437     /* unpack receives into our local space */
438     values = aij->rvalues + 3*imdex*aij->rmax;
439     ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
440     n = n/3;
441     for ( i=0; i<n; i++ ) {
442       row = (int) PetscReal(values[3*i]) - aij->rstart;
443       col = (int) PetscReal(values[3*i+1]);
444       val = values[3*i+2];
445       if (col >= aij->cstart && col < aij->cend) {
446         col -= aij->cstart;
447         ierr = MatSetValues(aij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
448       } else {
449         if (mat->was_assembled || mat->assembled) {
450           if (!aij->colmap) {
451             ierr = CreateColmap_MPIAIJ_Private(mat); CHKERRQ(ierr);
452           }
453 #if defined (USE_CTABLE)
454           col = TableFind( aij->colmap, col + 1 ) - 1;
455 #else
456           col = aij->colmap[col] - 1;
457 #endif
458           if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
459             ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
460             col = (int) PetscReal(values[3*i+1]);
461           }
462         }
463         ierr = MatSetValues(aij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
464       }
465     }
466     count--;
467   }
468   if (aij->recv_waits) PetscFree(aij->recv_waits);
469   if (aij->rvalues)    PetscFree(aij->rvalues);
470 
471   /* wait on sends */
472   if (aij->nsends) {
473     send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
474     ierr        = MPI_Waitall(aij->nsends,aij->send_waits,send_status);CHKERRQ(ierr);
475     PetscFree(send_status);
476   }
477   if (aij->send_waits) PetscFree(aij->send_waits);
478   if (aij->svalues)    PetscFree(aij->svalues);
479 
480   ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
481   ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);
482 
483   /* determine if any processor has disassembled; if so, we must
484      also disassemble ourselves, in order that we may reassemble. */
485   /*
486      if the nonzero structure of the submatrix B cannot change then we know that
487      no processor disassembled, thus we can skip this stuff
488   */
489   if (!((Mat_SeqAIJ*) aij->B->data)->nonew)  {
490     ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
491     if (mat->was_assembled && !other_disassembled) {
492       ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
493     }
494   }
495 
496   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
497     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
498   }
499   ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
500   ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);
501 
502   if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
503   PetscFunctionReturn(0);
504 }
505 
506 #undef __FUNC__
507 #define __FUNC__ "MatZeroEntries_MPIAIJ"
508 int MatZeroEntries_MPIAIJ(Mat A)
509 {
510   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
511   int        ierr;
512 
513   PetscFunctionBegin;
514   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
515   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
516   PetscFunctionReturn(0);
517 }
518 
519 #undef __FUNC__
520 #define __FUNC__ "MatZeroRows_MPIAIJ"
521 int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
522 {
523   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
524   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
525   int            *procs,*nprocs,j,found,idx,nsends,*work,row;
526   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
527   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
528   int            *lens,imdex,*lrows,*values,rstart=l->rstart;
529   MPI_Comm       comm = A->comm;
530   MPI_Request    *send_waits,*recv_waits;
531   MPI_Status     recv_status,*send_status;
532   IS             istmp;
533   PetscTruth     localdiag;
534 
535   PetscFunctionBegin;
536   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
537   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
538 
539   /*  first count number of contributors to each processor */
540   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
541   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
542   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
543   for ( i=0; i<N; i++ ) {
544     idx = rows[i];
545     found = 0;
546     for ( j=0; j<size; j++ ) {
547       if (idx >= owners[j] && idx < owners[j+1]) {
548         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
549       }
550     }
551     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
552   }
553   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
554 
555   /* inform other processors of number of messages and max length*/
556   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
557   ierr = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
558   nrecvs = work[rank];
559   ierr = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
560   nmax = work[rank];
561   PetscFree(work);
562 
563   /* post receives:   */
564   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int));CHKPTRQ(rvalues);
565   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
566   for ( i=0; i<nrecvs; i++ ) {
567     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
568   }
569 
570   /* do sends:
571       1) starts[i] gives the starting index in svalues for stuff going to
572          the ith processor
573   */
574   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
575   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
576   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
577   starts[0] = 0;
578   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
579   for ( i=0; i<N; i++ ) {
580     svalues[starts[owner[i]]++] = rows[i];
581   }
582   ISRestoreIndices(is,&rows);
583 
584   starts[0] = 0;
585   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
586   count = 0;
587   for ( i=0; i<size; i++ ) {
588     if (procs[i]) {
589       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
590     }
591   }
592   PetscFree(starts);
593 
594   base = owners[rank];
595 
596   /*  wait on receives */
597   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
598   source = lens + nrecvs;
599   count  = nrecvs; slen = 0;
600   while (count) {
601     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
602     /* unpack receives into our local space */
603     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
604     source[imdex]  = recv_status.MPI_SOURCE;
605     lens[imdex]  = n;
606     slen += n;
607     count--;
608   }
609   PetscFree(recv_waits);
610 
611   /* move the data into the send scatter */
612   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
613   count = 0;
614   for ( i=0; i<nrecvs; i++ ) {
615     values = rvalues + i*nmax;
616     for ( j=0; j<lens[i]; j++ ) {
617       lrows[count++] = values[j] - base;
618     }
619   }
620   PetscFree(rvalues); PetscFree(lens);
621   PetscFree(owner); PetscFree(nprocs);
622 
623   /* actually zap the local rows */
624   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
625   PLogObjectParent(A,istmp);
626 
627   /*
628         Zero the required rows. If the "diagonal block" of the matrix
629      is square and the user wishes to set the diagonal, we use separate
630      code so that MatSetValues() is not called for each diagonal entry,
631      which would allocate new memory with many mallocs and slow things down.
632 
633        Contributed by: Mathew Knepley
634   */
635   localdiag = PETSC_FALSE;
636   if (diag && (l->A->M == l->A->N)) {
637     localdiag = PETSC_TRUE;
638     ierr      = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
639   } else {
640     ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
641   }
642   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
643   ierr = ISDestroy(istmp); CHKERRQ(ierr);
644 
645   if (diag && (localdiag == PETSC_FALSE)) {
646     for ( i = 0; i < slen; i++ ) {
647       row = lrows[i] + rstart;
648       MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES);
649     }
650     MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);
651     MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);
652   }
653 
662   PetscFree(lrows);
663 
664   /* wait on sends */
665   if (nsends) {
666     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
667     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
668     PetscFree(send_status);
669   }
670   PetscFree(send_waits); PetscFree(svalues);
671 
672   PetscFunctionReturn(0);
673 }
674 
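/*
   A minimal usage sketch for zeroing rows of a parallel AIJ matrix (each
   processor may list any global rows; the routine above routes them to
   their owners; the row numbers here are illustrative):

      IS     is;
      int    rows[2],ierr;
      Scalar one = 1.0;

      rows[0] = 0; rows[1] = 5;
      ierr = ISCreateGeneral(PETSC_COMM_SELF,2,rows,&is); CHKERRQ(ierr);
      ierr = MatZeroRows(A,is,&one); CHKERRQ(ierr);         keep a unit diagonal;
      ierr = ISDestroy(is); CHKERRQ(ierr);                   pass PETSC_NULL to zero it
*/
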
675 #undef __FUNC__
676 #define __FUNC__ "MatMult_MPIAIJ"
677 int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
678 {
679   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
680   int        ierr,nt;
681 
682   PetscFunctionBegin;
683   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
684   if (nt != a->n) {
685     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
686   }
687   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
688   ierr = (*a->A->ops->mult)(a->A,xx,yy); CHKERRQ(ierr);
689   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
690   ierr = (*a->B->ops->multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
691   PetscFunctionReturn(0);
692 }
693 
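/*
   The multiply above computes, for the block row owned by this processor,

        y = aij->A * x_local + aij->B * x_ghost

   where x_ghost (a->lvec) holds the off-processor entries of x gathered by
   the VecScatter a->Mvctx; the scatter is started before the diagonal-block
   product so communication overlaps computation.  A minimal usage sketch
   (the vectors must be partitioned conformingly with the matrix):

      Vec x,y;
      int ierr,m,n;
      ierr = MatGetLocalSize(A,&m,&n); CHKERRQ(ierr);
      ierr = VecCreateMPI(A->comm,n,PETSC_DECIDE,&x); CHKERRQ(ierr);
      ierr = VecCreateMPI(A->comm,m,PETSC_DECIDE,&y); CHKERRQ(ierr);
      ierr = MatMult(A,x,y); CHKERRQ(ierr);
*/
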
694 #undef __FUNC__
695 #define __FUNC__ "MatMultAdd_MPIAIJ"
696 int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
697 {
698   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
699   int        ierr;
700 
701   PetscFunctionBegin;
702   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
703   ierr = (*a->A->ops->multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
704   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
705   ierr = (*a->B->ops->multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
706   PetscFunctionReturn(0);
707 }
708 
709 #undef __FUNC__
710 #define __FUNC__ "MatMultTrans_MPIAIJ"
711 int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
712 {
713   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
714   int        ierr;
715 
716   PetscFunctionBegin;
717   /* do nondiagonal part */
718   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
719   /* send it on its way */
720   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
721   /* do local part */
722   ierr = (*a->A->ops->multtrans)(a->A,xx,yy); CHKERRQ(ierr);
723   /* receive remote parts: note this assumes the values are not actually */
724   /* inserted in yy until the next line, which is true for my implementation*/
725   /* but is perhaps not always true. */
726   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
727   PetscFunctionReturn(0);
728 }
729 
730 #undef __FUNC__
731 #define __FUNC__ "MatMultTransAdd_MPIAIJ"
732 int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
733 {
734   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
735   int        ierr;
736 
737   PetscFunctionBegin;
738   /* do nondiagonal part */
739   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
740   /* send it on its way */
741   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
742   /* do local part */
743   ierr = (*a->A->ops->multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
744   /* receive remote parts: note this assumes the values are not actually */
745   /* inserted in zz until the next line, which is true for my implementation*/
746   /* but is perhaps not always true. */
747   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
748   PetscFunctionReturn(0);
749 }
750 
751 /*
752   This only works correctly for square matrices where the subblock a->A is the
753    diagonal block
754 */
755 #undef __FUNC__
756 #define __FUNC__ "MatGetDiagonal_MPIAIJ"
757 int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
758 {
759   int        ierr;
760   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
761 
762   PetscFunctionBegin;
763   if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
764   if (a->rstart != a->cstart || a->rend != a->cend) {
765     SETERRQ(PETSC_ERR_ARG_SIZ,0,"row partition must equal col partition");
766   }
767   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
768   PetscFunctionReturn(0);
769 }
770 
771 #undef __FUNC__
772 #define __FUNC__ "MatScale_MPIAIJ"
773 int MatScale_MPIAIJ(Scalar *aa,Mat A)
774 {
775   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
776   int        ierr;
777 
778   PetscFunctionBegin;
779   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
780   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
781   PetscFunctionReturn(0);
782 }
783 
784 #undef __FUNC__
785 #define __FUNC__ "MatDestroy_MPIAIJ"
786 int MatDestroy_MPIAIJ(Mat mat)
787 {
788   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
789   int        ierr;
790 
791   PetscFunctionBegin;
792   if (--mat->refct > 0) PetscFunctionReturn(0);
793 
794   if (mat->mapping) {
795     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
796   }
797   if (mat->bmapping) {
798     ierr = ISLocalToGlobalMappingDestroy(mat->bmapping); CHKERRQ(ierr);
799   }
800   if (mat->rmap) {
801     ierr = MapDestroy(mat->rmap);CHKERRQ(ierr);
802   }
803   if (mat->cmap) {
804     ierr = MapDestroy(mat->cmap);CHKERRQ(ierr);
805   }
806 #if defined(USE_PETSC_LOG)
807   PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",aij->M,aij->N);
808 #endif
809   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
810   PetscFree(aij->rowners);
811   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
812   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
813 #if defined (USE_CTABLE)
814   if (aij->colmap) TableDelete(aij->colmap);
815 #else
816   if (aij->colmap) PetscFree(aij->colmap);
817 #endif
818   if (aij->garray) PetscFree(aij->garray);
819   if (aij->lvec)   VecDestroy(aij->lvec);
820   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
821   if (aij->rowvalues) PetscFree(aij->rowvalues);
822   PetscFree(aij);
823   PLogObjectDestroy(mat);
824   PetscHeaderDestroy(mat);
825   PetscFunctionReturn(0);
826 }
827 
828 #undef __FUNC__
829 #define __FUNC__ "MatView_MPIAIJ_Binary"
830 extern int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
831 {
832   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
833   int         ierr;
834 
835   PetscFunctionBegin;
836   if (aij->size == 1) {
837     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
838   }
839   else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
840   PetscFunctionReturn(0);
841 }
842 
843 #undef __FUNC__
844 #define __FUNC__ "MatView_MPIAIJ_ASCIIorDraworSocket"
845 extern int MatView_MPIAIJ_ASCIIorDraworSocket(Mat mat,Viewer viewer)
846 {
847   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
848   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
849   int         ierr, format,shift = C->indexshift,rank = aij->rank ;
850   int         size = aij->size;
851   FILE        *fd;
852   ViewerType  vtype;
853 
854   PetscFunctionBegin;
855   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
856   if (PetscTypeCompare(vtype,ASCII_VIEWER)) {
857     ierr = ViewerGetFormat(viewer,&format);
858     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
859       MatInfo info;
860       int     flg;
861       MPI_Comm_rank(mat->comm,&rank);
862       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
863       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
864       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
865       PetscSequentialPhaseBegin(mat->comm,1);
866       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
867          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
868       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
869          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
870       ierr = MatGetInfo(aij->A,MAT_LOCAL,&info);
871       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used);
872       ierr = MatGetInfo(aij->B,MAT_LOCAL,&info);
873       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used);
874       fflush(fd);
875       PetscSequentialPhaseEnd(mat->comm,1);
876       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
877       PetscFunctionReturn(0);
878     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
879       PetscFunctionReturn(0);
880     }
881   } else if (PetscTypeCompare(vtype,DRAW_VIEWER)) {
882     Draw       draw;
883     PetscTruth isnull;
884     ierr = ViewerDrawGetDraw(viewer,0,&draw); CHKERRQ(ierr);
885     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
886   }
887 
888   if (size == 1) {
889     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
890   } else {
891     /* assemble the entire matrix onto first processor. */
892     Mat         A;
893     Mat_SeqAIJ *Aloc;
894     int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
895     Scalar      *a;
896 
897     if (!rank) {
898       ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
899     } else {
900       ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
901     }
902     PLogObjectParent(mat,A);
903 
904     /* copy over the A part */
905     Aloc = (Mat_SeqAIJ*) aij->A->data;
906     m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
907     row = aij->rstart;
908     for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
909     for ( i=0; i<m; i++ ) {
910       ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
911       row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
912     }
913     aj = Aloc->j;
914     for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
915 
916     /* copy over the B part */
917     Aloc = (Mat_SeqAIJ*) aij->B->data;
918     m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
919     row = aij->rstart;
920     ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
921     for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
922     for ( i=0; i<m; i++ ) {
923       ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
924       row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
925     }
926     PetscFree(ct);
927     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
928     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
929     /*
930        Everyone has to participate in the call that draws the matrix since the
931        graphics waits are synchronized across all processors that share the Draw object
932     */
933     if (!rank || PetscTypeCompare(vtype,DRAW_VIEWER)) {
934       ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
935     }
936     ierr = MatDestroy(A); CHKERRQ(ierr);
937   }
938   PetscFunctionReturn(0);
939 }
940 
941 #undef __FUNC__
942 #define __FUNC__ "MatView_MPIAIJ"
943 int MatView_MPIAIJ(Mat mat,Viewer viewer)
944 {
945   int         ierr;
946   ViewerType  vtype;
947 
948   PetscFunctionBegin;
949   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
950   if (PetscTypeCompare(vtype,ASCII_VIEWER) || PetscTypeCompare(vtype,DRAW_VIEWER) ||
951       PetscTypeCompare(vtype,SOCKET_VIEWER)) {
952     ierr = MatView_MPIAIJ_ASCIIorDraworSocket(mat,viewer); CHKERRQ(ierr);
953   } else if (PetscTypeCompare(vtype,BINARY_VIEWER)) {
954     ierr = MatView_MPIAIJ_Binary(mat,viewer);CHKERRQ(ierr);
955   } else {
956     SETERRQ(1,1,"Viewer type not supported by PETSc object");
957   }
958   PetscFunctionReturn(0);
959 }
960 
961 /*
962     This has to provide several versions.
963 
964      2) a) use only local smoothing, updating outer values only once.
965         b) local smoothing, updating outer values each inner iteration.
966      3) coloring, updating outer values between colors.
967 */
968 #undef __FUNC__
969 #define __FUNC__ "MatRelax_MPIAIJ"
970 int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
971                            double fshift,int its,Vec xx)
972 {
973   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
974   Mat        AA = mat->A, BB = mat->B;
975   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
976   Scalar     *b,*x,*xs,*ls,d,*v,sum;
977   int        ierr,*idx, *diag;
978   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
979 
980   PetscFunctionBegin;
981   ierr = VecGetArray(xx,&x); CHKERRQ(ierr);
982   ierr = VecGetArray(bb,&b); CHKERRQ(ierr);
983   ierr = VecGetArray(mat->lvec,&ls);CHKERRQ(ierr);
984   xs = x + shift; /* shift by one for index start of 1 */
985   ls = ls + shift;
986   if (!A->diag) {ierr = MatMarkDiag_SeqAIJ(AA); CHKERRQ(ierr);}
987   diag = A->diag;
988   if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
989     if (flag & SOR_ZERO_INITIAL_GUESS) {
990       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
991       ierr = VecRestoreArray(xx,&x); CHKERRQ(ierr);
992       ierr = VecRestoreArray(bb,&b); CHKERRQ(ierr);
993       ierr = VecRestoreArray(mat->lvec,&ls);CHKERRQ(ierr);
994       PetscFunctionReturn(0);
995     }
996     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
997     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
998     while (its--) {
999       /* go down through the rows */
1000       for ( i=0; i<m; i++ ) {
1001         n    = A->i[i+1] - A->i[i];
1002 	PLogFlops(4*n+3);
1003         idx  = A->j + A->i[i] + shift;
1004         v    = A->a + A->i[i] + shift;
1005         sum  = b[i];
1006         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1007         d    = fshift + A->a[diag[i]+shift];
1008         n    = B->i[i+1] - B->i[i];
1009         idx  = B->j + B->i[i] + shift;
1010         v    = B->a + B->i[i] + shift;
1011         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1012         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1013       }
1014       /* come up through the rows */
1015       for ( i=m-1; i>-1; i-- ) {
1016         n    = A->i[i+1] - A->i[i];
1017 	PLogFlops(4*n+3);
1018         idx  = A->j + A->i[i] + shift;
1019         v    = A->a + A->i[i] + shift;
1020         sum  = b[i];
1021         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1022         d    = fshift + A->a[diag[i]+shift];
1023         n    = B->i[i+1] - B->i[i];
1024         idx  = B->j + B->i[i] + shift;
1025         v    = B->a + B->i[i] + shift;
1026         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1027         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1028       }
1029     }
1030   } else if (flag & SOR_LOCAL_FORWARD_SWEEP){
1031     if (flag & SOR_ZERO_INITIAL_GUESS) {
1032       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
1033       ierr = VecRestoreArray(xx,&x); CHKERRQ(ierr);
1034       ierr = VecRestoreArray(bb,&b); CHKERRQ(ierr);
1035       ierr = VecRestoreArray(mat->lvec,&ls);CHKERRQ(ierr);
1036       PetscFunctionReturn(0);
1037     }
1038     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1039     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1040     while (its--) {
1041       for ( i=0; i<m; i++ ) {
1042         n    = A->i[i+1] - A->i[i];
1043 	PLogFlops(4*n+3);
1044         idx  = A->j + A->i[i] + shift;
1045         v    = A->a + A->i[i] + shift;
1046         sum  = b[i];
1047         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1048         d    = fshift + A->a[diag[i]+shift];
1049         n    = B->i[i+1] - B->i[i];
1050         idx  = B->j + B->i[i] + shift;
1051         v    = B->a + B->i[i] + shift;
1052         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1053         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1054       }
1055     }
1056   } else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
1057     if (flag & SOR_ZERO_INITIAL_GUESS) {
1058       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
1059       ierr = VecRestoreArray(xx,&x); CHKERRQ(ierr);
1060       ierr = VecRestoreArray(bb,&b); CHKERRQ(ierr);
1061       ierr = VecRestoreArray(mat->lvec,&ls);CHKERRQ(ierr);
1062       PetscFunctionReturn(0);
1063     }
1064     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1065     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1066     while (its--) {
1067       for ( i=m-1; i>-1; i-- ) {
1068         n    = A->i[i+1] - A->i[i];
1069 	PLogFlops(4*n+3);
1070         idx  = A->j + A->i[i] + shift;
1071         v    = A->a + A->i[i] + shift;
1072         sum  = b[i];
1073         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1074         d    = fshift + A->a[diag[i]+shift];
1075         n    = B->i[i+1] - B->i[i];
1076         idx  = B->j + B->i[i] + shift;
1077         v    = B->a + B->i[i] + shift;
1078         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1079         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1080       }
1081     }
1082   } else {
1083     SETERRQ(PETSC_ERR_SUP,0,"Parallel SOR not supported");
1084   }
1085   ierr = VecRestoreArray(xx,&x); CHKERRQ(ierr);
1086   ierr = VecRestoreArray(bb,&b); CHKERRQ(ierr);
1087   ierr = VecRestoreArray(mat->lvec,&ls);CHKERRQ(ierr);
1088   PetscFunctionReturn(0);
1089 }
1090 
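/*
   The local relaxation above applies, row by row, the classical SOR update
   restricted to this processor (a sketch, assuming SPARSEDENSEMDOT()
   accumulates sum -= v[k]*x[idx[k]] over the row, as its use here suggests):

        d_i  = fshift + a_ii
        x_i <- (1 - omega) x_i + (omega / d_i) ( b_i - sum_{j != i} a_ij x_j )

   where the sum runs over the columns of both the diagonal block (using the
   local part of xx) and the off-diagonal block (using the ghost values in
   mat->lvec), and the ghost values are refreshed only once per call, before
   the sweeps begin.
*/
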
1091 #undef __FUNC__
1092 #define __FUNC__ "MatGetInfo_MPIAIJ"
1093 int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1094 {
1095   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1096   Mat        A = mat->A, B = mat->B;
1097   int        ierr;
1098   double     isend[5], irecv[5];
1099 
1100   PetscFunctionBegin;
1101   info->block_size     = 1.0;
1102   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1103   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
1104   isend[3] = info->memory;  isend[4] = info->mallocs;
1105   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1106   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
1107   isend[3] += info->memory;  isend[4] += info->mallocs;
1108   if (flag == MAT_LOCAL) {
1109     info->nz_used      = isend[0];
1110     info->nz_allocated = isend[1];
1111     info->nz_unneeded  = isend[2];
1112     info->memory       = isend[3];
1113     info->mallocs      = isend[4];
1114   } else if (flag == MAT_GLOBAL_MAX) {
1115     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);CHKERRQ(ierr);
1116     info->nz_used      = irecv[0];
1117     info->nz_allocated = irecv[1];
1118     info->nz_unneeded  = irecv[2];
1119     info->memory       = irecv[3];
1120     info->mallocs      = irecv[4];
1121   } else if (flag == MAT_GLOBAL_SUM) {
1122     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);CHKERRQ(ierr);
1123     info->nz_used      = irecv[0];
1124     info->nz_allocated = irecv[1];
1125     info->nz_unneeded  = irecv[2];
1126     info->memory       = irecv[3];
1127     info->mallocs      = irecv[4];
1128   }
1129   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1130   info->fill_ratio_needed = 0;
1131   info->factor_mallocs    = 0;
1132   info->rows_global       = (double)mat->M;
1133   info->columns_global    = (double)mat->N;
1134   info->rows_local        = (double)mat->m;
1135   info->columns_local     = (double)mat->n;
1136 
1137   PetscFunctionReturn(0);
1138 }
1139 
1140 #undef __FUNC__
1141 #define __FUNC__ "MatSetOption_MPIAIJ"
1142 int MatSetOption_MPIAIJ(Mat A,MatOption op)
1143 {
1144   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1145 
1146   PetscFunctionBegin;
1147   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1148       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1149       op == MAT_COLUMNS_UNSORTED ||
1150       op == MAT_COLUMNS_SORTED ||
1151       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1152       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1153         MatSetOption(a->A,op);
1154         MatSetOption(a->B,op);
1155   } else if (op == MAT_ROW_ORIENTED) {
1156     a->roworiented = 1;
1157     MatSetOption(a->A,op);
1158     MatSetOption(a->B,op);
1159   } else if (op == MAT_ROWS_SORTED ||
1160              op == MAT_ROWS_UNSORTED ||
1161              op == MAT_SYMMETRIC ||
1162              op == MAT_STRUCTURALLY_SYMMETRIC ||
1163              op == MAT_YES_NEW_DIAGONALS)
1164     PLogInfo(A,"MatSetOption_MPIAIJ:Option ignored\n");
1165   else if (op == MAT_COLUMN_ORIENTED) {
1166     a->roworiented = 0;
1167     MatSetOption(a->A,op);
1168     MatSetOption(a->B,op);
1169   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1170     a->donotstash = 1;
1171   } else if (op == MAT_NO_NEW_DIAGONALS){
1172     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1173   } else {
1174     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1175   }
1176   PetscFunctionReturn(0);
1177 }
1178 
1179 #undef __FUNC__
1180 #define __FUNC__ "MatGetSize_MPIAIJ"
1181 int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
1182 {
1183   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1184 
1185   PetscFunctionBegin;
1186   if (m) *m = mat->M;
1187   if (n) *n = mat->N;
1188   PetscFunctionReturn(0);
1189 }
1190 
1191 #undef __FUNC__
1192 #define __FUNC__ "MatGetLocalSize_MPIAIJ"
1193 int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
1194 {
1195   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1196 
1197   PetscFunctionBegin;
1198   if (m) *m = mat->m;
1199   if (n) *n = mat->n;
1200   PetscFunctionReturn(0);
1201 }
1202 
1203 #undef __FUNC__
1204 #define __FUNC__ "MatGetOwnershipRange_MPIAIJ"
1205 int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
1206 {
1207   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1208 
1209   PetscFunctionBegin;
1210   *m = mat->rstart; *n = mat->rend;
1211   PetscFunctionReturn(0);
1212 }
1213 
1214 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1215 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1216 
1217 #undef __FUNC__
1218 #define __FUNC__ "MatGetRow_MPIAIJ"
1219 int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1220 {
1221   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1222   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
1223   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
1224   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
1225   int        *cmap, *idx_p;
1226 
1227   PetscFunctionBegin;
1228   if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
1229   mat->getrowactive = PETSC_TRUE;
1230 
1231   if (!mat->rowvalues && (idx || v)) {
1232     /*
1233         allocate enough space to hold information from the longest row.
1234     */
1235     Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
1236     int     max = 1,m = mat->m,tmp;
1237     for ( i=0; i<m; i++ ) {
1238       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1239       if (max < tmp) { max = tmp; }
1240     }
1241     mat->rowvalues  = (Scalar *) PetscMalloc(max*(sizeof(int)+sizeof(Scalar)));CHKPTRQ(mat->rowvalues);
1242     mat->rowindices = (int *) (mat->rowvalues + max);
1243   }
1244 
1245   if (row < rstart || row >= rend) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Only local rows")
1246   lrow = row - rstart;
1247 
1248   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1249   if (!v)   {pvA = 0; pvB = 0;}
1250   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1251   ierr = (*mat->A->ops->getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1252   ierr = (*mat->B->ops->getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1253   nztot = nzA + nzB;
1254 
1255   cmap  = mat->garray;
1256   if (v  || idx) {
1257     if (nztot) {
1258       /* Sort by increasing column numbers, assuming A and B already sorted */
1259       int imark = -1;
1260       if (v) {
1261         *v = v_p = mat->rowvalues;
1262         for ( i=0; i<nzB; i++ ) {
1263           if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
1264           else break;
1265         }
1266         imark = i;
1267         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1268         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1269       }
1270       if (idx) {
1271         *idx = idx_p = mat->rowindices;
1272         if (imark > -1) {
1273           for ( i=0; i<imark; i++ ) {
1274             idx_p[i] = cmap[cworkB[i]];
1275           }
1276         } else {
1277           for ( i=0; i<nzB; i++ ) {
1278             if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
1279             else break;
1280           }
1281           imark = i;
1282         }
1283         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
1284         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
1285       }
1286     } else {
1287       if (idx) *idx = 0;
1288       if (v)   *v   = 0;
1289     }
1290   }
1291   *nz = nztot;
1292   ierr = (*mat->A->ops->restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1293   ierr = (*mat->B->ops->restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1294   PetscFunctionReturn(0);
1295 }
1296 
1297 #undef __FUNC__
1298 #define __FUNC__ "MatRestoreRow_MPIAIJ"
1299 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1300 {
1301   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1302 
1303   PetscFunctionBegin;
1304   if (aij->getrowactive == PETSC_FALSE) {
1305     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
1306   }
1307   aij->getrowactive = PETSC_FALSE;
1308   PetscFunctionReturn(0);
1309 }
1310 
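/*
   A minimal sketch of using the row access routines above (only locally
   owned rows may be requested, and each MatGetRow() must be matched by a
   MatRestoreRow() before the next row is fetched):

      int    row,rstart,rend,nz,*cols,ierr;
      Scalar *vals;

      ierr = MatGetOwnershipRange(mat,&rstart,&rend); CHKERRQ(ierr);
      for ( row=rstart; row<rend; row++ ) {
        ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
        ...  use the nz global column numbers in cols[] and values in vals[]  ...
        ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
      }
*/
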
1311 #undef __FUNC__
1312 #define __FUNC__ "MatNorm_MPIAIJ"
1313 int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
1314 {
1315   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1316   Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
1317   int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
1318   double     sum = 0.0;
1319   Scalar     *v;
1320 
1321   PetscFunctionBegin;
1322   if (aij->size == 1) {
1323     ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
1324   } else {
1325     if (type == NORM_FROBENIUS) {
1326       v = amat->a;
1327       for (i=0; i<amat->nz; i++ ) {
1328 #if defined(USE_PETSC_COMPLEX)
1329         sum += PetscReal(PetscConj(*v)*(*v)); v++;
1330 #else
1331         sum += (*v)*(*v); v++;
1332 #endif
1333       }
1334       v = bmat->a;
1335       for (i=0; i<bmat->nz; i++ ) {
1336 #if defined(USE_PETSC_COMPLEX)
1337         sum += PetscReal(PetscConj(*v)*(*v)); v++;
1338 #else
1339         sum += (*v)*(*v); v++;
1340 #endif
1341       }
1342       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
1343       *norm = sqrt(*norm);
1344     } else if (type == NORM_1) { /* max column norm */
1345       double *tmp, *tmp2;
1346       int    *jj, *garray = aij->garray;
1347       tmp  = (double *) PetscMalloc( (aij->N+1)*sizeof(double) ); CHKPTRQ(tmp);
1348       tmp2 = (double *) PetscMalloc( (aij->N+1)*sizeof(double) ); CHKPTRQ(tmp2);
1349       PetscMemzero(tmp,aij->N*sizeof(double));
1350       *norm = 0.0;
1351       v = amat->a; jj = amat->j;
1352       for ( j=0; j<amat->nz; j++ ) {
1353         tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
1354       }
1355       v = bmat->a; jj = bmat->j;
1356       for ( j=0; j<bmat->nz; j++ ) {
1357         tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
1358       }
1359       ierr = MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
1360       for ( j=0; j<aij->N; j++ ) {
1361         if (tmp2[j] > *norm) *norm = tmp2[j];
1362       }
1363       PetscFree(tmp); PetscFree(tmp2);
1364     } else if (type == NORM_INFINITY) { /* max row norm */
1365       double ntemp = 0.0;
1366       for ( j=0; j<amat->m; j++ ) {
1367         v = amat->a + amat->i[j] + shift;
1368         sum = 0.0;
1369         for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
1370           sum += PetscAbsScalar(*v); v++;
1371         }
1372         v = bmat->a + bmat->i[j] + shift;
1373         for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
1374           sum += PetscAbsScalar(*v); v++;
1375         }
1376         if (sum > ntemp) ntemp = sum;
1377       }
1378       ierr = MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);CHKERRQ(ierr);
1379     } else {
1380       SETERRQ(PETSC_ERR_SUP,0,"No support for two norm");
1381     }
1382   }
1383   PetscFunctionReturn(0);
1384 }
1385 
1386 #undef __FUNC__
1387 #define __FUNC__ "MatTranspose_MPIAIJ"
1388 int MatTranspose_MPIAIJ(Mat A,Mat *matout)
1389 {
1390   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1391   Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
1392   int        ierr,shift = Aloc->indexshift;
1393   int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
1394   Mat        B;
1395   Scalar     *array;
1396 
1397   PetscFunctionBegin;
1398   if (matout == PETSC_NULL && M != N) {
1399     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
1400   }
1401 
1402   ierr = MatCreateMPIAIJ(A->comm,a->n,a->m,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);CHKERRQ(ierr);
1403 
1404   /* copy over the A part */
1405   Aloc = (Mat_SeqAIJ*) a->A->data;
1406   m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1407   row = a->rstart;
1408   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
1409   for ( i=0; i<m; i++ ) {
1410     ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1411     row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
1412   }
1413   aj = Aloc->j;
1414   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}
1415 
1416   /* copy over the B part */
1417   Aloc = (Mat_SeqAIJ*) a->B->data;
1418   m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1419   row = a->rstart;
1420   ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
1421   for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
1422   for ( i=0; i<m; i++ ) {
1423     ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1424     row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
1425   }
1426   PetscFree(ct);
1427   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1428   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1429   if (matout != PETSC_NULL) {
1430     *matout = B;
1431   } else {
1432     PetscOps       *Abops;
1433     struct _MatOps *Aops;
1434 
1435     /* This isn't really an in-place transpose, but we do free the data structures from a */
1436     PetscFree(a->rowners);
1437     ierr = MatDestroy(a->A); CHKERRQ(ierr);
1438     ierr = MatDestroy(a->B); CHKERRQ(ierr);
1439 #if defined (USE_CTABLE)
1440     if (a->colmap) TableDelete(a->colmap);
1441 #else
1442     if (a->colmap) PetscFree(a->colmap);
1443 #endif
1444     if (a->garray) PetscFree(a->garray);
1445     if (a->lvec) VecDestroy(a->lvec);
1446     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
1447     PetscFree(a);
1448 
1449     /*
1450        This is horrible, horrible code. We need to keep the
1451       A pointers for the bops and ops but copy everything
1452       else from B.
1453     */
1454     Abops = A->bops;
1455     Aops  = A->ops;
1456     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1457     A->bops = Abops;
1458     A->ops  = Aops;
1459     PetscHeaderDestroy(B);
1460   }
1461   PetscFunctionReturn(0);
1462 }
1463 
1464 #undef __FUNC__
1465 #define __FUNC__ "MatDiagonalScale_MPIAIJ"
1466 int MatDiagonalScale_MPIAIJ(Mat mat,Vec ll,Vec rr)
1467 {
1468   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1469   Mat a = aij->A, b = aij->B;
1470   int ierr,s1,s2,s3;
1471 
1472   PetscFunctionBegin;
1473   ierr = MatGetLocalSize(mat,&s2,&s3); CHKERRQ(ierr);
1474   if (rr) {
1475     ierr = VecGetLocalSize(rr,&s1);CHKERRQ(ierr);
1476     if (s1!=s3) SETERRQ(PETSC_ERR_ARG_SIZ,0,"right vector non-conforming local size");
1477     /* Overlap communication with computation. */
1478     ierr = VecScatterBegin(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1479   }
1480   if (ll) {
1481     ierr = VecGetLocalSize(ll,&s1);CHKERRQ(ierr);
1482     if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"left vector non-conforming local size");
1483     ierr = (*b->ops->diagonalscale)(b,ll,0); CHKERRQ(ierr);
1484   }
1485   /* scale the diagonal block */
1486   ierr = (*a->ops->diagonalscale)(a,ll,rr); CHKERRQ(ierr);
1487 
1488   if (rr) {
1489     /* Do a scatter end and then right scale the off-diagonal block */
1490     ierr = VecScatterEnd(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1491     ierr = (*b->ops->diagonalscale)(b,0,aij->lvec); CHKERRQ(ierr);
1492   }
1493 
1494   PetscFunctionReturn(0);
1495 }
1496 
1497 
1498 extern int MatPrintHelp_SeqAIJ(Mat);
1499 #undef __FUNC__
1500 #define __FUNC__ "MatPrintHelp_MPIAIJ"
1501 int MatPrintHelp_MPIAIJ(Mat A)
1502 {
1503   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1504   int        ierr;
1505 
1506   PetscFunctionBegin;
1507   if (!a->rank) {
1508     ierr = MatPrintHelp_SeqAIJ(a->A);CHKERRQ(ierr);
1509   }
1510   PetscFunctionReturn(0);
1511 }
1512 
1513 #undef __FUNC__
1514 #define __FUNC__ "MatGetBlockSize_MPIAIJ"
1515 int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1516 {
1517   PetscFunctionBegin;
1518   *bs = 1;
1519   PetscFunctionReturn(0);
1520 }
1521 #undef __FUNC__
1522 #define __FUNC__ "MatSetUnfactored_MPIAIJ"
1523 int MatSetUnfactored_MPIAIJ(Mat A)
1524 {
1525   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1526   int        ierr;
1527 
1528   PetscFunctionBegin;
1529   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1530   PetscFunctionReturn(0);
1531 }
1532 
1533 #undef __FUNC__
1534 #define __FUNC__ "MatEqual_MPIAIJ"
1535 int MatEqual_MPIAIJ(Mat A, Mat B, PetscTruth *flag)
1536 {
1537   Mat_MPIAIJ *matB = (Mat_MPIAIJ *) B->data,*matA = (Mat_MPIAIJ *) A->data;
1538   Mat        a, b, c, d;
1539   PetscTruth flg;
1540   int        ierr;
1541 
1542   PetscFunctionBegin;
1543   if (B->type != MATMPIAIJ) SETERRQ(PETSC_ERR_ARG_INCOMP,0,"Matrices must be same type");
1544   a = matA->A; b = matA->B;
1545   c = matB->A; d = matB->B;
1546 
1547   ierr = MatEqual(a, c, &flg); CHKERRQ(ierr);
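  /* compare the diagonal and off-diagonal blocks locally, then AND the results across all processes */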
1548   if (flg == PETSC_TRUE) {
1549     ierr = MatEqual(b, d, &flg); CHKERRQ(ierr);
1550   }
1551   ierr = MPI_Allreduce(&flg, flag, 1, MPI_INT, MPI_LAND, A->comm);CHKERRQ(ierr);
1552   PetscFunctionReturn(0);
1553 }
1554 
1555 #undef __FUNC__
1556 #define __FUNC__ "MatCopy_MPIAIJ"
1557 int MatCopy_MPIAIJ(Mat A,Mat B,MatStructure str)
1558 {
1559   int        ierr;
1560   Mat_MPIAIJ *a = (Mat_MPIAIJ *)A->data;
1561   Mat_MPIAIJ *b = (Mat_MPIAIJ *)B->data;
1562 
1563   PetscFunctionBegin;
1564   if (str != SAME_NONZERO_PATTERN || B->type != MATMPIAIJ) {
1565     /* because of the column compression in the off-processor part of the matrix a->B,
1566        the number of columns in a->B and b->B may be different, hence we cannot call
1567        MatCopy() directly on the two parts. If need be, we could provide a more
1568        efficient copy than MatCopy_Basic() by first uncompressing the a->B matrices
1569        and then copying the submatrices */
1570     ierr = MatCopy_Basic(A,B,str);CHKERRQ(ierr);
1571   } else {
1572     ierr = MatCopy(a->A,b->A,str);CHKERRQ(ierr);
1573     ierr = MatCopy(a->B,b->B,str);CHKERRQ(ierr);
1574   }
1575   PetscFunctionReturn(0);
1576 }
1577 
1578 extern int MatDuplicate_MPIAIJ(Mat,MatDuplicateOption,Mat *);
1579 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1580 extern int MatFDColoringCreate_MPIAIJ(Mat,ISColoring,MatFDColoring);
1581 extern int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatReuse,Mat **);
1582 extern int MatGetSubMatrix_MPIAIJ (Mat ,IS,IS,int,MatReuse,Mat *);
1583 
1584 /* -------------------------------------------------------------------*/
1585 static struct _MatOps MatOps_Values = {MatSetValues_MPIAIJ,
1586        MatGetRow_MPIAIJ,
1587        MatRestoreRow_MPIAIJ,
1588        MatMult_MPIAIJ,
1589        MatMultAdd_MPIAIJ,
1590        MatMultTrans_MPIAIJ,
1591        MatMultTransAdd_MPIAIJ,
1592        0,
1593        0,
1594        0,
1595        0,
1596        0,
1597        0,
1598        MatRelax_MPIAIJ,
1599        MatTranspose_MPIAIJ,
1600        MatGetInfo_MPIAIJ,
1601        MatEqual_MPIAIJ,
1602        MatGetDiagonal_MPIAIJ,
1603        MatDiagonalScale_MPIAIJ,
1604        MatNorm_MPIAIJ,
1605        MatAssemblyBegin_MPIAIJ,
1606        MatAssemblyEnd_MPIAIJ,
1607        0,
1608        MatSetOption_MPIAIJ,
1609        MatZeroEntries_MPIAIJ,
1610        MatZeroRows_MPIAIJ,
1611        0,
1612        0,
1613        0,
1614        0,
1615        MatGetSize_MPIAIJ,
1616        MatGetLocalSize_MPIAIJ,
1617        MatGetOwnershipRange_MPIAIJ,
1618        0,
1619        0,
1620        0,
1621        0,
1622        MatDuplicate_MPIAIJ,
1623        0,
1624        0,
1625        0,
1626        0,
1627        0,
1628        MatGetSubMatrices_MPIAIJ,
1629        MatIncreaseOverlap_MPIAIJ,
1630        MatGetValues_MPIAIJ,
1631        MatCopy_MPIAIJ,
1632        MatPrintHelp_MPIAIJ,
1633        MatScale_MPIAIJ,
1634        0,
1635        0,
1636        0,
1637        MatGetBlockSize_MPIAIJ,
1638        0,
1639        0,
1640        0,
1641        0,
1642        MatFDColoringCreate_MPIAIJ,
1643        0,
1644        MatSetUnfactored_MPIAIJ,
1645        0,
1646        0,
1647        MatGetSubMatrix_MPIAIJ,
1648        0,
1649        0,
1650        MatGetMaps_Petsc};
1651 
1652 /* ----------------------------------------------------------------------------------------*/
1653 
1654 EXTERN_C_BEGIN
1655 #undef __FUNC__
1656 #define __FUNC__ "MatStoreValues_MPIAIJ"
1657 int MatStoreValues_MPIAIJ(Mat mat)
1658 {
1659   Mat_MPIAIJ *aij = (Mat_MPIAIJ *)mat->data;
1660   int        ierr;
1661 
1662   PetscFunctionBegin;
1663   ierr = MatStoreValues(aij->A);CHKERRQ(ierr);
1664   ierr = MatStoreValues(aij->B);CHKERRQ(ierr);
1665   PetscFunctionReturn(0);
1666 }
1667 EXTERN_C_END
1668 
1669 EXTERN_C_BEGIN
1670 #undef __FUNC__
1671 #define __FUNC__ "MatRetrieveValues_MPIAIJ"
1672 int MatRetrieveValues_MPIAIJ(Mat mat)
1673 {
1674   Mat_MPIAIJ *aij = (Mat_MPIAIJ *)mat->data;
1675   int        ierr;
1676 
1677   PetscFunctionBegin;
1678   ierr = MatRetrieveValues(aij->A);CHKERRQ(ierr);
1679   ierr = MatRetrieveValues(aij->B);CHKERRQ(ierr);
1680   PetscFunctionReturn(0);
1681 }
1682 EXTERN_C_END
1683 
1684 #include "pc.h"
1685 EXTERN_C_BEGIN
1686 extern int MatGetDiagonalBlock_MPIAIJ(Mat,PetscTruth *,MatReuse,Mat *);
1687 EXTERN_C_END
1688 
1689 #undef __FUNC__
1690 #define __FUNC__ "MatCreateMPIAIJ"
1691 /*@C
1692    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1693    (the default parallel PETSc format).  For good matrix assembly performance
1694    the user should preallocate the matrix storage by setting the parameters
1695    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1696    performance can be increased by more than a factor of 50.
1697 
1698    Collective on MPI_Comm
1699 
1700    Input Parameters:
1701 +  comm - MPI communicator
1702 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1703            This value should be the same as the local size used in creating the
1704            y vector for the matrix-vector product y = Ax.
1705 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1706        This value should be the same as the local size used in creating the
1707        x vector for the matrix-vector product y = Ax. For square matrices n is almost always m.
1708 .  M - number of global rows (or PETSC_DETERMINE to have it calculated if m is given)
1709 .  N - number of global columns (or PETSC_DETERMINE to have it calculated if n is given)
1710 .  d_nz  - number of nonzeros per row in DIAGONAL portion of local submatrix
1711            (same value is used for all local rows)
1712 .  d_nnz - array containing the number of nonzeros in the various rows of the
1713            DIAGONAL portion of the local submatrix (possibly different for each row)
1714            or PETSC_NULL, if d_nz is used to specify the nonzero structure.
1715            The size of this array is equal to the number of local rows, i.e., 'm'.
1716            You must leave room for the diagonal entry even if it is zero.
1717 .  o_nz  - number of nonzeros per row in the OFF-DIAGONAL portion of local
1718            submatrix (same value is used for all local rows).
1719 -  o_nnz - array containing the number of nonzeros in the various rows of the
1720            OFF-DIAGONAL portion of the local submatrix (possibly different for
1721            each row) or PETSC_NULL, if o_nz is used to specify the nonzero
1722            structure. The size of this array is equal to the number
1723            of local rows, i.e., 'm'.
1724 
1725    Output Parameter:
1726 .  A - the matrix
1727 
1728    Notes:
1729    m,n,M,N parameters specify the size of the matrix, and its partitioning across
1730    processors, while d_nz,d_nnz,o_nz,o_nnz parameters specify the approximate
1731    storage requirements for this matrix.
1732 
1733    If PETSC_DECIDE or  PETSC_DETERMINE is used for a particular argument on one
1734    processor then it must be used on all processors that share the object for
1735    that argument.
1736 
1737    The AIJ format (also called the Yale sparse matrix format or
1738    compressed row storage), is fully compatible with standard Fortran 77
1739    storage.  That is, the stored row and column indices can begin at
1740    either one (as in Fortran) or zero.  See the users manual for details.
1741 
1742    The user MUST specify either the local or global matrix dimensions
1743    (possibly both).
1744 
1745    The parallel matrix is partitioned such that the first m0 rows belong to
1746    process 0, the next m1 rows belong to process 1, the next m2 rows belong
1747    to process 2, etc., where m0,m1,m2,... are the values of the input parameter 'm' on each process.
1748 
1749    The DIAGONAL portion of the local submatrix of a processor can be defined
1750    as the submatrix which is obtained by extracting the part corresponding
1751    to the rows r1-r2 and columns r1-r2 of the global matrix, where r1 is the
1752    first row that belongs to the processor, and r2 is the last row belonging
1753    to this processor. This is a square mxm matrix. The remaining portion
1754    of the local submatrix (mxN) constitutes the OFF-DIAGONAL portion.
1755 
1756    If o_nnz, d_nnz are specified, then o_nz, and d_nz are ignored.
1757 
1758    By default, this format uses inodes (identical nodes) when possible.
1759    We search for consecutive rows with the same nonzero structure, thereby
1760    reusing matrix information to achieve increased efficiency.
1761 
1762    Options Database Keys:
1763 +  -mat_aij_no_inode  - Do not use inodes
1764 .  -mat_aij_inode_limit <limit> - Sets inode limit (max limit=5)
1765 .  -mat_aij_oneindex - Internally use indexing starting at 1
1766         rather than 0.  Note that when calling MatSetValues(),
1767         the user still MUST index entries starting at 0!
1768 -  -mat_mpi - use the parallel matrix data structures even on one processor
1769                (defaults to using SeqAIJ format on one processor)
1772 
1773 
1774    Example usage:
1775 
1776    Consider the following 8x8 matrix with 34 non-zero values that is
1777    assembled across 3 processors. Let's assume that proc0 owns 3 rows,
1778    proc1 owns 3 rows, and proc2 owns 2 rows. This division can be shown
1779    as follows:
1780 
1781 .vb
1782             1  2  0  |  0  3  0  |  0  4
1783     Proc0   0  5  6  |  7  0  0  |  8  0
1784             9  0 10  | 11  0  0  | 12  0
1785     -------------------------------------
1786            13  0 14  | 15 16 17  |  0  0
1787     Proc1   0 18  0  | 19 20 21  |  0  0
1788             0  0  0  | 22 23  0  | 24  0
1789     -------------------------------------
1790     Proc2  25 26 27  |  0  0 28  | 29  0
1791            30  0  0  | 31 32 33  |  0 34
1792 .ve
1793 
1794    This can be represented as a collection of submatrices as:
1795 
1796 .vb
1797       A B C
1798       D E F
1799       G H I
1800 .ve
1801 
1802    Where the submatrices A,B,C are owned by proc0, D,E,F are
1803    owned by proc1, G,H,I are owned by proc2.
1804 
1805    The 'm' parameters for proc0,proc1,proc2 are 3,3,2 respectively.
1806    The 'n' parameters for proc0,proc1,proc2 are 3,3,2 respectively.
1807    The 'M','N' parameters are 8,8, and have the same values on all procs.
1808 
1809    The DIAGONAL submatrices corresponding to proc0,proc1,proc2 are
1810    submatrices [A], [E], [I] respectively. The OFF-DIAGONAL submatrices
1811    corresponding to proc0,proc1,proc2 are [BC], [DF], [GH] respectively.
1812    Internally, each processor stores the DIAGONAL part and the OFF-DIAGONAL
1813    part as SeqAIJ matrices. For example, proc1 will store [E] as a SeqAIJ
1814    matrix, and [DF] as another SeqAIJ matrix.
1815 
1816    When d_nz, o_nz parameters are specified, d_nz storage elements are
1817    allocated for every row of the local diagonal submatrix, and o_nz
1818    storage locations are allocated for every row of the OFF-DIAGONAL submatrix.
1819    One way to choose d_nz and o_nz is to use the maximum number of nonzeros per
1820    local row for each of the local DIAGONAL and OFF-DIAGONAL submatrices.
1821    In this case, the values of d_nz,o_nz are:
1822 .vb
1823      proc0 : d_nz = 2, o_nz = 2
1824      proc1 : d_nz = 3, o_nz = 2
1825      proc2 : d_nz = 1, o_nz = 4
1826 .ve
1827    We are allocating m*(d_nz+o_nz) storage locations for every proc. This
1828    translates to 3*(2+2)=12 for proc0, 3*(3+2)=15 for proc1, 2*(1+4)=10
1829    for proc2, i.e., we are using 12+15+10=37 storage locations to store
1830    34 values.
1831 
1832    When d_nnz, o_nnz parameters are specified, the storage is specified
1833    for every row, corresponding to both DIAGONAL and OFF-DIAGONAL submatrices.
1834    In the above case the values for d_nnz,o_nnz are:
1835 .vb
1836      proc0: d_nnz = [2,2,2] and o_nnz = [2,2,2]
1837      proc1: d_nnz = [3,3,2] and o_nnz = [2,1,1]
1838      proc2: d_nnz = [1,1]   and o_nnz = [4,4]
1839 .ve
1840    Here the space allocated is the sum of all the above values, i.e., 34, and
1841    hence the pre-allocation is perfect.
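
   As an illustrative sketch (not taken from an actual example program), proc0
   could create the matrix above with a call such as
.vb
      Mat A;
      int ierr, d_nnz[3] = {2,2,2}, o_nnz[3] = {2,2,2};  /* proc0's rows only */
      ierr = MatCreateMPIAIJ(PETSC_COMM_WORLD,3,3,8,8,0,d_nnz,0,o_nnz,&A);CHKERRQ(ierr);
      /* ... MatSetValues(), MatAssemblyBegin(), MatAssemblyEnd() ... */
.ve
   where proc1 and proc2 would pass their own local sizes and d_nnz/o_nnz arrays.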
1842 
1843    Level: intermediate
1844 
1845 .keywords: matrix, aij, compressed row, sparse, parallel
1846 
1847 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1848 @*/
1849 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1850 {
1851   Mat          B;
1852   Mat_MPIAIJ   *b;
1853   int          ierr, i,sum[2],work[2],size,flag1 = 0, flag2 = 0;
1854 
1855   PetscFunctionBegin;
1856   MPI_Comm_size(comm,&size);
1857   ierr = OptionsHasName(PETSC_NULL,"-mat_mpiaij",&flag1); CHKERRQ(ierr);
1858   ierr = OptionsHasName(PETSC_NULL,"-mat_mpi",&flag2); CHKERRQ(ierr);
1859   if (!flag1 && !flag2 && size == 1) {
1860     if (M == PETSC_DECIDE) M = m;
1861     if (N == PETSC_DECIDE) N = n;
1862     ierr = MatCreateSeqAIJ(comm,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
1863     PetscFunctionReturn(0);
1864   }
1865 
1866   *A = 0;
1867   PetscHeaderCreate(B,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIAIJ,"Mat",comm,MatDestroy,MatView);
1868   PLogObjectCreate(B);
1869   B->data            = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
1870   PetscMemzero(b,sizeof(Mat_MPIAIJ));
1871   PetscMemcpy(B->ops,&MatOps_Values,sizeof(struct _MatOps));
1872   B->ops->destroy    = MatDestroy_MPIAIJ;
1873   B->ops->view       = MatView_MPIAIJ;
1874   B->factor          = 0;
1875   B->assembled       = PETSC_FALSE;
1876   B->mapping         = 0;
1877 
1878   B->insertmode      = NOT_SET_VALUES;
1879   b->size            = size;
1880   MPI_Comm_rank(comm,&b->rank);
1881 
1882   if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
1883     SETERRQ(PETSC_ERR_ARG_WRONG,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1884   }
1885 
1886   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1887     work[0] = m; work[1] = n;
1888     ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
1889     if (M == PETSC_DECIDE) M = sum[0];
1890     if (N == PETSC_DECIDE) N = sum[1];
1891   }
1892   if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
1893   if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
1894   b->m = m; B->m = m;
1895   b->n = n; B->n = n;
1896   b->N = N; B->N = N;
1897   b->M = M; B->M = M;
1898 
1899   /* the information in the maps duplicates the information computed below, eventually
1900      we should remove the duplicate information that is not contained in the maps */
1901   ierr = MapCreateMPI(comm,m,M,&B->rmap);CHKERRQ(ierr);
1902   ierr = MapCreateMPI(comm,n,N,&B->cmap);CHKERRQ(ierr);
1903 
1904   /* build local table of row and column ownerships */
1905   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1906   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIAIJ));
1907   b->cowners = b->rowners + b->size + 2;
1908   ierr = MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
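  /* convert the gathered local row counts into a prefix sum, so that rowners[i]
     is the first global row owned by process i and rowners[size] equals M */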
1909   b->rowners[0] = 0;
1910   for ( i=2; i<=b->size; i++ ) {
1911     b->rowners[i] += b->rowners[i-1];
1912   }
1913   b->rstart = b->rowners[b->rank];
1914   b->rend   = b->rowners[b->rank+1];
1915   ierr = MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1916   b->cowners[0] = 0;
1917   for ( i=2; i<=b->size; i++ ) {
1918     b->cowners[i] += b->cowners[i-1];
1919   }
1920   b->cstart = b->cowners[b->rank];
1921   b->cend   = b->cowners[b->rank+1];
1922 
1923   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1924   ierr = MatCreateSeqAIJ(PETSC_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1925   PLogObjectParent(B,b->A);
1926   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1927   ierr = MatCreateSeqAIJ(PETSC_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1928   PLogObjectParent(B,b->B);
1929 
1930   /* build cache for off array entries formed */
1931   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1932   b->donotstash  = 0;
1933   b->colmap      = 0;
1934   b->garray      = 0;
1935   b->roworiented = 1;
1936 
1937   /* stuff used for matrix vector multiply */
1938   b->lvec      = 0;
1939   b->Mvctx     = 0;
1940 
1941   /* stuff for MatGetRow() */
1942   b->rowindices   = 0;
1943   b->rowvalues    = 0;
1944   b->getrowactive = PETSC_FALSE;
1945 
1946   ierr = PetscObjectComposeFunction((PetscObject)B,"MatStoreValues_C",
1947                                      "MatStoreValues_MPIAIJ",
1948                                      (void*)MatStoreValues_MPIAIJ);CHKERRQ(ierr);
1949   ierr = PetscObjectComposeFunction((PetscObject)B,"MatRetrieveValues_C",
1950                                      "MatRetrieveValues_MPIAIJ",
1951                                      (void*)MatRetrieveValues_MPIAIJ);CHKERRQ(ierr);
1952   ierr = PetscObjectComposeFunction((PetscObject)B,"MatGetDiagonalBlock_C",
1953                                      "MatGetDiagonalBlock_MPIAIJ",
1954                                      (void*)MatGetDiagonalBlock_MPIAIJ);CHKERRQ(ierr);
1955   *A = B;
1956   PetscFunctionReturn(0);
1957 }
1958 
1959 #undef __FUNC__
1960 #define __FUNC__ "MatDuplicate_MPIAIJ"
1961 int MatDuplicate_MPIAIJ(Mat matin,MatDuplicateOption cpvalues,Mat *newmat)
1962 {
1963   Mat        mat;
1964   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1965   int        ierr, len=0, flg;
1966 
1967   PetscFunctionBegin;
1968   *newmat       = 0;
1969   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIAIJ,"Mat",matin->comm,MatDestroy,MatView);
1970   PLogObjectCreate(mat);
1971   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1972   PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));
1973   mat->ops->destroy    = MatDestroy_MPIAIJ;
1974   mat->ops->view       = MatView_MPIAIJ;
1975   mat->factor     = matin->factor;
1976   mat->assembled  = PETSC_TRUE;
1977 
1978   a->m = mat->m   = oldmat->m;
1979   a->n = mat->n   = oldmat->n;
1980   a->M = mat->M   = oldmat->M;
1981   a->N = mat->N   = oldmat->N;
1982 
1983   a->rstart       = oldmat->rstart;
1984   a->rend         = oldmat->rend;
1985   a->cstart       = oldmat->cstart;
1986   a->cend         = oldmat->cend;
1987   a->size         = oldmat->size;
1988   a->rank         = oldmat->rank;
1989   mat->insertmode = NOT_SET_VALUES;
1990   a->rowvalues    = 0;
1991   a->getrowactive = PETSC_FALSE;
1992 
1993   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1994   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIAIJ));
1995   a->cowners = a->rowners + a->size + 2;
1996   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1997   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1998   if (oldmat->colmap) {
1999 #if defined (USE_CTABLE)
2000     ierr = TableCreateCopy( &a->colmap, oldmat->colmap ); CHKERRQ(ierr);
2001 #else
2002     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
2003     PLogObjectMemory(mat,(a->N)*sizeof(int));
2004     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
2005 #endif
2006   } else a->colmap = 0;
2007   if (oldmat->garray) {
2008     len = ((Mat_SeqAIJ *) (oldmat->B->data))->n;
2009     a->garray = (int *) PetscMalloc((len+1)*sizeof(int)); CHKPTRQ(a->garray);
2010     PLogObjectMemory(mat,len*sizeof(int));
2011     if (len) PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
2012   } else a->garray = 0;
2013 
2014   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
2015   PLogObjectParent(mat,a->lvec);
2016   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
2017   PLogObjectParent(mat,a->Mvctx);
2018   ierr =  MatDuplicate(oldmat->A,cpvalues,&a->A); CHKERRQ(ierr);
2019   PLogObjectParent(mat,a->A);
2020   ierr =  MatDuplicate(oldmat->B,cpvalues,&a->B); CHKERRQ(ierr);
2021   PLogObjectParent(mat,a->B);
2022   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
2023   if (flg) {
2024     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
2025   }
2026   ierr = FListDuplicate(matin->qlist,&mat->qlist);CHKERRQ(ierr);
2027   *newmat = mat;
2028   PetscFunctionReturn(0);
2029 }
2030 
2031 #include "sys.h"
2032 
2033 #undef __FUNC__
2034 #define __FUNC__ "MatLoad_MPIAIJ"
2035 int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
2036 {
2037   Mat          A;
2038   Scalar       *vals,*svals;
2039   MPI_Comm     comm = ((PetscObject)viewer)->comm;
2040   MPI_Status   status;
2041   int          i, nz, ierr, j,rstart, rend, fd;
2042   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
2043   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
2044   int          tag = ((PetscObject)viewer)->tag,cend,cstart,n;
2045 
2046   PetscFunctionBegin;
2047   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
2048   if (!rank) {
2049     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
2050     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
2051     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
2052     if (header[3] < 0) {
2053       SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix in special format on disk, cannot load as MPIAIJ");
2054     }
2055   }
2056 
2057   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
2058   M = header[1]; N = header[2];
2059   /* determine ownership of all rows */
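  /* each process gets M/size rows; the first M % size processes take one extra row */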
2060   m = M/size + ((M % size) > rank);
2061   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
2062   ierr = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
2063   rowners[0] = 0;
2064   for ( i=2; i<=size; i++ ) {
2065     rowners[i] += rowners[i-1];
2066   }
2067   rstart = rowners[rank];
2068   rend   = rowners[rank+1];
2069 
2070   /* distribute row lengths to all processors */
2071   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
2072   offlens = ourlens + (rend-rstart);
2073   if (!rank) {
2074     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
2075     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
2076     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
2077     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
2078     ierr = MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);CHKERRQ(ierr);
2079     PetscFree(sndcounts);
2080   } else {
2081     ierr = MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);CHKERRQ(ierr);
2082   }
2083 
2084   if (!rank) {
2085     /* calculate the number of nonzeros on each processor */
2086     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
2087     PetscMemzero(procsnz,size*sizeof(int));
2088     for ( i=0; i<size; i++ ) {
2089       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
2090         procsnz[i] += rowlengths[j];
2091       }
2092     }
2093     PetscFree(rowlengths);
2094 
2095     /* determine max buffer needed and allocate it */
2096     maxnz = 0;
2097     for ( i=0; i<size; i++ ) {
2098       maxnz = PetscMax(maxnz,procsnz[i]);
2099     }
2100     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
2101 
2102     /* read in my part of the matrix column indices  */
2103     nz     = procsnz[0];
2104     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
2105     ierr   = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
2106 
2107     /* read in everyone else's and ship them off */
2108     for ( i=1; i<size; i++ ) {
2109       nz   = procsnz[i];
2110       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
2111       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
2112     }
2113     PetscFree(cols);
2114   } else {
2115     /* determine buffer space needed for message */
2116     nz = 0;
2117     for ( i=0; i<m; i++ ) {
2118       nz += ourlens[i];
2119     }
2120     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
2121 
2122     /* receive message of column indices */
2123     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
2124     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
2125     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2126   }
2127 
2128   /* determine column ownership if matrix is not square */
2129   if (N != M) {
2130     n      = N/size + ((N % size) > rank);
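    /* the inclusive prefix sum of the local column counts makes cend one past the
       last column owned by this process */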
2131     ierr   = MPI_Scan(&n,&cend,1,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
2132     cstart = cend - n;
2133   } else {
2134     cstart = rstart;
2135     cend   = rend;
2136     n      = cend - cstart;
2137   }
2138 
2139   /* loop over local rows, determining number of off diagonal entries */
2140   PetscMemzero(offlens,m*sizeof(int));
2141   jj = 0;
2142   for ( i=0; i<m; i++ ) {
2143     for ( j=0; j<ourlens[i]; j++ ) {
2144       if (mycols[jj] < cstart || mycols[jj] >= cend) offlens[i]++;
2145       jj++;
2146     }
2147   }
2148 
2149   /* create our matrix */
2150   for ( i=0; i<m; i++ ) {
2151     ourlens[i] -= offlens[i];
2152   }
2153   ierr = MatCreateMPIAIJ(comm,m,n,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
2154   A = *newmat;
2155   ierr = MatSetOption(A,MAT_COLUMNS_SORTED); CHKERRQ(ierr);
2156   for ( i=0; i<m; i++ ) {
2157     ourlens[i] += offlens[i];
2158   }
2159 
2160   if (!rank) {
2161     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
2162 
2163     /* read in my part of the matrix numerical values  */
2164     nz   = procsnz[0];
2165     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2166 
2167     /* insert into matrix */
2168     jj      = rstart;
2169     smycols = mycols;
2170     svals   = vals;
2171     for ( i=0; i<m; i++ ) {
2172       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
2173       smycols += ourlens[i];
2174       svals   += ourlens[i];
2175       jj++;
2176     }
2177 
2178     /* read in other processors and ship out */
2179     for ( i=1; i<size; i++ ) {
2180       nz   = procsnz[i];
2181       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2182       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
2183     }
2184     PetscFree(procsnz);
2185   } else {
2186     /* receive numeric values */
2187     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
2188 
2189     /* receive message of values */
2190     ierr = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
2191     ierr = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
2192     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2193 
2194     /* insert into matrix */
2195     jj      = rstart;
2196     smycols = mycols;
2197     svals   = vals;
2198     for ( i=0; i<m; i++ ) {
2199       ierr     = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
2200       smycols += ourlens[i];
2201       svals   += ourlens[i];
2202       jj++;
2203     }
2204   }
2205   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
2206 
2207   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2208   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2209   PetscFunctionReturn(0);
2210 }
2211 
2212 #undef __FUNC__
2213 #define __FUNC__ "MatGetSubMatrix_MPIAIJ"
2214 /*
2215     Not great since it makes two copies of the submatrix: first a local SeqAIJ,
2216   and then the end result obtained by concatenating the local matrices.
2217   Writing it directly would be much like MatGetSubMatrices_MPIAIJ().
2218 */
2219 int MatGetSubMatrix_MPIAIJ(Mat mat,IS isrow,IS iscol,int csize,MatReuse call,Mat *newmat)
2220 {
2221   int        ierr, i, m,n,rstart,row,rend,nz,*cwork,size,rank,j;
2222   Mat        *local,M, Mreuse;
2223   Scalar     *vwork,*aa;
2224   MPI_Comm   comm = mat->comm;
2225   Mat_SeqAIJ *aij;
2226   int        *ii, *jj,nlocal,*dlens,*olens,dlen,olen,jend;
2227 
2228   PetscFunctionBegin;
2229   MPI_Comm_rank(comm,&rank);
2230   MPI_Comm_size(comm,&size);
2231 
2232   if (call ==  MAT_REUSE_MATRIX) {
2233     ierr = PetscObjectQuery((PetscObject)*newmat,"SubMatrix",(PetscObject *)&Mreuse);CHKERRQ(ierr);
2234     if (!Mreuse) SETERRQ(1,1,"Submatrix passed in was not used before, cannot reuse");
2235     local = &Mreuse;
2236     ierr  = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_REUSE_MATRIX,&local);CHKERRQ(ierr);
2237   } else {
2238     ierr = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_INITIAL_MATRIX,&local);CHKERRQ(ierr);
2239     Mreuse = *local;
2240     PetscFree(local);
2241   }
2242 
2243   /*
2244       m - number of local rows
2245       n - number of columns (same on all processors)
2246       rstart - first row in new global matrix generated
2247   */
2248   ierr = MatGetSize(Mreuse,&m,&n);CHKERRQ(ierr);
2249   if (call == MAT_INITIAL_MATRIX) {
2250     aij = (Mat_SeqAIJ *) (Mreuse)->data;
2251     if (aij->indexshift) SETERRQ(PETSC_ERR_SUP,1,"No support for index shifted matrix");
2252     ii  = aij->i;
2253     jj  = aij->j;
2254 
2255     /*
2256         Determine the number of non-zeros in the diagonal and off-diagonal
2257         portions of the matrix in order to do correct preallocation
2258     */
2259 
2260     /* first get start and end of "diagonal" columns */
2261     if (csize == PETSC_DECIDE) {
2262       nlocal = n/size + ((n % size) > rank);
2263     } else {
2264       nlocal = csize;
2265     }
2266     ierr   = MPI_Scan(&nlocal,&rend,1,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
2267     rstart = rend - nlocal;
2268     if (rank == size - 1 && rend != n) {
2269       SETERRQ(1,1,"Local column sizes do not add up to total number of columns");
2270     }
2271 
2272     /* next, compute all the lengths */
2273     dlens = (int *) PetscMalloc( (2*m+1)*sizeof(int) );CHKPTRQ(dlens);
2274     olens = dlens + m;
2275     for ( i=0; i<m; i++ ) {
2276       jend = ii[i+1] - ii[i];
2277       olen = 0;
2278       dlen = 0;
2279       for ( j=0; j<jend; j++ ) {
2280         if ( *jj < rstart || *jj >= rend) olen++;
2281         else dlen++;
2282         jj++;
2283       }
2284       olens[i] = olen;
2285       dlens[i] = dlen;
2286     }
2287     ierr = MatCreateMPIAIJ(comm,m,nlocal,PETSC_DECIDE,n,0,dlens,0,olens,&M);CHKERRQ(ierr);
2288     PetscFree(dlens);
2289   } else {
2290     int ml,nl;
2291 
2292     M = *newmat;
2293     ierr = MatGetLocalSize(M,&ml,&nl);CHKERRQ(ierr);
2294     if (ml != m) SETERRQ(PETSC_ERR_ARG_SIZ,1,"Previous matrix must be same size/layout as request");
2295     ierr = MatZeroEntries(M);CHKERRQ(ierr);
2296     /*
2297          The next two lines are needed so we may call MatSetValues_MPIAIJ() below directly,
2298        rather than the slower MatSetValues().
2299     */
2300     M->was_assembled = PETSC_TRUE;
2301     M->assembled     = PETSC_FALSE;
2302   }
2303   ierr = MatGetOwnershipRange(M,&rstart,&rend); CHKERRQ(ierr);
2304   aij = (Mat_SeqAIJ *) (Mreuse)->data;
2305   if (aij->indexshift) SETERRQ(PETSC_ERR_SUP,1,"No support for index shifted matrix");
2306   ii  = aij->i;
2307   jj  = aij->j;
2308   aa  = aij->a;
2309   for (i=0; i<m; i++) {
2310     row   = rstart + i;
2311     nz    = ii[i+1] - ii[i];
2312     cwork = jj;     jj += nz;
2313     vwork = aa;     aa += nz;
2314     ierr = MatSetValues_MPIAIJ(M,1,&row,nz,cwork,vwork,INSERT_VALUES); CHKERRQ(ierr);
2315   }
2316 
2317   ierr = MatAssemblyBegin(M,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2318   ierr = MatAssemblyEnd(M,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2319   *newmat = M;
2320 
2321   /* save submatrix used in processor for next request */
2322   if (call ==  MAT_INITIAL_MATRIX) {
2323     ierr = PetscObjectCompose((PetscObject)M,"SubMatrix",(PetscObject)Mreuse);CHKERRQ(ierr);
2324     ierr = PetscObjectDereference((PetscObject)Mreuse);CHKERRQ(ierr);
2325   }
2326 
2327   PetscFunctionReturn(0);
2328 }
2329