xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision 2ee70a8848b18b2f3cd994db03b82f0ad4f3bf6e)
1 #ifndef lint
2 static char vcid[] = "$Id: mpiaij.c,v 1.24 1995/04/02 16:35:07 curfman Exp $";
3 #endif
4 
5 #include "mpiaij.h"
6 #include "vec/vecimpl.h"
7 #include "inline/spops.h"
8 
9 #define CHUNCKSIZE   100
10 /*
11    This is a simple minded stash. Do a linear search to determine if
12  in stash, if not add to end.
13 */
14 static int StashValues(Stash *stash,int row,int n, int *idxn,
15                        Scalar *values,InsertMode addv)
16 {
17   int    i,j,N = stash->n,found,*n_idx, *n_idy;
18   Scalar val,*n_array;
19 
20   for ( i=0; i<n; i++ ) {
21     found = 0;
22     val = *values++;
23     for ( j=0; j<N; j++ ) {
24       if ( stash->idx[j] == row && stash->idy[j] == idxn[i]) {
25         /* found a match */
26         if (addv == AddValues) stash->array[j] += val;
27         else stash->array[j] = val;
28         found = 1;
29         break;
30       }
31     }
32     if (!found) { /* not found so add to end */
33       if ( stash->n == stash->nmax ) {
34         /* allocate a larger stash */
35         n_array = (Scalar *) MALLOC( (stash->nmax + CHUNCKSIZE)*(
36                                      2*sizeof(int) + sizeof(Scalar)));
37         CHKPTR(n_array);
38         n_idx = (int *) (n_array + stash->nmax + CHUNCKSIZE);
39         n_idy = (int *) (n_idx + stash->nmax + CHUNCKSIZE);
40         MEMCPY(n_array,stash->array,stash->nmax*sizeof(Scalar));
41         MEMCPY(n_idx,stash->idx,stash->nmax*sizeof(int));
42         MEMCPY(n_idy,stash->idy,stash->nmax*sizeof(int));
43         if (stash->array) FREE(stash->array);
44         stash->array = n_array; stash->idx = n_idx; stash->idy = n_idy;
45         stash->nmax += CHUNCKSIZE;
46       }
47       stash->array[stash->n]   = val;
48       stash->idx[stash->n]     = row;
49       stash->idy[stash->n++]   = idxn[i];
50     }
51   }
52   return 0;
53 }
54 
/* local utility routine that creates a mapping from the global column
number to the local number in the off-diagonal part of the local
storage of the matrix.  This is done in a non-scalable way since the
length of colmap equals the global matrix length.
*/
60 static int CreateColmap(Mat mat)
61 {
62   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
63   Mat_AIJ    *B = (Mat_AIJ*) aij->B->data;
64   int        n = B->n,i;
65   aij->colmap = (int *) MALLOC( aij->N*sizeof(int) ); CHKPTR(aij->colmap);
66   MEMSET(aij->colmap,0,aij->N*sizeof(int));
67   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
68   return 0;
69 }
70 
71 static int MatSetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,
72                             int *idxn,Scalar *v,InsertMode addv)
73 {
74   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
75   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
76   int        cstart = aij->cstart, cend = aij->cend,row,col;
77 
78   if (aij->insertmode != NotSetValues && aij->insertmode != addv) {
79     SETERR(1,"You cannot mix inserts and adds");
80   }
81   aij->insertmode = addv;
82   for ( i=0; i<m; i++ ) {
83     if (idxm[i] < 0) SETERR(1,"Negative row index");
84     if (idxm[i] >= aij->M) SETERR(1,"Row index too large");
85     if (idxm[i] >= rstart && idxm[i] < rend) {
86       row = idxm[i] - rstart;
87       for ( j=0; j<n; j++ ) {
88         if (idxn[j] < 0) SETERR(1,"Negative column index");
89         if (idxn[j] >= aij->N) SETERR(1,"Column index too large");
90         if (idxn[j] >= cstart && idxn[j] < cend){
91           col = idxn[j] - cstart;
92           ierr = MatSetValues(aij->A,1,&row,1,&col,v+i*n+j,addv);CHKERR(ierr);
93         }
94         else {
95           if (aij->assembled) {
96             if (!aij->colmap) {ierr = CreateColmap(mat); CHKERR(ierr);}
97             col = aij->colmap[idxn[j]] - 1;
98             if (col < 0) {
99               SETERR(1,"Cannot insert new off diagonal block nonzero in\
100                      already\
101                      assembled matrix. Contact petsc-maint@mcs.anl.gov\
102                      if your need this feature");
103             }
104           }
105           else col = idxn[j];
106           ierr = MatSetValues(aij->B,1,&row,1,&col,v+i*n+j,addv);CHKERR(ierr);
107         }
108       }
109     }
110     else {
111       ierr = StashValues(&aij->stash,idxm[i],n,idxn,v+i*n,addv);CHKERR(ierr);
112     }
113   }
114   return 0;
115 }
116 
/*
    the assembly code is a lot like the code for vectors; we should
    at some point derive a single assembly code that can be used for
    either case.
*/
122 
/*
   MatBeginAssemble_MPIAIJ - Begins assembly of the parallel matrix.
   Each processor determines the owning processor of every stashed
   (row,col,value) entry, posts nonblocking receives for incoming
   entries, and starts nonblocking sends of its own stash.  The
   exchange is completed by MatEndAssemble_MPIAIJ, which reads the
   svalues/rvalues/*_waits fields recorded at the bottom.
*/
static int MatBeginAssemble_MPIAIJ(Mat mat)
{
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         numtids = aij->numtids, *owners = aij->rowners;
  int         mytid = aij->mytid;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  int         tag = 50, *owner,*starts,count;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE;
     the bitwise OR differs from both modes only when they were mixed */
  MPI_Allreduce((void *) &aij->insertmode,(void *) &addv,1,MPI_INT,
                MPI_BOR,comm);
  if (addv == (AddValues|InsertValues)) {
    SETERR(1,"Some processors have inserted while others have added");
  }
  aij->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor:
      nprocs[j] = number of stashed entries owned by processor j,
      procs[j]  = 1 iff we send processor j anything,
      owner[i]  = owning processor of stash entry i */
  nprocs = (int *) MALLOC( 2*numtids*sizeof(int) ); CHKPTR(nprocs);
  MEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
  owner = (int *) MALLOC( (aij->stash.n+1)*sizeof(int) ); CHKPTR(owner);
  for ( i=0; i<aij->stash.n; i++ ) {
    idx = aij->stash.idx[i];
    for ( j=0; j<numtids; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length:
     after the SUM reduce work[mytid] = messages we will receive;
     after the MAX reduce work[mytid] = longest message headed our way */
  work = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(work);
  MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
  nreceives = work[mytid];
  MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
  nmax = work[mytid];
  FREE(work);

  /* post receives:
       1) each message consists of ordered triples
     (global row,global column,value); the indices are stored as
     Scalars so the whole message is a single MPI datatype.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) MALLOC(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTR(rvalues);
  recv_waits = (MPI_Request *) MALLOC((nreceives+1)*sizeof(MPI_Request));
  CHKPTR(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv((void *)(rvalues+3*nmax*i),3*nmax,MPI_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) MALLOC( 3*(aij->stash.n+1)*sizeof(Scalar) );
  CHKPTR(svalues);
  send_waits = (MPI_Request *) MALLOC( (nsends+1)*sizeof(MPI_Request));
  CHKPTR(send_waits);
  starts = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(starts);
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack; note starts[] is consumed as running per-destination offsets */
  for ( i=0; i<aij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
  }
  FREE(owner);
  /* rebuild starts[], destroyed by the packing loop above */
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<numtids; i++ ) {
    if (procs[i]) {
      MPI_Isend((void*)(svalues+3*starts[i]),3*nprocs[i],MPI_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  FREE(starts); FREE(nprocs);

  /* Free cache space */
  aij->stash.nmax = aij->stash.n = 0;
  if (aij->stash.array){ FREE(aij->stash.array); aij->stash.array = 0;}

  /* record in-flight communication state for MatEndAssemble_MPIAIJ */
  aij->svalues    = svalues;       aij->rvalues = rvalues;
  aij->nsends     = nsends;         aij->nrecvs = nreceives;
  aij->send_waits = send_waits; aij->recv_waits = recv_waits;
  aij->rmax       = nmax;

  return 0;
}
224 extern int MatSetUpMultiply_MPIAIJ(Mat);
225 
226 static int MatEndAssemble_MPIAIJ(Mat mat)
227 {
228   int        ierr;
229   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
230 
231   MPI_Status  *send_status,recv_status;
232   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n;
233   int         row,col;
234   Scalar      *values,val;
235   InsertMode  addv = aij->insertmode;
236 
237   /*  wait on receives */
238   while (count) {
239     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
240     /* unpack receives into our local space */
241     values = aij->rvalues + 3*imdex*aij->rmax;
242     MPI_Get_count(&recv_status,MPI_SCALAR,&n);
243     n = n/3;
244     for ( i=0; i<n; i++ ) {
245       row = (int) PETSCREAL(values[3*i]) - aij->rstart;
246       col = (int) PETSCREAL(values[3*i+1]);
247       val = values[3*i+2];
248       if (col >= aij->cstart && col < aij->cend) {
249           col -= aij->cstart;
250         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
251       }
252       else {
253         if (aij->assembled) {
254           if (!aij->colmap) {ierr = CreateColmap(mat); CHKERR(ierr);}
255           col = aij->colmap[col] - 1;
256           if (col < 0) {
257             SETERR(1,"Cannot insert new off diagonal block nonzero in\
258                      already\
259                      assembled matrix. Contact petsc-maint@mcs.anl.gov\
260                      if your need this feature");
261           }
262         }
263         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
264       }
265     }
266     count--;
267   }
268   FREE(aij->recv_waits); FREE(aij->rvalues);
269 
270   /* wait on sends */
271   if (aij->nsends) {
272     send_status = (MPI_Status *) MALLOC( aij->nsends*sizeof(MPI_Status) );
273     CHKPTR(send_status);
274     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
275     FREE(send_status);
276   }
277   FREE(aij->send_waits); FREE(aij->svalues);
278 
279   aij->insertmode = NotSetValues;
280   ierr = MatBeginAssembly(aij->A); CHKERR(ierr);
281   ierr = MatEndAssembly(aij->A); CHKERR(ierr);
282 
283   if (!aij->assembled) {
284     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERR(ierr);
285   }
286   ierr = MatBeginAssembly(aij->B); CHKERR(ierr);
287   ierr = MatEndAssembly(aij->B); CHKERR(ierr);
288   aij->assembled = 1;
289   return 0;
290 }
291 
292 static int MatZeroEntries_MPIAIJ(Mat A)
293 {
294   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
295 
296   MatZeroEntries(l->A); MatZeroEntries(l->B);
297   return 0;
298 }
299 
/* again this uses the same basic strategy as in the assembly and
   scatter create routines; we should try to do it systematically
   if we can figure out the proper level of generality. */

/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   aij->A and aij->B directly and not through the MatZeroRows()
   routine.
*/
/*
   MatZeroRows_MPIAIJ - Zeros the globally numbered rows listed in is,
   optionally placing *diag on the diagonal of the zeroed rows (diagonal
   handling is only correct for square matrices with matching row/column
   ownership -- see the BUG note above).  Row indices are exchanged so
   every processor ends up zeroing only the rows it owns.
*/
static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,numtids = l->numtids;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,mytid = l->mytid;
  int            *rvalues,tag = 67,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  if (!l->assembled) SETERR(1,"MatZeroRows_MPIAIJ: must assemble matrix first");
  ierr = ISGetLocalSize(is,&N); CHKERR(ierr);
  ierr = ISGetIndices(is,&rows); CHKERR(ierr);

  /*  first count number of contributors to each processor:
      nprocs[j] = rows destined for proc j, procs[j] = 1 iff any,
      owner[i] = owning processor of rows[i] */
  nprocs = (int *) MALLOC( 2*numtids*sizeof(int) ); CHKPTR(nprocs);
  MEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
  owner = (int *) MALLOC((N+1)*sizeof(int)); CHKPTR(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<numtids; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERR(1,"Imdex out of range");
  }
  nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length:
     SUM reduce -> messages we will receive; MAX reduce -> longest one */
  work = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(work);
  MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
  nrecvs = work[mytid];
  MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
  nmax = work[mytid];
  FREE(work);

  /* post receives:   */
  rvalues = (int *) MALLOC((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTR(rvalues);
  recv_waits = (MPI_Request *) MALLOC((nrecvs+1)*sizeof(MPI_Request));
  CHKPTR(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv((void *)(rvalues+nmax*i),nmax,MPI_INT,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) MALLOC( (N+1)*sizeof(int) ); CHKPTR(svalues);
  send_waits = (MPI_Request *) MALLOC( (nsends+1)*sizeof(MPI_Request));
  CHKPTR(send_waits);
  starts = (int *) MALLOC( (numtids+1)*sizeof(int) ); CHKPTR(starts);
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack; starts[] is consumed as running per-destination offsets */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  /* rebuild starts[], destroyed by the packing loop above */
  starts[0] = 0;
  for ( i=1; i<numtids+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<numtids; i++ ) {
    if (procs[i]) {
      MPI_Isend((void*)(svalues+starts[i]),nprocs[i],MPI_INT,i,tag,
                comm,send_waits+count++);
    }
  }
  FREE(starts);

  base = owners[mytid];   /* first globally numbered row we own */

  /*  wait on receives; record each message's source and length */
  lens = (int *) MALLOC( 2*(nrecvs+1)*sizeof(int) ); CHKPTR(lens);
  source = lens + nrecvs;
  count = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  FREE(recv_waits);

  /* move the data into the send scatter; convert to local row numbers */
  lrows = (int *) MALLOC( slen*sizeof(int) ); CHKPTR(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  FREE(rvalues); FREE(lens);
  FREE(owner); FREE(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateSequential(slen,lrows,&istmp); CHKERR(ierr);  FREE(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERR(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERR(ierr);
  ierr = ISDestroy(istmp); CHKERR(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) MALLOC( nsends*sizeof(MPI_Status) );
    CHKPTR(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    FREE(send_status);
  }
  FREE(send_waits); FREE(svalues);


  return 0;
}
434 
435 static int MatMult_MPIAIJ(Mat aijin,Vec xx,Vec yy)
436 {
437   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
438   int        ierr;
439   if (!aij->assembled) SETERR(1,"MatMult_MPIAIJ: must assemble matrix first");
440   ierr = VecScatterBegin(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
441   CHKERR(ierr);
442   ierr = MatMult(aij->A,xx,yy); CHKERR(ierr);
443   ierr = VecScatterEnd(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
444   CHKERR(ierr);
445   ierr = MatMultAdd(aij->B,aij->lvec,yy,yy); CHKERR(ierr);
446   return 0;
447 }
448 
449 static int MatMultAdd_MPIAIJ(Mat aijin,Vec xx,Vec yy,Vec zz)
450 {
451   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
452   int        ierr;
453   if (!aij->assembled) SETERR(1,"MatMult_MPIAIJ: must assemble matrix first");
454   ierr = VecScatterBegin(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
455   CHKERR(ierr);
456   ierr = MatMultAdd(aij->A,xx,yy,zz); CHKERR(ierr);
457   ierr = VecScatterEnd(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
458   CHKERR(ierr);
459   ierr = MatMultAdd(aij->B,aij->lvec,zz,zz); CHKERR(ierr);
460   return 0;
461 }
462 
463 static int MatMultTrans_MPIAIJ(Mat aijin,Vec xx,Vec yy)
464 {
465   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
466   int        ierr;
467 
468   if (!aij->assembled)
469     SETERR(1,"MatMulTrans_MPIAIJ: must assemble matrix first");
470   /* do nondiagonal part */
471   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERR(ierr);
472   /* send it on its way */
473   ierr = VecScatterBegin(aij->lvec,0,yy,0,AddValues,
474                          ScatterAll|ScatterReverse,aij->Mvctx); CHKERR(ierr);
475   /* do local part */
476   ierr = MatMultTrans(aij->A,xx,yy); CHKERR(ierr);
477   /* receive remote parts: note this assumes the values are not actually */
478   /* inserted in yy until the next line, which is true for my implementation*/
479   /* but is not perhaps always true. */
480   ierr = VecScatterEnd(aij->lvec,0,yy,0,AddValues,ScatterAll|ScatterReverse,
481                          aij->Mvctx); CHKERR(ierr);
482   return 0;
483 }
484 
485 static int MatMultTransAdd_MPIAIJ(Mat aijin,Vec xx,Vec yy,Vec zz)
486 {
487   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
488   int        ierr;
489 
490   if (!aij->assembled)
491     SETERR(1,"MatMulTransAdd_MPIAIJ: must assemble matrix first");
492   /* do nondiagonal part */
493   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERR(ierr);
494   /* send it on its way */
495   ierr = VecScatterBegin(aij->lvec,0,zz,0,AddValues,
496                          ScatterAll|ScatterReverse,aij->Mvctx); CHKERR(ierr);
497   /* do local part */
498   ierr = MatMultTransAdd(aij->A,xx,yy,zz); CHKERR(ierr);
499   /* receive remote parts: note this assumes the values are not actually */
500   /* inserted in yy until the next line, which is true for my implementation*/
501   /* but is not perhaps always true. */
502   ierr = VecScatterEnd(aij->lvec,0,zz,0,AddValues,ScatterAll|ScatterReverse,
503                          aij->Mvctx); CHKERR(ierr);
504   return 0;
505 }
506 
507 /*
508   This only works correctly for square matrices where the subblock A->A is the
509    diagonal block
510 */
511 static int MatGetDiagonal_MPIAIJ(Mat Ain,Vec v)
512 {
513   Mat_MPIAIJ *A = (Mat_MPIAIJ *) Ain->data;
514   if (!A->assembled) SETERR(1,"MatGetDiag_MPIAIJ: must assemble matrix first");
515   return MatGetDiagonal(A->A,v);
516 }
517 
/*
   MatDestroy_MPIAIJ - Frees all storage associated with the parallel
   matrix: the row-ownership table, both sequential blocks, the optional
   column map / garray / local vector / scatter context (these may not
   have been created if the matrix was never assembled), the data
   structure itself, and finally the PETSc object header.
*/
static int MatDestroy_MPIAIJ(PetscObject obj)
{
  Mat        mat = (Mat) obj;
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  int        ierr;
#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows %d Cols %d",aij->M,aij->N);
#endif
  FREE(aij->rowners);
  ierr = MatDestroy(aij->A); CHKERR(ierr);
  ierr = MatDestroy(aij->B); CHKERR(ierr);
  /* these four are created lazily; guard against never-assembled matrices */
  if (aij->colmap) FREE(aij->colmap);
  if (aij->garray) FREE(aij->garray);
  if (aij->lvec) VecDestroy(aij->lvec);
  if (aij->Mvctx) VecScatterCtxDestroy(aij->Mvctx);
  FREE(aij);
  PLogObjectDestroy(mat);
  PETSCHEADERDESTROY(mat);
  return 0;
}
538 
/*
   MatView_MPIAIJ - Views the parallel matrix.  For a FILE viewer each
   processor prints its two blocks in rank order; for a FILES viewer the
   whole matrix is gathered onto processor 0 (by rebuilding it as a new
   MPIAIJ matrix with all rows on processor 0) and printed there.
*/
static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
{
  Mat        mat = (Mat) obj;
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  int        ierr;
  PetscObject vobj = (PetscObject) viewer;

  if (!viewer) { /* so that viewers may be used from debuggers */
    viewer = STDOUT_VIEWER; vobj = (PetscObject) viewer;
  }
  if (!aij->assembled) SETERR(1,"MatView_MPIAIJ: must assemble matrix first");
  if (vobj->cookie == VIEWER_COOKIE) {
    FILE *fd = ViewerFileGetPointer(viewer);
    if (vobj->type == FILE_VIEWER) {
      /* each processor prints its piece in turn */
      MPE_Seq_begin(mat->comm,1);
      fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
             aij->mytid,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
             aij->cend);
      ierr = MatView(aij->A,viewer); CHKERR(ierr);
      ierr = MatView(aij->B,viewer); CHKERR(ierr);
      fflush(fd);
      MPE_Seq_end(mat->comm,1);
    }
    else if (vobj->type == FILES_VIEWER) {
      int numtids = aij->numtids, mytid = aij->mytid;
      if (numtids == 1) {
        ierr = MatView(aij->A,viewer); CHKERR(ierr);
      }
      else {
        /* assemble the entire matrix onto first processor. */
        Mat     A;
        Mat_AIJ *Aloc;
        int     M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
        Scalar  *a;

        /* processor 0 owns all M rows of the gathered copy */
        if (!mytid) {
          ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,0,0,0,&A);
        }
        else {
          ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,0,0,0,&A);
        }
        CHKERR(ierr);

        /* copy over the A part */
        Aloc = (Mat_AIJ*) aij->A->data;
        m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
        row = aij->rstart;
        /* temporarily shift the stored column numbers to global numbering
           (they appear to be kept 1-based locally); undone below */
        for ( i=0; i<ai[m]; i++ ) {aj[i] += aij->cstart - 1;}
        for ( i=0; i<m; i++ ) {
          ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,InsertValues);
          CHKERR(ierr);
          row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
        }
        aj = Aloc->j;
        /* restore the original column numbering */
        for ( i=0; i<ai[m]; i++ ) {aj[i] -= aij->cstart - 1;}

        /* copy over the B part */
        Aloc = (Mat_AIJ*) aij->B->data;
        m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
        row = aij->rstart;
        /* translate local off-diagonal columns to global via garray */
        ct = cols = (int *) MALLOC( (ai[m]+1)*sizeof(int) ); CHKPTR(cols);
        for ( i=0; i<ai[m]; i++ ) {cols[i] = aij->garray[aj[i]-1];}
        for ( i=0; i<m; i++ ) {
          ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,InsertValues);
          CHKERR(ierr);
          row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
        }
        FREE(ct);
        ierr = MatBeginAssembly(A); CHKERR(ierr);
        ierr = MatEndAssembly(A); CHKERR(ierr);
        /* only processor 0 holds rows; print its diagonal block */
        if (!mytid) {
          ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERR(ierr);
        }
        ierr = MatDestroy(A); CHKERR(ierr);
      }
    }
  }
  return 0;
}
618 
619 extern int MatMarkDiag_AIJ(Mat_AIJ  *);
/*
    This has to provide several versions.

     1) as in the sequential case
     2) a) use only local smoothing, updating outer values only once.
        b) local smoothing, updating outer values each inner iteration
     3) color updating outer values between colors.
*/
628 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,int flag,double shift,
629                         int its,Vec xx)
630 {
631   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
632   Mat        AA = mat->A, BB = mat->B;
633   Mat_AIJ    *A = (Mat_AIJ *) AA->data, *B = (Mat_AIJ *)BB->data;
634   Scalar     zero = 0.0,*b,*x,*xs,*ls,d,*v,sum,scale,*t,*ts;
635   int        ierr,*idx, *diag;
636   int        n = mat->n, m = mat->m, i;
637   Vec        tt;
638 
639   if (!mat->assembled) SETERR(1,"MatRelax_MPIAIJ: must assemble matrix first");
640 
641   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
642   xs = x -1; /* shift by one for index start of 1 */
643   ls--;
644   if (!A->diag) {if ((ierr = MatMarkDiag_AIJ(A))) return ierr;}
645   diag = A->diag;
646   if (flag == SOR_APPLY_UPPER || flag == SOR_APPLY_LOWER) {
647     SETERR(1,"That option not yet support for parallel AIJ matrices");
648   }
649   if (flag & SOR_EISENSTAT) {
650     /* Let  A = L + U + D; where L is lower trianglar,
651     U is upper triangular, E is diagonal; This routine applies
652 
653             (L + E)^{-1} A (U + E)^{-1}
654 
655     to a vector efficiently using Eisenstat's trick. This is for
656     the case of SSOR preconditioner, so E is D/omega where omega
657     is the relaxation factor.
658     */
659     ierr = VecCreate(xx,&tt); CHKERR(ierr);
660     VecGetArray(tt,&t);
661     scale = (2.0/omega) - 1.0;
662     /*  x = (E + U)^{-1} b */
663     VecSet(&zero,mat->lvec);
664     ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
665                               mat->Mvctx); CHKERR(ierr);
666     for ( i=m-1; i>-1; i-- ) {
667       n    = A->i[i+1] - diag[i] - 1;
668       idx  = A->j + diag[i];
669       v    = A->a + diag[i];
670       sum  = b[i];
671       SPARSEDENSEMDOT(sum,xs,v,idx,n);
672       d    = shift + A->a[diag[i]-1];
673       n    = B->i[i+1] - B->i[i];
674       idx  = B->j + B->i[i] - 1;
675       v    = B->a + B->i[i] - 1;
676       SPARSEDENSEMDOT(sum,ls,v,idx,n);
677       x[i] = omega*(sum/d);
678     }
679     ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
680                             mat->Mvctx); CHKERR(ierr);
681 
682     /*  t = b - (2*E - D)x */
683     v = A->a;
684     for ( i=0; i<m; i++ ) { t[i] = b[i] - scale*(v[*diag++ - 1])*x[i]; }
685 
686     /*  t = (E + L)^{-1}t */
687     ts = t - 1; /* shifted by one for index start of a or mat->j*/
688     diag = A->diag;
689     VecSet(&zero,mat->lvec);
690     ierr = VecPipelineBegin(tt,0,mat->lvec,0,InsertValues,PipelineDown,
691                                                  mat->Mvctx); CHKERR(ierr);
692     for ( i=0; i<m; i++ ) {
693       n    = diag[i] - A->i[i];
694       idx  = A->j + A->i[i] - 1;
695       v    = A->a + A->i[i] - 1;
696       sum  = t[i];
697       SPARSEDENSEMDOT(sum,ts,v,idx,n);
698       d    = shift + A->a[diag[i]-1];
699       n    = B->i[i+1] - B->i[i];
700       idx  = B->j + B->i[i] - 1;
701       v    = B->a + B->i[i] - 1;
702       SPARSEDENSEMDOT(sum,ls,v,idx,n);
703       t[i] = omega*(sum/d);
704     }
705     ierr = VecPipelineEnd(tt,0,mat->lvec,0,InsertValues,PipelineDown,
706                                                     mat->Mvctx); CHKERR(ierr);
707     /*  x = x + t */
708     for ( i=0; i<m; i++ ) { x[i] += t[i]; }
709     VecDestroy(tt);
710     return 0;
711   }
712 
713 
714   if ((flag & SOR_SYMMETRIC_SWEEP) == SOR_SYMMETRIC_SWEEP){
715     if (flag & SOR_ZERO_INITIAL_GUESS) {
716       VecSet(&zero,mat->lvec); VecSet(&zero,xx);
717     }
718     else {
719       ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
720       CHKERR(ierr);
721       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
722       CHKERR(ierr);
723     }
724     while (its--) {
725       /* go down through the rows */
726       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
727                               mat->Mvctx); CHKERR(ierr);
728       for ( i=0; i<m; i++ ) {
729         n    = A->i[i+1] - A->i[i];
730         idx  = A->j + A->i[i] - 1;
731         v    = A->a + A->i[i] - 1;
732         sum  = b[i];
733         SPARSEDENSEMDOT(sum,xs,v,idx,n);
734         d    = shift + A->a[diag[i]-1];
735         n    = B->i[i+1] - B->i[i];
736         idx  = B->j + B->i[i] - 1;
737         v    = B->a + B->i[i] - 1;
738         SPARSEDENSEMDOT(sum,ls,v,idx,n);
739         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
740       }
741       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
742                             mat->Mvctx); CHKERR(ierr);
743       /* come up through the rows */
744       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
745                               mat->Mvctx); CHKERR(ierr);
746       for ( i=m-1; i>-1; i-- ) {
747         n    = A->i[i+1] - A->i[i];
748         idx  = A->j + A->i[i] - 1;
749         v    = A->a + A->i[i] - 1;
750         sum  = b[i];
751         SPARSEDENSEMDOT(sum,xs,v,idx,n);
752         d    = shift + A->a[diag[i]-1];
753         n    = B->i[i+1] - B->i[i];
754         idx  = B->j + B->i[i] - 1;
755         v    = B->a + B->i[i] - 1;
756         SPARSEDENSEMDOT(sum,ls,v,idx,n);
757         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
758       }
759       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
760                             mat->Mvctx); CHKERR(ierr);
761     }
762   }
763   else if (flag & SOR_FORWARD_SWEEP){
764     if (flag & SOR_ZERO_INITIAL_GUESS) {
765       VecSet(&zero,mat->lvec);
766       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
767                               mat->Mvctx); CHKERR(ierr);
768       for ( i=0; i<m; i++ ) {
769         n    = diag[i] - A->i[i];
770         idx  = A->j + A->i[i] - 1;
771         v    = A->a + A->i[i] - 1;
772         sum  = b[i];
773         SPARSEDENSEMDOT(sum,xs,v,idx,n);
774         d    = shift + A->a[diag[i]-1];
775         n    = B->i[i+1] - B->i[i];
776         idx  = B->j + B->i[i] - 1;
777         v    = B->a + B->i[i] - 1;
778         SPARSEDENSEMDOT(sum,ls,v,idx,n);
779         x[i] = omega*(sum/d);
780       }
781       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
782                             mat->Mvctx); CHKERR(ierr);
783       its--;
784     }
785     while (its--) {
786       ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
787       CHKERR(ierr);
788       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
789       CHKERR(ierr);
790       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
791                               mat->Mvctx); CHKERR(ierr);
792       for ( i=0; i<m; i++ ) {
793         n    = A->i[i+1] - A->i[i];
794         idx  = A->j + A->i[i] - 1;
795         v    = A->a + A->i[i] - 1;
796         sum  = b[i];
797         SPARSEDENSEMDOT(sum,xs,v,idx,n);
798         d    = shift + A->a[diag[i]-1];
799         n    = B->i[i+1] - B->i[i];
800         idx  = B->j + B->i[i] - 1;
801         v    = B->a + B->i[i] - 1;
802         SPARSEDENSEMDOT(sum,ls,v,idx,n);
803         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
804       }
805       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
806                             mat->Mvctx); CHKERR(ierr);
807     }
808   }
809   else if (flag & SOR_BACKWARD_SWEEP){
810     if (flag & SOR_ZERO_INITIAL_GUESS) {
811       VecSet(&zero,mat->lvec);
812       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
813                               mat->Mvctx); CHKERR(ierr);
814       for ( i=m-1; i>-1; i-- ) {
815         n    = A->i[i+1] - diag[i] - 1;
816         idx  = A->j + diag[i];
817         v    = A->a + diag[i];
818         sum  = b[i];
819         SPARSEDENSEMDOT(sum,xs,v,idx,n);
820         d    = shift + A->a[diag[i]-1];
821         n    = B->i[i+1] - B->i[i];
822         idx  = B->j + B->i[i] - 1;
823         v    = B->a + B->i[i] - 1;
824         SPARSEDENSEMDOT(sum,ls,v,idx,n);
825         x[i] = omega*(sum/d);
826       }
827       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
828                             mat->Mvctx); CHKERR(ierr);
829       its--;
830     }
831     while (its--) {
832       ierr = VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterDown,
833                             mat->Mvctx); CHKERR(ierr);
834       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterDown,
835                             mat->Mvctx); CHKERR(ierr);
836       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
837                               mat->Mvctx); CHKERR(ierr);
838       for ( i=m-1; i>-1; i-- ) {
839         n    = A->i[i+1] - A->i[i];
840         idx  = A->j + A->i[i] - 1;
841         v    = A->a + A->i[i] - 1;
842         sum  = b[i];
843         SPARSEDENSEMDOT(sum,xs,v,idx,n);
844         d    = shift + A->a[diag[i]-1];
845         n    = B->i[i+1] - B->i[i];
846         idx  = B->j + B->i[i] - 1;
847         v    = B->a + B->i[i] - 1;
848         SPARSEDENSEMDOT(sum,ls,v,idx,n);
849         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
850       }
851       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
852                             mat->Mvctx); CHKERR(ierr);
853     }
854   }
855   else if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
856     if (flag & SOR_ZERO_INITIAL_GUESS) {
857       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
858     }
859     ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
860     CHKERR(ierr);
861     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
862     CHKERR(ierr);
863     while (its--) {
864       /* go down through the rows */
865       for ( i=0; i<m; i++ ) {
866         n    = A->i[i+1] - A->i[i];
867         idx  = A->j + A->i[i] - 1;
868         v    = A->a + A->i[i] - 1;
869         sum  = b[i];
870         SPARSEDENSEMDOT(sum,xs,v,idx,n);
871         d    = shift + A->a[diag[i]-1];
872         n    = B->i[i+1] - B->i[i];
873         idx  = B->j + B->i[i] - 1;
874         v    = B->a + B->i[i] - 1;
875         SPARSEDENSEMDOT(sum,ls,v,idx,n);
876         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
877       }
878       /* come up through the rows */
879       for ( i=m-1; i>-1; i-- ) {
880         n    = A->i[i+1] - A->i[i];
881         idx  = A->j + A->i[i] - 1;
882         v    = A->a + A->i[i] - 1;
883         sum  = b[i];
884         SPARSEDENSEMDOT(sum,xs,v,idx,n);
885         d    = shift + A->a[diag[i]-1];
886         n    = B->i[i+1] - B->i[i];
887         idx  = B->j + B->i[i] - 1;
888         v    = B->a + B->i[i] - 1;
889         SPARSEDENSEMDOT(sum,ls,v,idx,n);
890         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
891       }
892     }
893   }
894   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
895     if (flag & SOR_ZERO_INITIAL_GUESS) {
896       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
897     }
898     ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
899     CHKERR(ierr);
900     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
901     CHKERR(ierr);
902     while (its--) {
903       for ( i=0; i<m; i++ ) {
904         n    = A->i[i+1] - A->i[i];
905         idx  = A->j + A->i[i] - 1;
906         v    = A->a + A->i[i] - 1;
907         sum  = b[i];
908         SPARSEDENSEMDOT(sum,xs,v,idx,n);
909         d    = shift + A->a[diag[i]-1];
910         n    = B->i[i+1] - B->i[i];
911         idx  = B->j + B->i[i] - 1;
912         v    = B->a + B->i[i] - 1;
913         SPARSEDENSEMDOT(sum,ls,v,idx,n);
914         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
915       }
916     }
917   }
918   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
919     if (flag & SOR_ZERO_INITIAL_GUESS) {
920       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
921     }
922     ierr = VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,
923                             mat->Mvctx); CHKERR(ierr);
924     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,
925                             mat->Mvctx); CHKERR(ierr);
926     while (its--) {
927       for ( i=m-1; i>-1; i-- ) {
928         n    = A->i[i+1] - A->i[i];
929         idx  = A->j + A->i[i] - 1;
930         v    = A->a + A->i[i] - 1;
931         sum  = b[i];
932         SPARSEDENSEMDOT(sum,xs,v,idx,n);
933         d    = shift + A->a[diag[i]-1];
934         n    = B->i[i+1] - B->i[i];
935         idx  = B->j + B->i[i] - 1;
936         v    = B->a + B->i[i] - 1;
937         SPARSEDENSEMDOT(sum,ls,v,idx,n);
938         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
939       }
940     }
941   }
942   return 0;
943 }
944 
945 static int MatGetInfo_MPIAIJ(Mat matin,int flag,int *nz,int *nzalloc,int *mem)
946 {
947   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
948   Mat        A = mat->A, B = mat->B;
949   int        ierr, isend[3], irecv[3], nzA, nzallocA, memA;
950 
951   ierr = MatGetInfo(A,MAT_LOCAL,&nzA,&nzallocA,&memA); CHKERR(ierr);
952   ierr = MatGetInfo(B,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERR(ierr);
953   isend[0] += nzA; isend[1] += nzallocA; isend[2] += memA;
954   if (flag == MAT_LOCAL) {
955     *nz = isend[0]; *nzalloc = isend[1]; *mem = isend[2];
956   } else if (flag == MAT_GLOBAL_MAX) {
957     MPI_Allreduce((void *) isend,(void *) irecv,3,MPI_INT,MPI_MAX,matin->comm);
958     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
959   } else if (flag == MAT_GLOBAL_SUM) {
960     MPI_Allreduce((void *) isend,(void *) irecv,3,MPI_INT,MPI_SUM,matin->comm);
961     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
962   }
963   return 0;
964 }
965 
966 static int MatSetOption_MPIAIJ(Mat aijin,int op)
967 {
968   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
969 
970   if      (op == NO_NEW_NONZERO_LOCATIONS)  {
971     MatSetOption(aij->A,op);
972     MatSetOption(aij->B,op);
973   }
974   else if (op == YES_NEW_NONZERO_LOCATIONS) {
975     MatSetOption(aij->A,op);
976     MatSetOption(aij->B,op);
977   }
978   else if (op == COLUMN_ORIENTED) SETERR(1,"Column oriented not supported");
979   return 0;
980 }
981 
982 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
983 {
984   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
985   *m = mat->M; *n = mat->N;
986   return 0;
987 }
988 
989 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
990 {
991   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
992   *m = mat->m; *n = mat->n;
993   return 0;
994 }
995 
996 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
997 {
998   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
999   *m = mat->rstart; *n = mat->rend;
1000   return 0;
1001 }
1002 
1003 static int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1004 {
1005   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1006   Scalar     *vworkA, *vworkB, **pvA, **pvB;
1007   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
1008   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
1009 
1010   if (!mat->assembled)
1011     SETERR(1,"MatGetRow_MPIAIJ: Must assemble matrix first.");
1012   if (row < rstart || row >= rend)
1013     SETERR(1,"MatGetRow_MPIAIJ: Currently you can get only local rows.")
1014   lrow = row - rstart;
1015 
1016   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1017   if (!v)   {pvA = 0; pvB = 0;}
1018   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1019   ierr = MatGetRow(mat->A,lrow,&nzA,pcA,pvA); CHKERR(ierr);
1020   ierr = MatGetRow(mat->B,lrow,&nzB,pcB,pvB); CHKERR(ierr);
1021   nztot = nzA + nzB;
1022 
1023   if (v  || idx) {
1024     if (nztot) {
1025       /* Sort by increasing column numbers, assuming A and B already sorted */
1026       int imark, imark2;
1027       for (i=0; i<nzB; i++) cworkB[i] = mat->garray[cworkB[i]];
1028       if (v) {
1029         *v = (Scalar *) MALLOC( (nztot)*sizeof(Scalar) ); CHKPTR(*v);
1030         for ( i=0; i<nzB; i++ ) {
1031           if (cworkB[i] < cstart)   (*v)[i] = vworkB[i];
1032           else break;
1033         }
1034         imark = i;
1035         for ( i=0; i<nzA; i++ )     (*v)[imark+i] = vworkA[i];
1036         imark2 = imark+nzA;
1037         for ( i=imark; i<nzB; i++ ) (*v)[imark2+i] = vworkB[i];
1038       }
1039       if (idx) {
1040         *idx = (int *) MALLOC( (nztot)*sizeof(int) ); CHKPTR(*idx);
1041         for (i=0; i<nzA; i++) cworkA[i] += cstart;
1042         for ( i=0; i<nzB; i++ ) {
1043           if (cworkB[i] < cstart)   (*idx)[i] = cworkB[i];
1044           else break;
1045         }
1046         imark = i;
1047         for ( i=0; i<nzA; i++ )     (*idx)[imark+i] = cworkA[i];
1048         imark2 = imark+nzA;
1049         for ( i=imark; i<nzB; i++ ) (*idx)[imark2+i] = cworkB[i];
1050       }
1051     }
1052     else {*idx = 0; *v=0;}
1053   }
1054   *nz = nztot;
1055   ierr = MatRestoreRow(mat->A,lrow,&nzA,pcA,pvA); CHKERR(ierr);
1056   ierr = MatRestoreRow(mat->B,lrow,&nzB,pcB,pvB); CHKERR(ierr);
1057   return 0;
1058 }
1059 
1060 static int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1061 {
1062   if (idx) FREE(*idx);
1063   if (v) FREE(*v);
1064   return 0;
1065 }
1066 
1067 static int MatCopy_MPIAIJ(Mat,Mat *);
1068 extern int MatConvert_MPIAIJ(Mat,MATTYPE,Mat *);
1069 
1070 /* -------------------------------------------------------------------*/
/*
   Operation table for the MPIAIJ format.  Entries are positional: each
   slot corresponds to an operation slot of struct _MatOps, and a 0
   marks an operation that is not implemented for this format.
   NOTE(review): the slot order is fixed by the _MatOps declaration in
   the matrix implementation header -- verify it there before inserting
   or reordering entries here.
*/
static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
       MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
       MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
       MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
       0,0,0,0,
       0,0,
       MatRelax_MPIAIJ,
       0,
       MatGetInfo_MPIAIJ,0,
       MatCopy_MPIAIJ,
       MatGetDiagonal_MPIAIJ,0,0,
       MatBeginAssemble_MPIAIJ,MatEndAssemble_MPIAIJ,
       0,
       MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,0,
       0,0,0,0,
       MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
       0,0,
       0,MatConvert_MPIAIJ };
