xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision 8e37dc09c3c07dccd819271701c1b69f0d3c0aa4)
1 #ifndef lint
2 static char vcid[] = "$Id: mpiaij.c,v 1.15 1995/03/23 23:12:56 curfman Exp $";
3 #endif
4 
5 #include "mpiaij.h"
6 #include "vec/vecimpl.h"
7 #include "inline/spops.h"
8 
9 #define CHUNCKSIZE   100
10 /*
   This is a simple-minded stash.  Do a linear search to determine whether
   an entry is already in the stash; if not, add it to the end.
13 */
/*
   StashValues - records n values for global row "row" that are destined
   for other processors, to be communicated at assembly time.

   A linear search over the current stash detects an existing
   (row, column) entry, which is summed into (AddValues) or overwritten
   (otherwise); new entries are appended.  When full, the stash grows by
   CHUNCKSIZE slots.

   Storage note: the Scalar value array and the two int index arrays
   (idx = rows, idy = columns) live in ONE MALLOC'd chunk, Scalars first
   (for alignment); only stash->array is ever passed to FREE.

   NOTE(review): N is captured before the loop, so duplicates within a
   single call are appended rather than combined -- presumably benign
   since the receiver re-applies each entry with the same InsertMode;
   confirm against MatEndAssemble_MPIAIJ.
*/
static int StashValues(Stash *stash,int row,int n, int *idxn,
                       Scalar *values,InsertMode addv)
{
  int    i,j,N = stash->n,found,*n_idx, *n_idy;
  Scalar val,*n_array;

  for ( i=0; i<n; i++ ) {
    found = 0;
    val = *values++;
    /* linear search for an existing (row, idxn[i]) entry */
    for ( j=0; j<N; j++ ) {
      if ( stash->idx[j] == row && stash->idy[j] == idxn[i]) {
        /* found a match */
        if (addv == AddValues) stash->array[j] += val;
        else stash->array[j] = val;
        found = 1;
        break;
      }
    }
    if (!found) { /* not found so add to end */
      if ( stash->n == stash->nmax ) {
        /* allocate a larger stash: one block holding the Scalar values
           followed by the row and then column index arrays */
        n_array = (Scalar *) MALLOC( (stash->nmax + CHUNCKSIZE)*(
                                     2*sizeof(int) + sizeof(Scalar)));
        CHKPTR(n_array);
        n_idx = (int *) (n_array + stash->nmax + CHUNCKSIZE);
        n_idy = (int *) (n_idx + stash->nmax + CHUNCKSIZE);
        MEMCPY(n_array,stash->array,stash->nmax*sizeof(Scalar));
        MEMCPY(n_idx,stash->idx,stash->nmax*sizeof(int));
        MEMCPY(n_idy,stash->idy,stash->nmax*sizeof(int));
        if (stash->array) FREE(stash->array);
        stash->array = n_array; stash->idx = n_idx; stash->idy = n_idy;
        stash->nmax += CHUNCKSIZE;
      }
      stash->array[stash->n]   = val;
      stash->idx[stash->n]     = row;
      stash->idy[stash->n++]   = idxn[i];
    }
  }
  return 0;
}
54 
/* local utility routine that creates a mapping from the global column
number to the local column number in the off-diagonal part of the local
storage of the matrix.  This is done in a non-scalable way, since the
length of colmap equals the global number of columns.
*/
static int CreateColmap(Mat mat)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Mat_AIJ    *B = (Mat_AIJ*) aij->B->data;
  int        n = B->n,i;  /* n = number of columns in the off-diagonal block */
  /* both maps are of global length aij->N -- this is the non-scalable part */
  aij->colmap = (int *) MALLOC( aij->N*sizeof(int) ); CHKPTR(aij->colmap);
  aij->invcolmap = (int *) MALLOC( aij->N*sizeof(int) ); CHKPTR(aij->invcolmap);
  MEMSET(aij->colmap,0,aij->N*sizeof(int));
  MEMSET(aij->invcolmap,0,aij->N*sizeof(int));
  for ( i=0; i<n; i++ ) {
    /* store local index + 1 so that 0 can mean "global column not local" */
    aij->colmap[aij->garray[i]] = i+1;
    /* NOTE(review): indexed by local column + 1, mirroring the one-based
       values in colmap -- confirm against consumers of invcolmap */
    aij->invcolmap[i+1] = aij->garray[i];
  }
  return 0;
}
75 
76 static int MatInsertValues_MPIAIJ(Mat mat,int m,int *idxm,int n,
77                             int *idxn,Scalar *v,InsertMode addv)
78 {
79   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
80   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
81   int        cstart = aij->cstart, cend = aij->cend,row,col;
82 
83   if (aij->insertmode != NotSetValues && aij->insertmode != addv) {
84     SETERR(1,"You cannot mix inserts and adds");
85   }
86   aij->insertmode = addv;
87   for ( i=0; i<m; i++ ) {
88     if (idxm[i] < 0) SETERR(1,"Negative row index");
89     if (idxm[i] >= aij->M) SETERR(1,"Row index too large");
90     if (idxm[i] >= rstart && idxm[i] < rend) {
91       row = idxm[i] - rstart;
92       for ( j=0; j<n; j++ ) {
93         if (idxn[j] < 0) SETERR(1,"Negative column index");
94         if (idxn[j] >= aij->N) SETERR(1,"Column index too large");
95         if (idxn[j] >= cstart && idxn[j] < cend){
96           col = idxn[j] - cstart;
97           ierr = MatSetValues(aij->A,1,&row,1,&col,v+i*n+j,addv);CHKERR(ierr);
98         }
99         else {
100           if (aij->assembled) {
101             if (!aij->colmap) {ierr = CreateColmap(mat); CHKERR(ierr);}
102             col = aij->colmap[idxn[j]] - 1;
103             if (col < 0) {
104               SETERR(1,"Cannot insert new off diagonal block nonzero in\
105                      already\
106                      assembled matrix. Contact petsc-maint@mcs.anl.gov\
107                      if your need this feature");
108             }
109           }
110           else col = idxn[j];
111           ierr = MatSetValues(aij->B,1,&row,1,&col,v+i*n+j,addv);CHKERR(ierr);
112         }
113       }
114     }
115     else {
116       ierr = StashValues(&aij->stash,idxm[i],n,idxn,v+i*n,addv);CHKERR(ierr);
117     }
118   }
119   return 0;
120 }
121 
122 /*
    the assembly code is a lot like the code for vectors; we should
124     sometime derive a single assembly code that can be used for
125     either case.
126 */
127 
/*
   MatBeginAssemble_MPIAIJ - begins parallel assembly: verifies that all
   processors used a consistent InsertMode, packs the stashed
   off-processor values into per-destination messages, posts the
   receives, and starts the sends.  MatEndAssemble_MPIAIJ completes the
   communication.

   Each message is a sequence of (row, column, value) triples; the two
   indices are stored as Scalars so a single homogeneous MPI_SCALAR
   buffer can be used.
*/
static int MatBeginAssemble_MPIAIJ(Mat mat)
{
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         numtids = aij->numtids, *owners = aij->rowners;
  int         mytid = aij->mytid;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  int         tag = 50, *owner,*starts,count;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  MPI_Allreduce((void *) &aij->insertmode,(void *) &addv,1,MPI_INT,
                MPI_BOR,comm);
  if (addv == (AddValues|InsertValues)) {
    SETERR(1,"Some processors have inserted while others have added");
  }
  aij->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  /* nprocs[j] = #stashed entries for proc j; procs[j] = 1 if any;
     owner[i] = destination processor of stash entry i */
  nprocs = (int *) MALLOC( 2*numtids*sizeof(int) ); CHKPTR(nprocs);
  MEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
  owner = (int *) MALLOC( (aij->stash.n+1)*sizeof(int) ); CHKPTR(owner);
  for ( i=0; i<aij->stash.n; i++ ) {
    idx = aij->stash.idx[i];
    for ( j=0; j<numtids; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(work);
  MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
  nreceives = work[mytid];
  MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
  nmax = work[mytid];
  FREE(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) MALLOC(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTR(rvalues);
  recv_waits = (MPI_Request *) MALLOC((nreceives+1)*sizeof(MPI_Request));
  CHKPTR(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv((void *)(rvalues+3*nmax*i),3*nmax,MPI_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) MALLOC( 3*(aij->stash.n+1)*sizeof(Scalar) );
  CHKPTR(svalues);
  send_waits = (MPI_Request *) MALLOC( (nsends+1)*sizeof(MPI_Request));
  CHKPTR(send_waits);
  starts = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(starts);
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack (row, col, value) triples, grouped by destination processor */
  for ( i=0; i<aij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
  }
  FREE(owner);
  /* rebuild starts (consumed by the packing loop above) for the sends */
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<numtids; i++ ) {
    if (procs[i]) {
      MPI_Isend((void*)(svalues+3*starts[i]),3*nprocs[i],MPI_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  FREE(starts); FREE(nprocs);

  /* Free cache space */
  aij->stash.nmax = aij->stash.n = 0;
  if (aij->stash.array){ FREE(aij->stash.array); aij->stash.array = 0;}

  /* save communication state for MatEndAssemble_MPIAIJ */
  aij->svalues    = svalues;       aij->rvalues = rvalues;
  aij->nsends     = nsends;         aij->nrecvs = nreceives;
  aij->send_waits = send_waits; aij->recv_waits = recv_waits;
  aij->rmax       = nmax;

  return 0;
}
229 extern int MatSetUpMultiply_MPIAIJ(Mat);
230 
231 static int MatEndAssemble_MPIAIJ(Mat mat)
232 {
233   int        ierr;
234   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
235 
236   MPI_Status  *send_status,recv_status;
237   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n;
238   int         row,col;
239   Scalar      *values,val;
240   InsertMode  addv = aij->insertmode;
241 
242   /*  wait on receives */
243   while (count) {
244     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
245     /* unpack receives into our local space */
246     values = aij->rvalues + 3*imdex*aij->rmax;
247     MPI_Get_count(&recv_status,MPI_SCALAR,&n);
248     n = n/3;
249     for ( i=0; i<n; i++ ) {
250       row = (int) PETSCREAL(values[3*i]) - aij->rstart;
251       col = (int) PETSCREAL(values[3*i+1]);
252       val = values[3*i+2];
253       if (col >= aij->cstart && col < aij->cend) {
254           col -= aij->cstart;
255         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
256       }
257       else {
258         if (aij->assembled) {
259           if (!aij->colmap) {ierr = CreateColmap(mat); CHKERR(ierr);}
260           col = aij->colmap[col] - 1;
261           if (col < 0) {
262             SETERR(1,"Cannot insert new off diagonal block nonzero in\
263                      already\
264                      assembled matrix. Contact petsc-maint@mcs.anl.gov\
265                      if your need this feature");
266           }
267         }
268         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
269       }
270     }
271     count--;
272   }
273   FREE(aij->recv_waits); FREE(aij->rvalues);
274 
275   /* wait on sends */
276   if (aij->nsends) {
277     send_status = (MPI_Status *) MALLOC( aij->nsends*sizeof(MPI_Status) );
278     CHKPTR(send_status);
279     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
280     FREE(send_status);
281   }
282   FREE(aij->send_waits); FREE(aij->svalues);
283 
284   aij->insertmode = NotSetValues;
285   ierr = MatBeginAssembly(aij->A); CHKERR(ierr);
286   ierr = MatEndAssembly(aij->A); CHKERR(ierr);
287 
288   if (!aij->assembled) {
289     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERR(ierr);
290   }
291   ierr = MatBeginAssembly(aij->B); CHKERR(ierr);
292   ierr = MatEndAssembly(aij->B); CHKERR(ierr);
293 
294   aij->assembled = 1;
295   return 0;
296 }
297 
298 static int MatZero_MPIAIJ(Mat A)
299 {
300   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
301 
302   MatZeroEntries(l->A); MatZeroEntries(l->B);
303   return 0;
304 }
305 
/* again this uses the same basic strategy as in the assembly and
   scatter create routines; we should try to do it systematically
   if we can figure out the proper level of generality. */
309 
/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   aij->A and aij->B directly and not through the MatZeroRows()
   routine.
*/
/*
   MatZeroRows_MPIAIJ - zeros the (globally indexed) rows listed in "is".
   Each processor routes the row indices it supplied to the owning
   processor, then each processor zaps its received local rows through
   MatZeroRows() on the sequential A and B blocks.  If diag is non-null
   the diagonal entry of each zeroed row in A is set to *diag (see the
   square-matrix caveat in the comment above this routine).
*/
static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,numtids = l->numtids;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,mytid = l->mytid;
  int            *rvalues,tag = 67,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  if (!l->assembled) SETERR(1,"MatZeroRows_MPIAIJ: must assemble matrix first");
  ierr = ISGetLocalSize(is,&N); CHKERR(ierr);
  ierr = ISGetIndices(is,&rows); CHKERR(ierr);

  /*  first count number of contributors to each processor */
  /* nprocs[j] = #rows destined for proc j; procs[j] = 1 if any;
     owner[i] = owning processor of rows[i] */
  nprocs = (int *) MALLOC( 2*numtids*sizeof(int) ); CHKPTR(nprocs);
  MEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
  owner = (int *) MALLOC((N+1)*sizeof(int)); CHKPTR(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<numtids; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERR(1,"Imdex out of range");
  }
  nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(work);
  MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
  nrecvs = work[mytid];
  MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
  nmax = work[mytid];
  FREE(work);

  /* post receives:   */
  rvalues = (int *) MALLOC((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTR(rvalues);
  recv_waits = (MPI_Request *) MALLOC((nrecvs+1)*sizeof(MPI_Request));
  CHKPTR(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv((void *)(rvalues+nmax*i),nmax,MPI_INT,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) MALLOC( (N+1)*sizeof(int) ); CHKPTR(svalues);
  send_waits = (MPI_Request *) MALLOC( (nsends+1)*sizeof(MPI_Request));
  CHKPTR(send_waits);
  /* starts has numtids+1 entries: the rebuild loop below writes
     starts[numtids] as well */
  starts = (int *) MALLOC( (numtids+1)*sizeof(int) ); CHKPTR(starts);
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack row indices grouped by destination (consumes starts) */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  starts[0] = 0;
  for ( i=1; i<numtids+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<numtids; i++ ) {
    if (procs[i]) {
      MPI_Isend((void*)(svalues+starts[i]),nprocs[i],MPI_INT,i,tag,
                comm,send_waits+count++);
    }
  }
  FREE(starts);

  base = owners[mytid];   /* first globally owned row; used to localize */

  /*  wait on receives */
  lens = (int *) MALLOC( 2*(nrecvs+1)*sizeof(int) ); CHKPTR(lens);
  source = lens + nrecvs;
  count = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  FREE(recv_waits);

  /* move the data into the send scatter */
  /* flatten all received global rows into one local (zero-based) list */
  lrows = (int *) MALLOC( slen*sizeof(int) ); CHKPTR(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  FREE(rvalues); FREE(lens);
  FREE(owner); FREE(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateSequential(slen,lrows,&istmp); CHKERR(ierr);  FREE(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERR(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERR(ierr);
  ierr = ISDestroy(istmp); CHKERR(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) MALLOC( nsends*sizeof(MPI_Status) );
    CHKPTR(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    FREE(send_status);
  }
  FREE(send_waits); FREE(svalues);


  return 0;
}
440 
441 static int MatMult_MPIAIJ(Mat aijin,Vec xx,Vec yy)
442 {
443   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
444   int        ierr;
445   if (!aij->assembled) SETERR(1,"MatMult_MPIAIJ: must assemble matrix first");
446   ierr = VecScatterBegin(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
447   CHKERR(ierr);
448   ierr = MatMult(aij->A,xx,yy); CHKERR(ierr);
449   ierr = VecScatterEnd(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
450   CHKERR(ierr);
451   ierr = MatMultAdd(aij->B,aij->lvec,yy,yy); CHKERR(ierr);
452   return 0;
453 }
454 
455 static int MatMultAdd_MPIAIJ(Mat aijin,Vec xx,Vec yy,Vec zz)
456 {
457   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
458   int        ierr;
459   if (!aij->assembled) SETERR(1,"MatMult_MPIAIJ: must assemble matrix first");
460   ierr = VecScatterBegin(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
461   CHKERR(ierr);
462   ierr = MatMultAdd(aij->A,xx,yy,zz); CHKERR(ierr);
463   ierr = VecScatterEnd(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
464   CHKERR(ierr);
465   ierr = MatMultAdd(aij->B,aij->lvec,zz,zz); CHKERR(ierr);
466   return 0;
467 }
468 
469 static int MatMultTrans_MPIAIJ(Mat aijin,Vec xx,Vec yy)
470 {
471   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
472   int        ierr;
473 
474   if (!aij->assembled)
475     SETERR(1,"MatMulTrans_MPIAIJ: must assemble matrix first");
476   /* do nondiagonal part */
477   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERR(ierr);
478   /* send it on its way */
479   ierr = VecScatterBegin(aij->lvec,0,yy,0,AddValues,
480                          ScatterAll|ScatterReverse,aij->Mvctx); CHKERR(ierr);
481   /* do local part */
482   ierr = MatMultTrans(aij->A,xx,yy); CHKERR(ierr);
483   /* receive remote parts: note this assumes the values are not actually */
484   /* inserted in yy until the next line, which is true for my implementation*/
485   /* but is not perhaps always true. */
486   ierr = VecScatterEnd(aij->lvec,0,yy,0,AddValues,ScatterAll|ScatterReverse,
487                          aij->Mvctx); CHKERR(ierr);
488   return 0;
489 }
490 
491 static int MatMultTransAdd_MPIAIJ(Mat aijin,Vec xx,Vec yy,Vec zz)
492 {
493   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
494   int        ierr;
495 
496   if (!aij->assembled)
497     SETERR(1,"MatMulTransAdd_MPIAIJ: must assemble matrix first");
498   /* do nondiagonal part */
499   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERR(ierr);
500   /* send it on its way */
501   ierr = VecScatterBegin(aij->lvec,0,zz,0,AddValues,
502                          ScatterAll|ScatterReverse,aij->Mvctx); CHKERR(ierr);
503   /* do local part */
504   ierr = MatMultTransAdd(aij->A,xx,yy,zz); CHKERR(ierr);
505   /* receive remote parts: note this assumes the values are not actually */
506   /* inserted in yy until the next line, which is true for my implementation*/
507   /* but is not perhaps always true. */
508   ierr = VecScatterEnd(aij->lvec,0,zz,0,AddValues,ScatterAll|ScatterReverse,
509                          aij->Mvctx); CHKERR(ierr);
510   return 0;
511 }
512 
513 /*
514   This only works correctly for square matrices where the subblock A->A is the
515    diagonal block
516 */
517 static int MatGetDiag_MPIAIJ(Mat Ain,Vec v)
518 {
519   Mat_MPIAIJ *A = (Mat_MPIAIJ *) Ain->data;
520   if (!A->assembled) SETERR(1,"MatGetDiag_MPIAIJ: must assemble matrix first");
521   return MatGetDiagonal(A->A,v);
522 }
523 
/*
   MatDestroy_MPIAIJ - releases everything owned by a parallel AIJ
   matrix: the row-ownership table, the two sequential blocks, the
   optional column maps, the ghost-value work vector and its scatter
   context, and finally the data structure and PETSc header themselves.
*/
static int MatDestroy_MPIAIJ(PetscObject obj)
{
  Mat        mat = (Mat) obj;
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  int        ierr;
#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows %d Cols %d",aij->M,aij->N);
#endif
  FREE(aij->rowners);
  ierr = MatDestroy(aij->A); CHKERR(ierr);
  ierr = MatDestroy(aij->B); CHKERR(ierr);
  /* these are created lazily, so may be null */
  if (aij->colmap) FREE(aij->colmap);
  if (aij->garray) FREE(aij->garray);
  if (aij->lvec) VecDestroy(aij->lvec);
  if (aij->Mvctx) VecScatterCtxDestroy(aij->Mvctx);
  FREE(aij);
  PLogObjectDestroy(mat);
  PETSCHEADERDESTROY(mat);
  return 0;
}
544 
/*
   MatView_MPIAIJ - prints the matrix, one processor at a time (the
   MPE_Seq_begin/end pair sequentializes the output): a one-line
   ownership summary followed by the local diagonal and off-diagonal
   blocks.
*/
static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
{
  Mat        mat = (Mat) obj;
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  int        ierr;

  if (!aij->assembled) SETERR(1,"MatView_MPIAIJ: must assemble matrix first");
  MPE_Seq_begin(mat->comm,1);
    ViewerPrintf(viewer,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
          aij->mytid,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
          aij->cend);
    ierr = MatView(aij->A,viewer); CHKERR(ierr);
    ierr = MatView(aij->B,viewer); CHKERR(ierr);
    ViewerFlush(viewer);
  MPE_Seq_end(mat->comm,1);
  return 0;
}
562 
563 extern int MatMarkDiag_AIJ(Mat_AIJ  *);
564 /*
565     This has to provide several versions.
566 
567      1) per sequential
568      2) a) use only local smoothing updating outer values only once.
569         b) local smoothing updating outer values each inner iteration
     3) color: updating outer values between colors.
571 */
572 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,int flag,double shift,
573                         int its,Vec xx)
574 {
575   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
576   Mat        AA = mat->A, BB = mat->B;
577   Mat_AIJ    *A = (Mat_AIJ *) AA->data, *B = (Mat_AIJ *)BB->data;
578   Scalar     zero = 0.0,*b,*x,*xs,*ls,d,*v,sum,scale,*t,*ts;
579   int        ierr,*idx, *diag;
580   int        n = mat->n, m = mat->m, i;
581   Vec        tt;
582 
583   if (!mat->assembled) SETERR(1,"MatRelax_MPIAIJ: must assemble matrix first");
584 
585   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
586   xs = x -1; /* shift by one for index start of 1 */
587   ls--;
588   if (!A->diag) {if ((ierr = MatMarkDiag_AIJ(A))) return ierr;}
589   diag = A->diag;
590   if (flag == SOR_APPLY_UPPER || flag == SOR_APPLY_LOWER) {
591     SETERR(1,"That option not yet support for parallel AIJ matrices");
592   }
593   if (flag & SOR_EISENSTAT) {
594     /* Let  A = L + U + D; where L is lower trianglar,
595     U is upper triangular, E is diagonal; This routine applies
596 
597             (L + E)^{-1} A (U + E)^{-1}
598 
599     to a vector efficiently using Eisenstat's trick. This is for
600     the case of SSOR preconditioner, so E is D/omega where omega
601     is the relaxation factor.
602     */
603     ierr = VecCreate(xx,&tt); CHKERR(ierr);
604     VecGetArray(tt,&t);
605     scale = (2.0/omega) - 1.0;
606     /*  x = (E + U)^{-1} b */
607     VecSet(&zero,mat->lvec);
608     ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
609                               mat->Mvctx); CHKERR(ierr);
610     for ( i=m-1; i>-1; i-- ) {
611       n    = A->i[i+1] - diag[i] - 1;
612       idx  = A->j + diag[i];
613       v    = A->a + diag[i];
614       sum  = b[i];
615       SPARSEDENSEMDOT(sum,xs,v,idx,n);
616       d    = shift + A->a[diag[i]-1];
617       n    = B->i[i+1] - B->i[i];
618       idx  = B->j + B->i[i] - 1;
619       v    = B->a + B->i[i] - 1;
620       SPARSEDENSEMDOT(sum,ls,v,idx,n);
621       x[i] = omega*(sum/d);
622     }
623     ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
624                             mat->Mvctx); CHKERR(ierr);
625 
626     /*  t = b - (2*E - D)x */
627     v = A->a;
628     for ( i=0; i<m; i++ ) { t[i] = b[i] - scale*(v[*diag++ - 1])*x[i]; }
629 
630     /*  t = (E + L)^{-1}t */
631     ts = t - 1; /* shifted by one for index start of a or mat->j*/
632     diag = A->diag;
633     VecSet(&zero,mat->lvec);
634     ierr = VecPipelineBegin(tt,0,mat->lvec,0,InsertValues,PipelineDown,
635                                                  mat->Mvctx); CHKERR(ierr);
636     for ( i=0; i<m; i++ ) {
637       n    = diag[i] - A->i[i];
638       idx  = A->j + A->i[i] - 1;
639       v    = A->a + A->i[i] - 1;
640       sum  = t[i];
641       SPARSEDENSEMDOT(sum,ts,v,idx,n);
642       d    = shift + A->a[diag[i]-1];
643       n    = B->i[i+1] - B->i[i];
644       idx  = B->j + B->i[i] - 1;
645       v    = B->a + B->i[i] - 1;
646       SPARSEDENSEMDOT(sum,ls,v,idx,n);
647       t[i] = omega*(sum/d);
648     }
649     ierr = VecPipelineEnd(tt,0,mat->lvec,0,InsertValues,PipelineDown,
650                                                     mat->Mvctx); CHKERR(ierr);
651     /*  x = x + t */
652     for ( i=0; i<m; i++ ) { x[i] += t[i]; }
653     VecDestroy(tt);
654     return 0;
655   }
656 
657 
658   if ((flag & SOR_SYMMETRIC_SWEEP) == SOR_SYMMETRIC_SWEEP){
659     if (flag & SOR_ZERO_INITIAL_GUESS) {
660       VecSet(&zero,mat->lvec); VecSet(&zero,xx);
661     }
662     else {
663       ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
664       CHKERR(ierr);
665       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
666       CHKERR(ierr);
667     }
668     while (its--) {
669       /* go down through the rows */
670       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
671                               mat->Mvctx); CHKERR(ierr);
672       for ( i=0; i<m; i++ ) {
673         n    = A->i[i+1] - A->i[i];
674         idx  = A->j + A->i[i] - 1;
675         v    = A->a + A->i[i] - 1;
676         sum  = b[i];
677         SPARSEDENSEMDOT(sum,xs,v,idx,n);
678         d    = shift + A->a[diag[i]-1];
679         n    = B->i[i+1] - B->i[i];
680         idx  = B->j + B->i[i] - 1;
681         v    = B->a + B->i[i] - 1;
682         SPARSEDENSEMDOT(sum,ls,v,idx,n);
683         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
684       }
685       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
686                             mat->Mvctx); CHKERR(ierr);
687       /* come up through the rows */
688       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
689                               mat->Mvctx); CHKERR(ierr);
690       for ( i=m-1; i>-1; i-- ) {
691         n    = A->i[i+1] - A->i[i];
692         idx  = A->j + A->i[i] - 1;
693         v    = A->a + A->i[i] - 1;
694         sum  = b[i];
695         SPARSEDENSEMDOT(sum,xs,v,idx,n);
696         d    = shift + A->a[diag[i]-1];
697         n    = B->i[i+1] - B->i[i];
698         idx  = B->j + B->i[i] - 1;
699         v    = B->a + B->i[i] - 1;
700         SPARSEDENSEMDOT(sum,ls,v,idx,n);
701         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
702       }
703       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
704                             mat->Mvctx); CHKERR(ierr);
705     }
706   }
707   else if (flag & SOR_FORWARD_SWEEP){
708     if (flag & SOR_ZERO_INITIAL_GUESS) {
709       VecSet(&zero,mat->lvec);
710       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
711                               mat->Mvctx); CHKERR(ierr);
712       for ( i=0; i<m; i++ ) {
713         n    = diag[i] - A->i[i];
714         idx  = A->j + A->i[i] - 1;
715         v    = A->a + A->i[i] - 1;
716         sum  = b[i];
717         SPARSEDENSEMDOT(sum,xs,v,idx,n);
718         d    = shift + A->a[diag[i]-1];
719         n    = B->i[i+1] - B->i[i];
720         idx  = B->j + B->i[i] - 1;
721         v    = B->a + B->i[i] - 1;
722         SPARSEDENSEMDOT(sum,ls,v,idx,n);
723         x[i] = omega*(sum/d);
724       }
725       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
726                             mat->Mvctx); CHKERR(ierr);
727       its--;
728     }
729     while (its--) {
730       ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
731       CHKERR(ierr);
732       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
733       CHKERR(ierr);
734       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
735                               mat->Mvctx); CHKERR(ierr);
736       for ( i=0; i<m; i++ ) {
737         n    = A->i[i+1] - A->i[i];
738         idx  = A->j + A->i[i] - 1;
739         v    = A->a + A->i[i] - 1;
740         sum  = b[i];
741         SPARSEDENSEMDOT(sum,xs,v,idx,n);
742         d    = shift + A->a[diag[i]-1];
743         n    = B->i[i+1] - B->i[i];
744         idx  = B->j + B->i[i] - 1;
745         v    = B->a + B->i[i] - 1;
746         SPARSEDENSEMDOT(sum,ls,v,idx,n);
747         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
748       }
749       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
750                             mat->Mvctx); CHKERR(ierr);
751     }
752   }
753   else if (flag & SOR_BACKWARD_SWEEP){
754     if (flag & SOR_ZERO_INITIAL_GUESS) {
755       VecSet(&zero,mat->lvec);
756       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
757                               mat->Mvctx); CHKERR(ierr);
758       for ( i=m-1; i>-1; i-- ) {
759         n    = A->i[i+1] - diag[i] - 1;
760         idx  = A->j + diag[i];
761         v    = A->a + diag[i];
762         sum  = b[i];
763         SPARSEDENSEMDOT(sum,xs,v,idx,n);
764         d    = shift + A->a[diag[i]-1];
765         n    = B->i[i+1] - B->i[i];
766         idx  = B->j + B->i[i] - 1;
767         v    = B->a + B->i[i] - 1;
768         SPARSEDENSEMDOT(sum,ls,v,idx,n);
769         x[i] = omega*(sum/d);
770       }
771       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
772                             mat->Mvctx); CHKERR(ierr);
773       its--;
774     }
775     while (its--) {
776       ierr = VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterDown,
777                             mat->Mvctx); CHKERR(ierr);
778       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterDown,
779                             mat->Mvctx); CHKERR(ierr);
780       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
781                               mat->Mvctx); CHKERR(ierr);
782       for ( i=m-1; i>-1; i-- ) {
783         n    = A->i[i+1] - A->i[i];
784         idx  = A->j + A->i[i] - 1;
785         v    = A->a + A->i[i] - 1;
786         sum  = b[i];
787         SPARSEDENSEMDOT(sum,xs,v,idx,n);
788         d    = shift + A->a[diag[i]-1];
789         n    = B->i[i+1] - B->i[i];
790         idx  = B->j + B->i[i] - 1;
791         v    = B->a + B->i[i] - 1;
792         SPARSEDENSEMDOT(sum,ls,v,idx,n);
793         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
794       }
795       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
796                             mat->Mvctx); CHKERR(ierr);
797     }
798   }
799   else if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
800     if (flag & SOR_ZERO_INITIAL_GUESS) {
801       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
802     }
803     ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
804     CHKERR(ierr);
805     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
806     CHKERR(ierr);
807     while (its--) {
808       /* go down through the rows */
809       for ( i=0; i<m; i++ ) {
810         n    = A->i[i+1] - A->i[i];
811         idx  = A->j + A->i[i] - 1;
812         v    = A->a + A->i[i] - 1;
813         sum  = b[i];
814         SPARSEDENSEMDOT(sum,xs,v,idx,n);
815         d    = shift + A->a[diag[i]-1];
816         n    = B->i[i+1] - B->i[i];
817         idx  = B->j + B->i[i] - 1;
818         v    = B->a + B->i[i] - 1;
819         SPARSEDENSEMDOT(sum,ls,v,idx,n);
820         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
821       }
822       /* come up through the rows */
823       for ( i=m-1; i>-1; i-- ) {
824         n    = A->i[i+1] - A->i[i];
825         idx  = A->j + A->i[i] - 1;
826         v    = A->a + A->i[i] - 1;
827         sum  = b[i];
828         SPARSEDENSEMDOT(sum,xs,v,idx,n);
829         d    = shift + A->a[diag[i]-1];
830         n    = B->i[i+1] - B->i[i];
831         idx  = B->j + B->i[i] - 1;
832         v    = B->a + B->i[i] - 1;
833         SPARSEDENSEMDOT(sum,ls,v,idx,n);
834         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
835       }
836     }
837   }
838   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
839     if (flag & SOR_ZERO_INITIAL_GUESS) {
840       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
841     }
842     ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
843     CHKERR(ierr);
844     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
845     CHKERR(ierr);
846     while (its--) {
847       for ( i=0; i<m; i++ ) {
848         n    = A->i[i+1] - A->i[i];
849         idx  = A->j + A->i[i] - 1;
850         v    = A->a + A->i[i] - 1;
851         sum  = b[i];
852         SPARSEDENSEMDOT(sum,xs,v,idx,n);
853         d    = shift + A->a[diag[i]-1];
854         n    = B->i[i+1] - B->i[i];
855         idx  = B->j + B->i[i] - 1;
856         v    = B->a + B->i[i] - 1;
857         SPARSEDENSEMDOT(sum,ls,v,idx,n);
858         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
859       }
860     }
861   }
862   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
863     if (flag & SOR_ZERO_INITIAL_GUESS) {
864       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
865     }
866     ierr = VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,
867                             mat->Mvctx); CHKERR(ierr);
868     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,
869                             mat->Mvctx); CHKERR(ierr);
870     while (its--) {
871       for ( i=m-1; i>-1; i-- ) {
872         n    = A->i[i+1] - A->i[i];
873         idx  = A->j + A->i[i] - 1;
874         v    = A->a + A->i[i] - 1;
875         sum  = b[i];
876         SPARSEDENSEMDOT(sum,xs,v,idx,n);
877         d    = shift + A->a[diag[i]-1];
878         n    = B->i[i+1] - B->i[i];
879         idx  = B->j + B->i[i] - 1;
880         v    = B->a + B->i[i] - 1;
881         SPARSEDENSEMDOT(sum,ls,v,idx,n);
882         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
883       }
884     }
885   }
886   return 0;
887 }
888 static int MatInsOpt_MPIAIJ(Mat aijin,int op)
889 {
890   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
891 
892   if      (op == NO_NEW_NONZERO_LOCATIONS)  {
893     MatSetOption(aij->A,op);
894     MatSetOption(aij->B,op);
895   }
896   else if (op == YES_NEW_NONZERO_LOCATIONS) {
897     MatSetOption(aij->A,op);
898     MatSetOption(aij->B,op);
899   }
900   else if (op == COLUMN_ORIENTED) SETERR(1,"Column oriented not supported");
901   return 0;
902 }
903 
904 static int MatSize_MPIAIJ(Mat matin,int *m,int *n)
905 {
906   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
907   *m = mat->M; *n = mat->N;
908   return 0;
909 }
910 
911 static int MatLocalSize_MPIAIJ(Mat matin,int *m,int *n)
912 {
913   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
914   *m = mat->m; *n = mat->n;
915   return 0;
916 }
917 
918 static int MatRange_MPIAIJ(Mat matin,int *m,int *n)
919 {
920   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
921   *m = mat->rstart; *n = mat->rend;
922   return 0;
923 }
924 
925 static int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
926 {
927   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) matin->data;
928   Mat        A = aij->A, B = aij->B;
929   Mat_AIJ    *Ad = (Mat_AIJ *)(A->data), *Bd = (Mat_AIJ *)(B->data);
930   Scalar     *vworkA, *vworkB;
931   int        ierr, *cworkA, *cworkB;
932   int        nztot, nzA, nzB, i, rstart = aij->rstart, rend = aij->rend;
933   int        cstart = aij->cstart;
934 
935   if (!aij->assembled)
936     SETERR(1,"MatGetRow_MPIAIJ: Must assemble matrix first.");
937   if (row < rstart || row >= rend)
938     SETERR(1,"MatGetRow_MPIAIJ: Currently you can get only the local rows.")
939 
940   ierr = MatGetRow(A,row,&nzA,&cworkA,&vworkA);	CHKERR(ierr);
941   for (i=0; i<nzA; i++) cworkA[i] += cstart;
942   ierr = MatGetRow(B,row,&nzB,&cworkB,&vworkB);	CHKERR(ierr);
943   for (i=0; i<nzB; i++) {
944       printf("i=%d, invcolmap[i]=%d, cwork=%d, vwork=%g\n",
945                i, aij->invcolmap[i], cworkB[i], vworkB[i] );
946 /*    cworkB[i] = aij->invcolmap[cworkB[i]]; */
947    }
948 
949   if (nztot = nzA + nzB) {
950     *idx = (int *) MALLOC( (nztot)*sizeof(int) ); CHKPTR(*idx);
951     *v   = (Scalar *) MALLOC( (nztot)*sizeof(Scalar) ); CHKPTR(*v);
952     for ( i=0; i<nzA; i++ ) {
953       (*idx)[i] = cworkA[i];
954       (*v)[i] = vworkA[i];
955     }
956     for ( i=0; i<nzB; i++ ) {
957       (*idx)[i+nzA] = cworkB[i];
958       (*v)[i] = vworkB[i];
959     }
960   }
961   else {*idx = 0; *v=0;}
962   *nz = nztot;
963   ierr = MatRestoreRow(A,row,&nzA,&cworkA,&vworkA);	CHKERR(ierr);
964   ierr = MatRestoreRow(B,row,&nzB,&cworkB,&vworkB);	CHKERR(ierr);
965   return 0;
966 }
967 
968 static int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
969 {
970   if (*idx) FREE(*idx);
971   if (*v) FREE(*v);
972   return 0;
973 }
974 
975 static int MatCopy_MPIAIJ(Mat,Mat *);
976 extern int MatConvert_MPIAIJ(Mat,MATTYPE,Mat *);
977 
978 /* -------------------------------------------------------------------*/
/* Operations table for the MPIAIJ matrix format.  The entries are
   positional and must match the slot order of struct _MatOps (declared
   elsewhere); a 0 marks an operation this format does not implement. */
