xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision a0e291a800ee45b4b2a97096e897722001efcf76)
1 
2 #ifndef lint
3 static char vcid[] = "$Id: mpiaij.c,v 1.160 1996/08/08 14:42:52 bsmith Exp bsmith $";
4 #endif
5 
6 #include "src/mat/impls/aij/mpi/mpiaij.h"
7 #include "src/vec/vecimpl.h"
8 #include "src/inline/spops.h"
9 
10 /* local utility routine that creates a mapping from the global column
11 number to the local number in the off-diagonal part of the local
12 storage of the matrix.  This is done in a non scable way since the
13 length of colmap equals the global matrix length.
14 */
15 static int CreateColmap_MPIAIJ_Private(Mat mat)
16 {
17   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
18   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
19   int        n = B->n,i;
20 
21   aij->colmap = (int *) PetscMalloc(aij->N*sizeof(int));CHKPTRQ(aij->colmap);
22   PLogObjectMemory(mat,aij->N*sizeof(int));
23   PetscMemzero(aij->colmap,aij->N*sizeof(int));
24   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
25   return 0;
26 }
27 
28 extern int DisAssemble_MPIAIJ(Mat);
29 
30 static int MatGetReordering_MPIAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
31 {
32   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
33   int        ierr;
34   if (aij->size == 1) {
35     ierr = MatGetReordering(aij->A,type,rperm,cperm); CHKERRQ(ierr);
36   } else SETERRQ(1,"MatGetReordering_MPIAIJ:not supported in parallel");
37   return 0;
38 }
39 
/*
   Inserts/adds an m-by-n logically dense block of values.  Locally owned
   rows are split between the diagonal block (aij->A, columns in
   [cstart,cend)) and the off-diagonal block (aij->B); rows owned by other
   processors are cached in the stash and shipped during assembly.
*/
static int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Scalar     value;
  int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
  int        cstart = aij->cstart, cend = aij->cend,row,col;
  int        roworiented = aij->roworiented;

  /* all calls between two assemblies must use the same insert mode */
  if (aij->insertmode != NOT_SET_VALUES && aij->insertmode != addv) {
    SETERRQ(1,"MatSetValues_MPIAIJ:Cannot mix inserts and adds");
  }
  aij->insertmode = addv;
  for ( i=0; i<m; i++ ) {
    if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative row");
    if (im[i] >= aij->M) SETERRQ(1,"MatSetValues_MPIAIJ:Row too large");
    if (im[i] >= rstart && im[i] < rend) {
      /* row is owned by this processor */
      row = im[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative column");
        if (in[j] >= aij->N) SETERRQ(1,"MatSetValues_MPIAIJ:Col too large");
        if (in[j] >= cstart && in[j] < cend){
          /* column falls in the diagonal block; shift to local numbering */
          col = in[j] - cstart;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          ierr = MatSetValues(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
        }
        else {
          /* column belongs to the off-diagonal block B */
          if (mat->was_assembled) {
            /* after assembly B uses a compressed column numbering;
               colmap translates global -> local+1 (0 = not present) */
            if (!aij->colmap) {
              ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
            }
            col = aij->colmap[in[j]] - 1;
            if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
              /* a brand-new off-diagonal nonzero: revert B to global
                 column numbering so the entry can be inserted */
              ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
            }
          }
          else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          ierr = MatSetValues(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
        }
      }
    }
    else {
      /* off-processor row: stash values for the communication phase */
      if (roworiented) {
        ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      }
      else {
        /* column-oriented input is not contiguous per row; stash entry by entry */
        row = im[i];
        for ( j=0; j<n; j++ ) {
          ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
        }
      }
    }
  }
  return 0;
}
96 
/*
   Retrieves an m-by-n block of values into v (row-major).  Only rows owned
   by this processor may be requested; entries whose column is absent from
   the off-diagonal block are returned as 0.0.
*/
static int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
  int        cstart = aij->cstart, cend = aij->cend,row,col;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative row");
    if (idxm[i] >= aij->M) SETERRQ(1,"MatGetValues_MPIAIJ:Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      row = idxm[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative column");
        if (idxn[j] >= aij->N) SETERRQ(1,"MatGetValues_MPIAIJ:Col too large");
        if (idxn[j] >= cstart && idxn[j] < cend){
          /* diagonal block: shift the column to local numbering */
          col = idxn[j] - cstart;
          ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        }
        else {
          /* off-diagonal block: translate through colmap
             (stores local+1; 0 means the column does not appear in B) */
          if (!aij->colmap) {
            ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
          }
          col = aij->colmap[idxn[j]] - 1;
          /* the garray cross-check guards against a stale/mismatched map */
          if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
          else {
            ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    }
    else {
      SETERRQ(1,"MatGetValues_MPIAIJ:Only local values currently supported");
    }
  }
  return 0;
}
133 
/*
   First phase of parallel assembly: routes the stashed off-processor
   values, posts the nonblocking receives and starts the nonblocking
   sends.  Each message is a sequence of (row,col,value) triples stored
   entirely as Scalars so one MPIU_SCALAR buffer suffices.  The matching
   waits, and the insertion of received values, happen in
   MatAssemblyEnd_MPIAIJ().
*/
static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = aij->size, *owners = aij->rowners;
  int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  MPI_Allreduce(&aij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,"MatAssemblyBegin_MPIAIJ:Some processors inserted others added");
  }
  aij->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  /* nprocs[j] = #entries destined for proc j; procs[j] = 1 if any */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<aij->stash.n; i++ ) {
    idx = aij->stash.idx[i];
    /* linear search of the row-ownership ranges for the owning processor */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack (row,col,value) triples grouped by destination processor;
     starts[] is advanced as each entry is placed */
  for ( i=0; i<aij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
  }
  PetscFree(owner);
  /* starts[] was consumed above; rebuild it before issuing the sends */
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(0,"[%d]MatAssemblyBegin_MPIAIJ:Number of off processor values %d\n",rank,aij->stash.n);
  ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);

  /* stash the in-flight communication state for MatAssemblyEnd_MPIAIJ */
  aij->svalues    = svalues;    aij->rvalues    = rvalues;
  aij->nsends     = nsends;     aij->nrecvs     = nreceives;
  aij->send_waits = send_waits; aij->recv_waits = recv_waits;
  aij->rmax       = nmax;

  return 0;
}
232 extern int MatSetUpMultiply_MPIAIJ(Mat);
233 
/*
   Second phase of parallel assembly: waits for the messages started in
   MatAssemblyBegin_MPIAIJ(), inserts the received (row,col,value)
   triples into the local blocks, then assembles the two sequential
   blocks and sets up the scatter context needed for MatMult.
*/
static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
  int         row,col,other_disassembled;
  Scalar      *values,val;
  InsertMode  addv = aij->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = aij->rvalues + 3*imdex*aij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;                         /* message is (row,col,value) triples */
    for ( i=0; i<n; i++ ) {
      /* indices were shipped as Scalars; convert back to ints */
      row = (int) PetscReal(values[3*i]) - aij->rstart;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= aij->cstart && col < aij->cend) {
        /* diagonal block */
        col -= aij->cstart;
        MatSetValues(aij->A,1,&row,1,&col,&val,addv);
      }
      else {
        /* off-diagonal block; same colmap translation as MatSetValues */
        if (mat->was_assembled) {
          if (!aij->colmap) {
            ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
          }
          col = aij->colmap[col] - 1;
          if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
            /* new nonzero location: revert to global column numbering */
            ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        MatSetValues(aij->B,1,&row,1,&col,&val,addv);
      }
    }
    count--;
  }
  PetscFree(aij->recv_waits); PetscFree(aij->rvalues);

  /* wait on sends */
  if (aij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(aij->nsends,aij->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(aij->send_waits); PetscFree(aij->svalues);

  aij->insertmode = NOT_SET_VALUES;
  ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselfs, in order that we may reassemble. */
  /* MPI_PROD of the was_assembled flags is 0 iff some processor
     disassembled (NOTE(review): relies on DisAssemble clearing
     was_assembled — confirm against DisAssemble_MPIAIJ) */
  MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
  }

  /* first final assembly: build lvec/Mvctx for matrix-vector products */
  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);

  /* drop any cached MatGetRow work space; it may now be stale */
  if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
  return 0;
}
305 
306 static int MatZeroEntries_MPIAIJ(Mat A)
307 {
308   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
309   int        ierr;
310   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
311   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
312   return 0;
313 }
314 
/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   aij->A and aij->B directly and not through the MatZeroRows()
   routine.
