xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision 47b4a8ea77628bb8b5f28a18a3fcb28317589dba)
1 #ifndef lint
2 static char vcid[] = "$Id: mpiaij.c,v 1.151 1996/07/08 01:18:41 curfman Exp curfman $";
3 #endif
4 
5 #include "mpiaij.h"
6 #include "src/vec/vecimpl.h"
7 #include "src/inline/spops.h"
8 
9 /* local utility routine that creates a mapping from the global column
10 number to the local number in the off-diagonal part of the local
11 storage of the matrix.  This is done in a non scable way since the
12 length of colmap equals the global matrix length.
13 */
14 static int CreateColmap_Private(Mat mat)
15 {
16   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
17   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
18   int        n = B->n,i,shift = B->indexshift;
19 
20   aij->colmap = (int *) PetscMalloc(aij->N*sizeof(int));CHKPTRQ(aij->colmap);
21   PLogObjectMemory(mat,aij->N*sizeof(int));
22   PetscMemzero(aij->colmap,aij->N*sizeof(int));
23   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i-shift;
24   return 0;
25 }
26 
27 extern int DisAssemble_MPIAIJ(Mat);
28 
29 static int MatGetReordering_MPIAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
30 {
31   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
32   int        ierr;
33   if (aij->size == 1) {
34     ierr = MatGetReordering(aij->A,type,rperm,cperm); CHKERRQ(ierr);
35   } else SETERRQ(1,"MatGetReordering_MPIAIJ:not supported in parallel");
36   return 0;
37 }
38 
/*
   Inserts or adds a logically dense block of values into the parallel
   matrix.  Locally owned rows are split column-wise between the diagonal
   block (aij->A) and the off-diagonal block (aij->B); values destined for
   rows owned by other processes are cached in the stash and communicated
   during assembly.
*/
static int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Mat_SeqAIJ *C = (Mat_SeqAIJ*) aij->A->data;
  Scalar     value;
  int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
  int        cstart = aij->cstart, cend = aij->cend,row,col;
  int        shift = C->indexshift,roworiented = aij->roworiented;

  /* INSERT_VALUES and ADD_VALUES may not be mixed within one assembly */
  if (aij->insertmode != NOT_SET_VALUES && aij->insertmode != addv) {
    SETERRQ(1,"MatSetValues_MPIAIJ:Cannot mix inserts and adds");
  }
  aij->insertmode = addv;
  for ( i=0; i<m; i++ ) {
    if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative row");
    if (im[i] >= aij->M) SETERRQ(1,"MatSetValues_MPIAIJ:Row too large");
    if (im[i] >= rstart && im[i] < rend) {
      /* row owned by this process */
      row = im[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative column");
        if (in[j] >= aij->N) SETERRQ(1,"MatSetValues_MPIAIJ:Col too large");
        if (in[j] >= cstart && in[j] < cend){
          /* column falls in the diagonal block */
          col = in[j] - cstart;
          /* v is row- or column-major depending on the roworiented flag */
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          ierr = MatSetValues(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
        }
        else {
          /* column falls in the off-diagonal block */
          if (mat->was_assembled) {
            /* translate global column to compressed local numbering */
            if (!aij->colmap) {ierr = CreateColmap_Private(mat);CHKERRQ(ierr);}
            col = aij->colmap[in[j]] + shift;
            if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
              /* column absent from assembled pattern and new nonzeros are
                 allowed: disassemble so it can be introduced with its
                 global index */
              ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
            }
          }
          else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          ierr = MatSetValues(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
        }
      }
    }
    else {
      /* off-processor row: stash the values until MatAssemblyBegin/End */
      if (roworiented) {
        ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      }
      else {
        /* column-oriented input: stash one entry at a time with stride m */
        row = im[i];
        for ( j=0; j<n; j++ ) {
          ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
        }
      }
    }
  }
  return 0;
}
94 
/*
   Retrieves a logically dense block of values from the parallel matrix.
   Only locally owned rows may be queried.  Columns inside [cstart,cend)
   are read from the diagonal block; all others are located in the
   off-diagonal block through the colmap translation table.
*/
static int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Mat_SeqAIJ *C = (Mat_SeqAIJ*) aij->A->data;
  int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
  int        cstart = aij->cstart, cend = aij->cend,row,col;
  int        shift = C->indexshift;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative row");
    if (idxm[i] >= aij->M) SETERRQ(1,"MatGetValues_MPIAIJ:Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      row = idxm[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative column");
        if (idxn[j] >= aij->N) SETERRQ(1,"MatGetValues_MPIAIJ:Col too large");
        if (idxn[j] >= cstart && idxn[j] < cend){
          /* diagonal block; results stored row-major in v */
          col = idxn[j] - cstart;
          ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        }
        else {
          /* off-diagonal block: map global column to local numbering */
          if (!aij->colmap) {ierr = CreateColmap_Private(mat);CHKERRQ(ierr);}
          col = aij->colmap[idxn[j]] + shift;
          /* a stale/absent colmap entry maps elsewhere; garray check
             detects that and reports an explicit zero instead */
          if ( aij->garray[col] != idxn[j] ) *(v+i*n+j) = 0.0;
          else {
            ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    }
    else {
      SETERRQ(1,"MatGetValues_MPIAIJ:Only local values currently supported");
    }
  }
  return 0;
}
131 
/*
   Begins assembly: routes all values stashed for off-processor rows to
   their owning processes.  Each stashed entry is shipped as a triple
   (row, col, value) of Scalars.  The receives are posted here and
   completed in MatAssemblyEnd_MPIAIJ; the send/receive buffers and
   requests are saved in the matrix context between the two calls.
*/
static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = aij->size, *owners = aij->rowners;
  int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  /* BOR of the two distinct mode values detects a mix across processes */
  MPI_Allreduce(&aij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,"MatAssemblyBegin_MPIAIJ:Some processors inserted others added");
  }
  aij->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  /* nprocs[j] = #stashed entries owned by proc j; procs[j] = 1 iff any */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<aij->stash.n; i++ ) {
    idx = aij->stash.idx[i];
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered triples
     (global row index,global column index,value) - we store the indices
     as doubles to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack (row,col,value) triples; starts[] is consumed (incremented) here */
  for ( i=0; i<aij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
  }
  PetscFree(owner);
  /* rebuild starts[] since the packing loop above destroyed it */
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(0,"[%d]MatAssemblyBegin_MPIAIJ:Number of off processor values %d\n",rank,aij->stash.n);
  ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);

  /* stash the communication state for MatAssemblyEnd_MPIAIJ */
  aij->svalues    = svalues;    aij->rvalues    = rvalues;
  aij->nsends     = nsends;     aij->nrecvs     = nreceives;
  aij->send_waits = send_waits; aij->recv_waits = recv_waits;
  aij->rmax       = nmax;

  return 0;
}
230 extern int MatSetUpMultiply_MPIAIJ(Mat);
231 
/*
   Completes assembly: drains the (row,col,value) triples posted by
   MatAssemblyBegin_MPIAIJ, inserts them into the local blocks, waits on
   the outstanding sends, then assembles the two sequential blocks and
   (on final assembly of a new matrix) builds the scatter context for
   matrix-vector products.
*/
static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Mat_SeqAIJ *C = (Mat_SeqAIJ *) aij->A->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
  int         row,col,other_disassembled,shift = C->indexshift;
  Scalar      *values,val;
  InsertMode  addv = aij->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = aij->rvalues + 3*imdex*aij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;   /* each entry is a (row,col,value) triple of Scalars */
    for ( i=0; i<n; i++ ) {
      /* indices travel as Scalars; convert back to ints */
      row = (int) PetscReal(values[3*i]) - aij->rstart;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= aij->cstart && col < aij->cend) {
        /* diagonal block */
        col -= aij->cstart;
        MatSetValues(aij->A,1,&row,1,&col,&val,addv);
      }
      else {
        /* off-diagonal block; same colmap/disassemble logic as
           MatSetValues_MPIAIJ */
        if (mat->was_assembled) {
          if (!aij->colmap) {ierr = CreateColmap_Private(mat);CHKERRQ(ierr);}
          col = aij->colmap[col] + shift;
          if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
            ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        MatSetValues(aij->B,1,&row,1,&col,&val,addv);
      }
    }
    count--;
  }
  PetscFree(aij->recv_waits); PetscFree(aij->rvalues);

  /* wait on sends */
  if (aij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(aij->nsends,aij->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(aij->send_waits); PetscFree(aij->svalues);

  aij->insertmode = NOT_SET_VALUES;
  ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselves, in order that we may reassemble. */
  MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
  }

  /* first final assembly: build the scatter used by MatMult et al. */
  if (!mat->was_assembled && mode == FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);

  /* cached MatGetRow workspace is invalid after assembly */
  if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
  return 0;
}
302 
303 static int MatZeroEntries_MPIAIJ(Mat A)
304 {
305   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
306   int        ierr;
307   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
308   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
309   return 0;
310 }
311 
/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   aij->A and aij->B directly and not through the MatZeroRows()
   routine.
