xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision 70423df42639036d7617fd1625ff853510ae094b)
1 #ifndef lint
2 static char vcid[] = "$Id: mpiaij.c,v 1.145 1996/06/27 02:49:15 bsmith Exp bsmith $";
3 #endif
4 
5 #include "mpiaij.h"
6 #include "src/vec/vecimpl.h"
7 #include "src/inline/spops.h"
8 
9 /* local utility routine that creates a mapping from the global column
10 number to the local number in the off-diagonal part of the local
11 storage of the matrix.  This is done in a non scable way since the
12 length of colmap equals the global matrix length.
13 */
14 static int CreateColmap_Private(Mat mat)
15 {
16   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
17   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
18   int        n = B->n,i,shift = B->indexshift;
19 
20   aij->colmap = (int *) PetscMalloc(aij->N*sizeof(int));CHKPTRQ(aij->colmap);
21   PLogObjectMemory(mat,aij->N*sizeof(int));
22   PetscMemzero(aij->colmap,aij->N*sizeof(int));
23   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i-shift;
24   return 0;
25 }
26 
27 extern int DisAssemble_MPIAIJ(Mat);
28 
29 static int MatGetReordering_MPIAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
30 {
31   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
32   int        ierr;
33   if (aij->size == 1) {
34     ierr = MatGetReordering(aij->A,type,rperm,cperm); CHKERRQ(ierr);
35   } else SETERRQ(1,"MatGetReordering_MPIAIJ:not supported in parallel");
36   return 0;
37 }
38 
39 static int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
40 {
41   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
42   Mat_SeqAIJ *C = (Mat_SeqAIJ*) aij->A->data;
43   Scalar     value;
44   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
45   int        cstart = aij->cstart, cend = aij->cend,row,col;
46   int        shift = C->indexshift,roworiented = aij->roworiented;
47 
48   if (aij->insertmode != NOT_SET_VALUES && aij->insertmode != addv) {
49     SETERRQ(1,"MatSetValues_MPIAIJ:Cannot mix inserts and adds");
50   }
51   aij->insertmode = addv;
52   for ( i=0; i<m; i++ ) {
53     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative row");
54     if (im[i] >= aij->M) SETERRQ(1,"MatSetValues_MPIAIJ:Row too large");
55     if (im[i] >= rstart && im[i] < rend) {
56       row = im[i] - rstart;
57       for ( j=0; j<n; j++ ) {
58         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative column");
59         if (in[j] >= aij->N) SETERRQ(1,"MatSetValues_MPIAIJ:Col too large");
60         if (in[j] >= cstart && in[j] < cend){
61           col = in[j] - cstart;
62           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
63           ierr = MatSetValues(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
64         }
65         else {
66           if (mat->was_assembled) {
67             if (!aij->colmap) {ierr = CreateColmap_Private(mat);CHKERRQ(ierr);}
68             col = aij->colmap[in[j]] + shift;
69             if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
70               ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
71               col =  in[j];
72             }
73           }
74           else col = in[j];
75           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
76           ierr = MatSetValues(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
77         }
78       }
79     }
80     else {
81       if (roworiented) {
82         ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
83       }
84       else {
85         row = im[i];
86         for ( j=0; j<n; j++ ) {
87           ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
88         }
89       }
90     }
91   }
92   return 0;
93 }
94 
95 static int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
96 {
97   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
98   Mat_SeqAIJ *C = (Mat_SeqAIJ*) aij->A->data;
99   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
100   int        cstart = aij->cstart, cend = aij->cend,row,col;
101   int        shift = C->indexshift;
102 
103   for ( i=0; i<m; i++ ) {
104     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative row");
105     if (idxm[i] >= aij->M) SETERRQ(1,"MatGetValues_MPIAIJ:Row too large");
106     if (idxm[i] >= rstart && idxm[i] < rend) {
107       row = idxm[i] - rstart;
108       for ( j=0; j<n; j++ ) {
109         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative column");
110         if (idxn[j] >= aij->N) SETERRQ(1,"MatGetValues_MPIAIJ:Col too large");
111         if (idxn[j] >= cstart && idxn[j] < cend){
112           col = idxn[j] - cstart;
113           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
114         }
115         else {
116           if (!aij->colmap) {ierr = CreateColmap_Private(mat);CHKERRQ(ierr);}
117           col = aij->colmap[idxn[j]] + shift;
118           ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
119         }
120       }
121     }
122     else {
123       SETERRQ(1,"MatGetValues_MPIAIJ:Only local values currently supported");
124     }
125   }
126   return 0;
127 }
128 
129 static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
130 {
131   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
132   MPI_Comm    comm = mat->comm;
133   int         size = aij->size, *owners = aij->rowners;
134   int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
135   MPI_Request *send_waits,*recv_waits;
136   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
137   InsertMode  addv;
138   Scalar      *rvalues,*svalues;
139 
140   /* make sure all processors are either in INSERTMODE or ADDMODE */
141   MPI_Allreduce(&aij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
142   if (addv == (ADD_VALUES|INSERT_VALUES)) {
143     SETERRQ(1,"MatAssemblyBegin_MPIAIJ:Some processors inserted others added");
144   }
145   aij->insertmode = addv; /* in case this processor had no cache */
146 
147   /*  first count number of contributors to each processor */
148   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
149   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
150   owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
151   for ( i=0; i<aij->stash.n; i++ ) {
152     idx = aij->stash.idx[i];
153     for ( j=0; j<size; j++ ) {
154       if (idx >= owners[j] && idx < owners[j+1]) {
155         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
156       }
157     }
158   }
159   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
160 
161   /* inform other processors of number of messages and max length*/
162   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
163   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
164   nreceives = work[rank];
165   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
166   nmax = work[rank];
167   PetscFree(work);
168 
169   /* post receives:
170        1) each message will consist of ordered pairs
171      (global index,value) we store the global index as a double
172      to simplify the message passing.
173        2) since we don't know how long each individual message is we
174      allocate the largest needed buffer for each receive. Potentially
175      this is a lot of wasted space.
176 
177 
178        This could be done better.
179   */
180   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
181   CHKPTRQ(rvalues);
182   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
183   CHKPTRQ(recv_waits);
184   for ( i=0; i<nreceives; i++ ) {
185     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
186               comm,recv_waits+i);
187   }
188 
189   /* do sends:
190       1) starts[i] gives the starting index in svalues for stuff going to
191          the ith processor
192   */
193   svalues = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
194   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
195   CHKPTRQ(send_waits);
196   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
197   starts[0] = 0;
198   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
199   for ( i=0; i<aij->stash.n; i++ ) {
200     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
201     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
202     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
203   }
204   PetscFree(owner);
205   starts[0] = 0;
206   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
207   count = 0;
208   for ( i=0; i<size; i++ ) {
209     if (procs[i]) {
210       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
211                 comm,send_waits+count++);
212     }
213   }
214   PetscFree(starts); PetscFree(nprocs);
215 
216   /* Free cache space */
217   PLogInfo(0,"[%d]MatAssemblyBegin_MPIAIJ:Number of off processor values %d\n",rank,aij->stash.n);
218   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
219 
220   aij->svalues    = svalues;    aij->rvalues    = rvalues;
221   aij->nsends     = nsends;     aij->nrecvs     = nreceives;
222   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
223   aij->rmax       = nmax;
224 
225   return 0;
226 }
227 extern int MatSetUpMultiply_MPIAIJ(Mat);
228 
229 static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
230 {
231   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
232   Mat_SeqAIJ *C = (Mat_SeqAIJ *) aij->A->data;
233   MPI_Status  *send_status,recv_status;
234   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
235   int         row,col,other_disassembled,shift = C->indexshift;
236   Scalar      *values,val;
237   InsertMode  addv = aij->insertmode;
238 
239   /*  wait on receives */
240   while (count) {
241     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
242     /* unpack receives into our local space */
243     values = aij->rvalues + 3*imdex*aij->rmax;
244     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
245     n = n/3;
246     for ( i=0; i<n; i++ ) {
247       row = (int) PetscReal(values[3*i]) - aij->rstart;
248       col = (int) PetscReal(values[3*i+1]);
249       val = values[3*i+2];
250       if (col >= aij->cstart && col < aij->cend) {
251         col -= aij->cstart;
252         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
253       }
254       else {
255         if (mat->was_assembled) {
256           if (!aij->colmap) {ierr = CreateColmap_Private(mat);CHKERRQ(ierr);}
257           col = aij->colmap[col] + shift;
258           if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
259             ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
260             col = (int) PetscReal(values[3*i+1]);
261           }
262         }
263         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
264       }
265     }
266     count--;
267   }
268   PetscFree(aij->recv_waits); PetscFree(aij->rvalues);
269 
270   /* wait on sends */
271   if (aij->nsends) {
272     send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));
273     CHKPTRQ(send_status);
274     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
275     PetscFree(send_status);
276   }
277   PetscFree(aij->send_waits); PetscFree(aij->svalues);
278 
279   aij->insertmode = NOT_SET_VALUES;
280   ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
281   ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);
282 
283   /* determine if any processor has disassembled, if so we must
284      also disassemble ourselfs, in order that we may reassemble. */
285   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
286   if (mat->was_assembled && !other_disassembled) {
287     ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
288   }
289 
290   if (!mat->was_assembled && mode == FINAL_ASSEMBLY) {
291     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
292   }
293   ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
294   ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);
295 
296   if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
297   return 0;
298 }
299 
300 static int MatZeroEntries_MPIAIJ(Mat A)
301 {
302   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
303   int        ierr;
304   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
305   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
306   return 0;
307 }
308 
309 /* the code does not do the diagonal entries correctly unless the
310    matrix is square and the column and row owerships are identical.
311    This is a BUG. The only way to fix it seems to be to access
312    aij->A and aij->B directly and not through the MatZeroRows()
313    routine.
314 */
315 static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
316 {
317   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
318   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
319   int            *procs,*nprocs,j,found,idx,nsends,*work;
320   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
321   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
322   int            *lens,imdex,*lrows,*values;
323   MPI_Comm       comm = A->comm;
324   MPI_Request    *send_waits,*recv_waits;
325   MPI_Status     recv_status,*send_status;
326   IS             istmp;
327 
328   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
329   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
330 
331   /*  first count number of contributors to each processor */
332   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
333   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
334   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
335   for ( i=0; i<N; i++ ) {
336     idx = rows[i];
337     found = 0;
338     for ( j=0; j<size; j++ ) {
339       if (idx >= owners[j] && idx < owners[j+1]) {
340         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
341       }
342     }
343     if (!found) SETERRQ(1,"MatZeroRows_MPIAIJ:Index out of range");
344   }
345   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
346 
347   /* inform other processors of number of messages and max length*/
348   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
349   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
350   nrecvs = work[rank];
351   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
352   nmax = work[rank];
353   PetscFree(work);
354 
355   /* post receives:   */
356   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
357   CHKPTRQ(rvalues);
358   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
359   CHKPTRQ(recv_waits);
360   for ( i=0; i<nrecvs; i++ ) {
361     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
362   }
363 
364   /* do sends:
365       1) starts[i] gives the starting index in svalues for stuff going to
366          the ith processor
367   */
368   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
369   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
370   CHKPTRQ(send_waits);
371   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
372   starts[0] = 0;
373   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
374   for ( i=0; i<N; i++ ) {
375     svalues[starts[owner[i]]++] = rows[i];
376   }
377   ISRestoreIndices(is,&rows);
378 
379   starts[0] = 0;
380   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
381   count = 0;
382   for ( i=0; i<size; i++ ) {
383     if (procs[i]) {
384       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
385     }
386   }
387   PetscFree(starts);
388 
389   base = owners[rank];
390 
391   /*  wait on receives */
392   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
393   source = lens + nrecvs;
394   count  = nrecvs; slen = 0;
395   while (count) {
396     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
397     /* unpack receives into our local space */
398     MPI_Get_count(&recv_status,MPI_INT,&n);
399     source[imdex]  = recv_status.MPI_SOURCE;
400     lens[imdex]  = n;
401     slen += n;
402     count--;
403   }
404   PetscFree(recv_waits);
405 
406   /* move the data into the send scatter */
407   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
408   count = 0;
409   for ( i=0; i<nrecvs; i++ ) {
410     values = rvalues + i*nmax;
411     for ( j=0; j<lens[i]; j++ ) {
412       lrows[count++] = values[j] - base;
413     }
414   }
415   PetscFree(rvalues); PetscFree(lens);
416   PetscFree(owner); PetscFree(nprocs);
417 
418   /* actually zap the local rows */
419   ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
420   PLogObjectParent(A,istmp);
421   PetscFree(lrows);
422   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
423   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
424   ierr = ISDestroy(istmp); CHKERRQ(ierr);
425 
426   /* wait on sends */
427   if (nsends) {
428     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
429     CHKPTRQ(send_status);
430     MPI_Waitall(nsends,send_waits,send_status);
431     PetscFree(send_status);
432   }
433   PetscFree(send_waits); PetscFree(svalues);
434 
435   return 0;
436 }
437 
438 static int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
439 {
440   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
441   int        ierr;
442 
443   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
444   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
445   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
446   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
447   return 0;
448 }
449 
450 static int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
451 {
452   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
453   int        ierr;
454   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
455   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
456   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
457   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
458   return 0;
459 }
460 
461 static int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
462 {
463   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
464   int        ierr;
465 
466   /* do nondiagonal part */
467   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
468   /* send it on its way */
469   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
470                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
471   /* do local part */
472   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
473   /* receive remote parts: note this assumes the values are not actually */
474   /* inserted in yy until the next line, which is true for my implementation*/
475   /* but is not perhaps always true. */
476   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
477                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
478   return 0;
479 }
480 
481 static int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
482 {
483   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
484   int        ierr;
485 
486   /* do nondiagonal part */
487   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
488   /* send it on its way */
489   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
490                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
491   /* do local part */
492   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
493   /* receive remote parts: note this assumes the values are not actually */
494   /* inserted in yy until the next line, which is true for my implementation*/
495   /* but is not perhaps always true. */
496   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
497                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
498   return 0;
499 }
500 
501 /*
502   This only works correctly for square matrices where the subblock A->A is the
503    diagonal block
504 */
505 static int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
506 {
507   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
508   if (a->M != a->N)
509     SETERRQ(1,"MatGetDiagonal_MPIAIJ:Supports only square matrix where A->A is diag block");
510   return MatGetDiagonal(a->A,v);
511 }
512 
513 static int MatScale_MPIAIJ(Scalar *aa,Mat A)
514 {
515   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
516   int        ierr;
517   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
518   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
519   return 0;
520 }
521 
522 static int MatDestroy_MPIAIJ(PetscObject obj)
523 {
524   Mat        mat = (Mat) obj;
525   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
526   int        ierr;
527 #if defined(PETSC_LOG)
528   PLogObjectState(obj,"Rows=%d, Cols=%d",aij->M,aij->N);
529 #endif
530   PetscFree(aij->rowners);
531   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
532   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
533   if (aij->colmap) PetscFree(aij->colmap);
534   if (aij->garray) PetscFree(aij->garray);
535   if (aij->lvec)   VecDestroy(aij->lvec);
536   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
537   if (aij->rowvalues) PetscFree(aij->rowvalues);
538   PetscFree(aij);
539   PLogObjectDestroy(mat);
540   PetscHeaderDestroy(mat);
541   return 0;
542 }
543 #include "draw.h"
544 #include "pinclude/pviewer.h"
545 
546 static int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
547 {
548   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
549   int         ierr;
550 
551   if (aij->size == 1) {
552     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
553   }
554   else SETERRQ(1,"MatView_MPIAIJ_Binary:Only uniprocessor output supported");
555   return 0;
556 }
557 
558 static int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
559 {
560   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
561   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
562   int         ierr, format,shift = C->indexshift,rank;
563   FILE        *fd;
564   ViewerType  vtype;
565 
566   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
567   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
568     ierr = ViewerGetFormat(viewer,&format);
569     if (format == ASCII_FORMAT_INFO_DETAILED) {
570       int nz, nzalloc, mem, flg;
571       MPI_Comm_rank(mat->comm,&rank);
572       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
573       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
574       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
575       PetscSequentialPhaseBegin(mat->comm,1);
576       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
577          rank,aij->m,nz,nzalloc,mem);
578       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
579          rank,aij->m,nz,nzalloc,mem);
580       ierr = MatGetInfo(aij->A,MAT_LOCAL,&nz,&nzalloc,&mem);
581       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz);
582       ierr = MatGetInfo(aij->B,MAT_LOCAL,&nz,&nzalloc,&mem);
583       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz);
584       fflush(fd);
585       PetscSequentialPhaseEnd(mat->comm,1);
586       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
587       return 0;
588     }
589     else if (format == ASCII_FORMAT_INFO) {
590       return 0;
591     }
592   }
593 
594   if (vtype == DRAW_VIEWER) {
595     Draw       draw;
596     PetscTruth isnull;
597     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
598     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
599   }
600 
601   if (vtype == ASCII_FILE_VIEWER) {
602     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
603     PetscSequentialPhaseBegin(mat->comm,1);
604     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
605            aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
606            aij->cend);
607     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
608     ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
609     fflush(fd);
610     PetscSequentialPhaseEnd(mat->comm,1);
611   }
612   else {
613     int size = aij->size;
614     rank = aij->rank;
615     if (size == 1) {
616       ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
617     }
618     else {
619       /* assemble the entire matrix onto first processor. */
620       Mat         A;
621       Mat_SeqAIJ *Aloc;
622       int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
623       Scalar      *a;
624 
625       if (!rank) {
626         ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
627                CHKERRQ(ierr);
628       }
629       else {
630         ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
631                CHKERRQ(ierr);
632       }
633       PLogObjectParent(mat,A);
634 
635       /* copy over the A part */
636       Aloc = (Mat_SeqAIJ*) aij->A->data;
637       m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
638       row = aij->rstart;
639       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
640       for ( i=0; i<m; i++ ) {
641         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
642         row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
643       }
644       aj = Aloc->j;
645       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
646 
647       /* copy over the B part */
648       Aloc = (Mat_SeqAIJ*) aij->B->data;
649       m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
650       row = aij->rstart;
651       ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
652       for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
653       for ( i=0; i<m; i++ ) {
654         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
655         row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
656       }
657       PetscFree(ct);
658       ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
659       ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
660       if (!rank) {
661         ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
662       }
663       ierr = MatDestroy(A); CHKERRQ(ierr);
664     }
665   }
666   return 0;
667 }
668 
669 static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
670 {
671   Mat         mat = (Mat) obj;
672   int         ierr;
673   ViewerType  vtype;
674 
675   if (!viewer) {
676     viewer = STDOUT_VIEWER_SELF;
677   }
678   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
679   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
680       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
681     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
682   }
683   else if (vtype == BINARY_FILE_VIEWER) {
684     return MatView_MPIAIJ_Binary(mat,viewer);
685   }
686   return 0;
687 }
688 
689 /*
690     This has to provide several versions.
691 
692      1) per sequential
693      2) a) use only local smoothing updating outer values only once.
694         b) local smoothing updating outer values each inner iteration
695      3) color updating out values betwen colors.
696 */
697 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
698                            double fshift,int its,Vec xx)
699 {
700   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
701   Mat        AA = mat->A, BB = mat->B;
702   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
703   Scalar     zero = 0.0,*b,*x,*xs,*ls,d,*v,sum,scale,*t,*ts;
704   int        ierr,*idx, *diag;
705   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
706   Vec        tt;
707 
708   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
709   xs = x + shift; /* shift by one for index start of 1 */
710   ls = ls + shift;
711   if (!A->diag) {if ((ierr = MatMarkDiag_SeqAIJ(AA))) return ierr;}
712   diag = A->diag;
713   if (flag == SOR_APPLY_UPPER || flag == SOR_APPLY_LOWER) {
714     SETERRQ(1,"MatRelax_MPIAIJ:Option not supported");
715   }
716   if (flag & SOR_EISENSTAT) {
717     /* Let  A = L + U + D; where L is lower trianglar,
718     U is upper triangular, E is diagonal; This routine applies
719 
720             (L + E)^{-1} A (U + E)^{-1}
721 
722     to a vector efficiently using Eisenstat's trick. This is for
723     the case of SSOR preconditioner, so E is D/omega where omega
724     is the relaxation factor.
725     */
726     ierr = VecDuplicate(xx,&tt); CHKERRQ(ierr);
727     PLogObjectParent(matin,tt);
728     VecGetArray(tt,&t);
729     scale = (2.0/omega) - 1.0;
730     /*  x = (E + U)^{-1} b */
731     VecSet(&zero,mat->lvec);
732     ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
733                               mat->Mvctx); CHKERRQ(ierr);
734     for ( i=m-1; i>-1; i-- ) {
735       n    = A->i[i+1] - diag[i] - 1;
736       idx  = A->j + diag[i] + !shift;
737       v    = A->a + diag[i] + !shift;
738       sum  = b[i];
739       SPARSEDENSEMDOT(sum,xs,v,idx,n);
740       d    = fshift + A->a[diag[i]+shift];
741       n    = B->i[i+1] - B->i[i];
742       idx  = B->j + B->i[i] + shift;
743       v    = B->a + B->i[i] + shift;
744       SPARSEDENSEMDOT(sum,ls,v,idx,n);
745       x[i] = omega*(sum/d);
746     }
747     ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
748                             mat->Mvctx); CHKERRQ(ierr);
749 
750     /*  t = b - (2*E - D)x */
751     v = A->a;
752     for ( i=0; i<m; i++ ) { t[i] = b[i] - scale*(v[*diag++ + shift])*x[i]; }
753 
754     /*  t = (E + L)^{-1}t */
755     ts = t + shift; /* shifted by one for index start of a or mat->j*/
756     diag = A->diag;
757     VecSet(&zero,mat->lvec);
758     ierr = VecPipelineBegin(tt,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
759                                                  mat->Mvctx); CHKERRQ(ierr);
760     for ( i=0; i<m; i++ ) {
761       n    = diag[i] - A->i[i];
762       idx  = A->j + A->i[i] + shift;
763       v    = A->a + A->i[i] + shift;
764       sum  = t[i];
765       SPARSEDENSEMDOT(sum,ts,v,idx,n);
766       d    = fshift + A->a[diag[i]+shift];
767       n    = B->i[i+1] - B->i[i];
768       idx  = B->j + B->i[i] + shift;
769       v    = B->a + B->i[i] + shift;
770       SPARSEDENSEMDOT(sum,ls,v,idx,n);
771       t[i] = omega*(sum/d);
772     }
773     ierr = VecPipelineEnd(tt,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
774                                                     mat->Mvctx); CHKERRQ(ierr);
775     /*  x = x + t */
776     for ( i=0; i<m; i++ ) { x[i] += t[i]; }
777     VecDestroy(tt);
778     return 0;
779   }
780 
781 
782   if ((flag & SOR_SYMMETRIC_SWEEP) == SOR_SYMMETRIC_SWEEP){
783     if (flag & SOR_ZERO_INITIAL_GUESS) {
784       VecSet(&zero,mat->lvec); VecSet(&zero,xx);
785     }
786     else {
787       ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_UP,mat->Mvctx);
788       CHKERRQ(ierr);
789       ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_UP,mat->Mvctx);
790       CHKERRQ(ierr);
791     }
792     while (its--) {
793       /* go down through the rows */
794       ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
795                               mat->Mvctx); CHKERRQ(ierr);
796       for ( i=0; i<m; i++ ) {
797         n    = A->i[i+1] - A->i[i];
798         idx  = A->j + A->i[i] + shift;
799         v    = A->a + A->i[i] + shift;
800         sum  = b[i];
801         SPARSEDENSEMDOT(sum,xs,v,idx,n);
802         d    = fshift + A->a[diag[i]+shift];
803         n    = B->i[i+1] - B->i[i];
804         idx  = B->j + B->i[i] + shift;
805         v    = B->a + B->i[i] + shift;
806         SPARSEDENSEMDOT(sum,ls,v,idx,n);
807         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
808       }
809       ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
810                             mat->Mvctx); CHKERRQ(ierr);
811       /* come up through the rows */
812       ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
813                               mat->Mvctx); CHKERRQ(ierr);
814       for ( i=m-1; i>-1; i-- ) {
815         n    = A->i[i+1] - A->i[i];
816         idx  = A->j + A->i[i] + shift;
817         v    = A->a + A->i[i] + shift;
818         sum  = b[i];
819         SPARSEDENSEMDOT(sum,xs,v,idx,n);
820         d    = fshift + A->a[diag[i]+shift];
821         n    = B->i[i+1] - B->i[i];
822         idx  = B->j + B->i[i] + shift;
823         v    = B->a + B->i[i] + shift;
824         SPARSEDENSEMDOT(sum,ls,v,idx,n);
825         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
826       }
827       ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
828                             mat->Mvctx); CHKERRQ(ierr);
829     }
830   }
831   else if (flag & SOR_FORWARD_SWEEP){
832     if (flag & SOR_ZERO_INITIAL_GUESS) {
833       VecSet(&zero,mat->lvec);
834       ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
835                               mat->Mvctx); CHKERRQ(ierr);
836       for ( i=0; i<m; i++ ) {
837         n    = diag[i] - A->i[i];
838         idx  = A->j + A->i[i] + shift;
839         v    = A->a + A->i[i] + shift;
840         sum  = b[i];
841         SPARSEDENSEMDOT(sum,xs,v,idx,n);
842         d    = fshift + A->a[diag[i]+shift];
843         n    = B->i[i+1] - B->i[i];
844         idx  = B->j + B->i[i] + shift;
845         v    = B->a + B->i[i] + shift;
846         SPARSEDENSEMDOT(sum,ls,v,idx,n);
847         x[i] = omega*(sum/d);
848       }
849       ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
850                             mat->Mvctx); CHKERRQ(ierr);
851       its--;
852     }
853     while (its--) {
854       ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_UP,mat->Mvctx);
855       CHKERRQ(ierr);
856       ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_UP,mat->Mvctx);
857       CHKERRQ(ierr);
858       ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
859                               mat->Mvctx); CHKERRQ(ierr);
860       for ( i=0; i<m; i++ ) {
861         n    = A->i[i+1] - A->i[i];
862         idx  = A->j + A->i[i] + shift;
863         v    = A->a + A->i[i] + shift;
864         sum  = b[i];
865         SPARSEDENSEMDOT(sum,xs,v,idx,n);
866         d    = fshift + A->a[diag[i]+shift];
867         n    = B->i[i+1] - B->i[i];
868         idx  = B->j + B->i[i] + shift;
869         v    = B->a + B->i[i] + shift;
870         SPARSEDENSEMDOT(sum,ls,v,idx,n);
871         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
872       }
873       ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
874                             mat->Mvctx); CHKERRQ(ierr);
875     }
876   }
877   else if (flag & SOR_BACKWARD_SWEEP){
878     if (flag & SOR_ZERO_INITIAL_GUESS) {
879       VecSet(&zero,mat->lvec);
880       ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
881                               mat->Mvctx); CHKERRQ(ierr);
882       for ( i=m-1; i>-1; i-- ) {
883         n    = A->i[i+1] - diag[i] - 1;
884         idx  = A->j + diag[i] + !shift;
885         v    = A->a + diag[i] + !shift;
886         sum  = b[i];
887         SPARSEDENSEMDOT(sum,xs,v,idx,n);
888         d    = fshift + A->a[diag[i]+shift];
889         n    = B->i[i+1] - B->i[i];
890         idx  = B->j + B->i[i] + shift;
891         v    = B->a + B->i[i] + shift;
892         SPARSEDENSEMDOT(sum,ls,v,idx,n);
893         x[i] = omega*(sum/d);
894       }
895       ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
896                             mat->Mvctx); CHKERRQ(ierr);
897       its--;
898     }
899     while (its--) {
900       ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_DOWN,
901                             mat->Mvctx); CHKERRQ(ierr);
902       ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_DOWN,
903                             mat->Mvctx); CHKERRQ(ierr);
904       ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
905                               mat->Mvctx); CHKERRQ(ierr);
906       for ( i=m-1; i>-1; i-- ) {
907         n    = A->i[i+1] - A->i[i];
908         idx  = A->j + A->i[i] + shift;
909         v    = A->a + A->i[i] + shift;
910         sum  = b[i];
911         SPARSEDENSEMDOT(sum,xs,v,idx,n);
912         d    = fshift + A->a[diag[i]+shift];
913         n    = B->i[i+1] - B->i[i];
914         idx  = B->j + B->i[i] + shift;
915         v    = B->a + B->i[i] + shift;
916         SPARSEDENSEMDOT(sum,ls,v,idx,n);
917         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
918       }
919       ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
920                             mat->Mvctx); CHKERRQ(ierr);
921     }
922   }
923   else if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
924     if (flag & SOR_ZERO_INITIAL_GUESS) {
925       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
926     }
927     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
928     CHKERRQ(ierr);
929     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
930     CHKERRQ(ierr);
931     while (its--) {
932       /* go down through the rows */
933       for ( i=0; i<m; i++ ) {
934         n    = A->i[i+1] - A->i[i];
935         idx  = A->j + A->i[i] + shift;
936         v    = A->a + A->i[i] + shift;
937         sum  = b[i];
938         SPARSEDENSEMDOT(sum,xs,v,idx,n);
939         d    = fshift + A->a[diag[i]+shift];
940         n    = B->i[i+1] - B->i[i];
941         idx  = B->j + B->i[i] + shift;
942         v    = B->a + B->i[i] + shift;
943         SPARSEDENSEMDOT(sum,ls,v,idx,n);
944         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
945       }
946       /* come up through the rows */
947       for ( i=m-1; i>-1; i-- ) {
948         n    = A->i[i+1] - A->i[i];
949         idx  = A->j + A->i[i] + shift;
950         v    = A->a + A->i[i] + shift;
951         sum  = b[i];
952         SPARSEDENSEMDOT(sum,xs,v,idx,n);
953         d    = fshift + A->a[diag[i]+shift];
954         n    = B->i[i+1] - B->i[i];
955         idx  = B->j + B->i[i] + shift;
956         v    = B->a + B->i[i] + shift;
957         SPARSEDENSEMDOT(sum,ls,v,idx,n);
958         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
959       }
960     }
961   }
962   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
963     if (flag & SOR_ZERO_INITIAL_GUESS) {
964       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
965     }
966     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
967     CHKERRQ(ierr);
968     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
969     CHKERRQ(ierr);
970     while (its--) {
971       for ( i=0; i<m; i++ ) {
972         n    = A->i[i+1] - A->i[i];
973         idx  = A->j + A->i[i] + shift;
974         v    = A->a + A->i[i] + shift;
975         sum  = b[i];
976         SPARSEDENSEMDOT(sum,xs,v,idx,n);
977         d    = fshift + A->a[diag[i]+shift];
978         n    = B->i[i+1] - B->i[i];
979         idx  = B->j + B->i[i] + shift;
980         v    = B->a + B->i[i] + shift;
981         SPARSEDENSEMDOT(sum,ls,v,idx,n);
982         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
983       }
984     }
985   }
986   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
987     if (flag & SOR_ZERO_INITIAL_GUESS) {
988       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
989     }
990     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
991                             mat->Mvctx); CHKERRQ(ierr);
992     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
993                             mat->Mvctx); CHKERRQ(ierr);
994     while (its--) {
995       for ( i=m-1; i>-1; i-- ) {
996         n    = A->i[i+1] - A->i[i];
997         idx  = A->j + A->i[i] + shift;
998         v    = A->a + A->i[i] + shift;
999         sum  = b[i];
1000         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1001         d    = fshift + A->a[diag[i]+shift];
1002         n    = B->i[i+1] - B->i[i];
1003         idx  = B->j + B->i[i] + shift;
1004         v    = B->a + B->i[i] + shift;
1005         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1006         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
1007       }
1008     }
1009   }
1010   return 0;
1011 }
1012 
1013 static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,int *nz,
1014                              int *nzalloc,int *mem)
1015 {
1016   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1017   Mat        A = mat->A, B = mat->B;
1018   int        ierr, isend[3], irecv[3], nzA, nzallocA, memA;
1019 
1020   ierr = MatGetInfo(A,MAT_LOCAL,&nzA,&nzallocA,&memA); CHKERRQ(ierr);
1021   ierr = MatGetInfo(B,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
1022   isend[0] += nzA; isend[1] += nzallocA; isend[2] += memA;
1023   if (flag == MAT_LOCAL) {
1024     if (nz)       *nz      = isend[0];
1025     if (nzalloc)  *nzalloc = isend[1];
1026     if (mem)      *mem     = isend[2];
1027   } else if (flag == MAT_GLOBAL_MAX) {
1028     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_MAX,matin->comm);
1029     if (nz)      *nz      = irecv[0];
1030     if (nzalloc) *nzalloc = irecv[1];
1031     if (mem)     *mem     = irecv[2];
1032   } else if (flag == MAT_GLOBAL_SUM) {
1033     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_SUM,matin->comm);
1034     if (nz)      *nz      = irecv[0];
1035     if (nzalloc) *nzalloc = irecv[1];
1036     if (mem)     *mem     = irecv[2];
1037   }
1038   return 0;
1039 }
1040 
1041 extern int MatLUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,Mat*);
1042 extern int MatLUFactorNumeric_MPIAIJ(Mat,Mat*);
1043 extern int MatLUFactor_MPIAIJ(Mat,IS,IS,double);
1044 extern int MatILUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,int,Mat *);
1045 extern int MatSolve_MPIAIJ(Mat,Vec,Vec);
1046 extern int MatSolveAdd_MPIAIJ(Mat,Vec,Vec,Vec);
1047 extern int MatSolveTrans_MPIAIJ(Mat,Vec,Vec);
1048 extern int MatSolveTransAdd_MPIAIJ(Mat,Vec,Vec,Vec);
1049 
1050 static int MatSetOption_MPIAIJ(Mat A,MatOption op)
1051 {
1052   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1053 
1054   if (op == NO_NEW_NONZERO_LOCATIONS ||
1055       op == YES_NEW_NONZERO_LOCATIONS ||
1056       op == COLUMNS_SORTED ||
1057       op == ROW_ORIENTED) {
1058         MatSetOption(a->A,op);
1059         MatSetOption(a->B,op);
1060   }
1061   else if (op == ROWS_SORTED ||
1062            op == SYMMETRIC_MATRIX ||
1063            op == STRUCTURALLY_SYMMETRIC_MATRIX ||
1064            op == YES_NEW_DIAGONALS)
1065     PLogInfo(A,"Info:MatSetOption_MPIAIJ:Option ignored\n");
1066   else if (op == COLUMN_ORIENTED) {
1067     a->roworiented = 0;
1068     MatSetOption(a->A,op);
1069     MatSetOption(a->B,op);
1070   }
1071   else if (op == NO_NEW_DIAGONALS)
1072     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:NO_NEW_DIAGONALS");}
1073   else
1074     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:unknown option");}
1075   return 0;
1076 }
1077 
1078 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
1079 {
1080   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1081   *m = mat->M; *n = mat->N;
1082   return 0;
1083 }
1084 
1085 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
1086 {
1087   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1088   *m = mat->m; *n = mat->N;
1089   return 0;
1090 }
1091 
1092 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
1093 {
1094   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1095   *m = mat->rstart; *n = mat->rend;
1096   return 0;
1097 }
1098 
1099 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1100 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1101 
1102 int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1103 {
1104   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1105   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
1106   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
1107   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
1108   int        *cmap, *idx_p;
1109 
1110   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIAIJ:Already active");
1111   mat->getrowactive = PETSC_TRUE;
1112 
1113   if (!mat->rowvalues && (idx || v)) {
1114     /*
1115         allocate enough space to hold information from the longest row.
1116     */
1117     Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
1118     int     max = 1,n = mat->n,tmp;
1119     for ( i=0; i<n; i++ ) {
1120       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1121       if (max < tmp) { max = tmp; }
1122     }
1123     mat->rowvalues = (Scalar *) PetscMalloc( max*(sizeof(int)+sizeof(Scalar)));
1124     CHKPTRQ(mat->rowvalues);
1125     mat->rowindices = (int *) (mat->rowvalues + max);
1126   }
1127 
1128 
1129   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIAIJ:Only local rows")
1130   lrow = row - rstart;
1131 
1132   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1133   if (!v)   {pvA = 0; pvB = 0;}
1134   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1135   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1136   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1137   nztot = nzA + nzB;
1138 
1139   cmap  = mat->garray;
1140   if (v  || idx) {
1141     if (nztot) {
1142       /* Sort by increasing column numbers, assuming A and B already sorted */
1143       int imark = -1;
1144       if (v) {
1145         *v = v_p = mat->rowvalues;
1146         for ( i=0; i<nzB; i++ ) {
1147           if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
1148           else break;
1149         }
1150         imark = i;
1151         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1152         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1153       }
1154       if (idx) {
1155         *idx = idx_p = mat->rowindices;
1156         if (imark > -1) {
1157           for ( i=0; i<imark; i++ ) {
1158             idx_p[i] = cmap[cworkB[i]];
1159           }
1160         } else {
1161           for ( i=0; i<nzB; i++ ) {
1162             if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
1163             else break;
1164           }
1165           imark = i;
1166         }
1167         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
1168         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
1169       }
1170     }
1171     else {*idx = 0; *v=0;}
1172   }
1173   *nz = nztot;
1174   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1175   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1176   return 0;
1177 }
1178 
1179 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1180 {
1181   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1182   if (aij->getrowactive == PETSC_FALSE) {
1183     SETERRQ(1,"MatRestoreRow_MPIAIJ:MatGetRow not called");
1184   }
1185   aij->getrowactive = PETSC_FALSE;
1186   return 0;
1187 }
1188 
1189 static int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
1190 {
1191   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1192   Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
1193   int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
1194   double     sum = 0.0;
1195   Scalar     *v;
1196 
1197   if (aij->size == 1) {
1198     ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
1199   } else {
1200     if (type == NORM_FROBENIUS) {
1201       v = amat->a;
1202       for (i=0; i<amat->nz; i++ ) {
1203 #if defined(PETSC_COMPLEX)
1204         sum += real(conj(*v)*(*v)); v++;
1205 #else
1206         sum += (*v)*(*v); v++;
1207 #endif
1208       }
1209       v = bmat->a;
1210       for (i=0; i<bmat->nz; i++ ) {
1211 #if defined(PETSC_COMPLEX)
1212         sum += real(conj(*v)*(*v)); v++;
1213 #else
1214         sum += (*v)*(*v); v++;
1215 #endif
1216       }
1217       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
1218       *norm = sqrt(*norm);
1219     }
1220     else if (type == NORM_1) { /* max column norm */
1221       double *tmp, *tmp2;
1222       int    *jj, *garray = aij->garray;
1223       tmp  = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp);
1224       tmp2 = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp2);
1225       PetscMemzero(tmp,aij->N*sizeof(double));
1226       *norm = 0.0;
1227       v = amat->a; jj = amat->j;
1228       for ( j=0; j<amat->nz; j++ ) {
1229         tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
1230       }
1231       v = bmat->a; jj = bmat->j;
1232       for ( j=0; j<bmat->nz; j++ ) {
1233         tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
1234       }
1235       MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);
1236       for ( j=0; j<aij->N; j++ ) {
1237         if (tmp2[j] > *norm) *norm = tmp2[j];
1238       }
1239       PetscFree(tmp); PetscFree(tmp2);
1240     }
1241     else if (type == NORM_INFINITY) { /* max row norm */
1242       double ntemp = 0.0;
1243       for ( j=0; j<amat->m; j++ ) {
1244         v = amat->a + amat->i[j] + shift;
1245         sum = 0.0;
1246         for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
1247           sum += PetscAbsScalar(*v); v++;
1248         }
1249         v = bmat->a + bmat->i[j] + shift;
1250         for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
1251           sum += PetscAbsScalar(*v); v++;
1252         }
1253         if (sum > ntemp) ntemp = sum;
1254       }
1255       MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);
1256     }
1257     else {
1258       SETERRQ(1,"MatNorm_MPIAIJ:No support for two norm");
1259     }
1260   }
1261   return 0;
1262 }
1263 
1264 static int MatTranspose_MPIAIJ(Mat A,Mat *matout)
1265 {
1266   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1267   Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
1268   int        ierr,shift = Aloc->indexshift;
1269   Mat        B;
1270   int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
1271   Scalar     *array;
1272 
1273   if (matout == PETSC_NULL && M != N)
1274     SETERRQ(1,"MatTranspose_MPIAIJ:Square matrix only for in-place");
1275   ierr = MatCreateMPIAIJ(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,
1276          PETSC_NULL,&B); CHKERRQ(ierr);
1277 
1278   /* copy over the A part */
1279   Aloc = (Mat_SeqAIJ*) a->A->data;
1280   m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1281   row = a->rstart;
1282   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
1283   for ( i=0; i<m; i++ ) {
1284     ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1285     row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
1286   }
1287   aj = Aloc->j;
1288   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}
1289 
1290   /* copy over the B part */
1291   Aloc = (Mat_SeqAIJ*) a->B->data;
1292   m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1293   row = a->rstart;
1294   ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
1295   for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
1296   for ( i=0; i<m; i++ ) {
1297     ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1298     row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
1299   }
1300   PetscFree(ct);
1301   ierr = MatAssemblyBegin(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
1302   ierr = MatAssemblyEnd(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
1303   if (matout != PETSC_NULL) {
1304     *matout = B;
1305   } else {
1306     /* This isn't really an in-place transpose .... but free data structures from a */
1307     PetscFree(a->rowners);
1308     ierr = MatDestroy(a->A); CHKERRQ(ierr);
1309     ierr = MatDestroy(a->B); CHKERRQ(ierr);
1310     if (a->colmap) PetscFree(a->colmap);
1311     if (a->garray) PetscFree(a->garray);
1312     if (a->lvec) VecDestroy(a->lvec);
1313     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
1314     PetscFree(a);
1315     PetscMemcpy(A,B,sizeof(struct _Mat));
1316     PetscHeaderDestroy(B);
1317   }
1318   return 0;
1319 }
1320 
1321 extern int MatPrintHelp_SeqAIJ(Mat);
1322 static int MatPrintHelp_MPIAIJ(Mat A)
1323 {
1324   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1325 
1326   if (!a->rank) return MatPrintHelp_SeqAIJ(a->A);
1327   else return 0;
1328 }
1329 
1330 extern int MatConvert_MPIAIJ(Mat,MatType,Mat *);
1331 static int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1332 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1333 int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatGetSubMatrixCall,Mat **);
1334 /* -------------------------------------------------------------------*/
1335 static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
1336        MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
1337        MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
1338        MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
1339        MatSolve_MPIAIJ,MatSolveAdd_MPIAIJ,
1340        MatSolveTrans_MPIAIJ,MatSolveTransAdd_MPIAIJ,
1341        MatLUFactor_MPIAIJ,0,
1342        MatRelax_MPIAIJ,
1343        MatTranspose_MPIAIJ,
1344        MatGetInfo_MPIAIJ,0,
1345        MatGetDiagonal_MPIAIJ,0,MatNorm_MPIAIJ,
1346        MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
1347        0,
1348        MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
1349        MatGetReordering_MPIAIJ,
1350        MatLUFactorSymbolic_MPIAIJ,MatLUFactorNumeric_MPIAIJ,0,0,
1351        MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
1352        MatILUFactorSymbolic_MPIAIJ,0,
1353        0,0,MatConvert_MPIAIJ,0,0,MatConvertSameType_MPIAIJ,0,0,
1354        0,0,0,
1355        MatGetSubMatrices_MPIAIJ,MatIncreaseOverlap_MPIAIJ,MatGetValues_MPIAIJ,0,
1356        MatPrintHelp_MPIAIJ,
1357        MatScale_MPIAIJ};
1358 
1359 /*@C
1360    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1361    (the default parallel PETSc format).  For good matrix assembly performance
1362    the user should preallocate the matrix storage by setting the parameters
1363    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1364    performance can be increased by more than a factor of 50.
1365 
1366    Input Parameters:
1367 .  comm - MPI communicator
1368 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1369 .  n - number of local columns (or PETSC_DECIDE to have calculated
1370            if N is given)
1371 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1372 .  N - number of global columns (or PETSC_DECIDE to have calculated
1373            if n is given)
1374 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1375            (same for all local rows)
1376 .  d_nzz - array containing the number of nonzeros in the various rows of the
1377            diagonal portion of local submatrix (possibly different for each row)
1378            or PETSC_NULL. You must leave room for the diagonal entry even if
1379            it is zero.
1380 .  o_nz - number of nonzeros per row in the off-diagonal portion of local
1381            submatrix (same for all local rows).
1382 .  o_nzz - array containing the number of nonzeros in the various rows of the
1383            off-diagonal portion of the local submatrix (possibly different for
1384            each row) or PETSC_NULL.
1385 
1386    Output Parameter:
1387 .  A - the matrix
1388 
1389    Notes:
1390    The AIJ format (also called the Yale sparse matrix format or
1391    compressed row storage), is fully compatible with standard Fortran 77
1392    storage.  That is, the stored row and column indices can begin at
1393    either one (as in Fortran) or zero.  See the users manual for details.
1394 
1395    The user MUST specify either the local or global matrix dimensions
1396    (possibly both).
1397 
1398    By default, this format uses inodes (identical nodes) when possible.
1399    We search for consecutive rows with the same nonzero structure, thereby
1400    reusing matrix information to achieve increased efficiency.
1401 
1402    Options Database Keys:
1403 $    -mat_aij_no_inode  - Do not use inodes
1404 $    -mat_aij_inode_limit <limit> - Set inode limit.
1405 $        (max limit=5)
1406 $    -mat_aij_oneindex - Internally use indexing starting at 1
1407 $        rather than 0.  Note: When calling MatSetValues(),
1408 $        the user still MUST index entries starting at 0!
1409 
1410    Storage Information:
1411    For a square global matrix we define each processor's diagonal portion
1412    to be its local rows and the corresponding columns (a square submatrix);
1413    each processor's off-diagonal portion encompasses the remainder of the
1414    local matrix (a rectangular submatrix).
1415 
1416    The user can specify preallocated storage for the diagonal part of
1417    the local submatrix with either d_nz or d_nnz (not both).  Set
1418    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1419    memory allocation.  Likewise, specify preallocated storage for the
1420    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1421 
1422    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1423    the figure below we depict these three local rows and all columns (0-11).
1424 
1425 $          0 1 2 3 4 5 6 7 8 9 10 11
1426 $         -------------------
1427 $  row 3  |  o o o d d d o o o o o o
1428 $  row 4  |  o o o d d d o o o o o o
1429 $  row 5  |  o o o d d d o o o o o o
1430 $         -------------------
1431 $
1432 
1433    Thus, any entries in the d locations are stored in the d (diagonal)
1434    submatrix, and any entries in the o locations are stored in the
1435    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1436    stored simply in the MATSEQAIJ format for compressed row storage.
1437 
1438    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1439    and o_nz should indicate the number of nonzeros per row in the o matrix.
1440    In general, for PDE problems in which most nonzeros are near the diagonal,
1441    one expects d_nz >> o_nz. For large problems you MUST preallocate memory
1442    or you will get TERRIBLE performance, see the users' manual chapter on
1443    matrices and the file $(PETSC_DIR)/Performance.
1444 
1445 .keywords: matrix, aij, compressed row, sparse, parallel
1446 
1447 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1448 @*/
1449 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
1450                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1451 {
1452   Mat          B;
1453   Mat_MPIAIJ   *b;
1454   int          ierr, i,sum[2],work[2];
1455 
1456   *A = 0;
1457   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
1458   PLogObjectCreate(B);
1459   B->data       = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
1460   PetscMemzero(b,sizeof(Mat_MPIAIJ));
1461   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1462   B->destroy    = MatDestroy_MPIAIJ;
1463   B->view       = MatView_MPIAIJ;
1464   B->factor     = 0;
1465   B->assembled  = PETSC_FALSE;
1466 
1467   b->insertmode = NOT_SET_VALUES;
1468   MPI_Comm_rank(comm,&b->rank);
1469   MPI_Comm_size(comm,&b->size);
1470 
1471   if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1472     SETERRQ(1,"MatCreateMPIAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1473 
1474   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1475     work[0] = m; work[1] = n;
1476     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1477     if (M == PETSC_DECIDE) M = sum[0];
1478     if (N == PETSC_DECIDE) N = sum[1];
1479   }
1480   if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
1481   if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
1482   b->m = m; B->m = m;
1483   b->n = n; B->n = n;
1484   b->N = N; B->N = N;
1485   b->M = M; B->M = M;
1486 
1487   /* build local table of row and column ownerships */
1488   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1489   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1490   b->cowners = b->rowners + b->size + 1;
1491   MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1492   b->rowners[0] = 0;
1493   for ( i=2; i<=b->size; i++ ) {
1494     b->rowners[i] += b->rowners[i-1];
1495   }
1496   b->rstart = b->rowners[b->rank];
1497   b->rend   = b->rowners[b->rank+1];
1498   MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1499   b->cowners[0] = 0;
1500   for ( i=2; i<=b->size; i++ ) {
1501     b->cowners[i] += b->cowners[i-1];
1502   }
1503   b->cstart = b->cowners[b->rank];
1504   b->cend   = b->cowners[b->rank+1];
1505 
1506   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1507   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1508   PLogObjectParent(B,b->A);
1509   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1510   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1511   PLogObjectParent(B,b->B);
1512 
1513   /* build cache for off array entries formed */
1514   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1515   b->colmap      = 0;
1516   b->garray      = 0;
1517   b->roworiented = 1;
1518 
1519   /* stuff used for matrix vector multiply */
1520   b->lvec      = 0;
1521   b->Mvctx     = 0;
1522 
1523   /* stuff for MatGetRow() */
1524   b->rowindices   = 0;
1525   b->rowvalues    = 0;
1526   b->getrowactive = PETSC_FALSE;
1527 
1528   *A = B;
1529   return 0;
1530 }
1531 
1532 static int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
1533 {
1534   Mat        mat;
1535   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1536   int        ierr, len=0, flg;
1537 
1538   *newmat       = 0;
1539   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1540   PLogObjectCreate(mat);
1541   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1542   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1543   mat->destroy    = MatDestroy_MPIAIJ;
1544   mat->view       = MatView_MPIAIJ;
1545   mat->factor     = matin->factor;
1546   mat->assembled  = PETSC_TRUE;
1547 
1548   a->m = mat->m   = oldmat->m;
1549   a->n = mat->n   = oldmat->n;
1550   a->M = mat->M   = oldmat->M;
1551   a->N = mat->N   = oldmat->N;
1552 
1553   a->rstart       = oldmat->rstart;
1554   a->rend         = oldmat->rend;
1555   a->cstart       = oldmat->cstart;
1556   a->cend         = oldmat->cend;
1557   a->size         = oldmat->size;
1558   a->rank         = oldmat->rank;
1559   a->insertmode   = NOT_SET_VALUES;
1560   a->rowvalues    = 0;
1561   a->getrowactive = PETSC_FALSE;
1562 
1563   a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
1564   PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1565   PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
1566   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1567   if (oldmat->colmap) {
1568     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1569     PLogObjectMemory(mat,(a->N)*sizeof(int));
1570     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1571   } else a->colmap = 0;
1572   if (oldmat->garray && (len = ((Mat_SeqAIJ *) (oldmat->B->data))->n)) {
1573     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1574     PLogObjectMemory(mat,len*sizeof(int));
1575     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1576   } else a->garray = 0;
1577 
1578   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1579   PLogObjectParent(mat,a->lvec);
1580   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1581   PLogObjectParent(mat,a->Mvctx);
1582   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1583   PLogObjectParent(mat,a->A);
1584   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1585   PLogObjectParent(mat,a->B);
1586   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1587   if (flg) {
1588     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1589   }
1590   *newmat = mat;
1591   return 0;
1592 }
1593 
1594 #include "sys.h"
1595 
1596 int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
1597 {
1598   Mat          A;
1599   int          i, nz, ierr, j,rstart, rend, fd;
1600   Scalar       *vals,*svals;
1601   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1602   MPI_Status   status;
1603   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1604   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1605   int          tag = ((PetscObject)viewer)->tag;
1606 
1607   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1608   if (!rank) {
1609     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1610     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1611     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIAIJ:not matrix object");
1612   }
1613 
1614   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1615   M = header[1]; N = header[2];
1616   /* determine ownership of all rows */
1617   m = M/size + ((M % size) > rank);
1618   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1619   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1620   rowners[0] = 0;
1621   for ( i=2; i<=size; i++ ) {
1622     rowners[i] += rowners[i-1];
1623   }
1624   rstart = rowners[rank];
1625   rend   = rowners[rank+1];
1626 
1627   /* distribute row lengths to all processors */
1628   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1629   offlens = ourlens + (rend-rstart);
1630   if (!rank) {
1631     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1632     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1633     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1634     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1635     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
1636     PetscFree(sndcounts);
1637   }
1638   else {
1639     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
1640   }
1641 
1642   if (!rank) {
1643     /* calculate the number of nonzeros on each processor */
1644     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1645     PetscMemzero(procsnz,size*sizeof(int));
1646     for ( i=0; i<size; i++ ) {
1647       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1648         procsnz[i] += rowlengths[j];
1649       }
1650     }
1651     PetscFree(rowlengths);
1652 
1653     /* determine max buffer needed and allocate it */
1654     maxnz = 0;
1655     for ( i=0; i<size; i++ ) {
1656       maxnz = PetscMax(maxnz,procsnz[i]);
1657     }
1658     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1659 
1660     /* read in my part of the matrix column indices  */
1661     nz = procsnz[0];
1662     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1663     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1664 
1665     /* read in every one elses and ship off */
1666     for ( i=1; i<size; i++ ) {
1667       nz = procsnz[i];
1668       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1669       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1670     }
1671     PetscFree(cols);
1672   }
1673   else {
1674     /* determine buffer space needed for message */
1675     nz = 0;
1676     for ( i=0; i<m; i++ ) {
1677       nz += ourlens[i];
1678     }
1679     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1680 
1681     /* receive message of column indices*/
1682     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1683     MPI_Get_count(&status,MPI_INT,&maxnz);
1684     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
1685   }
1686 
1687   /* loop over local rows, determining number of off diagonal entries */
1688   PetscMemzero(offlens,m*sizeof(int));
1689   jj = 0;
1690   for ( i=0; i<m; i++ ) {
1691     for ( j=0; j<ourlens[i]; j++ ) {
1692       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1693       jj++;
1694     }
1695   }
1696 
1697   /* create our matrix */
1698   for ( i=0; i<m; i++ ) {
1699     ourlens[i] -= offlens[i];
1700   }
1701   ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
1702   A = *newmat;
1703   MatSetOption(A,COLUMNS_SORTED);
1704   for ( i=0; i<m; i++ ) {
1705     ourlens[i] += offlens[i];
1706   }
1707 
1708   if (!rank) {
1709     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1710 
1711     /* read in my part of the matrix numerical values  */
1712     nz = procsnz[0];
1713     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1714 
1715     /* insert into matrix */
1716     jj      = rstart;
1717     smycols = mycols;
1718     svals   = vals;
1719     for ( i=0; i<m; i++ ) {
1720       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1721       smycols += ourlens[i];
1722       svals   += ourlens[i];
1723       jj++;
1724     }
1725 
1726     /* read in other processors and ship out */
1727     for ( i=1; i<size; i++ ) {
1728       nz = procsnz[i];
1729       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1730       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1731     }
1732     PetscFree(procsnz);
1733   }
1734   else {
1735     /* receive numeric values */
1736     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1737 
1738     /* receive message of values*/
1739     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1740     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1741     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
1742 
1743     /* insert into matrix */
1744     jj      = rstart;
1745     smycols = mycols;
1746     svals   = vals;
1747     for ( i=0; i<m; i++ ) {
1748       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1749       smycols += ourlens[i];
1750       svals   += ourlens[i];
1751       jj++;
1752     }
1753   }
1754   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1755 
1756   ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1757   ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1758   return 0;
1759 }
1760