xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision b5bd3ad58742d10278d184b667b60ae1619f8109)
1 
2 #ifdef PETSC_RCS_HEADER
3 static char vcid[] = "$Id: mpiaij.c,v 1.244 1998/05/11 21:53:56 bsmith Exp bsmith $";
4 #endif
5 
6 #include "pinclude/pviewer.h"
7 #include "src/mat/impls/aij/mpi/mpiaij.h"
8 #include "src/vec/vecimpl.h"
9 #include "src/inline/spops.h"
10 
11 /* local utility routine that creates a mapping from the global column
12 number to the local number in the off-diagonal part of the local
13 storage of the matrix.  This is done in a non scable way since the
14 length of colmap equals the global matrix length.
15 */
16 #undef __FUNC__
17 #define __FUNC__ "CreateColmap_MPIAIJ_Private"
18 int CreateColmap_MPIAIJ_Private(Mat mat)
19 {
20   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
21   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
22   int        n = B->n,i;
23 
24   PetscFunctionBegin;
25   aij->colmap = (int *) PetscMalloc((aij->N+1)*sizeof(int));CHKPTRQ(aij->colmap);
26   PLogObjectMemory(mat,aij->N*sizeof(int));
27   PetscMemzero(aij->colmap,aij->N*sizeof(int));
28   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
29   PetscFunctionReturn(0);
30 }
31 
32 extern int DisAssemble_MPIAIJ(Mat);
33 
34 #define CHUNKSIZE   15
35 #define MatSetValues_SeqAIJ_A_Private(row,col,value,addv) \
36 { \
37  \
38     rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
39     rmax = aimax[row]; nrow = ailen[row];  \
40     col1 = col - shift; \
41      \
42     low = 0; high = nrow; \
43     while (high-low > 5) { \
44       t = (low+high)/2; \
45       if (rp[t] > col) high = t; \
46       else             low  = t; \
47     } \
48       for ( _i=0; _i<nrow; _i++ ) { \
49         if (rp[_i] > col1) break; \
50         if (rp[_i] == col1) { \
51           if (addv == ADD_VALUES) ap[_i] += value;   \
52           else                  ap[_i] = value; \
53           goto a_noinsert; \
54         } \
55       }  \
56       if (nonew == 1) goto a_noinsert; \
57       else if (nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
58       if (nrow >= rmax) { \
59         /* there is no extra room in row, therefore enlarge */ \
60         int    new_nz = ai[a->m] + CHUNKSIZE,len,*new_i,*new_j; \
61         Scalar *new_a; \
62  \
63         if (nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
64  \
65         /* malloc new storage space */ \
66         len     = new_nz*(sizeof(int)+sizeof(Scalar))+(a->m+1)*sizeof(int); \
67         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
68         new_j   = (int *) (new_a + new_nz); \
69         new_i   = new_j + new_nz; \
70  \
71         /* copy over old data into new slots */ \
72         for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = ai[ii];} \
73         for ( ii=row+1; ii<a->m+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
74         PetscMemcpy(new_j,aj,(ai[row]+nrow+shift)*sizeof(int)); \
75         len = (new_nz - CHUNKSIZE - ai[row] - nrow - shift); \
76         PetscMemcpy(new_j+ai[row]+shift+nrow+CHUNKSIZE,aj+ai[row]+shift+nrow, \
77                                                            len*sizeof(int)); \
78         PetscMemcpy(new_a,aa,(ai[row]+nrow+shift)*sizeof(Scalar)); \
79         PetscMemcpy(new_a+ai[row]+shift+nrow+CHUNKSIZE,aa+ai[row]+shift+nrow, \
80                                                            len*sizeof(Scalar));  \
81         /* free up old matrix storage */ \
82  \
83         PetscFree(a->a);  \
84         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
85         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
86         a->singlemalloc = 1; \
87  \
88         rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
89         rmax = aimax[row] = aimax[row] + CHUNKSIZE; \
90         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
91         a->maxnz += CHUNKSIZE; \
92         a->reallocs++; \
93       } \
94       N = nrow++ - 1; a->nz++; \
95       /* shift up all the later entries in this row */ \
96       for ( ii=N; ii>=_i; ii-- ) { \
97         rp[ii+1] = rp[ii]; \
98         ap[ii+1] = ap[ii]; \
99       } \
100       rp[_i] = col1;  \
101       ap[_i] = value;  \
102       a_noinsert: ; \
103       ailen[row] = nrow; \
104 }
105 
106 #define MatSetValues_SeqAIJ_B_Private(row,col,value,addv) \
107 { \
108  \
109     rp   = bj + bi[row] + shift; ap = ba + bi[row] + shift; \
110     rmax = bimax[row]; nrow = bilen[row];  \
111     col1 = col - shift; \
112      \
113     low = 0; high = nrow; \
114     while (high-low > 5) { \
115       t = (low+high)/2; \
116       if (rp[t] > col) high = t; \
117       else             low  = t; \
118     } \
119        for ( _i=0; _i<nrow; _i++ ) { \
120         if (rp[_i] > col1) break; \
121         if (rp[_i] == col1) { \
122           if (addv == ADD_VALUES) ap[_i] += value;   \
123           else                  ap[_i] = value; \
124           goto b_noinsert; \
125         } \
126       }  \
127       if (nonew == 1) goto b_noinsert; \
128       else if (nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
129       if (nrow >= rmax) { \
130         /* there is no extra room in row, therefore enlarge */ \
131         int    new_nz = bi[b->m] + CHUNKSIZE,len,*new_i,*new_j; \
132         Scalar *new_a; \
133  \
134         if (nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
135  \
136         /* malloc new storage space */ \
137         len     = new_nz*(sizeof(int)+sizeof(Scalar))+(b->m+1)*sizeof(int); \
138         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
139         new_j   = (int *) (new_a + new_nz); \
140         new_i   = new_j + new_nz; \
141  \
142         /* copy over old data into new slots */ \
143         for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = bi[ii];} \
144         for ( ii=row+1; ii<b->m+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
145         PetscMemcpy(new_j,bj,(bi[row]+nrow+shift)*sizeof(int)); \
146         len = (new_nz - CHUNKSIZE - bi[row] - nrow - shift); \
147         PetscMemcpy(new_j+bi[row]+shift+nrow+CHUNKSIZE,bj+bi[row]+shift+nrow, \
148                                                            len*sizeof(int)); \
149         PetscMemcpy(new_a,ba,(bi[row]+nrow+shift)*sizeof(Scalar)); \
150         PetscMemcpy(new_a+bi[row]+shift+nrow+CHUNKSIZE,ba+bi[row]+shift+nrow, \
151                                                            len*sizeof(Scalar));  \
152         /* free up old matrix storage */ \
153  \
154         PetscFree(b->a);  \
155         if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
156         ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
157         b->singlemalloc = 1; \
158  \
159         rp   = bj + bi[row] + shift; ap = ba + bi[row] + shift; \
160         rmax = bimax[row] = bimax[row] + CHUNKSIZE; \
161         PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
162         b->maxnz += CHUNKSIZE; \
163         b->reallocs++; \
164       } \
165       N = nrow++ - 1; b->nz++; \
166       /* shift up all the later entries in this row */ \
167       for ( ii=N; ii>=_i; ii-- ) { \
168         rp[ii+1] = rp[ii]; \
169         ap[ii+1] = ap[ii]; \
170       } \
171       rp[_i] = col1;  \
172       ap[_i] = value;  \
173       b_noinsert: ; \
174       bilen[row] = nrow; \
175 }
176 
177 extern int MatSetValues_SeqAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
178 #undef __FUNC__
179 #define __FUNC__ "MatSetValues_MPIAIJ"
180 int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
181 {
182   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
183   Scalar     value;
184   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
185   int        cstart = aij->cstart, cend = aij->cend,row,col;
186   int        roworiented = aij->roworiented;
187 
188   /* Some Variables required in the macro */
189   Mat        A = aij->A;
190   Mat_SeqAIJ *a = (Mat_SeqAIJ *) A->data;
191   int        *aimax = a->imax, *ai = a->i, *ailen = a->ilen,*aj = a->j;
192   Scalar     *aa = a->a;
193 
194   Mat        B = aij->B;
195   Mat_SeqAIJ *b = (Mat_SeqAIJ *) B->data;
196   int        *bimax = b->imax, *bi = b->i, *bilen = b->ilen,*bj = b->j;
197   Scalar     *ba = b->a;
198 
199   int        *rp,ii,nrow,_i,rmax, N, col1,low,high,t;
200   int        nonew = a->nonew,shift = a->indexshift;
201   Scalar     *ap;
202 
203   PetscFunctionBegin;
204   for ( i=0; i<m; i++ ) {
205 #if defined(USE_PETSC_BOPT_g)
206     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
207     if (im[i] >= aij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
208 #endif
209     if (im[i] >= rstart && im[i] < rend) {
210       row = im[i] - rstart;
211       for ( j=0; j<n; j++ ) {
212         if (in[j] >= cstart && in[j] < cend){
213           col = in[j] - cstart;
214           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
215           MatSetValues_SeqAIJ_A_Private(row,col,value,addv);
216           /* ierr = MatSetValues_SeqAIJ(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
217         }
218 #if defined(USE_PETSC_BOPT_g)
219         else if (in[j] < 0) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");}
220         else if (in[j] >= aij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");}
221 #endif
222         else {
223           if (mat->was_assembled) {
224             if (!aij->colmap) {
225               ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
226             }
227             col = aij->colmap[in[j]] - 1;
228             if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
229               ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
230               col =  in[j];
231               /* Reinitialize the variables required by MatSetValues_SeqAIJ_B_Private() */
232               B = aij->B;
233               b = (Mat_SeqAIJ *) B->data;
234               bimax = b->imax; bi = b->i; bilen = b->ilen; bj = b->j;
235               ba = b->a;
236             }
237           } else col = in[j];
238           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
239           MatSetValues_SeqAIJ_B_Private(row,col,value,addv);
240           /* ierr = MatSetValues_SeqAIJ(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
241         }
242       }
243     }
244     else {
245       if (roworiented && !aij->donotstash) {
246         ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
247       }
248       else {
249         if (!aij->donotstash) {
250           row = im[i];
251           for ( j=0; j<n; j++ ) {
252             ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
253           }
254         }
255       }
256     }
257   }
258   PetscFunctionReturn(0);
259 }
260 
261 #undef __FUNC__
262 #define __FUNC__ "MatGetValues_MPIAIJ"
263 int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
264 {
265   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
266   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
267   int        cstart = aij->cstart, cend = aij->cend,row,col;
268 
269   PetscFunctionBegin;
270   for ( i=0; i<m; i++ ) {
271     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
272     if (idxm[i] >= aij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
273     if (idxm[i] >= rstart && idxm[i] < rend) {
274       row = idxm[i] - rstart;
275       for ( j=0; j<n; j++ ) {
276         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
277         if (idxn[j] >= aij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
278         if (idxn[j] >= cstart && idxn[j] < cend){
279           col = idxn[j] - cstart;
280           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
281         }
282         else {
283           if (!aij->colmap) {
284             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
285           }
286           col = aij->colmap[idxn[j]] - 1;
287           if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
288           else {
289             ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
290           }
291         }
292       }
293     } else {
294       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
295     }
296   }
297   PetscFunctionReturn(0);
298 }
299 
300 #undef __FUNC__
301 #define __FUNC__ "MatAssemblyBegin_MPIAIJ"
302 int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
303 {
304   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
305   MPI_Comm    comm = mat->comm;
306   int         size = aij->size, *owners = aij->rowners;
307   int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
308   MPI_Request *send_waits,*recv_waits;
309   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
310   InsertMode  addv;
311   Scalar      *rvalues,*svalues;
312 
313   PetscFunctionBegin;
314   if (aij->donotstash) {
315     aij->svalues    = 0; aij->rvalues    = 0;
316     aij->nsends     = 0; aij->nrecvs     = 0;
317     aij->send_waits = 0; aij->recv_waits = 0;
318     aij->rmax       = 0;
319     PetscFunctionReturn(0);
320   }
321 
322   /* make sure all processors are either in INSERTMODE or ADDMODE */
323   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
324   if (addv == (ADD_VALUES|INSERT_VALUES)) {
325     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
326   }
327   mat->insertmode = addv; /* in case this processor had no cache */
328 
329   /*  first count number of contributors to each processor */
330   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
331   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
332   owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
333   for ( i=0; i<aij->stash.n; i++ ) {
334     idx = aij->stash.idx[i];
335     for ( j=0; j<size; j++ ) {
336       if (idx >= owners[j] && idx < owners[j+1]) {
337         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
338       }
339     }
340   }
341   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
342 
343   /* inform other processors of number of messages and max length*/
344   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
345   ierr = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
346   nreceives = work[rank];
347   ierr = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
348   nmax = work[rank];
349   PetscFree(work);
350 
351   /* post receives:
352        1) each message will consist of ordered pairs
353      (global index,value) we store the global index as a double
354      to simplify the message passing.
355        2) since we don't know how long each individual message is we
356      allocate the largest needed buffer for each receive. Potentially
357      this is a lot of wasted space.
358 
359 
360        This could be done better.
361   */
362   rvalues    = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
363   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
364   for ( i=0; i<nreceives; i++ ) {
365     ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
366               comm,recv_waits+i);CHKERRQ(ierr);
367   }
368 
369   /* do sends:
370       1) starts[i] gives the starting index in svalues for stuff going to
371          the ith processor
372   */
373   svalues    = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
374   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
375   starts     = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
376   starts[0]  = 0;
377   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
378   for ( i=0; i<aij->stash.n; i++ ) {
379     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
380     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
381     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
382   }
383   PetscFree(owner);
384   starts[0] = 0;
385   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
386   count = 0;
387   for ( i=0; i<size; i++ ) {
388     if (procs[i]) {
389       ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
390     }
391   }
392   PetscFree(starts);
393   PetscFree(nprocs);
394 
395   /* Free cache space */
396   PLogInfo(aij->A,"MatAssemblyBegin_MPIAIJ:Number of off-processor values %d\n",aij->stash.n);
397   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
398 
399   aij->svalues    = svalues;    aij->rvalues    = rvalues;
400   aij->nsends     = nsends;     aij->nrecvs     = nreceives;
401   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
402   aij->rmax       = nmax;
403 
404   PetscFunctionReturn(0);
405 }
406 extern int MatSetUpMultiply_MPIAIJ(Mat);
407 
408 #undef __FUNC__
409 #define __FUNC__ "MatAssemblyEnd_MPIAIJ"
410 int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
411 {
412   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
413   MPI_Status  *send_status,recv_status;
414   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
415   int         row,col,other_disassembled;
416   Scalar      *values,val;
417   InsertMode  addv = mat->insertmode;
418 
419   PetscFunctionBegin;
420 
421   /*  wait on receives */
422   while (count) {
423     ierr = MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
424     /* unpack receives into our local space */
425     values = aij->rvalues + 3*imdex*aij->rmax;
426     ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
427     n = n/3;
428     for ( i=0; i<n; i++ ) {
429       row = (int) PetscReal(values[3*i]) - aij->rstart;
430       col = (int) PetscReal(values[3*i+1]);
431       val = values[3*i+2];
432       if (col >= aij->cstart && col < aij->cend) {
433         col -= aij->cstart;
434         ierr = MatSetValues(aij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
435       } else {
436         if (mat->was_assembled || mat->assembled) {
437           if (!aij->colmap) {
438             ierr = CreateColmap_MPIAIJ_Private(mat); CHKERRQ(ierr);
439           }
440           col = aij->colmap[col] - 1;
441           if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
442             ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
443             col = (int) PetscReal(values[3*i+1]);
444           }
445         }
446         ierr = MatSetValues(aij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
447       }
448     }
449     count--;
450   }
451   PetscFree(aij->recv_waits); PetscFree(aij->rvalues);
452 
453   /* wait on sends */
454   if (aij->nsends) {
455     send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
456     ierr        = MPI_Waitall(aij->nsends,aij->send_waits,send_status);CHKERRQ(ierr);
457     PetscFree(send_status);
458   }
459   if (aij->send_waits) PetscFree(aij->send_waits);
460   if (aij->svalues)    PetscFree(aij->svalues);
461 
462   ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
463   ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);
464 
465   /* determine if any processor has disassembled, if so we must
466      also disassemble ourselfs, in order that we may reassemble. */
467   ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
468   if (mat->was_assembled && !other_disassembled) {
469     ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
470   }
471 
472   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
473     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
474   }
475   ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
476   ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);
477 
478   if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
479   PetscFunctionReturn(0);
480 }
481 
482 #undef __FUNC__
483 #define __FUNC__ "MatZeroEntries_MPIAIJ"
484 int MatZeroEntries_MPIAIJ(Mat A)
485 {
486   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
487   int        ierr;
488 
489   PetscFunctionBegin;
490   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
491   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
492   PetscFunctionReturn(0);
493 }
494 
495 #undef __FUNC__
496 #define __FUNC__ "MatZeroRows_MPIAIJ"
497 int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
498 {
499   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
500   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
501   int            *procs,*nprocs,j,found,idx,nsends,*work,row;
502   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
503   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
504   int            *lens,imdex,*lrows,*values,rstart=l->rstart;
505   MPI_Comm       comm = A->comm;
506   MPI_Request    *send_waits,*recv_waits;
507   MPI_Status     recv_status,*send_status;
508   IS             istmp;
509   PetscTruth     localdiag;
510 
511   PetscFunctionBegin;
512   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
513   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
514 
515   /*  first count number of contributors to each processor */
516   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
517   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
518   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
519   for ( i=0; i<N; i++ ) {
520     idx = rows[i];
521     found = 0;
522     for ( j=0; j<size; j++ ) {
523       if (idx >= owners[j] && idx < owners[j+1]) {
524         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
525       }
526     }
527     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
528   }
529   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
530 
531   /* inform other processors of number of messages and max length*/
532   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
533   ierr = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
534   nrecvs = work[rank];
535   ierr = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
536   nmax = work[rank];
537   PetscFree(work);
538 
539   /* post receives:   */
540   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int));CHKPTRQ(rvalues);
541   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
542   for ( i=0; i<nrecvs; i++ ) {
543     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
544   }
545 
546   /* do sends:
547       1) starts[i] gives the starting index in svalues for stuff going to
548          the ith processor
549   */
550   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
551   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
552   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
553   starts[0] = 0;
554   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
555   for ( i=0; i<N; i++ ) {
556     svalues[starts[owner[i]]++] = rows[i];
557   }
558   ISRestoreIndices(is,&rows);
559 
560   starts[0] = 0;
561   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
562   count = 0;
563   for ( i=0; i<size; i++ ) {
564     if (procs[i]) {
565       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
566     }
567   }
568   PetscFree(starts);
569 
570   base = owners[rank];
571 
572   /*  wait on receives */
573   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
574   source = lens + nrecvs;
575   count  = nrecvs; slen = 0;
576   while (count) {
577     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
578     /* unpack receives into our local space */
579     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
580     source[imdex]  = recv_status.MPI_SOURCE;
581     lens[imdex]  = n;
582     slen += n;
583     count--;
584   }
585   PetscFree(recv_waits);
586 
587   /* move the data into the send scatter */
588   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
589   count = 0;
590   for ( i=0; i<nrecvs; i++ ) {
591     values = rvalues + i*nmax;
592     for ( j=0; j<lens[i]; j++ ) {
593       lrows[count++] = values[j] - base;
594     }
595   }
596   PetscFree(rvalues); PetscFree(lens);
597   PetscFree(owner); PetscFree(nprocs);
598 
599   /* actually zap the local rows */
600   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
601   PLogObjectParent(A,istmp);
602 
603   /*
604         Zero the required rows. If the "diagonal block" of the matrix
605      is square and the user wishes to set the diagonal we use seperate
606      code so that MatSetValues() is not called for each diagonal allocating
607      new memory, thus calling lots of mallocs and slowing things down.
608 
609        Contributed by: Mathew Knepley
610   */
611   localdiag = PETSC_FALSE;
612   if (diag && (l->A->M == l->A->N)) {
613     localdiag = PETSC_TRUE;
614     ierr      = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
615   } else {
616     ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
617   }
618   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
619   ierr = ISDestroy(istmp); CHKERRQ(ierr);
620 
621   if (diag && (localdiag == PETSC_FALSE)) {
622     for ( i = 0; i < slen; i++ ) {
623       row = lrows[i] + rstart;
624       MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES);
625     }
626     MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);
627     MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);
628   }
629 
630   if (diag) {
631     for ( i = 0; i < slen; i++ ) {
632       row = lrows[i] + rstart;
633       MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES);
634     }
635     MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);
636     MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);
637   }
638   PetscFree(lrows);
639 
640   /* wait on sends */
641   if (nsends) {
642     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
643     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
644     PetscFree(send_status);
645   }
646   PetscFree(send_waits); PetscFree(svalues);
647 
648   PetscFunctionReturn(0);
649 }
650 
651 #undef __FUNC__
652 #define __FUNC__ "MatMult_MPIAIJ"
653 int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
654 {
655   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
656   int        ierr,nt;
657 
658   PetscFunctionBegin;
659   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
660   if (nt != a->n) {
661     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
662   }
663   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
664   ierr = (*a->A->ops->mult)(a->A,xx,yy); CHKERRQ(ierr);
665   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
666   ierr = (*a->B->ops->multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
667   PetscFunctionReturn(0);
668 }
669 
670 #undef __FUNC__
671 #define __FUNC__ "MatMultAdd_MPIAIJ"
672 int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
673 {
674   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
675   int        ierr;
676 
677   PetscFunctionBegin;
678   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
679   ierr = (*a->A->ops->multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
680   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
681   ierr = (*a->B->ops->multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
682   PetscFunctionReturn(0);
683 }
684 
685 #undef __FUNC__
686 #define __FUNC__ "MatMultTrans_MPIAIJ"
687 int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
688 {
689   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
690   int        ierr;
691 
692   PetscFunctionBegin;
693   /* do nondiagonal part */
694   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
695   /* send it on its way */
696   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
697   /* do local part */
698   ierr = (*a->A->ops->multtrans)(a->A,xx,yy); CHKERRQ(ierr);
699   /* receive remote parts: note this assumes the values are not actually */
700   /* inserted in yy until the next line, which is true for my implementation*/
701   /* but is not perhaps always true. */
702   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
703   PetscFunctionReturn(0);
704 }
705 
706 #undef __FUNC__
707 #define __FUNC__ "MatMultTransAdd_MPIAIJ"
708 int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
709 {
710   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
711   int        ierr;
712 
713   PetscFunctionBegin;
714   /* do nondiagonal part */
715   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
716   /* send it on its way */
717   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
718   /* do local part */
719   ierr = (*a->A->ops->multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
720   /* receive remote parts: note this assumes the values are not actually */
721   /* inserted in yy until the next line, which is true for my implementation*/
722   /* but is not perhaps always true. */
723   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
724   PetscFunctionReturn(0);
725 }
726 
727 /*
728   This only works correctly for square matrices where the subblock A->A is the
729    diagonal block
730 */
731 #undef __FUNC__
732 #define __FUNC__ "MatGetDiagonal_MPIAIJ"
733 int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
734 {
735   int        ierr;
736   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
737 
738   PetscFunctionBegin;
739   if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
740   if (a->rstart != a->cstart || a->rend != a->cend) {
741     SETERRQ(PETSC_ERR_ARG_SIZ,0,"row partition must equal col partition");
742   }
743   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
744   PetscFunctionReturn(0);
745 }
746 
747 #undef __FUNC__
748 #define __FUNC__ "MatScale_MPIAIJ"
749 int MatScale_MPIAIJ(Scalar *aa,Mat A)
750 {
751   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
752   int        ierr;
753 
754   PetscFunctionBegin;
755   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
756   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
757   PetscFunctionReturn(0);
758 }
759 
760 #undef __FUNC__
761 #define __FUNC__ "MatDestroy_MPIAIJ"
762 int MatDestroy_MPIAIJ(Mat mat)
763 {
764   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
765   int        ierr;
766 
767   PetscFunctionBegin;
768 #if defined(USE_PETSC_LOG)
769   PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",aij->M,aij->N);
770 #endif
771   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
772   PetscFree(aij->rowners);
773   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
774   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
775   if (aij->colmap) PetscFree(aij->colmap);
776   if (aij->garray) PetscFree(aij->garray);
777   if (aij->lvec)   VecDestroy(aij->lvec);
778   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
779   if (aij->rowvalues) PetscFree(aij->rowvalues);
780   PetscFree(aij);
781   PLogObjectDestroy(mat);
782   PetscHeaderDestroy(mat);
783   PetscFunctionReturn(0);
784 }
785 
786 #undef __FUNC__
787 #define __FUNC__ "MatView_MPIAIJ_Binary"
788 extern int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
789 {
790   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
791   int         ierr;
792 
793   PetscFunctionBegin;
794   if (aij->size == 1) {
795     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
796   }
797   else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
798   PetscFunctionReturn(0);
799 }
800 
801 #undef __FUNC__
802 #define __FUNC__ "MatView_MPIAIJ_ASCIIorDraworMatlab"
803 extern int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
804 {
805   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
806   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
807   int         ierr, format,shift = C->indexshift,rank;
808   FILE        *fd;
809   ViewerType  vtype;
810 
811   PetscFunctionBegin;
812   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
813   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
814     ierr = ViewerGetFormat(viewer,&format);
815     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
816       MatInfo info;
817       int     flg;
818       MPI_Comm_rank(mat->comm,&rank);
819       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
820       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
821       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
822       PetscSequentialPhaseBegin(mat->comm,1);
823       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
824          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
825       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
826          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
827       ierr = MatGetInfo(aij->A,MAT_LOCAL,&info);
828       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used);
829       ierr = MatGetInfo(aij->B,MAT_LOCAL,&info);
830       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used);
831       fflush(fd);
832       PetscSequentialPhaseEnd(mat->comm,1);
833       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
834       PetscFunctionReturn(0);
835     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
836       PetscFunctionReturn(0);
837     }
838   }
839 
840   if (vtype == DRAW_VIEWER) {
841     Draw       draw;
842     PetscTruth isnull;
843     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
844     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
845   }
846 
847   if (vtype == ASCII_FILE_VIEWER) {
848     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
849     PetscSequentialPhaseBegin(mat->comm,1);
850     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
851            aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
852            aij->cend);
853     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
854     ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
855     fflush(fd);
856     PetscSequentialPhaseEnd(mat->comm,1);
857   } else {
858     int size = aij->size;
859     rank = aij->rank;
860     if (size == 1) {
861       ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
862     } else {
863       /* assemble the entire matrix onto first processor. */
864       Mat         A;
865       Mat_SeqAIJ *Aloc;
866       int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
867       Scalar      *a;
868 
869       if (!rank) {
870         ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
871       } else {
872         ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
873       }
874       PLogObjectParent(mat,A);
875 
876       /* copy over the A part */
877       Aloc = (Mat_SeqAIJ*) aij->A->data;
878       m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
879       row = aij->rstart;
880       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
881       for ( i=0; i<m; i++ ) {
882         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
883         row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
884       }
885       aj = Aloc->j;
886       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
887 
888       /* copy over the B part */
889       Aloc = (Mat_SeqAIJ*) aij->B->data;
890       m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
891       row = aij->rstart;
892       ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
893       for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
894       for ( i=0; i<m; i++ ) {
895         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
896         row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
897       }
898       PetscFree(ct);
899       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
900       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
901       /*
902          Everyone has to call to draw the matrix since the graphics waits are
903          synchronized across all processors that share the Draw object
904       */
905       if (!rank || vtype == DRAW_VIEWER) {
906         ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
907       }
908       ierr = MatDestroy(A); CHKERRQ(ierr);
909     }
910   }
911   PetscFunctionReturn(0);
912 }
913 
914 #undef __FUNC__
915 #define __FUNC__ "MatView_MPIAIJ"
916 int MatView_MPIAIJ(Mat mat,Viewer viewer)
917 {
918   int         ierr;
919   ViewerType  vtype;
920 
921   PetscFunctionBegin;
922   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
923   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
924       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
925     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
926   } else if (vtype == BINARY_FILE_VIEWER) {
927     ierr = MatView_MPIAIJ_Binary(mat,viewer);CHKERRQ(ierr);
928   } else {
929     SETERRQ(1,1,"Viewer type not supported by PETSc object");
930   }
931   PetscFunctionReturn(0);
932 }
933 
934 /*
935     This has to provide several versions.
936 
937      2) a) use only local smoothing updating outer values only once.
938         b) local smoothing updating outer values each inner iteration
939      3) color updating out values betwen colors.
940 */
941 #undef __FUNC__
942 #define __FUNC__ "MatRelax_MPIAIJ"
943 int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
944                            double fshift,int its,Vec xx)
945 {
946   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
947   Mat        AA = mat->A, BB = mat->B;
948   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
949   Scalar     *b,*x,*xs,*ls,d,*v,sum;
950   int        ierr,*idx, *diag;
951   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
952 
953   PetscFunctionBegin;
954   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
955   xs = x + shift; /* shift by one for index start of 1 */
956   ls = ls + shift;
957   if (!A->diag) {ierr = MatMarkDiag_SeqAIJ(AA); CHKERRQ(ierr);}
958   diag = A->diag;
959   if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
960     if (flag & SOR_ZERO_INITIAL_GUESS) {
961       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
962       PetscFunctionReturn(0);
963     }
964     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
965     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
966     while (its--) {
967       /* go down through the rows */
968       for ( i=0; i<m; i++ ) {
969         n    = A->i[i+1] - A->i[i];
970         idx  = A->j + A->i[i] + shift;
971         v    = A->a + A->i[i] + shift;
972         sum  = b[i];
973         SPARSEDENSEMDOT(sum,xs,v,idx,n);
974         d    = fshift + A->a[diag[i]+shift];
975         n    = B->i[i+1] - B->i[i];
976         idx  = B->j + B->i[i] + shift;
977         v    = B->a + B->i[i] + shift;
978         SPARSEDENSEMDOT(sum,ls,v,idx,n);
979         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
980       }
981       /* come up through the rows */
982       for ( i=m-1; i>-1; i-- ) {
983         n    = A->i[i+1] - A->i[i];
984         idx  = A->j + A->i[i] + shift;
985         v    = A->a + A->i[i] + shift;
986         sum  = b[i];
987         SPARSEDENSEMDOT(sum,xs,v,idx,n);
988         d    = fshift + A->a[diag[i]+shift];
989         n    = B->i[i+1] - B->i[i];
990         idx  = B->j + B->i[i] + shift;
991         v    = B->a + B->i[i] + shift;
992         SPARSEDENSEMDOT(sum,ls,v,idx,n);
993         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
994       }
995     }
996   } else if (flag & SOR_LOCAL_FORWARD_SWEEP){
997     if (flag & SOR_ZERO_INITIAL_GUESS) {
998       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
999       PetscFunctionReturn(0);
1000     }
1001     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1002     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1003     while (its--) {
1004       for ( i=0; i<m; i++ ) {
1005         n    = A->i[i+1] - A->i[i];
1006         idx  = A->j + A->i[i] + shift;
1007         v    = A->a + A->i[i] + shift;
1008         sum  = b[i];
1009         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1010         d    = fshift + A->a[diag[i]+shift];
1011         n    = B->i[i+1] - B->i[i];
1012         idx  = B->j + B->i[i] + shift;
1013         v    = B->a + B->i[i] + shift;
1014         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1015         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1016       }
1017     }
1018   } else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
1019     if (flag & SOR_ZERO_INITIAL_GUESS) {
1020       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
1021       PetscFunctionReturn(0);
1022     }
1023     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,
1024                             mat->Mvctx); CHKERRQ(ierr);
1025     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,
1026                             mat->Mvctx); CHKERRQ(ierr);
1027     while (its--) {
1028       for ( i=m-1; i>-1; i-- ) {
1029         n    = A->i[i+1] - A->i[i];
1030         idx  = A->j + A->i[i] + shift;
1031         v    = A->a + A->i[i] + shift;
1032         sum  = b[i];
1033         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1034         d    = fshift + A->a[diag[i]+shift];
1035         n    = B->i[i+1] - B->i[i];
1036         idx  = B->j + B->i[i] + shift;
1037         v    = B->a + B->i[i] + shift;
1038         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1039         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1040       }
1041     }
1042   } else {
1043     SETERRQ(PETSC_ERR_SUP,0,"Parallel SOR not supported");
1044   }
1045   PetscFunctionReturn(0);
1046 }
1047 
1048 #undef __FUNC__
1049 #define __FUNC__ "MatGetInfo_MPIAIJ"
1050 int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1051 {
1052   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1053   Mat        A = mat->A, B = mat->B;
1054   int        ierr;
1055   double     isend[5], irecv[5];
1056 
1057   PetscFunctionBegin;
1058   info->block_size     = 1.0;
1059   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1060   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
1061   isend[3] = info->memory;  isend[4] = info->mallocs;
1062   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1063   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
1064   isend[3] += info->memory;  isend[4] += info->mallocs;
1065   if (flag == MAT_LOCAL) {
1066     info->nz_used      = isend[0];
1067     info->nz_allocated = isend[1];
1068     info->nz_unneeded  = isend[2];
1069     info->memory       = isend[3];
1070     info->mallocs      = isend[4];
1071   } else if (flag == MAT_GLOBAL_MAX) {
1072     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);CHKERRQ(ierr);
1073     info->nz_used      = irecv[0];
1074     info->nz_allocated = irecv[1];
1075     info->nz_unneeded  = irecv[2];
1076     info->memory       = irecv[3];
1077     info->mallocs      = irecv[4];
1078   } else if (flag == MAT_GLOBAL_SUM) {
1079     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);CHKERRQ(ierr);
1080     info->nz_used      = irecv[0];
1081     info->nz_allocated = irecv[1];
1082     info->nz_unneeded  = irecv[2];
1083     info->memory       = irecv[3];
1084     info->mallocs      = irecv[4];
1085   }
1086   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1087   info->fill_ratio_needed = 0;
1088   info->factor_mallocs    = 0;
1089   info->rows_global       = (double)mat->M;
1090   info->columns_global    = (double)mat->N;
1091   info->rows_local        = (double)mat->m;
1092   info->columns_local     = (double)mat->N;
1093 
1094   PetscFunctionReturn(0);
1095 }
1096 
1097 #undef __FUNC__
1098 #define __FUNC__ "MatSetOption_MPIAIJ"
1099 int MatSetOption_MPIAIJ(Mat A,MatOption op)
1100 {
1101   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1102 
1103   PetscFunctionBegin;
1104   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1105       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1106       op == MAT_COLUMNS_UNSORTED ||
1107       op == MAT_COLUMNS_SORTED ||
1108       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1109       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1110         MatSetOption(a->A,op);
1111         MatSetOption(a->B,op);
1112   } else if (op == MAT_ROW_ORIENTED) {
1113     a->roworiented = 1;
1114     MatSetOption(a->A,op);
1115     MatSetOption(a->B,op);
1116   } else if (op == MAT_ROWS_SORTED ||
1117              op == MAT_ROWS_UNSORTED ||
1118              op == MAT_SYMMETRIC ||
1119              op == MAT_STRUCTURALLY_SYMMETRIC ||
1120              op == MAT_YES_NEW_DIAGONALS)
1121     PLogInfo(A,"MatSetOption_MPIAIJ:Option ignored\n");
1122   else if (op == MAT_COLUMN_ORIENTED) {
1123     a->roworiented = 0;
1124     MatSetOption(a->A,op);
1125     MatSetOption(a->B,op);
1126   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1127     a->donotstash = 1;
1128   } else if (op == MAT_NO_NEW_DIAGONALS){
1129     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1130   } else {
1131     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1132   }
1133   PetscFunctionReturn(0);
1134 }
1135 
1136 #undef __FUNC__
1137 #define __FUNC__ "MatGetSize_MPIAIJ"
1138 int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
1139 {
1140   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1141 
1142   PetscFunctionBegin;
1143   if (m) *m = mat->M;
1144   if (n) *n = mat->N;
1145   PetscFunctionReturn(0);
1146 }
1147 
1148 #undef __FUNC__
1149 #define __FUNC__ "MatGetLocalSize_MPIAIJ"
1150 int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
1151 {
1152   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1153 
1154   PetscFunctionBegin;
1155   if (m) *m = mat->m;
1156   if (n) *n = mat->n;
1157   PetscFunctionReturn(0);
1158 }
1159 
1160 #undef __FUNC__
1161 #define __FUNC__ "MatGetOwnershipRange_MPIAIJ"
1162 int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
1163 {
1164   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1165 
1166   PetscFunctionBegin;
1167   *m = mat->rstart; *n = mat->rend;
1168   PetscFunctionReturn(0);
1169 }
1170 
1171 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1172 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1173 
1174 #undef __FUNC__
1175 #define __FUNC__ "MatGetRow_MPIAIJ"
1176 int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1177 {
1178   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1179   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
1180   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
1181   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
1182   int        *cmap, *idx_p;
1183 
1184   PetscFunctionBegin;
1185   if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
1186   mat->getrowactive = PETSC_TRUE;
1187 
1188   if (!mat->rowvalues && (idx || v)) {
1189     /*
1190         allocate enough space to hold information from the longest row.
1191     */
1192     Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
1193     int     max = 1,m = mat->m,tmp;
1194     for ( i=0; i<m; i++ ) {
1195       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1196       if (max < tmp) { max = tmp; }
1197     }
1198     mat->rowvalues = (Scalar *) PetscMalloc( max*(sizeof(int)+sizeof(Scalar)));
1199     CHKPTRQ(mat->rowvalues);
1200     mat->rowindices = (int *) (mat->rowvalues + max);
1201   }
1202 
1203   if (row < rstart || row >= rend) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Only local rows")
1204   lrow = row - rstart;
1205 
1206   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1207   if (!v)   {pvA = 0; pvB = 0;}
1208   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1209   ierr = (*mat->A->ops->getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1210   ierr = (*mat->B->ops->getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1211   nztot = nzA + nzB;
1212 
1213   cmap  = mat->garray;
1214   if (v  || idx) {
1215     if (nztot) {
1216       /* Sort by increasing column numbers, assuming A and B already sorted */
1217       int imark = -1;
1218       if (v) {
1219         *v = v_p = mat->rowvalues;
1220         for ( i=0; i<nzB; i++ ) {
1221           if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
1222           else break;
1223         }
1224         imark = i;
1225         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1226         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1227       }
1228       if (idx) {
1229         *idx = idx_p = mat->rowindices;
1230         if (imark > -1) {
1231           for ( i=0; i<imark; i++ ) {
1232             idx_p[i] = cmap[cworkB[i]];
1233           }
1234         } else {
1235           for ( i=0; i<nzB; i++ ) {
1236             if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
1237             else break;
1238           }
1239           imark = i;
1240         }
1241         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
1242         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
1243       }
1244     }
1245     else {
1246       if (idx) *idx = 0;
1247       if (v)   *v   = 0;
1248     }
1249   }
1250   *nz = nztot;
1251   ierr = (*mat->A->ops->restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1252   ierr = (*mat->B->ops->restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1253   PetscFunctionReturn(0);
1254 }
1255 
1256 #undef __FUNC__
1257 #define __FUNC__ "MatRestoreRow_MPIAIJ"
1258 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1259 {
1260   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1261 
1262   PetscFunctionBegin;
1263   if (aij->getrowactive == PETSC_FALSE) {
1264     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
1265   }
1266   aij->getrowactive = PETSC_FALSE;
1267   PetscFunctionReturn(0);
1268 }
1269 
1270 #undef __FUNC__
1271 #define __FUNC__ "MatNorm_MPIAIJ"
1272 int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
1273 {
1274   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1275   Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
1276   int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
1277   double     sum = 0.0;
1278   Scalar     *v;
1279 
1280   PetscFunctionBegin;
1281   if (aij->size == 1) {
1282     ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
1283   } else {
1284     if (type == NORM_FROBENIUS) {
1285       v = amat->a;
1286       for (i=0; i<amat->nz; i++ ) {
1287 #if defined(USE_PETSC_COMPLEX)
1288         sum += real(conj(*v)*(*v)); v++;
1289 #else
1290         sum += (*v)*(*v); v++;
1291 #endif
1292       }
1293       v = bmat->a;
1294       for (i=0; i<bmat->nz; i++ ) {
1295 #if defined(USE_PETSC_COMPLEX)
1296         sum += real(conj(*v)*(*v)); v++;
1297 #else
1298         sum += (*v)*(*v); v++;
1299 #endif
1300       }
1301       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
1302       *norm = sqrt(*norm);
1303     } else if (type == NORM_1) { /* max column norm */
1304       double *tmp, *tmp2;
1305       int    *jj, *garray = aij->garray;
1306       tmp  = (double *) PetscMalloc( (aij->N+1)*sizeof(double) ); CHKPTRQ(tmp);
1307       tmp2 = (double *) PetscMalloc( (aij->N+1)*sizeof(double) ); CHKPTRQ(tmp2);
1308       PetscMemzero(tmp,aij->N*sizeof(double));
1309       *norm = 0.0;
1310       v = amat->a; jj = amat->j;
1311       for ( j=0; j<amat->nz; j++ ) {
1312         tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
1313       }
1314       v = bmat->a; jj = bmat->j;
1315       for ( j=0; j<bmat->nz; j++ ) {
1316         tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
1317       }
1318       ierr = MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
1319       for ( j=0; j<aij->N; j++ ) {
1320         if (tmp2[j] > *norm) *norm = tmp2[j];
1321       }
1322       PetscFree(tmp); PetscFree(tmp2);
1323     } else if (type == NORM_INFINITY) { /* max row norm */
1324       double ntemp = 0.0;
1325       for ( j=0; j<amat->m; j++ ) {
1326         v = amat->a + amat->i[j] + shift;
1327         sum = 0.0;
1328         for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
1329           sum += PetscAbsScalar(*v); v++;
1330         }
1331         v = bmat->a + bmat->i[j] + shift;
1332         for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
1333           sum += PetscAbsScalar(*v); v++;
1334         }
1335         if (sum > ntemp) ntemp = sum;
1336       }
1337       ierr = MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);CHKERRQ(ierr);
1338     } else {
1339       SETERRQ(PETSC_ERR_SUP,0,"No support for two norm");
1340     }
1341   }
1342   PetscFunctionReturn(0);
1343 }
1344 
1345 #undef __FUNC__
1346 #define __FUNC__ "MatTranspose_MPIAIJ"
1347 int MatTranspose_MPIAIJ(Mat A,Mat *matout)
1348 {
1349   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1350   Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
1351   int        ierr,shift = Aloc->indexshift;
1352   int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
1353   Mat        B;
1354   Scalar     *array;
1355 
1356   PetscFunctionBegin;
1357   if (matout == PETSC_NULL && M != N) {
1358     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
1359   }
1360 
1361   ierr = MatCreateMPIAIJ(A->comm,a->n,a->m,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);CHKERRQ(ierr);
1362 
1363   /* copy over the A part */
1364   Aloc = (Mat_SeqAIJ*) a->A->data;
1365   m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1366   row = a->rstart;
1367   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
1368   for ( i=0; i<m; i++ ) {
1369     ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1370     row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
1371   }
1372   aj = Aloc->j;
1373   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}
1374 
1375   /* copy over the B part */
1376   Aloc = (Mat_SeqAIJ*) a->B->data;
1377   m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1378   row = a->rstart;
1379   ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
1380   for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
1381   for ( i=0; i<m; i++ ) {
1382     ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1383     row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
1384   }
1385   PetscFree(ct);
1386   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1387   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1388   if (matout != PETSC_NULL) {
1389     *matout = B;
1390   } else {
1391     PetscOps       *Abops;
1392     struct _MatOps *Aops;
1393 
1394     /* This isn't really an in-place transpose .... but free data structures from a */
1395     PetscFree(a->rowners);
1396     ierr = MatDestroy(a->A); CHKERRQ(ierr);
1397     ierr = MatDestroy(a->B); CHKERRQ(ierr);
1398     if (a->colmap) PetscFree(a->colmap);
1399     if (a->garray) PetscFree(a->garray);
1400     if (a->lvec) VecDestroy(a->lvec);
1401     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
1402     PetscFree(a);
1403 
1404     /*
1405        This is horrible, horrible code. We need to keep the
1406       A pointers for the bops and ops but copy everything
1407       else from C.
1408     */
1409     Abops = A->bops;
1410     Aops  = A->ops;
1411     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1412     A->bops = Abops;
1413     A->ops  = Aops;
1414     PetscHeaderDestroy(B);
1415   }
1416   PetscFunctionReturn(0);
1417 }
1418 
1419 #undef __FUNC__
1420 #define __FUNC__ "MatDiagonalScale_MPIAIJ"
1421 int MatDiagonalScale_MPIAIJ(Mat mat,Vec ll,Vec rr)
1422 {
1423   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1424   Mat a = aij->A, b = aij->B;
1425   int ierr,s1,s2,s3;
1426 
1427   PetscFunctionBegin;
1428   ierr = MatGetLocalSize(mat,&s2,&s3); CHKERRQ(ierr);
1429   if (rr) {
1430     ierr = VecGetLocalSize(rr,&s1);CHKERRQ(ierr);
1431     if (s1!=s3) SETERRQ(PETSC_ERR_ARG_SIZ,0,"right vector non-conforming local size");
1432     /* Overlap communication with computation. */
1433     ierr = VecScatterBegin(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1434   }
1435   if (ll) {
1436     ierr = VecGetLocalSize(ll,&s1);CHKERRQ(ierr);
1437     if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"left vector non-conforming local size");
1438     ierr = (*b->ops->diagonalscale)(b,ll,0); CHKERRQ(ierr);
1439   }
1440   /* scale  the diagonal block */
1441   ierr = (*a->ops->diagonalscale)(a,ll,rr); CHKERRQ(ierr);
1442 
1443   if (rr) {
1444     /* Do a scatter end and then right scale the off-diagonal block */
1445     ierr = VecScatterEnd(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1446     ierr = (*b->ops->diagonalscale)(b,0,aij->lvec); CHKERRQ(ierr);
1447   }
1448 
1449   PetscFunctionReturn(0);
1450 }
1451 
1452 
1453 extern int MatPrintHelp_SeqAIJ(Mat);
1454 #undef __FUNC__
1455 #define __FUNC__ "MatPrintHelp_MPIAIJ"
1456 int MatPrintHelp_MPIAIJ(Mat A)
1457 {
1458   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1459   int        ierr;
1460 
1461   PetscFunctionBegin;
1462   if (!a->rank) {
1463     ierr = MatPrintHelp_SeqAIJ(a->A);CHKERRQ(ierr);
1464   }
1465   PetscFunctionReturn(0);
1466 }
1467 
1468 #undef __FUNC__
1469 #define __FUNC__ "MatGetBlockSize_MPIAIJ"
1470 int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1471 {
1472   PetscFunctionBegin;
1473   *bs = 1;
1474   PetscFunctionReturn(0);
1475 }
1476 #undef __FUNC__
1477 #define __FUNC__ "MatSetUnfactored_MPIAIJ"
1478 int MatSetUnfactored_MPIAIJ(Mat A)
1479 {
1480   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1481   int        ierr;
1482 
1483   PetscFunctionBegin;
1484   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1485   PetscFunctionReturn(0);
1486 }
1487 
1488 #undef __FUNC__
1489 #define __FUNC__ "MatEqual_MPIAIJ"
1490 int MatEqual_MPIAIJ(Mat A, Mat B, PetscTruth *flag)
1491 {
1492   Mat_MPIAIJ *matB = (Mat_MPIAIJ *) B->data,*matA = (Mat_MPIAIJ *) A->data;
1493   Mat        a, b, c, d;
1494   PetscTruth flg;
1495   int        ierr;
1496 
1497   PetscFunctionBegin;
1498   if (B->type != MATMPIAIJ) SETERRQ(PETSC_ERR_ARG_INCOMP,0,"Matrices must be same type");
1499   a = matA->A; b = matA->B;
1500   c = matB->A; d = matB->B;
1501 
1502   ierr = MatEqual(a, c, &flg); CHKERRQ(ierr);
1503   if (flg == PETSC_TRUE) {
1504     ierr = MatEqual(b, d, &flg); CHKERRQ(ierr);
1505   }
1506   ierr = MPI_Allreduce(&flg, flag, 1, MPI_INT, MPI_LAND, A->comm);CHKERRQ(ierr);
1507   PetscFunctionReturn(0);
1508 }
1509 
1510 extern int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1511 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1512 extern int MatFDColoringCreate_MPIAIJ(Mat,ISColoring,MatFDColoring);
1513 extern int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatGetSubMatrixCall,Mat **);
1514 extern int MatGetSubMatrix_MPIAIJ (Mat ,IS,IS,int,MatGetSubMatrixCall,Mat *);
1515 
1516 /* -------------------------------------------------------------------*/
1517 static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
1518        MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
1519        MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
1520        MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
1521        0,0,
1522        0,0,
1523        0,0,
1524        MatRelax_MPIAIJ,
1525        MatTranspose_MPIAIJ,
1526        MatGetInfo_MPIAIJ,MatEqual_MPIAIJ,
1527        MatGetDiagonal_MPIAIJ,MatDiagonalScale_MPIAIJ,MatNorm_MPIAIJ,
1528        MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
1529        0,
1530        MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
1531        0,0,0,0,
1532        MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
1533        0,0,
1534        0,0,MatConvertSameType_MPIAIJ,0,0,
1535        0,0,0,
1536        MatGetSubMatrices_MPIAIJ,MatIncreaseOverlap_MPIAIJ,MatGetValues_MPIAIJ,0,
1537        MatPrintHelp_MPIAIJ,
1538        MatScale_MPIAIJ,0,0,0,
1539        MatGetBlockSize_MPIAIJ,0,0,0,0,
1540        MatFDColoringCreate_MPIAIJ,0,MatSetUnfactored_MPIAIJ,
1541        0,0,MatGetSubMatrix_MPIAIJ};
1542 
1543 
1544 #undef __FUNC__
1545 #define __FUNC__ "MatCreateMPIAIJ"
1546 /*@C
1547    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1548    (the default parallel PETSc format).  For good matrix assembly performance
1549    the user should preallocate the matrix storage by setting the parameters
1550    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1551    performance can be increased by more than a factor of 50.
1552 
1553    Collective on MPI_Comm
1554 
1555    Input Parameters:
1556 +  comm - MPI communicator
1557 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1558            This value should be the same as the local size used in creating the
1559            y vector for the matrix-vector product y = Ax.
1560 .  n - This value should be the same as the local size used in creating the
1561        x vector for the matrix-vector product y = Ax. (or PETSC_DECIDE to have
1562        calculated if N is given) For square matrices n is almost always m.
1563 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1564 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1565 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1566            (same for all local rows)
1567 .  d_nzz - array containing the number of nonzeros in the various rows of the
1568            diagonal portion of the local submatrix (possibly different for each row)
1569            or PETSC_NULL. You must leave room for the diagonal entry even if
1570            it is zero.
1571 .  o_nz - number of nonzeros per row in the off-diagonal portion of local
1572            submatrix (same for all local rows).
1573 -  o_nzz - array containing the number of nonzeros in the various rows of the
1574            off-diagonal portion of the local submatrix (possibly different for
1575            each row) or PETSC_NULL.
1576 
1577    Output Parameter:
1578 .  A - the matrix
1579 
1580    Notes:
1581    The AIJ format (also called the Yale sparse matrix format or
1582    compressed row storage), is fully compatible with standard Fortran 77
1583    storage.  That is, the stored row and column indices can begin at
1584    either one (as in Fortran) or zero.  See the users manual for details.
1585 
1586    The user MUST specify either the local or global matrix dimensions
1587    (possibly both).
1588 
1589    By default, this format uses inodes (identical nodes) when possible.
1590    We search for consecutive rows with the same nonzero structure, thereby
1591    reusing matrix information to achieve increased efficiency.
1592 
1593    Options Database Keys:
1594 +  -mat_aij_no_inode  - Do not use inodes
1595 .  -mat_aij_inode_limit <limit> - Sets inode limit (max limit=5)
1596 -  -mat_aij_oneindex - Internally use indexing starting at 1
1597         rather than 0.  Note that when calling MatSetValues(),
1598         the user still MUST index entries starting at 0!
1599 
1600    Storage Information:
1601    For a square global matrix we define each processor's diagonal portion
1602    to be its local rows and the corresponding columns (a square submatrix);
1603    each processor's off-diagonal portion encompasses the remainder of the
1604    local matrix (a rectangular submatrix).
1605 
1606    The user can specify preallocated storage for the diagonal part of
1607    the local submatrix with either d_nz or d_nnz (not both).  Set
1608    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1609    memory allocation.  Likewise, specify preallocated storage for the
1610    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1611 
1612    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1613    the figure below we depict these three local rows and all columns (0-11).
1614 
1615 .vb
1616              0 1 2 3 4 5 6 7 8 9 10 11
1617             -------------------
1618      row 3  |  o o o d d d o o o o o o
1619      row 4  |  o o o d d d o o o o o o
1620      row 5  |  o o o d d d o o o o o o
1621             -------------------
1622 .ve
1623 
1624    Thus, any entries in the d locations are stored in the d (diagonal)
1625    submatrix, and any entries in the o locations are stored in the
1626    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1627    stored simply in the MATSEQAIJ format for compressed row storage.
1628 
1629    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1630    and o_nz should indicate the number of nonzeros per row in the o matrix.
1631    In general, for PDE problems in which most nonzeros are near the diagonal,
1632    one expects d_nz >> o_nz. For large problems you MUST preallocate memory
1633    or you will get TERRIBLE performance; see the users' manual chapter on
1634    matrices.
1635 
1636 .keywords: matrix, aij, compressed row, sparse, parallel
1637 
1638 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1639 @*/
1640 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1641 {
1642   Mat          B;
1643   Mat_MPIAIJ   *b;
1644   int          ierr, i,sum[2],work[2],size;
1645 
1646   PetscFunctionBegin;
1647   MPI_Comm_size(comm,&size);
1648   if (size == 1) {
1649     if (M == PETSC_DECIDE) M = m;
1650     if (N == PETSC_DECIDE) N = n;
1651     ierr = MatCreateSeqAIJ(comm,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
1652     PetscFunctionReturn(0);
1653   }
1654 
1655   *A = 0;
1656   PetscHeaderCreate(B,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIAIJ,comm,MatDestroy,MatView);
1657   PLogObjectCreate(B);
1658   B->data       = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
1659   PetscMemzero(b,sizeof(Mat_MPIAIJ));
1660   PetscMemcpy(B->ops,&MatOps,sizeof(struct _MatOps));
1661   B->ops->destroy    = MatDestroy_MPIAIJ;
1662   B->ops->view       = MatView_MPIAIJ;
1663   B->factor     = 0;
1664   B->assembled  = PETSC_FALSE;
1665   B->mapping    = 0;
1666 
1667   B->insertmode = NOT_SET_VALUES;
1668   b->size       = size;
1669   MPI_Comm_rank(comm,&b->rank);
1670 
1671   if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
1672     SETERRQ(PETSC_ERR_ARG_WRONG,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1673   }
1674 
1675   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1676     work[0] = m; work[1] = n;
1677     ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
1678     if (M == PETSC_DECIDE) M = sum[0];
1679     if (N == PETSC_DECIDE) N = sum[1];
1680   }
1681   if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
1682   if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
1683   b->m = m; B->m = m;
1684   b->n = n; B->n = n;
1685   b->N = N; B->N = N;
1686   b->M = M; B->M = M;
1687 
1688   /* build local table of row and column ownerships */
1689   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1690   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIAIJ));
1691   b->cowners = b->rowners + b->size + 2;
1692   ierr = MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1693   b->rowners[0] = 0;
1694   for ( i=2; i<=b->size; i++ ) {
1695     b->rowners[i] += b->rowners[i-1];
1696   }
1697   b->rstart = b->rowners[b->rank];
1698   b->rend   = b->rowners[b->rank+1];
1699   ierr = MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1700   b->cowners[0] = 0;
1701   for ( i=2; i<=b->size; i++ ) {
1702     b->cowners[i] += b->cowners[i-1];
1703   }
1704   b->cstart = b->cowners[b->rank];
1705   b->cend   = b->cowners[b->rank+1];
1706 
1707   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1708   ierr = MatCreateSeqAIJ(PETSC_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1709   PLogObjectParent(B,b->A);
1710   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1711   ierr = MatCreateSeqAIJ(PETSC_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1712   PLogObjectParent(B,b->B);
1713 
1714   /* build cache for off array entries formed */
1715   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1716   b->donotstash  = 0;
1717   b->colmap      = 0;
1718   b->garray      = 0;
1719   b->roworiented = 1;
1720 
1721   /* stuff used for matrix vector multiply */
1722   b->lvec      = 0;
1723   b->Mvctx     = 0;
1724 
1725   /* stuff for MatGetRow() */
1726   b->rowindices   = 0;
1727   b->rowvalues    = 0;
1728   b->getrowactive = PETSC_FALSE;
1729 
1730   *A = B;
1731   PetscFunctionReturn(0);
1732 }
1733 
1734 #undef __FUNC__
1735 #define __FUNC__ "MatConvertSameType_MPIAIJ"
1736 int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
1737 {
1738   Mat        mat;
1739   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1740   int        ierr, len=0, flg;
1741 
1742   PetscFunctionBegin;
1743   *newmat       = 0;
1744   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIAIJ,matin->comm,MatDestroy,MatView);
1745   PLogObjectCreate(mat);
1746   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1747   PetscMemcpy(mat->ops,&MatOps,sizeof(struct _MatOps));
1748   mat->ops->destroy    = MatDestroy_MPIAIJ;
1749   mat->ops->view       = MatView_MPIAIJ;
1750   mat->factor     = matin->factor;
1751   mat->assembled  = PETSC_TRUE;
1752 
1753   a->m = mat->m   = oldmat->m;
1754   a->n = mat->n   = oldmat->n;
1755   a->M = mat->M   = oldmat->M;
1756   a->N = mat->N   = oldmat->N;
1757 
1758   a->rstart       = oldmat->rstart;
1759   a->rend         = oldmat->rend;
1760   a->cstart       = oldmat->cstart;
1761   a->cend         = oldmat->cend;
1762   a->size         = oldmat->size;
1763   a->rank         = oldmat->rank;
1764   mat->insertmode = NOT_SET_VALUES;
1765   a->rowvalues    = 0;
1766   a->getrowactive = PETSC_FALSE;
1767 
1768   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1769   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIAIJ));
1770   a->cowners = a->rowners + a->size + 2;
1771   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1772   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1773   if (oldmat->colmap) {
1774     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1775     PLogObjectMemory(mat,(a->N)*sizeof(int));
1776     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1777   } else a->colmap = 0;
1778   if (oldmat->garray) {
1779     len = ((Mat_SeqAIJ *) (oldmat->B->data))->n;
1780     a->garray = (int *) PetscMalloc((len+1)*sizeof(int)); CHKPTRQ(a->garray);
1781     PLogObjectMemory(mat,len*sizeof(int));
1782     if (len) PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1783   } else a->garray = 0;
1784 
1785   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1786   PLogObjectParent(mat,a->lvec);
1787   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1788   PLogObjectParent(mat,a->Mvctx);
1789   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1790   PLogObjectParent(mat,a->A);
1791   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1792   PLogObjectParent(mat,a->B);
1793   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1794   if (flg) {
1795     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1796   }
1797   *newmat = mat;
1798   PetscFunctionReturn(0);
1799 }
1800 
1801 #include "sys.h"
1802 
1803 #undef __FUNC__
1804 #define __FUNC__ "MatLoad_MPIAIJ"
1805 int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
1806 {
1807   Mat          A;
1808   Scalar       *vals,*svals;
1809   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1810   MPI_Status   status;
1811   int          i, nz, ierr, j,rstart, rend, fd;
1812   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1813   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1814   int          tag = ((PetscObject)viewer)->tag;
1815 
1816   PetscFunctionBegin;
1817   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1818   if (!rank) {
1819     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1820     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
1821     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
1822     if (header[3] < 0) {
1823       SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix in special format on disk, cannot load as MPIAIJ");
1824     }
1825   }
1826 
1827 
1828   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
1829   M = header[1]; N = header[2];
1830   /* determine ownership of all rows */
1831   m = M/size + ((M % size) > rank);
1832   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1833   ierr = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1834   rowners[0] = 0;
1835   for ( i=2; i<=size; i++ ) {
1836     rowners[i] += rowners[i-1];
1837   }
1838   rstart = rowners[rank];
1839   rend   = rowners[rank+1];
1840 
1841   /* distribute row lengths to all processors */
1842   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1843   offlens = ourlens + (rend-rstart);
1844   if (!rank) {
1845     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1846     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
1847     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1848     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1849     ierr = MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);CHKERRQ(ierr);
1850     PetscFree(sndcounts);
1851   } else {
1852     ierr = MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);CHKERRQ(ierr);
1853   }
1854 
1855   if (!rank) {
1856     /* calculate the number of nonzeros on each processor */
1857     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1858     PetscMemzero(procsnz,size*sizeof(int));
1859     for ( i=0; i<size; i++ ) {
1860       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1861         procsnz[i] += rowlengths[j];
1862       }
1863     }
1864     PetscFree(rowlengths);
1865 
1866     /* determine max buffer needed and allocate it */
1867     maxnz = 0;
1868     for ( i=0; i<size; i++ ) {
1869       maxnz = PetscMax(maxnz,procsnz[i]);
1870     }
1871     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1872 
1873     /* read in my part of the matrix column indices  */
1874     nz     = procsnz[0];
1875     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1876     ierr   = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
1877 
1878     /* read in every one elses and ship off */
1879     for ( i=1; i<size; i++ ) {
1880       nz   = procsnz[i];
1881       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
1882       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
1883     }
1884     PetscFree(cols);
1885   } else {
1886     /* determine buffer space needed for message */
1887     nz = 0;
1888     for ( i=0; i<m; i++ ) {
1889       nz += ourlens[i];
1890     }
1891     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1892 
1893     /* receive message of column indices*/
1894     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
1895     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
1896     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
1897   }
1898 
1899   /* loop over local rows, determining number of off diagonal entries */
1900   PetscMemzero(offlens,m*sizeof(int));
1901   jj = 0;
1902   for ( i=0; i<m; i++ ) {
1903     for ( j=0; j<ourlens[i]; j++ ) {
1904       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1905       jj++;
1906     }
1907   }
1908 
1909   /* create our matrix */
1910   for ( i=0; i<m; i++ ) {
1911     ourlens[i] -= offlens[i];
1912   }
1913   ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
1914   A = *newmat;
1915   MatSetOption(A,MAT_COLUMNS_SORTED);
1916   for ( i=0; i<m; i++ ) {
1917     ourlens[i] += offlens[i];
1918   }
1919 
1920   if (!rank) {
1921     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1922 
1923     /* read in my part of the matrix numerical values  */
1924     nz = procsnz[0];
1925     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1926 
1927     /* insert into matrix */
1928     jj      = rstart;
1929     smycols = mycols;
1930     svals   = vals;
1931     for ( i=0; i<m; i++ ) {
1932       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1933       smycols += ourlens[i];
1934       svals   += ourlens[i];
1935       jj++;
1936     }
1937 
1938     /* read in other processors and ship out */
1939     for ( i=1; i<size; i++ ) {
1940       nz   = procsnz[i];
1941       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1942       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
1943     }
1944     PetscFree(procsnz);
1945   } else {
1946     /* receive numeric values */
1947     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1948 
1949     /* receive message of values*/
1950     ierr = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
1951     ierr = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
1952     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
1953 
1954     /* insert into matrix */
1955     jj      = rstart;
1956     smycols = mycols;
1957     svals   = vals;
1958     for ( i=0; i<m; i++ ) {
1959       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1960       smycols += ourlens[i];
1961       svals   += ourlens[i];
1962       jj++;
1963     }
1964   }
1965   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1966 
1967   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1968   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1969   PetscFunctionReturn(0);
1970 }
1971 
1972 #undef __FUNC__
1973 #define __FUNC__ "MatGetSubMatrix_MPIAIJ"
1974 /*
1975     Not great since it makes two copies of the submatrix, first an SeqAIJ
1976   in local and then by concatenating the local matrices the end result.
1977   Writing it directly would be much like MatGetSubMatrices_MPIAIJ()
1978 */
1979 int MatGetSubMatrix_MPIAIJ(Mat mat,IS isrow,IS iscol,int csize,MatGetSubMatrixCall call,Mat *newmat)
1980 {
1981   int        ierr, i, m,n,rstart,row,rend,nz,*cwork,size,rank,j;
1982   Mat        *local,M, Mreuse;
1983   Scalar     *vwork,*aa;
1984   MPI_Comm   comm = mat->comm;
1985   Mat_SeqAIJ *aij;
1986   int        *ii, *jj,nlocal,*dlens,*olens,dlen,olen,jend;
1987 
1988   PetscFunctionBegin;
1989   MPI_Comm_rank(comm,&rank);
1990   MPI_Comm_size(comm,&size);
1991 
1992   if (call ==  MAT_REUSE_MATRIX) {
1993     ierr = PetscObjectQuery((PetscObject)*newmat,"SubMatrix",(PetscObject *)&Mreuse);CHKERRQ(ierr);
1994     if (!Mreuse) SETERRQ(1,1,"Submatrix passed in was not used before, cannot reuse");
1995     local = &Mreuse;
1996     ierr  = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_REUSE_MATRIX,&local);CHKERRQ(ierr);
1997   } else {
1998     ierr = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_INITIAL_MATRIX,&local);CHKERRQ(ierr);
1999     Mreuse = *local;
2000     PetscFree(local);
2001   }
2002 
2003   /*
2004       m - number of local rows
2005       n - number of columns (same on all processors)
2006       rstart - first row in new global matrix generated
2007   */
2008   ierr = MatGetSize(Mreuse,&m,&n);CHKERRQ(ierr);
2009   if (call == MAT_INITIAL_MATRIX) {
2010     aij = (Mat_SeqAIJ *) (Mreuse)->data;
2011     if (aij->indexshift) SETERRQ(PETSC_ERR_SUP,1,"No support for index shifted matrix");
2012     ii  = aij->i;
2013     jj  = aij->j;
2014 
2015     /*
2016         Determine the number of non-zeros in the diagonal and off-diagonal
2017         portions of the matrix in order to do correct preallocation
2018     */
2019 
2020     /* first get start and end of "diagonal" columns */
2021     if (csize == PETSC_DECIDE) {
2022       nlocal = n/size + ((n % size) > rank);
2023     } else {
2024       nlocal = csize;
2025     }
2026     ierr   = MPI_Scan(&nlocal,&rend,1,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
2027     rstart = rend - nlocal;
2028     if (rank == size - 1 && rend != n) {
2029       SETERRQ(1,1,"Local column sizes do not add up to total number of columns");
2030     }
2031 
2032     /* next, compute all the lengths */
2033     dlens = (int *) PetscMalloc( (2*m+1)*sizeof(int) );CHKPTRQ(dlens);
2034     olens = dlens + m;
2035     for ( i=0; i<m; i++ ) {
2036       jend = ii[i+1] - ii[i];
2037       olen = 0;
2038       dlen = 0;
2039       for ( j=0; j<jend; j++ ) {
2040         if ( *jj < rstart || *jj >= rend) olen++;
2041         else dlen++;
2042         jj++;
2043       }
2044       olens[i] = olen;
2045       dlens[i] = dlen;
2046     }
2047     ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,PETSC_DECIDE,n,0,dlens,0,olens,&M);CHKERRQ(ierr);
2048     PetscFree(dlens);
2049   } else {
2050     int ml,nl;
2051 
2052     M = *newmat;
2053     ierr = MatGetLocalSize(M,&ml,&nl);CHKERRQ(ierr);
2054     if (ml != m) SETERRQ(PETSC_ERR_ARG_SIZ,1,"Previous matrix must be same size/layout as request");
2055     ierr = MatZeroEntries(M);CHKERRQ(ierr);
2056     /*
2057          The next two lines are needed so we may call MatSetValues_MPIAIJ() below directly,
2058        rather than the slower MatSetValues().
2059     */
2060     M->was_assembled = PETSC_TRUE;
2061     M->assembled     = PETSC_FALSE;
2062   }
2063   ierr = MatGetOwnershipRange(M,&rstart,&rend); CHKERRQ(ierr);
2064   aij = (Mat_SeqAIJ *) (Mreuse)->data;
2065   if (aij->indexshift) SETERRQ(PETSC_ERR_SUP,1,"No support for index shifted matrix");
2066   ii  = aij->i;
2067   jj  = aij->j;
2068   aa  = aij->a;
2069   for (i=0; i<m; i++) {
2070     row   = rstart + i;
2071     nz    = ii[i+1] - ii[i];
2072     cwork = jj;     jj += nz;
2073     vwork = aa;     aa += nz;
2074     ierr = MatSetValues_MPIAIJ(M,1,&row,nz,cwork,vwork,INSERT_VALUES); CHKERRQ(ierr);
2075   }
2076 
2077   ierr = MatAssemblyBegin(M,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2078   ierr = MatAssemblyEnd(M,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2079   *newmat = M;
2080 
2081   /* save submatrix used in processor for next request */
2082   if (call ==  MAT_INITIAL_MATRIX) {
2083     ierr = PetscObjectCompose((PetscObject)M,"SubMatrix",(PetscObject)Mreuse);CHKERRQ(ierr);
2084     ierr = PetscObjectDereference((PetscObject)Mreuse);CHKERRQ(ierr);
2085   }
2086 
2087   PetscFunctionReturn(0);
2088 }
2089