*/
/*
   Zeros the rows listed in the (possibly non-local) index set `is`,
   optionally placing *diag on the diagonal of each zeroed row.  Row
   indices are first routed to their owning processes with point-to-point
   messages; each process then zeros its local rows via MatZeroRows on
   the two sequential blocks.
*/
static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  /* nprocs[j] = #rows destined for proc j; procs[j] = 1 iff any */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,"MatZeroRows_MPIAIJ:Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  /* worst-case sized buffers, one slot of nmax ints per expected message */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack rows by destination; starts[] is consumed here */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  /* rebuild starts[] for the actual sends */
  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  base = owners[rank];   /* first globally-numbered row owned locally */

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  /* gather all received global rows, converted to local numbering */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  /* diagonal entry only makes sense in the diagonal block; B gets 0 */
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}
440 
441 static int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
442 {
443   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
444   int        ierr,nt;
445 
446   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
447   if (nt != a->n) {
448     SETERRQ(1,"MatMult_MPIAIJ:Incompatible parition of A and xx");
449   }
450   ierr = VecGetLocalSize(yy,&nt);  CHKERRQ(ierr);
451   if (nt != a->m) {
452     SETERRQ(1,"MatMult_MPIAIJ:Incompatible parition of A and yy");
453   }
454   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
455   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
456   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
457   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
458   return 0;
459 }
460 
461 static int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
462 {
463   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
464   int        ierr;
465   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
466   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
467   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
468   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
469   return 0;
470 }
471 
472 static int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
473 {
474   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
475   int        ierr;
476 
477   /* do nondiagonal part */
478   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
479   /* send it on its way */
480   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
481                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
482   /* do local part */
483   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
484   /* receive remote parts: note this assumes the values are not actually */
485   /* inserted in yy until the next line, which is true for my implementation*/
486   /* but is not perhaps always true. */
487   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
488                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
489   return 0;
490 }
491 
/* zz = yy + A^T*xx.  Same structure as MatMultTrans_MPIAIJ but with the
   accumulate variant of the local product. */
static int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
  int        ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in zz until the next line, which is true for my implementation*/
  /* but is not perhaps always true. */
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}
511 
512 /*
513   This only works correctly for square matrices where the subblock A->A is the
514    diagonal block
515 */
516 static int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
517 {
518   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
519   if (a->M != a->N)
520     SETERRQ(1,"MatGetDiagonal_MPIAIJ:Supports only square matrix where A->A is diag block");
521   return MatGetDiagonal(a->A,v);
522 }
523 
524 static int MatScale_MPIAIJ(Scalar *aa,Mat A)
525 {
526   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
527   int        ierr;
528   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
529   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
530   return 0;
531 }
532 
/* Destroys the parallel matrix: logs its final size, frees the ownership
   table and both sequential blocks, releases the optional communication
   structures (only present after assembly), then the header itself. */
static int MatDestroy_MPIAIJ(PetscObject obj)
{
  Mat        mat = (Mat) obj;
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  int        ierr;
#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",aij->M,aij->N);
#endif
  PetscFree(aij->rowners);
  ierr = MatDestroy(aij->A); CHKERRQ(ierr);  /* diagonal block */
  ierr = MatDestroy(aij->B); CHKERRQ(ierr);  /* off-diagonal block */
  /* these are created lazily, so may be absent */
  if (aij->colmap) PetscFree(aij->colmap);
  if (aij->garray) PetscFree(aij->garray);
  if (aij->lvec)   VecDestroy(aij->lvec);
  if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
  if (aij->rowvalues) PetscFree(aij->rowvalues);
  PetscFree(aij);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}
