xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision e452cd2f5c6761e08e2d8d89e2f3e2dbc96d95af)
1 #ifndef lint
2 static char vcid[] = "$Id: mpiaij.c,v 1.46 1995/05/28 17:38:03 bsmith Exp bsmith $";
3 #endif
4 
5 #include "mpiaij.h"
6 #include "vec/vecimpl.h"
7 #include "inline/spops.h"
8 
/* local utility routine that creates a mapping from the global column
number to the local number in the off-diagonal part of the local
storage of the matrix.  This is done in a non-scalable way since the
length of colmap equals the global matrix length.
*/
14 static int CreateColmap(Mat mat)
15 {
16   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
17   Mat_AIJ    *B = (Mat_AIJ*) aij->B->data;
18   int        n = B->n,i;
19   aij->colmap = (int *) MALLOC( aij->N*sizeof(int) ); CHKPTR(aij->colmap);
20   MEMSET(aij->colmap,0,aij->N*sizeof(int));
21   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
22   return 0;
23 }
24 
25 static int MatSetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,
26                             int *idxn,Scalar *v,InsertMode addv)
27 {
28   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
29   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
30   int        cstart = aij->cstart, cend = aij->cend,row,col;
31 
32   if (aij->insertmode != NOTSETVALUES && aij->insertmode != addv) {
33     SETERR(1,"You cannot mix inserts and adds");
34   }
35   aij->insertmode = addv;
36   for ( i=0; i<m; i++ ) {
37     if (idxm[i] < 0) SETERR(1,"Negative row index");
38     if (idxm[i] >= aij->M) SETERR(1,"Row index too large");
39     if (idxm[i] >= rstart && idxm[i] < rend) {
40       row = idxm[i] - rstart;
41       for ( j=0; j<n; j++ ) {
42         if (idxn[j] < 0) SETERR(1,"Negative column index");
43         if (idxn[j] >= aij->N) SETERR(1,"Column index too large");
44         if (idxn[j] >= cstart && idxn[j] < cend){
45           col = idxn[j] - cstart;
46           ierr = MatSetValues(aij->A,1,&row,1,&col,v+i*n+j,addv);CHKERR(ierr);
47         }
48         else {
49           if (aij->assembled) {
50             if (!aij->colmap) {ierr = CreateColmap(mat); CHKERR(ierr);}
51             col = aij->colmap[idxn[j]] - 1;
52             if (col < 0) {
53               SETERR(1,"Cannot insert new off diagonal block nonzero in\
54                      already\
55                      assembled matrix. Contact petsc-maint@mcs.anl.gov\
56                      if your need this feature");
57             }
58           }
59           else col = idxn[j];
60           ierr = MatSetValues(aij->B,1,&row,1,&col,v+i*n+j,addv);CHKERR(ierr);
61         }
62       }
63     }
64     else {
65       ierr = StashValues_Private(&aij->stash,idxm[i],n,idxn,v+i*n,addv);
66       CHKERR(ierr);
67     }
68   }
69   return 0;
70 }
71 
/*
    the assembly code is a lot like the code for vectors; we should
    someday derive a single assembly routine that can be used for
    either case.
*/
77 
/*
   MatAssemblyBegin_MPIAIJ - Starts parallel assembly: routes the values
   stashed by MatSetValues_MPIAIJ for rows owned by other processors to
   their owners with nonblocking sends/receives.  The matching waits and
   the insertion of the received values happen in MatAssemblyEnd_MPIAIJ.
*/
static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         numtids = aij->numtids, *owners = aij->rowners;
  int         mytid = aij->mytid;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  int         tag = 50, *owner,*starts,count,ierr;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE;
     a bitwise OR over all modes exposes any mismatch in one reduction */
  MPI_Allreduce((void *) &aij->insertmode,(void *) &addv,1,MPI_INT,
                MPI_BOR,comm);
  if (addv == (ADDVALUES|INSERTVALUES)) {
    SETERR(1,"Some processors have inserted while others have added");
  }
  aij->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor:
      nprocs[j] = number of stashed entries owned by processor j,
      procs[j]  = 1 iff anything at all goes to processor j */
  nprocs = (int *) MALLOC( 2*numtids*sizeof(int) ); CHKPTR(nprocs);
  MEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
  owner = (int *) MALLOC( (aij->stash.n+1)*sizeof(int) ); CHKPTR(owner);
  for ( i=0; i<aij->stash.n; i++ ) {
    idx = aij->stash.idx[i];
    for ( j=0; j<numtids; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(work);
  MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
  nreceives = work[mytid];   /* how many processors will send to me */
  MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
  nmax = work[mytid];        /* length of the longest incoming message */
  FREE(work);

  /* post receives:
       1) each message consists of (row index,column index,value)
     triples; the indices are stored as Scalars to keep every message
     a single homogeneous MPI_SCALAR buffer.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.

       This could be done better.
  */
  rvalues = (Scalar *) MALLOC(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTR(rvalues);
  recv_waits = (MPI_Request *) MALLOC((nreceives+1)*sizeof(MPI_Request));
  CHKPTR(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv((void *)(rvalues+3*nmax*i),3*nmax,MPI_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) MALLOC( 3*(aij->stash.n+1)*sizeof(Scalar) );
  CHKPTR(svalues);
  send_waits = (MPI_Request *) MALLOC( (nsends+1)*sizeof(MPI_Request));
  CHKPTR(send_waits);
  starts = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(starts);
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack the triples; starts[] is consumed (advanced) while packing */
  for ( i=0; i<aij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
  }
  FREE(owner);
  /* rebuild starts[], destroyed by the packing loop above */
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<numtids; i++ ) {
    if (procs[i]) {
      MPI_Isend((void*)(svalues+3*starts[i]),3*nprocs[i],MPI_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  FREE(starts); FREE(nprocs);

  /* Free cache space */
  ierr = StashDestroy_Private(&aij->stash); CHKERR(ierr);

  /* stash the communication state for MatAssemblyEnd_MPIAIJ */
  aij->svalues    = svalues;    aij->rvalues = rvalues;
  aij->nsends     = nsends;     aij->nrecvs = nreceives;
  aij->send_waits = send_waits; aij->recv_waits = recv_waits;
  aij->rmax       = nmax;

  return 0;
}
178 extern int MatSetUpMultiply_MPIAIJ(Mat);
179 
180 static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
181 {
182   int        ierr;
183   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
184 
185   MPI_Status  *send_status,recv_status;
186   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n;
187   int         row,col;
188   Scalar      *values,val;
189   InsertMode  addv = aij->insertmode;
190 
191   /*  wait on receives */
192   while (count) {
193     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
194     /* unpack receives into our local space */
195     values = aij->rvalues + 3*imdex*aij->rmax;
196     MPI_Get_count(&recv_status,MPI_SCALAR,&n);
197     n = n/3;
198     for ( i=0; i<n; i++ ) {
199       row = (int) PETSCREAL(values[3*i]) - aij->rstart;
200       col = (int) PETSCREAL(values[3*i+1]);
201       val = values[3*i+2];
202       if (col >= aij->cstart && col < aij->cend) {
203           col -= aij->cstart;
204         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
205       }
206       else {
207         if (aij->assembled) {
208           if (!aij->colmap) {ierr = CreateColmap(mat); CHKERR(ierr);}
209           col = aij->colmap[col] - 1;
210           if (col < 0) {
211             SETERR(1,"Cannot insert new off diagonal block nonzero in\
212                      already\
213                      assembled matrix. Contact petsc-maint@mcs.anl.gov\
214                      if your need this feature");
215           }
216         }
217         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
218       }
219     }
220     count--;
221   }
222   FREE(aij->recv_waits); FREE(aij->rvalues);
223 
224   /* wait on sends */
225   if (aij->nsends) {
226     send_status = (MPI_Status *) MALLOC( aij->nsends*sizeof(MPI_Status) );
227     CHKPTR(send_status);
228     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
229     FREE(send_status);
230   }
231   FREE(aij->send_waits); FREE(aij->svalues);
232 
233   aij->insertmode = NOTSETVALUES;
234   ierr = MatAssemblyBegin(aij->A,mode); CHKERR(ierr);
235   ierr = MatAssemblyEnd(aij->A,mode); CHKERR(ierr);
236 
237   if (!aij->assembled && mode == FINAL_ASSEMBLY) {
238     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERR(ierr);
239     aij->assembled = 1;
240   }
241   ierr = MatAssemblyBegin(aij->B,mode); CHKERR(ierr);
242   ierr = MatAssemblyEnd(aij->B,mode); CHKERR(ierr);
243 
244   return 0;
245 }
246 
247 static int MatZeroEntries_MPIAIJ(Mat A)
248 {
249   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
250   int ierr;
251   ierr = MatZeroEntries(l->A); CHKERR(ierr);
252   ierr = MatZeroEntries(l->B); CHKERR(ierr);
253   return 0;
254 }
255 
/* again this uses the same basic strategy as in the assembly and
   scatter create routines; we should try to do it systematically
   if we can figure out the proper level of generality. */
259 
/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   aij->A and aij->B directly and not through the MatZeroRows()
   routine.
*/
266 static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
267 {
268   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
269   int            i,ierr,N, *rows,*owners = l->rowners,numtids = l->numtids;
270   int            *procs,*nprocs,j,found,idx,nsends,*work;
271   int            nmax,*svalues,*starts,*owner,nrecvs,mytid = l->mytid;
272   int            *rvalues,tag = 67,count,base,slen,n,*source;
273   int            *lens,imdex,*lrows,*values;
274   MPI_Comm       comm = A->comm;
275   MPI_Request    *send_waits,*recv_waits;
276   MPI_Status     recv_status,*send_status;
277   IS             istmp;
278 
279   if (!l->assembled) SETERR(1,"MatZeroRows_MPIAIJ: must assemble matrix first");
280   ierr = ISGetLocalSize(is,&N); CHKERR(ierr);
281   ierr = ISGetIndices(is,&rows); CHKERR(ierr);
282 
283   /*  first count number of contributors to each processor */
284   nprocs = (int *) MALLOC( 2*numtids*sizeof(int) ); CHKPTR(nprocs);
285   MEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
286   owner = (int *) MALLOC((N+1)*sizeof(int)); CHKPTR(owner); /* see note*/
287   for ( i=0; i<N; i++ ) {
288     idx = rows[i];
289     found = 0;
290     for ( j=0; j<numtids; j++ ) {
291       if (idx >= owners[j] && idx < owners[j+1]) {
292         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
293       }
294     }
295     if (!found) SETERR(1,"Imdex out of range");
296   }
297   nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}
298 
299   /* inform other processors of number of messages and max length*/
300   work = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(work);
301   MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
302   nrecvs = work[mytid];
303   MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
304   nmax = work[mytid];
305   FREE(work);
306 
307   /* post receives:   */
308   rvalues = (int *) MALLOC((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
309   CHKPTR(rvalues);
310   recv_waits = (MPI_Request *) MALLOC((nrecvs+1)*sizeof(MPI_Request));
311   CHKPTR(recv_waits);
312   for ( i=0; i<nrecvs; i++ ) {
313     MPI_Irecv((void *)(rvalues+nmax*i),nmax,MPI_INT,MPI_ANY_SOURCE,tag,
314               comm,recv_waits+i);
315   }
316 
317   /* do sends:
318       1) starts[i] gives the starting index in svalues for stuff going to
319          the ith processor
320   */
321   svalues = (int *) MALLOC( (N+1)*sizeof(int) ); CHKPTR(svalues);
322   send_waits = (MPI_Request *) MALLOC( (nsends+1)*sizeof(MPI_Request));
323   CHKPTR(send_waits);
324   starts = (int *) MALLOC( (numtids+1)*sizeof(int) ); CHKPTR(starts);
325   starts[0] = 0;
326   for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
327   for ( i=0; i<N; i++ ) {
328     svalues[starts[owner[i]]++] = rows[i];
329   }
330   ISRestoreIndices(is,&rows);
331 
332   starts[0] = 0;
333   for ( i=1; i<numtids+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
334   count = 0;
335   for ( i=0; i<numtids; i++ ) {
336     if (procs[i]) {
337       MPI_Isend((void*)(svalues+starts[i]),nprocs[i],MPI_INT,i,tag,
338                 comm,send_waits+count++);
339     }
340   }
341   FREE(starts);
342 
343   base = owners[mytid];
344 
345   /*  wait on receives */
346   lens = (int *) MALLOC( 2*(nrecvs+1)*sizeof(int) ); CHKPTR(lens);
347   source = lens + nrecvs;
348   count = nrecvs; slen = 0;
349   while (count) {
350     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
351     /* unpack receives into our local space */
352     MPI_Get_count(&recv_status,MPI_INT,&n);
353     source[imdex]  = recv_status.MPI_SOURCE;
354     lens[imdex]  = n;
355     slen += n;
356     count--;
357   }
358   FREE(recv_waits);
359 
360   /* move the data into the send scatter */
361   lrows = (int *) MALLOC( (slen+1)*sizeof(int) ); CHKPTR(lrows);
362   count = 0;
363   for ( i=0; i<nrecvs; i++ ) {
364     values = rvalues + i*nmax;
365     for ( j=0; j<lens[i]; j++ ) {
366       lrows[count++] = values[j] - base;
367     }
368   }
369   FREE(rvalues); FREE(lens);
370   FREE(owner); FREE(nprocs);
371 
372   /* actually zap the local rows */
373   ierr = ISCreateSequential(MPI_COMM_SELF,slen,lrows,&istmp);
374   CHKERR(ierr);  FREE(lrows);
375   ierr = MatZeroRows(l->A,istmp,diag); CHKERR(ierr);
376   ierr = MatZeroRows(l->B,istmp,0); CHKERR(ierr);
377   ierr = ISDestroy(istmp); CHKERR(ierr);
378 
379   /* wait on sends */
380   if (nsends) {
381     send_status = (MPI_Status *) MALLOC( nsends*sizeof(MPI_Status) );
382     CHKPTR(send_status);
383     MPI_Waitall(nsends,send_waits,send_status);
384     FREE(send_status);
385   }
386   FREE(send_waits); FREE(svalues);
387 
388 
389   return 0;
390 }
391 
392 static int MatMult_MPIAIJ(Mat aijin,Vec xx,Vec yy)
393 {
394   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
395   int        ierr;
396   if (!aij->assembled) SETERR(1,"MatMult_MPIAIJ: must assemble matrix first");
397   ierr = VecScatterBegin(xx,aij->lvec,INSERTVALUES,SCATTERALL,aij->Mvctx);
398   CHKERR(ierr);
399   ierr = MatMult(aij->A,xx,yy); CHKERR(ierr);
400   ierr = VecScatterEnd(xx,aij->lvec,INSERTVALUES,SCATTERALL,aij->Mvctx);
401   CHKERR(ierr);
402   ierr = MatMultAdd(aij->B,aij->lvec,yy,yy); CHKERR(ierr);
403   return 0;
404 }
405 
406 static int MatMultAdd_MPIAIJ(Mat aijin,Vec xx,Vec yy,Vec zz)
407 {
408   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
409   int        ierr;
410   if (!aij->assembled) SETERR(1,"MatMult_MPIAIJ: must assemble matrix first");
411   ierr = VecScatterBegin(xx,aij->lvec,INSERTVALUES,SCATTERALL,aij->Mvctx);
412   CHKERR(ierr);
413   ierr = MatMultAdd(aij->A,xx,yy,zz); CHKERR(ierr);
414   ierr = VecScatterEnd(xx,aij->lvec,INSERTVALUES,SCATTERALL,aij->Mvctx);
415   CHKERR(ierr);
416   ierr = MatMultAdd(aij->B,aij->lvec,zz,zz); CHKERR(ierr);
417   return 0;
418 }
419 
420 static int MatMultTrans_MPIAIJ(Mat aijin,Vec xx,Vec yy)
421 {
422   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
423   int        ierr;
424 
425   if (!aij->assembled)
426     SETERR(1,"MatMulTrans_MPIAIJ: must assemble matrix first");
427   /* do nondiagonal part */
428   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERR(ierr);
429   /* send it on its way */
430   ierr = VecScatterBegin(aij->lvec,yy,ADDVALUES,
431            (ScatterMode)(SCATTERALL|SCATTERREVERSE),aij->Mvctx); CHKERR(ierr);
432   /* do local part */
433   ierr = MatMultTrans(aij->A,xx,yy); CHKERR(ierr);
434   /* receive remote parts: note this assumes the values are not actually */
435   /* inserted in yy until the next line, which is true for my implementation*/
436   /* but is not perhaps always true. */
437   ierr = VecScatterEnd(aij->lvec,yy,ADDVALUES,
438          (ScatterMode)(SCATTERALL|SCATTERREVERSE),aij->Mvctx); CHKERR(ierr);
439   return 0;
440 }
441 
442 static int MatMultTransAdd_MPIAIJ(Mat aijin,Vec xx,Vec yy,Vec zz)
443 {
444   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
445   int        ierr;
446 
447   if (!aij->assembled)
448     SETERR(1,"MatMulTransAdd_MPIAIJ: must assemble matrix first");
449   /* do nondiagonal part */
450   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERR(ierr);
451   /* send it on its way */
452   ierr = VecScatterBegin(aij->lvec,zz,ADDVALUES,
453          (ScatterMode)(SCATTERALL|SCATTERREVERSE),aij->Mvctx); CHKERR(ierr);
454   /* do local part */
455   ierr = MatMultTransAdd(aij->A,xx,yy,zz); CHKERR(ierr);
456   /* receive remote parts: note this assumes the values are not actually */
457   /* inserted in yy until the next line, which is true for my implementation*/
458   /* but is not perhaps always true. */
459   ierr = VecScatterEnd(aij->lvec,zz,ADDVALUES,
460        (ScatterMode)(SCATTERALL|SCATTERREVERSE),aij->Mvctx); CHKERR(ierr);
461   return 0;
462 }
463 
464 /*
465   This only works correctly for square matrices where the subblock A->A is the
466    diagonal block
467 */
468 static int MatGetDiagonal_MPIAIJ(Mat Ain,Vec v)
469 {
470   Mat_MPIAIJ *A = (Mat_MPIAIJ *) Ain->data;
471   if (!A->assembled) SETERR(1,"MatGetDiag_MPIAIJ: must assemble matrix first");
472   return MatGetDiagonal(A->A,v);
473 }
474 
475 static int MatDestroy_MPIAIJ(PetscObject obj)
476 {
477   Mat        mat = (Mat) obj;
478   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
479   int        ierr;
480 #if defined(PETSC_LOG)
481   PLogObjectState(obj,"Rows %d Cols %d",aij->M,aij->N);
482 #endif
483   FREE(aij->rowners);
484   ierr = MatDestroy(aij->A); CHKERR(ierr);
485   ierr = MatDestroy(aij->B); CHKERR(ierr);
486   if (aij->colmap) FREE(aij->colmap);
487   if (aij->garray) FREE(aij->garray);
488   if (aij->lvec) VecDestroy(aij->lvec);
489   if (aij->Mvctx) VecScatterCtxDestroy(aij->Mvctx);
490   FREE(aij);
491   PLogObjectDestroy(mat);
492   PETSCHEADERDESTROY(mat);
493   return 0;
494 }
495 #include "draw.h"
496 #include "pviewer.h"
497 
/*
   MatView_MPIAIJ - Prints or draws a parallel AIJ matrix.  For a file
   viewer each processor prints its own piece in sequence; for the other
   viewer types the whole matrix is gathered onto processor 0 and viewed
   there.
*/
static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
{
  Mat        mat = (Mat) obj;
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  int        ierr;
  PetscObject vobj = (PetscObject) viewer;

  if (!aij->assembled) SETERR(1,"MatView_MPIAIJ: must assemble matrix first");
  if (!viewer) { /* so that viewers may be used from debuggers */
    viewer = STDOUT_VIEWER; vobj = (PetscObject) viewer;
  }
  if (vobj->cookie == DRAW_COOKIE && vobj->type == NULLWINDOW) return 0;
  if (vobj->cookie == VIEWER_COOKIE && vobj->type == FILE_VIEWER) {
    FILE *fd = ViewerFileGetPointer_Private(viewer);
    /* one processor at a time prints its local pieces */
    MPIU_Seq_begin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
             aij->mytid,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
             aij->cend);
    ierr = MatView(aij->A,viewer); CHKERR(ierr);
    ierr = MatView(aij->B,viewer); CHKERR(ierr);
    fflush(fd);
    MPIU_Seq_end(mat->comm,1);
  }
  else if ((vobj->cookie == VIEWER_COOKIE && vobj->type == FILES_VIEWER) ||
            vobj->cookie == DRAW_COOKIE) {
    int numtids = aij->numtids, mytid = aij->mytid;
    if (numtids == 1) {
      ierr = MatView(aij->A,viewer); CHKERR(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat     A;
      Mat_AIJ *Aloc;
      int     M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
      Scalar  *a;

      /* processor 0 gets all M x N rows; the others contribute 0 rows */
      if (!mytid) {
        ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,0,0,0,&A);
      }
      else {
        ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,0,0,0,&A);
      }
      CHKERR(ierr);

      /* copy over the A part */
      Aloc = (Mat_AIJ*) aij->A->data;
      m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = aij->rstart;
      /* NOTE(review): aj appears to hold 1-based local column indices
         (hence the -1); this shifts them in place to global numbering
         and undoes the shift below — confirm against Mat_AIJ */
      for ( i=0; i<ai[m]; i++ ) {aj[i] += aij->cstart - 1;}
      for ( i=0; i<m; i++ ) {
        ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERTVALUES);
        CHKERR(ierr);
        row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
      }
      aj = Aloc->j;
      /* restore the original (local) column indices */
      for ( i=0; i<ai[m]; i++ ) {aj[i] -= aij->cstart - 1;}

      /* copy over the B part */
      Aloc = (Mat_AIJ*) aij->B->data;
      m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = aij->rstart;
      /* translate B's local column indices to global via garray */
      ct = cols = (int *) MALLOC( (ai[m]+1)*sizeof(int) ); CHKPTR(cols);
      for ( i=0; i<ai[m]; i++ ) {cols[i] = aij->garray[aj[i]-1];}
      for ( i=0; i<m; i++ ) {
        ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERTVALUES);
        CHKERR(ierr);
        row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
      }
      FREE(ct);
      ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERR(ierr);
      ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERR(ierr);
      /* only processor 0 owns rows, so only it views the result */
      if (!mytid) {
        ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERR(ierr);
      }
      ierr = MatDestroy(A); CHKERR(ierr);
    }
  }
  return 0;
}
577 
578 extern int MatMarkDiag_AIJ(Mat_AIJ  *);
/*
    This has to provide several versions.

     1) as in the sequential case
     2) a) use only local smoothing, updating outer values only once.
        b) local smoothing, updating outer values each inner iteration
     3) color updating, exchanging outer values between colors.
*/
587 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
588                            double shift,int its,Vec xx)
589 {
590   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
591   Mat        AA = mat->A, BB = mat->B;
592   Mat_AIJ    *A = (Mat_AIJ *) AA->data, *B = (Mat_AIJ *)BB->data;
593   Scalar     zero = 0.0,*b,*x,*xs,*ls,d,*v,sum,scale,*t,*ts;
594   int        ierr,*idx, *diag;
595   int        n = mat->n, m = mat->m, i;
596   Vec        tt;
597 
598   if (!mat->assembled) SETERR(1,"MatRelax_MPIAIJ: must assemble matrix first");
599 
600   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
601   xs = x -1; /* shift by one for index start of 1 */
602   ls--;
603   if (!A->diag) {if ((ierr = MatMarkDiag_AIJ(A))) return ierr;}
604   diag = A->diag;
605   if (flag == SOR_APPLY_UPPER || flag == SOR_APPLY_LOWER) {
606     SETERR(1,"That option not yet support for parallel AIJ matrices");
607   }
608   if (flag & SOR_EISENSTAT) {
609     /* Let  A = L + U + D; where L is lower trianglar,
610     U is upper triangular, E is diagonal; This routine applies
611 
612             (L + E)^{-1} A (U + E)^{-1}
613 
614     to a vector efficiently using Eisenstat's trick. This is for
615     the case of SSOR preconditioner, so E is D/omega where omega
616     is the relaxation factor.
617     */
618     ierr = VecDuplicate(xx,&tt); CHKERR(ierr);
619     VecGetArray(tt,&t);
620     scale = (2.0/omega) - 1.0;
621     /*  x = (E + U)^{-1} b */
622     VecSet(&zero,mat->lvec);
623     ierr = VecPipelineBegin(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
624                               mat->Mvctx); CHKERR(ierr);
625     for ( i=m-1; i>-1; i-- ) {
626       n    = A->i[i+1] - diag[i] - 1;
627       idx  = A->j + diag[i];
628       v    = A->a + diag[i];
629       sum  = b[i];
630       SPARSEDENSEMDOT(sum,xs,v,idx,n);
631       d    = shift + A->a[diag[i]-1];
632       n    = B->i[i+1] - B->i[i];
633       idx  = B->j + B->i[i] - 1;
634       v    = B->a + B->i[i] - 1;
635       SPARSEDENSEMDOT(sum,ls,v,idx,n);
636       x[i] = omega*(sum/d);
637     }
638     ierr = VecPipelineEnd(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
639                             mat->Mvctx); CHKERR(ierr);
640 
641     /*  t = b - (2*E - D)x */
642     v = A->a;
643     for ( i=0; i<m; i++ ) { t[i] = b[i] - scale*(v[*diag++ - 1])*x[i]; }
644 
645     /*  t = (E + L)^{-1}t */
646     ts = t - 1; /* shifted by one for index start of a or mat->j*/
647     diag = A->diag;
648     VecSet(&zero,mat->lvec);
649     ierr = VecPipelineBegin(tt,mat->lvec,INSERTVALUES,PIPELINEDOWN,
650                                                  mat->Mvctx); CHKERR(ierr);
651     for ( i=0; i<m; i++ ) {
652       n    = diag[i] - A->i[i];
653       idx  = A->j + A->i[i] - 1;
654       v    = A->a + A->i[i] - 1;
655       sum  = t[i];
656       SPARSEDENSEMDOT(sum,ts,v,idx,n);
657       d    = shift + A->a[diag[i]-1];
658       n    = B->i[i+1] - B->i[i];
659       idx  = B->j + B->i[i] - 1;
660       v    = B->a + B->i[i] - 1;
661       SPARSEDENSEMDOT(sum,ls,v,idx,n);
662       t[i] = omega*(sum/d);
663     }
664     ierr = VecPipelineEnd(tt,mat->lvec,INSERTVALUES,PIPELINEDOWN,
665                                                     mat->Mvctx); CHKERR(ierr);
666     /*  x = x + t */
667     for ( i=0; i<m; i++ ) { x[i] += t[i]; }
668     VecDestroy(tt);
669     return 0;
670   }
671 
672 
673   if ((flag & SOR_SYMMETRIC_SWEEP) == SOR_SYMMETRIC_SWEEP){
674     if (flag & SOR_ZERO_INITIAL_GUESS) {
675       VecSet(&zero,mat->lvec); VecSet(&zero,xx);
676     }
677     else {
678       ierr=VecScatterBegin(xx,mat->lvec,INSERTVALUES,SCATTERUP,mat->Mvctx);
679       CHKERR(ierr);
680       ierr = VecScatterEnd(xx,mat->lvec,INSERTVALUES,SCATTERUP,mat->Mvctx);
681       CHKERR(ierr);
682     }
683     while (its--) {
684       /* go down through the rows */
685       ierr = VecPipelineBegin(xx,mat->lvec,INSERTVALUES,PIPELINEDOWN,
686                               mat->Mvctx); CHKERR(ierr);
687       for ( i=0; i<m; i++ ) {
688         n    = A->i[i+1] - A->i[i];
689         idx  = A->j + A->i[i] - 1;
690         v    = A->a + A->i[i] - 1;
691         sum  = b[i];
692         SPARSEDENSEMDOT(sum,xs,v,idx,n);
693         d    = shift + A->a[diag[i]-1];
694         n    = B->i[i+1] - B->i[i];
695         idx  = B->j + B->i[i] - 1;
696         v    = B->a + B->i[i] - 1;
697         SPARSEDENSEMDOT(sum,ls,v,idx,n);
698         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
699       }
700       ierr = VecPipelineEnd(xx,mat->lvec,INSERTVALUES,PIPELINEDOWN,
701                             mat->Mvctx); CHKERR(ierr);
702       /* come up through the rows */
703       ierr = VecPipelineBegin(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
704                               mat->Mvctx); CHKERR(ierr);
705       for ( i=m-1; i>-1; i-- ) {
706         n    = A->i[i+1] - A->i[i];
707         idx  = A->j + A->i[i] - 1;
708         v    = A->a + A->i[i] - 1;
709         sum  = b[i];
710         SPARSEDENSEMDOT(sum,xs,v,idx,n);
711         d    = shift + A->a[diag[i]-1];
712         n    = B->i[i+1] - B->i[i];
713         idx  = B->j + B->i[i] - 1;
714         v    = B->a + B->i[i] - 1;
715         SPARSEDENSEMDOT(sum,ls,v,idx,n);
716         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
717       }
718       ierr = VecPipelineEnd(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
719                             mat->Mvctx); CHKERR(ierr);
720     }
721   }
722   else if (flag & SOR_FORWARD_SWEEP){
723     if (flag & SOR_ZERO_INITIAL_GUESS) {
724       VecSet(&zero,mat->lvec);
725       ierr = VecPipelineBegin(xx,mat->lvec,INSERTVALUES,PIPELINEDOWN,
726                               mat->Mvctx); CHKERR(ierr);
727       for ( i=0; i<m; i++ ) {
728         n    = diag[i] - A->i[i];
729         idx  = A->j + A->i[i] - 1;
730         v    = A->a + A->i[i] - 1;
731         sum  = b[i];
732         SPARSEDENSEMDOT(sum,xs,v,idx,n);
733         d    = shift + A->a[diag[i]-1];
734         n    = B->i[i+1] - B->i[i];
735         idx  = B->j + B->i[i] - 1;
736         v    = B->a + B->i[i] - 1;
737         SPARSEDENSEMDOT(sum,ls,v,idx,n);
738         x[i] = omega*(sum/d);
739       }
740       ierr = VecPipelineEnd(xx,mat->lvec,INSERTVALUES,PIPELINEDOWN,
741                             mat->Mvctx); CHKERR(ierr);
742       its--;
743     }
744     while (its--) {
745       ierr=VecScatterBegin(xx,mat->lvec,INSERTVALUES,SCATTERUP,mat->Mvctx);
746       CHKERR(ierr);
747       ierr = VecScatterEnd(xx,mat->lvec,INSERTVALUES,SCATTERUP,mat->Mvctx);
748       CHKERR(ierr);
749       ierr = VecPipelineBegin(xx,mat->lvec,INSERTVALUES,PIPELINEDOWN,
750                               mat->Mvctx); CHKERR(ierr);
751       for ( i=0; i<m; i++ ) {
752         n    = A->i[i+1] - A->i[i];
753         idx  = A->j + A->i[i] - 1;
754         v    = A->a + A->i[i] - 1;
755         sum  = b[i];
756         SPARSEDENSEMDOT(sum,xs,v,idx,n);
757         d    = shift + A->a[diag[i]-1];
758         n    = B->i[i+1] - B->i[i];
759         idx  = B->j + B->i[i] - 1;
760         v    = B->a + B->i[i] - 1;
761         SPARSEDENSEMDOT(sum,ls,v,idx,n);
762         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
763       }
764       ierr = VecPipelineEnd(xx,mat->lvec,INSERTVALUES,PIPELINEDOWN,
765                             mat->Mvctx); CHKERR(ierr);
766     }
767   }
768   else if (flag & SOR_BACKWARD_SWEEP){
769     if (flag & SOR_ZERO_INITIAL_GUESS) {
770       VecSet(&zero,mat->lvec);
771       ierr = VecPipelineBegin(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
772                               mat->Mvctx); CHKERR(ierr);
773       for ( i=m-1; i>-1; i-- ) {
774         n    = A->i[i+1] - diag[i] - 1;
775         idx  = A->j + diag[i];
776         v    = A->a + diag[i];
777         sum  = b[i];
778         SPARSEDENSEMDOT(sum,xs,v,idx,n);
779         d    = shift + A->a[diag[i]-1];
780         n    = B->i[i+1] - B->i[i];
781         idx  = B->j + B->i[i] - 1;
782         v    = B->a + B->i[i] - 1;
783         SPARSEDENSEMDOT(sum,ls,v,idx,n);
784         x[i] = omega*(sum/d);
785       }
786       ierr = VecPipelineEnd(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
787                             mat->Mvctx); CHKERR(ierr);
788       its--;
789     }
790     while (its--) {
791       ierr = VecScatterBegin(xx,mat->lvec,INSERTVALUES,SCATTERDOWN,
792                             mat->Mvctx); CHKERR(ierr);
793       ierr = VecScatterEnd(xx,mat->lvec,INSERTVALUES,SCATTERDOWN,
794                             mat->Mvctx); CHKERR(ierr);
795       ierr = VecPipelineBegin(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
796                               mat->Mvctx); CHKERR(ierr);
797       for ( i=m-1; i>-1; i-- ) {
798         n    = A->i[i+1] - A->i[i];
799         idx  = A->j + A->i[i] - 1;
800         v    = A->a + A->i[i] - 1;
801         sum  = b[i];
802         SPARSEDENSEMDOT(sum,xs,v,idx,n);
803         d    = shift + A->a[diag[i]-1];
804         n    = B->i[i+1] - B->i[i];
805         idx  = B->j + B->i[i] - 1;
806         v    = B->a + B->i[i] - 1;
807         SPARSEDENSEMDOT(sum,ls,v,idx,n);
808         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
809       }
810       ierr = VecPipelineEnd(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
811                             mat->Mvctx); CHKERR(ierr);
812     }
813   }
814   else if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
815     if (flag & SOR_ZERO_INITIAL_GUESS) {
816       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
817     }
818     ierr=VecScatterBegin(xx,mat->lvec,INSERTVALUES,SCATTERALL,mat->Mvctx);
819     CHKERR(ierr);
820     ierr = VecScatterEnd(xx,mat->lvec,INSERTVALUES,SCATTERALL,mat->Mvctx);
821     CHKERR(ierr);
822     while (its--) {
823       /* go down through the rows */
824       for ( i=0; i<m; i++ ) {
825         n    = A->i[i+1] - A->i[i];
826         idx  = A->j + A->i[i] - 1;
827         v    = A->a + A->i[i] - 1;
828         sum  = b[i];
829         SPARSEDENSEMDOT(sum,xs,v,idx,n);
830         d    = shift + A->a[diag[i]-1];
831         n    = B->i[i+1] - B->i[i];
832         idx  = B->j + B->i[i] - 1;
833         v    = B->a + B->i[i] - 1;
834         SPARSEDENSEMDOT(sum,ls,v,idx,n);
835         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
836       }
837       /* come up through the rows */
838       for ( i=m-1; i>-1; i-- ) {
839         n    = A->i[i+1] - A->i[i];
840         idx  = A->j + A->i[i] - 1;
841         v    = A->a + A->i[i] - 1;
842         sum  = b[i];
843         SPARSEDENSEMDOT(sum,xs,v,idx,n);
844         d    = shift + A->a[diag[i]-1];
845         n    = B->i[i+1] - B->i[i];
846         idx  = B->j + B->i[i] - 1;
847         v    = B->a + B->i[i] - 1;
848         SPARSEDENSEMDOT(sum,ls,v,idx,n);
849         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
850       }
851     }
852   }
853   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
854     if (flag & SOR_ZERO_INITIAL_GUESS) {
855       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
856     }
857     ierr=VecScatterBegin(xx,mat->lvec,INSERTVALUES,SCATTERALL,mat->Mvctx);
858     CHKERR(ierr);
859     ierr = VecScatterEnd(xx,mat->lvec,INSERTVALUES,SCATTERALL,mat->Mvctx);
860     CHKERR(ierr);
861     while (its--) {
862       for ( i=0; i<m; i++ ) {
863         n    = A->i[i+1] - A->i[i];
864         idx  = A->j + A->i[i] - 1;
865         v    = A->a + A->i[i] - 1;
866         sum  = b[i];
867         SPARSEDENSEMDOT(sum,xs,v,idx,n);
868         d    = shift + A->a[diag[i]-1];
869         n    = B->i[i+1] - B->i[i];
870         idx  = B->j + B->i[i] - 1;
871         v    = B->a + B->i[i] - 1;
872         SPARSEDENSEMDOT(sum,ls,v,idx,n);
873         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
874       }
875     }
876   }
877   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
878     if (flag & SOR_ZERO_INITIAL_GUESS) {
879       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
880     }
881     ierr = VecScatterBegin(xx,mat->lvec,INSERTVALUES,SCATTERALL,
882                             mat->Mvctx); CHKERR(ierr);
883     ierr = VecScatterEnd(xx,mat->lvec,INSERTVALUES,SCATTERALL,
884                             mat->Mvctx); CHKERR(ierr);
885     while (its--) {
886       for ( i=m-1; i>-1; i-- ) {
887         n    = A->i[i+1] - A->i[i];
888         idx  = A->j + A->i[i] - 1;
889         v    = A->a + A->i[i] - 1;
890         sum  = b[i];
891         SPARSEDENSEMDOT(sum,xs,v,idx,n);
892         d    = shift + A->a[diag[i]-1];
893         n    = B->i[i+1] - B->i[i];
894         idx  = B->j + B->i[i] - 1;
895         v    = B->a + B->i[i] - 1;
896         SPARSEDENSEMDOT(sum,ls,v,idx,n);
897         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
898       }
899     }
900   }
901   return 0;
902 }
903 
904 static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,int *nz,
905                              int *nzalloc,int *mem)
906 {
907   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
908   Mat        A = mat->A, B = mat->B;
909   int        ierr, isend[3], irecv[3], nzA, nzallocA, memA;
910 
911   ierr = MatGetInfo(A,MAT_LOCAL,&nzA,&nzallocA,&memA); CHKERR(ierr);
912   ierr = MatGetInfo(B,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERR(ierr);
913   isend[0] += nzA; isend[1] += nzallocA; isend[2] += memA;
914   if (flag == MAT_LOCAL) {
915     *nz = isend[0]; *nzalloc = isend[1]; *mem = isend[2];
916   } else if (flag == MAT_GLOBAL_MAX) {
917     MPI_Allreduce((void *) isend,(void *) irecv,3,MPI_INT,MPI_MAX,matin->comm);
918     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
919   } else if (flag == MAT_GLOBAL_SUM) {
920     MPI_Allreduce((void *) isend,(void *) irecv,3,MPI_INT,MPI_SUM,matin->comm);
921     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
922   }
923   return 0;
924 }
925 
926 static int MatSetOption_MPIAIJ(Mat aijin,MatOption op)
927 {
928   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
929 
930   if      (op == NO_NEW_NONZERO_LOCATIONS)  {
931     MatSetOption(aij->A,op);
932     MatSetOption(aij->B,op);
933   }
934   else if (op == YES_NEW_NONZERO_LOCATIONS) {
935     MatSetOption(aij->A,op);
936     MatSetOption(aij->B,op);
937   }
938   else if (op == COLUMN_ORIENTED) SETERR(1,"Column oriented not supported");
939   return 0;
940 }
941 
942 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
943 {
944   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
945   *m = mat->M; *n = mat->N;
946   return 0;
947 }
948 
949 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
950 {
951   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
952   *m = mat->m; *n = mat->n;
953   return 0;
954 }
955 
956 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
957 {
958   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
959   *m = mat->rstart; *n = mat->rend;
960   return 0;
961 }
962 
963 static int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
964 {
965   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
966   Scalar     *vworkA, *vworkB, **pvA, **pvB;
967   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
968   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
969 
970   if (!mat->assembled)
971     SETERR(1,"MatGetRow_MPIAIJ: Must assemble matrix first.");
972   if (row < rstart || row >= rend)
973     SETERR(1,"MatGetRow_MPIAIJ: Currently you can get only local rows.")
974   lrow = row - rstart;
975 
976   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
977   if (!v)   {pvA = 0; pvB = 0;}
978   if (!idx) {pcA = 0; if (!v) pcB = 0;}
979   ierr = MatGetRow(mat->A,lrow,&nzA,pcA,pvA); CHKERR(ierr);
980   ierr = MatGetRow(mat->B,lrow,&nzB,pcB,pvB); CHKERR(ierr);
981   nztot = nzA + nzB;
982 
983   if (v  || idx) {
984     if (nztot) {
985       /* Sort by increasing column numbers, assuming A and B already sorted */
986       int imark, imark2;
987       for (i=0; i<nzB; i++) cworkB[i] = mat->garray[cworkB[i]];
988       if (v) {
989         *v = (Scalar *) MALLOC( (nztot)*sizeof(Scalar) ); CHKPTR(*v);
990         for ( i=0; i<nzB; i++ ) {
991           if (cworkB[i] < cstart)   (*v)[i] = vworkB[i];
992           else break;
993         }
994         imark = i;
995         for ( i=0; i<nzA; i++ )     (*v)[imark+i] = vworkA[i];
996         imark2 = imark+nzA;
997         for ( i=imark; i<nzB; i++ ) (*v)[imark2+i] = vworkB[i];
998       }
999       if (idx) {
1000         *idx = (int *) MALLOC( (nztot)*sizeof(int) ); CHKPTR(*idx);
1001         for (i=0; i<nzA; i++) cworkA[i] += cstart;
1002         for ( i=0; i<nzB; i++ ) {
1003           if (cworkB[i] < cstart)   (*idx)[i] = cworkB[i];
1004           else break;
1005         }
1006         imark = i;
1007         for ( i=0; i<nzA; i++ )     (*idx)[imark+i] = cworkA[i];
1008         imark2 = imark+nzA;
1009         for ( i=imark; i<nzB; i++ ) (*idx)[imark2+i] = cworkB[i];
1010       }
1011     }
1012     else {*idx = 0; *v=0;}
1013   }
1014   *nz = nztot;
1015   ierr = MatRestoreRow(mat->A,lrow,&nzA,pcA,pvA); CHKERR(ierr);
1016   ierr = MatRestoreRow(mat->B,lrow,&nzB,pcB,pvB); CHKERR(ierr);
1017   return 0;
1018 }
1019 
1020 static int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1021 {
1022   if (idx) FREE(*idx);
1023   if (v) FREE(*v);
1024   return 0;
1025 }
1026 
1027 extern int MatConvert_MPIAIJ(Mat,MatType,Mat *);
1028 static int MatCopyPrivate_MPIAIJ(Mat,Mat *);
1029 
1030 /* -------------------------------------------------------------------*/
/* Operation table for the MPI AIJ matrix format.  The initializers are
   POSITIONAL: each slot corresponds to a fixed member of struct _MatOps,
   and a 0 marks an operation this format does not implement.  Do not
   reorder entries. */
static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
       MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
       MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
       MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
       0,0,0,0,
       0,0,
       MatRelax_MPIAIJ,
       0,
       MatGetInfo_MPIAIJ,0,
       MatGetDiagonal_MPIAIJ,0,0,
       MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
       0,
       MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,0,
       0,0,0,0,
       MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
       0,0,
       0,0,MatConvert_MPIAIJ,0,0,MatCopyPrivate_MPIAIJ};
