xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision 1942fdfc3363b1385e6dd6a4d0a1dabce71c8c08)
1 
2 #ifndef lint
3 static char vcid[] = "$Id: mpiaij.c,v 1.165 1996/08/22 19:53:29 curfman Exp bsmith $";
4 #endif
5 
6 #include "src/mat/impls/aij/mpi/mpiaij.h"
7 #include "src/vec/vecimpl.h"
8 #include "src/inline/spops.h"
9 
/* local utility routine that creates a mapping from the global column
   number to the local number in the off-diagonal part of the local
   storage of the matrix.  This is done in a non-scalable way since the
   length of colmap equals the global number of columns in the matrix.
*/
15 static int CreateColmap_MPIAIJ_Private(Mat mat)
16 {
17   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
18   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
19   int        n = B->n,i;
20 
21   aij->colmap = (int *) PetscMalloc(aij->N*sizeof(int));CHKPTRQ(aij->colmap);
22   PLogObjectMemory(mat,aij->N*sizeof(int));
23   PetscMemzero(aij->colmap,aij->N*sizeof(int));
24   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
25   return 0;
26 }
27 
28 extern int DisAssemble_MPIAIJ(Mat);
29 
30 static int MatGetRowIJ_MPIAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
31                            PetscTruth *done)
32 {
33   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
34   int        ierr;
35   if (aij->size == 1) {
36     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
37   } else SETERRQ(1,"MatGetRowIJ_MPIAIJ:not supported in parallel");
38   return 0;
39 }
40 
41 static int MatRestoreRowIJ_MPIAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
42                                PetscTruth *done)
43 {
44   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
45   int        ierr;
46   if (aij->size == 1) {
47     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
48   } else SETERRQ(1,"MatRestoreRowIJ_MPIAIJ:not supported in parallel");
49   return 0;
50 }
51 
/*
   Inserts or adds an m x n logically dense block of values v into the
   parallel matrix.  Locally owned rows are routed to the diagonal block
   (aij->A) or off-diagonal block (aij->B); rows owned by other
   processors are stashed and communicated during assembly.
*/
static int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Scalar     value;
  int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
  int        cstart = aij->cstart, cend = aij->cend,row,col;
  int        roworiented = aij->roworiented;

  /* within one assembly all calls must either insert or add, never both */
  if (aij->insertmode != NOT_SET_VALUES && aij->insertmode != addv) {
    SETERRQ(1,"MatSetValues_MPIAIJ:Cannot mix inserts and adds");
  }
  aij->insertmode = addv;
  for ( i=0; i<m; i++ ) {
    if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative row");
    if (im[i] >= aij->M) SETERRQ(1,"MatSetValues_MPIAIJ:Row too large");
    if (im[i] >= rstart && im[i] < rend) {
      /* row owned by this processor */
      row = im[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (in[j] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative column");
        if (in[j] >= aij->N) SETERRQ(1,"MatSetValues_MPIAIJ:Col too large");
        if (in[j] >= cstart && in[j] < cend){
          /* column falls in the diagonal block A */
          col = in[j] - cstart;
          /* v is interpreted row- or column-major per user setting */
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          ierr = MatSetValues(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
        }
        else {
          /* column falls in the off-diagonal block B */
          if (mat->was_assembled) {
            /* after first assembly B uses a compressed local column
               numbering; translate the global column via colmap,
               which is built lazily */
            if (!aij->colmap) {
              ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
            }
            col = aij->colmap[in[j]] - 1;
            /* col < 0: column not yet in B.  Unless new nonzeros are
               forbidden, disassemble back to global numbering so the
               new location can be inserted */
            if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
              ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
            }
          }
          else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          ierr = MatSetValues(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
        }
      }
    }
    else {
      /* off-processor row: cache it in the stash for assembly time */
      if (roworiented) {
        ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      }
      else {
        row = im[i];
        for ( j=0; j<n; j++ ) {
          ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
        }
      }
    }
  }
  return 0;
}
108 
/*
   Retrieves an m x n block of values into v (row-major).  Only locally
   owned rows may be queried; entries with no stored location are
   returned as 0.0.
*/
static int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
  int        cstart = aij->cstart, cend = aij->cend,row,col;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative row");
    if (idxm[i] >= aij->M) SETERRQ(1,"MatGetValues_MPIAIJ:Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      row = idxm[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative column");
        if (idxn[j] >= aij->N) SETERRQ(1,"MatGetValues_MPIAIJ:Col too large");
        if (idxn[j] >= cstart && idxn[j] < cend){
          /* column falls in the diagonal block A */
          col = idxn[j] - cstart;
          ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        }
        else {
          /* column falls in the off-diagonal block B, which uses a
             compressed numbering; translate via the (lazily built) colmap */
          if (!aij->colmap) {
            ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
          }
          col = aij->colmap[idxn[j]] - 1;
          /* col < 0 or a garray mismatch means the entry is not stored */
          if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
          else {
            ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    }
    else {
      SETERRQ(1,"MatGetValues_MPIAIJ:Only local values currently supported");
    }
  }
  return 0;
}
145 
/*
   Begins assembly: stashed off-processor (row,col,value) triples are
   packed into per-destination buffers and sent with non-blocking MPI
   sends; matching non-blocking receives are posted here.  The pending
   requests and buffers are saved in the matrix and completed/freed in
   MatAssemblyEnd_MPIAIJ().
*/
static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = aij->size, *owners = aij->rowners;
  int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  MPI_Allreduce(&aij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,"MatAssemblyBegin_MPIAIJ:Some processors inserted others added");
  }
  aij->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  /*  nprocs[j] = # of stashed entries for proc j; procs[j] = 0/1 flag */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<aij->stash.n; i++ ) {
    idx = aij->stash.idx[i];
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack (row, col, value) triples; starts[] is advanced as we pack
     and is rebuilt below before being used as send offsets */
  for ( i=0; i<aij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
  }
  PetscFree(owner);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(0,"[%d]MatAssemblyBegin_MPIAIJ:Number of off processor values %d\n",rank,aij->stash.n);
  ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);

  /* stash the communication state for MatAssemblyEnd_MPIAIJ() */
  aij->svalues    = svalues;    aij->rvalues    = rvalues;
  aij->nsends     = nsends;     aij->nrecvs     = nreceives;
  aij->send_waits = send_waits; aij->recv_waits = recv_waits;
  aij->rmax       = nmax;

  return 0;
}
244 extern int MatSetUpMultiply_MPIAIJ(Mat);
245 
/*
   Completes assembly: drains the receives posted by
   MatAssemblyBegin_MPIAIJ(), inserting each (row,col,value) triple into
   the diagonal or off-diagonal block, waits on the sends, frees the
   communication buffers, and assembles the two sequential blocks.
*/
static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
  int         row,col,other_disassembled;
  Scalar      *values,val;
  InsertMode  addv = aij->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = aij->rvalues + 3*imdex*aij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3; /* each entry is a (row,col,value) triple of Scalars */
    for ( i=0; i<n; i++ ) {
      /* indices were shipped as Scalars; convert back to ints */
      row = (int) PetscReal(values[3*i]) - aij->rstart;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= aij->cstart && col < aij->cend) {
        col -= aij->cstart;
        MatSetValues(aij->A,1,&row,1,&col,&val,addv);
      }
      else {
        if (mat->was_assembled) {
          /* B uses compressed column numbering once assembled;
             translate via colmap (built lazily) */
          if (!aij->colmap) {
            ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
          }
          col = aij->colmap[col] - 1;
          /* new nonzero location: disassemble back to global numbering
             (unless new nonzeros are disallowed) */
          if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
            ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        MatSetValues(aij->B,1,&row,1,&col,&val,addv);
      }
    }
    count--;
  }
  PetscFree(aij->recv_waits); PetscFree(aij->rvalues);

  /* wait on sends */
  if (aij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(aij->nsends,aij->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(aij->send_waits); PetscFree(aij->svalues);

  aij->insertmode = NOT_SET_VALUES;
  ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselves, in order that we may reassemble. */
  MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
  }

  /* first final assembly: build the scatter used by MatMult etc. */
  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);

  /* cached MatGetRow() workspace is stale after assembly */
  if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
  return 0;
}
317 
318 static int MatZeroEntries_MPIAIJ(Mat A)
319 {
320   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
321   int        ierr;
322   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
323   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
324   return 0;
325 }
326 
/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   aij->A and aij->B directly and not through the MatZeroRows()
   routine.
