xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision 3f41c07dae894bc9f45e63ed04312cde128954a3)
1 
2 #ifndef lint
3 static char vcid[] = "$Id: mpiaij.c,v 1.161 1996/08/12 03:41:34 bsmith Exp bsmith $";
4 #endif
5 
6 #include "src/mat/impls/aij/mpi/mpiaij.h"
7 #include "src/vec/vecimpl.h"
8 #include "src/inline/spops.h"
9 
10 /* local utility routine that creates a mapping from the global column
11 number to the local number in the off-diagonal part of the local
12 storage of the matrix.  This is done in a non-scalable way since the
13 length of colmap equals the global number of columns in the matrix.
14 */
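/*
   For illustration (hypothetical values, not taken from this file): if the
   off-diagonal block has garray = {3,7,12}, this routine leaves colmap[3] = 1,
   colmap[7] = 2, colmap[12] = 3, and every other entry zero, so a zero entry
   of colmap means that global column has no local copy in B.
*/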
15 static int CreateColmap_MPIAIJ_Private(Mat mat)
16 {
17   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
18   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
19   int        n = B->n,i;
20 
21   aij->colmap = (int *) PetscMalloc(aij->N*sizeof(int));CHKPTRQ(aij->colmap);
22   PLogObjectMemory(mat,aij->N*sizeof(int));
23   PetscMemzero(aij->colmap,aij->N*sizeof(int));
24   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
25   return 0;
26 }
27 
28 extern int DisAssemble_MPIAIJ(Mat);
29 
30 static int MatGetReordering_MPIAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
31 {
32   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
33   int        ierr;
34   if (aij->size == 1) {
35     ierr = MatGetReordering(aij->A,type,rperm,cperm); CHKERRQ(ierr);
36   } else SETERRQ(1,"MatGetReordering_MPIAIJ:not supported in parallel");
37   return 0;
38 }
39 
40 static int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
41 {
42   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
43   Scalar     value;
44   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
45   int        cstart = aij->cstart, cend = aij->cend,row,col;
46   int        roworiented = aij->roworiented;
47 
48   if (aij->insertmode != NOT_SET_VALUES && aij->insertmode != addv) {
49     SETERRQ(1,"MatSetValues_MPIAIJ:Cannot mix inserts and adds");
50   }
51   aij->insertmode = addv;
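  /* Summary of the three cases handled below: a locally owned row contributes
     either to the diagonal block A (columns in [cstart,cend)) or to the
     off-diagonal block B (all other columns, translated through colmap once
     the matrix has already been assembled); entries for rows owned by other
     processors are stashed and shipped during assembly. */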
52   for ( i=0; i<m; i++ ) {
53     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative row");
54     if (im[i] >= aij->M) SETERRQ(1,"MatSetValues_MPIAIJ:Row too large");
55     if (im[i] >= rstart && im[i] < rend) {
56       row = im[i] - rstart;
57       for ( j=0; j<n; j++ ) {
58         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative column");
59         if (in[j] >= aij->N) SETERRQ(1,"MatSetValues_MPIAIJ:Col too large");
60         if (in[j] >= cstart && in[j] < cend){
61           col = in[j] - cstart;
62           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
63           ierr = MatSetValues(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
64         }
65         else {
66           if (mat->was_assembled) {
67             if (!aij->colmap) {
68               ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
69             }
70             col = aij->colmap[in[j]] - 1;
71             if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
72               ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
73               col =  in[j];
74             }
75           }
76           else col = in[j];
77           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
78           ierr = MatSetValues(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
79         }
80       }
81     }
82     else {
83       if (roworiented) {
84         ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
85       }
86       else {
87         row = im[i];
88         for ( j=0; j<n; j++ ) {
89           ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
90         }
91       }
92     }
93   }
94   return 0;
95 }
96 
97 static int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
98 {
99   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
100   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
101   int        cstart = aij->cstart, cend = aij->cend,row,col;
102 
103   for ( i=0; i<m; i++ ) {
104     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative row");
105     if (idxm[i] >= aij->M) SETERRQ(1,"MatGetValues_MPIAIJ:Row too large");
106     if (idxm[i] >= rstart && idxm[i] < rend) {
107       row = idxm[i] - rstart;
108       for ( j=0; j<n; j++ ) {
109         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative column");
110         if (idxn[j] >= aij->N) SETERRQ(1,"MatGetValues_MPIAIJ:Col too large");
111         if (idxn[j] >= cstart && idxn[j] < cend){
112           col = idxn[j] - cstart;
113           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
114         }
115         else {
116           if (!aij->colmap) {
117             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
118           }
119           col = aij->colmap[idxn[j]] - 1;
120           if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
121           else {
122             ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
123           }
124         }
125       }
126     }
127     else {
128       SETERRQ(1,"MatGetValues_MPIAIJ:Only local values currently supported");
129     }
130   }
131   return 0;
132 }
133 
134 static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
135 {
136   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
137   MPI_Comm    comm = mat->comm;
138   int         size = aij->size, *owners = aij->rowners;
139   int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
140   MPI_Request *send_waits,*recv_waits;
141   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
142   InsertMode  addv;
143   Scalar      *rvalues,*svalues;
144 
145   /* make sure all processors are using either INSERT_VALUES or ADD_VALUES */
146   MPI_Allreduce(&aij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
147   if (addv == (ADD_VALUES|INSERT_VALUES)) {
148     SETERRQ(1,"MatAssemblyBegin_MPIAIJ:Some processors inserted others added");
149   }
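  /* Note on the reduction above: assuming INSERT_VALUES and ADD_VALUES are
     distinct nonzero bit flags (which the test against
     (ADD_VALUES|INSERT_VALUES) already relies on), the MPI_BOR of all local
     insert modes equals that combined value exactly when some processors
     inserted while others added. */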
150   aij->insertmode = addv; /* in case this processor had no cache */
151 
152   /*  first count number of contributors to each processor */
153   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
154   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
155   owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
156   for ( i=0; i<aij->stash.n; i++ ) {
157     idx = aij->stash.idx[i];
158     for ( j=0; j<size; j++ ) {
159       if (idx >= owners[j] && idx < owners[j+1]) {
160         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
161       }
162     }
163   }
164   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
165 
166   /* inform other processors of number of messages and max length*/
167   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
168   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
169   nreceives = work[rank];
170   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
171   nmax = work[rank];
172   PetscFree(work);
173 
174   /* post receives:
175        1) each message will consist of ordered triples
176      (global row index, global column index, value); the indices are
177      stored as Scalars to simplify the message passing.
178        2) since we don't know how long each individual message is, we
179      allocate the largest needed buffer for each receive. Potentially
180      this is a lot of wasted space.
181 
182 
183        This could be done better.
184   */
185   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
186   CHKPTRQ(rvalues);
187   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
188   CHKPTRQ(recv_waits);
189   for ( i=0; i<nreceives; i++ ) {
190     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
191               comm,recv_waits+i);
192   }
193 
194   /* do sends:
195       1) starts[i] gives the starting index in svalues for stuff going to
196          the ith processor
197   */
198   svalues = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
199   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
200   CHKPTRQ(send_waits);
201   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
202   starts[0] = 0;
203   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
204   for ( i=0; i<aij->stash.n; i++ ) {
205     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
206     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
207     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
208   }
209   PetscFree(owner);
210   starts[0] = 0;
211   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
212   count = 0;
213   for ( i=0; i<size; i++ ) {
214     if (procs[i]) {
215       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
216                 comm,send_waits+count++);
217     }
218   }
219   PetscFree(starts); PetscFree(nprocs);
220 
221   /* Free cache space */
222   PLogInfo(0,"[%d]MatAssemblyBegin_MPIAIJ:Number of off processor values %d\n",rank,aij->stash.n);
223   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
224 
225   aij->svalues    = svalues;    aij->rvalues    = rvalues;
226   aij->nsends     = nsends;     aij->nrecvs     = nreceives;
227   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
228   aij->rmax       = nmax;
229 
230   return 0;
231 }
232 extern int MatSetUpMultiply_MPIAIJ(Mat);
233 
234 static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
235 {
236   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
237   MPI_Status  *send_status,recv_status;
238   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
239   int         row,col,other_disassembled;
240   Scalar      *values,val;
241   InsertMode  addv = aij->insertmode;
242 
243   /*  wait on receives */
244   while (count) {
245     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
246     /* unpack receives into our local space */
247     values = aij->rvalues + 3*imdex*aij->rmax;
248     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
249     n = n/3;
250     for ( i=0; i<n; i++ ) {
251       row = (int) PetscReal(values[3*i]) - aij->rstart;
252       col = (int) PetscReal(values[3*i+1]);
253       val = values[3*i+2];
254       if (col >= aij->cstart && col < aij->cend) {
255         col -= aij->cstart;
256         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
257       }
258       else {
259         if (mat->was_assembled) {
260           if (!aij->colmap) {
261             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
262           }
263           col = aij->colmap[col] - 1;
264           if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
265             ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
266             col = (int) PetscReal(values[3*i+1]);
267           }
268         }
269         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
270       }
271     }
272     count--;
273   }
274   PetscFree(aij->recv_waits); PetscFree(aij->rvalues);
275 
276   /* wait on sends */
277   if (aij->nsends) {
278     send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));
279     CHKPTRQ(send_status);
280     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
281     PetscFree(send_status);
282   }
283   PetscFree(aij->send_waits); PetscFree(aij->svalues);
284 
285   aij->insertmode = NOT_SET_VALUES;
286   ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
287   ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);
288 
289   /* determine if any processor has disassembled; if so, we must
290      also disassemble ourselves, so that we may reassemble. */
291   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
292   if (mat->was_assembled && !other_disassembled) {
293     ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
294   }
295 
296   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
297     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
298   }
299   ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
300   ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);
301 
302   if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
303   return 0;
304 }
305 
306 static int MatZeroEntries_MPIAIJ(Mat A)
307 {
308   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
309   int        ierr;
310   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
311   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
312   return 0;
313 }
314 
315 /* the code does not do the diagonal entries correctly unless the
316    matrix is square and the row and column ownerships are identical.
317    This is a BUG. The only way to fix it seems to be to access
318    aij->A and aij->B directly and not through the MatZeroRows()
319    routine.
320 */
321 static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
322 {
323   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
324   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
325   int            *procs,*nprocs,j,found,idx,nsends,*work;
326   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
327   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
328   int            *lens,imdex,*lrows,*values;
329   MPI_Comm       comm = A->comm;
330   MPI_Request    *send_waits,*recv_waits;
331   MPI_Status     recv_status,*send_status;
332   IS             istmp;
333 
334   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
335   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
336 
337   /*  first count number of contributors to each processor */
338   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
339   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
340   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
341   for ( i=0; i<N; i++ ) {
342     idx = rows[i];
343     found = 0;
344     for ( j=0; j<size; j++ ) {
345       if (idx >= owners[j] && idx < owners[j+1]) {
346         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
347       }
348     }
349     if (!found) SETERRQ(1,"MatZeroRows_MPIAIJ:Index out of range");
350   }
351   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
352 
353   /* inform other processors of number of messages and max length*/
354   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
355   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
356   nrecvs = work[rank];
357   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
358   nmax = work[rank];
359   PetscFree(work);
360 
361   /* post receives:   */
362   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
363   CHKPTRQ(rvalues);
364   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
365   CHKPTRQ(recv_waits);
366   for ( i=0; i<nrecvs; i++ ) {
367     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
368   }
369 
370   /* do sends:
371       1) starts[i] gives the starting index in svalues for stuff going to
372          the ith processor
373   */
374   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
375   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
376   CHKPTRQ(send_waits);
377   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
378   starts[0] = 0;
379   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
380   for ( i=0; i<N; i++ ) {
381     svalues[starts[owner[i]]++] = rows[i];
382   }
383   ISRestoreIndices(is,&rows);
384 
385   starts[0] = 0;
386   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
387   count = 0;
388   for ( i=0; i<size; i++ ) {
389     if (procs[i]) {
390       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
391     }
392   }
393   PetscFree(starts);
394 
395   base = owners[rank];
396 
397   /*  wait on receives */
398   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
399   source = lens + nrecvs;
400   count  = nrecvs; slen = 0;
401   while (count) {
402     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
403     /* unpack receives into our local space */
404     MPI_Get_count(&recv_status,MPI_INT,&n);
405     source[imdex]  = recv_status.MPI_SOURCE;
406     lens[imdex]  = n;
407     slen += n;
408     count--;
409   }
410   PetscFree(recv_waits);
411 
412   /* move the data into the send scatter */
413   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
414   count = 0;
415   for ( i=0; i<nrecvs; i++ ) {
416     values = rvalues + i*nmax;
417     for ( j=0; j<lens[i]; j++ ) {
418       lrows[count++] = values[j] - base;
419     }
420   }
421   PetscFree(rvalues); PetscFree(lens);
422   PetscFree(owner); PetscFree(nprocs);
423 
424   /* actually zap the local rows */
425   ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
426   PLogObjectParent(A,istmp);
427   PetscFree(lrows);
428   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
429   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
430   ierr = ISDestroy(istmp); CHKERRQ(ierr);
431 
432   /* wait on sends */
433   if (nsends) {
434     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
435     CHKPTRQ(send_status);
436     MPI_Waitall(nsends,send_waits,send_status);
437     PetscFree(send_status);
438   }
439   PetscFree(send_waits); PetscFree(svalues);
440 
441   return 0;
442 }
443 
444 static int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
445 {
446   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
447   int        ierr,nt;
448 
449   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
450   if (nt != a->n) {
451     SETERRQ(1,"MatMult_MPIAIJ:Incompatible partition of A and xx");
452   }
453   ierr = VecGetLocalSize(yy,&nt);  CHKERRQ(ierr);
454   if (nt != a->m) {
455     SETERRQ(1,"MatMult_MPIAIJ:Incompatible partition of A and yy");
456   }
457   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
458   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
459   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
460   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
461   return 0;
462 }
463 
464 static int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
465 {
466   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
467   int        ierr;
468   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
469   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
470   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
471   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
472   return 0;
473 }
474 
475 static int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
476 {
477   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
478   int        ierr;
479 
480   /* do nondiagonal part */
481   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
482   /* send it on its way */
483   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
484                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
485   /* do local part */
486   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
487   /* receive remote parts: note this assumes the values are not actually */
488   /* inserted in yy until the next line, which is true for my implementation */
489   /* but is perhaps not always true. */
490   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
491                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
492   return 0;
493 }
494 
495 static int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
496 {
497   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
498   int        ierr;
499 
500   /* do nondiagonal part */
501   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
502   /* send it on its way */
503   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
504                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
505   /* do local part */
506   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
507   /* receive remote parts: note this assumes the values are not actually */
508   /* inserted in zz until the next line, which is true for my implementation */
509   /* but is perhaps not always true. */
510   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
511                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
512   return 0;
513 }
514 
515 /*
516   This only works correctly for square matrices where the subblock A->A is the
517    diagonal block
518 */
519 static int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
520 {
521   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
522   if (a->M != a->N)
523     SETERRQ(1,"MatGetDiagonal_MPIAIJ:Supports only square matrix where A->A is diag block");
524   if (a->rstart != a->cstart || a->rend != a->cend) {
525     SETERRQ(1,"MatGetDiagonal_MPIAIJ:row partition must equal col partition");  }
526   return MatGetDiagonal(a->A,v);
527 }
528 
529 static int MatScale_MPIAIJ(Scalar *aa,Mat A)
530 {
531   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
532   int        ierr;
533   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
534   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
535   return 0;
536 }
537 
538 static int MatDestroy_MPIAIJ(PetscObject obj)
539 {
540   Mat        mat = (Mat) obj;
541   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
542   int        ierr;
543 #if defined(PETSC_LOG)
544   PLogObjectState(obj,"Rows=%d, Cols=%d",aij->M,aij->N);
545 #endif
546   PetscFree(aij->rowners);
547   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
548   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
549   if (aij->colmap) PetscFree(aij->colmap);
550   if (aij->garray) PetscFree(aij->garray);
551   if (aij->lvec)   VecDestroy(aij->lvec);
552   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
553   if (aij->rowvalues) PetscFree(aij->rowvalues);
554   PetscFree(aij);
555   PLogObjectDestroy(mat);
556   PetscHeaderDestroy(mat);
557   return 0;
558 }
559 #include "draw.h"
560 #include "pinclude/pviewer.h"
561 
562 static int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
563 {
564   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
565   int         ierr;
566 
567   if (aij->size == 1) {
568     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
569   }
570   else SETERRQ(1,"MatView_MPIAIJ_Binary:Only uniprocessor output supported");
571   return 0;
572 }
573 
574 static int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
575 {
576   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
577   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
578   int         ierr, format,shift = C->indexshift,rank;
579   FILE        *fd;
580   ViewerType  vtype;
581 
582   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
583   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
584     ierr = ViewerGetFormat(viewer,&format);
585     if (format == ASCII_FORMAT_INFO_DETAILED) {
586       int nz, nzalloc, mem, flg;
587       MPI_Comm_rank(mat->comm,&rank);
588       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
589       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
590       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
591       PetscSequentialPhaseBegin(mat->comm,1);
592       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
593          rank,aij->m,nz,nzalloc,mem);
594       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
595          rank,aij->m,nz,nzalloc,mem);
596       ierr = MatGetInfo(aij->A,MAT_LOCAL,&nz,&nzalloc,&mem);
597       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz);
598       ierr = MatGetInfo(aij->B,MAT_LOCAL,&nz,&nzalloc,&mem);
599       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz);
600       fflush(fd);
601       PetscSequentialPhaseEnd(mat->comm,1);
602       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
603       return 0;
604     }
605     else if (format == ASCII_FORMAT_INFO) {
606       return 0;
607     }
608   }
609 
610   if (vtype == DRAW_VIEWER) {
611     Draw       draw;
612     PetscTruth isnull;
613     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
614     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
615   }
616 
617   if (vtype == ASCII_FILE_VIEWER) {
618     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
619     PetscSequentialPhaseBegin(mat->comm,1);
620     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
621            aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
622            aij->cend);
623     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
624     ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
625     fflush(fd);
626     PetscSequentialPhaseEnd(mat->comm,1);
627   }
628   else {
629     int size = aij->size;
630     rank = aij->rank;
631     if (size == 1) {
632       ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
633     }
634     else {
635       /* assemble the entire matrix onto first processor. */
636       Mat         A;
637       Mat_SeqAIJ *Aloc;
638       int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
639       Scalar      *a;
640 
641       if (!rank) {
642         ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
643                CHKERRQ(ierr);
644       }
645       else {
646         ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
647                CHKERRQ(ierr);
648       }
649       PLogObjectParent(mat,A);
650 
651       /* copy over the A part */
652       Aloc = (Mat_SeqAIJ*) aij->A->data;
653       m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
654       row = aij->rstart;
655       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
656       for ( i=0; i<m; i++ ) {
657         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
658         row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
659       }
660       aj = Aloc->j;
661       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
662 
663       /* copy over the B part */
664       Aloc = (Mat_SeqAIJ*) aij->B->data;
665       m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
666       row = aij->rstart;
667       ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
668       for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
669       for ( i=0; i<m; i++ ) {
670         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
671         row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
672       }
673       PetscFree(ct);
674       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
675       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
676       if (!rank) {
677         ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
678       }
679       ierr = MatDestroy(A); CHKERRQ(ierr);
680     }
681   }
682   return 0;
683 }
684 
685 static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
686 {
687   Mat         mat = (Mat) obj;
688   int         ierr;
689   ViewerType  vtype;
690 
691   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
692   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
693       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
694     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
695   }
696   else if (vtype == BINARY_FILE_VIEWER) {
697     return MatView_MPIAIJ_Binary(mat,viewer);
698   }
699   return 0;
700 }
701 
702 /*
703     This has to provide several versions.
704 
705      2) a) use only local smoothing, updating outer values only once.
706         b) local smoothing, updating outer values each inner iteration.
707      3) colored smoothing, updating outer values between colors.
708 */
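/*
   The sweeps below implement a standard local SOR update.  Assuming the
   SPARSEDENSEMDOT(sum,xs,v,idx,n) macro subtracts v[k]*xs[idx[k]] from sum
   over the n entries of the row (so that afterwards sum = b_i - sum_j a_ij x_j,
   accumulated over both the A and B blocks), the statement

       x[i] = (1 - omega)*x[i] + omega*(sum/d + x[i]),   d = a_ii + fshift,

   is algebraically the usual SOR step

       x_i <- (1 - omega) x_i + (omega/d) (b_i - sum_{j != i} a_ij x_j).
*/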
709 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
710                            double fshift,int its,Vec xx)
711 {
712   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
713   Mat        AA = mat->A, BB = mat->B;
714   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
715   Scalar     *b,*x,*xs,*ls,d,*v,sum;
716   int        ierr,*idx, *diag;
717   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
718 
719   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
720   xs = x + shift; /* shift by one for index start of 1 */
721   ls = ls + shift;
722   if (!A->diag) {if ((ierr = MatMarkDiag_SeqAIJ(AA))) return ierr;}
723   diag = A->diag;
724   if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
725     if (flag & SOR_ZERO_INITIAL_GUESS) {
726       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
727     }
728     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
729     CHKERRQ(ierr);
730     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
731     CHKERRQ(ierr);
732     while (its--) {
733       /* go down through the rows */
734       for ( i=0; i<m; i++ ) {
735         n    = A->i[i+1] - A->i[i];
736         idx  = A->j + A->i[i] + shift;
737         v    = A->a + A->i[i] + shift;
738         sum  = b[i];
739         SPARSEDENSEMDOT(sum,xs,v,idx,n);
740         d    = fshift + A->a[diag[i]+shift];
741         n    = B->i[i+1] - B->i[i];
742         idx  = B->j + B->i[i] + shift;
743         v    = B->a + B->i[i] + shift;
744         SPARSEDENSEMDOT(sum,ls,v,idx,n);
745         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
746       }
747       /* come up through the rows */
748       for ( i=m-1; i>-1; i-- ) {
749         n    = A->i[i+1] - A->i[i];
750         idx  = A->j + A->i[i] + shift;
751         v    = A->a + A->i[i] + shift;
752         sum  = b[i];
753         SPARSEDENSEMDOT(sum,xs,v,idx,n);
754         d    = fshift + A->a[diag[i]+shift];
755         n    = B->i[i+1] - B->i[i];
756         idx  = B->j + B->i[i] + shift;
757         v    = B->a + B->i[i] + shift;
758         SPARSEDENSEMDOT(sum,ls,v,idx,n);
759         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
760       }
761     }
762   }
763   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
764     if (flag & SOR_ZERO_INITIAL_GUESS) {
765       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
766     }
767     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
768     CHKERRQ(ierr);
769     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
770     CHKERRQ(ierr);
771     while (its--) {
772       for ( i=0; i<m; i++ ) {
773         n    = A->i[i+1] - A->i[i];
774         idx  = A->j + A->i[i] + shift;
775         v    = A->a + A->i[i] + shift;
776         sum  = b[i];
777         SPARSEDENSEMDOT(sum,xs,v,idx,n);
778         d    = fshift + A->a[diag[i]+shift];
779         n    = B->i[i+1] - B->i[i];
780         idx  = B->j + B->i[i] + shift;
781         v    = B->a + B->i[i] + shift;
782         SPARSEDENSEMDOT(sum,ls,v,idx,n);
783         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
784       }
785     }
786   }
787   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
788     if (flag & SOR_ZERO_INITIAL_GUESS) {
789       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
790     }
791     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
792                             mat->Mvctx); CHKERRQ(ierr);
793     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
794                             mat->Mvctx); CHKERRQ(ierr);
795     while (its--) {
796       for ( i=m-1; i>-1; i-- ) {
797         n    = A->i[i+1] - A->i[i];
798         idx  = A->j + A->i[i] + shift;
799         v    = A->a + A->i[i] + shift;
800         sum  = b[i];
801         SPARSEDENSEMDOT(sum,xs,v,idx,n);
802         d    = fshift + A->a[diag[i]+shift];
803         n    = B->i[i+1] - B->i[i];
804         idx  = B->j + B->i[i] + shift;
805         v    = B->a + B->i[i] + shift;
806         SPARSEDENSEMDOT(sum,ls,v,idx,n);
807         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
808       }
809     }
810   }
811   else {
812     SETERRQ(1,"MatRelax_MPIAIJ:Option not supported");
813   }
814   return 0;
815 }
816 
817 static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,int *nz,
818                              int *nzalloc,int *mem)
819 {
820   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
821   Mat        A = mat->A, B = mat->B;
822   int        ierr, isend[3], irecv[3], nzA, nzallocA, memA;
823 
824   ierr = MatGetInfo(A,MAT_LOCAL,&nzA,&nzallocA,&memA); CHKERRQ(ierr);
825   ierr = MatGetInfo(B,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
826   isend[0] += nzA; isend[1] += nzallocA; isend[2] += memA;
827   if (flag == MAT_LOCAL) {
828     if (nz)       *nz      = isend[0];
829     if (nzalloc)  *nzalloc = isend[1];
830     if (mem)      *mem     = isend[2];
831   } else if (flag == MAT_GLOBAL_MAX) {
832     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_MAX,matin->comm);
833     if (nz)      *nz      = irecv[0];
834     if (nzalloc) *nzalloc = irecv[1];
835     if (mem)     *mem     = irecv[2];
836   } else if (flag == MAT_GLOBAL_SUM) {
837     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_SUM,matin->comm);
838     if (nz)      *nz      = irecv[0];
839     if (nzalloc) *nzalloc = irecv[1];
840     if (mem)     *mem     = irecv[2];
841   }
842   return 0;
843 }
844 
845 extern int MatLUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,Mat*);
846 extern int MatLUFactorNumeric_MPIAIJ(Mat,Mat*);
847 extern int MatLUFactor_MPIAIJ(Mat,IS,IS,double);
848 extern int MatILUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,int,Mat *);
849 extern int MatSolve_MPIAIJ(Mat,Vec,Vec);
850 extern int MatSolveAdd_MPIAIJ(Mat,Vec,Vec,Vec);
851 extern int MatSolveTrans_MPIAIJ(Mat,Vec,Vec);
852 extern int MatSolveTransAdd_MPIAIJ(Mat,Vec,Vec,Vec);
853 
854 static int MatSetOption_MPIAIJ(Mat A,MatOption op)
855 {
856   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
857 
858   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
859       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
860       op == MAT_COLUMNS_SORTED ||
861       op == MAT_ROW_ORIENTED) {
862         MatSetOption(a->A,op);
863         MatSetOption(a->B,op);
864   }
865   else if (op == MAT_ROWS_SORTED ||
866            op == MAT_SYMMETRIC ||
867            op == MAT_STRUCTURALLY_SYMMETRIC ||
868            op == MAT_YES_NEW_DIAGONALS)
869     PLogInfo(A,"Info:MatSetOption_MPIAIJ:Option ignored\n");
870   else if (op == MAT_COLUMN_ORIENTED) {
871     a->roworiented = 0;
872     MatSetOption(a->A,op);
873     MatSetOption(a->B,op);
874   }
875   else if (op == MAT_NO_NEW_DIAGONALS)
876     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:MAT_NO_NEW_DIAGONALS");}
877   else
878     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:unknown option");}
879   return 0;
880 }
881 
882 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
883 {
884   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
885   *m = mat->M; *n = mat->N;
886   return 0;
887 }
888 
889 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
890 {
891   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
892   *m = mat->m; *n = mat->n;
893   return 0;
894 }
895 
896 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
897 {
898   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
899   *m = mat->rstart; *n = mat->rend;
900   return 0;
901 }
902 
903 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
904 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
905 
906 int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
907 {
908   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
909   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
910   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
911   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
912   int        *cmap, *idx_p;
913 
914   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIAIJ:Already active");
915   mat->getrowactive = PETSC_TRUE;
916 
917   if (!mat->rowvalues && (idx || v)) {
918     /*
919         allocate enough space to hold information from the longest row.
920     */
921     Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
922     int     max = 1,m = mat->m,tmp;
923     for ( i=0; i<m; i++ ) {
924       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
925       if (max < tmp) { max = tmp; }
926     }
927     mat->rowvalues = (Scalar *) PetscMalloc( max*(sizeof(int)+sizeof(Scalar)));
928     CHKPTRQ(mat->rowvalues);
929     mat->rowindices = (int *) (mat->rowvalues + max);
930   }
931 
932   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIAIJ:Only local rows");
933   lrow = row - rstart;
934 
935   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
936   if (!v)   {pvA = 0; pvB = 0;}
937   if (!idx) {pcA = 0; if (!v) pcB = 0;}
938   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
939   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
940   nztot = nzA + nzB;
941 
942   cmap  = mat->garray;
943   if (v  || idx) {
944     if (nztot) {
945       /* Sort by increasing column numbers, assuming A and B already sorted */
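      /* How the merge works: off-diagonal (B) columns whose global index
         cmap[cworkB[i]] lies below cstart come first, then all diagonal-block
         (A) columns as cstart+cworkA[i], then the remaining B columns; imark
         records where B's columns split around the A block. */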
946       int imark = -1;
947       if (v) {
948         *v = v_p = mat->rowvalues;
949         for ( i=0; i<nzB; i++ ) {
950           if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
951           else break;
952         }
953         imark = i;
954         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
955         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
956       }
957       if (idx) {
958         *idx = idx_p = mat->rowindices;
959         if (imark > -1) {
960           for ( i=0; i<imark; i++ ) {
961             idx_p[i] = cmap[cworkB[i]];
962           }
963         } else {
964           for ( i=0; i<nzB; i++ ) {
965             if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
966             else break;
967           }
968           imark = i;
969         }
970         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
971         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
972       }
973     }
974     else {
975       if (idx) *idx = 0;
976       if (v)   *v   = 0;
977     }
978   }
979   *nz = nztot;
980   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
981   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
982   return 0;
983 }
984 
985 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
986 {
987   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
988   if (aij->getrowactive == PETSC_FALSE) {
989     SETERRQ(1,"MatRestoreRow_MPIAIJ:MatGetRow not called");
990   }
991   aij->getrowactive = PETSC_FALSE;
992   return 0;
993 }
994 
995 static int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
996 {
997   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
998   Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
999   int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
1000   double     sum = 0.0;
1001   Scalar     *v;
1002 
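  /* Norms computed below (standard definitions): NORM_FROBENIUS is
     sqrt(sum_ij |a_ij|^2), NORM_1 is the largest column sum max_j sum_i |a_ij|,
     and NORM_INFINITY is the largest row sum max_i sum_j |a_ij|; each is built
     from the local A and B blocks and combined across processors with an
     MPI_Allreduce. */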
1003   if (aij->size == 1) {
1004     ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
1005   } else {
1006     if (type == NORM_FROBENIUS) {
1007       v = amat->a;
1008       for (i=0; i<amat->nz; i++ ) {
1009 #if defined(PETSC_COMPLEX)
1010         sum += real(conj(*v)*(*v)); v++;
1011 #else
1012         sum += (*v)*(*v); v++;
1013 #endif
1014       }
1015       v = bmat->a;
1016       for (i=0; i<bmat->nz; i++ ) {
1017 #if defined(PETSC_COMPLEX)
1018         sum += real(conj(*v)*(*v)); v++;
1019 #else
1020         sum += (*v)*(*v); v++;
1021 #endif
1022       }
1023       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
1024       *norm = sqrt(*norm);
1025     }
1026     else if (type == NORM_1) { /* max column norm */
1027       double *tmp, *tmp2;
1028       int    *jj, *garray = aij->garray;
1029       tmp  = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp);
1030       tmp2 = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp2);
1031       PetscMemzero(tmp,aij->N*sizeof(double));
1032       *norm = 0.0;
1033       v = amat->a; jj = amat->j;
1034       for ( j=0; j<amat->nz; j++ ) {
1035         tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
1036       }
1037       v = bmat->a; jj = bmat->j;
1038       for ( j=0; j<bmat->nz; j++ ) {
1039         tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
1040       }
1041       MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);
1042       for ( j=0; j<aij->N; j++ ) {
1043         if (tmp2[j] > *norm) *norm = tmp2[j];
1044       }
1045       PetscFree(tmp); PetscFree(tmp2);
1046     }
1047     else if (type == NORM_INFINITY) { /* max row norm */
1048       double ntemp = 0.0;
1049       for ( j=0; j<amat->m; j++ ) {
1050         v = amat->a + amat->i[j] + shift;
1051         sum = 0.0;
1052         for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
1053           sum += PetscAbsScalar(*v); v++;
1054         }
1055         v = bmat->a + bmat->i[j] + shift;
1056         for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
1057           sum += PetscAbsScalar(*v); v++;
1058         }
1059         if (sum > ntemp) ntemp = sum;
1060       }
1061       MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);
1062     }
1063     else {
1064       SETERRQ(1,"MatNorm_MPIAIJ:No support for two norm");
1065     }
1066   }
1067   return 0;
1068 }
1069 
1070 static int MatTranspose_MPIAIJ(Mat A,Mat *matout)
1071 {
1072   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1073   Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
1074   int        ierr,shift = Aloc->indexshift;
1075   Mat        B;
1076   int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
1077   Scalar     *array;
1078 
1079   if (matout == PETSC_NULL && M != N)
1080     SETERRQ(1,"MatTranspose_MPIAIJ:Square matrix only for in-place");
1081   ierr = MatCreateMPIAIJ(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,
1082          PETSC_NULL,&B); CHKERRQ(ierr);
1083 
1084   /* copy over the A part */
1085   Aloc = (Mat_SeqAIJ*) a->A->data;
1086   m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1087   row = a->rstart;
1088   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
1089   for ( i=0; i<m; i++ ) {
1090     ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1091     row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
1092   }
1093   aj = Aloc->j;
1094   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}
1095 
1096   /* copy over the B part */
1097   Aloc = (Mat_SeqAIJ*) a->B->data;
1098   m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1099   row = a->rstart;
1100   ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
1101   for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
1102   for ( i=0; i<m; i++ ) {
1103     ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1104     row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
1105   }
1106   PetscFree(ct);
1107   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1108   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1109   if (matout != PETSC_NULL) {
1110     *matout = B;
1111   } else {
1112     /* This isn't really an in-place transpose; we free a's data structures and overwrite A with B */
1113     PetscFree(a->rowners);
1114     ierr = MatDestroy(a->A); CHKERRQ(ierr);
1115     ierr = MatDestroy(a->B); CHKERRQ(ierr);
1116     if (a->colmap) PetscFree(a->colmap);
1117     if (a->garray) PetscFree(a->garray);
1118     if (a->lvec) VecDestroy(a->lvec);
1119     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
1120     PetscFree(a);
1121     PetscMemcpy(A,B,sizeof(struct _Mat));
1122     PetscHeaderDestroy(B);
1123   }
1124   return 0;
1125 }
1126 
1127 extern int MatPrintHelp_SeqAIJ(Mat);
1128 static int MatPrintHelp_MPIAIJ(Mat A)
1129 {
1130   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1131 
1132   if (!a->rank) return MatPrintHelp_SeqAIJ(a->A);
1133   else return 0;
1134 }
1135 
1136 static int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1137 {
1138   *bs = 1;
1139   return 0;
1140 }
1141 
1142 extern int MatConvert_MPIAIJ(Mat,MatType,Mat *);
1143 static int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1144 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1145 int MatGetSubMatrices_MPIAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
1146 /* -------------------------------------------------------------------*/
1147 static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
1148        MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
1149        MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
1150        MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
1151        MatSolve_MPIAIJ,MatSolveAdd_MPIAIJ,
1152        MatSolveTrans_MPIAIJ,MatSolveTransAdd_MPIAIJ,
1153        MatLUFactor_MPIAIJ,0,
1154        MatRelax_MPIAIJ,
1155        MatTranspose_MPIAIJ,
1156        MatGetInfo_MPIAIJ,0,
1157        MatGetDiagonal_MPIAIJ,0,MatNorm_MPIAIJ,
1158        MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
1159        0,
1160        MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
1161        MatGetReordering_MPIAIJ,
1162        MatLUFactorSymbolic_MPIAIJ,MatLUFactorNumeric_MPIAIJ,0,0,
1163        MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
1164        MatILUFactorSymbolic_MPIAIJ,0,
1165        0,0,MatConvert_MPIAIJ,MatConvertSameType_MPIAIJ,0,0,
1166        0,0,0,
1167        MatGetSubMatrices_MPIAIJ,MatIncreaseOverlap_MPIAIJ,MatGetValues_MPIAIJ,0,
1168        MatPrintHelp_MPIAIJ,
1169        MatScale_MPIAIJ,0,0,0,MatGetBlockSize_MPIAIJ};
1170 
1171 /*@C
1172    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1173    (the default parallel PETSc format).  For good matrix assembly performance
1174    the user should preallocate the matrix storage by setting the parameters
1175    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1176    performance can be increased by more than a factor of 50.
1177 
1178    Input Parameters:
1179 .  comm - MPI communicator
1180 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1181            This value should be the same as the local size used in creating the
1182            y vector for the matrix-vector product y = Ax.
1183 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1184            This value should be the same as the local size used in creating the
1185            x vector for the matrix-vector product y = Ax.
1186 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
1187 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
1188 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1189            (same for all local rows)
1190 .  d_nnz - array containing the number of nonzeros in the various rows of the
1191            diagonal portion of the local submatrix (possibly different for each row)
1192            or PETSC_NULL. You must leave room for the diagonal entry even if
1193            it is zero.
1194 .  o_nz - number of nonzeros per row in the off-diagonal portion of local
1195            submatrix (same for all local rows).
1196 .  o_nnz - array containing the number of nonzeros in the various rows of the
1197            off-diagonal portion of the local submatrix (possibly different for
1198            each row) or PETSC_NULL.
1199 
1200    Output Parameter:
1201 .  A - the matrix
1202 
1203    Notes:
1204    The AIJ format (also called the Yale sparse matrix format or
1205    compressed row storage), is fully compatible with standard Fortran 77
1206    storage.  That is, the stored row and column indices can begin at
1207    either one (as in Fortran) or zero.  See the users manual for details.
1208 
1209    The user MUST specify either the local or global matrix dimensions
1210    (possibly both).
1211 
1212    By default, this format uses inodes (identical nodes) when possible.
1213    We search for consecutive rows with the same nonzero structure, thereby
1214    reusing matrix information to achieve increased efficiency.
1215 
1216    Options Database Keys:
1217 $    -mat_aij_no_inode  - Do not use inodes
1218 $    -mat_aij_inode_limit <limit> - Set inode limit.
1219 $        (max limit=5)
1220 $    -mat_aij_oneindex - Internally use indexing starting at 1
1221 $        rather than 0.  Note: When calling MatSetValues(),
1222 $        the user still MUST index entries starting at 0!
1223 
1224    Storage Information:
1225    For a square global matrix we define each processor's diagonal portion
1226    to be its local rows and the corresponding columns (a square submatrix);
1227    each processor's off-diagonal portion encompasses the remainder of the
1228    local matrix (a rectangular submatrix).
1229 
1230    The user can specify preallocated storage for the diagonal part of
1231    the local submatrix with either d_nz or d_nnz (not both).  Set
1232    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1233    memory allocation.  Likewise, specify preallocated storage for the
1234    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1235 
1236    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1237    the figure below we depict these three local rows and all columns (0-11).
1238 
1239 $          0 1 2 3 4 5 6 7 8 9 10 11
1240 $         -------------------
1241 $  row 3  |  o o o d d d o o o o o o
1242 $  row 4  |  o o o d d d o o o o o o
1243 $  row 5  |  o o o d d d o o o o o o
1244 $         -------------------
1245 $
1246 
1247    Thus, any entries in the d locations are stored in the d (diagonal)
1248    submatrix, and any entries in the o locations are stored in the
1249    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1250    stored simply in the MATSEQAIJ format for compressed row storage.
1251 
1252    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1253    and o_nz should indicate the number of nonzeros per row in the o matrix.
1254    In general, for PDE problems in which most nonzeros are near the diagonal,
1255    one expects d_nz >> o_nz. For large problems you MUST preallocate memory
1256    or you will get TERRIBLE performance; see the users' manual chapter on
1257    matrices and the file $(PETSC_DIR)/Performance.
1258 
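   Example:
   A minimal calling sketch (the sizes and the 5/2 preallocation figures are
   purely illustrative, not taken from any particular application):

$      Mat A;
$      int ierr, M = 100, N = 100;
$      /* let PETSc choose the row/column distribution; guess 5 nonzeros per
$         row in the diagonal block and 2 in the off-diagonal block */
$      ierr = MatCreateMPIAIJ(MPI_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,M,N,
$                             5,PETSC_NULL,2,PETSC_NULL,&A); CHKERRQ(ierr);
$      /* ... set entries with MatSetValues(), then MatAssemblyBegin/End() ... */
$      ierr = MatDestroy(A); CHKERRQ(ierr);
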
1259 .keywords: matrix, aij, compressed row, sparse, parallel
1260 
1261 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1262 @*/
1263 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
1264                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1265 {
1266   Mat          B;
1267   Mat_MPIAIJ   *b;
1268   int          ierr, i,sum[2],work[2];
1269 
1270   *A = 0;
1271   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
1272   PLogObjectCreate(B);
1273   B->data       = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
1274   PetscMemzero(b,sizeof(Mat_MPIAIJ));
1275   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1276   B->destroy    = MatDestroy_MPIAIJ;
1277   B->view       = MatView_MPIAIJ;
1278   B->factor     = 0;
1279   B->assembled  = PETSC_FALSE;
1280 
1281   b->insertmode = NOT_SET_VALUES;
1282   MPI_Comm_rank(comm,&b->rank);
1283   MPI_Comm_size(comm,&b->size);
1284 
1285   if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1286     SETERRQ(1,"MatCreateMPIAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1287 
1288   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1289     work[0] = m; work[1] = n;
1290     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1291     if (M == PETSC_DECIDE) M = sum[0];
1292     if (N == PETSC_DECIDE) N = sum[1];
1293   }
1294   if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
1295   if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
1296   b->m = m; B->m = m;
1297   b->n = n; B->n = n;
1298   b->N = N; B->N = N;
1299   b->M = M; B->M = M;
1300 
1301   /* build local table of row and column ownerships */
1302   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1303   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1304   b->cowners = b->rowners + b->size + 2;
1305   MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1306   b->rowners[0] = 0;
1307   for ( i=2; i<=b->size; i++ ) {
1308     b->rowners[i] += b->rowners[i-1];
1309   }
1310   b->rstart = b->rowners[b->rank];
1311   b->rend   = b->rowners[b->rank+1];
1312   MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1313   b->cowners[0] = 0;
1314   for ( i=2; i<=b->size; i++ ) {
1315     b->cowners[i] += b->cowners[i-1];
1316   }
1317   b->cstart = b->cowners[b->rank];
1318   b->cend   = b->cowners[b->rank+1];
1319 
1320   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1321   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1322   PLogObjectParent(B,b->A);
1323   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1324   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1325   PLogObjectParent(B,b->B);
1326 
1327   /* build cache for off array entries formed */
1328   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1329   b->colmap      = 0;
1330   b->garray      = 0;
1331   b->roworiented = 1;
1332 
1333   /* stuff used for matrix vector multiply */
1334   b->lvec      = 0;
1335   b->Mvctx     = 0;
1336 
1337   /* stuff for MatGetRow() */
1338   b->rowindices   = 0;
1339   b->rowvalues    = 0;
1340   b->getrowactive = PETSC_FALSE;
1341 
1342   *A = B;
1343   return 0;
1344 }
1345 
1346 static int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
1347 {
1348   Mat        mat;
1349   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1350   int        ierr, len=0, flg;
1351 
1352   *newmat       = 0;
1353   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1354   PLogObjectCreate(mat);
1355   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1356   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1357   mat->destroy    = MatDestroy_MPIAIJ;
1358   mat->view       = MatView_MPIAIJ;
1359   mat->factor     = matin->factor;
1360   mat->assembled  = PETSC_TRUE;
1361 
1362   a->m = mat->m   = oldmat->m;
1363   a->n = mat->n   = oldmat->n;
1364   a->M = mat->M   = oldmat->M;
1365   a->N = mat->N   = oldmat->N;
1366 
1367   a->rstart       = oldmat->rstart;
1368   a->rend         = oldmat->rend;
1369   a->cstart       = oldmat->cstart;
1370   a->cend         = oldmat->cend;
1371   a->size         = oldmat->size;
1372   a->rank         = oldmat->rank;
1373   a->insertmode   = NOT_SET_VALUES;
1374   a->rowvalues    = 0;
1375   a->getrowactive = PETSC_FALSE;
1376 
1377   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1378   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1379   a->cowners = a->rowners + a->size + 2;
1380   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1381   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1382   if (oldmat->colmap) {
1383     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1384     PLogObjectMemory(mat,(a->N)*sizeof(int));
1385     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1386   } else a->colmap = 0;
1387   if (oldmat->garray) {
1388     len = ((Mat_SeqAIJ *) (oldmat->B->data))->n;
1389     a->garray = (int *) PetscMalloc((len+1)*sizeof(int)); CHKPTRQ(a->garray);
1390     PLogObjectMemory(mat,len*sizeof(int));
1391     if (len) PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1392   } else a->garray = 0;
1393 
1394   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1395   PLogObjectParent(mat,a->lvec);
1396   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1397   PLogObjectParent(mat,a->Mvctx);
1398   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1399   PLogObjectParent(mat,a->A);
1400   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1401   PLogObjectParent(mat,a->B);
1402   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1403   if (flg) {
1404     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1405   }
1406   *newmat = mat;
1407   return 0;
1408 }
1409 
1410 #include "sys.h"
1411 
1412 int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
1413 {
1414   Mat          A;
1415   int          i, nz, ierr, j,rstart, rend, fd;
1416   Scalar       *vals,*svals;
1417   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1418   MPI_Status   status;
1419   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1420   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1421   int          tag = ((PetscObject)viewer)->tag;
1422 
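  /* File layout implied by the reads below (header[3] is read but not used
     here): a 4-int header (cookie, M, N, header[3]); then M row lengths as
     ints; then all column indices as ints, rows in global order; then all
     numerical values as Scalars in the same order. */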
1423   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1424   if (!rank) {
1425     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1426     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1427     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIAIJ:not matrix object");
1428   }
1429 
1430   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1431   M = header[1]; N = header[2];
1432   /* determine ownership of all rows */
1433   m = M/size + ((M % size) > rank);
1434   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1435   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1436   rowners[0] = 0;
1437   for ( i=2; i<=size; i++ ) {
1438     rowners[i] += rowners[i-1];
1439   }
1440   rstart = rowners[rank];
1441   rend   = rowners[rank+1];
1442 
1443   /* distribute row lengths to all processors */
1444   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1445   offlens = ourlens + (rend-rstart);
1446   if (!rank) {
1447     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1448     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1449     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1450     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1451     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
1452     PetscFree(sndcounts);
1453   }
1454   else {
1455     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
1456   }
1457 
1458   if (!rank) {
1459     /* calculate the number of nonzeros on each processor */
1460     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1461     PetscMemzero(procsnz,size*sizeof(int));
1462     for ( i=0; i<size; i++ ) {
1463       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1464         procsnz[i] += rowlengths[j];
1465       }
1466     }
1467     PetscFree(rowlengths);
1468 
1469     /* determine max buffer needed and allocate it */
1470     maxnz = 0;
1471     for ( i=0; i<size; i++ ) {
1472       maxnz = PetscMax(maxnz,procsnz[i]);
1473     }
1474     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1475 
1476     /* read in my part of the matrix column indices  */
1477     nz = procsnz[0];
1478     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1479     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1480 
1481     /* read in everyone else's and ship them off */
1482     for ( i=1; i<size; i++ ) {
1483       nz = procsnz[i];
1484       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1485       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1486     }
1487     PetscFree(cols);
1488   }
1489   else {
1490     /* determine buffer space needed for message */
1491     nz = 0;
1492     for ( i=0; i<m; i++ ) {
1493       nz += ourlens[i];
1494     }
1495     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1496 
1497     /* receive message of column indices*/
1498     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1499     MPI_Get_count(&status,MPI_INT,&maxnz);
1500     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
1501   }
1502 
1503   /* loop over local rows, determining the number of off-diagonal entries */
1504   PetscMemzero(offlens,m*sizeof(int));
1505   jj = 0;
1506   for ( i=0; i<m; i++ ) {
1507     for ( j=0; j<ourlens[i]; j++ ) {
1508       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1509       jj++;
1510     }
1511   }
1512 
1513   /* create our matrix */
1514   for ( i=0; i<m; i++ ) {
1515     ourlens[i] -= offlens[i];
1516   }
1517   ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
1518   A = *newmat;
1519   MatSetOption(A,MAT_COLUMNS_SORTED);
1520   for ( i=0; i<m; i++ ) {
1521     ourlens[i] += offlens[i];
1522   }
1523 
1524   if (!rank) {
1525     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1526 
1527     /* read in my part of the matrix numerical values  */
1528     nz = procsnz[0];
1529     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1530 
1531     /* insert into matrix */
1532     jj      = rstart;
1533     smycols = mycols;
1534     svals   = vals;
1535     for ( i=0; i<m; i++ ) {
1536       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1537       smycols += ourlens[i];
1538       svals   += ourlens[i];
1539       jj++;
1540     }
1541 
1542     /* read in the other processors' values and ship them out */
1543     for ( i=1; i<size; i++ ) {
1544       nz = procsnz[i];
1545       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1546       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1547     }
1548     PetscFree(procsnz);
1549   }
1550   else {
1551     /* receive numeric values */
1552     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1553 
1554     /* receive message of values*/
1555     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1556     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1557     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
1558 
1559     /* insert into matrix */
1560     jj      = rstart;
1561     smycols = mycols;
1562     svals   = vals;
1563     for ( i=0; i<m; i++ ) {
1564       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1565       smycols += ourlens[i];
1566       svals   += ourlens[i];
1567       jj++;
1568     }
1569   }
1570   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1571 
1572   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1573   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1574   return 0;
1575 }
1576