xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision fea6b2d80d8ab4080338562a5cbef6dcdf5bb470)
1 
2 #ifndef lint
3 static char vcid[] = "$Id: mpiaij.c,v 1.168 1996/09/19 20:52:39 balay Exp bsmith $";
4 #endif
5 
6 #include "src/mat/impls/aij/mpi/mpiaij.h"
7 #include "src/vec/vecimpl.h"
8 #include "src/inline/spops.h"
9 
10 /* local utility routine that creates a mapping from the global column
11 number to the local number in the off-diagonal part of the local
12 storage of the matrix.  This is done in a non-scalable way since the
13 length of colmap equals the global matrix length.
14 */
15 static int CreateColmap_MPIAIJ_Private(Mat mat)
16 {
17   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
18   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
19   int        n = B->n,i;
20 
21   aij->colmap = (int *) PetscMalloc(aij->N*sizeof(int));CHKPTRQ(aij->colmap);
22   PLogObjectMemory(mat,aij->N*sizeof(int));
23   PetscMemzero(aij->colmap,aij->N*sizeof(int));
24   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
25   return 0;
26 }
27 
28 extern int DisAssemble_MPIAIJ(Mat);
29 
30 static int MatGetRowIJ_MPIAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
31                            PetscTruth *done)
32 {
33   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
34   int        ierr;
35   if (aij->size == 1) {
36     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
37   } else SETERRQ(1,"MatGetRowIJ_MPIAIJ:not supported in parallel");
38   return 0;
39 }
40 
41 static int MatRestoreRowIJ_MPIAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
42                                PetscTruth *done)
43 {
44   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
45   int        ierr;
46   if (aij->size == 1) {
47     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
48   } else SETERRQ(1,"MatRestoreRowIJ_MPIAIJ:not supported in parallel");
49   return 0;
50 }
51 
/*
   MatSetValues_MPIAIJ - Inserts or adds a logically dense m-by-n block of
   values into the parallel matrix.

   Rows owned by this processor are split column-wise between the diagonal
   block (aij->A, columns [cstart,cend)) and the off-diagonal block
   (aij->B, all other columns).  Rows owned by other processors are placed
   in the stash and communicated during MatAssemblyBegin/End.

   v is read row-major when aij->roworiented is set, column-major otherwise.
*/
static int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Scalar     value;
  int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
  int        cstart = aij->cstart, cend = aij->cend,row,col;
  int        roworiented = aij->roworiented;

  /* ADD_VALUES and INSERT_VALUES may not be mixed between assemblies */
  if (aij->insertmode != NOT_SET_VALUES && aij->insertmode != addv) {
    SETERRQ(1,"MatSetValues_MPIAIJ:Cannot mix inserts and adds");
  }
  aij->insertmode = addv;
  for ( i=0; i<m; i++ ) {
    if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative row");
    if (im[i] >= aij->M) SETERRQ(1,"MatSetValues_MPIAIJ:Row too large");
    if (im[i] >= rstart && im[i] < rend) {
      /* locally owned row */
      row = im[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative column");
        if (in[j] >= aij->N) SETERRQ(1,"MatSetValues_MPIAIJ:Col too large");
        if (in[j] >= cstart && in[j] < cend){
          /* column falls in the diagonal block */
          col = in[j] - cstart;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          ierr = MatSetValues(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
        }
        else {
          if (mat->was_assembled) {
            /* after a previous assembly, aij->B uses compacted local
               column numbering; translate via the (lazy) colmap */
            if (!aij->colmap) {
              ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
            }
            col = aij->colmap[in[j]] - 1;
            /* colmap == 0 (col < 0): column not in the current nonzero
               pattern; if new nonzeros are allowed, revert B to global
               column numbering so the entry can be inserted */
            if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
              ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
            }
          }
          else col = in[j];   /* before first assembly B uses global columns */
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          ierr = MatSetValues(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
        }
      }
    }
    else {
      /* off-processor row: cache for communication during assembly */
      if (roworiented) {
        ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      }
      else {
        row = im[i];
        for ( j=0; j<n; j++ ) {
          ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
        }
      }
    }
  }
  return 0;
}
108 
/*
   MatGetValues_MPIAIJ - Retrieves a logically dense m-by-n block of values
   from the parallel matrix into v (row-major).  Only rows owned by this
   processor may be queried.

   Locally owned columns are read from the diagonal block aij->A; other
   columns are looked up in the off-diagonal block aij->B through the
   (lazily created) global-to-local column map.
*/
static int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
  int        cstart = aij->cstart, cend = aij->cend,row,col;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative row");
    if (idxm[i] >= aij->M) SETERRQ(1,"MatGetValues_MPIAIJ:Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      row = idxm[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative column");
        if (idxn[j] >= aij->N) SETERRQ(1,"MatGetValues_MPIAIJ:Col too large");
        if (idxn[j] >= cstart && idxn[j] < cend){
          /* column lies in the diagonal block */
          col = idxn[j] - cstart;
          ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        }
        else {
          /* off-diagonal block; build the column map on first use */
          if (!aij->colmap) {
            ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
          }
          col = aij->colmap[idxn[j]] - 1;
          /* colmap entry 0 (col == -1) or a mismatched garray entry means
             the column is not stored locally: report an (unstored) zero */
          if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
          else {
            ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    }
    else {
      SETERRQ(1,"MatGetValues_MPIAIJ:Only local values currently supported");
    }
  }
  return 0;
}
145 
/*
   MatAssemblyBegin_MPIAIJ - Starts communicating the stashed off-processor
   values to their owning processors.

   Protocol: each (row,col,value) triple is packed as three Scalars (the
   indices are stored as Scalars to allow a single MPI message type).
   Nonblocking receives are posted with a worst-case buffer size, then
   nonblocking sends are issued; the matching waits and the unpacking are
   done in MatAssemblyEnd_MPIAIJ().  All state needed by the End routine
   is saved in the Mat_MPIAIJ structure.
*/
static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = aij->size, *owners = aij->rowners;
  int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  MPI_Allreduce(&aij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,"MatAssemblyBegin_MPIAIJ:Some processors inserted others added");
  }
  aij->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  /* nprocs[j] = #stashed entries for proc j; procs[j] = 1 iff any;
     owner[i] = destination processor of stash entry i */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<aij->stash.n; i++ ) {
    idx = aij->stash.idx[i];
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  /* after the reductions: nreceives = #messages destined to me,
     nmax = length (in triples) of the longest such message */
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack the stash into per-destination contiguous runs of triples;
     note starts[] is consumed (incremented) by the packing loop */
  for ( i=0; i<aij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
  }
  PetscFree(owner);
  /* rebuild starts[] (it was destroyed above) before issuing the sends */
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(0,"[%d]MatAssemblyBegin_MPIAIJ:Number of off processor values %d\n",rank,aij->stash.n);
  ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);

  /* save communication state for MatAssemblyEnd_MPIAIJ */
  aij->svalues    = svalues;    aij->rvalues    = rvalues;
  aij->nsends     = nsends;     aij->nrecvs     = nreceives;
  aij->send_waits = send_waits; aij->recv_waits = recv_waits;
  aij->rmax       = nmax;

  return 0;
}
244 extern int MatSetUpMultiply_MPIAIJ(Mat);
245 
/*
   MatAssemblyEnd_MPIAIJ - Completes the communication started in
   MatAssemblyBegin_MPIAIJ(): waits for the incoming (row,col,value)
   triples, inserts them into the local blocks, then assembles the two
   sequential blocks and (on final assembly) sets up the scatter used by
   the matrix-vector products.
*/
static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
  int         row,col,other_disassembled;
  Scalar      *values,val;
  InsertMode  addv = aij->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = aij->rvalues + 3*imdex*aij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;                          /* number of triples received */
    for ( i=0; i<n; i++ ) {
      /* indices travel as Scalars; convert back to ints */
      row = (int) PetscReal(values[3*i]) - aij->rstart;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= aij->cstart && col < aij->cend) {
        /* diagonal block */
        col -= aij->cstart;
        MatSetValues(aij->A,1,&row,1,&col,&val,addv);
      }
      else {
        if (mat->was_assembled || mat->assembled) {
          /* B uses compacted local column numbering; translate, and if
             the column is new (and new nonzeros are allowed) revert to
             global numbering via disassembly -- same logic as in
             MatSetValues_MPIAIJ */
          if (!aij->colmap) {
            ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
          }
          col = aij->colmap[col] - 1;
          if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
            ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        MatSetValues(aij->B,1,&row,1,&col,&val,addv);
      }
    }
    count--;
  }
  PetscFree(aij->recv_waits); PetscFree(aij->rvalues);

  /* wait on sends */
  if (aij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(aij->nsends,aij->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(aij->send_waits); PetscFree(aij->svalues);

  aij->insertmode = NOT_SET_VALUES;
  ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselfs, in order that we may reassemble. */
  MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
  }

  /* on the first final assembly, build the scatter context and garray
     used to communicate ghost values for MatMult et al. */
  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);

  /* row-value workspace (from MatGetRow) is invalid after reassembly */
  if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
  return 0;
}
317 
318 static int MatZeroEntries_MPIAIJ(Mat A)
319 {
320   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
321   int        ierr;
322   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
323   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
324   return 0;
325 }
326 
327 /* the code does not do the diagonal entries correctly unless the
328    matrix is square and the column and row ownerships are identical.
329    This is a BUG. The only way to fix it seems to be to access
330    aij->A and aij->B directly and not through the MatZeroRows()
331    routine.
332 */
/*
   MatZeroRows_MPIAIJ - Zeros the rows listed in the (possibly
   distributed) index set is, optionally placing *diag on the diagonal of
   each zeroed row.

   Each processor may name any global row, so the routine first routes
   every requested row index to its owning processor (same nonblocking
   send/receive protocol as MatAssemblyBegin), then each processor zeros
   its own local rows in both blocks.  See the comment above this routine
   for a known limitation concerning the diagonal entries.
*/
static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  /* nprocs[j] = #rows destined for proc j; procs[j] = 1 iff any;
     owner[i] = owning processor of requested row i */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,"MatZeroRows_MPIAIJ:Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  /* worst-case buffer of nmax row indices per expected message */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack row indices per destination; starts[] is consumed here */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  /* rebuild starts[] before issuing the sends */
  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  base = owners[rank];   /* first globally numbered row owned locally */

  /*  wait on receives */
  /* record, per message, its source and length; slen totals the rows */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  /* gather every received global row index, converted to local numbering */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  /* diag goes only into the diagonal block; the off-diagonal block is
     zeroed with no diagonal insertion */
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}
455 
456 static int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
457 {
458   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
459   int        ierr,nt;
460 
461   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
462   if (nt != a->n) {
463     SETERRQ(1,"MatMult_MPIAIJ:Incompatible parition of A and xx");
464   }
465   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
466   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
467   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
468   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
469   return 0;
470 }
471 
472 static int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
473 {
474   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
475   int        ierr;
476   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
477   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
478   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
479   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
480   return 0;
481 }
482 
/*
   MatMultTrans_MPIAIJ - Parallel transpose matrix-vector product
   yy = A'*xx.  The off-diagonal contribution is computed first and
   scattered (reversed, with addition) into yy, overlapped with the
   local transpose multiply.
*/
static int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
  int        ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in yy until the next line, which is true for my implementation*/
  /* but is not perhaps always true. */
  ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  return 0;
}
500 
/*
   MatMultTransAdd_MPIAIJ - Parallel transpose product with addition,
   zz = yy + A'*xx, using the same overlapped reverse-scatter scheme as
   MatMultTrans_MPIAIJ.
*/
static int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
  int        ierr;

  /* do nondiagonal part */
  ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
  /* receive remote parts: note this assumes the values are not actually */
  /* inserted in zz until the next line, which is true for my implementation*/
  /* but is not perhaps always true. */
  /* NOTE(review): the End call below passes SCATTER_ALL|SCATTER_REVERSE
     while the matching Begin above (and MatMultTrans_MPIAIJ) uses plain
     SCATTER_REVERSE -- confirm the differing modes are intentional. */
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
                  (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}