*/
/*
   Zeros the (globally numbered) rows listed in is, setting the diagonal
   entry to *diag if diag is non-null.  Each row index is shipped to its
   owning processor, then the local rows are zeroed through MatZeroRows
   on the two sequential blocks.
*/
static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    /* linear search over the ownership ranges for the owner of idx */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,"MatZeroRows_MPIAIJ:Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack the row indices; starts[] is consumed here and rebuilt below */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  base = owners[rank]; /* first globally numbered row owned locally */

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  /* gather all received row indices, converted to local numbering */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  /* diagonal value goes only in the diagonal block; B gets plain zeros */
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}
455 
456 static int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
457 {
458   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
459   int        ierr,nt;
460 
461   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
462   if (nt != a->n) {
463     SETERRQ(1,"MatMult_MPIAIJ:Incompatible parition of A and xx");
464   }
465   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
466   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
467   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
468   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
469   return 0;
470 }
471 
472 static int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
473 {
474   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
475   int        ierr;
476   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
477   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
478   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
479   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
480   return 0;
481 }
482 
483 static int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
484 {
485   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
486   int        ierr;
487 
488   /* do nondiagonal part */
489   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
490   /* send it on its way */
491   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
492   /* do local part */
493   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
494   /* receive remote parts: note this assumes the values are not actually */
495   /* inserted in yy until the next line, which is true for my implementation*/
496   /* but is not perhaps always true. */
497   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
498   return 0;
499 }
500 
501 static int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
502 {
503   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
504   int        ierr;
505 
506   /* do nondiagonal part */
507   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
508   /* send it on its way */
509   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
510   /* do local part */
511   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
512   /* receive remote parts: note this assumes the values are not actually */
513   /* inserted in yy until the next line, which is true for my implementation*/
514   /* but is not perhaps always true. */
515   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
516                   (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
517   return 0;
518 }
519 
520 /*
521   This only works correctly for square matrices where the subblock A->A is the
522    diagonal block
523 */
524 static int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
525 {
526   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
527   if (a->M != a->N)
528     SETERRQ(1,"MatGetDiagonal_MPIAIJ:Supports only square matrix where A->A is diag block");
529   if (a->rstart != a->cstart || a->rend != a->cend) {
530     SETERRQ(1,"MatGetDiagonal_MPIAIJ:row partition must equal col partition");  }
531   return MatGetDiagonal(a->A,v);
532 }
533 
534 static int MatScale_MPIAIJ(Scalar *aa,Mat A)
535 {
536   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
537   int        ierr;
538   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
539   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
540   return 0;
541 }
542 
/*
   Destroys the parallel matrix: frees the ownership table, both
   sequential blocks, the lazily created auxiliary structures, and
   finally the header itself.
*/
static int MatDestroy_MPIAIJ(PetscObject obj)
{
  Mat        mat = (Mat) obj;
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  int        ierr;
#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",aij->M,aij->N);
#endif
  PetscFree(aij->rowners);
  ierr = MatDestroy(aij->A); CHKERRQ(ierr);
  ierr = MatDestroy(aij->B); CHKERRQ(ierr);
  /* these are created lazily, so each may be null */
  if (aij->colmap) PetscFree(aij->colmap);
  if (aij->garray) PetscFree(aij->garray);
  if (aij->lvec)   VecDestroy(aij->lvec);
  if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
  if (aij->rowvalues) PetscFree(aij->rowvalues);
  PetscFree(aij);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}
564 #include "draw.h"
565 #include "pinclude/pviewer.h"
566 
567 static int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
568 {
569   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
570   int         ierr;
571 
572   if (aij->size == 1) {
573     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
574   }
575   else SETERRQ(1,"MatView_MPIAIJ_Binary:Only uniprocessor output supported");
576   return 0;
577 }
578 
/*
   Views the parallel matrix on an ASCII, draw, or Matlab viewer.
   ASCII "info detailed" prints per-processor statistics; the
   ASCII_FILE case prints each processor's blocks in turn; otherwise
   the entire matrix is gathered onto processor 0 and viewed there.
*/
static int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
  int         ierr, format,shift = C->indexshift,rank;
  FILE        *fd;
  ViewerType  vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    ierr = ViewerGetFormat(viewer,&format);
    if (format == ASCII_FORMAT_INFO_DETAILED) {
      /* one processor at a time prints its local nonzero statistics */
      MatInfo info;
      int     flg;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&info);
      ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
      PetscSequentialPhaseBegin(mat->comm,1);
      if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
         rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
      else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
         rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
      ierr = MatGetInfo(aij->A,MAT_LOCAL,&info);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used);
      ierr = MatGetInfo(aij->B,MAT_LOCAL,&info);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == ASCII_FORMAT_INFO) {
      return 0;
    }
  }

  if (vtype == DRAW_VIEWER) {
    /* nothing to do for a null draw context */
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
  }

  if (vtype == ASCII_FILE_VIEWER) {
    /* each processor prints its ownership ranges and its two blocks */
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
           aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
           aij->cend);
    ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  }
  else {
    int size = aij->size;
    rank = aij->rank;
    if (size == 1) {
      ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat         A;
      Mat_SeqAIJ *Aloc;
      int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
      Scalar      *a;

      /* processor 0 owns all M rows of the gathered copy */
      if (!rank) {
        ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
               CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
               CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqAIJ*) aij->A->data;
      m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = aij->rstart;
      /* temporarily shift column indices to global numbering in place */
      for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
      for ( i=0; i<m; i++ ) {
        ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
        row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
      }
      aj = Aloc->j;
      /* undo the in-place shift so aij->A is left unchanged */
      for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}

      /* copy over the B part */
      Aloc = (Mat_SeqAIJ*) aij->B->data;
      m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = aij->rstart;
      /* B's compressed column indices are mapped to global via garray */
      ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
      for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
      for ( i=0; i<m; i++ ) {
        ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
        row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
      }
      PetscFree(ct);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}
