xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision c60448a52889ebeb9af4e25a0e1bb8def1ff3021)
1 #ifndef lint
2 static char vcid[] = "$Id: mpiaij.c,v 1.50 1995/06/14 17:24:10 bsmith Exp bsmith $";
3 #endif
4 
5 #include "mpiaij.h"
6 #include "vec/vecimpl.h"
7 #include "inline/spops.h"
8 
/* local utility routine that creates a mapping from the global column
number to the local number in the off-diagonal part of the local
storage of the matrix.  This is done in a non-scalable way since the
length of colmap equals the global matrix length.
*/
14 static int CreateColmap(Mat mat)
15 {
16   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
17   Mat_AIJ    *B = (Mat_AIJ*) aij->B->data;
18   int        n = B->n,i;
19   aij->colmap = (int *) PETSCMALLOC( aij->N*sizeof(int) ); CHKPTRQ(aij->colmap);
20   PETSCMEMSET(aij->colmap,0,aij->N*sizeof(int));
21   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
22   return 0;
23 }
24 
25 static int MatSetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,
26                             int *idxn,Scalar *v,InsertMode addv)
27 {
28   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
29   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
30   int        cstart = aij->cstart, cend = aij->cend,row,col;
31 
32   if (aij->insertmode != NOTSETVALUES && aij->insertmode != addv) {
33     SETERRQ(1,"You cannot mix inserts and adds");
34   }
35   aij->insertmode = addv;
36   for ( i=0; i<m; i++ ) {
37     if (idxm[i] < 0) SETERRQ(1,"Negative row index");
38     if (idxm[i] >= aij->M) SETERRQ(1,"Row index too large");
39     if (idxm[i] >= rstart && idxm[i] < rend) {
40       row = idxm[i] - rstart;
41       for ( j=0; j<n; j++ ) {
42         if (idxn[j] < 0) SETERRQ(1,"Negative column index");
43         if (idxn[j] >= aij->N) SETERRQ(1,"Column index too large");
44         if (idxn[j] >= cstart && idxn[j] < cend){
45           col = idxn[j] - cstart;
46           ierr = MatSetValues(aij->A,1,&row,1,&col,v+i*n+j,addv);CHKERRQ(ierr);
47         }
48         else {
49           if (aij->assembled) {
50             if (!aij->colmap) {ierr = CreateColmap(mat); CHKERRQ(ierr);}
51             col = aij->colmap[idxn[j]] - 1;
52             if (col < 0) {
53               SETERRQ(1,"Cannot insert new off diagonal block nonzero in\
54                      already\
55                      assembled matrix. Contact petsc-maint@mcs.anl.gov\
56                      if your need this feature");
57             }
58           }
59           else col = idxn[j];
60           ierr = MatSetValues(aij->B,1,&row,1,&col,v+i*n+j,addv);CHKERRQ(ierr);
61         }
62       }
63     }
64     else {
65       ierr = StashValues_Private(&aij->stash,idxm[i],n,idxn,v+i*n,addv);
66       CHKERRQ(ierr);
67     }
68   }
69   return 0;
70 }
71 
/*
    the assembly code is a lot like the code for vectors; we should
    someday derive a single assembly routine that can be used in
    either case.
*/
77 
/*
   Begins parallel assembly: routes every stashed (off-processor) entry to
   its owning processor with nonblocking MPI sends and receives.  Each
   entry travels as a triple of Scalars (row, column, value); the receives
   are drained in MatAssemblyEnd_MPIAIJ().  The outstanding requests and
   buffers are parked in the Mat_MPIAIJ struct between the two calls.
*/
static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         numtids = aij->numtids, *owners = aij->rowners;
  int         mytid = aij->mytid;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  int         tag = mat->tag, *owner,*starts,count,ierr;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  /* BOR of the modes: a mixed result equals ADDVALUES|INSERTVALUES */
  MPI_Allreduce((void *) &aij->insertmode,(void *) &addv,1,MPI_INT,
                MPI_BOR,comm);
  if (addv == (ADDVALUES|INSERTVALUES)) {
    SETERRQ(1,"Some processors have inserted while others have added");
  }
  aij->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  /* nprocs[j] = #stashed entries owned by proc j; procs[j] = 0/1 flag */
  nprocs = (int *) PETSCMALLOC( 2*numtids*sizeof(int) ); CHKPTRQ(nprocs);
  PETSCMEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
  owner = (int *) PETSCMALLOC( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<aij->stash.n; i++ ) {
    idx = aij->stash.idx[i];
    /* linear search of the row-ownership ranges for the owner of row idx */
    for ( j=0; j<numtids; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  /* SUM of flags -> messages arriving here; MAX of counts -> longest one */
  work = (int *) PETSCMALLOC( numtids*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
  nreceives = work[mytid];
  MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
  nmax = work[mytid];
  PETSCFREE(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) PETSCMALLOC(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PETSCMALLOC((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    /* each slot holds up to nmax (row,col,value) triples */
    MPI_Irecv((void *)(rvalues+3*nmax*i),3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PETSCMALLOC( 3*(aij->stash.n+1)*sizeof(Scalar) );
  CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PETSCMALLOC( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PETSCMALLOC( numtids*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<aij->stash.n; i++ ) {
    /* pack each stashed entry as a (row,col,value) triple; note the
       post-increment on the third store advances starts[] in place */
    svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
  }
  PETSCFREE(owner);
  /* starts[] was consumed by the packing loop; rebuild the prefix sums */
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<numtids; i++ ) {
    if (procs[i]) {
      MPI_Isend((void*)(svalues+3*starts[i]),3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PETSCFREE(starts); PETSCFREE(nprocs);

  /* Free cache space */
  ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);

  /* stash communication state for MatAssemblyEnd_MPIAIJ() */
  aij->svalues    = svalues;    aij->rvalues = rvalues;
  aij->nsends     = nsends;     aij->nrecvs = nreceives;
  aij->send_waits = send_waits; aij->recv_waits = recv_waits;
  aij->rmax       = nmax;

  return 0;
}
178 extern int MatSetUpMultiply_MPIAIJ(Mat);
179 
180 static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
181 {
182   int        ierr;
183   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
184 
185   MPI_Status  *send_status,recv_status;
186   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n;
187   int         row,col;
188   Scalar      *values,val;
189   InsertMode  addv = aij->insertmode;
190 
191   /*  wait on receives */
192   while (count) {
193     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
194     /* unpack receives into our local space */
195     values = aij->rvalues + 3*imdex*aij->rmax;
196     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
197     n = n/3;
198     for ( i=0; i<n; i++ ) {
199       row = (int) PETSCREAL(values[3*i]) - aij->rstart;
200       col = (int) PETSCREAL(values[3*i+1]);
201       val = values[3*i+2];
202       if (col >= aij->cstart && col < aij->cend) {
203           col -= aij->cstart;
204         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
205       }
206       else {
207         if (aij->assembled) {
208           if (!aij->colmap) {ierr = CreateColmap(mat); CHKERRQ(ierr);}
209           col = aij->colmap[col] - 1;
210           if (col < 0) {
211             SETERRQ(1,"Cannot insert new off diagonal block nonzero in\
212                      already\
213                      assembled matrix. Contact petsc-maint@mcs.anl.gov\
214                      if your need this feature");
215           }
216         }
217         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
218       }
219     }
220     count--;
221   }
222   PETSCFREE(aij->recv_waits); PETSCFREE(aij->rvalues);
223 
224   /* wait on sends */
225   if (aij->nsends) {
226     send_status = (MPI_Status *) PETSCMALLOC( aij->nsends*sizeof(MPI_Status) );
227     CHKPTRQ(send_status);
228     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
229     PETSCFREE(send_status);
230   }
231   PETSCFREE(aij->send_waits); PETSCFREE(aij->svalues);
232 
233   aij->insertmode = NOTSETVALUES;
234   ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
235   ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);
236 
237   if (!aij->assembled && mode == FINAL_ASSEMBLY) {
238     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
239     aij->assembled = 1;
240   }
241   ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
242   ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);
243 
244   return 0;
245 }
246 
247 static int MatZeroEntries_MPIAIJ(Mat A)
248 {
249   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
250   int ierr;
251   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
252   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
253   return 0;
254 }
255 
/* again this uses the same basic strategy as in the assembly and
   scatter create routines; we should try to do it systematically
   if we can figure out the proper level of generality. */

/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   aij->A and aij->B directly and not through the MatZeroRows()
   routine.
*/
/*
   Zeros the (globally numbered) rows listed in 'is'.  Since a row may be
   named by a processor that does not own it, the row indices are first
   routed to their owners with nonblocking MPI messages; each processor
   then zaps its local rows through MatZeroRows() on the two sequential
   blocks.  If diag is non-null the diagonal entry of each zeroed row is
   set to *diag (see the BUG note above regarding non-square ownership).
*/
static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,numtids = l->numtids;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,mytid = l->mytid;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  if (!l->assembled)
    SETERRQ(1,"MatZeroRows_MPIAIJ: Must assemble matrix first");
  ierr = ISGetLocalSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  /* nprocs[j] = #rows destined for proc j; procs[j] = 0/1 message flag */
  nprocs = (int *) PETSCMALLOC( 2*numtids*sizeof(int) ); CHKPTRQ(nprocs);
  PETSCMEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
  owner = (int *) PETSCMALLOC((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    /* linear search of the row-ownership ranges */
    for ( j=0; j<numtids; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,"Index out of range.");
  }
  nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  /* SUM of flags -> messages arriving here; MAX of counts -> longest one */
  work = (int *) PETSCMALLOC( numtids*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
  nrecvs = work[mytid];
  MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
  nmax = work[mytid];
  PETSCFREE(work);

  /* post receives:   */
  /* worst-case sized buffers; actual lengths recovered via MPI_Get_count */
  rvalues = (int *) PETSCMALLOC((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PETSCMALLOC((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv((void *)(rvalues+nmax*i),nmax,MPI_INT,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PETSCMALLOC( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PETSCMALLOC( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PETSCMALLOC( (numtids+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<N; i++ ) {
    /* bucket each row index by owner; starts[] is consumed in place */
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  /* rebuild the prefix sums consumed by the packing loop above */
  starts[0] = 0;
  for ( i=1; i<numtids+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<numtids; i++ ) {
    if (procs[i]) {
      MPI_Isend((void*)(svalues+starts[i]),nprocs[i],MPI_INT,i,tag,
                comm,send_waits+count++);
    }
  }
  PETSCFREE(starts);

  /* first globally numbered row owned by this processor */
  base = owners[mytid];

  /*  wait on receives */
  lens = (int *) PETSCMALLOC( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PETSCFREE(recv_waits);

  /* move the data into the send scatter */
  /* gather all received global rows, converted to local numbering */
  lrows = (int *) PETSCMALLOC( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PETSCFREE(rvalues); PETSCFREE(lens);
  PETSCFREE(owner); PETSCFREE(nprocs);

  /* actually zap the local rows */
  /* diag applies only to the diagonal block; B rows are simply zeroed */
  ierr = ISCreateSequential(MPI_COMM_SELF,slen,lrows,&istmp);
  CHKERRQ(ierr);  PETSCFREE(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PETSCMALLOC( nsends*sizeof(MPI_Status) );
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PETSCFREE(send_status);
  }
  PETSCFREE(send_waits); PETSCFREE(svalues);


  return 0;
}
392 
393 static int MatMult_MPIAIJ(Mat aijin,Vec xx,Vec yy)
394 {
395   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
396   int        ierr;
397   if (!aij->assembled) SETERRQ(1,"MatMult_MPIAIJ: must assemble matrix first");
398   ierr = VecScatterBegin(xx,aij->lvec,INSERTVALUES,SCATTERALL,aij->Mvctx);
399   CHKERRQ(ierr);
400   ierr = MatMult(aij->A,xx,yy); CHKERRQ(ierr);
401   ierr = VecScatterEnd(xx,aij->lvec,INSERTVALUES,SCATTERALL,aij->Mvctx);
402   CHKERRQ(ierr);
403   ierr = MatMultAdd(aij->B,aij->lvec,yy,yy); CHKERRQ(ierr);
404   return 0;
405 }
406 
407 static int MatMultAdd_MPIAIJ(Mat aijin,Vec xx,Vec yy,Vec zz)
408 {
409   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
410   int        ierr;
411   if (!aij->assembled) SETERRQ(1,"MatMult_MPIAIJ: must assemble matrix first");
412   ierr = VecScatterBegin(xx,aij->lvec,INSERTVALUES,SCATTERALL,aij->Mvctx);
413   CHKERRQ(ierr);
414   ierr = MatMultAdd(aij->A,xx,yy,zz); CHKERRQ(ierr);
415   ierr = VecScatterEnd(xx,aij->lvec,INSERTVALUES,SCATTERALL,aij->Mvctx);
416   CHKERRQ(ierr);
417   ierr = MatMultAdd(aij->B,aij->lvec,zz,zz); CHKERRQ(ierr);
418   return 0;
419 }
420 
421 static int MatMultTrans_MPIAIJ(Mat aijin,Vec xx,Vec yy)
422 {
423   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
424   int        ierr;
425 
426   if (!aij->assembled)
427     SETERRQ(1,"MatMulTrans_MPIAIJ: must assemble matrix first");
428   /* do nondiagonal part */
429   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERRQ(ierr);
430   /* send it on its way */
431   ierr = VecScatterBegin(aij->lvec,yy,ADDVALUES,
432            (ScatterMode)(SCATTERALL|SCATTERREVERSE),aij->Mvctx); CHKERRQ(ierr);
433   /* do local part */
434   ierr = MatMultTrans(aij->A,xx,yy); CHKERRQ(ierr);
435   /* receive remote parts: note this assumes the values are not actually */
436   /* inserted in yy until the next line, which is true for my implementation*/
437   /* but is not perhaps always true. */
438   ierr = VecScatterEnd(aij->lvec,yy,ADDVALUES,
439          (ScatterMode)(SCATTERALL|SCATTERREVERSE),aij->Mvctx); CHKERRQ(ierr);
440   return 0;
441 }
442 
443 static int MatMultTransAdd_MPIAIJ(Mat aijin,Vec xx,Vec yy,Vec zz)
444 {
445   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
446   int        ierr;
447 
448   if (!aij->assembled)
449     SETERRQ(1,"MatMulTransAdd_MPIAIJ: must assemble matrix first");
450   /* do nondiagonal part */
451   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERRQ(ierr);
452   /* send it on its way */
453   ierr = VecScatterBegin(aij->lvec,zz,ADDVALUES,
454          (ScatterMode)(SCATTERALL|SCATTERREVERSE),aij->Mvctx); CHKERRQ(ierr);
455   /* do local part */
456   ierr = MatMultTransAdd(aij->A,xx,yy,zz); CHKERRQ(ierr);
457   /* receive remote parts: note this assumes the values are not actually */
458   /* inserted in yy until the next line, which is true for my implementation*/
459   /* but is not perhaps always true. */
460   ierr = VecScatterEnd(aij->lvec,zz,ADDVALUES,
461        (ScatterMode)(SCATTERALL|SCATTERREVERSE),aij->Mvctx); CHKERRQ(ierr);
462   return 0;
463 }
464 
465 /*
466   This only works correctly for square matrices where the subblock A->A is the
467    diagonal block
468 */
469 static int MatGetDiagonal_MPIAIJ(Mat Ain,Vec v)
470 {
471   Mat_MPIAIJ *A = (Mat_MPIAIJ *) Ain->data;
472   if (!A->assembled) SETERRQ(1,"MatGetDiag_MPIAIJ: must assemble matrix first");
473   return MatGetDiagonal(A->A,v);
474 }
475 
476 static int MatDestroy_MPIAIJ(PetscObject obj)
477 {
478   Mat        mat = (Mat) obj;
479   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
480   int        ierr;
481 #if defined(PETSC_LOG)
482   PLogObjectState(obj,"Rows %d Cols %d",aij->M,aij->N);
483 #endif
484   PETSCFREE(aij->rowners);
485   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
486   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
487   if (aij->colmap) PETSCFREE(aij->colmap);
488   if (aij->garray) PETSCFREE(aij->garray);
489   if (aij->lvec) VecDestroy(aij->lvec);
490   if (aij->Mvctx) VecScatterCtxDestroy(aij->Mvctx);
491   PETSCFREE(aij);
492   PLogObjectDestroy(mat);
493   PETSCHEADERDESTROY(mat);
494   return 0;
495 }
496 #include "draw.h"
497 #include "pviewer.h"
498 
/*
   Views a parallel AIJ matrix.  For a plain FILE viewer each processor
   prints its two local blocks in sequence; for file/draw viewers the
   whole matrix is first gathered onto processor 0 (nonscalably) and
   viewed there.
*/
static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
{
  Mat        mat = (Mat) obj;
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  int        ierr;
  PetscObject vobj = (PetscObject) viewer;

  if (!aij->assembled) SETERRQ(1,"MatView_MPIAIJ: must assemble matrix first");
  if (!viewer) { /* so that viewers may be used from debuggers */
    viewer = STDOUT_VIEWER; vobj = (PetscObject) viewer;
  }
  if (vobj->cookie == DRAW_COOKIE && vobj->type == NULLWINDOW) return 0;
  if (vobj->cookie == VIEWER_COOKIE && vobj->type == FILE_VIEWER) {
    FILE *fd = ViewerFileGetPointer_Private(viewer);
    /* serialize output: one processor writes at a time */
    MPIU_Seq_begin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
             aij->mytid,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
             aij->cend);
    ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    MPIU_Seq_end(mat->comm,1);
  }
  else if ((vobj->cookie == VIEWER_COOKIE && vobj->type == FILES_VIEWER) ||
            vobj->cookie == DRAW_COOKIE) {
    int numtids = aij->numtids, mytid = aij->mytid;
    if (numtids == 1) {
      ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat     A;
      Mat_AIJ *Aloc;
      int     M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
      Scalar  *a;

      /* processor 0 owns all M rows of the gathered copy; others own none */
      if (!mytid) {
        ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,0,0,0,&A);
      }
      else {
        ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,0,0,0,&A);
      }
      CHKERRQ(ierr);

      /* copy over the A part */
      Aloc = (Mat_AIJ*) aij->A->data;
      m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = aij->rstart;
      /* Aloc->j holds 1-based local columns; shift in place to 0-based
         global columns for MatSetValues */
      for ( i=0; i<ai[m]; i++ ) {aj[i] += aij->cstart - 1;}
      for ( i=0; i<m; i++ ) {
        ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERTVALUES);
        CHKERRQ(ierr);
        row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
      }
      aj = Aloc->j;
      /* undo the in-place shift so the local block is left unchanged */
      for ( i=0; i<ai[m]; i++ ) {aj[i] -= aij->cstart - 1;}

      /* copy over the B part */
      Aloc = (Mat_AIJ*) aij->B->data;
      m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = aij->rstart;
      /* B's compressed (1-based) columns map to global ones via garray */
      ct = cols = (int *) PETSCMALLOC( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
      for ( i=0; i<ai[m]; i++ ) {cols[i] = aij->garray[aj[i]-1];}
      for ( i=0; i<m; i++ ) {
        ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERTVALUES);
        CHKERRQ(ierr);
        row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
      }
      PETSCFREE(ct);
      ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      /* on processor 0 the gathered matrix is entirely in its A block */
      if (!mytid) {
        ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}
578 
579 extern int MatMarkDiag_AIJ(Mat_AIJ  *);
/*
    This has to provide several versions.

     1) per sequential
     2) a) use only local smoothing updating outer values only once.
        b) local smoothing updating outer values each inner iteration
     3) color updating outer values between colors.
*/
588 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
589                            double shift,int its,Vec xx)
590 {
591   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
592   Mat        AA = mat->A, BB = mat->B;
593   Mat_AIJ    *A = (Mat_AIJ *) AA->data, *B = (Mat_AIJ *)BB->data;
594   Scalar     zero = 0.0,*b,*x,*xs,*ls,d,*v,sum,scale,*t,*ts;
595   int        ierr,*idx, *diag;
596   int        n = mat->n, m = mat->m, i;
597   Vec        tt;
598 
599   if (!mat->assembled) SETERRQ(1,"MatRelax_MPIAIJ: must assemble matrix first");
600 
601   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
602   xs = x -1; /* shift by one for index start of 1 */
603   ls--;
604   if (!A->diag) {if ((ierr = MatMarkDiag_AIJ(A))) return ierr;}
605   diag = A->diag;
606   if (flag == SOR_APPLY_UPPER || flag == SOR_APPLY_LOWER) {
607     SETERRQ(1,"That option not yet support for parallel AIJ matrices");
608   }
609   if (flag & SOR_EISENSTAT) {
610     /* Let  A = L + U + D; where L is lower trianglar,
611     U is upper triangular, E is diagonal; This routine applies
612 
613             (L + E)^{-1} A (U + E)^{-1}
614 
615     to a vector efficiently using Eisenstat's trick. This is for
616     the case of SSOR preconditioner, so E is D/omega where omega
617     is the relaxation factor.
618     */
619     ierr = VecDuplicate(xx,&tt); CHKERRQ(ierr);
620     VecGetArray(tt,&t);
621     scale = (2.0/omega) - 1.0;
622     /*  x = (E + U)^{-1} b */
623     VecSet(&zero,mat->lvec);
624     ierr = VecPipelineBegin(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
625                               mat->Mvctx); CHKERRQ(ierr);
626     for ( i=m-1; i>-1; i-- ) {
627       n    = A->i[i+1] - diag[i] - 1;
628       idx  = A->j + diag[i];
629       v    = A->a + diag[i];
630       sum  = b[i];
631       SPARSEDENSEMDOT(sum,xs,v,idx,n);
632       d    = shift + A->a[diag[i]-1];
633       n    = B->i[i+1] - B->i[i];
634       idx  = B->j + B->i[i] - 1;
635       v    = B->a + B->i[i] - 1;
636       SPARSEDENSEMDOT(sum,ls,v,idx,n);
637       x[i] = omega*(sum/d);
638     }
639     ierr = VecPipelineEnd(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
640                             mat->Mvctx); CHKERRQ(ierr);
641 
642     /*  t = b - (2*E - D)x */
643     v = A->a;
644     for ( i=0; i<m; i++ ) { t[i] = b[i] - scale*(v[*diag++ - 1])*x[i]; }
645 
646     /*  t = (E + L)^{-1}t */
647     ts = t - 1; /* shifted by one for index start of a or mat->j*/
648     diag = A->diag;
649     VecSet(&zero,mat->lvec);
650     ierr = VecPipelineBegin(tt,mat->lvec,INSERTVALUES,PIPELINEDOWN,
651                                                  mat->Mvctx); CHKERRQ(ierr);
652     for ( i=0; i<m; i++ ) {
653       n    = diag[i] - A->i[i];
654       idx  = A->j + A->i[i] - 1;
655       v    = A->a + A->i[i] - 1;
656       sum  = t[i];
657       SPARSEDENSEMDOT(sum,ts,v,idx,n);
658       d    = shift + A->a[diag[i]-1];
659       n    = B->i[i+1] - B->i[i];
660       idx  = B->j + B->i[i] - 1;
661       v    = B->a + B->i[i] - 1;
662       SPARSEDENSEMDOT(sum,ls,v,idx,n);
663       t[i] = omega*(sum/d);
664     }
665     ierr = VecPipelineEnd(tt,mat->lvec,INSERTVALUES,PIPELINEDOWN,
666                                                     mat->Mvctx); CHKERRQ(ierr);
667     /*  x = x + t */
668     for ( i=0; i<m; i++ ) { x[i] += t[i]; }
669     VecDestroy(tt);
670     return 0;
671   }
672 
673 
674   if ((flag & SOR_SYMMETRIC_SWEEP) == SOR_SYMMETRIC_SWEEP){
675     if (flag & SOR_ZERO_INITIAL_GUESS) {
676       VecSet(&zero,mat->lvec); VecSet(&zero,xx);
677     }
678     else {
679       ierr=VecScatterBegin(xx,mat->lvec,INSERTVALUES,SCATTERUP,mat->Mvctx);
680       CHKERRQ(ierr);
681       ierr = VecScatterEnd(xx,mat->lvec,INSERTVALUES,SCATTERUP,mat->Mvctx);
682       CHKERRQ(ierr);
683     }
684     while (its--) {
685       /* go down through the rows */
686       ierr = VecPipelineBegin(xx,mat->lvec,INSERTVALUES,PIPELINEDOWN,
687                               mat->Mvctx); CHKERRQ(ierr);
688       for ( i=0; i<m; i++ ) {
689         n    = A->i[i+1] - A->i[i];
690         idx  = A->j + A->i[i] - 1;
691         v    = A->a + A->i[i] - 1;
692         sum  = b[i];
693         SPARSEDENSEMDOT(sum,xs,v,idx,n);
694         d    = shift + A->a[diag[i]-1];
695         n    = B->i[i+1] - B->i[i];
696         idx  = B->j + B->i[i] - 1;
697         v    = B->a + B->i[i] - 1;
698         SPARSEDENSEMDOT(sum,ls,v,idx,n);
699         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
700       }
701       ierr = VecPipelineEnd(xx,mat->lvec,INSERTVALUES,PIPELINEDOWN,
702                             mat->Mvctx); CHKERRQ(ierr);
703       /* come up through the rows */
704       ierr = VecPipelineBegin(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
705                               mat->Mvctx); CHKERRQ(ierr);
706       for ( i=m-1; i>-1; i-- ) {
707         n    = A->i[i+1] - A->i[i];
708         idx  = A->j + A->i[i] - 1;
709         v    = A->a + A->i[i] - 1;
710         sum  = b[i];
711         SPARSEDENSEMDOT(sum,xs,v,idx,n);
712         d    = shift + A->a[diag[i]-1];
713         n    = B->i[i+1] - B->i[i];
714         idx  = B->j + B->i[i] - 1;
715         v    = B->a + B->i[i] - 1;
716         SPARSEDENSEMDOT(sum,ls,v,idx,n);
717         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
718       }
719       ierr = VecPipelineEnd(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
720                             mat->Mvctx); CHKERRQ(ierr);
721     }
722   }
723   else if (flag & SOR_FORWARD_SWEEP){
724     if (flag & SOR_ZERO_INITIAL_GUESS) {
725       VecSet(&zero,mat->lvec);
726       ierr = VecPipelineBegin(xx,mat->lvec,INSERTVALUES,PIPELINEDOWN,
727                               mat->Mvctx); CHKERRQ(ierr);
728       for ( i=0; i<m; i++ ) {
729         n    = diag[i] - A->i[i];
730         idx  = A->j + A->i[i] - 1;
731         v    = A->a + A->i[i] - 1;
732         sum  = b[i];
733         SPARSEDENSEMDOT(sum,xs,v,idx,n);
734         d    = shift + A->a[diag[i]-1];
735         n    = B->i[i+1] - B->i[i];
736         idx  = B->j + B->i[i] - 1;
737         v    = B->a + B->i[i] - 1;
738         SPARSEDENSEMDOT(sum,ls,v,idx,n);
739         x[i] = omega*(sum/d);
740       }
741       ierr = VecPipelineEnd(xx,mat->lvec,INSERTVALUES,PIPELINEDOWN,
742                             mat->Mvctx); CHKERRQ(ierr);
743       its--;
744     }
745     while (its--) {
746       ierr=VecScatterBegin(xx,mat->lvec,INSERTVALUES,SCATTERUP,mat->Mvctx);
747       CHKERRQ(ierr);
748       ierr = VecScatterEnd(xx,mat->lvec,INSERTVALUES,SCATTERUP,mat->Mvctx);
749       CHKERRQ(ierr);
750       ierr = VecPipelineBegin(xx,mat->lvec,INSERTVALUES,PIPELINEDOWN,
751                               mat->Mvctx); CHKERRQ(ierr);
752       for ( i=0; i<m; i++ ) {
753         n    = A->i[i+1] - A->i[i];
754         idx  = A->j + A->i[i] - 1;
755         v    = A->a + A->i[i] - 1;
756         sum  = b[i];
757         SPARSEDENSEMDOT(sum,xs,v,idx,n);
758         d    = shift + A->a[diag[i]-1];
759         n    = B->i[i+1] - B->i[i];
760         idx  = B->j + B->i[i] - 1;
761         v    = B->a + B->i[i] - 1;
762         SPARSEDENSEMDOT(sum,ls,v,idx,n);
763         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
764       }
765       ierr = VecPipelineEnd(xx,mat->lvec,INSERTVALUES,PIPELINEDOWN,
766                             mat->Mvctx); CHKERRQ(ierr);
767     }
768   }
769   else if (flag & SOR_BACKWARD_SWEEP){
770     if (flag & SOR_ZERO_INITIAL_GUESS) {
771       VecSet(&zero,mat->lvec);
772       ierr = VecPipelineBegin(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
773                               mat->Mvctx); CHKERRQ(ierr);
774       for ( i=m-1; i>-1; i-- ) {
775         n    = A->i[i+1] - diag[i] - 1;
776         idx  = A->j + diag[i];
777         v    = A->a + diag[i];
778         sum  = b[i];
779         SPARSEDENSEMDOT(sum,xs,v,idx,n);
780         d    = shift + A->a[diag[i]-1];
781         n    = B->i[i+1] - B->i[i];
782         idx  = B->j + B->i[i] - 1;
783         v    = B->a + B->i[i] - 1;
784         SPARSEDENSEMDOT(sum,ls,v,idx,n);
785         x[i] = omega*(sum/d);
786       }
787       ierr = VecPipelineEnd(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
788                             mat->Mvctx); CHKERRQ(ierr);
789       its--;
790     }
791     while (its--) {
792       ierr = VecScatterBegin(xx,mat->lvec,INSERTVALUES,SCATTERDOWN,
793                             mat->Mvctx); CHKERRQ(ierr);
794       ierr = VecScatterEnd(xx,mat->lvec,INSERTVALUES,SCATTERDOWN,
795                             mat->Mvctx); CHKERRQ(ierr);
796       ierr = VecPipelineBegin(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
797                               mat->Mvctx); CHKERRQ(ierr);
798       for ( i=m-1; i>-1; i-- ) {
799         n    = A->i[i+1] - A->i[i];
800         idx  = A->j + A->i[i] - 1;
801         v    = A->a + A->i[i] - 1;
802         sum  = b[i];
803         SPARSEDENSEMDOT(sum,xs,v,idx,n);
804         d    = shift + A->a[diag[i]-1];
805         n    = B->i[i+1] - B->i[i];
806         idx  = B->j + B->i[i] - 1;
807         v    = B->a + B->i[i] - 1;
808         SPARSEDENSEMDOT(sum,ls,v,idx,n);
809         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
810       }
811       ierr = VecPipelineEnd(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
812                             mat->Mvctx); CHKERRQ(ierr);
813     }
814   }
815   else if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
816     if (flag & SOR_ZERO_INITIAL_GUESS) {
817       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
818     }
819     ierr=VecScatterBegin(xx,mat->lvec,INSERTVALUES,SCATTERALL,mat->Mvctx);
820     CHKERRQ(ierr);
821     ierr = VecScatterEnd(xx,mat->lvec,INSERTVALUES,SCATTERALL,mat->Mvctx);
822     CHKERRQ(ierr);
823     while (its--) {
824       /* go down through the rows */
825       for ( i=0; i<m; i++ ) {
826         n    = A->i[i+1] - A->i[i];
827         idx  = A->j + A->i[i] - 1;
828         v    = A->a + A->i[i] - 1;
829         sum  = b[i];
830         SPARSEDENSEMDOT(sum,xs,v,idx,n);
831         d    = shift + A->a[diag[i]-1];
832         n    = B->i[i+1] - B->i[i];
833         idx  = B->j + B->i[i] - 1;
834         v    = B->a + B->i[i] - 1;
835         SPARSEDENSEMDOT(sum,ls,v,idx,n);
836         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
837       }
838       /* come up through the rows */
839       for ( i=m-1; i>-1; i-- ) {
840         n    = A->i[i+1] - A->i[i];
841         idx  = A->j + A->i[i] - 1;
842         v    = A->a + A->i[i] - 1;
843         sum  = b[i];
844         SPARSEDENSEMDOT(sum,xs,v,idx,n);
845         d    = shift + A->a[diag[i]-1];
846         n    = B->i[i+1] - B->i[i];
847         idx  = B->j + B->i[i] - 1;
848         v    = B->a + B->i[i] - 1;
849         SPARSEDENSEMDOT(sum,ls,v,idx,n);
850         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
851       }
852     }
853   }
854   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
855     if (flag & SOR_ZERO_INITIAL_GUESS) {
856       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
857     }
858     ierr=VecScatterBegin(xx,mat->lvec,INSERTVALUES,SCATTERALL,mat->Mvctx);
859     CHKERRQ(ierr);
860     ierr = VecScatterEnd(xx,mat->lvec,INSERTVALUES,SCATTERALL,mat->Mvctx);
861     CHKERRQ(ierr);
862     while (its--) {
863       for ( i=0; i<m; i++ ) {
864         n    = A->i[i+1] - A->i[i];
865         idx  = A->j + A->i[i] - 1;
866         v    = A->a + A->i[i] - 1;
867         sum  = b[i];
868         SPARSEDENSEMDOT(sum,xs,v,idx,n);
869         d    = shift + A->a[diag[i]-1];
870         n    = B->i[i+1] - B->i[i];
871         idx  = B->j + B->i[i] - 1;
872         v    = B->a + B->i[i] - 1;
873         SPARSEDENSEMDOT(sum,ls,v,idx,n);
874         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
875       }
876     }
877   }
878   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
879     if (flag & SOR_ZERO_INITIAL_GUESS) {
880       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
881     }
882     ierr = VecScatterBegin(xx,mat->lvec,INSERTVALUES,SCATTERALL,
883                             mat->Mvctx); CHKERRQ(ierr);
884     ierr = VecScatterEnd(xx,mat->lvec,INSERTVALUES,SCATTERALL,
885                             mat->Mvctx); CHKERRQ(ierr);
886     while (its--) {
887       for ( i=m-1; i>-1; i-- ) {
888         n    = A->i[i+1] - A->i[i];
889         idx  = A->j + A->i[i] - 1;
890         v    = A->a + A->i[i] - 1;
891         sum  = b[i];
892         SPARSEDENSEMDOT(sum,xs,v,idx,n);
893         d    = shift + A->a[diag[i]-1];
894         n    = B->i[i+1] - B->i[i];
895         idx  = B->j + B->i[i] - 1;
896         v    = B->a + B->i[i] - 1;
897         SPARSEDENSEMDOT(sum,ls,v,idx,n);
898         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
899       }
900     }
901   }
902   return 0;
903 }
904 
905 static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,int *nz,
906                              int *nzalloc,int *mem)
907 {
908   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
909   Mat        A = mat->A, B = mat->B;
910   int        ierr, isend[3], irecv[3], nzA, nzallocA, memA;
911 
912   ierr = MatGetInfo(A,MAT_LOCAL,&nzA,&nzallocA,&memA); CHKERRQ(ierr);
913   ierr = MatGetInfo(B,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
914   isend[0] += nzA; isend[1] += nzallocA; isend[2] += memA;
915   if (flag == MAT_LOCAL) {
916     *nz = isend[0]; *nzalloc = isend[1]; *mem = isend[2];
917   } else if (flag == MAT_GLOBAL_MAX) {
918     MPI_Allreduce((void *) isend,(void *) irecv,3,MPI_INT,MPI_MAX,matin->comm);
919     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
920   } else if (flag == MAT_GLOBAL_SUM) {
921     MPI_Allreduce((void *) isend,(void *) irecv,3,MPI_INT,MPI_SUM,matin->comm);
922     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
923   }
924   return 0;
925 }
926 
927 static int MatSetOption_MPIAIJ(Mat aijin,MatOption op)
928 {
929   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
930 
931   if      (op == NO_NEW_NONZERO_LOCATIONS)  {
932     MatSetOption(aij->A,op);
933     MatSetOption(aij->B,op);
934   }
935   else if (op == YES_NEW_NONZERO_LOCATIONS) {
936     MatSetOption(aij->A,op);
937     MatSetOption(aij->B,op);
938   }
939   else if (op == COLUMN_ORIENTED) SETERRQ(1,"Column oriented not supported");
940   return 0;
941 }
942 
943 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
944 {
945   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
946   *m = mat->M; *n = mat->N;
947   return 0;
948 }
949 
950 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
951 {
952   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
953   *m = mat->m; *n = mat->n;
954   return 0;
955 }
956 
957 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
958 {
959   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
960   *m = mat->rstart; *n = mat->rend;
961   return 0;
962 }
963 
964 static int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
965 {
966   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
967   Scalar     *vworkA, *vworkB, **pvA, **pvB;
968   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
969   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
970 
971   if (!mat->assembled)
972     SETERRQ(1,"MatGetRow_MPIAIJ: Must assemble matrix first.");
973   if (row < rstart || row >= rend)
974     SETERRQ(1,"MatGetRow_MPIAIJ: Currently you can get only local rows.")
975   lrow = row - rstart;
976 
977   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
978   if (!v)   {pvA = 0; pvB = 0;}
979   if (!idx) {pcA = 0; if (!v) pcB = 0;}
980   ierr = MatGetRow(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
981   ierr = MatGetRow(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
982   nztot = nzA + nzB;
983 
984   if (v  || idx) {
985     if (nztot) {
986       /* Sort by increasing column numbers, assuming A and B already sorted */
987       int imark, imark2;
988       for (i=0; i<nzB; i++) cworkB[i] = mat->garray[cworkB[i]];
989       if (v) {
990         *v = (Scalar *) PETSCMALLOC( (nztot)*sizeof(Scalar) ); CHKPTRQ(*v);
991         for ( i=0; i<nzB; i++ ) {
992           if (cworkB[i] < cstart)   (*v)[i] = vworkB[i];
993           else break;
994         }
995         imark = i;
996         for ( i=0; i<nzA; i++ )     (*v)[imark+i] = vworkA[i];
997         imark2 = imark+nzA;
998         for ( i=imark; i<nzB; i++ ) (*v)[imark2+i] = vworkB[i];
999       }
1000       if (idx) {
1001         *idx = (int *) PETSCMALLOC( (nztot)*sizeof(int) ); CHKPTRQ(*idx);
1002         for (i=0; i<nzA; i++) cworkA[i] += cstart;
1003         for ( i=0; i<nzB; i++ ) {
1004           if (cworkB[i] < cstart)   (*idx)[i] = cworkB[i];
1005           else break;
1006         }
1007         imark = i;
1008         for ( i=0; i<nzA; i++ )     (*idx)[imark+i] = cworkA[i];
1009         imark2 = imark+nzA;
1010         for ( i=imark; i<nzB; i++ ) (*idx)[imark2+i] = cworkB[i];
1011       }
1012     }
1013     else {*idx = 0; *v=0;}
1014   }
1015   *nz = nztot;
1016   ierr = MatRestoreRow(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1017   ierr = MatRestoreRow(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1018   return 0;
1019 }
1020 
/*
   MatRestoreRow_MPIAIJ - Releases the index and value arrays allocated
   by MatGetRow_MPIAIJ.  For an empty row *idx / *v are 0; this assumes
   PETSCFREE(0) is harmless -- TODO confirm against the macro definition.
*/
static int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
{
  if (idx) PETSCFREE(*idx);
  if (v) PETSCFREE(*v);
  return 0;
}
1027 
1028 extern int MatConvert_MPIAIJ(Mat,MatType,Mat *);
1029 static int MatCopyPrivate_MPIAIJ(Mat,Mat *);
1030 
1031 /* -------------------------------------------------------------------*/
/* Function table for the MPIAIJ matrix type.  The initializers are
   POSITIONAL -- each slot corresponds to a fixed operation in
   struct _MatOps, and a 0 marks an operation this format does not
   support.  Do not reorder entries. */
static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
       MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
       MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
       MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
       0,0,0,0,
       0,0,
       MatRelax_MPIAIJ,
       0,
       MatGetInfo_MPIAIJ,0,
       MatGetDiagonal_MPIAIJ,0,0,
       MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
       0,
       MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,0,
       0,0,0,0,
       MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
       0,0,
       0,0,MatConvert_MPIAIJ,0,0,MatCopyPrivate_MPIAIJ};
1049 
1050 /*@
1051    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1052    (the default parallel PETSc format).
1053 
1054    Input Parameters:
1055 .  comm - MPI communicator
1056 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1057 .  n - number of local columns (or PETSC_DECIDE to have calculated
1058            if N is given)
1059 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1060 .  N - number of global columns (or PETSC_DECIDE to have calculated
1061            if n is given)
1062 .  d_nz - number of nonzeros per row in diagonal portion of matrix
1063            (same for all local rows)
.  d_nnz - number of nonzeros per row in diagonal portion of matrix or null
           (possibly different for each row).  You must leave room for the
           diagonal entry even if it is zero.
1067 .  o_nz - number of nonzeros per row in off-diagonal portion of matrix
1068            (same for all local rows)
.  o_nnz - number of nonzeros per row in off-diagonal portion of matrix
           or null (possibly different for each row).
1071 
1072    Output Parameter:
1073 .  newmat - the matrix
1074 
1075    Notes:
1076    The AIJ format (also called the Yale sparse matrix format or
1077    compressed row storage), is fully compatible with standard Fortran 77
1078    storage.  That is, the stored row and column indices begin at
1079    one, not zero.
1080 
1081    The user MUST specify either the local or global matrix dimensions
1082    (possibly both).
1083 
1084    The user can set d_nz, d_nnz, o_nz, and o_nnz to zero for PETSc to
1085    control dynamic memory allocation.
1086 
1087 .keywords: matrix, aij, compressed row, sparse, parallel
1088 
1089 .seealso: MatCreate(), MatCreateSequentialAIJ(), MatSetValues()
1090 @*/
int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
                 int d_nz,int *d_nnz, int o_nz,int *o_nnz,Mat *newmat)
{
  Mat          mat;
  Mat_MPIAIJ   *aij;
  int          ierr, i,sum[2],work[2];
  *newmat         = 0;
  PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (aij = PETSCNEW(Mat_MPIAIJ)); CHKPTRQ(aij);
  mat->ops        = &MatOps;
  mat->destroy    = MatDestroy_MPIAIJ;
  mat->view       = MatView_MPIAIJ;
  mat->factor     = 0;

  aij->insertmode = NOTSETVALUES;
  MPI_Comm_rank(comm,&aij->mytid);
  MPI_Comm_size(comm,&aij->numtids);

  /* global sizes left as PETSC_DECIDE are the sums of the local sizes
     over the communicator (collective) */
  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    MPI_Allreduce((void *) work,(void *) sum,2,MPI_INT,MPI_SUM,comm );
    if (M == PETSC_DECIDE) M = sum[0];
    if (N == PETSC_DECIDE) N = sum[1];
  }
  /* conversely, split a given global size as evenly as possible: the
     first M % numtids processors receive one extra row (same for n/N) */
  if (m == PETSC_DECIDE)
    {m = M/aij->numtids + ((M % aij->numtids) > aij->mytid);}
  if (n == PETSC_DECIDE)
    {n = N/aij->numtids + ((N % aij->numtids) > aij->mytid);}
  aij->m = m;
  aij->n = n;
  aij->N = N;
  aij->M = M;

  /* build local table of row and column ownerships */
  /* rowners and cowners share one allocation, with cowners aliased at
     rowners + numtids + 1; after the prefix sums, entry i is the first
     global row (column) owned by processor i and entry numtids the
     global total */
  aij->rowners = (int *) PETSCMALLOC(2*(aij->numtids+2)*sizeof(int));
  CHKPTRQ(aij->rowners);
  aij->cowners = aij->rowners + aij->numtids + 1;
  MPI_Allgather(&m,1,MPI_INT,aij->rowners+1,1,MPI_INT,comm);
  aij->rowners[0] = 0;
  /* running sum starts at i=2: rowners[1] already holds processor 0's m */
  for ( i=2; i<=aij->numtids; i++ ) {
    aij->rowners[i] += aij->rowners[i-1];
  }
  aij->rstart = aij->rowners[aij->mytid];
  aij->rend   = aij->rowners[aij->mytid+1];
  MPI_Allgather(&n,1,MPI_INT,aij->cowners+1,1,MPI_INT,comm);
  aij->cowners[0] = 0;
  for ( i=2; i<=aij->numtids; i++ ) {
    aij->cowners[i] += aij->cowners[i-1];
  }
  aij->cstart = aij->cowners[aij->mytid];
  aij->cend   = aij->cowners[aij->mytid+1];


  /* A is the m x n diagonal block; B holds the off-diagonal entries and
     is created m x N -- NOTE(review): presumably its columns keep their
     global numbering until assembly compacts them via garray; confirm
     against MatAssemblyEnd_MPIAIJ */
  ierr = MatCreateSequentialAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&aij->A);
  CHKERRQ(ierr);
  PLogObjectParent(mat,aij->A);
  ierr = MatCreateSequentialAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&aij->B);
  CHKERRQ(ierr);
  PLogObjectParent(mat,aij->B);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&aij->stash); CHKERRQ(ierr);
  aij->colmap    = 0;
  aij->garray    = 0;

  /* stuff used for matrix vector multiply */
  aij->lvec      = 0;
  aij->Mvctx     = 0;
  aij->assembled = 0;

  *newmat = mat;
  return 0;
}
1165 
1166 static int MatCopyPrivate_MPIAIJ(Mat matin,Mat *newmat)
1167 {
1168   Mat        mat;
1169   Mat_MPIAIJ *aij,*oldmat = (Mat_MPIAIJ *) matin->data;
1170   int        ierr, len;
1171   *newmat      = 0;
1172 
1173   if (!oldmat->assembled) SETERRQ(1,"Cannot copy unassembled matrix");
1174   PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1175   PLogObjectCreate(mat);
1176   mat->data       = (void *) (aij = PETSCNEW(Mat_MPIAIJ)); CHKPTRQ(aij);
1177   mat->ops        = &MatOps;
1178   mat->destroy    = MatDestroy_MPIAIJ;
1179   mat->view       = MatView_MPIAIJ;
1180   mat->factor     = matin->factor;
1181 
1182   aij->m          = oldmat->m;
1183   aij->n          = oldmat->n;
1184   aij->M          = oldmat->M;
1185   aij->N          = oldmat->N;
1186 
1187   aij->assembled  = 1;
1188   aij->rstart     = oldmat->rstart;
1189   aij->rend       = oldmat->rend;
1190   aij->cstart     = oldmat->cstart;
1191   aij->cend       = oldmat->cend;
1192   aij->numtids    = oldmat->numtids;
1193   aij->mytid      = oldmat->mytid;
1194   aij->insertmode = NOTSETVALUES;
1195 
1196   aij->rowners        = (int *) PETSCMALLOC( (aij->numtids+1)*sizeof(int) );
1197   CHKPTRQ(aij->rowners);
1198   PETSCMEMCPY(aij->rowners,oldmat->rowners,(aij->numtids+1)*sizeof(int));
1199   ierr = StashInitialize_Private(&aij->stash); CHKERRQ(ierr);
1200   if (oldmat->colmap) {
1201     aij->colmap      = (int *) PETSCMALLOC( (aij->N)*sizeof(int) );
1202     CHKPTRQ(aij->colmap);
1203     PETSCMEMCPY(aij->colmap,oldmat->colmap,(aij->N)*sizeof(int));
1204   } else aij->colmap = 0;
1205   if (oldmat->garray && (len = ((Mat_AIJ *) (oldmat->B->data))->n)) {
1206     aij->garray      = (int *) PETSCMALLOC(len*sizeof(int) ); CHKPTRQ(aij->garray);
1207     PETSCMEMCPY(aij->garray,oldmat->garray,len*sizeof(int));
1208   } else aij->garray = 0;
1209 
1210   ierr =  VecDuplicate(oldmat->lvec,&aij->lvec); CHKERRQ(ierr);
1211   PLogObjectParent(mat,aij->lvec);
1212   ierr =  VecScatterCtxCopy(oldmat->Mvctx,&aij->Mvctx); CHKERRQ(ierr);
1213   PLogObjectParent(mat,aij->Mvctx);
1214   ierr =  MatConvert(oldmat->A,MATSAME,&aij->A); CHKERRQ(ierr);
1215   PLogObjectParent(mat,aij->A);
1216   ierr =  MatConvert(oldmat->B,MATSAME,&aij->B); CHKERRQ(ierr);
1217   PLogObjectParent(mat,aij->B);
1218   *newmat = mat;
1219   return 0;
1220 }
1221