xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision 49f289441cf4e0553157c9cbf75d3484d13cb3f6)
1 #ifndef lint
2 static char vcid[] = "$Id: mpiaij.c,v 1.153 1996/07/08 22:19:18 bsmith Exp balay $";
3 #endif
4 
5 #include "mpiaij.h"
6 #include "src/vec/vecimpl.h"
7 #include "src/inline/spops.h"
8 
9 /* Local utility routine that creates a mapping from the global column
10 number to the local column number in the off-diagonal part (B) of the
11 local storage of the matrix.  This is done in a non-scalable way, since
12 the length of colmap equals the number of global columns.
13 */
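/* For illustration (hypothetical data): if garray = {3,7}, the loop below sets
   colmap[3] = 0-shift and colmap[7] = 1-shift, while every other entry of
   colmap is left at 0 by the PetscMemzero() call, marking those global columns
   as not present in the off-diagonal block. */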
14 static int CreateColmap_Private(Mat mat)
15 {
16   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
17   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
18   int        n = B->n,i,shift = B->indexshift;
19 
20   aij->colmap = (int *) PetscMalloc(aij->N*sizeof(int));CHKPTRQ(aij->colmap);
21   PLogObjectMemory(mat,aij->N*sizeof(int));
22   PetscMemzero(aij->colmap,aij->N*sizeof(int));
23   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i-shift;
24   return 0;
25 }
26 
27 extern int DisAssemble_MPIAIJ(Mat);
28 
29 static int MatGetReordering_MPIAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
30 {
31   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
32   int        ierr;
33   if (aij->size == 1) {
34     ierr = MatGetReordering(aij->A,type,rperm,cperm); CHKERRQ(ierr);
35   } else SETERRQ(1,"MatGetReordering_MPIAIJ:not supported in parallel");
36   return 0;
37 }
38 
39 static int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
40 {
41   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
42   Mat_SeqAIJ *C = (Mat_SeqAIJ*) aij->A->data;
43   Scalar     value;
44   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
45   int        cstart = aij->cstart, cend = aij->cend,row,col;
46   int        shift = C->indexshift,roworiented = aij->roworiented;
47 
48   if (aij->insertmode != NOT_SET_VALUES && aij->insertmode != addv) {
49     SETERRQ(1,"MatSetValues_MPIAIJ:Cannot mix inserts and adds");
50   }
51   aij->insertmode = addv;
52   for ( i=0; i<m; i++ ) {
53     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative row");
54     if (im[i] >= aij->M) SETERRQ(1,"MatSetValues_MPIAIJ:Row too large");
55     if (im[i] >= rstart && im[i] < rend) {
56       row = im[i] - rstart;
57       for ( j=0; j<n; j++ ) {
58         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative column");
59         if (in[j] >= aij->N) SETERRQ(1,"MatSetValues_MPIAIJ:Col too large");
60         if (in[j] >= cstart && in[j] < cend){
61           col = in[j] - cstart;
62           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
63           ierr = MatSetValues(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
64         }
65         else {
66           if (mat->was_assembled) {
67             if (!aij->colmap) {ierr = CreateColmap_Private(mat);CHKERRQ(ierr);}
68             col = aij->colmap[in[j]] + shift;
69             if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
70               ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
71               col =  in[j];
72             }
73           }
74           else col = in[j];
75           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
76           ierr = MatSetValues(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
77         }
78       }
79     }
80     else {
81       if (roworiented) {
82         ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
83       }
84       else {
85         row = im[i];
86         for ( j=0; j<n; j++ ) {
87           ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
88         }
89       }
90     }
91   }
92   return 0;
93 }
94 
95 static int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
96 {
97   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
98   Mat_SeqAIJ *C = (Mat_SeqAIJ*) aij->A->data;
99   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
100   int        cstart = aij->cstart, cend = aij->cend,row,col;
101   int        shift = C->indexshift;
102 
103   for ( i=0; i<m; i++ ) {
104     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative row");
105     if (idxm[i] >= aij->M) SETERRQ(1,"MatGetValues_MPIAIJ:Row too large");
106     if (idxm[i] >= rstart && idxm[i] < rend) {
107       row = idxm[i] - rstart;
108       for ( j=0; j<n; j++ ) {
109         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative column");
110         if (idxn[j] >= aij->N) SETERRQ(1,"MatGetValues_MPIAIJ:Col too large");
111         if (idxn[j] >= cstart && idxn[j] < cend){
112           col = idxn[j] - cstart;
113           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
114         }
115         else {
116           if (!aij->colmap) {ierr = CreateColmap_Private(mat);CHKERRQ(ierr);}
117           col = aij->colmap[idxn[j]] + shift;
118           if ( aij->garray[col] != idxn[j] ) *(v+i*n+j) = 0.0;
119           else {
120             ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
121           }
122         }
123       }
124     }
125     else {
126       SETERRQ(1,"MatGetValues_MPIAIJ:Only local values currently supported");
127     }
128   }
129   return 0;
130 }
131 
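/*
   Communication sketch for the assembly below: each process
     1) counts how many stashed (off-processor) entries it must send to each
        owning process,
     2) uses two MPI_Allreduce() calls so that every process learns how many
        messages it will receive and the maximum message length,
     3) posts MPI_Irecv()s of that maximum length, and
     4) packs (global row, global column, value) triples into svalues and
        MPI_Isend()s them.
   The matching MPI_Waitany()/MPI_Waitall() calls are in MatAssemblyEnd.
*/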
132 static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
133 {
134   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
135   MPI_Comm    comm = mat->comm;
136   int         size = aij->size, *owners = aij->rowners;
137   int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
138   MPI_Request *send_waits,*recv_waits;
139   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
140   InsertMode  addv;
141   Scalar      *rvalues,*svalues;
142 
143   /* make sure all processors are in either INSERT_VALUES or ADD_VALUES mode */
144   MPI_Allreduce(&aij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
145   if (addv == (ADD_VALUES|INSERT_VALUES)) {
146     SETERRQ(1,"MatAssemblyBegin_MPIAIJ:Some processors inserted others added");
147   }
148   aij->insertmode = addv; /* in case this processor had no cache */
149 
150   /*  first count number of contributors to each processor */
151   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
152   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
153   owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
154   for ( i=0; i<aij->stash.n; i++ ) {
155     idx = aij->stash.idx[i];
156     for ( j=0; j<size; j++ ) {
157       if (idx >= owners[j] && idx < owners[j+1]) {
158         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
159       }
160     }
161   }
162   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
163 
164   /* inform other processors of number of messages and max length*/
165   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
166   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
167   nreceives = work[rank];
168   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
169   nmax = work[rank];
170   PetscFree(work);
171 
172   /* post receives:
173        1) each message will consist of ordered triples
174      (global row index, global column index, value); we store the global
175      indices as Scalars to simplify the message passing.
176        2) since we don't know how long each individual message is, we
177      allocate the largest needed buffer for each receive. Potentially
178      this is a lot of wasted space.
179 
180 
181        This could be done better.
182   */
183   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
184   CHKPTRQ(rvalues);
185   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
186   CHKPTRQ(recv_waits);
187   for ( i=0; i<nreceives; i++ ) {
188     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
189               comm,recv_waits+i);
190   }
191 
192   /* do sends:
193       1) starts[i] gives the starting index in svalues for stuff going to
194          the ith processor
195   */
196   svalues = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
197   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
198   CHKPTRQ(send_waits);
199   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
200   starts[0] = 0;
201   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
202   for ( i=0; i<aij->stash.n; i++ ) {
203     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
204     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
205     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
206   }
207   PetscFree(owner);
208   starts[0] = 0;
209   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
210   count = 0;
211   for ( i=0; i<size; i++ ) {
212     if (procs[i]) {
213       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
214                 comm,send_waits+count++);
215     }
216   }
217   PetscFree(starts); PetscFree(nprocs);
218 
219   /* Free cache space */
220   PLogInfo(0,"[%d]MatAssemblyBegin_MPIAIJ:Number of off processor values %d\n",rank,aij->stash.n);
221   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
222 
223   aij->svalues    = svalues;    aij->rvalues    = rvalues;
224   aij->nsends     = nsends;     aij->nrecvs     = nreceives;
225   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
226   aij->rmax       = nmax;
227 
228   return 0;
229 }
230 extern int MatSetUpMultiply_MPIAIJ(Mat);
231 
232 static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
233 {
234   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
235   Mat_SeqAIJ *C = (Mat_SeqAIJ *) aij->A->data;
236   MPI_Status  *send_status,recv_status;
237   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
238   int         row,col,other_disassembled,shift = C->indexshift;
239   Scalar      *values,val;
240   InsertMode  addv = aij->insertmode;
241 
242   /*  wait on receives */
243   while (count) {
244     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
245     /* unpack receives into our local space */
246     values = aij->rvalues + 3*imdex*aij->rmax;
247     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
248     n = n/3;
249     for ( i=0; i<n; i++ ) {
250       row = (int) PetscReal(values[3*i]) - aij->rstart;
251       col = (int) PetscReal(values[3*i+1]);
252       val = values[3*i+2];
253       if (col >= aij->cstart && col < aij->cend) {
254         col -= aij->cstart;
255         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
256       }
257       else {
258         if (mat->was_assembled) {
259           if (!aij->colmap) {ierr = CreateColmap_Private(mat);CHKERRQ(ierr);}
260           col = aij->colmap[col] + shift;
261           if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
262             ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
263             col = (int) PetscReal(values[3*i+1]);
264           }
265         }
266         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
267       }
268     }
269     count--;
270   }
271   PetscFree(aij->recv_waits); PetscFree(aij->rvalues);
272 
273   /* wait on sends */
274   if (aij->nsends) {
275     send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));
276     CHKPTRQ(send_status);
277     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
278     PetscFree(send_status);
279   }
280   PetscFree(aij->send_waits); PetscFree(aij->svalues);
281 
282   aij->insertmode = NOT_SET_VALUES;
283   ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
284   ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);
285 
286   /* determine if any processor has disassembled; if so, we must also
287      disassemble ourselves so that we may reassemble. */
288   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
289   if (mat->was_assembled && !other_disassembled) {
290     ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
291   }
292 
293   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
294     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
295   }
296   ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
297   ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);
298 
299   if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
300   return 0;
301 }
302 
303 static int MatZeroEntries_MPIAIJ(Mat A)
304 {
305   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
306   int        ierr;
307   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
308   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
309   return 0;
310 }
311 
312 /* the code does not handle the diagonal entries correctly unless the
313    matrix is square and the column and row ownerships are identical.
314    This is a BUG. The only way to fix it seems to be to access
315    aij->A and aij->B directly and not through the MatZeroRows()
316    routine.
317 */
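/*
   Algorithm sketch for the routine below: each process determines the owner
   of every row it wants zeroed and ships those row indices to the owning
   processes (the same Irecv/Isend pattern used in MatAssemblyBegin); each
   owner then zeros its now-local rows in the diagonal block (inserting the
   given diagonal value) and in the off-diagonal block (without inserting a
   diagonal).
*/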
318 static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
319 {
320   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
321   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
322   int            *procs,*nprocs,j,found,idx,nsends,*work;
323   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
324   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
325   int            *lens,imdex,*lrows,*values;
326   MPI_Comm       comm = A->comm;
327   MPI_Request    *send_waits,*recv_waits;
328   MPI_Status     recv_status,*send_status;
329   IS             istmp;
330 
331   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
332   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
333 
334   /*  first count number of contributors to each processor */
335   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
336   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
337   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
338   for ( i=0; i<N; i++ ) {
339     idx = rows[i];
340     found = 0;
341     for ( j=0; j<size; j++ ) {
342       if (idx >= owners[j] && idx < owners[j+1]) {
343         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
344       }
345     }
346     if (!found) SETERRQ(1,"MatZeroRows_MPIAIJ:Index out of range");
347   }
348   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
349 
350   /* inform other processors of number of messages and max length*/
351   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
352   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
353   nrecvs = work[rank];
354   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
355   nmax = work[rank];
356   PetscFree(work);
357 
358   /* post receives:   */
359   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
360   CHKPTRQ(rvalues);
361   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
362   CHKPTRQ(recv_waits);
363   for ( i=0; i<nrecvs; i++ ) {
364     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
365   }
366 
367   /* do sends:
368       1) starts[i] gives the starting index in svalues for stuff going to
369          the ith processor
370   */
371   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
372   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
373   CHKPTRQ(send_waits);
374   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
375   starts[0] = 0;
376   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
377   for ( i=0; i<N; i++ ) {
378     svalues[starts[owner[i]]++] = rows[i];
379   }
380   ISRestoreIndices(is,&rows);
381 
382   starts[0] = 0;
383   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
384   count = 0;
385   for ( i=0; i<size; i++ ) {
386     if (procs[i]) {
387       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
388     }
389   }
390   PetscFree(starts);
391 
392   base = owners[rank];
393 
394   /*  wait on receives */
395   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
396   source = lens + nrecvs;
397   count  = nrecvs; slen = 0;
398   while (count) {
399     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
400     /* unpack receives into our local space */
401     MPI_Get_count(&recv_status,MPI_INT,&n);
402     source[imdex]  = recv_status.MPI_SOURCE;
403     lens[imdex]  = n;
404     slen += n;
405     count--;
406   }
407   PetscFree(recv_waits);
408 
409   /* move the data into the send scatter */
410   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
411   count = 0;
412   for ( i=0; i<nrecvs; i++ ) {
413     values = rvalues + i*nmax;
414     for ( j=0; j<lens[i]; j++ ) {
415       lrows[count++] = values[j] - base;
416     }
417   }
418   PetscFree(rvalues); PetscFree(lens);
419   PetscFree(owner); PetscFree(nprocs);
420 
421   /* actually zap the local rows */
422   ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
423   PLogObjectParent(A,istmp);
424   PetscFree(lrows);
425   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
426   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
427   ierr = ISDestroy(istmp); CHKERRQ(ierr);
428 
429   /* wait on sends */
430   if (nsends) {
431     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
432     CHKPTRQ(send_status);
433     MPI_Waitall(nsends,send_waits,send_status);
434     PetscFree(send_status);
435   }
436   PetscFree(send_waits); PetscFree(svalues);
437 
438   return 0;
439 }
440 
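/*
   Parallel matrix-vector product:  y = A*x_local + B*x_ghost, where A holds
   the diagonal block and B the off-diagonal block of this process.  The
   scatter that gathers the ghost values of x into lvec is started before,
   and completed after, the local multiply so that communication overlaps
   computation.
*/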
441 static int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
442 {
443   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
444   int        ierr,nt;
445 
446   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
447   if (nt != a->n) {
448     SETERRQ(1,"MatMult_MPIAIJ:Incompatible partition of A and xx");
449   }
450   ierr = VecGetLocalSize(yy,&nt);  CHKERRQ(ierr);
451   if (nt != a->m) {
452     SETERRQ(1,"MatMult_MPIAIJ:Incompatible partition of A and yy");
453   }
454   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
455   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
456   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
457   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
458   return 0;
459 }
460 
461 static int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
462 {
463   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
464   int        ierr;
465   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
466   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
467   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
468   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
469   return 0;
470 }
471 
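/*
   Parallel transpose product:  y = A^T*x plus the off-diagonal contributions
   B^T*x, which are returned to their owning processes by a reverse scatter
   that ADDs into yy; the scatter is overlapped with the local multiply.
*/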
472 static int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
473 {
474   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
475   int        ierr;
476 
477   /* do nondiagonal part */
478   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
479   /* send it on its way */
480   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
481                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
482   /* do local part */
483   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
484   /* receive remote parts: note this assumes the values are not actually */
485   /* added into yy until the next line, which is true for my implementation */
486   /* but may not always be true. */
487   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
488                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
489   return 0;
490 }
491 
492 static int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
493 {
494   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
495   int        ierr;
496 
497   /* do nondiagonal part */
498   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
499   /* send it on its way */
500   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
501                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
502   /* do local part */
503   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
504   /* receive remote parts: note this assumes the values are not actually */
505   /* added into zz until the next line, which is true for my implementation */
506   /* but may not always be true. */
507   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
508                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
509   return 0;
510 }
511 
512 /*
513   This only works correctly for square matrices where the subblock A->A is the
514    diagonal block
515 */
516 static int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
517 {
518   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
519   if (a->M != a->N)
520     SETERRQ(1,"MatGetDiagonal_MPIAIJ:Supports only square matrix where A->A is diag block");
521   return MatGetDiagonal(a->A,v);
522 }
523 
524 static int MatScale_MPIAIJ(Scalar *aa,Mat A)
525 {
526   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
527   int        ierr;
528   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
529   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
530   return 0;
531 }
532 
533 static int MatDestroy_MPIAIJ(PetscObject obj)
534 {
535   Mat        mat = (Mat) obj;
536   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
537   int        ierr;
538 #if defined(PETSC_LOG)
539   PLogObjectState(obj,"Rows=%d, Cols=%d",aij->M,aij->N);
540 #endif
541   PetscFree(aij->rowners);
542   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
543   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
544   if (aij->colmap) PetscFree(aij->colmap);
545   if (aij->garray) PetscFree(aij->garray);
546   if (aij->lvec)   VecDestroy(aij->lvec);
547   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
548   if (aij->rowvalues) PetscFree(aij->rowvalues);
549   PetscFree(aij);
550   PLogObjectDestroy(mat);
551   PetscHeaderDestroy(mat);
552   return 0;
553 }
554 #include "draw.h"
555 #include "pinclude/pviewer.h"
556 
557 static int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
558 {
559   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
560   int         ierr;
561 
562   if (aij->size == 1) {
563     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
564   }
565   else SETERRQ(1,"MatView_MPIAIJ_Binary:Only uniprocessor output supported");
566   return 0;
567 }
568 
569 static int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
570 {
571   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
572   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
573   int         ierr, format,shift = C->indexshift,rank;
574   FILE        *fd;
575   ViewerType  vtype;
576 
577   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
578   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
579     ierr = ViewerGetFormat(viewer,&format);
580     if (format == ASCII_FORMAT_INFO_DETAILED) {
581       int nz, nzalloc, mem, flg;
582       MPI_Comm_rank(mat->comm,&rank);
583       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
584       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
585       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
586       PetscSequentialPhaseBegin(mat->comm,1);
587       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
588          rank,aij->m,nz,nzalloc,mem);
589       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
590          rank,aij->m,nz,nzalloc,mem);
591       ierr = MatGetInfo(aij->A,MAT_LOCAL,&nz,&nzalloc,&mem);
592       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz);
593       ierr = MatGetInfo(aij->B,MAT_LOCAL,&nz,&nzalloc,&mem);
594       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz);
595       fflush(fd);
596       PetscSequentialPhaseEnd(mat->comm,1);
597       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
598       return 0;
599     }
600     else if (format == ASCII_FORMAT_INFO) {
601       return 0;
602     }
603   }
604 
605   if (vtype == DRAW_VIEWER) {
606     Draw       draw;
607     PetscTruth isnull;
608     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
609     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
610   }
611 
612   if (vtype == ASCII_FILE_VIEWER) {
613     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
614     PetscSequentialPhaseBegin(mat->comm,1);
615     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
616            aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
617            aij->cend);
618     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
619     ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
620     fflush(fd);
621     PetscSequentialPhaseEnd(mat->comm,1);
622   }
623   else {
624     int size = aij->size;
625     rank = aij->rank;
626     if (size == 1) {
627       ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
628     }
629     else {
630       /* assemble the entire matrix onto first processor. */
631       Mat         A;
632       Mat_SeqAIJ *Aloc;
633       int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
634       Scalar      *a;
635 
636       if (!rank) {
637         ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
638                CHKERRQ(ierr);
639       }
640       else {
641         ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
642                CHKERRQ(ierr);
643       }
644       PLogObjectParent(mat,A);
645 
646       /* copy over the A part */
647       Aloc = (Mat_SeqAIJ*) aij->A->data;
648       m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
649       row = aij->rstart;
650       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
651       for ( i=0; i<m; i++ ) {
652         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
653         row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
654       }
655       aj = Aloc->j;
656       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
657 
658       /* copy over the B part */
659       Aloc = (Mat_SeqAIJ*) aij->B->data;
660       m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
661       row = aij->rstart;
662       ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
663       for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
664       for ( i=0; i<m; i++ ) {
665         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
666         row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
667       }
668       PetscFree(ct);
669       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
670       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
671       if (!rank) {
672         ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
673       }
674       ierr = MatDestroy(A); CHKERRQ(ierr);
675     }
676   }
677   return 0;
678 }
679 
680 static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
681 {
682   Mat         mat = (Mat) obj;
683   int         ierr;
684   ViewerType  vtype;
685 
686   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
687   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
688       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
689     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
690   }
691   else if (vtype == BINARY_FILE_VIEWER) {
692     return MatView_MPIAIJ_Binary(mat,viewer);
693   }
694   return 0;
695 }
696 
697 /*
698     This has to provide several versions:
699 
700      1) the same as the sequential version
701      2) a) use only local smoothing, updating outer values only once
702         b) local smoothing, updating outer values each inner iteration
703      3) coloring, updating outer values between colors
704 */
705 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
706                            double fshift,int its,Vec xx)
707 {
708   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
709   Mat        AA = mat->A, BB = mat->B;
710   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
711   Scalar     zero = 0.0,*b,*x,*xs,*ls,d,*v,sum,scale,*t,*ts;
712   int        ierr,*idx, *diag;
713   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
714   Vec        tt;
715 
716   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
717   xs = x + shift; /* shift by one for index start of 1 */
718   ls = ls + shift;
719   if (!A->diag) {if ((ierr = MatMarkDiag_SeqAIJ(AA))) return ierr;}
720   diag = A->diag;
721   if (flag == SOR_APPLY_UPPER || flag == SOR_APPLY_LOWER) {
722     SETERRQ(1,"MatRelax_MPIAIJ:Option not supported");
723   }
724   if (flag & SOR_EISENSTAT) {
725     /* Let  A = L + U + D, where L is strictly lower triangular,
726     U is strictly upper triangular, and D is diagonal. This routine applies
727 
728             (L + E)^{-1} A (U + E)^{-1}
729 
730     to a vector efficiently using Eisenstat's trick. This is for
731     the SSOR preconditioner, so E is D/omega where omega
732     is the relaxation factor.
733     */
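    /*
       A sketch of the algebra behind the three steps below: with E = D/omega,
           (L+E)^{-1} A (U+E)^{-1}
         = (L+E)^{-1} [ (L+E) + (U+E) + (D-2E) ] (U+E)^{-1}
         = (U+E)^{-1} + (L+E)^{-1} [ I + (D-2E)(U+E)^{-1} ].
       Hence the routine computes x = (E+U)^{-1} b, then
       t = (L+E)^{-1} (b - (2E-D) x), and returns x + t.
    */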
734     ierr = VecDuplicate(xx,&tt); CHKERRQ(ierr);
735     PLogObjectParent(matin,tt);
736     VecGetArray(tt,&t);
737     scale = (2.0/omega) - 1.0;
738     /*  x = (E + U)^{-1} b */
739     VecSet(&zero,mat->lvec);
740     ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
741                               mat->Mvctx); CHKERRQ(ierr);
742     for ( i=m-1; i>-1; i-- ) {
743       n    = A->i[i+1] - diag[i] - 1;
744       idx  = A->j + diag[i] + !shift;
745       v    = A->a + diag[i] + !shift;
746       sum  = b[i];
747       SPARSEDENSEMDOT(sum,xs,v,idx,n);
748       d    = fshift + A->a[diag[i]+shift];
749       n    = B->i[i+1] - B->i[i];
750       idx  = B->j + B->i[i] + shift;
751       v    = B->a + B->i[i] + shift;
752       SPARSEDENSEMDOT(sum,ls,v,idx,n);
753       x[i] = omega*(sum/d);
754     }
755     ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
756                             mat->Mvctx); CHKERRQ(ierr);
757 
758     /*  t = b - (2*E - D)x */
759     v = A->a;
760     for ( i=0; i<m; i++ ) { t[i] = b[i] - scale*(v[*diag++ + shift])*x[i]; }
761 
762     /*  t = (E + L)^{-1}t */
763     ts = t + shift; /* shifted by one for index start of a or mat->j*/
764     diag = A->diag;
765     VecSet(&zero,mat->lvec);
766     ierr = VecPipelineBegin(tt,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
767                                                  mat->Mvctx); CHKERRQ(ierr);
768     for ( i=0; i<m; i++ ) {
769       n    = diag[i] - A->i[i];
770       idx  = A->j + A->i[i] + shift;
771       v    = A->a + A->i[i] + shift;
772       sum  = t[i];
773       SPARSEDENSEMDOT(sum,ts,v,idx,n);
774       d    = fshift + A->a[diag[i]+shift];
775       n    = B->i[i+1] - B->i[i];
776       idx  = B->j + B->i[i] + shift;
777       v    = B->a + B->i[i] + shift;
778       SPARSEDENSEMDOT(sum,ls,v,idx,n);
779       t[i] = omega*(sum/d);
780     }
781     ierr = VecPipelineEnd(tt,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
782                                                     mat->Mvctx); CHKERRQ(ierr);
783     /*  x = x + t */
784     for ( i=0; i<m; i++ ) { x[i] += t[i]; }
785     VecDestroy(tt);
786     return 0;
787   }
788 
789 
790   if ((flag & SOR_SYMMETRIC_SWEEP) == SOR_SYMMETRIC_SWEEP){
791     if (flag & SOR_ZERO_INITIAL_GUESS) {
792       VecSet(&zero,mat->lvec); VecSet(&zero,xx);
793     }
794     else {
795       ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_UP,mat->Mvctx);
796       CHKERRQ(ierr);
797       ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_UP,mat->Mvctx);
798       CHKERRQ(ierr);
799     }
800     while (its--) {
801       /* go down through the rows */
802       ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
803                               mat->Mvctx); CHKERRQ(ierr);
804       for ( i=0; i<m; i++ ) {
805         n    = A->i[i+1] - A->i[i];
806         idx  = A->j + A->i[i] + shift;
807         v    = A->a + A->i[i] + shift;
808         sum  = b[i];
809         SPARSEDENSEMDOT(sum,xs,v,idx,n);
810         d    = fshift + A->a[diag[i]+shift];
811         n    = B->i[i+1] - B->i[i];
812         idx  = B->j + B->i[i] + shift;
813         v    = B->a + B->i[i] + shift;
814         SPARSEDENSEMDOT(sum,ls,v,idx,n);
815         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
816       }
817       ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
818                             mat->Mvctx); CHKERRQ(ierr);
819       /* come up through the rows */
820       ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
821                               mat->Mvctx); CHKERRQ(ierr);
822       for ( i=m-1; i>-1; i-- ) {
823         n    = A->i[i+1] - A->i[i];
824         idx  = A->j + A->i[i] + shift;
825         v    = A->a + A->i[i] + shift;
826         sum  = b[i];
827         SPARSEDENSEMDOT(sum,xs,v,idx,n);
828         d    = fshift + A->a[diag[i]+shift];
829         n    = B->i[i+1] - B->i[i];
830         idx  = B->j + B->i[i] + shift;
831         v    = B->a + B->i[i] + shift;
832         SPARSEDENSEMDOT(sum,ls,v,idx,n);
833         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
834       }
835       ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
836                             mat->Mvctx); CHKERRQ(ierr);
837     }
838   }
839   else if (flag & SOR_FORWARD_SWEEP){
840     if (flag & SOR_ZERO_INITIAL_GUESS) {
841       VecSet(&zero,mat->lvec);
842       ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
843                               mat->Mvctx); CHKERRQ(ierr);
844       for ( i=0; i<m; i++ ) {
845         n    = diag[i] - A->i[i];
846         idx  = A->j + A->i[i] + shift;
847         v    = A->a + A->i[i] + shift;
848         sum  = b[i];
849         SPARSEDENSEMDOT(sum,xs,v,idx,n);
850         d    = fshift + A->a[diag[i]+shift];
851         n    = B->i[i+1] - B->i[i];
852         idx  = B->j + B->i[i] + shift;
853         v    = B->a + B->i[i] + shift;
854         SPARSEDENSEMDOT(sum,ls,v,idx,n);
855         x[i] = omega*(sum/d);
856       }
857       ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
858                             mat->Mvctx); CHKERRQ(ierr);
859       its--;
860     }
861     while (its--) {
862       ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_UP,mat->Mvctx);
863       CHKERRQ(ierr);
864       ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_UP,mat->Mvctx);
865       CHKERRQ(ierr);
866       ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
867                               mat->Mvctx); CHKERRQ(ierr);
868       for ( i=0; i<m; i++ ) {
869         n    = A->i[i+1] - A->i[i];
870         idx  = A->j + A->i[i] + shift;
871         v    = A->a + A->i[i] + shift;
872         sum  = b[i];
873         SPARSEDENSEMDOT(sum,xs,v,idx,n);
874         d    = fshift + A->a[diag[i]+shift];
875         n    = B->i[i+1] - B->i[i];
876         idx  = B->j + B->i[i] + shift;
877         v    = B->a + B->i[i] + shift;
878         SPARSEDENSEMDOT(sum,ls,v,idx,n);
879         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
880       }
881       ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINE_DOWN,
882                             mat->Mvctx); CHKERRQ(ierr);
883     }
884   }
885   else if (flag & SOR_BACKWARD_SWEEP){
886     if (flag & SOR_ZERO_INITIAL_GUESS) {
887       VecSet(&zero,mat->lvec);
888       ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
889                               mat->Mvctx); CHKERRQ(ierr);
890       for ( i=m-1; i>-1; i-- ) {
891         n    = A->i[i+1] - diag[i] - 1;
892         idx  = A->j + diag[i] + !shift;
893         v    = A->a + diag[i] + !shift;
894         sum  = b[i];
895         SPARSEDENSEMDOT(sum,xs,v,idx,n);
896         d    = fshift + A->a[diag[i]+shift];
897         n    = B->i[i+1] - B->i[i];
898         idx  = B->j + B->i[i] + shift;
899         v    = B->a + B->i[i] + shift;
900         SPARSEDENSEMDOT(sum,ls,v,idx,n);
901         x[i] = omega*(sum/d);
902       }
903       ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
904                             mat->Mvctx); CHKERRQ(ierr);
905       its--;
906     }
907     while (its--) {
908       ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_DOWN,
909                             mat->Mvctx); CHKERRQ(ierr);
910       ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_DOWN,
911                             mat->Mvctx); CHKERRQ(ierr);
912       ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
913                               mat->Mvctx); CHKERRQ(ierr);
914       for ( i=m-1; i>-1; i-- ) {
915         n    = A->i[i+1] - A->i[i];
916         idx  = A->j + A->i[i] + shift;
917         v    = A->a + A->i[i] + shift;
918         sum  = b[i];
919         SPARSEDENSEMDOT(sum,xs,v,idx,n);
920         d    = fshift + A->a[diag[i]+shift];
921         n    = B->i[i+1] - B->i[i];
922         idx  = B->j + B->i[i] + shift;
923         v    = B->a + B->i[i] + shift;
924         SPARSEDENSEMDOT(sum,ls,v,idx,n);
925         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
926       }
927       ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINE_UP,
928                             mat->Mvctx); CHKERRQ(ierr);
929     }
930   }
931   else if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
932     if (flag & SOR_ZERO_INITIAL_GUESS) {
933       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
934     }
935     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
936     CHKERRQ(ierr);
937     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
938     CHKERRQ(ierr);
939     while (its--) {
940       /* go down through the rows */
941       for ( i=0; i<m; i++ ) {
942         n    = A->i[i+1] - A->i[i];
943         idx  = A->j + A->i[i] + shift;
944         v    = A->a + A->i[i] + shift;
945         sum  = b[i];
946         SPARSEDENSEMDOT(sum,xs,v,idx,n);
947         d    = fshift + A->a[diag[i]+shift];
948         n    = B->i[i+1] - B->i[i];
949         idx  = B->j + B->i[i] + shift;
950         v    = B->a + B->i[i] + shift;
951         SPARSEDENSEMDOT(sum,ls,v,idx,n);
952         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
953       }
954       /* come up through the rows */
955       for ( i=m-1; i>-1; i-- ) {
956         n    = A->i[i+1] - A->i[i];
957         idx  = A->j + A->i[i] + shift;
958         v    = A->a + A->i[i] + shift;
959         sum  = b[i];
960         SPARSEDENSEMDOT(sum,xs,v,idx,n);
961         d    = fshift + A->a[diag[i]+shift];
962         n    = B->i[i+1] - B->i[i];
963         idx  = B->j + B->i[i] + shift;
964         v    = B->a + B->i[i] + shift;
965         SPARSEDENSEMDOT(sum,ls,v,idx,n);
966         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
967       }
968     }
969   }
970   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
971     if (flag & SOR_ZERO_INITIAL_GUESS) {
972       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
973     }
974     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
975     CHKERRQ(ierr);
976     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
977     CHKERRQ(ierr);
978     while (its--) {
979       for ( i=0; i<m; i++ ) {
980         n    = A->i[i+1] - A->i[i];
981         idx  = A->j + A->i[i] + shift;
982         v    = A->a + A->i[i] + shift;
983         sum  = b[i];
984         SPARSEDENSEMDOT(sum,xs,v,idx,n);
985         d    = fshift + A->a[diag[i]+shift];
986         n    = B->i[i+1] - B->i[i];
987         idx  = B->j + B->i[i] + shift;
988         v    = B->a + B->i[i] + shift;
989         SPARSEDENSEMDOT(sum,ls,v,idx,n);
990         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
991       }
992     }
993   }
994   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
995     if (flag & SOR_ZERO_INITIAL_GUESS) {
996       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
997     }
998     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
999                             mat->Mvctx); CHKERRQ(ierr);
1000     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
1001                             mat->Mvctx); CHKERRQ(ierr);
1002     while (its--) {
1003       for ( i=m-1; i>-1; i-- ) {
1004         n    = A->i[i+1] - A->i[i];
1005         idx  = A->j + A->i[i] + shift;
1006         v    = A->a + A->i[i] + shift;
1007         sum  = b[i];
1008         SPARSEDENSEMDOT(sum,xs,v,idx,n);
1009         d    = fshift + A->a[diag[i]+shift];
1010         n    = B->i[i+1] - B->i[i];
1011         idx  = B->j + B->i[i] + shift;
1012         v    = B->a + B->i[i] + shift;
1013         SPARSEDENSEMDOT(sum,ls,v,idx,n);
1014         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
1015       }
1016     }
1017   }
1018   return 0;
1019 }
1020 
1021 static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,int *nz,
1022                              int *nzalloc,int *mem)
1023 {
1024   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1025   Mat        A = mat->A, B = mat->B;
1026   int        ierr, isend[3], irecv[3], nzA, nzallocA, memA;
1027 
1028   ierr = MatGetInfo(A,MAT_LOCAL,&nzA,&nzallocA,&memA); CHKERRQ(ierr);
1029   ierr = MatGetInfo(B,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
1030   isend[0] += nzA; isend[1] += nzallocA; isend[2] += memA;
1031   if (flag == MAT_LOCAL) {
1032     if (nz)       *nz      = isend[0];
1033     if (nzalloc)  *nzalloc = isend[1];
1034     if (mem)      *mem     = isend[2];
1035   } else if (flag == MAT_GLOBAL_MAX) {
1036     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_MAX,matin->comm);
1037     if (nz)      *nz      = irecv[0];
1038     if (nzalloc) *nzalloc = irecv[1];
1039     if (mem)     *mem     = irecv[2];
1040   } else if (flag == MAT_GLOBAL_SUM) {
1041     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_SUM,matin->comm);
1042     if (nz)      *nz      = irecv[0];
1043     if (nzalloc) *nzalloc = irecv[1];
1044     if (mem)     *mem     = irecv[2];
1045   }
1046   return 0;
1047 }
1048 
1049 extern int MatLUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,Mat*);
1050 extern int MatLUFactorNumeric_MPIAIJ(Mat,Mat*);
1051 extern int MatLUFactor_MPIAIJ(Mat,IS,IS,double);
1052 extern int MatILUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,int,Mat *);
1053 extern int MatSolve_MPIAIJ(Mat,Vec,Vec);
1054 extern int MatSolveAdd_MPIAIJ(Mat,Vec,Vec,Vec);
1055 extern int MatSolveTrans_MPIAIJ(Mat,Vec,Vec);
1056 extern int MatSolveTransAdd_MPIAIJ(Mat,Vec,Vec,Vec);
1057 
1058 static int MatSetOption_MPIAIJ(Mat A,MatOption op)
1059 {
1060   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1061 
1062   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1063       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1064       op == MAT_COLUMNS_SORTED ||
1065       op == MAT_ROW_ORIENTED) {
1066         MatSetOption(a->A,op);
1067         MatSetOption(a->B,op);
1068   }
1069   else if (op == MAT_ROWS_SORTED ||
1070            op == MAT_SYMMETRIC ||
1071            op == MAT_STRUCTURALLY_SYMMETRIC ||
1072            op == MAT_YES_NEW_DIAGONALS)
1073     PLogInfo(A,"Info:MatSetOption_MPIAIJ:Option ignored\n");
1074   else if (op == MAT_COLUMN_ORIENTED) {
1075     a->roworiented = 0;
1076     MatSetOption(a->A,op);
1077     MatSetOption(a->B,op);
1078   }
1079   else if (op == MAT_NO_NEW_DIAGONALS)
1080     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:MAT_NO_NEW_DIAGONALS");}
1081   else
1082     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:unknown option");}
1083   return 0;
1084 }
1085 
1086 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
1087 {
1088   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1089   *m = mat->M; *n = mat->N;
1090   return 0;
1091 }
1092 
1093 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
1094 {
1095   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1096   *m = mat->m; *n = mat->n;
1097   return 0;
1098 }
1099 
1100 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
1101 {
1102   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1103   *m = mat->rstart; *n = mat->rend;
1104   return 0;
1105 }
1106 
1107 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1108 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1109 
1110 int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1111 {
1112   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1113   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
1114   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
1115   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
1116   int        *cmap, *idx_p;
1117 
1118   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIAIJ:Already active");
1119   mat->getrowactive = PETSC_TRUE;
1120 
1121   if (!mat->rowvalues && (idx || v)) {
1122     /*
1123         allocate enough space to hold information from the longest row.
1124     */
1125     Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
1126     int     max = 1,n = mat->n,tmp;
1127     for ( i=0; i<n; i++ ) {
1128       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
1129       if (max < tmp) { max = tmp; }
1130     }
1131     mat->rowvalues = (Scalar *) PetscMalloc( max*(sizeof(int)+sizeof(Scalar)));
1132     CHKPTRQ(mat->rowvalues);
1133     mat->rowindices = (int *) (mat->rowvalues + max);
1134   }
1135 
1136 
1137   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIAIJ:Only local rows");
1138   lrow = row - rstart;
1139 
1140   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1141   if (!v)   {pvA = 0; pvB = 0;}
1142   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1143   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1144   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1145   nztot = nzA + nzB;
1146 
1147   cmap  = mat->garray;
1148   if (v  || idx) {
1149     if (nztot) {
1150       /* Sort by increasing column numbers, assuming A and B already sorted */
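      /* For example (hypothetical data): with cstart = 10, B contributing
         global columns {2,5,14,17} and A contributing {10,12}, the merged
         row is {2,5,10,12,14,17}. */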
1151       int imark = -1;
1152       if (v) {
1153         *v = v_p = mat->rowvalues;
1154         for ( i=0; i<nzB; i++ ) {
1155           if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
1156           else break;
1157         }
1158         imark = i;
1159         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
1160         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
1161       }
1162       if (idx) {
1163         *idx = idx_p = mat->rowindices;
1164         if (imark > -1) {
1165           for ( i=0; i<imark; i++ ) {
1166             idx_p[i] = cmap[cworkB[i]];
1167           }
1168         } else {
1169           for ( i=0; i<nzB; i++ ) {
1170             if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
1171             else break;
1172           }
1173           imark = i;
1174         }
1175         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
1176         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
1177       }
1178     }
1179     else {
1180       if (idx) *idx = 0;
1181       if (v)   *v   = 0;
1182     }
1183   }
1184   *nz = nztot;
1185   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1186   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1187   return 0;
1188 }
1189 
1190 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1191 {
1192   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1193   if (aij->getrowactive == PETSC_FALSE) {
1194     SETERRQ(1,"MatRestoreRow_MPIAIJ:MatGetRow not called");
1195   }
1196   aij->getrowactive = PETSC_FALSE;
1197   return 0;
1198 }
1199 
1200 static int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
1201 {
1202   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1203   Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
1204   int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
1205   double     sum = 0.0;
1206   Scalar     *v;
1207 
1208   if (aij->size == 1) {
1209     ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
1210   } else {
1211     if (type == NORM_FROBENIUS) {
1212       v = amat->a;
1213       for (i=0; i<amat->nz; i++ ) {
1214 #if defined(PETSC_COMPLEX)
1215         sum += real(conj(*v)*(*v)); v++;
1216 #else
1217         sum += (*v)*(*v); v++;
1218 #endif
1219       }
1220       v = bmat->a;
1221       for (i=0; i<bmat->nz; i++ ) {
1222 #if defined(PETSC_COMPLEX)
1223         sum += real(conj(*v)*(*v)); v++;
1224 #else
1225         sum += (*v)*(*v); v++;
1226 #endif
1227       }
1228       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
1229       *norm = sqrt(*norm);
1230     }
1231     else if (type == NORM_1) { /* max column norm */
1232       double *tmp, *tmp2;
1233       int    *jj, *garray = aij->garray;
1234       tmp  = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp);
1235       tmp2 = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp2);
1236       PetscMemzero(tmp,aij->N*sizeof(double));
1237       *norm = 0.0;
1238       v = amat->a; jj = amat->j;
1239       for ( j=0; j<amat->nz; j++ ) {
1240         tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
1241       }
1242       v = bmat->a; jj = bmat->j;
1243       for ( j=0; j<bmat->nz; j++ ) {
1244         tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
1245       }
1246       MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);
1247       for ( j=0; j<aij->N; j++ ) {
1248         if (tmp2[j] > *norm) *norm = tmp2[j];
1249       }
1250       PetscFree(tmp); PetscFree(tmp2);
1251     }
1252     else if (type == NORM_INFINITY) { /* max row norm */
1253       double ntemp = 0.0;
1254       for ( j=0; j<amat->m; j++ ) {
1255         v = amat->a + amat->i[j] + shift;
1256         sum = 0.0;
1257         for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
1258           sum += PetscAbsScalar(*v); v++;
1259         }
1260         v = bmat->a + bmat->i[j] + shift;
1261         for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
1262           sum += PetscAbsScalar(*v); v++;
1263         }
1264         if (sum > ntemp) ntemp = sum;
1265       }
1266       MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);
1267     }
1268     else {
1269       SETERRQ(1,"MatNorm_MPIAIJ:No support for two norm");
1270     }
1271   }
1272   return 0;
1273 }
1274 
1275 static int MatTranspose_MPIAIJ(Mat A,Mat *matout)
1276 {
1277   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1278   Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
1279   int        ierr,shift = Aloc->indexshift;
1280   Mat        B;
1281   int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
1282   Scalar     *array;
1283 
1284   if (matout == PETSC_NULL && M != N)
1285     SETERRQ(1,"MatTranspose_MPIAIJ:Square matrix only for in-place");
1286   ierr = MatCreateMPIAIJ(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,
1287          PETSC_NULL,&B); CHKERRQ(ierr);
1288 
1289   /* copy over the A part */
1290   Aloc = (Mat_SeqAIJ*) a->A->data;
1291   m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1292   row = a->rstart;
1293   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
1294   for ( i=0; i<m; i++ ) {
1295     ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1296     row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
1297   }
1298   aj = Aloc->j;
1299   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}
1300 
1301   /* copy over the B part */
1302   Aloc = (Mat_SeqAIJ*) a->B->data;
1303   m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1304   row = a->rstart;
1305   ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
1306   for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
1307   for ( i=0; i<m; i++ ) {
1308     ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1309     row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
1310   }
1311   PetscFree(ct);
1312   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1313   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1314   if (matout != PETSC_NULL) {
1315     *matout = B;
1316   } else {
1317     /* This isn't really an in-place transpose, but we free the data structures from a */
1318     PetscFree(a->rowners);
1319     ierr = MatDestroy(a->A); CHKERRQ(ierr);
1320     ierr = MatDestroy(a->B); CHKERRQ(ierr);
1321     if (a->colmap) PetscFree(a->colmap);
1322     if (a->garray) PetscFree(a->garray);
1323     if (a->lvec) VecDestroy(a->lvec);
1324     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
1325     PetscFree(a);
1326     PetscMemcpy(A,B,sizeof(struct _Mat));
1327     PetscHeaderDestroy(B);
1328   }
1329   return 0;
1330 }
1331 
1332 extern int MatPrintHelp_SeqAIJ(Mat);
1333 static int MatPrintHelp_MPIAIJ(Mat A)
1334 {
1335   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1336 
1337   if (!a->rank) return MatPrintHelp_SeqAIJ(a->A);
1338   else return 0;
1339 }
1340 
1341 static int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1342 {
1343   *bs = 1;
1344   return 0;
1345 }
1346 
1347 extern int MatConvert_MPIAIJ(Mat,MatType,Mat *);
1348 static int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1349 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1350 int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatGetSubMatrixCall,Mat **);
1351 /* -------------------------------------------------------------------*/
1352 static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
1353        MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
1354        MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
1355        MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
1356        MatSolve_MPIAIJ,MatSolveAdd_MPIAIJ,
1357        MatSolveTrans_MPIAIJ,MatSolveTransAdd_MPIAIJ,
1358        MatLUFactor_MPIAIJ,0,
1359        MatRelax_MPIAIJ,
1360        MatTranspose_MPIAIJ,
1361        MatGetInfo_MPIAIJ,0,
1362        MatGetDiagonal_MPIAIJ,0,MatNorm_MPIAIJ,
1363        MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
1364        0,
1365        MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
1366        MatGetReordering_MPIAIJ,
1367        MatLUFactorSymbolic_MPIAIJ,MatLUFactorNumeric_MPIAIJ,0,0,
1368        MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
1369        MatILUFactorSymbolic_MPIAIJ,0,
1370        0,0,MatConvert_MPIAIJ,0,0,MatConvertSameType_MPIAIJ,0,0,
1371        0,0,0,
1372        MatGetSubMatrices_MPIAIJ,MatIncreaseOverlap_MPIAIJ,MatGetValues_MPIAIJ,0,
1373        MatPrintHelp_MPIAIJ,
1374        MatScale_MPIAIJ,0,0,0,MatGetBlockSize_MPIAIJ};
1375 
1376 /*@C
1377    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1378    (the default parallel PETSc format).  For good matrix assembly performance
1379    the user should preallocate the matrix storage by setting the parameters
1380    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1381    performance can be increased by more than a factor of 50.
1382 
1383    Input Parameters:
1384 .  comm - MPI communicator
1385 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1386            This value should be the same as the local size used in creating the
1387            y vector for the matrix-vector product y = Ax.
1388 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1389            This value should be the same as the local size used in creating the
1390            x vector for the matrix-vector product y = Ax.
1391 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
1392 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
1393 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1394            (same for all local rows)
1395 .  d_nnz - array containing the number of nonzeros in the various rows of the
1396            diagonal portion of the local submatrix (possibly different for each row)
1397            or PETSC_NULL. You must leave room for the diagonal entry even if
1398            it is zero.
1399 .  o_nz - number of nonzeros per row in the off-diagonal portion of local
1400            submatrix (same for all local rows).
1401 .  o_nnz - array containing the number of nonzeros in the various rows of the
1402            off-diagonal portion of the local submatrix (possibly different for
1403            each row) or PETSC_NULL.
1404 
1405    Output Parameter:
1406 .  A - the matrix
1407 
1408    Notes:
1409    The AIJ format (also called the Yale sparse matrix format or
1410    compressed row storage), is fully compatible with standard Fortran 77
1411    storage.  That is, the stored row and column indices can begin at
1412    either one (as in Fortran) or zero.  See the users manual for details.
1413 
1414    The user MUST specify either the local or global matrix dimensions
1415    (possibly both).
1416 
1417    By default, this format uses inodes (identical nodes) when possible.
1418    We search for consecutive rows with the same nonzero structure, thereby
1419    reusing matrix information to achieve increased efficiency.
1420 
1421    Options Database Keys:
1422 $    -mat_aij_no_inode  - Do not use inodes
1423 $    -mat_aij_inode_limit <limit> - Set inode limit.
1424 $        (max limit=5)
1425 $    -mat_aij_oneindex - Internally use indexing starting at 1
1426 $        rather than 0.  Note: When calling MatSetValues(),
1427 $        the user still MUST index entries starting at 0!
1428 
1429    Storage Information:
1430    For a square global matrix we define each processor's diagonal portion
1431    to be its local rows and the corresponding columns (a square submatrix);
1432    each processor's off-diagonal portion encompasses the remainder of the
1433    local matrix (a rectangular submatrix).
1434 
1435    The user can specify preallocated storage for the diagonal part of
1436    the local submatrix with either d_nz or d_nnz (not both).  Set
1437    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1438    memory allocation.  Likewise, specify preallocated storage for the
1439    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1440 
1441    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1442    the figure below we depict these three local rows and all columns (0-11).
1443 
1444 $          0 1 2 3 4 5 6 7 8 9 10 11
1445 $         -------------------
1446 $  row 3  |  o o o d d d o o o o o o
1447 $  row 4  |  o o o d d d o o o o o o
1448 $  row 5  |  o o o d d d o o o o o o
1449 $         -------------------
1450 $
1451 
1452    Thus, any entries in the d locations are stored in the d (diagonal)
1453    submatrix, and any entries in the o locations are stored in the
1454    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1455    stored simply in the MATSEQAIJ format for compressed row storage.
1456 
1457    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1458    and o_nz should indicate the number of nonzeros per row in the o matrix.
1459    In general, for PDE problems in which most nonzeros are near the diagonal,
1460    one expects d_nz >> o_nz. For large problems you MUST preallocate memory
1461    or you will get TERRIBLE performance; see the users' manual chapter on
1462    matrices and the file $(PETSC_DIR)/Performance.
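
   For example, each of the three local rows depicted above has 3 nonzeros
   in the d block and 9 in the o block, so (as a rough sketch, with comm and
   the calls made by the other processors omitted) this processor could
   preallocate with either
$
$      ierr = MatCreateMPIAIJ(comm,3,3,12,12,3,PETSC_NULL,9,PETSC_NULL,&A);
$
   or, using per-row counts,
$
$      int d_nnz[] = {3,3,3}, o_nnz[] = {9,9,9};
$      ierr = MatCreateMPIAIJ(comm,3,3,12,12,0,d_nnz,0,o_nnz,&A);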
1463 
1464 .keywords: matrix, aij, compressed row, sparse, parallel
1465 
1466 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1467 @*/
1468 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
1469                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1470 {
1471   Mat          B;
1472   Mat_MPIAIJ   *b;
1473   int          ierr, i,sum[2],work[2];
1474 
1475   *A = 0;
1476   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
1477   PLogObjectCreate(B);
1478   B->data       = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
1479   PetscMemzero(b,sizeof(Mat_MPIAIJ));
1480   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1481   B->destroy    = MatDestroy_MPIAIJ;
1482   B->view       = MatView_MPIAIJ;
1483   B->factor     = 0;
1484   B->assembled  = PETSC_FALSE;
1485 
1486   b->insertmode = NOT_SET_VALUES;
1487   MPI_Comm_rank(comm,&b->rank);
1488   MPI_Comm_size(comm,&b->size);
1489 
1490   if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1491     SETERRQ(1,"MatCreateMPIAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1492 
1493   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1494     work[0] = m; work[1] = n;
1495     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1496     if (M == PETSC_DECIDE) M = sum[0];
1497     if (N == PETSC_DECIDE) N = sum[1];
1498   }
1499   if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
1500   if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
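  /* e.g. with M = 10 rows on 3 processes, ranks 0, 1, and 2 get m = 4, 3, and 3 local rows */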
1501   b->m = m; B->m = m;
1502   b->n = n; B->n = n;
1503   b->N = N; B->N = N;
1504   b->M = M; B->M = M;
1505 
1506   /* build local table of row and column ownerships */
1507   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1508   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1509   b->cowners = b->rowners + b->size + 1;
1510   MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1511   b->rowners[0] = 0;
1512   for ( i=2; i<=b->size; i++ ) {
1513     b->rowners[i] += b->rowners[i-1];
1514   }
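  /* rowners[i] is now the first global row owned by process i; rowners[b->size] equals M */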
1515   b->rstart = b->rowners[b->rank];
1516   b->rend   = b->rowners[b->rank+1];
1517   MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1518   b->cowners[0] = 0;
1519   for ( i=2; i<=b->size; i++ ) {
1520     b->cowners[i] += b->cowners[i-1];
1521   }
1522   b->cstart = b->cowners[b->rank];
1523   b->cend   = b->cowners[b->rank+1];
1524 
1525   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1526   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1527   PLogObjectParent(B,b->A);
1528   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1529   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1530   PLogObjectParent(B,b->B);
1531 
1532   /* build cache (stash) for entries destined for rows owned by other processors */
1533   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1534   b->colmap      = 0;
1535   b->garray      = 0;
1536   b->roworiented = 1;
1537 
1538   /* stuff used for the matrix-vector multiply */
1539   b->lvec      = 0;
1540   b->Mvctx     = 0;
1541 
1542   /* stuff for MatGetRow() */
1543   b->rowindices   = 0;
1544   b->rowvalues    = 0;
1545   b->getrowactive = PETSC_FALSE;
1546 
1547   *A = B;
1548   return 0;
1549 }
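
/*
   Minimal usage sketch (not part of the library source): the typical
   create / set values / assemble sequence for an MPIAIJ matrix.  The
   identifiers comm and M below are placeholders chosen for illustration,
   and the preallocation counts are arbitrary.

      Mat    A;
      int    ierr, row, col, rstart, rend;
      Scalar v = 1.0;

      ierr = MatCreateMPIAIJ(comm,PETSC_DECIDE,PETSC_DECIDE,M,M,5,PETSC_NULL,
                             2,PETSC_NULL,&A); CHKERRQ(ierr);
      ierr = MatGetOwnershipRange(A,&rstart,&rend); CHKERRQ(ierr);
      for ( row=rstart; row<rend; row++ ) {
        col  = row;                  (* global, zero-based indices *)
        ierr = MatSetValues(A,1,&row,1,&col,&v,INSERT_VALUES); CHKERRQ(ierr);
      }
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);

   (here "(* ... *)" stands in for a C comment, which cannot be nested
   inside this comment block)
*/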
1550 
1551 static int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
1552 {
1553   Mat        mat;
1554   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1555   int        ierr, len=0, flg;
1556 
1557   *newmat       = 0;
1558   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1559   PLogObjectCreate(mat);
1560   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1561   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1562   mat->destroy    = MatDestroy_MPIAIJ;
1563   mat->view       = MatView_MPIAIJ;
1564   mat->factor     = matin->factor;
1565   mat->assembled  = PETSC_TRUE;
1566 
1567   a->m = mat->m   = oldmat->m;
1568   a->n = mat->n   = oldmat->n;
1569   a->M = mat->M   = oldmat->M;
1570   a->N = mat->N   = oldmat->N;
1571 
1572   a->rstart       = oldmat->rstart;
1573   a->rend         = oldmat->rend;
1574   a->cstart       = oldmat->cstart;
1575   a->cend         = oldmat->cend;
1576   a->size         = oldmat->size;
1577   a->rank         = oldmat->rank;
1578   a->insertmode   = NOT_SET_VALUES;
1579   a->rowvalues    = 0;
1580   a->getrowactive = PETSC_FALSE;
1581 
1582   a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
1583   PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1584   PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
1585   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1586   if (oldmat->colmap) {
1587     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1588     PLogObjectMemory(mat,(a->N)*sizeof(int));
1589     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1590   } else a->colmap = 0;
1591   if (oldmat->garray && (len = ((Mat_SeqAIJ *) (oldmat->B->data))->n)) {
1592     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1593     PLogObjectMemory(mat,len*sizeof(int));
1594     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1595   } else a->garray = 0;
1596 
1597   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1598   PLogObjectParent(mat,a->lvec);
1599   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1600   PLogObjectParent(mat,a->Mvctx);
1601   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1602   PLogObjectParent(mat,a->A);
1603   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1604   PLogObjectParent(mat,a->B);
1605   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1606   if (flg) {
1607     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1608   }
1609   *newmat = mat;
1610   return 0;
1611 }
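
/*
   MatConvertSameType_MPIAIJ() is the deep-copy routine reached through the
   function table above; a user would typically arrive here by converting an
   assembled MPIAIJ matrix to the MATSAME type, e.g. (a sketch):

      Mat B;
      ierr = MatConvert(A,MATSAME,&B); CHKERRQ(ierr);
*/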
1612 
1613 #include "sys.h"
1614 
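/*
   MatLoad_MPIAIJ() expects the PETSc binary matrix file layout that the code
   below reads: a 4-integer header (MAT_COOKIE, the global row count M, the
   global column count N, and a fourth entry, presumably the total number of
   nonzeros, which is not used here), followed by the M row lengths, then all
   column indices stored row by row, then the numerical values (Scalars) in
   the same order.
*/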
1615 int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
1616 {
1617   Mat          A;
1618   int          i, nz, ierr, j,rstart, rend, fd;
1619   Scalar       *vals,*svals;
1620   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1621   MPI_Status   status;
1622   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1623   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1624   int          tag = ((PetscObject)viewer)->tag;
1625 
1626   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1627   if (!rank) {
1628     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1629     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1630     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIAIJ:not matrix object");
1631   }
1632 
1633   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1634   M = header[1]; N = header[2];
1635   /* determine ownership of all rows */
1636   m = M/size + ((M % size) > rank);
1637   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1638   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1639   rowners[0] = 0;
1640   for ( i=2; i<=size; i++ ) {
1641     rowners[i] += rowners[i-1];
1642   }
1643   rstart = rowners[rank];
1644   rend   = rowners[rank+1];
1645 
1646   /* distribute row lengths to all processors */
1647   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1648   offlens = ourlens + (rend-rstart);
1649   if (!rank) {
1650     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1651     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1652     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1653     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1654     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
1655     PetscFree(sndcounts);
1656   }
1657   else {
1658     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
1659   }
1660 
1661   if (!rank) {
1662     /* calculate the number of nonzeros on each processor */
1663     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1664     PetscMemzero(procsnz,size*sizeof(int));
1665     for ( i=0; i<size; i++ ) {
1666       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1667         procsnz[i] += rowlengths[j];
1668       }
1669     }
1670     PetscFree(rowlengths);
1671 
1672     /* determine max buffer needed and allocate it */
1673     maxnz = 0;
1674     for ( i=0; i<size; i++ ) {
1675       maxnz = PetscMax(maxnz,procsnz[i]);
1676     }
1677     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1678 
1679     /* read in my part of the matrix column indices  */
1680     nz = procsnz[0];
1681     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1682     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1683 
1684     /* read in everyone else's parts and ship them off */
1685     for ( i=1; i<size; i++ ) {
1686       nz = procsnz[i];
1687       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1688       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1689     }
1690     PetscFree(cols);
1691   }
1692   else {
1693     /* determine buffer space needed for message */
1694     nz = 0;
1695     for ( i=0; i<m; i++ ) {
1696       nz += ourlens[i];
1697     }
1698     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1699 
1700     /* receive message of column indices */
1701     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1702     MPI_Get_count(&status,MPI_INT,&maxnz);
1703     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
1704   }
1705 
1706   /* loop over local rows, determining number of off-diagonal entries */
1707   PetscMemzero(offlens,m*sizeof(int));
1708   jj = 0;
1709   for ( i=0; i<m; i++ ) {
1710     for ( j=0; j<ourlens[i]; j++ ) {
1711       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1712       jj++;
1713     }
1714   }
1715 
1716   /* create our matrix */
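  /* ourlens[] is temporarily reduced to the diagonal-block counts so that
     ourlens and offlens can be passed as d_nnz and o_nnz; the full row
     lengths are restored right after the matrix is created */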
1717   for ( i=0; i<m; i++ ) {
1718     ourlens[i] -= offlens[i];
1719   }
1720   ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
1721   A = *newmat;
1722   MatSetOption(A,MAT_COLUMNS_SORTED);
1723   for ( i=0; i<m; i++ ) {
1724     ourlens[i] += offlens[i];
1725   }
1726 
1727   if (!rank) {
1728     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1729 
1730     /* read in my part of the matrix numerical values  */
1731     nz = procsnz[0];
1732     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1733 
1734     /* insert into matrix */
1735     jj      = rstart;
1736     smycols = mycols;
1737     svals   = vals;
1738     for ( i=0; i<m; i++ ) {
1739       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1740       smycols += ourlens[i];
1741       svals   += ourlens[i];
1742       jj++;
1743     }
1744 
1745     /* read in the other processors' parts and ship them out */
1746     for ( i=1; i<size; i++ ) {
1747       nz = procsnz[i];
1748       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1749       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1750     }
1751     PetscFree(procsnz);
1752   }
1753   else {
1754     /* receive numeric values */
1755     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1756 
1757     /* receive message of values */
1758     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1759     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1760     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
1761 
1762     /* insert into matrix */
1763     jj      = rstart;
1764     smycols = mycols;
1765     svals   = vals;
1766     for ( i=0; i<m; i++ ) {
1767       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1768       smycols += ourlens[i];
1769       svals   += ourlens[i];
1770       jj++;
1771     }
1772   }
1773   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1774 
1775   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1776   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1777   return 0;
1778 }
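
/*
   A typical way to reach MatLoad_MPIAIJ() is through the generic MatLoad()
   interface with a binary viewer.  A rough sketch follows; the exact viewer
   creation call and its arguments are assumptions, not taken from this file:

      Viewer viewer;
      Mat    A;
      int    ierr;

      ierr = ViewerFileOpenBinary(MPI_COMM_WORLD,"matrix.dat",BINARY_RDONLY,
                                  &viewer); CHKERRQ(ierr);
      ierr = MatLoad(viewer,MATMPIAIJ,&A); CHKERRQ(ierr);
      ierr = ViewerDestroy(viewer); CHKERRQ(ierr);
*/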
1779