xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision da69df5f9c22a9fc6a57aaf87f53b714a6d9ea0e)
1 #ifndef lint
2 static char vcid[] = "$Id: mpiaij.c,v 1.12 1995/03/23 01:05:37 curfman Exp bsmith $";
3 #endif
4 
5 #include "mpiaij.h"
6 #include "vec/vecimpl.h"
7 #include "inline/spops.h"
8 
9 #define CHUNCKSIZE   100
/*
   This is a simple-minded stash: do a linear search to determine if the
   entry is already in the stash; if not, add it to the end.
*/
/*
   StashValues - Caches (row, column, value) triples destined for rows
   owned by other processors until assembly time.

   Input Parameters:
   stash  - the stash to insert into
   row    - global row number of the values
   n      - number of column entries
   idxn   - the n global column indices
   values - the n values (one per column index)
   addv   - AddValues to sum into an existing entry, otherwise overwrite

   Each entry triggers a linear scan of the current stash contents, so
   stashing is O(n * stash->n); acceptable for modest amounts of
   off-processor data.
*/
static int StashValues(Stash *stash,int row,int n, int *idxn,
                       Scalar *values,InsertMode addv)
{
  int    i,j,N = stash->n,found,*n_idx, *n_idy;
  Scalar val,*n_array;

  for ( i=0; i<n; i++ ) {
    found = 0;
    val = *values++;
    /* linear search for an existing entry with the same (row,col) */
    for ( j=0; j<N; j++ ) {
      if ( stash->idx[j] == row && stash->idy[j] == idxn[i]) {
        /* found a match */
        if (addv == AddValues) stash->array[j] += val;
        else stash->array[j] = val;
        found = 1;
        break;
      }
    }
    if (!found) { /* not found so add to end */
      if ( stash->n == stash->nmax ) {
        /* allocate a larger stash: one contiguous chunk holding the
           Scalar values followed by the two integer index arrays */
        n_array = (Scalar *) MALLOC( (stash->nmax + CHUNCKSIZE)*(
                                     2*sizeof(int) + sizeof(Scalar)));
        CHKPTR(n_array);
        n_idx = (int *) (n_array + stash->nmax + CHUNCKSIZE);
        n_idy = (int *) (n_idx + stash->nmax + CHUNCKSIZE);
        MEMCPY(n_array,stash->array,stash->nmax*sizeof(Scalar));
        MEMCPY(n_idx,stash->idx,stash->nmax*sizeof(int));
        MEMCPY(n_idy,stash->idy,stash->nmax*sizeof(int));
        /* idx and idy point into the array allocation, so freeing the
           old array releases all three at once */
        if (stash->array) FREE(stash->array);
        stash->array = n_array; stash->idx = n_idx; stash->idy = n_idy;
        stash->nmax += CHUNCKSIZE;
      }
      stash->array[stash->n]   = val;
      stash->idx[stash->n]     = row;
      stash->idy[stash->n++]   = idxn[i];
    }
  }
  return 0;
}
54 
/* local utility routine that creates a mapping from the global column
number to the local number in the off-diagonal part of the local
storage of the matrix.  This is done in a non-scalable way since the
length of colmap equals the global matrix length.
*/
60 static int CreateColmap(Mat mat)
61 {
62   Matimpiaij *aij = (Matimpiaij *) mat->data;
63   Matiaij    *B = (Matiaij*) aij->B->data;
64   int        n = B->n,i;
65   aij->colmap = (int *) MALLOC( aij->N*sizeof(int) ); CHKPTR(aij->colmap);
66   MEMSET(aij->colmap,0,aij->N*sizeof(int));
67   for ( i=0; i<n; i++ ) {
68     aij->colmap[aij->garray[i]] = i+1;
69   }
70   return 0;
71 }
72 
73 static int MatiAIJInsertValues(Mat mat,int m,int *idxm,int n,
74                             int *idxn,Scalar *v,InsertMode addv)
75 {
76   Matimpiaij *aij = (Matimpiaij *) mat->data;
77   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
78   int        cstart = aij->cstart, cend = aij->cend,row,col;
79 
80   if (aij->insertmode != NotSetValues && aij->insertmode != addv) {
81     SETERR(1,"You cannot mix inserts and adds");
82   }
83   aij->insertmode = addv;
84   for ( i=0; i<m; i++ ) {
85     if (idxm[i] < 0) SETERR(1,"Negative row index");
86     if (idxm[i] >= aij->M) SETERR(1,"Row index too large");
87     if (idxm[i] >= rstart && idxm[i] < rend) {
88       row = idxm[i] - rstart;
89       for ( j=0; j<n; j++ ) {
90         if (idxn[j] < 0) SETERR(1,"Negative column index");
91         if (idxn[j] >= aij->N) SETERR(1,"Column index too large");
92         if (idxn[j] >= cstart && idxn[j] < cend){
93           col = idxn[j] - cstart;
94           ierr = MatSetValues(aij->A,1,&row,1,&col,v+i*n+j,addv);CHKERR(ierr);
95         }
96         else {
97           if (aij->assembled) {
98             if (!aij->colmap) {ierr = CreateColmap(mat); CHKERR(ierr);}
99             col = aij->colmap[idxn[j]] - 1;
100             if (col < 0) {
101               SETERR(1,"Cannot insert new off diagonal block nonzero in\
102                      already\
103                      assembled matrix. Contact petsc-maint@mcs.anl.gov\
104                      if your need this feature");
105             }
106           }
107           else col = idxn[j];
108           ierr = MatSetValues(aij->B,1,&row,1,&col,v+i*n+j,addv);CHKERR(ierr);
109         }
110       }
111     }
112     else {
113       ierr = StashValues(&aij->stash,idxm[i],n,idxn,v+i*n,addv);CHKERR(ierr);
114     }
115   }
116   return 0;
117 }
118 
/*
    the assembly code is a lot like the code for vectors; we should
    sometime derive a single assembly code that can be used for
    either case.
*/
124 
/*
   MatiAIJBeginAssemble - Begins matrix assembly: routes the stashed
   off-processor values to their owning processors using nonblocking MPI
   sends and receives.  MatiAIJEndAssemble() performs the matching waits
   and unpacks the data.

   Each message is a sequence of (row, column, value) triples; the two
   integer indices are shipped as Scalars so each message is a single
   MPI datatype.
*/
static int MatiAIJBeginAssemble(Mat mat)
{
  Matimpiaij  *aij = (Matimpiaij *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         numtids = aij->numtids, *owners = aij->rowners;
  int         mytid = aij->mytid;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  int         tag = 50, *owner,*starts,count;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  MPI_Allreduce((void *) &aij->insertmode,(void *) &addv,1,MPI_INT,
                MPI_BOR,comm);
  if (addv == (AddValues|InsertValues)) {
    SETERR(1,"Some processors have inserted while others have added");
  }
  aij->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor:
      nprocs[j] = number of stashed entries destined for processor j,
      procs[j]  = 1 iff a message will be sent to processor j,
      owner[i]  = destination processor of stash entry i */
  nprocs = (int *) MALLOC( 2*numtids*sizeof(int) ); CHKPTR(nprocs);
  MEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
  owner = (int *) MALLOC( (aij->stash.n+1)*sizeof(int) ); CHKPTR(owner);
  for ( i=0; i<aij->stash.n; i++ ) {
    idx = aij->stash.idx[i];
    for ( j=0; j<numtids; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(work);
  MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
  nreceives = work[mytid];
  MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
  nmax = work[mytid];
  FREE(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) MALLOC(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTR(rvalues);
  recv_waits = (MPI_Request *) MALLOC((nreceives+1)*sizeof(MPI_Request));
  CHKPTR(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv((void *)(rvalues+3*nmax*i),3*nmax,MPI_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) MALLOC( 3*(aij->stash.n+1)*sizeof(Scalar) );
  CHKPTR(svalues);
  send_waits = (MPI_Request *) MALLOC( (nsends+1)*sizeof(MPI_Request));
  CHKPTR(send_waits);
  starts = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(starts);
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack the stash as (row,col,value) triples grouped by destination;
     note starts[] is consumed (incremented) by this loop */
  for ( i=0; i<aij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
  }
  FREE(owner);
  /* rebuild starts[], destroyed by the packing loop above */
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<numtids; i++ ) {
    if (procs[i]) {
      MPI_Isend((void*)(svalues+3*starts[i]),3*nprocs[i],MPI_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  FREE(starts); FREE(nprocs);

  /* Free cache space */
  aij->stash.nmax = aij->stash.n = 0;
  if (aij->stash.array){ FREE(aij->stash.array); aij->stash.array = 0;}

  /* save the outstanding-communication state for MatiAIJEndAssemble() */
  aij->svalues    = svalues;       aij->rvalues = rvalues;
  aij->nsends     = nsends;         aij->nrecvs = nreceives;
  aij->send_waits = send_waits; aij->recv_waits = recv_waits;
  aij->rmax       = nmax;

  return 0;
}
226 extern int MPIAIJSetUpMultiply(Mat);
227 
228 static int MatiAIJEndAssemble(Mat mat)
229 {
230   int        ierr;
231   Matimpiaij *aij = (Matimpiaij *) mat->data;
232 
233   MPI_Status  *send_status,recv_status;
234   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n;
235   int         row,col;
236   Scalar      *values,val;
237   InsertMode  addv = aij->insertmode;
238 
239   /*  wait on receives */
240   while (count) {
241     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
242     /* unpack receives into our local space */
243     values = aij->rvalues + 3*imdex*aij->rmax;
244     MPI_Get_count(&recv_status,MPI_SCALAR,&n);
245     n = n/3;
246     for ( i=0; i<n; i++ ) {
247       row = (int) PETSCREAL(values[3*i]) - aij->rstart;
248       col = (int) PETSCREAL(values[3*i+1]);
249       val = values[3*i+2];
250       if (col >= aij->cstart && col < aij->cend) {
251           col -= aij->cstart;
252         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
253       }
254       else {
255         if (aij->assembled) {
256           if (!aij->colmap) {ierr = CreateColmap(mat); CHKERR(ierr);}
257           col = aij->colmap[col] - 1;
258           if (col < 0) {
259             SETERR(1,"Cannot insert new off diagonal block nonzero in\
260                      already\
261                      assembled matrix. Contact petsc-maint@mcs.anl.gov\
262                      if your need this feature");
263           }
264         }
265         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
266       }
267     }
268     count--;
269   }
270   FREE(aij->recv_waits); FREE(aij->rvalues);
271 
272   /* wait on sends */
273   if (aij->nsends) {
274     send_status = (MPI_Status *) MALLOC( aij->nsends*sizeof(MPI_Status) );
275     CHKPTR(send_status);
276     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
277     FREE(send_status);
278   }
279   FREE(aij->send_waits); FREE(aij->svalues);
280 
281   aij->insertmode = NotSetValues;
282   ierr = MatBeginAssembly(aij->A); CHKERR(ierr);
283   ierr = MatEndAssembly(aij->A); CHKERR(ierr);
284 
285   if (!aij->assembled) {
286     ierr = MPIAIJSetUpMultiply(mat); CHKERR(ierr);
287   }
288   ierr = MatBeginAssembly(aij->B); CHKERR(ierr);
289   ierr = MatEndAssembly(aij->B); CHKERR(ierr);
290 
291   aij->assembled = 1;
292   return 0;
293 }
294 
295 static int MatiZero(Mat A)
296 {
297   Matimpiaij *l = (Matimpiaij *) A->data;
298 
299   MatZeroEntries(l->A); MatZeroEntries(l->B);
300   return 0;
301 }
302 
/* again this uses the same basic strategy as in the assembly and
   scatter create routines; we should try to do it systematically
   if we can figure out the proper level of generality. */
306 
/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   aij->A and aij->B directly and not through the MatZeroRows()
   routine.
*/
/*
   MatiZerorows - Zeros the global rows listed in is; if diag is non-null
   the diagonal entry of each zeroed row is set to *diag.  (Note: the
   diagonal is only handled correctly for square matrices with identical
   row and column ownership -- see the BUG comment above.)

   Each processor sends the row numbers in its is that it does not own to
   the owning processors (same nonblocking-MPI pattern as assembly),
   gathers the rows it owns from everyone, and zeros them in its local
   blocks via MatZeroRows().
*/
static int MatiZerorows(Mat A,IS is,Scalar *diag)
{
  Matimpiaij     *l = (Matimpiaij *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,numtids = l->numtids;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,mytid = l->mytid;
  int            *rvalues,tag = 67,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  if (!l->assembled) SETERR(1,"MatiZerorows: must assmble matrix first");
  ierr = ISGetLocalSize(is,&N); CHKERR(ierr);
  ierr = ISGetIndices(is,&rows); CHKERR(ierr);

  /*  first count number of contributors to each processor:
      nprocs[j] = number of listed rows owned by processor j,
      procs[j]  = 1 iff a message goes to processor j,
      owner[i]  = owning processor of rows[i] */
  nprocs = (int *) MALLOC( 2*numtids*sizeof(int) ); CHKPTR(nprocs);
  MEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
  owner = (int *) MALLOC((N+1)*sizeof(int)); CHKPTR(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<numtids; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERR(1,"Imdex out of range");
  }
  nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(work);
  MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
  nrecvs = work[mytid];
  MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
  nmax = work[mytid];
  FREE(work);

  /* post receives: one maximum-length buffer per expected message */
  rvalues = (int *) MALLOC((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTR(rvalues);
  recv_waits = (MPI_Request *) MALLOC((nrecvs+1)*sizeof(MPI_Request));
  CHKPTR(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv((void *)(rvalues+nmax*i),nmax,MPI_INT,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) MALLOC( (N+1)*sizeof(int) ); CHKPTR(svalues);
  send_waits = (MPI_Request *) MALLOC( (nsends+1)*sizeof(MPI_Request));
  CHKPTR(send_waits);
  starts = (int *) MALLOC( (numtids+1)*sizeof(int) ); CHKPTR(starts);
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack the row numbers grouped by destination; starts[] is consumed */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  /* rebuild starts[], destroyed by the packing loop above */
  starts[0] = 0;
  for ( i=1; i<numtids+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<numtids; i++ ) {
    if (procs[i]) {
      MPI_Isend((void*)(svalues+starts[i]),nprocs[i],MPI_INT,i,tag,
                comm,send_waits+count++);
    }
  }
  FREE(starts);

  /* first globally owned row on this processor */
  base = owners[mytid];

  /*  wait on receives; record each message's length and source */
  lens = (int *) MALLOC( 2*(nrecvs+1)*sizeof(int) ); CHKPTR(lens);
  source = lens + nrecvs;
  count = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  FREE(recv_waits);

  /* move the data into the send scatter: gather all received global row
     numbers and convert them to local numbering */
  lrows = (int *) MALLOC( slen*sizeof(int) ); CHKPTR(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  FREE(rvalues); FREE(lens);
  FREE(owner); FREE(nprocs);

  /* actually zap the local rows: diag applies only to the diagonal
     block A; the off-diagonal block B is zeroed outright */
  ierr = ISCreateSequential(slen,lrows,&istmp); CHKERR(ierr);  FREE(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERR(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERR(ierr);
  ierr = ISDestroy(istmp); CHKERR(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) MALLOC( nsends*sizeof(MPI_Status) );
    CHKPTR(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    FREE(send_status);
  }
  FREE(send_waits); FREE(svalues);


  return 0;
}
437 
438 static int MatiAIJMult(Mat aijin,Vec xx,Vec yy)
439 {
440   Matimpiaij *aij = (Matimpiaij *) aijin->data;
441   int        ierr;
442   if (!aij->assembled) SETERR(1,"MatiAIJMul: must assmble matrix first");
443   ierr = VecScatterBegin(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
444   CHKERR(ierr);
445   ierr = MatMult(aij->A,xx,yy); CHKERR(ierr);
446   ierr = VecScatterEnd(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
447   CHKERR(ierr);
448   ierr = MatMultAdd(aij->B,aij->lvec,yy,yy); CHKERR(ierr);
449   return 0;
450 }
451 
452 static int MatiAIJMultadd(Mat aijin,Vec xx,Vec yy,Vec zz)
453 {
454   Matimpiaij *aij = (Matimpiaij *) aijin->data;
455   int        ierr;
456   if (!aij->assembled) SETERR(1,"MatiAIJMul: must assmble matrix first");
457   ierr = VecScatterBegin(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
458   CHKERR(ierr);
459   ierr = MatMultAdd(aij->A,xx,yy,zz); CHKERR(ierr);
460   ierr = VecScatterEnd(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
461   CHKERR(ierr);
462   ierr = MatMultAdd(aij->B,aij->lvec,zz,zz); CHKERR(ierr);
463   return 0;
464 }
465 
466 static int MatiAIJMultTrans(Mat aijin,Vec xx,Vec yy)
467 {
468   Matimpiaij *aij = (Matimpiaij *) aijin->data;
469   int        ierr;
470 
471   if (!aij->assembled) SETERR(1,"MatiAIJMulTrans: must assmble matrix first");
472   /* do nondiagonal part */
473   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERR(ierr);
474   /* send it on its way */
475   ierr = VecScatterBegin(aij->lvec,0,yy,0,AddValues,
476                          ScatterAll|ScatterReverse,aij->Mvctx); CHKERR(ierr);
477   /* do local part */
478   ierr = MatMultTrans(aij->A,xx,yy); CHKERR(ierr);
479   /* receive remote parts: note this assumes the values are not actually */
480   /* inserted in yy until the next line, which is true for my implementation*/
481   /* but is not perhaps always true. */
482   ierr = VecScatterEnd(aij->lvec,0,yy,0,AddValues,ScatterAll|ScatterReverse,
483                          aij->Mvctx); CHKERR(ierr);
484   return 0;
485 }
486 
487 static int MatiAIJMultTransadd(Mat aijin,Vec xx,Vec yy,Vec zz)
488 {
489   Matimpiaij *aij = (Matimpiaij *) aijin->data;
490   int        ierr;
491 
492   if (!aij->assembled) SETERR(1,"MatiAIJMulTrans: must assmble matrix first");
493   /* do nondiagonal part */
494   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERR(ierr);
495   /* send it on its way */
496   ierr = VecScatterBegin(aij->lvec,0,zz,0,AddValues,
497                          ScatterAll|ScatterReverse,aij->Mvctx); CHKERR(ierr);
498   /* do local part */
499   ierr = MatMultTransAdd(aij->A,xx,yy,zz); CHKERR(ierr);
500   /* receive remote parts: note this assumes the values are not actually */
501   /* inserted in yy until the next line, which is true for my implementation*/
502   /* but is not perhaps always true. */
503   ierr = VecScatterEnd(aij->lvec,0,zz,0,AddValues,ScatterAll|ScatterReverse,
504                          aij->Mvctx); CHKERR(ierr);
505   return 0;
506 }
507 
508 /*
509   This only works correctly for square matrices where the subblock A->A is the
510    diagonal block
511 */
512 static int MatiAIJgetdiag(Mat Ain,Vec v)
513 {
514   Matimpiaij *A = (Matimpiaij *) Ain->data;
515   if (!A->assembled) SETERR(1,"MatiAIJgetdiag: must assmble matrix first");
516   return MatGetDiagonal(A->A,v);
517 }
518 
/*
   MatiAIJdestroy - Frees all storage associated with the parallel AIJ
   matrix: the row-ownership table, the two sequential blocks, the
   lazily created column map and garray, the communication vector and
   scatter context, and finally the header itself.
*/
static int MatiAIJdestroy(PetscObject obj)
{
  Mat        mat = (Mat) obj;
  Matimpiaij *aij = (Matimpiaij *) mat->data;
  int        ierr;
#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows %d Cols %d",aij->M,aij->N);
#endif
  FREE(aij->rowners);
  ierr = MatDestroy(aij->A); CHKERR(ierr);
  ierr = MatDestroy(aij->B); CHKERR(ierr);
  /* these are created only on demand, so they may still be null */
  if (aij->colmap) FREE(aij->colmap);
  if (aij->garray) FREE(aij->garray);
  if (aij->lvec) VecDestroy(aij->lvec);
  if (aij->Mvctx) VecScatterCtxDestroy(aij->Mvctx);
  FREE(aij);
  PLogObjectDestroy(mat);
  PETSCHEADERDESTROY(mat);
  return 0;
}
539 
540 static int MatiView(PetscObject obj,Viewer viewer)
541 {
542   Mat        mat = (Mat) obj;
543   Matimpiaij *aij = (Matimpiaij *) mat->data;
544   int        ierr;
545 
546   if (!aij->assembled) SETERR(1,"MatiAIJMulTrans: must assmble matrix first");
547   MPE_Seq_begin(mat->comm,1);
548     ViewerPrintf(viewer,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
549           aij->mytid,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
550           aij->cend);
551     ierr = MatView(aij->A,viewer); CHKERR(ierr);
552     ierr = MatView(aij->B,viewer); CHKERR(ierr);
553     ViewerFlush(viewer);
554   MPE_Seq_end(mat->comm,1);
555   return 0;
556 }
557 
558 extern int MatiAIJmarkdiag(Matiaij  *);
/*
    This has to provide several versions.

     1) pure sequential
     2) a) use only local smoothing, updating outer values only once.
        b) local smoothing, updating outer values each inner iteration
     3) color updating: outer values updated between colors.
*/
567 static int MatiAIJrelax(Mat matin,Vec bb,double omega,int flag,double shift,
568                         int its,Vec xx)
569 {
570   Matimpiaij *mat = (Matimpiaij *) matin->data;
571   Mat        AA = mat->A, BB = mat->B;
572   Matiaij    *A = (Matiaij *) AA->data, *B = (Matiaij *)BB->data;
573   Scalar     zero = 0.0,*b,*x,*xs,*ls,d,*v,sum,scale,*t,*ts;
574   int        ierr,*idx, *diag;
575   int        n = mat->n, m = mat->m, i;
576   Vec        tt;
577 
578   if (!mat->assembled) SETERR(1,"MatiAIJRelax: must assmble matrix first");
579 
580   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
581   xs = x -1; /* shift by one for index start of 1 */
582   ls--;
583   if (!A->diag) {if ((ierr = MatiAIJmarkdiag(A))) return ierr;}
584   diag = A->diag;
585   if (flag == SOR_APPLY_UPPER || flag == SOR_APPLY_LOWER) {
586     SETERR(1,"That option not yet support for parallel AIJ matrices");
587   }
588   if (flag & SOR_EISENSTAT) {
589     /* Let  A = L + U + D; where L is lower trianglar,
590     U is upper triangular, E is diagonal; This routine applies
591 
592             (L + E)^{-1} A (U + E)^{-1}
593 
594     to a vector efficiently using Eisenstat's trick. This is for
595     the case of SSOR preconditioner, so E is D/omega where omega
596     is the relaxation factor.
597     */
598     ierr = VecCreate(xx,&tt); CHKERR(ierr);
599     VecGetArray(tt,&t);
600     scale = (2.0/omega) - 1.0;
601     /*  x = (E + U)^{-1} b */
602     VecSet(&zero,mat->lvec);
603     ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
604                               mat->Mvctx); CHKERR(ierr);
605     for ( i=m-1; i>-1; i-- ) {
606       n    = A->i[i+1] - diag[i] - 1;
607       idx  = A->j + diag[i];
608       v    = A->a + diag[i];
609       sum  = b[i];
610       SPARSEDENSEMDOT(sum,xs,v,idx,n);
611       d    = shift + A->a[diag[i]-1];
612       n    = B->i[i+1] - B->i[i];
613       idx  = B->j + B->i[i] - 1;
614       v    = B->a + B->i[i] - 1;
615       SPARSEDENSEMDOT(sum,ls,v,idx,n);
616       x[i] = omega*(sum/d);
617     }
618     ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
619                             mat->Mvctx); CHKERR(ierr);
620 
621     /*  t = b - (2*E - D)x */
622     v = A->a;
623     for ( i=0; i<m; i++ ) { t[i] = b[i] - scale*(v[*diag++ - 1])*x[i]; }
624 
625     /*  t = (E + L)^{-1}t */
626     ts = t - 1; /* shifted by one for index start of a or mat->j*/
627     diag = A->diag;
628     VecSet(&zero,mat->lvec);
629     ierr = VecPipelineBegin(tt,0,mat->lvec,0,InsertValues,PipelineDown,
630                                                  mat->Mvctx); CHKERR(ierr);
631     for ( i=0; i<m; i++ ) {
632       n    = diag[i] - A->i[i];
633       idx  = A->j + A->i[i] - 1;
634       v    = A->a + A->i[i] - 1;
635       sum  = t[i];
636       SPARSEDENSEMDOT(sum,ts,v,idx,n);
637       d    = shift + A->a[diag[i]-1];
638       n    = B->i[i+1] - B->i[i];
639       idx  = B->j + B->i[i] - 1;
640       v    = B->a + B->i[i] - 1;
641       SPARSEDENSEMDOT(sum,ls,v,idx,n);
642       t[i] = omega*(sum/d);
643     }
644     ierr = VecPipelineEnd(tt,0,mat->lvec,0,InsertValues,PipelineDown,
645                                                     mat->Mvctx); CHKERR(ierr);
646     /*  x = x + t */
647     for ( i=0; i<m; i++ ) { x[i] += t[i]; }
648     VecDestroy(tt);
649     return 0;
650   }
651 
652 
653   if ((flag & SOR_SYMMETRIC_SWEEP) == SOR_SYMMETRIC_SWEEP){
654     if (flag & SOR_ZERO_INITIAL_GUESS) {
655       VecSet(&zero,mat->lvec); VecSet(&zero,xx);
656     }
657     else {
658       ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
659       CHKERR(ierr);
660       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
661       CHKERR(ierr);
662     }
663     while (its--) {
664       /* go down through the rows */
665       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
666                               mat->Mvctx); CHKERR(ierr);
667       for ( i=0; i<m; i++ ) {
668         n    = A->i[i+1] - A->i[i];
669         idx  = A->j + A->i[i] - 1;
670         v    = A->a + A->i[i] - 1;
671         sum  = b[i];
672         SPARSEDENSEMDOT(sum,xs,v,idx,n);
673         d    = shift + A->a[diag[i]-1];
674         n    = B->i[i+1] - B->i[i];
675         idx  = B->j + B->i[i] - 1;
676         v    = B->a + B->i[i] - 1;
677         SPARSEDENSEMDOT(sum,ls,v,idx,n);
678         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
679       }
680       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
681                             mat->Mvctx); CHKERR(ierr);
682       /* come up through the rows */
683       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
684                               mat->Mvctx); CHKERR(ierr);
685       for ( i=m-1; i>-1; i-- ) {
686         n    = A->i[i+1] - A->i[i];
687         idx  = A->j + A->i[i] - 1;
688         v    = A->a + A->i[i] - 1;
689         sum  = b[i];
690         SPARSEDENSEMDOT(sum,xs,v,idx,n);
691         d    = shift + A->a[diag[i]-1];
692         n    = B->i[i+1] - B->i[i];
693         idx  = B->j + B->i[i] - 1;
694         v    = B->a + B->i[i] - 1;
695         SPARSEDENSEMDOT(sum,ls,v,idx,n);
696         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
697       }
698       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
699                             mat->Mvctx); CHKERR(ierr);
700     }
701   }
702   else if (flag & SOR_FORWARD_SWEEP){
703     if (flag & SOR_ZERO_INITIAL_GUESS) {
704       VecSet(&zero,mat->lvec);
705       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
706                               mat->Mvctx); CHKERR(ierr);
707       for ( i=0; i<m; i++ ) {
708         n    = diag[i] - A->i[i];
709         idx  = A->j + A->i[i] - 1;
710         v    = A->a + A->i[i] - 1;
711         sum  = b[i];
712         SPARSEDENSEMDOT(sum,xs,v,idx,n);
713         d    = shift + A->a[diag[i]-1];
714         n    = B->i[i+1] - B->i[i];
715         idx  = B->j + B->i[i] - 1;
716         v    = B->a + B->i[i] - 1;
717         SPARSEDENSEMDOT(sum,ls,v,idx,n);
718         x[i] = omega*(sum/d);
719       }
720       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
721                             mat->Mvctx); CHKERR(ierr);
722       its--;
723     }
724     while (its--) {
725       ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
726       CHKERR(ierr);
727       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
728       CHKERR(ierr);
729       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
730                               mat->Mvctx); CHKERR(ierr);
731       for ( i=0; i<m; i++ ) {
732         n    = A->i[i+1] - A->i[i];
733         idx  = A->j + A->i[i] - 1;
734         v    = A->a + A->i[i] - 1;
735         sum  = b[i];
736         SPARSEDENSEMDOT(sum,xs,v,idx,n);
737         d    = shift + A->a[diag[i]-1];
738         n    = B->i[i+1] - B->i[i];
739         idx  = B->j + B->i[i] - 1;
740         v    = B->a + B->i[i] - 1;
741         SPARSEDENSEMDOT(sum,ls,v,idx,n);
742         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
743       }
744       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
745                             mat->Mvctx); CHKERR(ierr);
746     }
747   }
748   else if (flag & SOR_BACKWARD_SWEEP){
749     if (flag & SOR_ZERO_INITIAL_GUESS) {
750       VecSet(&zero,mat->lvec);
751       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
752                               mat->Mvctx); CHKERR(ierr);
753       for ( i=m-1; i>-1; i-- ) {
754         n    = A->i[i+1] - diag[i] - 1;
755         idx  = A->j + diag[i];
756         v    = A->a + diag[i];
757         sum  = b[i];
758         SPARSEDENSEMDOT(sum,xs,v,idx,n);
759         d    = shift + A->a[diag[i]-1];
760         n    = B->i[i+1] - B->i[i];
761         idx  = B->j + B->i[i] - 1;
762         v    = B->a + B->i[i] - 1;
763         SPARSEDENSEMDOT(sum,ls,v,idx,n);
764         x[i] = omega*(sum/d);
765       }
766       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
767                             mat->Mvctx); CHKERR(ierr);
768       its--;
769     }
770     while (its--) {
771       ierr = VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterDown,
772                             mat->Mvctx); CHKERR(ierr);
773       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterDown,
774                             mat->Mvctx); CHKERR(ierr);
775       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
776                               mat->Mvctx); CHKERR(ierr);
777       for ( i=m-1; i>-1; i-- ) {
778         n    = A->i[i+1] - A->i[i];
779         idx  = A->j + A->i[i] - 1;
780         v    = A->a + A->i[i] - 1;
781         sum  = b[i];
782         SPARSEDENSEMDOT(sum,xs,v,idx,n);
783         d    = shift + A->a[diag[i]-1];
784         n    = B->i[i+1] - B->i[i];
785         idx  = B->j + B->i[i] - 1;
786         v    = B->a + B->i[i] - 1;
787         SPARSEDENSEMDOT(sum,ls,v,idx,n);
788         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
789       }
790       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
791                             mat->Mvctx); CHKERR(ierr);
792     }
793   }
794   else if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
795     if (flag & SOR_ZERO_INITIAL_GUESS) {
796       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
797     }
798     ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
799     CHKERR(ierr);
800     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
801     CHKERR(ierr);
802     while (its--) {
803       /* go down through the rows */
804       for ( i=0; i<m; i++ ) {
805         n    = A->i[i+1] - A->i[i];
806         idx  = A->j + A->i[i] - 1;
807         v    = A->a + A->i[i] - 1;
808         sum  = b[i];
809         SPARSEDENSEMDOT(sum,xs,v,idx,n);
810         d    = shift + A->a[diag[i]-1];
811         n    = B->i[i+1] - B->i[i];
812         idx  = B->j + B->i[i] - 1;
813         v    = B->a + B->i[i] - 1;
814         SPARSEDENSEMDOT(sum,ls,v,idx,n);
815         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
816       }
817       /* come up through the rows */
818       for ( i=m-1; i>-1; i-- ) {
819         n    = A->i[i+1] - A->i[i];
820         idx  = A->j + A->i[i] - 1;
821         v    = A->a + A->i[i] - 1;
822         sum  = b[i];
823         SPARSEDENSEMDOT(sum,xs,v,idx,n);
824         d    = shift + A->a[diag[i]-1];
825         n    = B->i[i+1] - B->i[i];
826         idx  = B->j + B->i[i] - 1;
827         v    = B->a + B->i[i] - 1;
828         SPARSEDENSEMDOT(sum,ls,v,idx,n);
829         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
830       }
831     }
832   }
833   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
834     if (flag & SOR_ZERO_INITIAL_GUESS) {
835       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
836     }
837     ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
838     CHKERR(ierr);
839     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
840     CHKERR(ierr);
841     while (its--) {
842       for ( i=0; i<m; i++ ) {
843         n    = A->i[i+1] - A->i[i];
844         idx  = A->j + A->i[i] - 1;
845         v    = A->a + A->i[i] - 1;
846         sum  = b[i];
847         SPARSEDENSEMDOT(sum,xs,v,idx,n);
848         d    = shift + A->a[diag[i]-1];
849         n    = B->i[i+1] - B->i[i];
850         idx  = B->j + B->i[i] - 1;
851         v    = B->a + B->i[i] - 1;
852         SPARSEDENSEMDOT(sum,ls,v,idx,n);
853         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
854       }
855     }
856   }
857   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
858     if (flag & SOR_ZERO_INITIAL_GUESS) {
859       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
860     }
861     ierr = VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,
862                             mat->Mvctx); CHKERR(ierr);
863     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,
864                             mat->Mvctx); CHKERR(ierr);
865     while (its--) {
866       for ( i=m-1; i>-1; i-- ) {
867         n    = A->i[i+1] - A->i[i];
868         idx  = A->j + A->i[i] - 1;
869         v    = A->a + A->i[i] - 1;
870         sum  = b[i];
871         SPARSEDENSEMDOT(sum,xs,v,idx,n);
872         d    = shift + A->a[diag[i]-1];
873         n    = B->i[i+1] - B->i[i];
874         idx  = B->j + B->i[i] - 1;
875         v    = B->a + B->i[i] - 1;
876         SPARSEDENSEMDOT(sum,ls,v,idx,n);
877         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
878       }
879     }
880   }
881   return 0;
882 }
883 static int MatiAIJinsopt(Mat aijin,int op)
884 {
885   Matimpiaij *aij = (Matimpiaij *) aijin->data;
886 
887   if      (op == NO_NEW_NONZERO_LOCATIONS)  {
888     MatSetOption(aij->A,op);
889     MatSetOption(aij->B,op);
890   }
891   else if (op == YES_NEW_NONZERO_LOCATIONS) {
892     MatSetOption(aij->A,op);
893     MatSetOption(aij->B,op);
894   }
895   else if (op == COLUMN_ORIENTED) SETERR(1,"Column oriented not supported");
896   return 0;
897 }
898 
899 static int MatiAIJsize(Mat matin,int *m,int *n)
900 {
901   Matimpiaij *mat = (Matimpiaij *) matin->data;
902   *m = mat->M; *n = mat->N;
903   return 0;
904 }
905 
906 static int MatiAIJlocalsize(Mat matin,int *m,int *n)
907 {
908   Matimpiaij *mat = (Matimpiaij *) matin->data;
909   *m = mat->m; *n = mat->n;
910   return 0;
911 }
912 
913 static int MatiAIJrange(Mat matin,int *m,int *n)
914 {
915   Matimpiaij *mat = (Matimpiaij *) matin->data;
916   *m = mat->rstart; *n = mat->rend;
917   return 0;
918 }
919 
920 static int MatiCopy(Mat,Mat *);
921 extern int MatiAIJMPIConvert(Mat,MATTYPE,Mat *);
922 
923 /* -------------------------------------------------------------------*/
/* Dispatch table of operations for the parallel (MPI) AIJ matrix format.
   Entries are strictly positional and must match the slot order declared in
   struct _MatOps; a 0 marks an operation that is not implemented for this
   format.  NOTE(review): slot meanings are inferred only from the names of
   the functions placed in them (insert values, mult family, relax, copy,
   getdiag, assembly begin/end, options, zero/zerorows, size queries,
   convert) -- confirm against the _MatOps declaration before reordering. */