519 
520 /*
521   This only works correctly for square matrices where the subblock A->A is the
522    diagonal block
523 */
524 static int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
525 {
526   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
527   if (a->M != a->N)
528     SETERRQ(1,"MatGetDiagonal_MPIAIJ:Supports only square matrix where A->A is diag block");
529   if (a->rstart != a->cstart || a->rend != a->cend) {
530     SETERRQ(1,"MatGetDiagonal_MPIAIJ:row partition must equal col partition");  }
531   return MatGetDiagonal(a->A,v);
532 }
533 
534 static int MatScale_MPIAIJ(Scalar *aa,Mat A)
535 {
536   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
537   int        ierr;
538   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
539   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
540   return 0;
541 }
542 
543 static int MatDestroy_MPIAIJ(PetscObject obj)
544 {
545   Mat        mat = (Mat) obj;
546   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
547   int        ierr;
548 #if defined(PETSC_LOG)
549   PLogObjectState(obj,"Rows=%d, Cols=%d",aij->M,aij->N);
550 #endif
551   PetscFree(aij->rowners);
552   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
553   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
554   if (aij->colmap) PetscFree(aij->colmap);
555   if (aij->garray) PetscFree(aij->garray);
556   if (aij->lvec)   VecDestroy(aij->lvec);
557   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
558   if (aij->rowvalues) PetscFree(aij->rowvalues);
559   PetscFree(aij);
560   PLogObjectDestroy(mat);
561   PetscHeaderDestroy(mat);
562   return 0;
563 }
564 #include "draw.h"
565 #include "pinclude/pviewer.h"
566 
567 static int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
568 {
569   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
570   int         ierr;
571 
572   if (aij->size == 1) {
573     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
574   }
575   else SETERRQ(1,"MatView_MPIAIJ_Binary:Only uniprocessor output supported");
576   return 0;
577 }
578 
/*
   MatView_MPIAIJ_ASCIIorDraworMatlab - Views the parallel matrix on an
   ASCII, draw, or Matlab viewer.

   For detailed-info ASCII format it prints per-processor statistics.
   For plain per-processor ASCII output each processor prints its two
   local blocks in sequence.  Otherwise (draw/Matlab or single-file
   ASCII on multiple processors) the entire matrix is first assembled
   onto processor 0 and viewed there.
*/
static int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
  int         ierr, format,shift = C->indexshift,rank;
  FILE        *fd;
  ViewerType  vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerGetFormat(viewer,&format);
    if (format == ASCII_FORMAT_INFO_DETAILED) {
      /* print per-processor nonzero/memory statistics, one rank at a time */
      MatInfo info;
      int     flg;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&info);
      ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
      PetscSequentialPhaseBegin(mat->comm,1);
      if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
         rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
      else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
         rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
      ierr = MatGetInfo(aij->A,MAT_LOCAL,&info);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used);
      ierr = MatGetInfo(aij->B,MAT_LOCAL,&info);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == ASCII_FORMAT_INFO) {
      return 0;
    }
  }

  if (vtype == DRAW_VIEWER) {
    /* nothing to do when drawing to a null window */
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
  }

  if (vtype == ASCII_FILE_VIEWER) {
    /* each processor prints its own blocks in rank order */
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
           aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
           aij->cend);
    ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  }
  else {
    int size = aij->size;
    rank = aij->rank;
    if (size == 1) {
      ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat         A;
      Mat_SeqAIJ *Aloc;
      int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
      Scalar      *a;

      /* processor 0 gets all M x N rows/cols, the others none */
      if (!rank) {
        ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
               CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
               CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqAIJ*) aij->A->data;
      m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = aij->rstart;
      /* temporarily convert A's local column numbers to global ones
         (undone below); shift accounts for 0- vs 1-based indexing */
      for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
      for ( i=0; i<m; i++ ) {
        ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
        row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
      }
      aj = Aloc->j;
      for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}

      /* copy over the B part */
      Aloc = (Mat_SeqAIJ*) aij->B->data;
      m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = aij->rstart;
      /* B stores compacted local columns; expand to global via garray */
      ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
      for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
      for ( i=0; i<m; i++ ) {
        ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
        row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
      }
      PetscFree(ct);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      /* everything now lives in the diagonal block of A on processor 0 */
      if (!rank) {
        ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}
690 
691 static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
692 {
693   Mat         mat = (Mat) obj;
694   int         ierr;
695   ViewerType  vtype;
696 
697   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
698   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
699       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
700     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
701   }
702   else if (vtype == BINARY_FILE_VIEWER) {
703     return MatView_MPIAIJ_Binary(mat,viewer);
704   }
705   return 0;
706 }
707 
708 /*
709     This has to provide several versions.
710 
711      2) a) use only local smoothing updating outer values only once.
712         b) local smoothing updating outer values each inner iteration
713      3) color updating outer values between colors.
714 */
/*
   MatRelax_MPIAIJ - SOR/relaxation for the parallel matrix.

   Only the "local" variants are implemented: the ghost values of xx are
   scattered once (into mat->lvec) before the sweeps, then each processor
   relaxes its own rows using those frozen off-processor values.  With
   SOR_ZERO_INITIAL_GUESS the off-processor part contributes nothing and
   the work is delegated entirely to the sequential diagonal block.

   In the sweep kernels: xs/ls are the solution and ghost arrays offset
   by 'shift' to handle 1-based index storage, d is the (shifted)
   diagonal, and SPARSEDENSEMDOT subtracts a sparse row dot product from
   sum.  The update adds back the diagonal term because the dot product
   included it.
*/
static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
                           double fshift,int its,Vec xx)
{
  Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
  Mat        AA = mat->A, BB = mat->B;
  Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
  Scalar     *b,*x,*xs,*ls,d,*v,sum;
  int        ierr,*idx, *diag;
  int        n = mat->n, m = mat->m, i,shift = A->indexshift;

  VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
  xs = x + shift; /* shift by one for index start of 1 */
  ls = ls + shift;
  /* make sure the diagonal locations in A are known */
  if (!A->diag) {if ((ierr = MatMarkDiag_SeqAIJ(AA))) return ierr;}
  diag = A->diag;
  if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
    }
    /* bring in the off-processor values of xx once, before sweeping */
    ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
    CHKERRQ(ierr);
    ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
    CHKERRQ(ierr);
    while (its--) {
      /* go down through the rows */
      for ( i=0; i<m; i++ ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
      }
      /* come up through the rows */
      for ( i=m-1; i>-1; i-- ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
      }
    }
  }
  else if (flag & SOR_LOCAL_FORWARD_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
    }
    ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
    CHKERRQ(ierr);
    ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
    CHKERRQ(ierr);
    while (its--) {
      /* forward (descending-index-free) sweep only */
      for ( i=0; i<m; i++ ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
      }
    }
  }
  else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
    }
    ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
                            mat->Mvctx); CHKERRQ(ierr);
    ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
                            mat->Mvctx); CHKERRQ(ierr);
    while (its--) {
      /* backward sweep only */
      for ( i=m-1; i>-1; i-- ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
      }
    }
  }
  else {
    SETERRQ(1,"MatRelax_MPIAIJ:Option not supported");
  }
  return 0;
}
822 
823 static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,MatInfo *info)
824 {
825   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
826   Mat        A = mat->A, B = mat->B;
827   int        ierr;
828   double     isend[5], irecv[5];
829 
830   info->rows_global    = (double)mat->M;
831   info->columns_global = (double)mat->N;
832   info->rows_local     = (double)mat->m;
833   info->columns_local  = (double)mat->N;
834   info->block_size     = 1.0;
835   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
836   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
837   isend[3] = info->memory;  isend[4] = info->mallocs;
838   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
839   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
840   isend[3] += info->memory;  isend[4] += info->mallocs;
841   if (flag == MAT_LOCAL) {
842     info->nz_used      = isend[0];
843     info->nz_allocated = isend[1];
844     info->nz_unneeded  = isend[2];
845     info->memory       = isend[3];
846     info->mallocs      = isend[4];
847   } else if (flag == MAT_GLOBAL_MAX) {
848     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
849     info->nz_used      = irecv[0];
850     info->nz_allocated = irecv[1];
851     info->nz_unneeded  = irecv[2];
852     info->memory       = irecv[3];
853     info->mallocs      = irecv[4];
854   } else if (flag == MAT_GLOBAL_SUM) {
855     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
856     info->nz_used      = irecv[0];
857     info->nz_allocated = irecv[1];
858     info->nz_unneeded  = irecv[2];
859     info->memory       = irecv[3];
860     info->mallocs      = irecv[4];
861   }
862   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
863   info->fill_ratio_needed = 0;
864   info->factor_mallocs    = 0;
865 
866   return 0;
867 }
868 
869 extern int MatLUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,Mat*);
870 extern int MatLUFactorNumeric_MPIAIJ(Mat,Mat*);
871 extern int MatLUFactor_MPIAIJ(Mat,IS,IS,double);
872 extern int MatILUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,int,Mat *);
873 extern int MatSolve_MPIAIJ(Mat,Vec,Vec);
874 extern int MatSolveAdd_MPIAIJ(Mat,Vec,Vec,Vec);
875 extern int MatSolveTrans_MPIAIJ(Mat,Vec,Vec);
876 extern int MatSolveTransAdd_MPIAIJ(Mat,Vec,Vec,Vec);
877 
878 static int MatSetOption_MPIAIJ(Mat A,MatOption op)
879 {
880   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
881 
882   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
883       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
884       op == MAT_COLUMNS_SORTED ||
885       op == MAT_ROW_ORIENTED) {
886         MatSetOption(a->A,op);
887         MatSetOption(a->B,op);
888   }
889   else if (op == MAT_ROWS_SORTED ||
890            op == MAT_SYMMETRIC ||
891            op == MAT_STRUCTURALLY_SYMMETRIC ||
892            op == MAT_YES_NEW_DIAGONALS)
893     PLogInfo(A,"Info:MatSetOption_MPIAIJ:Option ignored\n");
894   else if (op == MAT_COLUMN_ORIENTED) {
895     a->roworiented = 0;
896     MatSetOption(a->A,op);
897     MatSetOption(a->B,op);
898   }
899   else if (op == MAT_NO_NEW_DIAGONALS)
900     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:MAT_NO_NEW_DIAGONALS");}
901   else
902     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:unknown option");}
903   return 0;
904 }
905 
906 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
907 {
908   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
909   *m = mat->M; *n = mat->N;
910   return 0;
911 }
912 
913 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
914 {
915   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
916   *m = mat->m; *n = mat->N;
917   return 0;
918 }
919 
920 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
921 {
922   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
923   *m = mat->rstart; *n = mat->rend;
924   return 0;
925 }
926 
927 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
928 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
929 
/*
   MatGetRow_MPIAIJ - Returns one locally owned row of the parallel matrix,
   merging the entries of the diagonal (A) and off-diagonal (B) sequential
   pieces into a single list sorted by increasing global column number.

   The returned idx/v arrays point into per-matrix workspace (rowvalues /
   rowindices); they remain valid until MatRestoreRow_MPIAIJ() is called,
   and only one row may be active at a time.
*/
int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
  int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
  int        *cmap, *idx_p;

  /* only one outstanding MatGetRow() per matrix is allowed */
  if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIAIJ:Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
    int     max = 1,m = mat->m,tmp;
    for ( i=0; i<m; i++ ) {
      /* combined length of row i in the A and B pieces */
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    /* single allocation: values first, then the index workspace after them */
    mat->rowvalues = (Scalar *) PetscMalloc( max*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max);
  }

  if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIAIJ:Only local rows")
  lrow = row - rstart;

  /* pass NULL for whichever of the value/index arrays the caller skipped */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  cmap  = mat->garray; /* maps B's local column numbers to global ones */
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      /* imark = number of B entries whose global column precedes cstart;
         the merged layout is: B[0..imark) | A[0..nzA) | B[imark..nzB) */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        /* reuse imark if the values pass already computed it */
        if (imark > -1) {
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]];
          }
        } else {
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
            else break;
          }
          imark = i;
        }
        /* A's columns are local; shift by cstart to make them global */
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
      }
    }
    else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  /* the merged copies are in the workspace, so the piecewise rows can go back */
  ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  return 0;
}
1008 
1009 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1010 {
1011   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1012   if (aij->getrowactive == PETSC_FALSE) {
1013     SETERRQ(1,"MatRestoreRow_MPIAIJ:MatGetRow not called");
1014   }
1015   aij->getrowactive = PETSC_FALSE;
1016   return 0;
1017 }
1018 
/*
   MatNorm_MPIAIJ - Computes a norm of the parallel matrix.  Supports
   NORM_FROBENIUS, NORM_1 (max column sum) and NORM_INFINITY (max row sum);
   the two-norm is not available.  On a single process the computation is
   delegated to the sequential piece.

   NOTE(review): the NORM_1 path allocates a dense length-N work array per
   process, which is not scalable in the number of global columns.
*/
static int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
  int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
  double     sum = 0.0;
  Scalar     *v;

  if (aij->size == 1) {
    ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      /* sum of squares over both sequential pieces, then global sum */
      v = amat->a;
      for (i=0; i<amat->nz; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      v = bmat->a;
      for (i=0; i<bmat->nz; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
      *norm = sqrt(*norm);
    }
    else if (type == NORM_1) { /* max column norm */
      /* accumulate |a_ij| per global column in tmp, reduce, then take max */
      double *tmp, *tmp2;
      int    *jj, *garray = aij->garray;
      tmp  = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp);
      tmp2 = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp2);
      PetscMemzero(tmp,aij->N*sizeof(double));
      *norm = 0.0;
      /* diagonal piece: local column + cstart gives the global column */
      v = amat->a; jj = amat->j;
      for ( j=0; j<amat->nz; j++ ) {
        tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
      }
      /* off-diagonal piece: garray maps local to global columns */
      v = bmat->a; jj = bmat->j;
      for ( j=0; j<bmat->nz; j++ ) {
        tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
      }
      MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);
      for ( j=0; j<aij->N; j++ ) {
        if (tmp2[j] > *norm) *norm = tmp2[j];
      }
      PetscFree(tmp); PetscFree(tmp2);
    }
    else if (type == NORM_INFINITY) { /* max row norm */
      /* rows are not split across processes, so a local max then a
         global max reduction suffices */
      double ntemp = 0.0;
      for ( j=0; j<amat->m; j++ ) {
        v = amat->a + amat->i[j] + shift;
        sum = 0.0;
        for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
          sum += PetscAbsScalar(*v); v++;
        }
        v = bmat->a + bmat->i[j] + shift;
        for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
          sum += PetscAbsScalar(*v); v++;
        }
        if (sum > ntemp) ntemp = sum;
      }
      MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);
    }
    else {
      SETERRQ(1,"MatNorm_MPIAIJ:No support for two norm");
    }
  }
  return 0;
}
1093 
/*
   MatTranspose_MPIAIJ - Forms the transpose of the parallel matrix by
   inserting each local row as a column of a freshly created N x M matrix.
   With matout == PETSC_NULL (square matrices only) the result replaces A
   in place by swapping the object contents.
*/
static int MatTranspose_MPIAIJ(Mat A,Mat *matout)
{
  Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
  Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
  int        ierr,shift = Aloc->indexshift;
  Mat        B;
  int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
  Scalar     *array;

  if (matout == PETSC_NULL && M != N)
    SETERRQ(1,"MatTranspose_MPIAIJ:Square matrix only for in-place");
  ierr = MatCreateMPIAIJ(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,
         PETSC_NULL,&B); CHKERRQ(ierr);

  /* copy over the A part */
  Aloc = (Mat_SeqAIJ*) a->A->data;
  m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
  row = a->rstart;
  /* temporarily globalize A's column indices in place (undone below) */
  for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
  for ( i=0; i<m; i++ ) {
    /* insert local row 'row' of A as a column of B */
    ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
    row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
  }
  /* restore A's column indices to their local numbering */
  aj = Aloc->j;
  for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}

  /* copy over the B part */
  Aloc = (Mat_SeqAIJ*) a->B->data;
  m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
  row = a->rstart;
  /* build a scratch copy of the global column numbers via garray */
  ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
  for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
  for ( i=0; i<m; i++ ) {
    ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
    row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
  }
  PetscFree(ct);
  ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  if (matout != PETSC_NULL) {
    *matout = B;
  } else {
    /* This isn't really an in-place transpose .... but free data structures from a */
    PetscFree(a->rowners);
    ierr = MatDestroy(a->A); CHKERRQ(ierr);
    ierr = MatDestroy(a->B); CHKERRQ(ierr);
    if (a->colmap) PetscFree(a->colmap);
    if (a->garray) PetscFree(a->garray);
    if (a->lvec) VecDestroy(a->lvec);
    if (a->Mvctx) VecScatterDestroy(a->Mvctx);
    PetscFree(a);
    /* steal B's contents into A's header, then discard B's shell */
    PetscMemcpy(A,B,sizeof(struct _Mat));
    PetscHeaderDestroy(B);
  }
  return 0;
}
1150 
1151 int MatDiagonalScale_MPIAIJ(Mat A,Vec ll,Vec rr)
1152 {
1153   Mat a = ((Mat_MPIAIJ *) A->data)->A;
1154   Mat b = ((Mat_MPIAIJ *) A->data)->B;
1155   int ierr,s1,s2,s3;
1156 
1157   if (ll)  {
1158     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
1159     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
1160     if (s1!=s2) SETERRQ(1,"MatDiagonalScale_MPIAIJ:non-conforming local sizes");
1161     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
1162     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
1163   }
1164   if (rr) SETERRQ(1,"MatDiagonalScale_MPIAIJ:not supported for right vector");
1165   return 0;
1166 }
1167 
1168 
1169 extern int MatPrintHelp_SeqAIJ(Mat);
1170 static int MatPrintHelp_MPIAIJ(Mat A)
1171 {
1172   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1173 
1174   if (!a->rank) return MatPrintHelp_SeqAIJ(a->A);
1175   else return 0;
1176 }
1177 
1178 static int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1179 {
1180   *bs = 1;
1181   return 0;
1182 }
1183 
1184 extern int MatConvert_MPIAIJ(Mat,MatType,Mat *);
1185 static int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1186 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1187 int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatGetSubMatrixCall,Mat **);
1188 /* -------------------------------------------------------------------*/
/*
   Function table for the MATMPIAIJ type.  The initializer is POSITIONAL:
   each entry must line up with the corresponding slot of struct _MatOps,
   and a 0 marks an operation this format does not implement.  Do not
   reorder entries.
*/
static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
       MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
       MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
       MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
       MatSolve_MPIAIJ,MatSolveAdd_MPIAIJ,
       MatSolveTrans_MPIAIJ,MatSolveTransAdd_MPIAIJ,
       MatLUFactor_MPIAIJ,0,
       MatRelax_MPIAIJ,
       MatTranspose_MPIAIJ,
       MatGetInfo_MPIAIJ,0,
       MatGetDiagonal_MPIAIJ,MatDiagonalScale_MPIAIJ,MatNorm_MPIAIJ,
       MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
       0,
       MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
       MatLUFactorSymbolic_MPIAIJ,MatLUFactorNumeric_MPIAIJ,0,0,
       MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
       MatILUFactorSymbolic_MPIAIJ,0,
       0,0,MatConvert_MPIAIJ,MatConvertSameType_MPIAIJ,0,0,
       0,0,0,
       MatGetSubMatrices_MPIAIJ,MatIncreaseOverlap_MPIAIJ,MatGetValues_MPIAIJ,0,
       MatPrintHelp_MPIAIJ,
       MatScale_MPIAIJ,0,0,0,
       MatGetBlockSize_MPIAIJ,
       MatGetRowIJ_MPIAIJ,
       MatRestoreRowIJ_MPIAIJ};
