xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision e12933850baf6ee5ed30d09cb860fcc862df920c)
1 
2 #ifdef PETSC_RCS_HEADER
3 static char vcid[] = "$Id: mpiaij.c,v 1.260 1998/08/25 19:52:04 bsmith Exp bsmith $";
4 #endif
5 
6 #include "pinclude/pviewer.h"
7 #include "src/mat/impls/aij/mpi/mpiaij.h"
8 #include "src/vec/vecimpl.h"
9 #include "src/inline/spops.h"
10 
/* local utility routine that creates a mapping from the global column
number to the local number in the off-diagonal part of the local
storage of the matrix.  This is done in a non-scalable way since the
length of colmap equals the global matrix length.
*/
16 #undef __FUNC__
17 #define __FUNC__ "CreateColmap_MPIAIJ_Private"
18 int CreateColmap_MPIAIJ_Private(Mat mat)
19 {
20   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
21   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
22   int        n = B->n,i;
23 
24   PetscFunctionBegin;
25   aij->colmap = (int *) PetscMalloc((aij->N+1)*sizeof(int));CHKPTRQ(aij->colmap);
26   PLogObjectMemory(mat,aij->N*sizeof(int));
27   PetscMemzero(aij->colmap,aij->N*sizeof(int));
28   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
29   PetscFunctionReturn(0);
30 }
31 
32 extern int DisAssemble_MPIAIJ(Mat);
33 
#define CHUNKSIZE   15
/*
   MatSetValues_SeqAIJ_A_Private - inserts (or, for ADD_VALUES, accumulates)
   one entry at local (row,col) of the "diagonal" sequential block A.

   Relies on the caller (MatSetValues_MPIAIJ) having the work variables
   rp,ap,rmax,nrow,_i,ii,N,col1,low,high,t,nonew,shift and the cached
   arrays aa,ai,aj,aimax,ailen in scope.  When the row is full its storage
   is enlarged by CHUNKSIZE entries.

   Fix vs. previous revision: the binary search compared the un-shifted
   `col` (wrong when shift != 0) and its result was then discarded -- the
   linear scan restarted at 0.  It now compares the stored form `col1`
   and scans only the narrowed window [low,high).
*/
#define MatSetValues_SeqAIJ_A_Private(row,col,value,addv) \
{ \
 \
    rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
    rmax = aimax[row]; nrow = ailen[row];  \
    col1 = col - shift; \
     \
    /* narrow the search window, then scan it linearly */ \
    low = 0; high = nrow; \
    while (high-low > 5) { \
      t = (low+high)/2; \
      if (rp[t] > col1) high = t; \
      else              low  = t; \
    } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > col1) break; \
        if (rp[_i] == col1) { \
          if (addv == ADD_VALUES) ap[_i] += value;   \
          else                  ap[_i] = value; \
          goto a_noinsert; \
        } \
      }  \
      if (nonew == 1) goto a_noinsert; \
      else if (nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = ai[a->m] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space: values, column indices and row starts \
           in a single allocation */ \
        len     = new_nz*(sizeof(int)+sizeof(Scalar))+(a->m+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots, opening a CHUNKSIZE gap \
           immediately after the current row */ \
        for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = ai[ii];} \
        for ( ii=row+1; ii<a->m+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,aj,(ai[row]+nrow+shift)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - ai[row] - nrow - shift); \
        PetscMemcpy(new_j+ai[row]+shift+nrow+CHUNKSIZE,aj+ai[row]+shift+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,aa,(ai[row]+nrow+shift)*sizeof(Scalar)); \
        PetscMemcpy(new_a+ai[row]+shift+nrow+CHUNKSIZE,aa+ai[row]+shift+nrow, \
                                                           len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
 \
        PetscFree(a->a);  \
        if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
        aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
        a->singlemalloc = 1; \
 \
        rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
        rmax = aimax[row] = aimax[row] + CHUNKSIZE; \
        PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
        a->maxnz += CHUNKSIZE; \
        a->reallocs++; \
      } \
      N = nrow++ - 1; a->nz++; \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        ap[ii+1] = ap[ii]; \
      } \
      rp[_i] = col1;  \
      ap[_i] = value;  \
      a_noinsert: ; \
      ailen[row] = nrow; \
}
105 
/*
   MatSetValues_SeqAIJ_B_Private - same as the A variant above but for the
   "off-diagonal" sequential block B (arrays ba,bi,bj,bimax,bilen; counters
   b->nz etc.).  See the A variant for the description of the required
   caller-scope variables and the binary-search fix (compare col1, scan
   only the narrowed window).
*/
#define MatSetValues_SeqAIJ_B_Private(row,col,value,addv) \
{ \
 \
    rp   = bj + bi[row] + shift; ap = ba + bi[row] + shift; \
    rmax = bimax[row]; nrow = bilen[row];  \
    col1 = col - shift; \
     \
    /* narrow the search window, then scan it linearly */ \
    low = 0; high = nrow; \
    while (high-low > 5) { \
      t = (low+high)/2; \
      if (rp[t] > col1) high = t; \
      else              low  = t; \
    } \
      for ( _i=low; _i<high; _i++ ) { \
        if (rp[_i] > col1) break; \
        if (rp[_i] == col1) { \
          if (addv == ADD_VALUES) ap[_i] += value;   \
          else                  ap[_i] = value; \
          goto b_noinsert; \
        } \
      }  \
      if (nonew == 1) goto b_noinsert; \
      else if (nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = bi[b->m] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space: values, column indices and row starts \
           in a single allocation */ \
        len     = new_nz*(sizeof(int)+sizeof(Scalar))+(b->m+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots, opening a CHUNKSIZE gap \
           immediately after the current row */ \
        for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = bi[ii];} \
        for ( ii=row+1; ii<b->m+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,bj,(bi[row]+nrow+shift)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - bi[row] - nrow - shift); \
        PetscMemcpy(new_j+bi[row]+shift+nrow+CHUNKSIZE,bj+bi[row]+shift+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,ba,(bi[row]+nrow+shift)*sizeof(Scalar)); \
        PetscMemcpy(new_a+bi[row]+shift+nrow+CHUNKSIZE,ba+bi[row]+shift+nrow, \
                                                           len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
 \
        PetscFree(b->a);  \
        if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
        ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
        b->singlemalloc = 1; \
 \
        rp   = bj + bi[row] + shift; ap = ba + bi[row] + shift; \
        rmax = bimax[row] = bimax[row] + CHUNKSIZE; \
        PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
        b->maxnz += CHUNKSIZE; \
        b->reallocs++; \
      } \
      N = nrow++ - 1; b->nz++; \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        ap[ii+1] = ap[ii]; \
      } \
      rp[_i] = col1;  \
      ap[_i] = value;  \
      b_noinsert: ; \
      bilen[row] = nrow; \
}
176 
extern int MatSetValues_SeqAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIAIJ"
/*
   MatSetValues_MPIAIJ - Inserts or adds an m-by-n logically dense block of
   values into the parallel matrix.

   Locally owned rows are split column-wise between the "diagonal" block
   aij->A (global columns in [cstart,cend)) and the "off-diagonal" block
   aij->B (all other columns).  Values for rows owned by other processors
   are stashed and communicated during MatAssemblyBegin/End (unless
   donotstash is set).  Insertion into A and B is performed by the
   MatSetValues_SeqAIJ_{A,B}_Private macros above, which depend on the many
   work variables declared below.
*/
int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Scalar     value;
  int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
  int        cstart = aij->cstart, cend = aij->cend,row,col;
  int        roworiented = aij->roworiented;

  /* Some Variables required in the macro */
  Mat        A = aij->A;
  Mat_SeqAIJ *a = (Mat_SeqAIJ *) A->data;
  int        *aimax = a->imax, *ai = a->i, *ailen = a->ilen,*aj = a->j;
  Scalar     *aa = a->a;

  Mat        B = aij->B;
  Mat_SeqAIJ *b = (Mat_SeqAIJ *) B->data;
  int        *bimax = b->imax, *bi = b->i, *bilen = b->ilen,*bj = b->j;
  Scalar     *ba = b->a;

  int        *rp,ii,nrow,_i,rmax, N, col1,low,high,t;
  /* NOTE(review): nonew/shift are taken from A but also used by the
     B-insertion macro below -- presumably both blocks share the same
     settings; confirm */
  int        nonew = a->nonew,shift = a->indexshift;
  Scalar     *ap;

  PetscFunctionBegin;
  for ( i=0; i<m; i++ ) {
#if defined(USE_PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
    if (im[i] >= aij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
#endif
    if (im[i] >= rstart && im[i] < rend) { /* row owned by this processor */
      row = im[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (in[j] >= cstart && in[j] < cend){ /* column in diagonal block */
          col = in[j] - cstart;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqAIJ_A_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqAIJ(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
#if defined(USE_PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");}
        else if (in[j] >= aij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");}
#endif
        else { /* column in off-diagonal block */
          if (mat->was_assembled) {
            /* translate global column to B's local numbering via colmap
               (built on demand) */
            if (!aij->colmap) {
              ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
            }
            col = aij->colmap[in[j]] - 1;
            if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
              /* column not yet stored in B: disassemble so B reverts to
                 global column numbering, then insert using that */
              ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
              /* Reinitialize the variables required by MatSetValues_SeqAIJ_B_Private() */
              B = aij->B;
              b = (Mat_SeqAIJ *) B->data;
              bimax = b->imax; bi = b->i; bilen = b->ilen; bj = b->j;
              ba = b->a;
            }
          } else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqAIJ_B_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqAIJ(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
      }
    }
    else {
      /* row owned by another processor: stash, to be routed at assembly */
      if (roworiented && !aij->donotstash) {
        ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      }
      else {
        if (!aij->donotstash) {
          row = im[i];
          for ( j=0; j<n; j++ ) {
            ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
  PetscFunctionReturn(0);
}
260 
261 #undef __FUNC__
262 #define __FUNC__ "MatGetValues_MPIAIJ"
263 int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
264 {
265   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
266   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
267   int        cstart = aij->cstart, cend = aij->cend,row,col;
268 
269   PetscFunctionBegin;
270   for ( i=0; i<m; i++ ) {
271     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
272     if (idxm[i] >= aij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
273     if (idxm[i] >= rstart && idxm[i] < rend) {
274       row = idxm[i] - rstart;
275       for ( j=0; j<n; j++ ) {
276         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
277         if (idxn[j] >= aij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
278         if (idxn[j] >= cstart && idxn[j] < cend){
279           col = idxn[j] - cstart;
280           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
281         } else {
282           if (!aij->colmap) {
283             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
284           }
285           col = aij->colmap[idxn[j]] - 1;
286           if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
287           else {
288             ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
289           }
290         }
291       }
292     } else {
293       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
294     }
295   }
296   PetscFunctionReturn(0);
297 }
298 
#undef __FUNC__
#define __FUNC__ "MatAssemblyBegin_MPIAIJ"
/*
   MatAssemblyBegin_MPIAIJ - Starts assembling the parallel matrix: routes
   the values stashed for off-processor rows to their owners with
   non-blocking MPI sends/receives.

   Each communicated entry is a triple (row index, column index, value)
   packed into three Scalars so a single MPI datatype can be used.  The
   posted requests and buffers are recorded in the Mat_MPIAIJ structure
   and completed in MatAssemblyEnd_MPIAIJ().
*/
int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = aij->size, *owners = aij->rowners;
  int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  PetscFunctionBegin;
  if (aij->donotstash) {
    /* nothing stashed: record an empty communication state and return */
    aij->svalues    = 0; aij->rvalues    = 0;
    aij->nsends     = 0; aij->nrecvs     = 0;
    aij->send_waits = 0; aij->recv_waits = 0;
    aij->rmax       = 0;
    PetscFunctionReturn(0);
  }

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
  }
  mat->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<aij->stash.n; i++ ) {
    idx = aij->stash.idx[i];
    /* linear scan of the ownership ranges to find the owning processor */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  /* after the SUM reduce, work[rank] = number of messages this processor
     will receive; after the MAX reduce, work[rank] = longest such message */
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  ierr = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
  nreceives = work[rank];
  ierr = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues    = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);CHKERRQ(ierr);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues    = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
  starts     = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0]  = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack (row,col,value) triples; starts[] is consumed as a cursor here */
  for ( i=0; i<aij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
  }
  PetscFree(owner);
  /* rebuild starts[] (it was advanced by the packing loop above) */
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
    }
  }
  PetscFree(starts);
  PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(aij->A,"MatAssemblyBegin_MPIAIJ:Number of off-processor values %d\n",aij->stash.n);
  ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);

  /* record in-flight communication; MatAssemblyEnd_MPIAIJ completes it */
  aij->svalues    = svalues;    aij->rvalues    = rvalues;
  aij->nsends     = nsends;     aij->nrecvs     = nreceives;
  aij->send_waits = send_waits; aij->recv_waits = recv_waits;
  aij->rmax       = nmax;

  PetscFunctionReturn(0);
}
405 extern int MatSetUpMultiply_MPIAIJ(Mat);
406 
#undef __FUNC__
#define __FUNC__ "MatAssemblyEnd_MPIAIJ"
/*
   MatAssemblyEnd_MPIAIJ - Completes the assembly started by
   MatAssemblyBegin_MPIAIJ: drains the pending receives, inserts the
   received (row,col,value) triples into the local A or B block, waits on
   the sends, frees the communication buffers and assembles both
   sequential blocks.
*/
int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
  int         row,col,other_disassembled;
  Scalar      *values,val;
  InsertMode  addv = mat->insertmode;

  PetscFunctionBegin;

  /*  wait on receives */
  while (count) {
    ierr = MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
    /* unpack receives into our local space */
    values = aij->rvalues + 3*imdex*aij->rmax;
    ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
    n = n/3; /* each entry is a (row,col,value) triple of Scalars */
    for ( i=0; i<n; i++ ) {
      row = (int) PetscReal(values[3*i]) - aij->rstart;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= aij->cstart && col < aij->cend) {
        /* column belongs to the diagonal block */
        col -= aij->cstart;
        ierr = MatSetValues(aij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
      } else {
        if (mat->was_assembled || mat->assembled) {
          /* translate global column to B's local numbering via colmap */
          if (!aij->colmap) {
            ierr = CreateColmap_MPIAIJ_Private(mat); CHKERRQ(ierr);
          }
          col = aij->colmap[col] - 1;
          if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
            /* column not stored in B yet: disassemble and fall back to the
               global column index */
            ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        ierr = MatSetValues(aij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
      }
    }
    count--;
  }
  if (aij->recv_waits) PetscFree(aij->recv_waits);
  if (aij->rvalues)    PetscFree(aij->rvalues);

  /* wait on sends */
  if (aij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
    ierr        = MPI_Waitall(aij->nsends,aij->send_waits,send_status);CHKERRQ(ierr);
    PetscFree(send_status);
  }
  if (aij->send_waits) PetscFree(aij->send_waits);
  if (aij->svalues)    PetscFree(aij->svalues);

  ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselves, in order that we may reassemble. */
  /*
     if nonzero structure of submatrix B cannot change then we know that
     no processor disassembled thus we can skip this stuff
  */
  if (!((Mat_SeqAIJ*) aij->B->data)->nonew)  {
    ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
    if (mat->was_assembled && !other_disassembled) {
      ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
    }
  }

  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    /* first final assembly: build the scatter context for MatMult etc. */
    ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);

  /* drop any cached row returned by MatGetRow; it may now be stale */
  if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
  PetscFunctionReturn(0);
}
487 
488 #undef __FUNC__
489 #define __FUNC__ "MatZeroEntries_MPIAIJ"
490 int MatZeroEntries_MPIAIJ(Mat A)
491 {
492   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
493   int        ierr;
494 
495   PetscFunctionBegin;
496   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
497   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
498   PetscFunctionReturn(0);
499 }
500 
501 #undef __FUNC__
502 #define __FUNC__ "MatZeroRows_MPIAIJ"
503 int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
504 {
505   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
506   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
507   int            *procs,*nprocs,j,found,idx,nsends,*work,row;
508   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
509   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
510   int            *lens,imdex,*lrows,*values,rstart=l->rstart;
511   MPI_Comm       comm = A->comm;
512   MPI_Request    *send_waits,*recv_waits;
513   MPI_Status     recv_status,*send_status;
514   IS             istmp;
515   PetscTruth     localdiag;
516 
517   PetscFunctionBegin;
518   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
519   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
520 
521   /*  first count number of contributors to each processor */
522   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
523   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
524   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
525   for ( i=0; i<N; i++ ) {
526     idx = rows[i];
527     found = 0;
528     for ( j=0; j<size; j++ ) {
529       if (idx >= owners[j] && idx < owners[j+1]) {
530         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
531       }
532     }
533     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
534   }
535   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
536 
537   /* inform other processors of number of messages and max length*/
538   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
539   ierr = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
540   nrecvs = work[rank];
541   ierr = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
542   nmax = work[rank];
543   PetscFree(work);
544 
545   /* post receives:   */
546   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int));CHKPTRQ(rvalues);
547   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
548   for ( i=0; i<nrecvs; i++ ) {
549     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
550   }
551 
552   /* do sends:
553       1) starts[i] gives the starting index in svalues for stuff going to
554          the ith processor
555   */
556   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
557   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
558   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
559   starts[0] = 0;
560   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
561   for ( i=0; i<N; i++ ) {
562     svalues[starts[owner[i]]++] = rows[i];
563   }
564   ISRestoreIndices(is,&rows);
565 
566   starts[0] = 0;
567   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
568   count = 0;
569   for ( i=0; i<size; i++ ) {
570     if (procs[i]) {
571       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
572     }
573   }
574   PetscFree(starts);
575 
576   base = owners[rank];
577 
578   /*  wait on receives */
579   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
580   source = lens + nrecvs;
581   count  = nrecvs; slen = 0;
582   while (count) {
583     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
584     /* unpack receives into our local space */
585     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
586     source[imdex]  = recv_status.MPI_SOURCE;
587     lens[imdex]  = n;
588     slen += n;
589     count--;
590   }
591   PetscFree(recv_waits);
592 
593   /* move the data into the send scatter */
594   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
595   count = 0;
596   for ( i=0; i<nrecvs; i++ ) {
597     values = rvalues + i*nmax;
598     for ( j=0; j<lens[i]; j++ ) {
599       lrows[count++] = values[j] - base;
600     }
601   }
602   PetscFree(rvalues); PetscFree(lens);
603   PetscFree(owner); PetscFree(nprocs);
604 
605   /* actually zap the local rows */
606   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
607   PLogObjectParent(A,istmp);
608 
609   /*
610         Zero the required rows. If the "diagonal block" of the matrix
611      is square and the user wishes to set the diagonal we use seperate
612      code so that MatSetValues() is not called for each diagonal allocating
613      new memory, thus calling lots of mallocs and slowing things down.
614 
615        Contributed by: Mathew Knepley
616   */
617   localdiag = PETSC_FALSE;
618   if (diag && (l->A->M == l->A->N)) {
619     localdiag = PETSC_TRUE;
620     ierr      = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
621   } else {
622     ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
623   }
624   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
625   ierr = ISDestroy(istmp); CHKERRQ(ierr);
626 
627   if (diag && (localdiag == PETSC_FALSE)) {
628     for ( i = 0; i < slen; i++ ) {
629       row = lrows[i] + rstart;
630       MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES);
631     }
632     MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);
633     MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);
634   }
635 
636   if (diag) {
637     for ( i = 0; i < slen; i++ ) {
638       row = lrows[i] + rstart;
639       MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES);
640     }
641     MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);
642     MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);
643   }
644   PetscFree(lrows);
645 
646   /* wait on sends */
647   if (nsends) {
648     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
649     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
650     PetscFree(send_status);
651   }
652   PetscFree(send_waits); PetscFree(svalues);
653 
654   PetscFunctionReturn(0);
655 }
656 
657 #undef __FUNC__
658 #define __FUNC__ "MatMult_MPIAIJ"
659 int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
660 {
661   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
662   int        ierr,nt;
663 
664   PetscFunctionBegin;
665   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
666   if (nt != a->n) {
667     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
668   }
669   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
670   ierr = (*a->A->ops->mult)(a->A,xx,yy); CHKERRQ(ierr);
671   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
672   ierr = (*a->B->ops->multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
673   PetscFunctionReturn(0);
674 }
675 
676 #undef __FUNC__
677 #define __FUNC__ "MatMultAdd_MPIAIJ"
678 int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
679 {
680   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
681   int        ierr;
682 
683   PetscFunctionBegin;
684   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
685   ierr = (*a->A->ops->multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
686   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
687   ierr = (*a->B->ops->multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
688   PetscFunctionReturn(0);
689 }
690 
691 #undef __FUNC__
692 #define __FUNC__ "MatMultTrans_MPIAIJ"
693 int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
694 {
695   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
696   int        ierr;
697 
698   PetscFunctionBegin;
699   /* do nondiagonal part */
700   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
701   /* send it on its way */
702   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
703   /* do local part */
704   ierr = (*a->A->ops->multtrans)(a->A,xx,yy); CHKERRQ(ierr);
705   /* receive remote parts: note this assumes the values are not actually */
706   /* inserted in yy until the next line, which is true for my implementation*/
707   /* but is not perhaps always true. */
708   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
709   PetscFunctionReturn(0);
710 }
711 
712 #undef __FUNC__
713 #define __FUNC__ "MatMultTransAdd_MPIAIJ"
714 int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
715 {
716   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
717   int        ierr;
718 
719   PetscFunctionBegin;
720   /* do nondiagonal part */
721   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
722   /* send it on its way */
723   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
724   /* do local part */
725   ierr = (*a->A->ops->multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
726   /* receive remote parts: note this assumes the values are not actually */
727   /* inserted in yy until the next line, which is true for my implementation*/
728   /* but is not perhaps always true. */
729   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
730   PetscFunctionReturn(0);
731 }
732 
733 /*
734   This only works correctly for square matrices where the subblock A->A is the
735    diagonal block
736 */
737 #undef __FUNC__
738 #define __FUNC__ "MatGetDiagonal_MPIAIJ"
739 int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
740 {
741   int        ierr;
742   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
743 
744   PetscFunctionBegin;
745   if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
746   if (a->rstart != a->cstart || a->rend != a->cend) {
747     SETERRQ(PETSC_ERR_ARG_SIZ,0,"row partition must equal col partition");
748   }
749   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
750   PetscFunctionReturn(0);
751 }
752 
753 #undef __FUNC__
754 #define __FUNC__ "MatScale_MPIAIJ"
755 int MatScale_MPIAIJ(Scalar *aa,Mat A)
756 {
757   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
758   int        ierr;
759 
760   PetscFunctionBegin;
761   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
762   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
763   PetscFunctionReturn(0);
764 }
765 
#undef __FUNC__
#define __FUNC__ "MatDestroy_MPIAIJ"
/*
   MatDestroy_MPIAIJ - Releases all storage owned by the parallel matrix:
   mappings, maps, the stash, both sequential blocks, the column map, the
   ghost-column array, the work vector and scatter context, and finally the
   header itself.  A no-op while other references remain.
*/
int MatDestroy_MPIAIJ(Mat mat)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  int        ierr;

  PetscFunctionBegin;
  /* reference counted: only the last release actually destroys */
  if (--mat->refct > 0) PetscFunctionReturn(0);

  if (mat->mapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
  }
  if (mat->bmapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->bmapping); CHKERRQ(ierr);
  }
  if (mat->rmap) {
    ierr = MapDestroy(mat->rmap);CHKERRQ(ierr);
  }
  if (mat->cmap) {
    ierr = MapDestroy(mat->cmap);CHKERRQ(ierr);
  }
#if defined(USE_PETSC_LOG)
  PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",aij->M,aij->N);
#endif
  ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
  PetscFree(aij->rowners);
  ierr = MatDestroy(aij->A); CHKERRQ(ierr);
  ierr = MatDestroy(aij->B); CHKERRQ(ierr);
  /* optional members: only present if they were ever created */
  if (aij->colmap) PetscFree(aij->colmap);
  if (aij->garray) PetscFree(aij->garray);
  if (aij->lvec)   VecDestroy(aij->lvec);
  if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
  if (aij->rowvalues) PetscFree(aij->rowvalues);
  PetscFree(aij);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  PetscFunctionReturn(0);
}
805 
806 #undef __FUNC__
807 #define __FUNC__ "MatView_MPIAIJ_Binary"
808 extern int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
809 {
810   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
811   int         ierr;
812 
813   PetscFunctionBegin;
814   if (aij->size == 1) {
815     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
816   }
817   else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
818   PetscFunctionReturn(0);
819 }
820 
821 #undef __FUNC__
822 #define __FUNC__ "MatView_MPIAIJ_ASCIIorDraworMatlab"
823 extern int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
824 {
825   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
826   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
827   int         ierr, format,shift = C->indexshift,rank;
828   FILE        *fd;
829   ViewerType  vtype;
830 
831   PetscFunctionBegin;
832   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
833   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
834     ierr = ViewerGetFormat(viewer,&format);
835     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
836       MatInfo info;
837       int     flg;
838       MPI_Comm_rank(mat->comm,&rank);
839       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
840       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
841       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
842       PetscSequentialPhaseBegin(mat->comm,1);
843       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
844          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
845       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
846          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
847       ierr = MatGetInfo(aij->A,MAT_LOCAL,&info);
848       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used);
849       ierr = MatGetInfo(aij->B,MAT_LOCAL,&info);
850       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used);
851       fflush(fd);
852       PetscSequentialPhaseEnd(mat->comm,1);
853       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
854       PetscFunctionReturn(0);
855     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
856       PetscFunctionReturn(0);
857     }
858   }
859 
860   if (vtype == DRAW_VIEWER) {
861     Draw       draw;
862     PetscTruth isnull;
863     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
864     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
865   }
866 
867   if (vtype == ASCII_FILE_VIEWER) {
868     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
869     PetscSequentialPhaseBegin(mat->comm,1);
870     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
871            aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
872            aij->cend);
873     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
874     ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
875     fflush(fd);
876     PetscSequentialPhaseEnd(mat->comm,1);
877   } else {
878     int size = aij->size;
879     rank = aij->rank;
880     if (size == 1) {
881       ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
882     } else {
883       /* assemble the entire matrix onto first processor. */
884       Mat         A;
885       Mat_SeqAIJ *Aloc;
886       int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
887       Scalar      *a;
888 
889       if (!rank) {
890         ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
891       } else {
892         ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
893       }
894       PLogObjectParent(mat,A);
895 
896       /* copy over the A part */
897       Aloc = (Mat_SeqAIJ*) aij->A->data;
898       m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
899       row = aij->rstart;
900       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
901       for ( i=0; i<m; i++ ) {
902         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
903         row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
904       }
905       aj = Aloc->j;
906       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
907 
908       /* copy over the B part */
909       Aloc = (Mat_SeqAIJ*) aij->B->data;
910       m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
911       row = aij->rstart;
912       ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
913       for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
914       for ( i=0; i<m; i++ ) {
915         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
916         row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
917       }
918       PetscFree(ct);
919       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
920       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
921       /*
922          Everyone has to call to draw the matrix since the graphics waits are
923          synchronized across all processors that share the Draw object
924       */
925       if (!rank || vtype == DRAW_VIEWER) {
926         ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
927       }
928       ierr = MatDestroy(A); CHKERRQ(ierr);
929     }
930   }
931   PetscFunctionReturn(0);
932 }
933 
934 #undef __FUNC__
935 #define __FUNC__ "MatView_MPIAIJ"
936 int MatView_MPIAIJ(Mat mat,Viewer viewer)
937 {
938   int         ierr;
939   ViewerType  vtype;
940 
941   PetscFunctionBegin;
942   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
943   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
944       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
945     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
946   } else if (vtype == BINARY_FILE_VIEWER) {
947     ierr = MatView_MPIAIJ_Binary(mat,viewer);CHKERRQ(ierr);
948   } else {
949     SETERRQ(1,1,"Viewer type not supported by PETSc object");
950   }
951   PetscFunctionReturn(0);
952 }
953 
954 /*
955     This has to provide several versions.
956 
957      2) a) use only local smoothing updating outer values only once.
958         b) local smoothing updating outer values each inner iteration
     3) color updating outer values between colors.
960 */
961 #undef __FUNC__
962 #define __FUNC__ "MatRelax_MPIAIJ"
963 int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
964                            double fshift,int its,Vec xx)
965 {
966   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
967   Mat        AA = mat->A, BB = mat->B;
968   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
969   Scalar     *b,*x,*xs,*ls,d,*v,sum;
970   int        ierr,*idx, *diag;
971   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
972 
973   PetscFunctionBegin;
974   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
975   xs = x + shift; /* shift by one for index start of 1 */
976   ls = ls + shift;
977   if (!A->diag) {ierr = MatMarkDiag_SeqAIJ(AA); CHKERRQ(ierr);}
978   diag = A->diag;
979   if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
980     if (flag & SOR_ZERO_INITIAL_GUESS) {
981       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
982       PetscFunctionReturn(0);
983     }
984     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
985     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
986     while (its--) {
987       /* go down through the rows */
988       for ( i=0; i<m; i++ ) {
989         n    = A->i[i+1] - A->i[i];
990 	PLogFlops(4*n+3);
991         idx  = A->j + A->i[i] + shift;
992         v    = A->a + A->i[i] + shift;
993         sum  = b[i];
994         SPARSEDENSEMDOT(sum,xs,v,idx,n);
995         d    = fshift + A->a[diag[i]+shift];
996         n    = B->i[i+1] - B->i[i];
997         idx  = B->j + B->i[i] + shift;
998         v    = B->a + B->i[i] + shift;
999         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1000         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1001       }
1002       /* come up through the rows */
1003       for ( i=m-1; i>-1; i-- ) {
1004         n    = A->i[i+1] - A->i[i];
1005 	PLogFlops(4*n+3)
1006         idx  = A->j + A->i[i] + shift;
1007         v    = A->a + A->i[i] + shift;
1008         sum  = b[i];
1009         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1010         d    = fshift + A->a[diag[i]+shift];
1011         n    = B->i[i+1] - B->i[i];
1012         idx  = B->j + B->i[i] + shift;
1013         v    = B->a + B->i[i] + shift;
1014         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1015         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1016       }
1017     }
1018   } else if (flag & SOR_LOCAL_FORWARD_SWEEP){
1019     if (flag & SOR_ZERO_INITIAL_GUESS) {
1020       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
1021       PetscFunctionReturn(0);
1022     }
1023     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1024     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1025     while (its--) {
1026       for ( i=0; i<m; i++ ) {
1027         n    = A->i[i+1] - A->i[i];
1028 	PLogFlops(4*n+3);
1029         idx  = A->j + A->i[i] + shift;
1030         v    = A->a + A->i[i] + shift;
1031         sum  = b[i];
1032         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1033         d    = fshift + A->a[diag[i]+shift];
1034         n    = B->i[i+1] - B->i[i];
1035         idx  = B->j + B->i[i] + shift;
1036         v    = B->a + B->i[i] + shift;
1037         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1038         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1039       }
1040     }
1041   } else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
1042     if (flag & SOR_ZERO_INITIAL_GUESS) {
1043       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
1044       PetscFunctionReturn(0);
1045     }
1046     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,
1047                             mat->Mvctx); CHKERRQ(ierr);
1048     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,
1049                             mat->Mvctx); CHKERRQ(ierr);
1050     while (its--) {
1051       for ( i=m-1; i>-1; i-- ) {
1052         n    = A->i[i+1] - A->i[i];
1053 	PLogFlops(4*n+3);
1054         idx  = A->j + A->i[i] + shift;
1055         v    = A->a + A->i[i] + shift;
1056         sum  = b[i];
1057         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1058         d    = fshift + A->a[diag[i]+shift];
1059         n    = B->i[i+1] - B->i[i];
1060         idx  = B->j + B->i[i] + shift;
1061         v    = B->a + B->i[i] + shift;
1062         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1063         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1064       }
1065     }
1066   } else {
1067     SETERRQ(PETSC_ERR_SUP,0,"Parallel SOR not supported");
1068   }
1069   PetscFunctionReturn(0);
1070 }
1071 
1072 #undef __FUNC__
1073 #define __FUNC__ "MatGetInfo_MPIAIJ"
1074 int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1075 {
1076   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1077   Mat        A = mat->A, B = mat->B;
1078   int        ierr;
1079   double     isend[5], irecv[5];
1080 
1081   PetscFunctionBegin;
1082   info->block_size     = 1.0;
1083   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1084   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
1085   isend[3] = info->memory;  isend[4] = info->mallocs;
1086   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1087   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
1088   isend[3] += info->memory;  isend[4] += info->mallocs;
1089   if (flag == MAT_LOCAL) {
1090     info->nz_used      = isend[0];
1091     info->nz_allocated = isend[1];
1092     info->nz_unneeded  = isend[2];
1093     info->memory       = isend[3];
1094     info->mallocs      = isend[4];
1095   } else if (flag == MAT_GLOBAL_MAX) {
1096     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);CHKERRQ(ierr);
1097     info->nz_used      = irecv[0];
1098     info->nz_allocated = irecv[1];
1099     info->nz_unneeded  = irecv[2];
1100     info->memory       = irecv[3];
1101     info->mallocs      = irecv[4];
1102   } else if (flag == MAT_GLOBAL_SUM) {
1103     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);CHKERRQ(ierr);
1104     info->nz_used      = irecv[0];
1105     info->nz_allocated = irecv[1];
1106     info->nz_unneeded  = irecv[2];
1107     info->memory       = irecv[3];
1108     info->mallocs      = irecv[4];
1109   }
1110   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1111   info->fill_ratio_needed = 0;
1112   info->factor_mallocs    = 0;
1113   info->rows_global       = (double)mat->M;
1114   info->columns_global    = (double)mat->N;
1115   info->rows_local        = (double)mat->m;
1116   info->columns_local     = (double)mat->N;
1117 
1118   PetscFunctionReturn(0);
1119 }
1120 
1121 #undef __FUNC__
1122 #define __FUNC__ "MatSetOption_MPIAIJ"
1123 int MatSetOption_MPIAIJ(Mat A,MatOption op)
1124 {
1125   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1126 
1127   PetscFunctionBegin;
1128   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1129       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1130       op == MAT_COLUMNS_UNSORTED ||
1131       op == MAT_COLUMNS_SORTED ||
1132       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1133       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1134         MatSetOption(a->A,op);
1135         MatSetOption(a->B,op);
1136   } else if (op == MAT_ROW_ORIENTED) {
1137     a->roworiented = 1;
1138     MatSetOption(a->A,op);
1139     MatSetOption(a->B,op);
1140   } else if (op == MAT_ROWS_SORTED ||
1141              op == MAT_ROWS_UNSORTED ||
1142              op == MAT_SYMMETRIC ||
1143              op == MAT_STRUCTURALLY_SYMMETRIC ||
1144              op == MAT_YES_NEW_DIAGONALS)
1145     PLogInfo(A,"MatSetOption_MPIAIJ:Option ignored\n");
1146   else if (op == MAT_COLUMN_ORIENTED) {
1147     a->roworiented = 0;
1148     MatSetOption(a->A,op);
1149     MatSetOption(a->B,op);
1150   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1151     a->donotstash = 1;
1152   } else if (op == MAT_NO_NEW_DIAGONALS){
1153     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1154   } else {
1155     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1156   }
1157   PetscFunctionReturn(0);
1158 }
1159 
1160 #undef __FUNC__
1161 #define __FUNC__ "MatGetSize_MPIAIJ"
1162 int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
1163 {
1164   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1165 
1166   PetscFunctionBegin;
1167   if (m) *m = mat->M;
1168   if (n) *n = mat->N;
1169   PetscFunctionReturn(0);
1170 }
1171 
1172 #undef __FUNC__
1173 #define __FUNC__ "MatGetLocalSize_MPIAIJ"
1174 int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
1175 {
1176   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1177 
1178   PetscFunctionBegin;
1179   if (m) *m = mat->m;
1180   if (n) *n = mat->n;
1181   PetscFunctionReturn(0);
1182 }
1183 
1184 #undef __FUNC__
1185 #define __FUNC__ "MatGetOwnershipRange_MPIAIJ"
1186 int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
1187 {
1188   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1189 
1190   PetscFunctionBegin;
1191   *m = mat->rstart; *n = mat->rend;
1192   PetscFunctionReturn(0);
1193 }
1194 
1195 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1196 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1197 
1198 #undef __FUNC__
1199 #define __FUNC__ "MatGetRow_MPIAIJ"
/*
   MatGetRow_MPIAIJ - Returns one locally owned row of the matrix in global
   column numbering, merging the entries of the diagonal (A) and
   off-diagonal (B) blocks into a single column-sorted row.

   The merged values/indices are returned through a per-matrix scratch
   buffer (mat->rowvalues / mat->rowindices), so only one row may be
   "gotten" at a time; MatRestoreRow_MPIAIJ() clears the in-use flag.
*/
int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
  int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
  int        *cmap, *idx_p;

  PetscFunctionBegin;
  if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
    int     max = 1,m = mat->m,tmp;
    for ( i=0; i<m; i++ ) {
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    /* one allocation holds both the values and, after them, the indices */
    mat->rowvalues  = (Scalar *) PetscMalloc(max*(sizeof(int)+sizeof(Scalar)));CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max);
  }

  if (row < rstart || row >= rend) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Only local rows")
  lrow = row - rstart;

  /* only request the pieces (values/columns) the caller actually wants */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops->getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops->getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        /* B entries with global column < cstart come first, then all of A,
           then the remaining B entries (columns >= cend) */
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          /* split point already found while copying the values */
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]];
          }
        } else {
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
            else break;
          }
          imark = i;
        }
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
      }
    } else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops->restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops->restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
