xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision 6a67db7e1512b5d9ff9ba8f94e48d05f0a9eae3d)
1 
2 #ifndef lint
3 static char vcid[] = "$Id: mpiaij.c,v 1.164 1996/08/21 19:59:43 bsmith Exp curfman $";
4 #endif
5 
6 #include "src/mat/impls/aij/mpi/mpiaij.h"
7 #include "src/vec/vecimpl.h"
8 #include "src/inline/spops.h"
9 
10 /* local utility routine that creates a mapping from the global column
11 number to the local number in the off-diagonal part of the local
12 storage of the matrix.  This is done in a non-scalable way since the
13 length of colmap equals the global number of columns.
14 */
15 static int CreateColmap_MPIAIJ_Private(Mat mat)
16 {
17   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
18   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
19   int        n = B->n,i;
20 
21   aij->colmap = (int *) PetscMalloc(aij->N*sizeof(int));CHKPTRQ(aij->colmap);
22   PLogObjectMemory(mat,aij->N*sizeof(int));
23   PetscMemzero(aij->colmap,aij->N*sizeof(int));
24   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
25   return 0;
26 }
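
/*
   Usage sketch (illustrative, not part of the original source): colmap[g]
   holds 1 + the local off-diagonal column index of global column g, or 0
   if g is not present in this process's off-diagonal part, so callers do

       col = aij->colmap[in[j]] - 1;
       if (col < 0) { ... column not (yet) present locally ... }

   which is exactly the test used in MatSetValues_MPIAIJ() and
   MatGetValues_MPIAIJ() below.
*/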
27 
28 extern int DisAssemble_MPIAIJ(Mat);
29 
30 static int MatGetReordering_MPIAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
31 {
32   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
33   int        ierr;
34   if (aij->size == 1) {
35     ierr = MatGetReordering(aij->A,type,rperm,cperm); CHKERRQ(ierr);
36   } else SETERRQ(1,"MatGetReordering_MPIAIJ:not supported in parallel");
37   return 0;
38 }
39 
40 static int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
41 {
42   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
43   Scalar     value;
44   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
45   int        cstart = aij->cstart, cend = aij->cend,row,col;
46   int        roworiented = aij->roworiented;
47 
48   if (aij->insertmode != NOT_SET_VALUES && aij->insertmode != addv) {
49     SETERRQ(1,"MatSetValues_MPIAIJ:Cannot mix inserts and adds");
50   }
51   aij->insertmode = addv;
52   for ( i=0; i<m; i++ ) {
53     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative row");
54     if (im[i] >= aij->M) SETERRQ(1,"MatSetValues_MPIAIJ:Row too large");
55     if (im[i] >= rstart && im[i] < rend) {
56       row = im[i] - rstart;
57       for ( j=0; j<n; j++ ) {
58         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative column");
59         if (in[j] >= aij->N) SETERRQ(1,"MatSetValues_MPIAIJ:Col too large");
60         if (in[j] >= cstart && in[j] < cend){
61           col = in[j] - cstart;
62           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
63           ierr = MatSetValues(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
64         }
65         else {
66           if (mat->was_assembled) {
67             if (!aij->colmap) {
68               ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
69             }
70             col = aij->colmap[in[j]] - 1;
71             if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
72               ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
73               col =  in[j];
74             }
75           }
76           else col = in[j];
77           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
78           ierr = MatSetValues(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
79         }
80       }
81     }
82     else {
83       if (roworiented) {
84         ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
85       }
86       else {
87         row = im[i];
88         for ( j=0; j<n; j++ ) {
89           ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
90         }
91       }
92     }
93   }
94   return 0;
95 }
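
/*
   Routing summary for MatSetValues_MPIAIJ() above (a descriptive sketch;
   the numbers below are hypothetical):

     owned row, column in [cstart,cend)  -> diagonal block aij->A
     owned row, column outside           -> off-diagonal block aij->B
                                            (located via colmap once the
                                            matrix has been assembled)
     row owned by another process        -> stashed, shipped at assembly

   For example, on a rank owning rows [4,8) and columns [4,8), entry
   (5,6) goes into A at local (1,2), entry (5,11) goes into B, and
   entry (2,6) is stashed for its owning rank.
*/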
96 
97 static int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
98 {
99   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
100   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
101   int        cstart = aij->cstart, cend = aij->cend,row,col;
102 
103   for ( i=0; i<m; i++ ) {
104     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative row");
105     if (idxm[i] >= aij->M) SETERRQ(1,"MatGetValues_MPIAIJ:Row too large");
106     if (idxm[i] >= rstart && idxm[i] < rend) {
107       row = idxm[i] - rstart;
108       for ( j=0; j<n; j++ ) {
109         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative column");
110         if (idxn[j] >= aij->N) SETERRQ(1,"MatGetValues_MPIAIJ:Col too large");
111         if (idxn[j] >= cstart && idxn[j] < cend){
112           col = idxn[j] - cstart;
113           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
114         }
115         else {
116           if (!aij->colmap) {
117             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
118           }
119           col = aij->colmap[idxn[j]] - 1;
120           if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
121           else {
122             ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
123           }
124         }
125       }
126     }
127     else {
128       SETERRQ(1,"MatGetValues_MPIAIJ:Only local values currently supported");
129     }
130   }
131   return 0;
132 }
133 
134 static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
135 {
136   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
137   MPI_Comm    comm = mat->comm;
138   int         size = aij->size, *owners = aij->rowners;
139   int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
140   MPI_Request *send_waits,*recv_waits;
141   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
142   InsertMode  addv;
143   Scalar      *rvalues,*svalues;
144 
145   /* make sure all processors are in the same mode (INSERT_VALUES or ADD_VALUES) */
146   MPI_Allreduce(&aij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
147   if (addv == (ADD_VALUES|INSERT_VALUES)) {
148     SETERRQ(1,"MatAssemblyBegin_MPIAIJ:Some processors inserted others added");
149   }
150   aij->insertmode = addv; /* in case this processor had no cache */
151 
152   /*  first count number of contributors to each processor */
153   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
154   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
155   owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
156   for ( i=0; i<aij->stash.n; i++ ) {
157     idx = aij->stash.idx[i];
158     for ( j=0; j<size; j++ ) {
159       if (idx >= owners[j] && idx < owners[j+1]) {
160         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
161       }
162     }
163   }
164   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
165 
166   /* inform other processors of number of messages and max length*/
167   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
168   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
169   nreceives = work[rank];
170   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
171   nmax = work[rank];
172   PetscFree(work);
173 
174   /* post receives:
175        1) each message will consist of ordered triples
176      (global row index, global column index, value); we store the
177      indices as Scalars to simplify the message passing.
178        2) since we don't know how long each individual message is we
179      allocate the largest needed buffer for each receive. Potentially
180      this is a lot of wasted space.
181 
182 
183        This could be done better.
184   */
185   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
186   CHKPTRQ(rvalues);
187   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
188   CHKPTRQ(recv_waits);
189   for ( i=0; i<nreceives; i++ ) {
190     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
191               comm,recv_waits+i);
192   }
193 
194   /* do sends:
195       1) starts[i] gives the starting index in svalues for stuff going to
196          the ith processor
197   */
198   svalues = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
199   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
200   CHKPTRQ(send_waits);
201   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
202   starts[0] = 0;
203   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
204   for ( i=0; i<aij->stash.n; i++ ) {
205     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
206     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
207     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
208   }
209   PetscFree(owner);
210   starts[0] = 0;
211   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
212   count = 0;
213   for ( i=0; i<size; i++ ) {
214     if (procs[i]) {
215       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
216                 comm,send_waits+count++);
217     }
218   }
219   PetscFree(starts); PetscFree(nprocs);
220 
221   /* Free cache space */
222   PLogInfo(0,"[%d]MatAssemblyBegin_MPIAIJ:Number of off processor values %d\n",rank,aij->stash.n);
223   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
224 
225   aij->svalues    = svalues;    aij->rvalues    = rvalues;
226   aij->nsends     = nsends;     aij->nrecvs     = nreceives;
227   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
228   aij->rmax       = nmax;
229 
230   return 0;
231 }
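
/*
   A sketch of the message protocol used above: each stashed entry is
   shipped as a triple of Scalars,

       [ row, col, value, row, col, value, ... ]

   so a message carrying k entries is 3*k Scalars long.  The two
   MPI_Allreduce() calls implement a rendezvous: summing the 0/1 procs
   flags tells each rank how many senders target it (nreceives), and
   maximizing nprocs gives the largest number of entries any one sender
   will ship to it (nmax), which is used to size the receive buffers.
*/
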
232 extern int MatSetUpMultiply_MPIAIJ(Mat);
233 
234 static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
235 {
236   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
237   MPI_Status  *send_status,recv_status;
238   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
239   int         row,col,other_disassembled;
240   Scalar      *values,val;
241   InsertMode  addv = aij->insertmode;
242 
243   /*  wait on receives */
244   while (count) {
245     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
246     /* unpack receives into our local space */
247     values = aij->rvalues + 3*imdex*aij->rmax;
248     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
249     n = n/3;
250     for ( i=0; i<n; i++ ) {
251       row = (int) PetscReal(values[3*i]) - aij->rstart;
252       col = (int) PetscReal(values[3*i+1]);
253       val = values[3*i+2];
254       if (col >= aij->cstart && col < aij->cend) {
255         col -= aij->cstart;
256         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
257       }
258       else {
259         if (mat->was_assembled) {
260           if (!aij->colmap) {
261             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
262           }
263           col = aij->colmap[col] - 1;
264           if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
265             ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
266             col = (int) PetscReal(values[3*i+1]);
267           }
268         }
269         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
270       }
271     }
272     count--;
273   }
274   PetscFree(aij->recv_waits); PetscFree(aij->rvalues);
275 
276   /* wait on sends */
277   if (aij->nsends) {
278     send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));
279     CHKPTRQ(send_status);
280     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
281     PetscFree(send_status);
282   }
283   PetscFree(aij->send_waits); PetscFree(aij->svalues);
284 
285   aij->insertmode = NOT_SET_VALUES;
286   ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
287   ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);
288 
289   /* determine if any processor has disassembled, if so we must
290      also disassemble ourselves, so that we may reassemble. */
291   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
292   if (mat->was_assembled && !other_disassembled) {
293     ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
294   }
295 
296   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
297     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
298   }
299   ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
300   ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);
301 
302   if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
303   return 0;
304 }
305 
306 static int MatZeroEntries_MPIAIJ(Mat A)
307 {
308   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
309   int        ierr;
310   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
311   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
312   return 0;
313 }
314 
315 /* the code does not do the diagonal entries correctly unless the
316    matrix is square and the column and row ownerships are identical.
317    This is a BUG. The only way to fix it seems to be to access
318    aij->A and aij->B directly and not through the MatZeroRows()
319    routine.
320 */
321 static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
322 {
323   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
324   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
325   int            *procs,*nprocs,j,found,idx,nsends,*work;
326   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
327   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
328   int            *lens,imdex,*lrows,*values;
329   MPI_Comm       comm = A->comm;
330   MPI_Request    *send_waits,*recv_waits;
331   MPI_Status     recv_status,*send_status;
332   IS             istmp;
333 
334   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
335   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
336 
337   /*  first count number of contributors to each processor */
338   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
339   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
340   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
341   for ( i=0; i<N; i++ ) {
342     idx = rows[i];
343     found = 0;
344     for ( j=0; j<size; j++ ) {
345       if (idx >= owners[j] && idx < owners[j+1]) {
346         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
347       }
348     }
349     if (!found) SETERRQ(1,"MatZeroRows_MPIAIJ:Index out of range");
350   }
351   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
352 
353   /* inform other processors of number of messages and max length*/
354   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
355   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
356   nrecvs = work[rank];
357   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
358   nmax = work[rank];
359   PetscFree(work);
360 
361   /* post receives:   */
362   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
363   CHKPTRQ(rvalues);
364   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
365   CHKPTRQ(recv_waits);
366   for ( i=0; i<nrecvs; i++ ) {
367     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
368   }
369 
370   /* do sends:
371       1) starts[i] gives the starting index in svalues for stuff going to
372          the ith processor
373   */
374   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
375   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
376   CHKPTRQ(send_waits);
377   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
378   starts[0] = 0;
379   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
380   for ( i=0; i<N; i++ ) {
381     svalues[starts[owner[i]]++] = rows[i];
382   }
383   ISRestoreIndices(is,&rows);
384 
385   starts[0] = 0;
386   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
387   count = 0;
388   for ( i=0; i<size; i++ ) {
389     if (procs[i]) {
390       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
391     }
392   }
393   PetscFree(starts);
394 
395   base = owners[rank];
396 
397   /*  wait on receives */
398   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
399   source = lens + nrecvs;
400   count  = nrecvs; slen = 0;
401   while (count) {
402     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
403     /* unpack receives into our local space */
404     MPI_Get_count(&recv_status,MPI_INT,&n);
405     source[imdex]  = recv_status.MPI_SOURCE;
406     lens[imdex]  = n;
407     slen += n;
408     count--;
409   }
410   PetscFree(recv_waits);
411 
412   /* move the received row indices into a single local list */
413   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
414   count = 0;
415   for ( i=0; i<nrecvs; i++ ) {
416     values = rvalues + i*nmax;
417     for ( j=0; j<lens[i]; j++ ) {
418       lrows[count++] = values[j] - base;
419     }
420   }
421   PetscFree(rvalues); PetscFree(lens);
422   PetscFree(owner); PetscFree(nprocs);
423 
424   /* actually zap the local rows */
425   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
426   PLogObjectParent(A,istmp);
427   PetscFree(lrows);
428   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
429   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
430   ierr = ISDestroy(istmp); CHKERRQ(ierr);
431 
432   /* wait on sends */
433   if (nsends) {
434     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
435     CHKPTRQ(send_status);
436     MPI_Waitall(nsends,send_waits,send_status);
437     PetscFree(send_status);
438   }
439   PetscFree(send_waits); PetscFree(svalues);
440 
441   return 0;
442 }
443 
444 static int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
445 {
446   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
447   int        ierr,nt;
448 
449   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
450   if (nt != a->n) {
451     SETERRQ(1,"MatMult_MPIAIJ:Incompatible parition of A and xx");
452   }
453   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
454   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
455   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
456   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
457   return 0;
458 }
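
/*
   A sketch of the multiply above: with x_g denoting the scattered ghost
   values in a->lvec, the product is assembled as

       y = A*x + B*x_g

   where A is the diagonal block and B the off-diagonal block.  The
   scatter is started before the local A*x so that communication can
   overlap the local computation; B's contribution is added only after
   VecScatterEnd() guarantees the ghost values have arrived.
*/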
459 
460 static int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
461 {
462   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
463   int        ierr;
464   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
465   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
466   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
467   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
468   return 0;
469 }
470 
471 static int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
472 {
473   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
474   int        ierr;
475 
476   /* do nondiagonal part */
477   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
478   /* send it on its way */
479   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
480   /* do local part */
481   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
482   /* receive remote parts: note this assumes the values are not actually */
483   /* inserted in yy until the next line, which is true for my implementation*/
484   /* but may not always be true. */
485   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
486   return 0;
487 }
488 
489 static int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
490 {
491   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
492   int        ierr;
493 
494   /* do nondiagonal part */
495   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
496   /* send it on its way */
497   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
498   /* do local part */
499   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
500   /* receive remote parts: note this assumes the values are not actually */
501   /* inserted in zz until the next line, which is true for my implementation*/
502   /* but may not always be true. */
503   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
504                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
505   return 0;
506 }
507 
508 /*
509   This only works correctly for square matrices where the subblock A->A is the
510    diagonal block
511 */
512 static int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
513 {
514   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
515   if (a->M != a->N)
516     SETERRQ(1,"MatGetDiagonal_MPIAIJ:Supports only square matrix where A->A is diag block");
517   if (a->rstart != a->cstart || a->rend != a->cend) {
518     SETERRQ(1,"MatGetDiagonal_MPIAIJ:row partition must equal col partition");  }
519   return MatGetDiagonal(a->A,v);
520 }
521 
522 static int MatScale_MPIAIJ(Scalar *aa,Mat A)
523 {
524   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
525   int        ierr;
526   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
527   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
528   return 0;
529 }
530 
531 static int MatDestroy_MPIAIJ(PetscObject obj)
532 {
533   Mat        mat = (Mat) obj;
534   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
535   int        ierr;
536 #if defined(PETSC_LOG)
537   PLogObjectState(obj,"Rows=%d, Cols=%d",aij->M,aij->N);
538 #endif
539   PetscFree(aij->rowners);
540   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
541   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
542   if (aij->colmap) PetscFree(aij->colmap);
543   if (aij->garray) PetscFree(aij->garray);
544   if (aij->lvec)   VecDestroy(aij->lvec);
545   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
546   if (aij->rowvalues) PetscFree(aij->rowvalues);
547   PetscFree(aij);
548   PLogObjectDestroy(mat);
549   PetscHeaderDestroy(mat);
550   return 0;
551 }
552 #include "draw.h"
553 #include "pinclude/pviewer.h"
554 
555 static int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
556 {
557   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
558   int         ierr;
559 
560   if (aij->size == 1) {
561     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
562   }
563   else SETERRQ(1,"MatView_MPIAIJ_Binary:Only uniprocessor output supported");
564   return 0;
565 }
566 
567 static int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
568 {
569   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
570   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
571   int         ierr, format,shift = C->indexshift,rank;
572   FILE        *fd;
573   ViewerType  vtype;
574 
575   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
576   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
577     ierr = ViewerGetFormat(viewer,&format);
578     if (format == ASCII_FORMAT_INFO_DETAILED) {
579       MatInfo info;
580       int     flg;
581       MPI_Comm_rank(mat->comm,&rank);
582       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
583       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
584       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
585       PetscSequentialPhaseBegin(mat->comm,1);
586       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
587          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
588       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
589          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
590       ierr = MatGetInfo(aij->A,MAT_LOCAL,&info);
591       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used);
592       ierr = MatGetInfo(aij->B,MAT_LOCAL,&info);
593       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used);
594       fflush(fd);
595       PetscSequentialPhaseEnd(mat->comm,1);
596       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
597       return 0;
598     }
599     else if (format == ASCII_FORMAT_INFO) {
600       return 0;
601     }
602   }
603 
604   if (vtype == DRAW_VIEWER) {
605     Draw       draw;
606     PetscTruth isnull;
607     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
608     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
609   }
610 
611   if (vtype == ASCII_FILE_VIEWER) {
612     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
613     PetscSequentialPhaseBegin(mat->comm,1);
614     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
615            aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
616            aij->cend);
617     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
618     ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
619     fflush(fd);
620     PetscSequentialPhaseEnd(mat->comm,1);
621   }
622   else {
623     int size = aij->size;
624     rank = aij->rank;
625     if (size == 1) {
626       ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
627     }
628     else {
629       /* assemble the entire matrix onto first processor. */
630       Mat         A;
631       Mat_SeqAIJ *Aloc;
632       int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
633       Scalar      *a;
634 
635       if (!rank) {
636         ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
637                CHKERRQ(ierr);
638       }
639       else {
640         ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
641                CHKERRQ(ierr);
642       }
643       PLogObjectParent(mat,A);
644 
645       /* copy over the A part */
646       Aloc = (Mat_SeqAIJ*) aij->A->data;
647       m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
648       row = aij->rstart;
649       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
650       for ( i=0; i<m; i++ ) {
651         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
652         row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
653       }
654       aj = Aloc->j;
655       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
656 
657       /* copy over the B part */
658       Aloc = (Mat_SeqAIJ*) aij->B->data;
659       m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
660       row = aij->rstart;
661       ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
662       for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
663       for ( i=0; i<m; i++ ) {
664         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
665         row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
666       }
667       PetscFree(ct);
668       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
669       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
670       if (!rank) {
671         ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
672       }
673       ierr = MatDestroy(A); CHKERRQ(ierr);
674     }
675   }
676   return 0;
677 }
678 
679 static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
680 {
681   Mat         mat = (Mat) obj;
682   int         ierr;
683   ViewerType  vtype;
684 
685   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
686   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
687       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
688     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
689   }
690   else if (vtype == BINARY_FILE_VIEWER) {
691     return MatView_MPIAIJ_Binary(mat,viewer);
692   }
693   return 0;
694 }
695 
696 /*
697     This has to provide several versions.
698 
699      2) a) use only local smoothing, updating outer values only once.
700         b) local smoothing, updating outer values each inner iteration.
701      3) coloring, updating outer values between colors.
702 */
703 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
704                            double fshift,int its,Vec xx)
705 {
706   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
707   Mat        AA = mat->A, BB = mat->B;
708   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
709   Scalar     *b,*x,*xs,*ls,d,*v,sum;
710   int        ierr,*idx, *diag;
711   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
712 
713   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
714   xs = x + shift; /* shift by one for index start of 1 */
715   ls = ls + shift;
716   if (!A->diag) {if ((ierr = MatMarkDiag_SeqAIJ(AA))) return ierr;}
717   diag = A->diag;
718   if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
719     if (flag & SOR_ZERO_INITIAL_GUESS) {
720       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
721     }
722     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
723     CHKERRQ(ierr);
724     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
725     CHKERRQ(ierr);
726     while (its--) {
727       /* go down through the rows */
728       for ( i=0; i<m; i++ ) {
729         n    = A->i[i+1] - A->i[i];
730         idx  = A->j + A->i[i] + shift;
731         v    = A->a + A->i[i] + shift;
732         sum  = b[i];
733         SPARSEDENSEMDOT(sum,xs,v,idx,n);
734         d    = fshift + A->a[diag[i]+shift];
735         n    = B->i[i+1] - B->i[i];
736         idx  = B->j + B->i[i] + shift;
737         v    = B->a + B->i[i] + shift;
738         SPARSEDENSEMDOT(sum,ls,v,idx,n);
739         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
740       }
741       /* come up through the rows */
742       for ( i=m-1; i>-1; i-- ) {
743         n    = A->i[i+1] - A->i[i];
744         idx  = A->j + A->i[i] + shift;
745         v    = A->a + A->i[i] + shift;
746         sum  = b[i];
747         SPARSEDENSEMDOT(sum,xs,v,idx,n);
748         d    = fshift + A->a[diag[i]+shift];
749         n    = B->i[i+1] - B->i[i];
750         idx  = B->j + B->i[i] + shift;
751         v    = B->a + B->i[i] + shift;
752         SPARSEDENSEMDOT(sum,ls,v,idx,n);
753         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
754       }
755     }
756   }
757   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
758     if (flag & SOR_ZERO_INITIAL_GUESS) {
759       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
760     }
761     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
762     CHKERRQ(ierr);
763     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
764     CHKERRQ(ierr);
765     while (its--) {
766       for ( i=0; i<m; i++ ) {
767         n    = A->i[i+1] - A->i[i];
768         idx  = A->j + A->i[i] + shift;
769         v    = A->a + A->i[i] + shift;
770         sum  = b[i];
771         SPARSEDENSEMDOT(sum,xs,v,idx,n);
772         d    = fshift + A->a[diag[i]+shift];
773         n    = B->i[i+1] - B->i[i];
774         idx  = B->j + B->i[i] + shift;
775         v    = B->a + B->i[i] + shift;
776         SPARSEDENSEMDOT(sum,ls,v,idx,n);
777         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
778       }
779     }
780   }
781   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
782     if (flag & SOR_ZERO_INITIAL_GUESS) {
783       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
784     }
785     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
786                             mat->Mvctx); CHKERRQ(ierr);
787     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
788                             mat->Mvctx); CHKERRQ(ierr);
789     while (its--) {
790       for ( i=m-1; i>-1; i-- ) {
791         n    = A->i[i+1] - A->i[i];
792         idx  = A->j + A->i[i] + shift;
793         v    = A->a + A->i[i] + shift;
794         sum  = b[i];
795         SPARSEDENSEMDOT(sum,xs,v,idx,n);
796         d    = fshift + A->a[diag[i]+shift];
797         n    = B->i[i+1] - B->i[i];
798         idx  = B->j + B->i[i] + shift;
799         v    = B->a + B->i[i] + shift;
800         SPARSEDENSEMDOT(sum,ls,v,idx,n);
801         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
802       }
803     }
804   }
805   else {
806     SETERRQ(1,"MatRelax_MPIAIJ:Option not supported");
807   }
808   return 0;
809 }
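
/*
   Each sweep above applies the usual SOR update; assuming that
   SPARSEDENSEMDOT(sum,xs,v,idx,n) subtracts v[j]*xs[idx[j]] from sum,
   the net effect for row i is

       x_i <- x_i + omega*(b_i - sum_j a_ij*x_j)/(a_ii + fshift)

   with off-process terms taken from the ghost values in mat->lvec.
   Note the ghost values are refreshed once per call, not once per
   inner iteration, i.e. variant 2a from the comment above.
*/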
810 
811 static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,MatInfo *info)
812 {
813   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
814   Mat        A = mat->A, B = mat->B;
815   int        ierr;
816   double     isend[5], irecv[5];
817 
818   info->rows_global    = (double)mat->M;
819   info->columns_global = (double)mat->N;
820   info->rows_local     = (double)mat->m;
821   info->columns_local  = (double)mat->N;
822   info->block_size     = 1.0;
823   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
824   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
825   isend[3] = info->memory;  isend[4] = info->mallocs;
826   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
827   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
828   isend[3] += info->memory;  isend[4] += info->mallocs;
829   if (flag == MAT_LOCAL) {
830     info->nz_used      = isend[0];
831     info->nz_allocated = isend[1];
832     info->nz_unneeded  = isend[2];
833     info->memory       = isend[3];
834     info->mallocs      = isend[4];
835   } else if (flag == MAT_GLOBAL_MAX) {
836     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
837     info->nz_used      = irecv[0];
838     info->nz_allocated = irecv[1];
839     info->nz_unneeded  = irecv[2];
840     info->memory       = irecv[3];
841     info->mallocs      = irecv[4];
842   } else if (flag == MAT_GLOBAL_SUM) {
843     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
844     info->nz_used      = irecv[0];
845     info->nz_allocated = irecv[1];
846     info->nz_unneeded  = irecv[2];
847     info->memory       = irecv[3];
848     info->mallocs      = irecv[4];
849   }
850   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
851   info->fill_ratio_needed = 0;
852   info->factor_mallocs    = 0;
853 
854   return 0;
855 }
856 
857 extern int MatLUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,Mat*);
858 extern int MatLUFactorNumeric_MPIAIJ(Mat,Mat*);
859 extern int MatLUFactor_MPIAIJ(Mat,IS,IS,double);
860 extern int MatILUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,int,Mat *);
861 extern int MatSolve_MPIAIJ(Mat,Vec,Vec);
862 extern int MatSolveAdd_MPIAIJ(Mat,Vec,Vec,Vec);
863 extern int MatSolveTrans_MPIAIJ(Mat,Vec,Vec);
864 extern int MatSolveTransAdd_MPIAIJ(Mat,Vec,Vec,Vec);
865 
866 static int MatSetOption_MPIAIJ(Mat A,MatOption op)
867 {
868   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
869 
870   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
871       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
872       op == MAT_COLUMNS_SORTED ||
873       op == MAT_ROW_ORIENTED) {
874         MatSetOption(a->A,op);
875         MatSetOption(a->B,op);
876   }
877   else if (op == MAT_ROWS_SORTED ||
878            op == MAT_SYMMETRIC ||
879            op == MAT_STRUCTURALLY_SYMMETRIC ||
880            op == MAT_YES_NEW_DIAGONALS)
881     PLogInfo(A,"Info:MatSetOption_MPIAIJ:Option ignored\n");
882   else if (op == MAT_COLUMN_ORIENTED) {
883     a->roworiented = 0;
884     MatSetOption(a->A,op);
885     MatSetOption(a->B,op);
886   }
887   else if (op == MAT_NO_NEW_DIAGONALS)
888     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:MAT_NO_NEW_DIAGONALS");}
889   else
890     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:unknown option");}
891   return 0;
892 }
893 
894 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
895 {
896   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
897   *m = mat->M; *n = mat->N;
898   return 0;
899 }
900 
901 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
902 {
903   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
904   *m = mat->m; *n = mat->n;
905   return 0;
906 }
907 
908 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
909 {
910   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
911   *m = mat->rstart; *n = mat->rend;
912   return 0;
913 }
914 
915 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
916 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
917 
918 int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
919 {
920   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
921   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
922   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
923   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
924   int        *cmap, *idx_p;
925 
926   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIAIJ:Already active");
927   mat->getrowactive = PETSC_TRUE;
928 
929   if (!mat->rowvalues && (idx || v)) {
930     /*
931         allocate enough space to hold information from the longest row.
932     */
933     Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
934     int     max = 1,m = mat->m,tmp;
935     for ( i=0; i<m; i++ ) {
936       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
937       if (max < tmp) { max = tmp; }
938     }
939     mat->rowvalues = (Scalar *) PetscMalloc( max*(sizeof(int)+sizeof(Scalar)));
940     CHKPTRQ(mat->rowvalues);
941     mat->rowindices = (int *) (mat->rowvalues + max);
942   }
943 
944   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIAIJ:Only local rows")
945   lrow = row - rstart;
946 
947   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
948   if (!v)   {pvA = 0; pvB = 0;}
949   if (!idx) {pcA = 0; if (!v) pcB = 0;}
950   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
951   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
952   nztot = nzA + nzB;
953 
954   cmap  = mat->garray;
955   if (v  || idx) {
956     if (nztot) {
957       /* Sort by increasing column numbers, assuming A and B already sorted */
958       int imark = -1;
959       if (v) {
960         *v = v_p = mat->rowvalues;
961         for ( i=0; i<nzB; i++ ) {
962           if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
963           else break;
964         }
965         imark = i;
966         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
967         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
968       }
969       if (idx) {
970         *idx = idx_p = mat->rowindices;
971         if (imark > -1) {
972           for ( i=0; i<imark; i++ ) {
973             idx_p[i] = cmap[cworkB[i]];
974           }
975         } else {
976           for ( i=0; i<nzB; i++ ) {
977             if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
978             else break;
979           }
980           imark = i;
981         }
982         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
983         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
984       }
985     }
986     else {
987       if (idx) *idx = 0;
988       if (v)   *v   = 0;
989     }
990   }
991   *nz = nztot;
992   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
993   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
994   return 0;
995 }
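
/*
   The merge above relies on the rows of A and B being sorted by column:
   B's entries with global column < cstart are copied first, then all of
   A's entries (offset by cstart), then B's remaining entries, so the
   row returned to the caller is sorted by global column number.
*/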
996 
997 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
998 {
999   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1000   if (aij->getrowactive == PETSC_FALSE) {
1001     SETERRQ(1,"MatRestoreRow_MPIAIJ:MatGetRow not called");
1002   }
1003   aij->getrowactive = PETSC_FALSE;
1004   return 0;
1005 }
1006 
1007 static int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
1008 {
1009   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1010   Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
1011   int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
1012   double     sum = 0.0;
1013   Scalar     *v;
1014 
1015   if (aij->size == 1) {
1016     ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
1017   } else {
1018     if (type == NORM_FROBENIUS) {
1019       v = amat->a;
1020       for (i=0; i<amat->nz; i++ ) {
1021 #if defined(PETSC_COMPLEX)
1022         sum += real(conj(*v)*(*v)); v++;
1023 #else
1024         sum += (*v)*(*v); v++;
1025 #endif
1026       }
1027       v = bmat->a;
1028       for (i=0; i<bmat->nz; i++ ) {
1029 #if defined(PETSC_COMPLEX)
1030         sum += real(conj(*v)*(*v)); v++;
1031 #else
1032         sum += (*v)*(*v); v++;
1033 #endif
1034       }
1035       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
1036       *norm = sqrt(*norm);
1037     }
1038     else if (type == NORM_1) { /* max column norm */
1039       double *tmp, *tmp2;
1040       int    *jj, *garray = aij->garray;
1041       tmp  = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp);
1042       tmp2 = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp2);
1043       PetscMemzero(tmp,aij->N*sizeof(double));
1044       *norm = 0.0;
1045       v = amat->a; jj = amat->j;
1046       for ( j=0; j<amat->nz; j++ ) {
1047         tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
1048       }
1049       v = bmat->a; jj = bmat->j;
1050       for ( j=0; j<bmat->nz; j++ ) {
1051         tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
1052       }
1053       MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);
1054       for ( j=0; j<aij->N; j++ ) {
1055         if (tmp2[j] > *norm) *norm = tmp2[j];
1056       }
1057       PetscFree(tmp); PetscFree(tmp2);
1058     }
1059     else if (type == NORM_INFINITY) { /* max row norm */
1060       double ntemp = 0.0;
1061       for ( j=0; j<amat->m; j++ ) {
1062         v = amat->a + amat->i[j] + shift;
1063         sum = 0.0;
1064         for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
1065           sum += PetscAbsScalar(*v); v++;
1066         }
1067         v = bmat->a + bmat->i[j] + shift;
1068         for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
1069           sum += PetscAbsScalar(*v); v++;
1070         }
1071         if (sum > ntemp) ntemp = sum;
1072       }
1073       MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);
1074     }
1075     else {
1076       SETERRQ(1,"MatNorm_MPIAIJ:No support for two norm");
1077     }
1078   }
1079   return 0;
1080 }
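
/*
   Summary of the reductions above (matching the code, for reference):

     NORM_FROBENIUS: sqrt( Allreduce_SUM( sum_ij |a_ij|^2 ) )
     NORM_1:         max over j of Allreduce_SUM( column sums of |a_ij| ),
                     using garray to map B's columns back to global ones
     NORM_INFINITY:  Allreduce_MAX( max_i sum_j |a_ij| )

   Note the NORM_1 path allocates two work arrays of global length N,
   so, like colmap, it does not scale with the number of columns.
*/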
1081 
1082 static int MatTranspose_MPIAIJ(Mat A,Mat *matout)
1083 {
1084   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1085   Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
1086   int        ierr,shift = Aloc->indexshift;
1087   Mat        B;
1088   int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
1089   Scalar     *array;
1090 
1091   if (matout == PETSC_NULL && M != N)
1092     SETERRQ(1,"MatTranspose_MPIAIJ:Square matrix only for in-place");
1093   ierr = MatCreateMPIAIJ(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,
1094          PETSC_NULL,&B); CHKERRQ(ierr);
1095 
1096   /* copy over the A part */
1097   Aloc = (Mat_SeqAIJ*) a->A->data;
1098   m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1099   row = a->rstart;
1100   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
1101   for ( i=0; i<m; i++ ) {
1102     ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1103     row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
1104   }
1105   aj = Aloc->j;
1106   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}
1107 
1108   /* copy over the B part */
1109   Aloc = (Mat_SeqAIJ*) a->B->data;
1110   m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1111   row = a->rstart;
1112   ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
1113   for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
1114   for ( i=0; i<m; i++ ) {
1115     ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1116     row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
1117   }
1118   PetscFree(ct);
1119   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1120   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1121   if (matout != PETSC_NULL) {
1122     *matout = B;
1123   } else {
1124     /* This isn't really an in-place transpose, but we free the data structures from a */
1125     PetscFree(a->rowners);
1126     ierr = MatDestroy(a->A); CHKERRQ(ierr);
1127     ierr = MatDestroy(a->B); CHKERRQ(ierr);
1128     if (a->colmap) PetscFree(a->colmap);
1129     if (a->garray) PetscFree(a->garray);
1130     if (a->lvec) VecDestroy(a->lvec);
1131     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
1132     PetscFree(a);
1133     PetscMemcpy(A,B,sizeof(struct _Mat));
1134     PetscHeaderDestroy(B);
1135   }
1136   return 0;
1137 }
1138 
1139 extern int MatPrintHelp_SeqAIJ(Mat);
1140 static int MatPrintHelp_MPIAIJ(Mat A)
1141 {
1142   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1143 
1144   if (!a->rank) return MatPrintHelp_SeqAIJ(a->A);
1145   else return 0;
1146 }
1147 
1148 static int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1149 {
1150   *bs = 1;
1151   return 0;
1152 }
1153 
1154 extern int MatConvert_MPIAIJ(Mat,MatType,Mat *);
1155 static int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1156 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1157 int MatGetSubMatrices_MPIAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
1158 /* -------------------------------------------------------------------*/
1159 static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
1160        MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
1161        MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
1162        MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
1163        MatSolve_MPIAIJ,MatSolveAdd_MPIAIJ,
1164        MatSolveTrans_MPIAIJ,MatSolveTransAdd_MPIAIJ,
1165        MatLUFactor_MPIAIJ,0,
1166        MatRelax_MPIAIJ,
1167        MatTranspose_MPIAIJ,
1168        MatGetInfo_MPIAIJ,0,
1169        MatGetDiagonal_MPIAIJ,0,MatNorm_MPIAIJ,
1170        MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
1171        0,
1172        MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
1173        MatGetReordering_MPIAIJ,
1174        MatLUFactorSymbolic_MPIAIJ,MatLUFactorNumeric_MPIAIJ,0,0,
1175        MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
1176        MatILUFactorSymbolic_MPIAIJ,0,
1177        0,0,MatConvert_MPIAIJ,MatConvertSameType_MPIAIJ,0,0,
1178        0,0,0,
1179        MatGetSubMatrices_MPIAIJ,MatIncreaseOverlap_MPIAIJ,MatGetValues_MPIAIJ,0,
1180        MatPrintHelp_MPIAIJ,
1181        MatScale_MPIAIJ,0,0,0,MatGetBlockSize_MPIAIJ};
1182 
1183 /*@C
1184    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1185    (the default parallel PETSc format).  For good matrix assembly performance
1186    the user should preallocate the matrix storage by setting the parameters
1187    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1188    performance can be increased by more than a factor of 50.
1189 
1190    Input Parameters:
1191 .  comm - MPI communicator
1192 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1193            This value should be the same as the local size used in creating the
1194            y vector for the matrix-vector product y = Ax.
1195 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1196            This value should be the same as the local size used in creating the
1197            x vector for the matrix-vector product y = Ax.
1198 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
1199 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
1200 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1201            (same for all local rows)
1202 .  d_nnz - array containing the number of nonzeros in the various rows of the
1203            diagonal portion of the local submatrix (possibly different for each row)
1204            or PETSC_NULL. You must leave room for the diagonal entry even if
1205            it is zero.
1206 .  o_nz - number of nonzeros per row in the off-diagonal portion of local
1207            submatrix (same for all local rows).
1208 .  o_nnz - array containing the number of nonzeros in the various rows of the
1209            off-diagonal portion of the local submatrix (possibly different for
1210            each row) or PETSC_NULL.
1211 
1212    Output Parameter:
1213 .  A - the matrix
1214 
1215    Notes:
1216    The AIJ format (also called the Yale sparse matrix format or
1217    compressed row storage), is fully compatible with standard Fortran 77
1218    storage.  That is, the stored row and column indices can begin at
1219    either one (as in Fortran) or zero.  See the users manual for details.
1220 
1221    The user MUST specify either the local or global matrix dimensions
1222    (possibly both).
1223 
1224    By default, this format uses inodes (identical nodes) when possible.
1225    We search for consecutive rows with the same nonzero structure, thereby
1226    reusing matrix information to achieve increased efficiency.
1227 
1228    Options Database Keys:
1229 $    -mat_aij_no_inode  - Do not use inodes
1230 $    -mat_aij_inode_limit <limit> - Set inode limit.
1231 $        (max limit=5)
1232 $    -mat_aij_oneindex - Internally use indexing starting at 1
1233 $        rather than 0.  Note: When calling MatSetValues(),
1234 $        the user still MUST index entries starting at 0!
1235 
1236    Storage Information:
1237    For a square global matrix we define each processor's diagonal portion
1238    to be its local rows and the corresponding columns (a square submatrix);
1239    each processor's off-diagonal portion encompasses the remainder of the
1240    local matrix (a rectangular submatrix).
1241 
1242    The user can specify preallocated storage for the diagonal part of
1243    the local submatrix with either d_nz or d_nnz (not both).  Set
1244    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1245    memory allocation.  Likewise, specify preallocated storage for the
1246    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1247 
1248    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1249    the figure below we depict these three local rows and all columns (0-11).
1250 
1251 $          0 1 2 3 4 5 6 7 8 9 10 11
1252 $         -------------------
1253 $  row 3  |  o o o d d d o o o o o o
1254 $  row 4  |  o o o d d d o o o o o o
1255 $  row 5  |  o o o d d d o o o o o o
1256 $         -------------------
1257 $
1258 
1259    Thus, any entries in the d locations are stored in the d (diagonal)
1260    submatrix, and any entries in the o locations are stored in the
1261    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1262    stored simply in the MATSEQAIJ format for compressed row storage.
1263 
1264    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1265    and o_nz should indicate the number of nonzeros per row in the o matrix.
1266    In general, for PDE problems in which most nonzeros are near the diagonal,
1267    one expects d_nz >> o_nz. For large problems you MUST preallocate memory
1268    or you will get TERRIBLE performance; see the users' manual chapter on
1269    matrices and the file $(PETSC_DIR)/Performance.
1270 
1271 .keywords: matrix, aij, compressed row, sparse, parallel
1272 
1273 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1274 @*/
1275 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
1276                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1277 {
1278   Mat          B;
1279   Mat_MPIAIJ   *b;
1280   int          ierr, i,sum[2],work[2];
1281 
1282   *A = 0;
1283   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
1284   PLogObjectCreate(B);
1285   B->data       = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
1286   PetscMemzero(b,sizeof(Mat_MPIAIJ));
1287   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1288   B->destroy    = MatDestroy_MPIAIJ;
1289   B->view       = MatView_MPIAIJ;
1290   B->factor     = 0;
1291   B->assembled  = PETSC_FALSE;
1292 
1293   b->insertmode = NOT_SET_VALUES;
1294   MPI_Comm_rank(comm,&b->rank);
1295   MPI_Comm_size(comm,&b->size);
1296 
1297   if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1298     SETERRQ(1,"MatCreateMPIAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1299 
1300   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1301     work[0] = m; work[1] = n;
1302     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1303     if (M == PETSC_DECIDE) M = sum[0];
1304     if (N == PETSC_DECIDE) N = sum[1];
1305   }
1306   if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
1307   if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
1308   b->m = m; B->m = m;
1309   b->n = n; B->n = n;
1310   b->N = N; B->N = N;
1311   b->M = M; B->M = M;
1312 
1313   /* build local table of row and column ownerships */
1314   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1315   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1316   b->cowners = b->rowners + b->size + 2;
1317   MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1318   b->rowners[0] = 0;
1319   for ( i=2; i<=b->size; i++ ) {
1320     b->rowners[i] += b->rowners[i-1];
1321   }
1322   b->rstart = b->rowners[b->rank];
1323   b->rend   = b->rowners[b->rank+1];
1324   MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1325   b->cowners[0] = 0;
1326   for ( i=2; i<=b->size; i++ ) {
1327     b->cowners[i] += b->cowners[i-1];
1328   }
1329   b->cstart = b->cowners[b->rank];
1330   b->cend   = b->cowners[b->rank+1];
1331 
1332   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1333   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1334   PLogObjectParent(B,b->A);
1335   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1336   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1337   PLogObjectParent(B,b->B);
1338 
1339   /* build cache for off array entries formed */
1340   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1341   b->colmap      = 0;
1342   b->garray      = 0;
1343   b->roworiented = 1;
1344 
1345   /* stuff used for matrix vector multiply */
1346   b->lvec      = 0;
1347   b->Mvctx     = 0;
1348 
1349   /* stuff for MatGetRow() */
1350   b->rowindices   = 0;
1351   b->rowvalues    = 0;
1352   b->getrowactive = PETSC_FALSE;
1353 
1354   *A = B;
1355   return 0;
1356 }
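
/*
   A minimal calling sketch (illustrative only, not from this file):
   create a 100 x 100 matrix, letting PETSc pick the row distribution,
   with a rough preallocation of 5 diagonal-block and 2 off-diagonal-block
   nonzeros per row:

       Mat A;
       int ierr;
       ierr = MatCreateMPIAIJ(MPI_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,
                              100,100,5,PETSC_NULL,2,PETSC_NULL,&A);
       CHKERRQ(ierr);
       ... MatSetValues(A,...) ...
       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
*/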
1357 
1358 static int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
1359 {
1360   Mat        mat;
1361   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1362   int        ierr, len=0, flg;
1363 
1364   *newmat       = 0;
1365   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1366   PLogObjectCreate(mat);
1367   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1368   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1369   mat->destroy    = MatDestroy_MPIAIJ;
1370   mat->view       = MatView_MPIAIJ;
1371   mat->factor     = matin->factor;
1372   mat->assembled  = PETSC_TRUE;
1373 
1374   a->m = mat->m   = oldmat->m;
1375   a->n = mat->n   = oldmat->n;
1376   a->M = mat->M   = oldmat->M;
1377   a->N = mat->N   = oldmat->N;
1378 
1379   a->rstart       = oldmat->rstart;
1380   a->rend         = oldmat->rend;
1381   a->cstart       = oldmat->cstart;
1382   a->cend         = oldmat->cend;
1383   a->size         = oldmat->size;
1384   a->rank         = oldmat->rank;
1385   a->insertmode   = NOT_SET_VALUES;
1386   a->rowvalues    = 0;
1387   a->getrowactive = PETSC_FALSE;
1388 
1389   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1390   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1391   a->cowners = a->rowners + a->size + 2;
1392   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1393   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1394   if (oldmat->colmap) {
1395     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1396     PLogObjectMemory(mat,(a->N)*sizeof(int));
1397     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1398   } else a->colmap = 0;
1399   if (oldmat->garray) {
1400     len = ((Mat_SeqAIJ *) (oldmat->B->data))->n;
1401     a->garray = (int *) PetscMalloc((len+1)*sizeof(int)); CHKPTRQ(a->garray);
1402     PLogObjectMemory(mat,len*sizeof(int));
1403     if (len) PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1404   } else a->garray = 0;
1405 
1406   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1407   PLogObjectParent(mat,a->lvec);
1408   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1409   PLogObjectParent(mat,a->Mvctx);
1410   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1411   PLogObjectParent(mat,a->A);
1412   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1413   PLogObjectParent(mat,a->B);
1414   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1415   if (flg) {
1416     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1417   }
1418   *newmat = mat;
1419   return 0;
1420 }
1421 
1422 #include "sys.h"
1423 
1424 int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
1425 {
1426   Mat          A;
1427   int          i, nz, ierr, j,rstart, rend, fd;
1428   Scalar       *vals,*svals;
1429   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1430   MPI_Status   status;
1431   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1432   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1433   int          tag = ((PetscObject)viewer)->tag;
1434 
1435   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1436   if (!rank) {
1437     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1438     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1439     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIAIJ:not matrix object");
1440   }
1441 
1442   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1443   M = header[1]; N = header[2];
1444   /* determine ownership of all rows */
1445   m = M/size + ((M % size) > rank);
1446   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1447   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1448   rowners[0] = 0;
1449   for ( i=2; i<=size; i++ ) {
1450     rowners[i] += rowners[i-1];
1451   }
1452   rstart = rowners[rank];
1453   rend   = rowners[rank+1];
1454 
1455   /* distribute row lengths to all processors */
1456   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1457   offlens = ourlens + (rend-rstart);
1458   if (!rank) {
1459     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1460     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1461     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1462     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1463     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
1464     PetscFree(sndcounts);
1465   }
1466   else {
1467     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
1468   }
1469 
1470   if (!rank) {
1471     /* calculate the number of nonzeros on each processor */
1472     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1473     PetscMemzero(procsnz,size*sizeof(int));
1474     for ( i=0; i<size; i++ ) {
1475       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1476         procsnz[i] += rowlengths[j];
1477       }
1478     }
1479     PetscFree(rowlengths);
1480 
1481     /* determine max buffer needed and allocate it */
1482     maxnz = 0;
1483     for ( i=0; i<size; i++ ) {
1484       maxnz = PetscMax(maxnz,procsnz[i]);
1485     }
1486     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1487 
1488     /* read in my part of the matrix column indices  */
1489     nz = procsnz[0];
1490     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1491     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1492 
1493     /* read in everyone else's and ship it off */
1494     for ( i=1; i<size; i++ ) {
1495       nz = procsnz[i];
1496       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1497       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1498     }
1499     PetscFree(cols);
1500   }
1501   else {
1502     /* determine buffer space needed for message */
1503     nz = 0;
1504     for ( i=0; i<m; i++ ) {
1505       nz += ourlens[i];
1506     }
1507     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1508 
1509     /* receive message of column indices*/
1510     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1511     MPI_Get_count(&status,MPI_INT,&maxnz);
1512     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
1513   }
1514 
1515   /* loop over local rows, determining number of off diagonal entries */
1516   PetscMemzero(offlens,m*sizeof(int));
1517   jj = 0;
1518   for ( i=0; i<m; i++ ) {
1519     for ( j=0; j<ourlens[i]; j++ ) {
1520       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1521       jj++;
1522     }
1523   }
1524 
1525   /* create our matrix */
1526   for ( i=0; i<m; i++ ) {
1527     ourlens[i] -= offlens[i];
1528   }
1529   ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
1530   A = *newmat;
1531   MatSetOption(A,MAT_COLUMNS_SORTED);
1532   for ( i=0; i<m; i++ ) {
1533     ourlens[i] += offlens[i];
1534   }
1535 
1536   if (!rank) {
1537     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1538 
1539     /* read in my part of the matrix numerical values  */
1540     nz = procsnz[0];
1541     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1542 
1543     /* insert into matrix */
1544     jj      = rstart;
1545     smycols = mycols;
1546     svals   = vals;
1547     for ( i=0; i<m; i++ ) {
1548       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1549       smycols += ourlens[i];
1550       svals   += ourlens[i];
1551       jj++;
1552     }
1553 
1554     /* read in other processors and ship out */
1555     for ( i=1; i<size; i++ ) {
1556       nz = procsnz[i];
1557       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1558       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1559     }
1560     PetscFree(procsnz);
1561   }
1562   else {
1563     /* receive numeric values */
1564     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1565 
1566     /* receive message of values*/
1567     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1568     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1569     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
1570 
1571     /* insert into matrix */
1572     jj      = rstart;
1573     smycols = mycols;
1574     svals   = vals;
1575     for ( i=0; i<m; i++ ) {
1576       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1577       smycols += ourlens[i];
1578       svals   += ourlens[i];
1579       jj++;
1580     }
1581   }
1582   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1583 
1584   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1585   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1586   return 0;
1587 }
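
/*
   Binary layout consumed by MatLoad_MPIAIJ() above, as inferred from the
   reads (the cookie and any extra header fields are defined elsewhere in
   PETSc):

       int    header[4]     MAT_COOKIE, M, N, plus one field unused here
       int    rowlengths[M]
       int    column indices, stored row by row
       Scalar numerical values, stored row by row

   Rank 0 reads the whole file and ships each process the column indices
   and values for its rows [rstart,rend).
*/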
1588