xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision 4a0ce1022b72e4f6d4316c6a4e26837fdc335099)
1 #ifndef lint
2 static char vcid[] = "$Id: mpiaij.c,v 1.31 1995/04/25 19:09:13 curfman Exp curfman $";
3 #endif
4 
5 #include "mpiaij.h"
6 #include "vec/vecimpl.h"
7 #include "inline/spops.h"
8 
9 #define CHUNCKSIZE   100
10 /*
11    This is a simple minded stash. Do a linear search to determine if
12  in stash, if not add to end.
13 */
14 static int StashValues(Stash *stash,int row,int n, int *idxn,
15                        Scalar *values,InsertMode addv)
16 {
17   int    i,j,N = stash->n,found,*n_idx, *n_idy;
18   Scalar val,*n_array;
19 
20   for ( i=0; i<n; i++ ) {
21     found = 0;
22     val = *values++;
23     for ( j=0; j<N; j++ ) {
24       if ( stash->idx[j] == row && stash->idy[j] == idxn[i]) {
25         /* found a match */
26         if (addv == AddValues) stash->array[j] += val;
27         else stash->array[j] = val;
28         found = 1;
29         break;
30       }
31     }
32     if (!found) { /* not found so add to end */
33       if ( stash->n == stash->nmax ) {
34         /* allocate a larger stash */
35         n_array = (Scalar *) MALLOC( (stash->nmax + CHUNCKSIZE)*(
36                                      2*sizeof(int) + sizeof(Scalar)));
37         CHKPTR(n_array);
38         n_idx = (int *) (n_array + stash->nmax + CHUNCKSIZE);
39         n_idy = (int *) (n_idx + stash->nmax + CHUNCKSIZE);
40         MEMCPY(n_array,stash->array,stash->nmax*sizeof(Scalar));
41         MEMCPY(n_idx,stash->idx,stash->nmax*sizeof(int));
42         MEMCPY(n_idy,stash->idy,stash->nmax*sizeof(int));
43         if (stash->array) FREE(stash->array);
44         stash->array = n_array; stash->idx = n_idx; stash->idy = n_idy;
45         stash->nmax += CHUNCKSIZE;
46       }
47       stash->array[stash->n]   = val;
48       stash->idx[stash->n]     = row;
49       stash->idy[stash->n++]   = idxn[i];
50     }
51   }
52   return 0;
53 }
54 
55 /* local utility routine that creates a mapping from the global column
56 number to the local number in the off-diagonal part of the local
57 storage of the matrix.  This is done in a non scable way since the
58 length of colmap equals the global matrix length.
59 */
60 static int CreateColmap(Mat mat)
61 {
62   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
63   Mat_AIJ    *B = (Mat_AIJ*) aij->B->data;
64   int        n = B->n,i;
65   aij->colmap = (int *) MALLOC( aij->N*sizeof(int) ); CHKPTR(aij->colmap);
66   MEMSET(aij->colmap,0,aij->N*sizeof(int));
67   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
68   return 0;
69 }
70 
71 static int MatSetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,
72                             int *idxn,Scalar *v,InsertMode addv)
73 {
74   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
75   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
76   int        cstart = aij->cstart, cend = aij->cend,row,col;
77 
78   if (aij->insertmode != NotSetValues && aij->insertmode != addv) {
79     SETERR(1,"You cannot mix inserts and adds");
80   }
81   aij->insertmode = addv;
82   for ( i=0; i<m; i++ ) {
83     if (idxm[i] < 0) SETERR(1,"Negative row index");
84     if (idxm[i] >= aij->M) SETERR(1,"Row index too large");
85     if (idxm[i] >= rstart && idxm[i] < rend) {
86       row = idxm[i] - rstart;
87       for ( j=0; j<n; j++ ) {
88         if (idxn[j] < 0) SETERR(1,"Negative column index");
89         if (idxn[j] >= aij->N) SETERR(1,"Column index too large");
90         if (idxn[j] >= cstart && idxn[j] < cend){
91           col = idxn[j] - cstart;
92           ierr = MatSetValues(aij->A,1,&row,1,&col,v+i*n+j,addv);CHKERR(ierr);
93         }
94         else {
95           if (aij->assembled) {
96             if (!aij->colmap) {ierr = CreateColmap(mat); CHKERR(ierr);}
97             col = aij->colmap[idxn[j]] - 1;
98             if (col < 0) {
99               SETERR(1,"Cannot insert new off diagonal block nonzero in\
100                      already\
101                      assembled matrix. Contact petsc-maint@mcs.anl.gov\
102                      if your need this feature");
103             }
104           }
105           else col = idxn[j];
106           ierr = MatSetValues(aij->B,1,&row,1,&col,v+i*n+j,addv);CHKERR(ierr);
107         }
108       }
109     }
110     else {
111       ierr = StashValues(&aij->stash,idxm[i],n,idxn,v+i*n,addv);CHKERR(ierr);
112     }
113   }
114   return 0;
115 }
116 
117 /*
118     the assembly code is alot like the code for vectors, we should
119     sometime derive a single assembly code that can be used for
120     either case.
121 */
122 
123 static int MatBeginAssembly_MPIAIJ(Mat mat,int mode)
124 {
125   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
126   MPI_Comm    comm = mat->comm;
127   int         numtids = aij->numtids, *owners = aij->rowners;
128   int         mytid = aij->mytid;
129   MPI_Request *send_waits,*recv_waits;
130   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
131   int         tag = 50, *owner,*starts,count;
132   InsertMode  addv;
133   Scalar      *rvalues,*svalues;
134 
135   /* make sure all processors are either in INSERTMODE or ADDMODE */
136   MPI_Allreduce((void *) &aij->insertmode,(void *) &addv,1,MPI_INT,
137                 MPI_BOR,comm);
138   if (addv == (AddValues|InsertValues)) {
139     SETERR(1,"Some processors have inserted while others have added");
140   }
141   aij->insertmode = addv; /* in case this processor had no cache */
142 
143   /*  first count number of contributors to each processor */
144   nprocs = (int *) MALLOC( 2*numtids*sizeof(int) ); CHKPTR(nprocs);
145   MEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
146   owner = (int *) MALLOC( (aij->stash.n+1)*sizeof(int) ); CHKPTR(owner);
147   for ( i=0; i<aij->stash.n; i++ ) {
148     idx = aij->stash.idx[i];
149     for ( j=0; j<numtids; j++ ) {
150       if (idx >= owners[j] && idx < owners[j+1]) {
151         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
152       }
153     }
154   }
155   nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}
156 
157   /* inform other processors of number of messages and max length*/
158   work = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(work);
159   MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
160   nreceives = work[mytid];
161   MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
162   nmax = work[mytid];
163   FREE(work);
164 
165   /* post receives:
166        1) each message will consist of ordered pairs
167      (global index,value) we store the global index as a double
168      to simplify the message passing.
169        2) since we don't know how long each individual message is we
170      allocate the largest needed buffer for each receive. Potentially
171      this is a lot of wasted space.
172 
173 
174        This could be done better.
175   */
176   rvalues = (Scalar *) MALLOC(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
177   CHKPTR(rvalues);
178   recv_waits = (MPI_Request *) MALLOC((nreceives+1)*sizeof(MPI_Request));
179   CHKPTR(recv_waits);
180   for ( i=0; i<nreceives; i++ ) {
181     MPI_Irecv((void *)(rvalues+3*nmax*i),3*nmax,MPI_SCALAR,MPI_ANY_SOURCE,tag,
182               comm,recv_waits+i);
183   }
184 
185   /* do sends:
186       1) starts[i] gives the starting index in svalues for stuff going to
187          the ith processor
188   */
189   svalues = (Scalar *) MALLOC( 3*(aij->stash.n+1)*sizeof(Scalar) );
190   CHKPTR(svalues);
191   send_waits = (MPI_Request *) MALLOC( (nsends+1)*sizeof(MPI_Request));
192   CHKPTR(send_waits);
193   starts = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(starts);
194   starts[0] = 0;
195   for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
196   for ( i=0; i<aij->stash.n; i++ ) {
197     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
198     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
199     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
200   }
201   FREE(owner);
202   starts[0] = 0;
203   for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
204   count = 0;
205   for ( i=0; i<numtids; i++ ) {
206     if (procs[i]) {
207       MPI_Isend((void*)(svalues+3*starts[i]),3*nprocs[i],MPI_SCALAR,i,tag,
208                 comm,send_waits+count++);
209     }
210   }
211   FREE(starts); FREE(nprocs);
212 
213   /* Free cache space */
214   aij->stash.nmax = aij->stash.n = 0;
215   if (aij->stash.array){ FREE(aij->stash.array); aij->stash.array = 0;}
216 
217   aij->svalues    = svalues;       aij->rvalues = rvalues;
218   aij->nsends     = nsends;         aij->nrecvs = nreceives;
219   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
220   aij->rmax       = nmax;
221 
222   return 0;
223 }
224 extern int MatSetUpMultiply_MPIAIJ(Mat);
225 
226 static int MatEndAssembly_MPIAIJ(Mat mat,int mode)
227 {
228   int        ierr;
229   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
230 
231   MPI_Status  *send_status,recv_status;
232   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n;
233   int         row,col;
234   Scalar      *values,val;
235   InsertMode  addv = aij->insertmode;
236 
237   /*  wait on receives */
238   while (count) {
239     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
240     /* unpack receives into our local space */
241     values = aij->rvalues + 3*imdex*aij->rmax;
242     MPI_Get_count(&recv_status,MPI_SCALAR,&n);
243     n = n/3;
244     for ( i=0; i<n; i++ ) {
245       row = (int) PETSCREAL(values[3*i]) - aij->rstart;
246       col = (int) PETSCREAL(values[3*i+1]);
247       val = values[3*i+2];
248       if (col >= aij->cstart && col < aij->cend) {
249           col -= aij->cstart;
250         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
251       }
252       else {
253         if (aij->assembled) {
254           if (!aij->colmap) {ierr = CreateColmap(mat); CHKERR(ierr);}
255           col = aij->colmap[col] - 1;
256           if (col < 0) {
257             SETERR(1,"Cannot insert new off diagonal block nonzero in\
258                      already\
259                      assembled matrix. Contact petsc-maint@mcs.anl.gov\
260                      if your need this feature");
261           }
262         }
263         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
264       }
265     }
266     count--;
267   }
268   FREE(aij->recv_waits); FREE(aij->rvalues);
269 
270   /* wait on sends */
271   if (aij->nsends) {
272     send_status = (MPI_Status *) MALLOC( aij->nsends*sizeof(MPI_Status) );
273     CHKPTR(send_status);
274     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
275     FREE(send_status);
276   }
277   FREE(aij->send_waits); FREE(aij->svalues);
278 
279   aij->insertmode = NotSetValues;
280   ierr = MatBeginAssembly(aij->A,mode); CHKERR(ierr);
281   ierr = MatEndAssembly(aij->A,mode); CHKERR(ierr);
282 
283   if (!aij->assembled && mode == FINAL_ASSEMBLY) {
284     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERR(ierr);
285     aij->assembled = 1;
286   }
287   ierr = MatBeginAssembly(aij->B,mode); CHKERR(ierr);
288   ierr = MatEndAssembly(aij->B,mode); CHKERR(ierr);
289 
290   return 0;
291 }
292 
293 static int MatZeroEntries_MPIAIJ(Mat A)
294 {
295   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
296 
297   MatZeroEntries(l->A); MatZeroEntries(l->B);
298   return 0;
299 }
300 
301 /* again this uses the same basic stratagy as in the assembly and
302    scatter create routines, we should try to do it systemamatically
303    if we can figure out the proper level of generality. */
304 
305 /* the code does not do the diagonal entries correctly unless the
306    matrix is square and the column and row owerships are identical.
307    This is a BUG. The only way to fix it seems to be to access
308    aij->A and aij->B directly and not through the MatZeroRows()
309    routine.
310 */
311 static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
312 {
313   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
314   int            i,ierr,N, *rows,*owners = l->rowners,numtids = l->numtids;
315   int            *procs,*nprocs,j,found,idx,nsends,*work;
316   int            nmax,*svalues,*starts,*owner,nrecvs,mytid = l->mytid;
317   int            *rvalues,tag = 67,count,base,slen,n,*source;
318   int            *lens,imdex,*lrows,*values;
319   MPI_Comm       comm = A->comm;
320   MPI_Request    *send_waits,*recv_waits;
321   MPI_Status     recv_status,*send_status;
322   IS             istmp;
323 
324   if (!l->assembled) SETERR(1,"MatZeroRows_MPIAIJ: must assemble matrix first");
325   ierr = ISGetLocalSize(is,&N); CHKERR(ierr);
326   ierr = ISGetIndices(is,&rows); CHKERR(ierr);
327 
328   /*  first count number of contributors to each processor */
329   nprocs = (int *) MALLOC( 2*numtids*sizeof(int) ); CHKPTR(nprocs);
330   MEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
331   owner = (int *) MALLOC((N+1)*sizeof(int)); CHKPTR(owner); /* see note*/
332   for ( i=0; i<N; i++ ) {
333     idx = rows[i];
334     found = 0;
335     for ( j=0; j<numtids; j++ ) {
336       if (idx >= owners[j] && idx < owners[j+1]) {
337         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
338       }
339     }
340     if (!found) SETERR(1,"Imdex out of range");
341   }
342   nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}
343 
344   /* inform other processors of number of messages and max length*/
345   work = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(work);
346   MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
347   nrecvs = work[mytid];
348   MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
349   nmax = work[mytid];
350   FREE(work);
351 
352   /* post receives:   */
353   rvalues = (int *) MALLOC((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
354   CHKPTR(rvalues);
355   recv_waits = (MPI_Request *) MALLOC((nrecvs+1)*sizeof(MPI_Request));
356   CHKPTR(recv_waits);
357   for ( i=0; i<nrecvs; i++ ) {
358     MPI_Irecv((void *)(rvalues+nmax*i),nmax,MPI_INT,MPI_ANY_SOURCE,tag,
359               comm,recv_waits+i);
360   }
361 
362   /* do sends:
363       1) starts[i] gives the starting index in svalues for stuff going to
364          the ith processor
365   */
366   svalues = (int *) MALLOC( (N+1)*sizeof(int) ); CHKPTR(svalues);
367   send_waits = (MPI_Request *) MALLOC( (nsends+1)*sizeof(MPI_Request));
368   CHKPTR(send_waits);
369   starts = (int *) MALLOC( (numtids+1)*sizeof(int) ); CHKPTR(starts);
370   starts[0] = 0;
371   for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
372   for ( i=0; i<N; i++ ) {
373     svalues[starts[owner[i]]++] = rows[i];
374   }
375   ISRestoreIndices(is,&rows);
376 
377   starts[0] = 0;
378   for ( i=1; i<numtids+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
379   count = 0;
380   for ( i=0; i<numtids; i++ ) {
381     if (procs[i]) {
382       MPI_Isend((void*)(svalues+starts[i]),nprocs[i],MPI_INT,i,tag,
383                 comm,send_waits+count++);
384     }
385   }
386   FREE(starts);
387 
388   base = owners[mytid];
389 
390   /*  wait on receives */
391   lens = (int *) MALLOC( 2*(nrecvs+1)*sizeof(int) ); CHKPTR(lens);
392   source = lens + nrecvs;
393   count = nrecvs; slen = 0;
394   while (count) {
395     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
396     /* unpack receives into our local space */
397     MPI_Get_count(&recv_status,MPI_INT,&n);
398     source[imdex]  = recv_status.MPI_SOURCE;
399     lens[imdex]  = n;
400     slen += n;
401     count--;
402   }
403   FREE(recv_waits);
404 
405   /* move the data into the send scatter */
406   lrows = (int *) MALLOC( slen*sizeof(int) ); CHKPTR(lrows);
407   count = 0;
408   for ( i=0; i<nrecvs; i++ ) {
409     values = rvalues + i*nmax;
410     for ( j=0; j<lens[i]; j++ ) {
411       lrows[count++] = values[j] - base;
412     }
413   }
414   FREE(rvalues); FREE(lens);
415   FREE(owner); FREE(nprocs);
416 
417   /* actually zap the local rows */
418   ierr = ISCreateSequential(MPI_COMM_SELF,slen,lrows,&istmp);
419   CHKERR(ierr);  FREE(lrows);
420   ierr = MatZeroRows(l->A,istmp,diag); CHKERR(ierr);
421   ierr = MatZeroRows(l->B,istmp,0); CHKERR(ierr);
422   ierr = ISDestroy(istmp); CHKERR(ierr);
423 
424   /* wait on sends */
425   if (nsends) {
426     send_status = (MPI_Status *) MALLOC( nsends*sizeof(MPI_Status) );
427     CHKPTR(send_status);
428     MPI_Waitall(nsends,send_waits,send_status);
429     FREE(send_status);
430   }
431   FREE(send_waits); FREE(svalues);
432 
433 
434   return 0;
435 }
436 
437 static int MatMult_MPIAIJ(Mat aijin,Vec xx,Vec yy)
438 {
439   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
440   int        ierr;
441   if (!aij->assembled) SETERR(1,"MatMult_MPIAIJ: must assemble matrix first");
442   ierr = VecScatterBegin(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
443   CHKERR(ierr);
444   ierr = MatMult(aij->A,xx,yy); CHKERR(ierr);
445   ierr = VecScatterEnd(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
446   CHKERR(ierr);
447   ierr = MatMultAdd(aij->B,aij->lvec,yy,yy); CHKERR(ierr);
448   return 0;
449 }
450 
451 static int MatMultAdd_MPIAIJ(Mat aijin,Vec xx,Vec yy,Vec zz)
452 {
453   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
454   int        ierr;
455   if (!aij->assembled) SETERR(1,"MatMult_MPIAIJ: must assemble matrix first");
456   ierr = VecScatterBegin(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
457   CHKERR(ierr);
458   ierr = MatMultAdd(aij->A,xx,yy,zz); CHKERR(ierr);
459   ierr = VecScatterEnd(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
460   CHKERR(ierr);
461   ierr = MatMultAdd(aij->B,aij->lvec,zz,zz); CHKERR(ierr);
462   return 0;
463 }
464 
465 static int MatMultTrans_MPIAIJ(Mat aijin,Vec xx,Vec yy)
466 {
467   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
468   int        ierr;
469 
470   if (!aij->assembled)
471     SETERR(1,"MatMulTrans_MPIAIJ: must assemble matrix first");
472   /* do nondiagonal part */
473   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERR(ierr);
474   /* send it on its way */
475   ierr = VecScatterBegin(aij->lvec,0,yy,0,AddValues,
476                          ScatterAll|ScatterReverse,aij->Mvctx); CHKERR(ierr);
477   /* do local part */
478   ierr = MatMultTrans(aij->A,xx,yy); CHKERR(ierr);
479   /* receive remote parts: note this assumes the values are not actually */
480   /* inserted in yy until the next line, which is true for my implementation*/
481   /* but is not perhaps always true. */
482   ierr = VecScatterEnd(aij->lvec,0,yy,0,AddValues,ScatterAll|ScatterReverse,
483                          aij->Mvctx); CHKERR(ierr);
484   return 0;
485 }
486 
487 static int MatMultTransAdd_MPIAIJ(Mat aijin,Vec xx,Vec yy,Vec zz)
488 {
489   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
490   int        ierr;
491 
492   if (!aij->assembled)
493     SETERR(1,"MatMulTransAdd_MPIAIJ: must assemble matrix first");
494   /* do nondiagonal part */
495   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERR(ierr);
496   /* send it on its way */
497   ierr = VecScatterBegin(aij->lvec,0,zz,0,AddValues,
498                          ScatterAll|ScatterReverse,aij->Mvctx); CHKERR(ierr);
499   /* do local part */
500   ierr = MatMultTransAdd(aij->A,xx,yy,zz); CHKERR(ierr);
501   /* receive remote parts: note this assumes the values are not actually */
502   /* inserted in yy until the next line, which is true for my implementation*/
503   /* but is not perhaps always true. */
504   ierr = VecScatterEnd(aij->lvec,0,zz,0,AddValues,ScatterAll|ScatterReverse,
505                          aij->Mvctx); CHKERR(ierr);
506   return 0;
507 }
508 
509 /*
510   This only works correctly for square matrices where the subblock A->A is the
511    diagonal block
512 */
513 static int MatGetDiagonal_MPIAIJ(Mat Ain,Vec v)
514 {
515   Mat_MPIAIJ *A = (Mat_MPIAIJ *) Ain->data;
516   if (!A->assembled) SETERR(1,"MatGetDiag_MPIAIJ: must assemble matrix first");
517   return MatGetDiagonal(A->A,v);
518 }
519 
520 static int MatDestroy_MPIAIJ(PetscObject obj)
521 {
522   Mat        mat = (Mat) obj;
523   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
524   int        ierr;
525 #if defined(PETSC_LOG)
526   PLogObjectState(obj,"Rows %d Cols %d",aij->M,aij->N);
527 #endif
528   FREE(aij->rowners);
529   ierr = MatDestroy(aij->A); CHKERR(ierr);
530   ierr = MatDestroy(aij->B); CHKERR(ierr);
531   if (aij->colmap) FREE(aij->colmap);
532   if (aij->garray) FREE(aij->garray);
533   if (aij->lvec) VecDestroy(aij->lvec);
534   if (aij->Mvctx) VecScatterCtxDestroy(aij->Mvctx);
535   FREE(aij);
536   PLogObjectDestroy(mat);
537   PETSCHEADERDESTROY(mat);
538   return 0;
539 }
540 #include "draw.h"
541 static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
542 {
543   Mat        mat = (Mat) obj;
544   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
545   int        ierr;
546   PetscObject vobj = (PetscObject) viewer;
547 
548   if (!viewer) { /* so that viewers may be used from debuggers */
549     viewer = STDOUT_VIEWER; vobj = (PetscObject) viewer;
550   }
551   if (!aij->assembled) SETERR(1,"MatView_MPIAIJ: must assemble matrix first");
552   if (vobj->cookie == VIEWER_COOKIE && vobj->type == FILE_VIEWER) {
553     FILE *fd = ViewerFileGetPointer_Private(viewer);
554     MPE_Seq_begin(mat->comm,1);
555     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
556              aij->mytid,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
557              aij->cend);
558     ierr = MatView(aij->A,viewer); CHKERR(ierr);
559     ierr = MatView(aij->B,viewer); CHKERR(ierr);
560     fflush(fd);
561     MPE_Seq_end(mat->comm,1);
562   }
563   else if ((vobj->cookie == VIEWER_COOKIE && vobj->type == FILES_VIEWER) ||
564             vobj->cookie == DRAW_COOKIE) {
565     int numtids = aij->numtids, mytid = aij->mytid;
566     if (numtids == 1) {
567       ierr = MatView(aij->A,viewer); CHKERR(ierr);
568     }
569     else {
570       /* assemble the entire matrix onto first processor. */
571       Mat     A;
572       Mat_AIJ *Aloc;
573       int     M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
574       Scalar  *a;
575 
576       if (!mytid) {
577         ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,0,0,0,&A);
578       }
579       else {
580         ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,0,0,0,&A);
581       }
582       CHKERR(ierr);
583 
584       /* copy over the A part */
585       Aloc = (Mat_AIJ*) aij->A->data;
586       m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
587       row = aij->rstart;
588       for ( i=0; i<ai[m]; i++ ) {aj[i] += aij->cstart - 1;}
589       for ( i=0; i<m; i++ ) {
590         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,InsertValues);
591         CHKERR(ierr);
592         row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
593       }
594       aj = Aloc->j;
595       for ( i=0; i<ai[m]; i++ ) {aj[i] -= aij->cstart - 1;}
596 
597       /* copy over the B part */
598       Aloc = (Mat_AIJ*) aij->B->data;
599       m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
600       row = aij->rstart;
601       ct = cols = (int *) MALLOC( (ai[m]+1)*sizeof(int) ); CHKPTR(cols);
602       for ( i=0; i<ai[m]; i++ ) {cols[i] = aij->garray[aj[i]-1];}
603       for ( i=0; i<m; i++ ) {
604         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,InsertValues);
605         CHKERR(ierr);
606         row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
607       }
608       FREE(ct);
609       ierr = MatBeginAssembly(A,FINAL_ASSEMBLY); CHKERR(ierr);
610       ierr = MatEndAssembly(A,FINAL_ASSEMBLY); CHKERR(ierr);
611       if (!mytid) {
612         ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERR(ierr);
613       }
614       ierr = MatDestroy(A); CHKERR(ierr);
615     }
616   }
617   return 0;
618 }
619 
620 extern int MatMarkDiag_AIJ(Mat_AIJ  *);
621 /*
622     This has to provide several versions.
623 
624      1) per sequential
625      2) a) use only local smoothing updating outer values only once.
626         b) local smoothing updating outer values each inner iteration
627      3) color updating out values betwen colors.
628 */
629 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,int flag,double shift,
630                         int its,Vec xx)
631 {
632   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
633   Mat        AA = mat->A, BB = mat->B;
634   Mat_AIJ    *A = (Mat_AIJ *) AA->data, *B = (Mat_AIJ *)BB->data;
635   Scalar     zero = 0.0,*b,*x,*xs,*ls,d,*v,sum,scale,*t,*ts;
636   int        ierr,*idx, *diag;
637   int        n = mat->n, m = mat->m, i;
638   Vec        tt;
639 
640   if (!mat->assembled) SETERR(1,"MatRelax_MPIAIJ: must assemble matrix first");
641 
642   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
643   xs = x -1; /* shift by one for index start of 1 */
644   ls--;
645   if (!A->diag) {if ((ierr = MatMarkDiag_AIJ(A))) return ierr;}
646   diag = A->diag;
647   if (flag == SOR_APPLY_UPPER || flag == SOR_APPLY_LOWER) {
648     SETERR(1,"That option not yet support for parallel AIJ matrices");
649   }
650   if (flag & SOR_EISENSTAT) {
651     /* Let  A = L + U + D; where L is lower trianglar,
652     U is upper triangular, E is diagonal; This routine applies
653 
654             (L + E)^{-1} A (U + E)^{-1}
655 
656     to a vector efficiently using Eisenstat's trick. This is for
657     the case of SSOR preconditioner, so E is D/omega where omega
658     is the relaxation factor.
659     */
660     ierr = VecCreate(xx,&tt); CHKERR(ierr);
661     VecGetArray(tt,&t);
662     scale = (2.0/omega) - 1.0;
663     /*  x = (E + U)^{-1} b */
664     VecSet(&zero,mat->lvec);
665     ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
666                               mat->Mvctx); CHKERR(ierr);
667     for ( i=m-1; i>-1; i-- ) {
668       n    = A->i[i+1] - diag[i] - 1;
669       idx  = A->j + diag[i];
670       v    = A->a + diag[i];
671       sum  = b[i];
672       SPARSEDENSEMDOT(sum,xs,v,idx,n);
673       d    = shift + A->a[diag[i]-1];
674       n    = B->i[i+1] - B->i[i];
675       idx  = B->j + B->i[i] - 1;
676       v    = B->a + B->i[i] - 1;
677       SPARSEDENSEMDOT(sum,ls,v,idx,n);
678       x[i] = omega*(sum/d);
679     }
680     ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
681                             mat->Mvctx); CHKERR(ierr);
682 
683     /*  t = b - (2*E - D)x */
684     v = A->a;
685     for ( i=0; i<m; i++ ) { t[i] = b[i] - scale*(v[*diag++ - 1])*x[i]; }
686 
687     /*  t = (E + L)^{-1}t */
688     ts = t - 1; /* shifted by one for index start of a or mat->j*/
689     diag = A->diag;
690     VecSet(&zero,mat->lvec);
691     ierr = VecPipelineBegin(tt,0,mat->lvec,0,InsertValues,PipelineDown,
692                                                  mat->Mvctx); CHKERR(ierr);
693     for ( i=0; i<m; i++ ) {
694       n    = diag[i] - A->i[i];
695       idx  = A->j + A->i[i] - 1;
696       v    = A->a + A->i[i] - 1;
697       sum  = t[i];
698       SPARSEDENSEMDOT(sum,ts,v,idx,n);
699       d    = shift + A->a[diag[i]-1];
700       n    = B->i[i+1] - B->i[i];
701       idx  = B->j + B->i[i] - 1;
702       v    = B->a + B->i[i] - 1;
703       SPARSEDENSEMDOT(sum,ls,v,idx,n);
704       t[i] = omega*(sum/d);
705     }
706     ierr = VecPipelineEnd(tt,0,mat->lvec,0,InsertValues,PipelineDown,
707                                                     mat->Mvctx); CHKERR(ierr);
708     /*  x = x + t */
709     for ( i=0; i<m; i++ ) { x[i] += t[i]; }
710     VecDestroy(tt);
711     return 0;
712   }
713 
714 
715   if ((flag & SOR_SYMMETRIC_SWEEP) == SOR_SYMMETRIC_SWEEP){
716     if (flag & SOR_ZERO_INITIAL_GUESS) {
717       VecSet(&zero,mat->lvec); VecSet(&zero,xx);
718     }
719     else {
720       ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
721       CHKERR(ierr);
722       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
723       CHKERR(ierr);
724     }
725     while (its--) {
726       /* go down through the rows */
727       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
728                               mat->Mvctx); CHKERR(ierr);
729       for ( i=0; i<m; i++ ) {
730         n    = A->i[i+1] - A->i[i];
731         idx  = A->j + A->i[i] - 1;
732         v    = A->a + A->i[i] - 1;
733         sum  = b[i];
734         SPARSEDENSEMDOT(sum,xs,v,idx,n);
735         d    = shift + A->a[diag[i]-1];
736         n    = B->i[i+1] - B->i[i];
737         idx  = B->j + B->i[i] - 1;
738         v    = B->a + B->i[i] - 1;
739         SPARSEDENSEMDOT(sum,ls,v,idx,n);
740         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
741       }
742       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
743                             mat->Mvctx); CHKERR(ierr);
744       /* come up through the rows */
745       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
746                               mat->Mvctx); CHKERR(ierr);
747       for ( i=m-1; i>-1; i-- ) {
748         n    = A->i[i+1] - A->i[i];
749         idx  = A->j + A->i[i] - 1;
750         v    = A->a + A->i[i] - 1;
751         sum  = b[i];
752         SPARSEDENSEMDOT(sum,xs,v,idx,n);
753         d    = shift + A->a[diag[i]-1];
754         n    = B->i[i+1] - B->i[i];
755         idx  = B->j + B->i[i] - 1;
756         v    = B->a + B->i[i] - 1;
757         SPARSEDENSEMDOT(sum,ls,v,idx,n);
758         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
759       }
760       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
761                             mat->Mvctx); CHKERR(ierr);
762     }
763   }
764   else if (flag & SOR_FORWARD_SWEEP){
765     if (flag & SOR_ZERO_INITIAL_GUESS) {
766       VecSet(&zero,mat->lvec);
767       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
768                               mat->Mvctx); CHKERR(ierr);
769       for ( i=0; i<m; i++ ) {
770         n    = diag[i] - A->i[i];
771         idx  = A->j + A->i[i] - 1;
772         v    = A->a + A->i[i] - 1;
773         sum  = b[i];
774         SPARSEDENSEMDOT(sum,xs,v,idx,n);
775         d    = shift + A->a[diag[i]-1];
776         n    = B->i[i+1] - B->i[i];
777         idx  = B->j + B->i[i] - 1;
778         v    = B->a + B->i[i] - 1;
779         SPARSEDENSEMDOT(sum,ls,v,idx,n);
780         x[i] = omega*(sum/d);
781       }
782       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
783                             mat->Mvctx); CHKERR(ierr);
784       its--;
785     }
786     while (its--) {
787       ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
788       CHKERR(ierr);
789       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
790       CHKERR(ierr);
791       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
792                               mat->Mvctx); CHKERR(ierr);
793       for ( i=0; i<m; i++ ) {
794         n    = A->i[i+1] - A->i[i];
795         idx  = A->j + A->i[i] - 1;
796         v    = A->a + A->i[i] - 1;
797         sum  = b[i];
798         SPARSEDENSEMDOT(sum,xs,v,idx,n);
799         d    = shift + A->a[diag[i]-1];
800         n    = B->i[i+1] - B->i[i];
801         idx  = B->j + B->i[i] - 1;
802         v    = B->a + B->i[i] - 1;
803         SPARSEDENSEMDOT(sum,ls,v,idx,n);
804         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
805       }
806       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
807                             mat->Mvctx); CHKERR(ierr);
808     }
809   }
810   else if (flag & SOR_BACKWARD_SWEEP){
811     if (flag & SOR_ZERO_INITIAL_GUESS) {
812       VecSet(&zero,mat->lvec);
813       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
814                               mat->Mvctx); CHKERR(ierr);
815       for ( i=m-1; i>-1; i-- ) {
816         n    = A->i[i+1] - diag[i] - 1;
817         idx  = A->j + diag[i];
818         v    = A->a + diag[i];
819         sum  = b[i];
820         SPARSEDENSEMDOT(sum,xs,v,idx,n);
821         d    = shift + A->a[diag[i]-1];
822         n    = B->i[i+1] - B->i[i];
823         idx  = B->j + B->i[i] - 1;
824         v    = B->a + B->i[i] - 1;
825         SPARSEDENSEMDOT(sum,ls,v,idx,n);
826         x[i] = omega*(sum/d);
827       }
828       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
829                             mat->Mvctx); CHKERR(ierr);
830       its--;
831     }
832     while (its--) {
833       ierr = VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterDown,
834                             mat->Mvctx); CHKERR(ierr);
835       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterDown,
836                             mat->Mvctx); CHKERR(ierr);
837       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
838                               mat->Mvctx); CHKERR(ierr);
839       for ( i=m-1; i>-1; i-- ) {
840         n    = A->i[i+1] - A->i[i];
841         idx  = A->j + A->i[i] - 1;
842         v    = A->a + A->i[i] - 1;
843         sum  = b[i];
844         SPARSEDENSEMDOT(sum,xs,v,idx,n);
845         d    = shift + A->a[diag[i]-1];
846         n    = B->i[i+1] - B->i[i];
847         idx  = B->j + B->i[i] - 1;
848         v    = B->a + B->i[i] - 1;
849         SPARSEDENSEMDOT(sum,ls,v,idx,n);
850         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
851       }
852       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
853                             mat->Mvctx); CHKERR(ierr);
854     }
855   }
856   else if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
857     if (flag & SOR_ZERO_INITIAL_GUESS) {
858       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
859     }
860     ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
861     CHKERR(ierr);
862     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
863     CHKERR(ierr);
864     while (its--) {
865       /* go down through the rows */
866       for ( i=0; i<m; i++ ) {
867         n    = A->i[i+1] - A->i[i];
868         idx  = A->j + A->i[i] - 1;
869         v    = A->a + A->i[i] - 1;
870         sum  = b[i];
871         SPARSEDENSEMDOT(sum,xs,v,idx,n);
872         d    = shift + A->a[diag[i]-1];
873         n    = B->i[i+1] - B->i[i];
874         idx  = B->j + B->i[i] - 1;
875         v    = B->a + B->i[i] - 1;
876         SPARSEDENSEMDOT(sum,ls,v,idx,n);
877         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
878       }
879       /* come up through the rows */
880       for ( i=m-1; i>-1; i-- ) {
881         n    = A->i[i+1] - A->i[i];
882         idx  = A->j + A->i[i] - 1;
883         v    = A->a + A->i[i] - 1;
884         sum  = b[i];
885         SPARSEDENSEMDOT(sum,xs,v,idx,n);
886         d    = shift + A->a[diag[i]-1];
887         n    = B->i[i+1] - B->i[i];
888         idx  = B->j + B->i[i] - 1;
889         v    = B->a + B->i[i] - 1;
890         SPARSEDENSEMDOT(sum,ls,v,idx,n);
891         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
892       }
893     }
894   }
895   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
896     if (flag & SOR_ZERO_INITIAL_GUESS) {
897       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
898     }
899     ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
900     CHKERR(ierr);
901     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
902     CHKERR(ierr);
903     while (its--) {
904       for ( i=0; i<m; i++ ) {
905         n    = A->i[i+1] - A->i[i];
906         idx  = A->j + A->i[i] - 1;
907         v    = A->a + A->i[i] - 1;
908         sum  = b[i];
909         SPARSEDENSEMDOT(sum,xs,v,idx,n);
910         d    = shift + A->a[diag[i]-1];
911         n    = B->i[i+1] - B->i[i];
912         idx  = B->j + B->i[i] - 1;
913         v    = B->a + B->i[i] - 1;
914         SPARSEDENSEMDOT(sum,ls,v,idx,n);
915         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
916       }
917     }
918   }
919   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
920     if (flag & SOR_ZERO_INITIAL_GUESS) {
921       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
922     }
923     ierr = VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,
924                             mat->Mvctx); CHKERR(ierr);
925     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,
926                             mat->Mvctx); CHKERR(ierr);
927     while (its--) {
928       for ( i=m-1; i>-1; i-- ) {
929         n    = A->i[i+1] - A->i[i];
930         idx  = A->j + A->i[i] - 1;
931         v    = A->a + A->i[i] - 1;
932         sum  = b[i];
933         SPARSEDENSEMDOT(sum,xs,v,idx,n);
934         d    = shift + A->a[diag[i]-1];
935         n    = B->i[i+1] - B->i[i];
936         idx  = B->j + B->i[i] - 1;
937         v    = B->a + B->i[i] - 1;
938         SPARSEDENSEMDOT(sum,ls,v,idx,n);
939         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
940       }
941     }
942   }
943   return 0;
944 }
945 
946 static int MatGetInfo_MPIAIJ(Mat matin,int flag,int *nz,int *nzalloc,int *mem)
947 {
948   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
949   Mat        A = mat->A, B = mat->B;
950   int        ierr, isend[3], irecv[3], nzA, nzallocA, memA;
951 
952   ierr = MatGetInfo(A,MAT_LOCAL,&nzA,&nzallocA,&memA); CHKERR(ierr);
953   ierr = MatGetInfo(B,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERR(ierr);
954   isend[0] += nzA; isend[1] += nzallocA; isend[2] += memA;
955   if (flag == MAT_LOCAL) {
956     *nz = isend[0]; *nzalloc = isend[1]; *mem = isend[2];
957   } else if (flag == MAT_GLOBAL_MAX) {
958     MPI_Allreduce((void *) isend,(void *) irecv,3,MPI_INT,MPI_MAX,matin->comm);
959     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
960   } else if (flag == MAT_GLOBAL_SUM) {
961     MPI_Allreduce((void *) isend,(void *) irecv,3,MPI_INT,MPI_SUM,matin->comm);
962     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
963   }
964   return 0;
965 }
966 
967 static int MatSetOption_MPIAIJ(Mat aijin,int op)
968 {
969   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
970 
971   if      (op == NO_NEW_NONZERO_LOCATIONS)  {
972     MatSetOption(aij->A,op);
973     MatSetOption(aij->B,op);
974   }
975   else if (op == YES_NEW_NONZERO_LOCATIONS) {
976     MatSetOption(aij->A,op);
977     MatSetOption(aij->B,op);
978   }
979   else if (op == COLUMN_ORIENTED) SETERR(1,"Column oriented not supported");
980   return 0;
981 }
982 
983 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
984 {
985   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
986   *m = mat->M; *n = mat->N;
987   return 0;
988 }
989 
990 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
991 {
992   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
993   *m = mat->m; *n = mat->n;
994   return 0;
995 }
996 
997 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
998 {
999   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1000   *m = mat->rstart; *n = mat->rend;
1001   return 0;
1002 }
1003 
1004 static int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1005 {
1006   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1007   Scalar     *vworkA, *vworkB, **pvA, **pvB;
1008   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
1009   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
1010 
1011   if (!mat->assembled)
1012     SETERR(1,"MatGetRow_MPIAIJ: Must assemble matrix first.");
1013   if (row < rstart || row >= rend)
1014     SETERR(1,"MatGetRow_MPIAIJ: Currently you can get only local rows.")
1015   lrow = row - rstart;
1016 
1017   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1018   if (!v)   {pvA = 0; pvB = 0;}
1019   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1020   ierr = MatGetRow(mat->A,lrow,&nzA,pcA,pvA); CHKERR(ierr);
1021   ierr = MatGetRow(mat->B,lrow,&nzB,pcB,pvB); CHKERR(ierr);
1022   nztot = nzA + nzB;
1023 
1024   if (v  || idx) {
1025     if (nztot) {
1026       /* Sort by increasing column numbers, assuming A and B already sorted */
1027       int imark, imark2;
1028       for (i=0; i<nzB; i++) cworkB[i] = mat->garray[cworkB[i]];
1029       if (v) {
1030         *v = (Scalar *) MALLOC( (nztot)*sizeof(Scalar) ); CHKPTR(*v);
1031         for ( i=0; i<nzB; i++ ) {
1032           if (cworkB[i] < cstart)   (*v)[i] = vworkB[i];
1033           else break;
1034         }
1035         imark = i;
1036         for ( i=0; i<nzA; i++ )     (*v)[imark+i] = vworkA[i];
1037         imark2 = imark+nzA;
1038         for ( i=imark; i<nzB; i++ ) (*v)[imark2+i] = vworkB[i];
1039       }
1040       if (idx) {
1041         *idx = (int *) MALLOC( (nztot)*sizeof(int) ); CHKPTR(*idx);
1042         for (i=0; i<nzA; i++) cworkA[i] += cstart;
1043         for ( i=0; i<nzB; i++ ) {
1044           if (cworkB[i] < cstart)   (*idx)[i] = cworkB[i];
1045           else break;
1046         }
1047         imark = i;
1048         for ( i=0; i<nzA; i++ )     (*idx)[imark+i] = cworkA[i];
1049         imark2 = imark+nzA;
1050         for ( i=imark; i<nzB; i++ ) (*idx)[imark2+i] = cworkB[i];
1051       }
1052     }
1053     else {*idx = 0; *v=0;}
1054   }
1055   *nz = nztot;
1056   ierr = MatRestoreRow(mat->A,lrow,&nzA,pcA,pvA); CHKERR(ierr);
1057   ierr = MatRestoreRow(mat->B,lrow,&nzB,pcB,pvB); CHKERR(ierr);
1058   return 0;
1059 }
1060 
1061 static int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1062 {
1063   if (idx) FREE(*idx);
1064   if (v) FREE(*v);
1065   return 0;
1066 }
1067 
1068 static int MatCopy_MPIAIJ(Mat,Mat *);
1069 extern int MatConvert_MPIAIJ(Mat,MATTYPE,Mat *);
1070 
1071 /* -------------------------------------------------------------------*/
1072 static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
1073        MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
1074        MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
1075        MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
1076        0,0,0,0,
1077        0,0,
1078        MatRelax_MPIAIJ,
1079        0,
1080        MatGetInfo_MPIAIJ,0,
1081        MatCopy_MPIAIJ,
1082        MatGetDiagonal_MPIAIJ,0,0,
1083        MatBeginAssembly_MPIAIJ,MatEndAssembly_MPIAIJ,
1084        0,
1085        MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,0,
1086        0,0,0,0,
1087        MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
1088        0,0,
1089        0,MatConvert_MPIAIJ };
1090 
1091 /*@
1092    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1093    (the default parallel PETSc format).
1094 
1095    Input Parameters:
1096 .  comm - MPI communicator
1097 .  m - number of local rows (or PETSC_DECIDE to have calculated)
1098 .  n - number of local columns (or PETSC_DECIDE to have calculated)
1099 .  M - number of global rows (or PETSC_DECIDE to have calculated)
1100 .  N - number of global columns (or PETSC_DECIDE to have calculated)
1101 .  d_nz - number of nonzeros per row in diagonal portion of matrix
1102            (same for all local rows)
1103 .  d_nzz - number of nonzeros per row in diagonal portion of matrix or null
1104            (possibly different for each row).  You must leave room for the
1105            diagonal entry even if it is zero.
1106 .  o_nz - number of nonzeros per row in off-diagonal portion of matrix
1107            (same for all local rows)
1108 .  o_nzz - number of nonzeros per row in off-diagonal portion of matrix
1109            or null (possibly different for each row).
1110 
1111    Output Parameter:
1112 .  newmat - the matrix
1113 
1114    Notes:
1115    The AIJ format (also called the Yale sparse matrix format or
1116    compressed row storage), is fully compatible with standard Fortran 77
1117    storage.  That is, the stored row and column indices begin at
1118    one, not zero.
1119 
1120    The user MUST specify either the local or global matrix dimensions
1121    (possibly both).
1122 
1123    The user can set d_nz, d_nnz, o_nz, and o_nnz to zero for PETSc to
1124    control dynamic memory allocation.
1125 
1126 .keywords: Mat, matrix, aij, compressed row, sparse, parallel
1127 
1128 .seealso: MatCreateSequentialAIJ(), MatSetValues()
1129 @*/
1130 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
1131                  int d_nz,int *d_nnz, int o_nz,int *o_nnz,Mat *newmat)
1132 {
1133   Mat          mat;
1134   Mat_MPIAIJ   *aij;
1135   int          ierr, i,sum[2],work[2];
1136   *newmat         = 0;
1137   PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
1138   PLogObjectCreate(mat);
1139   mat->data       = (void *) (aij = NEW(Mat_MPIAIJ)); CHKPTR(aij);
1140   mat->ops        = &MatOps;
1141   mat->destroy    = MatDestroy_MPIAIJ;
1142   mat->view       = MatView_MPIAIJ;
1143   mat->factor     = 0;
1144 
1145   mat->comm       = comm;
1146   aij->insertmode = NotSetValues;
1147   MPI_Comm_rank(comm,&aij->mytid);
1148   MPI_Comm_size(comm,&aij->numtids);
1149 
1150   if (M == -1 || N == -1) {
1151     work[0] = m; work[1] = n;
1152     MPI_Allreduce((void *) work,(void *) sum,2,MPI_INT,MPI_SUM,comm );
1153     if (M == -1) M = sum[0];
1154     if (N == -1) N = sum[1];
1155   }
1156   if (m == -1) {m = M/aij->numtids + ((M % aij->numtids) > aij->mytid);}
1157   if (n == -1) {n = N/aij->numtids + ((N % aij->numtids) > aij->mytid);}
1158   aij->m       = m;
1159   aij->n       = n;
1160   aij->N       = N;
1161   aij->M       = M;
1162 
1163   /* build local table of row and column ownerships */
1164   aij->rowners = (int *) MALLOC(2*(aij->numtids+2)*sizeof(int));
1165   CHKPTR(aij->rowners);
1166   aij->cowners = aij->rowners + aij->numtids + 1;
1167   MPI_Allgather(&m,1,MPI_INT,aij->rowners+1,1,MPI_INT,comm);
1168   aij->rowners[0] = 0;
1169   for ( i=2; i<=aij->numtids; i++ ) {
1170     aij->rowners[i] += aij->rowners[i-1];
1171   }
1172   aij->rstart = aij->rowners[aij->mytid];
1173   aij->rend   = aij->rowners[aij->mytid+1];
1174   MPI_Allgather(&n,1,MPI_INT,aij->cowners+1,1,MPI_INT,comm);
1175   aij->cowners[0] = 0;
1176   for ( i=2; i<=aij->numtids; i++ ) {
1177     aij->cowners[i] += aij->cowners[i-1];
1178   }
1179   aij->cstart = aij->cowners[aij->mytid];
1180   aij->cend   = aij->cowners[aij->mytid+1];
1181 
1182 
1183   ierr = MatCreateSequentialAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&aij->A);
1184   CHKERR(ierr);
1185   PLogObjectParent(mat,aij->A);
1186   ierr = MatCreateSequentialAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&aij->B);
1187   CHKERR(ierr);
1188   PLogObjectParent(mat,aij->B);
1189 
1190   /* build cache for off array entries formed */
1191   aij->stash.nmax = CHUNCKSIZE; /* completely arbratray number */
1192   aij->stash.n    = 0;
1193   aij->stash.array = (Scalar *) MALLOC( aij->stash.nmax*(2*sizeof(int) +
1194                             sizeof(Scalar))); CHKPTR(aij->stash.array);
1195   aij->stash.idx = (int *) (aij->stash.array + aij->stash.nmax);
1196   aij->stash.idy = (int *) (aij->stash.idx + aij->stash.nmax);
1197   aij->colmap    = 0;
1198   aij->garray    = 0;
1199 
1200   /* stuff used for matrix vector multiply */
1201   aij->lvec      = 0;
1202   aij->Mvctx     = 0;
1203   aij->assembled = 0;
1204 
1205   *newmat = mat;
1206   return 0;
1207 }
1208 
1209 static int MatCopy_MPIAIJ(Mat matin,Mat *newmat)
1210 {
1211   Mat        mat;
1212   Mat_MPIAIJ *aij,*oldmat = (Mat_MPIAIJ *) matin->data;
1213   int        ierr, len;
1214   *newmat      = 0;
1215 
1216   if (!oldmat->assembled) SETERR(1,"Cannot copy unassembled matrix");
1217   PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1218   PLogObjectCreate(mat);
1219   mat->data       = (void *) (aij = NEW(Mat_MPIAIJ)); CHKPTR(aij);
1220   mat->ops        = &MatOps;
1221   mat->destroy    = MatDestroy_MPIAIJ;
1222   mat->view       = MatView_MPIAIJ;
1223   mat->factor     = matin->factor;
1224 
1225   aij->m          = oldmat->m;
1226   aij->n          = oldmat->n;
1227   aij->M          = oldmat->M;
1228   aij->N          = oldmat->N;
1229 
1230   aij->assembled  = 1;
1231   aij->rstart     = oldmat->rstart;
1232   aij->rend       = oldmat->rend;
1233   aij->cstart     = oldmat->cstart;
1234   aij->cend       = oldmat->cend;
1235   aij->numtids    = oldmat->numtids;
1236   aij->mytid      = oldmat->mytid;
1237   aij->insertmode = NotSetValues;
1238 
1239   aij->rowners        = (int *) MALLOC( (aij->numtids+1)*sizeof(int) );
1240   CHKPTR(aij->rowners);
1241   MEMCPY(aij->rowners,oldmat->rowners,(aij->numtids+1)*sizeof(int));
1242   aij->stash.nmax     = 0;
1243   aij->stash.n        = 0;
1244   aij->stash.array    = 0;
1245   if (oldmat->colmap) {
1246     aij->colmap      = (int *) MALLOC( (aij->N)*sizeof(int) );
1247     CHKPTR(aij->colmap);
1248     MEMCPY(aij->colmap,oldmat->colmap,(aij->N)*sizeof(int));
1249   } else aij->colmap = 0;
1250     if (oldmat->garray && (len = ((Mat_AIJ *) (oldmat->B->data))->n)) {
1251     aij->garray      = (int *) MALLOC(len*sizeof(int) ); CHKPTR(aij->garray);
1252     MEMCPY(aij->garray,oldmat->garray,len*sizeof(int));
1253   } else aij->garray = 0;
1254   mat->comm           = matin->comm;
1255 
1256   ierr =  VecCreate(oldmat->lvec,&aij->lvec); CHKERR(ierr);
1257   PLogObjectParent(mat,aij->lvec);
1258   ierr =  VecScatterCtxCopy(oldmat->Mvctx,&aij->Mvctx); CHKERR(ierr);
1259   PLogObjectParent(mat,aij->Mvctx);
1260   ierr =  MatCopy(oldmat->A,&aij->A); CHKERR(ierr);
1261   PLogObjectParent(mat,aij->A);
1262   ierr =  MatCopy(oldmat->B,&aij->B); CHKERR(ierr);
1263   PLogObjectParent(mat,aij->B);
1264   *newmat = mat;
1265   return 0;
1266 }
1267