xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision 0d4b0b6c1a374e6007cd4b2fbb5567147f7b1605)
1 
2 #ifndef lint
3 static char vcid[] = "$Id: mpiaij.c,v 1.162 1996/08/14 15:08:16 bsmith Exp bsmith $";
4 #endif
5 
6 #include "src/mat/impls/aij/mpi/mpiaij.h"
7 #include "src/vec/vecimpl.h"
8 #include "src/inline/spops.h"
9 
10 /* local utility routine that creates a mapping from the global column
11 number to the local number in the off-diagonal part of the local
12 storage of the matrix.  This is done in a non-scalable way since the
13 length of colmap equals the global number of columns; colmap[col] holds
14 the local index plus one, so zero means the column is not present. */
15 static int CreateColmap_MPIAIJ_Private(Mat mat)
16 {
17   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
18   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
19   int        n = B->n,i;
20 
21   aij->colmap = (int *) PetscMalloc(aij->N*sizeof(int));CHKPTRQ(aij->colmap);
22   PLogObjectMemory(mat,aij->N*sizeof(int));
23   PetscMemzero(aij->colmap,aij->N*sizeof(int));
24   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
25   return 0;
26 }
27 
28 extern int DisAssemble_MPIAIJ(Mat);
29 
30 static int MatGetReordering_MPIAIJ(Mat mat,MatReordering type,IS *rperm,IS *cperm)
31 {
32   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
33   int        ierr;
34   if (aij->size == 1) {
35     ierr = MatGetReordering(aij->A,type,rperm,cperm); CHKERRQ(ierr);
36   } else SETERRQ(1,"MatGetReordering_MPIAIJ:not supported in parallel");
37   return 0;
38 }
39 
40 static int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
41 {
42   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
43   Scalar     value;
44   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
45   int        cstart = aij->cstart, cend = aij->cend,row,col;
46   int        roworiented = aij->roworiented;
47 
48   if (aij->insertmode != NOT_SET_VALUES && aij->insertmode != addv) {
49     SETERRQ(1,"MatSetValues_MPIAIJ:Cannot mix inserts and adds");
50   }
51   aij->insertmode = addv;
52   for ( i=0; i<m; i++ ) {
53     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative row");
54     if (im[i] >= aij->M) SETERRQ(1,"MatSetValues_MPIAIJ:Row too large");
55     if (im[i] >= rstart && im[i] < rend) {
56       row = im[i] - rstart;
57       for ( j=0; j<n; j++ ) {
58         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative column");
59         if (in[j] >= aij->N) SETERRQ(1,"MatSetValues_MPIAIJ:Col too large");
60         if (in[j] >= cstart && in[j] < cend){
61           col = in[j] - cstart;
62           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
63           ierr = MatSetValues(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
64         }
65         else {
66           if (mat->was_assembled) {
67             if (!aij->colmap) {
68               ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
69             }
70             col = aij->colmap[in[j]] - 1;
71             if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
72               ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
73               col =  in[j];
74             }
75           }
76           else col = in[j];
77           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
78           ierr = MatSetValues(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
79         }
80       }
81     }
82     else {
83       if (roworiented) {
84         ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
85       }
86       else {
87         row = im[i];
88         for ( j=0; j<n; j++ ) {
89           ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
90         }
91       }
92     }
93   }
94   return 0;
95 }
96 
97 static int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
98 {
99   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
100   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
101   int        cstart = aij->cstart, cend = aij->cend,row,col;
102 
103   for ( i=0; i<m; i++ ) {
104     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative row");
105     if (idxm[i] >= aij->M) SETERRQ(1,"MatGetValues_MPIAIJ:Row too large");
106     if (idxm[i] >= rstart && idxm[i] < rend) {
107       row = idxm[i] - rstart;
108       for ( j=0; j<n; j++ ) {
109         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative column");
110         if (idxn[j] >= aij->N) SETERRQ(1,"MatGetValues_MPIAIJ:Col too large");
111         if (idxn[j] >= cstart && idxn[j] < cend){
112           col = idxn[j] - cstart;
113           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
114         }
115         else {
116           if (!aij->colmap) {
117             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
118           }
119           col = aij->colmap[idxn[j]] - 1;
120           if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
121           else {
122             ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
123           }
124         }
125       }
126     }
127     else {
128       SETERRQ(1,"MatGetValues_MPIAIJ:Only local values currently supported");
129     }
130   }
131   return 0;
132 }
133 
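/*
   MatAssemblyBegin_MPIAIJ ships the stashed (row,column,value) triples that
   were generated for rows owned by other processors: it counts how many
   entries go to each processor, posts the receives, starts the nonblocking
   sends, and then frees the stash.  MatAssemblyEnd_MPIAIJ below completes
   the exchange and inserts the received values into the local A and B parts.
*/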
134 static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
135 {
136   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
137   MPI_Comm    comm = mat->comm;
138   int         size = aij->size, *owners = aij->rowners;
139   int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
140   MPI_Request *send_waits,*recv_waits;
141   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
142   InsertMode  addv;
143   Scalar      *rvalues,*svalues;
144 
145   /* make sure all processors are using either INSERT_VALUES or ADD_VALUES */
146   MPI_Allreduce(&aij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
147   if (addv == (ADD_VALUES|INSERT_VALUES)) {
148     SETERRQ(1,"MatAssemblyBegin_MPIAIJ:Some processors inserted others added");
149   }
150   aij->insertmode = addv; /* in case this processor had no cache */
151 
152   /*  first count number of contributors to each processor */
153   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
154   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
155   owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
156   for ( i=0; i<aij->stash.n; i++ ) {
157     idx = aij->stash.idx[i];
158     for ( j=0; j<size; j++ ) {
159       if (idx >= owners[j] && idx < owners[j+1]) {
160         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
161       }
162     }
163   }
164   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
165 
166   /* inform other processors of number of messages and max length*/
167   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
168   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
169   nreceives = work[rank];
170   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
171   nmax = work[rank];
172   PetscFree(work);
173 
174   /* post receives:
175        1) each message will consist of ordered triples
176      (global row index,global column index,value); the indices are stored
177      as Scalars to simplify the message passing.
178        2) since we don't know how long each individual message is, we
179      allocate the largest needed buffer for each receive. Potentially
180      this is a lot of wasted space.
181 
182 
183        This could be done better.
184   */
185   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
186   CHKPTRQ(rvalues);
187   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
188   CHKPTRQ(recv_waits);
189   for ( i=0; i<nreceives; i++ ) {
190     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
191               comm,recv_waits+i);
192   }
193 
194   /* do sends:
195       1) starts[i] gives the starting index in svalues for stuff going to
196          the ith processor
197   */
198   svalues = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
199   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
200   CHKPTRQ(send_waits);
201   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
202   starts[0] = 0;
203   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
204   for ( i=0; i<aij->stash.n; i++ ) {
205     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
206     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
207     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
208   }
209   PetscFree(owner);
210   starts[0] = 0;
211   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
212   count = 0;
213   for ( i=0; i<size; i++ ) {
214     if (procs[i]) {
215       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
216                 comm,send_waits+count++);
217     }
218   }
219   PetscFree(starts); PetscFree(nprocs);
220 
221   /* Free cache space */
222   PLogInfo(0,"[%d]MatAssemblyBegin_MPIAIJ:Number of off processor values %d\n",rank,aij->stash.n);
223   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
224 
225   aij->svalues    = svalues;    aij->rvalues    = rvalues;
226   aij->nsends     = nsends;     aij->nrecvs     = nreceives;
227   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
228   aij->rmax       = nmax;
229 
230   return 0;
231 }
232 extern int MatSetUpMultiply_MPIAIJ(Mat);
233 
234 static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
235 {
236   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
237   MPI_Status  *send_status,recv_status;
238   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
239   int         row,col,other_disassembled;
240   Scalar      *values,val;
241   InsertMode  addv = aij->insertmode;
242 
243   /*  wait on receives */
244   while (count) {
245     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
246     /* unpack receives into our local space */
247     values = aij->rvalues + 3*imdex*aij->rmax;
248     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
249     n = n/3;
250     for ( i=0; i<n; i++ ) {
251       row = (int) PetscReal(values[3*i]) - aij->rstart;
252       col = (int) PetscReal(values[3*i+1]);
253       val = values[3*i+2];
254       if (col >= aij->cstart && col < aij->cend) {
255         col -= aij->cstart;
256         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
257       }
258       else {
259         if (mat->was_assembled) {
260           if (!aij->colmap) {
261             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
262           }
263           col = aij->colmap[col] - 1;
264           if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
265             ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
266             col = (int) PetscReal(values[3*i+1]);
267           }
268         }
269         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
270       }
271     }
272     count--;
273   }
274   PetscFree(aij->recv_waits); PetscFree(aij->rvalues);
275 
276   /* wait on sends */
277   if (aij->nsends) {
278     send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));
279     CHKPTRQ(send_status);
280     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
281     PetscFree(send_status);
282   }
283   PetscFree(aij->send_waits); PetscFree(aij->svalues);
284 
285   aij->insertmode = NOT_SET_VALUES;
286   ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
287   ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);
288 
289   /* determine if any processor has disassembled, if so we must
290      also disassemble ourselves, in order that we may reassemble. */
291   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
292   if (mat->was_assembled && !other_disassembled) {
293     ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
294   }
295 
296   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
297     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
298   }
299   ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
300   ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);
301 
302   if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
303   return 0;
304 }
305 
306 static int MatZeroEntries_MPIAIJ(Mat A)
307 {
308   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
309   int        ierr;
310   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
311   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
312   return 0;
313 }
314 
315 /* the code does not do the diagonal entries correctly unless the
316    matrix is square and the column and row ownerships are identical.
317    This is a BUG. The only way to fix it seems to be to access
318    aij->A and aij->B directly and not through the MatZeroRows()
319    routine.
320 */
321 static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
322 {
323   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
324   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
325   int            *procs,*nprocs,j,found,idx,nsends,*work;
326   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
327   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
328   int            *lens,imdex,*lrows,*values;
329   MPI_Comm       comm = A->comm;
330   MPI_Request    *send_waits,*recv_waits;
331   MPI_Status     recv_status,*send_status;
332   IS             istmp;
333 
334   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
335   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
336 
337   /*  first count number of contributors to each processor */
338   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
339   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
340   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
341   for ( i=0; i<N; i++ ) {
342     idx = rows[i];
343     found = 0;
344     for ( j=0; j<size; j++ ) {
345       if (idx >= owners[j] && idx < owners[j+1]) {
346         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
347       }
348     }
349     if (!found) SETERRQ(1,"MatZeroRows_MPIAIJ:Index out of range");
350   }
351   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
352 
353   /* inform other processors of number of messages and max length*/
354   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
355   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
356   nrecvs = work[rank];
357   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
358   nmax = work[rank];
359   PetscFree(work);
360 
361   /* post receives:   */
362   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
363   CHKPTRQ(rvalues);
364   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
365   CHKPTRQ(recv_waits);
366   for ( i=0; i<nrecvs; i++ ) {
367     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
368   }
369 
370   /* do sends:
371       1) starts[i] gives the starting index in svalues for stuff going to
372          the ith processor
373   */
374   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
375   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
376   CHKPTRQ(send_waits);
377   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
378   starts[0] = 0;
379   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
380   for ( i=0; i<N; i++ ) {
381     svalues[starts[owner[i]]++] = rows[i];
382   }
383   ISRestoreIndices(is,&rows);
384 
385   starts[0] = 0;
386   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
387   count = 0;
388   for ( i=0; i<size; i++ ) {
389     if (procs[i]) {
390       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
391     }
392   }
393   PetscFree(starts);
394 
395   base = owners[rank];
396 
397   /*  wait on receives */
398   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
399   source = lens + nrecvs;
400   count  = nrecvs; slen = 0;
401   while (count) {
402     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
403     /* unpack receives into our local space */
404     MPI_Get_count(&recv_status,MPI_INT,&n);
405     source[imdex]  = recv_status.MPI_SOURCE;
406     lens[imdex]  = n;
407     slen += n;
408     count--;
409   }
410   PetscFree(recv_waits);
411 
412   /* move the received data into the list of local rows to zero */
413   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
414   count = 0;
415   for ( i=0; i<nrecvs; i++ ) {
416     values = rvalues + i*nmax;
417     for ( j=0; j<lens[i]; j++ ) {
418       lrows[count++] = values[j] - base;
419     }
420   }
421   PetscFree(rvalues); PetscFree(lens);
422   PetscFree(owner); PetscFree(nprocs);
423 
424   /* actually zap the local rows */
425   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
426   PLogObjectParent(A,istmp);
427   PetscFree(lrows);
428   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
429   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
430   ierr = ISDestroy(istmp); CHKERRQ(ierr);
431 
432   /* wait on sends */
433   if (nsends) {
434     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
435     CHKPTRQ(send_status);
436     MPI_Waitall(nsends,send_waits,send_status);
437     PetscFree(send_status);
438   }
439   PetscFree(send_waits); PetscFree(svalues);
440 
441   return 0;
442 }
443 
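/*
   MatMult_MPIAIJ computes y = A x in two overlapped phases: the scatter
   Mvctx gathers the off-processor entries of x into lvec while the diagonal
   block a->A is applied to the local part of x; the off-diagonal block a->B
   (whose columns are renumbered through garray) is then applied to lvec and
   the result is added into y.
*/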
444 static int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
445 {
446   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
447   int        ierr,nt;
448 
449   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
450   if (nt != a->n) {
451     SETERRQ(1,"MatMult_MPIAIJ:Incompatible partition of A and xx");
452   }
453   ierr = VecGetLocalSize(yy,&nt);  CHKERRQ(ierr);
454   if (nt != a->m) {
455     SETERRQ(1,"MatMult_MPIAIJ:Incompatible partition of A and yy");
456   }
457   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
458   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
459   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
460   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
461   return 0;
462 }
463 
464 static int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
465 {
466   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
467   int        ierr;
468   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
469   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
470   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
471   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
472   return 0;
473 }
474 
475 static int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
476 {
477   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
478   int        ierr;
479 
480   /* do nondiagonal part */
481   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
482   /* send it on its way */
483   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
484   /* do local part */
485   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
486   /* receive remote parts: note this assumes the values are not actually */
487   /* inserted in yy until the next line, which is true for my implementation */
488   /* but perhaps is not always true. */
489   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
490   return 0;
491 }
492 
493 static int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
494 {
495   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
496   int        ierr;
497 
498   /* do nondiagonal part */
499   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
500   /* send it on its way */
501   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
502   /* do local part */
503   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
504   /* receive remote parts: note this assumes the values are not actually */
505   /* inserted in zz until the next line, which is true for my implementation */
506   /* but perhaps is not always true. */
507   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
508                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
509   return 0;
510 }
511 
512 /*
513   This only works correctly for square matrices where the subblock A->A is the
514    diagonal block
515 */
516 static int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
517 {
518   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
519   if (a->M != a->N)
520     SETERRQ(1,"MatGetDiagonal_MPIAIJ:Supports only square matrix where A->A is diag block");
521   if (a->rstart != a->cstart || a->rend != a->cend) {
522     SETERRQ(1,"MatGetDiagonal_MPIAIJ:row partition must equal col partition");  }
523   return MatGetDiagonal(a->A,v);
524 }
525 
526 static int MatScale_MPIAIJ(Scalar *aa,Mat A)
527 {
528   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
529   int        ierr;
530   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
531   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
532   return 0;
533 }
534 
535 static int MatDestroy_MPIAIJ(PetscObject obj)
536 {
537   Mat        mat = (Mat) obj;
538   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
539   int        ierr;
540 #if defined(PETSC_LOG)
541   PLogObjectState(obj,"Rows=%d, Cols=%d",aij->M,aij->N);
542 #endif
543   PetscFree(aij->rowners);
544   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
545   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
546   if (aij->colmap) PetscFree(aij->colmap);
547   if (aij->garray) PetscFree(aij->garray);
548   if (aij->lvec)   VecDestroy(aij->lvec);
549   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
550   if (aij->rowvalues) PetscFree(aij->rowvalues);
551   PetscFree(aij);
552   PLogObjectDestroy(mat);
553   PetscHeaderDestroy(mat);
554   return 0;
555 }
556 #include "draw.h"
557 #include "pinclude/pviewer.h"
558 
559 static int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
560 {
561   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
562   int         ierr;
563 
564   if (aij->size == 1) {
565     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
566   }
567   else SETERRQ(1,"MatView_MPIAIJ_Binary:Only uniprocessor output supported");
568   return 0;
569 }
570 
571 static int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
572 {
573   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
574   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
575   int         ierr, format,shift = C->indexshift,rank;
576   FILE        *fd;
577   ViewerType  vtype;
578 
579   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
580   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
581     ierr = ViewerGetFormat(viewer,&format);
582     if (format == ASCII_FORMAT_INFO_DETAILED) {
583       int nz, nzalloc, mem, flg;
584       MPI_Comm_rank(mat->comm,&rank);
585       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
586       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
587       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
588       PetscSequentialPhaseBegin(mat->comm,1);
589       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
590          rank,aij->m,nz,nzalloc,mem);
591       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
592          rank,aij->m,nz,nzalloc,mem);
593       ierr = MatGetInfo(aij->A,MAT_LOCAL,&nz,&nzalloc,&mem);
594       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,nz);
595       ierr = MatGetInfo(aij->B,MAT_LOCAL,&nz,&nzalloc,&mem);
596       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,nz);
597       fflush(fd);
598       PetscSequentialPhaseEnd(mat->comm,1);
599       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
600       return 0;
601     }
602     else if (format == ASCII_FORMAT_INFO) {
603       return 0;
604     }
605   }
606 
607   if (vtype == DRAW_VIEWER) {
608     Draw       draw;
609     PetscTruth isnull;
610     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
611     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
612   }
613 
614   if (vtype == ASCII_FILE_VIEWER) {
615     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
616     PetscSequentialPhaseBegin(mat->comm,1);
617     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
618            aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
619            aij->cend);
620     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
621     ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
622     fflush(fd);
623     PetscSequentialPhaseEnd(mat->comm,1);
624   }
625   else {
626     int size = aij->size;
627     rank = aij->rank;
628     if (size == 1) {
629       ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
630     }
631     else {
632       /* assemble the entire matrix onto first processor. */
633       Mat         A;
634       Mat_SeqAIJ *Aloc;
635       int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
636       Scalar      *a;
637 
638       if (!rank) {
639         ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
640                CHKERRQ(ierr);
641       }
642       else {
643         ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
644                CHKERRQ(ierr);
645       }
646       PLogObjectParent(mat,A);
647 
648       /* copy over the A part */
649       Aloc = (Mat_SeqAIJ*) aij->A->data;
650       m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
651       row = aij->rstart;
652       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
653       for ( i=0; i<m; i++ ) {
654         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
655         row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
656       }
657       aj = Aloc->j;
658       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
659 
660       /* copy over the B part */
661       Aloc = (Mat_SeqAIJ*) aij->B->data;
662       m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
663       row = aij->rstart;
664       ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
665       for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
666       for ( i=0; i<m; i++ ) {
667         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
668         row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
669       }
670       PetscFree(ct);
671       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
672       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
673       if (!rank) {
674         ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
675       }
676       ierr = MatDestroy(A); CHKERRQ(ierr);
677     }
678   }
679   return 0;
680 }
681 
682 static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
683 {
684   Mat         mat = (Mat) obj;
685   int         ierr;
686   ViewerType  vtype;
687 
688   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
689   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
690       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
691     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
692   }
693   else if (vtype == BINARY_FILE_VIEWER) {
694     return MatView_MPIAIJ_Binary(mat,viewer);
695   }
696   return 0;
697 }
698 
699 /*
700     This has to provide several versions.
701 
702      2) a) use only local smoothing, updating outer values only once.
703         b) local smoothing, updating outer values each inner iteration.
704      3) colored smoothing, updating outer values between colors.
705 */
706 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
707                            double fshift,int its,Vec xx)
708 {
709   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
710   Mat        AA = mat->A, BB = mat->B;
711   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
712   Scalar     *b,*x,*xs,*ls,d,*v,sum;
713   int        ierr,*idx, *diag;
714   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
715 
716   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
717   xs = x + shift; /* shift by one for index start of 1 */
718   ls = ls + shift;
719   if (!A->diag) {if ((ierr = MatMarkDiag_SeqAIJ(AA))) return ierr;}
720   diag = A->diag;
721   if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
722     if (flag & SOR_ZERO_INITIAL_GUESS) {
723       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
724     }
725     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
726     CHKERRQ(ierr);
727     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
728     CHKERRQ(ierr);
729     while (its--) {
730       /* go down through the rows */
731       for ( i=0; i<m; i++ ) {
732         n    = A->i[i+1] - A->i[i];
733         idx  = A->j + A->i[i] + shift;
734         v    = A->a + A->i[i] + shift;
735         sum  = b[i];
736         SPARSEDENSEMDOT(sum,xs,v,idx,n);
737         d    = fshift + A->a[diag[i]+shift];
738         n    = B->i[i+1] - B->i[i];
739         idx  = B->j + B->i[i] + shift;
740         v    = B->a + B->i[i] + shift;
741         SPARSEDENSEMDOT(sum,ls,v,idx,n);
742         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
743       }
744       /* come up through the rows */
745       for ( i=m-1; i>-1; i-- ) {
746         n    = A->i[i+1] - A->i[i];
747         idx  = A->j + A->i[i] + shift;
748         v    = A->a + A->i[i] + shift;
749         sum  = b[i];
750         SPARSEDENSEMDOT(sum,xs,v,idx,n);
751         d    = fshift + A->a[diag[i]+shift];
752         n    = B->i[i+1] - B->i[i];
753         idx  = B->j + B->i[i] + shift;
754         v    = B->a + B->i[i] + shift;
755         SPARSEDENSEMDOT(sum,ls,v,idx,n);
756         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
757       }
758     }
759   }
760   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
761     if (flag & SOR_ZERO_INITIAL_GUESS) {
762       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
763     }
764     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
765     CHKERRQ(ierr);
766     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
767     CHKERRQ(ierr);
768     while (its--) {
769       for ( i=0; i<m; i++ ) {
770         n    = A->i[i+1] - A->i[i];
771         idx  = A->j + A->i[i] + shift;
772         v    = A->a + A->i[i] + shift;
773         sum  = b[i];
774         SPARSEDENSEMDOT(sum,xs,v,idx,n);
775         d    = fshift + A->a[diag[i]+shift];
776         n    = B->i[i+1] - B->i[i];
777         idx  = B->j + B->i[i] + shift;
778         v    = B->a + B->i[i] + shift;
779         SPARSEDENSEMDOT(sum,ls,v,idx,n);
780         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
781       }
782     }
783   }
784   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
785     if (flag & SOR_ZERO_INITIAL_GUESS) {
786       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
787     }
788     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
789                             mat->Mvctx); CHKERRQ(ierr);
790     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
791                             mat->Mvctx); CHKERRQ(ierr);
792     while (its--) {
793       for ( i=m-1; i>-1; i-- ) {
794         n    = A->i[i+1] - A->i[i];
795         idx  = A->j + A->i[i] + shift;
796         v    = A->a + A->i[i] + shift;
797         sum  = b[i];
798         SPARSEDENSEMDOT(sum,xs,v,idx,n);
799         d    = fshift + A->a[diag[i]+shift];
800         n    = B->i[i+1] - B->i[i];
801         idx  = B->j + B->i[i] + shift;
802         v    = B->a + B->i[i] + shift;
803         SPARSEDENSEMDOT(sum,ls,v,idx,n);
804         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
805       }
806     }
807   }
808   else {
809     SETERRQ(1,"MatRelax_MPIAIJ:Option not supported");
810   }
811   return 0;
812 }
813 
814 static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,int *nz,
815                              int *nzalloc,int *mem)
816 {
817   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
818   Mat        A = mat->A, B = mat->B;
819   int        ierr, isend[3], irecv[3], nzA, nzallocA, memA;
820 
821   ierr = MatGetInfo(A,MAT_LOCAL,&nzA,&nzallocA,&memA); CHKERRQ(ierr);
822   ierr = MatGetInfo(B,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
823   isend[0] += nzA; isend[1] += nzallocA; isend[2] += memA;
824   if (flag == MAT_LOCAL) {
825     if (nz)       *nz      = isend[0];
826     if (nzalloc)  *nzalloc = isend[1];
827     if (mem)      *mem     = isend[2];
828   } else if (flag == MAT_GLOBAL_MAX) {
829     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_MAX,matin->comm);
830     if (nz)      *nz      = irecv[0];
831     if (nzalloc) *nzalloc = irecv[1];
832     if (mem)     *mem     = irecv[2];
833   } else if (flag == MAT_GLOBAL_SUM) {
834     MPI_Allreduce( isend, irecv,3,MPI_INT,MPI_SUM,matin->comm);
835     if (nz)      *nz      = irecv[0];
836     if (nzalloc) *nzalloc = irecv[1];
837     if (mem)     *mem     = irecv[2];
838   }
839   return 0;
840 }
841 
842 extern int MatLUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,Mat*);
843 extern int MatLUFactorNumeric_MPIAIJ(Mat,Mat*);
844 extern int MatLUFactor_MPIAIJ(Mat,IS,IS,double);
845 extern int MatILUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,int,Mat *);
846 extern int MatSolve_MPIAIJ(Mat,Vec,Vec);
847 extern int MatSolveAdd_MPIAIJ(Mat,Vec,Vec,Vec);
848 extern int MatSolveTrans_MPIAIJ(Mat,Vec,Vec);
849 extern int MatSolveTransAdd_MPIAIJ(Mat,Vec,Vec,Vec);
850 
851 static int MatSetOption_MPIAIJ(Mat A,MatOption op)
852 {
853   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
854 
855   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
856       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
857       op == MAT_COLUMNS_SORTED ||
858       op == MAT_ROW_ORIENTED) {
859         MatSetOption(a->A,op);
860         MatSetOption(a->B,op);
861   }
862   else if (op == MAT_ROWS_SORTED ||
863            op == MAT_SYMMETRIC ||
864            op == MAT_STRUCTURALLY_SYMMETRIC ||
865            op == MAT_YES_NEW_DIAGONALS)
866     PLogInfo(A,"Info:MatSetOption_MPIAIJ:Option ignored\n");
867   else if (op == MAT_COLUMN_ORIENTED) {
868     a->roworiented = 0;
869     MatSetOption(a->A,op);
870     MatSetOption(a->B,op);
871   }
872   else if (op == MAT_NO_NEW_DIAGONALS)
873     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:MAT_NO_NEW_DIAGONALS");}
874   else
875     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:unknown option");}
876   return 0;
877 }
878 
879 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
880 {
881   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
882   *m = mat->M; *n = mat->N;
883   return 0;
884 }
885 
886 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
887 {
888   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
889   *m = mat->m; *n = mat->n;
890   return 0;
891 }
892 
893 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
894 {
895   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
896   *m = mat->rstart; *n = mat->rend;
897   return 0;
898 }
899 
900 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
901 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
902 
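/*
   MatGetRow_MPIAIJ returns one locally owned row as a single list of global
   column indices and values: the row is fetched from both the diagonal
   block A and the off-diagonal block B and the two pieces are merged in
   increasing global column order (B columns are translated through garray).
*/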
903 int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
904 {
905   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
906   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
907   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
908   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
909   int        *cmap, *idx_p;
910 
911   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIAIJ:Already active");
912   mat->getrowactive = PETSC_TRUE;
913 
914   if (!mat->rowvalues && (idx || v)) {
915     /*
916         allocate enough space to hold information from the longest row.
917     */
918     Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
919     int     max = 1,m = mat->m,tmp;
920     for ( i=0; i<m; i++ ) {
921       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
922       if (max < tmp) { max = tmp; }
923     }
924     mat->rowvalues = (Scalar *) PetscMalloc( max*(sizeof(int)+sizeof(Scalar)));
925     CHKPTRQ(mat->rowvalues);
926     mat->rowindices = (int *) (mat->rowvalues + max);
927   }
928 
929   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIAIJ:Only local rows");
930   lrow = row - rstart;
931 
932   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
933   if (!v)   {pvA = 0; pvB = 0;}
934   if (!idx) {pcA = 0; if (!v) pcB = 0;}
935   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
936   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
937   nztot = nzA + nzB;
938 
939   cmap  = mat->garray;
940   if (v  || idx) {
941     if (nztot) {
942       /* Sort by increasing column numbers, assuming A and B already sorted */
943       int imark = -1;
944       if (v) {
945         *v = v_p = mat->rowvalues;
946         for ( i=0; i<nzB; i++ ) {
947           if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
948           else break;
949         }
950         imark = i;
951         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
952         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
953       }
954       if (idx) {
955         *idx = idx_p = mat->rowindices;
956         if (imark > -1) {
957           for ( i=0; i<imark; i++ ) {
958             idx_p[i] = cmap[cworkB[i]];
959           }
960         } else {
961           for ( i=0; i<nzB; i++ ) {
962             if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
963             else break;
964           }
965           imark = i;
966         }
967         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
968         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
969       }
970     }
971     else {
972       if (idx) *idx = 0;
973       if (v)   *v   = 0;
974     }
975   }
976   *nz = nztot;
977   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
978   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
979   return 0;
980 }
981 
982 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
983 {
984   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
985   if (aij->getrowactive == PETSC_FALSE) {
986     SETERRQ(1,"MatRestoreRow_MPIAIJ:MatGetRow not called");
987   }
988   aij->getrowactive = PETSC_FALSE;
989   return 0;
990 }
991 
992 static int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
993 {
994   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
995   Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
996   int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
997   double     sum = 0.0;
998   Scalar     *v;
999 
1000   if (aij->size == 1) {
1001     ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
1002   } else {
1003     if (type == NORM_FROBENIUS) {
1004       v = amat->a;
1005       for (i=0; i<amat->nz; i++ ) {
1006 #if defined(PETSC_COMPLEX)
1007         sum += real(conj(*v)*(*v)); v++;
1008 #else
1009         sum += (*v)*(*v); v++;
1010 #endif
1011       }
1012       v = bmat->a;
1013       for (i=0; i<bmat->nz; i++ ) {
1014 #if defined(PETSC_COMPLEX)
1015         sum += real(conj(*v)*(*v)); v++;
1016 #else
1017         sum += (*v)*(*v); v++;
1018 #endif
1019       }
1020       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
1021       *norm = sqrt(*norm);
1022     }
1023     else if (type == NORM_1) { /* max column norm */
1024       double *tmp, *tmp2;
1025       int    *jj, *garray = aij->garray;
1026       tmp  = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp);
1027       tmp2 = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp2);
1028       PetscMemzero(tmp,aij->N*sizeof(double));
1029       *norm = 0.0;
1030       v = amat->a; jj = amat->j;
1031       for ( j=0; j<amat->nz; j++ ) {
1032         tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
1033       }
1034       v = bmat->a; jj = bmat->j;
1035       for ( j=0; j<bmat->nz; j++ ) {
1036         tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
1037       }
1038       MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);
1039       for ( j=0; j<aij->N; j++ ) {
1040         if (tmp2[j] > *norm) *norm = tmp2[j];
1041       }
1042       PetscFree(tmp); PetscFree(tmp2);
1043     }
1044     else if (type == NORM_INFINITY) { /* max row norm */
1045       double ntemp = 0.0;
1046       for ( j=0; j<amat->m; j++ ) {
1047         v = amat->a + amat->i[j] + shift;
1048         sum = 0.0;
1049         for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
1050           sum += PetscAbsScalar(*v); v++;
1051         }
1052         v = bmat->a + bmat->i[j] + shift;
1053         for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
1054           sum += PetscAbsScalar(*v); v++;
1055         }
1056         if (sum > ntemp) ntemp = sum;
1057       }
1058       MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);
1059     }
1060     else {
1061       SETERRQ(1,"MatNorm_MPIAIJ:No support for two norm");
1062     }
1063   }
1064   return 0;
1065 }
1066 
1067 static int MatTranspose_MPIAIJ(Mat A,Mat *matout)
1068 {
1069   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1070   Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
1071   int        ierr,shift = Aloc->indexshift;
1072   Mat        B;
1073   int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
1074   Scalar     *array;
1075 
1076   if (matout == PETSC_NULL && M != N)
1077     SETERRQ(1,"MatTranspose_MPIAIJ:Square matrix only for in-place");
1078   ierr = MatCreateMPIAIJ(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,
1079          PETSC_NULL,&B); CHKERRQ(ierr);
1080 
1081   /* copy over the A part */
1082   Aloc = (Mat_SeqAIJ*) a->A->data;
1083   m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1084   row = a->rstart;
1085   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
1086   for ( i=0; i<m; i++ ) {
1087     ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1088     row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
1089   }
1090   aj = Aloc->j;
1091   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}
1092 
1093   /* copy over the B part */
1094   Aloc = (Mat_SeqAIJ*) a->B->data;
1095   m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1096   row = a->rstart;
1097   ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
1098   for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
1099   for ( i=0; i<m; i++ ) {
1100     ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1101     row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
1102   }
1103   PetscFree(ct);
1104   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1105   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1106   if (matout != PETSC_NULL) {
1107     *matout = B;
1108   } else {
1109     /* This isn't really an in-place transpose, but we free the data structures from a */
1110     PetscFree(a->rowners);
1111     ierr = MatDestroy(a->A); CHKERRQ(ierr);
1112     ierr = MatDestroy(a->B); CHKERRQ(ierr);
1113     if (a->colmap) PetscFree(a->colmap);
1114     if (a->garray) PetscFree(a->garray);
1115     if (a->lvec) VecDestroy(a->lvec);
1116     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
1117     PetscFree(a);
1118     PetscMemcpy(A,B,sizeof(struct _Mat));
1119     PetscHeaderDestroy(B);
1120   }
1121   return 0;
1122 }
1123 
1124 extern int MatPrintHelp_SeqAIJ(Mat);
1125 static int MatPrintHelp_MPIAIJ(Mat A)
1126 {
1127   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1128 
1129   if (!a->rank) return MatPrintHelp_SeqAIJ(a->A);
1130   else return 0;
1131 }
1132 
1133 static int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1134 {
1135   *bs = 1;
1136   return 0;
1137 }
1138 
1139 extern int MatConvert_MPIAIJ(Mat,MatType,Mat *);
1140 static int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1141 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1142 int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatGetSubMatrixCall,Mat **);
1143 /* -------------------------------------------------------------------*/
1144 static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
1145        MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
1146        MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
1147        MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
1148        MatSolve_MPIAIJ,MatSolveAdd_MPIAIJ,
1149        MatSolveTrans_MPIAIJ,MatSolveTransAdd_MPIAIJ,
1150        MatLUFactor_MPIAIJ,0,
1151        MatRelax_MPIAIJ,
1152        MatTranspose_MPIAIJ,
1153        MatGetInfo_MPIAIJ,0,
1154        MatGetDiagonal_MPIAIJ,0,MatNorm_MPIAIJ,
1155        MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
1156        0,
1157        MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
1158        MatGetReordering_MPIAIJ,
1159        MatLUFactorSymbolic_MPIAIJ,MatLUFactorNumeric_MPIAIJ,0,0,
1160        MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
1161        MatILUFactorSymbolic_MPIAIJ,0,
1162        0,0,MatConvert_MPIAIJ,MatConvertSameType_MPIAIJ,0,0,
1163        0,0,0,
1164        MatGetSubMatrices_MPIAIJ,MatIncreaseOverlap_MPIAIJ,MatGetValues_MPIAIJ,0,
1165        MatPrintHelp_MPIAIJ,
1166        MatScale_MPIAIJ,0,0,0,MatGetBlockSize_MPIAIJ};
1167 
1168 /*@C
1169    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1170    (the default parallel PETSc format).  For good matrix assembly performance
1171    the user should preallocate the matrix storage by setting the parameters
1172    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1173    performance can be increased by more than a factor of 50.
1174 
1175    Input Parameters:
1176 .  comm - MPI communicator
1177 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1178            This value should be the same as the local size used in creating the
1179            y vector for the matrix-vector product y = Ax.
1180 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1181            This value should be the same as the local size used in creating the
1182            x vector for the matrix-vector product y = Ax.
1183 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
1184 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
1185 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1186            (same for all local rows)
1187 .  d_nnz - array containing the number of nonzeros in the various rows of the
1188            diagonal portion of the local submatrix (possibly different for each row)
1189            or PETSC_NULL. You must leave room for the diagonal entry even if
1190            it is zero.
1191 .  o_nz - number of nonzeros per row in the off-diagonal portion of local
1192            submatrix (same for all local rows).
1193 .  o_nnz - array containing the number of nonzeros in the various rows of the
1194            off-diagonal portion of the local submatrix (possibly different for
1195            each row) or PETSC_NULL.
1196 
1197    Output Parameter:
1198 .  A - the matrix
1199 
1200    Notes:
1201    The AIJ format (also called the Yale sparse matrix format or
1202    compressed row storage) is fully compatible with standard Fortran 77
1203    storage.  That is, the stored row and column indices can begin at
1204    either one (as in Fortran) or zero.  See the users manual for details.
1205 
1206    The user MUST specify either the local or global matrix dimensions
1207    (possibly both).
1208 
1209    By default, this format uses inodes (identical nodes) when possible.
1210    We search for consecutive rows with the same nonzero structure, thereby
1211    reusing matrix information to achieve increased efficiency.
1212 
1213    Options Database Keys:
1214 $    -mat_aij_no_inode  - Do not use inodes
1215 $    -mat_aij_inode_limit <limit> - Set inode limit.
1216 $        (max limit=5)
1217 $    -mat_aij_oneindex - Internally use indexing starting at 1
1218 $        rather than 0.  Note: When calling MatSetValues(),
1219 $        the user still MUST index entries starting at 0!
1220 
1221    Storage Information:
1222    For a square global matrix we define each processor's diagonal portion
1223    to be its local rows and the corresponding columns (a square submatrix);
1224    each processor's off-diagonal portion encompasses the remainder of the
1225    local matrix (a rectangular submatrix).
1226 
1227    The user can specify preallocated storage for the diagonal part of
1228    the local submatrix with either d_nz or d_nnz (not both).  Set
1229    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1230    memory allocation.  Likewise, specify preallocated storage for the
1231    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1232 
1233    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1234    the figure below we depict these three local rows and all columns (0-11).
1235 
1236 $          0 1 2 3 4 5 6 7 8 9 10 11
1237 $         -------------------
1238 $  row 3  |  o o o d d d o o o o o o
1239 $  row 4  |  o o o d d d o o o o o o
1240 $  row 5  |  o o o d d d o o o o o o
1241 $         -------------------
1242 $
1243 
1244    Thus, any entries in the d locations are stored in the d (diagonal)
1245    submatrix, and any entries in the o locations are stored in the
1246    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1247    stored simply in the MATSEQAIJ format for compressed row storage.
1248 
1249    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1250    and o_nz should indicate the number of nonzeros per row in the o matrix.
1251    In general, for PDE problems in which most nonzeros are near the diagonal,
1252    one expects d_nz >> o_nz. For large problems you MUST preallocate memory
1253    or you will get TERRIBLE performance; see the users' manual chapter on
1254    matrices and the file $(PETSC_DIR)/Performance.
1255 
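   Example usage (a minimal sketch; the communicator, global sizes, and
   preallocation figures below are illustrative only):
$      Mat A;
$      /* 100x100 global matrix, about 5 nonzeros per row in the diagonal
$         block and 2 per row in the off-diagonal block */
$      ierr = MatCreateMPIAIJ(MPI_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,100,100,
$                             5,PETSC_NULL,2,PETSC_NULL,&A); CHKERRQ(ierr);
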
1256 .keywords: matrix, aij, compressed row, sparse, parallel
1257 
1258 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1259 @*/
1260 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
1261                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1262 {
1263   Mat          B;
1264   Mat_MPIAIJ   *b;
1265   int          ierr, i,sum[2],work[2];
1266 
1267   *A = 0;
1268   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
1269   PLogObjectCreate(B);
1270   B->data       = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
1271   PetscMemzero(b,sizeof(Mat_MPIAIJ));
1272   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1273   B->destroy    = MatDestroy_MPIAIJ;
1274   B->view       = MatView_MPIAIJ;
1275   B->factor     = 0;
1276   B->assembled  = PETSC_FALSE;
1277 
1278   b->insertmode = NOT_SET_VALUES;
1279   MPI_Comm_rank(comm,&b->rank);
1280   MPI_Comm_size(comm,&b->size);
1281 
1282   if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1283     SETERRQ(1,"MatCreateMPIAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1284 
1285   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1286     work[0] = m; work[1] = n;
1287     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1288     if (M == PETSC_DECIDE) M = sum[0];
1289     if (N == PETSC_DECIDE) N = sum[1];
1290   }
1291   if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
1292   if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
1293   b->m = m; B->m = m;
1294   b->n = n; B->n = n;
1295   b->N = N; B->N = N;
1296   b->M = M; B->M = M;
1297 
1298   /* build local table of row and column ownerships */
1299   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1300   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1301   b->cowners = b->rowners + b->size + 2;
1302   MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1303   b->rowners[0] = 0;
1304   for ( i=2; i<=b->size; i++ ) {
1305     b->rowners[i] += b->rowners[i-1];
1306   }
1307   b->rstart = b->rowners[b->rank];
1308   b->rend   = b->rowners[b->rank+1];
1309   MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1310   b->cowners[0] = 0;
1311   for ( i=2; i<=b->size; i++ ) {
1312     b->cowners[i] += b->cowners[i-1];
1313   }
1314   b->cstart = b->cowners[b->rank];
1315   b->cend   = b->cowners[b->rank+1];
1316 
1317   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1318   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1319   PLogObjectParent(B,b->A);
1320   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1321   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1322   PLogObjectParent(B,b->B);
1323 
1324   /* build cache for off-processor entries formed during MatSetValues() */
1325   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1326   b->colmap      = 0;
1327   b->garray      = 0;
1328   b->roworiented = 1;
1329 
1330   /* stuff used for matrix vector multiply */
1331   b->lvec      = 0;
1332   b->Mvctx     = 0;
1333 
1334   /* stuff for MatGetRow() */
1335   b->rowindices   = 0;
1336   b->rowvalues    = 0;
1337   b->getrowactive = PETSC_FALSE;
1338 
1339   *A = B;
1340   return 0;
1341 }
1342 
1343 static int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
1344 {
1345   Mat        mat;
1346   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1347   int        ierr, len=0, flg;
1348 
1349   *newmat       = 0;
1350   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1351   PLogObjectCreate(mat);
1352   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1353   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1354   mat->destroy    = MatDestroy_MPIAIJ;
1355   mat->view       = MatView_MPIAIJ;
1356   mat->factor     = matin->factor;
1357   mat->assembled  = PETSC_TRUE;
1358 
1359   a->m = mat->m   = oldmat->m;
1360   a->n = mat->n   = oldmat->n;
1361   a->M = mat->M   = oldmat->M;
1362   a->N = mat->N   = oldmat->N;
1363 
1364   a->rstart       = oldmat->rstart;
1365   a->rend         = oldmat->rend;
1366   a->cstart       = oldmat->cstart;
1367   a->cend         = oldmat->cend;
1368   a->size         = oldmat->size;
1369   a->rank         = oldmat->rank;
1370   a->insertmode   = NOT_SET_VALUES;
1371   a->rowvalues    = 0;
1372   a->getrowactive = PETSC_FALSE;
1373 
1374   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1375   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1376   a->cowners = a->rowners + a->size + 2;
1377   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1378   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1379   if (oldmat->colmap) {
1380     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1381     PLogObjectMemory(mat,(a->N)*sizeof(int));
1382     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1383   } else a->colmap = 0;
1384   if (oldmat->garray) {
1385     len = ((Mat_SeqAIJ *) (oldmat->B->data))->n;
1386     a->garray = (int *) PetscMalloc((len+1)*sizeof(int)); CHKPTRQ(a->garray);
1387     PLogObjectMemory(mat,len*sizeof(int));
1388     if (len) PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1389   } else a->garray = 0;
1390 
1391   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1392   PLogObjectParent(mat,a->lvec);
1393   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1394   PLogObjectParent(mat,a->Mvctx);
1395   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1396   PLogObjectParent(mat,a->A);
1397   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1398   PLogObjectParent(mat,a->B);
1399   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1400   if (flg) {
1401     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1402   }
1403   *newmat = mat;
1404   return 0;
1405 }
1406 
1407 #include "sys.h"
1408 
1409 int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
1410 {
1411   Mat          A;
1412   int          i, nz, ierr, j,rstart, rend, fd;
1413   Scalar       *vals,*svals;
1414   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1415   MPI_Status   status;
1416   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1417   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1418   int          tag = ((PetscObject)viewer)->tag;
1419 
1420   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1421   if (!rank) {
1422     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1423     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1424     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIAIJ:not matrix object");
1425   }
1426 
1427   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1428   M = header[1]; N = header[2];
1429   /* determine ownership of all rows */
1430   m = M/size + ((M % size) > rank);
1431   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1432   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1433   rowners[0] = 0;
1434   for ( i=2; i<=size; i++ ) {
1435     rowners[i] += rowners[i-1];
1436   }
1437   rstart = rowners[rank];
1438   rend   = rowners[rank+1];
1439 
1440   /* distribute row lengths to all processors */
1441   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1442   offlens = ourlens + (rend-rstart);
1443   if (!rank) {
1444     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1445     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1446     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1447     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1448     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
1449     PetscFree(sndcounts);
1450   }
1451   else {
1452     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
1453   }
1454 
1455   if (!rank) {
1456     /* calculate the number of nonzeros on each processor */
1457     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1458     PetscMemzero(procsnz,size*sizeof(int));
1459     for ( i=0; i<size; i++ ) {
1460       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1461         procsnz[i] += rowlengths[j];
1462       }
1463     }
1464     PetscFree(rowlengths);
1465 
1466     /* determine max buffer needed and allocate it */
1467     maxnz = 0;
1468     for ( i=0; i<size; i++ ) {
1469       maxnz = PetscMax(maxnz,procsnz[i]);
1470     }
1471     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1472 
1473     /* read in my part of the matrix column indices  */
1474     nz = procsnz[0];
1475     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1476     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1477 
1478     /* read in everyone else's part and ship it off */
1479     for ( i=1; i<size; i++ ) {
1480       nz = procsnz[i];
1481       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1482       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1483     }
1484     PetscFree(cols);
1485   }
1486   else {
1487     /* determine buffer space needed for message */
1488     nz = 0;
1489     for ( i=0; i<m; i++ ) {
1490       nz += ourlens[i];
1491     }
1492     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1493 
1494     /* receive message of column indices*/
1495     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1496     MPI_Get_count(&status,MPI_INT,&maxnz);
1497     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
1498   }
1499 
1500   /* loop over local rows, determining number of off-diagonal entries */
1501   PetscMemzero(offlens,m*sizeof(int));
1502   jj = 0;
1503   for ( i=0; i<m; i++ ) {
1504     for ( j=0; j<ourlens[i]; j++ ) {
1505       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1506       jj++;
1507     }
1508   }
1509 
1510   /* create our matrix */
1511   for ( i=0; i<m; i++ ) {
1512     ourlens[i] -= offlens[i];
1513   }
1514   ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
1515   A = *newmat;
1516   MatSetOption(A,MAT_COLUMNS_SORTED);
1517   for ( i=0; i<m; i++ ) {
1518     ourlens[i] += offlens[i];
1519   }
1520 
1521   if (!rank) {
1522     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1523 
1524     /* read in my part of the matrix numerical values  */
1525     nz = procsnz[0];
1526     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1527 
1528     /* insert into matrix */
1529     jj      = rstart;
1530     smycols = mycols;
1531     svals   = vals;
1532     for ( i=0; i<m; i++ ) {
1533       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1534       smycols += ourlens[i];
1535       svals   += ourlens[i];
1536       jj++;
1537     }
1538 
1539     /* read in the other processors' parts and ship them out */
1540     for ( i=1; i<size; i++ ) {
1541       nz = procsnz[i];
1542       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1543       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1544     }
1545     PetscFree(procsnz);
1546   }
1547   else {
1548     /* receive numeric values */
1549     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1550 
1551     /* receive message of values*/
1552     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1553     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1554     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
1555 
1556     /* insert into matrix */
1557     jj      = rstart;
1558     smycols = mycols;
1559     svals   = vals;
1560     for ( i=0; i<m; i++ ) {
1561       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1562       smycols += ourlens[i];
1563       svals   += ourlens[i];
1564       jj++;
1565     }
1566   }
1567   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1568 
1569   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1570   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1571   return 0;
1572 }
1573