xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision abc0e9e4e3d6f0c410da20f63879832668d2a788)
1 #ifndef lint
2 static char vcid[] = "$Id: mpiaij.c,v 1.17 1995/03/25 01:26:53 bsmith Exp curfman $";
3 #endif
4 
5 #include "mpiaij.h"
6 #include "vec/vecimpl.h"
7 #include "inline/spops.h"
8 
9 #define CHUNCKSIZE   100
10 /*
11    This is a simple-minded stash: a linear search determines whether an
12  entry is already present; if not, it is appended to the end.
13 */
14 static int StashValues(Stash *stash,int row,int n, int *idxn,
15                        Scalar *values,InsertMode addv)
16 {
17   int    i,j,N = stash->n,found,*n_idx, *n_idy;
18   Scalar val,*n_array;
19 
20   for ( i=0; i<n; i++ ) {
21     found = 0;
22     val = *values++;
23     for ( j=0; j<N; j++ ) {
24       if ( stash->idx[j] == row && stash->idy[j] == idxn[i]) {
25         /* found a match */
26         if (addv == AddValues) stash->array[j] += val;
27         else stash->array[j] = val;
28         found = 1;
29         break;
30       }
31     }
32     if (!found) { /* not found so add to end */
33       if ( stash->n == stash->nmax ) {
34         /* allocate a larger stash */
35         n_array = (Scalar *) MALLOC( (stash->nmax + CHUNCKSIZE)*(
36                                      2*sizeof(int) + sizeof(Scalar)));
37         CHKPTR(n_array);
38         n_idx = (int *) (n_array + stash->nmax + CHUNCKSIZE);
39         n_idy = (int *) (n_idx + stash->nmax + CHUNCKSIZE);
40         MEMCPY(n_array,stash->array,stash->nmax*sizeof(Scalar));
41         MEMCPY(n_idx,stash->idx,stash->nmax*sizeof(int));
42         MEMCPY(n_idy,stash->idy,stash->nmax*sizeof(int));
43         if (stash->array) FREE(stash->array);
44         stash->array = n_array; stash->idx = n_idx; stash->idy = n_idy;
45         stash->nmax += CHUNCKSIZE;
46       }
47       stash->array[stash->n]   = val;
48       stash->idx[stash->n]     = row;
49       stash->idy[stash->n++]   = idxn[i];
50     }
51   }
52   return 0;
53 }
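/*
   A note on the stash layout used above (a description of the existing code,
   not new behavior): the values and the two index arrays live in one MALLOC'd
   block so that growing the stash requires a single allocation,

       [ nmax Scalars (array) | nmax ints (idx = rows) | nmax ints (idy = cols) ]

   which is why n_idx and n_idy are obtained by pointer arithmetic from
   n_array when the stash is enlarged by CHUNCKSIZE entries.
*/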
54 
55 /* local utility routine that creates a mapping from the global column
56 number to the local number in the off-diagonal part of the local
57 storage of the matrix.  This is done in a non-scalable way since the
58 length of colmap equals the global number of columns.
59 */
60 static int CreateColmap(Mat mat)
61 {
62   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
63   Mat_AIJ    *B = (Mat_AIJ*) aij->B->data;
64   int        n = B->n,i;
65   aij->colmap = (int *) MALLOC( aij->N*sizeof(int) ); CHKPTR(aij->colmap);
66   MEMSET(aij->colmap,0,aij->N*sizeof(int));
67   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
68   return 0;
69 }
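/*
   Example of the colmap convention (the numbers are made up for illustration):
   if the off-diagonal block B has local columns whose global numbers are
   garray = {3,7,12}, then after CreateColmap()

       colmap[3] = 1, colmap[7] = 2, colmap[12] = 3,  all other entries are 0,

   so a global column g belongs to B exactly when colmap[g] != 0, and its
   local column number is colmap[g] - 1 (see the uses below).
*/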
70 
71 static int MatInsertValues_MPIAIJ(Mat mat,int m,int *idxm,int n,
72                             int *idxn,Scalar *v,InsertMode addv)
73 {
74   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
75   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
76   int        cstart = aij->cstart, cend = aij->cend,row,col;
77 
78   if (aij->insertmode != NotSetValues && aij->insertmode != addv) {
79     SETERR(1,"You cannot mix inserts and adds");
80   }
81   aij->insertmode = addv;
82   for ( i=0; i<m; i++ ) {
83     if (idxm[i] < 0) SETERR(1,"Negative row index");
84     if (idxm[i] >= aij->M) SETERR(1,"Row index too large");
85     if (idxm[i] >= rstart && idxm[i] < rend) {
86       row = idxm[i] - rstart;
87       for ( j=0; j<n; j++ ) {
88         if (idxn[j] < 0) SETERR(1,"Negative column index");
89         if (idxn[j] >= aij->N) SETERR(1,"Column index too large");
90         if (idxn[j] >= cstart && idxn[j] < cend){
91           col = idxn[j] - cstart;
92           ierr = MatSetValues(aij->A,1,&row,1,&col,v+i*n+j,addv);CHKERR(ierr);
93         }
94         else {
95           if (aij->assembled) {
96             if (!aij->colmap) {ierr = CreateColmap(mat); CHKERR(ierr);}
97             col = aij->colmap[idxn[j]] - 1;
98             if (col < 0) {
99               SETERR(1,"Cannot insert new off-diagonal block nonzero in\
100                      already\
101                      assembled matrix. Contact petsc-maint@mcs.anl.gov\
102                      if you need this feature");
103             }
104           }
105           else col = idxn[j];
106           ierr = MatSetValues(aij->B,1,&row,1,&col,v+i*n+j,addv);CHKERR(ierr);
107         }
108       }
109     }
110     else {
111       ierr = StashValues(&aij->stash,idxm[i],n,idxn,v+i*n,addv);CHKERR(ierr);
112     }
113   }
114   return 0;
115 }
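/*
   Caller-side sketch (matrix name and indices are illustrative): values are
   always passed with global indices; entries in locally owned rows go straight
   into aij->A or aij->B, while entries in rows owned elsewhere are stashed and
   exchanged during assembly.

      int    row = 10, col = 20;                      (global indices)
      Scalar v   = 3.0;
      ierr = MatSetValues(mat,1,&row,1,&col,&v,InsertValues); CHKERR(ierr);
*/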
116 
117 /*
118     the assembly code is a lot like the code for vectors; we should
119     at some point derive a single assembly routine that can be used for
120     either case.
121 */
122 
123 static int MatBeginAssemble_MPIAIJ(Mat mat)
124 {
125   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
126   MPI_Comm    comm = mat->comm;
127   int         numtids = aij->numtids, *owners = aij->rowners;
128   int         mytid = aij->mytid;
129   MPI_Request *send_waits,*recv_waits;
130   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
131   int         tag = 50, *owner,*starts,count;
132   InsertMode  addv;
133   Scalar      *rvalues,*svalues;
134 
135   /* make sure all processors are either all inserting or all adding values */
136   MPI_Allreduce((void *) &aij->insertmode,(void *) &addv,1,MPI_INT,
137                 MPI_BOR,comm);
138   if (addv == (AddValues|InsertValues)) {
139     SETERR(1,"Some processors have inserted while others have added");
140   }
141   aij->insertmode = addv; /* in case this processor had no cache */
142 
143   /*  first count number of contributors to each processor */
144   nprocs = (int *) MALLOC( 2*numtids*sizeof(int) ); CHKPTR(nprocs);
145   MEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
146   owner = (int *) MALLOC( (aij->stash.n+1)*sizeof(int) ); CHKPTR(owner);
147   for ( i=0; i<aij->stash.n; i++ ) {
148     idx = aij->stash.idx[i];
149     for ( j=0; j<numtids; j++ ) {
150       if (idx >= owners[j] && idx < owners[j+1]) {
151         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
152       }
153     }
154   }
155   nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}
156 
157   /* inform other processors of number of messages and max length*/
158   work = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(work);
159   MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
160   nreceives = work[mytid];
161   MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
162   nmax = work[mytid];
163   FREE(work);
164 
165   /* post receives:
166        1) each message will consist of ordered triples
167      (global row index,global column index,value); we store the indices
168      as Scalars to simplify the message passing.
169        2) since we don't know how long each individual message is we
170      allocate the largest needed buffer for each receive. Potentially
171      this is a lot of wasted space.
172 
173 
174        This could be done better.
175   */
176   rvalues = (Scalar *) MALLOC(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
177   CHKPTR(rvalues);
178   recv_waits = (MPI_Request *) MALLOC((nreceives+1)*sizeof(MPI_Request));
179   CHKPTR(recv_waits);
180   for ( i=0; i<nreceives; i++ ) {
181     MPI_Irecv((void *)(rvalues+3*nmax*i),3*nmax,MPI_SCALAR,MPI_ANY_SOURCE,tag,
182               comm,recv_waits+i);
183   }
184 
185   /* do sends:
186       1) starts[i] gives the starting index in svalues for stuff going to
187          the ith processor
188   */
189   svalues = (Scalar *) MALLOC( 3*(aij->stash.n+1)*sizeof(Scalar) );
190   CHKPTR(svalues);
191   send_waits = (MPI_Request *) MALLOC( (nsends+1)*sizeof(MPI_Request));
192   CHKPTR(send_waits);
193   starts = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(starts);
194   starts[0] = 0;
195   for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
196   for ( i=0; i<aij->stash.n; i++ ) {
197     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
198     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
199     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
200   }
201   FREE(owner);
202   starts[0] = 0;
203   for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
204   count = 0;
205   for ( i=0; i<numtids; i++ ) {
206     if (procs[i]) {
207       MPI_Isend((void*)(svalues+3*starts[i]),3*nprocs[i],MPI_SCALAR,i,tag,
208                 comm,send_waits+count++);
209     }
210   }
211   FREE(starts); FREE(nprocs);
212 
213   /* Free cache space */
214   aij->stash.nmax = aij->stash.n = 0;
215   if (aij->stash.array){ FREE(aij->stash.array); aij->stash.array = 0;}
216 
217   aij->svalues    = svalues;       aij->rvalues = rvalues;
218   aij->nsends     = nsends;         aij->nrecvs = nreceives;
219   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
220   aij->rmax       = nmax;
221 
222   return 0;
223 }
224 extern int MatSetUpMultiply_MPIAIJ(Mat);
225 
226 static int MatEndAssemble_MPIAIJ(Mat mat)
227 {
228   int        ierr;
229   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
230 
231   MPI_Status  *send_status,recv_status;
232   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n;
233   int         row,col;
234   Scalar      *values,val;
235   InsertMode  addv = aij->insertmode;
236 
237   /*  wait on receives */
238   while (count) {
239     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
240     /* unpack receives into our local space */
241     values = aij->rvalues + 3*imdex*aij->rmax;
242     MPI_Get_count(&recv_status,MPI_SCALAR,&n);
243     n = n/3;
244     for ( i=0; i<n; i++ ) {
245       row = (int) PETSCREAL(values[3*i]) - aij->rstart;
246       col = (int) PETSCREAL(values[3*i+1]);
247       val = values[3*i+2];
248       if (col >= aij->cstart && col < aij->cend) {
249           col -= aij->cstart;
250         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
251       }
252       else {
253         if (aij->assembled) {
254           if (!aij->colmap) {ierr = CreateColmap(mat); CHKERR(ierr);}
255           col = aij->colmap[col] - 1;
256           if (col < 0) {
257             SETERR(1,"Cannot insert new off diagonal block nonzero in\
258                      already\
259                      assembled matrix. Contact petsc-maint@mcs.anl.gov\
260                      if your need this feature");
261           }
262         }
263         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
264       }
265     }
266     count--;
267   }
268   FREE(aij->recv_waits); FREE(aij->rvalues);
269 
270   /* wait on sends */
271   if (aij->nsends) {
272     send_status = (MPI_Status *) MALLOC( aij->nsends*sizeof(MPI_Status) );
273     CHKPTR(send_status);
274     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
275     FREE(send_status);
276   }
277   FREE(aij->send_waits); FREE(aij->svalues);
278 
279   aij->insertmode = NotSetValues;
280   ierr = MatBeginAssembly(aij->A); CHKERR(ierr);
281   ierr = MatEndAssembly(aij->A); CHKERR(ierr);
282 
283   if (!aij->assembled) {
284     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERR(ierr);
285   }
286   ierr = MatBeginAssembly(aij->B); CHKERR(ierr);
287   ierr = MatEndAssembly(aij->B); CHKERR(ierr);
288 
289   aij->assembled = 1;
290   return 0;
291 }
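/*
   Typical assembly pattern from user code (a sketch, with the usual error
   checking): after all MatSetValues() calls every processor must call

      ierr = MatBeginAssembly(mat); CHKERR(ierr);
      ierr = MatEndAssembly(mat);   CHKERR(ierr);

   which exchanges the stashed off-processor entries (above) and then
   assembles the local blocks aij->A and aij->B.
*/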
292 
293 static int MatZero_MPIAIJ(Mat A)
294 {
295   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
296 
297   MatZeroEntries(l->A); MatZeroEntries(l->B);
298   return 0;
299 }
300 
301 /* again this uses the same basic strategy as in the assembly and
302    scatter create routines; we should try to do it systematically
303    if we can figure out the proper level of generality. */
304 
305 /* the code does not do the diagonal entries correctly unless the
306    matrix is square and the column and row ownerships are identical.
307    This is a BUG. The only way to fix it seems to be to access
308    aij->A and aij->B directly and not through the MatZeroRows()
309    routine.
310 */
311 static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
312 {
313   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
314   int            i,ierr,N, *rows,*owners = l->rowners,numtids = l->numtids;
315   int            *procs,*nprocs,j,found,idx,nsends,*work;
316   int            nmax,*svalues,*starts,*owner,nrecvs,mytid = l->mytid;
317   int            *rvalues,tag = 67,count,base,slen,n,*source;
318   int            *lens,imdex,*lrows,*values;
319   MPI_Comm       comm = A->comm;
320   MPI_Request    *send_waits,*recv_waits;
321   MPI_Status     recv_status,*send_status;
322   IS             istmp;
323 
324   if (!l->assembled) SETERR(1,"MatZeroRows_MPIAIJ: must assemble matrix first");
325   ierr = ISGetLocalSize(is,&N); CHKERR(ierr);
326   ierr = ISGetIndices(is,&rows); CHKERR(ierr);
327 
328   /*  first count number of contributors to each processor */
329   nprocs = (int *) MALLOC( 2*numtids*sizeof(int) ); CHKPTR(nprocs);
330   MEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
331   owner = (int *) MALLOC((N+1)*sizeof(int)); CHKPTR(owner); /* see note*/
332   for ( i=0; i<N; i++ ) {
333     idx = rows[i];
334     found = 0;
335     for ( j=0; j<numtids; j++ ) {
336       if (idx >= owners[j] && idx < owners[j+1]) {
337         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
338       }
339     }
340     if (!found) SETERR(1,"Index out of range");
341   }
342   nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}
343 
344   /* inform other processors of number of messages and max length*/
345   work = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(work);
346   MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
347   nrecvs = work[mytid];
348   MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
349   nmax = work[mytid];
350   FREE(work);
351 
352   /* post receives:   */
353   rvalues = (int *) MALLOC((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
354   CHKPTR(rvalues);
355   recv_waits = (MPI_Request *) MALLOC((nrecvs+1)*sizeof(MPI_Request));
356   CHKPTR(recv_waits);
357   for ( i=0; i<nrecvs; i++ ) {
358     MPI_Irecv((void *)(rvalues+nmax*i),nmax,MPI_INT,MPI_ANY_SOURCE,tag,
359               comm,recv_waits+i);
360   }
361 
362   /* do sends:
363       1) starts[i] gives the starting index in svalues for stuff going to
364          the ith processor
365   */
366   svalues = (int *) MALLOC( (N+1)*sizeof(int) ); CHKPTR(svalues);
367   send_waits = (MPI_Request *) MALLOC( (nsends+1)*sizeof(MPI_Request));
368   CHKPTR(send_waits);
369   starts = (int *) MALLOC( (numtids+1)*sizeof(int) ); CHKPTR(starts);
370   starts[0] = 0;
371   for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
372   for ( i=0; i<N; i++ ) {
373     svalues[starts[owner[i]]++] = rows[i];
374   }
375   ISRestoreIndices(is,&rows);
376 
377   starts[0] = 0;
378   for ( i=1; i<numtids+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
379   count = 0;
380   for ( i=0; i<numtids; i++ ) {
381     if (procs[i]) {
382       MPI_Isend((void*)(svalues+starts[i]),nprocs[i],MPI_INT,i,tag,
383                 comm,send_waits+count++);
384     }
385   }
386   FREE(starts);
387 
388   base = owners[mytid];
389 
390   /*  wait on receives */
391   lens = (int *) MALLOC( 2*(nrecvs+1)*sizeof(int) ); CHKPTR(lens);
392   source = lens + nrecvs;
393   count = nrecvs; slen = 0;
394   while (count) {
395     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
396     /* unpack receives into our local space */
397     MPI_Get_count(&recv_status,MPI_INT,&n);
398     source[imdex]  = recv_status.MPI_SOURCE;
399     lens[imdex]  = n;
400     slen += n;
401     count--;
402   }
403   FREE(recv_waits);
404 
405   /* gather the received row numbers into a local list */
406   lrows = (int *) MALLOC( slen*sizeof(int) ); CHKPTR(lrows);
407   count = 0;
408   for ( i=0; i<nrecvs; i++ ) {
409     values = rvalues + i*nmax;
410     for ( j=0; j<lens[i]; j++ ) {
411       lrows[count++] = values[j] - base;
412     }
413   }
414   FREE(rvalues); FREE(lens);
415   FREE(owner); FREE(nprocs);
416 
417   /* actually zap the local rows */
418   ierr = ISCreateSequential(slen,lrows,&istmp); CHKERR(ierr);  FREE(lrows);
419   ierr = MatZeroRows(l->A,istmp,diag); CHKERR(ierr);
420   ierr = MatZeroRows(l->B,istmp,0); CHKERR(ierr);
421   ierr = ISDestroy(istmp); CHKERR(ierr);
422 
423   /* wait on sends */
424   if (nsends) {
425     send_status = (MPI_Status *) MALLOC( nsends*sizeof(MPI_Status) );
426     CHKPTR(send_status);
427     MPI_Waitall(nsends,send_waits,send_status);
428     FREE(send_status);
429   }
430   FREE(send_waits); FREE(svalues);
431 
432 
433   return 0;
434 }
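/*
   Usage sketch for zeroing rows of a parallel matrix (names are illustrative):
   the index set may contain any global rows; each row is forwarded to its
   owner and zeroed in both the diagonal and off-diagonal blocks, and for the
   diagonal block the diagonal entry is set from *diag when diag is non-null
   (subject to the caveat noted above).

      IS     rows;
      Scalar one = 1.0;
      ierr = ISCreateSequential(nrows,global_rows,&rows); CHKERR(ierr);
      ierr = MatZeroRows(A,rows,&one); CHKERR(ierr);
      ierr = ISDestroy(rows); CHKERR(ierr);
*/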
435 
436 static int MatMult_MPIAIJ(Mat aijin,Vec xx,Vec yy)
437 {
438   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
439   int        ierr;
440   if (!aij->assembled) SETERR(1,"MatMult_MPIAIJ: must assemble matrix first");
441   ierr = VecScatterBegin(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
442   CHKERR(ierr);
443   ierr = MatMult(aij->A,xx,yy); CHKERR(ierr);
444   ierr = VecScatterEnd(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
445   CHKERR(ierr);
446   ierr = MatMultAdd(aij->B,aij->lvec,yy,yy); CHKERR(ierr);
447   return 0;
448 }
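/*
   The multiply above computes, with communication overlapped,

       yy = A*xx + B*lvec

   where A is the diagonal (locally owned column) block, B is the off-diagonal
   block in its compacted column numbering, and lvec receives the needed ghost
   values of xx through the scatter context Mvctx.
*/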
449 
450 static int MatMultAdd_MPIAIJ(Mat aijin,Vec xx,Vec yy,Vec zz)
451 {
452   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
453   int        ierr;
454   if (!aij->assembled) SETERR(1,"MatMultAdd_MPIAIJ: must assemble matrix first");
455   ierr = VecScatterBegin(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
456   CHKERR(ierr);
457   ierr = MatMultAdd(aij->A,xx,yy,zz); CHKERR(ierr);
458   ierr = VecScatterEnd(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
459   CHKERR(ierr);
460   ierr = MatMultAdd(aij->B,aij->lvec,zz,zz); CHKERR(ierr);
461   return 0;
462 }
463 
464 static int MatMultTrans_MPIAIJ(Mat aijin,Vec xx,Vec yy)
465 {
466   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
467   int        ierr;
468 
469   if (!aij->assembled)
470     SETERR(1,"MatMultTrans_MPIAIJ: must assemble matrix first");
471   /* do nondiagonal part */
472   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERR(ierr);
473   /* send it on its way */
474   ierr = VecScatterBegin(aij->lvec,0,yy,0,AddValues,
475                          ScatterAll|ScatterReverse,aij->Mvctx); CHKERR(ierr);
476   /* do local part */
477   ierr = MatMultTrans(aij->A,xx,yy); CHKERR(ierr);
478   /* receive remote parts: note this assumes the values are not actually */
479   /* inserted into yy until the next line, which is true for this         */
480   /* implementation but may not always be true. */
481   ierr = VecScatterEnd(aij->lvec,0,yy,0,AddValues,ScatterAll|ScatterReverse,
482                          aij->Mvctx); CHKERR(ierr);
483   return 0;
484 }
485 
486 static int MatMultTransAdd_MPIAIJ(Mat aijin,Vec xx,Vec yy,Vec zz)
487 {
488   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
489   int        ierr;
490 
491   if (!aij->assembled)
492     SETERR(1,"MatMultTransAdd_MPIAIJ: must assemble matrix first");
493   /* do nondiagonal part */
494   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERR(ierr);
495   /* send it on its way */
496   ierr = VecScatterBegin(aij->lvec,0,zz,0,AddValues,
497                          ScatterAll|ScatterReverse,aij->Mvctx); CHKERR(ierr);
498   /* do local part */
499   ierr = MatMultTransAdd(aij->A,xx,yy,zz); CHKERR(ierr);
500   /* receive remote parts: note this assumes the values are not actually */
501   /* inserted into zz until the next line, which is true for this         */
502   /* implementation but may not always be true. */
503   ierr = VecScatterEnd(aij->lvec,0,zz,0,AddValues,ScatterAll|ScatterReverse,
504                          aij->Mvctx); CHKERR(ierr);
505   return 0;
506 }
507 
508 /*
509   This only works correctly for square matrices where the subblock A->A is the
510    diagonal block
511 */
512 static int MatGetDiag_MPIAIJ(Mat Ain,Vec v)
513 {
514   Mat_MPIAIJ *A = (Mat_MPIAIJ *) Ain->data;
515   if (!A->assembled) SETERR(1,"MatGetDiag_MPIAIJ: must assemble matrix first");
516   return MatGetDiagonal(A->A,v);
517 }
518 
519 static int MatDestroy_MPIAIJ(PetscObject obj)
520 {
521   Mat        mat = (Mat) obj;
522   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
523   int        ierr;
524 #if defined(PETSC_LOG)
525   PLogObjectState(obj,"Rows %d Cols %d",aij->M,aij->N);
526 #endif
527   FREE(aij->rowners);
528   ierr = MatDestroy(aij->A); CHKERR(ierr);
529   ierr = MatDestroy(aij->B); CHKERR(ierr);
530   if (aij->colmap) FREE(aij->colmap);
531   if (aij->garray) FREE(aij->garray);
532   if (aij->lvec) VecDestroy(aij->lvec);
533   if (aij->Mvctx) VecScatterCtxDestroy(aij->Mvctx);
534   FREE(aij);
535   PLogObjectDestroy(mat);
536   PETSCHEADERDESTROY(mat);
537   return 0;
538 }
539 
540 static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
541 {
542   Mat        mat = (Mat) obj;
543   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
544   int        ierr;
545   PetscObject vobj = (PetscObject) viewer;
546 
547   if (!aij->assembled) SETERR(1,"MatView_MPIAIJ: must assemble matrix first");
548   if (vobj->cookie == VIEWER_COOKIE) {
549     FILE *fd = ViewerFileGetPointer(viewer);
550     if (vobj->type == FILE_VIEWER) {
551       MPE_Seq_begin(mat->comm,1);
552       fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
553              aij->mytid,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
554              aij->cend);
555       ierr = MatView(aij->A,viewer); CHKERR(ierr);
556       ierr = MatView(aij->B,viewer); CHKERR(ierr);
557       fflush(fd);
558       MPE_Seq_end(mat->comm,1);
559     }
560   }
561   return 0;
562 }
563 
564 extern int MatMarkDiag_AIJ(Mat_AIJ  *);
565 /*
566     This has to provide several versions.
567 
568      1) same as the sequential case
569      2) a) use only local smoothing, updating outer values only once.
570         b) local smoothing, updating outer values each inner iteration.
571      3) coloring, updating outer values between colors.
572 */
573 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,int flag,double shift,
574                         int its,Vec xx)
575 {
576   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
577   Mat        AA = mat->A, BB = mat->B;
578   Mat_AIJ    *A = (Mat_AIJ *) AA->data, *B = (Mat_AIJ *)BB->data;
579   Scalar     zero = 0.0,*b,*x,*xs,*ls,d,*v,sum,scale,*t,*ts;
580   int        ierr,*idx, *diag;
581   int        n = mat->n, m = mat->m, i;
582   Vec        tt;
583 
584   if (!mat->assembled) SETERR(1,"MatRelax_MPIAIJ: must assemble matrix first");
585 
586   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
587   xs = x -1; /* shift by one for index start of 1 */
588   ls--;
589   if (!A->diag) {if ((ierr = MatMarkDiag_AIJ(A))) return ierr;}
590   diag = A->diag;
591   if (flag == SOR_APPLY_UPPER || flag == SOR_APPLY_LOWER) {
592     SETERR(1,"That option is not yet supported for parallel AIJ matrices");
593   }
594   if (flag & SOR_EISENSTAT) {
595     /* Let  A = L + D + U, where L is strictly lower triangular,
596     U is strictly upper triangular, and D is diagonal. This routine applies
597 
598             (L + E)^{-1} A (U + E)^{-1}
599 
600     to a vector efficiently using Eisenstat's trick. This is for
601     the case of SSOR preconditioner, so E is D/omega where omega
602     is the relaxation factor.
603     */
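    /*
       The identity behind the three steps below (a worked sketch in the
       notation of the comment above, with E = D/omega):

           A = (L + E) + (U + E) + (D - 2E),   hence

           (L+E)^{-1} A (U+E)^{-1} b
               = (U+E)^{-1} b  +  (L+E)^{-1} [ b - (2E - D) (U+E)^{-1} b ],

       i.e.  x = (E+U)^{-1} b,   t = b - (2E - D) x,   t = (E+L)^{-1} t,
       and the result is x + t.
    */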
604     ierr = VecCreate(xx,&tt); CHKERR(ierr);
605     VecGetArray(tt,&t);
606     scale = (2.0/omega) - 1.0;
607     /*  x = (E + U)^{-1} b */
608     VecSet(&zero,mat->lvec);
609     ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
610                               mat->Mvctx); CHKERR(ierr);
611     for ( i=m-1; i>-1; i-- ) {
612       n    = A->i[i+1] - diag[i] - 1;
613       idx  = A->j + diag[i];
614       v    = A->a + diag[i];
615       sum  = b[i];
616       SPARSEDENSEMDOT(sum,xs,v,idx,n);
617       d    = shift + A->a[diag[i]-1];
618       n    = B->i[i+1] - B->i[i];
619       idx  = B->j + B->i[i] - 1;
620       v    = B->a + B->i[i] - 1;
621       SPARSEDENSEMDOT(sum,ls,v,idx,n);
622       x[i] = omega*(sum/d);
623     }
624     ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
625                             mat->Mvctx); CHKERR(ierr);
626 
627     /*  t = b - (2*E - D)x */
628     v = A->a;
629     for ( i=0; i<m; i++ ) { t[i] = b[i] - scale*(v[*diag++ - 1])*x[i]; }
630 
631     /*  t = (E + L)^{-1}t */
632     ts = t - 1; /* shift by one since A->a and A->j use index start of 1 */
633     diag = A->diag;
634     VecSet(&zero,mat->lvec);
635     ierr = VecPipelineBegin(tt,0,mat->lvec,0,InsertValues,PipelineDown,
636                                                  mat->Mvctx); CHKERR(ierr);
637     for ( i=0; i<m; i++ ) {
638       n    = diag[i] - A->i[i];
639       idx  = A->j + A->i[i] - 1;
640       v    = A->a + A->i[i] - 1;
641       sum  = t[i];
642       SPARSEDENSEMDOT(sum,ts,v,idx,n);
643       d    = shift + A->a[diag[i]-1];
644       n    = B->i[i+1] - B->i[i];
645       idx  = B->j + B->i[i] - 1;
646       v    = B->a + B->i[i] - 1;
647       SPARSEDENSEMDOT(sum,ls,v,idx,n);
648       t[i] = omega*(sum/d);
649     }
650     ierr = VecPipelineEnd(tt,0,mat->lvec,0,InsertValues,PipelineDown,
651                                                     mat->Mvctx); CHKERR(ierr);
652     /*  x = x + t */
653     for ( i=0; i<m; i++ ) { x[i] += t[i]; }
654     VecDestroy(tt);
655     return 0;
656   }
657 
658 
659   if ((flag & SOR_SYMMETRIC_SWEEP) == SOR_SYMMETRIC_SWEEP){
660     if (flag & SOR_ZERO_INITIAL_GUESS) {
661       VecSet(&zero,mat->lvec); VecSet(&zero,xx);
662     }
663     else {
664       ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
665       CHKERR(ierr);
666       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
667       CHKERR(ierr);
668     }
669     while (its--) {
670       /* go down through the rows */
671       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
672                               mat->Mvctx); CHKERR(ierr);
673       for ( i=0; i<m; i++ ) {
674         n    = A->i[i+1] - A->i[i];
675         idx  = A->j + A->i[i] - 1;
676         v    = A->a + A->i[i] - 1;
677         sum  = b[i];
678         SPARSEDENSEMDOT(sum,xs,v,idx,n);
679         d    = shift + A->a[diag[i]-1];
680         n    = B->i[i+1] - B->i[i];
681         idx  = B->j + B->i[i] - 1;
682         v    = B->a + B->i[i] - 1;
683         SPARSEDENSEMDOT(sum,ls,v,idx,n);
684         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
685       }
686       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
687                             mat->Mvctx); CHKERR(ierr);
688       /* come up through the rows */
689       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
690                               mat->Mvctx); CHKERR(ierr);
691       for ( i=m-1; i>-1; i-- ) {
692         n    = A->i[i+1] - A->i[i];
693         idx  = A->j + A->i[i] - 1;
694         v    = A->a + A->i[i] - 1;
695         sum  = b[i];
696         SPARSEDENSEMDOT(sum,xs,v,idx,n);
697         d    = shift + A->a[diag[i]-1];
698         n    = B->i[i+1] - B->i[i];
699         idx  = B->j + B->i[i] - 1;
700         v    = B->a + B->i[i] - 1;
701         SPARSEDENSEMDOT(sum,ls,v,idx,n);
702         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
703       }
704       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
705                             mat->Mvctx); CHKERR(ierr);
706     }
707   }
708   else if (flag & SOR_FORWARD_SWEEP){
709     if (flag & SOR_ZERO_INITIAL_GUESS) {
710       VecSet(&zero,mat->lvec);
711       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
712                               mat->Mvctx); CHKERR(ierr);
713       for ( i=0; i<m; i++ ) {
714         n    = diag[i] - A->i[i];
715         idx  = A->j + A->i[i] - 1;
716         v    = A->a + A->i[i] - 1;
717         sum  = b[i];
718         SPARSEDENSEMDOT(sum,xs,v,idx,n);
719         d    = shift + A->a[diag[i]-1];
720         n    = B->i[i+1] - B->i[i];
721         idx  = B->j + B->i[i] - 1;
722         v    = B->a + B->i[i] - 1;
723         SPARSEDENSEMDOT(sum,ls,v,idx,n);
724         x[i] = omega*(sum/d);
725       }
726       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
727                             mat->Mvctx); CHKERR(ierr);
728       its--;
729     }
730     while (its--) {
731       ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
732       CHKERR(ierr);
733       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
734       CHKERR(ierr);
735       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
736                               mat->Mvctx); CHKERR(ierr);
737       for ( i=0; i<m; i++ ) {
738         n    = A->i[i+1] - A->i[i];
739         idx  = A->j + A->i[i] - 1;
740         v    = A->a + A->i[i] - 1;
741         sum  = b[i];
742         SPARSEDENSEMDOT(sum,xs,v,idx,n);
743         d    = shift + A->a[diag[i]-1];
744         n    = B->i[i+1] - B->i[i];
745         idx  = B->j + B->i[i] - 1;
746         v    = B->a + B->i[i] - 1;
747         SPARSEDENSEMDOT(sum,ls,v,idx,n);
748         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
749       }
750       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
751                             mat->Mvctx); CHKERR(ierr);
752     }
753   }
754   else if (flag & SOR_BACKWARD_SWEEP){
755     if (flag & SOR_ZERO_INITIAL_GUESS) {
756       VecSet(&zero,mat->lvec);
757       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
758                               mat->Mvctx); CHKERR(ierr);
759       for ( i=m-1; i>-1; i-- ) {
760         n    = A->i[i+1] - diag[i] - 1;
761         idx  = A->j + diag[i];
762         v    = A->a + diag[i];
763         sum  = b[i];
764         SPARSEDENSEMDOT(sum,xs,v,idx,n);
765         d    = shift + A->a[diag[i]-1];
766         n    = B->i[i+1] - B->i[i];
767         idx  = B->j + B->i[i] - 1;
768         v    = B->a + B->i[i] - 1;
769         SPARSEDENSEMDOT(sum,ls,v,idx,n);
770         x[i] = omega*(sum/d);
771       }
772       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
773                             mat->Mvctx); CHKERR(ierr);
774       its--;
775     }
776     while (its--) {
777       ierr = VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterDown,
778                             mat->Mvctx); CHKERR(ierr);
779       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterDown,
780                             mat->Mvctx); CHKERR(ierr);
781       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
782                               mat->Mvctx); CHKERR(ierr);
783       for ( i=m-1; i>-1; i-- ) {
784         n    = A->i[i+1] - A->i[i];
785         idx  = A->j + A->i[i] - 1;
786         v    = A->a + A->i[i] - 1;
787         sum  = b[i];
788         SPARSEDENSEMDOT(sum,xs,v,idx,n);
789         d    = shift + A->a[diag[i]-1];
790         n    = B->i[i+1] - B->i[i];
791         idx  = B->j + B->i[i] - 1;
792         v    = B->a + B->i[i] - 1;
793         SPARSEDENSEMDOT(sum,ls,v,idx,n);
794         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
795       }
796       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
797                             mat->Mvctx); CHKERR(ierr);
798     }
799   }
800   else if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
801     if (flag & SOR_ZERO_INITIAL_GUESS) {
802       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
803     }
804     ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
805     CHKERR(ierr);
806     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
807     CHKERR(ierr);
808     while (its--) {
809       /* go down through the rows */
810       for ( i=0; i<m; i++ ) {
811         n    = A->i[i+1] - A->i[i];
812         idx  = A->j + A->i[i] - 1;
813         v    = A->a + A->i[i] - 1;
814         sum  = b[i];
815         SPARSEDENSEMDOT(sum,xs,v,idx,n);
816         d    = shift + A->a[diag[i]-1];
817         n    = B->i[i+1] - B->i[i];
818         idx  = B->j + B->i[i] - 1;
819         v    = B->a + B->i[i] - 1;
820         SPARSEDENSEMDOT(sum,ls,v,idx,n);
821         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
822       }
823       /* come up through the rows */
824       for ( i=m-1; i>-1; i-- ) {
825         n    = A->i[i+1] - A->i[i];
826         idx  = A->j + A->i[i] - 1;
827         v    = A->a + A->i[i] - 1;
828         sum  = b[i];
829         SPARSEDENSEMDOT(sum,xs,v,idx,n);
830         d    = shift + A->a[diag[i]-1];
831         n    = B->i[i+1] - B->i[i];
832         idx  = B->j + B->i[i] - 1;
833         v    = B->a + B->i[i] - 1;
834         SPARSEDENSEMDOT(sum,ls,v,idx,n);
835         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
836       }
837     }
838   }
839   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
840     if (flag & SOR_ZERO_INITIAL_GUESS) {
841       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
842     }
843     ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
844     CHKERR(ierr);
845     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
846     CHKERR(ierr);
847     while (its--) {
848       for ( i=0; i<m; i++ ) {
849         n    = A->i[i+1] - A->i[i];
850         idx  = A->j + A->i[i] - 1;
851         v    = A->a + A->i[i] - 1;
852         sum  = b[i];
853         SPARSEDENSEMDOT(sum,xs,v,idx,n);
854         d    = shift + A->a[diag[i]-1];
855         n    = B->i[i+1] - B->i[i];
856         idx  = B->j + B->i[i] - 1;
857         v    = B->a + B->i[i] - 1;
858         SPARSEDENSEMDOT(sum,ls,v,idx,n);
859         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
860       }
861     }
862   }
863   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
864     if (flag & SOR_ZERO_INITIAL_GUESS) {
865       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
866     }
867     ierr = VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,
868                             mat->Mvctx); CHKERR(ierr);
869     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,
870                             mat->Mvctx); CHKERR(ierr);
871     while (its--) {
872       for ( i=m-1; i>-1; i-- ) {
873         n    = A->i[i+1] - A->i[i];
874         idx  = A->j + A->i[i] - 1;
875         v    = A->a + A->i[i] - 1;
876         sum  = b[i];
877         SPARSEDENSEMDOT(sum,xs,v,idx,n);
878         d    = shift + A->a[diag[i]-1];
879         n    = B->i[i+1] - B->i[i];
880         idx  = B->j + B->i[i] - 1;
881         v    = B->a + B->i[i] - 1;
882         SPARSEDENSEMDOT(sum,ls,v,idx,n);
883         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
884       }
885     }
886   }
887   return 0;
888 }
889 static int MatInsOpt_MPIAIJ(Mat aijin,int op)
890 {
891   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
892 
893   if      (op == NO_NEW_NONZERO_LOCATIONS)  {
894     MatSetOption(aij->A,op);
895     MatSetOption(aij->B,op);
896   }
897   else if (op == YES_NEW_NONZERO_LOCATIONS) {
898     MatSetOption(aij->A,op);
899     MatSetOption(aij->B,op);
900   }
901   else if (op == COLUMN_ORIENTED) SETERR(1,"Column oriented not supported");
902   return 0;
903 }
904 
905 static int MatSize_MPIAIJ(Mat matin,int *m,int *n)
906 {
907   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
908   *m = mat->M; *n = mat->N;
909   return 0;
910 }
911 
912 static int MatLocalSize_MPIAIJ(Mat matin,int *m,int *n)
913 {
914   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
915   *m = mat->m; *n = mat->n;
916   return 0;
917 }
918 
919 static int MatRange_MPIAIJ(Mat matin,int *m,int *n)
920 {
921   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
922   *m = mat->rstart; *n = mat->rend;
923   return 0;
924 }
925 
926 static int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
927 {
928   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) matin->data;
929   Scalar     *vworkA, *vworkB;
930   int        ierr, *cworkA, *cworkB, lrow, cstart = aij->cstart;
931   int        nztot, nzA, nzB, i, rstart = aij->rstart, rend = aij->rend;
932 
933   if (!aij->assembled)
934     SETERR(1,"MatGetRow_MPIAIJ: Must assemble matrix first.");
935   if (row < rstart || row >= rend)
936     SETERR(1,"MatGetRow_MPIAIJ: Currently you can get only local rows.")
937   lrow = row - rstart;
938   ierr = MatGetRow(aij->A,lrow,&nzA,&cworkA,&vworkA); CHKERR(ierr);
939   for (i=0; i<nzA; i++) cworkA[i] += cstart;
940   ierr = MatGetRow(aij->B,lrow,&nzB,&cworkB,&vworkB); CHKERR(ierr);
941   for (i=0; i<nzB; i++) cworkB[i] = aij->garray[cworkB[i]];
942 
943   if (nztot = nzA + nzB) {
944     *idx = (int *) MALLOC( (nztot)*sizeof(int) ); CHKPTR(*idx);
945     *v   = (Scalar *) MALLOC( (nztot)*sizeof(Scalar) ); CHKPTR(*v);
946     for ( i=0; i<nzA; i++ ) {
947       (*idx)[i] = cworkA[i];
948       (*v)[i] = vworkA[i];
949     }
950     for ( i=0; i<nzB; i++ ) {
951       (*idx)[i+nzA] = cworkB[i];
952       (*v)[i+nzA] = vworkB[i];
953     }
954   }
955   else {*idx = 0; *v=0;}
956   *nz = nztot;
957   ierr = MatRestoreRow(aij->A,lrow,&nzA,&cworkA,&vworkA); CHKERR(ierr);
958   ierr = MatRestoreRow(aij->B,lrow,&nzB,&cworkB,&vworkB); CHKERR(ierr);
959   return 0;
960 }
961 
962 static int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
963 {
964   if (*idx) FREE(*idx);
965   if (*v) FREE(*v);
966   return 0;
967 }
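/*
   Usage sketch for the two routines above (only locally owned rows may be
   requested; the names are illustrative):

      int    nz,*cols;
      Scalar *vals;
      ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERR(ierr);
          ...  examine cols[0..nz-1] and vals[0..nz-1]  ...
      ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERR(ierr);
*/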
968 
969 static int MatCopy_MPIAIJ(Mat,Mat *);
970 extern int MatConvert_MPIAIJ(Mat,MATTYPE,Mat *);
971 
972 /* -------------------------------------------------------------------*/
973 static struct _MatOps MatOps = {MatInsertValues_MPIAIJ,
974        MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
975        MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
976        MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
977        0,0,0,0,
978        0,0,
979        MatRelax_MPIAIJ,
980        0,
981        0,0,0,
982        MatCopy_MPIAIJ,
983        MatGetDiag_MPIAIJ,0,0,
984        MatBeginAssemble_MPIAIJ,MatEndAssemble_MPIAIJ,
985        0,
986        MatInsOpt_MPIAIJ,MatZero_MPIAIJ,MatZeroRows_MPIAIJ,0,
987        0,0,0,0,
988        MatSize_MPIAIJ,MatLocalSize_MPIAIJ,MatRange_MPIAIJ,
989        0,0,
990        0,MatConvert_MPIAIJ };
991 
992 /*@
993 
994       MatCreateMPIAIJ - Creates a sparse parallel matrix
995                                  in AIJ format.
996 
997   Input Parameters:
998 .   comm - MPI communicator
999 .   m,n - number of local rows and columns (or -1 to have them calculated)
1000 .   M,N - number of global rows and columns (or -1 to have them calculated)
1001 .   d_nz - total number of nonzeros in the diagonal portion of the matrix
1002 .   d_nnz - number of nonzeros per row in the diagonal portion of the matrix,
1003 .           or null. You must leave room for the diagonal entry even if it is zero.
1004 .   o_nz - total number of nonzeros in the off-diagonal portion of the matrix
1005 .   o_nnz - number of nonzeros per row in the off-diagonal portion of the matrix,
1006 .           or null. You must have at least one nonzero per row.
1007 
1008   Output parameters:
1009 .  newmat - the matrix
1010 
1011   Keywords: matrix, aij, compressed row, sparse, parallel
1012 @*/
1013 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
1014                  int d_nz,int *d_nnz, int o_nz,int *o_nnz,Mat *newmat)
1015 {
1016   Mat          mat;
1017   Mat_MPIAIJ   *aij;
1018   int          ierr, i,sum[2],work[2];
1019   *newmat         = 0;
1020   PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
1021   PLogObjectCreate(mat);
1022   mat->data       = (void *) (aij = NEW(Mat_MPIAIJ)); CHKPTR(aij);
1023   mat->ops        = &MatOps;
1024   mat->destroy    = MatDestroy_MPIAIJ;
1025   mat->view       = MatView_MPIAIJ;
1026   mat->factor     = 0;
1027   mat->row        = 0;
1028   mat->col        = 0;
1029 
1030   mat->comm       = comm;
1031   aij->insertmode = NotSetValues;
1032   MPI_Comm_rank(comm,&aij->mytid);
1033   MPI_Comm_size(comm,&aij->numtids);
1034 
1035   if (M == -1 || N == -1) {
1036     work[0] = m; work[1] = n;
1037     MPI_Allreduce((void *) work,(void *) sum,2,MPI_INT,MPI_SUM,comm );
1038     if (M == -1) M = sum[0];
1039     if (N == -1) N = sum[1];
1040   }
1041   if (m == -1) {m = M/aij->numtids + ((M % aij->numtids) > aij->mytid);}
1042   if (n == -1) {n = N/aij->numtids + ((N % aij->numtids) > aij->mytid);}
1043   aij->m       = m;
1044   aij->n       = n;
1045   aij->N       = N;
1046   aij->M       = M;
1047 
1048   /* build local table of row and column ownerships */
1049   aij->rowners = (int *) MALLOC(2*(aij->numtids+2)*sizeof(int));
1050   CHKPTR(aij->rowners);
1051   aij->cowners = aij->rowners + aij->numtids + 1;
1052   MPI_Allgather(&m,1,MPI_INT,aij->rowners+1,1,MPI_INT,comm);
1053   aij->rowners[0] = 0;
1054   for ( i=2; i<=aij->numtids; i++ ) {
1055     aij->rowners[i] += aij->rowners[i-1];
1056   }
1057   aij->rstart = aij->rowners[aij->mytid];
1058   aij->rend   = aij->rowners[aij->mytid+1];
1059   MPI_Allgather(&n,1,MPI_INT,aij->cowners+1,1,MPI_INT,comm);
1060   aij->cowners[0] = 0;
1061   for ( i=2; i<=aij->numtids; i++ ) {
1062     aij->cowners[i] += aij->cowners[i-1];
1063   }
1064   aij->cstart = aij->cowners[aij->mytid];
1065   aij->cend   = aij->cowners[aij->mytid+1];
1066 
1067 
1068   ierr = MatCreateSequentialAIJ(m,n,d_nz,d_nnz,&aij->A); CHKERR(ierr);
1069   PLogObjectParent(mat,aij->A);
1070   ierr = MatCreateSequentialAIJ(m,N,o_nz,o_nnz,&aij->B); CHKERR(ierr);
1071   PLogObjectParent(mat,aij->B);
1072 
1073   /* build cache for off-processor entries */
1074   aij->stash.nmax = CHUNCKSIZE; /* completely arbitrary number */
1075   aij->stash.n    = 0;
1076   aij->stash.array = (Scalar *) MALLOC( aij->stash.nmax*(2*sizeof(int) +
1077                             sizeof(Scalar))); CHKPTR(aij->stash.array);
1078   aij->stash.idx = (int *) (aij->stash.array + aij->stash.nmax);
1079   aij->stash.idy = (int *) (aij->stash.idx + aij->stash.nmax);
1080   aij->colmap    = 0;
1081   aij->garray    = 0;
1082 
1083   /* stuff used for matrix vector multiply */
1084   aij->lvec      = 0;
1085   aij->Mvctx     = 0;
1086   aij->assembled = 0;
1087 
1088   *newmat = mat;
1089   return 0;
1090 }
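/*
   Example of creating a parallel AIJ matrix (a sketch; the sizes and
   preallocation numbers are arbitrary): each processor gives its local row
   count and lets the global sizes be computed.

      Mat A;
      int m = 100;                       (local rows; M,N computed from -1)
      ierr = MatCreateMPIAIJ(MPI_COMM_WORLD,m,m,-1,-1,5,0,2,0,&A); CHKERR(ierr);
          ... MatSetValues(), MatBeginAssembly(), MatEndAssembly(), MatMult() ...
      ierr = MatDestroy(A); CHKERR(ierr);
*/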
1091 
1092 static int MatCopy_MPIAIJ(Mat matin,Mat *newmat)
1093 {
1094   Mat        mat;
1095   Mat_MPIAIJ *aij,*oldmat = (Mat_MPIAIJ *) matin->data;
1096   int        ierr;
1097   *newmat      = 0;
1098 
1099   if (!oldmat->assembled) SETERR(1,"Cannot copy unassembled matrix");
1100   PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1101   PLogObjectCreate(mat);
1102   mat->data       = (void *) (aij = NEW(Mat_MPIAIJ)); CHKPTR(aij);
1103   mat->ops        = &MatOps;
1104   mat->destroy    = MatDestroy_MPIAIJ;
1105   mat->view       = MatView_MPIAIJ;
1106   mat->factor     = matin->factor;
1107   mat->row        = 0;
1108   mat->col        = 0;
1109 
1110   aij->m          = oldmat->m;
1111   aij->n          = oldmat->n;
1112   aij->M          = oldmat->M;
1113   aij->N          = oldmat->N;
1114 
1115   aij->assembled  = 1;
1116   aij->rstart     = oldmat->rstart;
1117   aij->rend       = oldmat->rend;
1118   aij->cstart     = oldmat->cstart;
1119   aij->cend       = oldmat->cend;
1120   aij->numtids    = oldmat->numtids;
1121   aij->mytid      = oldmat->mytid;
1122   aij->insertmode = NotSetValues;
1123 
1124   aij->rowners    = (int *) MALLOC( (aij->numtids+1)*sizeof(int) );
1125   CHKPTR(aij->rowners);
1126   MEMCPY(aij->rowners,oldmat->rowners,(aij->numtids+1)*sizeof(int));
1127   aij->stash.nmax = 0;
1128   aij->stash.n    = 0;
1129   aij->stash.array= 0;
1130   aij->colmap     = 0;
1131   aij->garray     = 0;
1132   mat->comm       = matin->comm;
1133 
1134   ierr =  VecCreate(oldmat->lvec,&aij->lvec); CHKERR(ierr);
1135   PLogObjectParent(mat,aij->lvec);
1136   ierr =  VecScatterCtxCopy(oldmat->Mvctx,&aij->Mvctx); CHKERR(ierr);
1137   PLogObjectParent(mat,aij->Mvctx);
1138   ierr =  MatCopy(oldmat->A,&aij->A); CHKERR(ierr);
1139   PLogObjectParent(mat,aij->A);
1140   ierr =  MatCopy(oldmat->B,&aij->B); CHKERR(ierr);
1141   PLogObjectParent(mat,aij->B);
1142   *newmat = mat;
1143   return 0;
1144 }
1145