static struct _MatOps MatOps = {MatInsertValues_MPIAIJ,
       MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
       MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
       MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
       0,0,0,0,
       0,0,
       MatRelax_MPIAIJ,
       0,
       0,0,0,
       MatCopy_MPIAIJ,
       MatGetDiag_MPIAIJ,0,0,
       MatBeginAssemble_MPIAIJ,MatEndAssemble_MPIAIJ,
       0,
       MatInsOpt_MPIAIJ,MatZero_MPIAIJ,MatZeroRows_MPIAIJ,0,
       0,0,0,0,
       MatSize_MPIAIJ,MatLocalSize_MPIAIJ,MatRange_MPIAIJ,
       0,0,
       0,MatConvert_MPIAIJ };
997 
998 /*@
999 
1000       MatCreateMPIAIJ - Creates a sparse parallel matrix
1001                                  in AIJ format.
1002 
1003   Input Parameters:
1004 .   comm - MPI communicator
1005 .   m,n - number of local rows and columns (or -1 to have calculated)
1006 .   M,N - global rows and columns (or -1 to have calculated)
1007 .   d_nz - total number nonzeros in diagonal portion of matrix
.   d_nnz - number of nonzeros per row in diagonal portion of matrix or null
1009 .           You must leave room for the diagonal entry even if it is zero.
1010 .   o_nz - total number nonzeros in off-diagonal portion of matrix
.   o_nnz - number of nonzeros per row in off-diagonal portion of matrix
1012 .           or null. You must have at least one nonzero per row.
1013 
1014   Output parameters:
1015 .  newmat - the matrix
1016 
1017   Keywords: matrix, aij, compressed row, sparse, parallel
1018 @*/
int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
                 int d_nz,int *d_nnz, int o_nz,int *o_nnz,Mat *newmat)
{
  Mat          mat;
  Mat_MPIAIJ   *aij;
  int          ierr, i,sum[2],work[2];
  *newmat         = 0;
  PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (aij = NEW(Mat_MPIAIJ)); CHKPTR(aij);
  mat->ops        = &MatOps;
  mat->destroy    = MatDestroy_MPIAIJ;
  mat->view       = MatView_MPIAIJ;
  mat->factor     = 0;
  mat->row        = 0;
  mat->col        = 0;

  mat->comm       = comm;
  aij->insertmode = NotSetValues;
  MPI_Comm_rank(comm,&aij->mytid);
  MPI_Comm_size(comm,&aij->numtids);

  /* If a global dimension was not given (-1), compute it as the sum of
     the local dimensions over the communicator. */
  if (M == -1 || N == -1) {
    work[0] = m; work[1] = n;
    MPI_Allreduce((void *) work,(void *) sum,2,MPI_INT,MPI_SUM,comm );
    if (M == -1) M = sum[0];
    if (N == -1) N = sum[1];
  }
  /* If a local dimension was not given (-1), split the global dimension
     as evenly as possible; the first (M % numtids) processors get one
     extra row (similarly for columns). */
  if (m == -1) {m = M/aij->numtids + ((M % aij->numtids) > aij->mytid);}
  if (n == -1) {n = N/aij->numtids + ((N % aij->numtids) > aij->mytid);}
  aij->m       = m;
  aij->n       = n;
  aij->N       = N;
  aij->M       = M;

  /* build local table of row and column ownerships; rowners and cowners
     share a single allocation (cowners points into it) */
  aij->rowners = (int *) MALLOC(2*(aij->numtids+2)*sizeof(int));
  CHKPTR(aij->rowners);
  aij->cowners = aij->rowners + aij->numtids + 1;
  MPI_Allgather(&m,1,MPI_INT,aij->rowners+1,1,MPI_INT,comm);
  aij->rowners[0] = 0;
  /* prefix-sum the gathered local sizes to obtain ownership offsets */
  for ( i=2; i<=aij->numtids; i++ ) {
    aij->rowners[i] += aij->rowners[i-1];
  }
  aij->rstart = aij->rowners[aij->mytid];
  aij->rend   = aij->rowners[aij->mytid+1];
  MPI_Allgather(&n,1,MPI_INT,aij->cowners+1,1,MPI_INT,comm);
  aij->cowners[0] = 0;
  for ( i=2; i<=aij->numtids; i++ ) {
    aij->cowners[i] += aij->cowners[i-1];
  }
  aij->cstart = aij->cowners[aij->mytid];
  aij->cend   = aij->cowners[aij->mytid+1];

  /* A is the m x n diagonal block; B holds the off-diagonal part
     (note it is created with the full global column count N) */
  ierr = MatCreateSequentialAIJ(m,n,d_nz,d_nnz,&aij->A); CHKERR(ierr);
  PLogObjectParent(mat,aij->A);
  ierr = MatCreateSequentialAIJ(m,N,o_nz,o_nnz,&aij->B); CHKERR(ierr);
  PLogObjectParent(mat,aij->B);

  /* build cache for off array entries formed */
  aij->stash.nmax = CHUNCKSIZE; /* completely arbitrary number */
  aij->stash.n    = 0;
  /* idx and idy live in the same allocation, after the Scalar values */
  aij->stash.array = (Scalar *) MALLOC( aij->stash.nmax*(2*sizeof(int) +
                            sizeof(Scalar))); CHKPTR(aij->stash.array);
  aij->stash.idx = (int *) (aij->stash.array + aij->stash.nmax);
  aij->stash.idy = (int *) (aij->stash.idx + aij->stash.nmax);
  aij->colmap    = 0;
  aij->garray    = 0;

  /* stuff used for matrix vector multiply */
  aij->lvec      = 0;
  aij->Mvctx     = 0;
  aij->assembled = 0;

  *newmat = mat;
  return 0;
}
1097 
1098 static int MatCopy_MPIAIJ(Mat matin,Mat *newmat)
1099 {
1100   Mat        mat;
1101   Mat_MPIAIJ *aij,*oldmat = (Mat_MPIAIJ *) matin->data;
1102   int        ierr;
1103   *newmat      = 0;
1104 
1105   if (!oldmat->assembled) SETERR(1,"Cannot copy unassembled matrix");
1106   PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1107   PLogObjectCreate(mat);
1108   mat->data       = (void *) (aij = NEW(Mat_MPIAIJ)); CHKPTR(aij);
1109   mat->ops        = &MatOps;
1110   mat->destroy    = MatDestroy_MPIAIJ;
1111   mat->view       = MatView_MPIAIJ;
1112   mat->factor     = matin->factor;
1113   mat->row        = 0;
1114   mat->col        = 0;
1115 
1116   aij->m          = oldmat->m;
1117   aij->n          = oldmat->n;
1118   aij->M          = oldmat->M;
1119   aij->N          = oldmat->N;
1120 
1121   aij->assembled  = 1;
1122   aij->rstart     = oldmat->rstart;
1123   aij->rend       = oldmat->rend;
1124   aij->cstart     = oldmat->cstart;
1125   aij->cend       = oldmat->cend;
1126   aij->numtids    = oldmat->numtids;
1127   aij->mytid      = oldmat->mytid;
1128   aij->insertmode = NotSetValues;
1129 
1130   aij->rowners    = (int *) MALLOC( (aij->numtids+1)*sizeof(int) );
1131   CHKPTR(aij->rowners);
1132   MEMCPY(aij->rowners,oldmat->rowners,(aij->numtids+1)*sizeof(int));
1133   aij->stash.nmax = 0;
1134   aij->stash.n    = 0;
1135   aij->stash.array= 0;
1136   aij->colmap     = 0;
1137   aij->garray     = 0;
1138   mat->comm       = matin->comm;
1139 
1140   ierr =  VecCreate(oldmat->lvec,&aij->lvec); CHKERR(ierr);
1141   PLogObjectParent(mat,aij->lvec);
1142   ierr =  VecScatterCtxCopy(oldmat->Mvctx,&aij->Mvctx); CHKERR(ierr);
1143   PLogObjectParent(mat,aij->Mvctx);
1144   ierr =  MatCopy(oldmat->A,&aij->A); CHKERR(ierr);
1145   PLogObjectParent(mat,aij->A);
1146   ierr =  MatCopy(oldmat->B,&aij->B); CHKERR(ierr);
1147   PLogObjectParent(mat,aij->B);
1148   *newmat = mat;
1149   return 0;
1150 }
1151