1089 
1090 /*@
1091 
1092       MatCreateMPIAIJ - Creates a sparse parallel matrix
1093                                  in AIJ format.
1094 
1095   Input Parameters:
1096 .   comm - MPI communicator
1097 .   m,n - number of local rows and columns (or -1 to have calculated)
1098 .   M,N - global rows and columns (or -1 to have calculated)
.   d_nz - total number of nonzeros in diagonal portion of matrix
.   d_nnz - number of nonzeros per row in diagonal portion of matrix or null
.           You must leave room for the diagonal entry even if it is zero.
.   o_nz - total number of nonzeros in off-diagonal portion of matrix
.   o_nnz - number of nonzeros per row in off-diagonal portion of matrix
.           or null. You must have at least one nonzero per row.
1105 
1106   Output parameters:
1107 .  newmat - the matrix
1108 
1109   Keywords: matrix, aij, compressed row, sparse, parallel
1110 @*/
int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
                 int d_nz,int *d_nnz, int o_nz,int *o_nnz,Mat *newmat)
{
  Mat          mat;
  Mat_MPIAIJ   *aij;
  int          ierr, i,sum[2],work[2];
  *newmat         = 0;
  PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (aij = NEW(Mat_MPIAIJ)); CHKPTR(aij);
  mat->ops        = &MatOps;
  mat->destroy    = MatDestroy_MPIAIJ;
  mat->view       = MatView_MPIAIJ;
  mat->factor     = 0;
  mat->row        = 0;
  mat->col        = 0;

  mat->comm       = comm;
  aij->insertmode = NotSetValues;
  MPI_Comm_rank(comm,&aij->mytid);
  MPI_Comm_size(comm,&aij->numtids);

  /* global sizes given as -1: compute them as the sum of the local
     sizes over all processors */
  if (M == -1 || N == -1) {
    work[0] = m; work[1] = n;
    MPI_Allreduce((void *) work,(void *) sum,2,MPI_INT,MPI_SUM,comm );
    if (M == -1) M = sum[0];
    if (N == -1) N = sum[1];
  }
  /* local sizes given as -1: split the global size as evenly as
     possible -- the first (M % numtids) processors get one extra row */
  if (m == -1) {m = M/aij->numtids + ((M % aij->numtids) > aij->mytid);}
  if (n == -1) {n = N/aij->numtids + ((N % aij->numtids) > aij->mytid);}
  aij->m       = m;
  aij->n       = n;
  aij->N       = N;
  aij->M       = M;

  /* build local table of row and column ownerships: after the
     allgather + prefix sum, rowners[p] is the first global row owned
     by processor p and rowners[numtids] is the total.  cowners shares
     the same allocation, starting numtids+1 entries in.  (One spare
     slot per table is allocated.) */
  aij->rowners = (int *) MALLOC(2*(aij->numtids+2)*sizeof(int));
  CHKPTR(aij->rowners);
  aij->cowners = aij->rowners + aij->numtids + 1;
  MPI_Allgather(&m,1,MPI_INT,aij->rowners+1,1,MPI_INT,comm);
  aij->rowners[0] = 0;
  for ( i=2; i<=aij->numtids; i++ ) {
    aij->rowners[i] += aij->rowners[i-1];
  }
  aij->rstart = aij->rowners[aij->mytid];
  aij->rend   = aij->rowners[aij->mytid+1];
  MPI_Allgather(&n,1,MPI_INT,aij->cowners+1,1,MPI_INT,comm);
  aij->cowners[0] = 0;
  for ( i=2; i<=aij->numtids; i++ ) {
    aij->cowners[i] += aij->cowners[i-1];
  }
  aij->cstart = aij->cowners[aij->mytid];
  aij->cend   = aij->cowners[aij->mytid+1];


  /* A holds the "diagonal" block (locally owned columns); B holds the
     off-diagonal entries.  B is created with the full global column
     width N -- presumably its columns are compacted via garray during
     assembly; confirm in the assembly routines. */
  ierr = MatCreateSequentialAIJ(m,n,d_nz,d_nnz,&aij->A); CHKERR(ierr);
  PLogObjectParent(mat,aij->A);
  ierr = MatCreateSequentialAIJ(m,N,o_nz,o_nnz,&aij->B); CHKERR(ierr);
  PLogObjectParent(mat,aij->B);

  /* build cache (stash) for values destined for other processors;
     idx/idy are carved out of the tail of the single array allocation */
  aij->stash.nmax = CHUNCKSIZE; /* completely arbitrary number */
  aij->stash.n    = 0;
  aij->stash.array = (Scalar *) MALLOC( aij->stash.nmax*(2*sizeof(int) +
                            sizeof(Scalar))); CHKPTR(aij->stash.array);
  aij->stash.idx = (int *) (aij->stash.array + aij->stash.nmax);
  aij->stash.idy = (int *) (aij->stash.idx + aij->stash.nmax);
  aij->colmap    = 0;
  aij->garray    = 0;

  /* stuff used for matrix vector multiply; created at assembly time */
  aij->lvec      = 0;
  aij->Mvctx     = 0;
  aij->assembled = 0;
  aij->bsinterf  = 0;

  *newmat = mat;
  return 0;
}
1190 
1191 static int MatCopy_MPIAIJ(Mat matin,Mat *newmat)
1192 {
1193   Mat        mat;
1194   Mat_MPIAIJ *aij,*oldmat = (Mat_MPIAIJ *) matin->data;
1195   int        ierr, len;
1196   *newmat      = 0;
1197 
1198   if (!oldmat->assembled) SETERR(1,"Cannot copy unassembled matrix");
1199   PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1200   PLogObjectCreate(mat);
1201   mat->data       = (void *) (aij = NEW(Mat_MPIAIJ)); CHKPTR(aij);
1202   mat->ops        = &MatOps;
1203   mat->destroy    = MatDestroy_MPIAIJ;
1204   mat->view       = MatView_MPIAIJ;
1205   mat->factor     = matin->factor;
1206   mat->row        = 0;
1207   mat->col        = 0;
1208 
1209   aij->m          = oldmat->m;
1210   aij->n          = oldmat->n;
1211   aij->M          = oldmat->M;
1212   aij->N          = oldmat->N;
1213 
1214   aij->assembled  = 1;
1215   aij->rstart     = oldmat->rstart;
1216   aij->rend       = oldmat->rend;
1217   aij->cstart     = oldmat->cstart;
1218   aij->cend       = oldmat->cend;
1219   aij->numtids    = oldmat->numtids;
1220   aij->mytid      = oldmat->mytid;
1221   aij->insertmode = NotSetValues;
1222 
1223   aij->rowners        = (int *) MALLOC( (aij->numtids+1)*sizeof(int) );
1224   CHKPTR(aij->rowners);
1225   MEMCPY(aij->rowners,oldmat->rowners,(aij->numtids+1)*sizeof(int));
1226   aij->stash.nmax     = 0;
1227   aij->stash.n        = 0;
1228   aij->stash.array    = 0;
1229   if (oldmat->colmap) {
1230     aij->colmap      = (int *) MALLOC( (aij->N)*sizeof(int) );
1231     CHKPTR(aij->colmap);
1232     MEMCPY(aij->colmap,oldmat->colmap,(aij->N)*sizeof(int));
1233   } else aij->colmap = 0;
1234     if (oldmat->garray && (len = ((Mat_AIJ *) (oldmat->B->data))->n)) {
1235     aij->garray      = (int *) MALLOC(len*sizeof(int) ); CHKPTR(aij->garray);
1236     MEMCPY(aij->garray,oldmat->garray,len*sizeof(int));
1237   } else aij->garray = 0;
1238   mat->comm           = matin->comm;
1239   aij->bsinterf       = 0;
1240 
1241   ierr =  VecCreate(oldmat->lvec,&aij->lvec); CHKERR(ierr);
1242   PLogObjectParent(mat,aij->lvec);
1243   ierr =  VecScatterCtxCopy(oldmat->Mvctx,&aij->Mvctx); CHKERR(ierr);
1244   PLogObjectParent(mat,aij->Mvctx);
1245   ierr =  MatCopy(oldmat->A,&aij->A); CHKERR(ierr);
1246   PLogObjectParent(mat,aij->A);
1247   ierr =  MatCopy(oldmat->B,&aij->B); CHKERR(ierr);
1248   PLogObjectParent(mat,aij->B);
1249   *newmat = mat;
1250   return 0;
1251 }
1252