1 #ifndef lint
2 static char vcid[] = "$Id: mpiaij.c,v 1.155 1996/07/29 22:51:58 balay Exp bsmith $";
3 #endif
4 
5 #include "mpiaij.h"
6 #include "src/vec/vecimpl.h"
7 #include "src/inline/spops.h"
8 
9 /* local utility routine that creates a mapping from the global column
10 number to the local number in the off-diagonal part of the local
11 storage of the matrix.  This is done in a non-scalable way since the
12 length of colmap equals the global number of columns.
13 */
14 static int CreateColmap_Private(Mat mat)
15 {
16   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
17   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
18   int        n = B->n,i,shift = B->indexshift;
19 
20   aij->colmap = (int *) PetscMalloc(aij->N*sizeof(int));CHKPTRQ(aij->colmap);
21   PLogObjectMemory(mat,aij->N*sizeof(int));
22   PetscMemzero(aij->colmap,aij->N*sizeof(int));
23   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i-shift;
24   return 0;
25 }
26 
27 extern int DisAssemble_MPIAIJ(Mat);
28 
29 static int MatGetReordering_MPIAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
30 {
31   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
32   int        ierr;
33   if (aij->size == 1) {
34     ierr = MatGetReordering(aij->A,type,rperm,cperm); CHKERRQ(ierr);
35   } else SETERRQ(1,"MatGetReordering_MPIAIJ:not supported in parallel");
36   return 0;
37 }
38 
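/*
   MatSetValues_MPIAIJ - Each (row,column) entry is routed to one of three places:
   entries in locally owned rows go into the diagonal block aij->A (columns in
   [cstart,cend)) or the off-diagonal block aij->B (all other columns); entries for
   rows owned by other processors are cached in the stash and communicated during
   MatAssemblyBegin()/MatAssemblyEnd().
*/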
39 static int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
40 {
41   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
42   Mat_SeqAIJ *C = (Mat_SeqAIJ*) aij->A->data;
43   Scalar     value;
44   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
45   int        cstart = aij->cstart, cend = aij->cend,row,col;
46   int        shift = C->indexshift,roworiented = aij->roworiented;
47 
48   if (aij->insertmode != NOT_SET_VALUES && aij->insertmode != addv) {
49     SETERRQ(1,"MatSetValues_MPIAIJ:Cannot mix inserts and adds");
50   }
51   aij->insertmode = addv;
52   for ( i=0; i<m; i++ ) {
53     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative row");
54     if (im[i] >= aij->M) SETERRQ(1,"MatSetValues_MPIAIJ:Row too large");
55     if (im[i] >= rstart && im[i] < rend) {
56       row = im[i] - rstart;
57       for ( j=0; j<n; j++ ) {
58         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative column");
59         if (in[j] >= aij->N) SETERRQ(1,"MatSetValues_MPIAIJ:Col too large");
60         if (in[j] >= cstart && in[j] < cend){
61           col = in[j] - cstart;
62           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
63           ierr = MatSetValues(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
64         }
65         else {
66           if (mat->was_assembled) {
67             if (!aij->colmap) {ierr = CreateColmap_Private(mat);CHKERRQ(ierr);}
68             col = aij->colmap[in[j]] + shift;
69             if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
70               ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
71               col =  in[j];
72             }
73           }
74           else col = in[j];
75           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
76           ierr = MatSetValues(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
77         }
78       }
79     }
80     else {
81       if (roworiented) {
82         ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
83       }
84       else {
85         row = im[i];
86         for ( j=0; j<n; j++ ) {
87           ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
88         }
89       }
90     }
91   }
92   return 0;
93 }
94 
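/*
   MatGetValues_MPIAIJ - Retrieves entries of locally owned rows only; columns in
   the diagonal block are read from aij->A, while all other columns are looked up
   in the off-diagonal block aij->B through the colmap/garray tables.
*/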
95 static int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
96 {
97   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
98   Mat_SeqAIJ *C = (Mat_SeqAIJ*) aij->A->data;
99   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
100   int        cstart = aij->cstart, cend = aij->cend,row,col;
101   int        shift = C->indexshift;
102 
103   for ( i=0; i<m; i++ ) {
104     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative row");
105     if (idxm[i] >= aij->M) SETERRQ(1,"MatGetValues_MPIAIJ:Row too large");
106     if (idxm[i] >= rstart && idxm[i] < rend) {
107       row = idxm[i] - rstart;
108       for ( j=0; j<n; j++ ) {
109         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative column");
110         if (idxn[j] >= aij->N) SETERRQ(1,"MatGetValues_MPIAIJ:Col too large");
111         if (idxn[j] >= cstart && idxn[j] < cend){
112           col = idxn[j] - cstart;
113           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
114         }
115         else {
116           if (!aij->colmap) {ierr = CreateColmap_Private(mat);CHKERRQ(ierr);}
117           col = aij->colmap[idxn[j]] + shift;
118           if ( aij->garray[col] != idxn[j] ) *(v+i*n+j) = 0.0;
119           else {
120             ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
121           }
122         }
123       }
124     }
125     else {
126       SETERRQ(1,"MatGetValues_MPIAIJ:Only local values currently supported");
127     }
128   }
129   return 0;
130 }
131 
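/*
   MatAssemblyBegin_MPIAIJ - Ships the stashed off-processor entries to their owners:
   count the contributions destined for each processor, exchange message counts and
   maximum lengths with MPI_Allreduce(), post the receives, then pack the stash as
   (row,col,value) triples and start the sends. The matching receives are completed
   in MatAssemblyEnd_MPIAIJ().
*/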
132 static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
133 {
134   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
135   MPI_Comm    comm = mat->comm;
136   int         size = aij->size, *owners = aij->rowners;
137   int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
138   MPI_Request *send_waits,*recv_waits;
139   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
140   InsertMode  addv;
141   Scalar      *rvalues,*svalues;
142 
143   /* make sure all processors are in the same mode, either INSERT_VALUES or ADD_VALUES */
144   MPI_Allreduce(&aij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
145   if (addv == (ADD_VALUES|INSERT_VALUES)) {
146     SETERRQ(1,"MatAssemblyBegin_MPIAIJ:Some processors inserted, others added");
147   }
148   aij->insertmode = addv; /* in case this processor had no cache */
149 
150   /*  first count number of contributors to each processor */
151   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
152   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
153   owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
154   for ( i=0; i<aij->stash.n; i++ ) {
155     idx = aij->stash.idx[i];
156     for ( j=0; j<size; j++ ) {
157       if (idx >= owners[j] && idx < owners[j+1]) {
158         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
159       }
160     }
161   }
162   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
163 
164   /* inform other processors of number of messages and max length*/
165   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
166   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
167   nreceives = work[rank];
168   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
169   nmax = work[rank];
170   PetscFree(work);
171 
172   /* post receives:
173        1) each message will consist of ordered triples
174      (global row index, global column index, value); we store the global
175      indices as Scalars (doubles) to simplify the message passing.
176        2) since we don't know how long each individual message is, we
177      allocate the largest needed buffer for each receive. Potentially
178      this is a lot of wasted space.
179 
180 
181        This could be done better.
182   */
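  /* each message is packed as [row_0,col_0,val_0, row_1,col_1,val_1, ...],
     with every entry stored as a Scalar (3 Scalars per stashed value) */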
183   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
184   CHKPTRQ(rvalues);
185   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
186   CHKPTRQ(recv_waits);
187   for ( i=0; i<nreceives; i++ ) {
188     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
189               comm,recv_waits+i);
190   }
191 
192   /* do sends:
193       1) starts[i] gives the starting index in svalues for stuff going to
194          the ith processor
195   */
196   svalues = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
197   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
198   CHKPTRQ(send_waits);
199   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
200   starts[0] = 0;
201   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
202   for ( i=0; i<aij->stash.n; i++ ) {
203     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
204     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
205     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
206   }
207   PetscFree(owner);
208   starts[0] = 0;
209   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
210   count = 0;
211   for ( i=0; i<size; i++ ) {
212     if (procs[i]) {
213       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
214                 comm,send_waits+count++);
215     }
216   }
217   PetscFree(starts); PetscFree(nprocs);
218 
219   /* Free cache space */
220   PLogInfo(0,"[%d]MatAssemblyBegin_MPIAIJ:Number of off processor values %d\n",rank,aij->stash.n);
221   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
222 
223   aij->svalues    = svalues;    aij->rvalues    = rvalues;
224   aij->nsends     = nsends;     aij->nrecvs     = nreceives;
225   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
226   aij->rmax       = nmax;
227 
228   return 0;
229 }
230 extern int MatSetUpMultiply_MPIAIJ(Mat);
231 
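/*
   MatAssemblyEnd_MPIAIJ - Completes the receives posted in MatAssemblyBegin_MPIAIJ(),
   unpacks each (row,col,value) triple into aij->A or aij->B, waits on the sends,
   assembles the two sequential blocks, and (on final assembly of a new matrix)
   builds the scatter context used by the matrix-vector products.
*/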
232 static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
233 {
234   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
235   Mat_SeqAIJ *C = (Mat_SeqAIJ *) aij->A->data;
236   MPI_Status  *send_status,recv_status;
237   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
238   int         row,col,other_disassembled,shift = C->indexshift;
239   Scalar      *values,val;
240   InsertMode  addv = aij->insertmode;
241 
242   /*  wait on receives */
243   while (count) {
244     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
245     /* unpack receives into our local space */
246     values = aij->rvalues + 3*imdex*aij->rmax;
247     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
248     n = n/3;
249     for ( i=0; i<n; i++ ) {
250       row = (int) PetscReal(values[3*i]) - aij->rstart;
251       col = (int) PetscReal(values[3*i+1]);
252       val = values[3*i+2];
253       if (col >= aij->cstart && col < aij->cend) {
254         col -= aij->cstart;
255         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
256       }
257       else {
258         if (mat->was_assembled) {
259           if (!aij->colmap) {ierr = CreateColmap_Private(mat);CHKERRQ(ierr);}
260           col = aij->colmap[col] + shift;
261           if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
262             ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
263             col = (int) PetscReal(values[3*i+1]);
264           }
265         }
266         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
267       }
268     }
269     count--;
270   }
271   PetscFree(aij->recv_waits); PetscFree(aij->rvalues);
272 
273   /* wait on sends */
274   if (aij->nsends) {
275     send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));
276     CHKPTRQ(send_status);
277     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
278     PetscFree(send_status);
279   }
280   PetscFree(aij->send_waits); PetscFree(aij->svalues);
281 
282   aij->insertmode = NOT_SET_VALUES;
283   ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
284   ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);
285 
286   /* determine if any processor has disassembled; if so, we must
287      also disassemble ourselves so that we may reassemble. */
288   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
289   if (mat->was_assembled && !other_disassembled) {
290     ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
291   }
292 
293   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
294     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
295   }
296   ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
297   ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);
298 
299   if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
300   return 0;
301 }
302 
303 static int MatZeroEntries_MPIAIJ(Mat A)
304 {
305   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
306   int        ierr;
307   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
308   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
309   return 0;
310 }
311 
312 /* the code does not do the diagonal entries correctly unless the
313    matrix is square and the column and row ownerships are identical.
314    This is a BUG. The only way to fix it seems to be to access
315    aij->A and aij->B directly and not through the MatZeroRows()
316    routine.
317 */
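/*
   MatZeroRows_MPIAIJ - Each processor sends the global indices of the rows it wants
   zeroed to the processors that own those rows; every owner then gathers the indices
   destined for it and zeros the corresponding local rows of both aij->A and aij->B.
*/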
318 static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
319 {
320   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
321   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
322   int            *procs,*nprocs,j,found,idx,nsends,*work;
323   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
324   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
325   int            *lens,imdex,*lrows,*values;
326   MPI_Comm       comm = A->comm;
327   MPI_Request    *send_waits,*recv_waits;
328   MPI_Status     recv_status,*send_status;
329   IS             istmp;
330 
331   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
332   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
333 
334   /*  first count number of contributors to each processor */
335   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
336   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
337   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
338   for ( i=0; i<N; i++ ) {
339     idx = rows[i];
340     found = 0;
341     for ( j=0; j<size; j++ ) {
342       if (idx >= owners[j] && idx < owners[j+1]) {
343         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
344       }
345     }
346     if (!found) SETERRQ(1,"MatZeroRows_MPIAIJ:Index out of range");
347   }
348   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
349 
350   /* inform other processors of number of messages and max length*/
351   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
352   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
353   nrecvs = work[rank];
354   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
355   nmax = work[rank];
356   PetscFree(work);
357 
358   /* post receives:   */
359   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
360   CHKPTRQ(rvalues);
361   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
362   CHKPTRQ(recv_waits);
363   for ( i=0; i<nrecvs; i++ ) {
364     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
365   }
366 
367   /* do sends:
368       1) starts[i] gives the starting index in svalues for stuff going to
369          the ith processor
370   */
371   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
372   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
373   CHKPTRQ(send_waits);
374   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
375   starts[0] = 0;
376   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
377   for ( i=0; i<N; i++ ) {
378     svalues[starts[owner[i]]++] = rows[i];
379   }
380   ISRestoreIndices(is,&rows);
381 
382   starts[0] = 0;
383   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
384   count = 0;
385   for ( i=0; i<size; i++ ) {
386     if (procs[i]) {
387       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
388     }
389   }
390   PetscFree(starts);
391 
392   base = owners[rank];
393 
394   /*  wait on receives */
395   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
396   source = lens + nrecvs;
397   count  = nrecvs; slen = 0;
398   while (count) {
399     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
400     /* unpack receives into our local space */
401     MPI_Get_count(&recv_status,MPI_INT,&n);
402     source[imdex]  = recv_status.MPI_SOURCE;
403     lens[imdex]  = n;
404     slen += n;
405     count--;
406   }
407   PetscFree(recv_waits);
408 
409   /* move the received data into the local list of rows to zero */
410   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
411   count = 0;
412   for ( i=0; i<nrecvs; i++ ) {
413     values = rvalues + i*nmax;
414     for ( j=0; j<lens[i]; j++ ) {
415       lrows[count++] = values[j] - base;
416     }
417   }
418   PetscFree(rvalues); PetscFree(lens);
419   PetscFree(owner); PetscFree(nprocs);
420 
421   /* actually zap the local rows */
422   ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
423   PLogObjectParent(A,istmp);
424   PetscFree(lrows);
425   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
426   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
427   ierr = ISDestroy(istmp); CHKERRQ(ierr);
428 
429   /* wait on sends */
430   if (nsends) {
431     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
432     CHKPTRQ(send_status);
433     MPI_Waitall(nsends,send_waits,send_status);
434     PetscFree(send_status);
435   }
436   PetscFree(send_waits); PetscFree(svalues);
437 
438   return 0;
439 }
440 
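/*
   MatMult_MPIAIJ - Computes y = A*x as y = (diagonal block)*x_local plus
   (off-diagonal block)*x_ghost; the scatter of the needed ghost values of x
   into a->lvec is overlapped with the multiply by the diagonal block.
*/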
441 static int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
442 {
443   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
444   int        ierr,nt;
445 
446   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
447   if (nt != a->n) {
448     SETERRQ(1,"MatMult_MPIAIJ:Incompatible parition of A and xx");
449   }
450   ierr = VecGetLocalSize(yy,&nt);  CHKERRQ(ierr);
451   if (nt != a->m) {
452     SETERRQ(1,"MatMult_MPIAIJ:Incompatible parition of A and yy");
453   }
454   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
455   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
456   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
457   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
458   return 0;
459 }
460 
461 static int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
462 {
463   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
464   int        ierr;
465   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
466   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
467   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
468   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
469   return 0;
470 }
471 
472 static int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
473 {
474   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
475   int        ierr;
476 
477   /* do nondiagonal part */
478   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
479   /* send it on its way */
480   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
481                 (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
482   /* do local part */
483   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
484   /* receive remote parts: note this assumes the values are not actually */
485   /* inserted in yy until the next line, which is true for this implementation */
486   /* but may not always be true. */
487   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
488                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
489   return 0;
490 }
491 
492 static int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
493 {
494   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
495   int        ierr;
496 
497   /* do nondiagonal part */
498   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
499   /* send it on its way */
500   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
501                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
502   /* do local part */
503   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
504   /* receive remote parts: note this assumes the values are not actually */
505   /* inserted in zz until the next line, which is true for this implementation */
506   /* but may not always be true. */
507   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
508                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
509   return 0;
510 }
511 
512 /*
513   This only works correctly for square matrices where the subblock A->A is the
514    diagonal block
515 */
516 static int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
517 {
518   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
519   if (a->M != a->N)
520     SETERRQ(1,"MatGetDiagonal_MPIAIJ:Supports only square matrix where A->A is diag block");
521   return MatGetDiagonal(a->A,v);
522 }
523 
524 static int MatScale_MPIAIJ(Scalar *aa,Mat A)
525 {
526   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
527   int        ierr;
528   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
529   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
530   return 0;
531 }
532 
533 static int MatDestroy_MPIAIJ(PetscObject obj)
534 {
535   Mat        mat = (Mat) obj;
536   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
537   int        ierr;
538 #if defined(PETSC_LOG)
539   PLogObjectState(obj,"Rows=%d, Cols=%d",aij->M,aij->N);
540 #endif
541   PetscFree(aij->rowners);
542   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
543   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
544   if (aij->colmap) PetscFree(aij->colmap);
545   if (aij->garray) PetscFree(aij->garray);
546   if (aij->lvec)   VecDestroy(aij->lvec);
547   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
548   if (aij->rowvalues) PetscFree(aij->rowvalues);
549   PetscFree(aij);
550   PLogObjectDestroy(mat);
551   PetscHeaderDestroy(mat);
552   return 0;
553 }
554 #include "draw.h"
555 #include "pinclude/pviewer.h"
556 
557 static int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
558 {
559   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
560   int         ierr;
561 
562   if (aij->size == 1) {
563     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
564   }
565   else SETERRQ(1,"MatView_MPIAIJ_Binary:Only uniprocessor output supported");
566   return 0;
567 }
568 
569 static int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
570 {
571   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
572   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
573   int         ierr, format,shift = C->indexshift,rank;
574   FILE        *fd;
575   ViewerType  vtype;
576 
577   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
578   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
579     ierr = ViewerGetFormat(viewer,&format);
580     if (format == ASCII_FORMAT_INFO_DETAILED) {
581       int nz, nzalloc, mem, flg;
582       MPI_Comm_rank(mat->comm,&rank);
583       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
584       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
585       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
586       PetscSequentialPhaseBegin(mat->comm,1);
587       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
588          rank,aij->m,nz,nzalloc,mem);
589       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
590          rank,aij->m,nz,nzalloc,mem);
591       ierr = MatGetInfo(aij->A,MAT_LOCAL,&nz,&nzalloc,&mem);
592       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz);
593       ierr = MatGetInfo(aij->B,MAT_LOCAL,&nz,&nzalloc,&mem);
594       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz);
595       fflush(fd);
596       PetscSequentialPhaseEnd(mat->comm,1);
597       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
598       return 0;
599     }
600     else if (format == ASCII_FORMAT_INFO) {
601       return 0;
602     }
603   }
604 
605   if (vtype == DRAW_VIEWER) {
606     Draw       draw;
607     PetscTruth isnull;
608     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
609     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
610   }
611 
612   if (vtype == ASCII_FILE_VIEWER) {
613     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
614     PetscSequentialPhaseBegin(mat->comm,1);
615     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
616            aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
617            aij->cend);
618     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
619     ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
620     fflush(fd);
621     PetscSequentialPhaseEnd(mat->comm,1);
622   }
623   else {
624     int size = aij->size;
625     rank = aij->rank;
626     if (size == 1) {
627       ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
628     }
629     else {
630       /* assemble the entire matrix onto first processor. */
631       Mat         A;
632       Mat_SeqAIJ *Aloc;
633       int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
634       Scalar      *a;
635 
636       if (!rank) {
637         ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
638                CHKERRQ(ierr);
639       }
640       else {
641         ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
642                CHKERRQ(ierr);
643       }
644       PLogObjectParent(mat,A);
645 
646       /* copy over the A part */
647       Aloc = (Mat_SeqAIJ*) aij->A->data;
648       m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
649       row = aij->rstart;
650       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
651       for ( i=0; i<m; i++ ) {
652         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
653         row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
654       }
655       aj = Aloc->j;
656       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
657 
658       /* copy over the B part */
659       Aloc = (Mat_SeqAIJ*) aij->B->data;
660       m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
661       row = aij->rstart;
662       ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
663       for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
664       for ( i=0; i<m; i++ ) {
665         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
666         row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
667       }
668       PetscFree(ct);
669       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
670       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
671       if (!rank) {
672         ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
673       }
674       ierr = MatDestroy(A); CHKERRQ(ierr);
675     }
676   }
677   return 0;
678 }
679 
680 static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
681 {
682   Mat         mat = (Mat) obj;
683   int         ierr;
684   ViewerType  vtype;
685 
686   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
687   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
688       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
689     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
690   }
691   else if (vtype == BINARY_FILE_VIEWER) {
692     return MatView_MPIAIJ_Binary(mat,viewer);
693   }
694   return 0;
695 }
696 
697 /*
698     This has to provide several versions.
699 
700      2) a) use only local smoothing, updating outer values only once.
701         b) local smoothing, updating outer values each inner iteration.
702      3) coloring, updating outer values between colors.
703 */
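/*
   Only the purely local sweeps are implemented below (SOR_LOCAL_FORWARD_SWEEP,
   SOR_LOCAL_BACKWARD_SWEEP and SOR_LOCAL_SYMMETRIC_SWEEP): the ghost values of xx
   are gathered into mat->lvec once per call and then held fixed while the local
   rows are relaxed, i.e. a processor-block variant of SOR.

   Sketch of one row update (assuming SPARSEDENSEMDOT subtracts the sparse row dot
   product from sum, and taking fshift = 0): after the two macro calls
       sum = b_i - sum_j a_ij x_j
   so that
       sum/d + x_i = (b_i - sum_{j != i} a_ij x_j) / a_ii
   and the line x[i] = (1-omega)*x[i] + omega*(sum/d + x[i]) is the usual SOR update.
*/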
704 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
705                            double fshift,int its,Vec xx)
706 {
707   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
708   Mat        AA = mat->A, BB = mat->B;
709   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
710   Scalar     *b,*x,*xs,*ls,d,*v,sum;
711   int        ierr,*idx, *diag;
712   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
713 
714   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
715   xs = x + shift; /* shift by one for index start of 1 */
716   ls = ls + shift;
717   if (!A->diag) {if ((ierr = MatMarkDiag_SeqAIJ(AA))) return ierr;}
718   diag = A->diag;
719   if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
720     if (flag & SOR_ZERO_INITIAL_GUESS) {
721       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
722     }
723     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
724     CHKERRQ(ierr);
725     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
726     CHKERRQ(ierr);
727     while (its--) {
728       /* go down through the rows */
729       for ( i=0; i<m; i++ ) {
730         n    = A->i[i+1] - A->i[i];
731         idx  = A->j + A->i[i] + shift;
732         v    = A->a + A->i[i] + shift;
733         sum  = b[i];
734         SPARSEDENSEMDOT(sum,xs,v,idx,n);
735         d    = fshift + A->a[diag[i]+shift];
736         n    = B->i[i+1] - B->i[i];
737         idx  = B->j + B->i[i] + shift;
738         v    = B->a + B->i[i] + shift;
739         SPARSEDENSEMDOT(sum,ls,v,idx,n);
740         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
741       }
742       /* come up through the rows */
743       for ( i=m-1; i>-1; i-- ) {
744         n    = A->i[i+1] - A->i[i];
745         idx  = A->j + A->i[i] + shift;
746         v    = A->a + A->i[i] + shift;
747         sum  = b[i];
748         SPARSEDENSEMDOT(sum,xs,v,idx,n);
749         d    = fshift + A->a[diag[i]+shift];
750         n    = B->i[i+1] - B->i[i];
751         idx  = B->j + B->i[i] + shift;
752         v    = B->a + B->i[i] + shift;
753         SPARSEDENSEMDOT(sum,ls,v,idx,n);
754         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
755       }
756     }
757   }
758   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
759     if (flag & SOR_ZERO_INITIAL_GUESS) {
760       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
761     }
762     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
763     CHKERRQ(ierr);
764     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
765     CHKERRQ(ierr);
766     while (its--) {
767       for ( i=0; i<m; i++ ) {
768         n    = A->i[i+1] - A->i[i];
769         idx  = A->j + A->i[i] + shift;
770         v    = A->a + A->i[i] + shift;
771         sum  = b[i];
772         SPARSEDENSEMDOT(sum,xs,v,idx,n);
773         d    = fshift + A->a[diag[i]+shift];
774         n    = B->i[i+1] - B->i[i];
775         idx  = B->j + B->i[i] + shift;
776         v    = B->a + B->i[i] + shift;
777         SPARSEDENSEMDOT(sum,ls,v,idx,n);
778         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
779       }
780     }
781   }
782   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
783     if (flag & SOR_ZERO_INITIAL_GUESS) {
784       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
785     }
786     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
787                             mat->Mvctx); CHKERRQ(ierr);
788     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
789                             mat->Mvctx); CHKERRQ(ierr);
790     while (its--) {
791       for ( i=m-1; i>-1; i-- ) {
792         n    = A->i[i+1] - A->i[i];
793         idx  = A->j + A->i[i] + shift;
794         v    = A->a + A->i[i] + shift;
795         sum  = b[i];
796         SPARSEDENSEMDOT(sum,xs,v,idx,n);
797         d    = fshift + A->a[diag[i]+shift];
798         n    = B->i[i+1] - B->i[i];
799         idx  = B->j + B->i[i] + shift;
800         v    = B->a + B->i[i] + shift;
801         SPARSEDENSEMDOT(sum,ls,v,idx,n);
802         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
803       }
804     }
805   }
806   else {
807     SETERRQ(1,"MatRelax_MPIAIJ:Option not supported");
808   }
809   return 0;
810 }
811 
812 static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,int *nz,
813                              int *nzalloc,int *mem)
814 {
815   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
816   Mat        A = mat->A, B = mat->B;
817   int        ierr, isend[3], irecv[3], nzA, nzallocA, memA;
818 
819   ierr = MatGetInfo(A,MAT_LOCAL,&nzA,&nzallocA,&memA); CHKERRQ(ierr);
820   ierr = MatGetInfo(B,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
821   isend[0] += nzA; isend[1] += nzallocA; isend[2] += memA;
822   if (flag == MAT_LOCAL) {
823     if (nz)       *nz      = isend[0];
824     if (nzalloc)  *nzalloc = isend[1];
825     if (mem)      *mem     = isend[2];
826   } else if (flag == MAT_GLOBAL_MAX) {
827     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_MAX,matin->comm);
828     if (nz)      *nz      = irecv[0];
829     if (nzalloc) *nzalloc = irecv[1];
830     if (mem)     *mem     = irecv[2];
831   } else if (flag == MAT_GLOBAL_SUM) {
832     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_SUM,matin->comm);
833     if (nz)      *nz      = irecv[0];
834     if (nzalloc) *nzalloc = irecv[1];
835     if (mem)     *mem     = irecv[2];
836   }
837   return 0;
838 }
839 
840 extern int MatLUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,Mat*);
841 extern int MatLUFactorNumeric_MPIAIJ(Mat,Mat*);
842 extern int MatLUFactor_MPIAIJ(Mat,IS,IS,double);
843 extern int MatILUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,int,Mat *);
844 extern int MatSolve_MPIAIJ(Mat,Vec,Vec);
845 extern int MatSolveAdd_MPIAIJ(Mat,Vec,Vec,Vec);
846 extern int MatSolveTrans_MPIAIJ(Mat,Vec,Vec);
847 extern int MatSolveTransAdd_MPIAIJ(Mat,Vec,Vec,Vec);
848 
849 static int MatSetOption_MPIAIJ(Mat A,MatOption op)
850 {
851   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
852 
853   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
854       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
855       op == MAT_COLUMNS_SORTED ||
856       op == MAT_ROW_ORIENTED) {
857         MatSetOption(a->A,op);
858         MatSetOption(a->B,op);
859   }
860   else if (op == MAT_ROWS_SORTED ||
861            op == MAT_SYMMETRIC ||
862            op == MAT_STRUCTURALLY_SYMMETRIC ||
863            op == MAT_YES_NEW_DIAGONALS)
864     PLogInfo(A,"Info:MatSetOption_MPIAIJ:Option ignored\n");
865   else if (op == MAT_COLUMN_ORIENTED) {
866     a->roworiented = 0;
867     MatSetOption(a->A,op);
868     MatSetOption(a->B,op);
869   }
870   else if (op == MAT_NO_NEW_DIAGONALS)
871     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:MAT_NO_NEW_DIAGONALS");}
872   else
873     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:unknown option");}
874   return 0;
875 }
876 
877 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
878 {
879   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
880   *m = mat->M; *n = mat->N;
881   return 0;
882 }
883 
884 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
885 {
886   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
887   *m = mat->m; *n = mat->n;
888   return 0;
889 }
890 
891 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
892 {
893   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
894   *m = mat->rstart; *n = mat->rend;
895   return 0;
896 }
897 
898 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
899 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
900 
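/*
   MatGetRow_MPIAIJ - Returns one locally owned row with global column indices:
   the row is fetched from both the diagonal and off-diagonal blocks and the two
   pieces are merged into increasing global column order, translating off-diagonal
   column indices through garray.
*/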
901 int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
902 {
903   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
904   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
905   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
906   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
907   int        *cmap, *idx_p;
908 
909   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIAIJ:Already active");
910   mat->getrowactive = PETSC_TRUE;
911 
912   if (!mat->rowvalues && (idx || v)) {
913     /*
914         allocate enough space to hold information from the longest row.
915     */
916     Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
917     int     max = 1,m = mat->m,tmp;
918     for ( i=0; i<m; i++ ) {
919       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
920       if (max < tmp) { max = tmp; }
921     }
922     mat->rowvalues = (Scalar *) PetscMalloc( max*(sizeof(int)+sizeof(Scalar)));
923     CHKPTRQ(mat->rowvalues);
924     mat->rowindices = (int *) (mat->rowvalues + max);
925   }
926 
927   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIAIJ:Only local rows");
928   lrow = row - rstart;
929 
930   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
931   if (!v)   {pvA = 0; pvB = 0;}
932   if (!idx) {pcA = 0; if (!v) pcB = 0;}
933   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
934   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
935   nztot = nzA + nzB;
936 
937   cmap  = mat->garray;
938   if (v  || idx) {
939     if (nztot) {
940       /* merge into increasing column order, assuming A and B are each already sorted */
941       int imark = -1;
942       if (v) {
943         *v = v_p = mat->rowvalues;
944         for ( i=0; i<nzB; i++ ) {
945           if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
946           else break;
947         }
948         imark = i;
949         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
950         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
951       }
952       if (idx) {
953         *idx = idx_p = mat->rowindices;
954         if (imark > -1) {
955           for ( i=0; i<imark; i++ ) {
956             idx_p[i] = cmap[cworkB[i]];
957           }
958         } else {
959           for ( i=0; i<nzB; i++ ) {
960             if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
961             else break;
962           }
963           imark = i;
964         }
965         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
966         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
967       }
968     }
969     else {
970       if (idx) *idx = 0;
971       if (v)   *v   = 0;
972     }
973   }
974   *nz = nztot;
975   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
976   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
977   return 0;
978 }
979 
980 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
981 {
982   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
983   if (aij->getrowactive == PETSC_FALSE) {
984     SETERRQ(1,"MatRestoreRow_MPIAIJ:MatGetRow not called");
985   }
986   aij->getrowactive = PETSC_FALSE;
987   return 0;
988 }
989 
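/*
   MatNorm_MPIAIJ - The Frobenius norm is formed from the local sums of squares
   followed by an MPI_Allreduce(); the 1-norm accumulates column sums into a work
   array of global length N (not scalable) and reduces it; the infinity norm takes
   the maximum local row sum and reduces with MPI_MAX.
*/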
990 static int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
991 {
992   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
993   Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
994   int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
995   double     sum = 0.0;
996   Scalar     *v;
997 
998   if (aij->size == 1) {
999     ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
1000   } else {
1001     if (type == NORM_FROBENIUS) {
1002       v = amat->a;
1003       for (i=0; i<amat->nz; i++ ) {
1004 #if defined(PETSC_COMPLEX)
1005         sum += real(conj(*v)*(*v)); v++;
1006 #else
1007         sum += (*v)*(*v); v++;
1008 #endif
1009       }
1010       v = bmat->a;
1011       for (i=0; i<bmat->nz; i++ ) {
1012 #if defined(PETSC_COMPLEX)
1013         sum += real(conj(*v)*(*v)); v++;
1014 #else
1015         sum += (*v)*(*v); v++;
1016 #endif
1017       }
1018       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
1019       *norm = sqrt(*norm);
1020     }
1021     else if (type == NORM_1) { /* max column norm */
1022       double *tmp, *tmp2;
1023       int    *jj, *garray = aij->garray;
1024       tmp  = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp);
1025       tmp2 = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp2);
1026       PetscMemzero(tmp,aij->N*sizeof(double));
1027       *norm = 0.0;
1028       v = amat->a; jj = amat->j;
1029       for ( j=0; j<amat->nz; j++ ) {
1030         tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
1031       }
1032       v = bmat->a; jj = bmat->j;
1033       for ( j=0; j<bmat->nz; j++ ) {
1034         tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
1035       }
1036       MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);
1037       for ( j=0; j<aij->N; j++ ) {
1038         if (tmp2[j] > *norm) *norm = tmp2[j];
1039       }
1040       PetscFree(tmp); PetscFree(tmp2);
1041     }
1042     else if (type == NORM_INFINITY) { /* max row norm */
1043       double ntemp = 0.0;
1044       for ( j=0; j<amat->m; j++ ) {
1045         v = amat->a + amat->i[j] + shift;
1046         sum = 0.0;
1047         for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
1048           sum += PetscAbsScalar(*v); v++;
1049         }
1050         v = bmat->a + bmat->i[j] + shift;
1051         for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
1052           sum += PetscAbsScalar(*v); v++;
1053         }
1054         if (sum > ntemp) ntemp = sum;
1055       }
1056       MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);
1057     }
1058     else {
1059       SETERRQ(1,"MatNorm_MPIAIJ:No support for two norm");
1060     }
1061   }
1062   return 0;
1063 }
1064 
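/*
   MatTranspose_MPIAIJ - Builds the transpose by creating a new N x M parallel
   matrix and inserting each local row (from both the diagonal and off-diagonal
   blocks) as a column of the new matrix, then assembling it.
*/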
1065 static int MatTranspose_MPIAIJ(Mat A,Mat *matout)
1066 {
1067   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1068   Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
1069   int        ierr,shift = Aloc->indexshift;
1070   Mat        B;
1071   int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
1072   Scalar     *array;
1073 
1074   if (matout == PETSC_NULL && M != N)
1075     SETERRQ(1,"MatTranspose_MPIAIJ:Square matrix only for in-place");
1076   ierr = MatCreateMPIAIJ(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,
1077          PETSC_NULL,&B); CHKERRQ(ierr);
1078 
1079   /* copy over the A part */
1080   Aloc = (Mat_SeqAIJ*) a->A->data;
1081   m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1082   row = a->rstart;
1083   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
1084   for ( i=0; i<m; i++ ) {
1085     ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1086     row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
1087   }
1088   aj = Aloc->j;
1089   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}
1090 
1091   /* copy over the B part */
1092   Aloc = (Mat_SeqAIJ*) a->B->data;
1093   m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1094   row = a->rstart;
1095   ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
1096   for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
1097   for ( i=0; i<m; i++ ) {
1098     ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1099     row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
1100   }
1101   PetscFree(ct);
1102   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1103   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1104   if (matout != PETSC_NULL) {
1105     *matout = B;
1106   } else {
1107     /* This is not truly an in-place transpose, but we free the data structures from a and overwrite A with B */
1108     PetscFree(a->rowners);
1109     ierr = MatDestroy(a->A); CHKERRQ(ierr);
1110     ierr = MatDestroy(a->B); CHKERRQ(ierr);
1111     if (a->colmap) PetscFree(a->colmap);
1112     if (a->garray) PetscFree(a->garray);
1113     if (a->lvec) VecDestroy(a->lvec);
1114     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
1115     PetscFree(a);
1116     PetscMemcpy(A,B,sizeof(struct _Mat));
1117     PetscHeaderDestroy(B);
1118   }
1119   return 0;
1120 }
1121 
1122 extern int MatPrintHelp_SeqAIJ(Mat);
1123 static int MatPrintHelp_MPIAIJ(Mat A)
1124 {
1125   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1126 
1127   if (!a->rank) return MatPrintHelp_SeqAIJ(a->A);
1128   else return 0;
1129 }
1130 
1131 static int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1132 {
1133   *bs = 1;
1134   return 0;
1135 }
1136 
1137 extern int MatConvert_MPIAIJ(Mat,MatType,Mat *);
1138 static int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1139 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1140 int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatGetSubMatrixCall,Mat **);
1141 /* -------------------------------------------------------------------*/
1142 static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
1143        MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
1144        MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
1145        MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
1146        MatSolve_MPIAIJ,MatSolveAdd_MPIAIJ,
1147        MatSolveTrans_MPIAIJ,MatSolveTransAdd_MPIAIJ,
1148        MatLUFactor_MPIAIJ,0,
1149        MatRelax_MPIAIJ,
1150        MatTranspose_MPIAIJ,
1151        MatGetInfo_MPIAIJ,0,
1152        MatGetDiagonal_MPIAIJ,0,MatNorm_MPIAIJ,
1153        MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
1154        0,
1155        MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
1156        MatGetReordering_MPIAIJ,
1157        MatLUFactorSymbolic_MPIAIJ,MatLUFactorNumeric_MPIAIJ,0,0,
1158        MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
1159        MatILUFactorSymbolic_MPIAIJ,0,
1160        0,0,MatConvert_MPIAIJ,0,0,MatConvertSameType_MPIAIJ,0,0,
1161        0,0,0,
1162        MatGetSubMatrices_MPIAIJ,MatIncreaseOverlap_MPIAIJ,MatGetValues_MPIAIJ,0,
1163        MatPrintHelp_MPIAIJ,
1164        MatScale_MPIAIJ,0,0,0,MatGetBlockSize_MPIAIJ};
1165 
1166 /*@C
1167    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1168    (the default parallel PETSc format).  For good matrix assembly performance
1169    the user should preallocate the matrix storage by setting the parameters
1170    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1171    performance can be increased by more than a factor of 50.
1172 
1173    Input Parameters:
1174 .  comm - MPI communicator
1175 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1176            This value should be the same as the local size used in creating the
1177            y vector for the matrix-vector product y = Ax.
1178 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1179            This value should be the same as the local size used in creating the
1180            x vector for the matrix-vector product y = Ax.
1181 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
1182 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
1183 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1184            (same for all local rows)
1185 .  d_nnz - array containing the number of nonzeros in the various rows of the
1186            diagonal portion of the local submatrix (possibly different for each row)
1187            or PETSC_NULL. You must leave room for the diagonal entry even if
1188            it is zero.
1189 .  o_nz - number of nonzeros per row in the off-diagonal portion of local
1190            submatrix (same for all local rows).
1191 .  o_nnz - array containing the number of nonzeros in the various rows of the
1192            off-diagonal portion of the local submatrix (possibly different for
1193            each row) or PETSC_NULL.
1194 
1195    Output Parameter:
1196 .  A - the matrix
1197 
1198    Notes:
1199    The AIJ format (also called the Yale sparse matrix format or
1200    compressed row storage), is fully compatible with standard Fortran 77
1201    storage.  That is, the stored row and column indices can begin at
1202    either one (as in Fortran) or zero.  See the users manual for details.
1203 
1204    The user MUST specify either the local or global matrix dimensions
1205    (possibly both).
1206 
1207    By default, this format uses inodes (identical nodes) when possible.
1208    We search for consecutive rows with the same nonzero structure, thereby
1209    reusing matrix information to achieve increased efficiency.
1210 
1211    Options Database Keys:
1212 $    -mat_aij_no_inode  - Do not use inodes
1213 $    -mat_aij_inode_limit <limit> - Set inode limit.
1214 $        (max limit=5)
1215 $    -mat_aij_oneindex - Internally use indexing starting at 1
1216 $        rather than 0.  Note: When calling MatSetValues(),
1217 $        the user still MUST index entries starting at 0!
1218 
1219    Storage Information:
1220    For a square global matrix we define each processor's diagonal portion
1221    to be its local rows and the corresponding columns (a square submatrix);
1222    each processor's off-diagonal portion encompasses the remainder of the
1223    local matrix (a rectangular submatrix).
1224 
1225    The user can specify preallocated storage for the diagonal part of
1226    the local submatrix with either d_nz or d_nnz (not both).  Set
1227    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1228    memory allocation.  Likewise, specify preallocated storage for the
1229    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1230 
1231    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1232    the figure below we depict these three local rows and all columns (0-11).
1233 
1234 $          0 1 2 3 4 5 6 7 8 9 10 11
1235 $         -------------------
1236 $  row 3  |  o o o d d d o o o o o o
1237 $  row 4  |  o o o d d d o o o o o o
1238 $  row 5  |  o o o d d d o o o o o o
1239 $         -------------------
1240 $
1241 
1242    Thus, any entries in the d locations are stored in the d (diagonal)
1243    submatrix, and any entries in the o locations are stored in the
1244    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1245    stored simply in the MATSEQAIJ format for compressed row storage.
1246 
1247    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1248    and o_nz should indicate the number of nonzeros per row in the o matrix.
1249    In general, for PDE problems in which most nonzeros are near the diagonal,
1250    one expects d_nz >> o_nz. For large problems you MUST preallocate memory
1251    or you will get TERRIBLE performance; see the users' manual chapter on
1252    matrices and the file $(PETSC_DIR)/Performance.
1253 
1254 .keywords: matrix, aij, compressed row, sparse, parallel
1255 
1256 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1257 @*/
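/*
   Illustrative usage sketch (not part of the original source; the local sizes and
   preallocation values below are made-up examples):

      Mat A;
      int ierr, m = 100, n = 100;
      ierr = MatCreateMPIAIJ(MPI_COMM_WORLD,m,n,PETSC_DECIDE,PETSC_DECIDE,
                             5,PETSC_NULL,2,PETSC_NULL,&A); CHKERRQ(ierr);
      ... set entries with MatSetValues(), then ...
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
*/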
1258 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
1259                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1260 {
1261   Mat          B;
1262   Mat_MPIAIJ   *b;
1263   int          ierr, i,sum[2],work[2];
1264 
1265   *A = 0;
1266   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
1267   PLogObjectCreate(B);
1268   B->data       = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
1269   PetscMemzero(b,sizeof(Mat_MPIAIJ));
1270   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1271   B->destroy    = MatDestroy_MPIAIJ;
1272   B->view       = MatView_MPIAIJ;
1273   B->factor     = 0;
1274   B->assembled  = PETSC_FALSE;
1275 
1276   b->insertmode = NOT_SET_VALUES;
1277   MPI_Comm_rank(comm,&b->rank);
1278   MPI_Comm_size(comm,&b->size);
1279 
1280   if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1281     SETERRQ(1,"MatCreateMPIAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1282 
1283   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1284     work[0] = m; work[1] = n;
1285     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1286     if (M == PETSC_DECIDE) M = sum[0];
1287     if (N == PETSC_DECIDE) N = sum[1];
1288   }
1289   if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
1290   if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
1291   b->m = m; B->m = m;
1292   b->n = n; B->n = n;
1293   b->N = N; B->N = N;
1294   b->M = M; B->M = M;
1295 
1296   /* build local table of row and column ownerships */
1297   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1298   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1299   b->cowners = b->rowners + b->size + 2;
1300   MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1301   b->rowners[0] = 0;
1302   for ( i=2; i<=b->size; i++ ) {
1303     b->rowners[i] += b->rowners[i-1];
1304   }
1305   b->rstart = b->rowners[b->rank];
1306   b->rend   = b->rowners[b->rank+1];
1307   MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1308   b->cowners[0] = 0;
1309   for ( i=2; i<=b->size; i++ ) {
1310     b->cowners[i] += b->cowners[i-1];
1311   }
1312   b->cstart = b->cowners[b->rank];
1313   b->cend   = b->cowners[b->rank+1];
1314 
1315   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1316   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1317   PLogObjectParent(B,b->A);
1318   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1319   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1320   PLogObjectParent(B,b->B);
1321 
1322   /* build cache for off array entries formed */
1323   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1324   b->colmap      = 0;
1325   b->garray      = 0;
1326   b->roworiented = 1;
1327 
1328   /* stuff used for matrix vector multiply */
1329   b->lvec      = 0;
1330   b->Mvctx     = 0;
1331 
1332   /* stuff for MatGetRow() */
1333   b->rowindices   = 0;
1334   b->rowvalues    = 0;
1335   b->getrowactive = PETSC_FALSE;
1336 
1337   *A = B;
1338   return 0;
1339 }
1340 
1341 static int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
1342 {
1343   Mat        mat;
1344   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1345   int        ierr, len=0, flg;
1346 
1347   *newmat       = 0;
1348   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1349   PLogObjectCreate(mat);
1350   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1351   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1352   mat->destroy    = MatDestroy_MPIAIJ;
1353   mat->view       = MatView_MPIAIJ;
1354   mat->factor     = matin->factor;
1355   mat->assembled  = PETSC_TRUE;
1356 
1357   a->m = mat->m   = oldmat->m;
1358   a->n = mat->n   = oldmat->n;
1359   a->M = mat->M   = oldmat->M;
1360   a->N = mat->N   = oldmat->N;
1361 
1362   a->rstart       = oldmat->rstart;
1363   a->rend         = oldmat->rend;
1364   a->cstart       = oldmat->cstart;
1365   a->cend         = oldmat->cend;
1366   a->size         = oldmat->size;
1367   a->rank         = oldmat->rank;
1368   a->insertmode   = NOT_SET_VALUES;
1369   a->rowvalues    = 0;
1370   a->getrowactive = PETSC_FALSE;
1371 
1372   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1373   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1374   a->cowners = a->rowners + a->size + 2;
1375   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1376   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1377   if (oldmat->colmap) {
1378     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1379     PLogObjectMemory(mat,(a->N)*sizeof(int));
1380     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1381   } else a->colmap = 0;
1382   if (oldmat->garray && (len = ((Mat_SeqAIJ *) (oldmat->B->data))->n)) {
1383     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1384     PLogObjectMemory(mat,len*sizeof(int));
1385     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1386   } else a->garray = 0;
1387 
1388   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1389   PLogObjectParent(mat,a->lvec);
1390   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1391   PLogObjectParent(mat,a->Mvctx);
1392   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1393   PLogObjectParent(mat,a->A);
1394   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1395   PLogObjectParent(mat,a->B);
1396   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1397   if (flg) {
1398     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1399   }
1400   *newmat = mat;
1401   return 0;
1402 }
1403 
1404 #include "sys.h"
1405 
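/*
   MatLoad_MPIAIJ - Reads a matrix from a binary viewer: processor 0 reads the
   header and the row lengths, scatters each processor its share, then reads the
   column indices and numerical values in per-processor chunks and ships them to
   their owners, which insert them with MatSetValues() and assemble the matrix.
*/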
1406 int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
1407 {
1408   Mat          A;
1409   int          i, nz, ierr, j,rstart, rend, fd;
1410   Scalar       *vals,*svals;
1411   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1412   MPI_Status   status;
1413   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1414   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1415   int          tag = ((PetscObject)viewer)->tag;
1416 
1417   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1418   if (!rank) {
1419     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1420     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1421     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIAIJ:not matrix object");
1422   }
1423 
1424   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1425   M = header[1]; N = header[2];
1426   /* determine ownership of all rows */
1427   m = M/size + ((M % size) > rank);
1428   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1429   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1430   rowners[0] = 0;
1431   for ( i=2; i<=size; i++ ) {
1432     rowners[i] += rowners[i-1];
1433   }
1434   rstart = rowners[rank];
1435   rend   = rowners[rank+1];
1436 
1437   /* distribute row lengths to all processors */
1438   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1439   offlens = ourlens + (rend-rstart);
1440   if (!rank) {
1441     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1442     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1443     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1444     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1445     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
1446     PetscFree(sndcounts);
1447   }
1448   else {
1449     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
1450   }
1451 
1452   if (!rank) {
1453     /* calculate the number of nonzeros on each processor */
1454     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1455     PetscMemzero(procsnz,size*sizeof(int));
1456     for ( i=0; i<size; i++ ) {
1457       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1458         procsnz[i] += rowlengths[j];
1459       }
1460     }
1461     PetscFree(rowlengths);
1462 
1463     /* determine max buffer needed and allocate it */
1464     maxnz = 0;
1465     for ( i=0; i<size; i++ ) {
1466       maxnz = PetscMax(maxnz,procsnz[i]);
1467     }
1468     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1469 
1470     /* read in my part of the matrix column indices  */
1471     nz = procsnz[0];
1472     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1473     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1474 
1475     /* read in everyone else's part and ship it off */
1476     for ( i=1; i<size; i++ ) {
1477       nz = procsnz[i];
1478       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1479       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1480     }
1481     PetscFree(cols);
1482   }
1483   else {
1484     /* determine buffer space needed for message */
1485     nz = 0;
1486     for ( i=0; i<m; i++ ) {
1487       nz += ourlens[i];
1488     }
1489     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1490 
1491     /* receive message of column indices*/
1492     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1493     MPI_Get_count(&status,MPI_INT,&maxnz);
1494     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
1495   }
1496 
1497   /* loop over local rows, determining number of off diagonal entries */
1498   PetscMemzero(offlens,m*sizeof(int));
1499   jj = 0;
1500   for ( i=0; i<m; i++ ) {
1501     for ( j=0; j<ourlens[i]; j++ ) {
1502       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1503       jj++;
1504     }
1505   }
1506 
1507   /* create our matrix */
1508   for ( i=0; i<m; i++ ) {
1509     ourlens[i] -= offlens[i];
1510   }
1511   ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
1512   A = *newmat;
1513   MatSetOption(A,MAT_COLUMNS_SORTED);
1514   for ( i=0; i<m; i++ ) {
1515     ourlens[i] += offlens[i];
1516   }
1517 
1518   if (!rank) {
1519     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1520 
1521     /* read in my part of the matrix numerical values  */
1522     nz = procsnz[0];
1523     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1524 
1525     /* insert into matrix */
1526     jj      = rstart;
1527     smycols = mycols;
1528     svals   = vals;
1529     for ( i=0; i<m; i++ ) {
1530       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1531       smycols += ourlens[i];
1532       svals   += ourlens[i];
1533       jj++;
1534     }
1535 
1536     /* read in other processors and ship out */
1537     for ( i=1; i<size; i++ ) {
1538       nz = procsnz[i];
1539       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1540       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1541     }
1542     PetscFree(procsnz);
1543   }
1544   else {
1545     /* receive numeric values */
1546     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1547 
1548     /* receive message of values*/
1549     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1550     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1551     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
1552 
1553     /* insert into matrix */
1554     jj      = rstart;
1555     smycols = mycols;
1556     svals   = vals;
1557     for ( i=0; i<m; i++ ) {
1558       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1559       smycols += ourlens[i];
1560       svals   += ourlens[i];
1561       jj++;
1562     }
1563   }
1564   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1565 
1566   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1567   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1568   return 0;
1569 }
1570