690 
691 static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
692 {
693   Mat         mat = (Mat) obj;
694   int         ierr;
695   ViewerType  vtype;
696 
697   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
698   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
699       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
700     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
701   }
702   else if (vtype == BINARY_FILE_VIEWER) {
703     return MatView_MPIAIJ_Binary(mat,viewer);
704   }
705   return 0;
706 }
707 
/*
    This has to provide several versions.

     2) a) use only local smoothing updating outer values only once.
        b) local smoothing updating outer values each inner iteration
     3) color updating outer values between colors.
*/
/*
   Local SOR/relaxation sweeps for the parallel matrix.  The ghost
   values of xx are scattered once into mat->lvec, then the selected
   forward/backward/symmetric sweeps run over the local rows using both
   the diagonal block A and the off-diagonal block B (via ls).
   With SOR_ZERO_INITIAL_GUESS the work is delegated entirely to the
   diagonal block's relax routine.
*/
static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
                           double fshift,int its,Vec xx)
{
  Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
  Mat        AA = mat->A, BB = mat->B;
  Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
  Scalar     *b,*x,*xs,*ls,d,*v,sum;
  int        ierr,*idx, *diag;
  int        n = mat->n, m = mat->m, i,shift = A->indexshift;

  VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
  xs = x + shift; /* shift by one for index start of 1 */
  ls = ls + shift;
  /* diagonal locations in A are marked lazily */
  if (!A->diag) {if ((ierr = MatMarkDiag_SeqAIJ(AA))) return ierr;}
  diag = A->diag;
  if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
    }
    /* gather ghost values of xx needed by the off-diagonal block */
    ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
    CHKERRQ(ierr);
    ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
    CHKERRQ(ierr);
    while (its--) {
      /* go down through the rows */
      for ( i=0; i<m; i++ ) {
        /* sum = b[i] - (A row i).x - (B row i).ls, excluding scaling */
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
      }
      /* come up through the rows */
      for ( i=m-1; i>-1; i-- ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
      }
    }
  }
  else if (flag & SOR_LOCAL_FORWARD_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
    }
    ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
    CHKERRQ(ierr);
    ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
    CHKERRQ(ierr);
    while (its--) {
      /* forward sweep only */
      for ( i=0; i<m; i++ ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
      }
    }
  }
  else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
    }
    ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
                            mat->Mvctx); CHKERRQ(ierr);
    ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
                            mat->Mvctx); CHKERRQ(ierr);
    while (its--) {
      /* backward sweep only */
      for ( i=m-1; i>-1; i-- ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
      }
    }
  }
  else {
    SETERRQ(1,"MatRelax_MPIAIJ:Option not supported");
  }
  return 0;
}
822 
823 static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,MatInfo *info)
824 {
825   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
826   Mat        A = mat->A, B = mat->B;
827   int        ierr;
828   double     isend[5], irecv[5];
829 
830   info->rows_global    = (double)mat->M;
831   info->columns_global = (double)mat->N;
832   info->rows_local     = (double)mat->m;
833   info->columns_local  = (double)mat->N;
834   info->block_size     = 1.0;
835   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
836   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
837   isend[3] = info->memory;  isend[4] = info->mallocs;
838   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
839   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
840   isend[3] += info->memory;  isend[4] += info->mallocs;
841   if (flag == MAT_LOCAL) {
842     info->nz_used      = isend[0];
843     info->nz_allocated = isend[1];
844     info->nz_unneeded  = isend[2];
845     info->memory       = isend[3];
846     info->mallocs      = isend[4];
847   } else if (flag == MAT_GLOBAL_MAX) {
848     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
849     info->nz_used      = irecv[0];
850     info->nz_allocated = irecv[1];
851     info->nz_unneeded  = irecv[2];
852     info->memory       = irecv[3];
853     info->mallocs      = irecv[4];
854   } else if (flag == MAT_GLOBAL_SUM) {
855     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
856     info->nz_used      = irecv[0];
857     info->nz_allocated = irecv[1];
858     info->nz_unneeded  = irecv[2];
859     info->memory       = irecv[3];
860     info->mallocs      = irecv[4];
861   }
862   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
863   info->fill_ratio_needed = 0;
864   info->factor_mallocs    = 0;
865 
866   return 0;
867 }
868 
869 extern int MatLUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,Mat*);
870 extern int MatLUFactorNumeric_MPIAIJ(Mat,Mat*);
871 extern int MatLUFactor_MPIAIJ(Mat,IS,IS,double);
872 extern int MatILUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,int,Mat *);
873 extern int MatSolve_MPIAIJ(Mat,Vec,Vec);
874 extern int MatSolveAdd_MPIAIJ(Mat,Vec,Vec,Vec);
875 extern int MatSolveTrans_MPIAIJ(Mat,Vec,Vec);
876 extern int MatSolveTransAdd_MPIAIJ(Mat,Vec,Vec,Vec);
877 
878 static int MatSetOption_MPIAIJ(Mat A,MatOption op)
879 {
880   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
881 
882   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
883       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
884       op == MAT_COLUMNS_SORTED ||
885       op == MAT_ROW_ORIENTED) {
886         MatSetOption(a->A,op);
887         MatSetOption(a->B,op);
888   }
889   else if (op == MAT_ROWS_SORTED ||
890            op == MAT_SYMMETRIC ||
891            op == MAT_STRUCTURALLY_SYMMETRIC ||
892            op == MAT_YES_NEW_DIAGONALS)
893     PLogInfo(A,"Info:MatSetOption_MPIAIJ:Option ignored\n");
894   else if (op == MAT_COLUMN_ORIENTED) {
895     a->roworiented = 0;
896     MatSetOption(a->A,op);
897     MatSetOption(a->B,op);
898   }
899   else if (op == MAT_NO_NEW_DIAGONALS)
900     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:MAT_NO_NEW_DIAGONALS");}
901   else
902     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:unknown option");}
903   return 0;
904 }
905 
906 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
907 {
908   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
909   *m = mat->M; *n = mat->N;
910   return 0;
911 }
912 
913 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
914 {
915   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
916   *m = mat->m; *n = mat->N;
917   return 0;
918 }
919 
920 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
921 {
922   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
923   *m = mat->rstart; *n = mat->rend;
924   return 0;
925 }
926 
927 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
928 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
929 
/*
   MatGetRow_MPIAIJ - Returns the column indices and/or values of one locally
   owned global row, merged from the diagonal (A) and off-diagonal (B) pieces
   into a single array sorted by increasing GLOBAL column number.

   Output arrays point into a per-matrix scratch buffer (mat->rowvalues /
   mat->rowindices); the caller must call MatRestoreRow before requesting
   another row.  Only one row may be active at a time (getrowactive guard).
*/
int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
  int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
  int        *cmap, *idx_p;

  if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIAIJ:Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row;
        rowindices is carved out of the tail of the same allocation.
    */
    Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
    int     max = 1,m = mat->m,tmp;
    for ( i=0; i<m; i++ ) {
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    mat->rowvalues = (Scalar *) PetscMalloc( max*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max);
  }

  if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIAIJ:Only local rows")
  lrow = row - rstart;

  /* pass null pointers down for whatever the caller did not request;
     cworkB is still needed for merging when values are requested */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  /* garray maps B's compressed local column numbers to global ones */
  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted;
         imark is the count of B entries whose global column precedes cstart,
         i.e. the split point: B-left | A | B-right */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        /* reuse imark from the value pass when available; otherwise find it */
        if (imark > -1) {
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]];
          }
        } else {
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
            else break;
          }
          imark = i;
        }
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
      }
    }
    else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  /* release the sequential pieces' row workspace immediately; the merged
     copies in rowvalues/rowindices remain valid until MatRestoreRow */
  ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  return 0;
}
1008 
1009 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1010 {
1011   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1012   if (aij->getrowactive == PETSC_FALSE) {
1013     SETERRQ(1,"MatRestoreRow_MPIAIJ:MatGetRow not called");
1014   }
1015   aij->getrowactive = PETSC_FALSE;
1016   return 0;
1017 }
1018 
/*
   MatNorm_MPIAIJ - Computes a matrix norm (Frobenius, 1-norm, or infinity
   norm) of the parallel matrix by combining the diagonal (A) and
   off-diagonal (B) sequential pieces and reducing across the communicator.
   The 2-norm is not supported.
*/
static int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
  int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
  double     sum = 0.0;
  Scalar     *v;

  if (aij->size == 1) {
    /* uniprocessor case: delegate entirely to the sequential piece */
    ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      /* sum of squares over both pieces, then global sum and sqrt */
      v = amat->a;
      for (i=0; i<amat->nz; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      v = bmat->a;
      for (i=0; i<bmat->nz; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
      *norm = sqrt(*norm);
    }
    else if (type == NORM_1) { /* max column norm */
      /* non-scalable: accumulates a full global-length column-sum vector
         on every process, then reduces it */
      double *tmp, *tmp2;
      int    *jj, *garray = aij->garray;
      tmp  = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp);
      tmp2 = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp2);
      PetscMemzero(tmp,aij->N*sizeof(double));
      *norm = 0.0;
      /* A stores local column numbers: offset by cstart (and index shift) */
      v = amat->a; jj = amat->j;
      for ( j=0; j<amat->nz; j++ ) {
        tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
      }
      /* B stores compressed columns: map through garray to global numbers */
      v = bmat->a; jj = bmat->j;
      for ( j=0; j<bmat->nz; j++ ) {
        tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
      }
      MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);
      for ( j=0; j<aij->N; j++ ) {
        if (tmp2[j] > *norm) *norm = tmp2[j];
      }
      PetscFree(tmp); PetscFree(tmp2);
    }
    else if (type == NORM_INFINITY) { /* max row norm */
      /* each local row is wholly owned, so only a global MAX is needed */
      double ntemp = 0.0;
      for ( j=0; j<amat->m; j++ ) {
        v = amat->a + amat->i[j] + shift;
        sum = 0.0;
        for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
          sum += PetscAbsScalar(*v); v++;
        }
        v = bmat->a + bmat->i[j] + shift;
        for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
          sum += PetscAbsScalar(*v); v++;
        }
        if (sum > ntemp) ntemp = sum;
      }
      MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);
    }
    else {
      SETERRQ(1,"MatNorm_MPIAIJ:No support for two norm");
    }
  }
  return 0;
}
1093 
/*
   MatTranspose_MPIAIJ - Forms the transpose by inserting each local row of
   A (with globalized column numbers) as a COLUMN of a new parallel matrix B.
   With matout == PETSC_NULL the result replaces A in place (square only).
*/
static int MatTranspose_MPIAIJ(Mat A,Mat *matout)
{
  Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
  Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
  int        ierr,shift = Aloc->indexshift;
  Mat        B;
  int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
  Scalar     *array;

  if (matout == PETSC_NULL && M != N)
    SETERRQ(1,"MatTranspose_MPIAIJ:Square matrix only for in-place");
  /* no preallocation hints are given; assembly relies on dynamic growth */
  ierr = MatCreateMPIAIJ(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,
         PETSC_NULL,&B); CHKERRQ(ierr);

  /* copy over the A part: temporarily shift A's local column numbers to
     global ones IN PLACE, insert each row as a column of B, then undo */
  Aloc = (Mat_SeqAIJ*) a->A->data;
  m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
  row = a->rstart;
  for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
  for ( i=0; i<m; i++ ) {
    ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
    row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
  }
  /* restore A's column indices to their original local form */
  aj = Aloc->j;
  for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}

  /* copy over the B part: map compressed columns through garray into a
     temporary global-index array (B itself is left untouched) */
  Aloc = (Mat_SeqAIJ*) a->B->data;
  m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
  row = a->rstart;
  ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
  for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
  for ( i=0; i<m; i++ ) {
    ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
    row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
  }
  PetscFree(ct);
  ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  if (matout != PETSC_NULL) {
    *matout = B;
  } else {
    /* This isn't really an in-place transpose .... but free data structures from a */
    PetscFree(a->rowners);
    ierr = MatDestroy(a->A); CHKERRQ(ierr);
    ierr = MatDestroy(a->B); CHKERRQ(ierr);
    if (a->colmap) PetscFree(a->colmap);
    if (a->garray) PetscFree(a->garray);
    if (a->lvec) VecDestroy(a->lvec);
    if (a->Mvctx) VecScatterDestroy(a->Mvctx);
    PetscFree(a);
    /* steal B's header into A's slot, then discard B's shell */
    PetscMemcpy(A,B,sizeof(struct _Mat));
    PetscHeaderDestroy(B);
  }
  return 0;
}
1150 
1151 extern int MatPrintHelp_SeqAIJ(Mat);
1152 static int MatPrintHelp_MPIAIJ(Mat A)
1153 {
1154   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1155 
1156   if (!a->rank) return MatPrintHelp_SeqAIJ(a->A);
1157   else return 0;
1158 }
1159 
1160 static int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1161 {
1162   *bs = 1;
1163   return 0;
1164 }
1165 
1166 extern int MatConvert_MPIAIJ(Mat,MatType,Mat *);
1167 static int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1168 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1169 int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatGetSubMatrixCall,Mat **);
1170 /* -------------------------------------------------------------------*/
/*
   Virtual function table for the MATMPIAIJ type.  The initializer is
   POSITIONAL: each entry must line up with the corresponding slot of
   struct _MatOps, and a 0 marks an operation with no MPIAIJ
   implementation.  Do not reorder or insert entries here without
   changing struct _MatOps to match.
*/
static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
       MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
       MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
       MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
       MatSolve_MPIAIJ,MatSolveAdd_MPIAIJ,
       MatSolveTrans_MPIAIJ,MatSolveTransAdd_MPIAIJ,
       MatLUFactor_MPIAIJ,0,
       MatRelax_MPIAIJ,
       MatTranspose_MPIAIJ,
       MatGetInfo_MPIAIJ,0,
       MatGetDiagonal_MPIAIJ,0,MatNorm_MPIAIJ,
       MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
       0,
       MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
       MatLUFactorSymbolic_MPIAIJ,MatLUFactorNumeric_MPIAIJ,0,0,
       MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
       MatILUFactorSymbolic_MPIAIJ,0,
       0,0,MatConvert_MPIAIJ,MatConvertSameType_MPIAIJ,0,0,
       0,0,0,
       MatGetSubMatrices_MPIAIJ,MatIncreaseOverlap_MPIAIJ,MatGetValues_MPIAIJ,0,
       MatPrintHelp_MPIAIJ,
       MatScale_MPIAIJ,0,0,0,
       MatGetBlockSize_MPIAIJ,
       MatGetRowIJ_MPIAIJ,
       MatRestoreRowIJ_MPIAIJ};