*/
/*
   Zeroes the rows listed in is (global numbering), optionally placing
   *diag on the diagonal.  Rows owned by other processors are forwarded
   to their owners with the same count/post-receive/send protocol used
   in MatAssemblyBegin_MPIAIJ, then each processor zaps its local rows.
*/
static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    /* locate the processor owning row idx via the ownership ranges */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,"MatZeroRows_MPIAIJ:Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  /* each receive gets the worst-case buffer since lengths are unknown */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack row indices grouped by destination; starts[] is advanced */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  /* rebuild starts[] (consumed above) before issuing the sends */
  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  base = owners[rank];     /* first globally numbered row owned here */

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  /* flatten all received global rows into one local (0-based) list */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  /* diag applies only to the diagonal block; B rows are zeroed outright */
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}
443 
444 static int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
445 {
446   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
447   int        ierr,nt;
448 
449   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
450   if (nt != a->n) {
451     SETERRQ(1,"MatMult_MPIAIJ:Incompatible parition of A and xx");
452   }
453   ierr = VecGetLocalSize(yy,&nt);  CHKERRQ(ierr);
454   if (nt != a->m) {
455     SETERRQ(1,"MatMult_MPIAIJ:Incompatible parition of A and yy");
456   }
457   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
458   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
459   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
460   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
461   return 0;
462 }
463 
464 static int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
465 {
466   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
467   int        ierr;
468   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
469   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
470   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
471   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
472   return 0;
473 }
474 
475 static int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
476 {
477   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
478   int        ierr;
479 
480   /* do nondiagonal part */
481   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
482   /* send it on its way */
483   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
484                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
485   /* do local part */
486   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
487   /* receive remote parts: note this assumes the values are not actually */
488   /* inserted in yy until the next line, which is true for my implementation*/
489   /* but is not perhaps always true. */
490   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
491                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
492   return 0;
493 }
494 
/* zz = yy + A^T * xx */
static int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
  int        ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in zz until the next line, which is true for my implementation*/
  /* but is not perhaps always true. */
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}
514 
515 /*
516   This only works correctly for square matrices where the subblock A->A is the
517    diagonal block
518 */
519 static int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
520 {
521   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
522   if (a->M != a->N)
523     SETERRQ(1,"MatGetDiagonal_MPIAIJ:Supports only square matrix where A->A is diag block");
524   if (a->rstart != a->cstart || a->rend != a->cend) {
525     SETERRQ(1,"MatGetDiagonal_MPIAIJ:row partition must equal col partition");  }
526   return MatGetDiagonal(a->A,v);
527 }
528 
529 static int MatScale_MPIAIJ(Scalar *aa,Mat A)
530 {
531   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
532   int        ierr;
533   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
534   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
535   return 0;
536 }
537 
538 static int MatDestroy_MPIAIJ(PetscObject obj)
539 {
540   Mat        mat = (Mat) obj;
541   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
542   int        ierr;
543 #if defined(PETSC_LOG)
544   PLogObjectState(obj,"Rows=%d, Cols=%d",aij->M,aij->N);
545 #endif
546   PetscFree(aij->rowners);
547   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
548   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
549   if (aij->colmap) PetscFree(aij->colmap);
550   if (aij->garray) PetscFree(aij->garray);
551   if (aij->lvec)   VecDestroy(aij->lvec);
552   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
553   if (aij->rowvalues) PetscFree(aij->rowvalues);
554   PetscFree(aij);
555   PLogObjectDestroy(mat);
556   PetscHeaderDestroy(mat);
557   return 0;
558 }
559 #include "draw.h"
560 #include "pinclude/pviewer.h"
561 
562 static int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
563 {
564   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
565   int         ierr;
566 
567   if (aij->size == 1) {
568     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
569   }
570   else SETERRQ(1,"MatView_MPIAIJ_Binary:Only uniprocessor output supported");
571   return 0;
572 }
573 
/*
   Viewer support for ASCII, draw and Matlab viewers.  Depending on the
   format this prints per-processor statistics, per-processor submatrices,
   or gathers the entire matrix onto processor 0 and views it there.
*/
static int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
  int         ierr, format,shift = C->indexshift,rank;
  FILE        *fd;
  ViewerType  vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerGetFormat(viewer,&format);
    if (format == ASCII_FORMAT_INFO_DETAILED) {
      /* print per-processor nonzero/memory statistics, one rank at a time */
      int nz, nzalloc, mem, flg;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
      ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
      PetscSequentialPhaseBegin(mat->comm,1);
      if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
         rank,aij->m,nz,nzalloc,mem);
      else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
         rank,aij->m,nz,nzalloc,mem);
      ierr = MatGetInfo(aij->A,MAT_LOCAL,&nz,&nzalloc,&mem);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz);
      ierr = MatGetInfo(aij->B,MAT_LOCAL,&nz,&nzalloc,&mem);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == ASCII_FORMAT_INFO) {
      /* nothing to print at this level for the short info format */
      return 0;
    }
  }

  if (vtype == DRAW_VIEWER) {
    /* a null draw context means graphics are disabled; skip silently */
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
  }

  if (vtype == ASCII_FILE_VIEWER) {
    /* each processor prints its own row/column range and its two blocks */
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
           aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
           aij->cend);
    ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  }
  else {
    int size = aij->size;
    rank = aij->rank;
    if (size == 1) {
      ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat         A;
      Mat_SeqAIJ *Aloc;
      int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
      Scalar      *a;

      /* processor 0 owns all rows of the gathered copy; others own none */
      if (!rank) {
        ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
               CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
               CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqAIJ*) aij->A->data;
      m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = aij->rstart;
      /* temporarily convert A's column indices to global numbering
         in place; they are restored after the copy below */
      for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
      for ( i=0; i<m; i++ ) {
        ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
        row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
      }
      aj = Aloc->j;
      for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}

      /* copy over the B part */
      Aloc = (Mat_SeqAIJ*) aij->B->data;
      m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = aij->rstart;
      /* B is in compressed numbering; map through garray into a scratch
         array (ct keeps the base pointer since cols is advanced) */
      ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
      for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
      for ( i=0; i<m; i++ ) {
        ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
        row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
      }
      PetscFree(ct);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        /* everything landed in rank 0's diagonal block; view just that */
        ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}