1214 
1215 /*@C
1216    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1217    (the default parallel PETSc format).  For good matrix assembly performance
1218    the user should preallocate the matrix storage by setting the parameters
1219    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1220    performance can be increased by more than a factor of 50.
1221 
1222    Input Parameters:
1223 .  comm - MPI communicator
1224 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1225            This value should be the same as the local size used in creating the
1226            y vector for the matrix-vector product y = Ax.
1227 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1228            This value should be the same as the local size used in creating the
1229            x vector for the matrix-vector product y = Ax.
1230 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1231 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1232 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1233            (same for all local rows)
.  d_nnz - array containing the number of nonzeros in the various rows of the
1235            diagonal portion of the local submatrix (possibly different for each row)
1236            or PETSC_NULL. You must leave room for the diagonal entry even if
1237            it is zero.
1238 .  o_nz - number of nonzeros per row in the off-diagonal portion of local
1239            submatrix (same for all local rows).
.  o_nnz - array containing the number of nonzeros in the various rows of the
1241            off-diagonal portion of the local submatrix (possibly different for
1242            each row) or PETSC_NULL.
1243 
1244    Output Parameter:
1245 .  A - the matrix
1246 
1247    Notes:
1248    The AIJ format (also called the Yale sparse matrix format or
1249    compressed row storage), is fully compatible with standard Fortran 77
1250    storage.  That is, the stored row and column indices can begin at
1251    either one (as in Fortran) or zero.  See the users manual for details.
1252 
1253    The user MUST specify either the local or global matrix dimensions
1254    (possibly both).
1255 
1256    By default, this format uses inodes (identical nodes) when possible.
1257    We search for consecutive rows with the same nonzero structure, thereby
1258    reusing matrix information to achieve increased efficiency.
1259 
1260    Options Database Keys:
1261 $    -mat_aij_no_inode  - Do not use inodes
1262 $    -mat_aij_inode_limit <limit> - Set inode limit.
1263 $        (max limit=5)
1264 $    -mat_aij_oneindex - Internally use indexing starting at 1
1265 $        rather than 0.  Note: When calling MatSetValues(),
1266 $        the user still MUST index entries starting at 0!
1267 
1268    Storage Information:
1269    For a square global matrix we define each processor's diagonal portion
1270    to be its local rows and the corresponding columns (a square submatrix);
1271    each processor's off-diagonal portion encompasses the remainder of the
1272    local matrix (a rectangular submatrix).
1273 
1274    The user can specify preallocated storage for the diagonal part of
1275    the local submatrix with either d_nz or d_nnz (not both).  Set
1276    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1277    memory allocation.  Likewise, specify preallocated storage for the
1278    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1279 
1280    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1281    the figure below we depict these three local rows and all columns (0-11).
1282 
1283 $          0 1 2 3 4 5 6 7 8 9 10 11
1284 $         -------------------
1285 $  row 3  |  o o o d d d o o o o o o
1286 $  row 4  |  o o o d d d o o o o o o
1287 $  row 5  |  o o o d d d o o o o o o
1288 $         -------------------
1289 $
1290 
1291    Thus, any entries in the d locations are stored in the d (diagonal)
1292    submatrix, and any entries in the o locations are stored in the
1293    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1294    stored simply in the MATSEQAIJ format for compressed row storage.
1295 
1296    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1297    and o_nz should indicate the number of nonzeros per row in the o matrix.
1298    In general, for PDE problems in which most nonzeros are near the diagonal,
1299    one expects d_nz >> o_nz. For large problems you MUST preallocate memory
1300    or you will get TERRIBLE performance; see the users' manual chapter on
1301    matrices and the file $(PETSC_DIR)/Performance.
1302 
1303 .keywords: matrix, aij, compressed row, sparse, parallel
1304 
1305 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1306 @*/
int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
                    int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  Mat          B;
  Mat_MPIAIJ   *b;
  int          ierr, i,sum[2],work[2];

  *A = 0;
  PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
  PLogObjectCreate(B);
  B->data       = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
  /* zero everything so fields not set below are well defined */
  PetscMemzero(b,sizeof(Mat_MPIAIJ));
  PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
  B->destroy    = MatDestroy_MPIAIJ;
  B->view       = MatView_MPIAIJ;
  B->factor     = 0;
  B->assembled  = PETSC_FALSE;

  b->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&b->rank);
  MPI_Comm_size(comm,&b->size);

  /* per-row preallocation arrays require a known local row count */
  if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
    SETERRQ(1,"MatCreateMPIAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");

  /* derive missing global sizes as the sum of the local ones */
  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
    if (M == PETSC_DECIDE) M = sum[0];
    if (N == PETSC_DECIDE) N = sum[1];
  }
  /* derive missing local sizes by splitting the global ones evenly,
     distributing the remainder over the lowest-ranked processes */
  if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
  if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;

  /* build local table of row and column ownerships */
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
  b->cowners = b->rowners + b->size + 2;
  /* gather all local row counts, then prefix-sum them into offsets */
  MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];
  }
  b->rstart = b->rowners[b->rank];
  b->rend   = b->rowners[b->rank+1];
  /* same for column ownership */
  MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart = b->cowners[b->rank];
  b->cend   = b->cowners[b->rank+1];

  /* the diagonal piece is m x n; the off-diagonal piece is stored with
     all N columns (its used columns are compacted later at assembly) */
  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used for matrix vector multiply */
  b->lvec      = 0;
  b->Mvctx     = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  *A = B;
  return 0;
}
1389 
1390 static int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
1391 {
1392   Mat        mat;
1393   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1394   int        ierr, len=0, flg;
1395 
1396   *newmat       = 0;
1397   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1398   PLogObjectCreate(mat);
1399   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1400   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1401   mat->destroy    = MatDestroy_MPIAIJ;
1402   mat->view       = MatView_MPIAIJ;
1403   mat->factor     = matin->factor;
1404   mat->assembled  = PETSC_TRUE;
1405 
1406   a->m = mat->m   = oldmat->m;
1407   a->n = mat->n   = oldmat->n;
1408   a->M = mat->M   = oldmat->M;
1409   a->N = mat->N   = oldmat->N;
1410 
1411   a->rstart       = oldmat->rstart;
1412   a->rend         = oldmat->rend;
1413   a->cstart       = oldmat->cstart;
1414   a->cend         = oldmat->cend;
1415   a->size         = oldmat->size;
1416   a->rank         = oldmat->rank;
1417   a->insertmode   = NOT_SET_VALUES;
1418   a->rowvalues    = 0;
1419   a->getrowactive = PETSC_FALSE;
1420 
1421   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1422   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1423   a->cowners = a->rowners + a->size + 2;
1424   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1425   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1426   if (oldmat->colmap) {
1427     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1428     PLogObjectMemory(mat,(a->N)*sizeof(int));
1429     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1430   } else a->colmap = 0;
1431   if (oldmat->garray) {
1432     len = ((Mat_SeqAIJ *) (oldmat->B->data))->n;
1433     a->garray = (int *) PetscMalloc((len+1)*sizeof(int)); CHKPTRQ(a->garray);
1434     PLogObjectMemory(mat,len*sizeof(int));
1435     if (len) PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1436   } else a->garray = 0;
1437 
1438   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1439   PLogObjectParent(mat,a->lvec);
1440   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1441   PLogObjectParent(mat,a->Mvctx);
1442   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1443   PLogObjectParent(mat,a->A);
1444   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1445   PLogObjectParent(mat,a->B);
1446   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1447   if (flg) {
1448     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1449   }
1450   *newmat = mat;
1451   return 0;
1452 }
1453 
1454 #include "sys.h"
1455 
/*
   MatLoad_MPIAIJ - Loads a matrix stored in PETSc binary format into a
   parallel MATMPIAIJ matrix.  Process 0 reads the file and ships each
   process its share (row lengths via MPI_Scatterv, column indices and
   values via point-to-point messages); every process then inserts its own
   rows and the matrix is assembled.
*/
int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*svals;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
  int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
  int          tag = ((PetscObject)viewer)->tag;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  /* only process 0 touches the file; it validates the header cookie */
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIAIJ:not matrix object");
  }

  /* broadcast M, N, nz (header[1..3]) to everyone */
  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];
  /* determine ownership of all rows */
  m = M/size + ((M % size) > rank);
  rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
  MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  /* prefix-sum the gathered local row counts into ownership offsets */
  for ( i=2; i<=size; i++ ) {
    rowners[i] += rowners[i-1];
  }
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
  offlens = ourlens + (rend-rstart);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
    MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices  */
    nz = procsnz[0];
    mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
    ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);

    /* read in every one elses and ship off */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += ourlens[i];
    }
    mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);

    /* receive message of column indices*/
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries */
  /* NOTE(review): a column is classified off-diagonal when it falls outside
     this process's ROW ownership range [rstart,rend) - i.e. the column
     split is assumed to mirror the row split */
  PetscMemzero(offlens,m*sizeof(int));
  jj = 0;
  for ( i=0; i<m; i++ ) {
    for ( j=0; j<ourlens[i]; j++ ) {
      if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
      jj++;
    }
  }

  /* create our matrix */
  /* temporarily reduce ourlens to the diagonal-only counts for
     preallocation, then restore the full lengths for insertion */
  for ( i=0; i<m; i++ ) {
    ourlens[i] -= offlens[i];
  }
  ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
  A = *newmat;
  MatSetOption(A,MAT_COLUMNS_SORTED);
  for ( i=0; i<m; i++ ) {
    ourlens[i] += offlens[i];
  }

  if (!rank) {
    vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);

    /* insert into matrix */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }

    /* read in other processors and ship out */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* receive message of values*/
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");

    /* insert into matrix */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }
  }
  PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);

  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}
1620