1048 
1049 /*@
1050    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1051    (the default parallel PETSc format).
1052 
1053    Input Parameters:
1054 .  comm - MPI communicator
1055 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1056 .  n - number of local columns (or PETSC_DECIDE to have calculated
1057            if N is given)
1058 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1059 .  N - number of global columns (or PETSC_DECIDE to have calculated
1060            if n is given)
1061 .  d_nz - number of nonzeros per row in diagonal portion of matrix
1062            (same for all local rows)
.  d_nnz - number of nonzeros per row in diagonal portion of matrix or null
           (possibly different for each row).  You must leave room for the
           diagonal entry even if it is zero.
1066 .  o_nz - number of nonzeros per row in off-diagonal portion of matrix
1067            (same for all local rows)
.  o_nnz - number of nonzeros per row in off-diagonal portion of matrix
           or null (possibly different for each row).
1070 
1071    Output Parameter:
1072 .  newmat - the matrix
1073 
1074    Notes:
1075    The AIJ format (also called the Yale sparse matrix format or
1076    compressed row storage), is fully compatible with standard Fortran 77
1077    storage.  That is, the stored row and column indices begin at
1078    one, not zero.
1079 
1080    The user MUST specify either the local or global matrix dimensions
1081    (possibly both).
1082 
1083    The user can set d_nz, d_nnz, o_nz, and o_nnz to zero for PETSc to
1084    control dynamic memory allocation.
1085 
1086 .keywords: matrix, aij, compressed row, sparse, parallel
1087 
1088 .seealso: MatCreate(), MatCreateSequentialAIJ(), MatSetValues()
1089 @*/
int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
                 int d_nz,int *d_nnz, int o_nz,int *o_nnz,Mat *newmat)
{
  Mat          mat;
  Mat_MPIAIJ   *aij;
  int          ierr, i,sum[2],work[2];
  *newmat         = 0;
  PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (aij = NEW(Mat_MPIAIJ)); CHKPTR(aij);
  mat->ops        = &MatOps;
  mat->destroy    = MatDestroy_MPIAIJ;
  mat->view       = MatView_MPIAIJ;
  mat->factor     = 0;

  aij->insertmode = NOTSETVALUES;
  MPI_Comm_rank(comm,&aij->mytid);
  MPI_Comm_size(comm,&aij->numtids);

  /* fill in any global sizes the caller left as PETSC_DECIDE by summing
     the local sizes over all processors (collective) */
  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    MPI_Allreduce((void *) work,(void *) sum,2,MPI_INT,MPI_SUM,comm );
    if (M == PETSC_DECIDE) M = sum[0];
    if (N == PETSC_DECIDE) N = sum[1];
  }
  /* split sizes as evenly as possible: the first M % numtids (N % numtids)
     processors receive one extra row (column) */
  if (m == PETSC_DECIDE)
    {m = M/aij->numtids + ((M % aij->numtids) > aij->mytid);}
  if (n == PETSC_DECIDE)
    {n = N/aij->numtids + ((N % aij->numtids) > aij->mytid);}
  aij->m = m;
  aij->n = n;
  aij->N = N;
  aij->M = M;

  /* build local table of row and column ownerships */
  /* cowners shares the tail of the rowners allocation; after the gather
     and running prefix sum, rowners[i] holds the first global row owned
     by processor i (rowners[numtids] == M) */
  aij->rowners = (int *) MALLOC(2*(aij->numtids+2)*sizeof(int));
  CHKPTR(aij->rowners);
  aij->cowners = aij->rowners + aij->numtids + 1;
  MPI_Allgather(&m,1,MPI_INT,aij->rowners+1,1,MPI_INT,comm);
  aij->rowners[0] = 0;
  for ( i=2; i<=aij->numtids; i++ ) {
    aij->rowners[i] += aij->rowners[i-1];
  }
  aij->rstart = aij->rowners[aij->mytid];
  aij->rend   = aij->rowners[aij->mytid+1];
  /* same construction for the column ownership table */
  MPI_Allgather(&n,1,MPI_INT,aij->cowners+1,1,MPI_INT,comm);
  aij->cowners[0] = 0;
  for ( i=2; i<=aij->numtids; i++ ) {
    aij->cowners[i] += aij->cowners[i-1];
  }
  aij->cstart = aij->cowners[aij->mytid];
  aij->cend   = aij->cowners[aij->mytid+1];


  /* A is the m x n diagonal block; B holds the off-diagonal entries and
     is created m x N (full global column width) */
  ierr = MatCreateSequentialAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&aij->A);
  CHKERR(ierr);
  PLogObjectParent(mat,aij->A);
  ierr = MatCreateSequentialAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&aij->B);
  CHKERR(ierr);
  PLogObjectParent(mat,aij->B);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&aij->stash); CHKERR(ierr);
  aij->colmap    = 0;
  aij->garray    = 0;

  /* stuff used for matrix vector multiply; created at assembly time */
  aij->lvec      = 0;
  aij->Mvctx     = 0;
  aij->assembled = 0;

  *newmat = mat;
  return 0;
}
1164 
1165 static int MatCopyPrivate_MPIAIJ(Mat matin,Mat *newmat)
1166 {
1167   Mat        mat;
1168   Mat_MPIAIJ *aij,*oldmat = (Mat_MPIAIJ *) matin->data;
1169   int        ierr, len;
1170   *newmat      = 0;
1171 
1172   if (!oldmat->assembled) SETERR(1,"Cannot copy unassembled matrix");
1173   PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1174   PLogObjectCreate(mat);
1175   mat->data       = (void *) (aij = NEW(Mat_MPIAIJ)); CHKPTR(aij);
1176   mat->ops        = &MatOps;
1177   mat->destroy    = MatDestroy_MPIAIJ;
1178   mat->view       = MatView_MPIAIJ;
1179   mat->factor     = matin->factor;
1180 
1181   aij->m          = oldmat->m;
1182   aij->n          = oldmat->n;
1183   aij->M          = oldmat->M;
1184   aij->N          = oldmat->N;
1185 
1186   aij->assembled  = 1;
1187   aij->rstart     = oldmat->rstart;
1188   aij->rend       = oldmat->rend;
1189   aij->cstart     = oldmat->cstart;
1190   aij->cend       = oldmat->cend;
1191   aij->numtids    = oldmat->numtids;
1192   aij->mytid      = oldmat->mytid;
1193   aij->insertmode = NOTSETVALUES;
1194 
1195   aij->rowners        = (int *) MALLOC( (aij->numtids+1)*sizeof(int) );
1196   CHKPTR(aij->rowners);
1197   MEMCPY(aij->rowners,oldmat->rowners,(aij->numtids+1)*sizeof(int));
1198   ierr = StashInitialize_Private(&aij->stash); CHKERR(ierr);
1199   if (oldmat->colmap) {
1200     aij->colmap      = (int *) MALLOC( (aij->N)*sizeof(int) );
1201     CHKPTR(aij->colmap);
1202     MEMCPY(aij->colmap,oldmat->colmap,(aij->N)*sizeof(int));
1203   } else aij->colmap = 0;
1204   if (oldmat->garray && (len = ((Mat_AIJ *) (oldmat->B->data))->n)) {
1205     aij->garray      = (int *) MALLOC(len*sizeof(int) ); CHKPTR(aij->garray);
1206     MEMCPY(aij->garray,oldmat->garray,len*sizeof(int));
1207   } else aij->garray = 0;
1208 
1209   ierr =  VecDuplicate(oldmat->lvec,&aij->lvec); CHKERR(ierr);
1210   PLogObjectParent(mat,aij->lvec);
1211   ierr =  VecScatterCtxCopy(oldmat->Mvctx,&aij->Mvctx); CHKERR(ierr);
1212   PLogObjectParent(mat,aij->Mvctx);
1213   ierr =  MatConvert(oldmat->A,MATSAME,&aij->A); CHKERR(ierr);
1214   PLogObjectParent(mat,aij->A);
1215   ierr =  MatConvert(oldmat->B,MATSAME,&aij->B); CHKERR(ierr);
1216   PLogObjectParent(mat,aij->B);
1217   *newmat = mat;
1218   return 0;
1219 }
1220