684 
685 static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
686 {
687   Mat         mat = (Mat) obj;
688   int         ierr;
689   ViewerType  vtype;
690 
691   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
692   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
693       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
694     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
695   }
696   else if (vtype == BINARY_FILE_VIEWER) {
697     return MatView_MPIAIJ_Binary(mat,viewer);
698   }
699   return 0;
700 }
701 
/*
    This has to provide several versions.

     2) a) use only local smoothing updating outer values only once.
        b) local smoothing updating outer values each inner iteration
     3) color updating out values betwen colors.
*/
/*
   SOR/relaxation sweeps.  Only the "local" variants are implemented:
   ghost values of xx are scattered into mat->lvec once up front, then
   each processor sweeps its own rows using the diagonal block A and the
   off-diagonal block B (whose row i dots against the ghost values ls).
   With SOR_ZERO_INITIAL_GUESS the work is delegated entirely to the
   sequential block's relax routine.
*/
static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
                           double fshift,int its,Vec xx)
{
  Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
  Mat        AA = mat->A, BB = mat->B;
  Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
  Scalar     *b,*x,*xs,*ls,d,*v,sum;
  int        ierr,*idx, *diag;
  int        n = mat->n, m = mat->m, i,shift = A->indexshift;

  VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
  xs = x + shift; /* shift by one for index start of 1 */
  ls = ls + shift;
  /* make sure the diagonal-entry locations are known */
  if (!A->diag) {if ((ierr = MatMarkDiag_SeqAIJ(AA))) return ierr;}
  diag = A->diag;
  if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
    }
    /* fetch ghost values of xx once; they stay fixed for all sweeps */
    ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
    CHKERRQ(ierr);
    ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
    CHKERRQ(ierr);
    while (its--) {
      /* go down through the rows */
      for ( i=0; i<m; i++ ) {
        /* sum = b[i] - (row i of A)·x - (row i of B)·ls, excluding
           nothing: SPARSEDENSEMDOT subtracts the products from sum */
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
      }
      /* come up through the rows */
      for ( i=m-1; i>-1; i-- ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
      }
    }
  }
  else if (flag & SOR_LOCAL_FORWARD_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
    }
    ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
    CHKERRQ(ierr);
    ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
    CHKERRQ(ierr);
    while (its--) {
      /* forward sweep only */
      for ( i=0; i<m; i++ ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
      }
    }
  }
  else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
    }
    ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
                            mat->Mvctx); CHKERRQ(ierr);
    ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
                            mat->Mvctx); CHKERRQ(ierr);
    while (its--) {
      /* backward sweep only */
      for ( i=m-1; i>-1; i-- ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
      }
    }
  }
  else {
    SETERRQ(1,"MatRelax_MPIAIJ:Option not supported");
  }
  return 0;
}
816 
817 static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,int *nz,
818                              int *nzalloc,int *mem)
819 {
820   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
821   Mat        A = mat->A, B = mat->B;
822   int        ierr, isend[3], irecv[3], nzA, nzallocA, memA;
823 
824   ierr = MatGetInfo(A,MAT_LOCAL,&nzA,&nzallocA,&memA); CHKERRQ(ierr);
825   ierr = MatGetInfo(B,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
826   isend[0] += nzA; isend[1] += nzallocA; isend[2] += memA;
827   if (flag == MAT_LOCAL) {
828     if (nz)       *nz      = isend[0];
829     if (nzalloc)  *nzalloc = isend[1];
830     if (mem)      *mem     = isend[2];
831   } else if (flag == MAT_GLOBAL_MAX) {
832     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_MAX,matin->comm);
833     if (nz)      *nz      = irecv[0];
834     if (nzalloc) *nzalloc = irecv[1];
835     if (mem)     *mem     = irecv[2];
836   } else if (flag == MAT_GLOBAL_SUM) {
837     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_SUM,matin->comm);
838     if (nz)      *nz      = irecv[0];
839     if (nzalloc) *nzalloc = irecv[1];
840     if (mem)     *mem     = irecv[2];
841   }
842   return 0;
843 }
844 
845 extern int MatLUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,Mat*);
846 extern int MatLUFactorNumeric_MPIAIJ(Mat,Mat*);
847 extern int MatLUFactor_MPIAIJ(Mat,IS,IS,double);
848 extern int MatILUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,int,Mat *);
849 extern int MatSolve_MPIAIJ(Mat,Vec,Vec);
850 extern int MatSolveAdd_MPIAIJ(Mat,Vec,Vec,Vec);
851 extern int MatSolveTrans_MPIAIJ(Mat,Vec,Vec);
852 extern int MatSolveTransAdd_MPIAIJ(Mat,Vec,Vec,Vec);
853 
854 static int MatSetOption_MPIAIJ(Mat A,MatOption op)
855 {
856   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
857 
858   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
859       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
860       op == MAT_COLUMNS_SORTED ||
861       op == MAT_ROW_ORIENTED) {
862         MatSetOption(a->A,op);
863         MatSetOption(a->B,op);
864   }
865   else if (op == MAT_ROWS_SORTED ||
866            op == MAT_SYMMETRIC ||
867            op == MAT_STRUCTURALLY_SYMMETRIC ||
868            op == MAT_YES_NEW_DIAGONALS)
869     PLogInfo(A,"Info:MatSetOption_MPIAIJ:Option ignored\n");
870   else if (op == MAT_COLUMN_ORIENTED) {
871     a->roworiented = 0;
872     MatSetOption(a->A,op);
873     MatSetOption(a->B,op);
874   }
875   else if (op == MAT_NO_NEW_DIAGONALS)
876     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:MAT_NO_NEW_DIAGONALS");}
877   else
878     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:unknown option");}
879   return 0;
880 }
881 
882 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
883 {
884   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
885   *m = mat->M; *n = mat->N;
886   return 0;
887 }
888 
889 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
890 {
891   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
892   *m = mat->m; *n = mat->N;
893   return 0;
894 }
895 
896 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
897 {
898   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
899   *m = mat->rstart; *n = mat->rend;
900   return 0;
901 }
902 
903 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
904 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
905 
/*
    Returns one locally owned row of the parallel matrix with globally
  numbered, sorted column indices.  The row is assembled by merging the
  diagonal block (A, local column numbering offset by cstart) with the
  off-diagonal block (B, local columns mapped to global through garray).
  Only one row may be "gotten" at a time; MatRestoreRow_MPIAIJ() releases it.
  The returned idx/v arrays point into workspace owned by the matrix and
  are valid only until the next MatGetRow() call.  idx and/or v may be
  null if the caller does not need them.
*/
int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
  int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
  int        *cmap, *idx_p;

  /* only one row at a time: the merge workspace below is shared */
  if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIAIJ:Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
    int     max = 1,m = mat->m,tmp;
    for ( i=0; i<m; i++ ) {
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    /* one allocation holds both the values and, after them, the indices */
    mat->rowvalues = (Scalar *) PetscMalloc( max*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max);
  }

  if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIAIJ:Only local rows")
  lrow = row - rstart;

  /* pass null pointers for whatever the caller did not ask for */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      /* imark = number of B entries whose global column precedes cstart,
         i.e. the number of off-diagonal entries to the LEFT of the
         diagonal block; the merged row is B-left, then A, then B-right */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        /* if the values pass already found imark, reuse it; otherwise
           determine it here by the same scan */
        if (imark > -1) {
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]];
          }
        } else {
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
            else break;
          }
          imark = i;
        }
        /* A's local columns become global by adding cstart */
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
      }
    }
    else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  return 0;
}
984 
985 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
986 {
987   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
988   if (aij->getrowactive == PETSC_FALSE) {
989     SETERRQ(1,"MatRestoreRow_MPIAIJ:MatGetRow not called");
990   }
991   aij->getrowactive = PETSC_FALSE;
992   return 0;
993 }
994 
/*
    Computes a norm of the parallel matrix.  Supports NORM_FROBENIUS,
  NORM_1 (maximum column sum) and NORM_INFINITY (maximum row sum);
  the two norm is not supported.  On a single process the work is
  delegated to the sequential routine.  Note: the NORM_1 path allocates
  two arrays of global length N, which does not scale.
*/
static int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
  int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
  double     sum = 0.0;
  Scalar     *v;

  if (aij->size == 1) {
    ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      /* sum |a_ij|^2 over both local blocks, reduce, then take the root */
      v = amat->a;
      for (i=0; i<amat->nz; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      v = bmat->a;
      for (i=0; i<bmat->nz; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
      *norm = sqrt(*norm);
    }
    else if (type == NORM_1) { /* max column norm */
      /* accumulate |a_ij| per GLOBAL column: A's columns are offset by
         cstart, B's are mapped through garray; then reduce and take max */
      double *tmp, *tmp2;
      int    *jj, *garray = aij->garray;
      tmp  = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp);
      tmp2 = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp2);
      PetscMemzero(tmp,aij->N*sizeof(double));
      *norm = 0.0;
      v = amat->a; jj = amat->j;
      for ( j=0; j<amat->nz; j++ ) {
        tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
      }
      v = bmat->a; jj = bmat->j;
      for ( j=0; j<bmat->nz; j++ ) {
        tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
      }
      MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);
      for ( j=0; j<aij->N; j++ ) {
        if (tmp2[j] > *norm) *norm = tmp2[j];
      }
      PetscFree(tmp); PetscFree(tmp2);
    }
    else if (type == NORM_INFINITY) { /* max row norm */
      /* rows are whole on each process, so only a max reduction is needed */
      double ntemp = 0.0;
      for ( j=0; j<amat->m; j++ ) {
        v = amat->a + amat->i[j] + shift;
        sum = 0.0;
        for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
          sum += PetscAbsScalar(*v); v++;
        }
        v = bmat->a + bmat->i[j] + shift;
        for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
          sum += PetscAbsScalar(*v); v++;
        }
        if (sum > ntemp) ntemp = sum;
      }
      MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);
    }
    else {
      SETERRQ(1,"MatNorm_MPIAIJ:No support for two norm");
    }
  }
  return 0;
}
1069 
/*
    Forms the transpose of a parallel AIJ matrix by inserting each local
  row of A as a COLUMN of a new matrix B (MatSetValues with a single
  column index), then assembling.  If matout is null the transpose
  replaces A in place (square matrices only); otherwise it is returned
  in *matout.
*/
static int MatTranspose_MPIAIJ(Mat A,Mat *matout)
{
  Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
  Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
  int        ierr,shift = Aloc->indexshift;
  Mat        B;
  int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
  Scalar     *array;

  if (matout == PETSC_NULL && M != N)
    SETERRQ(1,"MatTranspose_MPIAIJ:Square matrix only for in-place");
  ierr = MatCreateMPIAIJ(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,
         PETSC_NULL,&B); CHKERRQ(ierr);

  /* copy over the A part */
  Aloc = (Mat_SeqAIJ*) a->A->data;
  m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
  row = a->rstart;
  /* temporarily shift A's column indices from local to global numbering
     (undone below) so they can be fed straight to MatSetValues() */
  for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
  for ( i=0; i<m; i++ ) {
    ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
    row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
  }
  aj = Aloc->j;
  /* restore A's column indices to local numbering */
  for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}

  /* copy over the B part */
  Aloc = (Mat_SeqAIJ*) a->B->data;
  m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
  row = a->rstart;
  /* B's indices are translated into a scratch array via garray, so no
     restore pass is needed here.  NOTE(review): the size uses -shift
     while the loops use +shift -- presumably deliberate over-allocation
     for the one-indexed case; confirm against indexshift semantics. */
  ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
  for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
  for ( i=0; i<m; i++ ) {
    ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
    row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
  }
  PetscFree(ct);
  ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  if (matout != PETSC_NULL) {
    *matout = B;
  } else {
    /* This isn't really an in-place transpose .... but free data structures from a */
    PetscFree(a->rowners);
    ierr = MatDestroy(a->A); CHKERRQ(ierr);
    ierr = MatDestroy(a->B); CHKERRQ(ierr);
    if (a->colmap) PetscFree(a->colmap);
    if (a->garray) PetscFree(a->garray);
    if (a->lvec) VecDestroy(a->lvec);
    if (a->Mvctx) VecScatterDestroy(a->Mvctx);
    PetscFree(a);
    /* steal B's body into A's header, then discard B's header */
    PetscMemcpy(A,B,sizeof(struct _Mat));
    PetscHeaderDestroy(B);
  }
  return 0;
}
1126 
1127 extern int MatPrintHelp_SeqAIJ(Mat);
1128 static int MatPrintHelp_MPIAIJ(Mat A)
1129 {
1130   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1131 
1132   if (!a->rank) return MatPrintHelp_SeqAIJ(a->A);
1133   else return 0;
1134 }
1135 
/* AIJ storage has no block structure, so the block size is always 1. */
static int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
{
  *bs = 1;
  return 0;
}
1141 
1142 extern int MatConvert_MPIAIJ(Mat,MatType,Mat *);
1143 static int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1144 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1145 int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatGetSubMatrixCall,Mat **);
1146 /* -------------------------------------------------------------------*/
/*
    Operation table for the MPIAIJ matrix type.  The initializer is
  POSITIONAL: each entry must line up with the corresponding field of
  struct _MatOps, and a 0 marks an operation this format does not
  support.  Do not reorder entries.
*/
static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
       MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
       MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
       MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
       MatSolve_MPIAIJ,MatSolveAdd_MPIAIJ,
       MatSolveTrans_MPIAIJ,MatSolveTransAdd_MPIAIJ,
       MatLUFactor_MPIAIJ,0,
       MatRelax_MPIAIJ,
       MatTranspose_MPIAIJ,
       MatGetInfo_MPIAIJ,0,
       MatGetDiagonal_MPIAIJ,0,MatNorm_MPIAIJ,
       MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
       0,
       MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
       MatGetReordering_MPIAIJ,
       MatLUFactorSymbolic_MPIAIJ,MatLUFactorNumeric_MPIAIJ,0,0,
       MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
       MatILUFactorSymbolic_MPIAIJ,0,
       0,0,MatConvert_MPIAIJ,MatConvertSameType_MPIAIJ,0,0,
       0,0,0,
       MatGetSubMatrices_MPIAIJ,MatIncreaseOverlap_MPIAIJ,MatGetValues_MPIAIJ,0,
       MatPrintHelp_MPIAIJ,
       MatScale_MPIAIJ,0,0,0,MatGetBlockSize_MPIAIJ};