1196 
1197 /*@C
1198    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1199    (the default parallel PETSc format).  For good matrix assembly performance
1200    the user should preallocate the matrix storage by setting the parameters
1201    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1202    performance can be increased by more than a factor of 50.
1203 
1204    Input Parameters:
1205 .  comm - MPI communicator
1206 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1207            This value should be the same as the local size used in creating the
1208            y vector for the matrix-vector product y = Ax.
1209 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1210            This value should be the same as the local size used in creating the
1211            x vector for the matrix-vector product y = Ax.
1212 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1213 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1214 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1215            (same for all local rows)
.  d_nnz - array containing the number of nonzeros in the various rows of the
1217            diagonal portion of the local submatrix (possibly different for each row)
1218            or PETSC_NULL. You must leave room for the diagonal entry even if
1219            it is zero.
1220 .  o_nz - number of nonzeros per row in the off-diagonal portion of local
1221            submatrix (same for all local rows).
.  o_nnz - array containing the number of nonzeros in the various rows of the
1223            off-diagonal portion of the local submatrix (possibly different for
1224            each row) or PETSC_NULL.
1225 
1226    Output Parameter:
1227 .  A - the matrix
1228 
1229    Notes:
1230    The AIJ format (also called the Yale sparse matrix format or
1231    compressed row storage), is fully compatible with standard Fortran 77
1232    storage.  That is, the stored row and column indices can begin at
1233    either one (as in Fortran) or zero.  See the users manual for details.
1234 
1235    The user MUST specify either the local or global matrix dimensions
1236    (possibly both).
1237 
1238    By default, this format uses inodes (identical nodes) when possible.
1239    We search for consecutive rows with the same nonzero structure, thereby
1240    reusing matrix information to achieve increased efficiency.
1241 
1242    Options Database Keys:
1243 $    -mat_aij_no_inode  - Do not use inodes
1244 $    -mat_aij_inode_limit <limit> - Set inode limit.
1245 $        (max limit=5)
1246 $    -mat_aij_oneindex - Internally use indexing starting at 1
1247 $        rather than 0.  Note: When calling MatSetValues(),
1248 $        the user still MUST index entries starting at 0!
1249 
1250    Storage Information:
1251    For a square global matrix we define each processor's diagonal portion
1252    to be its local rows and the corresponding columns (a square submatrix);
1253    each processor's off-diagonal portion encompasses the remainder of the
1254    local matrix (a rectangular submatrix).
1255 
1256    The user can specify preallocated storage for the diagonal part of
1257    the local submatrix with either d_nz or d_nnz (not both).  Set
1258    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1259    memory allocation.  Likewise, specify preallocated storage for the
1260    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1261 
1262    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1263    the figure below we depict these three local rows and all columns (0-11).
1264 
1265 $          0 1 2 3 4 5 6 7 8 9 10 11
1266 $         -------------------
1267 $  row 3  |  o o o d d d o o o o o o
1268 $  row 4  |  o o o d d d o o o o o o
1269 $  row 5  |  o o o d d d o o o o o o
1270 $         -------------------
1271 $
1272 
1273    Thus, any entries in the d locations are stored in the d (diagonal)
1274    submatrix, and any entries in the o locations are stored in the
1275    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1276    stored simply in the MATSEQAIJ format for compressed row storage.
1277 
1278    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1279    and o_nz should indicate the number of nonzeros per row in the o matrix.
1280    In general, for PDE problems in which most nonzeros are near the diagonal,
1281    one expects d_nz >> o_nz. For large problems you MUST preallocate memory
1282    or you will get TERRIBLE performance; see the users' manual chapter on
1283    matrices and the file $(PETSC_DIR)/Performance.
1284 
1285 .keywords: matrix, aij, compressed row, sparse, parallel
1286 
1287 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1288 @*/
int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
                    int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  Mat          B;
  Mat_MPIAIJ   *b;
  int          ierr, i,sum[2],work[2];

  *A = 0;
  /* allocate and zero the PETSc header and the MPIAIJ context */
  PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
  PLogObjectCreate(B);
  B->data       = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
  PetscMemzero(b,sizeof(Mat_MPIAIJ));
  PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
  B->destroy    = MatDestroy_MPIAIJ;
  B->view       = MatView_MPIAIJ;
  B->factor     = 0;
  B->assembled  = PETSC_FALSE;

  b->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&b->rank);
  MPI_Comm_size(comm,&b->size);

  /* per-row preallocation arrays are meaningless without a known local size */
  if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
    SETERRQ(1,"MatCreateMPIAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");

  /* reconcile local/global sizes: sum local sizes for missing global ones,
     or split global sizes nearly evenly for missing local ones */
  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
    if (M == PETSC_DECIDE) M = sum[0];
    if (N == PETSC_DECIDE) N = sum[1];
  }
  if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
  if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;

  /* build local table of row and column ownerships: rowners/cowners are
     prefix sums of the local sizes gathered from every process */
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
  b->cowners = b->rowners + b->size + 2;
  MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];
  }
  b->rstart = b->rowners[b->rank];
  b->rend   = b->rowners[b->rank+1];
  MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart = b->cowners[b->rank];
  b->cend   = b->cowners[b->rank+1];

  /* the diagonal piece A is m x n; the off-diagonal piece B is allocated
     m x N and compressed to the actual off-process columns at assembly */
  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used for matrix vector multiply */
  b->lvec      = 0;
  b->Mvctx     = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  *A = B;
  return 0;
}
1371 
1372 static int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
1373 {
1374   Mat        mat;
1375   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1376   int        ierr, len=0, flg;
1377 
1378   *newmat       = 0;
1379   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1380   PLogObjectCreate(mat);
1381   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1382   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1383   mat->destroy    = MatDestroy_MPIAIJ;
1384   mat->view       = MatView_MPIAIJ;
1385   mat->factor     = matin->factor;
1386   mat->assembled  = PETSC_TRUE;
1387 
1388   a->m = mat->m   = oldmat->m;
1389   a->n = mat->n   = oldmat->n;
1390   a->M = mat->M   = oldmat->M;
1391   a->N = mat->N   = oldmat->N;
1392 
1393   a->rstart       = oldmat->rstart;
1394   a->rend         = oldmat->rend;
1395   a->cstart       = oldmat->cstart;
1396   a->cend         = oldmat->cend;
1397   a->size         = oldmat->size;
1398   a->rank         = oldmat->rank;
1399   a->insertmode   = NOT_SET_VALUES;
1400   a->rowvalues    = 0;
1401   a->getrowactive = PETSC_FALSE;
1402 
1403   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1404   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1405   a->cowners = a->rowners + a->size + 2;
1406   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1407   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1408   if (oldmat->colmap) {
1409     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1410     PLogObjectMemory(mat,(a->N)*sizeof(int));
1411     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1412   } else a->colmap = 0;
1413   if (oldmat->garray) {
1414     len = ((Mat_SeqAIJ *) (oldmat->B->data))->n;
1415     a->garray = (int *) PetscMalloc((len+1)*sizeof(int)); CHKPTRQ(a->garray);
1416     PLogObjectMemory(mat,len*sizeof(int));
1417     if (len) PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1418   } else a->garray = 0;
1419 
1420   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1421   PLogObjectParent(mat,a->lvec);
1422   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1423   PLogObjectParent(mat,a->Mvctx);
1424   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1425   PLogObjectParent(mat,a->A);
1426   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1427   PLogObjectParent(mat,a->B);
1428   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1429   if (flg) {
1430     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1431   }
1432   *newmat = mat;
1433   return 0;
1434 }
1435 
1436 #include "sys.h"
1437 
/*
   MatLoad_MPIAIJ - Loads a parallel AIJ matrix from a binary viewer.
   Process 0 reads the header, row lengths, column indices, and values
   from the file, keeps its own share, and ships each other process its
   portion with point-to-point messages; every process then inserts its
   rows locally and the matrix is assembled.
*/
int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*svals;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
  int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
  int          tag = ((PetscObject)viewer)->tag;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    /* only process 0 touches the file; header = {cookie, M, N, nnz} */
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIAIJ:not matrix object");
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];
  /* determine ownership of all rows: same near-even split and prefix-sum
     scheme used by MatCreateMPIAIJ */
  m = M/size + ((M % size) > rank);
  rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
  MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) {
    rowners[i] += rowners[i-1];
  }
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors (rowners doubles as the
     scatter displacement array); offlens shares the same allocation */
  ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
  offlens = ourlens + (rend-rstart);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
    MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices  */
    nz = procsnz[0];
    mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
    ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);

    /* read in everyone else's column indices and ship them off */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += ourlens[i];
    }
    mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);

    /* receive message of column indices*/
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries */
  PetscMemzero(offlens,m*sizeof(int));
  jj = 0;
  for ( i=0; i<m; i++ ) {
    for ( j=0; j<ourlens[i]; j++ ) {
      if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
      jj++;
    }
  }

  /* create our matrix: temporarily subtract the off-diagonal counts so
     ourlens holds the diagonal-block preallocation, then restore it for
     the insertion loops below */
  for ( i=0; i<m; i++ ) {
    ourlens[i] -= offlens[i];
  }
  ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
  A = *newmat;
  MatSetOption(A,MAT_COLUMNS_SORTED);
  for ( i=0; i<m; i++ ) {
    ourlens[i] += offlens[i];
  }

  if (!rank) {
    vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);

    /* insert into matrix, one row at a time */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }

    /* read in other processors' values and ship out */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* receive message of values*/
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");

    /* insert into matrix, one row at a time */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }
  }
  PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);

  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}
1602