xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision 70f55243aafb320636e2a54ff30cab5d1e8d3d7b)
1 
2 #ifndef lint
3 static char vcid[] = "$Id: mpiaij.c,v 1.159 1996/08/06 16:51:19 balay Exp bsmith $";
4 #endif
5 
6 #include "src/mat/impls/aij/mpi/mpiaij.h"
7 #include "src/vec/vecimpl.h"
8 #include "src/inline/spops.h"
9 
10 /* local utility routine that creates a mapping from the global column
11 number to the local number in the off-diagonal part of the local
12 storage of the matrix.  This is done in a non-scalable way since the
13 length of colmap equals the global matrix length.
14 */
15 static int CreateColmap_MPIAIJ_Private(Mat mat)
16 {
17   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
18   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
19   int        n = B->n,i;
20 
21   aij->colmap = (int *) PetscMalloc(aij->N*sizeof(int));CHKPTRQ(aij->colmap);
22   PLogObjectMemory(mat,aij->N*sizeof(int));
23   PetscMemzero(aij->colmap,aij->N*sizeof(int));
24   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
25   return 0;
26 }
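/*
   Illustration (a minimal sketch, not part of the library; variable names
   follow the routines below): a colmap entry of zero means the global
   column does not appear in the off-diagonal block, otherwise the entry
   is the local column index plus one.

       col = aij->colmap[in[j]] - 1;
       if (col < 0) {
          ... global column in[j] is not present in the off-diagonal block ...
       }
*/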
27 
28 extern int DisAssemble_MPIAIJ(Mat);
29 
30 static int MatGetReordering_MPIAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
31 {
32   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
33   int        ierr;
34   if (aij->size == 1) {
35     ierr = MatGetReordering(aij->A,type,rperm,cperm); CHKERRQ(ierr);
36   } else SETERRQ(1,"MatGetReordering_MPIAIJ:not supported in parallel");
37   return 0;
38 }
39 
40 static int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
41 {
42   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
43   Scalar     value;
44   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
45   int        cstart = aij->cstart, cend = aij->cend,row,col;
46   int        roworiented = aij->roworiented;
47 
48   if (aij->insertmode != NOT_SET_VALUES && aij->insertmode != addv) {
49     SETERRQ(1,"MatSetValues_MPIAIJ:Cannot mix inserts and adds");
50   }
51   aij->insertmode = addv;
52   for ( i=0; i<m; i++ ) {
53     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative row");
54     if (im[i] >= aij->M) SETERRQ(1,"MatSetValues_MPIAIJ:Row too large");
55     if (im[i] >= rstart && im[i] < rend) {
56       row = im[i] - rstart;
57       for ( j=0; j<n; j++ ) {
58         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative column");
59         if (in[j] >= aij->N) SETERRQ(1,"MatSetValues_MPIAIJ:Col too large");
60         if (in[j] >= cstart && in[j] < cend){
61           col = in[j] - cstart;
62           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
63           ierr = MatSetValues(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
64         }
65         else {
66           if (mat->was_assembled) {
67             if (!aij->colmap) {
68               ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
69             }
70             col = aij->colmap[in[j]] - 1;
71             if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
72               ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
73               col =  in[j];
74             }
75           }
76           else col = in[j];
77           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
78           ierr = MatSetValues(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
79         }
80       }
81     }
82     else {
83       if (roworiented) {
84         ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
85       }
86       else {
87         row = im[i];
88         for ( j=0; j<n; j++ ) {
89           ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
90         }
91       }
92     }
93   }
94   return 0;
95 }
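/*
   Usage sketch (illustrative only; mat is assumed to have been created
   with MatCreateMPIAIJ() below): callers always pass global indices, and
   any entry whose row is owned by another processor is stashed above and
   communicated during assembly.

       int    row = ..., col = ...;
       Scalar val = 1.0;
       ierr = MatSetValues(mat,1,&row,1,&col,&val,ADD_VALUES); CHKERRQ(ierr);
       ierr = MatAssemblyBegin(mat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
       ierr = MatAssemblyEnd(mat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
*/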
96 
97 static int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
98 {
99   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
100   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
101   int        cstart = aij->cstart, cend = aij->cend,row,col;
102 
103   for ( i=0; i<m; i++ ) {
104     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative row");
105     if (idxm[i] >= aij->M) SETERRQ(1,"MatGetValues_MPIAIJ:Row too large");
106     if (idxm[i] >= rstart && idxm[i] < rend) {
107       row = idxm[i] - rstart;
108       for ( j=0; j<n; j++ ) {
109         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative column");
110         if (idxn[j] >= aij->N) SETERRQ(1,"MatGetValues_MPIAIJ:Col too large");
111         if (idxn[j] >= cstart && idxn[j] < cend){
112           col = idxn[j] - cstart;
113           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
114         }
115         else {
116           if (!aij->colmap) {
117             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
118           }
119           col = aij->colmap[idxn[j]] - 1;
120           if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
121           else {
122             ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
123           }
124         }
125       }
126     }
127     else {
128       SETERRQ(1,"MatGetValues_MPIAIJ:Only local values currently supported");
129     }
130   }
131   return 0;
132 }
133 
134 static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
135 {
136   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
137   MPI_Comm    comm = mat->comm;
138   int         size = aij->size, *owners = aij->rowners;
139   int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
140   MPI_Request *send_waits,*recv_waits;
141   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
142   InsertMode  addv;
143   Scalar      *rvalues,*svalues;
144 
145   /* make sure all processors are either in INSERT_VALUES or ADD_VALUES mode */
146   MPI_Allreduce(&aij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
147   if (addv == (ADD_VALUES|INSERT_VALUES)) {
148     SETERRQ(1,"MatAssemblyBegin_MPIAIJ:Some processors inserted others added");
149   }
150   aij->insertmode = addv; /* in case this processor had no cache */
151 
152   /*  first count number of contributors to each processor */
153   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
154   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
155   owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
156   for ( i=0; i<aij->stash.n; i++ ) {
157     idx = aij->stash.idx[i];
158     for ( j=0; j<size; j++ ) {
159       if (idx >= owners[j] && idx < owners[j+1]) {
160         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
161       }
162     }
163   }
164   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
165 
166   /* inform other processors of number of messages and max length*/
167   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
168   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
169   nreceives = work[rank];
170   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
171   nmax = work[rank];
172   PetscFree(work);
173 
174   /* post receives:
175        1) each message will consist of ordered triples
176      (global row index,global column index,value); the indices are stored
177      as Scalars to simplify the message passing.
178        2) since we don't know how long each individual message is we
179      allocate the largest needed buffer for each receive. Potentially
180      this is a lot of wasted space.
181 
182 
183        This could be done better.
184   */
185   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
186   CHKPTRQ(rvalues);
187   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
188   CHKPTRQ(recv_waits);
189   for ( i=0; i<nreceives; i++ ) {
190     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
191               comm,recv_waits+i);
192   }
193 
194   /* do sends:
195       1) starts[i] gives the starting index in svalues for stuff going to
196          the ith processor
197   */
198   svalues = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
199   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
200   CHKPTRQ(send_waits);
201   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
202   starts[0] = 0;
203   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
204   for ( i=0; i<aij->stash.n; i++ ) {
205     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
206     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
207     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
208   }
209   PetscFree(owner);
210   starts[0] = 0;
211   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
212   count = 0;
213   for ( i=0; i<size; i++ ) {
214     if (procs[i]) {
215       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
216                 comm,send_waits+count++);
217     }
218   }
219   PetscFree(starts); PetscFree(nprocs);
220 
221   /* Free cache space */
222   PLogInfo(0,"[%d]MatAssemblyBegin_MPIAIJ:Number of off processor values %d\n",rank,aij->stash.n);
223   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
224 
225   aij->svalues    = svalues;    aij->rvalues    = rvalues;
226   aij->nsends     = nsends;     aij->nrecvs     = nreceives;
227   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
228   aij->rmax       = nmax;
229 
230   return 0;
231 }
232 extern int MatSetUpMultiply_MPIAIJ(Mat);
233 
234 static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
235 {
236   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
237   MPI_Status  *send_status,recv_status;
238   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
239   int         row,col,other_disassembled;
240   Scalar      *values,val;
241   InsertMode  addv = aij->insertmode;
242 
243   /*  wait on receives */
244   while (count) {
245     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
246     /* unpack receives into our local space */
247     values = aij->rvalues + 3*imdex*aij->rmax;
248     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
249     n = n/3;
250     for ( i=0; i<n; i++ ) {
251       row = (int) PetscReal(values[3*i]) - aij->rstart;
252       col = (int) PetscReal(values[3*i+1]);
253       val = values[3*i+2];
254       if (col >= aij->cstart && col < aij->cend) {
255         col -= aij->cstart;
256         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
257       }
258       else {
259         if (mat->was_assembled) {
260           if (!aij->colmap) {
261             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
262           }
263           col = aij->colmap[col] - 1;
264           if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
265             ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
266             col = (int) PetscReal(values[3*i+1]);
267           }
268         }
269         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
270       }
271     }
272     count--;
273   }
274   PetscFree(aij->recv_waits); PetscFree(aij->rvalues);
275 
276   /* wait on sends */
277   if (aij->nsends) {
278     send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));
279     CHKPTRQ(send_status);
280     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
281     PetscFree(send_status);
282   }
283   PetscFree(aij->send_waits); PetscFree(aij->svalues);
284 
285   aij->insertmode = NOT_SET_VALUES;
286   ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
287   ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);
288 
289   /* determine if any processor has disassembled, if so we must
290      also disassemble ourselves, in order that we may reassemble. */
291   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
292   if (mat->was_assembled && !other_disassembled) {
293     ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
294   }
295 
296   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
297     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
298   }
299   ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
300   ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);
301 
302   if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
303   return 0;
304 }
305 
306 static int MatZeroEntries_MPIAIJ(Mat A)
307 {
308   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
309   int        ierr;
310   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
311   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
312   return 0;
313 }
314 
315 /* the code does not do the diagonal entries correctly unless the
316    matrix is square and the column and row ownerships are identical.
317    This is a BUG. The only way to fix it seems to be to access
318    aij->A and aij->B directly and not through the MatZeroRows()
319    routine.
320 */
321 static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
322 {
323   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
324   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
325   int            *procs,*nprocs,j,found,idx,nsends,*work;
326   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
327   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
328   int            *lens,imdex,*lrows,*values;
329   MPI_Comm       comm = A->comm;
330   MPI_Request    *send_waits,*recv_waits;
331   MPI_Status     recv_status,*send_status;
332   IS             istmp;
333 
334   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
335   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
336 
337   /*  first count number of contributors to each processor */
338   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
339   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
340   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
341   for ( i=0; i<N; i++ ) {
342     idx = rows[i];
343     found = 0;
344     for ( j=0; j<size; j++ ) {
345       if (idx >= owners[j] && idx < owners[j+1]) {
346         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
347       }
348     }
349     if (!found) SETERRQ(1,"MatZeroRows_MPIAIJ:Index out of range");
350   }
351   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
352 
353   /* inform other processors of number of messages and max length*/
354   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
355   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
356   nrecvs = work[rank];
357   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
358   nmax = work[rank];
359   PetscFree(work);
360 
361   /* post receives:   */
362   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
363   CHKPTRQ(rvalues);
364   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
365   CHKPTRQ(recv_waits);
366   for ( i=0; i<nrecvs; i++ ) {
367     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
368   }
369 
370   /* do sends:
371       1) starts[i] gives the starting index in svalues for stuff going to
372          the ith processor
373   */
374   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
375   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
376   CHKPTRQ(send_waits);
377   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
378   starts[0] = 0;
379   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
380   for ( i=0; i<N; i++ ) {
381     svalues[starts[owner[i]]++] = rows[i];
382   }
383   ISRestoreIndices(is,&rows);
384 
385   starts[0] = 0;
386   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
387   count = 0;
388   for ( i=0; i<size; i++ ) {
389     if (procs[i]) {
390       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
391     }
392   }
393   PetscFree(starts);
394 
395   base = owners[rank];
396 
397   /*  wait on receives */
398   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
399   source = lens + nrecvs;
400   count  = nrecvs; slen = 0;
401   while (count) {
402     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
403     /* unpack receives into our local space */
404     MPI_Get_count(&recv_status,MPI_INT,&n);
405     source[imdex]  = recv_status.MPI_SOURCE;
406     lens[imdex]  = n;
407     slen += n;
408     count--;
409   }
410   PetscFree(recv_waits);
411 
412   /* move the received row indices into a local list */
413   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
414   count = 0;
415   for ( i=0; i<nrecvs; i++ ) {
416     values = rvalues + i*nmax;
417     for ( j=0; j<lens[i]; j++ ) {
418       lrows[count++] = values[j] - base;
419     }
420   }
421   PetscFree(rvalues); PetscFree(lens);
422   PetscFree(owner); PetscFree(nprocs);
423 
424   /* actually zap the local rows */
425   ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
426   PLogObjectParent(A,istmp);
427   PetscFree(lrows);
428   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
429   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
430   ierr = ISDestroy(istmp); CHKERRQ(ierr);
431 
432   /* wait on sends */
433   if (nsends) {
434     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
435     CHKPTRQ(send_status);
436     MPI_Waitall(nsends,send_waits,send_status);
437     PetscFree(send_status);
438   }
439   PetscFree(send_waits); PetscFree(svalues);
440 
441   return 0;
442 }
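/*
   Usage sketch (illustrative only; the row numbers are made up): each
   processor may list any global rows, owned or not; the routine above
   routes them to their owners before zeroing locally.

       int    rows[2] = {0,5};
       Scalar one = 1.0;
       IS     is;
       ierr = ISCreateSeq(MPI_COMM_SELF,2,rows,&is); CHKERRQ(ierr);
       ierr = MatZeroRows(A,is,&one); CHKERRQ(ierr);
       ierr = ISDestroy(is); CHKERRQ(ierr);
*/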
443 
444 static int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
445 {
446   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
447   int        ierr,nt;
448 
449   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
450   if (nt != a->n) {
451     SETERRQ(1,"MatMult_MPIAIJ:Incompatible partition of A and xx");
452   }
453   ierr = VecGetLocalSize(yy,&nt);  CHKERRQ(ierr);
454   if (nt != a->m) {
455     SETERRQ(1,"MatMult_MPIAIJ:Incompatible partition of A and yy");
456   }
457   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
458   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
459   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
460   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
461   return 0;
462 }
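/*
   Usage sketch (illustrative only; assumes the usual
   VecCreateMPI(comm,local_size,global_size,&v) calling sequence): the
   local sizes of xx and yy must match the local column and row sizes of
   A, which is what the checks above enforce.

       int m,n;
       Vec x,y;
       ierr = MatGetLocalSize(A,&m,&n); CHKERRQ(ierr);
       ierr = VecCreateMPI(A->comm,n,PETSC_DECIDE,&x); CHKERRQ(ierr);
       ierr = VecCreateMPI(A->comm,m,PETSC_DECIDE,&y); CHKERRQ(ierr);
       ierr = MatMult(A,x,y); CHKERRQ(ierr);
*/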
463 
464 static int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
465 {
466   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
467   int        ierr;
468   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
469   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
470   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
471   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
472   return 0;
473 }
474 
475 static int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
476 {
477   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
478   int        ierr;
479 
480   /* do nondiagonal part */
481   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
482   /* send it on its way */
483   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
484                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
485   /* do local part */
486   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
487   /* receive remote parts: note this assumes the values are not actually */
488   /* inserted in yy until the next line, which is true for my implementation*/
489   /* but may not always be true. */
490   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
491                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
492   return 0;
493 }
494 
495 static int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
496 {
497   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
498   int        ierr;
499 
500   /* do nondiagonal part */
501   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
502   /* send it on its way */
503   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
504                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
505   /* do local part */
506   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
507   /* receive remote parts: note this assumes the values are not actually */
508   /* inserted in zz until the next line, which is true for my implementation*/
509   /* but may not always be true. */
510   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
511                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
512   return 0;
513 }
514 
515 /*
516   This only works correctly for square matrices where the subblock A->A is the
517    diagonal block
518 */
519 static int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
520 {
521   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
522   if (a->M != a->N)
523     SETERRQ(1,"MatGetDiagonal_MPIAIJ:Supports only square matrix where A->A is diag block");
524   return MatGetDiagonal(a->A,v);
525 }
526 
527 static int MatScale_MPIAIJ(Scalar *aa,Mat A)
528 {
529   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
530   int        ierr;
531   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
532   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
533   return 0;
534 }
535 
536 static int MatDestroy_MPIAIJ(PetscObject obj)
537 {
538   Mat        mat = (Mat) obj;
539   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
540   int        ierr;
541 #if defined(PETSC_LOG)
542   PLogObjectState(obj,"Rows=%d, Cols=%d",aij->M,aij->N);
543 #endif
544   PetscFree(aij->rowners);
545   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
546   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
547   if (aij->colmap) PetscFree(aij->colmap);
548   if (aij->garray) PetscFree(aij->garray);
549   if (aij->lvec)   VecDestroy(aij->lvec);
550   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
551   if (aij->rowvalues) PetscFree(aij->rowvalues);
552   PetscFree(aij);
553   PLogObjectDestroy(mat);
554   PetscHeaderDestroy(mat);
555   return 0;
556 }
557 #include "draw.h"
558 #include "pinclude/pviewer.h"
559 
560 static int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
561 {
562   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
563   int         ierr;
564 
565   if (aij->size == 1) {
566     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
567   }
568   else SETERRQ(1,"MatView_MPIAIJ_Binary:Only uniprocessor output supported");
569   return 0;
570 }
571 
572 static int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
573 {
574   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
575   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
576   int         ierr, format,shift = C->indexshift,rank;
577   FILE        *fd;
578   ViewerType  vtype;
579 
580   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
581   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
582     ierr = ViewerGetFormat(viewer,&format);
583     if (format == ASCII_FORMAT_INFO_DETAILED) {
584       int nz, nzalloc, mem, flg;
585       MPI_Comm_rank(mat->comm,&rank);
586       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
587       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
588       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
589       PetscSequentialPhaseBegin(mat->comm,1);
590       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
591          rank,aij->m,nz,nzalloc,mem);
592       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
593          rank,aij->m,nz,nzalloc,mem);
594       ierr = MatGetInfo(aij->A,MAT_LOCAL,&nz,&nzalloc,&mem);
595       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz);
596       ierr = MatGetInfo(aij->B,MAT_LOCAL,&nz,&nzalloc,&mem);
597       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz);
598       fflush(fd);
599       PetscSequentialPhaseEnd(mat->comm,1);
600       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
601       return 0;
602     }
603     else if (format == ASCII_FORMAT_INFO) {
604       return 0;
605     }
606   }
607 
608   if (vtype == DRAW_VIEWER) {
609     Draw       draw;
610     PetscTruth isnull;
611     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
612     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
613   }
614 
615   if (vtype == ASCII_FILE_VIEWER) {
616     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
617     PetscSequentialPhaseBegin(mat->comm,1);
618     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
619            aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
620            aij->cend);
621     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
622     ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
623     fflush(fd);
624     PetscSequentialPhaseEnd(mat->comm,1);
625   }
626   else {
627     int size = aij->size;
628     rank = aij->rank;
629     if (size == 1) {
630       ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
631     }
632     else {
633       /* assemble the entire matrix onto first processor. */
634       Mat         A;
635       Mat_SeqAIJ *Aloc;
636       int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
637       Scalar      *a;
638 
639       if (!rank) {
640         ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
641                CHKERRQ(ierr);
642       }
643       else {
644         ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
645                CHKERRQ(ierr);
646       }
647       PLogObjectParent(mat,A);
648 
649       /* copy over the A part */
650       Aloc = (Mat_SeqAIJ*) aij->A->data;
651       m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
652       row = aij->rstart;
653       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
654       for ( i=0; i<m; i++ ) {
655         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
656         row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
657       }
658       aj = Aloc->j;
659       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
660 
661       /* copy over the B part */
662       Aloc = (Mat_SeqAIJ*) aij->B->data;
663       m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
664       row = aij->rstart;
665       ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
666       for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
667       for ( i=0; i<m; i++ ) {
668         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
669         row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
670       }
671       PetscFree(ct);
672       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
673       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
674       if (!rank) {
675         ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
676       }
677       ierr = MatDestroy(A); CHKERRQ(ierr);
678     }
679   }
680   return 0;
681 }
682 
683 static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
684 {
685   Mat         mat = (Mat) obj;
686   int         ierr;
687   ViewerType  vtype;
688 
689   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
690   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
691       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
692     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
693   }
694   else if (vtype == BINARY_FILE_VIEWER) {
695     return MatView_MPIAIJ_Binary(mat,viewer);
696   }
697   return 0;
698 }
699 
700 /*
701     This routine eventually needs to provide several versions:
702 
703      2) a) use only local smoothing, updating outer values only once.
704         b) local smoothing, updating outer values each inner iteration.
705      3) coloring, updating outer values between colors.
706 */
707 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
708                            double fshift,int its,Vec xx)
709 {
710   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
711   Mat        AA = mat->A, BB = mat->B;
712   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
713   Scalar     *b,*x,*xs,*ls,d,*v,sum;
714   int        ierr,*idx, *diag;
715   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
716 
717   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
718   xs = x + shift; /* shift by one for index start of 1 */
719   ls = ls + shift;
720   if (!A->diag) {if ((ierr = MatMarkDiag_SeqAIJ(AA))) return ierr;}
721   diag = A->diag;
722   if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
723     if (flag & SOR_ZERO_INITIAL_GUESS) {
724       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
725     }
726     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
727     CHKERRQ(ierr);
728     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
729     CHKERRQ(ierr);
730     while (its--) {
731       /* go down through the rows */
732       for ( i=0; i<m; i++ ) {
733         n    = A->i[i+1] - A->i[i];
734         idx  = A->j + A->i[i] + shift;
735         v    = A->a + A->i[i] + shift;
736         sum  = b[i];
737         SPARSEDENSEMDOT(sum,xs,v,idx,n);
738         d    = fshift + A->a[diag[i]+shift];
739         n    = B->i[i+1] - B->i[i];
740         idx  = B->j + B->i[i] + shift;
741         v    = B->a + B->i[i] + shift;
742         SPARSEDENSEMDOT(sum,ls,v,idx,n);
743         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
744       }
745       /* come up through the rows */
746       for ( i=m-1; i>-1; i-- ) {
747         n    = A->i[i+1] - A->i[i];
748         idx  = A->j + A->i[i] + shift;
749         v    = A->a + A->i[i] + shift;
750         sum  = b[i];
751         SPARSEDENSEMDOT(sum,xs,v,idx,n);
752         d    = fshift + A->a[diag[i]+shift];
753         n    = B->i[i+1] - B->i[i];
754         idx  = B->j + B->i[i] + shift;
755         v    = B->a + B->i[i] + shift;
756         SPARSEDENSEMDOT(sum,ls,v,idx,n);
757         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
758       }
759     }
760   }
761   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
762     if (flag & SOR_ZERO_INITIAL_GUESS) {
763       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
764     }
765     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
766     CHKERRQ(ierr);
767     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
768     CHKERRQ(ierr);
769     while (its--) {
770       for ( i=0; i<m; i++ ) {
771         n    = A->i[i+1] - A->i[i];
772         idx  = A->j + A->i[i] + shift;
773         v    = A->a + A->i[i] + shift;
774         sum  = b[i];
775         SPARSEDENSEMDOT(sum,xs,v,idx,n);
776         d    = fshift + A->a[diag[i]+shift];
777         n    = B->i[i+1] - B->i[i];
778         idx  = B->j + B->i[i] + shift;
779         v    = B->a + B->i[i] + shift;
780         SPARSEDENSEMDOT(sum,ls,v,idx,n);
781         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
782       }
783     }
784   }
785   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
786     if (flag & SOR_ZERO_INITIAL_GUESS) {
787       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
788     }
789     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
790                             mat->Mvctx); CHKERRQ(ierr);
791     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
792                             mat->Mvctx); CHKERRQ(ierr);
793     while (its--) {
794       for ( i=m-1; i>-1; i-- ) {
795         n    = A->i[i+1] - A->i[i];
796         idx  = A->j + A->i[i] + shift;
797         v    = A->a + A->i[i] + shift;
798         sum  = b[i];
799         SPARSEDENSEMDOT(sum,xs,v,idx,n);
800         d    = fshift + A->a[diag[i]+shift];
801         n    = B->i[i+1] - B->i[i];
802         idx  = B->j + B->i[i] + shift;
803         v    = B->a + B->i[i] + shift;
804         SPARSEDENSEMDOT(sum,ls,v,idx,n);
805         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
806       }
807     }
808   }
809   else {
810     SETERRQ(1,"MatRelax_MPIAIJ:Option not supported");
811   }
812   return 0;
813 }
814 
815 static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,int *nz,
816                              int *nzalloc,int *mem)
817 {
818   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
819   Mat        A = mat->A, B = mat->B;
820   int        ierr, isend[3], irecv[3], nzA, nzallocA, memA;
821 
822   ierr = MatGetInfo(A,MAT_LOCAL,&nzA,&nzallocA,&memA); CHKERRQ(ierr);
823   ierr = MatGetInfo(B,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
824   isend[0] += nzA; isend[1] += nzallocA; isend[2] += memA;
825   if (flag == MAT_LOCAL) {
826     if (nz)       *nz      = isend[0];
827     if (nzalloc)  *nzalloc = isend[1];
828     if (mem)      *mem     = isend[2];
829   } else if (flag == MAT_GLOBAL_MAX) {
830     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_MAX,matin->comm);
831     if (nz)      *nz      = irecv[0];
832     if (nzalloc) *nzalloc = irecv[1];
833     if (mem)     *mem     = irecv[2];
834   } else if (flag == MAT_GLOBAL_SUM) {
835     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_SUM,matin->comm);
836     if (nz)      *nz      = irecv[0];
837     if (nzalloc) *nzalloc = irecv[1];
838     if (mem)     *mem     = irecv[2];
839   }
840   return 0;
841 }
842 
843 extern int MatLUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,Mat*);
844 extern int MatLUFactorNumeric_MPIAIJ(Mat,Mat*);
845 extern int MatLUFactor_MPIAIJ(Mat,IS,IS,double);
846 extern int MatILUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,int,Mat *);
847 extern int MatSolve_MPIAIJ(Mat,Vec,Vec);
848 extern int MatSolveAdd_MPIAIJ(Mat,Vec,Vec,Vec);
849 extern int MatSolveTrans_MPIAIJ(Mat,Vec,Vec);
850 extern int MatSolveTransAdd_MPIAIJ(Mat,Vec,Vec,Vec);
851 
852 static int MatSetOption_MPIAIJ(Mat A,MatOption op)
853 {
854   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
855 
856   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
857       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
858       op == MAT_COLUMNS_SORTED ||
859       op == MAT_ROW_ORIENTED) {
860         MatSetOption(a->A,op);
861         MatSetOption(a->B,op);
862   }
863   else if (op == MAT_ROWS_SORTED ||
864            op == MAT_SYMMETRIC ||
865            op == MAT_STRUCTURALLY_SYMMETRIC ||
866            op == MAT_YES_NEW_DIAGONALS)
867     PLogInfo(A,"Info:MatSetOption_MPIAIJ:Option ignored\n");
868   else if (op == MAT_COLUMN_ORIENTED) {
869     a->roworiented = 0;
870     MatSetOption(a->A,op);
871     MatSetOption(a->B,op);
872   }
873   else if (op == MAT_NO_NEW_DIAGONALS)
874     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:MAT_NO_NEW_DIAGONALS");}
875   else
876     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:unknown option");}
877   return 0;
878 }
879 
880 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
881 {
882   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
883   *m = mat->M; *n = mat->N;
884   return 0;
885 }
886 
887 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
888 {
889   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
890   *m = mat->m; *n = mat->n;
891   return 0;
892 }
893 
894 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
895 {
896   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
897   *m = mat->rstart; *n = mat->rend;
898   return 0;
899 }
900 
901 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
902 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
903 
904 int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
905 {
906   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
907   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
908   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
909   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
910   int        *cmap, *idx_p;
911 
912   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIAIJ:Already active");
913   mat->getrowactive = PETSC_TRUE;
914 
915   if (!mat->rowvalues && (idx || v)) {
916     /*
917         allocate enough space to hold information from the longest row.
918     */
919     Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
920     int     max = 1,m = mat->m,tmp;
921     for ( i=0; i<m; i++ ) {
922       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
923       if (max < tmp) { max = tmp; }
924     }
925     mat->rowvalues = (Scalar *) PetscMalloc( max*(sizeof(int)+sizeof(Scalar)));
926     CHKPTRQ(mat->rowvalues);
927     mat->rowindices = (int *) (mat->rowvalues + max);
928   }
929 
930   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIAIJ:Only local rows");
931   lrow = row - rstart;
932 
933   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
934   if (!v)   {pvA = 0; pvB = 0;}
935   if (!idx) {pcA = 0; if (!v) pcB = 0;}
936   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
937   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
938   nztot = nzA + nzB;
939 
940   cmap  = mat->garray;
941   if (v  || idx) {
942     if (nztot) {
943       /* Sort by increasing column numbers, assuming A and B already sorted */
944       int imark = -1;
945       if (v) {
946         *v = v_p = mat->rowvalues;
947         for ( i=0; i<nzB; i++ ) {
948           if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
949           else break;
950         }
951         imark = i;
952         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
953         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
954       }
955       if (idx) {
956         *idx = idx_p = mat->rowindices;
957         if (imark > -1) {
958           for ( i=0; i<imark; i++ ) {
959             idx_p[i] = cmap[cworkB[i]];
960           }
961         } else {
962           for ( i=0; i<nzB; i++ ) {
963             if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
964             else break;
965           }
966           imark = i;
967         }
968         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
969         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
970       }
971     }
972     else {
973       if (idx) *idx = 0;
974       if (v)   *v   = 0;
975     }
976   }
977   *nz = nztot;
978   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
979   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
980   return 0;
981 }
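/*
   Usage sketch (illustrative only): rows must be owned locally, and each
   MatGetRow() must be matched by MatRestoreRow() before the next row is
   requested, since the column indices and values are returned in a single
   shared work array (mat->rowindices/mat->rowvalues above).

       int    row,rstart,rend,nz,*cols;
       Scalar *vals;
       ierr = MatGetOwnershipRange(mat,&rstart,&rend); CHKERRQ(ierr);
       for ( row=rstart; row<rend; row++ ) {
         ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
         ... examine the nz entries in cols[] and vals[] ...
         ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
       }
*/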
982 
983 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
984 {
985   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
986   if (aij->getrowactive == PETSC_FALSE) {
987     SETERRQ(1,"MatRestoreRow_MPIAIJ:MatGetRow not called");
988   }
989   aij->getrowactive = PETSC_FALSE;
990   return 0;
991 }
992 
993 static int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
994 {
995   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
996   Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
997   int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
998   double     sum = 0.0;
999   Scalar     *v;
1000 
1001   if (aij->size == 1) {
1002     ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
1003   } else {
1004     if (type == NORM_FROBENIUS) {
1005       v = amat->a;
1006       for (i=0; i<amat->nz; i++ ) {
1007 #if defined(PETSC_COMPLEX)
1008         sum += real(conj(*v)*(*v)); v++;
1009 #else
1010         sum += (*v)*(*v); v++;
1011 #endif
1012       }
1013       v = bmat->a;
1014       for (i=0; i<bmat->nz; i++ ) {
1015 #if defined(PETSC_COMPLEX)
1016         sum += real(conj(*v)*(*v)); v++;
1017 #else
1018         sum += (*v)*(*v); v++;
1019 #endif
1020       }
1021       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
1022       *norm = sqrt(*norm);
1023     }
1024     else if (type == NORM_1) { /* max column norm */
1025       double *tmp, *tmp2;
1026       int    *jj, *garray = aij->garray;
1027       tmp  = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp);
1028       tmp2 = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp2);
1029       PetscMemzero(tmp,aij->N*sizeof(double));
1030       *norm = 0.0;
1031       v = amat->a; jj = amat->j;
1032       for ( j=0; j<amat->nz; j++ ) {
1033         tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
1034       }
1035       v = bmat->a; jj = bmat->j;
1036       for ( j=0; j<bmat->nz; j++ ) {
1037         tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
1038       }
1039       MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);
1040       for ( j=0; j<aij->N; j++ ) {
1041         if (tmp2[j] > *norm) *norm = tmp2[j];
1042       }
1043       PetscFree(tmp); PetscFree(tmp2);
1044     }
1045     else if (type == NORM_INFINITY) { /* max row norm */
1046       double ntemp = 0.0;
1047       for ( j=0; j<amat->m; j++ ) {
1048         v = amat->a + amat->i[j] + shift;
1049         sum = 0.0;
1050         for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
1051           sum += PetscAbsScalar(*v); v++;
1052         }
1053         v = bmat->a + bmat->i[j] + shift;
1054         for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
1055           sum += PetscAbsScalar(*v); v++;
1056         }
1057         if (sum > ntemp) ntemp = sum;
1058       }
1059       MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);
1060     }
1061     else {
1062       SETERRQ(1,"MatNorm_MPIAIJ:No support for two norm");
1063     }
1064   }
1065   return 0;
1066 }
1067 
1068 static int MatTranspose_MPIAIJ(Mat A,Mat *matout)
1069 {
1070   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1071   Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
1072   int        ierr,shift = Aloc->indexshift;
1073   Mat        B;
1074   int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
1075   Scalar     *array;
1076 
1077   if (matout == PETSC_NULL && M != N)
1078     SETERRQ(1,"MatTranspose_MPIAIJ:Square matrix only for in-place");
1079   ierr = MatCreateMPIAIJ(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,
1080          PETSC_NULL,&B); CHKERRQ(ierr);
1081 
1082   /* copy over the A part */
1083   Aloc = (Mat_SeqAIJ*) a->A->data;
1084   m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1085   row = a->rstart;
1086   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
1087   for ( i=0; i<m; i++ ) {
1088     ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1089     row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
1090   }
1091   aj = Aloc->j;
1092   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}
1093 
1094   /* copy over the B part */
1095   Aloc = (Mat_SeqAIJ*) a->B->data;
1096   m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1097   row = a->rstart;
1098   ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
1099   for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
1100   for ( i=0; i<m; i++ ) {
1101     ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1102     row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
1103   }
1104   PetscFree(ct);
1105   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1106   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1107   if (matout != PETSC_NULL) {
1108     *matout = B;
1109   } else {
1110     /* This isn't really an in-place transpose, but we free the data structures from a */
1111     PetscFree(a->rowners);
1112     ierr = MatDestroy(a->A); CHKERRQ(ierr);
1113     ierr = MatDestroy(a->B); CHKERRQ(ierr);
1114     if (a->colmap) PetscFree(a->colmap);
1115     if (a->garray) PetscFree(a->garray);
1116     if (a->lvec) VecDestroy(a->lvec);
1117     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
1118     PetscFree(a);
1119     PetscMemcpy(A,B,sizeof(struct _Mat));
1120     PetscHeaderDestroy(B);
1121   }
1122   return 0;
1123 }
1124 
1125 extern int MatPrintHelp_SeqAIJ(Mat);
1126 static int MatPrintHelp_MPIAIJ(Mat A)
1127 {
1128   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1129 
1130   if (!a->rank) return MatPrintHelp_SeqAIJ(a->A);
1131   else return 0;
1132 }
1133 
1134 static int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1135 {
1136   *bs = 1;
1137   return 0;
1138 }
1139 
1140 extern int MatConvert_MPIAIJ(Mat,MatType,Mat *);
1141 static int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1142 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1143 int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatGetSubMatrixCall,Mat **);
1144 /* -------------------------------------------------------------------*/
1145 static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
1146        MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
1147        MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
1148        MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
1149        MatSolve_MPIAIJ,MatSolveAdd_MPIAIJ,
1150        MatSolveTrans_MPIAIJ,MatSolveTransAdd_MPIAIJ,
1151        MatLUFactor_MPIAIJ,0,
1152        MatRelax_MPIAIJ,
1153        MatTranspose_MPIAIJ,
1154        MatGetInfo_MPIAIJ,0,
1155        MatGetDiagonal_MPIAIJ,0,MatNorm_MPIAIJ,
1156        MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
1157        0,
1158        MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
1159        MatGetReordering_MPIAIJ,
1160        MatLUFactorSymbolic_MPIAIJ,MatLUFactorNumeric_MPIAIJ,0,0,
1161        MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
1162        MatILUFactorSymbolic_MPIAIJ,0,
1163        0,0,MatConvert_MPIAIJ,MatConvertSameType_MPIAIJ,0,0,
1164        0,0,0,
1165        MatGetSubMatrices_MPIAIJ,MatIncreaseOverlap_MPIAIJ,MatGetValues_MPIAIJ,0,
1166        MatPrintHelp_MPIAIJ,
1167        MatScale_MPIAIJ,0,0,0,MatGetBlockSize_MPIAIJ};
1168 
1169 /*@C
1170    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1171    (the default parallel PETSc format).  For good matrix assembly performance
1172    the user should preallocate the matrix storage by setting the parameters
1173    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1174    performance can be increased by more than a factor of 50.
1175 
1176    Input Parameters:
1177 .  comm - MPI communicator
1178 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1179            This value should be the same as the local size used in creating the
1180            y vector for the matrix-vector product y = Ax.
1181 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1182            This value should be the same as the local size used in creating the
1183            x vector for the matrix-vector product y = Ax.
1184 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
1185 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
1186 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1187            (same for all local rows)
1188 .  d_nnz - array containing the number of nonzeros in the various rows of the
1189            diagonal portion of the local submatrix (possibly different for each row)
1190            or PETSC_NULL. You must leave room for the diagonal entry even if
1191            it is zero.
1192 .  o_nz - number of nonzeros per row in the off-diagonal portion of local
1193            submatrix (same for all local rows).
1194 .  o_nnz - array containing the number of nonzeros in the various rows of the
1195            off-diagonal portion of the local submatrix (possibly different for
1196            each row) or PETSC_NULL.
1197 
1198    Output Parameter:
1199 .  A - the matrix
1200 
1201    Notes:
1202    The AIJ format (also called the Yale sparse matrix format or
1203    compressed row storage), is fully compatible with standard Fortran 77
1204    storage.  That is, the stored row and column indices can begin at
1205    either one (as in Fortran) or zero.  See the users manual for details.
1206 
1207    The user MUST specify either the local or global matrix dimensions
1208    (possibly both).
1209 
1210    By default, this format uses inodes (identical nodes) when possible.
1211    We search for consecutive rows with the same nonzero structure, thereby
1212    reusing matrix information to achieve increased efficiency.
1213 
1214    Options Database Keys:
1215 $    -mat_aij_no_inode  - Do not use inodes
1216 $    -mat_aij_inode_limit <limit> - Set inode limit.
1217 $        (max limit=5)
1218 $    -mat_aij_oneindex - Internally use indexing starting at 1
1219 $        rather than 0.  Note: When calling MatSetValues(),
1220 $        the user still MUST index entries starting at 0!
1221 
1222    Storage Information:
1223    For a square global matrix we define each processor's diagonal portion
1224    to be its local rows and the corresponding columns (a square submatrix);
1225    each processor's off-diagonal portion encompasses the remainder of the
1226    local matrix (a rectangular submatrix).
1227 
1228    The user can specify preallocated storage for the diagonal part of
1229    the local submatrix with either d_nz or d_nnz (not both).  Set
1230    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1231    memory allocation.  Likewise, specify preallocated storage for the
1232    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1233 
1234    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1235    the figure below we depict these three local rows and all columns (0-11).
1236 
1237 $          0 1 2 3 4 5 6 7 8 9 10 11
1238 $         -------------------
1239 $  row 3  |  o o o d d d o o o o o o
1240 $  row 4  |  o o o d d d o o o o o o
1241 $  row 5  |  o o o d d d o o o o o o
1242 $         -------------------
1243 $
1244 
1245    Thus, any entries in the d locations are stored in the d (diagonal)
1246    submatrix, and any entries in the o locations are stored in the
1247    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1248    stored simply in the MATSEQAIJ format for compressed row storage.
1249 
1250    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1251    and o_nz should indicate the number of nonzeros per row in the o matrix.
1252    In general, for PDE problems in which most nonzeros are near the diagonal,
1253    one expects d_nz >> o_nz. For large problems you MUST preallocate memory
1254    or you will get TERRIBLE performance; see the users' manual chapter on
1255    matrices and the file $(PETSC_DIR)/Performance.
1256 
1257 .keywords: matrix, aij, compressed row, sparse, parallel
1258 
1259 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1260 @*/
1261 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
1262                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1263 {
1264   Mat          B;
1265   Mat_MPIAIJ   *b;
1266   int          ierr, i,sum[2],work[2];
1267 
1268   *A = 0;
1269   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
1270   PLogObjectCreate(B);
1271   B->data       = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
1272   PetscMemzero(b,sizeof(Mat_MPIAIJ));
1273   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1274   B->destroy    = MatDestroy_MPIAIJ;
1275   B->view       = MatView_MPIAIJ;
1276   B->factor     = 0;
1277   B->assembled  = PETSC_FALSE;
1278 
1279   b->insertmode = NOT_SET_VALUES;
1280   MPI_Comm_rank(comm,&b->rank);
1281   MPI_Comm_size(comm,&b->size);
1282 
1283   if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1284     SETERRQ(1,"MatCreateMPIAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1285 
1286   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1287     work[0] = m; work[1] = n;
1288     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1289     if (M == PETSC_DECIDE) M = sum[0];
1290     if (N == PETSC_DECIDE) N = sum[1];
1291   }
1292   if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
1293   if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
1294   b->m = m; B->m = m;
1295   b->n = n; B->n = n;
1296   b->N = N; B->N = N;
1297   b->M = M; B->M = M;
1298 
1299   /* build local table of row and column ownerships */
1300   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1301   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1302   b->cowners = b->rowners + b->size + 2;
1303   MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1304   b->rowners[0] = 0;
1305   for ( i=2; i<=b->size; i++ ) {
1306     b->rowners[i] += b->rowners[i-1];
1307   }
1308   b->rstart = b->rowners[b->rank];
1309   b->rend   = b->rowners[b->rank+1];
1310   MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1311   b->cowners[0] = 0;
1312   for ( i=2; i<=b->size; i++ ) {
1313     b->cowners[i] += b->cowners[i-1];
1314   }
1315   b->cstart = b->cowners[b->rank];
1316   b->cend   = b->cowners[b->rank+1];
1317 
1318   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1319   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1320   PLogObjectParent(B,b->A);
1321   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1322   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1323   PLogObjectParent(B,b->B);
1324 
1325   /* build cache for off-processor entries formed during MatSetValues() */
1326   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1327   b->colmap      = 0;
1328   b->garray      = 0;
1329   b->roworiented = 1;
1330 
1331   /* stuff used for matrix vector multiply */
1332   b->lvec      = 0;
1333   b->Mvctx     = 0;
1334 
1335   /* stuff for MatGetRow() */
1336   b->rowindices   = 0;
1337   b->rowvalues    = 0;
1338   b->getrowactive = PETSC_FALSE;
1339 
1340   *A = B;
1341   return 0;
1342 }
1343 
1344 static int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
1345 {
1346   Mat        mat;
1347   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1348   int        ierr, len=0, flg;
1349 
1350   *newmat       = 0;
1351   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1352   PLogObjectCreate(mat);
1353   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1354   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1355   mat->destroy    = MatDestroy_MPIAIJ;
1356   mat->view       = MatView_MPIAIJ;
1357   mat->factor     = matin->factor;
1358   mat->assembled  = PETSC_TRUE;
1359 
1360   a->m = mat->m   = oldmat->m;
1361   a->n = mat->n   = oldmat->n;
1362   a->M = mat->M   = oldmat->M;
1363   a->N = mat->N   = oldmat->N;
1364 
1365   a->rstart       = oldmat->rstart;
1366   a->rend         = oldmat->rend;
1367   a->cstart       = oldmat->cstart;
1368   a->cend         = oldmat->cend;
1369   a->size         = oldmat->size;
1370   a->rank         = oldmat->rank;
1371   a->insertmode   = NOT_SET_VALUES;
1372   a->rowvalues    = 0;
1373   a->getrowactive = PETSC_FALSE;
1374 
1375   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1376   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1377   a->cowners = a->rowners + a->size + 2;
1378   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1379   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1380   if (oldmat->colmap) {
1381     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1382     PLogObjectMemory(mat,(a->N)*sizeof(int));
1383     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1384   } else a->colmap = 0;
1385   if (oldmat->garray && (len = ((Mat_SeqAIJ *) (oldmat->B->data))->n)) {
1386     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1387     PLogObjectMemory(mat,len*sizeof(int));
1388     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1389   } else a->garray = 0;
1390 
1391   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1392   PLogObjectParent(mat,a->lvec);
1393   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1394   PLogObjectParent(mat,a->Mvctx);
1395   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1396   PLogObjectParent(mat,a->A);
1397   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1398   PLogObjectParent(mat,a->B);
1399   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1400   if (flg) {
1401     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1402   }
1403   *newmat = mat;
1404   return 0;
1405 }
1406 
1407 #include "sys.h"
1408 
1409 int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
1410 {
1411   Mat          A;
1412   int          i, nz, ierr, j,rstart, rend, fd;
1413   Scalar       *vals,*svals;
1414   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1415   MPI_Status   status;
1416   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1417   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1418   int          tag = ((PetscObject)viewer)->tag;
1419 
1420   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1421   if (!rank) {
1422     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1423     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1424     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIAIJ:not matrix object");
1425   }
1426 
1427   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1428   M = header[1]; N = header[2];
1429   /* determine ownership of all rows */
1430   m = M/size + ((M % size) > rank);
1431   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1432   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1433   rowners[0] = 0;
1434   for ( i=2; i<=size; i++ ) {
1435     rowners[i] += rowners[i-1];
1436   }
1437   rstart = rowners[rank];
1438   rend   = rowners[rank+1];
1439 
1440   /* distribute row lengths to all processors */
1441   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1442   offlens = ourlens + (rend-rstart);
1443   if (!rank) {
1444     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1445     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1446     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1447     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1448     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
1449     PetscFree(sndcounts);
1450   }
1451   else {
1452     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
1453   }
1454 
1455   if (!rank) {
1456     /* calculate the number of nonzeros on each processor */
1457     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1458     PetscMemzero(procsnz,size*sizeof(int));
1459     for ( i=0; i<size; i++ ) {
1460       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1461         procsnz[i] += rowlengths[j];
1462       }
1463     }
1464     PetscFree(rowlengths);
1465 
1466     /* determine max buffer needed and allocate it */
1467     maxnz = 0;
1468     for ( i=0; i<size; i++ ) {
1469       maxnz = PetscMax(maxnz,procsnz[i]);
1470     }
1471     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1472 
1473     /* read in my part of the matrix column indices  */
1474     nz = procsnz[0];
1475     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1476     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1477 
1478     /* read in everyone else's parts and ship them off */
1479     for ( i=1; i<size; i++ ) {
1480       nz = procsnz[i];
1481       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1482       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1483     }
1484     PetscFree(cols);
1485   }
1486   else {
1487     /* determine buffer space needed for message */
1488     nz = 0;
1489     for ( i=0; i<m; i++ ) {
1490       nz += ourlens[i];
1491     }
1492     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1493 
1494     /* receive message of column indices*/
1495     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1496     MPI_Get_count(&status,MPI_INT,&maxnz);
1497     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
1498   }
1499 
1500   /* loop over local rows, determining number of off diagonal entries */
1501   PetscMemzero(offlens,m*sizeof(int));
1502   jj = 0;
1503   for ( i=0; i<m; i++ ) {
1504     for ( j=0; j<ourlens[i]; j++ ) {
1505       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1506       jj++;
1507     }
1508   }
1509 
1510   /* create our matrix */
1511   for ( i=0; i<m; i++ ) {
1512     ourlens[i] -= offlens[i];
1513   }
1514   ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
1515   A = *newmat;
1516   MatSetOption(A,MAT_COLUMNS_SORTED);
1517   for ( i=0; i<m; i++ ) {
1518     ourlens[i] += offlens[i];
1519   }
1520 
1521   if (!rank) {
1522     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1523 
1524     /* read in my part of the matrix numerical values  */
1525     nz = procsnz[0];
1526     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1527 
1528     /* insert into matrix */
1529     jj      = rstart;
1530     smycols = mycols;
1531     svals   = vals;
1532     for ( i=0; i<m; i++ ) {
1533       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1534       smycols += ourlens[i];
1535       svals   += ourlens[i];
1536       jj++;
1537     }
1538 
1539     /* read in the other processors' parts and ship them out */
1540     for ( i=1; i<size; i++ ) {
1541       nz = procsnz[i];
1542       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1543       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1544     }
1545     PetscFree(procsnz);
1546   }
1547   else {
1548     /* receive numeric values */
1549     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1550 
1551     /* receive message of values*/
1552     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1553     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1554     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
1555 
1556     /* insert into matrix */
1557     jj      = rstart;
1558     smycols = mycols;
1559     svals   = vals;
1560     for ( i=0; i<m; i++ ) {
1561       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1562       smycols += ourlens[i];
1563       svals   += ourlens[i];
1564       jj++;
1565     }
1566   }
1567   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1568 
1569   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1570   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1571   return 0;
1572 }
1573