xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision 3101214862ca1a9248fe36eea58aaca745746247)
1 
2 #ifdef PETSC_RCS_HEADER
3 static char vcid[] = "$Id: mpiaij.c,v 1.246 1998/05/12 16:16:19 bsmith Exp bsmith $";
4 #endif
5 
6 #include "pinclude/pviewer.h"
7 #include "src/mat/impls/aij/mpi/mpiaij.h"
8 #include "src/vec/vecimpl.h"
9 #include "src/inline/spops.h"
10 
11 /* local utility routine that creates a mapping from the global column
12 number to the local number in the off-diagonal part of the local
13 storage of the matrix.  This is done in a non scable way since the
14 length of colmap equals the global matrix length.
15 */
16 #undef __FUNC__
17 #define __FUNC__ "CreateColmap_MPIAIJ_Private"
18 int CreateColmap_MPIAIJ_Private(Mat mat)
19 {
20   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
21   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
22   int        n = B->n,i;
23 
24   PetscFunctionBegin;
25   aij->colmap = (int *) PetscMalloc((aij->N+1)*sizeof(int));CHKPTRQ(aij->colmap);
26   PLogObjectMemory(mat,aij->N*sizeof(int));
27   PetscMemzero(aij->colmap,aij->N*sizeof(int));
28   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
29   PetscFunctionReturn(0);
30 }
31 
32 extern int DisAssemble_MPIAIJ(Mat);
33 
34 #define CHUNKSIZE   15
35 #define MatSetValues_SeqAIJ_A_Private(row,col,value,addv) \
36 { \
37  \
38     rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
39     rmax = aimax[row]; nrow = ailen[row];  \
40     col1 = col - shift; \
41      \
42     low = 0; high = nrow; \
43     while (high-low > 5) { \
44       t = (low+high)/2; \
45       if (rp[t] > col) high = t; \
46       else             low  = t; \
47     } \
48       for ( _i=0; _i<nrow; _i++ ) { \
49         if (rp[_i] > col1) break; \
50         if (rp[_i] == col1) { \
51           if (addv == ADD_VALUES) ap[_i] += value;   \
52           else                  ap[_i] = value; \
53           goto a_noinsert; \
54         } \
55       }  \
56       if (nonew == 1) goto a_noinsert; \
57       else if (nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
58       if (nrow >= rmax) { \
59         /* there is no extra room in row, therefore enlarge */ \
60         int    new_nz = ai[a->m] + CHUNKSIZE,len,*new_i,*new_j; \
61         Scalar *new_a; \
62  \
63         if (nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
64  \
65         /* malloc new storage space */ \
66         len     = new_nz*(sizeof(int)+sizeof(Scalar))+(a->m+1)*sizeof(int); \
67         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
68         new_j   = (int *) (new_a + new_nz); \
69         new_i   = new_j + new_nz; \
70  \
71         /* copy over old data into new slots */ \
72         for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = ai[ii];} \
73         for ( ii=row+1; ii<a->m+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
74         PetscMemcpy(new_j,aj,(ai[row]+nrow+shift)*sizeof(int)); \
75         len = (new_nz - CHUNKSIZE - ai[row] - nrow - shift); \
76         PetscMemcpy(new_j+ai[row]+shift+nrow+CHUNKSIZE,aj+ai[row]+shift+nrow, \
77                                                            len*sizeof(int)); \
78         PetscMemcpy(new_a,aa,(ai[row]+nrow+shift)*sizeof(Scalar)); \
79         PetscMemcpy(new_a+ai[row]+shift+nrow+CHUNKSIZE,aa+ai[row]+shift+nrow, \
80                                                            len*sizeof(Scalar));  \
81         /* free up old matrix storage */ \
82  \
83         PetscFree(a->a);  \
84         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
85         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
86         a->singlemalloc = 1; \
87  \
88         rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
89         rmax = aimax[row] = aimax[row] + CHUNKSIZE; \
90         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
91         a->maxnz += CHUNKSIZE; \
92         a->reallocs++; \
93       } \
94       N = nrow++ - 1; a->nz++; \
95       /* shift up all the later entries in this row */ \
96       for ( ii=N; ii>=_i; ii-- ) { \
97         rp[ii+1] = rp[ii]; \
98         ap[ii+1] = ap[ii]; \
99       } \
100       rp[_i] = col1;  \
101       ap[_i] = value;  \
102       a_noinsert: ; \
103       ailen[row] = nrow; \
104 }
105 
106 #define MatSetValues_SeqAIJ_B_Private(row,col,value,addv) \
107 { \
108  \
109     rp   = bj + bi[row] + shift; ap = ba + bi[row] + shift; \
110     rmax = bimax[row]; nrow = bilen[row];  \
111     col1 = col - shift; \
112      \
113     low = 0; high = nrow; \
114     while (high-low > 5) { \
115       t = (low+high)/2; \
116       if (rp[t] > col) high = t; \
117       else             low  = t; \
118     } \
119        for ( _i=0; _i<nrow; _i++ ) { \
120         if (rp[_i] > col1) break; \
121         if (rp[_i] == col1) { \
122           if (addv == ADD_VALUES) ap[_i] += value;   \
123           else                  ap[_i] = value; \
124           goto b_noinsert; \
125         } \
126       }  \
127       if (nonew == 1) goto b_noinsert; \
128       else if (nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
129       if (nrow >= rmax) { \
130         /* there is no extra room in row, therefore enlarge */ \
131         int    new_nz = bi[b->m] + CHUNKSIZE,len,*new_i,*new_j; \
132         Scalar *new_a; \
133  \
134         if (nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
135  \
136         /* malloc new storage space */ \
137         len     = new_nz*(sizeof(int)+sizeof(Scalar))+(b->m+1)*sizeof(int); \
138         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
139         new_j   = (int *) (new_a + new_nz); \
140         new_i   = new_j + new_nz; \
141  \
142         /* copy over old data into new slots */ \
143         for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = bi[ii];} \
144         for ( ii=row+1; ii<b->m+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
145         PetscMemcpy(new_j,bj,(bi[row]+nrow+shift)*sizeof(int)); \
146         len = (new_nz - CHUNKSIZE - bi[row] - nrow - shift); \
147         PetscMemcpy(new_j+bi[row]+shift+nrow+CHUNKSIZE,bj+bi[row]+shift+nrow, \
148                                                            len*sizeof(int)); \
149         PetscMemcpy(new_a,ba,(bi[row]+nrow+shift)*sizeof(Scalar)); \
150         PetscMemcpy(new_a+bi[row]+shift+nrow+CHUNKSIZE,ba+bi[row]+shift+nrow, \
151                                                            len*sizeof(Scalar));  \
152         /* free up old matrix storage */ \
153  \
154         PetscFree(b->a);  \
155         if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
156         ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
157         b->singlemalloc = 1; \
158  \
159         rp   = bj + bi[row] + shift; ap = ba + bi[row] + shift; \
160         rmax = bimax[row] = bimax[row] + CHUNKSIZE; \
161         PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
162         b->maxnz += CHUNKSIZE; \
163         b->reallocs++; \
164       } \
165       N = nrow++ - 1; b->nz++; \
166       /* shift up all the later entries in this row */ \
167       for ( ii=N; ii>=_i; ii-- ) { \
168         rp[ii+1] = rp[ii]; \
169         ap[ii+1] = ap[ii]; \
170       } \
171       rp[_i] = col1;  \
172       ap[_i] = value;  \
173       b_noinsert: ; \
174       bilen[row] = nrow; \
175 }
176 
177 extern int MatSetValues_SeqAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
178 #undef __FUNC__
179 #define __FUNC__ "MatSetValues_MPIAIJ"
180 int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
181 {
182   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
183   Scalar     value;
184   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
185   int        cstart = aij->cstart, cend = aij->cend,row,col;
186   int        roworiented = aij->roworiented;
187 
188   /* Some Variables required in the macro */
189   Mat        A = aij->A;
190   Mat_SeqAIJ *a = (Mat_SeqAIJ *) A->data;
191   int        *aimax = a->imax, *ai = a->i, *ailen = a->ilen,*aj = a->j;
192   Scalar     *aa = a->a;
193 
194   Mat        B = aij->B;
195   Mat_SeqAIJ *b = (Mat_SeqAIJ *) B->data;
196   int        *bimax = b->imax, *bi = b->i, *bilen = b->ilen,*bj = b->j;
197   Scalar     *ba = b->a;
198 
199   int        *rp,ii,nrow,_i,rmax, N, col1,low,high,t;
200   int        nonew = a->nonew,shift = a->indexshift;
201   Scalar     *ap;
202 
203   PetscFunctionBegin;
204   for ( i=0; i<m; i++ ) {
205 #if defined(USE_PETSC_BOPT_g)
206     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
207     if (im[i] >= aij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
208 #endif
209     if (im[i] >= rstart && im[i] < rend) {
210       row = im[i] - rstart;
211       for ( j=0; j<n; j++ ) {
212         if (in[j] >= cstart && in[j] < cend){
213           col = in[j] - cstart;
214           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
215           MatSetValues_SeqAIJ_A_Private(row,col,value,addv);
216           /* ierr = MatSetValues_SeqAIJ(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
217         }
218 #if defined(USE_PETSC_BOPT_g)
219         else if (in[j] < 0) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");}
220         else if (in[j] >= aij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");}
221 #endif
222         else {
223           if (mat->was_assembled) {
224             if (!aij->colmap) {
225               ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
226             }
227             col = aij->colmap[in[j]] - 1;
228             if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
229               ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
230               col =  in[j];
231               /* Reinitialize the variables required by MatSetValues_SeqAIJ_B_Private() */
232               B = aij->B;
233               b = (Mat_SeqAIJ *) B->data;
234               bimax = b->imax; bi = b->i; bilen = b->ilen; bj = b->j;
235               ba = b->a;
236             }
237           } else col = in[j];
238           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
239           MatSetValues_SeqAIJ_B_Private(row,col,value,addv);
240           /* ierr = MatSetValues_SeqAIJ(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
241         }
242       }
243     }
244     else {
245       if (roworiented && !aij->donotstash) {
246         ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
247       }
248       else {
249         if (!aij->donotstash) {
250           row = im[i];
251           for ( j=0; j<n; j++ ) {
252             ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
253           }
254         }
255       }
256     }
257   }
258   PetscFunctionReturn(0);
259 }
260 
261 #undef __FUNC__
262 #define __FUNC__ "MatGetValues_MPIAIJ"
263 int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
264 {
265   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
266   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
267   int        cstart = aij->cstart, cend = aij->cend,row,col;
268 
269   PetscFunctionBegin;
270   for ( i=0; i<m; i++ ) {
271     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
272     if (idxm[i] >= aij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
273     if (idxm[i] >= rstart && idxm[i] < rend) {
274       row = idxm[i] - rstart;
275       for ( j=0; j<n; j++ ) {
276         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
277         if (idxn[j] >= aij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
278         if (idxn[j] >= cstart && idxn[j] < cend){
279           col = idxn[j] - cstart;
280           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
281         }
282         else {
283           if (!aij->colmap) {
284             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
285           }
286           col = aij->colmap[idxn[j]] - 1;
287           if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
288           else {
289             ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
290           }
291         }
292       }
293     } else {
294       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
295     }
296   }
297   PetscFunctionReturn(0);
298 }
299 
300 #undef __FUNC__
301 #define __FUNC__ "MatAssemblyBegin_MPIAIJ"
302 int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
303 {
304   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
305   MPI_Comm    comm = mat->comm;
306   int         size = aij->size, *owners = aij->rowners;
307   int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
308   MPI_Request *send_waits,*recv_waits;
309   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
310   InsertMode  addv;
311   Scalar      *rvalues,*svalues;
312 
313   PetscFunctionBegin;
314   if (aij->donotstash) {
315     aij->svalues    = 0; aij->rvalues    = 0;
316     aij->nsends     = 0; aij->nrecvs     = 0;
317     aij->send_waits = 0; aij->recv_waits = 0;
318     aij->rmax       = 0;
319     PetscFunctionReturn(0);
320   }
321 
322   /* make sure all processors are either in INSERTMODE or ADDMODE */
323   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
324   if (addv == (ADD_VALUES|INSERT_VALUES)) {
325     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
326   }
327   mat->insertmode = addv; /* in case this processor had no cache */
328 
329   /*  first count number of contributors to each processor */
330   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
331   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
332   owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
333   for ( i=0; i<aij->stash.n; i++ ) {
334     idx = aij->stash.idx[i];
335     for ( j=0; j<size; j++ ) {
336       if (idx >= owners[j] && idx < owners[j+1]) {
337         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
338       }
339     }
340   }
341   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
342 
343   /* inform other processors of number of messages and max length*/
344   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
345   ierr = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
346   nreceives = work[rank];
347   ierr = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
348   nmax = work[rank];
349   PetscFree(work);
350 
351   /* post receives:
352        1) each message will consist of ordered pairs
353      (global index,value) we store the global index as a double
354      to simplify the message passing.
355        2) since we don't know how long each individual message is we
356      allocate the largest needed buffer for each receive. Potentially
357      this is a lot of wasted space.
358 
359 
360        This could be done better.
361   */
362   rvalues    = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
363   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
364   for ( i=0; i<nreceives; i++ ) {
365     ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
366               comm,recv_waits+i);CHKERRQ(ierr);
367   }
368 
369   /* do sends:
370       1) starts[i] gives the starting index in svalues for stuff going to
371          the ith processor
372   */
373   svalues    = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
374   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
375   starts     = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
376   starts[0]  = 0;
377   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
378   for ( i=0; i<aij->stash.n; i++ ) {
379     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
380     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
381     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
382   }
383   PetscFree(owner);
384   starts[0] = 0;
385   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
386   count = 0;
387   for ( i=0; i<size; i++ ) {
388     if (procs[i]) {
389       ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
390     }
391   }
392   PetscFree(starts);
393   PetscFree(nprocs);
394 
395   /* Free cache space */
396   PLogInfo(aij->A,"MatAssemblyBegin_MPIAIJ:Number of off-processor values %d\n",aij->stash.n);
397   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
398 
399   aij->svalues    = svalues;    aij->rvalues    = rvalues;
400   aij->nsends     = nsends;     aij->nrecvs     = nreceives;
401   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
402   aij->rmax       = nmax;
403 
404   PetscFunctionReturn(0);
405 }
406 extern int MatSetUpMultiply_MPIAIJ(Mat);
407 
408 #undef __FUNC__
409 #define __FUNC__ "MatAssemblyEnd_MPIAIJ"
410 int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
411 {
412   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
413   MPI_Status  *send_status,recv_status;
414   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
415   int         row,col,other_disassembled;
416   Scalar      *values,val;
417   InsertMode  addv = mat->insertmode;
418 
419   PetscFunctionBegin;
420 
421   /*  wait on receives */
422   while (count) {
423     ierr = MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
424     /* unpack receives into our local space */
425     values = aij->rvalues + 3*imdex*aij->rmax;
426     ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
427     n = n/3;
428     for ( i=0; i<n; i++ ) {
429       row = (int) PetscReal(values[3*i]) - aij->rstart;
430       col = (int) PetscReal(values[3*i+1]);
431       val = values[3*i+2];
432       if (col >= aij->cstart && col < aij->cend) {
433         col -= aij->cstart;
434         ierr = MatSetValues(aij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
435       } else {
436         if (mat->was_assembled || mat->assembled) {
437           if (!aij->colmap) {
438             ierr = CreateColmap_MPIAIJ_Private(mat); CHKERRQ(ierr);
439           }
440           col = aij->colmap[col] - 1;
441           if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
442             ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
443             col = (int) PetscReal(values[3*i+1]);
444           }
445         }
446         ierr = MatSetValues(aij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
447       }
448     }
449     count--;
450   }
451   if (aij->recv_waits) PetscFree(aij->recv_waits);
452   if (aij->rvalues)    PetscFree(aij->rvalues);
453 
454   /* wait on sends */
455   if (aij->nsends) {
456     send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
457     ierr        = MPI_Waitall(aij->nsends,aij->send_waits,send_status);CHKERRQ(ierr);
458     PetscFree(send_status);
459   }
460   if (aij->send_waits) PetscFree(aij->send_waits);
461   if (aij->svalues)    PetscFree(aij->svalues);
462 
463   ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
464   ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);
465 
466   /* determine if any processor has disassembled, if so we must
467      also disassemble ourselfs, in order that we may reassemble. */
468   /*
469      if nonzero structure of submatrix B cannot change then we know that
470      no processor disassembled thus we can skip this stuff
471   */
472   if (!((Mat_SeqAIJ*) aij->B->data)->nonew)  {
473     ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
474     if (mat->was_assembled && !other_disassembled) {
475       ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
476     }
477   }
478 
479   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
480     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
481   }
482   ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
483   ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);
484 
485   if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
486   PetscFunctionReturn(0);
487 }
488 
489 #undef __FUNC__
490 #define __FUNC__ "MatZeroEntries_MPIAIJ"
491 int MatZeroEntries_MPIAIJ(Mat A)
492 {
493   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
494   int        ierr;
495 
496   PetscFunctionBegin;
497   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
498   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
499   PetscFunctionReturn(0);
500 }
501 
502 #undef __FUNC__
503 #define __FUNC__ "MatZeroRows_MPIAIJ"
504 int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
505 {
506   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
507   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
508   int            *procs,*nprocs,j,found,idx,nsends,*work,row;
509   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
510   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
511   int            *lens,imdex,*lrows,*values,rstart=l->rstart;
512   MPI_Comm       comm = A->comm;
513   MPI_Request    *send_waits,*recv_waits;
514   MPI_Status     recv_status,*send_status;
515   IS             istmp;
516   PetscTruth     localdiag;
517 
518   PetscFunctionBegin;
519   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
520   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
521 
522   /*  first count number of contributors to each processor */
523   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
524   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
525   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
526   for ( i=0; i<N; i++ ) {
527     idx = rows[i];
528     found = 0;
529     for ( j=0; j<size; j++ ) {
530       if (idx >= owners[j] && idx < owners[j+1]) {
531         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
532       }
533     }
534     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
535   }
536   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
537 
538   /* inform other processors of number of messages and max length*/
539   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
540   ierr = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
541   nrecvs = work[rank];
542   ierr = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
543   nmax = work[rank];
544   PetscFree(work);
545 
546   /* post receives:   */
547   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int));CHKPTRQ(rvalues);
548   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
549   for ( i=0; i<nrecvs; i++ ) {
550     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
551   }
552 
553   /* do sends:
554       1) starts[i] gives the starting index in svalues for stuff going to
555          the ith processor
556   */
557   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
558   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
559   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
560   starts[0] = 0;
561   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
562   for ( i=0; i<N; i++ ) {
563     svalues[starts[owner[i]]++] = rows[i];
564   }
565   ISRestoreIndices(is,&rows);
566 
567   starts[0] = 0;
568   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
569   count = 0;
570   for ( i=0; i<size; i++ ) {
571     if (procs[i]) {
572       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
573     }
574   }
575   PetscFree(starts);
576 
577   base = owners[rank];
578 
579   /*  wait on receives */
580   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
581   source = lens + nrecvs;
582   count  = nrecvs; slen = 0;
583   while (count) {
584     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
585     /* unpack receives into our local space */
586     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
587     source[imdex]  = recv_status.MPI_SOURCE;
588     lens[imdex]  = n;
589     slen += n;
590     count--;
591   }
592   PetscFree(recv_waits);
593 
594   /* move the data into the send scatter */
595   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
596   count = 0;
597   for ( i=0; i<nrecvs; i++ ) {
598     values = rvalues + i*nmax;
599     for ( j=0; j<lens[i]; j++ ) {
600       lrows[count++] = values[j] - base;
601     }
602   }
603   PetscFree(rvalues); PetscFree(lens);
604   PetscFree(owner); PetscFree(nprocs);
605 
606   /* actually zap the local rows */
607   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
608   PLogObjectParent(A,istmp);
609 
610   /*
611         Zero the required rows. If the "diagonal block" of the matrix
612      is square and the user wishes to set the diagonal we use seperate
613      code so that MatSetValues() is not called for each diagonal allocating
614      new memory, thus calling lots of mallocs and slowing things down.
615 
616        Contributed by: Mathew Knepley
617   */
618   localdiag = PETSC_FALSE;
619   if (diag && (l->A->M == l->A->N)) {
620     localdiag = PETSC_TRUE;
621     ierr      = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
622   } else {
623     ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
624   }
625   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
626   ierr = ISDestroy(istmp); CHKERRQ(ierr);
627 
628   if (diag && (localdiag == PETSC_FALSE)) {
629     for ( i = 0; i < slen; i++ ) {
630       row = lrows[i] + rstart;
631       MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES);
632     }
633     MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);
634     MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);
635   }
636 
637   if (diag) {
638     for ( i = 0; i < slen; i++ ) {
639       row = lrows[i] + rstart;
640       MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES);
641     }
642     MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);
643     MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);
644   }
645   PetscFree(lrows);
646 
647   /* wait on sends */
648   if (nsends) {
649     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
650     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
651     PetscFree(send_status);
652   }
653   PetscFree(send_waits); PetscFree(svalues);
654 
655   PetscFunctionReturn(0);
656 }
657 
658 #undef __FUNC__
659 #define __FUNC__ "MatMult_MPIAIJ"
660 int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
661 {
662   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
663   int        ierr,nt;
664 
665   PetscFunctionBegin;
666   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
667   if (nt != a->n) {
668     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
669   }
670   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
671   ierr = (*a->A->ops->mult)(a->A,xx,yy); CHKERRQ(ierr);
672   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
673   ierr = (*a->B->ops->multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
674   PetscFunctionReturn(0);
675 }
676 
677 #undef __FUNC__
678 #define __FUNC__ "MatMultAdd_MPIAIJ"
679 int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
680 {
681   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
682   int        ierr;
683 
684   PetscFunctionBegin;
685   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
686   ierr = (*a->A->ops->multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
687   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
688   ierr = (*a->B->ops->multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
689   PetscFunctionReturn(0);
690 }
691 
692 #undef __FUNC__
693 #define __FUNC__ "MatMultTrans_MPIAIJ"
694 int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
695 {
696   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
697   int        ierr;
698 
699   PetscFunctionBegin;
700   /* do nondiagonal part */
701   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
702   /* send it on its way */
703   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
704   /* do local part */
705   ierr = (*a->A->ops->multtrans)(a->A,xx,yy); CHKERRQ(ierr);
706   /* receive remote parts: note this assumes the values are not actually */
707   /* inserted in yy until the next line, which is true for my implementation*/
708   /* but is not perhaps always true. */
709   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
710   PetscFunctionReturn(0);
711 }
712 
713 #undef __FUNC__
714 #define __FUNC__ "MatMultTransAdd_MPIAIJ"
715 int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
716 {
717   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
718   int        ierr;
719 
720   PetscFunctionBegin;
721   /* do nondiagonal part */
722   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
723   /* send it on its way */
724   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
725   /* do local part */
726   ierr = (*a->A->ops->multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
727   /* receive remote parts: note this assumes the values are not actually */
728   /* inserted in yy until the next line, which is true for my implementation*/
729   /* but is not perhaps always true. */
730   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
731   PetscFunctionReturn(0);
732 }
733 
734 /*
735   This only works correctly for square matrices where the subblock A->A is the
736    diagonal block
737 */
738 #undef __FUNC__
739 #define __FUNC__ "MatGetDiagonal_MPIAIJ"
740 int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
741 {
742   int        ierr;
743   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
744 
745   PetscFunctionBegin;
746   if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
747   if (a->rstart != a->cstart || a->rend != a->cend) {
748     SETERRQ(PETSC_ERR_ARG_SIZ,0,"row partition must equal col partition");
749   }
750   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
751   PetscFunctionReturn(0);
752 }
753 
754 #undef __FUNC__
755 #define __FUNC__ "MatScale_MPIAIJ"
756 int MatScale_MPIAIJ(Scalar *aa,Mat A)
757 {
758   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
759   int        ierr;
760 
761   PetscFunctionBegin;
762   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
763   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
764   PetscFunctionReturn(0);
765 }
766 
767 #undef __FUNC__
768 #define __FUNC__ "MatDestroy_MPIAIJ"
769 int MatDestroy_MPIAIJ(Mat mat)
770 {
771   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
772   int        ierr;
773 
774   PetscFunctionBegin;
775 #if defined(USE_PETSC_LOG)
776   PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",aij->M,aij->N);
777 #endif
778   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
779   PetscFree(aij->rowners);
780   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
781   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
782   if (aij->colmap) PetscFree(aij->colmap);
783   if (aij->garray) PetscFree(aij->garray);
784   if (aij->lvec)   VecDestroy(aij->lvec);
785   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
786   if (aij->rowvalues) PetscFree(aij->rowvalues);
787   PetscFree(aij);
788   PLogObjectDestroy(mat);
789   PetscHeaderDestroy(mat);
790   PetscFunctionReturn(0);
791 }
792 
793 #undef __FUNC__
794 #define __FUNC__ "MatView_MPIAIJ_Binary"
795 extern int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
796 {
797   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
798   int         ierr;
799 
800   PetscFunctionBegin;
801   if (aij->size == 1) {
802     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
803   }
804   else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
805   PetscFunctionReturn(0);
806 }
807 
808 #undef __FUNC__
809 #define __FUNC__ "MatView_MPIAIJ_ASCIIorDraworMatlab"
810 extern int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
811 {
812   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
813   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
814   int         ierr, format,shift = C->indexshift,rank;
815   FILE        *fd;
816   ViewerType  vtype;
817 
818   PetscFunctionBegin;
819   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
820   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
821     ierr = ViewerGetFormat(viewer,&format);
822     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
823       MatInfo info;
824       int     flg;
825       MPI_Comm_rank(mat->comm,&rank);
826       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
827       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
828       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
829       PetscSequentialPhaseBegin(mat->comm,1);
830       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
831          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
832       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
833          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
834       ierr = MatGetInfo(aij->A,MAT_LOCAL,&info);
835       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used);
836       ierr = MatGetInfo(aij->B,MAT_LOCAL,&info);
837       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used);
838       fflush(fd);
839       PetscSequentialPhaseEnd(mat->comm,1);
840       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
841       PetscFunctionReturn(0);
842     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
843       PetscFunctionReturn(0);
844     }
845   }
846 
847   if (vtype == DRAW_VIEWER) {
848     Draw       draw;
849     PetscTruth isnull;
850     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
851     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
852   }
853 
854   if (vtype == ASCII_FILE_VIEWER) {
855     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
856     PetscSequentialPhaseBegin(mat->comm,1);
857     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
858            aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
859            aij->cend);
860     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
861     ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
862     fflush(fd);
863     PetscSequentialPhaseEnd(mat->comm,1);
864   } else {
865     int size = aij->size;
866     rank = aij->rank;
867     if (size == 1) {
868       ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
869     } else {
870       /* assemble the entire matrix onto first processor. */
871       Mat         A;
872       Mat_SeqAIJ *Aloc;
873       int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
874       Scalar      *a;
875 
876       if (!rank) {
877         ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
878       } else {
879         ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
880       }
881       PLogObjectParent(mat,A);
882 
883       /* copy over the A part */
884       Aloc = (Mat_SeqAIJ*) aij->A->data;
885       m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
886       row = aij->rstart;
887       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
888       for ( i=0; i<m; i++ ) {
889         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
890         row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
891       }
892       aj = Aloc->j;
893       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
894 
895       /* copy over the B part */
896       Aloc = (Mat_SeqAIJ*) aij->B->data;
897       m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
898       row = aij->rstart;
899       ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
900       for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
901       for ( i=0; i<m; i++ ) {
902         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
903         row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
904       }
905       PetscFree(ct);
906       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
907       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
908       /*
909          Everyone has to call to draw the matrix since the graphics waits are
910          synchronized across all processors that share the Draw object
911       */
912       if (!rank || vtype == DRAW_VIEWER) {
913         ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
914       }
915       ierr = MatDestroy(A); CHKERRQ(ierr);
916     }
917   }
918   PetscFunctionReturn(0);
919 }
920 
921 #undef __FUNC__
922 #define __FUNC__ "MatView_MPIAIJ"
923 int MatView_MPIAIJ(Mat mat,Viewer viewer)
924 {
925   int         ierr;
926   ViewerType  vtype;
927 
928   PetscFunctionBegin;
929   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
930   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
931       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
932     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
933   } else if (vtype == BINARY_FILE_VIEWER) {
934     ierr = MatView_MPIAIJ_Binary(mat,viewer);CHKERRQ(ierr);
935   } else {
936     SETERRQ(1,1,"Viewer type not supported by PETSc object");
937   }
938   PetscFunctionReturn(0);
939 }
940 
941 /*
942     This has to provide several versions.
943 
944      2) a) use only local smoothing updating outer values only once.
945         b) local smoothing updating outer values each inner iteration
946      3) color updating out values betwen colors.
947 */
948 #undef __FUNC__
949 #define __FUNC__ "MatRelax_MPIAIJ"
950 int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
951                            double fshift,int its,Vec xx)
952 {
953   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
954   Mat        AA = mat->A, BB = mat->B;
955   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
956   Scalar     *b,*x,*xs,*ls,d,*v,sum;
957   int        ierr,*idx, *diag;
958   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
959 
960   PetscFunctionBegin;
961   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
962   xs = x + shift; /* shift by one for index start of 1 */
963   ls = ls + shift;
964   if (!A->diag) {ierr = MatMarkDiag_SeqAIJ(AA); CHKERRQ(ierr);}
965   diag = A->diag;
966   if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
967     if (flag & SOR_ZERO_INITIAL_GUESS) {
968       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
969       PetscFunctionReturn(0);
970     }
971     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
972     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
973     while (its--) {
974       /* go down through the rows */
975       for ( i=0; i<m; i++ ) {
976         n    = A->i[i+1] - A->i[i];
977         idx  = A->j + A->i[i] + shift;
978         v    = A->a + A->i[i] + shift;
979         sum  = b[i];
980         SPARSEDENSEMDOT(sum,xs,v,idx,n);
981         d    = fshift + A->a[diag[i]+shift];
982         n    = B->i[i+1] - B->i[i];
983         idx  = B->j + B->i[i] + shift;
984         v    = B->a + B->i[i] + shift;
985         SPARSEDENSEMDOT(sum,ls,v,idx,n);
986         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
987       }
988       /* come up through the rows */
989       for ( i=m-1; i>-1; i-- ) {
990         n    = A->i[i+1] - A->i[i];
991         idx  = A->j + A->i[i] + shift;
992         v    = A->a + A->i[i] + shift;
993         sum  = b[i];
994         SPARSEDENSEMDOT(sum,xs,v,idx,n);
995         d    = fshift + A->a[diag[i]+shift];
996         n    = B->i[i+1] - B->i[i];
997         idx  = B->j + B->i[i] + shift;
998         v    = B->a + B->i[i] + shift;
999         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1000         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1001       }
1002     }
1003   } else if (flag & SOR_LOCAL_FORWARD_SWEEP){
1004     if (flag & SOR_ZERO_INITIAL_GUESS) {
1005       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
1006       PetscFunctionReturn(0);
1007     }
1008     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1009     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1010     while (its--) {
1011       for ( i=0; i<m; i++ ) {
1012         n    = A->i[i+1] - A->i[i];
1013         idx  = A->j + A->i[i] + shift;
1014         v    = A->a + A->i[i] + shift;
1015         sum  = b[i];
1016         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1017         d    = fshift + A->a[diag[i]+shift];
1018         n    = B->i[i+1] - B->i[i];
1019         idx  = B->j + B->i[i] + shift;
1020         v    = B->a + B->i[i] + shift;
1021         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1022         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1023       }
1024     }
1025   } else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
1026     if (flag & SOR_ZERO_INITIAL_GUESS) {
1027       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
1028       PetscFunctionReturn(0);
1029     }
1030     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,
1031                             mat->Mvctx); CHKERRQ(ierr);
1032     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,
1033                             mat->Mvctx); CHKERRQ(ierr);
1034     while (its--) {
1035       for ( i=m-1; i>-1; i-- ) {
1036         n    = A->i[i+1] - A->i[i];
1037         idx  = A->j + A->i[i] + shift;
1038         v    = A->a + A->i[i] + shift;
1039         sum  = b[i];
1040         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1041         d    = fshift + A->a[diag[i]+shift];
1042         n    = B->i[i+1] - B->i[i];
1043         idx  = B->j + B->i[i] + shift;
1044         v    = B->a + B->i[i] + shift;
1045         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1046         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1047       }
1048     }
1049   } else {
1050     SETERRQ(PETSC_ERR_SUP,0,"Parallel SOR not supported");
1051   }
1052   PetscFunctionReturn(0);
1053 }
1054 
1055 #undef __FUNC__
1056 #define __FUNC__ "MatGetInfo_MPIAIJ"
1057 int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1058 {
1059   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1060   Mat        A = mat->A, B = mat->B;
1061   int        ierr;
1062   double     isend[5], irecv[5];
1063 
1064   PetscFunctionBegin;
1065   info->block_size     = 1.0;
1066   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1067   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
1068   isend[3] = info->memory;  isend[4] = info->mallocs;
1069   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1070   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
1071   isend[3] += info->memory;  isend[4] += info->mallocs;
1072   if (flag == MAT_LOCAL) {
1073     info->nz_used      = isend[0];
1074     info->nz_allocated = isend[1];
1075     info->nz_unneeded  = isend[2];
1076     info->memory       = isend[3];
1077     info->mallocs      = isend[4];
1078   } else if (flag == MAT_GLOBAL_MAX) {
1079     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);CHKERRQ(ierr);
1080     info->nz_used      = irecv[0];
1081     info->nz_allocated = irecv[1];
1082     info->nz_unneeded  = irecv[2];
1083     info->memory       = irecv[3];
1084     info->mallocs      = irecv[4];
1085   } else if (flag == MAT_GLOBAL_SUM) {
1086     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);CHKERRQ(ierr);
1087     info->nz_used      = irecv[0];
1088     info->nz_allocated = irecv[1];
1089     info->nz_unneeded  = irecv[2];
1090     info->memory       = irecv[3];
1091     info->mallocs      = irecv[4];
1092   }
1093   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1094   info->fill_ratio_needed = 0;
1095   info->factor_mallocs    = 0;
1096   info->rows_global       = (double)mat->M;
1097   info->columns_global    = (double)mat->N;
1098   info->rows_local        = (double)mat->m;
1099   info->columns_local     = (double)mat->N;
1100 
1101   PetscFunctionReturn(0);
1102 }
1103 
1104 #undef __FUNC__
1105 #define __FUNC__ "MatSetOption_MPIAIJ"
1106 int MatSetOption_MPIAIJ(Mat A,MatOption op)
1107 {
1108   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1109 
1110   PetscFunctionBegin;
1111   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1112       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1113       op == MAT_COLUMNS_UNSORTED ||
1114       op == MAT_COLUMNS_SORTED ||
1115       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1116       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1117         MatSetOption(a->A,op);
1118         MatSetOption(a->B,op);
1119   } else if (op == MAT_ROW_ORIENTED) {
1120     a->roworiented = 1;
1121     MatSetOption(a->A,op);
1122     MatSetOption(a->B,op);
1123   } else if (op == MAT_ROWS_SORTED ||
1124              op == MAT_ROWS_UNSORTED ||
1125              op == MAT_SYMMETRIC ||
1126              op == MAT_STRUCTURALLY_SYMMETRIC ||
1127              op == MAT_YES_NEW_DIAGONALS)
1128     PLogInfo(A,"MatSetOption_MPIAIJ:Option ignored\n");
1129   else if (op == MAT_COLUMN_ORIENTED) {
1130     a->roworiented = 0;
1131     MatSetOption(a->A,op);
1132     MatSetOption(a->B,op);
1133   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1134     a->donotstash = 1;
1135   } else if (op == MAT_NO_NEW_DIAGONALS){
1136     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1137   } else {
1138     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1139   }
1140   PetscFunctionReturn(0);
1141 }
1142 
1143 #undef __FUNC__
1144 #define __FUNC__ "MatGetSize_MPIAIJ"
1145 int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
1146 {
1147   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1148 
1149   PetscFunctionBegin;
1150   if (m) *m = mat->M;
1151   if (n) *n = mat->N;
1152   PetscFunctionReturn(0);
1153 }
1154 
1155 #undef __FUNC__
1156 #define __FUNC__ "MatGetLocalSize_MPIAIJ"
1157 int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
1158 {
1159   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1160 
1161   PetscFunctionBegin;
1162   if (m) *m = mat->m;
1163   if (n) *n = mat->n;
1164   PetscFunctionReturn(0);
1165 }
1166 
1167 #undef __FUNC__
1168 #define __FUNC__ "MatGetOwnershipRange_MPIAIJ"
1169 int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
1170 {
1171   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1172 
1173   PetscFunctionBegin;
1174   *m = mat->rstart; *n = mat->rend;
1175   PetscFunctionReturn(0);
1176 }
1177 
1178 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1179 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1180 
1181 #undef __FUNC__
1182 #define __FUNC__ "MatGetRow_MPIAIJ"
1183 int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1184 {
1185   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1186   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
1187   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
1188   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
1189   int        *cmap, *idx_p;
1190 
1191   PetscFunctionBegin;
1192   if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
1193   mat->getrowactive = PETSC_TRUE;
1194 
1195   if (!mat->rowvalues && (idx || v)) {
1196     /*
1197         allocate enough space to hold information from the longest row.
1198     */
1199     Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
1200     int     max = 1,m = mat->m,tmp;
1201     for ( i=0; i<m; i++ ) {
1202       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1203       if (max < tmp) { max = tmp; }
1204     }
1205     mat->rowvalues = (Scalar *) PetscMalloc( max*(sizeof(int)+sizeof(Scalar)));
1206     CHKPTRQ(mat->rowvalues);
1207     mat->rowindices = (int *) (mat->rowvalues + max);
1208   }
1209 
1210   if (row < rstart || row >= rend) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Only local rows")
1211   lrow = row - rstart;
1212 
1213   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1214   if (!v)   {pvA = 0; pvB = 0;}
1215   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1216   ierr = (*mat->A->ops->getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1217   ierr = (*mat->B->ops->getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1218   nztot = nzA + nzB;
1219 
1220   cmap  = mat->garray;
1221   if (v  || idx) {
1222     if (nztot) {
1223       /* Sort by increasing column numbers, assuming A and B already sorted */
1224       int imark = -1;
1225       if (v) {
1226         *v = v_p = mat->rowvalues;
1227         for ( i=0; i<nzB; i++ ) {
1228           if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
1229           else break;
1230         }
1231         imark = i;
1232         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1233         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1234       }
1235       if (idx) {
1236         *idx = idx_p = mat->rowindices;
1237         if (imark > -1) {
1238           for ( i=0; i<imark; i++ ) {
1239             idx_p[i] = cmap[cworkB[i]];
1240           }
1241         } else {
1242           for ( i=0; i<nzB; i++ ) {
1243             if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
1244             else break;
1245           }
1246           imark = i;
1247         }
1248         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
1249         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
1250       }
1251     }
1252     else {
1253       if (idx) *idx = 0;
1254       if (v)   *v   = 0;
1255     }
1256   }
1257   *nz = nztot;
1258   ierr = (*mat->A->ops->restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1259   ierr = (*mat->B->ops->restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1260   PetscFunctionReturn(0);
1261 }
1262 
1263 #undef __FUNC__
1264 #define __FUNC__ "MatRestoreRow_MPIAIJ"
1265 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1266 {
1267   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1268 
1269   PetscFunctionBegin;
1270   if (aij->getrowactive == PETSC_FALSE) {
1271     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
1272   }
1273   aij->getrowactive = PETSC_FALSE;
1274   PetscFunctionReturn(0);
1275 }
1276 
1277 #undef __FUNC__
1278 #define __FUNC__ "MatNorm_MPIAIJ"
1279 int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
1280 {
1281   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1282   Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
1283   int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
1284   double     sum = 0.0;
1285   Scalar     *v;
1286 
1287   PetscFunctionBegin;
1288   if (aij->size == 1) {
1289     ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
1290   } else {
1291     if (type == NORM_FROBENIUS) {
1292       v = amat->a;
1293       for (i=0; i<amat->nz; i++ ) {
1294 #if defined(USE_PETSC_COMPLEX)
1295         sum += real(conj(*v)*(*v)); v++;
1296 #else
1297         sum += (*v)*(*v); v++;
1298 #endif
1299       }
1300       v = bmat->a;
1301       for (i=0; i<bmat->nz; i++ ) {
1302 #if defined(USE_PETSC_COMPLEX)
1303         sum += real(conj(*v)*(*v)); v++;
1304 #else
1305         sum += (*v)*(*v); v++;
1306 #endif
1307       }
1308       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
1309       *norm = sqrt(*norm);
1310     } else if (type == NORM_1) { /* max column norm */
1311       double *tmp, *tmp2;
1312       int    *jj, *garray = aij->garray;
1313       tmp  = (double *) PetscMalloc( (aij->N+1)*sizeof(double) ); CHKPTRQ(tmp);
1314       tmp2 = (double *) PetscMalloc( (aij->N+1)*sizeof(double) ); CHKPTRQ(tmp2);
1315       PetscMemzero(tmp,aij->N*sizeof(double));
1316       *norm = 0.0;
1317       v = amat->a; jj = amat->j;
1318       for ( j=0; j<amat->nz; j++ ) {
1319         tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
1320       }
1321       v = bmat->a; jj = bmat->j;
1322       for ( j=0; j<bmat->nz; j++ ) {
1323         tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
1324       }
1325       ierr = MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
1326       for ( j=0; j<aij->N; j++ ) {
1327         if (tmp2[j] > *norm) *norm = tmp2[j];
1328       }
1329       PetscFree(tmp); PetscFree(tmp2);
1330     } else if (type == NORM_INFINITY) { /* max row norm */
1331       double ntemp = 0.0;
1332       for ( j=0; j<amat->m; j++ ) {
1333         v = amat->a + amat->i[j] + shift;
1334         sum = 0.0;
1335         for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
1336           sum += PetscAbsScalar(*v); v++;
1337         }
1338         v = bmat->a + bmat->i[j] + shift;
1339         for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
1340           sum += PetscAbsScalar(*v); v++;
1341         }
1342         if (sum > ntemp) ntemp = sum;
1343       }
1344       ierr = MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);CHKERRQ(ierr);
1345     } else {
1346       SETERRQ(PETSC_ERR_SUP,0,"No support for two norm");
1347     }
1348   }
1349   PetscFunctionReturn(0);
1350 }
1351 
1352 #undef __FUNC__
1353 #define __FUNC__ "MatTranspose_MPIAIJ"
1354 int MatTranspose_MPIAIJ(Mat A,Mat *matout)
1355 {
1356   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1357   Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
1358   int        ierr,shift = Aloc->indexshift;
1359   int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
1360   Mat        B;
1361   Scalar     *array;
1362 
1363   PetscFunctionBegin;
1364   if (matout == PETSC_NULL && M != N) {
1365     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
1366   }
1367 
1368   ierr = MatCreateMPIAIJ(A->comm,a->n,a->m,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);CHKERRQ(ierr);
1369 
1370   /* copy over the A part */
1371   Aloc = (Mat_SeqAIJ*) a->A->data;
1372   m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1373   row = a->rstart;
1374   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
1375   for ( i=0; i<m; i++ ) {
1376     ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1377     row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
1378   }
1379   aj = Aloc->j;
1380   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}
1381 
1382   /* copy over the B part */
1383   Aloc = (Mat_SeqAIJ*) a->B->data;
1384   m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1385   row = a->rstart;
1386   ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
1387   for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
1388   for ( i=0; i<m; i++ ) {
1389     ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1390     row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
1391   }
1392   PetscFree(ct);
1393   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1394   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1395   if (matout != PETSC_NULL) {
1396     *matout = B;
1397   } else {
1398     PetscOps       *Abops;
1399     struct _MatOps *Aops;
1400 
1401     /* This isn't really an in-place transpose .... but free data structures from a */
1402     PetscFree(a->rowners);
1403     ierr = MatDestroy(a->A); CHKERRQ(ierr);
1404     ierr = MatDestroy(a->B); CHKERRQ(ierr);
1405     if (a->colmap) PetscFree(a->colmap);
1406     if (a->garray) PetscFree(a->garray);
1407     if (a->lvec) VecDestroy(a->lvec);
1408     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
1409     PetscFree(a);
1410 
1411     /*
1412        This is horrible, horrible code. We need to keep the
1413       A pointers for the bops and ops but copy everything
1414       else from C.
1415     */
1416     Abops = A->bops;
1417     Aops  = A->ops;
1418     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1419     A->bops = Abops;
1420     A->ops  = Aops;
1421     PetscHeaderDestroy(B);
1422   }
1423   PetscFunctionReturn(0);
1424 }
1425 
1426 #undef __FUNC__
1427 #define __FUNC__ "MatDiagonalScale_MPIAIJ"
1428 int MatDiagonalScale_MPIAIJ(Mat mat,Vec ll,Vec rr)
1429 {
1430   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1431   Mat a = aij->A, b = aij->B;
1432   int ierr,s1,s2,s3;
1433 
1434   PetscFunctionBegin;
1435   ierr = MatGetLocalSize(mat,&s2,&s3); CHKERRQ(ierr);
1436   if (rr) {
1437     ierr = VecGetLocalSize(rr,&s1);CHKERRQ(ierr);
1438     if (s1!=s3) SETERRQ(PETSC_ERR_ARG_SIZ,0,"right vector non-conforming local size");
1439     /* Overlap communication with computation. */
1440     ierr = VecScatterBegin(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1441   }
1442   if (ll) {
1443     ierr = VecGetLocalSize(ll,&s1);CHKERRQ(ierr);
1444     if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"left vector non-conforming local size");
1445     ierr = (*b->ops->diagonalscale)(b,ll,0); CHKERRQ(ierr);
1446   }
1447   /* scale  the diagonal block */
1448   ierr = (*a->ops->diagonalscale)(a,ll,rr); CHKERRQ(ierr);
1449 
1450   if (rr) {
1451     /* Do a scatter end and then right scale the off-diagonal block */
1452     ierr = VecScatterEnd(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1453     ierr = (*b->ops->diagonalscale)(b,0,aij->lvec); CHKERRQ(ierr);
1454   }
1455 
1456   PetscFunctionReturn(0);
1457 }
1458 
1459 
1460 extern int MatPrintHelp_SeqAIJ(Mat);
1461 #undef __FUNC__
1462 #define __FUNC__ "MatPrintHelp_MPIAIJ"
1463 int MatPrintHelp_MPIAIJ(Mat A)
1464 {
1465   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1466   int        ierr;
1467 
1468   PetscFunctionBegin;
1469   if (!a->rank) {
1470     ierr = MatPrintHelp_SeqAIJ(a->A);CHKERRQ(ierr);
1471   }
1472   PetscFunctionReturn(0);
1473 }
1474 
1475 #undef __FUNC__
1476 #define __FUNC__ "MatGetBlockSize_MPIAIJ"
1477 int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1478 {
1479   PetscFunctionBegin;
1480   *bs = 1;
1481   PetscFunctionReturn(0);
1482 }
1483 #undef __FUNC__
1484 #define __FUNC__ "MatSetUnfactored_MPIAIJ"
1485 int MatSetUnfactored_MPIAIJ(Mat A)
1486 {
1487   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1488   int        ierr;
1489 
1490   PetscFunctionBegin;
1491   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1492   PetscFunctionReturn(0);
1493 }
1494 
1495 #undef __FUNC__
1496 #define __FUNC__ "MatEqual_MPIAIJ"
1497 int MatEqual_MPIAIJ(Mat A, Mat B, PetscTruth *flag)
1498 {
1499   Mat_MPIAIJ *matB = (Mat_MPIAIJ *) B->data,*matA = (Mat_MPIAIJ *) A->data;
1500   Mat        a, b, c, d;
1501   PetscTruth flg;
1502   int        ierr;
1503 
1504   PetscFunctionBegin;
1505   if (B->type != MATMPIAIJ) SETERRQ(PETSC_ERR_ARG_INCOMP,0,"Matrices must be same type");
1506   a = matA->A; b = matA->B;
1507   c = matB->A; d = matB->B;
1508 
1509   ierr = MatEqual(a, c, &flg); CHKERRQ(ierr);
1510   if (flg == PETSC_TRUE) {
1511     ierr = MatEqual(b, d, &flg); CHKERRQ(ierr);
1512   }
1513   ierr = MPI_Allreduce(&flg, flag, 1, MPI_INT, MPI_LAND, A->comm);CHKERRQ(ierr);
1514   PetscFunctionReturn(0);
1515 }
1516 
1517 extern int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1518 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1519 extern int MatFDColoringCreate_MPIAIJ(Mat,ISColoring,MatFDColoring);
1520 extern int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatGetSubMatrixCall,Mat **);
1521 extern int MatGetSubMatrix_MPIAIJ (Mat ,IS,IS,int,MatGetSubMatrixCall,Mat *);
1522 
1523 /* -------------------------------------------------------------------*/
1524 static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
1525        MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
1526        MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
1527        MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
1528        0,0,
1529        0,0,
1530        0,0,
1531        MatRelax_MPIAIJ,
1532        MatTranspose_MPIAIJ,
1533        MatGetInfo_MPIAIJ,MatEqual_MPIAIJ,
1534        MatGetDiagonal_MPIAIJ,MatDiagonalScale_MPIAIJ,MatNorm_MPIAIJ,
1535        MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
1536        0,
1537        MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
1538        0,0,0,0,
1539        MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
1540        0,0,
1541        0,0,MatConvertSameType_MPIAIJ,0,0,
1542        0,0,0,
1543        MatGetSubMatrices_MPIAIJ,MatIncreaseOverlap_MPIAIJ,MatGetValues_MPIAIJ,0,
1544        MatPrintHelp_MPIAIJ,
1545        MatScale_MPIAIJ,0,0,0,
1546        MatGetBlockSize_MPIAIJ,0,0,0,0,
1547        MatFDColoringCreate_MPIAIJ,0,MatSetUnfactored_MPIAIJ,
1548        0,0,MatGetSubMatrix_MPIAIJ};
1549 
1550 
1551 #undef __FUNC__
1552 #define __FUNC__ "MatCreateMPIAIJ"
1553 /*@C
1554    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1555    (the default parallel PETSc format).  For good matrix assembly performance
1556    the user should preallocate the matrix storage by setting the parameters
1557    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1558    performance can be increased by more than a factor of 50.
1559 
1560    Collective on MPI_Comm
1561 
1562    Input Parameters:
1563 +  comm - MPI communicator
1564 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1565            This value should be the same as the local size used in creating the
1566            y vector for the matrix-vector product y = Ax.
1567 .  n - This value should be the same as the local size used in creating the
1568        x vector for the matrix-vector product y = Ax. (or PETSC_DECIDE to have
1569        calculated if N is given) For square matrices n is almost always m.
1570 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1571 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1572 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1573            (same for all local rows)
1574 .  d_nzz - array containing the number of nonzeros in the various rows of the
1575            diagonal portion of the local submatrix (possibly different for each row)
1576            or PETSC_NULL. You must leave room for the diagonal entry even if
1577            it is zero.
1578 .  o_nz - number of nonzeros per row in the off-diagonal portion of local
1579            submatrix (same for all local rows).
1580 -  o_nzz - array containing the number of nonzeros in the various rows of the
1581            off-diagonal portion of the local submatrix (possibly different for
1582            each row) or PETSC_NULL.
1583 
1584    Output Parameter:
1585 .  A - the matrix
1586 
1587    Notes:
1588    The AIJ format (also called the Yale sparse matrix format or
1589    compressed row storage), is fully compatible with standard Fortran 77
1590    storage.  That is, the stored row and column indices can begin at
1591    either one (as in Fortran) or zero.  See the users manual for details.
1592 
1593    The user MUST specify either the local or global matrix dimensions
1594    (possibly both).
1595 
1596    By default, this format uses inodes (identical nodes) when possible.
1597    We search for consecutive rows with the same nonzero structure, thereby
1598    reusing matrix information to achieve increased efficiency.
1599 
1600    Options Database Keys:
1601 +  -mat_aij_no_inode  - Do not use inodes
1602 .  -mat_aij_inode_limit <limit> - Sets inode limit (max limit=5)
1603 -  -mat_aij_oneindex - Internally use indexing starting at 1
1604         rather than 0.  Note that when calling MatSetValues(),
1605         the user still MUST index entries starting at 0!
1606 
1607    Storage Information:
1608    For a square global matrix we define each processor's diagonal portion
1609    to be its local rows and the corresponding columns (a square submatrix);
1610    each processor's off-diagonal portion encompasses the remainder of the
1611    local matrix (a rectangular submatrix).
1612 
1613    The user can specify preallocated storage for the diagonal part of
1614    the local submatrix with either d_nz or d_nnz (not both).  Set
1615    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1616    memory allocation.  Likewise, specify preallocated storage for the
1617    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1618 
1619    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1620    the figure below we depict these three local rows and all columns (0-11).
1621 
1622 .vb
1623              0 1 2 3 4 5 6 7 8 9 10 11
1624             -------------------
1625      row 3  |  o o o d d d o o o o o o
1626      row 4  |  o o o d d d o o o o o o
1627      row 5  |  o o o d d d o o o o o o
1628             -------------------
1629 .ve
1630 
1631    Thus, any entries in the d locations are stored in the d (diagonal)
1632    submatrix, and any entries in the o locations are stored in the
1633    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1634    stored simply in the MATSEQAIJ format for compressed row storage.
1635 
1636    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1637    and o_nz should indicate the number of nonzeros per row in the o matrix.
1638    In general, for PDE problems in which most nonzeros are near the diagonal,
1639    one expects d_nz >> o_nz. For large problems you MUST preallocate memory
1640    or you will get TERRIBLE performance; see the users' manual chapter on
1641    matrices.
1642 
1643 .keywords: matrix, aij, compressed row, sparse, parallel
1644 
1645 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1646 @*/
1647 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1648 {
1649   Mat          B;
1650   Mat_MPIAIJ   *b;
1651   int          ierr, i,sum[2],work[2],size;
1652 
1653   PetscFunctionBegin;
1654   MPI_Comm_size(comm,&size);
1655   if (size == 1) {
1656     if (M == PETSC_DECIDE) M = m;
1657     if (N == PETSC_DECIDE) N = n;
1658     ierr = MatCreateSeqAIJ(comm,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
1659     PetscFunctionReturn(0);
1660   }
1661 
1662   *A = 0;
1663   PetscHeaderCreate(B,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIAIJ,comm,MatDestroy,MatView);
1664   PLogObjectCreate(B);
1665   B->data       = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
1666   PetscMemzero(b,sizeof(Mat_MPIAIJ));
1667   PetscMemcpy(B->ops,&MatOps,sizeof(struct _MatOps));
1668   B->ops->destroy    = MatDestroy_MPIAIJ;
1669   B->ops->view       = MatView_MPIAIJ;
1670   B->factor     = 0;
1671   B->assembled  = PETSC_FALSE;
1672   B->mapping    = 0;
1673 
1674   B->insertmode = NOT_SET_VALUES;
1675   b->size       = size;
1676   MPI_Comm_rank(comm,&b->rank);
1677 
1678   if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
1679     SETERRQ(PETSC_ERR_ARG_WRONG,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1680   }
1681 
1682   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1683     work[0] = m; work[1] = n;
1684     ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
1685     if (M == PETSC_DECIDE) M = sum[0];
1686     if (N == PETSC_DECIDE) N = sum[1];
1687   }
1688   if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
1689   if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
1690   b->m = m; B->m = m;
1691   b->n = n; B->n = n;
1692   b->N = N; B->N = N;
1693   b->M = M; B->M = M;
1694 
1695   /* build local table of row and column ownerships */
1696   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1697   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIAIJ));
1698   b->cowners = b->rowners + b->size + 2;
1699   ierr = MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1700   b->rowners[0] = 0;
1701   for ( i=2; i<=b->size; i++ ) {
1702     b->rowners[i] += b->rowners[i-1];
1703   }
1704   b->rstart = b->rowners[b->rank];
1705   b->rend   = b->rowners[b->rank+1];
1706   ierr = MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1707   b->cowners[0] = 0;
1708   for ( i=2; i<=b->size; i++ ) {
1709     b->cowners[i] += b->cowners[i-1];
1710   }
1711   b->cstart = b->cowners[b->rank];
1712   b->cend   = b->cowners[b->rank+1];
1713 
1714   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1715   ierr = MatCreateSeqAIJ(PETSC_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1716   PLogObjectParent(B,b->A);
1717   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1718   ierr = MatCreateSeqAIJ(PETSC_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1719   PLogObjectParent(B,b->B);
1720 
1721   /* build cache for off array entries formed */
1722   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1723   b->donotstash  = 0;
1724   b->colmap      = 0;
1725   b->garray      = 0;
1726   b->roworiented = 1;
1727 
1728   /* stuff used for matrix vector multiply */
1729   b->lvec      = 0;
1730   b->Mvctx     = 0;
1731 
1732   /* stuff for MatGetRow() */
1733   b->rowindices   = 0;
1734   b->rowvalues    = 0;
1735   b->getrowactive = PETSC_FALSE;
1736 
1737   *A = B;
1738   PetscFunctionReturn(0);
1739 }
1740 
1741 #undef __FUNC__
1742 #define __FUNC__ "MatConvertSameType_MPIAIJ"
1743 int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
1744 {
1745   Mat        mat;
1746   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1747   int        ierr, len=0, flg;
1748 
1749   PetscFunctionBegin;
1750   *newmat       = 0;
1751   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIAIJ,matin->comm,MatDestroy,MatView);
1752   PLogObjectCreate(mat);
1753   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1754   PetscMemcpy(mat->ops,&MatOps,sizeof(struct _MatOps));
1755   mat->ops->destroy    = MatDestroy_MPIAIJ;
1756   mat->ops->view       = MatView_MPIAIJ;
1757   mat->factor     = matin->factor;
1758   mat->assembled  = PETSC_TRUE;
1759 
1760   a->m = mat->m   = oldmat->m;
1761   a->n = mat->n   = oldmat->n;
1762   a->M = mat->M   = oldmat->M;
1763   a->N = mat->N   = oldmat->N;
1764 
1765   a->rstart       = oldmat->rstart;
1766   a->rend         = oldmat->rend;
1767   a->cstart       = oldmat->cstart;
1768   a->cend         = oldmat->cend;
1769   a->size         = oldmat->size;
1770   a->rank         = oldmat->rank;
1771   mat->insertmode = NOT_SET_VALUES;
1772   a->rowvalues    = 0;
1773   a->getrowactive = PETSC_FALSE;
1774 
1775   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1776   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIAIJ));
1777   a->cowners = a->rowners + a->size + 2;
1778   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1779   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1780   if (oldmat->colmap) {
1781     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1782     PLogObjectMemory(mat,(a->N)*sizeof(int));
1783     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1784   } else a->colmap = 0;
1785   if (oldmat->garray) {
1786     len = ((Mat_SeqAIJ *) (oldmat->B->data))->n;
1787     a->garray = (int *) PetscMalloc((len+1)*sizeof(int)); CHKPTRQ(a->garray);
1788     PLogObjectMemory(mat,len*sizeof(int));
1789     if (len) PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1790   } else a->garray = 0;
1791 
1792   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1793   PLogObjectParent(mat,a->lvec);
1794   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1795   PLogObjectParent(mat,a->Mvctx);
1796   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1797   PLogObjectParent(mat,a->A);
1798   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1799   PLogObjectParent(mat,a->B);
1800   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1801   if (flg) {
1802     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1803   }
1804   *newmat = mat;
1805   PetscFunctionReturn(0);
1806 }
1807 
1808 #include "sys.h"
1809 
1810 #undef __FUNC__
1811 #define __FUNC__ "MatLoad_MPIAIJ"
1812 int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
1813 {
1814   Mat          A;
1815   Scalar       *vals,*svals;
1816   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1817   MPI_Status   status;
1818   int          i, nz, ierr, j,rstart, rend, fd;
1819   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1820   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1821   int          tag = ((PetscObject)viewer)->tag;
1822 
1823   PetscFunctionBegin;
1824   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1825   if (!rank) {
1826     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1827     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
1828     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
1829     if (header[3] < 0) {
1830       SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix in special format on disk, cannot load as MPIAIJ");
1831     }
1832   }
1833 
1834 
1835   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
1836   M = header[1]; N = header[2];
1837   /* determine ownership of all rows */
1838   m = M/size + ((M % size) > rank);
1839   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1840   ierr = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1841   rowners[0] = 0;
1842   for ( i=2; i<=size; i++ ) {
1843     rowners[i] += rowners[i-1];
1844   }
1845   rstart = rowners[rank];
1846   rend   = rowners[rank+1];
1847 
1848   /* distribute row lengths to all processors */
1849   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1850   offlens = ourlens + (rend-rstart);
1851   if (!rank) {
1852     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1853     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
1854     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1855     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1856     ierr = MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);CHKERRQ(ierr);
1857     PetscFree(sndcounts);
1858   } else {
1859     ierr = MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);CHKERRQ(ierr);
1860   }
1861 
1862   if (!rank) {
1863     /* calculate the number of nonzeros on each processor */
1864     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1865     PetscMemzero(procsnz,size*sizeof(int));
1866     for ( i=0; i<size; i++ ) {
1867       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1868         procsnz[i] += rowlengths[j];
1869       }
1870     }
1871     PetscFree(rowlengths);
1872 
1873     /* determine max buffer needed and allocate it */
1874     maxnz = 0;
1875     for ( i=0; i<size; i++ ) {
1876       maxnz = PetscMax(maxnz,procsnz[i]);
1877     }
1878     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1879 
1880     /* read in my part of the matrix column indices  */
1881     nz     = procsnz[0];
1882     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1883     ierr   = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
1884 
1885     /* read in every one elses and ship off */
1886     for ( i=1; i<size; i++ ) {
1887       nz   = procsnz[i];
1888       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
1889       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
1890     }
1891     PetscFree(cols);
1892   } else {
1893     /* determine buffer space needed for message */
1894     nz = 0;
1895     for ( i=0; i<m; i++ ) {
1896       nz += ourlens[i];
1897     }
1898     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1899 
1900     /* receive message of column indices*/
1901     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
1902     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
1903     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
1904   }
1905 
1906   /* loop over local rows, determining number of off diagonal entries */
1907   PetscMemzero(offlens,m*sizeof(int));
1908   jj = 0;
1909   for ( i=0; i<m; i++ ) {
1910     for ( j=0; j<ourlens[i]; j++ ) {
1911       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1912       jj++;
1913     }
1914   }
1915 
1916   /* create our matrix */
1917   for ( i=0; i<m; i++ ) {
1918     ourlens[i] -= offlens[i];
1919   }
1920   ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
1921   A = *newmat;
1922   MatSetOption(A,MAT_COLUMNS_SORTED);
1923   for ( i=0; i<m; i++ ) {
1924     ourlens[i] += offlens[i];
1925   }
1926 
1927   if (!rank) {
1928     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1929 
1930     /* read in my part of the matrix numerical values  */
1931     nz = procsnz[0];
1932     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1933 
1934     /* insert into matrix */
1935     jj      = rstart;
1936     smycols = mycols;
1937     svals   = vals;
1938     for ( i=0; i<m; i++ ) {
1939       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1940       smycols += ourlens[i];
1941       svals   += ourlens[i];
1942       jj++;
1943     }
1944 
1945     /* read in other processors and ship out */
1946     for ( i=1; i<size; i++ ) {
1947       nz   = procsnz[i];
1948       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1949       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
1950     }
1951     PetscFree(procsnz);
1952   } else {
1953     /* receive numeric values */
1954     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1955 
1956     /* receive message of values*/
1957     ierr = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
1958     ierr = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
1959     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
1960 
1961     /* insert into matrix */
1962     jj      = rstart;
1963     smycols = mycols;
1964     svals   = vals;
1965     for ( i=0; i<m; i++ ) {
1966       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1967       smycols += ourlens[i];
1968       svals   += ourlens[i];
1969       jj++;
1970     }
1971   }
1972   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1973 
1974   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1975   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1976   PetscFunctionReturn(0);
1977 }
1978 
1979 #undef __FUNC__
1980 #define __FUNC__ "MatGetSubMatrix_MPIAIJ"
1981 /*
1982     Not great since it makes two copies of the submatrix, first an SeqAIJ
1983   in local and then by concatenating the local matrices the end result.
1984   Writing it directly would be much like MatGetSubMatrices_MPIAIJ()
1985 */
1986 int MatGetSubMatrix_MPIAIJ(Mat mat,IS isrow,IS iscol,int csize,MatGetSubMatrixCall call,Mat *newmat)
1987 {
1988   int        ierr, i, m,n,rstart,row,rend,nz,*cwork,size,rank,j;
1989   Mat        *local,M, Mreuse;
1990   Scalar     *vwork,*aa;
1991   MPI_Comm   comm = mat->comm;
1992   Mat_SeqAIJ *aij;
1993   int        *ii, *jj,nlocal,*dlens,*olens,dlen,olen,jend;
1994 
1995   PetscFunctionBegin;
1996   MPI_Comm_rank(comm,&rank);
1997   MPI_Comm_size(comm,&size);
1998 
1999   if (call ==  MAT_REUSE_MATRIX) {
2000     ierr = PetscObjectQuery((PetscObject)*newmat,"SubMatrix",(PetscObject *)&Mreuse);CHKERRQ(ierr);
2001     if (!Mreuse) SETERRQ(1,1,"Submatrix passed in was not used before, cannot reuse");
2002     local = &Mreuse;
2003     ierr  = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_REUSE_MATRIX,&local);CHKERRQ(ierr);
2004   } else {
2005     ierr = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_INITIAL_MATRIX,&local);CHKERRQ(ierr);
2006     Mreuse = *local;
2007     PetscFree(local);
2008   }
2009 
2010   /*
2011       m - number of local rows
2012       n - number of columns (same on all processors)
2013       rstart - first row in new global matrix generated
2014   */
2015   ierr = MatGetSize(Mreuse,&m,&n);CHKERRQ(ierr);
2016   if (call == MAT_INITIAL_MATRIX) {
2017     aij = (Mat_SeqAIJ *) (Mreuse)->data;
2018     if (aij->indexshift) SETERRQ(PETSC_ERR_SUP,1,"No support for index shifted matrix");
2019     ii  = aij->i;
2020     jj  = aij->j;
2021 
2022     /*
2023         Determine the number of non-zeros in the diagonal and off-diagonal
2024         portions of the matrix in order to do correct preallocation
2025     */
2026 
2027     /* first get start and end of "diagonal" columns */
2028     if (csize == PETSC_DECIDE) {
2029       nlocal = n/size + ((n % size) > rank);
2030     } else {
2031       nlocal = csize;
2032     }
2033     ierr   = MPI_Scan(&nlocal,&rend,1,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
2034     rstart = rend - nlocal;
2035     if (rank == size - 1 && rend != n) {
2036       SETERRQ(1,1,"Local column sizes do not add up to total number of columns");
2037     }
2038 
2039     /* next, compute all the lengths */
2040     dlens = (int *) PetscMalloc( (2*m+1)*sizeof(int) );CHKPTRQ(dlens);
2041     olens = dlens + m;
2042     for ( i=0; i<m; i++ ) {
2043       jend = ii[i+1] - ii[i];
2044       olen = 0;
2045       dlen = 0;
2046       for ( j=0; j<jend; j++ ) {
2047         if ( *jj < rstart || *jj >= rend) olen++;
2048         else dlen++;
2049         jj++;
2050       }
2051       olens[i] = olen;
2052       dlens[i] = dlen;
2053     }
2054     ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,PETSC_DECIDE,n,0,dlens,0,olens,&M);CHKERRQ(ierr);
2055     PetscFree(dlens);
2056   } else {
2057     int ml,nl;
2058 
2059     M = *newmat;
2060     ierr = MatGetLocalSize(M,&ml,&nl);CHKERRQ(ierr);
2061     if (ml != m) SETERRQ(PETSC_ERR_ARG_SIZ,1,"Previous matrix must be same size/layout as request");
2062     ierr = MatZeroEntries(M);CHKERRQ(ierr);
2063     /*
2064          The next two lines are needed so we may call MatSetValues_MPIAIJ() below directly,
2065        rather than the slower MatSetValues().
2066     */
2067     M->was_assembled = PETSC_TRUE;
2068     M->assembled     = PETSC_FALSE;
2069   }
2070   ierr = MatGetOwnershipRange(M,&rstart,&rend); CHKERRQ(ierr);
2071   aij = (Mat_SeqAIJ *) (Mreuse)->data;
2072   if (aij->indexshift) SETERRQ(PETSC_ERR_SUP,1,"No support for index shifted matrix");
2073   ii  = aij->i;
2074   jj  = aij->j;
2075   aa  = aij->a;
2076   for (i=0; i<m; i++) {
2077     row   = rstart + i;
2078     nz    = ii[i+1] - ii[i];
2079     cwork = jj;     jj += nz;
2080     vwork = aa;     aa += nz;
2081     ierr = MatSetValues_MPIAIJ(M,1,&row,nz,cwork,vwork,INSERT_VALUES); CHKERRQ(ierr);
2082   }
2083 
2084   ierr = MatAssemblyBegin(M,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2085   ierr = MatAssemblyEnd(M,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2086   *newmat = M;
2087 
2088   /* save submatrix used in processor for next request */
2089   if (call ==  MAT_INITIAL_MATRIX) {
2090     ierr = PetscObjectCompose((PetscObject)M,"SubMatrix",(PetscObject)Mreuse);CHKERRQ(ierr);
2091     ierr = PetscObjectDereference((PetscObject)Mreuse);CHKERRQ(ierr);
2092   }
2093 
2094   PetscFunctionReturn(0);
2095 }
2096