xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision 08c5a2cb45c49a5be56df2f44fb1b499edde026e)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpiaij.c,v 1.265 1998/11/20 22:21:12 balay Exp balay $";
3 #endif
4 
5 #include "pinclude/pviewer.h"
6 #include "src/mat/impls/aij/mpi/mpiaij.h"
7 #include "src/vec/vecimpl.h"
8 #include "src/inline/spops.h"
9 
10 /* local utility routine that creates a mapping from the global column
11 number to the local number in the off-diagonal part of the local
12 storage of the matrix.  This is done in a non-scalable way since the
13 length of colmap equals the number of global columns of the matrix.
14 */
15 #undef __FUNC__
16 #define __FUNC__ "CreateColmap_MPIAIJ_Private"
17 int CreateColmap_MPIAIJ_Private(Mat mat)
18 {
19   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
20   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
21   int        n = B->n,i;
22 
23   PetscFunctionBegin;
24   aij->colmap = (int *) PetscMalloc((aij->N+1)*sizeof(int));CHKPTRQ(aij->colmap);
25   PLogObjectMemory(mat,aij->N*sizeof(int));
26   PetscMemzero(aij->colmap,aij->N*sizeof(int));
27   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
28   PetscFunctionReturn(0);
29 }
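
/*
   Illustrative note (not part of the original source): colmap is the inverse of
   garray, shifted by one so that 0 can mean "global column not stored locally".
   For example, if the off-diagonal block B has three local columns whose global
   numbers are garray[] = {3,7,12}, then after CreateColmap_MPIAIJ_Private()
   colmap[3] == 1, colmap[7] == 2, colmap[12] == 3 and colmap[c] == 0 otherwise,
   so callers look a column up as

       col = aij->colmap[gcol] - 1;      (local column in B, or negative if absent)
*/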
30 
31 extern int DisAssemble_MPIAIJ(Mat);
32 
33 #define CHUNKSIZE   15
34 #define MatSetValues_SeqAIJ_A_Private(row,col,value,addv) \
35 { \
36  \
37     rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
38     rmax = aimax[row]; nrow = ailen[row];  \
39     col1 = col - shift; \
40      \
41     low = 0; high = nrow; \
42     while (high-low > 5) { \
43       t = (low+high)/2; \
44       if (rp[t] > col) high = t; \
45       else             low  = t; \
46     } \
47       for ( _i=0; _i<nrow; _i++ ) { \
48         if (rp[_i] > col1) break; \
49         if (rp[_i] == col1) { \
50           if (addv == ADD_VALUES) ap[_i] += value;   \
51           else                  ap[_i] = value; \
52           goto a_noinsert; \
53         } \
54       }  \
55       if (nonew == 1) goto a_noinsert; \
56       else if (nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
57       if (nrow >= rmax) { \
58         /* there is no extra room in row, therefore enlarge */ \
59         int    new_nz = ai[a->m] + CHUNKSIZE,len,*new_i,*new_j; \
60         Scalar *new_a; \
61  \
62         if (nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
63  \
64         /* malloc new storage space */ \
65         len     = new_nz*(sizeof(int)+sizeof(Scalar))+(a->m+1)*sizeof(int); \
66         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
67         new_j   = (int *) (new_a + new_nz); \
68         new_i   = new_j + new_nz; \
69  \
70         /* copy over old data into new slots */ \
71         for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = ai[ii];} \
72         for ( ii=row+1; ii<a->m+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
73         PetscMemcpy(new_j,aj,(ai[row]+nrow+shift)*sizeof(int)); \
74         len = (new_nz - CHUNKSIZE - ai[row] - nrow - shift); \
75         PetscMemcpy(new_j+ai[row]+shift+nrow+CHUNKSIZE,aj+ai[row]+shift+nrow, \
76                                                            len*sizeof(int)); \
77         PetscMemcpy(new_a,aa,(ai[row]+nrow+shift)*sizeof(Scalar)); \
78         PetscMemcpy(new_a+ai[row]+shift+nrow+CHUNKSIZE,aa+ai[row]+shift+nrow, \
79                                                            len*sizeof(Scalar));  \
80         /* free up old matrix storage */ \
81  \
82         PetscFree(a->a);  \
83         if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
84         aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
85         a->singlemalloc = 1; \
86  \
87         rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
88         rmax = aimax[row] = aimax[row] + CHUNKSIZE; \
89         PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
90         a->maxnz += CHUNKSIZE; \
91         a->reallocs++; \
92       } \
93       N = nrow++ - 1; a->nz++; \
94       /* shift up all the later entries in this row */ \
95       for ( ii=N; ii>=_i; ii-- ) { \
96         rp[ii+1] = rp[ii]; \
97         ap[ii+1] = ap[ii]; \
98       } \
99       rp[_i] = col1;  \
100       ap[_i] = value;  \
101       a_noinsert: ; \
102       ailen[row] = nrow; \
103 }
104 
105 #define MatSetValues_SeqAIJ_B_Private(row,col,value,addv) \
106 { \
107  \
108     rp   = bj + bi[row] + shift; ap = ba + bi[row] + shift; \
109     rmax = bimax[row]; nrow = bilen[row];  \
110     col1 = col - shift; \
111      \
112     low = 0; high = nrow; \
113     while (high-low > 5) { \
114       t = (low+high)/2; \
115       if (rp[t] > col) high = t; \
116       else             low  = t; \
117     } \
118        for ( _i=0; _i<nrow; _i++ ) { \
119         if (rp[_i] > col1) break; \
120         if (rp[_i] == col1) { \
121           if (addv == ADD_VALUES) ap[_i] += value;   \
122           else                  ap[_i] = value; \
123           goto b_noinsert; \
124         } \
125       }  \
126       if (nonew == 1) goto b_noinsert; \
127       else if (nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
128       if (nrow >= rmax) { \
129         /* there is no extra room in row, therefore enlarge */ \
130         int    new_nz = bi[b->m] + CHUNKSIZE,len,*new_i,*new_j; \
131         Scalar *new_a; \
132  \
133         if (nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
134  \
135         /* malloc new storage space */ \
136         len     = new_nz*(sizeof(int)+sizeof(Scalar))+(b->m+1)*sizeof(int); \
137         new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
138         new_j   = (int *) (new_a + new_nz); \
139         new_i   = new_j + new_nz; \
140  \
141         /* copy over old data into new slots */ \
142         for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = bi[ii];} \
143         for ( ii=row+1; ii<b->m+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
144         PetscMemcpy(new_j,bj,(bi[row]+nrow+shift)*sizeof(int)); \
145         len = (new_nz - CHUNKSIZE - bi[row] - nrow - shift); \
146         PetscMemcpy(new_j+bi[row]+shift+nrow+CHUNKSIZE,bj+bi[row]+shift+nrow, \
147                                                            len*sizeof(int)); \
148         PetscMemcpy(new_a,ba,(bi[row]+nrow+shift)*sizeof(Scalar)); \
149         PetscMemcpy(new_a+bi[row]+shift+nrow+CHUNKSIZE,ba+bi[row]+shift+nrow, \
150                                                            len*sizeof(Scalar));  \
151         /* free up old matrix storage */ \
152  \
153         PetscFree(b->a);  \
154         if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
155         ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
156         b->singlemalloc = 1; \
157  \
158         rp   = bj + bi[row] + shift; ap = ba + bi[row] + shift; \
159         rmax = bimax[row] = bimax[row] + CHUNKSIZE; \
160         PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
161         b->maxnz += CHUNKSIZE; \
162         b->reallocs++; \
163       } \
164       N = nrow++ - 1; b->nz++; \
165       /* shift up all the later entries in this row */ \
166       for ( ii=N; ii>=_i; ii-- ) { \
167         rp[ii+1] = rp[ii]; \
168         ap[ii+1] = ap[ii]; \
169       } \
170       rp[_i] = col1;  \
171       ap[_i] = value;  \
172       b_noinsert: ; \
173       bilen[row] = nrow; \
174 }
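
/*
   Minimal standalone sketch (an illustration, not part of the original source) of
   the insert-or-grow logic the two macros above implement for one CSR row; the
   type CSR and the function csr_insert() are hypothetical simplifications
   (zero-based indices, no reallocation).
*/
#if 0
typedef struct { int m,*i,*j,*ilen,*imax; Scalar *a; } CSR;

static int csr_insert(CSR *a,int row,int col,Scalar value,InsertMode addv)
{
  int    *rp = a->j + a->i[row], k, nrow = a->ilen[row];
  Scalar *ap = a->a + a->i[row];

  for ( k=0; k<nrow; k++ ) {                 /* find the insertion point */
    if (rp[k] > col) break;
    if (rp[k] == col) {                      /* entry exists: just update it */
      if (addv == ADD_VALUES) ap[k] += value; else ap[k] = value;
      return 0;
    }
  }
  if (nrow >= a->imax[row]) return 1;        /* full row; the macros enlarge by CHUNKSIZE here */
  for ( ; nrow>k; nrow-- ) {                 /* shift the later entries up one slot */
    rp[nrow] = rp[nrow-1]; ap[nrow] = ap[nrow-1];
  }
  rp[k] = col; ap[k] = value; a->ilen[row]++;
  return 0;
}
#endif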
175 
176 extern int MatSetValues_SeqAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
177 #undef __FUNC__
178 #define __FUNC__ "MatSetValues_MPIAIJ"
179 int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
180 {
181   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
182   Scalar     value;
183   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
184   int        cstart = aij->cstart, cend = aij->cend,row,col;
185   int        roworiented = aij->roworiented;
186 
187   /* Some variables required by the MatSetValues_SeqAIJ_{A,B}_Private() macros */
188   Mat        A = aij->A;
189   Mat_SeqAIJ *a = (Mat_SeqAIJ *) A->data;
190   int        *aimax = a->imax, *ai = a->i, *ailen = a->ilen,*aj = a->j;
191   Scalar     *aa = a->a;
192 
193   Mat        B = aij->B;
194   Mat_SeqAIJ *b = (Mat_SeqAIJ *) B->data;
195   int        *bimax = b->imax, *bi = b->i, *bilen = b->ilen,*bj = b->j;
196   Scalar     *ba = b->a;
197 
198   int        *rp,ii,nrow,_i,rmax, N, col1,low,high,t;
199   int        nonew = a->nonew,shift = a->indexshift;
200   Scalar     *ap;
201 
202   PetscFunctionBegin;
203   for ( i=0; i<m; i++ ) {
204 #if defined(USE_PETSC_BOPT_g)
205     if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
206     if (im[i] >= aij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
207 #endif
208     if (im[i] >= rstart && im[i] < rend) {
209       row = im[i] - rstart;
210       for ( j=0; j<n; j++ ) {
211         if (in[j] >= cstart && in[j] < cend){
212           col = in[j] - cstart;
213           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
214           MatSetValues_SeqAIJ_A_Private(row,col,value,addv);
215           /* ierr = MatSetValues_SeqAIJ(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
216         }
217 #if defined(USE_PETSC_BOPT_g)
218         else if (in[j] < 0) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");}
219         else if (in[j] >= aij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");}
220 #endif
221         else {
222           if (mat->was_assembled) {
223             if (!aij->colmap) {
224               ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
225             }
226             col = aij->colmap[in[j]] - 1;
227             if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
228               ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
229               col =  in[j];
230               /* Reinitialize the variables required by MatSetValues_SeqAIJ_B_Private() */
231               B = aij->B;
232               b = (Mat_SeqAIJ *) B->data;
233               bimax = b->imax; bi = b->i; bilen = b->ilen; bj = b->j;
234               ba = b->a;
235             }
236           } else col = in[j];
237           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
238           MatSetValues_SeqAIJ_B_Private(row,col,value,addv);
239           /* ierr = MatSetValues_SeqAIJ(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
240         }
241       }
242     }
243     else {
244       if (roworiented && !aij->donotstash) {
245         ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
246       }
247       else {
248         if (!aij->donotstash) {
249           row = im[i];
250           for ( j=0; j<n; j++ ) {
251             ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
252           }
253         }
254       }
255     }
256   }
257   PetscFunctionReturn(0);
258 }
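
/*
   Usage sketch (an illustration, not part of the original source): each process may
   set entries in any global row; rows it does not own are stashed by the routine
   above and forwarded to their owners during MatAssemblyBegin/End().
*/
#if 0
static int ExampleSetValues(Mat mat)
{
  int    ierr,i,col,rstart,rend;
  Scalar one = 1.0;

  ierr = MatGetOwnershipRange(mat,&rstart,&rend); CHKERRQ(ierr);
  for ( i=rstart; i<rend; i++ ) {
    col  = i;                                 /* a locally owned (diagonal) entry */
    ierr = MatSetValues(mat,1,&i,1,&col,&one,INSERT_VALUES); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(mat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(mat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}
#endif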
259 
260 #undef __FUNC__
261 #define __FUNC__ "MatGetValues_MPIAIJ"
262 int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
263 {
264   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
265   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
266   int        cstart = aij->cstart, cend = aij->cend,row,col;
267 
268   PetscFunctionBegin;
269   for ( i=0; i<m; i++ ) {
270     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
271     if (idxm[i] >= aij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
272     if (idxm[i] >= rstart && idxm[i] < rend) {
273       row = idxm[i] - rstart;
274       for ( j=0; j<n; j++ ) {
275         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
276         if (idxn[j] >= aij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
277         if (idxn[j] >= cstart && idxn[j] < cend){
278           col = idxn[j] - cstart;
279           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
280         } else {
281           if (!aij->colmap) {
282             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
283           }
284           col = aij->colmap[idxn[j]] - 1;
285           if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
286           else {
287             ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
288           }
289         }
290       }
291     } else {
292       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
293     }
294   }
295   PetscFunctionReturn(0);
296 }
297 
298 #undef __FUNC__
299 #define __FUNC__ "MatAssemblyBegin_MPIAIJ"
300 int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
301 {
302   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
303   MPI_Comm    comm = mat->comm;
304   int         size = aij->size, *owners = aij->rowners;
305   int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
306   MPI_Request *send_waits,*recv_waits;
307   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
308   InsertMode  addv;
309   Scalar      *rvalues,*svalues;
310 
311   PetscFunctionBegin;
312   if (aij->donotstash) {
313     aij->svalues    = 0; aij->rvalues    = 0;
314     aij->nsends     = 0; aij->nrecvs     = 0;
315     aij->send_waits = 0; aij->recv_waits = 0;
316     aij->rmax       = 0;
317     PetscFunctionReturn(0);
318   }
319 
320   /* make sure all processors are using either INSERT_VALUES or ADD_VALUES */
321   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
322   if (addv == (ADD_VALUES|INSERT_VALUES)) {
323     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
324   }
325   mat->insertmode = addv; /* in case this processor had no cache */
326 
327   /*  first count number of contributors to each processor */
328   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
329   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
330   owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
331   for ( i=0; i<aij->stash.n; i++ ) {
332     idx = aij->stash.idx[i];
333     for ( j=0; j<size; j++ ) {
334       if (idx >= owners[j] && idx < owners[j+1]) {
335         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
336       }
337     }
338   }
339   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
340 
341   /* inform other processors of number of messages and max length*/
342   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
343   ierr = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
344   nreceives = work[rank];
345   ierr = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
346   nmax = work[rank];
347   PetscFree(work);
348 
349   /* post receives:
350        1) each message will consist of ordered triples
351      (global row index,global column index,value); we store the indices
352      as Scalars to simplify the message passing.
353        2) since we don't know how long each individual message is we
354      allocate the largest needed buffer for each receive. Potentially
355      this is a lot of wasted space.
356 
357 
358        This could be done better.
359   */
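  /*
     Illustrative packing layout (not part of the original source): entry k headed
     for a given process is stored as

         svalues[3*k]   = (Scalar) global row index
         svalues[3*k+1] = (Scalar) global column index
         svalues[3*k+2] = value

     and unpacked with the matching casts in MatAssemblyEnd_MPIAIJ().
  */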
360   rvalues    = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
361   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
362   for ( i=0; i<nreceives; i++ ) {
363     ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
364               comm,recv_waits+i);CHKERRQ(ierr);
365   }
366 
367   /* do sends:
368       1) starts[i] gives the starting index in svalues for stuff going to
369          the ith processor
370   */
371   svalues    = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
372   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
373   starts     = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
374   starts[0]  = 0;
375   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
376   for ( i=0; i<aij->stash.n; i++ ) {
377     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
378     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
379     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
380   }
381   PetscFree(owner);
382   starts[0] = 0;
383   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
384   count = 0;
385   for ( i=0; i<size; i++ ) {
386     if (procs[i]) {
387       ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
388     }
389   }
390   PetscFree(starts);
391   PetscFree(nprocs);
392 
393   /* Free cache space */
394   PLogInfo(aij->A,"MatAssemblyBegin_MPIAIJ:Number of off-processor values %d\n",aij->stash.n);
395   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
396 
397   aij->svalues    = svalues;    aij->rvalues    = rvalues;
398   aij->nsends     = nsends;     aij->nrecvs     = nreceives;
399   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
400   aij->rmax       = nmax;
401 
402   PetscFunctionReturn(0);
403 }
404 extern int MatSetUpMultiply_MPIAIJ(Mat);
405 
406 #undef __FUNC__
407 #define __FUNC__ "MatAssemblyEnd_MPIAIJ"
408 int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
409 {
410   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
411   MPI_Status  *send_status,recv_status;
412   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
413   int         row,col,other_disassembled;
414   Scalar      *values,val;
415   InsertMode  addv = mat->insertmode;
416 
417   PetscFunctionBegin;
418 
419   /*  wait on receives */
420   while (count) {
421     ierr = MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
422     /* unpack receives into our local space */
423     values = aij->rvalues + 3*imdex*aij->rmax;
424     ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
425     n = n/3;
426     for ( i=0; i<n; i++ ) {
427       row = (int) PetscReal(values[3*i]) - aij->rstart;
428       col = (int) PetscReal(values[3*i+1]);
429       val = values[3*i+2];
430       if (col >= aij->cstart && col < aij->cend) {
431         col -= aij->cstart;
432         ierr = MatSetValues(aij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
433       } else {
434         if (mat->was_assembled || mat->assembled) {
435           if (!aij->colmap) {
436             ierr = CreateColmap_MPIAIJ_Private(mat); CHKERRQ(ierr);
437           }
438           col = aij->colmap[col] - 1;
439           if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
440             ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
441             col = (int) PetscReal(values[3*i+1]);
442           }
443         }
444         ierr = MatSetValues(aij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
445       }
446     }
447     count--;
448   }
449   if (aij->recv_waits) PetscFree(aij->recv_waits);
450   if (aij->rvalues)    PetscFree(aij->rvalues);
451 
452   /* wait on sends */
453   if (aij->nsends) {
454     send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
455     ierr        = MPI_Waitall(aij->nsends,aij->send_waits,send_status);CHKERRQ(ierr);
456     PetscFree(send_status);
457   }
458   if (aij->send_waits) PetscFree(aij->send_waits);
459   if (aij->svalues)    PetscFree(aij->svalues);
460 
461   ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
462   ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);
463 
464   /* determine if any processor has disassembled, if so we must
465      also disassemble ourselves, in order that we may reassemble. */
466   /*
467      if nonzero structure of submatrix B cannot change then we know that
468      no processor disassembled thus we can skip this stuff
469   */
470   if (!((Mat_SeqAIJ*) aij->B->data)->nonew)  {
471     ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
472     if (mat->was_assembled && !other_disassembled) {
473       ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
474     }
475   }
476 
477   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
478     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
479   }
480   ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
481   ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);
482 
483   if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
484   PetscFunctionReturn(0);
485 }
486 
487 #undef __FUNC__
488 #define __FUNC__ "MatZeroEntries_MPIAIJ"
489 int MatZeroEntries_MPIAIJ(Mat A)
490 {
491   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
492   int        ierr;
493 
494   PetscFunctionBegin;
495   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
496   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
497   PetscFunctionReturn(0);
498 }
499 
500 #undef __FUNC__
501 #define __FUNC__ "MatZeroRows_MPIAIJ"
502 int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
503 {
504   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
505   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
506   int            *procs,*nprocs,j,found,idx,nsends,*work,row;
507   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
508   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
509   int            *lens,imdex,*lrows,*values,rstart=l->rstart;
510   MPI_Comm       comm = A->comm;
511   MPI_Request    *send_waits,*recv_waits;
512   MPI_Status     recv_status,*send_status;
513   IS             istmp;
514   PetscTruth     localdiag;
515 
516   PetscFunctionBegin;
517   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
518   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
519 
520   /*  first count number of contributors to each processor */
521   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
522   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
523   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
524   for ( i=0; i<N; i++ ) {
525     idx = rows[i];
526     found = 0;
527     for ( j=0; j<size; j++ ) {
528       if (idx >= owners[j] && idx < owners[j+1]) {
529         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
530       }
531     }
532     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
533   }
534   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
535 
536   /* inform other processors of number of messages and max length*/
537   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
538   ierr = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
539   nrecvs = work[rank];
540   ierr = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
541   nmax = work[rank];
542   PetscFree(work);
543 
544   /* post receives:   */
545   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int));CHKPTRQ(rvalues);
546   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
547   for ( i=0; i<nrecvs; i++ ) {
548     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
549   }
550 
551   /* do sends:
552       1) starts[i] gives the starting index in svalues for stuff going to
553          the ith processor
554   */
555   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
556   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
557   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
558   starts[0] = 0;
559   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
560   for ( i=0; i<N; i++ ) {
561     svalues[starts[owner[i]]++] = rows[i];
562   }
563   ISRestoreIndices(is,&rows);
564 
565   starts[0] = 0;
566   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
567   count = 0;
568   for ( i=0; i<size; i++ ) {
569     if (procs[i]) {
570       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
571     }
572   }
573   PetscFree(starts);
574 
575   base = owners[rank];
576 
577   /*  wait on receives */
578   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
579   source = lens + nrecvs;
580   count  = nrecvs; slen = 0;
581   while (count) {
582     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
583     /* unpack receives into our local space */
584     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
585     source[imdex]  = recv_status.MPI_SOURCE;
586     lens[imdex]  = n;
587     slen += n;
588     count--;
589   }
590   PetscFree(recv_waits);
591 
592   /* move the data into the send scatter */
593   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
594   count = 0;
595   for ( i=0; i<nrecvs; i++ ) {
596     values = rvalues + i*nmax;
597     for ( j=0; j<lens[i]; j++ ) {
598       lrows[count++] = values[j] - base;
599     }
600   }
601   PetscFree(rvalues); PetscFree(lens);
602   PetscFree(owner); PetscFree(nprocs);
603 
604   /* actually zap the local rows */
605   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
606   PLogObjectParent(A,istmp);
607 
608   /*
609         Zero the required rows. If the "diagonal block" of the matrix
610      is square and the user wishes to set the diagonal we use separate
611      code so that MatSetValues() is not called for each diagonal entry, which
612      would allocate new memory, causing many mallocs and slowing things down.
613 
614        Contributed by: Mathew Knepley
615   */
616   localdiag = PETSC_FALSE;
617   if (diag && (l->A->M == l->A->N)) {
618     localdiag = PETSC_TRUE;
619     ierr      = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
620   } else {
621     ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
622   }
623   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
624   ierr = ISDestroy(istmp); CHKERRQ(ierr);
625 
626   if (diag && (localdiag == PETSC_FALSE)) {
627     for ( i = 0; i < slen; i++ ) {
628       row = lrows[i] + rstart;
629       MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES);
630     }
631     MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);
632     MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);
633   }
634 
643   PetscFree(lrows);
644 
645   /* wait on sends */
646   if (nsends) {
647     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
648     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
649     PetscFree(send_status);
650   }
651   PetscFree(send_waits); PetscFree(svalues);
652 
653   PetscFunctionReturn(0);
654 }
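
/*
   Usage sketch (an illustration, not part of the original source): zero two global
   rows and put 1.0 on their diagonals; the listed rows may be owned by any process.
*/
#if 0
static int ExampleZeroRows(Mat A)
{
  int    ierr,rows[2] = {0,5};
  Scalar diag = 1.0;
  IS     is;

  ierr = ISCreateGeneral(PETSC_COMM_SELF,2,rows,&is); CHKERRQ(ierr);
  ierr = MatZeroRows(A,is,&diag); CHKERRQ(ierr);
  ierr = ISDestroy(is); CHKERRQ(ierr);
  return 0;
}
#endif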
655 
656 #undef __FUNC__
657 #define __FUNC__ "MatMult_MPIAIJ"
658 int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
659 {
660   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
661   int        ierr,nt;
662 
663   PetscFunctionBegin;
664   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
665   if (nt != a->n) {
666     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
667   }
668   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
669   ierr = (*a->A->ops->mult)(a->A,xx,yy); CHKERRQ(ierr);
670   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
671   ierr = (*a->B->ops->multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
672   PetscFunctionReturn(0);
673 }
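
/*
   Illustrative note (not part of the original source): with diagonal block A and
   off-diagonal block B the routine above computes

       yy = A * xx_local + B * xx_ghost

   overlapping the scatter of the ghost values of xx into a->lvec with the local
   product A*xx.  A caller simply uses MatMult(), e.g.

       ierr = MatMult(mat,x,y); CHKERRQ(ierr);
*/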
674 
675 #undef __FUNC__
676 #define __FUNC__ "MatMultAdd_MPIAIJ"
677 int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
678 {
679   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
680   int        ierr;
681 
682   PetscFunctionBegin;
683   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
684   ierr = (*a->A->ops->multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
685   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
686   ierr = (*a->B->ops->multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
687   PetscFunctionReturn(0);
688 }
689 
690 #undef __FUNC__
691 #define __FUNC__ "MatMultTrans_MPIAIJ"
692 int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
693 {
694   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
695   int        ierr;
696 
697   PetscFunctionBegin;
698   /* do nondiagonal part */
699   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
700   /* send it on its way */
701   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
702   /* do local part */
703   ierr = (*a->A->ops->multtrans)(a->A,xx,yy); CHKERRQ(ierr);
704   /* receive remote parts: note this assumes the values are not actually */
705   /* inserted in yy until the next line, which is true for my implementation*/
706   /* but may not always be true. */
707   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
708   PetscFunctionReturn(0);
709 }
710 
711 #undef __FUNC__
712 #define __FUNC__ "MatMultTransAdd_MPIAIJ"
713 int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
714 {
715   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
716   int        ierr;
717 
718   PetscFunctionBegin;
719   /* do nondiagonal part */
720   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
721   /* send it on its way */
722   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
723   /* do local part */
724   ierr = (*a->A->ops->multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
725   /* receive remote parts: note this assumes the values are not actually */
726   /* inserted in zz until the next line, which is true for my implementation*/
727   /* but may not always be true. */
728   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
729   PetscFunctionReturn(0);
730 }
731 
732 /*
733   This only works correctly for square matrices where the subblock A->A is the
734    diagonal block
735 */
736 #undef __FUNC__
737 #define __FUNC__ "MatGetDiagonal_MPIAIJ"
738 int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
739 {
740   int        ierr;
741   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
742 
743   PetscFunctionBegin;
744   if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
745   if (a->rstart != a->cstart || a->rend != a->cend) {
746     SETERRQ(PETSC_ERR_ARG_SIZ,0,"row partition must equal col partition");
747   }
748   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
749   PetscFunctionReturn(0);
750 }
751 
752 #undef __FUNC__
753 #define __FUNC__ "MatScale_MPIAIJ"
754 int MatScale_MPIAIJ(Scalar *aa,Mat A)
755 {
756   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
757   int        ierr;
758 
759   PetscFunctionBegin;
760   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
761   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
762   PetscFunctionReturn(0);
763 }
764 
765 #undef __FUNC__
766 #define __FUNC__ "MatDestroy_MPIAIJ"
767 int MatDestroy_MPIAIJ(Mat mat)
768 {
769   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
770   int        ierr;
771 
772   PetscFunctionBegin;
773   if (--mat->refct > 0) PetscFunctionReturn(0);
774 
775   if (mat->mapping) {
776     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
777   }
778   if (mat->bmapping) {
779     ierr = ISLocalToGlobalMappingDestroy(mat->bmapping); CHKERRQ(ierr);
780   }
781   if (mat->rmap) {
782     ierr = MapDestroy(mat->rmap);CHKERRQ(ierr);
783   }
784   if (mat->cmap) {
785     ierr = MapDestroy(mat->cmap);CHKERRQ(ierr);
786   }
787 #if defined(USE_PETSC_LOG)
788   PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",aij->M,aij->N);
789 #endif
790   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
791   PetscFree(aij->rowners);
792   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
793   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
794   if (aij->colmap) PetscFree(aij->colmap);
795   if (aij->garray) PetscFree(aij->garray);
796   if (aij->lvec)   VecDestroy(aij->lvec);
797   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
798   if (aij->rowvalues) PetscFree(aij->rowvalues);
799   PetscFree(aij);
800   PLogObjectDestroy(mat);
801   PetscHeaderDestroy(mat);
802   PetscFunctionReturn(0);
803 }
804 
805 #undef __FUNC__
806 #define __FUNC__ "MatView_MPIAIJ_Binary"
807 extern int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
808 {
809   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
810   int         ierr;
811 
812   PetscFunctionBegin;
813   if (aij->size == 1) {
814     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
815   }
816   else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
817   PetscFunctionReturn(0);
818 }
819 
820 #undef __FUNC__
821 #define __FUNC__ "MatView_MPIAIJ_ASCIIorDraworMatlab"
822 extern int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
823 {
824   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
825   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
826   int         ierr, format,shift = C->indexshift,rank;
827   FILE        *fd;
828   ViewerType  vtype;
829 
830   PetscFunctionBegin;
831   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
832   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
833     ierr = ViewerGetFormat(viewer,&format);
834     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
835       MatInfo info;
836       int     flg;
837       MPI_Comm_rank(mat->comm,&rank);
838       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
839       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
840       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
841       PetscSequentialPhaseBegin(mat->comm,1);
842       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
843          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
844       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
845          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
846       ierr = MatGetInfo(aij->A,MAT_LOCAL,&info);
847       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used);
848       ierr = MatGetInfo(aij->B,MAT_LOCAL,&info);
849       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used);
850       fflush(fd);
851       PetscSequentialPhaseEnd(mat->comm,1);
852       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
853       PetscFunctionReturn(0);
854     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
855       PetscFunctionReturn(0);
856     }
857   }
858 
859   if (vtype == DRAW_VIEWER) {
860     Draw       draw;
861     PetscTruth isnull;
862     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
863     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
864   }
865 
866   if (vtype == ASCII_FILE_VIEWER) {
867     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
868     PetscSequentialPhaseBegin(mat->comm,1);
869     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
870            aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
871            aij->cend);
872     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
873     ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
874     fflush(fd);
875     PetscSequentialPhaseEnd(mat->comm,1);
876   } else {
877     int size = aij->size;
878     rank = aij->rank;
879     if (size == 1) {
880       ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
881     } else {
882       /* assemble the entire matrix onto first processor. */
883       Mat         A;
884       Mat_SeqAIJ *Aloc;
885       int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
886       Scalar      *a;
887 
888       if (!rank) {
889         ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
890       } else {
891         ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
892       }
893       PLogObjectParent(mat,A);
894 
895       /* copy over the A part */
896       Aloc = (Mat_SeqAIJ*) aij->A->data;
897       m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
898       row = aij->rstart;
899       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
900       for ( i=0; i<m; i++ ) {
901         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
902         row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
903       }
904       aj = Aloc->j;
905       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
906 
907       /* copy over the B part */
908       Aloc = (Mat_SeqAIJ*) aij->B->data;
909       m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
910       row = aij->rstart;
911       ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
912       for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
913       for ( i=0; i<m; i++ ) {
914         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
915         row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
916       }
917       PetscFree(ct);
918       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
919       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
920       /*
921          Everyone has to call to draw the matrix since the graphics waits are
922          synchronized across all processors that share the Draw object
923       */
924       if (!rank || vtype == DRAW_VIEWER) {
925         ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
926       }
927       ierr = MatDestroy(A); CHKERRQ(ierr);
928     }
929   }
930   PetscFunctionReturn(0);
931 }
932 
933 #undef __FUNC__
934 #define __FUNC__ "MatView_MPIAIJ"
935 int MatView_MPIAIJ(Mat mat,Viewer viewer)
936 {
937   int         ierr;
938   ViewerType  vtype;
939 
940   PetscFunctionBegin;
941   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
942   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
943       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
944     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
945   } else if (vtype == BINARY_FILE_VIEWER) {
946     ierr = MatView_MPIAIJ_Binary(mat,viewer);CHKERRQ(ierr);
947   } else {
948     SETERRQ(1,1,"Viewer type not supported by PETSc object");
949   }
950   PetscFunctionReturn(0);
951 }
952 
953 /*
954     This has to provide several versions.
955 
956      2) a) use only local smoothing, updating outer values only once.
957         b) local smoothing, updating outer values each inner iteration.
958      3) coloring, updating outer values between colors.
959 */
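
/*
   Illustrative note (not part of the original source): each local sweep below
   applies the standard (S)SOR update

       x_i <- (1 - omega)*x_i + omega*(b_i - sum_{j != i} a_ij*x_j)/(a_ii + fshift)

   with the sum split into the diagonal-block part (using x) and the
   off-diagonal-block part (using the scattered ghost values in mat->lvec).
*/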
960 #undef __FUNC__
961 #define __FUNC__ "MatRelax_MPIAIJ"
962 int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
963                            double fshift,int its,Vec xx)
964 {
965   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
966   Mat        AA = mat->A, BB = mat->B;
967   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
968   Scalar     *b,*x,*xs,*ls,d,*v,sum;
969   int        ierr,*idx, *diag;
970   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
971 
972   PetscFunctionBegin;
973   ierr = VecGetArray(xx,&x); CHKERRQ(ierr);
974   ierr = VecGetArray(bb,&b); CHKERRQ(ierr);
975   ierr = VecGetArray(mat->lvec,&ls);CHKERRQ(ierr);
976   xs = x + shift; /* shift by one for index start of 1 */
977   ls = ls + shift;
978   if (!A->diag) {ierr = MatMarkDiag_SeqAIJ(AA); CHKERRQ(ierr);}
979   diag = A->diag;
980   if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
981     if (flag & SOR_ZERO_INITIAL_GUESS) {
982       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
983       ierr = VecRestoreArray(xx,&x); CHKERRQ(ierr);
984       ierr = VecRestoreArray(bb,&b); CHKERRQ(ierr);
985       ierr = VecRestoreArray(mat->lvec,&ls);CHKERRQ(ierr);
986       PetscFunctionReturn(0);
987     }
988     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
989     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
990     while (its--) {
991       /* go down through the rows */
992       for ( i=0; i<m; i++ ) {
993         n    = A->i[i+1] - A->i[i];
994 	PLogFlops(4*n+3);
995         idx  = A->j + A->i[i] + shift;
996         v    = A->a + A->i[i] + shift;
997         sum  = b[i];
998         SPARSEDENSEMDOT(sum,xs,v,idx,n);
999         d    = fshift + A->a[diag[i]+shift];
1000         n    = B->i[i+1] - B->i[i];
1001         idx  = B->j + B->i[i] + shift;
1002         v    = B->a + B->i[i] + shift;
1003         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1004         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1005       }
1006       /* come up through the rows */
1007       for ( i=m-1; i>-1; i-- ) {
1008         n    = A->i[i+1] - A->i[i];
1009 	PLogFlops(4*n+3);
1010         idx  = A->j + A->i[i] + shift;
1011         v    = A->a + A->i[i] + shift;
1012         sum  = b[i];
1013         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1014         d    = fshift + A->a[diag[i]+shift];
1015         n    = B->i[i+1] - B->i[i];
1016         idx  = B->j + B->i[i] + shift;
1017         v    = B->a + B->i[i] + shift;
1018         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1019         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1020       }
1021     }
1022   } else if (flag & SOR_LOCAL_FORWARD_SWEEP){
1023     if (flag & SOR_ZERO_INITIAL_GUESS) {
1024       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
1025       ierr = VecRestoreArray(xx,&x); CHKERRQ(ierr);
1026       ierr = VecRestoreArray(bb,&b); CHKERRQ(ierr);
1027       ierr = VecRestoreArray(mat->lvec,&ls);CHKERRQ(ierr);
1028       PetscFunctionReturn(0);
1029     }
1030     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1031     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1032     while (its--) {
1033       for ( i=0; i<m; i++ ) {
1034         n    = A->i[i+1] - A->i[i];
1035 	PLogFlops(4*n+3);
1036         idx  = A->j + A->i[i] + shift;
1037         v    = A->a + A->i[i] + shift;
1038         sum  = b[i];
1039         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1040         d    = fshift + A->a[diag[i]+shift];
1041         n    = B->i[i+1] - B->i[i];
1042         idx  = B->j + B->i[i] + shift;
1043         v    = B->a + B->i[i] + shift;
1044         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1045         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1046       }
1047     }
1048   } else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
1049     if (flag & SOR_ZERO_INITIAL_GUESS) {
1050       ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
1051       ierr = VecRestoreArray(xx,&x); CHKERRQ(ierr);
1052       ierr = VecRestoreArray(bb,&b); CHKERRQ(ierr);
1053       ierr = VecRestoreArray(mat->lvec,&ls);CHKERRQ(ierr);
1054       PetscFunctionReturn(0);
1055     }
1056     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1057     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
1058     while (its--) {
1059       for ( i=m-1; i>-1; i-- ) {
1060         n    = A->i[i+1] - A->i[i];
1061 	PLogFlops(4*n+3);
1062         idx  = A->j + A->i[i] + shift;
1063         v    = A->a + A->i[i] + shift;
1064         sum  = b[i];
1065         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1066         d    = fshift + A->a[diag[i]+shift];
1067         n    = B->i[i+1] - B->i[i];
1068         idx  = B->j + B->i[i] + shift;
1069         v    = B->a + B->i[i] + shift;
1070         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1071         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
1072       }
1073     }
1074   } else {
1075     SETERRQ(PETSC_ERR_SUP,0,"Parallel SOR not supported");
1076   }
1077   ierr = VecRestoreArray(xx,&x); CHKERRQ(ierr);
1078   ierr = VecRestoreArray(bb,&b); CHKERRQ(ierr);
1079   ierr = VecRestoreArray(mat->lvec,&ls);CHKERRQ(ierr);
1080   PetscFunctionReturn(0);
1081 }
1082 
1083 #undef __FUNC__
1084 #define __FUNC__ "MatGetInfo_MPIAIJ"
1085 int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1086 {
1087   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1088   Mat        A = mat->A, B = mat->B;
1089   int        ierr;
1090   double     isend[5], irecv[5];
1091 
1092   PetscFunctionBegin;
1093   info->block_size     = 1.0;
1094   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1095   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
1096   isend[3] = info->memory;  isend[4] = info->mallocs;
1097   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1098   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
1099   isend[3] += info->memory;  isend[4] += info->mallocs;
1100   if (flag == MAT_LOCAL) {
1101     info->nz_used      = isend[0];
1102     info->nz_allocated = isend[1];
1103     info->nz_unneeded  = isend[2];
1104     info->memory       = isend[3];
1105     info->mallocs      = isend[4];
1106   } else if (flag == MAT_GLOBAL_MAX) {
1107     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);CHKERRQ(ierr);
1108     info->nz_used      = irecv[0];
1109     info->nz_allocated = irecv[1];
1110     info->nz_unneeded  = irecv[2];
1111     info->memory       = irecv[3];
1112     info->mallocs      = irecv[4];
1113   } else if (flag == MAT_GLOBAL_SUM) {
1114     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);CHKERRQ(ierr);
1115     info->nz_used      = irecv[0];
1116     info->nz_allocated = irecv[1];
1117     info->nz_unneeded  = irecv[2];
1118     info->memory       = irecv[3];
1119     info->mallocs      = irecv[4];
1120   }
1121   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1122   info->fill_ratio_needed = 0;
1123   info->factor_mallocs    = 0;
1124   info->rows_global       = (double)mat->M;
1125   info->columns_global    = (double)mat->N;
1126   info->rows_local        = (double)mat->m;
1127   info->columns_local     = (double)mat->n;
1128 
1129   PetscFunctionReturn(0);
1130 }
1131 
1132 #undef __FUNC__
1133 #define __FUNC__ "MatSetOption_MPIAIJ"
1134 int MatSetOption_MPIAIJ(Mat A,MatOption op)
1135 {
1136   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1137 
1138   PetscFunctionBegin;
1139   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1140       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1141       op == MAT_COLUMNS_UNSORTED ||
1142       op == MAT_COLUMNS_SORTED ||
1143       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1144       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1145         MatSetOption(a->A,op);
1146         MatSetOption(a->B,op);
1147   } else if (op == MAT_ROW_ORIENTED) {
1148     a->roworiented = 1;
1149     MatSetOption(a->A,op);
1150     MatSetOption(a->B,op);
1151   } else if (op == MAT_ROWS_SORTED ||
1152              op == MAT_ROWS_UNSORTED ||
1153              op == MAT_SYMMETRIC ||
1154              op == MAT_STRUCTURALLY_SYMMETRIC ||
1155              op == MAT_YES_NEW_DIAGONALS)
1156     PLogInfo(A,"MatSetOption_MPIAIJ:Option ignored\n");
1157   else if (op == MAT_COLUMN_ORIENTED) {
1158     a->roworiented = 0;
1159     MatSetOption(a->A,op);
1160     MatSetOption(a->B,op);
1161   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1162     a->donotstash = 1;
1163   } else if (op == MAT_NO_NEW_DIAGONALS){
1164     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1165   } else {
1166     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1167   }
1168   PetscFunctionReturn(0);
1169 }
1170 
1171 #undef __FUNC__
1172 #define __FUNC__ "MatGetSize_MPIAIJ"
1173 int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
1174 {
1175   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1176 
1177   PetscFunctionBegin;
1178   if (m) *m = mat->M;
1179   if (n) *n = mat->N;
1180   PetscFunctionReturn(0);
1181 }
1182 
1183 #undef __FUNC__
1184 #define __FUNC__ "MatGetLocalSize_MPIAIJ"
1185 int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
1186 {
1187   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1188 
1189   PetscFunctionBegin;
1190   if (m) *m = mat->m;
1191   if (n) *n = mat->n;
1192   PetscFunctionReturn(0);
1193 }
1194 
1195 #undef __FUNC__
1196 #define __FUNC__ "MatGetOwnershipRange_MPIAIJ"
1197 int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
1198 {
1199   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1200 
1201   PetscFunctionBegin;
1202   *m = mat->rstart; *n = mat->rend;
1203   PetscFunctionReturn(0);
1204 }
1205 
1206 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1207 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1208 
1209 #undef __FUNC__
1210 #define __FUNC__ "MatGetRow_MPIAIJ"
1211 int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1212 {
1213   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1214   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
1215   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
1216   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
1217   int        *cmap, *idx_p;
1218 
1219   PetscFunctionBegin;
1220   if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
1221   mat->getrowactive = PETSC_TRUE;
1222 
1223   if (!mat->rowvalues && (idx || v)) {
1224     /*
1225         allocate enough space to hold information from the longest row.
1226     */
1227     Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
1228     int     max = 1,m = mat->m,tmp;
1229     for ( i=0; i<m; i++ ) {
1230       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1231       if (max < tmp) { max = tmp; }
1232     }
1233     mat->rowvalues  = (Scalar *) PetscMalloc(max*(sizeof(int)+sizeof(Scalar)));CHKPTRQ(mat->rowvalues);
1234     mat->rowindices = (int *) (mat->rowvalues + max);
1235   }
1236 
1237   if (row < rstart || row >= rend) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Only local rows");
1238   lrow = row - rstart;
1239 
1240   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1241   if (!v)   {pvA = 0; pvB = 0;}
1242   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1243   ierr = (*mat->A->ops->getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1244   ierr = (*mat->B->ops->getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1245   nztot = nzA + nzB;
1246 
1247   cmap  = mat->garray;
1248   if (v  || idx) {
1249     if (nztot) {
1250       /* Sort by increasing column numbers, assuming A and B already sorted */
1251       int imark = -1;
1252       if (v) {
1253         *v = v_p = mat->rowvalues;
1254         for ( i=0; i<nzB; i++ ) {
1255           if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
1256           else break;
1257         }
1258         imark = i;
1259         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1260         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1261       }
1262       if (idx) {
1263         *idx = idx_p = mat->rowindices;
1264         if (imark > -1) {
1265           for ( i=0; i<imark; i++ ) {
1266             idx_p[i] = cmap[cworkB[i]];
1267           }
1268         } else {
1269           for ( i=0; i<nzB; i++ ) {
1270             if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
1271             else break;
1272           }
1273           imark = i;
1274         }
1275         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
1276         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
1277       }
1278     } else {
1279       if (idx) *idx = 0;
1280       if (v)   *v   = 0;
1281     }
1282   }
1283   *nz = nztot;
1284   ierr = (*mat->A->ops->restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1285   ierr = (*mat->B->ops->restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1286   PetscFunctionReturn(0);
1287 }
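
/*
   Usage sketch (an illustration, not part of the original source): only locally
   owned rows may be requested; the returned column indices are global and sorted.
*/
#if 0
static int ExampleGetRows(Mat mat)
{
  int    ierr,row,nz,*cols,rstart,rend;
  Scalar *vals;

  ierr = MatGetOwnershipRange(mat,&rstart,&rend); CHKERRQ(ierr);
  for ( row=rstart; row<rend; row++ ) {
    ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
    /* ... examine the nz entries of this row ... */
    ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
  }
  return 0;
}
#endif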
1288 
1289 #undef __FUNC__
1290 #define __FUNC__ "MatRestoreRow_MPIAIJ"
1291 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1292 {
1293   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1294 
1295   PetscFunctionBegin;
1296   if (aij->getrowactive == PETSC_FALSE) {
1297     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
1298   }
1299   aij->getrowactive = PETSC_FALSE;
1300   PetscFunctionReturn(0);
1301 }
1302 
1303 #undef __FUNC__
1304 #define __FUNC__ "MatNorm_MPIAIJ"
1305 int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
1306 {
1307   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1308   Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
1309   int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
1310   double     sum = 0.0;
1311   Scalar     *v;
1312 
1313   PetscFunctionBegin;
1314   if (aij->size == 1) {
1315     ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
1316   } else {
1317     if (type == NORM_FROBENIUS) {
1318       v = amat->a;
1319       for (i=0; i<amat->nz; i++ ) {
1320 #if defined(USE_PETSC_COMPLEX)
1321         sum += PetscReal(PetscConj(*v)*(*v)); v++;
1322 #else
1323         sum += (*v)*(*v); v++;
1324 #endif
1325       }
1326       v = bmat->a;
1327       for (i=0; i<bmat->nz; i++ ) {
1328 #if defined(USE_PETSC_COMPLEX)
1329         sum += PetscReal(PetscConj(*v)*(*v)); v++;
1330 #else
1331         sum += (*v)*(*v); v++;
1332 #endif
1333       }
1334       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
1335       *norm = sqrt(*norm);
1336     } else if (type == NORM_1) { /* max column norm */
1337       double *tmp, *tmp2;
1338       int    *jj, *garray = aij->garray;
1339       tmp  = (double *) PetscMalloc( (aij->N+1)*sizeof(double) ); CHKPTRQ(tmp);
1340       tmp2 = (double *) PetscMalloc( (aij->N+1)*sizeof(double) ); CHKPTRQ(tmp2);
1341       PetscMemzero(tmp,aij->N*sizeof(double));
1342       *norm = 0.0;
1343       v = amat->a; jj = amat->j;
1344       for ( j=0; j<amat->nz; j++ ) {
1345         tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
1346       }
1347       v = bmat->a; jj = bmat->j;
1348       for ( j=0; j<bmat->nz; j++ ) {
1349         tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
1350       }
1351       ierr = MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
1352       for ( j=0; j<aij->N; j++ ) {
1353         if (tmp2[j] > *norm) *norm = tmp2[j];
1354       }
1355       PetscFree(tmp); PetscFree(tmp2);
1356     } else if (type == NORM_INFINITY) { /* max row norm */
1357       double ntemp = 0.0;
1358       for ( j=0; j<amat->m; j++ ) {
1359         v = amat->a + amat->i[j] + shift;
1360         sum = 0.0;
1361         for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
1362           sum += PetscAbsScalar(*v); v++;
1363         }
1364         v = bmat->a + bmat->i[j] + shift;
1365         for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
1366           sum += PetscAbsScalar(*v); v++;
1367         }
1368         if (sum > ntemp) ntemp = sum;
1369       }
1370       ierr = MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);CHKERRQ(ierr);
1371     } else {
1372       SETERRQ(PETSC_ERR_SUP,0,"No support for two norm");
1373     }
1374   }
1375   PetscFunctionReturn(0);
1376 }
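
/*
   Illustrative note (not part of the original source): NORM_1 above is the maximum
   column sum and NORM_INFINITY the maximum row sum, each reduced over all
   processes, e.g.

       double nrm;
       ierr = MatNorm(mat,NORM_INFINITY,&nrm); CHKERRQ(ierr);
*/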
1377 
1378 #undef __FUNC__
1379 #define __FUNC__ "MatTranspose_MPIAIJ"
1380 int MatTranspose_MPIAIJ(Mat A,Mat *matout)
1381 {
1382   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1383   Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
1384   int        ierr,shift = Aloc->indexshift;
1385   int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
1386   Mat        B;
1387   Scalar     *array;
1388 
1389   PetscFunctionBegin;
1390   if (matout == PETSC_NULL && M != N) {
1391     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
1392   }
1393 
1394   ierr = MatCreateMPIAIJ(A->comm,a->n,a->m,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);CHKERRQ(ierr);
1395 
1396   /* copy over the A part */
1397   Aloc = (Mat_SeqAIJ*) a->A->data;
1398   m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1399   row = a->rstart;
1400   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
1401   for ( i=0; i<m; i++ ) {
1402     ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1403     row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
1404   }
1405   aj = Aloc->j;
1406   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}
1407 
1408   /* copy over the B part */
1409   Aloc = (Mat_SeqAIJ*) a->B->data;
1410   m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1411   row = a->rstart;
1412   ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
1413   for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
1414   for ( i=0; i<m; i++ ) {
1415     ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1416     row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
1417   }
1418   PetscFree(ct);
1419   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1420   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1421   if (matout != PETSC_NULL) {
1422     *matout = B;
1423   } else {
1424     PetscOps       *Abops;
1425     struct _MatOps *Aops;
1426 
1427     /* This isn't really an in-place transpose, but we do free the data structures from a */
1428     PetscFree(a->rowners);
1429     ierr = MatDestroy(a->A); CHKERRQ(ierr);
1430     ierr = MatDestroy(a->B); CHKERRQ(ierr);
1431     if (a->colmap) PetscFree(a->colmap);
1432     if (a->garray) PetscFree(a->garray);
1433     if (a->lvec) VecDestroy(a->lvec);
1434     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
1435     PetscFree(a);
1436 
1437     /*
1438        This is horrible, horrible code. We need to keep
1439       A's pointers for the bops and ops but copy everything
1440       else from B.
1441     */
1442     Abops = A->bops;
1443     Aops  = A->ops;
1444     PetscMemcpy(A,B,sizeof(struct _p_Mat));
1445     A->bops = Abops;
1446     A->ops  = Aops;
1447     PetscHeaderDestroy(B);
1448   }
1449   PetscFunctionReturn(0);
1450 }
1451 
1452 #undef __FUNC__
1453 #define __FUNC__ "MatDiagonalScale_MPIAIJ"
1454 int MatDiagonalScale_MPIAIJ(Mat mat,Vec ll,Vec rr)
1455 {
1456   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1457   Mat a = aij->A, b = aij->B;
1458   int ierr,s1,s2,s3;
1459 
1460   PetscFunctionBegin;
1461   ierr = MatGetLocalSize(mat,&s2,&s3); CHKERRQ(ierr);
1462   if (rr) {
1463     ierr = VecGetLocalSize(rr,&s1);CHKERRQ(ierr);
1464     if (s1!=s3) SETERRQ(PETSC_ERR_ARG_SIZ,0,"right vector non-conforming local size");
1465     /* Overlap communication with computation. */
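    /* the entries of rr needed to scale the off-diagonal block's columns live on
       other processes, so gather them into the local work vector aij->lvec */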
1466     ierr = VecScatterBegin(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1467   }
1468   if (ll) {
1469     ierr = VecGetLocalSize(ll,&s1);CHKERRQ(ierr);
1470     if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"left vector non-conforming local size");
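    /* the off-diagonal block has the same local rows as the matrix, so it can be
       left-scaled by ll immediately; right scaling waits for the scatter to finish below */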
1471     ierr = (*b->ops->diagonalscale)(b,ll,0); CHKERRQ(ierr);
1472   }
1473   /* scale the diagonal block */
1474   ierr = (*a->ops->diagonalscale)(a,ll,rr); CHKERRQ(ierr);
1475 
1476   if (rr) {
1477     /* Do a scatter end and then right scale the off-diagonal block */
1478     ierr = VecScatterEnd(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1479     ierr = (*b->ops->diagonalscale)(b,0,aij->lvec); CHKERRQ(ierr);
1480   }
1481 
1482   PetscFunctionReturn(0);
1483 }
1484 
1485 
1486 extern int MatPrintHelp_SeqAIJ(Mat);
1487 #undef __FUNC__
1488 #define __FUNC__ "MatPrintHelp_MPIAIJ"
1489 int MatPrintHelp_MPIAIJ(Mat A)
1490 {
1491   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1492   int        ierr;
1493 
1494   PetscFunctionBegin;
1495   if (!a->rank) {
1496     ierr = MatPrintHelp_SeqAIJ(a->A);CHKERRQ(ierr);
1497   }
1498   PetscFunctionReturn(0);
1499 }
1500 
1501 #undef __FUNC__
1502 #define __FUNC__ "MatGetBlockSize_MPIAIJ"
1503 int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1504 {
1505   PetscFunctionBegin;
1506   *bs = 1;
1507   PetscFunctionReturn(0);
1508 }
1509 #undef __FUNC__
1510 #define __FUNC__ "MatSetUnfactored_MPIAIJ"
1511 int MatSetUnfactored_MPIAIJ(Mat A)
1512 {
1513   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1514   int        ierr;
1515 
1516   PetscFunctionBegin;
1517   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1518   PetscFunctionReturn(0);
1519 }
1520 
1521 #undef __FUNC__
1522 #define __FUNC__ "MatEqual_MPIAIJ"
1523 int MatEqual_MPIAIJ(Mat A, Mat B, PetscTruth *flag)
1524 {
1525   Mat_MPIAIJ *matB = (Mat_MPIAIJ *) B->data,*matA = (Mat_MPIAIJ *) A->data;
1526   Mat        a, b, c, d;
1527   PetscTruth flg;
1528   int        ierr;
1529 
1530   PetscFunctionBegin;
1531   if (B->type != MATMPIAIJ) SETERRQ(PETSC_ERR_ARG_INCOMP,0,"Matrices must be same type");
1532   a = matA->A; b = matA->B;
1533   c = matB->A; d = matB->B;
1534 
1535   ierr = MatEqual(a, c, &flg); CHKERRQ(ierr);
1536   if (flg == PETSC_TRUE) {
1537     ierr = MatEqual(b, d, &flg); CHKERRQ(ierr);
1538   }
1539   ierr = MPI_Allreduce(&flg, flag, 1, MPI_INT, MPI_LAND, A->comm);CHKERRQ(ierr);
1540   PetscFunctionReturn(0);
1541 }
1542 
1543 #undef __FUNC__
1544 #define __FUNC__ "MatCopy_MPIAIJ"
1545 int MatCopy_MPIAIJ(Mat A,Mat B,MatStructure str)
1546 {
1547   int        ierr;
1548   Mat_MPIAIJ *a = (Mat_MPIAIJ *)A->data;
1549   Mat_MPIAIJ *b = (Mat_MPIAIJ *)B->data;
1550 
1551   PetscFunctionBegin;
1552   if (str != SAME_NONZERO_PATTERN || B->type != MATMPIAIJ) {
1553     /* because of the column compression in the off-processor part of the matrix a->B,
1554        the number of columns in a->B and b->B may be different, hence we cannot call
1555        MatCopy() directly on the two parts. If need be, we could provide a more
1556        efficient copy than MatCopy_Basic() by first uncompressing the a->B matrices
1557        and then copying the submatrices */
1558     ierr = MatCopy_Basic(A,B,str);CHKERRQ(ierr);
1559   } else {
1560     ierr = MatCopy(a->A,b->A,str);CHKERRQ(ierr);
1561     ierr = MatCopy(a->B,b->B,str);CHKERRQ(ierr);
1562   }
1563   PetscFunctionReturn(0);
1564 }
1565 
1566 extern int MatDuplicate_MPIAIJ(Mat,MatDuplicateOption,Mat *);
1567 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1568 extern int MatFDColoringCreate_MPIAIJ(Mat,ISColoring,MatFDColoring);
1569 extern int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatGetSubMatrixCall,Mat **);
1570 extern int MatGetSubMatrix_MPIAIJ (Mat ,IS,IS,int,MatGetSubMatrixCall,Mat *);
1571 
1572 /* -------------------------------------------------------------------*/
1573 static struct _MatOps MatOps_Values = {MatSetValues_MPIAIJ,
1574        MatGetRow_MPIAIJ,
1575        MatRestoreRow_MPIAIJ,
1576        MatMult_MPIAIJ,
1577        MatMultAdd_MPIAIJ,
1578        MatMultTrans_MPIAIJ,
1579        MatMultTransAdd_MPIAIJ,
1580        0,
1581        0,
1582        0,
1583        0,
1584        0,
1585        0,
1586        MatRelax_MPIAIJ,
1587        MatTranspose_MPIAIJ,
1588        MatGetInfo_MPIAIJ,
1589        MatEqual_MPIAIJ,
1590        MatGetDiagonal_MPIAIJ,
1591        MatDiagonalScale_MPIAIJ,
1592        MatNorm_MPIAIJ,
1593        MatAssemblyBegin_MPIAIJ,
1594        MatAssemblyEnd_MPIAIJ,
1595        0,
1596        MatSetOption_MPIAIJ,
1597        MatZeroEntries_MPIAIJ,
1598        MatZeroRows_MPIAIJ,
1599        0,
1600        0,
1601        0,
1602        0,
1603        MatGetSize_MPIAIJ,
1604        MatGetLocalSize_MPIAIJ,
1605        MatGetOwnershipRange_MPIAIJ,
1606        0,
1607        0,
1608        0,
1609        0,
1610        MatDuplicate_MPIAIJ,
1611        0,
1612        0,
1613        0,
1614        0,
1615        0,
1616        MatGetSubMatrices_MPIAIJ,
1617        MatIncreaseOverlap_MPIAIJ,
1618        MatGetValues_MPIAIJ,
1619        MatCopy_MPIAIJ,
1620        MatPrintHelp_MPIAIJ,
1621        MatScale_MPIAIJ,
1622        0,
1623        0,
1624        0,
1625        MatGetBlockSize_MPIAIJ,
1626        0,
1627        0,
1628        0,
1629        0,
1630        MatFDColoringCreate_MPIAIJ,
1631        0,
1632        MatSetUnfactored_MPIAIJ,
1633        0,
1634        0,
1635        MatGetSubMatrix_MPIAIJ,
1636        0,
1637        0,
1638        MatGetMaps_Petsc};
1639 
1640 /* ----------------------------------------------------------------------------------------*/
1641 
1642 EXTERN_C_BEGIN
1643 #undef __FUNC__
1644 #define __FUNC__ "MatStoreValues_MPIAIJ"
1645 int MatStoreValues_MPIAIJ(Mat mat)
1646 {
1647   Mat_MPIAIJ *aij = (Mat_MPIAIJ *)mat->data;
1648   int        ierr;
1649 
1650   PetscFunctionBegin;
1651   ierr = MatStoreValues(aij->A);CHKERRQ(ierr);
1652   ierr = MatStoreValues(aij->B);CHKERRQ(ierr);
1653   PetscFunctionReturn(0);
1654 }
1655 EXTERN_C_END
1656 
1657 EXTERN_C_BEGIN
1658 #undef __FUNC__
1659 #define __FUNC__ "MatRetrieveValues_MPIAIJ"
1660 int MatRetrieveValues_MPIAIJ(Mat mat)
1661 {
1662   Mat_MPIAIJ *aij = (Mat_MPIAIJ *)mat->data;
1663   int        ierr;
1664 
1665   PetscFunctionBegin;
1666   ierr = MatRetrieveValues(aij->A);CHKERRQ(ierr);
1667   ierr = MatRetrieveValues(aij->B);CHKERRQ(ierr);
1668   PetscFunctionReturn(0);
1669 }
1670 EXTERN_C_END
1671 
1672 #undef __FUNC__
1673 #define __FUNC__ "MatCreateMPIAIJ"
1674 /*@C
1675    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1676    (the default parallel PETSc format).  For good matrix assembly performance
1677    the user should preallocate the matrix storage by setting the parameters
1678    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1679    performance can be increased by more than a factor of 50.
1680 
1681    Collective on MPI_Comm
1682 
1683    Input Parameters:
1684 +  comm - MPI communicator
1685 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1686            This value should be the same as the local size used in creating the
1687            y vector for the matrix-vector product y = Ax.
1688 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given).
1689        This value should be the same as the local size used in creating the
1690        x vector for the matrix-vector product y = Ax. For square matrices n is almost always m.
1691 .  M - number of global rows (or PETSC_DETERMINE to have calculated if m is given)
1692 .  N - number of global columns (or PETSC_DETERMINE to have calculated if n is given)
1693 .  d_nz  - number of nonzeros per row in DIAGONAL portion of local submatrix
1694            (same value is used for all local rows)
1695 .  d_nnz - array containing the number of nonzeros in the various rows of the
1696            DIAGONAL portion of the local submatrix (possibly different for each row)
1697            or PETSC_NULL, if d_nz is used to specify the nonzero structure.
1698            The size of this array is equal to the number of local rows, i.e., 'm'.
1699            You must leave room for the diagonal entry even if it is zero.
1700 .  o_nz  - number of nonzeros per row in the OFF-DIAGONAL portion of local
1701            submatrix (same value is used for all local rows).
1702 -  o_nnz - array containing the number of nonzeros in the various rows of the
1703            OFF-DIAGONAL portion of the local submatrix (possibly different for
1704            each row) or PETSC_NULL, if o_nz is used to specify the nonzero
1705            structure. The size of this array is equal to the number
1706            of local rows, i.e., 'm'.
1707 
1708    Output Parameter:
1709 .  A - the matrix
1710 
1711    Notes:
1712    m,n,M,N parameters specify the size of the matrix, and its partitioning across
1713    processors, while d_nz,d_nnz,o_nz,o_nnz parameters specify the approximate
1714    storage requirements for this matrix.
1715 
1716    If PETSC_DECIDE or PETSC_DETERMINE is used for a particular argument on one
1717    processor then it must be used on all processors that share the object for
1718    that argument.
1719 
1720    The AIJ format (also called the Yale sparse matrix format or
1721    compressed row storage), is fully compatible with standard Fortran 77
1722    storage.  That is, the stored row and column indices can begin at
1723    either one (as in Fortran) or zero.  See the users manual for details.
1724 
1725    The user MUST specify either the local or global matrix dimensions
1726    (possibly both).
1727 
1728    The parallel matrix is partitioned such that the first m0 rows belong to
1729    process 0, the next m1 rows belong to process 1, the next m2 rows belong
1730    to process 2, etc., where m0,m1,m2,... are the values of the input parameter 'm' on each process.
1731 
1732    The DIAGONAL portion of the local submatrix of a processor can be defined
1733    as the submatrix which is obtained by extracting the part corresponding
1734    to the rows r1-r2 and columns r1-r2 of the global matrix, where r1 is the
1735    first row that belongs to the processor, and r2 is the last row belonging
1736    to this processor. This is a square mxm matrix. The remaining portion
1737    of the local submatrix (mxN) constitutes the OFF-DIAGONAL portion.
1738 
1739    If o_nnz, d_nnz are specified, then o_nz, and d_nz are ignored.
1740 
1741    By default, this format uses inodes (identical nodes) when possible.
1742    We search for consecutive rows with the same nonzero structure, thereby
1743    reusing matrix information to achieve increased efficiency.
1744 
1745    Options Database Keys:
1746 +  -mat_aij_no_inode  - Do not use inodes
1747 .  -mat_aij_inode_limit <limit> - Sets inode limit (max limit=5)
1748 .  -mat_aij_oneindex - Internally use indexing starting at 1
1749         rather than 0.  Note that when calling MatSetValues(),
1750         the user still MUST index entries starting at 0!
1751 -  -mat_mpi - use the parallel matrix data structures even on one processor
1752                (defaults to using SeqAIJ format on one processor)
1755 
1756 
1757    Example usage:
1758 
1759    Consider the following 8x8 matrix with 34 nonzero values, which is
1760    assembled across 3 processors. Let's assume that proc0 owns 3 rows,
1761    proc1 owns 3 rows, and proc2 owns 2 rows. This division can be shown
1762    as follows:
1763 
1764 .vb
1765             1  2  0  |  0  3  0  |  0  4
1766     Proc0   0  5  6  |  7  0  0  |  8  0
1767             9  0 10  | 11  0  0  | 12  0
1768     -------------------------------------
1769            13  0 14  | 15 16 17  |  0  0
1770     Proc1   0 18  0  | 19 20 21  |  0  0
1771             0  0  0  | 22 23  0  | 24  0
1772     -------------------------------------
1773     Proc2  25 26 27  |  0  0 28  | 29  0
1774            30  0  0  | 31 32 33  |  0 34
1775 .ve
1776 
1777    This can be represented as a collection of submatrices as:
1778 
1779 .vb
1780       A B C
1781       D E F
1782       G H I
1783 .ve
1784 
1785    Where the submatrices A,B,C are owned by proc0, D,E,F are
1786    owned by proc1, G,H,I are owned by proc2.
1787 
1788    The 'm' parameters for proc0,proc1,proc2 are 3,3,2 respectively.
1789    The 'n' parameters for proc0,proc1,proc2 are 3,3,2 respectively.
1790    The 'M','N' parameters are 8,8, and have the same values on all procs.
1791 
1792    The DIAGONAL submatrices corresponding to proc0,proc1,proc2 are
1793    submatrices [A], [E], [I] respectively. The OFF-DIAGONAL submatrices
1794    corresponding to proc0,proc1,proc2 are [BC], [DF], [GH] respectively.
1795    Internally, each processor stores the DIAGONAL part and the OFF-DIAGONAL
1796    part as separate SeqAIJ matrices. For example, proc1 will store [E] as a SeqAIJ
1797    matrix, and [DF] as another SeqAIJ matrix.
1798 
1799    When the d_nz, o_nz parameters are specified, d_nz storage elements are
1800    allocated for every row of the local DIAGONAL submatrix, and o_nz
1801    storage locations are allocated for every row of the OFF-DIAGONAL submatrix.
1802    One way to choose d_nz and o_nz is to use the maximum number of nonzeros in
1803    any local row of the local DIAGONAL and OFF-DIAGONAL submatrices, respectively.
1804    In this case, the values of d_nz,o_nz are:
1805 .vb
1806      proc0 : d_nz = 2, o_nz = 2
1807      proc1 : d_nz = 3, o_nz = 2
1808      proc2 : d_nz = 1, o_nz = 4
1809 .ve
1810    We are allocating m*(d_nz+o_nz) storage locations for every proc. This
1811    translates to 3*(2+2)=12 for proc0, 3*(3+2)=15 for proc1, and 2*(1+4)=10
1812    for proc2, i.e., we are using 12+15+10=37 storage locations to store
1813    34 values.
1814 
1815    When the d_nnz, o_nnz parameters are specified, the storage is specified
1816    for every row, corresponding to both the DIAGONAL and OFF-DIAGONAL submatrices.
1817    In the above case the values for d_nnz,o_nnz are:
1818 .vb
1819      proc0: d_nnz = [2,2,2] and o_nnz = [2,2,2]
1820      proc1: d_nnz = [3,3,2] and o_nnz = [2,1,1]
1821      proc2: d_nnz = [1,1]   and o_nnz = [4,4]
1822 .ve
1823    Here the space allocated is the sum of all the above values, i.e., 34, and
1824    hence the preallocation is perfect.
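
   For example, with the layout above proc0 could create its piece of the matrix
   with a call along the following lines (a sketch only: 'comm' is assumed to be
   declared elsewhere, and the d_nz/o_nz arguments are passed as 0 since they are
   ignored when the d_nnz/o_nnz arrays are given):

.vb
     int ierr, d_nnz[3] = {2,2,2}, o_nnz[3] = {2,2,2};
     Mat A;
     ierr = MatCreateMPIAIJ(comm,3,3,8,8,0,d_nnz,0,o_nnz,&A); CHKERRQ(ierr);
.ve
   The other processes would make the same call with their own m, d_nnz, and o_nnz values.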
1825 
1826 .keywords: matrix, aij, compressed row, sparse, parallel
1827 
1828 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1829 @*/
1830 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1831 {
1832   Mat          B;
1833   Mat_MPIAIJ   *b;
1834   int          ierr, i,sum[2],work[2],size,flag1 = 0, flag2 = 0;
1835 
1836   PetscFunctionBegin;
1837   MPI_Comm_size(comm,&size);
1838   ierr = OptionsHasName(PETSC_NULL,"-mat_mpiaij",&flag1); CHKERRQ(ierr);
1839   ierr = OptionsHasName(PETSC_NULL,"-mat_mpi",&flag2); CHKERRQ(ierr);
1840   if (!flag1 && !flag2 && size == 1) {
1841     if (M == PETSC_DECIDE) M = m;
1842     if (N == PETSC_DECIDE) N = n;
1843     ierr = MatCreateSeqAIJ(comm,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
1844     PetscFunctionReturn(0);
1845   }
1846 
1847   *A = 0;
1848   PetscHeaderCreate(B,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIAIJ,comm,MatDestroy,MatView);
1849   PLogObjectCreate(B);
1850   B->data            = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
1851   PetscMemzero(b,sizeof(Mat_MPIAIJ));
1852   PetscMemcpy(B->ops,&MatOps_Values,sizeof(struct _MatOps));
1853   B->ops->destroy    = MatDestroy_MPIAIJ;
1854   B->ops->view       = MatView_MPIAIJ;
1855   B->factor          = 0;
1856   B->assembled       = PETSC_FALSE;
1857   B->mapping         = 0;
1858 
1859   B->insertmode      = NOT_SET_VALUES;
1860   b->size            = size;
1861   MPI_Comm_rank(comm,&b->rank);
1862 
1863   if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
1864     SETERRQ(PETSC_ERR_ARG_WRONG,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1865   }
1866 
1867   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1868     work[0] = m; work[1] = n;
1869     ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
1870     if (M == PETSC_DECIDE) M = sum[0];
1871     if (N == PETSC_DECIDE) N = sum[1];
1872   }
1873   if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
1874   if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
1875   b->m = m; B->m = m;
1876   b->n = n; B->n = n;
1877   b->N = N; B->N = N;
1878   b->M = M; B->M = M;
1879 
1880   /* the information in the maps duplicates the information computed below, eventually
1881      we should remove the duplicate information that is not contained in the maps */
1882   ierr = MapCreateMPI(comm,m,M,&B->rmap);CHKERRQ(ierr);
1883   ierr = MapCreateMPI(comm,n,N,&B->cmap);CHKERRQ(ierr);
1884 
1885   /* build local table of row and column ownerships */
1886   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1887   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIAIJ));
1888   b->cowners = b->rowners + b->size + 2;
1889   ierr = MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
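  /* rowners[] starts as the per-process local row counts gathered above; after the
     prefix sum below, rowners[i] is the first global row owned by process i */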
1890   b->rowners[0] = 0;
1891   for ( i=2; i<=b->size; i++ ) {
1892     b->rowners[i] += b->rowners[i-1];
1893   }
1894   b->rstart = b->rowners[b->rank];
1895   b->rend   = b->rowners[b->rank+1];
1896   ierr = MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1897   b->cowners[0] = 0;
1898   for ( i=2; i<=b->size; i++ ) {
1899     b->cowners[i] += b->cowners[i-1];
1900   }
1901   b->cstart = b->cowners[b->rank];
1902   b->cend   = b->cowners[b->rank+1];
1903 
1904   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1905   ierr = MatCreateSeqAIJ(PETSC_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1906   PLogObjectParent(B,b->A);
1907   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1908   ierr = MatCreateSeqAIJ(PETSC_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1909   PLogObjectParent(B,b->B);
1910 
1911   /* build cache (stash) for off-processor entries formed during assembly */
1912   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1913   b->donotstash  = 0;
1914   b->colmap      = 0;
1915   b->garray      = 0;
1916   b->roworiented = 1;
1917 
1918   /* stuff used for matrix vector multiply */
1919   b->lvec      = 0;
1920   b->Mvctx     = 0;
1921 
1922   /* stuff for MatGetRow() */
1923   b->rowindices   = 0;
1924   b->rowvalues    = 0;
1925   b->getrowactive = PETSC_FALSE;
1926 
1927   ierr = PetscObjectComposeFunction((PetscObject)B,"MatStoreValues_C",
1928                                      "MatStoreValues_MPIAIJ",
1929                                      (void*)MatStoreValues_MPIAIJ);CHKERRQ(ierr);
1930   ierr = PetscObjectComposeFunction((PetscObject)B,"MatRetrieveValues_C",
1931                                      "MatRetrieveValues_MPIAIJ",
1932                                      (void*)MatRetrieveValues_MPIAIJ);CHKERRQ(ierr);
1933   *A = B;
1934   PetscFunctionReturn(0);
1935 }
1936 
1937 #undef __FUNC__
1938 #define __FUNC__ "MatDuplicate_MPIAIJ"
1939 int MatDuplicate_MPIAIJ(Mat matin,MatDuplicateOption cpvalues,Mat *newmat)
1940 {
1941   Mat        mat;
1942   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1943   int        ierr, len=0, flg;
1944 
1945   PetscFunctionBegin;
1946   *newmat       = 0;
1947   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIAIJ,matin->comm,MatDestroy,MatView);
1948   PLogObjectCreate(mat);
1949   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1950   PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));
1951   mat->ops->destroy    = MatDestroy_MPIAIJ;
1952   mat->ops->view       = MatView_MPIAIJ;
1953   mat->factor     = matin->factor;
1954   mat->assembled  = PETSC_TRUE;
1955 
1956   a->m = mat->m   = oldmat->m;
1957   a->n = mat->n   = oldmat->n;
1958   a->M = mat->M   = oldmat->M;
1959   a->N = mat->N   = oldmat->N;
1960 
1961   a->rstart       = oldmat->rstart;
1962   a->rend         = oldmat->rend;
1963   a->cstart       = oldmat->cstart;
1964   a->cend         = oldmat->cend;
1965   a->size         = oldmat->size;
1966   a->rank         = oldmat->rank;
1967   mat->insertmode = NOT_SET_VALUES;
1968   a->rowvalues    = 0;
1969   a->getrowactive = PETSC_FALSE;
1970 
1971   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1972   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIAIJ));
1973   a->cowners = a->rowners + a->size + 2;
1974   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1975   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1976   if (oldmat->colmap) {
1977     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1978     PLogObjectMemory(mat,(a->N)*sizeof(int));
1979     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1980   } else a->colmap = 0;
1981   if (oldmat->garray) {
1982     len = ((Mat_SeqAIJ *) (oldmat->B->data))->n;
1983     a->garray = (int *) PetscMalloc((len+1)*sizeof(int)); CHKPTRQ(a->garray);
1984     PLogObjectMemory(mat,len*sizeof(int));
1985     if (len) PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1986   } else a->garray = 0;
1987 
1988   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1989   PLogObjectParent(mat,a->lvec);
1990   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1991   PLogObjectParent(mat,a->Mvctx);
1992   ierr =  MatDuplicate(oldmat->A,cpvalues,&a->A); CHKERRQ(ierr);
1993   PLogObjectParent(mat,a->A);
1994   ierr =  MatDuplicate(oldmat->B,cpvalues,&a->B); CHKERRQ(ierr);
1995   PLogObjectParent(mat,a->B);
1996   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1997   if (flg) {
1998     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1999   }
2000   *newmat = mat;
2001   PetscFunctionReturn(0);
2002 }
2003 
2004 #include "sys.h"
2005 
2006 #undef __FUNC__
2007 #define __FUNC__ "MatLoad_MPIAIJ"
2008 int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
2009 {
2010   Mat          A;
2011   Scalar       *vals,*svals;
2012   MPI_Comm     comm = ((PetscObject)viewer)->comm;
2013   MPI_Status   status;
2014   int          i, nz, ierr, j,rstart, rend, fd;
2015   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
2016   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
2017   int          tag = ((PetscObject)viewer)->tag,cend,cstart,n;
2018 
2019   PetscFunctionBegin;
2020   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
2021   if (!rank) {
2022     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
2023     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
2024     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
2025     if (header[3] < 0) {
2026       SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix in special format on disk, cannot load as MPIAIJ");
2027     }
2028   }
2029 
2030   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
2031   M = header[1]; N = header[2];
2032   /* determine ownership of all rows */
2033   m = M/size + ((M % size) > rank);
2034   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
2035   ierr = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
2036   rowners[0] = 0;
2037   for ( i=2; i<=size; i++ ) {
2038     rowners[i] += rowners[i-1];
2039   }
2040   rstart = rowners[rank];
2041   rend   = rowners[rank+1];
2042 
2043   /* distribute row lengths to all processors */
2044   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
2045   offlens = ourlens + (rend-rstart);
2046   if (!rank) {
2047     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
2048     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
2049     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
2050     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
2051     ierr = MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);CHKERRQ(ierr);
2052     PetscFree(sndcounts);
2053   } else {
2054     ierr = MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);CHKERRQ(ierr);
2055   }
2056 
2057   if (!rank) {
2058     /* calculate the number of nonzeros on each processor */
2059     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
2060     PetscMemzero(procsnz,size*sizeof(int));
2061     for ( i=0; i<size; i++ ) {
2062       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
2063         procsnz[i] += rowlengths[j];
2064       }
2065     }
2066     PetscFree(rowlengths);
2067 
2068     /* determine max buffer needed and allocate it */
2069     maxnz = 0;
2070     for ( i=0; i<size; i++ ) {
2071       maxnz = PetscMax(maxnz,procsnz[i]);
2072     }
2073     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
2074 
2075     /* read in my part of the matrix column indices  */
2076     nz     = procsnz[0];
2077     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
2078     ierr   = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
2079 
2080     /* read in everyone else's column indices and ship them off */
2081     for ( i=1; i<size; i++ ) {
2082       nz   = procsnz[i];
2083       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
2084       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
2085     }
2086     PetscFree(cols);
2087   } else {
2088     /* determine buffer space needed for message */
2089     nz = 0;
2090     for ( i=0; i<m; i++ ) {
2091       nz += ourlens[i];
2092     }
2093     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
2094 
2095     /* receive message of column indices */
2096     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
2097     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
2098     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2099   }
2100 
2101   /* determine column ownership if matrix is not square */
2102   if (N != M) {
2103     n      = N/size + ((N % size) > rank);
2104     ierr   = MPI_Scan(&n,&cend,1,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
2105     cstart = cend - n;
2106   } else {
2107     cstart = rstart;
2108     cend   = rend;
2109     n      = cend - cstart;
2110   }
2111 
2112   /* loop over local rows, determining number of off diagonal entries */
2113   PetscMemzero(offlens,m*sizeof(int));
2114   jj = 0;
2115   for ( i=0; i<m; i++ ) {
2116     for ( j=0; j<ourlens[i]; j++ ) {
2117       if (mycols[jj] < cstart || mycols[jj] >= cend) offlens[i]++;
2118       jj++;
2119     }
2120   }
2121 
2122   /* create our matrix */
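  /* ourlens[] currently holds the total number of entries in each local row; subtract
     the off-diagonal counts so ourlens/offlens give the diagonal/off-diagonal
     preallocation, then restore the full row lengths afterwards for the
     MatSetValues() loops below */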
2123   for ( i=0; i<m; i++ ) {
2124     ourlens[i] -= offlens[i];
2125   }
2126   ierr = MatCreateMPIAIJ(comm,m,n,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
2127   A = *newmat;
2128   ierr = MatSetOption(A,MAT_COLUMNS_SORTED); CHKERRQ(ierr);
2129   for ( i=0; i<m; i++ ) {
2130     ourlens[i] += offlens[i];
2131   }
2132 
2133   if (!rank) {
2134     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
2135 
2136     /* read in my part of the matrix numerical values  */
2137     nz   = procsnz[0];
2138     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2139 
2140     /* insert into matrix */
2141     jj      = rstart;
2142     smycols = mycols;
2143     svals   = vals;
2144     for ( i=0; i<m; i++ ) {
2145       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
2146       smycols += ourlens[i];
2147       svals   += ourlens[i];
2148       jj++;
2149     }
2150 
2151     /* read in other processors and ship out */
2152     for ( i=1; i<size; i++ ) {
2153       nz   = procsnz[i];
2154       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
2155       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
2156     }
2157     PetscFree(procsnz);
2158   } else {
2159     /* receive numeric values */
2160     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
2161 
2162     /* receive message of values */
2163     ierr = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
2164     ierr = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
2165     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
2166 
2167     /* insert into matrix */
2168     jj      = rstart;
2169     smycols = mycols;
2170     svals   = vals;
2171     for ( i=0; i<m; i++ ) {
2172       ierr     = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
2173       smycols += ourlens[i];
2174       svals   += ourlens[i];
2175       jj++;
2176     }
2177   }
2178   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
2179 
2180   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2181   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2182   PetscFunctionReturn(0);
2183 }
2184 
2185 #undef __FUNC__
2186 #define __FUNC__ "MatGetSubMatrix_MPIAIJ"
2187 /*
2188     Not great since it makes two copies of the submatrix: first a SeqAIJ copy
2189   on each process, and then the end result formed by concatenating the local
2190   matrices. Writing it directly would be much like MatGetSubMatrices_MPIAIJ().
2191 */
2192 int MatGetSubMatrix_MPIAIJ(Mat mat,IS isrow,IS iscol,int csize,MatGetSubMatrixCall call,Mat *newmat)
2193 {
2194   int        ierr, i, m,n,rstart,row,rend,nz,*cwork,size,rank,j;
2195   Mat        *local,M, Mreuse;
2196   Scalar     *vwork,*aa;
2197   MPI_Comm   comm = mat->comm;
2198   Mat_SeqAIJ *aij;
2199   int        *ii, *jj,nlocal,*dlens,*olens,dlen,olen,jend;
2200 
2201   PetscFunctionBegin;
2202   MPI_Comm_rank(comm,&rank);
2203   MPI_Comm_size(comm,&size);
2204 
2205   if (call ==  MAT_REUSE_MATRIX) {
2206     ierr = PetscObjectQuery((PetscObject)*newmat,"SubMatrix",(PetscObject *)&Mreuse);CHKERRQ(ierr);
2207     if (!Mreuse) SETERRQ(1,1,"Submatrix passed in was not used before, cannot reuse");
2208     local = &Mreuse;
2209     ierr  = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_REUSE_MATRIX,&local);CHKERRQ(ierr);
2210   } else {
2211     ierr = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_INITIAL_MATRIX,&local);CHKERRQ(ierr);
2212     Mreuse = *local;
2213     PetscFree(local);
2214   }
2215 
2216   /*
2217       m - number of local rows
2218       n - number of columns (same on all processors)
2219       rstart - first row in new global matrix generated
2220   */
2221   ierr = MatGetSize(Mreuse,&m,&n);CHKERRQ(ierr);
2222   if (call == MAT_INITIAL_MATRIX) {
2223     aij = (Mat_SeqAIJ *) (Mreuse)->data;
2224     if (aij->indexshift) SETERRQ(PETSC_ERR_SUP,1,"No support for index shifted matrix");
2225     ii  = aij->i;
2226     jj  = aij->j;
2227 
2228     /*
2229         Determine the number of non-zeros in the diagonal and off-diagonal
2230         portions of the matrix in order to do correct preallocation
2231     */
2232 
2233     /* first get start and end of "diagonal" columns */
2234     if (csize == PETSC_DECIDE) {
2235       nlocal = n/size + ((n % size) > rank);
2236     } else {
2237       nlocal = csize;
2238     }
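    /* the prefix sum (MPI_Scan) of the local column counts gives the end of this
       process's "diagonal" column range; the start follows by subtracting nlocal */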
2239     ierr   = MPI_Scan(&nlocal,&rend,1,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
2240     rstart = rend - nlocal;
2241     if (rank == size - 1 && rend != n) {
2242       SETERRQ(1,1,"Local column sizes do not add up to total number of columns");
2243     }
2244 
2245     /* next, compute all the lengths */
2246     dlens = (int *) PetscMalloc( (2*m+1)*sizeof(int) );CHKPTRQ(dlens);
2247     olens = dlens + m;
2248     for ( i=0; i<m; i++ ) {
2249       jend = ii[i+1] - ii[i];
2250       olen = 0;
2251       dlen = 0;
2252       for ( j=0; j<jend; j++ ) {
2253         if ( *jj < rstart || *jj >= rend) olen++;
2254         else dlen++;
2255         jj++;
2256       }
2257       olens[i] = olen;
2258       dlens[i] = dlen;
2259     }
2260     ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,PETSC_DECIDE,n,0,dlens,0,olens,&M);CHKERRQ(ierr);
2261     PetscFree(dlens);
2262   } else {
2263     int ml,nl;
2264 
2265     M = *newmat;
2266     ierr = MatGetLocalSize(M,&ml,&nl);CHKERRQ(ierr);
2267     if (ml != m) SETERRQ(PETSC_ERR_ARG_SIZ,1,"Previous matrix must be same size/layout as request");
2268     ierr = MatZeroEntries(M);CHKERRQ(ierr);
2269     /*
2270          The next two lines are needed so we may call MatSetValues_MPIAIJ() below directly,
2271        rather than the slower MatSetValues().
2272     */
2273     M->was_assembled = PETSC_TRUE;
2274     M->assembled     = PETSC_FALSE;
2275   }
2276   ierr = MatGetOwnershipRange(M,&rstart,&rend); CHKERRQ(ierr);
2277   aij = (Mat_SeqAIJ *) (Mreuse)->data;
2278   if (aij->indexshift) SETERRQ(PETSC_ERR_SUP,1,"No support for index shifted matrix");
2279   ii  = aij->i;
2280   jj  = aij->j;
2281   aa  = aij->a;
2282   for (i=0; i<m; i++) {
2283     row   = rstart + i;
2284     nz    = ii[i+1] - ii[i];
2285     cwork = jj;     jj += nz;
2286     vwork = aa;     aa += nz;
2287     ierr = MatSetValues_MPIAIJ(M,1,&row,nz,cwork,vwork,INSERT_VALUES); CHKERRQ(ierr);
2288   }
2289 
2290   ierr = MatAssemblyBegin(M,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2291   ierr = MatAssemblyEnd(M,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
2292   *newmat = M;
2293 
2294   /* save submatrix used in processor for next request */
2295   if (call ==  MAT_INITIAL_MATRIX) {
2296     ierr = PetscObjectCompose((PetscObject)M,"SubMatrix",(PetscObject)Mreuse);CHKERRQ(ierr);
2297     ierr = PetscObjectDereference((PetscObject)Mreuse);CHKERRQ(ierr);
2298   }
2299 
2300   PetscFunctionReturn(0);
2301 }
2302