xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision 0703bec71b91cbbb29aff3f579e0effa75cd816d)
1 #ifndef lint
2 static char vcid[] = "$Id: mpiaij.c,v 1.65 1995/08/17 15:14:08 curfman Exp curfman $";
3 #endif
4 
5 #include "mpiaij.h"
6 #include "vec/vecimpl.h"
7 #include "inline/spops.h"
8 
9 /* local utility routine that creates a mapping from the global column
10 number to the local number in the off-diagonal part of the local
11 storage of the matrix.  This is done in a non scable way since the
12 length of colmap equals the global matrix length.
13 */
14 static int CreateColmap_Private(Mat mat)
15 {
16   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
17   Mat_AIJ    *B = (Mat_AIJ*) aij->B->data;
18   int        n = B->n,i;
19   aij->colmap = (int *) PETSCMALLOC(aij->N*sizeof(int));CHKPTRQ(aij->colmap);
20   PLogObjectMemory(mat,aij->N*sizeof(int));
21   PETSCMEMSET(aij->colmap,0,aij->N*sizeof(int));
22   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
23   return 0;
24 }
25 
26 extern int DisAssemble_MPIAIJ(Mat);
27 
28 static int MatGetReordering_MPIAIJ(Mat mat,MatOrdering type,IS *rperm,IS *cperm)
29 {
30   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
31   int ierr;
32   if (aij->numtids == 1) {
33     ierr = MatGetReordering(aij->A,type,rperm,cperm); CHKERRQ(ierr);
34   } else
35     SETERRQ(1,"MatGetReordering_MPIAIJ:  not yet supported in parallel");
36   return 0;
37 }
38 
/*
   MatSetValues_MPIAIJ - Inserts or adds a logically dense block of values
   into the parallel matrix.

   Rows owned locally (rstart <= row < rend) are dispatched immediately:
   columns in [cstart,cend) go into the diagonal block aij->A, all other
   columns into the off-diagonal block aij->B.  Rows owned by other
   processors are stashed and communicated during assembly.

   Mixing ADDVALUES and INSERTVALUES between assemblies is an error.
*/
static int MatSetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,
                            int *idxn,Scalar *v,InsertMode addv)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
  int        cstart = aij->cstart, cend = aij->cend,row,col;

  if (aij->insertmode != NOTSETVALUES && aij->insertmode != addv) {
    SETERRQ(1,"MatSetValues_MPIAIJ:You cannot mix inserts and adds");
  }
  aij->insertmode = addv;
  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative row");
    if (idxm[i] >= aij->M) SETERRQ(1,"MatSetValues_MPIAIJ:Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      /* locally owned row: set directly into A or B */
      row = idxm[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative column");
        if (idxn[j] >= aij->N) SETERRQ(1,"MatSetValues_MPIAIJ:Col too large");
        if (idxn[j] >= cstart && idxn[j] < cend){
          col = idxn[j] - cstart;
          ierr = MatSetValues(aij->A,1,&row,1,&col,v+i*n+j,addv);CHKERRQ(ierr);
        }
        else {
          if (aij->assembled) {
            /* translate the global column to B's local numbering;
               colmap stores local index + 1, so 0 means "not present" */
            if (!aij->colmap) {ierr = CreateColmap_Private(mat);CHKERRQ(ierr);}
            col = aij->colmap[idxn[j]] - 1;
            if (col < 0 && !((Mat_AIJ*)(aij->A->data))->nonew) {
              /* column not in current nonzero structure: fall back to
                 global numbering by disassembling the matrix */
              ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
              col =  idxn[j];
            }
          }
          else col = idxn[j];
          ierr = MatSetValues(aij->B,1,&row,1,&col,v+i*n+j,addv);CHKERRQ(ierr);
        }
      }
    }
    else {
      /* off-processor row: cache it for communication at assembly time */
      ierr = StashValues_Private(&aij->stash,idxm[i],n,idxn,v+i*n,addv);
      CHKERRQ(ierr);
    }
  }
  return 0;
}
83 
/*
    The assembly code is a lot like the code for vectors; we should
    at some point derive a single assembly routine that can be used for
    either case.
*/
89 
/*
   MatAssemblyBegin_MPIAIJ - Begins matrix assembly: initiates the
   communication that routes stashed (off-processor) entries to the
   processors owning the corresponding rows.

   Each message is a sequence of triples (global row, global column,
   value), all packed as Scalars so one MPI datatype suffices.  The
   matching receives are drained in MatAssemblyEnd_MPIAIJ(); the send
   and receive state is parked in the Mat_MPIAIJ structure in between.
*/
static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         numtids = aij->numtids, *owners = aij->rowners;
  int         mytid = aij->mytid;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  int         tag = mat->tag, *owner,*starts,count,ierr;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  MPI_Allreduce((void *) &aij->insertmode,(void *) &addv,1,MPI_INT,
                MPI_BOR,comm);
  if (addv == (ADDVALUES|INSERTVALUES)) {
    SETERRQ(1,"MatAssemblyBegin_MPIAIJ:Some processors inserted others added");
  }
  aij->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PETSCMALLOC( 2*numtids*sizeof(int) ); CHKPTRQ(nprocs);
  PETSCMEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
  owner = (int *) PETSCMALLOC( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<aij->stash.n; i++ ) {
    idx = aij->stash.idx[i];
    /* linear scan of the row-ownership ranges to find the owning processor */
    for ( j=0; j<numtids; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PETSCMALLOC( numtids*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
  nreceives = work[mytid];
  MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
  nmax = work[mytid];
  PETSCFREE(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) PETSCMALLOC(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PETSCMALLOC((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv((void *)(rvalues+3*nmax*i),3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PETSCMALLOC( 3*(aij->stash.n+1)*sizeof(Scalar) );
  CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PETSCMALLOC( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PETSCMALLOC( numtids*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack (row,col,value) triples; starts[] is advanced as each slot is
     filled, so it must be rebuilt before the sends below */
  for ( i=0; i<aij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
  }
  PETSCFREE(owner);
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<numtids; i++ ) {
    if (procs[i]) {
      MPI_Isend((void*)(svalues+3*starts[i]),3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PETSCFREE(starts); PETSCFREE(nprocs);

  /* Free cache space */
  ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);

  /* stash the communication state for MatAssemblyEnd_MPIAIJ() */
  aij->svalues    = svalues;    aij->rvalues = rvalues;
  aij->nsends     = nsends;     aij->nrecvs = nreceives;
  aij->send_waits = send_waits; aij->recv_waits = recv_waits;
  aij->rmax       = nmax;

  return 0;
}
190 extern int MatSetUpMultiply_MPIAIJ(Mat);
191 
/*
   MatAssemblyEnd_MPIAIJ - Completes matrix assembly: drains the receives
   posted by MatAssemblyBegin_MPIAIJ(), inserts the received values into
   the local A/B blocks, waits on the outstanding sends, then assembles
   the two sequential blocks.  On FINAL_ASSEMBLY the multiply machinery
   (scatter context, local work vector) is also set up.
*/
static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  int        ierr;
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n;
  int         row,col,other_disassembled;
  Scalar      *values,val;
  InsertMode  addv = aij->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = aij->rvalues + 3*imdex*aij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;   /* each entry is a (row,col,value) triple of Scalars */
    for ( i=0; i<n; i++ ) {
      row = (int) PETSCREAL(values[3*i]) - aij->rstart;
      col = (int) PETSCREAL(values[3*i+1]);
      val = values[3*i+2];
      if (col >= aij->cstart && col < aij->cend) {
          col -= aij->cstart;
        MatSetValues(aij->A,1,&row,1,&col,&val,addv);
      }
      else {
        if (aij->assembled) {
          /* translate to B's local column numbering (colmap stores
             local index + 1; -1 after lookup means "not present") */
          if (!aij->colmap) {ierr = CreateColmap_Private(mat);CHKERRQ(ierr);}
          col = aij->colmap[col] - 1;
          if (col < 0  && !((Mat_AIJ*)(aij->A->data))->nonew) {
            /* new nonzero location: disassemble and use global numbering */
            ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
            col = (int) PETSCREAL(values[3*i+1]);
          }
        }
        MatSetValues(aij->B,1,&row,1,&col,&val,addv);
      }
    }
    count--;
  }
  PETSCFREE(aij->recv_waits); PETSCFREE(aij->rvalues);

  /* wait on sends */
  if (aij->nsends) {
    send_status = (MPI_Status *) PETSCMALLOC( aij->nsends*sizeof(MPI_Status) );
    CHKPTRQ(send_status);
    MPI_Waitall(aij->nsends,aij->send_waits,send_status);
    PETSCFREE(send_status);
  }
  PETSCFREE(aij->send_waits); PETSCFREE(aij->svalues);

  aij->insertmode = NOTSETVALUES;
  ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselfs, in order that we may reassemble. */
  MPI_Allreduce((void *) &aij->assembled,(void *) &other_disassembled,1,
                 MPI_INT,MPI_PROD,mat->comm);
  if (aij->assembled && !other_disassembled) {
    ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
  }

  if (!aij->assembled && mode == FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
    aij->assembled = 1;
  }
  ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);

  return 0;
}
263 
264 static int MatZeroEntries_MPIAIJ(Mat A)
265 {
266   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
267   int ierr;
268   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
269   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
270   return 0;
271 }
272 
/* again this uses the same basic strategy as in the assembly and
   scatter create routines; we should try to do it systematically
   if we can figure out the proper level of generality. */
276 
/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   aij->A and aij->B directly and not through the MatZeroRows()
   routine.
*/
/*
   MatZeroRows_MPIAIJ - Zeros the global rows listed in the index set,
   optionally placing *diag on the diagonal of each zeroed row.

   Rows may be listed by any processor; each processor forwards the row
   numbers it holds to the owning processors, which then zero them
   locally with MatZeroRows() on the A and B blocks (diag applied only
   to A, where the diagonal lives for square matrices).
*/
static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,numtids = l->numtids;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,mytid = l->mytid;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  if (!l->assembled)
    SETERRQ(1,"MatZeroRows_MPIAIJ:Must assemble matrix first");
  ierr = ISGetLocalSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PETSCMALLOC( 2*numtids*sizeof(int) ); CHKPTRQ(nprocs);
  PETSCMEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
  owner = (int *) PETSCMALLOC((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    /* linear scan of the row-ownership ranges to find the owner */
    for ( j=0; j<numtids; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,"MatZeroRows_MPIAIJ:Index out of range");
  }
  nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PETSCMALLOC( numtids*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
  nrecvs = work[mytid];
  MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
  nmax = work[mytid];
  PETSCFREE(work);

  /* post receives:   */
  rvalues = (int *) PETSCMALLOC((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PETSCMALLOC((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv((void *)(rvalues+nmax*i),nmax,MPI_INT,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PETSCMALLOC( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PETSCMALLOC( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PETSCMALLOC( (numtids+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* starts[] is advanced while packing, hence it is rebuilt below */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  starts[0] = 0;
  for ( i=1; i<numtids+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<numtids; i++ ) {
    if (procs[i]) {
      MPI_Isend((void*)(svalues+starts[i]),nprocs[i],MPI_INT,i,tag,
                comm,send_waits+count++);
    }
  }
  PETSCFREE(starts);

  base = owners[mytid];

  /*  wait on receives; record only the length and source of each message */
  lens = (int *) PETSCMALLOC( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PETSCFREE(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PETSCMALLOC( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      /* convert global row numbers to local numbering */
      lrows[count++] = values[j] - base;
    }
  }
  PETSCFREE(rvalues); PETSCFREE(lens);
  PETSCFREE(owner); PETSCFREE(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateSequential(MPI_COMM_SELF,slen,lrows,&istmp);
  PLogObjectParent(A,istmp);
  CHKERRQ(ierr);  PETSCFREE(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PETSCMALLOC( nsends*sizeof(MPI_Status) );
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PETSCFREE(send_status);
  }
  PETSCFREE(send_waits); PETSCFREE(svalues);


  return 0;
}
410 
411 static int MatMult_MPIAIJ(Mat aijin,Vec xx,Vec yy)
412 {
413   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
414   int        ierr;
415   if (!aij->assembled) SETERRQ(1,"MatMult_MPIAIJ:must assemble matrix first");
416   ierr = VecScatterBegin(xx,aij->lvec,INSERTVALUES,SCATTERALL,aij->Mvctx);
417   CHKERRQ(ierr);
418   ierr = MatMult(aij->A,xx,yy); CHKERRQ(ierr);
419   ierr = VecScatterEnd(xx,aij->lvec,INSERTVALUES,SCATTERALL,aij->Mvctx);
420   CHKERRQ(ierr);
421   ierr = MatMultAdd(aij->B,aij->lvec,yy,yy); CHKERRQ(ierr);
422   return 0;
423 }
424 
425 static int MatMultAdd_MPIAIJ(Mat aijin,Vec xx,Vec yy,Vec zz)
426 {
427   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
428   int        ierr;
429   if (!aij->assembled) SETERRQ(1,"MatMult_MPIAIJ:must assemble matrix first");
430   ierr = VecScatterBegin(xx,aij->lvec,INSERTVALUES,SCATTERALL,aij->Mvctx);
431   CHKERRQ(ierr);
432   ierr = MatMultAdd(aij->A,xx,yy,zz); CHKERRQ(ierr);
433   ierr = VecScatterEnd(xx,aij->lvec,INSERTVALUES,SCATTERALL,aij->Mvctx);
434   CHKERRQ(ierr);
435   ierr = MatMultAdd(aij->B,aij->lvec,zz,zz); CHKERRQ(ierr);
436   return 0;
437 }
438 
439 static int MatMultTrans_MPIAIJ(Mat aijin,Vec xx,Vec yy)
440 {
441   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
442   int        ierr;
443 
444   if (!aij->assembled)
445     SETERRQ(1,"MatMulTrans_MPIAIJ:must assemble matrix first");
446   /* do nondiagonal part */
447   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERRQ(ierr);
448   /* send it on its way */
449   ierr = VecScatterBegin(aij->lvec,yy,ADDVALUES,
450            (ScatterMode)(SCATTERALL|SCATTERREVERSE),aij->Mvctx); CHKERRQ(ierr);
451   /* do local part */
452   ierr = MatMultTrans(aij->A,xx,yy); CHKERRQ(ierr);
453   /* receive remote parts: note this assumes the values are not actually */
454   /* inserted in yy until the next line, which is true for my implementation*/
455   /* but is not perhaps always true. */
456   ierr = VecScatterEnd(aij->lvec,yy,ADDVALUES,
457          (ScatterMode)(SCATTERALL|SCATTERREVERSE),aij->Mvctx); CHKERRQ(ierr);
458   return 0;
459 }
460 
461 static int MatMultTransAdd_MPIAIJ(Mat aijin,Vec xx,Vec yy,Vec zz)
462 {
463   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
464   int        ierr;
465 
466   if (!aij->assembled)
467     SETERRQ(1,"MatMulTransAdd_MPIAIJ:must assemble matrix first");
468   /* do nondiagonal part */
469   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERRQ(ierr);
470   /* send it on its way */
471   ierr = VecScatterBegin(aij->lvec,zz,ADDVALUES,
472          (ScatterMode)(SCATTERALL|SCATTERREVERSE),aij->Mvctx); CHKERRQ(ierr);
473   /* do local part */
474   ierr = MatMultTransAdd(aij->A,xx,yy,zz); CHKERRQ(ierr);
475   /* receive remote parts: note this assumes the values are not actually */
476   /* inserted in yy until the next line, which is true for my implementation*/
477   /* but is not perhaps always true. */
478   ierr = VecScatterEnd(aij->lvec,zz,ADDVALUES,
479        (ScatterMode)(SCATTERALL|SCATTERREVERSE),aij->Mvctx); CHKERRQ(ierr);
480   return 0;
481 }
482 
483 /*
484   This only works correctly for square matrices where the subblock A->A is the
485    diagonal block
486 */
487 static int MatGetDiagonal_MPIAIJ(Mat Ain,Vec v)
488 {
489   Mat_MPIAIJ *A = (Mat_MPIAIJ *) Ain->data;
490   if (!A->assembled) SETERRQ(1,"MatGetDiag_MPIAIJ:must assemble matrix first");
491   return MatGetDiagonal(A->A,v);
492 }
493 
494 static int MatDestroy_MPIAIJ(PetscObject obj)
495 {
496   Mat        mat = (Mat) obj;
497   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
498   int        ierr;
499 #if defined(PETSC_LOG)
500   PLogObjectState(obj,"Rows=%d, Cols=%d",aij->M,aij->N);
501 #endif
502   PETSCFREE(aij->rowners);
503   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
504   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
505   if (aij->colmap) PETSCFREE(aij->colmap);
506   if (aij->garray) PETSCFREE(aij->garray);
507   if (aij->lvec) VecDestroy(aij->lvec);
508   if (aij->Mvctx) VecScatterCtxDestroy(aij->Mvctx);
509   PETSCFREE(aij);
510   PLogObjectDestroy(mat);
511   PETSCHEADERDESTROY(mat);
512   return 0;
513 }
514 #include "draw.h"
515 #include "pviewer.h"
516 
/*
   MatView_MPIAIJ - Visualizes the parallel matrix through a viewer.

   For a per-processor FILE_VIEWER each processor prints its own A and B
   blocks in sequence.  For FILES_VIEWER or a draw viewer the whole
   matrix is gathered onto processor 0 (by building a temporary MPIAIJ
   matrix whose rows all live there) and viewed from that processor.

   NOTE(review): the -1 adjustments below suggest Mat_AIJ stores its
   column indices 1-based (Fortran-style) -- confirm against aij.h.
*/
static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
{
  Mat         mat = (Mat) obj;
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  int         ierr, format;
  PetscObject vobj = (PetscObject) viewer;

  if (!aij->assembled) SETERRQ(1,"MatView_MPIAIJ:must assemble matrix first");
  if (!viewer) { /* so that viewers may be used from debuggers */
    viewer = STDOUT_VIEWER; vobj = (PetscObject) viewer;
  }
  if (vobj->cookie == DRAW_COOKIE && vobj->type == NULLWINDOW) return 0;
  format = ViewerFileGetFormat_Private(viewer);
  if (vobj->cookie == VIEWER_COOKIE && format == FILE_FORMAT_INFO &&
     (vobj->type == FILE_VIEWER || vobj->type == FILES_VIEWER)) {
   /* do nothing for now */
  }
  else if (vobj->cookie == VIEWER_COOKIE && vobj->type == FILE_VIEWER) {
    /* each processor prints its piece in rank order */
    FILE *fd = ViewerFileGetPointer_Private(viewer);
    MPIU_Seq_begin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
           aij->mytid,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
           aij->cend);
    ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    MPIU_Seq_end(mat->comm,1);
  }
  else if ((vobj->cookie == VIEWER_COOKIE && vobj->type == FILES_VIEWER) ||
            vobj->cookie == DRAW_COOKIE) {
    int numtids = aij->numtids, mytid = aij->mytid;
    FILE *fd = ViewerFileGetPointer_Private(viewer);
    if (numtids == 1) {
      ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat     A;
      Mat_AIJ *Aloc;
      int     M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
      Scalar  *a;

      /* processor 0 gets all M rows; everyone else contributes none */
      if (!mytid) {
        ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,0,0,0,&A);
      }
      else {
        ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,0,0,0,&A);
      }
      PLogObjectParent(mat,A);
      CHKERRQ(ierr);

      /* copy over the A part */
      Aloc = (Mat_AIJ*) aij->A->data;
      m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = aij->rstart;
      /* temporarily shift column indices to global numbering ... */
      for ( i=0; i<ai[m]-1; i++ ) {aj[i] += aij->cstart - 1;}
      for ( i=0; i<m; i++ ) {
        ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERTVALUES);
        CHKERRQ(ierr);
        row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
      }
      aj = Aloc->j;
      /* ... and restore them, leaving aij->A unmodified */
      for ( i=0; i<ai[m]-1; i++ ) {aj[i] -= aij->cstart - 1;}

      /* copy over the B part */
      Aloc = (Mat_AIJ*) aij->B->data;
      m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = aij->rstart;
      /* B columns are translated to global numbering via garray,
         into a scratch array so B itself is untouched */
      ct = cols = (int *) PETSCMALLOC( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
      for ( i=0; i<ai[m]-1; i++ ) {cols[i] = aij->garray[aj[i]-1];}
      for ( i=0; i<m; i++ ) {
        ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERTVALUES);
        CHKERRQ(ierr);
        row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
      }
      PETSCFREE(ct);
      ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!mytid) {
        ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}
603 
604 extern int MatMarkDiag_AIJ(Mat);
/*
    This has to provide several versions.

     1) per sequential
     2) a) use only local smoothing, updating outer values only once.
        b) local smoothing, updating outer values each inner iteration
     3) color updating, exchanging outer values between colors.
*/
613 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
614                            double shift,int its,Vec xx)
615 {
616   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
617   Mat        AA = mat->A, BB = mat->B;
618   Mat_AIJ    *A = (Mat_AIJ *) AA->data, *B = (Mat_AIJ *)BB->data;
619   Scalar     zero = 0.0,*b,*x,*xs,*ls,d,*v,sum,scale,*t,*ts;
620   int        ierr,*idx, *diag;
621   int        n = mat->n, m = mat->m, i;
622   Vec        tt;
623 
624   if (!mat->assembled) SETERRQ(1,"MatRelax_MPIAIJ:must assemble matrix first");
625 
626   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
627   xs = x -1; /* shift by one for index start of 1 */
628   ls--;
629   if (!A->diag) {if ((ierr = MatMarkDiag_AIJ(AA))) return ierr;}
630   diag = A->diag;
631   if (flag == SOR_APPLY_UPPER || flag == SOR_APPLY_LOWER) {
632     SETERRQ(1,"MatRelax_MPIAIJ:Option not yet supported");
633   }
634   if (flag & SOR_EISENSTAT) {
635     /* Let  A = L + U + D; where L is lower trianglar,
636     U is upper triangular, E is diagonal; This routine applies
637 
638             (L + E)^{-1} A (U + E)^{-1}
639 
640     to a vector efficiently using Eisenstat's trick. This is for
641     the case of SSOR preconditioner, so E is D/omega where omega
642     is the relaxation factor.
643     */
644     ierr = VecDuplicate(xx,&tt); CHKERRQ(ierr);
645     PLogObjectParent(matin,tt);
646     VecGetArray(tt,&t);
647     scale = (2.0/omega) - 1.0;
648     /*  x = (E + U)^{-1} b */
649     VecSet(&zero,mat->lvec);
650     ierr = VecPipelineBegin(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
651                               mat->Mvctx); CHKERRQ(ierr);
652     for ( i=m-1; i>-1; i-- ) {
653       n    = A->i[i+1] - diag[i] - 1;
654       idx  = A->j + diag[i];
655       v    = A->a + diag[i];
656       sum  = b[i];
657       SPARSEDENSEMDOT(sum,xs,v,idx,n);
658       d    = shift + A->a[diag[i]-1];
659       n    = B->i[i+1] - B->i[i];
660       idx  = B->j + B->i[i] - 1;
661       v    = B->a + B->i[i] - 1;
662       SPARSEDENSEMDOT(sum,ls,v,idx,n);
663       x[i] = omega*(sum/d);
664     }
665     ierr = VecPipelineEnd(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
666                             mat->Mvctx); CHKERRQ(ierr);
667 
668     /*  t = b - (2*E - D)x */
669     v = A->a;
670     for ( i=0; i<m; i++ ) { t[i] = b[i] - scale*(v[*diag++ - 1])*x[i]; }
671 
672     /*  t = (E + L)^{-1}t */
673     ts = t - 1; /* shifted by one for index start of a or mat->j*/
674     diag = A->diag;
675     VecSet(&zero,mat->lvec);
676     ierr = VecPipelineBegin(tt,mat->lvec,INSERTVALUES,PIPELINEDOWN,
677                                                  mat->Mvctx); CHKERRQ(ierr);
678     for ( i=0; i<m; i++ ) {
679       n    = diag[i] - A->i[i];
680       idx  = A->j + A->i[i] - 1;
681       v    = A->a + A->i[i] - 1;
682       sum  = t[i];
683       SPARSEDENSEMDOT(sum,ts,v,idx,n);
684       d    = shift + A->a[diag[i]-1];
685       n    = B->i[i+1] - B->i[i];
686       idx  = B->j + B->i[i] - 1;
687       v    = B->a + B->i[i] - 1;
688       SPARSEDENSEMDOT(sum,ls,v,idx,n);
689       t[i] = omega*(sum/d);
690     }
691     ierr = VecPipelineEnd(tt,mat->lvec,INSERTVALUES,PIPELINEDOWN,
692                                                     mat->Mvctx); CHKERRQ(ierr);
693     /*  x = x + t */
694     for ( i=0; i<m; i++ ) { x[i] += t[i]; }
695     VecDestroy(tt);
696     return 0;
697   }
698 
699 
700   if ((flag & SOR_SYMMETRIC_SWEEP) == SOR_SYMMETRIC_SWEEP){
701     if (flag & SOR_ZERO_INITIAL_GUESS) {
702       VecSet(&zero,mat->lvec); VecSet(&zero,xx);
703     }
704     else {
705       ierr=VecScatterBegin(xx,mat->lvec,INSERTVALUES,SCATTERUP,mat->Mvctx);
706       CHKERRQ(ierr);
707       ierr = VecScatterEnd(xx,mat->lvec,INSERTVALUES,SCATTERUP,mat->Mvctx);
708       CHKERRQ(ierr);
709     }
710     while (its--) {
711       /* go down through the rows */
712       ierr = VecPipelineBegin(xx,mat->lvec,INSERTVALUES,PIPELINEDOWN,
713                               mat->Mvctx); CHKERRQ(ierr);
714       for ( i=0; i<m; i++ ) {
715         n    = A->i[i+1] - A->i[i];
716         idx  = A->j + A->i[i] - 1;
717         v    = A->a + A->i[i] - 1;
718         sum  = b[i];
719         SPARSEDENSEMDOT(sum,xs,v,idx,n);
720         d    = shift + A->a[diag[i]-1];
721         n    = B->i[i+1] - B->i[i];
722         idx  = B->j + B->i[i] - 1;
723         v    = B->a + B->i[i] - 1;
724         SPARSEDENSEMDOT(sum,ls,v,idx,n);
725         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
726       }
727       ierr = VecPipelineEnd(xx,mat->lvec,INSERTVALUES,PIPELINEDOWN,
728                             mat->Mvctx); CHKERRQ(ierr);
729       /* come up through the rows */
730       ierr = VecPipelineBegin(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
731                               mat->Mvctx); CHKERRQ(ierr);
732       for ( i=m-1; i>-1; i-- ) {
733         n    = A->i[i+1] - A->i[i];
734         idx  = A->j + A->i[i] - 1;
735         v    = A->a + A->i[i] - 1;
736         sum  = b[i];
737         SPARSEDENSEMDOT(sum,xs,v,idx,n);
738         d    = shift + A->a[diag[i]-1];
739         n    = B->i[i+1] - B->i[i];
740         idx  = B->j + B->i[i] - 1;
741         v    = B->a + B->i[i] - 1;
742         SPARSEDENSEMDOT(sum,ls,v,idx,n);
743         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
744       }
745       ierr = VecPipelineEnd(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
746                             mat->Mvctx); CHKERRQ(ierr);
747     }
748   }
749   else if (flag & SOR_FORWARD_SWEEP){
750     if (flag & SOR_ZERO_INITIAL_GUESS) {
751       VecSet(&zero,mat->lvec);
752       ierr = VecPipelineBegin(xx,mat->lvec,INSERTVALUES,PIPELINEDOWN,
753                               mat->Mvctx); CHKERRQ(ierr);
754       for ( i=0; i<m; i++ ) {
755         n    = diag[i] - A->i[i];
756         idx  = A->j + A->i[i] - 1;
757         v    = A->a + A->i[i] - 1;
758         sum  = b[i];
759         SPARSEDENSEMDOT(sum,xs,v,idx,n);
760         d    = shift + A->a[diag[i]-1];
761         n    = B->i[i+1] - B->i[i];
762         idx  = B->j + B->i[i] - 1;
763         v    = B->a + B->i[i] - 1;
764         SPARSEDENSEMDOT(sum,ls,v,idx,n);
765         x[i] = omega*(sum/d);
766       }
767       ierr = VecPipelineEnd(xx,mat->lvec,INSERTVALUES,PIPELINEDOWN,
768                             mat->Mvctx); CHKERRQ(ierr);
769       its--;
770     }
771     while (its--) {
772       ierr=VecScatterBegin(xx,mat->lvec,INSERTVALUES,SCATTERUP,mat->Mvctx);
773       CHKERRQ(ierr);
774       ierr = VecScatterEnd(xx,mat->lvec,INSERTVALUES,SCATTERUP,mat->Mvctx);
775       CHKERRQ(ierr);
776       ierr = VecPipelineBegin(xx,mat->lvec,INSERTVALUES,PIPELINEDOWN,
777                               mat->Mvctx); CHKERRQ(ierr);
778       for ( i=0; i<m; i++ ) {
779         n    = A->i[i+1] - A->i[i];
780         idx  = A->j + A->i[i] - 1;
781         v    = A->a + A->i[i] - 1;
782         sum  = b[i];
783         SPARSEDENSEMDOT(sum,xs,v,idx,n);
784         d    = shift + A->a[diag[i]-1];
785         n    = B->i[i+1] - B->i[i];
786         idx  = B->j + B->i[i] - 1;
787         v    = B->a + B->i[i] - 1;
788         SPARSEDENSEMDOT(sum,ls,v,idx,n);
789         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
790       }
791       ierr = VecPipelineEnd(xx,mat->lvec,INSERTVALUES,PIPELINEDOWN,
792                             mat->Mvctx); CHKERRQ(ierr);
793     }
794   }
795   else if (flag & SOR_BACKWARD_SWEEP){
796     if (flag & SOR_ZERO_INITIAL_GUESS) {
797       VecSet(&zero,mat->lvec);
798       ierr = VecPipelineBegin(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
799                               mat->Mvctx); CHKERRQ(ierr);
800       for ( i=m-1; i>-1; i-- ) {
801         n    = A->i[i+1] - diag[i] - 1;
802         idx  = A->j + diag[i];
803         v    = A->a + diag[i];
804         sum  = b[i];
805         SPARSEDENSEMDOT(sum,xs,v,idx,n);
806         d    = shift + A->a[diag[i]-1];
807         n    = B->i[i+1] - B->i[i];
808         idx  = B->j + B->i[i] - 1;
809         v    = B->a + B->i[i] - 1;
810         SPARSEDENSEMDOT(sum,ls,v,idx,n);
811         x[i] = omega*(sum/d);
812       }
813       ierr = VecPipelineEnd(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
814                             mat->Mvctx); CHKERRQ(ierr);
815       its--;
816     }
817     while (its--) {
818       ierr = VecScatterBegin(xx,mat->lvec,INSERTVALUES,SCATTERDOWN,
819                             mat->Mvctx); CHKERRQ(ierr);
820       ierr = VecScatterEnd(xx,mat->lvec,INSERTVALUES,SCATTERDOWN,
821                             mat->Mvctx); CHKERRQ(ierr);
822       ierr = VecPipelineBegin(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
823                               mat->Mvctx); CHKERRQ(ierr);
824       for ( i=m-1; i>-1; i-- ) {
825         n    = A->i[i+1] - A->i[i];
826         idx  = A->j + A->i[i] - 1;
827         v    = A->a + A->i[i] - 1;
828         sum  = b[i];
829         SPARSEDENSEMDOT(sum,xs,v,idx,n);
830         d    = shift + A->a[diag[i]-1];
831         n    = B->i[i+1] - B->i[i];
832         idx  = B->j + B->i[i] - 1;
833         v    = B->a + B->i[i] - 1;
834         SPARSEDENSEMDOT(sum,ls,v,idx,n);
835         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
836       }
837       ierr = VecPipelineEnd(xx,mat->lvec,INSERTVALUES,PIPELINEUP,
838                             mat->Mvctx); CHKERRQ(ierr);
839     }
840   }
841   else if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
842     if (flag & SOR_ZERO_INITIAL_GUESS) {
843       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
844     }
845     ierr=VecScatterBegin(xx,mat->lvec,INSERTVALUES,SCATTERALL,mat->Mvctx);
846     CHKERRQ(ierr);
847     ierr = VecScatterEnd(xx,mat->lvec,INSERTVALUES,SCATTERALL,mat->Mvctx);
848     CHKERRQ(ierr);
849     while (its--) {
850       /* go down through the rows */
851       for ( i=0; i<m; i++ ) {
852         n    = A->i[i+1] - A->i[i];
853         idx  = A->j + A->i[i] - 1;
854         v    = A->a + A->i[i] - 1;
855         sum  = b[i];
856         SPARSEDENSEMDOT(sum,xs,v,idx,n);
857         d    = shift + A->a[diag[i]-1];
858         n    = B->i[i+1] - B->i[i];
859         idx  = B->j + B->i[i] - 1;
860         v    = B->a + B->i[i] - 1;
861         SPARSEDENSEMDOT(sum,ls,v,idx,n);
862         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
863       }
864       /* come up through the rows */
865       for ( i=m-1; i>-1; i-- ) {
866         n    = A->i[i+1] - A->i[i];
867         idx  = A->j + A->i[i] - 1;
868         v    = A->a + A->i[i] - 1;
869         sum  = b[i];
870         SPARSEDENSEMDOT(sum,xs,v,idx,n);
871         d    = shift + A->a[diag[i]-1];
872         n    = B->i[i+1] - B->i[i];
873         idx  = B->j + B->i[i] - 1;
874         v    = B->a + B->i[i] - 1;
875         SPARSEDENSEMDOT(sum,ls,v,idx,n);
876         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
877       }
878     }
879   }
880   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
881     if (flag & SOR_ZERO_INITIAL_GUESS) {
882       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
883     }
884     ierr=VecScatterBegin(xx,mat->lvec,INSERTVALUES,SCATTERALL,mat->Mvctx);
885     CHKERRQ(ierr);
886     ierr = VecScatterEnd(xx,mat->lvec,INSERTVALUES,SCATTERALL,mat->Mvctx);
887     CHKERRQ(ierr);
888     while (its--) {
889       for ( i=0; i<m; i++ ) {
890         n    = A->i[i+1] - A->i[i];
891         idx  = A->j + A->i[i] - 1;
892         v    = A->a + A->i[i] - 1;
893         sum  = b[i];
894         SPARSEDENSEMDOT(sum,xs,v,idx,n);
895         d    = shift + A->a[diag[i]-1];
896         n    = B->i[i+1] - B->i[i];
897         idx  = B->j + B->i[i] - 1;
898         v    = B->a + B->i[i] - 1;
899         SPARSEDENSEMDOT(sum,ls,v,idx,n);
900         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
901       }
902     }
903   }
904   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
905     if (flag & SOR_ZERO_INITIAL_GUESS) {
906       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
907     }
908     ierr = VecScatterBegin(xx,mat->lvec,INSERTVALUES,SCATTERALL,
909                             mat->Mvctx); CHKERRQ(ierr);
910     ierr = VecScatterEnd(xx,mat->lvec,INSERTVALUES,SCATTERALL,
911                             mat->Mvctx); CHKERRQ(ierr);
912     while (its--) {
913       for ( i=m-1; i>-1; i-- ) {
914         n    = A->i[i+1] - A->i[i];
915         idx  = A->j + A->i[i] - 1;
916         v    = A->a + A->i[i] - 1;
917         sum  = b[i];
918         SPARSEDENSEMDOT(sum,xs,v,idx,n);
919         d    = shift + A->a[diag[i]-1];
920         n    = B->i[i+1] - B->i[i];
921         idx  = B->j + B->i[i] - 1;
922         v    = B->a + B->i[i] - 1;
923         SPARSEDENSEMDOT(sum,ls,v,idx,n);
924         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
925       }
926     }
927   }
928   return 0;
929 }
930 
931 static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,int *nz,
932                              int *nzalloc,int *mem)
933 {
934   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
935   Mat        A = mat->A, B = mat->B;
936   int        ierr, isend[3], irecv[3], nzA, nzallocA, memA;
937 
938   ierr = MatGetInfo(A,MAT_LOCAL,&nzA,&nzallocA,&memA); CHKERRQ(ierr);
939   ierr = MatGetInfo(B,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
940   isend[0] += nzA; isend[1] += nzallocA; isend[2] += memA;
941   if (flag == MAT_LOCAL) {
942     *nz = isend[0]; *nzalloc = isend[1]; *mem = isend[2];
943   } else if (flag == MAT_GLOBAL_MAX) {
944     MPI_Allreduce((void *) isend,(void *) irecv,3,MPI_INT,MPI_MAX,matin->comm);
945     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
946   } else if (flag == MAT_GLOBAL_SUM) {
947     MPI_Allreduce((void *) isend,(void *) irecv,3,MPI_INT,MPI_SUM,matin->comm);
948     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
949   }
950   return 0;
951 }
952 
953 extern int MatLUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,Mat*);
954 extern int MatLUFactorNumeric_MPIAIJ(Mat,Mat*);
955 extern int MatLUFactor_MPIAIJ(Mat,IS,IS,double);
956 extern int MatILUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,int,Mat *);
957 extern int MatSolve_MPIAIJ(Mat,Vec,Vec);
958 extern int MatSolveAdd_MPIAIJ(Mat,Vec,Vec,Vec);
959 extern int MatSolveTrans_MPIAIJ(Mat,Vec,Vec);
960 extern int MatSolveTransAdd_MPIAIJ(Mat,Vec,Vec,Vec);
961 
962 static int MatSetOption_MPIAIJ(Mat aijin,MatOption op)
963 {
964   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
965 
966   if      (op == NO_NEW_NONZERO_LOCATIONS)  {
967     MatSetOption(aij->A,op);
968     MatSetOption(aij->B,op);
969   }
970   else if (op == YES_NEW_NONZERO_LOCATIONS) {
971     MatSetOption(aij->A,op);
972     MatSetOption(aij->B,op);
973   }
974   else if (op == COLUMN_ORIENTED)
975     SETERRQ(1,"MatSetOption_MPIAIJ:Column oriented not supported");
976   return 0;
977 }
978 
979 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
980 {
981   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
982   *m = mat->M; *n = mat->N;
983   return 0;
984 }
985 
986 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
987 {
988   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
989   *m = mat->m; *n = mat->N;
990   return 0;
991 }
992 
993 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
994 {
995   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
996   *m = mat->rstart; *n = mat->rend;
997   return 0;
998 }
999 
static int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  /* Returns one globally numbered row by merging the corresponding rows of
     the diagonal block A and off-diagonal block B into a single array in
     increasing global column order.  Only locally owned rows
     (rstart <= row < rend) may be requested.  The idx/v arrays returned
     are PETSCMALLOCed here and must be released via MatRestoreRow_MPIAIJ. */
  Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB;
  int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
  int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;

  if (row < rstart || row >= rend)
    SETERRQ(1,"MatGetRow_MPIAIJ:Currently you can get only local rows")
  lrow = row - rstart;

  /* Request from A/B only what the caller wants; B's column indices are
     still needed when values are requested, to merge in sorted order. */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = MatGetRow(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = MatGetRow(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      int imark;
      if (mat->assembled) {
        /* map B's compressed column numbers back to global numbering */
        for (i=0; i<nzB; i++) cworkB[i] = mat->garray[cworkB[i]];
      }
      if (v) {
        *v = (Scalar *) PETSCMALLOC( (nztot)*sizeof(Scalar) ); CHKPTRQ(*v);
        /* B entries with global column < cstart precede all A entries;
           the rest of B follows A */
        for ( i=0; i<nzB; i++ ) {
          if (cworkB[i] < cstart)   (*v)[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     (*v)[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) (*v)[nzA+i] = vworkB[i];
      }
      if (idx) {
        *idx = (int *) PETSCMALLOC( (nztot)*sizeof(int) ); CHKPTRQ(*idx);
        /* shift A's local column numbers into global numbering */
        for (i=0; i<nzA; i++) cworkA[i] += cstart;
        for ( i=0; i<nzB; i++ ) {
          if (cworkB[i] < cstart)   (*idx)[i] = cworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     (*idx)[imark+i] = cworkA[i];
        for ( i=imark; i<nzB; i++ ) (*idx)[nzA+i] = cworkB[i];
      }
    }
    else {*idx = 0; *v=0;}
  }
  *nz = nztot;
  ierr = MatRestoreRow(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = MatRestoreRow(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  return 0;
}
1054 
1055 static int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1056 {
1057   if (idx) PETSCFREE(*idx);
1058   if (v) PETSCFREE(*v);
1059   return 0;
1060 }
1061 
1062 static int MatNorm_MPIAIJ(Mat mat,MatNormType type,double *norm)
1063 {
1064   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1065   int ierr;
1066   if (aij->numtids == 1) {
1067     ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
1068   } else
1069     SETERRQ(1,"MatNorm_MPIAIJ:not yet supported in parallel");
1070   return 0;
1071 }
1072 
static int MatTranspose_MPIAIJ(Mat A,Mat *Bin)
{
  /* Forms the transpose of a parallel AIJ matrix: each locally owned row
     (from both the diagonal and off-diagonal blocks) is inserted into a
     new N x M matrix as a column via MatSetValues with the row/column
     index arguments swapped. */
  Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
  int        ierr;
  Mat        B;
  Mat_AIJ    *Aloc;
  int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
  Scalar     *array;

  ierr = MatCreateMPIAIJ(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,0,0,0,0,&B);
  CHKERRQ(ierr);

  /* copy over the A part */
  Aloc = (Mat_AIJ*) a->A->data;
  m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
  row = a->rstart;
  /* temporarily shift A's 1-based local column numbers into global
     numbering, in place; undone below once the values are inserted */
  for ( i=0; i<ai[m]-1; i++ ) {aj[i] += a->cstart - 1;}
  for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERTVALUES);
      CHKERRQ(ierr);
      row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
  }
  /* restore A's column indices (aj was advanced in the loop above) */
  aj = Aloc->j;
  for ( i=0; i<ai[m]-1; i++ ) {aj[i] -= a->cstart - 1;}

  /* copy over the B part */
  Aloc = (Mat_AIJ*) a->B->data;
  m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
  row = a->rstart;
  /* B's column numbers are compressed; expand them through garray into a
     scratch array (keep ct pointing at the start so it can be freed) */
  ct = cols = (int *) PETSCMALLOC( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
  for ( i=0; i<ai[m]-1; i++ ) {cols[i] = a->garray[aj[i]-1];}
  for ( i=0; i<m; i++ ) {
    ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERTVALUES);
    CHKERRQ(ierr);
    row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
  }
  PETSCFREE(ct);
  ierr = MatAssemblyBegin(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
  *Bin = B;
  return 0;
}
1115 
1116 extern int MatConvert_MPIAIJ(Mat,MatType,Mat *);
1117 static int MatCopyPrivate_MPIAIJ(Mat,Mat *);
1118 
1119 /* -------------------------------------------------------------------*/
/* Operation table for the MPIAIJ matrix type; a 0 entry marks an
   operation that is not implemented for this format. */
static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
       MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
       MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
       MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
       MatSolve_MPIAIJ,MatSolveAdd_MPIAIJ,
       MatSolveTrans_MPIAIJ,MatSolveTransAdd_MPIAIJ,
       MatLUFactor_MPIAIJ,0,
       MatRelax_MPIAIJ,
       MatTranspose_MPIAIJ,
       MatGetInfo_MPIAIJ,0,
       MatGetDiagonal_MPIAIJ,0,MatNorm_MPIAIJ,
       MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
       0,
       MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
       MatGetReordering_MPIAIJ,
       MatLUFactorSymbolic_MPIAIJ,MatLUFactorNumeric_MPIAIJ,0,0,
       MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
       MatILUFactorSymbolic_MPIAIJ,0,
       0,0,MatConvert_MPIAIJ,0,0,MatCopyPrivate_MPIAIJ};
1139 
1140 /*@
1141    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1142    (the default parallel PETSc format).
1143 
1144    Input Parameters:
1145 .  comm - MPI communicator
1146 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1147 .  n - number of local columns (or PETSC_DECIDE to have calculated
1148            if N is given)
1149 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1150 .  N - number of global columns (or PETSC_DECIDE to have calculated
1151            if n is given)
1152 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1153            (same for all local rows)
.  d_nnz - number of nonzeros per row in diagonal portion of local submatrix
1155            or null (possibly different for each row).  You must leave room
1156            for the diagonal entry even if it is zero.
1157 .  o_nz - number of nonzeros per row in off-diagonal portion of local
1158            submatrix (same for all local rows).
.  o_nnz - number of nonzeros per row in off-diagonal portion of local
1160            submatrix or null (possibly different for each row).
1161 
1162    Output Parameter:
1163 .  newmat - the matrix
1164 
1165    Notes:
1166    The AIJ format (also called the Yale sparse matrix format or
1167    compressed row storage), is fully compatible with standard Fortran 77
1168    storage.  That is, the stored row and column indices begin at
1169    one, not zero.  See the users manual for further details.
1170 
1171    The user MUST specify either the local or global matrix dimensions
1172    (possibly both).
1173 
1174    Storage Information:
1175    For a square global matrix we define each processor's diagonal portion
1176    to be its local rows and the corresponding columns (a square submatrix);
1177    each processor's off-diagonal portion encompasses the remainder of the
1178    local matrix (a rectangular submatrix).
1179 
1180    The user can specify preallocated storage for the diagonal part of
1181    the local submatrix with either d_nz or d_nnz (not both). Set both
1182    d_nz and d_nnz to zero for PETSc to control dynamic memory allocation.
1183    Likewise, specify preallocated storage for the off-diagonal part of
1184    the local submatrix with o_nz or o_nnz (not both).
1185 
1186 .keywords: matrix, aij, compressed row, sparse, parallel
1187 
1188 .seealso: MatCreate(), MatCreateSequentialAIJ(), MatSetValues()
1189 @*/
int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
                 int d_nz,int *d_nnz, int o_nz,int *o_nnz,Mat *newmat)
{
  Mat          mat;
  Mat_MPIAIJ   *aij;
  int          ierr, i,sum[2],work[2];
  *newmat         = 0;
  PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (aij = PETSCNEW(Mat_MPIAIJ)); CHKPTRQ(aij);
  mat->ops        = &MatOps;
  mat->destroy    = MatDestroy_MPIAIJ;
  mat->view       = MatView_MPIAIJ;
  mat->factor     = 0;

  aij->insertmode = NOTSETVALUES;
  MPI_Comm_rank(comm,&aij->mytid);
  MPI_Comm_size(comm,&aij->numtids);

  /* fill in any global dimension left as PETSC_DECIDE by summing the
     local dimensions over all processors */
  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    MPI_Allreduce((void *) work,(void *) sum,2,MPI_INT,MPI_SUM,comm );
    if (M == PETSC_DECIDE) M = sum[0];
    if (N == PETSC_DECIDE) N = sum[1];
  }
  /* split any undecided local dimension nearly evenly: the first
     (M % numtids) processors each receive one extra row (same for
     columns) */
  if (m == PETSC_DECIDE)
    {m = M/aij->numtids + ((M % aij->numtids) > aij->mytid);}
  if (n == PETSC_DECIDE)
    {n = N/aij->numtids + ((N % aij->numtids) > aij->mytid);}
  aij->m = m;
  aij->n = n;
  aij->N = N;
  aij->M = M;

  /* build local table of row and column ownerships */
  aij->rowners = (int *) PETSCMALLOC(2*(aij->numtids+2)*sizeof(int));
  CHKPTRQ(aij->rowners);
  PLogObjectMemory(mat,2*(aij->numtids+2)*sizeof(int)+sizeof(struct _Mat)+
                       sizeof(Mat_MPIAIJ));
  /* cowners lives in the second half of the rowners allocation */
  aij->cowners = aij->rowners + aij->numtids + 1;
  /* gather every processor's local row count, then prefix-sum so that
     rowners[p]..rowners[p+1] is the global row range owned by proc p */
  MPI_Allgather(&m,1,MPI_INT,aij->rowners+1,1,MPI_INT,comm);
  aij->rowners[0] = 0;
  for ( i=2; i<=aij->numtids; i++ ) {
    aij->rowners[i] += aij->rowners[i-1];
  }
  aij->rstart = aij->rowners[aij->mytid];
  aij->rend   = aij->rowners[aij->mytid+1];
  /* identical construction for the column ownership table */
  MPI_Allgather(&n,1,MPI_INT,aij->cowners+1,1,MPI_INT,comm);
  aij->cowners[0] = 0;
  for ( i=2; i<=aij->numtids; i++ ) {
    aij->cowners[i] += aij->cowners[i-1];
  }
  aij->cstart = aij->cowners[aij->mytid];
  aij->cend   = aij->cowners[aij->mytid+1];


  /* the diagonal block A is m x n; the off-diagonal block B is created
     with the full global column width (m x N) */
  ierr = MatCreateSequentialAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&aij->A);
  CHKERRQ(ierr);
  PLogObjectParent(mat,aij->A);
  ierr = MatCreateSequentialAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&aij->B);
  CHKERRQ(ierr);
  PLogObjectParent(mat,aij->B);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&aij->stash); CHKERRQ(ierr);
  aij->colmap    = 0;
  aij->garray    = 0;

  /* stuff used for matrix vector multiply; created at assembly time */
  aij->lvec      = 0;
  aij->Mvctx     = 0;
  aij->assembled = 0;

  *newmat = mat;
  return 0;
}
1266 
static int MatCopyPrivate_MPIAIJ(Mat matin,Mat *newmat)
{
  /* Deep-copies an assembled MPIAIJ matrix: duplicates the header fields,
     the row ownership table, the column map and ghost-column list (when
     present), the scatter machinery, and both sequential blocks. */
  Mat        mat;
  Mat_MPIAIJ *aij,*oldmat = (Mat_MPIAIJ *) matin->data;
  int        ierr, len;
  *newmat      = 0;

  if (!oldmat->assembled)
    SETERRQ(1,"MatCopyPrivate_MPIAIJ:Cannot copy unassembled matrix");
  PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (aij = PETSCNEW(Mat_MPIAIJ)); CHKPTRQ(aij);
  mat->ops        = &MatOps;
  mat->destroy    = MatDestroy_MPIAIJ;
  mat->view       = MatView_MPIAIJ;
  mat->factor     = matin->factor;

  aij->m          = oldmat->m;
  aij->n          = oldmat->n;
  aij->M          = oldmat->M;
  aij->N          = oldmat->N;

  aij->assembled  = 1;
  aij->rstart     = oldmat->rstart;
  aij->rend       = oldmat->rend;
  aij->cstart     = oldmat->cstart;
  aij->cend       = oldmat->cend;
  aij->numtids    = oldmat->numtids;
  aij->mytid      = oldmat->mytid;
  aij->insertmode = NOTSETVALUES;

  /* NOTE(review): only numtids+1 ints are allocated/copied here and
     aij->cowners is never set, unlike MatCreateMPIAIJ which allocates
     room for both tables in one block.  Verify no code path reads
     cowners on a copied matrix. */
  aij->rowners        = (int *) PETSCMALLOC( (aij->numtids+1)*sizeof(int) );
  CHKPTRQ(aij->rowners);
  PLogObjectMemory(mat,(aij->numtids+1)*sizeof(int)+sizeof(struct _Mat)+
                       sizeof(Mat_MPIAIJ));
  PETSCMEMCPY(aij->rowners,oldmat->rowners,(aij->numtids+1)*sizeof(int));
  ierr = StashInitialize_Private(&aij->stash); CHKERRQ(ierr);
  if (oldmat->colmap) {
    /* global-to-local column map of length N (see CreateColmap_Private) */
    aij->colmap      = (int *) PETSCMALLOC( (aij->N)*sizeof(int) );
    CHKPTRQ(aij->colmap);
    PLogObjectMemory(mat,(aij->N)*sizeof(int));
    PETSCMEMCPY(aij->colmap,oldmat->colmap,(aij->N)*sizeof(int));
  } else aij->garray ? 0 : 0, aij->colmap = 0;
  /* garray maps each column of B to its global column number; its length
     is the column count of the off-diagonal block */
  if (oldmat->garray && (len = ((Mat_AIJ *) (oldmat->B->data))->n)) {
    aij->garray      = (int *) PETSCMALLOC(len*sizeof(int) ); CHKPTRQ(aij->garray);
    PLogObjectMemory(mat,len*sizeof(int));
    PETSCMEMCPY(aij->garray,oldmat->garray,len*sizeof(int));
  } else aij->garray = 0;

  /* duplicate the scatter machinery and both sequential blocks */
  ierr =  VecDuplicate(oldmat->lvec,&aij->lvec); CHKERRQ(ierr);
  PLogObjectParent(mat,aij->lvec);
  ierr =  VecScatterCtxCopy(oldmat->Mvctx,&aij->Mvctx); CHKERRQ(ierr);
  PLogObjectParent(mat,aij->Mvctx);
  ierr =  MatConvert(oldmat->A,MATSAME,&aij->A); CHKERRQ(ierr);
  PLogObjectParent(mat,aij->A);
  ierr =  MatConvert(oldmat->B,MATSAME,&aij->B); CHKERRQ(ierr);
  PLogObjectParent(mat,aij->B);
  *newmat = mat;
  return 0;
}
1327