static struct _MatOps MatOps = {MatiAIJInsertValues,
       0, 0,
       MatiAIJMult,MatiAIJMultadd,MatiAIJMultTrans,MatiAIJMultTransadd,
       0,0,0,0,
       0,0,
       MatiAIJrelax,
       0,
       0,0,0,
       MatiCopy,
       MatiAIJgetdiag,0,0,
       MatiAIJBeginAssemble,MatiAIJEndAssemble,
       0,
       MatiAIJinsopt,MatiZero,MatiZerorows,0,
       0,0,0,0,
       MatiAIJsize,MatiAIJlocalsize,MatiAIJrange,
       0,0,
       0,MatiAIJMPIConvert };
941 
942 
943 /*@
944 
945       MatCreateMPIAIJ - Creates a sparse parallel matrix
946                                  in AIJ format.
947 
948   Input Parameters:
949 .   comm - MPI communicator
950 .   m,n - number of local rows and columns (or -1 to have calculated)
951 .   M,N - global rows and columns (or -1 to have calculated)
952 .   d_nz - total number nonzeros in diagonal portion of matrix
.   d_nnz - number of nonzeros per row in diagonal portion of matrix or null
954 .           You must leave room for the diagonal entry even if it is zero.
955 .   o_nz - total number nonzeros in off-diagonal portion of matrix
.   o_nnz - number of nonzeros per row in off-diagonal portion of matrix
957 .           or null. You must have at least one nonzero per row.
958 
959   Output parameters:
960 .  newmat - the matrix
961 
962   Keywords: matrix, aij, compressed row, sparse, parallel
963 @*/
int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
                 int d_nz,int *d_nnz, int o_nz,int *o_nnz,Mat *newmat)
{
  Mat          mat;
  Matimpiaij   *aij;
  int          ierr, i,sum[2],work[2];
  *newmat         = 0;
  PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATAIJMPI,comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (aij = NEW(Matimpiaij)); CHKPTR(aij);
  mat->ops        = &MatOps;
  mat->destroy    = MatiAIJdestroy;
  mat->view       = MatiView;
  mat->factor     = 0;
  mat->row        = 0;
  mat->col        = 0;

  mat->comm       = comm;
  aij->insertmode = NotSetValues;
  MPI_Comm_rank(comm,&aij->mytid);
  MPI_Comm_size(comm,&aij->numtids);

  /* if global sizes were not given (-1), sum the local sizes over all
     processes to obtain them */
  if (M == -1 || N == -1) {
    work[0] = m; work[1] = n;
    MPI_Allreduce((void *) work,(void *) sum,2,MPI_INT,MPI_SUM,comm );
    if (M == -1) M = sum[0];
    if (N == -1) N = sum[1];
  }
  /* if local sizes were not given (-1), split the global size as evenly as
     possible, giving the first (M % numtids) processes one extra row */
  if (m == -1) {m = M/aij->numtids + ((M % aij->numtids) > aij->mytid);}
  if (n == -1) {n = N/aij->numtids + ((N % aij->numtids) > aij->mytid);}
  aij->m       = m;
  aij->n       = n;
  aij->N       = N;
  aij->M       = M;

  /* build local table of row and column ownerships; rowners and cowners
     share a single allocation (cowners starts numtids+1 ints past rowners) */
  aij->rowners = (int *) MALLOC(2*(aij->numtids+2)*sizeof(int));
  CHKPTR(aij->rowners);
  aij->cowners = aij->rowners + aij->numtids + 1;
  /* gather every process's local row count into slots 1..numtids, then take
     the running sum so rowners[p]..rowners[p+1] is process p's row range */
  MPI_Allgather(&m,1,MPI_INT,aij->rowners+1,1,MPI_INT,comm);
  aij->rowners[0] = 0;
  for ( i=2; i<=aij->numtids; i++ ) {
    aij->rowners[i] += aij->rowners[i-1];
  }
  aij->rstart = aij->rowners[aij->mytid];
  aij->rend   = aij->rowners[aij->mytid+1];
  /* same construction for the column ownership table */
  MPI_Allgather(&n,1,MPI_INT,aij->cowners+1,1,MPI_INT,comm);
  aij->cowners[0] = 0;
  for ( i=2; i<=aij->numtids; i++ ) {
    aij->cowners[i] += aij->cowners[i-1];
  }
  aij->cstart = aij->cowners[aij->mytid];
  aij->cend   = aij->cowners[aij->mytid+1];


  /* A holds the m x n diagonal block; B holds the off-diagonal part.
     NOTE(review): B is created with the full global column width (m x N)
     here rather than a compacted width -- confirm this matches how colmap/
     garray are used elsewhere in the file. */
  ierr = MatCreateSequentialAIJ(m,n,d_nz,d_nnz,&aij->A); CHKERR(ierr);
  PLogObjectParent(mat,aij->A);
  ierr = MatCreateSequentialAIJ(m,N,o_nz,o_nnz,&aij->B); CHKERR(ierr);
  PLogObjectParent(mat,aij->B);

  /* build cache (stash) for off-processor entries set before assembly;
     array, idx and idy are carved out of one allocation */
  aij->stash.nmax = CHUNCKSIZE; /* completely arbitrary number */
  aij->stash.n    = 0;
  aij->stash.array = (Scalar *) MALLOC( aij->stash.nmax*(2*sizeof(int) +
                            sizeof(Scalar))); CHKPTR(aij->stash.array);
  aij->stash.idx = (int *) (aij->stash.array + aij->stash.nmax);
  aij->stash.idy = (int *) (aij->stash.idx + aij->stash.nmax);
  aij->colmap    = 0;
  aij->garray    = 0;

  /* stuff used for matrix vector multiply; created lazily at assembly */
  aij->lvec      = 0;
  aij->Mvctx     = 0;
  aij->assembled = 0;

  *newmat = mat;
  return 0;
}
1042 
1043 static int MatiCopy(Mat matin,Mat *newmat)
1044 {
1045   Mat        mat;
1046   Matimpiaij *aij,*oldmat = (Matimpiaij *) matin->data;
1047   int        ierr;
1048   *newmat      = 0;
1049 
1050   if (!oldmat->assembled) SETERR(1,"Cannot copy unassembled matrix");
1051   PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATAIJMPI,matin->comm);
1052   PLogObjectCreate(mat);
1053   mat->data       = (void *) (aij = NEW(Matimpiaij)); CHKPTR(aij);
1054   mat->ops        = &MatOps;
1055   mat->destroy    = MatiAIJdestroy;
1056   mat->view       = MatiView;
1057   mat->factor     = matin->factor;
1058   mat->row        = 0;
1059   mat->col        = 0;
1060 
1061   aij->m          = oldmat->m;
1062   aij->n          = oldmat->n;
1063   aij->M          = oldmat->M;
1064   aij->N          = oldmat->N;
1065 
1066   aij->assembled  = 1;
1067   aij->rstart     = oldmat->rstart;
1068   aij->rend       = oldmat->rend;
1069   aij->cstart     = oldmat->cstart;
1070   aij->cend       = oldmat->cend;
1071   aij->numtids    = oldmat->numtids;
1072   aij->mytid      = oldmat->mytid;
1073   aij->insertmode = NotSetValues;
1074 
1075   aij->rowners    = (int *) MALLOC( (aij->numtids+1)*sizeof(int) );
1076   CHKPTR(aij->rowners);
1077   MEMCPY(aij->rowners,oldmat->rowners,(aij->numtids+1)*sizeof(int));
1078   aij->stash.nmax = 0;
1079   aij->stash.n    = 0;
1080   aij->stash.array= 0;
1081   aij->colmap     = 0;
1082   aij->garray     = 0;
1083   mat->comm       = matin->comm;
1084 
1085   ierr =  VecCreate(oldmat->lvec,&aij->lvec); CHKERR(ierr);
1086   PLogObjectParent(mat,aij->lvec);
1087   ierr =  VecScatterCtxCopy(oldmat->Mvctx,&aij->Mvctx); CHKERR(ierr);
1088   PLogObjectParent(mat,aij->Mvctx);
1089   ierr =  MatCopy(oldmat->A,&aij->A); CHKERR(ierr);
1090   PLogObjectParent(mat,aij->A);
1091   ierr =  MatCopy(oldmat->B,&aij->B); CHKERR(ierr);
1092   PLogObjectParent(mat,aij->B);
1093   *newmat = mat;
1094   return 0;
1095 }
1096