xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision eaa2832de4c8c4ee7603eea4e7d1f3cd6050bcea)
1 #ifndef lint
2 static char vcid[] = "$Id: mpiaij.c,v 1.10 1995/03/17 04:56:54 bsmith Exp bsmith $";
3 #endif
4 
5 #include "mpiaij.h"
6 #include "vec/vecimpl.h"
7 #include "inline/spops.h"
8 
9 #define CHUNCKSIZE   100
/*
   This is a simple-minded stash: do a linear search of the existing
   entries to determine whether a value is already in the stash; if
   not, append it at the end.
*/
14 static int StashValues(Stash *stash,int row,int n, int *idxn,
15                        Scalar *values,InsertMode addv)
16 {
17   int    i,j,N = stash->n,found,*n_idx, *n_idy;
18   Scalar val,*n_array;
19 
20   for ( i=0; i<n; i++ ) {
21     found = 0;
22     val = *values++;
23     for ( j=0; j<N; j++ ) {
24       if ( stash->idx[j] == row && stash->idy[j] == idxn[i]) {
25         /* found a match */
26         if (addv == AddValues) stash->array[j] += val;
27         else stash->array[j] = val;
28         found = 1;
29         break;
30       }
31     }
32     if (!found) { /* not found so add to end */
33       if ( stash->n == stash->nmax ) {
34         /* allocate a larger stash */
35         n_array = (Scalar *) MALLOC( (stash->nmax + CHUNCKSIZE)*(
36                                      2*sizeof(int) + sizeof(Scalar)));
37         CHKPTR(n_array);
38         n_idx = (int *) (n_array + stash->nmax + CHUNCKSIZE);
39         n_idy = (int *) (n_idx + stash->nmax + CHUNCKSIZE);
40         MEMCPY(n_array,stash->array,stash->nmax*sizeof(Scalar));
41         MEMCPY(n_idx,stash->idx,stash->nmax*sizeof(int));
42         MEMCPY(n_idy,stash->idy,stash->nmax*sizeof(int));
43         if (stash->array) FREE(stash->array);
44         stash->array = n_array; stash->idx = n_idx; stash->idy = n_idy;
45         stash->nmax += CHUNCKSIZE;
46       }
47       stash->array[stash->n]   = val;
48       stash->idx[stash->n]     = row;
49       stash->idy[stash->n++]   = idxn[i];
50     }
51   }
52   return 0;
53 }
54 
/* local utility routine that creates a mapping from the global column
number to the local number in the off-diagonal part of the local
storage of the matrix.  This is done in a non-scalable way, since the
length of colmap equals the global number of matrix columns.
*/
60 static int CreateColmap(Mat mat)
61 {
62   Matimpiaij *aij = (Matimpiaij *) mat->data;
63   Matiaij    *B = (Matiaij*) aij->B->data;
64   int        n = B->n,i;
65   aij->colmap = (int *) MALLOC( aij->N*sizeof(int) ); CHKPTR(aij->colmap);
66   MEMSET(aij->colmap,0,aij->N*sizeof(int));
67   for ( i=0; i<n; i++ ) {
68     aij->colmap[aij->garray[i]] = i+1;
69   }
70   return 0;
71 }
72 
/*
   Inserts or adds the m x n dense block v (row-major) into the
   parallel matrix at global rows idxm[] and global columns idxn[].
   Locally owned rows go directly into the diagonal block (aij->A) or
   off-diagonal block (aij->B); rows owned by other processors are
   stashed for communication at assembly time.
*/
static int MatiAIJInsertValues(Mat mat,int m,int *idxm,int n,
                            int *idxn,Scalar *v,InsertMode addv)
{
  Matimpiaij *aij = (Matimpiaij *) mat->data;
  int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
  int        cstart = aij->cstart, cend = aij->cend,row,col;

  /* all values set between assemblies must use the same mode */
  if (aij->insertmode != NotSetValues && aij->insertmode != addv) {
    SETERR(1,"You cannot mix inserts and adds");
  }
  aij->insertmode = addv;
  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERR(1,"Negative row index");
    if (idxm[i] >= aij->M) SETERR(1,"Row index too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      /* row is locally owned; convert to local row number */
      row = idxm[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERR(1,"Negative column index");
        if (idxn[j] >= aij->N) SETERR(1,"Column index too large");
        if (idxn[j] >= cstart && idxn[j] < cend){
          /* column in the diagonal block; use local column number */
          col = idxn[j] - cstart;
          ierr = MatSetValues(aij->A,1,&row,1,&col,v+i*n+j,addv);CHKERR(ierr);
        }
        else {
          if (aij->assembled) {
            /* after first assembly B uses compressed local column
               numbers; translate via colmap (built lazily) */
            if (!aij->colmap) {ierr = CreateColmap(mat); CHKERR(ierr);}
            col = aij->colmap[idxn[j]] - 1;
            if (col < 0) {
              SETERR(1,"Cannot insert new off diagonal block nonzero in\
                     already\
                     assembled matrix. Contact petsc-maint@mcs.anl.gov\
                     if your need this feature");
            }
          }
          else col = idxn[j];
          ierr = MatSetValues(aij->B,1,&row,1,&col,v+i*n+j,addv);CHKERR(ierr);
        }
      }
    }
    else {
      /* row owned by another processor: stash the whole row slice */
      ierr = StashValues(&aij->stash,idxm[i],n,idxn,v+i*n,addv);CHKERR(ierr);
    }
  }
  return 0;
}
118 
/*
    The assembly code is a lot like the code for vectors; at some
    point we should derive a single assembly code that can be used
    for either case.
*/
/*
   Begins parallel assembly: routes every stashed (row,col,value)
   triple to the processor that owns the row.  Posts all receives and
   nonblocking sends; MatiAIJEndAssemble() completes the exchange.
   The triples are packed as three Scalars per entry (the indices are
   stored as Scalars to keep each message a single MPI_SCALAR buffer).
*/
static int MatiAIJBeginAssemble(Mat mat)
{
  Matimpiaij  *aij = (Matimpiaij *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         numtids = aij->numtids, *owners = aij->rowners;
  int         mytid = aij->mytid;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  int         tag = 50, *owner,*starts,count;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  MPI_Allreduce((void *) &aij->insertmode,(void *) &addv,1,MPI_INT,
                MPI_BOR,comm);
  if (addv == (AddValues|InsertValues)) {
    SETERR(1,"Some processors have inserted while others have added");
  }
  aij->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  /* nprocs[j] = #stashed entries destined for processor j;
     procs[j]  = 1 iff we send anything to processor j at all */
  nprocs = (int *) MALLOC( 2*numtids*sizeof(int) ); CHKPTR(nprocs);
  MEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
  owner = (int *) MALLOC( (aij->stash.n+1)*sizeof(int) ); CHKPTR(owner);
  for ( i=0; i<aij->stash.n; i++ ) {
    idx = aij->stash.idx[i];
    /* linear search of the row-ownership ranges for the destination */
    for ( j=0; j<numtids; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(work);
  MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
  nreceives = work[mytid];
  MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
  nmax = work[mytid];
  FREE(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) MALLOC(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTR(rvalues);
  recv_waits = (MPI_Request *) MALLOC((nreceives+1)*sizeof(MPI_Request));
  CHKPTR(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv((void *)(rvalues+3*nmax*i),3*nmax,MPI_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) MALLOC( 3*(aij->stash.n+1)*sizeof(Scalar) );
  CHKPTR(svalues);
  send_waits = (MPI_Request *) MALLOC( (nsends+1)*sizeof(MPI_Request));
  CHKPTR(send_waits);
  starts = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(starts);
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack triples grouped by destination; starts[] is consumed here */
  for ( i=0; i<aij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
  }
  FREE(owner);
  /* rebuild starts[] (it was destroyed by the packing loop above) */
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<numtids; i++ ) {
    if (procs[i]) {
      MPI_Isend((void*)(svalues+3*starts[i]),3*nprocs[i],MPI_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  FREE(starts); FREE(nprocs);

  /* Free cache space */
  aij->stash.nmax = aij->stash.n = 0;
  if (aij->stash.array){ FREE(aij->stash.array); aij->stash.array = 0;}

  /* save communication state for MatiAIJEndAssemble();
     svalues must stay alive until the sends complete */
  aij->svalues    = svalues;       aij->rvalues = rvalues;
  aij->nsends     = nsends;         aij->nrecvs = nreceives;
  aij->send_waits = send_waits; aij->recv_waits = recv_waits;
  aij->rmax       = nmax;

  return 0;
}
226 extern int MPIAIJSetUpMultiply(Mat);
227 
228 static int MatiAIJEndAssemble(Mat mat)
229 {
230   int        ierr;
231   Matimpiaij *aij = (Matimpiaij *) mat->data;
232 
233   MPI_Status  *send_status,recv_status;
234   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n;
235   int         row,col;
236   Scalar      *values,val;
237   InsertMode  addv = aij->insertmode;
238 
239   /*  wait on receives */
240   while (count) {
241     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
242     /* unpack receives into our local space */
243     values = aij->rvalues + 3*imdex*aij->rmax;
244     MPI_Get_count(&recv_status,MPI_SCALAR,&n);
245     n = n/3;
246     for ( i=0; i<n; i++ ) {
247       row = (int) PETSCREAL(values[3*i]) - aij->rstart;
248       col = (int) PETSCREAL(values[3*i+1]);
249       val = values[3*i+2];
250       if (col >= aij->cstart && col < aij->cend) {
251           col -= aij->cstart;
252         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
253       }
254       else {
255         if (aij->assembled) {
256           if (!aij->colmap) {ierr = CreateColmap(mat); CHKERR(ierr);}
257           col = aij->colmap[col] - 1;
258           if (col < 0) {
259             SETERR(1,"Cannot insert new off diagonal block nonzero in\
260                      already\
261                      assembled matrix. Contact petsc-maint@mcs.anl.gov\
262                      if your need this feature");
263           }
264         }
265         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
266       }
267     }
268     count--;
269   }
270   FREE(aij->recv_waits); FREE(aij->rvalues);
271 
272   /* wait on sends */
273   if (aij->nsends) {
274     send_status = (MPI_Status *) MALLOC( aij->nsends*sizeof(MPI_Status) );
275     CHKPTR(send_status);
276     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
277     FREE(send_status);
278   }
279   FREE(aij->send_waits); FREE(aij->svalues);
280 
281   aij->insertmode = NotSetValues;
282   ierr = MatBeginAssembly(aij->A); CHKERR(ierr);
283   ierr = MatEndAssembly(aij->A); CHKERR(ierr);
284 
285   if (!aij->assembled) {
286     ierr = MPIAIJSetUpMultiply(mat); CHKERR(ierr);
287   }
288   ierr = MatBeginAssembly(aij->B); CHKERR(ierr);
289   ierr = MatEndAssembly(aij->B); CHKERR(ierr);
290 
291   aij->assembled = 1;
292   return 0;
293 }
294 
295 static int MatiZero(Mat A)
296 {
297   Matimpiaij *l = (Matimpiaij *) A->data;
298 
299   MatZeroEntries(l->A); MatZeroEntries(l->B);
300   return 0;
301 }
302 
/* again this uses the same basic strategy as in the assembly and
   scatter create routines; we should try to do it systematically
   if we can figure out the proper level of generality. */

/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   aij->A and aij->B directly and not through the MatZeroRows()
   routine.
*/
/*
   Zeros the global rows listed in is; if diag is non-null each
   zeroed row of the diagonal block gets *diag on its diagonal.
   Any processor may list any global row: the indices are first
   routed (nonblocking MPI) to the owning processors, then each
   processor zeros its own rows locally.
*/
static int MatiZerorows(Mat A,IS is,Scalar *diag)
{
  Matimpiaij     *l = (Matimpiaij *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,numtids = l->numtids;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,mytid = l->mytid;
  int            *rvalues,tag = 67,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  if (!l->assembled) SETERR(1,"MatiZerorows: must assmble matrix first");
  ierr = ISGetLocalSize(is,&N); CHKERR(ierr);
  ierr = ISGetIndices(is,&rows); CHKERR(ierr);

  /*  first count number of contributors to each processor */
  /* nprocs[j] = #rows destined for processor j; procs[j] = 1 iff we
     send anything to processor j; owner[i] = destination of rows[i] */
  nprocs = (int *) MALLOC( 2*numtids*sizeof(int) ); CHKPTR(nprocs);
  MEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
  owner = (int *) MALLOC((N+1)*sizeof(int)); CHKPTR(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<numtids; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERR(1,"Imdex out of range");
  }
  nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(work);
  MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
  nrecvs = work[mytid];
  MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
  nmax = work[mytid];
  FREE(work);

  /* post receives:   */
  /* each receive gets the worst-case buffer of nmax row indices */
  rvalues = (int *) MALLOC((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTR(rvalues);
  recv_waits = (MPI_Request *) MALLOC((nrecvs+1)*sizeof(MPI_Request));
  CHKPTR(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv((void *)(rvalues+nmax*i),nmax,MPI_INT,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) MALLOC( (N+1)*sizeof(int) ); CHKPTR(svalues);
  send_waits = (MPI_Request *) MALLOC( (nsends+1)*sizeof(MPI_Request));
  CHKPTR(send_waits);
  starts = (int *) MALLOC( (numtids+1)*sizeof(int) ); CHKPTR(starts);
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack row indices grouped by destination; starts[] is consumed */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  /* rebuild starts[] (destroyed by the packing loop above) */
  starts[0] = 0;
  for ( i=1; i<numtids+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<numtids; i++ ) {
    if (procs[i]) {
      MPI_Isend((void*)(svalues+starts[i]),nprocs[i],MPI_INT,i,tag,
                comm,send_waits+count++);
    }
  }
  FREE(starts);

  /* first locally owned global row; used to localize received rows */
  base = owners[mytid];

  /*  wait on receives */
  lens = (int *) MALLOC( 2*(nrecvs+1)*sizeof(int) ); CHKPTR(lens);
  source = lens + nrecvs;
  count = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  FREE(recv_waits);

  /* move the data into the send scatter */
  /* gather all received global rows into lrows as local row numbers */
  lrows = (int *) MALLOC( slen*sizeof(int) ); CHKPTR(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  FREE(rvalues); FREE(lens);
  FREE(owner); FREE(nprocs);

  /* actually zap the local rows */
  /* NOTE(review): passing diag to the diagonal block only is correct
     only for square matrices with matching row/column ownership --
     see the BUG comment preceding this routine */
  ierr = ISCreateSequential(slen,lrows,&istmp); CHKERR(ierr);  FREE(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERR(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERR(ierr);
  ierr = ISDestroy(istmp); CHKERR(ierr);

  /* wait on sends, so the send buffer may be freed */
  if (nsends) {
    send_status = (MPI_Status *) MALLOC( nsends*sizeof(MPI_Status) );
    CHKPTR(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    FREE(send_status);
  }
  FREE(send_waits); FREE(svalues);


  return 0;
}
437 
438 static int MatiAIJMult(Mat aijin,Vec xx,Vec yy)
439 {
440   Matimpiaij *aij = (Matimpiaij *) aijin->data;
441   int        ierr;
442   if (!aij->assembled) SETERR(1,"MatiAIJMul: must assmble matrix first");
443   ierr = VecScatterBegin(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
444   CHKERR(ierr);
445   ierr = MatMult(aij->A,xx,yy); CHKERR(ierr);
446   ierr = VecScatterEnd(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
447   CHKERR(ierr);
448   ierr = MatMultAdd(aij->B,aij->lvec,yy,yy); CHKERR(ierr);
449   return 0;
450 }
451 
452 static int MatiAIJMultadd(Mat aijin,Vec xx,Vec yy,Vec zz)
453 {
454   Matimpiaij *aij = (Matimpiaij *) aijin->data;
455   int        ierr;
456   if (!aij->assembled) SETERR(1,"MatiAIJMul: must assmble matrix first");
457   ierr = VecScatterBegin(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
458   CHKERR(ierr);
459   ierr = MatMultAdd(aij->A,xx,yy,zz); CHKERR(ierr);
460   ierr = VecScatterEnd(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
461   CHKERR(ierr);
462   ierr = MatMultAdd(aij->B,aij->lvec,zz,zz); CHKERR(ierr);
463   return 0;
464 }
465 
466 static int MatiAIJMultTrans(Mat aijin,Vec xx,Vec yy)
467 {
468   Matimpiaij *aij = (Matimpiaij *) aijin->data;
469   int        ierr;
470 
471   if (!aij->assembled) SETERR(1,"MatiAIJMulTrans: must assmble matrix first");
472   /* do nondiagonal part */
473   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERR(ierr);
474   /* send it on its way */
475   ierr = VecScatterBegin(aij->lvec,0,yy,0,AddValues,
476                          ScatterAll|ScatterReverse,aij->Mvctx); CHKERR(ierr);
477   /* do local part */
478   ierr = MatMultTrans(aij->A,xx,yy); CHKERR(ierr);
479   /* receive remote parts: note this assumes the values are not actually */
480   /* inserted in yy until the next line, which is true for my implementation*/
481   /* but is not perhaps always true. */
482   ierr = VecScatterEnd(aij->lvec,0,yy,0,AddValues,ScatterAll|ScatterReverse,
483                          aij->Mvctx); CHKERR(ierr);
484   return 0;
485 }
486 
487 static int MatiAIJMultTransadd(Mat aijin,Vec xx,Vec yy,Vec zz)
488 {
489   Matimpiaij *aij = (Matimpiaij *) aijin->data;
490   int        ierr;
491 
492   if (!aij->assembled) SETERR(1,"MatiAIJMulTrans: must assmble matrix first");
493   /* do nondiagonal part */
494   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERR(ierr);
495   /* send it on its way */
496   ierr = VecScatterBegin(aij->lvec,0,zz,0,AddValues,
497                          ScatterAll|ScatterReverse,aij->Mvctx); CHKERR(ierr);
498   /* do local part */
499   ierr = MatMultTransAdd(aij->A,xx,yy,zz); CHKERR(ierr);
500   /* receive remote parts: note this assumes the values are not actually */
501   /* inserted in yy until the next line, which is true for my implementation*/
502   /* but is not perhaps always true. */
503   ierr = VecScatterEnd(aij->lvec,0,zz,0,AddValues,ScatterAll|ScatterReverse,
504                          aij->Mvctx); CHKERR(ierr);
505   return 0;
506 }
507 
508 /*
509   This only works correctly for square matrices where the subblock A->A is the
510    diagonal block
511 */
512 static int MatiAIJgetdiag(Mat Ain,Vec v)
513 {
514   Matimpiaij *A = (Matimpiaij *) Ain->data;
515   if (!A->assembled) SETERR(1,"MatiAIJgetdiag: must assmble matrix first");
516   return MatGetDiagonal(A->A,v);
517 }
518 
/*
   Destroys the parallel matrix: frees the ownership table, destroys
   the two sequential blocks, and releases the (optionally allocated)
   colmap, garray, local work vector and scatter context before
   freeing the implementation data and the object header.
*/
static int MatiAIJdestroy(PetscObject obj)
{
  Mat        mat = (Mat) obj;
  Matimpiaij *aij = (Matimpiaij *) mat->data;
  int        ierr;
#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows %d Cols %d",aij->M,aij->N);
#endif
  FREE(aij->rowners);
  ierr = MatDestroy(aij->A); CHKERR(ierr);
  ierr = MatDestroy(aij->B); CHKERR(ierr);
  /* these are only created on demand, so may be null */
  if (aij->colmap) FREE(aij->colmap);
  if (aij->garray) FREE(aij->garray);
  if (aij->lvec) VecDestroy(aij->lvec);
  if (aij->Mvctx) VecScatterCtxDestroy(aij->Mvctx);
  FREE(aij);
  PLogObjectDestroy(mat);
  PETSCHEADERDESTROY(mat);
  return 0;
}
539 
540 static int MatiView(PetscObject obj,Viewer viewer)
541 {
542   Mat        mat = (Mat) obj;
543   Matimpiaij *aij = (Matimpiaij *) mat->data;
544   int        ierr;
545 
546   if (!aij->assembled) SETERR(1,"MatiAIJMulTrans: must assmble matrix first");
547   MPE_Seq_begin(mat->comm,1);
548     printf("[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
549           aij->mytid,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
550           aij->cend);
551     ierr = MatView(aij->A,0); CHKERR(ierr);
552     ierr = MatView(aij->B,0); CHKERR(ierr);
553     fflush(stdout);
554   MPE_Seq_end(mat->comm,1);
555   return 0;
556 }
557 
558 extern int MatiAIJmarkdiag(Matiaij  *);
/*
    This has to provide several versions:

     1) purely sequential
     2) a) use only local smoothing, updating outer values only once.
        b) local smoothing, updating outer values each inner iteration
     3) coloring, updating outer values between colors.
*/
567 static int MatiAIJrelax(Mat matin,Vec bb,double omega,int flag,double shift,
568                         int its,Vec xx)
569 {
570   Matimpiaij *mat = (Matimpiaij *) matin->data;
571   Mat        AA = mat->A, BB = mat->B;
572   Matiaij    *A = (Matiaij *) AA->data, *B = (Matiaij *)BB->data;
573   Scalar     zero = 0.0,*b,*x,*xs,*ls,d,*v,sum,scale,*t,*ts;
574   int        ierr,*idx, *diag;
575   int        n = mat->n, m = mat->m, i;
576   Vec        tt;
577 
578   if (!mat->assembled) SETERR(1,"MatiAIJRelax: must assmble matrix first");
579 
580   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
581   xs = x -1; /* shift by one for index start of 1 */
582   ls--;
583   if (!A->diag) {if ((ierr = MatiAIJmarkdiag(A))) return ierr;}
584   diag = A->diag;
585   if (flag == SOR_APPLY_UPPER || flag == SOR_APPLY_LOWER) {
586     SETERR(1,"That option not yet support for parallel AIJ matrices");
587   }
588   if (flag & SOR_EISENSTAT) {
589     /* Let  A = L + U + D; where L is lower trianglar,
590     U is upper triangular, E is diagonal; This routine applies
591 
592             (L + E)^{-1} A (U + E)^{-1}
593 
594     to a vector efficiently using Eisenstat's trick. This is for
595     the case of SSOR preconditioner, so E is D/omega where omega
596     is the relaxation factor.
597     */
598     ierr = VecCreate(xx,&tt); CHKERR(ierr);
599     VecGetArray(tt,&t);
600     scale = (2.0/omega) - 1.0;
601     /*  x = (E + U)^{-1} b */
602     VecSet(&zero,mat->lvec);
603     ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
604                               mat->Mvctx); CHKERR(ierr);
605     for ( i=m-1; i>-1; i-- ) {
606       n    = A->i[i+1] - diag[i] - 1;
607       idx  = A->j + diag[i];
608       v    = A->a + diag[i];
609       sum  = b[i];
610       SPARSEDENSEMDOT(sum,xs,v,idx,n);
611       d    = shift + A->a[diag[i]-1];
612       n    = B->i[i+1] - B->i[i];
613       idx  = B->j + B->i[i] - 1;
614       v    = B->a + B->i[i] - 1;
615       SPARSEDENSEMDOT(sum,ls,v,idx,n);
616       x[i] = omega*(sum/d);
617     }
618     ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
619                             mat->Mvctx); CHKERR(ierr);
620 
621     /*  t = b - (2*E - D)x */
622     v = A->a;
623     for ( i=0; i<m; i++ ) { t[i] = b[i] - scale*(v[*diag++ - 1])*x[i]; }
624 
625     /*  t = (E + L)^{-1}t */
626     ts = t - 1; /* shifted by one for index start of a or mat->j*/
627     diag = A->diag;
628     VecSet(&zero,mat->lvec);
629     ierr = VecPipelineBegin(tt,0,mat->lvec,0,InsertValues,PipelineDown,
630                                                  mat->Mvctx); CHKERR(ierr);
631     for ( i=0; i<m; i++ ) {
632       n    = diag[i] - A->i[i];
633       idx  = A->j + A->i[i] - 1;
634       v    = A->a + A->i[i] - 1;
635       sum  = t[i];
636       SPARSEDENSEMDOT(sum,ts,v,idx,n);
637       d    = shift + A->a[diag[i]-1];
638       n    = B->i[i+1] - B->i[i];
639       idx  = B->j + B->i[i] - 1;
640       v    = B->a + B->i[i] - 1;
641       SPARSEDENSEMDOT(sum,ls,v,idx,n);
642       t[i] = omega*(sum/d);
643     }
644     ierr = VecPipelineEnd(tt,0,mat->lvec,0,InsertValues,PipelineDown,
645                                                     mat->Mvctx); CHKERR(ierr);
646     /*  x = x + t */
647     for ( i=0; i<m; i++ ) { x[i] += t[i]; }
648     VecDestroy(tt);
649     return 0;
650   }
651 
652 
653   if ((flag & SOR_SYMMETRIC_SWEEP) == SOR_SYMMETRIC_SWEEP){
654     if (flag & SOR_ZERO_INITIAL_GUESS) {
655       VecSet(&zero,mat->lvec); VecSet(&zero,xx);
656     }
657     else {
658       ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
659       CHKERR(ierr);
660       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
661       CHKERR(ierr);
662     }
663     while (its--) {
664       /* go down through the rows */
665       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
666                               mat->Mvctx); CHKERR(ierr);
667       for ( i=0; i<m; i++ ) {
668         n    = A->i[i+1] - A->i[i];
669         idx  = A->j + A->i[i] - 1;
670         v    = A->a + A->i[i] - 1;
671         sum  = b[i];
672         SPARSEDENSEMDOT(sum,xs,v,idx,n);
673         d    = shift + A->a[diag[i]-1];
674         n    = B->i[i+1] - B->i[i];
675         idx  = B->j + B->i[i] - 1;
676         v    = B->a + B->i[i] - 1;
677         SPARSEDENSEMDOT(sum,ls,v,idx,n);
678         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
679       }
680       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
681                             mat->Mvctx); CHKERR(ierr);
682       /* come up through the rows */
683       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
684                               mat->Mvctx); CHKERR(ierr);
685       for ( i=m-1; i>-1; i-- ) {
686         n    = A->i[i+1] - A->i[i];
687         idx  = A->j + A->i[i] - 1;
688         v    = A->a + A->i[i] - 1;
689         sum  = b[i];
690         SPARSEDENSEMDOT(sum,xs,v,idx,n);
691         d    = shift + A->a[diag[i]-1];
692         n    = B->i[i+1] - B->i[i];
693         idx  = B->j + B->i[i] - 1;
694         v    = B->a + B->i[i] - 1;
695         SPARSEDENSEMDOT(sum,ls,v,idx,n);
696         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
697       }
698       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
699                             mat->Mvctx); CHKERR(ierr);
700     }
701   }
702   else if (flag & SOR_FORWARD_SWEEP){
703     if (flag & SOR_ZERO_INITIAL_GUESS) {
704       VecSet(&zero,mat->lvec);
705       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
706                               mat->Mvctx); CHKERR(ierr);
707       for ( i=0; i<m; i++ ) {
708         n    = diag[i] - A->i[i];
709         idx  = A->j + A->i[i] - 1;
710         v    = A->a + A->i[i] - 1;
711         sum  = b[i];
712         SPARSEDENSEMDOT(sum,xs,v,idx,n);
713         d    = shift + A->a[diag[i]-1];
714         n    = B->i[i+1] - B->i[i];
715         idx  = B->j + B->i[i] - 1;
716         v    = B->a + B->i[i] - 1;
717         SPARSEDENSEMDOT(sum,ls,v,idx,n);
718         x[i] = omega*(sum/d);
719       }
720       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
721                             mat->Mvctx); CHKERR(ierr);
722       its--;
723     }
724     while (its--) {
725       ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
726       CHKERR(ierr);
727       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
728       CHKERR(ierr);
729       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
730                               mat->Mvctx); CHKERR(ierr);
731       for ( i=0; i<m; i++ ) {
732         n    = A->i[i+1] - A->i[i];
733         idx  = A->j + A->i[i] - 1;
734         v    = A->a + A->i[i] - 1;
735         sum  = b[i];
736         SPARSEDENSEMDOT(sum,xs,v,idx,n);
737         d    = shift + A->a[diag[i]-1];
738         n    = B->i[i+1] - B->i[i];
739         idx  = B->j + B->i[i] - 1;
740         v    = B->a + B->i[i] - 1;
741         SPARSEDENSEMDOT(sum,ls,v,idx,n);
742         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
743       }
744       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
745                             mat->Mvctx); CHKERR(ierr);
746     }
747   }
748   else if (flag & SOR_BACKWARD_SWEEP){
749     if (flag & SOR_ZERO_INITIAL_GUESS) {
750       VecSet(&zero,mat->lvec);
751       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
752                               mat->Mvctx); CHKERR(ierr);
753       for ( i=m-1; i>-1; i-- ) {
754         n    = A->i[i+1] - diag[i] - 1;
755         idx  = A->j + diag[i];
756         v    = A->a + diag[i];
757         sum  = b[i];
758         SPARSEDENSEMDOT(sum,xs,v,idx,n);
759         d    = shift + A->a[diag[i]-1];
760         n    = B->i[i+1] - B->i[i];
761         idx  = B->j + B->i[i] - 1;
762         v    = B->a + B->i[i] - 1;
763         SPARSEDENSEMDOT(sum,ls,v,idx,n);
764         x[i] = omega*(sum/d);
765       }
766       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
767                             mat->Mvctx); CHKERR(ierr);
768       its--;
769     }
770     while (its--) {
771       ierr = VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterDown,
772                             mat->Mvctx); CHKERR(ierr);
773       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterDown,
774                             mat->Mvctx); CHKERR(ierr);
775       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
776                               mat->Mvctx); CHKERR(ierr);
777       for ( i=m-1; i>-1; i-- ) {
778         n    = A->i[i+1] - A->i[i];
779         idx  = A->j + A->i[i] - 1;
780         v    = A->a + A->i[i] - 1;
781         sum  = b[i];
782         SPARSEDENSEMDOT(sum,xs,v,idx,n);
783         d    = shift + A->a[diag[i]-1];
784         n    = B->i[i+1] - B->i[i];
785         idx  = B->j + B->i[i] - 1;
786         v    = B->a + B->i[i] - 1;
787         SPARSEDENSEMDOT(sum,ls,v,idx,n);
788         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
789       }
790       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
791                             mat->Mvctx); CHKERR(ierr);
792     }
793   }
794   else if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
795     if (flag & SOR_ZERO_INITIAL_GUESS) {
796       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
797     }
798     ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
799     CHKERR(ierr);
800     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
801     CHKERR(ierr);
802     while (its--) {
803       /* go down through the rows */
804       for ( i=0; i<m; i++ ) {
805         n    = A->i[i+1] - A->i[i];
806         idx  = A->j + A->i[i] - 1;
807         v    = A->a + A->i[i] - 1;
808         sum  = b[i];
809         SPARSEDENSEMDOT(sum,xs,v,idx,n);
810         d    = shift + A->a[diag[i]-1];
811         n    = B->i[i+1] - B->i[i];
812         idx  = B->j + B->i[i] - 1;
813         v    = B->a + B->i[i] - 1;
814         SPARSEDENSEMDOT(sum,ls,v,idx,n);
815         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
816       }
817       /* come up through the rows */
818       for ( i=m-1; i>-1; i-- ) {
819         n    = A->i[i+1] - A->i[i];
820         idx  = A->j + A->i[i] - 1;
821         v    = A->a + A->i[i] - 1;
822         sum  = b[i];
823         SPARSEDENSEMDOT(sum,xs,v,idx,n);
824         d    = shift + A->a[diag[i]-1];
825         n    = B->i[i+1] - B->i[i];
826         idx  = B->j + B->i[i] - 1;
827         v    = B->a + B->i[i] - 1;
828         SPARSEDENSEMDOT(sum,ls,v,idx,n);
829         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
830       }
831     }
832   }
833   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
834     if (flag & SOR_ZERO_INITIAL_GUESS) {
835       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
836     }
837     ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
838     CHKERR(ierr);
839     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
840     CHKERR(ierr);
841     while (its--) {
842       for ( i=0; i<m; i++ ) {
843         n    = A->i[i+1] - A->i[i];
844         idx  = A->j + A->i[i] - 1;
845         v    = A->a + A->i[i] - 1;
846         sum  = b[i];
847         SPARSEDENSEMDOT(sum,xs,v,idx,n);
848         d    = shift + A->a[diag[i]-1];
849         n    = B->i[i+1] - B->i[i];
850         idx  = B->j + B->i[i] - 1;
851         v    = B->a + B->i[i] - 1;
852         SPARSEDENSEMDOT(sum,ls,v,idx,n);
853         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
854       }
855     }
856   }
857   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
858     if (flag & SOR_ZERO_INITIAL_GUESS) {
859       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
860     }
861     ierr = VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,
862                             mat->Mvctx); CHKERR(ierr);
863     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,
864                             mat->Mvctx); CHKERR(ierr);
865     while (its--) {
866       for ( i=m-1; i>-1; i-- ) {
867         n    = A->i[i+1] - A->i[i];
868         idx  = A->j + A->i[i] - 1;
869         v    = A->a + A->i[i] - 1;
870         sum  = b[i];
871         SPARSEDENSEMDOT(sum,xs,v,idx,n);
872         d    = shift + A->a[diag[i]-1];
873         n    = B->i[i+1] - B->i[i];
874         idx  = B->j + B->i[i] - 1;
875         v    = B->a + B->i[i] - 1;
876         SPARSEDENSEMDOT(sum,ls,v,idx,n);
877         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
878       }
879     }
880   }
881   return 0;
882 }
883 static int MatiAIJinsopt(Mat aijin,int op)
884 {
885   Matimpiaij *aij = (Matimpiaij *) aijin->data;
886 
887   if      (op == NO_NEW_NONZERO_LOCATIONS)  {
888     MatSetOption(aij->A,op);
889     MatSetOption(aij->B,op);
890   }
891   else if (op == YES_NEW_NONZERO_LOCATIONS) {
892     MatSetOption(aij->A,op);
893     MatSetOption(aij->B,op);
894   }
895   else if (op == COLUMN_ORIENTED) SETERR(1,"Column oriented not supported");
896   return 0;
897 }
898 
899 static int MatiAIJsize(Mat matin,int *m,int *n)
900 {
901   Matimpiaij *mat = (Matimpiaij *) matin->data;
902   *m = mat->M; *n = mat->N;
903   return 0;
904 }
905 
906 static int MatiAIJlocalsize(Mat matin,int *m,int *n)
907 {
908   Matimpiaij *mat = (Matimpiaij *) matin->data;
909   *m = mat->m; *n = mat->n;
910   return 0;
911 }
912 
913 static int MatiAIJrange(Mat matin,int *m,int *n)
914 {
915   Matimpiaij *mat = (Matimpiaij *) matin->data;
916   *m = mat->rstart; *n = mat->rend;
917   return 0;
918 }
919 
/* forward declaration: MatiCopy is defined after MatCreateMPIAIJ below */
static int MatiCopy(Mat,Mat *);