1277 
1278 #undef __FUNC__
1279 #define __FUNC__ "MatRestoreRow_MPIAIJ"
1280 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1281 {
1282   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1283 
1284   PetscFunctionBegin;
1285   if (aij->getrowactive == PETSC_FALSE) {
1286     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
1287   }
1288   aij->getrowactive = PETSC_FALSE;
1289   PetscFunctionReturn(0);
1290 }
1291 
1292 #undef __FUNC__
1293 #define __FUNC__ "MatNorm_MPIAIJ"
/*
   MatNorm_MPIAIJ - Computes the Frobenius, 1- or infinity-norm of the
   parallel matrix.

   Frobenius: each process sums squares of its local nonzeros, then an
   MPI_Allreduce(SUM) combines them.  1-norm: per-column absolute sums are
   accumulated into a full-length (global N) work array and reduced -- not
   scalable in memory.  Infinity-norm: per-row absolute sums are formed
   locally and the maximum reduced.  The 2-norm is not supported.
*/
int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
  int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
  double     sum = 0.0;
  Scalar     *v;

  PetscFunctionBegin;
  if (aij->size == 1) {
    /* uniprocessor: the whole matrix is the diagonal block */
    ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      v = amat->a;
      for (i=0; i<amat->nz; i++ ) {
#if defined(USE_PETSC_COMPLEX)
        sum += PetscReal(PetscConj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      v = bmat->a;
      for (i=0; i<bmat->nz; i++ ) {
#if defined(USE_PETSC_COMPLEX)
        sum += PetscReal(PetscConj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
      *norm = sqrt(*norm);
    } else if (type == NORM_1) { /* max column norm */
      double *tmp, *tmp2;
      int    *jj, *garray = aij->garray;
      tmp  = (double *) PetscMalloc( (aij->N+1)*sizeof(double) ); CHKPTRQ(tmp);
      tmp2 = (double *) PetscMalloc( (aij->N+1)*sizeof(double) ); CHKPTRQ(tmp2);
      PetscMemzero(tmp,aij->N*sizeof(double));
      *norm = 0.0;
      /* A's column indices are local (plus indexshift); cstart makes them global */
      v = amat->a; jj = amat->j;
      for ( j=0; j<amat->nz; j++ ) {
        tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
      }
      /* B's compressed column indices map to global columns via garray */
      v = bmat->a; jj = bmat->j;
      for ( j=0; j<bmat->nz; j++ ) {
        tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
      }
      ierr = MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
      for ( j=0; j<aij->N; j++ ) {
        if (tmp2[j] > *norm) *norm = tmp2[j];
      }
      PetscFree(tmp); PetscFree(tmp2);
    } else if (type == NORM_INFINITY) { /* max row norm */
      double ntemp = 0.0;
      for ( j=0; j<amat->m; j++ ) {
        v = amat->a + amat->i[j] + shift;
        sum = 0.0;
        for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
          sum += PetscAbsScalar(*v); v++;
        }
        v = bmat->a + bmat->i[j] + shift;
        for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
          sum += PetscAbsScalar(*v); v++;
        }
        if (sum > ntemp) ntemp = sum;
      }
      ierr = MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);CHKERRQ(ierr);
    } else {
      SETERRQ(PETSC_ERR_SUP,0,"No support for two norm");
    }
  }
  PetscFunctionReturn(0);
}
1366 
1367 #undef __FUNC__
1368 #define __FUNC__ "MatTranspose_MPIAIJ"
/*
   MatTranspose_MPIAIJ - Forms the transpose of a parallel AIJ matrix.

   A new matrix B is built by inserting each local row of A as a column of
   B (MatSetValues with a single column index &row); the parallel assembly
   routes the entries to their owning processes.  With matout==PETSC_NULL
   ("in place", square matrices only) the new matrix's header is copied
   over the old one rather than truly transposing in place.
*/
int MatTranspose_MPIAIJ(Mat A,Mat *matout)
{
  Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
  Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
  int        ierr,shift = Aloc->indexshift;
  int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
  Mat        B;
  Scalar     *array;

  PetscFunctionBegin;
  if (matout == PETSC_NULL && M != N) {
    SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
  }

  /* note the swapped local/global dimensions: B is N-by-M */
  ierr = MatCreateMPIAIJ(A->comm,a->n,a->m,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);CHKERRQ(ierr);

  /* copy over the A part */
  Aloc = (Mat_SeqAIJ*) a->A->data;
  m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
  row = a->rstart;
  /* temporarily shift A's column indices to global numbering ... */
  for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
  for ( i=0; i<m; i++ ) {
    /* insert row `row' of A as column `row' of B */
    ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
    row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
  }
  /* ... and restore them afterwards */
  aj = Aloc->j;
  for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}

  /* copy over the B part */
  Aloc = (Mat_SeqAIJ*) a->B->data;
  m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
  row = a->rstart;
  ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
  /* garray maps the compressed local column indices to global columns */
  for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
  for ( i=0; i<m; i++ ) {
    ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
    row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
  }
  PetscFree(ct);
  ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  if (matout != PETSC_NULL) {
    *matout = B;
  } else {
    PetscOps       *Abops;
    struct _MatOps *Aops;

    /* This isn't really an in-place transpose .... but free data structures from a */
    PetscFree(a->rowners);
    ierr = MatDestroy(a->A); CHKERRQ(ierr);
    ierr = MatDestroy(a->B); CHKERRQ(ierr);
    if (a->colmap) PetscFree(a->colmap);
    if (a->garray) PetscFree(a->garray);
    if (a->lvec) VecDestroy(a->lvec);
    if (a->Mvctx) VecScatterDestroy(a->Mvctx);
    PetscFree(a);

    /*
       This is horrible, horrible code. We need to keep the
      A pointers for the bops and ops but copy everything
      else from C.
    */
    Abops = A->bops;
    Aops  = A->ops;
    PetscMemcpy(A,B,sizeof(struct _p_Mat));
    A->bops = Abops;
    A->ops  = Aops;
    PetscHeaderDestroy(B);
  }
  PetscFunctionReturn(0);
}
1440 
1441 #undef __FUNC__
1442 #define __FUNC__ "MatDiagonalScale_MPIAIJ"
/*
   MatDiagonalScale_MPIAIJ - Computes mat = diag(ll) * mat * diag(rr).

   The left scaling ll applies row-wise to both local blocks directly.
   The right scaling needs the off-process entries of rr, which are
   gathered into aij->lvec with a scatter that is overlapped with the
   scaling of the diagonal block.  Either vector may be 0 (null) to skip
   that side.
*/
int MatDiagonalScale_MPIAIJ(Mat mat,Vec ll,Vec rr)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Mat a = aij->A, b = aij->B;
  int ierr,s1,s2,s3;

  PetscFunctionBegin;
  ierr = MatGetLocalSize(mat,&s2,&s3); CHKERRQ(ierr);
  if (rr) {
    ierr = VecGetLocalSize(rr,&s1);CHKERRQ(ierr);
    if (s1!=s3) SETERRQ(PETSC_ERR_ARG_SIZ,0,"right vector non-conforming local size");
    /* Overlap communication with computation. */
    ierr = VecScatterBegin(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
  }
  if (ll) {
    ierr = VecGetLocalSize(ll,&s1);CHKERRQ(ierr);
    if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"left vector non-conforming local size");
    /* left-scale the off-diagonal block rows */
    ierr = (*b->ops->diagonalscale)(b,ll,0); CHKERRQ(ierr);
  }
  /* scale  the diagonal block */
  ierr = (*a->ops->diagonalscale)(a,ll,rr); CHKERRQ(ierr);

  if (rr) {
    /* Do a scatter end and then right scale the off-diagonal block */
    ierr = VecScatterEnd(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
    ierr = (*b->ops->diagonalscale)(b,0,aij->lvec); CHKERRQ(ierr);
  }

  PetscFunctionReturn(0);
}
1473 
1474 
1475 extern int MatPrintHelp_SeqAIJ(Mat);
1476 #undef __FUNC__
1477 #define __FUNC__ "MatPrintHelp_MPIAIJ"
1478 int MatPrintHelp_MPIAIJ(Mat A)
1479 {
1480   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1481   int        ierr;
1482 
1483   PetscFunctionBegin;
1484   if (!a->rank) {
1485     ierr = MatPrintHelp_SeqAIJ(a->A);CHKERRQ(ierr);
1486   }
1487   PetscFunctionReturn(0);
1488 }
1489 
1490 #undef __FUNC__
1491 #define __FUNC__ "MatGetBlockSize_MPIAIJ"
1492 int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1493 {
1494   PetscFunctionBegin;
1495   *bs = 1;
1496   PetscFunctionReturn(0);
1497 }
1498 #undef __FUNC__
1499 #define __FUNC__ "MatSetUnfactored_MPIAIJ"
1500 int MatSetUnfactored_MPIAIJ(Mat A)
1501 {
1502   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1503   int        ierr;
1504 
1505   PetscFunctionBegin;
1506   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1507   PetscFunctionReturn(0);
1508 }
1509 
1510 #undef __FUNC__
1511 #define __FUNC__ "MatEqual_MPIAIJ"
1512 int MatEqual_MPIAIJ(Mat A, Mat B, PetscTruth *flag)
1513 {
1514   Mat_MPIAIJ *matB = (Mat_MPIAIJ *) B->data,*matA = (Mat_MPIAIJ *) A->data;
1515   Mat        a, b, c, d;
1516   PetscTruth flg;
1517   int        ierr;
1518 
1519   PetscFunctionBegin;
1520   if (B->type != MATMPIAIJ) SETERRQ(PETSC_ERR_ARG_INCOMP,0,"Matrices must be same type");
1521   a = matA->A; b = matA->B;
1522   c = matB->A; d = matB->B;
1523 
1524   ierr = MatEqual(a, c, &flg); CHKERRQ(ierr);
1525   if (flg == PETSC_TRUE) {
1526     ierr = MatEqual(b, d, &flg); CHKERRQ(ierr);
1527   }
1528   ierr = MPI_Allreduce(&flg, flag, 1, MPI_INT, MPI_LAND, A->comm);CHKERRQ(ierr);
1529   PetscFunctionReturn(0);
1530 }
1531 
1532 #undef __FUNC__
1533 #define __FUNC__ "MatCopy_MPIAIJ"
1534 int MatCopy_MPIAIJ(Mat A,Mat B,MatStructure str)
1535 {
1536   int        ierr;
1537   Mat_MPIAIJ *a = (Mat_MPIAIJ *)A->data;
1538   Mat_MPIAIJ *b = (Mat_MPIAIJ *)B->data;
1539 
1540   PetscFunctionBegin;
1541   if (str != SAME_NONZERO_PATTERN || B->type != MATMPIAIJ) {
1542     /* because of the column compression in the off-processor part of the matrix a->B,
1543        the number of columns in a->B and b->B may be different, hence we cannot call
1544        the MatCopy() directly on the two parts. If need be, we can provide a more
1545        efficient copy than the MatCopy_Basic() by first uncompressing the a->B matrices
1546        then copying the submatrices */
1547     ierr = MatCopy_Basic(A,B,str);CHKERRQ(ierr);
1548   } else {
1549     ierr = MatCopy(a->A,b->A,str);CHKERRQ(ierr);
1550     ierr = MatCopy(a->B,b->B,str);CHKERRQ(ierr);
1551   }
1552   PetscFunctionReturn(0);
1553 }
1554 
1555 extern int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1556 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1557 extern int MatFDColoringCreate_MPIAIJ(Mat,ISColoring,MatFDColoring);
1558 extern int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatGetSubMatrixCall,Mat **);
1559 extern int MatGetSubMatrix_MPIAIJ (Mat ,IS,IS,int,MatGetSubMatrixCall,Mat *);
1560 
1561 /* -------------------------------------------------------------------*/
/*
   Function dispatch table for the MATMPIAIJ format.  The entries are
   positional -- their order is fixed by struct _MatOps -- and a 0 marks an
   operation this format does not implement, so do NOT reorder or insert
   entries here without consulting the struct definition.
*/
static struct _MatOps MatOps_Values = {MatSetValues_MPIAIJ,
       MatGetRow_MPIAIJ,
       MatRestoreRow_MPIAIJ,
       MatMult_MPIAIJ,
       MatMultAdd_MPIAIJ,
       MatMultTrans_MPIAIJ,
       MatMultTransAdd_MPIAIJ,
       0,
       0,
       0,
       0,
       0,
       0,
       MatRelax_MPIAIJ,
       MatTranspose_MPIAIJ,
       MatGetInfo_MPIAIJ,
       MatEqual_MPIAIJ,
       MatGetDiagonal_MPIAIJ,
       MatDiagonalScale_MPIAIJ,
       MatNorm_MPIAIJ,
       MatAssemblyBegin_MPIAIJ,
       MatAssemblyEnd_MPIAIJ,
       0,
       MatSetOption_MPIAIJ,
       MatZeroEntries_MPIAIJ,
       MatZeroRows_MPIAIJ,
       0,
       0,
       0,
       0,
       MatGetSize_MPIAIJ,
       MatGetLocalSize_MPIAIJ,
       MatGetOwnershipRange_MPIAIJ,
       0,
       0,
       0,
       0,
       MatConvertSameType_MPIAIJ,
       0,
       0,
       0,
       0,
       0,
       MatGetSubMatrices_MPIAIJ,
       MatIncreaseOverlap_MPIAIJ,
       MatGetValues_MPIAIJ,
       MatCopy_MPIAIJ,
       MatPrintHelp_MPIAIJ,
       MatScale_MPIAIJ,
       0,
       0,
       0,
       MatGetBlockSize_MPIAIJ,
       0,
       0,
       0,
       0,
       MatFDColoringCreate_MPIAIJ,
       0,
       MatSetUnfactored_MPIAIJ,
       0,
       0,
       MatGetSubMatrix_MPIAIJ,
       0,
       0,
       MatGetMaps_Petsc};