554 #include "draw.h"
555 #include "pinclude/pviewer.h"
556 
557 static int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
558 {
559   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
560   int         ierr;
561 
562   if (aij->size == 1) {
563     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
564   }
565   else SETERRQ(1,"MatView_MPIAIJ_Binary:Only uniprocessor output supported");
566   return 0;
567 }
568 
/*
   Views the matrix through an ASCII, draw, or Matlab viewer.  ASCII info
   formats print per-process statistics; otherwise each process prints
   its blocks in sequence, or - for draw/Matlab and the default ASCII
   path - the entire matrix is gathered onto process 0 and viewed there.
*/
static int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
  int         ierr, format,shift = C->indexshift,rank;
  FILE        *fd;
  ViewerType  vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerGetFormat(viewer,&format);
    if (format == ASCII_FORMAT_INFO_DETAILED) {
      /* per-process nonzero/memory statistics, printed one rank at a time */
      int nz, nzalloc, mem, flg;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
      ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
      PetscSequentialPhaseBegin(mat->comm,1);
      if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
         rank,aij->m,nz,nzalloc,mem);
      else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
         rank,aij->m,nz,nzalloc,mem);
      ierr = MatGetInfo(aij->A,MAT_LOCAL,&nz,&nzalloc,&mem);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz);
      ierr = MatGetInfo(aij->B,MAT_LOCAL,&nz,&nzalloc,&mem);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == ASCII_FORMAT_INFO) {
      return 0;
    }
  }

  if (vtype == DRAW_VIEWER) {
    /* nothing to do for a null draw context */
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
  }

  if (vtype == ASCII_FILE_VIEWER) {
    /* each process prints its ownership ranges and both blocks in turn */
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
           aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
           aij->cend);
    ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  }
  else {
    int size = aij->size;
    rank = aij->rank;
    if (size == 1) {
      ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat         A;
      Mat_SeqAIJ *Aloc;
      int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
      Scalar      *a;

      /* process 0 owns the whole gathered matrix; the rest own nothing */
      if (!rank) {
        ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
               CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
               CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqAIJ*) aij->A->data;
      m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = aij->rstart;
      /* temporarily convert the column indices to global numbering
         in place; restored after the insertion loop below */
      for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
      for ( i=0; i<m; i++ ) {
        ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
        row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
      }
      aj = Aloc->j;
      for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}

      /* copy over the B part */
      Aloc = (Mat_SeqAIJ*) aij->B->data;
      m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = aij->rstart;
      /* B's local columns are translated to global via garray; ct keeps
         the base pointer since cols is advanced in the loop */
      ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
      for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
      for ( i=0; i<m; i++ ) {
        ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
        row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
      }
      PetscFree(ct);
      ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}
