xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision bbf0e169f9fbcb77d596644d33cfb942ebffc0f0)
1 
2 #ifndef lint
3 static char vcid[] = "$Id: mpiaij.c,v 1.169 1996/09/23 18:20:49 bsmith Exp bsmith $";
4 #endif
5 
6 #include "src/mat/impls/aij/mpi/mpiaij.h"
7 #include "src/vec/vecimpl.h"
8 #include "src/inline/spops.h"
9 
10 /* local utility routine that creates a mapping from the global column
11 number to the local number in the off-diagonal part of the local
12 storage of the matrix.  This is done in a non-scalable way since the
13 length of colmap equals the number of global columns in the matrix.
14 */
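/*
   For example (hypothetical data): if this processor's off-diagonal block
   references global columns garray = {2,7,9}, then after this routine
   colmap[2] = 1, colmap[7] = 2, colmap[9] = 3 and all other entries are 0,
   so a zero entry means "column not present locally" and a nonzero entry
   is the local column number plus one.
*/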
15 static int CreateColmap_MPIAIJ_Private(Mat mat)
16 {
17   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
18   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
19   int        n = B->n,i;
20 
21   aij->colmap = (int *) PetscMalloc(aij->N*sizeof(int));CHKPTRQ(aij->colmap);
22   PLogObjectMemory(mat,aij->N*sizeof(int));
23   PetscMemzero(aij->colmap,aij->N*sizeof(int));
24   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
25   return 0;
26 }
27 
28 extern int DisAssemble_MPIAIJ(Mat);
29 
30 static int MatGetRowIJ_MPIAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
31                            PetscTruth *done)
32 {
33   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
34   int        ierr;
35   if (aij->size == 1) {
36     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
37   } else SETERRQ(1,"MatGetRowIJ_MPIAIJ:not supported in parallel");
38   return 0;
39 }
40 
41 static int MatRestoreRowIJ_MPIAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
42                                PetscTruth *done)
43 {
44   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
45   int        ierr;
46   if (aij->size == 1) {
47     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
48   } else SETERRQ(1,"MatRestoreRowIJ_MPIAIJ:not supported in parallel");
49   return 0;
50 }
51 
52 static int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
53 {
54   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
55   Scalar     value;
56   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
57   int        cstart = aij->cstart, cend = aij->cend,row,col;
58   int        roworiented = aij->roworiented;
59 
60   if (aij->insertmode != NOT_SET_VALUES && aij->insertmode != addv) {
61     SETERRQ(1,"MatSetValues_MPIAIJ:Cannot mix inserts and adds");
62   }
63   aij->insertmode = addv;
64   for ( i=0; i<m; i++ ) {
65     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative row");
66     if (im[i] >= aij->M) SETERRQ(1,"MatSetValues_MPIAIJ:Row too large");
67     if (im[i] >= rstart && im[i] < rend) {
68       row = im[i] - rstart;
69       for ( j=0; j<n; j++ ) {
70         if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative column");
71         if (in[j] >= aij->N) SETERRQ(1,"MatSetValues_MPIAIJ:Col too large");
72         if (in[j] >= cstart && in[j] < cend){
73           col = in[j] - cstart;
74           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
75           ierr = MatSetValues(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
76         }
77         else {
78           if (mat->was_assembled) {
79             if (!aij->colmap) {
80               ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
81             }
82             col = aij->colmap[in[j]] - 1;
83             if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
84               ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
85               col =  in[j];
86             }
87           }
88           else col = in[j];
89           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
90           ierr = MatSetValues(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
91         }
92       }
93     }
94     else {
95       if (roworiented) {
96         ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
97       }
98       else {
99         row = im[i];
100         for ( j=0; j<n; j++ ) {
101           ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
102         }
103       }
104     }
105   }
106   return 0;
107 }
108 
109 static int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
110 {
111   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
112   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
113   int        cstart = aij->cstart, cend = aij->cend,row,col;
114 
115   for ( i=0; i<m; i++ ) {
116     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative row");
117     if (idxm[i] >= aij->M) SETERRQ(1,"MatGetValues_MPIAIJ:Row too large");
118     if (idxm[i] >= rstart && idxm[i] < rend) {
119       row = idxm[i] - rstart;
120       for ( j=0; j<n; j++ ) {
121         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative column");
122         if (idxn[j] >= aij->N) SETERRQ(1,"MatGetValues_MPIAIJ:Col too large");
123         if (idxn[j] >= cstart && idxn[j] < cend){
124           col = idxn[j] - cstart;
125           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
126         }
127         else {
128           if (!aij->colmap) {
129             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
130           }
131           col = aij->colmap[idxn[j]] - 1;
132           if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
133           else {
134             ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
135           }
136         }
137       }
138     }
139     else {
140       SETERRQ(1,"MatGetValues_MPIAIJ:Only local values currently supported");
141     }
142   }
143   return 0;
144 }
145 
146 static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
147 {
148   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
149   MPI_Comm    comm = mat->comm;
150   int         size = aij->size, *owners = aij->rowners;
151   int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
152   MPI_Request *send_waits,*recv_waits;
153   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
154   InsertMode  addv;
155   Scalar      *rvalues,*svalues;
156 
157   /* make sure all processors are in either INSERT_VALUES or ADD_VALUES mode */
158   MPI_Allreduce(&aij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
159   if (addv == (ADD_VALUES|INSERT_VALUES)) {
160     SETERRQ(1,"MatAssemblyBegin_MPIAIJ:Some processors inserted others added");
161   }
162   aij->insertmode = addv; /* in case this processor had no cache */
163 
164   /*  first count number of contributors to each processor */
165   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
166   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
167   owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
168   for ( i=0; i<aij->stash.n; i++ ) {
169     idx = aij->stash.idx[i];
170     for ( j=0; j<size; j++ ) {
171       if (idx >= owners[j] && idx < owners[j+1]) {
172         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
173       }
174     }
175   }
176   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
177 
178   /* inform other processors of number of messages and max length*/
179   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
180   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
181   nreceives = work[rank];
182   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
183   nmax = work[rank];
184   PetscFree(work);
185 
186   /* post receives:
187        1) each message will consist of ordered triples
188      (global row index, global column index, value); we store the indices as
189      doubles to simplify the message passing.
190        2) since we don't know how long each individual message is we
191      allocate the largest needed buffer for each receive. Potentially
192      this is a lot of wasted space.
193 
194 
195        This could be done better.
196   */
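  /* Illustrative packing (hypothetical data): a stashed entry for global row 7,
     global column 2 with value 3.5 destined for some processor is sent as the
     three consecutive Scalars { 7.0, 2.0, 3.5 }; the receiver converts the
     first two back to ints when unpacking. */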
197   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
198   CHKPTRQ(rvalues);
199   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
200   CHKPTRQ(recv_waits);
201   for ( i=0; i<nreceives; i++ ) {
202     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
203               comm,recv_waits+i);
204   }
205 
206   /* do sends:
207       1) starts[i] gives the starting index in svalues for stuff going to
208          the ith processor
209   */
210   svalues = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
211   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
212   CHKPTRQ(send_waits);
213   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
214   starts[0] = 0;
215   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
216   for ( i=0; i<aij->stash.n; i++ ) {
217     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
218     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
219     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
220   }
221   PetscFree(owner);
222   starts[0] = 0;
223   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
224   count = 0;
225   for ( i=0; i<size; i++ ) {
226     if (procs[i]) {
227       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
228                 comm,send_waits+count++);
229     }
230   }
231   PetscFree(starts); PetscFree(nprocs);
232 
233   /* Free cache space */
234   PLogInfo(0,"[%d]MatAssemblyBegin_MPIAIJ:Number of off processor values %d\n",rank,aij->stash.n);
235   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
236 
237   aij->svalues    = svalues;    aij->rvalues    = rvalues;
238   aij->nsends     = nsends;     aij->nrecvs     = nreceives;
239   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
240   aij->rmax       = nmax;
241 
242   return 0;
243 }
244 extern int MatSetUpMultiply_MPIAIJ(Mat);
245 
246 static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
247 {
248   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
249   MPI_Status  *send_status,recv_status;
250   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
251   int         row,col,other_disassembled;
252   Scalar      *values,val;
253   InsertMode  addv = aij->insertmode;
254 
255   /*  wait on receives */
256   while (count) {
257     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
258     /* unpack receives into our local space */
259     values = aij->rvalues + 3*imdex*aij->rmax;
260     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
261     n = n/3;
262     for ( i=0; i<n; i++ ) {
263       row = (int) PetscReal(values[3*i]) - aij->rstart;
264       col = (int) PetscReal(values[3*i+1]);
265       val = values[3*i+2];
266       if (col >= aij->cstart && col < aij->cend) {
267         col -= aij->cstart;
268         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
269       }
270       else {
271         if (mat->was_assembled || mat->assembled) {
272           if (!aij->colmap) {
273             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
274           }
275           col = aij->colmap[col] - 1;
276           if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
277             ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
278             col = (int) PetscReal(values[3*i+1]);
279           }
280         }
281         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
282       }
283     }
284     count--;
285   }
286   PetscFree(aij->recv_waits); PetscFree(aij->rvalues);
287 
288   /* wait on sends */
289   if (aij->nsends) {
290     send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));
291     CHKPTRQ(send_status);
292     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
293     PetscFree(send_status);
294   }
295   PetscFree(aij->send_waits); PetscFree(aij->svalues);
296 
297   aij->insertmode = NOT_SET_VALUES;
298   ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
299   ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);
300 
301   /* determine if any processor has disassembled, if so we must
302      also disassemble ourselves, in order that we may reassemble. */
303   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
304   if (mat->was_assembled && !other_disassembled) {
305     ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
306   }
307 
308   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
309     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
310   }
311   ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
312   ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);
313 
314   if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
315   return 0;
316 }
317 
318 static int MatZeroEntries_MPIAIJ(Mat A)
319 {
320   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
321   int        ierr;
322   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
323   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
324   return 0;
325 }
326 
327 /* the code does not do the diagonal entries correctly unless the
328    matrix is square and the column and row ownerships are identical.
329    This is a BUG. The only way to fix it seems to be to access
330    aij->A and aij->B directly and not through the MatZeroRows()
331    routine.
332 */
333 static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
334 {
335   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
336   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
337   int            *procs,*nprocs,j,found,idx,nsends,*work;
338   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
339   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
340   int            *lens,imdex,*lrows,*values;
341   MPI_Comm       comm = A->comm;
342   MPI_Request    *send_waits,*recv_waits;
343   MPI_Status     recv_status,*send_status;
344   IS             istmp;
345 
346   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
347   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
348 
349   /*  first count number of contributors to each processor */
350   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
351   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
352   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
353   for ( i=0; i<N; i++ ) {
354     idx = rows[i];
355     found = 0;
356     for ( j=0; j<size; j++ ) {
357       if (idx >= owners[j] && idx < owners[j+1]) {
358         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
359       }
360     }
361     if (!found) SETERRQ(1,"MatZeroRows_MPIAIJ:Index out of range");
362   }
363   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
364 
365   /* inform other processors of number of messages and max length*/
366   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
367   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
368   nrecvs = work[rank];
369   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
370   nmax = work[rank];
371   PetscFree(work);
372 
373   /* post receives:   */
374   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
375   CHKPTRQ(rvalues);
376   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
377   CHKPTRQ(recv_waits);
378   for ( i=0; i<nrecvs; i++ ) {
379     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
380   }
381 
382   /* do sends:
383       1) starts[i] gives the starting index in svalues for stuff going to
384          the ith processor
385   */
386   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
387   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
388   CHKPTRQ(send_waits);
389   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
390   starts[0] = 0;
391   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
392   for ( i=0; i<N; i++ ) {
393     svalues[starts[owner[i]]++] = rows[i];
394   }
395   ISRestoreIndices(is,&rows);
396 
397   starts[0] = 0;
398   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
399   count = 0;
400   for ( i=0; i<size; i++ ) {
401     if (procs[i]) {
402       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
403     }
404   }
405   PetscFree(starts);
406 
407   base = owners[rank];
408 
409   /*  wait on receives */
410   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
411   source = lens + nrecvs;
412   count  = nrecvs; slen = 0;
413   while (count) {
414     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
415     /* unpack receives into our local space */
416     MPI_Get_count(&recv_status,MPI_INT,&n);
417     source[imdex]  = recv_status.MPI_SOURCE;
418     lens[imdex]  = n;
419     slen += n;
420     count--;
421   }
422   PetscFree(recv_waits);
423 
424   /* move the received row indices into a local list */
425   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
426   count = 0;
427   for ( i=0; i<nrecvs; i++ ) {
428     values = rvalues + i*nmax;
429     for ( j=0; j<lens[i]; j++ ) {
430       lrows[count++] = values[j] - base;
431     }
432   }
433   PetscFree(rvalues); PetscFree(lens);
434   PetscFree(owner); PetscFree(nprocs);
435 
436   /* actually zap the local rows */
437   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
438   PLogObjectParent(A,istmp);
439   PetscFree(lrows);
440   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
441   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
442   ierr = ISDestroy(istmp); CHKERRQ(ierr);
443 
444   /* wait on sends */
445   if (nsends) {
446     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
447     CHKPTRQ(send_status);
448     MPI_Waitall(nsends,send_waits,send_status);
449     PetscFree(send_status);
450   }
451   PetscFree(send_waits); PetscFree(svalues);
452 
453   return 0;
454 }
455 
456 static int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
457 {
458   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
459   int        ierr,nt;
460 
461   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
462   if (nt != a->n) {
463     SETERRQ(1,"MatMult_MPIAIJ:Incompatible parition of A and xx");
464   }
465   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
466   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
467   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
468   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
469   return 0;
470 }
471 
472 static int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
473 {
474   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
475   int        ierr;
476   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
477   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
478   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
479   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
480   return 0;
481 }
482 
483 static int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
484 {
485   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
486   int        ierr;
487 
488   /* do nondiagonal part */
489   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
490   /* send it on its way */
491   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
492   /* do local part */
493   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
494   /* receive remote parts: note this assumes the values are not actually */
495   /* inserted in yy until the next line, which is true for my implementation */
496   /* but may not always be true. */
497   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
498   return 0;
499 }
500 
501 static int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
502 {
503   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
504   int        ierr;
505 
506   /* do nondiagonal part */
507   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
508   /* send it on its way */
509   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
510   /* do local part */
511   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
512   /* receive remote parts: note this assumes the values are not actually */
513   /* inserted in zz until the next line, which is true for my implementation */
514   /* but may not always be true. */
515   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
516                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
517   return 0;
518 }
519 
520 /*
521   This only works correctly for square matrices where the subblock A->A is the
522    diagonal block
523 */
524 static int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
525 {
526   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
527   if (a->M != a->N)
528     SETERRQ(1,"MatGetDiagonal_MPIAIJ:Supports only square matrix where A->A is diag block");
529   if (a->rstart != a->cstart || a->rend != a->cend) {
530     SETERRQ(1,"MatGetDiagonal_MPIAIJ:row partition must equal col partition");  }
531   return MatGetDiagonal(a->A,v);
532 }
533 
534 static int MatScale_MPIAIJ(Scalar *aa,Mat A)
535 {
536   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
537   int        ierr;
538   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
539   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
540   return 0;
541 }
542 
543 static int MatDestroy_MPIAIJ(PetscObject obj)
544 {
545   Mat        mat = (Mat) obj;
546   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
547   int        ierr;
548 #if defined(PETSC_LOG)
549   PLogObjectState(obj,"Rows=%d, Cols=%d",aij->M,aij->N);
550 #endif
551   PetscFree(aij->rowners);
552   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
553   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
554   if (aij->colmap) PetscFree(aij->colmap);
555   if (aij->garray) PetscFree(aij->garray);
556   if (aij->lvec)   VecDestroy(aij->lvec);
557   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
558   if (aij->rowvalues) PetscFree(aij->rowvalues);
559   PetscFree(aij);
560   PLogObjectDestroy(mat);
561   PetscHeaderDestroy(mat);
562   return 0;
563 }
564 #include "draw.h"
565 #include "pinclude/pviewer.h"
566 
567 static int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
568 {
569   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
570   int         ierr;
571 
572   if (aij->size == 1) {
573     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
574   }
575   else SETERRQ(1,"MatView_MPIAIJ_Binary:Only uniprocessor output supported");
576   return 0;
577 }
578 
579 static int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
580 {
581   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
582   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
583   int         ierr, format,shift = C->indexshift,rank;
584   FILE        *fd;
585   ViewerType  vtype;
586 
587   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
588   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
589     ierr = ViewerGetFormat(viewer,&format);
590     if (format == ASCII_FORMAT_INFO_DETAILED) {
591       MatInfo info;
592       int     flg;
593       MPI_Comm_rank(mat->comm,&rank);
594       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
595       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
596       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
597       PetscSequentialPhaseBegin(mat->comm,1);
598       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
599          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
600       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
601          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
602       ierr = MatGetInfo(aij->A,MAT_LOCAL,&info);
603       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used);
604       ierr = MatGetInfo(aij->B,MAT_LOCAL,&info);
605       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used);
606       fflush(fd);
607       PetscSequentialPhaseEnd(mat->comm,1);
608       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
609       return 0;
610     }
611     else if (format == ASCII_FORMAT_INFO) {
612       return 0;
613     }
614   }
615 
616   if (vtype == DRAW_VIEWER) {
617     Draw       draw;
618     PetscTruth isnull;
619     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
620     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
621   }
622 
623   if (vtype == ASCII_FILE_VIEWER) {
624     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
625     PetscSequentialPhaseBegin(mat->comm,1);
626     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
627            aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
628            aij->cend);
629     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
630     ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
631     fflush(fd);
632     PetscSequentialPhaseEnd(mat->comm,1);
633   }
634   else {
635     int size = aij->size;
636     rank = aij->rank;
637     if (size == 1) {
638       ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
639     }
640     else {
641       /* assemble the entire matrix onto first processor. */
642       Mat         A;
643       Mat_SeqAIJ *Aloc;
644       int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
645       Scalar      *a;
646 
647       if (!rank) {
648         ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
649                CHKERRQ(ierr);
650       }
651       else {
652         ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
653                CHKERRQ(ierr);
654       }
655       PLogObjectParent(mat,A);
656 
657       /* copy over the A part */
658       Aloc = (Mat_SeqAIJ*) aij->A->data;
659       m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
660       row = aij->rstart;
661       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
662       for ( i=0; i<m; i++ ) {
663         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
664         row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
665       }
666       aj = Aloc->j;
667       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
668 
669       /* copy over the B part */
670       Aloc = (Mat_SeqAIJ*) aij->B->data;
671       m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
672       row = aij->rstart;
673       ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
674       for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
675       for ( i=0; i<m; i++ ) {
676         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
677         row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
678       }
679       PetscFree(ct);
680       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
681       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
682       if (!rank) {
683         ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
684       }
685       ierr = MatDestroy(A); CHKERRQ(ierr);
686     }
687   }
688   return 0;
689 }
690 
691 static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
692 {
693   Mat         mat = (Mat) obj;
694   int         ierr;
695   ViewerType  vtype;
696 
697   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
698   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
699       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
700     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
701   }
702   else if (vtype == BINARY_FILE_VIEWER) {
703     return MatView_MPIAIJ_Binary(mat,viewer);
704   }
705   return 0;
706 }
707 
708 /*
709     This has to provide several versions.
710 
711      2) a) use only local smoothing, updating outer values only once,
712         b) local smoothing, updating outer values each inner iteration,
713      3) colored sweeps, updating outer values between colors.
714 */
715 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
716                            double fshift,int its,Vec xx)
717 {
718   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
719   Mat        AA = mat->A, BB = mat->B;
720   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
721   Scalar     *b,*x,*xs,*ls,d,*v,sum;
722   int        ierr,*idx, *diag;
723   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
724 
725   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
726   xs = x + shift; /* shift by one for index start of 1 */
727   ls = ls + shift;
728   if (!A->diag) {if ((ierr = MatMarkDiag_SeqAIJ(AA))) return ierr;}
729   diag = A->diag;
730   if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
731     if (flag & SOR_ZERO_INITIAL_GUESS) {
732       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
733     }
734     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
735     CHKERRQ(ierr);
736     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
737     CHKERRQ(ierr);
738     while (its--) {
739       /* go down through the rows */
740       for ( i=0; i<m; i++ ) {
741         n    = A->i[i+1] - A->i[i];
742         idx  = A->j + A->i[i] + shift;
743         v    = A->a + A->i[i] + shift;
744         sum  = b[i];
745         SPARSEDENSEMDOT(sum,xs,v,idx,n);
746         d    = fshift + A->a[diag[i]+shift];
747         n    = B->i[i+1] - B->i[i];
748         idx  = B->j + B->i[i] + shift;
749         v    = B->a + B->i[i] + shift;
750         SPARSEDENSEMDOT(sum,ls,v,idx,n);
751         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
752       }
753       /* come up through the rows */
754       for ( i=m-1; i>-1; i-- ) {
755         n    = A->i[i+1] - A->i[i];
756         idx  = A->j + A->i[i] + shift;
757         v    = A->a + A->i[i] + shift;
758         sum  = b[i];
759         SPARSEDENSEMDOT(sum,xs,v,idx,n);
760         d    = fshift + A->a[diag[i]+shift];
761         n    = B->i[i+1] - B->i[i];
762         idx  = B->j + B->i[i] + shift;
763         v    = B->a + B->i[i] + shift;
764         SPARSEDENSEMDOT(sum,ls,v,idx,n);
765         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
766       }
767     }
768   }
769   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
770     if (flag & SOR_ZERO_INITIAL_GUESS) {
771       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
772     }
773     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
774     CHKERRQ(ierr);
775     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
776     CHKERRQ(ierr);
777     while (its--) {
778       for ( i=0; i<m; i++ ) {
779         n    = A->i[i+1] - A->i[i];
780         idx  = A->j + A->i[i] + shift;
781         v    = A->a + A->i[i] + shift;
782         sum  = b[i];
783         SPARSEDENSEMDOT(sum,xs,v,idx,n);
784         d    = fshift + A->a[diag[i]+shift];
785         n    = B->i[i+1] - B->i[i];
786         idx  = B->j + B->i[i] + shift;
787         v    = B->a + B->i[i] + shift;
788         SPARSEDENSEMDOT(sum,ls,v,idx,n);
789         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
790       }
791     }
792   }
793   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
794     if (flag & SOR_ZERO_INITIAL_GUESS) {
795       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
796     }
797     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
798                             mat->Mvctx); CHKERRQ(ierr);
799     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
800                             mat->Mvctx); CHKERRQ(ierr);
801     while (its--) {
802       for ( i=m-1; i>-1; i-- ) {
803         n    = A->i[i+1] - A->i[i];
804         idx  = A->j + A->i[i] + shift;
805         v    = A->a + A->i[i] + shift;
806         sum  = b[i];
807         SPARSEDENSEMDOT(sum,xs,v,idx,n);
808         d    = fshift + A->a[diag[i]+shift];
809         n    = B->i[i+1] - B->i[i];
810         idx  = B->j + B->i[i] + shift;
811         v    = B->a + B->i[i] + shift;
812         SPARSEDENSEMDOT(sum,ls,v,idx,n);
813         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
814       }
815     }
816   }
817   else {
818     SETERRQ(1,"MatRelax_MPIAIJ:Option not supported");
819   }
820   return 0;
821 }
822 
823 static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,MatInfo *info)
824 {
825   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
826   Mat        A = mat->A, B = mat->B;
827   int        ierr;
828   double     isend[5], irecv[5];
829 
830   info->rows_global    = (double)mat->M;
831   info->columns_global = (double)mat->N;
832   info->rows_local     = (double)mat->m;
833   info->columns_local  = (double)mat->N;
834   info->block_size     = 1.0;
835   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
836   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
837   isend[3] = info->memory;  isend[4] = info->mallocs;
838   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
839   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
840   isend[3] += info->memory;  isend[4] += info->mallocs;
841   if (flag == MAT_LOCAL) {
842     info->nz_used      = isend[0];
843     info->nz_allocated = isend[1];
844     info->nz_unneeded  = isend[2];
845     info->memory       = isend[3];
846     info->mallocs      = isend[4];
847   } else if (flag == MAT_GLOBAL_MAX) {
848     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
849     info->nz_used      = irecv[0];
850     info->nz_allocated = irecv[1];
851     info->nz_unneeded  = irecv[2];
852     info->memory       = irecv[3];
853     info->mallocs      = irecv[4];
854   } else if (flag == MAT_GLOBAL_SUM) {
855     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
856     info->nz_used      = irecv[0];
857     info->nz_allocated = irecv[1];
858     info->nz_unneeded  = irecv[2];
859     info->memory       = irecv[3];
860     info->mallocs      = irecv[4];
861   }
862   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
863   info->fill_ratio_needed = 0;
864   info->factor_mallocs    = 0;
865 
866   return 0;
867 }
868 
869 extern int MatLUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,Mat*);
870 extern int MatLUFactorNumeric_MPIAIJ(Mat,Mat*);
871 extern int MatLUFactor_MPIAIJ(Mat,IS,IS,double);
872 extern int MatILUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,int,Mat *);
873 extern int MatSolve_MPIAIJ(Mat,Vec,Vec);
874 extern int MatSolveAdd_MPIAIJ(Mat,Vec,Vec,Vec);
875 extern int MatSolveTrans_MPIAIJ(Mat,Vec,Vec);
876 extern int MatSolveTransAdd_MPIAIJ(Mat,Vec,Vec,Vec);
877 
878 static int MatSetOption_MPIAIJ(Mat A,MatOption op)
879 {
880   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
881 
882   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
883       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
884       op == MAT_COLUMNS_SORTED ||
885       op == MAT_ROW_ORIENTED) {
886         MatSetOption(a->A,op);
887         MatSetOption(a->B,op);
888   }
889   else if (op == MAT_ROWS_SORTED ||
890            op == MAT_SYMMETRIC ||
891            op == MAT_STRUCTURALLY_SYMMETRIC ||
892            op == MAT_YES_NEW_DIAGONALS)
893     PLogInfo(A,"Info:MatSetOption_MPIAIJ:Option ignored\n");
894   else if (op == MAT_COLUMN_ORIENTED) {
895     a->roworiented = 0;
896     MatSetOption(a->A,op);
897     MatSetOption(a->B,op);
898   }
899   else if (op == MAT_NO_NEW_DIAGONALS)
900     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:MAT_NO_NEW_DIAGONALS");}
901   else
902     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:unknown option");}
903   return 0;
904 }
905 
906 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
907 {
908   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
909   *m = mat->M; *n = mat->N;
910   return 0;
911 }
912 
913 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
914 {
915   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
916   *m = mat->m; *n = mat->N;
917   return 0;
918 }
919 
920 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
921 {
922   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
923   *m = mat->rstart; *n = mat->rend;
924   return 0;
925 }
926 
927 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
928 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
929 
930 int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
931 {
932   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
933   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
934   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
935   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
936   int        *cmap, *idx_p;
937 
938   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIAIJ:Already active");
939   mat->getrowactive = PETSC_TRUE;
940 
941   if (!mat->rowvalues && (idx || v)) {
942     /*
943         allocate enough space to hold information from the longest row.
944     */
945     Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
946     int     max = 1,m = mat->m,tmp;
947     for ( i=0; i<m; i++ ) {
948       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
949       if (max < tmp) { max = tmp; }
950     }
951     mat->rowvalues = (Scalar *) PetscMalloc( max*(sizeof(int)+sizeof(Scalar)));
952     CHKPTRQ(mat->rowvalues);
953     mat->rowindices = (int *) (mat->rowvalues + max);
954   }
955 
956   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIAIJ:Only local rows")
957   lrow = row - rstart;
958 
959   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
960   if (!v)   {pvA = 0; pvB = 0;}
961   if (!idx) {pcA = 0; if (!v) pcB = 0;}
962   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
963   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
964   nztot = nzA + nzB;
965 
966   cmap  = mat->garray;
967   if (v  || idx) {
968     if (nztot) {
969       /* Sort by increasing column numbers, assuming A and B already sorted */
970       int imark = -1;
971       if (v) {
972         *v = v_p = mat->rowvalues;
973         for ( i=0; i<nzB; i++ ) {
974           if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
975           else break;
976         }
977         imark = i;
978         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
979         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
980       }
981       if (idx) {
982         *idx = idx_p = mat->rowindices;
983         if (imark > -1) {
984           for ( i=0; i<imark; i++ ) {
985             idx_p[i] = cmap[cworkB[i]];
986           }
987         } else {
988           for ( i=0; i<nzB; i++ ) {
989             if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
990             else break;
991           }
992           imark = i;
993         }
994         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
995         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
996       }
997     }
998     else {
999       if (idx) *idx = 0;
1000       if (v)   *v   = 0;
1001     }
1002   }
1003   *nz = nztot;
1004   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1005   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1006   return 0;
1007 }
1008 
1009 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1010 {
1011   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1012   if (aij->getrowactive == PETSC_FALSE) {
1013     SETERRQ(1,"MatRestoreRow_MPIAIJ:MatGetRow not called");
1014   }
1015   aij->getrowactive = PETSC_FALSE;
1016   return 0;
1017 }
1018 
1019 static int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
1020 {
1021   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1022   Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
1023   int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
1024   double     sum = 0.0;
1025   Scalar     *v;
1026 
1027   if (aij->size == 1) {
1028     ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
1029   } else {
1030     if (type == NORM_FROBENIUS) {
1031       v = amat->a;
1032       for (i=0; i<amat->nz; i++ ) {
1033 #if defined(PETSC_COMPLEX)
1034         sum += real(conj(*v)*(*v)); v++;
1035 #else
1036         sum += (*v)*(*v); v++;
1037 #endif
1038       }
1039       v = bmat->a;
1040       for (i=0; i<bmat->nz; i++ ) {
1041 #if defined(PETSC_COMPLEX)
1042         sum += real(conj(*v)*(*v)); v++;
1043 #else
1044         sum += (*v)*(*v); v++;
1045 #endif
1046       }
1047       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
1048       *norm = sqrt(*norm);
1049     }
1050     else if (type == NORM_1) { /* max column norm */
1051       double *tmp, *tmp2;
1052       int    *jj, *garray = aij->garray;
1053       tmp  = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp);
1054       tmp2 = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp2);
1055       PetscMemzero(tmp,aij->N*sizeof(double));
1056       *norm = 0.0;
1057       v = amat->a; jj = amat->j;
1058       for ( j=0; j<amat->nz; j++ ) {
1059         tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
1060       }
1061       v = bmat->a; jj = bmat->j;
1062       for ( j=0; j<bmat->nz; j++ ) {
1063         tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
1064       }
1065       MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);
1066       for ( j=0; j<aij->N; j++ ) {
1067         if (tmp2[j] > *norm) *norm = tmp2[j];
1068       }
1069       PetscFree(tmp); PetscFree(tmp2);
1070     }
1071     else if (type == NORM_INFINITY) { /* max row norm */
1072       double ntemp = 0.0;
1073       for ( j=0; j<amat->m; j++ ) {
1074         v = amat->a + amat->i[j] + shift;
1075         sum = 0.0;
1076         for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
1077           sum += PetscAbsScalar(*v); v++;
1078         }
1079         v = bmat->a + bmat->i[j] + shift;
1080         for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
1081           sum += PetscAbsScalar(*v); v++;
1082         }
1083         if (sum > ntemp) ntemp = sum;
1084       }
1085       MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);
1086     }
1087     else {
1088       SETERRQ(1,"MatNorm_MPIAIJ:No support for two norm");
1089     }
1090   }
1091   return 0;
1092 }
1093 
1094 static int MatTranspose_MPIAIJ(Mat A,Mat *matout)
1095 {
1096   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1097   Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
1098   int        ierr,shift = Aloc->indexshift;
1099   Mat        B;
1100   int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
1101   Scalar     *array;
1102 
1103   if (matout == PETSC_NULL && M != N)
1104     SETERRQ(1,"MatTranspose_MPIAIJ:Square matrix only for in-place");
1105   ierr = MatCreateMPIAIJ(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,
1106          PETSC_NULL,&B); CHKERRQ(ierr);
1107 
1108   /* copy over the A part */
1109   Aloc = (Mat_SeqAIJ*) a->A->data;
1110   m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1111   row = a->rstart;
1112   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
1113   for ( i=0; i<m; i++ ) {
1114     ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1115     row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
1116   }
1117   aj = Aloc->j;
1118   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}
1119 
1120   /* copy over the B part */
1121   Aloc = (Mat_SeqAIJ*) a->B->data;
1122   m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1123   row = a->rstart;
1124   ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
1125   for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
1126   for ( i=0; i<m; i++ ) {
1127     ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1128     row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
1129   }
1130   PetscFree(ct);
1131   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1132   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1133   if (matout != PETSC_NULL) {
1134     *matout = B;
1135   } else {
1136     /* This isn't really an in-place transpose, but we do free the data structures of a */
1137     PetscFree(a->rowners);
1138     ierr = MatDestroy(a->A); CHKERRQ(ierr);
1139     ierr = MatDestroy(a->B); CHKERRQ(ierr);
1140     if (a->colmap) PetscFree(a->colmap);
1141     if (a->garray) PetscFree(a->garray);
1142     if (a->lvec) VecDestroy(a->lvec);
1143     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
1144     PetscFree(a);
1145     PetscMemcpy(A,B,sizeof(struct _Mat));
1146     PetscHeaderDestroy(B);
1147   }
1148   return 0;
1149 }
1150 
1151 int MatDiagonalScale_MPIAIJ(Mat A,Vec ll,Vec rr)
1152 {
1153   Mat a = ((Mat_MPIAIJ *) A->data)->A;
1154   Mat b = ((Mat_MPIAIJ *) A->data)->B;
1155   int ierr,s1,s2,s3;
1156 
1157   if (ll)  {
1158     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1159     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1160     if (s1!=s2) SETERRQ(1,"MatDiagonalScale_MPIAIJ:non-conforming local sizes");
1161     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1162     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1163   }
1164   if (rr) SETERRQ(1,"MatDiagonalScale_MPIAIJ:not supported for right vector");
1165   return 0;
1166 }
1167 
1168 
1169 extern int MatPrintHelp_SeqAIJ(Mat);
1170 static int MatPrintHelp_MPIAIJ(Mat A)
1171 {
1172   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1173 
1174   if (!a->rank) return MatPrintHelp_SeqAIJ(a->A);
1175   else return 0;
1176 }
1177 
1178 static int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1179 {
1180   *bs = 1;
1181   return 0;
1182 }
1183 
1184 extern int MatConvert_MPIAIJ(Mat,MatType,Mat *);
1185 static int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1186 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1187 int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatGetSubMatrixCall,Mat **);
1188 /* -------------------------------------------------------------------*/
1189 static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
1190        MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
1191        MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
1192        MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
1193        0,0,
1194        0,0,
1195        0,0,
1196        MatRelax_MPIAIJ,
1197        MatTranspose_MPIAIJ,
1198        MatGetInfo_MPIAIJ,0,
1199        MatGetDiagonal_MPIAIJ,MatDiagonalScale_MPIAIJ,MatNorm_MPIAIJ,
1200        MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
1201        0,
1202        MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
1203        0,0,0,0,
1204        MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
1205        0,0,
1206        0,0,MatConvert_MPIAIJ,MatConvertSameType_MPIAIJ,0,0,
1207        0,0,0,
1208        MatGetSubMatrices_MPIAIJ,MatIncreaseOverlap_MPIAIJ,MatGetValues_MPIAIJ,0,
1209        MatPrintHelp_MPIAIJ,
1210        MatScale_MPIAIJ,0,0,0,
1211        MatGetBlockSize_MPIAIJ};
1212 
1213 
1214 /*@C
1215    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1216    (the default parallel PETSc format).  For good matrix assembly performance
1217    the user should preallocate the matrix storage by setting the parameters
1218    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1219    performance can be increased by more than a factor of 50.
1220 
1221    Input Parameters:
1222 .  comm - MPI communicator
1223 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1224            This value should be the same as the local size used in creating the
1225            y vector for the matrix-vector product y = Ax.
1226 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1227            This value should be the same as the local size used in creating the
1228            x vector for the matrix-vector product y = Ax.
1229 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
1230 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
1231 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1232            (same for all local rows)
1233 .  d_nnz - array containing the number of nonzeros in the various rows of the
1234            diagonal portion of the local submatrix (possibly different for each row)
1235            or PETSC_NULL. You must leave room for the diagonal entry even if
1236            it is zero.
1237 .  o_nz - number of nonzeros per row in the off-diagonal portion of local
1238            submatrix (same for all local rows).
1239 .  o_nnz - array containing the number of nonzeros in the various rows of the
1240            off-diagonal portion of the local submatrix (possibly different for
1241            each row) or PETSC_NULL.
1242 
1243    Output Parameter:
1244 .  A - the matrix
1245 
1246    Notes:
1247    The AIJ format (also called the Yale sparse matrix format or
1248    compressed row storage) is fully compatible with standard Fortran 77
1249    storage.  That is, the stored row and column indices can begin at
1250    either one (as in Fortran) or zero.  See the users manual for details.
1251 
1252    The user MUST specify either the local or global matrix dimensions
1253    (possibly both).
1254 
1255    By default, this format uses inodes (identical nodes) when possible.
1256    We search for consecutive rows with the same nonzero structure, thereby
1257    reusing matrix information to achieve increased efficiency.
1258 
1259    Options Database Keys:
1260 $    -mat_aij_no_inode  - Do not use inodes
1261 $    -mat_aij_inode_limit <limit> - Set inode limit.
1262 $        (max limit=5)
1263 $    -mat_aij_oneindex - Internally use indexing starting at 1
1264 $        rather than 0.  Note: When calling MatSetValues(),
1265 $        the user still MUST index entries starting at 0!
1266 
1267    Storage Information:
1268    For a square global matrix we define each processor's diagonal portion
1269    to be its local rows and the corresponding columns (a square submatrix);
1270    each processor's off-diagonal portion encompasses the remainder of the
1271    local matrix (a rectangular submatrix).
1272 
1273    The user can specify preallocated storage for the diagonal part of
1274    the local submatrix with either d_nz or d_nnz (not both).  Set
1275    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1276    memory allocation.  Likewise, specify preallocated storage for the
1277    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1278 
1279    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1280    the figure below we depict these three local rows and all columns (0-11).
1281 
1282 $          0 1 2 3 4 5 6 7 8 9 10 11
1283 $         -------------------
1284 $  row 3  |  o o o d d d o o o o o o
1285 $  row 4  |  o o o d d d o o o o o o
1286 $  row 5  |  o o o d d d o o o o o o
1287 $         -------------------
1288 $
1289 
1290    Thus, any entries in the d locations are stored in the d (diagonal)
1291    submatrix, and any entries in the o locations are stored in the
1292    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1293    stored simply in the MATSEQAIJ format for compressed row storage.
1294 
1295    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1296    and o_nz should indicate the number of nonzeros per row in the o matrix.
1297    In general, for PDE problems in which most nonzeros are near the diagonal,
1298    one expects d_nz >> o_nz. For large problems you MUST preallocate memory
1299    or you will get TERRIBLE performance; see the users' manual chapter on
1300    matrices and the file $(PETSC_DIR)/Performance.
1301 
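   Example:
   For the three local rows depicted above (a 12 by 12 global matrix whose
   diagonal block covers columns 3 through 5), a call of the following form
   could be used on that processor; the preallocation values are illustrative only:
$      ierr = MatCreateMPIAIJ(comm,3,3,12,12,3,PETSC_NULL,9,PETSC_NULL,&A);
   Here 3 nonzeros per row are preallocated for the d (diagonal) part and 9
   per row for the o (off-diagonal) part, matching the figure.
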
1302 .keywords: matrix, aij, compressed row, sparse, parallel
1303 
1304 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1305 @*/
1306 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
1307                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1308 {
1309   Mat          B;
1310   Mat_MPIAIJ   *b;
1311   int          ierr, i,sum[2],work[2],size;
1312 
1313   *A = 0;
1314   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
1315   PLogObjectCreate(B);
1316   B->data       = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
1317   PetscMemzero(b,sizeof(Mat_MPIAIJ));
1318   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1319   MPI_Comm_size(comm,&size);
1320   if (size == 1) {
1321     B->ops.getrowij          = MatGetRowIJ_MPIAIJ;
1322     B->ops.restorerowij      = MatRestoreRowIJ_MPIAIJ;
1323     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIAIJ;
1324     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIAIJ;
1325     B->ops.lufactor          = MatLUFactor_MPIAIJ;
1326     B->ops.solve             = MatSolve_MPIAIJ;
1327     B->ops.solveadd          = MatSolveAdd_MPIAIJ;
1328     B->ops.solvetrans        = MatSolveTrans_MPIAIJ;
1329     B->ops.solvetransadd     = MatSolveTransAdd_MPIAIJ;
1330     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIAIJ;
1331   }
1332   B->destroy    = MatDestroy_MPIAIJ;
1333   B->view       = MatView_MPIAIJ;
1334   B->factor     = 0;
1335   B->assembled  = PETSC_FALSE;
1336 
1337   b->insertmode = NOT_SET_VALUES;
1338   MPI_Comm_rank(comm,&b->rank);
1339   MPI_Comm_size(comm,&b->size);
1340 
1341   if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1342     SETERRQ(1,"MatCreateMPIAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1343 
1344   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1345     work[0] = m; work[1] = n;
1346     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1347     if (M == PETSC_DECIDE) M = sum[0];
1348     if (N == PETSC_DECIDE) N = sum[1];
1349   }
1350   if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
1351   if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
1352   b->m = m; B->m = m;
1353   b->n = n; B->n = n;
1354   b->N = N; B->N = N;
1355   b->M = M; B->M = M;
1356 
1357   /* build local table of row and column ownerships */
1358   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1359   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1360   b->cowners = b->rowners + b->size + 2;
1361   MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1362   b->rowners[0] = 0;
1363   for ( i=2; i<=b->size; i++ ) {
1364     b->rowners[i] += b->rowners[i-1];
1365   }
1366   b->rstart = b->rowners[b->rank];
1367   b->rend   = b->rowners[b->rank+1];
1368   MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1369   b->cowners[0] = 0;
1370   for ( i=2; i<=b->size; i++ ) {
1371     b->cowners[i] += b->cowners[i-1];
1372   }
1373   b->cstart = b->cowners[b->rank];
1374   b->cend   = b->cowners[b->rank+1];
1375 
1376   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1377   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1378   PLogObjectParent(B,b->A);
1379   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1380   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1381   PLogObjectParent(B,b->B);
1382 
1383   /* build cache for off array entries formed */
1384   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1385   b->colmap      = 0;
1386   b->garray      = 0;
1387   b->roworiented = 1;
1388 
1389   /* stuff used for matrix vector multiply */
1390   b->lvec      = 0;
1391   b->Mvctx     = 0;
1392 
1393   /* stuff for MatGetRow() */
1394   b->rowindices   = 0;
1395   b->rowvalues    = 0;
1396   b->getrowactive = PETSC_FALSE;
1397 
1398   *A = B;
1399   return 0;
1400 }
1401 
1402 static int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
1403 {
1404   Mat        mat;
1405   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1406   int        ierr, len=0, flg;
1407 
1408   *newmat       = 0;
1409   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1410   PLogObjectCreate(mat);
1411   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1412   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1413   mat->destroy    = MatDestroy_MPIAIJ;
1414   mat->view       = MatView_MPIAIJ;
1415   mat->factor     = matin->factor;
1416   mat->assembled  = PETSC_TRUE;
1417 
1418   a->m = mat->m   = oldmat->m;
1419   a->n = mat->n   = oldmat->n;
1420   a->M = mat->M   = oldmat->M;
1421   a->N = mat->N   = oldmat->N;
1422 
1423   a->rstart       = oldmat->rstart;
1424   a->rend         = oldmat->rend;
1425   a->cstart       = oldmat->cstart;
1426   a->cend         = oldmat->cend;
1427   a->size         = oldmat->size;
1428   a->rank         = oldmat->rank;
1429   a->insertmode   = NOT_SET_VALUES;
1430   a->rowvalues    = 0;
1431   a->getrowactive = PETSC_FALSE;
1432 
1433   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1434   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1435   a->cowners = a->rowners + a->size + 2;
1436   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1437   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1438   if (oldmat->colmap) {
1439     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1440     PLogObjectMemory(mat,(a->N)*sizeof(int));
1441     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1442   } else a->colmap = 0;
1443   if (oldmat->garray) {
1444     len = ((Mat_SeqAIJ *) (oldmat->B->data))->n;
1445     a->garray = (int *) PetscMalloc((len+1)*sizeof(int)); CHKPTRQ(a->garray);
1446     PLogObjectMemory(mat,(len+1)*sizeof(int));
1447     if (len) PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1448   } else a->garray = 0;
1449 
1450   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1451   PLogObjectParent(mat,a->lvec);
1452   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1453   PLogObjectParent(mat,a->Mvctx);
1454   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1455   PLogObjectParent(mat,a->A);
1456   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1457   PLogObjectParent(mat,a->B);
1458   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1459   if (flg) {
1460     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1461   }
1462   *newmat = mat;
1463   return 0;
1464 }
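/*
   MatConvertSameType_MPIAIJ() is typically reached through MatConvert() when
   the target type matches the source type, e.g. (illustrative only):

      Mat B;
      ierr = MatConvert(A,MATSAME,&B); CHKERRQ(ierr);
*/
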
1465 
1466 #include "sys.h"
1467 
1468 int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
1469 {
1470   Mat          A;
1471   int          i, nz, ierr, j,rstart, rend, fd;
1472   Scalar       *vals,*svals;
1473   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1474   MPI_Status   status;
1475   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1476   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1477   int          tag = ((PetscObject)viewer)->tag;
1478 
1479   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1480   if (!rank) {
1481     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1482     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1483     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIAIJ:not matrix object");
1484   }
1485 
1486   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1487   M = header[1]; N = header[2];
1488   /* determine ownership of all rows */
1489   m = M/size + ((M % size) > rank);
1490   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1491   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1492   rowners[0] = 0;
1493   for ( i=2; i<=size; i++ ) {
1494     rowners[i] += rowners[i-1];
1495   }
1496   rstart = rowners[rank];
1497   rend   = rowners[rank+1];
1498 
1499   /* distribute row lengths to all processors */
1500   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1501   offlens = ourlens + (rend-rstart);
1502   if (!rank) {
1503     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1504     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1505     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1506     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1507     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
1508     PetscFree(sndcounts);
1509   }
1510   else {
1511     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
1512   }
1513 
1514   if (!rank) {
1515     /* calculate the number of nonzeros on each processor */
1516     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1517     PetscMemzero(procsnz,size*sizeof(int));
1518     for ( i=0; i<size; i++ ) {
1519       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1520         procsnz[i] += rowlengths[j];
1521       }
1522     }
1523     PetscFree(rowlengths);
1524 
1525     /* determine max buffer needed and allocate it */
1526     maxnz = 0;
1527     for ( i=0; i<size; i++ ) {
1528       maxnz = PetscMax(maxnz,procsnz[i]);
1529     }
1530     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1531 
1532     /* read in my part of the matrix column indices  */
1533     nz = procsnz[0];
1534     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1535     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1536 
1537     /* read in everyone else's and ship it off to them */
1538     for ( i=1; i<size; i++ ) {
1539       nz = procsnz[i];
1540       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1541       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1542     }
1543     PetscFree(cols);
1544   }
1545   else {
1546     /* determine buffer space needed for message */
1547     nz = 0;
1548     for ( i=0; i<m; i++ ) {
1549       nz += ourlens[i];
1550     }
1551     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1552 
1553     /* receive message of column indices */
1554     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1555     MPI_Get_count(&status,MPI_INT,&maxnz);
1556     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
1557   }
1558 
1559   /* loop over local rows, determining number of off-diagonal entries */
1560   PetscMemzero(offlens,m*sizeof(int));
1561   jj = 0;
1562   for ( i=0; i<m; i++ ) {
1563     for ( j=0; j<ourlens[i]; j++ ) {
1564       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1565       jj++;
1566     }
1567   }
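
  /*
     The test against [rstart,rend) above counts a column as off-diagonal when
     it falls outside this processor's row range, which coincides with the
     diagonal block's column range when the matrix is square (the loader uses
     the default PETSC_DECIDE column layout below).  For example, a processor
     owning rows [4,7) that holds a row with column indices {1,5,8} gets one
     diagonal-block entry (column 5) and offlens = 2 (columns 1 and 8);
     ourlens[] is reduced to the diagonal-block counts just below before being
     passed to MatCreateMPIAIJ() as d_nnz.
  */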
1568 
1569   /* create our matrix */
1570   for ( i=0; i<m; i++ ) {
1571     ourlens[i] -= offlens[i];
1572   }
1573   ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
1574   A = *newmat;
1575   MatSetOption(A,MAT_COLUMNS_SORTED);
1576   for ( i=0; i<m; i++ ) {
1577     ourlens[i] += offlens[i];
1578   }
1579 
1580   if (!rank) {
1581     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1582 
1583     /* read in my part of the matrix numerical values  */
1584     nz = procsnz[0];
1585     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1586 
1587     /* insert into matrix */
1588     jj      = rstart;
1589     smycols = mycols;
1590     svals   = vals;
1591     for ( i=0; i<m; i++ ) {
1592       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1593       smycols += ourlens[i];
1594       svals   += ourlens[i];
1595       jj++;
1596     }
1597 
1598     /* read in the other processors' values and ship them out */
1599     for ( i=1; i<size; i++ ) {
1600       nz = procsnz[i];
1601       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1602       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1603     }
1604     PetscFree(procsnz);
1605   }
1606   else {
1607     /* receive numeric values */
1608     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1609 
1610     /* receive message of values */
1611     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1612     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1613     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
1614 
1615     /* insert into matrix */
1616     jj      = rstart;
1617     smycols = mycols;
1618     svals   = vals;
1619     for ( i=0; i<m; i++ ) {
1620       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1621       smycols += ourlens[i];
1622       svals   += ourlens[i];
1623       jj++;
1624     }
1625   }
1626   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1627 
1628   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1629   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1630   return 0;
1631 }
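
/*
   A minimal usage sketch for MatLoad() with this implementation (illustrative
   only; the file name is made up, and the viewer is assumed to have been
   opened with ViewerFileOpenBinary(), whose exact signature may differ):

      Viewer viewer;
      Mat    A;
      ierr = ViewerFileOpenBinary(MPI_COMM_WORLD,"matrix.dat",BINARY_RDONLY,&viewer); CHKERRQ(ierr);
      ierr = MatLoad(viewer,MATMPIAIJ,&A); CHKERRQ(ierr);
      ierr = ViewerDestroy(viewer); CHKERRQ(ierr);
*/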
1632