1628 
1629 
1630 #undef __FUNC__
1631 #define __FUNC__ "MatCreateMPIAIJ"
1632 /*@C
1633    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1634    (the default parallel PETSc format).  For good matrix assembly performance
1635    the user should preallocate the matrix storage by setting the parameters
1636    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1637    performance can be increased by more than a factor of 50.
1638 
1639    Collective on MPI_Comm
1640 
1641    Input Parameters:
1642 +  comm - MPI communicator
1643 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1644            This value should be the same as the local size used in creating the
1645            y vector for the matrix-vector product y = Ax.
1646 .  n - This value should be the same as the local size used in creating the
1647        x vector for the matrix-vector product y = Ax. (or PETSC_DECIDE to have
1648        calculated if N is given) For square matrices n is almost always m.
1649 .  M - number of global rows (or PETSC_DETERMINE to have calculated if m is given)
1650 .  N - number of global columns (or PETSC_DETERMINE to have calculated if n is given)
1651 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1652            (same for all local rows)
.  d_nnz - array containing the number of nonzeros in the various rows of the
           diagonal portion of the local submatrix (possibly different for each row)
           or PETSC_NULL. You must leave room for the diagonal entry even if
           it is zero.
1657 .  o_nz - number of nonzeros per row in the off-diagonal portion of local
1658            submatrix (same for all local rows).
-  o_nnz - array containing the number of nonzeros in the various rows of the
           off-diagonal portion of the local submatrix (possibly different for
           each row) or PETSC_NULL.
1662 
1663    Output Parameter:
1664 .  A - the matrix
1665 
1666    Notes:
   If PETSC_DECIDE or PETSC_DETERMINE is used for a particular argument on one processor
   then it must be used on all processors that share the object for that argument.
1669 
1670    The AIJ format (also called the Yale sparse matrix format or
1671    compressed row storage), is fully compatible with standard Fortran 77
1672    storage.  That is, the stored row and column indices can begin at
1673    either one (as in Fortran) or zero.  See the users manual for details.
1674 
1675    The user MUST specify either the local or global matrix dimensions
1676    (possibly both).
1677 
1678    By default, this format uses inodes (identical nodes) when possible.
1679    We search for consecutive rows with the same nonzero structure, thereby
1680    reusing matrix information to achieve increased efficiency.
1681 
   Options Database Keys:
+  -mat_aij_no_inode  - Do not use inodes
.  -mat_aij_inode_limit <limit> - Sets inode limit (max limit=5)
.  -mat_aij_oneindex - Internally use indexing starting at 1
        rather than 0.  Note that when calling MatSetValues(),
        the user still MUST index entries starting at 0!
-  -mat_mpi - use the parallel matrix data structures even on one processor
              (defaults to using the SeqAIJ format on one processor)
1692 
1693 
1694    Storage Information:
1695    For a square global matrix we define each processor's diagonal portion
1696    to be its local rows and the corresponding columns (a square submatrix);
1697    each processor's off-diagonal portion encompasses the remainder of the
1698    local matrix (a rectangular submatrix).
1699 
1700    The user can specify preallocated storage for the diagonal part of
1701    the local submatrix with either d_nz or d_nnz (not both).  Set
1702    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1703    memory allocation.  Likewise, specify preallocated storage for the
1704    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1705 
1706    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1707    the figure below we depict these three local rows and all columns (0-11).
1708 
1709 .vb
1710              0 1 2 3 4 5 6 7 8 9 10 11
1711             -------------------
1712      row 3  |  o o o d d d o o o o o o
1713      row 4  |  o o o d d d o o o o o o
1714      row 5  |  o o o d d d o o o o o o
1715             -------------------
1716 .ve
1717 
1718    Thus, any entries in the d locations are stored in the d (diagonal)
1719    submatrix, and any entries in the o locations are stored in the
1720    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1721    stored simply in the MATSEQAIJ format for compressed row storage.
1722 
1723    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1724    and o_nz should indicate the number of nonzeros per row in the o matrix.
1725    In general, for PDE problems in which most nonzeros are near the diagonal,
1726    one expects d_nz >> o_nz. For large problems you MUST preallocate memory
1727    or you will get TERRIBLE performance; see the users' manual chapter on
1728    matrices.
1729 
1730 .keywords: matrix, aij, compressed row, sparse, parallel
1731 
1732 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1733 @*/
int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  Mat          B;
  Mat_MPIAIJ   *b;
  int          ierr, i,sum[2],work[2],size,flag1 = 0, flag2 = 0;

  PetscFunctionBegin;
  MPI_Comm_size(comm,&size);
  ierr = OptionsHasName(PETSC_NULL,"-mat_mpiaij",&flag1); CHKERRQ(ierr);
  ierr = OptionsHasName(PETSC_NULL,"-mat_mpi",&flag2); CHKERRQ(ierr);
  /* On a single process fall back to the sequential AIJ format unless the
     user explicitly requested the parallel data structures; the off-diagonal
     preallocation arguments (o_nz/o_nnz) are irrelevant in that case. */
  if (!flag1 && !flag2 && size == 1) {
    if (M == PETSC_DECIDE) M = m;
    if (N == PETSC_DECIDE) N = n;
    ierr = MatCreateSeqAIJ(comm,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
    PetscFunctionReturn(0);
  }

  *A = 0;
  PetscHeaderCreate(B,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIAIJ,comm,MatDestroy,MatView);
  PLogObjectCreate(B);
  B->data            = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
  PetscMemzero(b,sizeof(Mat_MPIAIJ));
  PetscMemcpy(B->ops,&MatOps_Values,sizeof(struct _MatOps));
  B->ops->destroy    = MatDestroy_MPIAIJ;
  B->ops->view       = MatView_MPIAIJ;
  B->factor          = 0;
  B->assembled       = PETSC_FALSE;
  B->mapping         = 0;

  B->insertmode      = NOT_SET_VALUES;
  b->size            = size;
  MPI_Comm_rank(comm,&b->rank);

  /* d_nnz/o_nnz are indexed by local row, so the local row count must be known */
  if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
    SETERRQ(PETSC_ERR_ARG_WRONG,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
  }

  /* global sizes not given: sum the local sizes across the communicator */
  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
    if (M == PETSC_DECIDE) M = sum[0];
    if (N == PETSC_DECIDE) N = sum[1];
  }
  /* local sizes not given: split as evenly as possible, the first M%size
     (resp. N%size) ranks each receiving one extra row (column) */
  if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
  if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;

  /* the information in the maps duplicates the information computed below, eventually
     we should remove the duplicate information that is not contained in the maps */
  ierr = MapCreateMPI(comm,m,M,&B->rmap);CHKERRQ(ierr);
  ierr = MapCreateMPI(comm,n,N,&B->cmap);CHKERRQ(ierr);

  /* build local table of row and column ownerships; rowners and cowners
     share a single allocation, cowners aliasing its second half */
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIAIJ));
  b->cowners = b->rowners + b->size + 2;
  /* gather every rank's local row count into rowners[1..size], then
     prefix-sum in place so rowners[i] is the first global row owned by
     rank i (and rowners[size] == M) */
  ierr = MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];
  }
  b->rstart = b->rowners[b->rank];
  b->rend   = b->rowners[b->rank+1];
  /* identical construction for the column ownership table */
  ierr = MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart = b->cowners[b->rank];
  b->cend   = b->cowners[b->rank+1];

  /* diagonal block is m x n; the off-diagonal block is created with the
     full global column width N */
  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqAIJ(PETSC_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  ierr = MatCreateSeqAIJ(PETSC_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache for off-processor entries set before assembly */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->donotstash  = 0;
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used for matrix vector multiply (created lazily at assembly) */
  b->lvec      = 0;
  b->Mvctx     = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  *A = B;
  PetscFunctionReturn(0);
}
1834 
#undef __FUNC__
#define __FUNC__ "MatConvertSameType_MPIAIJ"
/*
    Produces a duplicate of an MPIAIJ matrix: copies the parallel layout
  (sizes, ownership tables), the colmap/garray column-lookup data, the
  scatter context, and the two sequential blocks (diagonal A and
  off-diagonal B).

  NOTE(review): the cpvalues argument is never examined in this routine;
  both blocks are copied with MatConvert(...,MATSAME,...) regardless of
  its value -- confirm callers never pass a "do not copy values" request.

  NOTE(review): unlike MatCreateMPIAIJ(), the freshly allocated Mat_MPIAIJ
  is not zeroed; fields not explicitly assigned below (e.g. donotstash,
  rowindices) remain uninitialized -- verify none of them are read later.
*/
int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
{
  Mat        mat;
  Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
  int        ierr, len=0, flg;

  PetscFunctionBegin;
  *newmat       = 0;
  PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIAIJ,matin->comm,MatDestroy,MatView);
  PLogObjectCreate(mat);
  mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
  PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));
  mat->ops->destroy    = MatDestroy_MPIAIJ;
  mat->ops->view       = MatView_MPIAIJ;
  mat->factor     = matin->factor;
  mat->assembled  = PETSC_TRUE;

  /* copy the local/global sizes into both the header and the MPIAIJ data */
  a->m = mat->m   = oldmat->m;
  a->n = mat->n   = oldmat->n;
  a->M = mat->M   = oldmat->M;
  a->N = mat->N   = oldmat->N;

  a->rstart       = oldmat->rstart;
  a->rend         = oldmat->rend;
  a->cstart       = oldmat->cstart;
  a->cend         = oldmat->cend;
  a->size         = oldmat->size;
  a->rank         = oldmat->rank;
  mat->insertmode = NOT_SET_VALUES;
  a->rowvalues    = 0;
  a->getrowactive = PETSC_FALSE;

  /* rowners/cowners share one allocation (same layout as MatCreateMPIAIJ),
     so a single memcpy copies both ownership tables */
  a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
  PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIAIJ));
  a->cowners = a->rowners + a->size + 2;
  PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
  ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
  /* colmap (global column -> local off-diagonal column) and garray (the
     inverse) exist only if the original matrix has created them */
  if (oldmat->colmap) {
    a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
    PLogObjectMemory(mat,(a->N)*sizeof(int));
    PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
  } else a->colmap = 0;
  if (oldmat->garray) {
    /* garray length equals the column count of the off-diagonal block */
    len = ((Mat_SeqAIJ *) (oldmat->B->data))->n;
    a->garray = (int *) PetscMalloc((len+1)*sizeof(int)); CHKPTRQ(a->garray);
    PLogObjectMemory(mat,len*sizeof(int));
    if (len) PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
  } else a->garray = 0;

  /* duplicate the mat-vec work vector and scatter context, then the blocks */
  ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
  PLogObjectParent(mat,a->lvec);
  ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
  PLogObjectParent(mat,a->Mvctx);
  ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);
  ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
  PLogObjectParent(mat,a->B);
  ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
  if (flg) {
    ierr = MatPrintHelp(mat); CHKERRQ(ierr);
  }
  *newmat = mat;
  PetscFunctionReturn(0);
}
1901 
1902 #include "sys.h"
1903 
#undef __FUNC__
#define __FUNC__ "MatLoad_MPIAIJ"
/*
    Loads an MPIAIJ matrix from a PETSc binary viewer.

    Process 0 reads the entire file (header, per-row lengths, column
  indices, numerical values) and ships each other process its slice with
  matched MPI_Send/MPI_Recv pairs; every process then preallocates and
  assembles its local rows.  The `type` argument is unused here.
*/
int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  Scalar       *vals,*svals;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          i, nz, ierr, j,rstart, rend, fd;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
  int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
  int          tag = ((PetscObject)viewer)->tag;

  PetscFunctionBegin;
  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  /* only process 0 touches the file: validate cookie and format flag */
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
    if (header[3] < 0) {
      SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix in special format on disk, cannot load as MPIAIJ");
    }
  }

  /* broadcast M, N (and header[3]) so every process knows the global sizes */
  ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
  M = header[1]; N = header[2];
  /* determine ownership of all rows: same even split as MatCreateMPIAIJ */
  m = M/size + ((M % size) > rank);
  rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
  ierr = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) {
    rowners[i] += rowners[i-1];
  }
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors; ourlens and offlens share
     one allocation of 2*(rend-rstart) ints */
  ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
  offlens = ourlens + (rend-rstart);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
    /* rowners doubles as the displacement array since it holds row offsets */
    ierr = MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);CHKERRQ(ierr);
    PetscFree(sndcounts);
  } else {
    ierr = MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);CHKERRQ(ierr);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices  */
    nz     = procsnz[0];
    mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
    ierr   = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);

    /* read in every one elses and ship off */
    for ( i=1; i<size; i++ ) {
      nz   = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
      ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
    }
    PetscFree(cols);
  } else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += ourlens[i];
    }
    mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);

    /* receive message of column indices; verify we got exactly nz of them */
    ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
    ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
    if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries;
     a column outside [rstart,rend) belongs to the off-diagonal block
     (this assumes the diagonal block is square -- see storage notes) */
  PetscMemzero(offlens,m*sizeof(int));
  jj = 0;
  for ( i=0; i<m; i++ ) {
    for ( j=0; j<ourlens[i]; j++ ) {
      if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
      jj++;
    }
  }

  /* create our matrix, preallocated exactly: temporarily turn ourlens into
     diagonal-only lengths, then restore the totals for the insertion loops */
  for ( i=0; i<m; i++ ) {
    ourlens[i] -= offlens[i];
  }
  ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
  A = *newmat;
  /* file stores each row's columns sorted, so enable the fast insert path */
  MatSetOption(A,MAT_COLUMNS_SORTED);
  for ( i=0; i<m; i++ ) {
    ourlens[i] += offlens[i];
  }

  if (!rank) {
    vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);

    /* insert into matrix one local row at a time */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }

    /* read in other processors and ship out */
    for ( i=1; i<size; i++ ) {
      nz   = procsnz[i];
      ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
      ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
    }
    PetscFree(procsnz);
  } else {
    /* receive numeric values */
    vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* receive message of values; verify the count matches our nonzeros */
    ierr = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
    ierr = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
    if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");

    /* insert into matrix */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr     = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }
  }
  PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);

  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
