1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpiaij.c,v 1.283 1999/02/17 21:37:02 balay Exp bsmith $";
3 #endif
4 
5 #include "src/mat/impls/aij/mpi/mpiaij.h"
6 #include "src/vec/vecimpl.h"
7 #include "src/inline/spops.h"
8 
9 /* local utility routine that creates a mapping from the global column
10 number to the local number in the off-diagonal part of the local
11 storage of the matrix.  This is done in a non-scalable way since the
12 length of colmap equals the global number of columns.
13 */
14 #undef __FUNC__
15 #define __FUNC__ "CreateColmap_MPIAIJ_Private"
16 int CreateColmap_MPIAIJ_Private(Mat mat)
17 {
18   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
19   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
20   int        n = B->n,i;
21 #if defined (USE_CTABLE)
22   int        ierr;
23 #endif
24 
25   PetscFunctionBegin;
26 #if defined (USE_CTABLE)
27   ierr = TableCreate(aij->n/5,&aij->colmap); CHKERRQ(ierr);
28   for ( i=0; i<n; i++ ){
29     ierr = TableAdd(aij->colmap,aij->garray[i]+1,i+1);CHKERRQ(ierr);
30   }
31 #else
32   aij->colmap = (int *) PetscMalloc((aij->N+1)*sizeof(int));CHKPTRQ(aij->colmap);
33   PLogObjectMemory(mat,aij->N*sizeof(int));
34   PetscMemzero(aij->colmap,aij->N*sizeof(int));
35   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
36 #endif
37   PetscFunctionReturn(0);
38 }
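/*
   Note: with USE_CTABLE the colmap is a hash table keyed by global column number;
   otherwise it is a dense integer array of length N.  In both cases entries are
   stored with a +1 offset so that a lookup result of 0 means "column not present
   in the off-diagonal block"; callers subtract 1 after the lookup (see
   MatSetValues_MPIAIJ() and MatGetValues_MPIAIJ() below).
*/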
39 
40 extern int DisAssemble_MPIAIJ(Mat);
41 
42 #define CHUNKSIZE   15
43 #define MatSetValues_SeqAIJ_A_Private(row,col,value,addv) \
44 { \
45  \
46     rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
47     rmax = aimax[row]; nrow = ailen[row];  \
48     col1 = col - shift; \
49      \
50     low = 0; high = nrow; \
51     while (high-low > 5) { \
52       t = (low+high)/2; \
53       if (rp[t] > col) high = t; \
54       else             low  = t; \
55     } \
56       for ( _i=0; _i<nrow; _i++ ) { \
57         if (rp[_i] > col1) break; \
58         if (rp[_i] == col1) { \
59           if (addv == ADD_VALUES) ap[_i] += value;   \
60           else                  ap[_i] = value; \
61           goto a_noinsert; \
62         } \
63       }  \
64       if (nonew == 1) goto a_noinsert; \
65       else if (nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
66       if (nrow >= rmax) { \
67         /* there is no extra room in row, therefore enlarge */ \
68         int    new_nz = ai[a->m] + CHUNKSIZE,len,*new_i,*new_j; \
69         Scalar *new_a; \
70  \
71         if (nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
72  \
73         /* malloc new storage space */ \
74         len     = new_nz*(sizeof(int)+sizeof(Scalar))+(a->m+1)*sizeof(int); \
75         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
76         new_j   = (int *) (new_a + new_nz); \
77         new_i   = new_j + new_nz; \
78  \
79         /* copy over old data into new slots */ \
80         for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = ai[ii];} \
81         for ( ii=row+1; ii<a->m+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
82         PetscMemcpy(new_j,aj,(ai[row]+nrow+shift)*sizeof(int)); \
83         len = (new_nz - CHUNKSIZE - ai[row] - nrow - shift); \
84         PetscMemcpy(new_j+ai[row]+shift+nrow+CHUNKSIZE,aj+ai[row]+shift+nrow, \
85                                                            len*sizeof(int)); \
86         PetscMemcpy(new_a,aa,(ai[row]+nrow+shift)*sizeof(Scalar)); \
87         PetscMemcpy(new_a+ai[row]+shift+nrow+CHUNKSIZE,aa+ai[row]+shift+nrow, \
88                                                            len*sizeof(Scalar));  \
89         /* free up old matrix storage */ \
90  \
91         PetscFree(a->a);  \
92         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
93         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
94         a->singlemalloc = 1; \
95  \
96         rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
97         rmax = aimax[row] = aimax[row] + CHUNKSIZE; \
98         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
99         a->maxnz += CHUNKSIZE; \
100         a->reallocs++; \
101       } \
102       N = nrow++ - 1; a->nz++; \
103       /* shift up all the later entries in this row */ \
104       for ( ii=N; ii>=_i; ii-- ) { \
105         rp[ii+1] = rp[ii]; \
106         ap[ii+1] = ap[ii]; \
107       } \
108       rp[_i] = col1;  \
109       ap[_i] = value;  \
110       a_noinsert: ; \
111       ailen[row] = nrow; \
112 }
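/*
   The macro above and the analogous one below are not self-contained: they expect
   the variables rp, ap, rmax, nrow, _i, ii, N, col1, low, high, t, nonew, and shift,
   the per-block arrays ai/aj/aa/aimax/ailen (or bi/bj/ba/bimax/bilen), the Mat_SeqAIJ
   pointers a and b, and the Mat objects A and B to already be declared in the calling
   scope, as is done in MatSetValues_MPIAIJ() below.
*/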
113 
114 #define MatSetValues_SeqAIJ_B_Private(row,col,value,addv) \
115 { \
116  \
117     rp   = bj + bi[row] + shift; ap = ba + bi[row] + shift; \
118     rmax = bimax[row]; nrow = bilen[row];  \
119     col1 = col - shift; \
120      \
121     low = 0; high = nrow; \
122     while (high-low > 5) { \
123       t = (low+high)/2; \
124       if (rp[t] > col) high = t; \
125       else             low  = t; \
126     } \
127        for ( _i=0; _i<nrow; _i++ ) { \
128         if (rp[_i] > col1) break; \
129         if (rp[_i] == col1) { \
130           if (addv == ADD_VALUES) ap[_i] += value;   \
131           else                  ap[_i] = value; \
132           goto b_noinsert; \
133         } \
134       }  \
135       if (nonew == 1) goto b_noinsert; \
136       else if (nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
137       if (nrow >= rmax) { \
138         /* there is no extra room in row, therefore enlarge */ \
139         int    new_nz = bi[b->m] + CHUNKSIZE,len,*new_i,*new_j; \
140         Scalar *new_a; \
141  \
142         if (nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
143  \
144         /* malloc new storage space */ \
145         len     = new_nz*(sizeof(int)+sizeof(Scalar))+(b->m+1)*sizeof(int); \
146         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
147         new_j   = (int *) (new_a + new_nz); \
148         new_i   = new_j + new_nz; \
149  \
150         /* copy over old data into new slots */ \
151         for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = bi[ii];} \
152         for ( ii=row+1; ii<b->m+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
153         PetscMemcpy(new_j,bj,(bi[row]+nrow+shift)*sizeof(int)); \
154         len = (new_nz - CHUNKSIZE - bi[row] - nrow - shift); \
155         PetscMemcpy(new_j+bi[row]+shift+nrow+CHUNKSIZE,bj+bi[row]+shift+nrow, \
156                                                            len*sizeof(int)); \
157         PetscMemcpy(new_a,ba,(bi[row]+nrow+shift)*sizeof(Scalar)); \
158         PetscMemcpy(new_a+bi[row]+shift+nrow+CHUNKSIZE,ba+bi[row]+shift+nrow, \
159                                                            len*sizeof(Scalar));  \
160         /* free up old matrix storage */ \
161  \
162         PetscFree(b->a);  \
163         if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
164         ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
165         b->singlemalloc = 1; \
166  \
167         rp   = bj + bi[row] + shift; ap = ba + bi[row] + shift; \
168         rmax = bimax[row] = bimax[row] + CHUNKSIZE; \
169         PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
170         b->maxnz += CHUNKSIZE; \
171         b->reallocs++; \
172       } \
173       N = nrow++ - 1; b->nz++; \
174       /* shift up all the later entries in this row */ \
175       for ( ii=N; ii>=_i; ii-- ) { \
176         rp[ii+1] = rp[ii]; \
177         ap[ii+1] = ap[ii]; \
178       } \
179       rp[_i] = col1;  \
180       ap[_i] = value;  \
181       b_noinsert: ; \
182       bilen[row] = nrow; \
183 }
184 
185 extern int MatSetValues_SeqAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
186 #undef __FUNC__
187 #define __FUNC__ "MatSetValues_MPIAIJ"
188 int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
189 {
190   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
191   Scalar     value;
192   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
193   int        cstart = aij->cstart, cend = aij->cend,row,col;
194   int        roworiented = aij->roworiented;
195 
196   /* Some variables required by the MatSetValues_SeqAIJ_{A,B}_Private() macros */
197   Mat        A = aij->A;
198   Mat_SeqAIJ *a = (Mat_SeqAIJ *) A->data;
199   int        *aimax = a->imax, *ai = a->i, *ailen = a->ilen,*aj = a->j;
200   Scalar     *aa = a->a;
201 
202   Mat        B = aij->B;
203   Mat_SeqAIJ *b = (Mat_SeqAIJ *) B->data;
204   int        *bimax = b->imax, *bi = b->i, *bilen = b->ilen,*bj = b->j;
205   Scalar     *ba = b->a;
206 
207   int        *rp,ii,nrow,_i,rmax, N, col1,low,high,t;
208   int        nonew = a->nonew,shift = a->indexshift;
209   Scalar     *ap;
210 
211   PetscFunctionBegin;
212   for ( i=0; i<m; i++ ) {
213     if (im[i] < 0) continue;
214 #if defined(USE_PETSC_BOPT_g)
215     if (im[i] >= aij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
216 #endif
217     if (im[i] >= rstart && im[i] < rend) {
218       row = im[i] - rstart;
219       for ( j=0; j<n; j++ ) {
220         if (in[j] >= cstart && in[j] < cend){
221           col = in[j] - cstart;
222           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
223           MatSetValues_SeqAIJ_A_Private(row,col,value,addv);
224           /* ierr = MatSetValues_SeqAIJ(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
225         }
226         else if (in[j] < 0) continue;
227 #if defined(USE_PETSC_BOPT_g)
228         else if (in[j] >= aij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");}
229 #endif
230         else {
231           if (mat->was_assembled) {
232             if (!aij->colmap) {
233               ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
234             }
235 #if defined (USE_CTABLE)
236             ierr = TableFind(aij->colmap,in[j]+1,&col); CHKERRQ(ierr);
237             col--;
238 #else
239             col = aij->colmap[in[j]] - 1;
240 #endif
241             if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
242               ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
243               col =  in[j];
244               /* Reinitialize the variables required by MatSetValues_SeqAIJ_B_Private() */
245               B = aij->B;
246               b = (Mat_SeqAIJ *) B->data;
247               bimax = b->imax; bi = b->i; bilen = b->ilen; bj = b->j;
248               ba = b->a;
249             }
250           } else col = in[j];
251           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
252           MatSetValues_SeqAIJ_B_Private(row,col,value,addv);
253           /* ierr = MatSetValues_SeqAIJ(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
254         }
255       }
256     } else {
257       if (roworiented && !aij->donotstash) {
258         ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
259       } else {
260         if (!aij->donotstash) {
261           row = im[i];
262           for ( j=0; j<n; j++ ) {
263             ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
264           }
265         }
266       }
267     }
268   }
269   PetscFunctionReturn(0);
270 }
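/*
   Typical call pattern from application code (a minimal sketch; grow, ncols, gcols,
   and vals are placeholders for the caller's row index, column count, global column
   indices, and values):

      ierr = MatSetValues(mat,1,&grow,ncols,gcols,vals,ADD_VALUES);CHKERRQ(ierr);
      ...
      ierr = MatAssemblyBegin(mat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
      ierr = MatAssemblyEnd(mat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);

   Values for locally owned rows go directly into the A (diagonal) or B (off-diagonal)
   blocks above; values for rows owned by other processors are stashed and communicated
   during MatAssemblyBegin/End below, unless MAT_IGNORE_OFF_PROC_ENTRIES was set.
*/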
271 
272 #undef __FUNC__
273 #define __FUNC__ "MatGetValues_MPIAIJ"
274 int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
275 {
276   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
277   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
278   int        cstart = aij->cstart, cend = aij->cend,row,col;
279 
280   PetscFunctionBegin;
281   for ( i=0; i<m; i++ ) {
282     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
283     if (idxm[i] >= aij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
284     if (idxm[i] >= rstart && idxm[i] < rend) {
285       row = idxm[i] - rstart;
286       for ( j=0; j<n; j++ ) {
287         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
288         if (idxn[j] >= aij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
289         if (idxn[j] >= cstart && idxn[j] < cend){
290           col = idxn[j] - cstart;
291           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
292         } else {
293           if (!aij->colmap) {
294             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
295           }
296 #if defined (USE_CTABLE)
297           ierr = TableFind(aij->colmap,idxn[j]+1,&col); CHKERRQ(ierr);
298           col--;
299 #else
300           col = aij->colmap[idxn[j]] - 1;
301 #endif
302           if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
303           else {
304             ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
305           }
306         }
307       }
308     } else {
309       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
310     }
311   }
312   PetscFunctionReturn(0);
313 }
314 
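/*
   Assembly is done in two phases.  MatAssemblyBegin_MPIAIJ() below routes the stashed
   off-processor entries: two MPI_Allreduce() calls determine how many messages each
   processor will receive and the maximum message length, the receives are posted, and
   each stashed (row,column,value) triple is packed into svalues (3 Scalars per entry)
   and sent with MPI_Isend().  The matching waits and the insertion of the received
   values happen in MatAssemblyEnd_MPIAIJ().
*/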
315 #undef __FUNC__
316 #define __FUNC__ "MatAssemblyBegin_MPIAIJ"
317 int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
318 {
319   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
320   MPI_Comm    comm = mat->comm;
321   int         size = aij->size, *owners = aij->rowners;
322   int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
323   MPI_Request *send_waits,*recv_waits;
324   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
325   InsertMode  addv;
326   Scalar      *rvalues,*svalues;
327 
328   PetscFunctionBegin;
329   if (aij->donotstash) {
330     aij->svalues    = 0; aij->rvalues    = 0;
331     aij->nsends     = 0; aij->nrecvs     = 0;
332     aij->send_waits = 0; aij->recv_waits = 0;
333     aij->rmax       = 0;
334     PetscFunctionReturn(0);
335   }
336 
337   /* make sure all processors are in either INSERT_VALUES or ADD_VALUES mode */
338   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
339   if (addv == (ADD_VALUES|INSERT_VALUES)) {
340     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
341   }
342   mat->insertmode = addv; /* in case this processor had no cache */
343 
344   /*  first count number of contributors to each processor */
345   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
346   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
347   owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
348   for ( i=0; i<aij->stash.n; i++ ) {
349     idx = aij->stash.idx[i];
350     for ( j=0; j<size; j++ ) {
351       if (idx >= owners[j] && idx < owners[j+1]) {
352         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
353       }
354     }
355   }
356   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
357 
358   /* inform other processors of number of messages and max length*/
359   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
360   ierr = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
361   nreceives = work[rank];
362   ierr = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
363   nmax = work[rank];
364   PetscFree(work);
365 
366   /* post receives:
367        1) each message consists of ordered triples
368      (global row index, global column index, value); the indices are stored
369      as Scalars to simplify the message passing.
370        2) since we don't know how long each individual message is we
371      allocate the largest needed buffer for each receive. Potentially
372      this is a lot of wasted space.
373 
374 
375        This could be done better.
376   */
377   rvalues    = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
378   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
379   for ( i=0; i<nreceives; i++ ) {
380     ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
381               comm,recv_waits+i);CHKERRQ(ierr);
382   }
383 
384   /* do sends:
385       1) starts[i] gives the starting index in svalues for stuff going to
386          the ith processor
387   */
388   svalues    = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
389   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
390   starts     = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
391   starts[0]  = 0;
392   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
393   for ( i=0; i<aij->stash.n; i++ ) {
394     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
395     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
396     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
397   }
398   PetscFree(owner);
399   starts[0] = 0;
400   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
401   count = 0;
402   for ( i=0; i<size; i++ ) {
403     if (procs[i]) {
404       ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
405     }
406   }
407   PetscFree(starts);
408   PetscFree(nprocs);
409 
410   /* Free cache space */
411   PLogInfo(aij->A,"MatAssemblyBegin_MPIAIJ:Number of off-processor values %d\n",aij->stash.n);
412   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
413 
414   aij->svalues    = svalues;    aij->rvalues    = rvalues;
415   aij->nsends     = nsends;     aij->nrecvs     = nreceives;
416   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
417   aij->rmax       = nmax;
418 
419   PetscFunctionReturn(0);
420 }
421 extern int MatSetUpMultiply_MPIAIJ(Mat);
422 
423 #undef __FUNC__
424 #define __FUNC__ "MatAssemblyEnd_MPIAIJ"
425 int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
426 {
427   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
428   MPI_Status  *send_status,recv_status;
429   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
430   int         row,col,other_disassembled;
431   Scalar      *values,val;
432   InsertMode  addv = mat->insertmode;
433 
434   PetscFunctionBegin;
435 
436   /*  wait on receives */
437   while (count) {
438     ierr = MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
439     /* unpack receives into our local space */
440     values = aij->rvalues + 3*imdex*aij->rmax;
441     ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
442     n = n/3;
443     for ( i=0; i<n; i++ ) {
444       row = (int) PetscReal(values[3*i]) - aij->rstart;
445       col = (int) PetscReal(values[3*i+1]);
446       val = values[3*i+2];
447       if (col >= aij->cstart && col < aij->cend) {
448         col -= aij->cstart;
449         ierr = MatSetValues(aij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
450       } else {
451         if (mat->was_assembled || mat->assembled) {
452           if (!aij->colmap) {
453             ierr = CreateColmap_MPIAIJ_Private(mat); CHKERRQ(ierr);
454           }
455 #if defined (USE_CTABLE)
456           ierr = TableFind(aij->colmap,col+1,&col); CHKERRQ(ierr);
457           col--;
458 #else
459           col = aij->colmap[col] - 1;
460 #endif
461           if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
462             ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
463             col = (int) PetscReal(values[3*i+1]);
464           }
465         }
466         ierr = MatSetValues(aij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
467       }
468     }
469     count--;
470   }
471   if (aij->recv_waits) PetscFree(aij->recv_waits);
472   if (aij->rvalues)    PetscFree(aij->rvalues);
473 
474   /* wait on sends */
475   if (aij->nsends) {
476     send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
477     ierr        = MPI_Waitall(aij->nsends,aij->send_waits,send_status);CHKERRQ(ierr);
478     PetscFree(send_status);
479   }
480   if (aij->send_waits) PetscFree(aij->send_waits);
481   if (aij->svalues)    PetscFree(aij->svalues);
482 
483   ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
484   ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);
485 
486   /* determine if any processor has disassembled; if so we must
487      also disassemble ourselves, in order that we may reassemble. */
488   /*
489      if the nonzero structure of submatrix B cannot change then we know that
490      no processor disassembled, and thus we can skip this step
491   */
492   if (!((Mat_SeqAIJ*) aij->B->data)->nonew)  {
493     ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
494     if (mat->was_assembled && !other_disassembled) {
495       ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
496     }
497   }
498 
499   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
500     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
501   }
502   ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
503   ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);
504 
505   if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
506   PetscFunctionReturn(0);
507 }
508 
509 #undef __FUNC__
510 #define __FUNC__ "MatZeroEntries_MPIAIJ"
511 int MatZeroEntries_MPIAIJ(Mat A)
512 {
513   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
514   int        ierr;
515 
516   PetscFunctionBegin;
517   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
518   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
519   PetscFunctionReturn(0);
520 }
521 
522 #undef __FUNC__
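/*
   Each processor may list rows it does not own in the index set, so the row indices
   are first routed to their owning processors (same send/receive pattern as in
   MatAssemblyBegin_MPIAIJ() above); each processor then zeros its local rows in both
   the diagonal and off-diagonal blocks.
*/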
523 #define __FUNC__ "MatZeroRows_MPIAIJ"
524 int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
525 {
526   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
527   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
528   int            *procs,*nprocs,j,found,idx,nsends,*work,row;
529   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
530   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
531   int            *lens,imdex,*lrows,*values,rstart=l->rstart;
532   MPI_Comm       comm = A->comm;
533   MPI_Request    *send_waits,*recv_waits;
534   MPI_Status     recv_status,*send_status;
535   IS             istmp;
536 
537   PetscFunctionBegin;
538   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
539   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
540 
541   /*  first count number of contributors to each processor */
542   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
543   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
544   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
545   for ( i=0; i<N; i++ ) {
546     idx = rows[i];
547     found = 0;
548     for ( j=0; j<size; j++ ) {
549       if (idx >= owners[j] && idx < owners[j+1]) {
550         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
551       }
552     }
553     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
554   }
555   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
556 
557   /* inform other processors of number of messages and max length*/
558   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
559   ierr = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
560   nrecvs = work[rank];
561   ierr = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
562   nmax = work[rank];
563   PetscFree(work);
564 
565   /* post receives:   */
566   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int));CHKPTRQ(rvalues);
567   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
568   for ( i=0; i<nrecvs; i++ ) {
569     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
570   }
571 
572   /* do sends:
573       1) starts[i] gives the starting index in svalues for stuff going to
574          the ith processor
575   */
576   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
577   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
578   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
579   starts[0] = 0;
580   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
581   for ( i=0; i<N; i++ ) {
582     svalues[starts[owner[i]]++] = rows[i];
583   }
584   ierr = ISRestoreIndices(is,&rows);CHKERRQ(ierr);
585 
586   starts[0] = 0;
587   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
588   count = 0;
589   for ( i=0; i<size; i++ ) {
590     if (procs[i]) {
591       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
592     }
593   }
594   PetscFree(starts);
595 
596   base = owners[rank];
597 
598   /*  wait on receives */
599   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
600   source = lens + nrecvs;
601   count  = nrecvs; slen = 0;
602   while (count) {
603     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
604     /* unpack receives into our local space */
605     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
606     source[imdex]  = recv_status.MPI_SOURCE;
607     lens[imdex]  = n;
608     slen += n;
609     count--;
610   }
611   PetscFree(recv_waits);
612 
613   /* move the data into the send scatter */
614   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
615   count = 0;
616   for ( i=0; i<nrecvs; i++ ) {
617     values = rvalues + i*nmax;
618     for ( j=0; j<lens[i]; j++ ) {
619       lrows[count++] = values[j] - base;
620     }
621   }
622   PetscFree(rvalues); PetscFree(lens);
623   PetscFree(owner); PetscFree(nprocs);
624 
625   /* actually zap the local rows */
626   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
627   PLogObjectParent(A,istmp);
628 
629   /*
630         Zero the required rows. If the "diagonal block" of the matrix
631      is square and the user wishes to set the diagonal we use separate
632      code so that MatSetValues() is not called for each diagonal entry,
633      allocating new memory, causing lots of mallocs and slowing things down.
634 
635        Contributed by: Matthew Knepley
636   */
637   /* must zero l->B before l->A because the (diag) case below may put values into l->B */
638   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
639   if (diag && (l->A->M == l->A->N)) {
640     ierr      = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
641   } else if (diag) {
642     ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
643     if (((Mat_SeqAIJ*)l->A->data)->nonew) {
644       SETERRQ(PETSC_ERR_SUP,0,"MatZeroRows() on rectangular matrices cannot be used with the Mat options\n\
645 MAT_NO_NEW_NONZERO_LOCATIONS,MAT_NEW_NONZERO_LOCATION_ERR,MAT_NEW_NONZERO_ALLOCATION_ERR");
646     }
647     for ( i = 0; i < slen; i++ ) {
648       row  = lrows[i] + rstart;
649       ierr = MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES);CHKERRQ(ierr);
650     }
651     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
652     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
653   } else {
654     ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
655   }
656   ierr = ISDestroy(istmp); CHKERRQ(ierr);
657   PetscFree(lrows);
658 
659   /* wait on sends */
660   if (nsends) {
661     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
662     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
663     PetscFree(send_status);
664   }
665   PetscFree(send_waits); PetscFree(svalues);
666 
667   PetscFunctionReturn(0);
668 }
669 
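/*
   The parallel multiply is y = A_diag*x_local + B_offdiag*x_ghost: the scatter of the
   needed off-processor entries of xx into a->lvec is started, the diagonal block is
   applied to the local part of xx while the communication proceeds, and after
   VecScatterEnd() the off-diagonal block is applied to the ghost values and added
   into the result.
*/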
670 #undef __FUNC__
671 #define __FUNC__ "MatMult_MPIAIJ"
672 int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
673 {
674   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
675   int        ierr,nt;
676 
677   PetscFunctionBegin;
678   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
679   if (nt != a->n) {
680     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
681   }
682   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
683   ierr = (*a->A->ops->mult)(a->A,xx,yy); CHKERRQ(ierr);
684   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
685   ierr = (*a->B->ops->multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
686   PetscFunctionReturn(0);
687 }
688 
689 #undef __FUNC__
690 #define __FUNC__ "MatMultAdd_MPIAIJ"
691 int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
692 {
693   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
694   int        ierr;
695 
696   PetscFunctionBegin;
697   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
698   ierr = (*a->A->ops->multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
699   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
700   ierr = (*a->B->ops->multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
701   PetscFunctionReturn(0);
702 }
703 
704 #undef __FUNC__
705 #define __FUNC__ "MatMultTrans_MPIAIJ"
706 int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
707 {
708   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
709   int        ierr;
710 
711   PetscFunctionBegin;
712   /* do nondiagonal part */
713   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
714   /* send it on its way */
715   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
716   /* do local part */
717   ierr = (*a->A->ops->multtrans)(a->A,xx,yy); CHKERRQ(ierr);
718   /* receive remote parts: note this assumes the values are not actually   */
719   /* inserted into yy until the VecScatterEnd() below, which is true for   */
720   /* this implementation but is perhaps not always true.                   */
721   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
722   PetscFunctionReturn(0);
723 }
724 
725 #undef __FUNC__
726 #define __FUNC__ "MatMultTransAdd_MPIAIJ"
727 int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
728 {
729   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
730   int        ierr;
731 
732   PetscFunctionBegin;
733   /* do nondiagonal part */
734   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
735   /* send it on its way */
736   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
737   /* do local part */
738   ierr = (*a->A->ops->multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
739   /* receive remote parts: note this assumes the values are not actually   */
740   /* inserted into zz until the VecScatterEnd() below, which is true for   */
741   /* this implementation but is perhaps not always true.                   */
742   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
743   PetscFunctionReturn(0);
744 }
745 
746 /*
747   This only works correctly for square matrices where the subblock A->A is the
748    diagonal block
749 */
750 #undef __FUNC__
751 #define __FUNC__ "MatGetDiagonal_MPIAIJ"
752 int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
753 {
754   int        ierr;
755   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
756 
757   PetscFunctionBegin;
758   if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
759   if (a->rstart != a->cstart || a->rend != a->cend) {
760     SETERRQ(PETSC_ERR_ARG_SIZ,0,"row partition must equal col partition");
761   }
762   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
763   PetscFunctionReturn(0);
764 }
765 
766 #undef __FUNC__
767 #define __FUNC__ "MatScale_MPIAIJ"
768 int MatScale_MPIAIJ(Scalar *aa,Mat A)
769 {
770   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
771   int        ierr;
772 
773   PetscFunctionBegin;
774   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
775   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
776   PetscFunctionReturn(0);
777 }
778 
779 #undef __FUNC__
780 #define __FUNC__ "MatDestroy_MPIAIJ"
781 int MatDestroy_MPIAIJ(Mat mat)
782 {
783   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
784   int        ierr;
785 
786   PetscFunctionBegin;
787   if (--mat->refct > 0) PetscFunctionReturn(0);
788 
789   if (mat->mapping) {
790     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
791   }
792   if (mat->bmapping) {
793     ierr = ISLocalToGlobalMappingDestroy(mat->bmapping); CHKERRQ(ierr);
794   }
795   if (mat->rmap) {
796     ierr = MapDestroy(mat->rmap);CHKERRQ(ierr);
797   }
798   if (mat->cmap) {
799     ierr = MapDestroy(mat->cmap);CHKERRQ(ierr);
800   }
801 #if defined(USE_PETSC_LOG)
802   PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",aij->M,aij->N);
803 #endif
804   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
805   PetscFree(aij->rowners);
806   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
807   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
808 #if defined (USE_CTABLE)
809   if (aij->colmap) TableDelete(aij->colmap);
810 #else
811   if (aij->colmap) PetscFree(aij->colmap);
812 #endif
813   if (aij->garray) PetscFree(aij->garray);
814   if (aij->lvec)   VecDestroy(aij->lvec);
815   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
816   if (aij->rowvalues) PetscFree(aij->rowvalues);
817   PetscFree(aij);
818   PLogObjectDestroy(mat);
819   PetscHeaderDestroy(mat);
820   PetscFunctionReturn(0);
821 }
822 
823 #undef __FUNC__
824 #define __FUNC__ "MatView_MPIAIJ_Binary"
825 extern int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
826 {
827   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
828   int         ierr;
829 
830   PetscFunctionBegin;
831   if (aij->size == 1) {
832     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
833   }
834   else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
835   PetscFunctionReturn(0);
836 }
837 
838 #undef __FUNC__
839 #define __FUNC__ "MatView_MPIAIJ_ASCIIorDraworSocket"
840 extern int MatView_MPIAIJ_ASCIIorDraworSocket(Mat mat,Viewer viewer)
841 {
842   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
843   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
844   int         ierr, format,shift = C->indexshift,rank = aij->rank;
845   int         size = aij->size;
846   FILE        *fd;
847   ViewerType  vtype;
848 
849   PetscFunctionBegin;
850   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
851   if (PetscTypeCompare(vtype,ASCII_VIEWER)) {
852     ierr = ViewerGetFormat(viewer,&format);CHKERRQ(ierr);
853     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
854       MatInfo info;
855       int     flg;
856       MPI_Comm_rank(mat->comm,&rank);
857       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
858       ierr = MatGetInfo(mat,MAT_LOCAL,&info);CHKERRQ(ierr);
859       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
860       PetscSequentialPhaseBegin(mat->comm,1);
861       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
862          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
863       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
864          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
865       ierr = MatGetInfo(aij->A,MAT_LOCAL,&info);CHKERRQ(ierr);
866       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used);
867       ierr = MatGetInfo(aij->B,MAT_LOCAL,&info);CHKERRQ(ierr);
868       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used);
869       fflush(fd);
870       PetscSequentialPhaseEnd(mat->comm,1);
871       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
872       PetscFunctionReturn(0);
873     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
874       PetscFunctionReturn(0);
875     }
876   } else if (PetscTypeCompare(vtype,DRAW_VIEWER)) {
877     Draw       draw;
878     PetscTruth isnull;
879     ierr = ViewerDrawGetDraw(viewer,0,&draw); CHKERRQ(ierr);
880     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
881   }
882 
883   if (size == 1) {
884     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
885   } else {
886     /* assemble the entire matrix onto first processor. */
887     Mat         A;
888     Mat_SeqAIJ *Aloc;
889     int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
890     Scalar      *a;
891 
892     if (!rank) {
893       ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
894     } else {
895       ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
896     }
897     PLogObjectParent(mat,A);
898 
899     /* copy over the A part */
900     Aloc = (Mat_SeqAIJ*) aij->A->data;
901     m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
902     row = aij->rstart;
903     for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
904     for ( i=0; i<m; i++ ) {
905       ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
906       row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
907     }
908     aj = Aloc->j;
909     for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
910 
911     /* copy over the B part */
912     Aloc = (Mat_SeqAIJ*) aij->B->data;
913     m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
914     row = aij->rstart;
915     ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
916     for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
917     for ( i=0; i<m; i++ ) {
918       ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
919       row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
920     }
921     PetscFree(ct);
922     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
923     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
924     /*
925        Everyone has to call MatView() to draw the matrix since the graphics waits
926        are synchronized across all processors that share the Draw object
927     */
928     if (!rank || PetscTypeCompare(vtype,DRAW_VIEWER)) {
929       ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
930     }
931     ierr = MatDestroy(A); CHKERRQ(ierr);
932   }
933   PetscFunctionReturn(0);
934 }
935 
936 #undef __FUNC__
937 #define __FUNC__ "MatView_MPIAIJ"
938 int MatView_MPIAIJ(Mat mat,Viewer viewer)
939 {
940   int         ierr;
941   ViewerType  vtype;
942 
943   PetscFunctionBegin;
944   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
945   if (PetscTypeCompare(vtype,ASCII_VIEWER) || PetscTypeCompare(vtype,DRAW_VIEWER) ||
946       PetscTypeCompare(vtype,SOCKET_VIEWER)) {
947     ierr = MatView_MPIAIJ_ASCIIorDraworSocket(mat,viewer); CHKERRQ(ierr);
948   } else if (PetscTypeCompare(vtype,BINARY_VIEWER)) {
949     ierr = MatView_MPIAIJ_Binary(mat,viewer);CHKERRQ(ierr);
950   } else {
951     SETERRQ(1,1,"Viewer type not supported by PETSc object");
952   }
953   PetscFunctionReturn(0);
954 }
955 
956 /*
957     This has to provide several versions.
958 
959      2) a) use only local smoothing, updating outer values only once.
960         b) local smoothing, updating outer values each inner iteration
961      3) colored smoothing, updating outer values between colors.
962 */
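/*
   Currently only the "local" variants (SOR_LOCAL_FORWARD_SWEEP, SOR_LOCAL_BACKWARD_SWEEP,
   SOR_LOCAL_SYMMETRIC_SWEEP) are implemented: the off-processor values needed by the
   off-diagonal block are scattered once into mat->lvec and held fixed during the sweeps
   over the local rows.  With SOR_ZERO_INITIAL_GUESS the sweep is delegated to the
   sequential relaxation on the diagonal block.  True parallel SOR is not supported.
*/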
963 #undef __FUNC__
964 #define __FUNC__ "MatRelax_MPIAIJ"
965 int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
966                            double fshift,int its,Vec xx)
967 {
968   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
969   Mat        AA = mat->A, BB = mat->B;
970   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
971   Scalar     *b,*x,*xs,*ls,d,*v,sum;
972   int        ierr,*idx, *diag;
973   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
974 
975   PetscFunctionBegin;
976   ierr = VecGetArray(xx,&x); CHKERRQ(ierr);
977   ierr = VecGetArray(bb,&b); CHKERRQ(ierr);
978   ierr = VecGetArray(mat->lvec,&ls);CHKERRQ(ierr);
979   xs = x + shift; /* shift by one for index start of 1 */
980   ls = ls + shift;
981   if (!A->diag) {ierr = MatMarkDiag_SeqAIJ(AA); CHKERRQ(ierr);}
982   diag = A->diag;
983   if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
984     if (flag & SOR_ZERO_INITIAL_GUESS) {
985       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
986       ierr = VecRestoreArray(xx,&x); CHKERRQ(ierr);
987       ierr = VecRestoreArray(bb,&b); CHKERRQ(ierr);
988       ierr = VecRestoreArray(mat->lvec,&ls);CHKERRQ(ierr);
989       PetscFunctionReturn(0);
990     }
991     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
992     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
993     while (its--) {
994       /* go down through the rows */
995       for ( i=0; i<m; i++ ) {
996         n    = A->i[i+1] - A->i[i];
997 	PLogFlops(4*n+3);
998         idx  = A->j + A->i[i] + shift;
999         v    = A->a + A->i[i] + shift;
1000         sum  = b[i];
1001         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1002         d    = fshift + A->a[diag[i]+shift];
1003         n    = B->i[i+1] - B->i[i];
1004         idx  = B->j + B->i[i] + shift;
1005         v    = B->a + B->i[i] + shift;
1006         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1007         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1008       }
1009       /* come up through the rows */
1010       for ( i=m-1; i>-1; i-- ) {
1011         n    = A->i[i+1] - A->i[i];
1012         PLogFlops(4*n+3);
1013         idx  = A->j + A->i[i] + shift;
1014         v    = A->a + A->i[i] + shift;
1015         sum  = b[i];
1016         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1017         d    = fshift + A->a[diag[i]+shift];
1018         n    = B->i[i+1] - B->i[i];
1019         idx  = B->j + B->i[i] + shift;
1020         v    = B->a + B->i[i] + shift;
1021         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1022         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1023       }
1024     }
1025   } else if (flag & SOR_LOCAL_FORWARD_SWEEP){
1026     if (flag & SOR_ZERO_INITIAL_GUESS) {
1027       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
1028       ierr = VecRestoreArray(xx,&x); CHKERRQ(ierr);
1029       ierr = VecRestoreArray(bb,&b); CHKERRQ(ierr);
1030       ierr = VecRestoreArray(mat->lvec,&ls);CHKERRQ(ierr);
1031       PetscFunctionReturn(0);
1032     }
1033     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1034     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1035     while (its--) {
1036       for ( i=0; i<m; i++ ) {
1037         n    = A->i[i+1] - A->i[i];
1038 	PLogFlops(4*n+3);
1039         idx  = A->j + A->i[i] + shift;
1040         v    = A->a + A->i[i] + shift;
1041         sum  = b[i];
1042         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1043         d    = fshift + A->a[diag[i]+shift];
1044         n    = B->i[i+1] - B->i[i];
1045         idx  = B->j + B->i[i] + shift;
1046         v    = B->a + B->i[i] + shift;
1047         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1048         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1049       }
1050     }
1051   } else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
1052     if (flag & SOR_ZERO_INITIAL_GUESS) {
1053       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
1054       ierr = VecRestoreArray(xx,&x); CHKERRQ(ierr);
1055       ierr = VecRestoreArray(bb,&b); CHKERRQ(ierr);
1056       ierr = VecRestoreArray(mat->lvec,&ls);CHKERRQ(ierr);
1057       PetscFunctionReturn(0);
1058     }
1059     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1060     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1061     while (its--) {
1062       for ( i=m-1; i>-1; i-- ) {
1063         n    = A->i[i+1] - A->i[i];
1064 	PLogFlops(4*n+3);
1065         idx  = A->j + A->i[i] + shift;
1066         v    = A->a + A->i[i] + shift;
1067         sum  = b[i];
1068         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1069         d    = fshift + A->a[diag[i]+shift];
1070         n    = B->i[i+1] - B->i[i];
1071         idx  = B->j + B->i[i] + shift;
1072         v    = B->a + B->i[i] + shift;
1073         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1074         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1075       }
1076     }
1077   } else {
1078     SETERRQ(PETSC_ERR_SUP,0,"Parallel SOR not supported");
1079   }
1080   ierr = VecRestoreArray(xx,&x); CHKERRQ(ierr);
1081   ierr = VecRestoreArray(bb,&b); CHKERRQ(ierr);
1082   ierr = VecRestoreArray(mat->lvec,&ls);CHKERRQ(ierr);
1083   PetscFunctionReturn(0);
1084 }
1085 
1086 #undef __FUNC__
1087 #define __FUNC__ "MatGetInfo_MPIAIJ"
1088 int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1089 {
1090   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1091   Mat        A = mat->A, B = mat->B;
1092   int        ierr;
1093   double     isend[5], irecv[5];
1094 
1095   PetscFunctionBegin;
1096   info->block_size     = 1.0;
1097   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1098   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
1099   isend[3] = info->memory;  isend[4] = info->mallocs;
1100   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1101   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
1102   isend[3] += info->memory;  isend[4] += info->mallocs;
1103   if (flag == MAT_LOCAL) {
1104     info->nz_used      = isend[0];
1105     info->nz_allocated = isend[1];
1106     info->nz_unneeded  = isend[2];
1107     info->memory       = isend[3];
1108     info->mallocs      = isend[4];
1109   } else if (flag == MAT_GLOBAL_MAX) {
1110     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);CHKERRQ(ierr);
1111     info->nz_used      = irecv[0];
1112     info->nz_allocated = irecv[1];
1113     info->nz_unneeded  = irecv[2];
1114     info->memory       = irecv[3];
1115     info->mallocs      = irecv[4];
1116   } else if (flag == MAT_GLOBAL_SUM) {
1117     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);CHKERRQ(ierr);
1118     info->nz_used      = irecv[0];
1119     info->nz_allocated = irecv[1];
1120     info->nz_unneeded  = irecv[2];
1121     info->memory       = irecv[3];
1122     info->mallocs      = irecv[4];
1123   }
1124   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1125   info->fill_ratio_needed = 0;
1126   info->factor_mallocs    = 0;
1127   info->rows_global       = (double)mat->M;
1128   info->columns_global    = (double)mat->N;
1129   info->rows_local        = (double)mat->m;
1130   info->columns_local     = (double)mat->N;
1131 
1132   PetscFunctionReturn(0);
1133 }
1134 
1135 #undef __FUNC__
1136 #define __FUNC__ "MatSetOption_MPIAIJ"
1137 int MatSetOption_MPIAIJ(Mat A,MatOption op)
1138 {
1139   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1140   int        ierr;
1141 
1142   PetscFunctionBegin;
1143   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1144       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1145       op == MAT_COLUMNS_UNSORTED ||
1146       op == MAT_COLUMNS_SORTED ||
1147       op == MAT_NEW_NONZERO_ALLOCATION_ERR ||
1148       op == MAT_NEW_NONZERO_LOCATION_ERR) {
1149         ierr = MatSetOption(a->A,op);CHKERRQ(ierr);
1150         ierr = MatSetOption(a->B,op);CHKERRQ(ierr);
1151   } else if (op == MAT_ROW_ORIENTED) {
1152     a->roworiented = 1;
1153     ierr = MatSetOption(a->A,op);CHKERRQ(ierr);
1154     ierr = MatSetOption(a->B,op);CHKERRQ(ierr);
1155   } else if (op == MAT_ROWS_SORTED ||
1156              op == MAT_ROWS_UNSORTED ||
1157              op == MAT_SYMMETRIC ||
1158              op == MAT_STRUCTURALLY_SYMMETRIC ||
1159              op == MAT_YES_NEW_DIAGONALS)
1160     PLogInfo(A,"MatSetOption_MPIAIJ:Option ignored\n");
1161   else if (op == MAT_COLUMN_ORIENTED) {
1162     a->roworiented = 0;
1163     ierr = MatSetOption(a->A,op);CHKERRQ(ierr);
1164     ierr = MatSetOption(a->B,op);CHKERRQ(ierr);
1165   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1166     a->donotstash = 1;
1167   } else if (op == MAT_NO_NEW_DIAGONALS){
1168     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1169   } else {
1170     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1171   }
1172   PetscFunctionReturn(0);
1173 }
1174 
1175 #undef __FUNC__
1176 #define __FUNC__ "MatGetSize_MPIAIJ"
1177 int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
1178 {
1179   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1180 
1181   PetscFunctionBegin;
1182   if (m) *m = mat->M;
1183   if (n) *n = mat->N;
1184   PetscFunctionReturn(0);
1185 }
1186 
1187 #undef __FUNC__
1188 #define __FUNC__ "MatGetLocalSize_MPIAIJ"
1189 int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
1190 {
1191   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1192 
1193   PetscFunctionBegin;
1194   if (m) *m = mat->m;
1195   if (n) *n = mat->n;
1196   PetscFunctionReturn(0);
1197 }
1198 
1199 #undef __FUNC__
1200 #define __FUNC__ "MatGetOwnershipRange_MPIAIJ"
1201 int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
1202 {
1203   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1204 
1205   PetscFunctionBegin;
1206   *m = mat->rstart; *n = mat->rend;
1207   PetscFunctionReturn(0);
1208 }
1209 
1210 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1211 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1212 
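/*
   MatGetRow_MPIAIJ() below merges the requested row from the diagonal block A and the
   off-diagonal block B into a single row with global column indices: B's local column
   indices are mapped through garray, and the entries are interleaved so the returned
   columns remain sorted (B columns below cstart, then the A part, then the remaining
   B columns).
*/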
1213 #undef __FUNC__
1214 #define __FUNC__ "MatGetRow_MPIAIJ"
1215 int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1216 {
1217   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1218   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
1219   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
1220   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
1221   int        *cmap, *idx_p;
1222 
1223   PetscFunctionBegin;
1224   if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
1225   mat->getrowactive = PETSC_TRUE;
1226 
1227   if (!mat->rowvalues && (idx || v)) {
1228     /*
1229         allocate enough space to hold information from the longest row.
1230     */
1231     Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
1232     int     max = 1,m = mat->m,tmp;
1233     for ( i=0; i<m; i++ ) {
1234       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1235       if (max < tmp) { max = tmp; }
1236     }
1237     mat->rowvalues  = (Scalar *) PetscMalloc(max*(sizeof(int)+sizeof(Scalar)));CHKPTRQ(mat->rowvalues);
1238     mat->rowindices = (int *) (mat->rowvalues + max);
1239   }
1240 
1241   if (row < rstart || row >= rend) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Only local rows");
1242   lrow = row - rstart;
1243 
1244   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1245   if (!v)   {pvA = 0; pvB = 0;}
1246   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1247   ierr = (*mat->A->ops->getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1248   ierr = (*mat->B->ops->getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1249   nztot = nzA + nzB;
1250 
1251   cmap  = mat->garray;
1252   if (v  || idx) {
1253     if (nztot) {
1254       /* Sort by increasing column numbers, assuming A and B already sorted */
1255       int imark = -1;
1256       if (v) {
1257         *v = v_p = mat->rowvalues;
1258         for ( i=0; i<nzB; i++ ) {
1259           if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
1260           else break;
1261         }
1262         imark = i;
1263         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1264         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1265       }
1266       if (idx) {
1267         *idx = idx_p = mat->rowindices;
1268         if (imark > -1) {
1269           for ( i=0; i<imark; i++ ) {
1270             idx_p[i] = cmap[cworkB[i]];
1271           }
1272         } else {
1273           for ( i=0; i<nzB; i++ ) {
1274             if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
1275             else break;
1276           }
1277           imark = i;
1278         }
1279         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
1280         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
1281       }
1282     } else {
1283       if (idx) *idx = 0;
1284       if (v)   *v   = 0;
1285     }
1286   }
1287   *nz = nztot;
1288   ierr = (*mat->A->ops->restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1289   ierr = (*mat->B->ops->restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1290   PetscFunctionReturn(0);
1291 }
1292 
1293 #undef __FUNC__
1294 #define __FUNC__ "MatRestoreRow_MPIAIJ"
1295 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1296 {
1297   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1298 
1299   PetscFunctionBegin;
1300   if (aij->getrowactive == PETSC_FALSE) {
1301     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
1302   }
1303   aij->getrowactive = PETSC_FALSE;
1304   PetscFunctionReturn(0);
1305 }
1306 
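/*
   The Frobenius and 1-norms are computed from local contributions of both blocks
   followed by an MPI_Allreduce() (with the 1-norm accumulating per-column absolute
   sums and then taking the maximum); the infinity norm takes the maximum local row
   sum (A row plus B row) and reduces with MPI_MAX.  The 2-norm is not supported in
   parallel.
*/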
1307 #undef __FUNC__
1308 #define __FUNC__ "MatNorm_MPIAIJ"
1309 int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
1310 {
1311   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1312   Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
1313   int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
1314   double     sum = 0.0;
1315   Scalar     *v;
1316 
1317   PetscFunctionBegin;
1318   if (aij->size == 1) {
1319     ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
1320   } else {
1321     if (type == NORM_FROBENIUS) {
1322       v = amat->a;
1323       for (i=0; i<amat->nz; i++ ) {
1324 #if defined(USE_PETSC_COMPLEX)
1325         sum += PetscReal(PetscConj(*v)*(*v)); v++;
1326 #else
1327         sum += (*v)*(*v); v++;
1328 #endif
1329       }
1330       v = bmat->a;
1331       for (i=0; i<bmat->nz; i++ ) {
1332 #if defined(USE_PETSC_COMPLEX)
1333         sum += PetscReal(PetscConj(*v)*(*v)); v++;
1334 #else
1335         sum += (*v)*(*v); v++;
1336 #endif
1337       }
1338       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
1339       *norm = sqrt(*norm);
1340     } else if (type == NORM_1) { /* max column norm */
1341       double *tmp, *tmp2;
1342       int    *jj, *garray = aij->garray;
1343       tmp  = (double *) PetscMalloc( (aij->N+1)*sizeof(double) ); CHKPTRQ(tmp);
1344       tmp2 = (double *) PetscMalloc( (aij->N+1)*sizeof(double) ); CHKPTRQ(tmp2);
1345       PetscMemzero(tmp,aij->N*sizeof(double));
1346       *norm = 0.0;
1347       v = amat->a; jj = amat->j;
1348       for ( j=0; j<amat->nz; j++ ) {
1349         tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
1350       }
1351       v = bmat->a; jj = bmat->j;
1352       for ( j=0; j<bmat->nz; j++ ) {
1353         tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
1354       }
1355       ierr = MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
1356       for ( j=0; j<aij->N; j++ ) {
1357         if (tmp2[j] > *norm) *norm = tmp2[j];
1358       }
1359       PetscFree(tmp); PetscFree(tmp2);
1360     } else if (type == NORM_INFINITY) { /* max row norm */
1361       double ntemp = 0.0;
1362       for ( j=0; j<amat->m; j++ ) {
1363         v = amat->a + amat->i[j] + shift;
1364         sum = 0.0;
1365         for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
1366           sum += PetscAbsScalar(*v); v++;
1367         }
1368         v = bmat->a + bmat->i[j] + shift;
1369         for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
1370           sum += PetscAbsScalar(*v); v++;
1371         }
1372         if (sum > ntemp) ntemp = sum;
1373       }
1374       ierr = MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);CHKERRQ(ierr);
1375     } else {
1376       SETERRQ(PETSC_ERR_SUP,0,"No support for two norm");
1377     }
1378   }
1379   PetscFunctionReturn(0);
1380 }
1381 
1382 #undef __FUNC__
1383 #define __FUNC__ "MatTranspose_MPIAIJ"
1384 int MatTranspose_MPIAIJ(Mat A,Mat *matout)
1385 {
1386   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1387   Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
1388   int        ierr,shift = Aloc->indexshift;
1389   int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
1390   Mat        B;
1391   Scalar     *array;
1392 
1393   PetscFunctionBegin;
1394   if (matout == PETSC_NULL && M != N) {
1395     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
1396   }
1397 
1398   ierr = MatCreateMPIAIJ(A->comm,a->n,a->m,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);CHKERRQ(ierr);
1399 
1400   /* copy over the A part */
1401   Aloc = (Mat_SeqAIJ*) a->A->data;
1402   m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1403   row = a->rstart;
1404   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
1405   for ( i=0; i<m; i++ ) {
1406     ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1407     row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
1408   }
1409   aj = Aloc->j;
1410   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}
1411 
1412   /* copy over the B part */
1413   Aloc = (Mat_SeqAIJ*) a->B->data;
1414   m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1415   row = a->rstart;
1416   ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
1417   for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
1418   for ( i=0; i<m; i++ ) {
1419     ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1420     row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
1421   }
1422   PetscFree(ct);
1423   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1424   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1425   if (matout != PETSC_NULL) {
1426     *matout = B;
1427   } else {
1428     PetscOps       *Abops;
1429     struct _MatOps *Aops;
1430 
1431     /* This isn't really an in-place transpose, but we free the data structures of a */
1432     PetscFree(a->rowners);
1433     ierr = MatDestroy(a->A); CHKERRQ(ierr);
1434     ierr = MatDestroy(a->B); CHKERRQ(ierr);
1435 #if defined (USE_CTABLE)
1436     if (a->colmap) TableDelete(a->colmap);
1437 #else
1438     if (a->colmap) PetscFree(a->colmap);
1439 #endif
1440     if (a->garray) PetscFree(a->garray);
1441     if (a->lvec) VecDestroy(a->lvec);
1442     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
1443     PetscFree(a);
1444 
1445     /*
1446        This is horrible, horrible code. We need to keep the
1447       A pointers for the bops and ops but copy everything
1448       else from B.
1449     */
1450     Abops = A->bops;
1451     Aops  = A->ops;
1452     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1453     A->bops = Abops;
1454     A->ops  = Aops;
1455     PetscHeaderDestroy(B);
1456   }
1457   PetscFunctionReturn(0);
1458 }
1459 
1460 #undef __FUNC__
1461 #define __FUNC__ "MatDiagonalScale_MPIAIJ"
1462 int MatDiagonalScale_MPIAIJ(Mat mat,Vec ll,Vec rr)
1463 {
1464   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1465   Mat a = aij->A, b = aij->B;
1466   int ierr,s1,s2,s3;
1467 
1468   PetscFunctionBegin;
1469   ierr = MatGetLocalSize(mat,&s2,&s3); CHKERRQ(ierr);
1470   if (rr) {
1471     ierr = VecGetLocalSize(rr,&s1);CHKERRQ(ierr);
1472     if (s1!=s3) SETERRQ(PETSC_ERR_ARG_SIZ,0,"right vector non-conforming local size");
1473     /* Overlap communication with computation. */
1474     ierr = VecScatterBegin(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1475   }
1476   if (ll) {
1477     ierr = VecGetLocalSize(ll,&s1);CHKERRQ(ierr);
1478     if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"left vector non-conforming local size");
1479     ierr = (*b->ops->diagonalscale)(b,ll,0); CHKERRQ(ierr);
1480   }
1481   /* scale  the diagonal block */
1482   ierr = (*a->ops->diagonalscale)(a,ll,rr); CHKERRQ(ierr);
1483 
1484   if (rr) {
1485     /* Do a scatter end and then right scale the off-diagonal block */
1486     ierr = VecScatterEnd(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1487     ierr = (*b->ops->diagonalscale)(b,0,aij->lvec); CHKERRQ(ierr);
1488   }
1489 
1490   PetscFunctionReturn(0);
1491 }
1492 
1493 
1494 extern int MatPrintHelp_SeqAIJ(Mat);
1495 #undef __FUNC__
1496 #define __FUNC__ "MatPrintHelp_MPIAIJ"
1497 int MatPrintHelp_MPIAIJ(Mat A)
1498 {
1499   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1500   int        ierr;
1501 
1502   PetscFunctionBegin;
1503   if (!a->rank) {
1504     ierr = MatPrintHelp_SeqAIJ(a->A);CHKERRQ(ierr);
1505   }
1506   PetscFunctionReturn(0);
1507 }
1508 
1509 #undef __FUNC__
1510 #define __FUNC__ "MatGetBlockSize_MPIAIJ"
1511 int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1512 {
1513   PetscFunctionBegin;
1514   *bs = 1;
1515   PetscFunctionReturn(0);
1516 }
1517 #undef __FUNC__
1518 #define __FUNC__ "MatSetUnfactored_MPIAIJ"
1519 int MatSetUnfactored_MPIAIJ(Mat A)
1520 {
1521   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1522   int        ierr;
1523 
1524   PetscFunctionBegin;
1525   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1526   PetscFunctionReturn(0);
1527 }
1528 
1529 #undef __FUNC__
1530 #define __FUNC__ "MatEqual_MPIAIJ"
1531 int MatEqual_MPIAIJ(Mat A, Mat B, PetscTruth *flag)
1532 {
1533   Mat_MPIAIJ *matB = (Mat_MPIAIJ *) B->data,*matA = (Mat_MPIAIJ *) A->data;
1534   Mat        a, b, c, d;
1535   PetscTruth flg;
1536   int        ierr;
1537 
1538   PetscFunctionBegin;
1539   if (B->type != MATMPIAIJ) SETERRQ(PETSC_ERR_ARG_INCOMP,0,"Matrices must be same type");
1540   a = matA->A; b = matA->B;
1541   c = matB->A; d = matB->B;
1542 
1543   ierr = MatEqual(a, c, &flg); CHKERRQ(ierr);
1544   if (flg == PETSC_TRUE) {
1545     ierr = MatEqual(b, d, &flg); CHKERRQ(ierr);
1546   }
1547   ierr = MPI_Allreduce(&flg, flag, 1, MPI_INT, MPI_LAND, A->comm);CHKERRQ(ierr);
1548   PetscFunctionReturn(0);
1549 }
1550 
1551 #undef __FUNC__
1552 #define __FUNC__ "MatCopy_MPIAIJ"
1553 int MatCopy_MPIAIJ(Mat A,Mat B,MatStructure str)
1554 {
1555   int        ierr;
1556   Mat_MPIAIJ *a = (Mat_MPIAIJ *)A->data;
1557   Mat_MPIAIJ *b = (Mat_MPIAIJ *)B->data;
1558 
1559   PetscFunctionBegin;
1560   if (str != SAME_NONZERO_PATTERN || B->type != MATMPIAIJ) {
1561     /* Because of the column compression in the off-processor part of the matrix a->B,
1562        the number of columns in a->B and b->B may differ, hence we cannot call
1563        MatCopy() directly on the two parts. If need be, we could provide a more
1564        efficient copy than MatCopy_Basic() by first uncompressing the a->B matrices
1565        and then copying the submatrices */
1566     ierr = MatCopy_Basic(A,B,str);CHKERRQ(ierr);
1567   } else {
1568     ierr = MatCopy(a->A,b->A,str);CHKERRQ(ierr);
1569     ierr = MatCopy(a->B,b->B,str);CHKERRQ(ierr);
1570   }
1571   PetscFunctionReturn(0);
1572 }
1573 
1574 extern int MatDuplicate_MPIAIJ(Mat,MatDuplicateOption,Mat *);
1575 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1576 extern int MatFDColoringCreate_MPIAIJ(Mat,ISColoring,MatFDColoring);
1577 extern int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatReuse,Mat **);
1578 extern int MatGetSubMatrix_MPIAIJ (Mat ,IS,IS,int,MatReuse,Mat *);
1579 
1580 /* -------------------------------------------------------------------*/
1581 static struct _MatOps MatOps_Values = {MatSetValues_MPIAIJ,
1582        MatGetRow_MPIAIJ,
1583        MatRestoreRow_MPIAIJ,
1584        MatMult_MPIAIJ,
1585        MatMultAdd_MPIAIJ,
1586        MatMultTrans_MPIAIJ,
1587        MatMultTransAdd_MPIAIJ,
1588        0,
1589        0,
1590        0,
1591        0,
1592        0,
1593        0,
1594        MatRelax_MPIAIJ,
1595        MatTranspose_MPIAIJ,
1596        MatGetInfo_MPIAIJ,
1597        MatEqual_MPIAIJ,
1598        MatGetDiagonal_MPIAIJ,
1599        MatDiagonalScale_MPIAIJ,
1600        MatNorm_MPIAIJ,
1601        MatAssemblyBegin_MPIAIJ,
1602        MatAssemblyEnd_MPIAIJ,
1603        0,
1604        MatSetOption_MPIAIJ,
1605        MatZeroEntries_MPIAIJ,
1606        MatZeroRows_MPIAIJ,
1607        0,
1608        0,
1609        0,
1610        0,
1611        MatGetSize_MPIAIJ,
1612        MatGetLocalSize_MPIAIJ,
1613        MatGetOwnershipRange_MPIAIJ,
1614        0,
1615        0,
1616        0,
1617        0,
1618        MatDuplicate_MPIAIJ,
1619        0,
1620        0,
1621        0,
1622        0,
1623        0,
1624        MatGetSubMatrices_MPIAIJ,
1625        MatIncreaseOverlap_MPIAIJ,
1626        MatGetValues_MPIAIJ,
1627        MatCopy_MPIAIJ,
1628        MatPrintHelp_MPIAIJ,
1629        MatScale_MPIAIJ,
1630        0,
1631        0,
1632        0,
1633        MatGetBlockSize_MPIAIJ,
1634        0,
1635        0,
1636        0,
1637        0,
1638        MatFDColoringCreate_MPIAIJ,
1639        0,
1640        MatSetUnfactored_MPIAIJ,
1641        0,
1642        0,
1643        MatGetSubMatrix_MPIAIJ,
1644        0,
1645        0,
1646        MatGetMaps_Petsc};
1647 
1648 /* ----------------------------------------------------------------------------------------*/
1649 
1650 EXTERN_C_BEGIN
1651 #undef __FUNC__
1652 #define __FUNC__ "MatStoreValues_MPIAIJ"
1653 int MatStoreValues_MPIAIJ(Mat mat)
1654 {
1655   Mat_MPIAIJ *aij = (Mat_MPIAIJ *)mat->data;
1656   int        ierr;
1657 
1658   PetscFunctionBegin;
1659   ierr = MatStoreValues(aij->A);CHKERRQ(ierr);
1660   ierr = MatStoreValues(aij->B);CHKERRQ(ierr);
1661   PetscFunctionReturn(0);
1662 }
1663 EXTERN_C_END
1664 
1665 EXTERN_C_BEGIN
1666 #undef __FUNC__
1667 #define __FUNC__ "MatRetrieveValues_MPIAIJ"
1668 int MatRetrieveValues_MPIAIJ(Mat mat)
1669 {
1670   Mat_MPIAIJ *aij = (Mat_MPIAIJ *)mat->data;
1671   int        ierr;
1672 
1673   PetscFunctionBegin;
1674   ierr = MatRetrieveValues(aij->A);CHKERRQ(ierr);
1675   ierr = MatRetrieveValues(aij->B);CHKERRQ(ierr);
1676   PetscFunctionReturn(0);
1677 }
1678 EXTERN_C_END
1679 
1680 #include "pc.h"
1681 EXTERN_C_BEGIN
1682 extern int MatGetDiagonalBlock_MPIAIJ(Mat,PetscTruth *,MatReuse,Mat *);
1683 EXTERN_C_END
1684 
1685 #undef __FUNC__
1686 #define __FUNC__ "MatCreateMPIAIJ"
1687 /*@C
1688    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1689    (the default parallel PETSc format).  For good matrix assembly performance
1690    the user should preallocate the matrix storage by setting the parameters
1691    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1692    performance can be increased by more than a factor of 50.
1693 
1694    Collective on MPI_Comm
1695 
1696    Input Parameters:
1697 +  comm - MPI communicator
1698 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1699            This value should be the same as the local size used in creating the
1700            y vector for the matrix-vector product y = Ax.
1701 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1702        This value should be the same as the local size used in creating the
1703        x vector for the matrix-vector product y = Ax. For square matrices n is almost always m.
1704 .  M - number of global rows (or PETSC_DETERMINE to have it calculated if m is given)
1705 .  N - number of global columns (or PETSC_DETERMINE to have it calculated if n is given)
1706 .  d_nz  - number of nonzeros per row in DIAGONAL portion of local submatrix
1707            (same value is used for all local rows)
1708 .  d_nnz - array containing the number of nonzeros in the various rows of the
1709            DIAGONAL portion of the local submatrix (possibly different for each row)
1710            or PETSC_NULL, if d_nz is used to specify the nonzero structure.
1711            The size of this array is equal to the number of local rows, i.e. 'm'.
1712            You must leave room for the diagonal entry even if it is zero.
1713 .  o_nz  - number of nonzeros per row in the OFF-DIAGONAL portion of local
1714            submatrix (same value is used for all local rows).
1715 -  o_nnz - array containing the number of nonzeros in the various rows of the
1716            OFF-DIAGONAL portion of the local submatrix (possibly different for
1717            each row) or PETSC_NULL, if o_nz is used to specify the nonzero
1718            structure. The size of this array is equal to the number
1719            of local rows, i.e. 'm'.
1720 
1721    Output Parameter:
1722 .  A - the matrix
1723 
1724    Notes:
1725    m,n,M,N parameters specify the size of the matrix, and its partitioning across
1726    processors, while d_nz,d_nnz,o_nz,o_nnz parameters specify the approximate
1727    storage requirements for this matrix.
1728 
1729    If PETSC_DECIDE or PETSC_DETERMINE is used for a particular argument on one
1730    processor then it must be used on all processors that share the object for
1731    that argument.
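
   For instance, a minimal call might let PETSc pick the local sizes from the
   global ones (a sketch only, not code from this file; the uniform
   preallocation values d_nz=5 and o_nz=2 are arbitrary illustrative choices):
.vb
     Mat A;
     ierr = MatCreateMPIAIJ(comm,PETSC_DECIDE,PETSC_DECIDE,M,N,5,PETSC_NULL,
                            2,PETSC_NULL,&A);CHKERRQ(ierr);
.ve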
1732 
1733    The AIJ format (also called the Yale sparse matrix format or
1734    compressed row storage), is fully compatible with standard Fortran 77
1735    storage.  That is, the stored row and column indices can begin at
1736    either one (as in Fortran) or zero.  See the users manual for details.
1737 
1738    The user MUST specify either the local or global matrix dimensions
1739    (possibly both).
1740 
1741    The parallel matrix is partitioned such that the first m0 rows belong to
1742    process 0, the next m1 rows belong to process 1, the next m2 rows belong
1743    to process 2, etc., where m0,m1,m2,... are given by the input parameter 'm'.
1744 
1745    The DIAGONAL portion of the local submatrix of a processor can be defined
1746    as the submatrix which is obtained by extracting the part corresponding
1747    to the rows r1-r2 and columns r1-r2 of the global matrix, where r1 is the
1748    first row that belongs to the processor, and r2 is the last row belonging
1749    to this processor. This is a square mxm matrix. The remaining portion
1750    of the local submatrix (mxN) constitutes the OFF-DIAGONAL portion.
1751 
1752    If d_nnz and o_nnz are specified, then d_nz and o_nz are ignored.
1753 
1754    By default, this format uses inodes (identical nodes) when possible.
1755    We search for consecutive rows with the same nonzero structure, thereby
1756    reusing matrix information to achieve increased efficiency.
1757 
1758    Options Database Keys:
1759 +  -mat_aij_no_inode  - Do not use inodes
1760 .  -mat_aij_inode_limit <limit> - Sets inode limit (max limit=5)
1761 .  -mat_aij_oneindex - Internally use indexing starting at 1
1762         rather than 0.  Note that when calling MatSetValues(),
1763         the user still MUST index entries starting at 0!
1766 -   -mat_mpi - use the parallel matrix data structures even on one processor
1767                (defaults to using SeqAIJ format on one processor)
1768 
1769 
1770    Example usage:
1771 
1772    Consider the following 8x8 matrix with 34 non-zero values, that is
1773    assembled across 3 processors. Let's assume that proc0 owns 3 rows,
1774    proc1 owns 3 rows, proc2 owns 2 rows. This division can be shown
1775    as follows:
1776 
1777 .vb
1778             1  2  0  |  0  3  0  |  0  4
1779     Proc0   0  5  6  |  7  0  0  |  8  0
1780             9  0 10  | 11  0  0  | 12  0
1781     -------------------------------------
1782            13  0 14  | 15 16 17  |  0  0
1783     Proc1   0 18  0  | 19 20 21  |  0  0
1784             0  0  0  | 22 23  0  | 24  0
1785     -------------------------------------
1786     Proc2  25 26 27  |  0  0 28  | 29  0
1787            30  0  0  | 31 32 33  |  0 34
1788 .ve
1789 
1790    This can be represented as a collection of submatrices as:
1791 
1792 .vb
1793       A B C
1794       D E F
1795       G H I
1796 .ve
1797 
1798    Where the submatrices A,B,C are owned by proc0, D,E,F are
1799    owned by proc1, G,H,I are owned by proc2.
1800 
1801    The 'm' parameters for proc0,proc1,proc2 are 3,3,2 respectively.
1802    The 'n' parameters for proc0,proc1,proc2 are 3,3,2 respectively.
1803    The 'M','N' parameters are 8,8, and have the same values on all procs.
1804 
1805    The DIAGONAL submatrices corresponding to proc0,proc1,proc2 are
1806    submatrices [A], [E], [I] respectively. The OFF-DIAGONAL submatrices
1807    corresponding to proc0,proc1,proc2 are [BC], [DF], [GH] respectively.
1808    Internally, each processor stores the DIAGONAL part, and the OFF-DIAGONAL
1809    part as SeqAIJ matrices. For example, proc1 will store [E] as a SeqAIJ
1810    matrix, and [DF] as another SeqAIJ matrix.
1811 
1812    When d_nz, o_nz parameters are specified, d_nz storage elements are
1813    allocated for every row of the local diagonal submatrix, and o_nz
1814    storage locations are allocated for every row of the OFF-DIAGONAL submatrix.
1815    One way to choose d_nz and o_nz is to use the maximum number of nonzeros over
1816    the local rows for each of the local DIAGONAL and OFF-DIAGONAL submatrices.
1817    In this case, the values of d_nz,o_nz are:
1818 .vb
1819      proc0 : d_nz = 2, o_nz = 2
1820      proc1 : d_nz = 3, o_nz = 2
1821      proc2 : d_nz = 1, o_nz = 4
1822 .ve
1823    We are allocating m*(d_nz+o_nz) storage locations for every proc. This
1824    translates to 3*(2+2)=12 for proc0, 3*(3+2)=15 for proc1, 2*(1+4)=10
1825    for proc2, i.e. we are using 12+15+10=37 storage locations to store
1826    34 values.
1827 
1828    When d_nnz, o_nnz parameters are specified, the storage is specified
1829    for every row, corresponding to both DIAGONAL and OFF-DIAGONAL submatrices.
1830    In the above case the values for d_nnz,o_nnz are:
1831 .vb
1832      proc0: d_nnz = [2,2,2] and o_nnz = [2,2,2]
1833      proc1: d_nnz = [3,3,2] and o_nnz = [2,1,1]
1834      proc2: d_nnz = [1,1]   and o_nnz = [4,4]
1835 .ve
1836    Here the space allocated is the sum of all the above values, i.e. 34, and
1837    hence the preallocation is perfect.
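
   For instance, proc1 in the layout above could create its part of the matrix
   as sketched below (an illustration only, not code from this file; the array
   values are taken from the worked example and error checking is abbreviated):
.vb
     int d_nnz[3] = {3,3,2}, o_nnz[3] = {2,1,1};
     Mat A;
     ierr = MatCreateMPIAIJ(comm,3,3,8,8,0,d_nnz,0,o_nnz,&A);CHKERRQ(ierr);
     /* ... MatSetValues() on the locally owned rows, on every process ... */
     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
.ve
   The call is collective, so the other processes make the same call with their
   own local sizes and d_nnz/o_nnz arrays.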
1838 
1839    Level: intermediate
1840 
1841 .keywords: matrix, aij, compressed row, sparse, parallel
1842 
1843 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1844 @*/
1845 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1846 {
1847   Mat          B;
1848   Mat_MPIAIJ   *b;
1849   int          ierr, i,size,flag1 = 0, flag2 = 0;
1850 
1851   PetscFunctionBegin;
1852   MPI_Comm_size(comm,&size);
1853   ierr = OptionsHasName(PETSC_NULL,"-mat_mpiaij",&flag1); CHKERRQ(ierr);
1854   ierr = OptionsHasName(PETSC_NULL,"-mat_mpi",&flag2); CHKERRQ(ierr);
1855   if (!flag1 && !flag2 && size == 1) {
1856     if (M == PETSC_DECIDE) M = m;
1857     if (N == PETSC_DECIDE) N = n;
1858     ierr = MatCreateSeqAIJ(comm,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
1859     PetscFunctionReturn(0);
1860   }
1861 
1862   *A = 0;
1863   PetscHeaderCreate(B,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIAIJ,"Mat",comm,MatDestroy,MatView);
1864   PLogObjectCreate(B);
1865   B->data            = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
1866   PetscMemzero(b,sizeof(Mat_MPIAIJ));
1867   PetscMemcpy(B->ops,&MatOps_Values,sizeof(struct _MatOps));
1868   B->ops->destroy    = MatDestroy_MPIAIJ;
1869   B->ops->view       = MatView_MPIAIJ;
1870   B->factor          = 0;
1871   B->assembled       = PETSC_FALSE;
1872   B->mapping         = 0;
1873 
1874   B->insertmode      = NOT_SET_VALUES;
1875   b->size            = size;
1876   MPI_Comm_rank(comm,&b->rank);
1877 
1878   if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
1879     SETERRQ(PETSC_ERR_ARG_WRONG,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1880   }
1881 
1882   ierr = PetscSplitOwnership(comm,&m,&M);CHKERRQ(ierr);
1883   ierr = PetscSplitOwnership(comm,&n,&N);CHKERRQ(ierr);
1884   b->m = m; B->m = m;
1885   b->n = n; B->n = n;
1886   b->N = N; B->N = N;
1887   b->M = M; B->M = M;
1888 
1889   /* the information in the maps duplicates the information computed below, eventually
1890      we should remove the duplicate information that is not contained in the maps */
1891   ierr = MapCreateMPI(comm,m,M,&B->rmap);CHKERRQ(ierr);
1892   ierr = MapCreateMPI(comm,n,N,&B->cmap);CHKERRQ(ierr);
1893 
1894   /* build local table of row and column ownerships */
1895   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1896   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIAIJ));
1897   b->cowners = b->rowners + b->size + 2;
1898   ierr = MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1899   b->rowners[0] = 0;
1900   for ( i=2; i<=b->size; i++ ) {
1901     b->rowners[i] += b->rowners[i-1];
1902   }
1903   b->rstart = b->rowners[b->rank];
1904   b->rend   = b->rowners[b->rank+1];
1905   ierr = MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1906   b->cowners[0] = 0;
1907   for ( i=2; i<=b->size; i++ ) {
1908     b->cowners[i] += b->cowners[i-1];
1909   }
1910   b->cstart = b->cowners[b->rank];
1911   b->cend   = b->cowners[b->rank+1];
1912 
1913   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1914   ierr = MatCreateSeqAIJ(PETSC_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1915   PLogObjectParent(B,b->A);
1916   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1917   ierr = MatCreateSeqAIJ(PETSC_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1918   PLogObjectParent(B,b->B);
1919 
1920   /* build cache for off array entries formed */
1921   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1922   b->donotstash  = 0;
1923   b->colmap      = 0;
1924   b->garray      = 0;
1925   b->roworiented = 1;
1926 
1927   /* stuff used for matrix vector multiply */
1928   b->lvec      = 0;
1929   b->Mvctx     = 0;
1930 
1931   /* stuff for MatGetRow() */
1932   b->rowindices   = 0;
1933   b->rowvalues    = 0;
1934   b->getrowactive = PETSC_FALSE;
1935 
1936   ierr = PetscObjectComposeFunction((PetscObject)B,"MatStoreValues_C",
1937                                      "MatStoreValues_MPIAIJ",
1938                                      (void*)MatStoreValues_MPIAIJ);CHKERRQ(ierr);
1939   ierr = PetscObjectComposeFunction((PetscObject)B,"MatRetrieveValues_C",
1940                                      "MatRetrieveValues_MPIAIJ",
1941                                      (void*)MatRetrieveValues_MPIAIJ);CHKERRQ(ierr);
1942   ierr = PetscObjectComposeFunction((PetscObject)B,"MatGetDiagonalBlock_C",
1943                                      "MatGetDiagonalBlock_MPIAIJ",
1944                                      (void*)MatGetDiagonalBlock_MPIAIJ);CHKERRQ(ierr);
1945   *A = B;
1946   PetscFunctionReturn(0);
1947 }
1948 
1949 #undef __FUNC__
1950 #define __FUNC__ "MatDuplicate_MPIAIJ"
1951 int MatDuplicate_MPIAIJ(Mat matin,MatDuplicateOption cpvalues,Mat *newmat)
1952 {
1953   Mat        mat;
1954   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1955   int        ierr, len=0, flg;
1956 
1957   PetscFunctionBegin;
1958   *newmat       = 0;
1959   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIAIJ,"Mat",matin->comm,MatDestroy,MatView);
1960   PLogObjectCreate(mat);
1961   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1962   PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));
1963   mat->ops->destroy = MatDestroy_MPIAIJ;
1964   mat->ops->view    = MatView_MPIAIJ;
1965   mat->factor       = matin->factor;
1966   mat->assembled    = PETSC_TRUE;
1967   mat->insertmode   = NOT_SET_VALUES;
1968 
1969   a->m = mat->m   = oldmat->m;
1970   a->n = mat->n   = oldmat->n;
1971   a->M = mat->M   = oldmat->M;
1972   a->N = mat->N   = oldmat->N;
1973 
1974   a->rstart       = oldmat->rstart;
1975   a->rend         = oldmat->rend;
1976   a->cstart       = oldmat->cstart;
1977   a->cend         = oldmat->cend;
1978   a->size         = oldmat->size;
1979   a->rank         = oldmat->rank;
1980   a->donotstash   = oldmat->donotstash;
1981   a->roworiented  = oldmat->roworiented;
1982   a->rowindices   = 0;
1983   a->rowvalues    = 0;
1984   a->getrowactive = PETSC_FALSE;
1985 
1986   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1987   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIAIJ));
1988   a->cowners = a->rowners + a->size + 2;
1989   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1990   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1991   if (oldmat->colmap) {
1992 #if defined (USE_CTABLE)
1993     ierr = TableCreateCopy(oldmat->colmap,&a->colmap); CHKERRQ(ierr);
1994 #else
1995     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1996     PLogObjectMemory(mat,(a->N)*sizeof(int));
1997     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1998 #endif
1999   } else a->colmap = 0;
2000   if (oldmat->garray) {
2001     len = ((Mat_SeqAIJ *) (oldmat->B->data))->n;
2002     a->garray = (int *) PetscMalloc((len+1)*sizeof(int)); CHKPTRQ(a->garray);
2003     PLogObjectMemory(mat,len*sizeof(int));
2004     if (len) PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
2005   } else a->garray = 0;
2006 
2007   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
2008   PLogObjectParent(mat,a->lvec);
2009   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
2010   PLogObjectParent(mat,a->Mvctx);
2011   ierr =  MatDuplicate(oldmat->A,cpvalues,&a->A); CHKERRQ(ierr);
2012   PLogObjectParent(mat,a->A);
2013   ierr =  MatDuplicate(oldmat->B,cpvalues,&a->B); CHKERRQ(ierr);
2014   PLogObjectParent(mat,a->B);
2015   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
2016   if (flg) {
2017     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
2018   }
2019   ierr = FListDuplicate(matin->qlist,&mat->qlist);CHKERRQ(ierr);
2020   *newmat = mat;
2021   PetscFunctionReturn(0);
2022 }
2023 
2024 #include "sys.h"
2025 
2026 #undef __FUNC__
2027 #define __FUNC__ "MatLoad_MPIAIJ"
2028 int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
2029 {
2030   Mat          A;
2031   Scalar       *vals,*svals;
2032   MPI_Comm     comm = ((PetscObject)viewer)->comm;
2033   MPI_Status   status;
2034   int          i, nz, ierr, j,rstart, rend, fd;
2035   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
2036   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
2037   int          tag = ((PetscObject)viewer)->tag,cend,cstart,n;
2038 
2039   PetscFunctionBegin;
2040   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
2041   if (!rank) {
2042     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
2043     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
2044     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
2045     if (header[3] < 0) {
2046       SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix in special format on disk, cannot load as MPIAIJ");
2047     }
2048   }
2049 
2050   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
2051   M = header[1]; N = header[2];
2052   /* determine ownership of all rows */
2053   m = M/size + ((M % size) > rank);
2054   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
2055   ierr = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
2056   rowners[0] = 0;
2057   for ( i=2; i<=size; i++ ) {
2058     rowners[i] += rowners[i-1];
2059   }
2060   rstart = rowners[rank];
2061   rend   = rowners[rank+1];
2062 
2063   /* distribute row lengths to all processors */
2064   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
2065   offlens = ourlens + (rend-rstart);
2066   if (!rank) {
2067     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
2068     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
2069     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
2070     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
2071     ierr = MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);CHKERRQ(ierr);
2072     PetscFree(sndcounts);
2073   } else {
2074     ierr = MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);CHKERRQ(ierr);
2075   }
2076 
2077   if (!rank) {
2078     /* calculate the number of nonzeros on each processor */
2079     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
2080     PetscMemzero(procsnz,size*sizeof(int));
2081     for ( i=0; i<size; i++ ) {
2082       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
2083         procsnz[i] += rowlengths[j];
2084       }
2085     }
2086     PetscFree(rowlengths);
2087 
2088     /* determine max buffer needed and allocate it */
2089     maxnz = 0;
2090     for ( i=0; i<size; i++ ) {
2091       maxnz = PetscMax(maxnz,procsnz[i]);
2092     }
2093     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
2094 
2095     /* read in my part of the matrix column indices  */
2096     nz     = procsnz[0];
2097     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
2098     ierr   = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
2099 
2100     /* read in everyone else's parts and ship them off */
2101     for ( i=1; i<size; i++ ) {
2102       nz   = procsnz[i];
2103       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
2104       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
2105     }
2106     PetscFree(cols);
2107   } else {
2108     /* determine buffer space needed for message */
2109     nz = 0;
2110     for ( i=0; i<m; i++ ) {
2111       nz += ourlens[i];
2112     }
2113     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
2114 
2115     /* receive message of column indices */
2116     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
2117     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
2118     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2119   }
2120 
2121   /* determine column ownership if matrix is not square */
2122   if (N != M) {
2123     n      = N/size + ((N % size) > rank);
2124     ierr   = MPI_Scan(&n,&cend,1,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
2125     cstart = cend - n;
2126   } else {
2127     cstart = rstart;
2128     cend   = rend;
2129     n      = cend - cstart;
2130   }
2131 
2132   /* loop over local rows, determining number of off diagonal entries */
2133   PetscMemzero(offlens,m*sizeof(int));
2134   jj = 0;
2135   for ( i=0; i<m; i++ ) {
2136     for ( j=0; j<ourlens[i]; j++ ) {
2137       if (mycols[jj] < cstart || mycols[jj] >= cend) offlens[i]++;
2138       jj++;
2139     }
2140   }
2141 
2142   /* create our matrix */
2143   for ( i=0; i<m; i++ ) {
2144     ourlens[i] -= offlens[i];
2145   }
2146   ierr = MatCreateMPIAIJ(comm,m,n,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
2147   A = *newmat;
2148   ierr = MatSetOption(A,MAT_COLUMNS_SORTED); CHKERRQ(ierr);
2149   for ( i=0; i<m; i++ ) {
2150     ourlens[i] += offlens[i];
2151   }
2152 
2153   if (!rank) {
2154     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
2155 
2156     /* read in my part of the matrix numerical values  */
2157     nz   = procsnz[0];
2158     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2159 
2160     /* insert into matrix */
2161     jj      = rstart;
2162     smycols = mycols;
2163     svals   = vals;
2164     for ( i=0; i<m; i++ ) {
2165       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
2166       smycols += ourlens[i];
2167       svals   += ourlens[i];
2168       jj++;
2169     }
2170 
2171     /* read in the other processors' values and ship them out */
2172     for ( i=1; i<size; i++ ) {
2173       nz   = procsnz[i];
2174       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2175       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
2176     }
2177     PetscFree(procsnz);
2178   } else {
2179     /* receive numeric values */
2180     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
2181 
2182     /* receive message of values */
2183     ierr = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
2184     ierr = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
2185     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2186 
2187     /* insert into matrix */
2188     jj      = rstart;
2189     smycols = mycols;
2190     svals   = vals;
2191     for ( i=0; i<m; i++ ) {
2192       ierr     = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
2193       smycols += ourlens[i];
2194       svals   += ourlens[i];
2195       jj++;
2196     }
2197   }
2198   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
2199 
2200   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2201   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2202   PetscFunctionReturn(0);
2203 }
2204 
2205 #undef __FUNC__
2206 #define __FUNC__ "MatGetSubMatrix_MPIAIJ"
2207 /*
2208     Not great since it makes two copies of the submatrix: first a SeqAIJ
2209   matrix on each process, and then the final result formed by concatenating
2210   the local matrices. Writing it directly would be much like MatGetSubMatrices_MPIAIJ().
2211 */
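/*
   A usage sketch (an illustration only, not code from this file): a caller
   typically reaches this routine through the generic MatGetSubMatrix()
   interface, e.g. extracting a block of rows and columns described by index
   sets; isrow/iscol and the values nlocal and first below are hypothetical
   placeholders supplied by the caller.

      IS  isrow,iscol;
      Mat sub;
      ierr = ISCreateStride(comm,nlocal,first,1,&isrow);CHKERRQ(ierr);
      ierr = ISCreateStride(comm,nlocal,first,1,&iscol);CHKERRQ(ierr);
      ierr = MatGetSubMatrix(mat,isrow,iscol,PETSC_DECIDE,MAT_INITIAL_MATRIX,&sub);CHKERRQ(ierr);
      ierr = ISDestroy(isrow);CHKERRQ(ierr);
      ierr = ISDestroy(iscol);CHKERRQ(ierr);
*/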
2212 int MatGetSubMatrix_MPIAIJ(Mat mat,IS isrow,IS iscol,int csize,MatReuse call,Mat *newmat)
2213 {
2214   int        ierr, i, m,n,rstart,row,rend,nz,*cwork,size,rank,j;
2215   Mat        *local,M, Mreuse;
2216   Scalar     *vwork,*aa;
2217   MPI_Comm   comm = mat->comm;
2218   Mat_SeqAIJ *aij;
2219   int        *ii, *jj,nlocal,*dlens,*olens,dlen,olen,jend;
2220 
2221   PetscFunctionBegin;
2222   MPI_Comm_rank(comm,&rank);
2223   MPI_Comm_size(comm,&size);
2224 
2225   if (call ==  MAT_REUSE_MATRIX) {
2226     ierr = PetscObjectQuery((PetscObject)*newmat,"SubMatrix",(PetscObject *)&Mreuse);CHKERRQ(ierr);
2227     if (!Mreuse) SETERRQ(1,1,"Submatrix passed in was not used before, cannot reuse");
2228     local = &Mreuse;
2229     ierr  = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_REUSE_MATRIX,&local);CHKERRQ(ierr);
2230   } else {
2231     ierr = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_INITIAL_MATRIX,&local);CHKERRQ(ierr);
2232     Mreuse = *local;
2233     PetscFree(local);
2234   }
2235 
2236   /*
2237       m - number of local rows
2238       n - number of columns (same on all processors)
2239       rstart - first row in new global matrix generated
2240   */
2241   ierr = MatGetSize(Mreuse,&m,&n);CHKERRQ(ierr);
2242   if (call == MAT_INITIAL_MATRIX) {
2243     aij = (Mat_SeqAIJ *) (Mreuse)->data;
2244     if (aij->indexshift) SETERRQ(PETSC_ERR_SUP,1,"No support for index shifted matrix");
2245     ii  = aij->i;
2246     jj  = aij->j;
2247 
2248     /*
2249         Determine the number of non-zeros in the diagonal and off-diagonal
2250         portions of the matrix in order to do correct preallocation
2251     */
2252 
2253     /* first get start and end of "diagonal" columns */
2254     if (csize == PETSC_DECIDE) {
2255       nlocal = n/size + ((n % size) > rank);
2256     } else {
2257       nlocal = csize;
2258     }
2259     ierr   = MPI_Scan(&nlocal,&rend,1,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
2260     rstart = rend - nlocal;
2261     if (rank == size - 1 && rend != n) {
2262       SETERRQ(1,1,"Local column sizes do not add up to total number of columns");
2263     }
2264 
2265     /* next, compute all the lengths */
2266     dlens = (int *) PetscMalloc( (2*m+1)*sizeof(int) );CHKPTRQ(dlens);
2267     olens = dlens + m;
2268     for ( i=0; i<m; i++ ) {
2269       jend = ii[i+1] - ii[i];
2270       olen = 0;
2271       dlen = 0;
2272       for ( j=0; j<jend; j++ ) {
2273         if ( *jj < rstart || *jj >= rend) olen++;
2274         else dlen++;
2275         jj++;
2276       }
2277       olens[i] = olen;
2278       dlens[i] = dlen;
2279     }
2280     ierr = MatCreateMPIAIJ(comm,m,nlocal,PETSC_DECIDE,n,0,dlens,0,olens,&M);CHKERRQ(ierr);
2281     PetscFree(dlens);
2282   } else {
2283     int ml,nl;
2284 
2285     M = *newmat;
2286     ierr = MatGetLocalSize(M,&ml,&nl);CHKERRQ(ierr);
2287     if (ml != m) SETERRQ(PETSC_ERR_ARG_SIZ,1,"Previous matrix must be same size/layout as request");
2288     ierr = MatZeroEntries(M);CHKERRQ(ierr);
2289     /*
2290          The next two lines are needed so we may call MatSetValues_MPIAIJ() below directly,
2291        rather than the slower MatSetValues().
2292     */
2293     M->was_assembled = PETSC_TRUE;
2294     M->assembled     = PETSC_FALSE;
2295   }
2296   ierr = MatGetOwnershipRange(M,&rstart,&rend); CHKERRQ(ierr);
2297   aij = (Mat_SeqAIJ *) (Mreuse)->data;
2298   if (aij->indexshift) SETERRQ(PETSC_ERR_SUP,1,"No support for index shifted matrix");
2299   ii  = aij->i;
2300   jj  = aij->j;
2301   aa  = aij->a;
2302   for (i=0; i<m; i++) {
2303     row   = rstart + i;
2304     nz    = ii[i+1] - ii[i];
2305     cwork = jj;     jj += nz;
2306     vwork = aa;     aa += nz;
2307     ierr = MatSetValues_MPIAIJ(M,1,&row,nz,cwork,vwork,INSERT_VALUES); CHKERRQ(ierr);
2308   }
2309 
2310   ierr = MatAssemblyBegin(M,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2311   ierr = MatAssemblyEnd(M,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2312   *newmat = M;
2313 
2314   /* save submatrix used in processor for next request */
2315   if (call ==  MAT_INITIAL_MATRIX) {
2316     ierr = PetscObjectCompose((PetscObject)M,"SubMatrix",(PetscObject)Mreuse);CHKERRQ(ierr);
2317     ierr = PetscObjectDereference((PetscObject)Mreuse);CHKERRQ(ierr);
2318   }
2319 
2320   PetscFunctionReturn(0);
2321 }
2322