1170 
1171 /*@C
1172    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1173    (the default parallel PETSc format).  For good matrix assembly performance
1174    the user should preallocate the matrix storage by setting the parameters
1175    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1176    performance can be increased by more than a factor of 50.
1177 
1178    Input Parameters:
1179 .  comm - MPI communicator
1180 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1181            This value should be the same as the local size used in creating the
1182            y vector for the matrix-vector product y = Ax.
1183 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1184            This value should be the same as the local size used in creating the
1185            x vector for the matrix-vector product y = Ax.
1186 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1187 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1188 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1189            (same for all local rows)
.  d_nnz - array containing the number of nonzeros in the various rows of the
1191            diagonal portion of the local submatrix (possibly different for each row)
1192            or PETSC_NULL. You must leave room for the diagonal entry even if
1193            it is zero.
1194 .  o_nz - number of nonzeros per row in the off-diagonal portion of local
1195            submatrix (same for all local rows).
.  o_nnz - array containing the number of nonzeros in the various rows of the
1197            off-diagonal portion of the local submatrix (possibly different for
1198            each row) or PETSC_NULL.
1199 
1200    Output Parameter:
1201 .  A - the matrix
1202 
1203    Notes:
1204    The AIJ format (also called the Yale sparse matrix format or
1205    compressed row storage), is fully compatible with standard Fortran 77
1206    storage.  That is, the stored row and column indices can begin at
1207    either one (as in Fortran) or zero.  See the users manual for details.
1208 
1209    The user MUST specify either the local or global matrix dimensions
1210    (possibly both).
1211 
1212    By default, this format uses inodes (identical nodes) when possible.
1213    We search for consecutive rows with the same nonzero structure, thereby
1214    reusing matrix information to achieve increased efficiency.
1215 
1216    Options Database Keys:
1217 $    -mat_aij_no_inode  - Do not use inodes
1218 $    -mat_aij_inode_limit <limit> - Set inode limit.
1219 $        (max limit=5)
1220 $    -mat_aij_oneindex - Internally use indexing starting at 1
1221 $        rather than 0.  Note: When calling MatSetValues(),
1222 $        the user still MUST index entries starting at 0!
1223 
1224    Storage Information:
1225    For a square global matrix we define each processor's diagonal portion
1226    to be its local rows and the corresponding columns (a square submatrix);
1227    each processor's off-diagonal portion encompasses the remainder of the
1228    local matrix (a rectangular submatrix).
1229 
1230    The user can specify preallocated storage for the diagonal part of
1231    the local submatrix with either d_nz or d_nnz (not both).  Set
1232    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1233    memory allocation.  Likewise, specify preallocated storage for the
1234    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1235 
1236    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1237    the figure below we depict these three local rows and all columns (0-11).
1238 
1239 $          0 1 2 3 4 5 6 7 8 9 10 11
1240 $         -------------------
1241 $  row 3  |  o o o d d d o o o o o o
1242 $  row 4  |  o o o d d d o o o o o o
1243 $  row 5  |  o o o d d d o o o o o o
1244 $         -------------------
1245 $
1246 
1247    Thus, any entries in the d locations are stored in the d (diagonal)
1248    submatrix, and any entries in the o locations are stored in the
1249    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1250    stored simply in the MATSEQAIJ format for compressed row storage.
1251 
1252    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1253    and o_nz should indicate the number of nonzeros per row in the o matrix.
1254    In general, for PDE problems in which most nonzeros are near the diagonal,
1255    one expects d_nz >> o_nz. For large problems you MUST preallocate memory
1256    or you will get TERRIBLE performance; see the users' manual chapter on
1257    matrices and the file $(PETSC_DIR)/Performance.
1258 
1259 .keywords: matrix, aij, compressed row, sparse, parallel
1260 
1261 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1262 @*/
int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
                    int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  Mat          B;
  Mat_MPIAIJ   *b;
  int          ierr, i,sum[2],work[2];

  *A = 0;
  PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
  PLogObjectCreate(B);
  B->data       = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
  /* PetscNew() does not zero memory; clear all fields up front */
  PetscMemzero(b,sizeof(Mat_MPIAIJ));
  PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
  B->destroy    = MatDestroy_MPIAIJ;
  B->view       = MatView_MPIAIJ;
  B->factor     = 0;
  B->assembled  = PETSC_FALSE;

  b->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&b->rank);
  MPI_Comm_size(comm,&b->size);

  /* per-row preallocation needs a known local row count */
  if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
    SETERRQ(1,"MatCreateMPIAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");

  /* derive missing global sizes as the sum of the local ones */
  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
    if (M == PETSC_DECIDE) M = sum[0];
    if (N == PETSC_DECIDE) N = sum[1];
  }
  /* derive missing local sizes: even split, remainder to the low ranks */
  if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
  if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;

  /* build local table of row and column ownerships */
  /* rowners/cowners share one allocation; entry i is the first row
     (resp. column) owned by process i, computed by a prefix sum of the
     gathered local sizes */
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
  b->cowners = b->rowners + b->size + 2;
  MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];
  }
  b->rstart = b->rowners[b->rank];
  b->rend   = b->rowners[b->rank+1];
  MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart = b->cowners[b->rank];
  b->cend   = b->cowners[b->rank+1];

  /* the diagonal block A is m x n; the off-diagonal block B is stored
     m x N and later compressed to the referenced columns at assembly */
  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used for matrix vector multiply */
  b->lvec      = 0;
  b->Mvctx     = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  *A = B;
  return 0;
}
1345 
1346 static int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
1347 {
1348   Mat        mat;
1349   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1350   int        ierr, len=0, flg;
1351 
1352   *newmat       = 0;
1353   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1354   PLogObjectCreate(mat);
1355   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1356   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1357   mat->destroy    = MatDestroy_MPIAIJ;
1358   mat->view       = MatView_MPIAIJ;
1359   mat->factor     = matin->factor;
1360   mat->assembled  = PETSC_TRUE;
1361 
1362   a->m = mat->m   = oldmat->m;
1363   a->n = mat->n   = oldmat->n;
1364   a->M = mat->M   = oldmat->M;
1365   a->N = mat->N   = oldmat->N;
1366 
1367   a->rstart       = oldmat->rstart;
1368   a->rend         = oldmat->rend;
1369   a->cstart       = oldmat->cstart;
1370   a->cend         = oldmat->cend;
1371   a->size         = oldmat->size;
1372   a->rank         = oldmat->rank;
1373   a->insertmode   = NOT_SET_VALUES;
1374   a->rowvalues    = 0;
1375   a->getrowactive = PETSC_FALSE;
1376 
1377   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1378   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1379   a->cowners = a->rowners + a->size + 2;
1380   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1381   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1382   if (oldmat->colmap) {
1383     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1384     PLogObjectMemory(mat,(a->N)*sizeof(int));
1385     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1386   } else a->colmap = 0;
1387   if (oldmat->garray && (len = ((Mat_SeqAIJ *) (oldmat->B->data))->n)) {
1388     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1389     PLogObjectMemory(mat,len*sizeof(int));
1390     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1391   } else a->garray = 0;
1392 
1393   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1394   PLogObjectParent(mat,a->lvec);
1395   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1396   PLogObjectParent(mat,a->Mvctx);
1397   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1398   PLogObjectParent(mat,a->A);
1399   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1400   PLogObjectParent(mat,a->B);
1401   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1402   if (flg) {
1403     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1404   }
1405   *newmat = mat;
1406   return 0;
1407 }
1408 
1409 #include "sys.h"
1410 
/*
    Loads a parallel AIJ matrix from a binary viewer.  Process 0 reads
  the header, row lengths, column indices and values from the file and
  ships each process its share with point-to-point messages; every
  process then inserts its rows with MatSetValues() and the matrix is
  assembled.  Rows are distributed evenly with the remainder going to
  the low ranks.
*/
int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*svals;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
  int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
  int          tag = ((PetscObject)viewer)->tag;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    /* header = {cookie, M, N, total nonzeros} */
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIAIJ:not matrix object");
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];
  /* determine ownership of all rows */
  m = M/size + ((M % size) > rank);
  rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
  MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  /* prefix-sum the gathered counts: rowners[i] = first row of process i */
  for ( i=2; i<=size; i++ ) {
    rowners[i] += rowners[i-1];
  }
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  /* ourlens and offlens share one allocation of 2*(local rows) ints */
  ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
  offlens = ourlens + (rend-rstart);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
    /* rowners doubles as the displacement array for the scatter */
    MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices  */
    nz = procsnz[0];
    mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
    ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);

    /* read in everyone else's and ship off */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += ourlens[i];
    }
    mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);

    /* receive message of column indices*/
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries */
  /* NOTE(review): columns are classified against the ROW ownership range
     [rstart,rend), which presumes the column split matches the row split
     (square matrices); confirm for M != N */
  PetscMemzero(offlens,m*sizeof(int));
  jj = 0;
  for ( i=0; i<m; i++ ) {
    for ( j=0; j<ourlens[i]; j++ ) {
      if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
      jj++;
    }
  }

  /* create our matrix */
  /* ourlens is temporarily reduced to diagonal-only counts so the two
     arrays serve as the d_nnz/o_nnz preallocation, then restored */
  for ( i=0; i<m; i++ ) {
    ourlens[i] -= offlens[i];
  }
  ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
  A = *newmat;
  MatSetOption(A,MAT_COLUMNS_SORTED);
  for ( i=0; i<m; i++ ) {
    ourlens[i] += offlens[i];
  }

  if (!rank) {
    vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);

    /* insert into matrix */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }

    /* read in other processors and ship out */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* receive message of values*/
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");

    /* insert into matrix */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }
  }
  PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);

  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}
1575