679 
680 static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
681 {
682   Mat         mat = (Mat) obj;
683   int         ierr;
684   ViewerType  vtype;
685 
686   if (!viewer) {
687     viewer = STDOUT_VIEWER_SELF;
688   }
689   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
690   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
691       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
692     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
693   }
694   else if (vtype == BINARY_FILE_VIEWER) {
695     return MatView_MPIAIJ_Binary(mat,viewer);
696   }
697   return 0;
698 }
699 
700 /*
701     This has to provide several versions.
702 
703      1) per sequential
704      2) a) use only local smoothing updating outer values only once.
705         b) local smoothing updating outer values each inner iteration
706      3) color updating out values betwen colors.
707 */
708 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
709                            double fshift,int its,Vec xx)
710 {
711   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
712   Mat        AA = mat->A, BB = mat->B;
713   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
714   Scalar     zero = 0.0,*b,*x,*xs,*ls,d,*v,sum,scale,*t,*ts;
715   int        ierr,*idx, *diag;
716   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
717   Vec        tt;
718 
719   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
720   xs = x + shift; /* shift by one for index start of 1 */
721   ls = ls + shift;
722   if (!A->diag) {if ((ierr = MatMarkDiag_SeqAIJ(AA))) return ierr;}
723   diag = A->diag;
724   if (flag == SOR_APPLY_UPPER || flag == SOR_APPLY_LOWER) {
725     SETERRQ(1,"MatRelax_MPIAIJ:Option not supported");
726   }
727   if (flag & SOR_EISENSTAT) {
728     /* Let  A = L + U + D; where L is lower trianglar,
729     U is upper triangular, E is diagonal; This routine applies
730 
731             (L + E)^{-1} A (U + E)^{-1}
732 
733     to a vector efficiently using Eisenstat's trick. This is for
734     the case of SSOR preconditioner, so E is D/omega where omega
735     is the relaxation factor.
736     */
737     ierr = VecDuplicate(xx,&tt); CHKERRQ(ierr);
738     PLogObjectParent(matin,tt);
739     VecGetArray(tt,&t);
740     scale = (2.0/omega) - 1.0;
741     /*  x = (E + U)^{-1} b */
742     VecSet(&zero,mat->lvec);
743     ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
744                               mat->Mvctx); CHKERRQ(ierr);
745     for ( i=m-1; i>-1; i-- ) {
746       n    = A->i[i+1] - diag[i] - 1;
747       idx  = A->j + diag[i] + !shift;
748       v    = A->a + diag[i] + !shift;
749       sum  = b[i];
750       SPARSEDENSEMDOT(sum,xs,v,idx,n);
751       d    = fshift + A->a[diag[i]+shift];
752       n    = B->i[i+1] - B->i[i];
753       idx  = B->j + B->i[i] + shift;
754       v    = B->a + B->i[i] + shift;
755       SPARSEDENSEMDOT(sum,ls,v,idx,n);
756       x[i] = omega*(sum/d);
757     }
758     ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
759                             mat->Mvctx); CHKERRQ(ierr);
760 
761     /*  t = b - (2*E - D)x */
762     v = A->a;
763     for ( i=0; i<m; i++ ) { t[i] = b[i] - scale*(v[*diag++ + shift])*x[i]; }
764 
765     /*  t = (E + L)^{-1}t */
766     ts = t + shift; /* shifted by one for index start of a or mat->j*/
767     diag = A->diag;
768     VecSet(&zero,mat->lvec);
769     ierr = VecPipelineBegin(tt,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
770                                                  mat->Mvctx); CHKERRQ(ierr);
771     for ( i=0; i<m; i++ ) {
772       n    = diag[i] - A->i[i];
773       idx  = A->j + A->i[i] + shift;
774       v    = A->a + A->i[i] + shift;
775       sum  = t[i];
776       SPARSEDENSEMDOT(sum,ts,v,idx,n);
777       d    = fshift + A->a[diag[i]+shift];
778       n    = B->i[i+1] - B->i[i];
779       idx  = B->j + B->i[i] + shift;
780       v    = B->a + B->i[i] + shift;
781       SPARSEDENSEMDOT(sum,ls,v,idx,n);
782       t[i] = omega*(sum/d);
783     }
784     ierr = VecPipelineEnd(tt,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
785                                                     mat->Mvctx); CHKERRQ(ierr);
786     /*  x = x + t */
787     for ( i=0; i<m; i++ ) { x[i] += t[i]; }
788     VecDestroy(tt);
789     return 0;
790   }
791 
792 
793   if ((flag & SOR_SYMMETRIC_SWEEP) == SOR_SYMMETRIC_SWEEP){
794     if (flag & SOR_ZERO_INITIAL_GUESS) {
795       VecSet(&zero,mat->lvec); VecSet(&zero,xx);
796     }
797     else {
798       ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_UP,mat->Mvctx);
799       CHKERRQ(ierr);
800       ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_UP,mat->Mvctx);
801       CHKERRQ(ierr);
802     }
803     while (its--) {
804       /* go down through the rows */
805       ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
806                               mat->Mvctx); CHKERRQ(ierr);
807       for ( i=0; i<m; i++ ) {
808         n    = A->i[i+1] - A->i[i];
809         idx  = A->j + A->i[i] + shift;
810         v    = A->a + A->i[i] + shift;
811         sum  = b[i];
812         SPARSEDENSEMDOT(sum,xs,v,idx,n);
813         d    = fshift + A->a[diag[i]+shift];
814         n    = B->i[i+1] - B->i[i];
815         idx  = B->j + B->i[i] + shift;
816         v    = B->a + B->i[i] + shift;
817         SPARSEDENSEMDOT(sum,ls,v,idx,n);
818         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
819       }
820       ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
821                             mat->Mvctx); CHKERRQ(ierr);
822       /* come up through the rows */
823       ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
824                               mat->Mvctx); CHKERRQ(ierr);
825       for ( i=m-1; i>-1; i-- ) {
826         n    = A->i[i+1] - A->i[i];
827         idx  = A->j + A->i[i] + shift;
828         v    = A->a + A->i[i] + shift;
829         sum  = b[i];
830         SPARSEDENSEMDOT(sum,xs,v,idx,n);
831         d    = fshift + A->a[diag[i]+shift];
832         n    = B->i[i+1] - B->i[i];
833         idx  = B->j + B->i[i] + shift;
834         v    = B->a + B->i[i] + shift;
835         SPARSEDENSEMDOT(sum,ls,v,idx,n);
836         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
837       }
838       ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
839                             mat->Mvctx); CHKERRQ(ierr);
840     }
841   }
842   else if (flag & SOR_FORWARD_SWEEP){
843     if (flag & SOR_ZERO_INITIAL_GUESS) {
844       VecSet(&zero,mat->lvec);
845       ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
846                               mat->Mvctx); CHKERRQ(ierr);
847       for ( i=0; i<m; i++ ) {
848         n    = diag[i] - A->i[i];
849         idx  = A->j + A->i[i] + shift;
850         v    = A->a + A->i[i] + shift;
851         sum  = b[i];
852         SPARSEDENSEMDOT(sum,xs,v,idx,n);
853         d    = fshift + A->a[diag[i]+shift];
854         n    = B->i[i+1] - B->i[i];
855         idx  = B->j + B->i[i] + shift;
856         v    = B->a + B->i[i] + shift;
857         SPARSEDENSEMDOT(sum,ls,v,idx,n);
858         x[i] = omega*(sum/d);
859       }
860       ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
861                             mat->Mvctx); CHKERRQ(ierr);
862       its--;
863     }
864     while (its--) {
865       ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_UP,mat->Mvctx);
866       CHKERRQ(ierr);
867       ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_UP,mat->Mvctx);
868       CHKERRQ(ierr);
869       ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
870                               mat->Mvctx); CHKERRQ(ierr);
871       for ( i=0; i<m; i++ ) {
872         n    = A->i[i+1] - A->i[i];
873         idx  = A->j + A->i[i] + shift;
874         v    = A->a + A->i[i] + shift;
875         sum  = b[i];
876         SPARSEDENSEMDOT(sum,xs,v,idx,n);
877         d    = fshift + A->a[diag[i]+shift];
878         n    = B->i[i+1] - B->i[i];
879         idx  = B->j + B->i[i] + shift;
880         v    = B->a + B->i[i] + shift;
881         SPARSEDENSEMDOT(sum,ls,v,idx,n);
882         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
883       }
884       ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
885                             mat->Mvctx); CHKERRQ(ierr);
886     }
887   }
888   else if (flag & SOR_BACKWARD_SWEEP){
889     if (flag & SOR_ZERO_INITIAL_GUESS) {
890       VecSet(&zero,mat->lvec);
891       ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
892                               mat->Mvctx); CHKERRQ(ierr);
893       for ( i=m-1; i>-1; i-- ) {
894         n    = A->i[i+1] - diag[i] - 1;
895         idx  = A->j + diag[i] + !shift;
896         v    = A->a + diag[i] + !shift;
897         sum  = b[i];
898         SPARSEDENSEMDOT(sum,xs,v,idx,n);
899         d    = fshift + A->a[diag[i]+shift];
900         n    = B->i[i+1] - B->i[i];
901         idx  = B->j + B->i[i] + shift;
902         v    = B->a + B->i[i] + shift;
903         SPARSEDENSEMDOT(sum,ls,v,idx,n);
904         x[i] = omega*(sum/d);
905       }
906       ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
907                             mat->Mvctx); CHKERRQ(ierr);
908       its--;
909     }
910     while (its--) {
911       ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_DOWN,
912                             mat->Mvctx); CHKERRQ(ierr);
913       ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_DOWN,
914                             mat->Mvctx); CHKERRQ(ierr);
915       ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
916                               mat->Mvctx); CHKERRQ(ierr);
917       for ( i=m-1; i>-1; i-- ) {
918         n    = A->i[i+1] - A->i[i];
919         idx  = A->j + A->i[i] + shift;
920         v    = A->a + A->i[i] + shift;
921         sum  = b[i];
922         SPARSEDENSEMDOT(sum,xs,v,idx,n);
923         d    = fshift + A->a[diag[i]+shift];
924         n    = B->i[i+1] - B->i[i];
925         idx  = B->j + B->i[i] + shift;
926         v    = B->a + B->i[i] + shift;
927         SPARSEDENSEMDOT(sum,ls,v,idx,n);
928         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
929       }
930       ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
931                             mat->Mvctx); CHKERRQ(ierr);
932     }
933   }
934   else if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
935     if (flag & SOR_ZERO_INITIAL_GUESS) {
936       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
937     }
938     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
939     CHKERRQ(ierr);
940     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
941     CHKERRQ(ierr);
942     while (its--) {
943       /* go down through the rows */
944       for ( i=0; i<m; i++ ) {
945         n    = A->i[i+1] - A->i[i];
946         idx  = A->j + A->i[i] + shift;
947         v    = A->a + A->i[i] + shift;
948         sum  = b[i];
949         SPARSEDENSEMDOT(sum,xs,v,idx,n);
950         d    = fshift + A->a[diag[i]+shift];
951         n    = B->i[i+1] - B->i[i];
952         idx  = B->j + B->i[i] + shift;
953         v    = B->a + B->i[i] + shift;
954         SPARSEDENSEMDOT(sum,ls,v,idx,n);
955         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
956       }
957       /* come up through the rows */
958       for ( i=m-1; i>-1; i-- ) {
959         n    = A->i[i+1] - A->i[i];
960         idx  = A->j + A->i[i] + shift;
961         v    = A->a + A->i[i] + shift;
962         sum  = b[i];
963         SPARSEDENSEMDOT(sum,xs,v,idx,n);
964         d    = fshift + A->a[diag[i]+shift];
965         n    = B->i[i+1] - B->i[i];
966         idx  = B->j + B->i[i] + shift;
967         v    = B->a + B->i[i] + shift;
968         SPARSEDENSEMDOT(sum,ls,v,idx,n);
969         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
970       }
971     }
972   }
973   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
974     if (flag & SOR_ZERO_INITIAL_GUESS) {
975       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
976     }
977     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
978     CHKERRQ(ierr);
979     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
980     CHKERRQ(ierr);
981     while (its--) {
982       for ( i=0; i<m; i++ ) {
983         n    = A->i[i+1] - A->i[i];
984         idx  = A->j + A->i[i] + shift;
985         v    = A->a + A->i[i] + shift;
986         sum  = b[i];
987         SPARSEDENSEMDOT(sum,xs,v,idx,n);
988         d    = fshift + A->a[diag[i]+shift];
989         n    = B->i[i+1] - B->i[i];
990         idx  = B->j + B->i[i] + shift;
991         v    = B->a + B->i[i] + shift;
992         SPARSEDENSEMDOT(sum,ls,v,idx,n);
993         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
994       }
995     }
996   }
997   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
998     if (flag & SOR_ZERO_INITIAL_GUESS) {
999       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
1000     }
1001     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
1002                             mat->Mvctx); CHKERRQ(ierr);
1003     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
1004                             mat->Mvctx); CHKERRQ(ierr);
1005     while (its--) {
1006       for ( i=m-1; i>-1; i-- ) {
1007         n    = A->i[i+1] - A->i[i];
1008         idx  = A->j + A->i[i] + shift;
1009         v    = A->a + A->i[i] + shift;
1010         sum  = b[i];
1011         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1012         d    = fshift + A->a[diag[i]+shift];
1013         n    = B->i[i+1] - B->i[i];
1014         idx  = B->j + B->i[i] + shift;
1015         v    = B->a + B->i[i] + shift;
1016         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1017         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
1018       }
1019     }
1020   }
1021   return 0;
1022 }
1023 
1024 static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,int *nz,
1025                              int *nzalloc,int *mem)
1026 {
1027   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1028   Mat        A = mat->A, B = mat->B;
1029   int        ierr, isend[3], irecv[3], nzA, nzallocA, memA;
1030 
1031   ierr = MatGetInfo(A,MAT_LOCAL,&nzA,&nzallocA,&memA); CHKERRQ(ierr);
1032   ierr = MatGetInfo(B,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
1033   isend[0] += nzA; isend[1] += nzallocA; isend[2] += memA;
1034   if (flag == MAT_LOCAL) {
1035     if (nz)       *nz      = isend[0];
1036     if (nzalloc)  *nzalloc = isend[1];
1037     if (mem)      *mem     = isend[2];
1038   } else if (flag == MAT_GLOBAL_MAX) {
1039     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_MAX,matin->comm);
1040     if (nz)      *nz      = irecv[0];
1041     if (nzalloc) *nzalloc = irecv[1];
1042     if (mem)     *mem     = irecv[2];
1043   } else if (flag == MAT_GLOBAL_SUM) {
1044     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_SUM,matin->comm);
1045     if (nz)      *nz      = irecv[0];
1046     if (nzalloc) *nzalloc = irecv[1];
1047     if (mem)     *mem     = irecv[2];
1048   }
1049   return 0;
1050 }
1051 
1052 extern int MatLUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,Mat*);
1053 extern int MatLUFactorNumeric_MPIAIJ(Mat,Mat*);
1054 extern int MatLUFactor_MPIAIJ(Mat,IS,IS,double);
1055 extern int MatILUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,int,Mat *);
1056 extern int MatSolve_MPIAIJ(Mat,Vec,Vec);
1057 extern int MatSolveAdd_MPIAIJ(Mat,Vec,Vec,Vec);
1058 extern int MatSolveTrans_MPIAIJ(Mat,Vec,Vec);
1059 extern int MatSolveTransAdd_MPIAIJ(Mat,Vec,Vec,Vec);
1060 
1061 static int MatSetOption_MPIAIJ(Mat A,MatOption op)
1062 {
1063   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1064 
1065   if (op == NO_NEW_NONZERO_LOCATIONS ||
1066       op == YES_NEW_NONZERO_LOCATIONS ||
1067       op == COLUMNS_SORTED ||
1068       op == ROW_ORIENTED) {
1069         MatSetOption(a->A,op);
1070         MatSetOption(a->B,op);
1071   }
1072   else if (op == ROWS_SORTED ||
1073            op == SYMMETRIC_MATRIX ||
1074            op == STRUCTURALLY_SYMMETRIC_MATRIX ||
1075            op == YES_NEW_DIAGONALS)
1076     PLogInfo(A,"Info:MatSetOption_MPIAIJ:Option ignored\n");
1077   else if (op == COLUMN_ORIENTED) {
1078     a->roworiented = 0;
1079     MatSetOption(a->A,op);
1080     MatSetOption(a->B,op);
1081   }
1082   else if (op == NO_NEW_DIAGONALS)
1083     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:NO_NEW_DIAGONALS");}
1084   else
1085     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:unknown option");}
1086   return 0;
1087 }
1088 
1089 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
1090 {
1091   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1092   *m = mat->M; *n = mat->N;
1093   return 0;
1094 }
1095 
1096 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
1097 {
1098   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1099   *m = mat->m; *n = mat->N;
1100   return 0;
1101 }
1102 
1103 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
1104 {
1105   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1106   *m = mat->rstart; *n = mat->rend;
1107   return 0;
1108 }
1109 
1110 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1111 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1112 
1113 int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1114 {
1115   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1116   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
1117   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
1118   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
1119   int        *cmap, *idx_p;
1120 
1121   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIAIJ:Already active");
1122   mat->getrowactive = PETSC_TRUE;
1123 
1124   if (!mat->rowvalues && (idx || v)) {
1125     /*
1126         allocate enough space to hold information from the longest row.
1127     */
1128     Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
1129     int     max = 1,n = mat->n,tmp;
1130     for ( i=0; i<n; i++ ) {
1131       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1132       if (max < tmp) { max = tmp; }
1133     }
1134     mat->rowvalues = (Scalar *) PetscMalloc( max*(sizeof(int)+sizeof(Scalar)));
1135     CHKPTRQ(mat->rowvalues);
1136     mat->rowindices = (int *) (mat->rowvalues + max);
1137   }
1138 
1139 
1140   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIAIJ:Only local rows")
1141   lrow = row - rstart;
1142 
1143   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1144   if (!v)   {pvA = 0; pvB = 0;}
1145   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1146   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1147   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1148   nztot = nzA + nzB;
1149 
1150   cmap  = mat->garray;
1151   if (v  || idx) {
1152     if (nztot) {
1153       /* Sort by increasing column numbers, assuming A and B already sorted */
1154       int imark = -1;
1155       if (v) {
1156         *v = v_p = mat->rowvalues;
1157         for ( i=0; i<nzB; i++ ) {
1158           if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
1159           else break;
1160         }
1161         imark = i;
1162         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1163         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1164       }
1165       if (idx) {
1166         *idx = idx_p = mat->rowindices;
1167         if (imark > -1) {
1168           for ( i=0; i<imark; i++ ) {
1169             idx_p[i] = cmap[cworkB[i]];
1170           }
1171         } else {
1172           for ( i=0; i<nzB; i++ ) {
1173             if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
1174             else break;
1175           }
1176           imark = i;
1177         }
1178         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
1179         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
1180       }
1181     }
1182     else {
1183       if (idx) *idx = 0;
1184       if (v)   *v   = 0;
1185     }
1186   }
1187   *nz = nztot;
1188   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1189   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1190   return 0;
1191 }
1192 
1193 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1194 {
1195   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1196   if (aij->getrowactive == PETSC_FALSE) {
1197     SETERRQ(1,"MatRestoreRow_MPIAIJ:MatGetRow not called");
1198   }
1199   aij->getrowactive = PETSC_FALSE;
1200   return 0;
1201 }
1202 
static int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
{
  /*
     Computes the Frobenius, one, or infinity norm of the parallel matrix
     by combining the entries of the diagonal (A) and off-diagonal (B)
     sequential blocks and reducing across the communicator.  On a single
     process the computation is delegated to the sequential block.
     NOTE: the one-norm uses two work arrays of global length N, which is
     not scalable in the number of columns.
  */
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
  int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
  double     sum = 0.0;
  Scalar     *v;

  if (aij->size == 1) {
    /* uniprocessor case: the diagonal block IS the whole matrix */
    ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      /* sum of |a_ij|^2 over both blocks, then global sum and sqrt */
      v = amat->a;
      for (i=0; i<amat->nz; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      v = bmat->a;
      for (i=0; i<bmat->nz; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
      *norm = sqrt(*norm);
    }
    else if (type == NORM_1) { /* max column norm */
      /* accumulate |a_ij| into a global-length column-sum array, sum the
         arrays over all processes, then take the maximum entry */
      double *tmp, *tmp2;
      int    *jj, *garray = aij->garray;
      tmp  = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp);
      tmp2 = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp2);
      PetscMemzero(tmp,aij->N*sizeof(double));
      *norm = 0.0;
      /* A columns are local: offset by cstart (shift adjusts 0/1 indexing) */
      v = amat->a; jj = amat->j;
      for ( j=0; j<amat->nz; j++ ) {
        tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
      }
      /* B columns are compacted: garray maps them back to global columns */
      v = bmat->a; jj = bmat->j;
      for ( j=0; j<bmat->nz; j++ ) {
        tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
      }
      MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);
      for ( j=0; j<aij->N; j++ ) {
        if (tmp2[j] > *norm) *norm = tmp2[j];
      }
      PetscFree(tmp); PetscFree(tmp2);
    }
    else if (type == NORM_INFINITY) { /* max row norm */
      /* each row is fully local (split across A and B), so only a final
         global max is needed */
      double ntemp = 0.0;
      for ( j=0; j<amat->m; j++ ) {
        v = amat->a + amat->i[j] + shift;
        sum = 0.0;
        for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
          sum += PetscAbsScalar(*v); v++;
        }
        v = bmat->a + bmat->i[j] + shift;
        for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
          sum += PetscAbsScalar(*v); v++;
        }
        if (sum > ntemp) ntemp = sum;
      }
      MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);
    }
    else {
      SETERRQ(1,"MatNorm_MPIAIJ:No support for two norm");
    }
  }
  return 0;
}
1277 
static int MatTranspose_MPIAIJ(Mat A,Mat *matout)
{
  /*
     Forms the transpose of a parallel AIJ matrix by inserting each local
     row of A as a column of a freshly created N x M matrix B, then
     assembling B.  With matout == PETSC_NULL the result replaces A
     (square matrices only); otherwise *matout receives the new matrix.
  */
  Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
  Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
  int        ierr,shift = Aloc->indexshift;
  Mat        B;
  int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
  Scalar     *array;

  if (matout == PETSC_NULL && M != N)
    SETERRQ(1,"MatTranspose_MPIAIJ:Square matrix only for in-place");
  ierr = MatCreateMPIAIJ(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,
         PETSC_NULL,&B); CHKERRQ(ierr);

  /* copy over the A part */
  Aloc = (Mat_SeqAIJ*) a->A->data;
  m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
  row = a->rstart;
  /* temporarily convert A's local column indices to global numbering
     (shift compensates for 0- vs 1-based index storage) */
  for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
  for ( i=0; i<m; i++ ) {
    /* each row of A becomes one column (global index "row") of B */
    ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
    row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
  }
  aj = Aloc->j;
  /* restore A's indices to local numbering (undo the shift above) */
  for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}

  /* copy over the B part */
  Aloc = (Mat_SeqAIJ*) a->B->data;
  m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
  row = a->rstart;
  /* B's indices are translated into a scratch array via garray, leaving
     the off-diagonal block untouched */
  ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
  for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
  for ( i=0; i<m; i++ ) {
    ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
    row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
  }
  PetscFree(ct);
  ierr = MatAssemblyBegin(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
  if (matout != PETSC_NULL) {
    *matout = B;
  } else {
    /* This isn't really an in-place transpose .... but free data structures from a */
    PetscFree(a->rowners);
    ierr = MatDestroy(a->A); CHKERRQ(ierr);
    ierr = MatDestroy(a->B); CHKERRQ(ierr);
    if (a->colmap) PetscFree(a->colmap);
    if (a->garray) PetscFree(a->garray);
    if (a->lvec) VecDestroy(a->lvec);
    if (a->Mvctx) VecScatterDestroy(a->Mvctx);
    PetscFree(a);
    /* overwrite A's header with B's and discard B's shell */
    PetscMemcpy(A,B,sizeof(struct _Mat));
    PetscHeaderDestroy(B);
  }
  return 0;
}
1334 
1335 extern int MatPrintHelp_SeqAIJ(Mat);
1336 static int MatPrintHelp_MPIAIJ(Mat A)
1337 {
1338   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1339 
1340   if (!a->rank) return MatPrintHelp_SeqAIJ(a->A);
1341   else return 0;
1342 }
1343 
1344 extern int MatConvert_MPIAIJ(Mat,MatType,Mat *);
1345 static int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1346 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1347 int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatGetSubMatrixCall,Mat **);
1348 /* -------------------------------------------------------------------*/
/*
   Virtual function (dispatch) table for the MATMPIAIJ format.  The
   initializers are POSITIONAL and must follow the field order of
   struct _MatOps exactly; a 0 marks an operation this format does not
   implement.  Do not reorder these entries.
*/
static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
       MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
       MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
       MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
       MatSolve_MPIAIJ,MatSolveAdd_MPIAIJ,
       MatSolveTrans_MPIAIJ,MatSolveTransAdd_MPIAIJ,
       MatLUFactor_MPIAIJ,0,
       MatRelax_MPIAIJ,
       MatTranspose_MPIAIJ,
       MatGetInfo_MPIAIJ,0,
       MatGetDiagonal_MPIAIJ,0,MatNorm_MPIAIJ,
       MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
       0,
       MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
       MatGetReordering_MPIAIJ,
       MatLUFactorSymbolic_MPIAIJ,MatLUFactorNumeric_MPIAIJ,0,0,
       MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
       MatILUFactorSymbolic_MPIAIJ,0,
       0,0,MatConvert_MPIAIJ,0,0,MatConvertSameType_MPIAIJ,0,0,
       0,0,0,
       MatGetSubMatrices_MPIAIJ,MatIncreaseOverlap_MPIAIJ,MatGetValues_MPIAIJ,0,
       MatPrintHelp_MPIAIJ,
       MatScale_MPIAIJ};