/* -------------------------------------------------------------------*/
/*
   Function table for the parallel AIJ format.  This is a positional
   initializer: each slot must line up exactly with the corresponding
   member of struct _MatOps (declared elsewhere), and a 0 marks an
   operation this format does not implement.  NOTE(review): the slot
   meanings are inferred from the names used here -- confirm against
   the _MatOps declaration before reordering anything.
*/
static struct _MatOps MatOps = {MatiAIJInsertValues,
       0, 0,
       MatiAIJMult,MatiAIJMultadd,MatiAIJMultTrans,MatiAIJMultTransadd,
       0,0,0,0,
       0,0,
       MatiAIJrelax,
       0,
       0,0,0,
       MatiCopy,
       MatiAIJgetdiag,0,0,
       MatiAIJBeginAssemble,MatiAIJEndAssemble,
       0,
       MatiAIJinsopt,MatiZero,MatiZerorows,0,
       0,0,0,0,
       MatiAIJsize,MatiAIJlocalsize,MatiAIJrange };
938 
939 
940 
941 /*@
942 
943       MatCreateMPIAIJ - Creates a sparse parallel matrix
944                                  in AIJ format.
945 
946   Input Parameters:
947 .   comm - MPI communicator
948 .   m,n - number of local rows and columns (or -1 to have calculated)
949 .   M,N - global rows and columns (or -1 to have calculated)
.   d_nz - total number of nonzeros in diagonal portion of matrix
.   d_nnz - number of nonzeros per row in diagonal portion of matrix
.           or null. You must leave room for the diagonal entry even if
.           it is zero.
.   o_nz - total number of nonzeros in off-diagonal portion of matrix
.   o_nnz - number of nonzeros per row in off-diagonal portion of matrix
.           or null. You must have at least one nonzero per row.
956 
957   Output parameters:
958 .  newmat - the matrix
959 
960   Keywords: matrix, aij, compressed row, sparse, parallel
961 @*/
int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
                 int d_nz,int *d_nnz, int o_nz,int *o_nnz,Mat *newmat)
{
  Mat          mat;
  Matimpiaij   *aij;
  int          ierr, i,sum[2],work[2];
  *newmat         = 0;
  PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATAIJMPI,comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (aij = NEW(Matimpiaij)); CHKPTR(aij);
  mat->ops        = &MatOps;
  mat->destroy    = MatiAIJdestroy;
  mat->view       = MatiView;
  mat->factor     = 0;
  mat->row        = 0;
  mat->col        = 0;

  mat->comm       = comm;
  aij->insertmode = NotSetValues;
  MPI_Comm_rank(comm,&aij->mytid);
  MPI_Comm_size(comm,&aij->numtids);

  /* any global dimension given as -1 is computed by summing the local
     sizes over all processors */
  if (M == -1 || N == -1) {
    work[0] = m; work[1] = n;
    MPI_Allreduce((void *) work,(void *) sum,2,MPI_INT,MPI_SUM,comm );
    if (M == -1) M = sum[0];
    if (N == -1) N = sum[1];
  }
  /* any local dimension given as -1 is an even split of the global size,
     the first (M % numtids) processors taking one extra row/column */
  if (m == -1) {m = M/aij->numtids + ((M % aij->numtids) > aij->mytid);}
  if (n == -1) {n = N/aij->numtids + ((N % aij->numtids) > aij->mytid);}
  aij->m       = m;
  aij->n       = n;
  aij->N       = N;
  aij->M       = M;

  /* build local table of row and column ownerships; rowners and cowners
     share a single allocation, cowners starting numtids+1 ints in */
  aij->rowners = (int *) MALLOC(2*(aij->numtids+2)*sizeof(int));
  CHKPTR(aij->rowners);
  aij->cowners = aij->rowners + aij->numtids + 1;
  MPI_Allgather(&m,1,MPI_INT,aij->rowners+1,1,MPI_INT,comm);
  aij->rowners[0] = 0;
  /* running sum so rowners[i] is the first global row owned by proc i;
     starting at i=2 is correct because rowners[1] already holds m of proc 0 */
  for ( i=2; i<=aij->numtids; i++ ) {
    aij->rowners[i] += aij->rowners[i-1];
  }
  aij->rstart = aij->rowners[aij->mytid];
  aij->rend   = aij->rowners[aij->mytid+1];
  MPI_Allgather(&n,1,MPI_INT,aij->cowners+1,1,MPI_INT,comm);
  aij->cowners[0] = 0;
  for ( i=2; i<=aij->numtids; i++ ) {
    aij->cowners[i] += aij->cowners[i-1];
  }
  aij->cstart = aij->cowners[aij->mytid];
  aij->cend   = aij->cowners[aij->mytid+1];


  /* A holds the local m x n "diagonal" block; B holds the off-diagonal
     entries and is created with the full global column width N */
  ierr = MatCreateSequentialAIJ(m,n,d_nz,d_nnz,&aij->A); CHKERR(ierr);
  PLogObjectParent(mat,aij->A);
  ierr = MatCreateSequentialAIJ(m,N,o_nz,o_nnz,&aij->B); CHKERR(ierr);
  PLogObjectParent(mat,aij->B);

  /* build cache for off array entries formed */
  aij->stash.nmax = CHUNCKSIZE; /* completely arbitrary number */
  aij->stash.n    = 0;
  /* one allocation holds the Scalar values followed by the idx and idy
     integer arrays -- same layout StashValues uses when it grows the stash */
  aij->stash.array = (Scalar *) MALLOC( aij->stash.nmax*(2*sizeof(int) +
                            sizeof(Scalar))); CHKPTR(aij->stash.array);
  aij->stash.idx = (int *) (aij->stash.array + aij->stash.nmax);
  aij->stash.idy = (int *) (aij->stash.idx + aij->stash.nmax);
  aij->colmap    = 0;
  aij->garray    = 0;

  /* stuff used for matrix vector multiply */
  aij->lvec      = 0;
  aij->Mvctx     = 0;
  aij->assembled = 0;

  *newmat = mat;
  return 0;
}
1040 
1041 static int MatiCopy(Mat matin,Mat *newmat)
1042 {
1043   Mat        mat;
1044   Matimpiaij *aij,*oldmat = (Matimpiaij *) matin->data;
1045   int        ierr;
1046   *newmat      = 0;
1047 
1048   if (!oldmat->assembled) SETERR(1,"Cannot copy unassembled matrix");
1049   PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATAIJMPI,matin->comm);
1050   PLogObjectCreate(mat);
1051   mat->data       = (void *) (aij = NEW(Matimpiaij)); CHKPTR(aij);
1052   mat->ops        = &MatOps;
1053   mat->destroy    = MatiAIJdestroy;
1054   mat->view       = MatiView;
1055   mat->factor     = matin->factor;
1056   mat->row        = 0;
1057   mat->col        = 0;
1058 
1059   aij->m          = oldmat->m;
1060   aij->n          = oldmat->n;
1061   aij->M          = oldmat->M;
1062   aij->N          = oldmat->N;
1063 
1064   aij->assembled  = 1;
1065   aij->rstart     = oldmat->rstart;
1066   aij->rend       = oldmat->rend;
1067   aij->cstart     = oldmat->cstart;
1068   aij->cend       = oldmat->cend;
1069   aij->numtids    = oldmat->numtids;
1070   aij->mytid      = oldmat->mytid;
1071   aij->insertmode = NotSetValues;
1072 
1073   aij->rowners    = (int *) MALLOC( (aij->numtids+1)*sizeof(int) );
1074   CHKPTR(aij->rowners);
1075   MEMCPY(aij->rowners,oldmat->rowners,(aij->numtids+1)*sizeof(int));
1076   aij->stash.nmax = 0;
1077   aij->stash.n    = 0;
1078   aij->stash.array= 0;
1079   aij->colmap     = 0;
1080   aij->garray     = 0;
1081   mat->comm       = matin->comm;
1082 
1083   ierr =  VecCreate(oldmat->lvec,&aij->lvec); CHKERR(ierr);
1084   PLogObjectParent(mat,aij->lvec);
1085   ierr =  VecScatterCtxCopy(oldmat->Mvctx,&aij->Mvctx); CHKERR(ierr);
1086   PLogObjectParent(mat,aij->Mvctx);
1087   ierr =  MatCopy(oldmat->A,&aij->A); CHKERR(ierr);
1088   PLogObjectParent(mat,aij->A);
1089   ierr =  MatCopy(oldmat->B,&aij->B); CHKERR(ierr);
1090   PLogObjectParent(mat,aij->B);
1091   *newmat = mat;
1092   return 0;
1093 }
1094