xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision c7fcc2eafc245811be4a49a6c2d749b53b1601d3)
1 
2 #ifdef PETSC_RCS_HEADER
3 static char vcid[] = "$Id: mpiaij.c,v 1.254 1998/07/14 02:36:19 bsmith Exp bsmith $";
4 #endif
5 
6 #include "pinclude/pviewer.h"
7 #include "src/mat/impls/aij/mpi/mpiaij.h"
8 #include "src/vec/vecimpl.h"
9 #include "src/inline/spops.h"
10 
11 /* local utility routine that creates a mapping from the global column
12 number to the local number in the off-diagonal part of the local
13 storage of the matrix.  This is done in a non-scalable way since the
14 length of colmap equals the global number of columns.
15 */
16 #undef __FUNC__
17 #define __FUNC__ "CreateColmap_MPIAIJ_Private"
18 int CreateColmap_MPIAIJ_Private(Mat mat)
19 {
20   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
21   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
22   int        n = B->n,i;
23 
24   PetscFunctionBegin;
25   aij->colmap = (int *) PetscMalloc((aij->N+1)*sizeof(int));CHKPTRQ(aij->colmap);
26   PLogObjectMemory(mat,aij->N*sizeof(int));
27   PetscMemzero(aij->colmap,aij->N*sizeof(int));
28   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
29   PetscFunctionReturn(0);
30 }
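
/*
   Added illustration (not part of the original source): how colmap and garray
   relate once CreateColmap_MPIAIJ_Private() has run.  Suppose the off-diagonal
   block B references the (hypothetical) global columns garray[] = {3, 7, 12}.
   Then

      colmap[3]  = 1;   colmap[7]  = 2;   colmap[12] = 3;

   and every other entry of colmap is 0.  A global column gcol is translated to
   a local column of B with

      lcol = aij->colmap[gcol] - 1;    lcol < 0 means gcol does not appear
                                       in the off-diagonal part

   which is exactly the lookup performed in MatSetValues_MPIAIJ() and
   MatGetValues_MPIAIJ() below.
*/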
31 
32 extern int DisAssemble_MPIAIJ(Mat);
33 
34 #define CHUNKSIZE   15
35 #define MatSetValues_SeqAIJ_A_Private(row,col,value,addv) \
36 { \
37  \
38     rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
39     rmax = aimax[row]; nrow = ailen[row];  \
40     col1 = col - shift; \
41      \
42     low = 0; high = nrow; \
43     while (high-low > 5) { \
44       t = (low+high)/2; \
45       if (rp[t] > col1) high = t; \
46       else              low  = t; \
47     } \
48       for ( _i=low; _i<nrow; _i++ ) { \
49         if (rp[_i] > col1) break; \
50         if (rp[_i] == col1) { \
51           if (addv == ADD_VALUES) ap[_i] += value;   \
52           else                  ap[_i] = value; \
53           goto a_noinsert; \
54         } \
55       }  \
56       if (nonew == 1) goto a_noinsert; \
57       else if (nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
58       if (nrow >= rmax) { \
59         /* there is no extra room in row, therefore enlarge */ \
60         int    new_nz = ai[a->m] + CHUNKSIZE,len,*new_i,*new_j; \
61         Scalar *new_a; \
62  \
63         if (nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
64  \
65         /* malloc new storage space */ \
66         len     = new_nz*(sizeof(int)+sizeof(Scalar))+(a->m+1)*sizeof(int); \
67         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
68         new_j   = (int *) (new_a + new_nz); \
69         new_i   = new_j + new_nz; \
70  \
71         /* copy over old data into new slots */ \
72         for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = ai[ii];} \
73         for ( ii=row+1; ii<a->m+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
74         PetscMemcpy(new_j,aj,(ai[row]+nrow+shift)*sizeof(int)); \
75         len = (new_nz - CHUNKSIZE - ai[row] - nrow - shift); \
76         PetscMemcpy(new_j+ai[row]+shift+nrow+CHUNKSIZE,aj+ai[row]+shift+nrow, \
77                                                            len*sizeof(int)); \
78         PetscMemcpy(new_a,aa,(ai[row]+nrow+shift)*sizeof(Scalar)); \
79         PetscMemcpy(new_a+ai[row]+shift+nrow+CHUNKSIZE,aa+ai[row]+shift+nrow, \
80                                                            len*sizeof(Scalar));  \
81         /* free up old matrix storage */ \
82  \
83         PetscFree(a->a);  \
84         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
85         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
86         a->singlemalloc = 1; \
87  \
88         rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
89         rmax = aimax[row] = aimax[row] + CHUNKSIZE; \
90         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
91         a->maxnz += CHUNKSIZE; \
92         a->reallocs++; \
93       } \
94       N = nrow++ - 1; a->nz++; \
95       /* shift up all the later entries in this row */ \
96       for ( ii=N; ii>=_i; ii-- ) { \
97         rp[ii+1] = rp[ii]; \
98         ap[ii+1] = ap[ii]; \
99       } \
100       rp[_i] = col1;  \
101       ap[_i] = value;  \
102       a_noinsert: ; \
103       ailen[row] = nrow; \
104 }
105 
106 #define MatSetValues_SeqAIJ_B_Private(row,col,value,addv) \
107 { \
108  \
109     rp   = bj + bi[row] + shift; ap = ba + bi[row] + shift; \
110     rmax = bimax[row]; nrow = bilen[row];  \
111     col1 = col - shift; \
112      \
113     low = 0; high = nrow; \
114     while (high-low > 5) { \
115       t = (low+high)/2; \
116       if (rp[t] > col1) high = t; \
117       else              low  = t; \
118     } \
119        for ( _i=low; _i<nrow; _i++ ) { \
120         if (rp[_i] > col1) break; \
121         if (rp[_i] == col1) { \
122           if (addv == ADD_VALUES) ap[_i] += value;   \
123           else                  ap[_i] = value; \
124           goto b_noinsert; \
125         } \
126       }  \
127       if (nonew == 1) goto b_noinsert; \
128       else if (nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
129       if (nrow >= rmax) { \
130         /* there is no extra room in row, therefore enlarge */ \
131         int    new_nz = bi[b->m] + CHUNKSIZE,len,*new_i,*new_j; \
132         Scalar *new_a; \
133  \
134         if (nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
135  \
136         /* malloc new storage space */ \
137         len     = new_nz*(sizeof(int)+sizeof(Scalar))+(b->m+1)*sizeof(int); \
138         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
139         new_j   = (int *) (new_a + new_nz); \
140         new_i   = new_j + new_nz; \
141  \
142         /* copy over old data into new slots */ \
143         for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = bi[ii];} \
144         for ( ii=row+1; ii<b->m+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
145         PetscMemcpy(new_j,bj,(bi[row]+nrow+shift)*sizeof(int)); \
146         len = (new_nz - CHUNKSIZE - bi[row] - nrow - shift); \
147         PetscMemcpy(new_j+bi[row]+shift+nrow+CHUNKSIZE,bj+bi[row]+shift+nrow, \
148                                                            len*sizeof(int)); \
149         PetscMemcpy(new_a,ba,(bi[row]+nrow+shift)*sizeof(Scalar)); \
150         PetscMemcpy(new_a+bi[row]+shift+nrow+CHUNKSIZE,ba+bi[row]+shift+nrow, \
151                                                            len*sizeof(Scalar));  \
152         /* free up old matrix storage */ \
153  \
154         PetscFree(b->a);  \
155         if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
156         ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
157         b->singlemalloc = 1; \
158  \
159         rp   = bj + bi[row] + shift; ap = ba + bi[row] + shift; \
160         rmax = bimax[row] = bimax[row] + CHUNKSIZE; \
161         PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
162         b->maxnz += CHUNKSIZE; \
163         b->reallocs++; \
164       } \
165       N = nrow++ - 1; b->nz++; \
166       /* shift up all the later entries in this row */ \
167       for ( ii=N; ii>=_i; ii-- ) { \
168         rp[ii+1] = rp[ii]; \
169         ap[ii+1] = ap[ii]; \
170       } \
171       rp[_i] = col1;  \
172       ap[_i] = value;  \
173       b_noinsert: ; \
174       bilen[row] = nrow; \
175 }
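
/*
   Added note: both MatSetValues_SeqAIJ_A_Private() and MatSetValues_SeqAIJ_B_Private()
   expand in place and rely on local variables declared by the caller (rp, ap, rmax,
   nrow, col1, low, high, t, _i, ii, N, nonew, shift, the per-block arrays such as
   aimax/ai/ailen/aj/aa or bimax/bi/bilen/bj/ba, and the objects A/a or B/b), exactly
   as set up in MatSetValues_MPIAIJ() below.  A minimal calling sketch with
   hypothetical row/col/value:

      row = 2; col = 5; value = 1.0;
      MatSetValues_SeqAIJ_A_Private(row,col,value,ADD_VALUES);

   Any other caller must provide the same set of locals for the expansion to compile.
*/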
176 
177 extern int MatSetValues_SeqAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
178 #undef __FUNC__
179 #define __FUNC__ "MatSetValues_MPIAIJ"
180 int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
181 {
182   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
183   Scalar     value;
184   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
185   int        cstart = aij->cstart, cend = aij->cend,row,col;
186   int        roworiented = aij->roworiented;
187 
188   /* Some variables required by the MatSetValues_SeqAIJ_{A,B}_Private() macros */
189   Mat        A = aij->A;
190   Mat_SeqAIJ *a = (Mat_SeqAIJ *) A->data;
191   int        *aimax = a->imax, *ai = a->i, *ailen = a->ilen,*aj = a->j;
192   Scalar     *aa = a->a;
193 
194   Mat        B = aij->B;
195   Mat_SeqAIJ *b = (Mat_SeqAIJ *) B->data;
196   int        *bimax = b->imax, *bi = b->i, *bilen = b->ilen,*bj = b->j;
197   Scalar     *ba = b->a;
198 
199   int        *rp,ii,nrow,_i,rmax, N, col1,low,high,t;
200   int        nonew = a->nonew,shift = a->indexshift;
201   Scalar     *ap;
202 
203   PetscFunctionBegin;
204   for ( i=0; i<m; i++ ) {
205 #if defined(USE_PETSC_BOPT_g)
206     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
207     if (im[i] >= aij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
208 #endif
209     if (im[i] >= rstart && im[i] < rend) {
210       row = im[i] - rstart;
211       for ( j=0; j<n; j++ ) {
212         if (in[j] >= cstart && in[j] < cend){
213           col = in[j] - cstart;
214           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
215           MatSetValues_SeqAIJ_A_Private(row,col,value,addv);
216           /* ierr = MatSetValues_SeqAIJ(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
217         }
218 #if defined(USE_PETSC_BOPT_g)
219         else if (in[j] < 0) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");}
220         else if (in[j] >= aij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");}
221 #endif
222         else {
223           if (mat->was_assembled) {
224             if (!aij->colmap) {
225               ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
226             }
227             col = aij->colmap[in[j]] - 1;
228             if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
229               ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
230               col =  in[j];
231               /* Reinitialize the variables required by MatSetValues_SeqAIJ_B_Private() */
232               B = aij->B;
233               b = (Mat_SeqAIJ *) B->data;
234               bimax = b->imax; bi = b->i; bilen = b->ilen; bj = b->j;
235               ba = b->a;
236             }
237           } else col = in[j];
238           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
239           MatSetValues_SeqAIJ_B_Private(row,col,value,addv);
240           /* ierr = MatSetValues_SeqAIJ(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
241         }
242       }
243     }
244     else {
245       if (roworiented && !aij->donotstash) {
246         ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
247       }
248       else {
249         if (!aij->donotstash) {
250           row = im[i];
251           for ( j=0; j<n; j++ ) {
252             ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
253           }
254         }
255       }
256     }
257   }
258   PetscFunctionReturn(0);
259 }
260 
261 #undef __FUNC__
262 #define __FUNC__ "MatGetValues_MPIAIJ"
263 int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
264 {
265   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
266   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
267   int        cstart = aij->cstart, cend = aij->cend,row,col;
268 
269   PetscFunctionBegin;
270   for ( i=0; i<m; i++ ) {
271     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
272     if (idxm[i] >= aij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
273     if (idxm[i] >= rstart && idxm[i] < rend) {
274       row = idxm[i] - rstart;
275       for ( j=0; j<n; j++ ) {
276         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
277         if (idxn[j] >= aij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
278         if (idxn[j] >= cstart && idxn[j] < cend){
279           col = idxn[j] - cstart;
280           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
281         } else {
282           if (!aij->colmap) {
283             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
284           }
285           col = aij->colmap[idxn[j]] - 1;
286           if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
287           else {
288             ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
289           }
290         }
291       }
292     } else {
293       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
294     }
295   }
296   PetscFunctionReturn(0);
297 }
298 
299 #undef __FUNC__
300 #define __FUNC__ "MatAssemblyBegin_MPIAIJ"
301 int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
302 {
303   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
304   MPI_Comm    comm = mat->comm;
305   int         size = aij->size, *owners = aij->rowners;
306   int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
307   MPI_Request *send_waits,*recv_waits;
308   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
309   InsertMode  addv;
310   Scalar      *rvalues,*svalues;
311 
312   PetscFunctionBegin;
313   if (aij->donotstash) {
314     aij->svalues    = 0; aij->rvalues    = 0;
315     aij->nsends     = 0; aij->nrecvs     = 0;
316     aij->send_waits = 0; aij->recv_waits = 0;
317     aij->rmax       = 0;
318     PetscFunctionReturn(0);
319   }
320 
321   /* make sure all processors are in either INSERT_VALUES or ADD_VALUES mode */
322   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
323   if (addv == (ADD_VALUES|INSERT_VALUES)) {
324     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
325   }
326   mat->insertmode = addv; /* in case this processor had no cache */
327 
328   /*  first count number of contributors to each processor */
329   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
330   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
331   owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
332   for ( i=0; i<aij->stash.n; i++ ) {
333     idx = aij->stash.idx[i];
334     for ( j=0; j<size; j++ ) {
335       if (idx >= owners[j] && idx < owners[j+1]) {
336         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
337       }
338     }
339   }
340   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
341 
342   /* inform other processors of number of messages and max length*/
343   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
344   ierr = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
345   nreceives = work[rank];
346   ierr = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
347   nmax = work[rank];
348   PetscFree(work);
349 
350   /* post receives:
351        1) each message consists of ordered triples
352      (global row index, global column index, value); the indices are stored
353      as Scalars to simplify the message passing.
354        2) since we don't know how long each individual message is, we
355      allocate the largest needed buffer for each receive. Potentially
356      this is a lot of wasted space.
357 
358 
359        This could be done better.
360   */
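  /*
     Added illustration of the packing used just below: entry k destined for a
     given processor occupies three consecutive Scalars in svalues,

        svalues[3*k]   = (Scalar) global row index
        svalues[3*k+1] = (Scalar) global column index
        svalues[3*k+2] =          numerical value

     and MatAssemblyEnd_MPIAIJ() unpacks rvalues with the matching pattern,
     e.g. row = (int) PetscReal(values[3*i]) - rstart.
  */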
361   rvalues    = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
362   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
363   for ( i=0; i<nreceives; i++ ) {
364     ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
365               comm,recv_waits+i);CHKERRQ(ierr);
366   }
367 
368   /* do sends:
369       1) starts[i] gives the starting index in svalues for stuff going to
370          the ith processor
371   */
372   svalues    = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
373   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
374   starts     = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
375   starts[0]  = 0;
376   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
377   for ( i=0; i<aij->stash.n; i++ ) {
378     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
379     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
380     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
381   }
382   PetscFree(owner);
383   starts[0] = 0;
384   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
385   count = 0;
386   for ( i=0; i<size; i++ ) {
387     if (procs[i]) {
388       ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
389     }
390   }
391   PetscFree(starts);
392   PetscFree(nprocs);
393 
394   /* Free cache space */
395   PLogInfo(aij->A,"MatAssemblyBegin_MPIAIJ:Number of off-processor values %d\n",aij->stash.n);
396   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
397 
398   aij->svalues    = svalues;    aij->rvalues    = rvalues;
399   aij->nsends     = nsends;     aij->nrecvs     = nreceives;
400   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
401   aij->rmax       = nmax;
402 
403   PetscFunctionReturn(0);
404 }
405 extern int MatSetUpMultiply_MPIAIJ(Mat);
406 
407 #undef __FUNC__
408 #define __FUNC__ "MatAssemblyEnd_MPIAIJ"
409 int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
410 {
411   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
412   MPI_Status  *send_status,recv_status;
413   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
414   int         row,col,other_disassembled;
415   Scalar      *values,val;
416   InsertMode  addv = mat->insertmode;
417 
418   PetscFunctionBegin;
419 
420   /*  wait on receives */
421   while (count) {
422     ierr = MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
423     /* unpack receives into our local space */
424     values = aij->rvalues + 3*imdex*aij->rmax;
425     ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
426     n = n/3;
427     for ( i=0; i<n; i++ ) {
428       row = (int) PetscReal(values[3*i]) - aij->rstart;
429       col = (int) PetscReal(values[3*i+1]);
430       val = values[3*i+2];
431       if (col >= aij->cstart && col < aij->cend) {
432         col -= aij->cstart;
433         ierr = MatSetValues(aij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
434       } else {
435         if (mat->was_assembled || mat->assembled) {
436           if (!aij->colmap) {
437             ierr = CreateColmap_MPIAIJ_Private(mat); CHKERRQ(ierr);
438           }
439           col = aij->colmap[col] - 1;
440           if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
441             ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
442             col = (int) PetscReal(values[3*i+1]);
443           }
444         }
445         ierr = MatSetValues(aij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
446       }
447     }
448     count--;
449   }
450   if (aij->recv_waits) PetscFree(aij->recv_waits);
451   if (aij->rvalues)    PetscFree(aij->rvalues);
452 
453   /* wait on sends */
454   if (aij->nsends) {
455     send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
456     ierr        = MPI_Waitall(aij->nsends,aij->send_waits,send_status);CHKERRQ(ierr);
457     PetscFree(send_status);
458   }
459   if (aij->send_waits) PetscFree(aij->send_waits);
460   if (aij->svalues)    PetscFree(aij->svalues);
461 
462   ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
463   ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);
464 
465   /* determine if any processor has disassembled; if so we must
466      also disassemble ourselves so that we may reassemble. */
467   /*
468      if the nonzero structure of the submatrix B cannot change, then we know
469      that no processor disassembled and we can skip this step
470   */
471   if (!((Mat_SeqAIJ*) aij->B->data)->nonew)  {
472     ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
473     if (mat->was_assembled && !other_disassembled) {
474       ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
475     }
476   }
477 
478   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
479     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
480   }
481   ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
482   ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);
483 
484   if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
485   PetscFunctionReturn(0);
486 }
487 
488 #undef __FUNC__
489 #define __FUNC__ "MatZeroEntries_MPIAIJ"
490 int MatZeroEntries_MPIAIJ(Mat A)
491 {
492   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
493   int        ierr;
494 
495   PetscFunctionBegin;
496   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
497   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
498   PetscFunctionReturn(0);
499 }
500 
501 #undef __FUNC__
502 #define __FUNC__ "MatZeroRows_MPIAIJ"
503 int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
504 {
505   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
506   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
507   int            *procs,*nprocs,j,found,idx,nsends,*work,row;
508   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
509   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
510   int            *lens,imdex,*lrows,*values,rstart=l->rstart;
511   MPI_Comm       comm = A->comm;
512   MPI_Request    *send_waits,*recv_waits;
513   MPI_Status     recv_status,*send_status;
514   IS             istmp;
515   PetscTruth     localdiag;
516 
517   PetscFunctionBegin;
518   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
519   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
520 
521   /*  first count number of contributors to each processor */
522   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
523   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
524   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
525   for ( i=0; i<N; i++ ) {
526     idx = rows[i];
527     found = 0;
528     for ( j=0; j<size; j++ ) {
529       if (idx >= owners[j] && idx < owners[j+1]) {
530         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
531       }
532     }
533     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
534   }
535   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
536 
537   /* inform other processors of number of messages and max length*/
538   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
539   ierr = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
540   nrecvs = work[rank];
541   ierr = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
542   nmax = work[rank];
543   PetscFree(work);
544 
545   /* post receives:   */
546   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int));CHKPTRQ(rvalues);
547   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
548   for ( i=0; i<nrecvs; i++ ) {
549     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
550   }
551 
552   /* do sends:
553       1) starts[i] gives the starting index in svalues for stuff going to
554          the ith processor
555   */
556   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
557   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
558   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
559   starts[0] = 0;
560   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
561   for ( i=0; i<N; i++ ) {
562     svalues[starts[owner[i]]++] = rows[i];
563   }
564   ISRestoreIndices(is,&rows);
565 
566   starts[0] = 0;
567   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
568   count = 0;
569   for ( i=0; i<size; i++ ) {
570     if (procs[i]) {
571       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
572     }
573   }
574   PetscFree(starts);
575 
576   base = owners[rank];
577 
578   /*  wait on receives */
579   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
580   source = lens + nrecvs;
581   count  = nrecvs; slen = 0;
582   while (count) {
583     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
584     /* unpack receives into our local space */
585     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
586     source[imdex]  = recv_status.MPI_SOURCE;
587     lens[imdex]  = n;
588     slen += n;
589     count--;
590   }
591   PetscFree(recv_waits);
592 
593   /* move the data into the send scatter */
594   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
595   count = 0;
596   for ( i=0; i<nrecvs; i++ ) {
597     values = rvalues + i*nmax;
598     for ( j=0; j<lens[i]; j++ ) {
599       lrows[count++] = values[j] - base;
600     }
601   }
602   PetscFree(rvalues); PetscFree(lens);
603   PetscFree(owner); PetscFree(nprocs);
604 
605   /* actually zap the local rows */
606   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
607   PLogObjectParent(A,istmp);
608 
609   /*
610         Zero the required rows. If the "diagonal block" of the matrix
611      is square and the user wishes to set the diagonal, we use separate
612      code so that MatSetValues() is not called for each diagonal entry;
613      doing so would allocate new memory, causing many mallocs and slowing things down.
614 
615        Contributed by: Mathew Knepley
616   */
617   localdiag = PETSC_FALSE;
618   if (diag && (l->A->M == l->A->N)) {
619     localdiag = PETSC_TRUE;
620     ierr      = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
621   } else {
622     ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
623   }
624   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
625   ierr = ISDestroy(istmp); CHKERRQ(ierr);
626 
627   if (diag && (localdiag == PETSC_FALSE)) {
628     for ( i = 0; i < slen; i++ ) {
629       row  = lrows[i] + rstart;
630       ierr = MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES); CHKERRQ(ierr);
631     }
632     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
633     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
634   }
635 
644   PetscFree(lrows);
645 
646   /* wait on sends */
647   if (nsends) {
648     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
649     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
650     PetscFree(send_status);
651   }
652   PetscFree(send_waits); PetscFree(svalues);
653 
654   PetscFunctionReturn(0);
655 }
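
/*
   Usage sketch (added commentary; names and values are hypothetical):

      IS     rows;
      int    grows[] = {0, 5, 17};         global row numbers to be zeroed
      Scalar one     = 1.0;

      ierr = ISCreateGeneral(PETSC_COMM_WORLD,3,grows,&rows); CHKERRQ(ierr);
      ierr = MatZeroRows(A,rows,&one); CHKERRQ(ierr);      zero rows, set diagonal to 1.0
      ierr = ISDestroy(rows); CHKERRQ(ierr);

   Passing a null Scalar pointer instead of &one zeros the rows without touching
   the diagonal.
*/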
656 
657 #undef __FUNC__
658 #define __FUNC__ "MatMult_MPIAIJ"
659 int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
660 {
661   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
662   int        ierr,nt;
663 
664   PetscFunctionBegin;
665   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
666   if (nt != a->n) {
667     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
668   }
669   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
670   ierr = (*a->A->ops->mult)(a->A,xx,yy); CHKERRQ(ierr);
671   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
672   ierr = (*a->B->ops->multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
673   PetscFunctionReturn(0);
674 }
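
/*
   Added commentary on the algorithm above: with the input vector x split into the
   locally owned part x_loc and the ghost values x_ghost gathered into a->lvec,

      y = A_diag * x_loc  +  B_offdiag * x_ghost

   The VecScatterBegin()/VecScatterEnd() pair overlaps communication of the ghost
   values with the purely local product A_diag * x_loc.  MatMultAdd_MPIAIJ() below
   follows the same pattern, accumulating onto yy to produce zz.
*/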
675 
676 #undef __FUNC__
677 #define __FUNC__ "MatMultAdd_MPIAIJ"
678 int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
679 {
680   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
681   int        ierr;
682 
683   PetscFunctionBegin;
684   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
685   ierr = (*a->A->ops->multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
686   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
687   ierr = (*a->B->ops->multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
688   PetscFunctionReturn(0);
689 }
690 
691 #undef __FUNC__
692 #define __FUNC__ "MatMultTrans_MPIAIJ"
693 int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
694 {
695   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
696   int        ierr;
697 
698   PetscFunctionBegin;
699   /* do nondiagonal part */
700   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
701   /* send it on its way */
702   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
703   /* do local part */
704   ierr = (*a->A->ops->multtrans)(a->A,xx,yy); CHKERRQ(ierr);
705   /* receive remote parts: note this assumes the values are not actually */
706   /* inserted into yy until the VecScatterEnd() below, which is true for this */
707   /* implementation but may not always be true. */
708   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
709   PetscFunctionReturn(0);
710 }
711 
712 #undef __FUNC__
713 #define __FUNC__ "MatMultTransAdd_MPIAIJ"
714 int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
715 {
716   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
717   int        ierr;
718 
719   PetscFunctionBegin;
720   /* do nondiagonal part */
721   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
722   /* send it on its way */
723   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
724   /* do local part */
725   ierr = (*a->A->ops->multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
726   /* receive remote parts: note this assumes the values are not actually */
727   /* inserted into zz until the VecScatterEnd() below, which is true for this */
728   /* implementation but may not always be true. */
729   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
730   PetscFunctionReturn(0);
731 }
732 
733 /*
734   This only works correctly for square matrices where the subblock A->A is the
735    diagonal block
736 */
737 #undef __FUNC__
738 #define __FUNC__ "MatGetDiagonal_MPIAIJ"
739 int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
740 {
741   int        ierr;
742   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
743 
744   PetscFunctionBegin;
745   if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
746   if (a->rstart != a->cstart || a->rend != a->cend) {
747     SETERRQ(PETSC_ERR_ARG_SIZ,0,"row partition must equal col partition");
748   }
749   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
750   PetscFunctionReturn(0);
751 }
752 
753 #undef __FUNC__
754 #define __FUNC__ "MatScale_MPIAIJ"
755 int MatScale_MPIAIJ(Scalar *aa,Mat A)
756 {
757   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
758   int        ierr;
759 
760   PetscFunctionBegin;
761   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
762   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
763   PetscFunctionReturn(0);
764 }
765 
766 #undef __FUNC__
767 #define __FUNC__ "MatDestroy_MPIAIJ"
768 int MatDestroy_MPIAIJ(Mat mat)
769 {
770   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
771   int        ierr;
772 
773   PetscFunctionBegin;
774   if (--mat->refct > 0) PetscFunctionReturn(0);
775 
776   if (mat->mapping) {
777     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
778   }
779   if (mat->bmapping) {
780     ierr = ISLocalToGlobalMappingDestroy(mat->bmapping); CHKERRQ(ierr);
781   }
782   if (mat->rmap) {
783     ierr = MapDestroy(mat->rmap);CHKERRQ(ierr);
784   }
785   if (mat->cmap) {
786     ierr = MapDestroy(mat->cmap);CHKERRQ(ierr);
787   }
788 #if defined(USE_PETSC_LOG)
789   PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",aij->M,aij->N);
790 #endif
791   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
792   PetscFree(aij->rowners);
793   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
794   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
795   if (aij->colmap) PetscFree(aij->colmap);
796   if (aij->garray) PetscFree(aij->garray);
797   if (aij->lvec)   VecDestroy(aij->lvec);
798   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
799   if (aij->rowvalues) PetscFree(aij->rowvalues);
800   PetscFree(aij);
801   PLogObjectDestroy(mat);
802   PetscHeaderDestroy(mat);
803   PetscFunctionReturn(0);
804 }
805 
806 #undef __FUNC__
807 #define __FUNC__ "MatView_MPIAIJ_Binary"
808 extern int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
809 {
810   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
811   int         ierr;
812 
813   PetscFunctionBegin;
814   if (aij->size == 1) {
815     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
816   }
817   else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
818   PetscFunctionReturn(0);
819 }
820 
821 #undef __FUNC__
822 #define __FUNC__ "MatView_MPIAIJ_ASCIIorDraworMatlab"
823 extern int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
824 {
825   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
826   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
827   int         ierr, format,shift = C->indexshift,rank;
828   FILE        *fd;
829   ViewerType  vtype;
830 
831   PetscFunctionBegin;
832   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
833   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
834     ierr = ViewerGetFormat(viewer,&format);
835     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
836       MatInfo info;
837       int     flg;
838       MPI_Comm_rank(mat->comm,&rank);
839       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
840       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
841       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
842       PetscSequentialPhaseBegin(mat->comm,1);
843       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
844          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
845       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
846          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
847       ierr = MatGetInfo(aij->A,MAT_LOCAL,&info);
848       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used);
849       ierr = MatGetInfo(aij->B,MAT_LOCAL,&info);
850       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used);
851       fflush(fd);
852       PetscSequentialPhaseEnd(mat->comm,1);
853       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
854       PetscFunctionReturn(0);
855     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
856       PetscFunctionReturn(0);
857     }
858   }
859 
860   if (vtype == DRAW_VIEWER) {
861     Draw       draw;
862     PetscTruth isnull;
863     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
864     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
865   }
866 
867   if (vtype == ASCII_FILE_VIEWER) {
868     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
869     PetscSequentialPhaseBegin(mat->comm,1);
870     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
871            aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
872            aij->cend);
873     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
874     ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
875     fflush(fd);
876     PetscSequentialPhaseEnd(mat->comm,1);
877   } else {
878     int size = aij->size;
879     rank = aij->rank;
880     if (size == 1) {
881       ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
882     } else {
883       /* assemble the entire matrix onto the first processor. */
884       Mat         A;
885       Mat_SeqAIJ *Aloc;
886       int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
887       Scalar      *a;
888 
889       if (!rank) {
890         ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
891       } else {
892         ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
893       }
894       PLogObjectParent(mat,A);
895 
896       /* copy over the A part */
897       Aloc = (Mat_SeqAIJ*) aij->A->data;
898       m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
899       row = aij->rstart;
900       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
901       for ( i=0; i<m; i++ ) {
902         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
903         row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
904       }
905       aj = Aloc->j;
906       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
907 
908       /* copy over the B part */
909       Aloc = (Mat_SeqAIJ*) aij->B->data;
910       m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
911       row = aij->rstart;
912       ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
913       for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
914       for ( i=0; i<m; i++ ) {
915         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
916         row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
917       }
918       PetscFree(ct);
919       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
920       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
921       /*
922          Everyone has to make the call to draw the matrix since the graphics waits
923          are synchronized across all processors that share the Draw object
924       */
925       if (!rank || vtype == DRAW_VIEWER) {
926         ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
927       }
928       ierr = MatDestroy(A); CHKERRQ(ierr);
929     }
930   }
931   PetscFunctionReturn(0);
932 }
933 
934 #undef __FUNC__
935 #define __FUNC__ "MatView_MPIAIJ"
936 int MatView_MPIAIJ(Mat mat,Viewer viewer)
937 {
938   int         ierr;
939   ViewerType  vtype;
940 
941   PetscFunctionBegin;
942   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
943   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
944       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
945     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
946   } else if (vtype == BINARY_FILE_VIEWER) {
947     ierr = MatView_MPIAIJ_Binary(mat,viewer);CHKERRQ(ierr);
948   } else {
949     SETERRQ(1,1,"Viewer type not supported by PETSc object");
950   }
951   PetscFunctionReturn(0);
952 }
953 
954 /*
955     This has to provide several versions.
956 
957      2) a) use only local smoothing, updating outer values only once.
958         b) local smoothing, updating outer values each inner iteration.
959      3) color-based updating of outer values between colors.
960 */
961 #undef __FUNC__
962 #define __FUNC__ "MatRelax_MPIAIJ"
963 int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
964                            double fshift,int its,Vec xx)
965 {
966   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
967   Mat        AA = mat->A, BB = mat->B;
968   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
969   Scalar     *b,*x,*xs,*ls,d,*v,sum;
970   int        ierr,*idx, *diag;
971   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
972 
973   PetscFunctionBegin;
974   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
975   xs = x + shift; /* shift by one for index start of 1 */
976   ls = ls + shift;
977   if (!A->diag) {ierr = MatMarkDiag_SeqAIJ(AA); CHKERRQ(ierr);}
978   diag = A->diag;
979   if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
980     if (flag & SOR_ZERO_INITIAL_GUESS) {
981       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
982       PetscFunctionReturn(0);
983     }
984     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
985     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
986     while (its--) {
987       /* go down through the rows */
988       for ( i=0; i<m; i++ ) {
989         n    = A->i[i+1] - A->i[i];
990         idx  = A->j + A->i[i] + shift;
991         v    = A->a + A->i[i] + shift;
992         sum  = b[i];
993         SPARSEDENSEMDOT(sum,xs,v,idx,n);
994         d    = fshift + A->a[diag[i]+shift];
995         n    = B->i[i+1] - B->i[i];
996         idx  = B->j + B->i[i] + shift;
997         v    = B->a + B->i[i] + shift;
998         SPARSEDENSEMDOT(sum,ls,v,idx,n);
999         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1000       }
1001       /* come up through the rows */
1002       for ( i=m-1; i>-1; i-- ) {
1003         n    = A->i[i+1] - A->i[i];
1004         idx  = A->j + A->i[i] + shift;
1005         v    = A->a + A->i[i] + shift;
1006         sum  = b[i];
1007         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1008         d    = fshift + A->a[diag[i]+shift];
1009         n    = B->i[i+1] - B->i[i];
1010         idx  = B->j + B->i[i] + shift;
1011         v    = B->a + B->i[i] + shift;
1012         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1013         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1014       }
1015     }
1016   } else if (flag & SOR_LOCAL_FORWARD_SWEEP){
1017     if (flag & SOR_ZERO_INITIAL_GUESS) {
1018       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
1019       PetscFunctionReturn(0);
1020     }
1021     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1022     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1023     while (its--) {
1024       for ( i=0; i<m; i++ ) {
1025         n    = A->i[i+1] - A->i[i];
1026         idx  = A->j + A->i[i] + shift;
1027         v    = A->a + A->i[i] + shift;
1028         sum  = b[i];
1029         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1030         d    = fshift + A->a[diag[i]+shift];
1031         n    = B->i[i+1] - B->i[i];
1032         idx  = B->j + B->i[i] + shift;
1033         v    = B->a + B->i[i] + shift;
1034         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1035         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1036       }
1037     }
1038   } else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
1039     if (flag & SOR_ZERO_INITIAL_GUESS) {
1040       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
1041       PetscFunctionReturn(0);
1042     }
1043     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,
1044                             mat->Mvctx); CHKERRQ(ierr);
1045     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,
1046                             mat->Mvctx); CHKERRQ(ierr);
1047     while (its--) {
1048       for ( i=m-1; i>-1; i-- ) {
1049         n    = A->i[i+1] - A->i[i];
1050         idx  = A->j + A->i[i] + shift;
1051         v    = A->a + A->i[i] + shift;
1052         sum  = b[i];
1053         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1054         d    = fshift + A->a[diag[i]+shift];
1055         n    = B->i[i+1] - B->i[i];
1056         idx  = B->j + B->i[i] + shift;
1057         v    = B->a + B->i[i] + shift;
1058         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1059         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1060       }
1061     }
1062   } else {
1063     SETERRQ(PETSC_ERR_SUP,0,"Parallel SOR not supported");
1064   }
1065   PetscFunctionReturn(0);
1066 }
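
/*
   Added note on the update performed above (assuming SPARSEDENSEMDOT(sum,x,v,idx,n)
   accumulates sum -= v[k]*x[idx[k]] over the n entries of the row): each sweep applies

      x_i <- (1 - omega)*x_i + omega*(b_i - sum_{j != i} a_ij*x_j) / (a_ii + fshift)

   where the sum over j covers both the diagonal block A (using the current local x)
   and the off-diagonal block B (using the ghost values scattered into mat->lvec at
   the start of the sweep set).
*/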
1067 
1068 #undef __FUNC__
1069 #define __FUNC__ "MatGetInfo_MPIAIJ"
1070 int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1071 {
1072   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1073   Mat        A = mat->A, B = mat->B;
1074   int        ierr;
1075   double     isend[5], irecv[5];
1076 
1077   PetscFunctionBegin;
1078   info->block_size     = 1.0;
1079   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1080   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
1081   isend[3] = info->memory;  isend[4] = info->mallocs;
1082   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1083   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
1084   isend[3] += info->memory;  isend[4] += info->mallocs;
1085   if (flag == MAT_LOCAL) {
1086     info->nz_used      = isend[0];
1087     info->nz_allocated = isend[1];
1088     info->nz_unneeded  = isend[2];
1089     info->memory       = isend[3];
1090     info->mallocs      = isend[4];
1091   } else if (flag == MAT_GLOBAL_MAX) {
1092     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);CHKERRQ(ierr);
1093     info->nz_used      = irecv[0];
1094     info->nz_allocated = irecv[1];
1095     info->nz_unneeded  = irecv[2];
1096     info->memory       = irecv[3];
1097     info->mallocs      = irecv[4];
1098   } else if (flag == MAT_GLOBAL_SUM) {
1099     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);CHKERRQ(ierr);
1100     info->nz_used      = irecv[0];
1101     info->nz_allocated = irecv[1];
1102     info->nz_unneeded  = irecv[2];
1103     info->memory       = irecv[3];
1104     info->mallocs      = irecv[4];
1105   }
1106   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1107   info->fill_ratio_needed = 0;
1108   info->factor_mallocs    = 0;
1109   info->rows_global       = (double)mat->M;
1110   info->columns_global    = (double)mat->N;
1111   info->rows_local        = (double)mat->m;
1112   info->columns_local     = (double)mat->N;
1113 
1114   PetscFunctionReturn(0);
1115 }
1116 
1117 #undef __FUNC__
1118 #define __FUNC__ "MatSetOption_MPIAIJ"
1119 int MatSetOption_MPIAIJ(Mat A,MatOption op)
1120 {
1121   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1122 
1123   PetscFunctionBegin;
1124   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1125       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1126       op == MAT_COLUMNS_UNSORTED ||
1127       op == MAT_COLUMNS_SORTED ||
1128       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1129       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1130         MatSetOption(a->A,op);
1131         MatSetOption(a->B,op);
1132   } else if (op == MAT_ROW_ORIENTED) {
1133     a->roworiented = 1;
1134     MatSetOption(a->A,op);
1135     MatSetOption(a->B,op);
1136   } else if (op == MAT_ROWS_SORTED ||
1137              op == MAT_ROWS_UNSORTED ||
1138              op == MAT_SYMMETRIC ||
1139              op == MAT_STRUCTURALLY_SYMMETRIC ||
1140              op == MAT_YES_NEW_DIAGONALS)
1141     PLogInfo(A,"MatSetOption_MPIAIJ:Option ignored\n");
1142   else if (op == MAT_COLUMN_ORIENTED) {
1143     a->roworiented = 0;
1144     MatSetOption(a->A,op);
1145     MatSetOption(a->B,op);
1146   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1147     a->donotstash = 1;
1148   } else if (op == MAT_NO_NEW_DIAGONALS){
1149     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1150   } else {
1151     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1152   }
1153   PetscFunctionReturn(0);
1154 }
1155 
1156 #undef __FUNC__
1157 #define __FUNC__ "MatGetSize_MPIAIJ"
1158 int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
1159 {
1160   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1161 
1162   PetscFunctionBegin;
1163   if (m) *m = mat->M;
1164   if (n) *n = mat->N;
1165   PetscFunctionReturn(0);
1166 }
1167 
1168 #undef __FUNC__
1169 #define __FUNC__ "MatGetLocalSize_MPIAIJ"
1170 int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
1171 {
1172   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1173 
1174   PetscFunctionBegin;
1175   if (m) *m = mat->m;
1176   if (n) *n = mat->n;
1177   PetscFunctionReturn(0);
1178 }
1179 
1180 #undef __FUNC__
1181 #define __FUNC__ "MatGetOwnershipRange_MPIAIJ"
1182 int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
1183 {
1184   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1185 
1186   PetscFunctionBegin;
1187   *m = mat->rstart; *n = mat->rend;
1188   PetscFunctionReturn(0);
1189 }
1190 
1191 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1192 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1193 
1194 #undef __FUNC__
1195 #define __FUNC__ "MatGetRow_MPIAIJ"
1196 int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1197 {
1198   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1199   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
1200   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
1201   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
1202   int        *cmap, *idx_p;
1203 
1204   PetscFunctionBegin;
1205   if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
1206   mat->getrowactive = PETSC_TRUE;
1207 
1208   if (!mat->rowvalues && (idx || v)) {
1209     /*
1210         allocate enough space to hold information from the longest row.
1211     */
1212     Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
1213     int     max = 1,m = mat->m,tmp;
1214     for ( i=0; i<m; i++ ) {
1215       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1216       if (max < tmp) { max = tmp; }
1217     }
1218     mat->rowvalues = (Scalar *) PetscMalloc( max*(sizeof(int)+sizeof(Scalar)));
1219     CHKPTRQ(mat->rowvalues);
1220     mat->rowindices = (int *) (mat->rowvalues + max);
1221   }
1222 
1223   if (row < rstart || row >= rend) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Only local rows");
1224   lrow = row - rstart;
1225 
1226   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1227   if (!v)   {pvA = 0; pvB = 0;}
1228   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1229   ierr = (*mat->A->ops->getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1230   ierr = (*mat->B->ops->getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1231   nztot = nzA + nzB;
1232 
1233   cmap  = mat->garray;
1234   if (v  || idx) {
1235     if (nztot) {
1236       /* Sort by increasing column numbers, assuming A and B already sorted */
1237       int imark = -1;
1238       if (v) {
1239         *v = v_p = mat->rowvalues;
1240         for ( i=0; i<nzB; i++ ) {
1241           if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
1242           else break;
1243         }
1244         imark = i;
1245         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1246         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1247       }
1248       if (idx) {
1249         *idx = idx_p = mat->rowindices;
1250         if (imark > -1) {
1251           for ( i=0; i<imark; i++ ) {
1252             idx_p[i] = cmap[cworkB[i]];
1253           }
1254         } else {
1255           for ( i=0; i<nzB; i++ ) {
1256             if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
1257             else break;
1258           }
1259           imark = i;
1260         }
1261         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
1262         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
1263       }
1264     }
1265     else {
1266       if (idx) *idx = 0;
1267       if (v)   *v   = 0;
1268     }
1269   }
1270   *nz = nztot;
1271   ierr = (*mat->A->ops->restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1272   ierr = (*mat->B->ops->restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1273   PetscFunctionReturn(0);
1274 }
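
/*
   Usage sketch (added commentary; variable names are illustrative): a row may only
   be requested by its owning processor, so a typical traversal uses the ownership
   range.

      int    rstart,rend,row,nz,*cols;
      Scalar *vals;

      ierr = MatGetOwnershipRange(mat,&rstart,&rend); CHKERRQ(ierr);
      for ( row=rstart; row<rend; row++ ) {
        ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
        ...  use the nz global column indices in cols[] and values in vals[]  ...
        ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
      }
*/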
1275 
1276 #undef __FUNC__
1277 #define __FUNC__ "MatRestoreRow_MPIAIJ"
1278 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1279 {
1280   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1281 
1282   PetscFunctionBegin;
1283   if (aij->getrowactive == PETSC_FALSE) {
1284     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
1285   }
1286   aij->getrowactive = PETSC_FALSE;
1287   PetscFunctionReturn(0);
1288 }
1289 
1290 #undef __FUNC__
1291 #define __FUNC__ "MatNorm_MPIAIJ"
1292 int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
1293 {
1294   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1295   Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
1296   int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
1297   double     sum = 0.0;
1298   Scalar     *v;
1299 
1300   PetscFunctionBegin;
1301   if (aij->size == 1) {
1302     ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
1303   } else {
1304     if (type == NORM_FROBENIUS) {
1305       v = amat->a;
1306       for (i=0; i<amat->nz; i++ ) {
1307 #if defined(USE_PETSC_COMPLEX)
1308         sum += PetscReal(PetscConj(*v)*(*v)); v++;
1309 #else
1310         sum += (*v)*(*v); v++;
1311 #endif
1312       }
1313       v = bmat->a;
1314       for (i=0; i<bmat->nz; i++ ) {
1315 #if defined(USE_PETSC_COMPLEX)
1316         sum += PetscReal(PetscConj(*v)*(*v)); v++;
1317 #else
1318         sum += (*v)*(*v); v++;
1319 #endif
1320       }
1321       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
1322       *norm = sqrt(*norm);
1323     } else if (type == NORM_1) { /* max column norm */
1324       double *tmp, *tmp2;
1325       int    *jj, *garray = aij->garray;
1326       tmp  = (double *) PetscMalloc( (aij->N+1)*sizeof(double) ); CHKPTRQ(tmp);
1327       tmp2 = (double *) PetscMalloc( (aij->N+1)*sizeof(double) ); CHKPTRQ(tmp2);
1328       PetscMemzero(tmp,aij->N*sizeof(double));
1329       *norm = 0.0;
1330       v = amat->a; jj = amat->j;
1331       for ( j=0; j<amat->nz; j++ ) {
1332         tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
1333       }
1334       v = bmat->a; jj = bmat->j;
1335       for ( j=0; j<bmat->nz; j++ ) {
1336         tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
1337       }
1338       ierr = MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
1339       for ( j=0; j<aij->N; j++ ) {
1340         if (tmp2[j] > *norm) *norm = tmp2[j];
1341       }
1342       PetscFree(tmp); PetscFree(tmp2);
1343     } else if (type == NORM_INFINITY) { /* max row norm */
1344       double ntemp = 0.0;
1345       for ( j=0; j<amat->m; j++ ) {
1346         v = amat->a + amat->i[j] + shift;
1347         sum = 0.0;
1348         for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
1349           sum += PetscAbsScalar(*v); v++;
1350         }
1351         v = bmat->a + bmat->i[j] + shift;
1352         for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
1353           sum += PetscAbsScalar(*v); v++;
1354         }
1355         if (sum > ntemp) ntemp = sum;
1356       }
1357       ierr = MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);CHKERRQ(ierr);
1358     } else {
1359       SETERRQ(PETSC_ERR_SUP,0,"No support for two norm");
1360     }
1361   }
1362   PetscFunctionReturn(0);
1363 }
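
/*
   Added summary of the norms computed above:

      NORM_FROBENIUS : sqrt( sum_ij |a_ij|^2 );  local sums reduced with MPI_SUM
      NORM_1         : max_j sum_i |a_ij|  (largest column sum); column sums are
                       accumulated into a length-N work array and reduced with MPI_SUM
      NORM_INFINITY  : max_i sum_j |a_ij|  (largest row sum), reduced with MPI_MAX
*/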
1364 
1365 #undef __FUNC__
1366 #define __FUNC__ "MatTranspose_MPIAIJ"
1367 int MatTranspose_MPIAIJ(Mat A,Mat *matout)
1368 {
1369   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1370   Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
1371   int        ierr,shift = Aloc->indexshift;
1372   int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
1373   Mat        B;
1374   Scalar     *array;
1375 
1376   PetscFunctionBegin;
1377   if (matout == PETSC_NULL && M != N) {
1378     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
1379   }
1380 
1381   ierr = MatCreateMPIAIJ(A->comm,a->n,a->m,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);CHKERRQ(ierr);
1382 
1383   /* copy over the A part */
1384   Aloc = (Mat_SeqAIJ*) a->A->data;
1385   m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1386   row = a->rstart;
1387   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
1388   for ( i=0; i<m; i++ ) {
1389     ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1390     row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
1391   }
1392   aj = Aloc->j;
1393   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}
1394 
1395   /* copy over the B part */
1396   Aloc = (Mat_SeqAIJ*) a->B->data;
1397   m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1398   row = a->rstart;
1399   ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
1400   for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
1401   for ( i=0; i<m; i++ ) {
1402     ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1403     row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
1404   }
1405   PetscFree(ct);
1406   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1407   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1408   if (matout != PETSC_NULL) {
1409     *matout = B;
1410   } else {
1411     PetscOps       *Abops;
1412     struct _MatOps *Aops;
1413 
1414     /* This isn't really an in-place transpose, but we free the data structures from a */
1415     PetscFree(a->rowners);
1416     ierr = MatDestroy(a->A); CHKERRQ(ierr);
1417     ierr = MatDestroy(a->B); CHKERRQ(ierr);
1418     if (a->colmap) PetscFree(a->colmap);
1419     if (a->garray) PetscFree(a->garray);
1420     if (a->lvec) VecDestroy(a->lvec);
1421     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
1422     PetscFree(a);
1423 
1424     /*
1425        This is horrible, horrible code. We need to keep
1426       A's pointers for the bops and ops but copy everything
1427       else from B.
1428     */
1429     Abops = A->bops;
1430     Aops  = A->ops;
1431     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1432     A->bops = Abops;
1433     A->ops  = Aops;
1434     PetscHeaderDestroy(B);
1435   }
1436   PetscFunctionReturn(0);
1437 }
1438 
1439 #undef __FUNC__
1440 #define __FUNC__ "MatDiagonalScale_MPIAIJ"
1441 int MatDiagonalScale_MPIAIJ(Mat mat,Vec ll,Vec rr)
1442 {
1443   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1444   Mat a = aij->A, b = aij->B;
1445   int ierr,s1,s2,s3;
1446 
1447   PetscFunctionBegin;
1448   ierr = MatGetLocalSize(mat,&s2,&s3); CHKERRQ(ierr);
1449   if (rr) {
1450     ierr = VecGetLocalSize(rr,&s1);CHKERRQ(ierr);
1451     if (s1!=s3) SETERRQ(PETSC_ERR_ARG_SIZ,0,"right vector non-conforming local size");
1452     /* Overlap communication with computation. */
1453     ierr = VecScatterBegin(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1454   }
1455   if (ll) {
1456     ierr = VecGetLocalSize(ll,&s1);CHKERRQ(ierr);
1457     if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"left vector non-conforming local size");
1458     ierr = (*b->ops->diagonalscale)(b,ll,0); CHKERRQ(ierr);
1459   }
1460   /* scale  the diagonal block */
1461   ierr = (*a->ops->diagonalscale)(a,ll,rr); CHKERRQ(ierr);
1462 
1463   if (rr) {
1464     /* Do a scatter end and then right scale the off-diagonal block */
1465     ierr = VecScatterEnd(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1466     ierr = (*b->ops->diagonalscale)(b,0,aij->lvec); CHKERRQ(ierr);
1467   }
1468 
1469   PetscFunctionReturn(0);
1470 }
1471 
1472 
1473 extern int MatPrintHelp_SeqAIJ(Mat);
1474 #undef __FUNC__
1475 #define __FUNC__ "MatPrintHelp_MPIAIJ"
1476 int MatPrintHelp_MPIAIJ(Mat A)
1477 {
1478   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1479   int        ierr;
1480 
1481   PetscFunctionBegin;
1482   if (!a->rank) {
1483     ierr = MatPrintHelp_SeqAIJ(a->A);CHKERRQ(ierr);
1484   }
1485   PetscFunctionReturn(0);
1486 }
1487 
1488 #undef __FUNC__
1489 #define __FUNC__ "MatGetBlockSize_MPIAIJ"
1490 int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1491 {
1492   PetscFunctionBegin;
1493   *bs = 1;
1494   PetscFunctionReturn(0);
1495 }
1496 #undef __FUNC__
1497 #define __FUNC__ "MatSetUnfactored_MPIAIJ"
1498 int MatSetUnfactored_MPIAIJ(Mat A)
1499 {
1500   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1501   int        ierr;
1502 
1503   PetscFunctionBegin;
1504   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1505   PetscFunctionReturn(0);
1506 }
1507 
1508 #undef __FUNC__
1509 #define __FUNC__ "MatEqual_MPIAIJ"
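/*
    Two MPIAIJ matrices are equal only if, on every process, both the diagonal (A)
  and the off-diagonal (B) blocks compare equal; the per-process results are then
  combined with a logical AND reduction over the communicator.
*/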
1510 int MatEqual_MPIAIJ(Mat A, Mat B, PetscTruth *flag)
1511 {
1512   Mat_MPIAIJ *matB = (Mat_MPIAIJ *) B->data,*matA = (Mat_MPIAIJ *) A->data;
1513   Mat        a, b, c, d;
1514   PetscTruth flg;
1515   int        ierr;
1516 
1517   PetscFunctionBegin;
1518   if (B->type != MATMPIAIJ) SETERRQ(PETSC_ERR_ARG_INCOMP,0,"Matrices must be same type");
1519   a = matA->A; b = matA->B;
1520   c = matB->A; d = matB->B;
1521 
1522   ierr = MatEqual(a, c, &flg); CHKERRQ(ierr);
1523   if (flg == PETSC_TRUE) {
1524     ierr = MatEqual(b, d, &flg); CHKERRQ(ierr);
1525   }
1526   ierr = MPI_Allreduce(&flg, flag, 1, MPI_INT, MPI_LAND, A->comm);CHKERRQ(ierr);
1527   PetscFunctionReturn(0);
1528 }
1529 
1530 extern int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1531 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1532 extern int MatFDColoringCreate_MPIAIJ(Mat,ISColoring,MatFDColoring);
1533 extern int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatGetSubMatrixCall,Mat **);
1534 extern int MatGetSubMatrix_MPIAIJ (Mat ,IS,IS,int,MatGetSubMatrixCall,Mat *);
1535 
1536 /* -------------------------------------------------------------------*/
1537 static struct _MatOps MatOps_Values = {MatSetValues_MPIAIJ,
1538        MatGetRow_MPIAIJ,
1539        MatRestoreRow_MPIAIJ,
1540        MatMult_MPIAIJ,
1541        MatMultAdd_MPIAIJ,
1542        MatMultTrans_MPIAIJ,
1543        MatMultTransAdd_MPIAIJ,
1544        0,
1545        0,
1546        0,
1547        0,
1548        0,
1549        0,
1550        MatRelax_MPIAIJ,
1551        MatTranspose_MPIAIJ,
1552        MatGetInfo_MPIAIJ,
1553        MatEqual_MPIAIJ,
1554        MatGetDiagonal_MPIAIJ,
1555        MatDiagonalScale_MPIAIJ,
1556        MatNorm_MPIAIJ,
1557        MatAssemblyBegin_MPIAIJ,
1558        MatAssemblyEnd_MPIAIJ,
1559        0,
1560        MatSetOption_MPIAIJ,
1561        MatZeroEntries_MPIAIJ,
1562        MatZeroRows_MPIAIJ,
1563        0,
1564        0,
1565        0,
1566        0,
1567        MatGetSize_MPIAIJ,
1568        MatGetLocalSize_MPIAIJ,
1569        MatGetOwnershipRange_MPIAIJ,
1570        0,
1571        0,
1572        0,
1573        0,
1574        MatConvertSameType_MPIAIJ,
1575        0,
1576        0,
1577        0,
1578        0,
1579        0,
1580        MatGetSubMatrices_MPIAIJ,
1581        MatIncreaseOverlap_MPIAIJ,
1582        MatGetValues_MPIAIJ,
1583        0,
1584        MatPrintHelp_MPIAIJ,
1585        MatScale_MPIAIJ,
1586        0,
1587        0,
1588        0,
1589        MatGetBlockSize_MPIAIJ,
1590        0,
1591        0,
1592        0,
1593        0,
1594        MatFDColoringCreate_MPIAIJ,
1595        0,
1596        MatSetUnfactored_MPIAIJ,
1597        0,
1598        0,
1599        MatGetSubMatrix_MPIAIJ,
1600        0,
1601        0,
1602        MatGetMaps_Petsc};
1603 
1604 
1605 #undef __FUNC__
1606 #define __FUNC__ "MatCreateMPIAIJ"
1607 /*@C
1608    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1609    (the default parallel PETSc format).  For good matrix assembly performance
1610    the user should preallocate the matrix storage by setting the parameters
1611    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1612    performance can be increased by more than a factor of 50.
1613 
1614    Collective on MPI_Comm
1615 
1616    Input Parameters:
1617 +  comm - MPI communicator
1618 +  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1619            This value should be the same as the local size used in creating the
1620            y vector for the matrix-vector product y = Ax.
1621 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given).
1622        This value should be the same as the local size used in creating the
1623        x vector for the matrix-vector product y = Ax.  For square matrices n is almost always m.
1624 .  M - number of global rows (or PETSC_DETERMINE to have it calculated if m is given)
1625 .  N - number of global columns (or PETSC_DETERMINE to have it calculated if n is given)
1626 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1627            (same for all local rows)
1628 .  d_nnz - array containing the number of nonzeros in the various rows of the
1629            diagonal portion of the local submatrix (possibly different for each row)
1630            or PETSC_NULL. You must leave room for the diagonal entry even if
1631            it is zero.
1632 .  o_nz - number of nonzeros per row in the off-diagonal portion of local
1633            submatrix (same for all local rows).
1634 -  o_nnz - array containing the number of nonzeros in the various rows of the
1635            off-diagonal portion of the local submatrix (possibly different for
1636            each row) or PETSC_NULL.
1637 
1638    Output Parameter:
1639 .  A - the matrix
1640 
1641    Notes:
1642    If PETSC_DECIDE or PETSC_DETERMINE is used for a particular argument on one processor,
1643    then it must be used on all processors that share the object for that argument.
1644 
1645    The AIJ format (also called the Yale sparse matrix format or
1646    compressed row storage) is fully compatible with standard Fortran 77
1647    storage.  That is, the stored row and column indices can begin at
1648    either one (as in Fortran) or zero.  See the users manual for details.
1649 
1650    The user MUST specify either the local or global matrix dimensions
1651    (possibly both).
1652 
1653    By default, this format uses inodes (identical nodes) when possible.
1654    We search for consecutive rows with the same nonzero structure, thereby
1655    reusing matrix information to achieve increased efficiency.
1656 
1657    Options Database Keys:
1658 +  -mat_aij_no_inode  - Do not use inodes
1659 .  -mat_aij_inode_limit <limit> - Sets inode limit (max limit=5)
1660 -  -mat_aij_oneindex - Internally use indexing starting at 1
1661         rather than 0.  Note that when calling MatSetValues(),
1662         the user still MUST index entries starting at 0!
1663 
1664    Storage Information:
1665    For a square global matrix we define each processor's diagonal portion
1666    to be its local rows and the corresponding columns (a square submatrix);
1667    each processor's off-diagonal portion encompasses the remainder of the
1668    local matrix (a rectangular submatrix).
1669 
1670    The user can specify preallocated storage for the diagonal part of
1671    the local submatrix with either d_nz or d_nnz (not both).  Set
1672    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1673    memory allocation.  Likewise, specify preallocated storage for the
1674    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1675 
1676    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1677    the figure below we depict these three local rows and all columns (0-11).
1678 
1679 .vb
1680              0 1 2 3 4 5 6 7 8 9 10 11
1681             -------------------
1682      row 3  |  o o o d d d o o o o o o
1683      row 4  |  o o o d d d o o o o o o
1684      row 5  |  o o o d d d o o o o o o
1685             -------------------
1686 .ve
1687 
1688    Thus, any entries in the d locations are stored in the d (diagonal)
1689    submatrix, and any entries in the o locations are stored in the
1690    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1691    stored simply in the MATSEQAIJ format for compressed row storage.
1692 
1693    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1694    and o_nz should indicate the number of nonzeros per row in the o matrix.
1695    In general, for PDE problems in which most nonzeros are near the diagonal,
1696    one expects d_nz >> o_nz. For large problems you MUST preallocate memory
1697    or you will get TERRIBLE performance; see the users' manual chapter on
1698    matrices.
1699 
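   Example Usage:
   The fragment below is an illustrative sketch only (the local row count and
   the nonzero counts are arbitrary); it preallocates with d_nnz/o_nnz and lets
   PETSc determine the global sizes.

.vb
      Mat A;
      int ierr, m = 3;             /* rows owned by this process           */
      int d_nnz[3] = {3,2,3};      /* nonzeros per row, diagonal block     */
      int o_nnz[3] = {1,2,1};      /* nonzeros per row, off-diagonal block */

      ierr = MatCreateMPIAIJ(PETSC_COMM_WORLD,m,m,PETSC_DETERMINE,PETSC_DETERMINE,
                             0,d_nnz,0,o_nnz,&A); CHKERRQ(ierr);
      /* ... insert entries with MatSetValues(), then ... */
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
.ve
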
1700 .keywords: matrix, aij, compressed row, sparse, parallel
1701 
1702 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1703 @*/
1704 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1705 {
1706   Mat          B;
1707   Mat_MPIAIJ   *b;
1708   int          ierr, i,sum[2],work[2],size;
1709 
1710   PetscFunctionBegin;
1711   MPI_Comm_size(comm,&size);
1712   if (size == 1) {
1713     if (M == PETSC_DECIDE) M = m;
1714     if (N == PETSC_DECIDE) N = n;
1715     ierr = MatCreateSeqAIJ(comm,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
1716     PetscFunctionReturn(0);
1717   }
1718 
1719   *A = 0;
1720   PetscHeaderCreate(B,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIAIJ,comm,MatDestroy,MatView);
1721   PLogObjectCreate(B);
1722   B->data       = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
1723   PetscMemzero(b,sizeof(Mat_MPIAIJ));
1724   PetscMemcpy(B->ops,&MatOps_Values,sizeof(struct _MatOps));
1725   B->ops->destroy    = MatDestroy_MPIAIJ;
1726   B->ops->view       = MatView_MPIAIJ;
1727   B->factor     = 0;
1728   B->assembled  = PETSC_FALSE;
1729   B->mapping    = 0;
1730 
1731   B->insertmode = NOT_SET_VALUES;
1732   b->size       = size;
1733   MPI_Comm_rank(comm,&b->rank);
1734 
1735   if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
1736     SETERRQ(PETSC_ERR_ARG_WRONG,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1737   }
1738 
1739   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1740     work[0] = m; work[1] = n;
1741     ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
1742     if (M == PETSC_DECIDE) M = sum[0];
1743     if (N == PETSC_DECIDE) N = sum[1];
1744   }
1745   if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
1746   if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
1747   b->m = m; B->m = m;
1748   b->n = n; B->n = n;
1749   b->N = N; B->N = N;
1750   b->M = M; B->M = M;
1751 
1752   /* the information in the maps duplicates the information computed below; eventually
1753      we should remove the duplicate information that is not contained in the maps */
1754   ierr = MapCreate(comm,m,M,B->rmap);CHKERRQ(ierr);
1755   ierr = MapCreate(comm,n,N,B->cmap);CHKERRQ(ierr);
1756 
1757   /* build local table of row and column ownerships */
1758   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1759   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIAIJ));
1760   b->cowners = b->rowners + b->size + 2;
1761   ierr = MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1762   b->rowners[0] = 0;
1763   for ( i=2; i<=b->size; i++ ) {
1764     b->rowners[i] += b->rowners[i-1];
1765   }
1766   b->rstart = b->rowners[b->rank];
1767   b->rend   = b->rowners[b->rank+1];
1768   ierr = MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1769   b->cowners[0] = 0;
1770   for ( i=2; i<=b->size; i++ ) {
1771     b->cowners[i] += b->cowners[i-1];
1772   }
1773   b->cstart = b->cowners[b->rank];
1774   b->cend   = b->cowners[b->rank+1];
1775 
1776   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1777   ierr = MatCreateSeqAIJ(PETSC_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1778   PLogObjectParent(B,b->A);
1779   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1780   ierr = MatCreateSeqAIJ(PETSC_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1781   PLogObjectParent(B,b->B);
1782 
1783   /* build cache (stash) for matrix entries destined for other processors */
1784   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1785   b->donotstash  = 0;
1786   b->colmap      = 0;
1787   b->garray      = 0;
1788   b->roworiented = 1;
1789 
1790   /* stuff used for matrix vector multiply */
1791   b->lvec      = 0;
1792   b->Mvctx     = 0;
1793 
1794   /* stuff for MatGetRow() */
1795   b->rowindices   = 0;
1796   b->rowvalues    = 0;
1797   b->getrowactive = PETSC_FALSE;
1798 
1799   *A = B;
1800   PetscFunctionReturn(0);
1801 }
1802 
1803 #undef __FUNC__
1804 #define __FUNC__ "MatConvertSameType_MPIAIJ"
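/*
    Makes a complete copy of an MPIAIJ matrix: the parallel layout (row and column
  ownership), the communication objects (lvec, Mvctx), and the local diagonal (A)
  and off-diagonal (B) blocks are all duplicated from matin.
*/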
1805 int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
1806 {
1807   Mat        mat;
1808   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1809   int        ierr, len=0, flg;
1810 
1811   PetscFunctionBegin;
1812   *newmat       = 0;
1813   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIAIJ,matin->comm,MatDestroy,MatView);
1814   PLogObjectCreate(mat);
1815   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1816   PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));
1817   mat->ops->destroy    = MatDestroy_MPIAIJ;
1818   mat->ops->view       = MatView_MPIAIJ;
1819   mat->factor     = matin->factor;
1820   mat->assembled  = PETSC_TRUE;
1821 
1822   a->m = mat->m   = oldmat->m;
1823   a->n = mat->n   = oldmat->n;
1824   a->M = mat->M   = oldmat->M;
1825   a->N = mat->N   = oldmat->N;
1826 
1827   a->rstart       = oldmat->rstart;
1828   a->rend         = oldmat->rend;
1829   a->cstart       = oldmat->cstart;
1830   a->cend         = oldmat->cend;
1831   a->size         = oldmat->size;
1832   a->rank         = oldmat->rank;
1833   mat->insertmode = NOT_SET_VALUES;
1834   a->rowvalues    = 0;
1835   a->getrowactive = PETSC_FALSE;
1836 
1837   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1838   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIAIJ));
1839   a->cowners = a->rowners + a->size + 2;
1840   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1841   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1842   if (oldmat->colmap) {
1843     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1844     PLogObjectMemory(mat,(a->N)*sizeof(int));
1845     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1846   } else a->colmap = 0;
1847   if (oldmat->garray) {
1848     len = ((Mat_SeqAIJ *) (oldmat->B->data))->n;
1849     a->garray = (int *) PetscMalloc((len+1)*sizeof(int)); CHKPTRQ(a->garray);
1850     PLogObjectMemory(mat,len*sizeof(int));
1851     if (len) PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1852   } else a->garray = 0;
1853 
1854   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1855   PLogObjectParent(mat,a->lvec);
1856   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1857   PLogObjectParent(mat,a->Mvctx);
1858   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1859   PLogObjectParent(mat,a->A);
1860   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1861   PLogObjectParent(mat,a->B);
1862   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1863   if (flg) {
1864     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1865   }
1866   *newmat = mat;
1867   PetscFunctionReturn(0);
1868 }
1869 
1870 #include "sys.h"
1871 
1872 #undef __FUNC__
1873 #define __FUNC__ "MatLoad_MPIAIJ"
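/*
    Loads a matrix stored in PETSc binary format into a parallel AIJ matrix.
  Process 0 reads the header, the row lengths, the column indices, and the
  numerical values, shipping each of the other processes the data for its block
  of rows; every process then counts its diagonal and off-diagonal nonzeros per
  row (for preallocation), creates the matrix, and inserts its values.
*/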
1874 int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
1875 {
1876   Mat          A;
1877   Scalar       *vals,*svals;
1878   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1879   MPI_Status   status;
1880   int          i, nz, ierr, j,rstart, rend, fd;
1881   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1882   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1883   int          tag = ((PetscObject)viewer)->tag;
1884 
1885   PetscFunctionBegin;
1886   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1887   if (!rank) {
1888     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1889     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
1890     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
1891     if (header[3] < 0) {
1892       SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix in special format on disk, cannot load as MPIAIJ");
1893     }
1894   }
1895 
1896   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
1897   M = header[1]; N = header[2];
1898   /* determine ownership of all rows */
1899   m = M/size + ((M % size) > rank);
1900   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1901   ierr = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1902   rowners[0] = 0;
1903   for ( i=2; i<=size; i++ ) {
1904     rowners[i] += rowners[i-1];
1905   }
1906   rstart = rowners[rank];
1907   rend   = rowners[rank+1];
1908 
1909   /* distribute row lengths to all processors */
1910   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1911   offlens = ourlens + (rend-rstart);
1912   if (!rank) {
1913     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1914     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
1915     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1916     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1917     ierr = MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);CHKERRQ(ierr);
1918     PetscFree(sndcounts);
1919   } else {
1920     ierr = MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);CHKERRQ(ierr);
1921   }
1922 
1923   if (!rank) {
1924     /* calculate the number of nonzeros on each processor */
1925     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1926     PetscMemzero(procsnz,size*sizeof(int));
1927     for ( i=0; i<size; i++ ) {
1928       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1929         procsnz[i] += rowlengths[j];
1930       }
1931     }
1932     PetscFree(rowlengths);
1933 
1934     /* determine max buffer needed and allocate it */
1935     maxnz = 0;
1936     for ( i=0; i<size; i++ ) {
1937       maxnz = PetscMax(maxnz,procsnz[i]);
1938     }
1939     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1940 
1941     /* read in my part of the matrix column indices  */
1942     nz     = procsnz[0];
1943     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1944     ierr   = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
1945 
1946     /* read in everyone else's part and ship it off */
1947     for ( i=1; i<size; i++ ) {
1948       nz   = procsnz[i];
1949       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
1950       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
1951     }
1952     PetscFree(cols);
1953   } else {
1954     /* determine buffer space needed for message */
1955     nz = 0;
1956     for ( i=0; i<m; i++ ) {
1957       nz += ourlens[i];
1958     }
1959     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1960 
1961     /* receive message of column indices */
1962     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
1963     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
1964     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
1965   }
1966 
1967   /* loop over local rows, determining number of off-diagonal entries */
1968   PetscMemzero(offlens,m*sizeof(int));
1969   jj = 0;
1970   for ( i=0; i<m; i++ ) {
1971     for ( j=0; j<ourlens[i]; j++ ) {
1972       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1973       jj++;
1974     }
1975   }
1976 
1977   /* create our matrix */
1978   for ( i=0; i<m; i++ ) {
1979     ourlens[i] -= offlens[i];
1980   }
1981   ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
1982   A = *newmat;
1983   MatSetOption(A,MAT_COLUMNS_SORTED);
1984   for ( i=0; i<m; i++ ) {
1985     ourlens[i] += offlens[i];
1986   }
1987 
1988   if (!rank) {
1989     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1990 
1991     /* read in my part of the matrix numerical values  */
1992     nz = procsnz[0];
1993     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1994 
1995     /* insert into matrix */
1996     jj      = rstart;
1997     smycols = mycols;
1998     svals   = vals;
1999     for ( i=0; i<m; i++ ) {
2000       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
2001       smycols += ourlens[i];
2002       svals   += ourlens[i];
2003       jj++;
2004     }
2005 
2006     /* read in the other processors' parts and ship them out */
2007     for ( i=1; i<size; i++ ) {
2008       nz   = procsnz[i];
2009       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2010       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
2011     }
2012     PetscFree(procsnz);
2013   } else {
2014     /* receive numeric values */
2015     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
2016 
2017     /* receive message of values*/
2018     /* receive message of values */
2019     ierr = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
2020     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2021 
2022     /* insert into matrix */
2023     jj      = rstart;
2024     smycols = mycols;
2025     svals   = vals;
2026     for ( i=0; i<m; i++ ) {
2027       ierr     = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
2028       smycols += ourlens[i];
2029       svals   += ourlens[i];
2030       jj++;
2031     }
2032   }
2033   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
2034 
2035   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2036   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2037   PetscFunctionReturn(0);
2038 }
2039 
2040 #undef __FUNC__
2041 #define __FUNC__ "MatGetSubMatrix_MPIAIJ"
2042 /*
2043     Not great since it makes two copies of the submatrix: first a SeqAIJ on
2044   each process, and then the final result formed by concatenating those local
2045   matrices.  Writing it directly would be much like MatGetSubMatrices_MPIAIJ().
2046 */
2047 int MatGetSubMatrix_MPIAIJ(Mat mat,IS isrow,IS iscol,int csize,MatGetSubMatrixCall call,Mat *newmat)
2048 {
2049   int        ierr, i, m,n,rstart,row,rend,nz,*cwork,size,rank,j;
2050   Mat        *local,M, Mreuse;
2051   Scalar     *vwork,*aa;
2052   MPI_Comm   comm = mat->comm;
2053   Mat_SeqAIJ *aij;
2054   int        *ii, *jj,nlocal,*dlens,*olens,dlen,olen,jend;
2055 
2056   PetscFunctionBegin;
2057   MPI_Comm_rank(comm,&rank);
2058   MPI_Comm_size(comm,&size);
2059 
2060   if (call ==  MAT_REUSE_MATRIX) {
2061     ierr = PetscObjectQuery((PetscObject)*newmat,"SubMatrix",(PetscObject *)&Mreuse);CHKERRQ(ierr);
2062     if (!Mreuse) SETERRQ(1,1,"Submatrix passed in was not used before, cannot reuse");
2063     local = &Mreuse;
2064     ierr  = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_REUSE_MATRIX,&local);CHKERRQ(ierr);
2065   } else {
2066     ierr = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_INITIAL_MATRIX,&local);CHKERRQ(ierr);
2067     Mreuse = *local;
2068     PetscFree(local);
2069   }
2070 
2071   /*
2072       m - number of local rows
2073       n - number of columns (same on all processors)
2074       rstart - first row in new global matrix generated
2075   */
2076   ierr = MatGetSize(Mreuse,&m,&n);CHKERRQ(ierr);
2077   if (call == MAT_INITIAL_MATRIX) {
2078     aij = (Mat_SeqAIJ *) (Mreuse)->data;
2079     if (aij->indexshift) SETERRQ(PETSC_ERR_SUP,1,"No support for index shifted matrix");
2080     ii  = aij->i;
2081     jj  = aij->j;
2082 
2083     /*
2084         Determine the number of non-zeros in the diagonal and off-diagonal
2085         portions of the matrix in order to do correct preallocation
2086     */
2087 
2088     /* first get start and end of "diagonal" columns */
2089     if (csize == PETSC_DECIDE) {
2090       nlocal = n/size + ((n % size) > rank);
2091     } else {
2092       nlocal = csize;
2093     }
2094     ierr   = MPI_Scan(&nlocal,&rend,1,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
2095     rstart = rend - nlocal;
2096     if (rank == size - 1 && rend != n) {
2097       SETERRQ(1,1,"Local column sizes do not add up to total number of columns");
2098     }
2099 
2100     /* next, compute all the lengths */
2101     dlens = (int *) PetscMalloc( (2*m+1)*sizeof(int) );CHKPTRQ(dlens);
2102     olens = dlens + m;
2103     for ( i=0; i<m; i++ ) {
2104       jend = ii[i+1] - ii[i];
2105       olen = 0;
2106       dlen = 0;
2107       for ( j=0; j<jend; j++ ) {
2108         if ( *jj < rstart || *jj >= rend) olen++;
2109         else dlen++;
2110         jj++;
2111       }
2112       olens[i] = olen;
2113       dlens[i] = dlen;
2114     }
2115     ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,PETSC_DECIDE,n,0,dlens,0,olens,&M);CHKERRQ(ierr);
2116     PetscFree(dlens);
2117   } else {
2118     int ml,nl;
2119 
2120     M = *newmat;
2121     ierr = MatGetLocalSize(M,&ml,&nl);CHKERRQ(ierr);
2122     if (ml != m) SETERRQ(PETSC_ERR_ARG_SIZ,1,"Previous matrix must be same size/layout as request");
2123     ierr = MatZeroEntries(M);CHKERRQ(ierr);
2124     /*
2125          The next two lines are needed so we may call MatSetValues_MPIAIJ() below directly,
2126        rather than the slower MatSetValues().
2127     */
2128     M->was_assembled = PETSC_TRUE;
2129     M->assembled     = PETSC_FALSE;
2130   }
2131   ierr = MatGetOwnershipRange(M,&rstart,&rend); CHKERRQ(ierr);
2132   aij = (Mat_SeqAIJ *) (Mreuse)->data;
2133   if (aij->indexshift) SETERRQ(PETSC_ERR_SUP,1,"No support for index shifted matrix");
2134   ii  = aij->i;
2135   jj  = aij->j;
2136   aa  = aij->a;
2137   for (i=0; i<m; i++) {
2138     row   = rstart + i;
2139     nz    = ii[i+1] - ii[i];
2140     cwork = jj;     jj += nz;
2141     vwork = aa;     aa += nz;
2142     ierr = MatSetValues_MPIAIJ(M,1,&row,nz,cwork,vwork,INSERT_VALUES); CHKERRQ(ierr);
2143   }
2144 
2145   ierr = MatAssemblyBegin(M,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2146   ierr = MatAssemblyEnd(M,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2147   *newmat = M;
2148 
2149   /* save submatrix used in processor for next request */
2150   if (call ==  MAT_INITIAL_MATRIX) {
2151     ierr = PetscObjectCompose((PetscObject)M,"SubMatrix",(PetscObject)Mreuse);CHKERRQ(ierr);
2152     ierr = PetscObjectDereference((PetscObject)Mreuse);CHKERRQ(ierr);
2153   }
2154 
2155   PetscFunctionReturn(0);
2156 }
2157