1372 
1373 /*@C
1374    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1375    (the default parallel PETSc format).  For good matrix assembly performance
1376    the user should preallocate the matrix storage by setting the parameters
1377    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1378    performance can be increased by more than a factor of 50.
1379 
1380    Input Parameters:
1381 .  comm - MPI communicator
1382 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1383            This value should be the same as the local size used in creating the
1384            y vector for the matrix-vector product y = Ax.
1385 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1386            This value should be the same as the local size used in creating the
1387            x vector for the matrix-vector product y = Ax.
1388 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1389 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1390 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1391            (same for all local rows)
.  d_nnz - array containing the number of nonzeros in the various rows of the
1393            diagonal portion of the local submatrix (possibly different for each row)
1394            or PETSC_NULL. You must leave room for the diagonal entry even if
1395            it is zero.
1396 .  o_nz - number of nonzeros per row in the off-diagonal portion of local
1397            submatrix (same for all local rows).
.  o_nnz - array containing the number of nonzeros in the various rows of the
1399            off-diagonal portion of the local submatrix (possibly different for
1400            each row) or PETSC_NULL.
1401 
1402    Output Parameter:
1403 .  A - the matrix
1404 
1405    Notes:
1406    The AIJ format (also called the Yale sparse matrix format or
1407    compressed row storage), is fully compatible with standard Fortran 77
1408    storage.  That is, the stored row and column indices can begin at
1409    either one (as in Fortran) or zero.  See the users manual for details.
1410 
1411    The user MUST specify either the local or global matrix dimensions
1412    (possibly both).
1413 
1414    By default, this format uses inodes (identical nodes) when possible.
1415    We search for consecutive rows with the same nonzero structure, thereby
1416    reusing matrix information to achieve increased efficiency.
1417 
1418    Options Database Keys:
1419 $    -mat_aij_no_inode  - Do not use inodes
1420 $    -mat_aij_inode_limit <limit> - Set inode limit.
1421 $        (max limit=5)
1422 $    -mat_aij_oneindex - Internally use indexing starting at 1
1423 $        rather than 0.  Note: When calling MatSetValues(),
1424 $        the user still MUST index entries starting at 0!
1425 
1426    Storage Information:
1427    For a square global matrix we define each processor's diagonal portion
1428    to be its local rows and the corresponding columns (a square submatrix);
1429    each processor's off-diagonal portion encompasses the remainder of the
1430    local matrix (a rectangular submatrix).
1431 
1432    The user can specify preallocated storage for the diagonal part of
1433    the local submatrix with either d_nz or d_nnz (not both).  Set
1434    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1435    memory allocation.  Likewise, specify preallocated storage for the
1436    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1437 
1438    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1439    the figure below we depict these three local rows and all columns (0-11).
1440 
1441 $          0 1 2 3 4 5 6 7 8 9 10 11
1442 $         -------------------
1443 $  row 3  |  o o o d d d o o o o o o
1444 $  row 4  |  o o o d d d o o o o o o
1445 $  row 5  |  o o o d d d o o o o o o
1446 $         -------------------
1447 $
1448 
1449    Thus, any entries in the d locations are stored in the d (diagonal)
1450    submatrix, and any entries in the o locations are stored in the
1451    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1452    stored simply in the MATSEQAIJ format for compressed row storage.
1453 
1454    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1455    and o_nz should indicate the number of nonzeros per row in the o matrix.
1456    In general, for PDE problems in which most nonzeros are near the diagonal,
1457    one expects d_nz >> o_nz. For large problems you MUST preallocate memory
1458    or you will get TERRIBLE performance; see the users' manual chapter on
1459    matrices and the file $(PETSC_DIR)/Performance.
1460 
1461 .keywords: matrix, aij, compressed row, sparse, parallel
1462 
1463 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1464 @*/
int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
                    int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  /* See the manual page above for the argument descriptions.  Creates
     the parallel matrix header, determines local/global sizes and row
     and column ownership, and builds the two sequential blocks: the
     diagonal part (m x n) and the off-diagonal part (m x N). */
  Mat          B;
  Mat_MPIAIJ   *b;
  int          ierr, i,sum[2],work[2];

  *A = 0;
  PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
  PLogObjectCreate(B);
  B->data       = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
  PetscMemzero(b,sizeof(Mat_MPIAIJ));
  PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
  B->destroy    = MatDestroy_MPIAIJ;
  B->view       = MatView_MPIAIJ;
  B->factor     = 0;
  B->assembled  = PETSC_FALSE;

  b->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&b->rank);
  MPI_Comm_size(comm,&b->size);

  /* per-row preallocation requires knowing the local row count up front */
  if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
    SETERRQ(1,"MatCreateMPIAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");

  /* missing global sizes are the sums of the local sizes ... */
  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
    if (M == PETSC_DECIDE) M = sum[0];
    if (N == PETSC_DECIDE) N = sum[1];
  }
  /* ... and missing local sizes split the global size nearly evenly,
     giving the first (M % size) processes one extra row */
  if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
  if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;

  /* build local table of row and column ownerships */
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
  b->cowners = b->rowners + b->size + 1;
  /* gather every process's row count, then prefix-sum so rowners[r] is
     the first global row owned by process r */
  MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];
  }
  b->rstart = b->rowners[b->rank];
  b->rend   = b->rowners[b->rank+1];
  /* same construction for the column ownership table */
  MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart = b->cowners[b->rank];
  b->cend   = b->cowners[b->rank+1];

  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  /* the off-diagonal block is created with the full global column count;
     presumably its columns are compacted during assembly (see garray) --
     confirm against the assembly routines */
  ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used for matrix vector multiply */
  b->lvec      = 0;
  b->Mvctx     = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  *A = B;
  return 0;
}
1547 
static int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
{
  /*
     Duplicates an assembled MPIAIJ matrix: builds a new header, copies
     the size/ownership bookkeeping, and deep-copies the colmap, garray,
     local vector, scatter context, and both sequential blocks.
     NOTE(review): cpvalues is not consulted; the values are always
     copied via MatConvert(...,MATSAME,...) -- confirm intended semantics.
  */
  Mat        mat;
  Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
  int        ierr, len=0, flg;

  *newmat       = 0;
  PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy    = MatDestroy_MPIAIJ;
  mat->view       = MatView_MPIAIJ;
  mat->factor     = matin->factor;
  mat->assembled  = PETSC_TRUE;

  /* copy the size and ownership information */
  a->m = mat->m   = oldmat->m;
  a->n = mat->n   = oldmat->n;
  a->M = mat->M   = oldmat->M;
  a->N = mat->N   = oldmat->N;

  a->rstart       = oldmat->rstart;
  a->rend         = oldmat->rend;
  a->cstart       = oldmat->cstart;
  a->cend         = oldmat->cend;
  a->size         = oldmat->size;
  a->rank         = oldmat->rank;
  a->insertmode   = NOT_SET_VALUES;
  a->rowvalues    = 0;
  a->getrowactive = PETSC_FALSE;

  /* the row-ownership table is copied; the stash starts empty */
  a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
  PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
  PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
  ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
  /* colmap (global-to-local column map) exists only if the source built one */
  if (oldmat->colmap) {
    a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
    PLogObjectMemory(mat,(a->N)*sizeof(int));
    PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
  } else a->colmap = 0;
  /* garray length equals the compacted column count of the B block */
  if (oldmat->garray && (len = ((Mat_SeqAIJ *) (oldmat->B->data))->n)) {
    a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
    PLogObjectMemory(mat,len*sizeof(int));
    PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
  } else a->garray = 0;

  /* duplicate the communication machinery and both sequential blocks */
  ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
  PLogObjectParent(mat,a->lvec);
  ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
  PLogObjectParent(mat,a->Mvctx);
  ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);
  ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
  PLogObjectParent(mat,a->B);
  ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
  if (flg) {
    ierr = MatPrintHelp(mat); CHKERRQ(ierr);
  }
  *newmat = mat;
  return 0;
}
1609 
1610 #include "sys.h"
1611 
/*
   MatLoad_MPIAIJ - Loads a parallel AIJ matrix from a binary viewer.

   Protocol: process 0 is the only reader of the file.  It reads the
   header (cookie, M, N, ...), the per-row lengths, the column indices,
   and the numerical values, keeping its own portion and shipping each
   other process its portion with MPI_Scatterv/MPI_Send.  Two message
   phases use two different tags: the viewer's tag for column indices
   and the new matrix's tag (A->tag) for the values.

   Input Parameters:
     viewer - binary viewer positioned at the matrix
     type   - NOTE(review): not referenced in this routine -- the result
              is always MATMPIAIJ; confirm against the MatLoad dispatch.

   Output Parameter:
     newmat - the loaded, assembled matrix
*/
int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*svals;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
  int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
  int          tag = ((PetscObject)viewer)->tag;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    /* only process 0 touches the file; header[3] is read but not used here */
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIAIJ:not matrix object");
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];
  /* determine ownership of all rows: the first M%size ranks get one extra row */
  m = M/size + ((M % size) > rank);
  rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
  MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  /* turn the per-process counts into a prefix array: rowners[i] is the
     first global row owned by process i (rowners[1] is already correct) */
  for ( i=2; i<=size; i++ ) {
    rowners[i] += rowners[i-1];
  }
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors; ourlens and offlens share
     one allocation of 2*(rend-rstart) ints (freed once at the end) */
  ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
  offlens = ourlens + (rend-rstart);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
    /* rowners doubles as the displacement array for the scatter */
    MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    /* send buffer arguments are ignored on non-root ranks */
    MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it (reused for every rank) */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices  */
    nz = procsnz[0];
    mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
    ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);

    /* read in every one elses and ship off (column indices use the
       viewer's tag; the matching MPI_Recv is in the else branch below) */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += ourlens[i];
    }
    mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);

    /* receive message of column indices*/
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    /* sanity check: received count must match the row-length totals */
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries.
     NOTE(review): columns are tested against the ROW ownership range
     [rstart,rend), which assumes the column ownership matches the row
     ownership (e.g. square matrices with the default layout) -- confirm */
  PetscMemzero(offlens,m*sizeof(int));
  jj = 0;
  for ( i=0; i<m; i++ ) {
    for ( j=0; j<ourlens[i]; j++ ) {
      if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
      jj++;
    }
  }

  /* create our matrix: temporarily shrink ourlens to diagonal-block-only
     counts so it can serve as the d_nnz preallocation array, then restore */
  for ( i=0; i<m; i++ ) {
    ourlens[i] -= offlens[i];
  }
  ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
  A = *newmat;
  /* file stores each row's columns sorted; telling the matrix speeds insertion */
  MatSetOption(A,COLUMNS_SORTED);
  for ( i=0; i<m; i++ ) {
    ourlens[i] += offlens[i];
  }

  if (!rank) {
    vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);

    /* insert into matrix, one global row (jj) at a time, walking the
       column/value arrays with the cursor pointers smycols/svals */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }

    /* read in other processors and ship out (values phase uses A->tag so
       it cannot be confused with the earlier column-index messages) */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* receive message of values*/
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");

    /* insert into matrix (same cursor-walk as the root branch above) */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }
  }
  PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);

  ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}
1776