2071 
#undef __FUNC__
#define __FUNC__ "MatGetSubMatrix_MPIAIJ"
/*
    Extracts the parallel submatrix defined by isrow/iscol.

    Not great since it makes two copies of the submatrix, first an SeqAIJ
  in local and then by concatenating the local matrices the end result.
  Writing it directly would be much like MatGetSubMatrices_MPIAIJ()

    csize - number of columns of the result owned locally, or PETSC_DECIDE
            for an even split
    call  - MAT_INITIAL_MATRIX to create the result, or MAT_REUSE_MATRIX to
            refill a matrix produced by a previous MAT_INITIAL_MATRIX call
            (the per-process SeqAIJ copy is stashed on the result under the
            "SubMatrix" key for exactly this purpose)
*/
int MatGetSubMatrix_MPIAIJ(Mat mat,IS isrow,IS iscol,int csize,MatGetSubMatrixCall call,Mat *newmat)
{
  int        ierr, i, m,n,rstart,row,rend,nz,*cwork,size,rank,j;
  Mat        *local,M, Mreuse;
  Scalar     *vwork,*aa;
  MPI_Comm   comm = mat->comm;
  Mat_SeqAIJ *aij;
  int        *ii, *jj,nlocal,*dlens,*olens,dlen,olen,jend;

  PetscFunctionBegin;
  MPI_Comm_rank(comm,&rank);
  MPI_Comm_size(comm,&size);

  if (call ==  MAT_REUSE_MATRIX) {
    /* retrieve the sequential copy saved on the previous call and refill it */
    ierr = PetscObjectQuery((PetscObject)*newmat,"SubMatrix",(PetscObject *)&Mreuse);CHKERRQ(ierr);
    if (!Mreuse) SETERRQ(1,1,"Submatrix passed in was not used before, cannot reuse");
    local = &Mreuse;
    ierr  = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_REUSE_MATRIX,&local);CHKERRQ(ierr);
  } else {
    ierr = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_INITIAL_MATRIX,&local);CHKERRQ(ierr);
    Mreuse = *local;
    PetscFree(local);
  }

  /*
      m - number of local rows
      n - number of columns (same on all processors)
      rstart - first row in new global matrix generated
  */
  ierr = MatGetSize(Mreuse,&m,&n);CHKERRQ(ierr);
  if (call == MAT_INITIAL_MATRIX) {
    aij = (Mat_SeqAIJ *) (Mreuse)->data;
    if (aij->indexshift) SETERRQ(PETSC_ERR_SUP,1,"No support for index shifted matrix");
    ii  = aij->i;
    jj  = aij->j;

    /*
        Determine the number of non-zeros in the diagonal and off-diagonal
        portions of the matrix in order to do correct preallocation
    */

    /* first get start and end of "diagonal" columns */
    if (csize == PETSC_DECIDE) {
      nlocal = n/size + ((n % size) > rank);
    } else {
      nlocal = csize;
    }
    /* MPI_Scan gives the inclusive prefix sum, i.e. this rank's column end */
    ierr   = MPI_Scan(&nlocal,&rend,1,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
    rstart = rend - nlocal;
    if (rank == size - 1 && rend != n) {
      SETERRQ(1,1,"Local column sizes do not add up to total number of columns");
    }

    /* next, compute all the lengths; dlens and olens share one allocation */
    dlens = (int *) PetscMalloc( (2*m+1)*sizeof(int) );CHKPTRQ(dlens);
    olens = dlens + m;
    for ( i=0; i<m; i++ ) {
      jend = ii[i+1] - ii[i];   /* number of nonzeros in local row i */
      olen = 0;
      dlen = 0;
      for ( j=0; j<jend; j++ ) {
        if ( *jj < rstart || *jj >= rend) olen++;
        else dlen++;
        jj++;
      }
      olens[i] = olen;
      dlens[i] = dlen;
    }
    ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,PETSC_DECIDE,n,0,dlens,0,olens,&M);CHKERRQ(ierr);
    PetscFree(dlens);
  } else {
    int ml,nl;

    M = *newmat;
    ierr = MatGetLocalSize(M,&ml,&nl);CHKERRQ(ierr);
    if (ml != m) SETERRQ(PETSC_ERR_ARG_SIZ,1,"Previous matrix must be same size/layout as request");
    ierr = MatZeroEntries(M);CHKERRQ(ierr);
    /*
         The next two lines are needed so we may call MatSetValues_MPIAIJ() below directly,
       rather than the slower MatSetValues().
    */
    M->was_assembled = PETSC_TRUE;
    M->assembled     = PETSC_FALSE;
  }
  /* copy the sequential submatrix row by row into the parallel result */
  ierr = MatGetOwnershipRange(M,&rstart,&rend); CHKERRQ(ierr);
  aij = (Mat_SeqAIJ *) (Mreuse)->data;
  if (aij->indexshift) SETERRQ(PETSC_ERR_SUP,1,"No support for index shifted matrix");
  ii  = aij->i;
  jj  = aij->j;
  aa  = aij->a;
  for (i=0; i<m; i++) {
    row   = rstart + i;
    nz    = ii[i+1] - ii[i];
    cwork = jj;     jj += nz;
    vwork = aa;     aa += nz;
    ierr = MatSetValues_MPIAIJ(M,1,&row,nz,cwork,vwork,INSERT_VALUES); CHKERRQ(ierr);
  }

  ierr = MatAssemblyBegin(M,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(M,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  *newmat = M;

  /* save submatrix used in processor for next request; the dereference
     leaves M as the sole owner of the cached copy */
  if (call ==  MAT_INITIAL_MATRIX) {
    ierr = PetscObjectCompose((PetscObject)M,"SubMatrix",(PetscObject)Mreuse);CHKERRQ(ierr);
    ierr = PetscObjectDereference((PetscObject)Mreuse);CHKERRQ(ierr);
  }

  PetscFunctionReturn(0);
}
2189