#ifndef lint
static char vcid[] = "$Id: mpiaij.c,v 1.80 1995/09/21 20:10:26 bsmith Exp bsmith $";
#endif

#include "mpiaij.h"
#include "vec/vecimpl.h"
#include "inline/spops.h"

/* local utility routine that creates a mapping from the global column
   numbering to the local numbering used in the off-diagonal part (B) of
   the local storage of the matrix.  This is done in a non-scalable way,
   since the length of colmap equals the number of global columns.
*/
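/* For example, with indexshift = -1 (Fortran-style indexing) and
   garray = {3,8}, the loop below sets colmap[3] = 1 and colmap[8] = 2;
   a later lookup of colmap[col] + shift then recovers the local column
   (0 or 1), while untouched (zeroed) entries of colmap yield a negative
   result, flagging global columns that are not present in B. */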
static int CreateColmap_Private(Mat mat)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
  int        n = B->n,i,shift = B->indexshift;

  aij->colmap = (int *) PETSCMALLOC(aij->N*sizeof(int));CHKPTRQ(aij->colmap);
  PLogObjectMemory(mat,aij->N*sizeof(int));
  PetscZero(aij->colmap,aij->N*sizeof(int));
  for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i-shift;
  return 0;
}

extern int DisAssemble_MPIAIJ(Mat);

static int MatGetReordering_MPIAIJ(Mat mat,MatOrdering type,IS *rperm,IS *cperm)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  int ierr;
  if (aij->numtids == 1) {
    ierr = MatGetReordering(aij->A,type,rperm,cperm); CHKERRQ(ierr);
  } else SETERRQ(1,"MatGetReordering_MPIAIJ:not yet supported in parallel");
  return 0;
}

static int MatSetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,
                               int *idxn,Scalar *v,InsertMode addv)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Mat_SeqAIJ *C = (Mat_SeqAIJ*) aij->A->data;
  int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
  int        cstart = aij->cstart, cend = aij->cend,row,col;
  int        shift = C->indexshift;

  if (aij->insertmode != NOTSETVALUES && aij->insertmode != addv) {
    SETERRQ(1,"MatSetValues_MPIAIJ:You cannot mix inserts and adds");
  }
  aij->insertmode = addv;
  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative row");
    if (idxm[i] >= aij->M) SETERRQ(1,"MatSetValues_MPIAIJ:Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      row = idxm[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative column");
        if (idxn[j] >= aij->N) SETERRQ(1,"MatSetValues_MPIAIJ:Col too large");
        if (idxn[j] >= cstart && idxn[j] < cend){
          col = idxn[j] - cstart;
          ierr = MatSetValues(aij->A,1,&row,1,&col,v+i*n+j,addv);CHKERRQ(ierr);
        }
        else {
          if (aij->assembled) {
            if (!aij->colmap) {ierr = CreateColmap_Private(mat);CHKERRQ(ierr);}
            col = aij->colmap[idxn[j]] + shift;
            if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
              ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
              col =  idxn[j];
            }
          }
          else col = idxn[j];
          ierr = MatSetValues(aij->B,1,&row,1,&col,v+i*n+j,addv);CHKERRQ(ierr);
        }
      }
    }
    else {
      ierr = StashValues_Private(&aij->stash,idxm[i],n,idxn,v+i*n,addv);CHKERRQ(ierr);
    }
  }
  return 0;
}

static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         numtids = aij->numtids, *owners = aij->rowners;
  int         mytid = aij->mytid,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are in either INSERT_VALUES or ADD_VALUES
     mode; the bitwise OR reduction produces (ADD_VALUES|INSERT_VALUES)
     when the two modes have been mixed */
  MPI_Allreduce(&aij->insertmode,(void *) &addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,"MatAssemblyBegin_MPIAIJ:Some processors inserted others added");
  }
  aij->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PETSCMALLOC( 2*numtids*sizeof(int) ); CHKPTRQ(nprocs);
  PetscZero(nprocs,2*numtids*sizeof(int)); procs = nprocs + numtids;
  owner = (int *) PETSCMALLOC( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<aij->stash.n; i++ ) {
    idx = aij->stash.idx[i];
    for ( j=0; j<numtids; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PETSCMALLOC( numtids*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
  nreceives = work[mytid];
  MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
  nmax = work[mytid];
  PETSCFREE(work);

  /* post receives:
       1) each message consists of ordered triples
     (global row index, global column index, value); the indices are
     stored as Scalars to simplify the message passing.
       2) since we don't know how long each individual message is, we
     allocate the largest needed buffer for each receive.  Potentially
     this is a lot of wasted space.

       This could be done better.
  */
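  /* For example, a message carrying the stashed entries (i0,j0,v0) and
     (i1,j1,v1) destined for one processor is packed into svalues below as
         [ (Scalar)i0, (Scalar)j0, v0, (Scalar)i1, (Scalar)j1, v1 ]
     and unpacked again in MatAssemblyEnd_MPIAIJ(). */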
  rvalues = (Scalar *) PETSCMALLOC(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PETSCMALLOC((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PETSCMALLOC(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PETSCMALLOC( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PETSCMALLOC( numtids*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<aij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
  }
  PETSCFREE(owner);
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<numtids; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PETSCFREE(starts); PETSCFREE(nprocs);

  /* Free cache space */
  ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);

  aij->svalues    = svalues;    aij->rvalues    = rvalues;
  aij->nsends     = nsends;     aij->nrecvs     = nreceives;
  aij->send_waits = send_waits; aij->recv_waits = recv_waits;
  aij->rmax       = nmax;

  return 0;
}
extern int MatSetUpMultiply_MPIAIJ(Mat);

static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Mat_SeqAIJ *C = (Mat_SeqAIJ *) aij->A->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
  int         row,col,other_disassembled,shift = C->indexshift;
  Scalar      *values,val;
  InsertMode  addv = aij->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = aij->rvalues + 3*imdex*aij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;
    for ( i=0; i<n; i++ ) {
      row = (int) PETSCREAL(values[3*i]) - aij->rstart;
      col = (int) PETSCREAL(values[3*i+1]);
      val = values[3*i+2];
      if (col >= aij->cstart && col < aij->cend) {
        col -= aij->cstart;
        MatSetValues(aij->A,1,&row,1,&col,&val,addv);
      }
      else {
        if (aij->assembled) {
          if (!aij->colmap) {ierr = CreateColmap_Private(mat);CHKERRQ(ierr);}
          col = aij->colmap[col] + shift;
          if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
            ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
            col = (int) PETSCREAL(values[3*i+1]);
          }
        }
        MatSetValues(aij->B,1,&row,1,&col,&val,addv);
      }
    }
    count--;
  }
  PETSCFREE(aij->recv_waits); PETSCFREE(aij->rvalues);

  /* wait on sends */
  if (aij->nsends) {
    send_status = (MPI_Status *) PETSCMALLOC(aij->nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(aij->nsends,aij->send_waits,send_status);
    PETSCFREE(send_status);
  }
  PETSCFREE(aij->send_waits); PETSCFREE(aij->svalues);

  aij->insertmode = NOTSETVALUES;
  ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled; if so, we must
     also disassemble ourselves, so that we may reassemble. */
  MPI_Allreduce(&aij->assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (aij->assembled && !other_disassembled) {
    ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
  }

  if (!aij->assembled && mode == FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
    aij->assembled = 1;
  }
  ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);

  return 0;
}

static int MatZeroEntries_MPIAIJ(Mat A)
{
  Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
  int        ierr;
  ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
  ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
  return 0;
}

/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG.  The only way to fix it seems to be to access
   aij->A and aij->B directly and not through the MatZeroRows()
   routine.
*/
static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,numtids = l->numtids;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,mytid = l->mytid;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  if (!l->assembled) SETERRQ(1,"MatZeroRows_MPIAIJ:Must assemble matrix first");
  ierr = ISGetLocalSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PETSCMALLOC( 2*numtids*sizeof(int) ); CHKPTRQ(nprocs);
  PetscZero(nprocs,2*numtids*sizeof(int)); procs = nprocs + numtids;
  owner = (int *) PETSCMALLOC((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<numtids; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,"MatZeroRows_MPIAIJ:Index out of range");
  }
  nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PETSCMALLOC( numtids*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
  nrecvs = work[mytid];
  MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
  nmax = work[mytid];
  PETSCFREE(work);

  /* post receives:   */
  rvalues = (int *) PETSCMALLOC((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PETSCMALLOC((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PETSCMALLOC( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PETSCMALLOC( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PETSCMALLOC( (numtids+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  starts[0] = 0;
  for ( i=1; i<numtids+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<numtids; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PETSCFREE(starts);

  base = owners[mytid];

  /*  wait on receives */
  lens   = (int *) PETSCMALLOC( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PETSCFREE(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PETSCMALLOC( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PETSCFREE(rvalues); PETSCFREE(lens);
  PETSCFREE(owner); PETSCFREE(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PETSCFREE(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PETSCMALLOC(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PETSCFREE(send_status);
  }
  PETSCFREE(send_waits); PETSCFREE(svalues);

  return 0;
}

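/* Parallel matrix-vector product y = A*x: the scatter of the ghost values
   of x into lvec is overlapped with the multiply by the diagonal block,
   i.e. y = A_diag*x_local followed by y += B_offdiag*lvec, so the
   communication is hidden behind the local computation. */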
static int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
  int        ierr;

  if (!a->assembled) SETERRQ(1,"MatMult_MPIAIJ:must assemble matrix first");
  ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTERALL,a->Mvctx);CHKERRQ(ierr);
  ierr = MatMult(a->A,xx,yy); CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTERALL,a->Mvctx);CHKERRQ(ierr);
  ierr = MatMultAdd(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
  return 0;
}

static int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
  int        ierr;
  if (!a->assembled) SETERRQ(1,"MatMultAdd_MPIAIJ:must assemble matrix first");
  ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTERALL,a->Mvctx);CHKERRQ(ierr);
  ierr = MatMultAdd(a->A,xx,yy,zz); CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTERALL,a->Mvctx);CHKERRQ(ierr);
  ierr = MatMultAdd(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
  return 0;
}

static int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
  int        ierr;

  if (!a->assembled) SETERRQ(1,"MatMultTrans_MPIAIJ:must assemble matrix first");
  /* do nondiagonal part */
  ierr = MatMultTrans(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
                (ScatterMode)(SCATTERALL|SCATTERREVERSE),a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = MatMultTrans(a->A,xx,yy); CHKERRQ(ierr);
  /* receive remote parts: note this assumes that the values are not
     actually inserted into yy until VecScatterEnd() below, which holds
     for this implementation but is perhaps not always true. */
  ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
                  (ScatterMode)(SCATTERALL|SCATTERREVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}

static int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
  int        ierr;

  if (!a->assembled) SETERRQ(1,"MatMultTransAdd_MPIAIJ:must assemble matrix first");
  /* do nondiagonal part */
  ierr = MatMultTrans(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
                 (ScatterMode)(SCATTERALL|SCATTERREVERSE),a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = MatMultTransAdd(a->A,xx,yy,zz); CHKERRQ(ierr);
  /* receive remote parts: note this assumes that the values are not
     actually inserted into zz until VecScatterEnd() below, which holds
     for this implementation but is perhaps not always true. */
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
                  (ScatterMode)(SCATTERALL|SCATTERREVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}

/*
  This only works correctly for square matrices where the local submatrix
  aij->A is the diagonal block
*/
static int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
{
  Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
  if (!a->assembled) SETERRQ(1,"MatGetDiagonal_MPIAIJ:must assemble matrix first");
  return MatGetDiagonal(a->A,v);
}

static int MatDestroy_MPIAIJ(PetscObject obj)
{
  Mat        mat = (Mat) obj;
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  int        ierr;
#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",aij->M,aij->N);
#endif
  PETSCFREE(aij->rowners);
  ierr = MatDestroy(aij->A); CHKERRQ(ierr);
  ierr = MatDestroy(aij->B); CHKERRQ(ierr);
  if (aij->colmap) PETSCFREE(aij->colmap);
  if (aij->garray) PETSCFREE(aij->garray);
  if (aij->lvec)   VecDestroy(aij->lvec);
  if (aij->Mvctx)  VecScatterCtxDestroy(aij->Mvctx);
  PETSCFREE(aij);
  PLogObjectDestroy(mat);
  PETSCHEADERDESTROY(mat);
  return 0;
}
#include "draw.h"
#include "pinclude/pviewer.h"

static int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
{
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  int         ierr;

  if (aij->numtids == 1) {
    ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
  }
  return 0;
}

static int MatView_MPIAIJ_ASCIIorDraw(Mat mat,Viewer viewer)
{
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
  int         ierr, format,shift = C->indexshift;
  PetscObject vobj = (PetscObject) viewer;
  FILE        *fd;

  if (vobj->type == ASCII_FILE_VIEWER || vobj->type == ASCII_FILES_VIEWER) {
    ierr = ViewerFileGetFormat_Private(viewer,&format); CHKERRQ(ierr);
    if (format == FILE_FORMAT_INFO) {
      return 0;
    }
  }

  if (vobj->type == ASCII_FILE_VIEWER) {
    ierr = ViewerFileGetPointer_Private(viewer,&fd); CHKERRQ(ierr);
    MPIU_Seq_begin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
           aij->mytid,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
           aij->cend);
    ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    MPIU_Seq_end(mat->comm,1);
  }
  else {
    int numtids = aij->numtids, mytid = aij->mytid;
    if (numtids == 1) {
      ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto the first processor */
      Mat         A;
      Mat_SeqAIJ *Aloc;
      int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
      Scalar      *a;

      if (!mytid) {
        ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,0,0,0,&A);CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,0,0,0,&A);CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqAIJ*) aij->A->data;
      m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = aij->rstart;
      for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
      for ( i=0; i<m; i++ ) {
        ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
        row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
      }
      aj = Aloc->j;
      for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}

      /* copy over the B part */
      Aloc = (Mat_SeqAIJ*) aij->B->data;
      m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = aij->rstart;
      ct = cols = (int *) PETSCMALLOC( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
      for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
      for ( i=0; i<m; i++ ) {
        ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
        row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
      }
      PETSCFREE(ct);
      ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!mytid) {
        ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}

static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
{
  Mat         mat = (Mat) obj;
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  int         ierr;
  PetscObject vobj = (PetscObject) viewer;

  if (!aij->assembled) SETERRQ(1,"MatView_MPIAIJ:must assemble matrix first");
  if (!viewer) {
    viewer = STDOUT_VIEWER_SELF; vobj = (PetscObject) viewer;
  }
  if (vobj->cookie == DRAW_COOKIE && vobj->type == NULLWINDOW) return 0;
  else if (vobj->cookie == VIEWER_COOKIE && vobj->type == ASCII_FILE_VIEWER) {
    ierr = MatView_MPIAIJ_ASCIIorDraw(mat,viewer); CHKERRQ(ierr);
  }
  else if (vobj->cookie == VIEWER_COOKIE && vobj->type == ASCII_FILES_VIEWER) {
    ierr = MatView_MPIAIJ_ASCIIorDraw(mat,viewer); CHKERRQ(ierr);
  }
  else if (vobj->cookie == DRAW_COOKIE) {
    ierr = MatView_MPIAIJ_ASCIIorDraw(mat,viewer); CHKERRQ(ierr);
  }
  else if (vobj->type == BINARY_FILE_VIEWER) {
    return MatView_MPIAIJ_Binary(mat,viewer);
  }
  return 0;
}

extern int MatMarkDiag_SeqAIJ(Mat);
/*
    This has to provide several versions.

     1) as in the sequential case
     2) a) use only local smoothing, updating outer values only once
        b) local smoothing, updating outer values each inner iteration
     3) colored, updating outer values between colors
*/
static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
                           double fshift,int its,Vec xx)
{
  Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
  Mat        AA = mat->A, BB = mat->B;
  Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
  Scalar     zero = 0.0,*b,*x,*xs,*ls,d,*v,sum,scale,*t,*ts;
  int        ierr,*idx, *diag;
  int        n = mat->n, m = mat->m, i,shift = A->indexshift;
  Vec        tt;

  if (!mat->assembled) SETERRQ(1,"MatRelax_MPIAIJ:must assemble matrix first");

  VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
  xs = x + shift; /* shift by one for index start of 1 */
  ls = ls + shift;
  if (!A->diag) {if ((ierr = MatMarkDiag_SeqAIJ(AA))) return ierr;}
  diag = A->diag;
  if (flag == SOR_APPLY_UPPER || flag == SOR_APPLY_LOWER) {
    SETERRQ(1,"MatRelax_MPIAIJ:Option not yet supported");
  }
  if (flag & SOR_EISENSTAT) {
    /* Let A = L + U + D, where L is strictly lower triangular,
    U is strictly upper triangular, and D is diagonal; this routine applies

            (L + E)^{-1} A (U + E)^{-1}

    to a vector efficiently using Eisenstat's trick.  This is for
    the case of the SSOR preconditioner, so E is D/omega where omega
    is the relaxation factor.
    */
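    /* A sketch of the algebra behind the steps below: writing
           A = (L + E) + (U + E) + (D - 2E)
       gives
           (L+E)^{-1} A (U+E)^{-1} b
             = (U+E)^{-1} b  +  (L+E)^{-1} ( b - (2E - D) (U+E)^{-1} b ),
       which is computed as  x = (E+U)^{-1} b,  t = b - (2E-D) x,
       t = (E+L)^{-1} t,  x = x + t.  Only triangular solves and a
       diagonal scaling are needed; A itself is never applied. */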
    ierr = VecDuplicate(xx,&tt); CHKERRQ(ierr);
    PLogObjectParent(matin,tt);
    VecGetArray(tt,&t);
    scale = (2.0/omega) - 1.0;
    /*  x = (E + U)^{-1} b */
    VecSet(&zero,mat->lvec);
    ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINEUP,
                              mat->Mvctx); CHKERRQ(ierr);
    for ( i=m-1; i>-1; i-- ) {
      n    = A->i[i+1] - diag[i] - 1;
      idx  = A->j + diag[i] + !shift;
      v    = A->a + diag[i] + !shift;
      sum  = b[i];
      SPARSEDENSEMDOT(sum,xs,v,idx,n);
      d    = fshift + A->a[diag[i]+shift];
      n    = B->i[i+1] - B->i[i];
      idx  = B->j + B->i[i] + shift;
      v    = B->a + B->i[i] + shift;
      SPARSEDENSEMDOT(sum,ls,v,idx,n);
      x[i] = omega*(sum/d);
    }
    ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINEUP,
                            mat->Mvctx); CHKERRQ(ierr);

    /*  t = b - (2*E - D)x */
    v = A->a;
    for ( i=0; i<m; i++ ) { t[i] = b[i] - scale*(v[*diag++ + shift])*x[i]; }

    /*  t = (E + L)^{-1}t */
    ts = t + shift; /* shifted by one for index start of a or mat->j*/
    diag = A->diag;
    VecSet(&zero,mat->lvec);
    ierr = VecPipelineBegin(tt,mat->lvec,INSERT_VALUES,PIPELINEDOWN,
                                                 mat->Mvctx); CHKERRQ(ierr);
    for ( i=0; i<m; i++ ) {
      n    = diag[i] - A->i[i];
      idx  = A->j + A->i[i] + shift;
      v    = A->a + A->i[i] + shift;
      sum  = t[i];
      SPARSEDENSEMDOT(sum,ts,v,idx,n);
      d    = fshift + A->a[diag[i]+shift];
      n    = B->i[i+1] - B->i[i];
      idx  = B->j + B->i[i] + shift;
      v    = B->a + B->i[i] + shift;
      SPARSEDENSEMDOT(sum,ls,v,idx,n);
      t[i] = omega*(sum/d);
    }
    ierr = VecPipelineEnd(tt,mat->lvec,INSERT_VALUES,PIPELINEDOWN,
                                                    mat->Mvctx); CHKERRQ(ierr);
    /*  x = x + t */
    for ( i=0; i<m; i++ ) { x[i] += t[i]; }
    VecDestroy(tt);
    return 0;
  }

  if ((flag & SOR_SYMMETRIC_SWEEP) == SOR_SYMMETRIC_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      VecSet(&zero,mat->lvec); VecSet(&zero,xx);
    }
    else {
      ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTERUP,mat->Mvctx);
      CHKERRQ(ierr);
      ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTERUP,mat->Mvctx);
      CHKERRQ(ierr);
    }
    while (its--) {
      /* go down through the rows */
      ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINEDOWN,
                              mat->Mvctx); CHKERRQ(ierr);
      for ( i=0; i<m; i++ ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
      }
      ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINEDOWN,
                            mat->Mvctx); CHKERRQ(ierr);
      /* come up through the rows */
      ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINEUP,
                              mat->Mvctx); CHKERRQ(ierr);
      for ( i=m-1; i>-1; i-- ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
      }
      ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINEUP,
                            mat->Mvctx); CHKERRQ(ierr);
    }
  }
  else if (flag & SOR_FORWARD_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      VecSet(&zero,mat->lvec);
      ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINEDOWN,
                              mat->Mvctx); CHKERRQ(ierr);
      for ( i=0; i<m; i++ ) {
        n    = diag[i] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = omega*(sum/d);
      }
      ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINEDOWN,
                            mat->Mvctx); CHKERRQ(ierr);
      its--;
    }
    while (its--) {
      ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTERUP,mat->Mvctx);
      CHKERRQ(ierr);
      ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTERUP,mat->Mvctx);
      CHKERRQ(ierr);
      ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINEDOWN,
                              mat->Mvctx); CHKERRQ(ierr);
      for ( i=0; i<m; i++ ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
      }
      ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINEDOWN,
                            mat->Mvctx); CHKERRQ(ierr);
    }
  }
  else if (flag & SOR_BACKWARD_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      VecSet(&zero,mat->lvec);
      ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINEUP,
                              mat->Mvctx); CHKERRQ(ierr);
      for ( i=m-1; i>-1; i-- ) {
        n    = A->i[i+1] - diag[i] - 1;
        idx  = A->j + diag[i] + !shift;
        v    = A->a + diag[i] + !shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = omega*(sum/d);
      }
      ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINEUP,
                            mat->Mvctx); CHKERRQ(ierr);
      its--;
    }
    while (its--) {
      ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTERDOWN,
                            mat->Mvctx); CHKERRQ(ierr);
      ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTERDOWN,
                            mat->Mvctx); CHKERRQ(ierr);
      ierr = VecPipelineBegin(xx,mat->lvec,INSERT_VALUES,PIPELINEUP,
                              mat->Mvctx); CHKERRQ(ierr);
      for ( i=m-1; i>-1; i-- ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
      }
      ierr = VecPipelineEnd(xx,mat->lvec,INSERT_VALUES,PIPELINEUP,
                            mat->Mvctx); CHKERRQ(ierr);
    }
  }
  else if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      return MatRelax(mat->A,bb,omega,flag,fshift,its,xx);
    }
    ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTERALL,mat->Mvctx);
    CHKERRQ(ierr);
    ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTERALL,mat->Mvctx);
    CHKERRQ(ierr);
    while (its--) {
      /* go down through the rows */
      for ( i=0; i<m; i++ ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
      }
      /* come up through the rows */
      for ( i=m-1; i>-1; i-- ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
      }
    }
  }
  else if (flag & SOR_LOCAL_FORWARD_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      return MatRelax(mat->A,bb,omega,flag,fshift,its,xx);
    }
    ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTERALL,mat->Mvctx);
    CHKERRQ(ierr);
    ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTERALL,mat->Mvctx);
    CHKERRQ(ierr);
    while (its--) {
      for ( i=0; i<m; i++ ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
      }
    }
  }
  else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      return MatRelax(mat->A,bb,omega,flag,fshift,its,xx);
    }
    ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTERALL,
                            mat->Mvctx); CHKERRQ(ierr);
    ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTERALL,
                            mat->Mvctx); CHKERRQ(ierr);
    while (its--) {
      for ( i=m-1; i>-1; i-- ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
      }
    }
  }
  return 0;
}

static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,int *nz,
                             int *nzalloc,int *mem)
{
  Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
  Mat        A = mat->A, B = mat->B;
  int        ierr, isend[3], irecv[3], nzA, nzallocA, memA;

  ierr = MatGetInfo(A,MAT_LOCAL,&nzA,&nzallocA,&memA); CHKERRQ(ierr);
  ierr = MatGetInfo(B,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
  isend[0] += nzA; isend[1] += nzallocA; isend[2] += memA;
  if (flag == MAT_LOCAL) {
    *nz = isend[0]; *nzalloc = isend[1]; *mem = isend[2];
  } else if (flag == MAT_GLOBAL_MAX) {
    MPI_Allreduce((void *) isend,(void *) irecv,3,MPI_INT,MPI_MAX,matin->comm);
    *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
  } else if (flag == MAT_GLOBAL_SUM) {
    MPI_Allreduce((void *) isend,(void *) irecv,3,MPI_INT,MPI_SUM,matin->comm);
    *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
  }
  return 0;
}

extern int MatLUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,Mat*);
extern int MatLUFactorNumeric_MPIAIJ(Mat,Mat*);
extern int MatLUFactor_MPIAIJ(Mat,IS,IS,double);
extern int MatILUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,int,Mat *);
extern int MatSolve_MPIAIJ(Mat,Vec,Vec);
extern int MatSolveAdd_MPIAIJ(Mat,Vec,Vec,Vec);
extern int MatSolveTrans_MPIAIJ(Mat,Vec,Vec);
extern int MatSolveTransAdd_MPIAIJ(Mat,Vec,Vec,Vec);

static int MatSetOption_MPIAIJ(Mat A,MatOption op)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) A->data;

  if (op == NO_NEW_NONZERO_LOCATIONS || op == YES_NEW_NONZERO_LOCATIONS) {
    MatSetOption(aij->A,op);
    MatSetOption(aij->B,op);
  }
  else if (op == COLUMN_ORIENTED) {
    SETERRQ(1,"MatSetOption_MPIAIJ:Column oriented not supported");
  }
  return 0;
}

static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
  *m = mat->M; *n = mat->N;
  return 0;
}

static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
  *m = mat->m; *n = mat->n;
  return 0;
}

static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
{
  Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
  *m = mat->rstart; *n = mat->rend;
  return 0;
}

static int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB;
  int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
  int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;

  if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIAIJ:only local rows");
  lrow = row - rstart;

  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = MatGetRow(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = MatGetRow(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B are already sorted */
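      /* The merged row is laid out as: first the B entries whose global
         column is below cstart, then all the A entries (shifted to global
         numbering), then the remaining B entries; imark below records
         where the lower B part ends. */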
      int imark;
      if (mat->assembled) {
        for (i=0; i<nzB; i++) cworkB[i] = mat->garray[cworkB[i]];
      }
      if (v) {
        *v = (Scalar *) PETSCMALLOC( (nztot)*sizeof(Scalar) ); CHKPTRQ(*v);
        for ( i=0; i<nzB; i++ ) {
          if (cworkB[i] < cstart)   (*v)[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     (*v)[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) (*v)[nzA+i] = vworkB[i];
      }
      if (idx) {
        *idx = (int *) PETSCMALLOC( (nztot)*sizeof(int) ); CHKPTRQ(*idx);
        for (i=0; i<nzA; i++) cworkA[i] += cstart;
        for ( i=0; i<nzB; i++ ) {
          if (cworkB[i] < cstart)   (*idx)[i] = cworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     (*idx)[imark+i] = cworkA[i];
        for ( i=imark; i<nzB; i++ ) (*idx)[nzA+i] = cworkB[i];
      }
    }
    else {
      if (idx) *idx = 0;
      if (v)   *v = 0;
    }
  }
  *nz = nztot;
  ierr = MatRestoreRow(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = MatRestoreRow(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  return 0;
}

static int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
{
  if (idx) PETSCFREE(*idx);
  if (v) PETSCFREE(*v);
  return 0;
}

static int MatNorm_MPIAIJ(Mat mat,MatNormType type,double *norm)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
  int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
  double     sum = 0.0;
  Scalar     *v;

  if (!aij->assembled) SETERRQ(1,"MatNorm_MPIAIJ:Must assemble matrix");
  if (aij->numtids == 1) {
    ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      v = amat->a;
      for (i=0; i<amat->nz; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      v = bmat->a;
      for (i=0; i<bmat->nz; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      MPI_Allreduce((void*)&sum,(void*)norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
      *norm = sqrt(*norm);
    }
    else if (type == NORM_1) { /* max column norm */
      double *tmp, *tmp2;
      int    *jj, *garray = aij->garray;
      tmp  = (double *) PETSCMALLOC( aij->N*sizeof(double) ); CHKPTRQ(tmp);
      tmp2 = (double *) PETSCMALLOC( aij->N*sizeof(double) ); CHKPTRQ(tmp2);
      PetscZero(tmp,aij->N*sizeof(double));
      *norm = 0.0;
      v = amat->a; jj = amat->j;
      for ( j=0; j<amat->nz; j++ ) {
#if defined(PETSC_COMPLEX)
        tmp[cstart + *jj++ + shift] += abs(*v++);
#else
        tmp[cstart + *jj++ + shift] += fabs(*v++);
#endif
      }
      v = bmat->a; jj = bmat->j;
      for ( j=0; j<bmat->nz; j++ ) {
#if defined(PETSC_COMPLEX)
        tmp[garray[*jj++ + shift]] += abs(*v++);
#else
        tmp[garray[*jj++ + shift]] += fabs(*v++);
#endif
      }
      MPI_Allreduce((void*)tmp,(void*)tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);
      for ( j=0; j<aij->N; j++ ) {
        if (tmp2[j] > *norm) *norm = tmp2[j];
      }
      PETSCFREE(tmp); PETSCFREE(tmp2);
    }
    else if (type == NORM_INFINITY) { /* max row norm */
      double normtemp = 0.0;
      for ( j=0; j<amat->m; j++ ) {
        v = amat->a + amat->i[j] + shift;
        sum = 0.0;
        for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
#if defined(PETSC_COMPLEX)
          sum += abs(*v); v++;
#else
          sum += fabs(*v); v++;
#endif
        }
        v = bmat->a + bmat->i[j] + shift;
        for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
#if defined(PETSC_COMPLEX)
          sum += abs(*v); v++;
#else
          sum += fabs(*v); v++;
#endif
        }
        if (sum > normtemp) normtemp = sum;
      }
      /* the reduction must happen once, after all local rows are processed;
         local row counts differ across processors */
      MPI_Allreduce((void*)&normtemp,(void*)norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);
    }
    else {
      SETERRQ(1,"MatNorm_MPIAIJ:No support for the two norm");
    }
  }
  return 0;
}

static int MatTranspose_MPIAIJ(Mat A,Mat *matout)
{
  Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
  Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
  int        ierr,shift = Aloc->indexshift;
  Mat        B;
  int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
  Scalar     *array;

  if (!matout && M != N) SETERRQ(1,"MatTranspose_MPIAIJ:Square only in-place");
  ierr = MatCreateMPIAIJ(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,0,0,0,0,&B);
  CHKERRQ(ierr);

  /* copy over the A part */
  Aloc = (Mat_SeqAIJ*) a->A->data;
  m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
  row = a->rstart;
  for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
  for ( i=0; i<m; i++ ) {
    ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
    row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
  }
  aj = Aloc->j;
  for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}

  /* copy over the B part */
  Aloc = (Mat_SeqAIJ*) a->B->data;
  m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
  row = a->rstart;
  ct = cols = (int *) PETSCMALLOC( (ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
  for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
  for ( i=0; i<m; i++ ) {
    ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
    row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
  }
  PETSCFREE(ct);
  ierr = MatAssemblyBegin(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
  if (matout) {
    *matout = B;
  } else {
    /* This isn't really an in-place transpose; we free the data
       structures from a and take over those of B */
    PETSCFREE(a->rowners);
    ierr = MatDestroy(a->A); CHKERRQ(ierr);
    ierr = MatDestroy(a->B); CHKERRQ(ierr);
    if (a->colmap) PETSCFREE(a->colmap);
    if (a->garray) PETSCFREE(a->garray);
    if (a->lvec) VecDestroy(a->lvec);
    if (a->Mvctx) VecScatterCtxDestroy(a->Mvctx);
    PETSCFREE(a);
    PetscMemcpy(A,B,sizeof(struct _Mat));
    PETSCHEADERDESTROY(B);
  }
  return 0;
}

extern int MatConvert_MPIAIJ(Mat,MatType,Mat *);
static int MatCopyPrivate_MPIAIJ(Mat,Mat *);

/* -------------------------------------------------------------------*/
static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
       MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
       MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
       MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
       MatSolve_MPIAIJ,MatSolveAdd_MPIAIJ,
       MatSolveTrans_MPIAIJ,MatSolveTransAdd_MPIAIJ,
       MatLUFactor_MPIAIJ,0,
       MatRelax_MPIAIJ,
       MatTranspose_MPIAIJ,
       MatGetInfo_MPIAIJ,0,
       MatGetDiagonal_MPIAIJ,0,MatNorm_MPIAIJ,
       MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
       0,
       MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
       MatGetReordering_MPIAIJ,
       MatLUFactorSymbolic_MPIAIJ,MatLUFactorNumeric_MPIAIJ,0,0,
       MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
       MatILUFactorSymbolic_MPIAIJ,0,
       0,0,MatConvert_MPIAIJ,0,0,MatCopyPrivate_MPIAIJ};

/*@
   MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
   (the default parallel PETSc format).

   Input Parameters:
.  comm - MPI communicator
.  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
.  n - number of local columns (or PETSC_DECIDE to have it calculated
           if N is given)
.  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
.  N - number of global columns (or PETSC_DECIDE to have it calculated
           if n is given)
.  d_nz - number of nonzeros per row in the diagonal portion of the local
           submatrix (same for all local rows)
.  d_nnz - number of nonzeros per row in the diagonal portion of the local
           submatrix, or null (possibly different for each row).  You must
           leave room for the diagonal entry even if it is zero.
.  o_nz - number of nonzeros per row in the off-diagonal portion of the
           local submatrix (same for all local rows).
.  o_nnz - number of nonzeros per row in the off-diagonal portion of the
           local submatrix, or null (possibly different for each row).

   Output Parameter:
.  newmat - the matrix

   Notes:
   The AIJ format (also called the Yale sparse matrix format or
   compressed row storage) is fully compatible with standard Fortran 77
   storage.  That is, the stored row and column indices begin at
   one, not zero.  See the users manual for further details.

   The user MUST specify either the local or global matrix dimensions
   (possibly both).

   Storage Information:
   For a square global matrix we define each processor's diagonal portion
   to be its local rows and the corresponding columns (a square submatrix);
   each processor's off-diagonal portion encompasses the remainder of the
   local matrix (a rectangular submatrix).

   The user can specify preallocated storage for the diagonal part of
   the local submatrix with either d_nz or d_nnz (not both).  Set both
   d_nz and d_nnz to zero for PETSc to control dynamic memory allocation.
   Likewise, specify preallocated storage for the off-diagonal part of
   the local submatrix with o_nz or o_nnz (not both).
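
   Example:
   A minimal sketch (the sizes and preallocation figures here are
   arbitrary illustrations): create a 100 by 100 matrix spread across
   the processors of MPI_COMM_WORLD, with room for about 5 nonzeros per
   row in the diagonal block and 2 per row in the off-diagonal block:
$      Mat A;
$      ierr = MatCreateMPIAIJ(MPI_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,
$                             100,100,5,0,2,0,&A); CHKERRQ(ierr);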

.keywords: matrix, aij, compressed row, sparse, parallel

.seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
@*/
int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
                    int d_nz,int *d_nnz, int o_nz,int *o_nnz,Mat *newmat)
{
  Mat          mat;
  Mat_MPIAIJ   *a;
  int          ierr, i,sum[2],work[2];

  *newmat         = 0;
  PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (a = PETSCNEW(Mat_MPIAIJ)); CHKPTRQ(a);
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy    = MatDestroy_MPIAIJ;
  mat->view       = MatView_MPIAIJ;
  mat->factor     = 0;

  a->insertmode = NOTSETVALUES;
  MPI_Comm_rank(comm,&a->mytid);
  MPI_Comm_size(comm,&a->numtids);

  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    MPI_Allreduce((void *) work,(void *) sum,2,MPI_INT,MPI_SUM,comm );
    if (M == PETSC_DECIDE) M = sum[0];
    if (N == PETSC_DECIDE) N = sum[1];
  }
  if (m == PETSC_DECIDE) {m = M/a->numtids + ((M % a->numtids) > a->mytid);}
  if (n == PETSC_DECIDE) {n = N/a->numtids + ((N % a->numtids) > a->mytid);}
  a->m = m;
  a->n = n;
  a->N = N;
  a->M = M;

  /* build local table of row and column ownerships */
  a->rowners = (int *) PETSCMALLOC(2*(a->numtids+2)*sizeof(int)); CHKPTRQ(a->rowners);
  PLogObjectMemory(mat,2*(a->numtids+2)*sizeof(int)+sizeof(struct _Mat)+
                       sizeof(Mat_MPIAIJ));
  a->cowners = a->rowners + a->numtids + 1;
  MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);
  a->rowners[0] = 0;
  for ( i=2; i<=a->numtids; i++ ) {
    a->rowners[i] += a->rowners[i-1];
  }
  a->rstart = a->rowners[a->mytid];
  a->rend   = a->rowners[a->mytid+1];
  MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);
  a->cowners[0] = 0;
  for ( i=2; i<=a->numtids; i++ ) {
    a->cowners[i] += a->cowners[i-1];
  }
  a->cstart = a->cowners[a->mytid];
  a->cend   = a->cowners[a->mytid+1];

  ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);
  ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&a->B); CHKERRQ(ierr);
  PLogObjectParent(mat,a->B);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);
  a->colmap    = 0;
  a->garray    = 0;

  /* stuff used for matrix vector multiply */
  a->lvec      = 0;
  a->Mvctx     = 0;
  a->assembled = 0;

  *newmat = mat;
  return 0;
}

static int MatCopyPrivate_MPIAIJ(Mat matin,Mat *newmat)
{
  Mat        mat;
  Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
  int        ierr, len;

  if (!oldmat->assembled) SETERRQ(1,"MatCopyPrivate_MPIAIJ:Must assemble matrix");
  *newmat      = 0;
  PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (a = PETSCNEW(Mat_MPIAIJ)); CHKPTRQ(a);
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy    = MatDestroy_MPIAIJ;
  mat->view       = MatView_MPIAIJ;
  mat->factor     = matin->factor;

  a->m          = oldmat->m;
  a->n          = oldmat->n;
  a->M          = oldmat->M;
  a->N          = oldmat->N;

  a->assembled  = 1;
  a->rstart     = oldmat->rstart;
  a->rend       = oldmat->rend;
  a->cstart     = oldmat->cstart;
  a->cend       = oldmat->cend;
  a->numtids    = oldmat->numtids;
  a->mytid      = oldmat->mytid;
  a->insertmode = NOTSETVALUES;

  a->rowners    = (int *) PETSCMALLOC((a->numtids+1)*sizeof(int));CHKPTRQ(a->rowners);
  PLogObjectMemory(mat,(a->numtids+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
  PetscMemcpy(a->rowners,oldmat->rowners,(a->numtids+1)*sizeof(int));
  ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
  if (oldmat->colmap) {
    a->colmap      = (int *) PETSCMALLOC( (a->N)*sizeof(int) );CHKPTRQ(a->colmap);
    PLogObjectMemory(mat,(a->N)*sizeof(int));
    PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
  } else a->colmap = 0;
  if (oldmat->garray && (len = ((Mat_SeqAIJ *) (oldmat->B->data))->n)) {
    a->garray      = (int *) PETSCMALLOC(len*sizeof(int) ); CHKPTRQ(a->garray);
    PLogObjectMemory(mat,len*sizeof(int));
    PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
  } else a->garray = 0;

  ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
  PLogObjectParent(mat,a->lvec);
  ierr =  VecScatterCtxCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
  PLogObjectParent(mat,a->Mvctx);
  ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);
  ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
  PLogObjectParent(mat,a->B);
  *newmat = mat;
  return 0;
}

#include "sysio.h"

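/*
   MatLoad_MPIAIJ - Loads a matrix stored in binary format.

   The file layout assumed by the reads below: a header of four ints
   (MAT_COOKIE, the global row count M, the global column count N, and a
   fourth entry this routine does not use), followed by the M row lengths,
   then all column indices, then all numerical values, each grouped row by
   row and hence by owning processor.  Only the first processor reads the
   file; the pieces belonging to the other processors are shipped to them
   with MPI messages.
*/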
int MatLoad_MPIAIJ(Viewer bview,MatType type,Mat *newmat)
{
  Mat          mat;
  int          i, nz, ierr, j, rstart, rend, fd;
  Scalar       *vals;
  PetscObject  vobj = (PetscObject) bview;
  MPI_Comm     comm = vobj->comm;
  MPI_Status   status;
  int          header[4],mytid,numtid,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
  int          *ourlens,*sndcounts = 0,*procsnz,*offlens,jj,*scols;

  MPI_Comm_size(comm,&numtid); MPI_Comm_rank(comm,&mytid);
  if (!mytid) {
    ierr = ViewerFileGetDescriptor_Private(bview,&fd); CHKERRQ(ierr);
    ierr = SYRead(fd,(char *)header,4,SYINT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIAIJ: not matrix object");
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];
  /* determine ownership of all rows */
  m = M/numtid + ((M % numtid) > mytid);
  rowners = (int *) PETSCMALLOC((numtid+2)*sizeof(int)); CHKPTRQ(rowners);
  MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=numtid; i++ ) {
    rowners[i] += rowners[i-1];
  }
  rstart = rowners[mytid];
  rend   = rowners[mytid+1];

  /* distribute row lengths to all processors */
  ourlens = (int*) PETSCMALLOC( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
  offlens = ourlens + (rend-rstart);
  if (!mytid) {
    rowlengths = (int*) PETSCMALLOC( M*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = SYRead(fd,rowlengths,M,SYINT); CHKERRQ(ierr);
    sndcounts = (int*) PETSCMALLOC( numtid*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<numtid; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
    MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,
               0,comm);
    PETSCFREE(sndcounts);
  }
  else {
    MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
  }

  if (!mytid) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PETSCMALLOC( numtid*sizeof(int) ); CHKPTRQ(procsnz);
    PetscZero(procsnz,numtid*sizeof(int));
    for ( i=0; i<numtid; i++ ) {
      for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PETSCFREE(rowlengths);

    /* determine max buffer needed */
    maxnz = 0;
    for ( i=0; i<numtid; i++ ) {
      maxnz = PETSCMAX(maxnz,procsnz[i]);
    }

    /* read in my part of the matrix column indices  */
    nz   = procsnz[0];
    cols = (int *) PETSCMALLOC( nz*sizeof(int) ); CHKPTRQ(cols);
    ierr = SYRead(fd,cols,nz,SYINT); CHKERRQ(ierr);

    /* read in the column indices of all the other processors and ship
       them off now; they block on this receive before the (collective)
       matrix creation below.  The matrix does not exist yet, so the
       viewer's tag is used for the message. */
    scols = (int *) PETSCMALLOC( maxnz*sizeof(int) ); CHKPTRQ(scols);
    for ( i=1; i<numtid; i++ ) {
      ierr = SYRead(fd,scols,procsnz[i],SYINT); CHKERRQ(ierr);
      MPI_Send(scols,procsnz[i],MPI_INT,i,vobj->tag,comm);
    }
    PETSCFREE(scols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += ourlens[i];
    }
    cols = (int*) PETSCMALLOC( nz*sizeof(int) ); CHKPTRQ(cols);

    /* receive message of column indices */
    MPI_Recv(cols,nz,MPI_INT,0,vobj->tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ: something is way wrong");
  }

  /* loop over local rows, determining number of off diagonal entries */
  PetscZero(offlens,m*sizeof(int));
  jj = 0;
  for ( i=0; i<m; i++ ) {
    for ( j=0; j<ourlens[i]; j++ ) {
      if (cols[jj] < rstart || cols[jj] >= rend) offlens[i]++;
      jj++;
    }
  }
  for ( i=0; i<m; i++ ) {
    ourlens[i] -= offlens[i];
  }

  /* create our matrix; this call is collective, so by this point every
     processor has received its column indices */
  ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);
  CHKERRQ(ierr);
  mat = *newmat;

  if (!mytid) {
    vals = (Scalar *) PETSCMALLOC( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);

    /* insert my portion into the matrix */
    jj = 0;
    for ( i=0; i<m; i++ ) {
      j = rstart + i;
      ierr = MatSetValues(mat,1,&j,ourlens[i]+offlens[i],cols+jj,vals+jj,
                          INSERT_VALUES); CHKERRQ(ierr);
      jj += ourlens[i] + offlens[i];
    }

    /* read in the numerical values of all the other processors and ship off */
    for ( i=1; i<numtid; i++ ) {
      nz = procsnz[i];
      ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,mat->tag,comm);
    }
    PETSCFREE(procsnz);
  }
  else {
    /* receive my portion of the numerical values; nz still holds the
       local total computed above */
    vals = (Scalar *) PETSCMALLOC( nz*sizeof(Scalar) ); CHKPTRQ(vals);
    MPI_Recv(vals,nz,MPIU_SCALAR,0,mat->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ: something is way wrong");

    /* insert my portion into the matrix */
    jj = 0;
    for ( i=0; i<m; i++ ) {
      j = rstart + i;
      ierr = MatSetValues(mat,1,&j,ourlens[i]+offlens[i],cols+jj,vals+jj,
                          INSERT_VALUES); CHKERRQ(ierr);
      jj += ourlens[i] + offlens[i];
    }
  }
  PETSCFREE(ourlens); PETSCFREE(cols); PETSCFREE(vals); PETSCFREE(rowners);

  ierr = MatAssemblyBegin(mat,FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(mat,FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}