xref: /petsc/src/mat/impls/dense/mpi/mpidense.c (revision 77f663780391b9cec2cf2a45a215aa3217e95ae6)
1 #ifndef lint
2 static char vcid[] = "$Id: mpidense.c,v 1.55 1996/11/27 22:52:57 bsmith Exp curfman $";
3 #endif
4 
5 /*
6    Basic functions for parallel dense matrices.
7 */
8 
9 #include "src/mat/impls/dense/mpi/mpidense.h"
10 #include "src/vec/vecimpl.h"
11 
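/*
   MatSetValues_MPIDense - Entries destined for locally owned rows are
   inserted directly into the local dense matrix; entries for rows owned
   by other processors are stashed and communicated later during
   MatAssemblyBegin()/MatAssemblyEnd().  In column-oriented mode each
   value is inserted or stashed individually.
*/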
12 static int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v,InsertMode addv)
13 {
14   Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
15   int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
16   int          roworiented = A->roworiented;
17 
18   if (A->insertmode != NOT_SET_VALUES && A->insertmode != addv) {
19     SETERRQ(1,"MatSetValues_MPIDense:Cannot mix inserts and adds");
20   }
21   A->insertmode = addv;
22   for ( i=0; i<m; i++ ) {
23     if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative row");
24     if (idxm[i] >= A->M) SETERRQ(1,"MatSetValues_MPIDense:Row too large");
25     if (idxm[i] >= rstart && idxm[i] < rend) {
26       row = idxm[i] - rstart;
27       if (roworiented) {
28         ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv); CHKERRQ(ierr);
29       }
30       else {
31         for ( j=0; j<n; j++ ) {
32           if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative column");
33           if (idxn[j] >= A->N) SETERRQ(1,"MatSetValues_MPIDense:Column too large");
34           ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv); CHKERRQ(ierr);
35         }
36       }
37     }
38     else {
39       if (roworiented) {
40         ierr = StashValues_Private(&A->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
41       }
42       else { /* must stash each separately */
43         row = idxm[i];
44         for ( j=0; j<n; j++ ) {
45           ierr = StashValues_Private(&A->stash,row,1,&idxn[j],v+i+j*m,addv);CHKERRQ(ierr);
46         }
47       }
48     }
49   }
50   return 0;
51 }
52 
53 static int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
54 {
55   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
56   int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;
57 
58   for ( i=0; i<m; i++ ) {
59     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative row");
60     if (idxm[i] >= mdn->M) SETERRQ(1,"MatGetValues_MPIDense:Row too large");
61     if (idxm[i] >= rstart && idxm[i] < rend) {
62       row = idxm[i] - rstart;
63       for ( j=0; j<n; j++ ) {
64         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative column");
65         if (idxn[j] >= mdn->N)
66           SETERRQ(1,"MatGetValues_MPIDense:Column too large");
67         ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
68       }
69     }
70     else {
71       SETERRQ(1,"MatGetValues_MPIDense:Only local values currently supported");
72     }
73   }
74   return 0;
75 }
76 
77 static int MatGetArray_MPIDense(Mat A,Scalar **array)
78 {
79   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
80   int ierr;
81 
82   ierr = MatGetArray(a->A,array); CHKERRQ(ierr);
83   return 0;
84 }
85 
86 static int MatRestoreArray_MPIDense(Mat A,Scalar **array)
87 {
88   return 0;
89 }
90 
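/*
   MatAssemblyBegin_MPIDense - Initiates communication of the stashed
   off-processor entries.  Each stashed entry is packed as a triple of
   Scalars (row index, column index, value); the number and maximum size
   of incoming messages are determined with MPI_Allreduce(), the receives
   are posted, and the sends are started with MPI_Isend().  The
   corresponding waits are completed in MatAssemblyEnd().
*/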
91 static int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
92 {
93   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
94   MPI_Comm     comm = mat->comm;
95   int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
96   int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
97   int          tag = mat->tag, *owner,*starts,count,ierr;
98   InsertMode   addv;
99   MPI_Request  *send_waits,*recv_waits;
100   Scalar       *rvalues,*svalues;
101 
102   /* make sure all processors are using either INSERT_VALUES or ADD_VALUES */
103   MPI_Allreduce(&mdn->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
104   if (addv == (ADD_VALUES|INSERT_VALUES)) {
105     SETERRQ(1,"MatAssemblyBegin_MPIDense:Cannot mix adds/inserts on different procs");
106   }
107   mdn->insertmode = addv; /* in case this processor had no cache */
108 
109   /*  first count number of contributors to each processor */
110   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
111   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
112   owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
113   for ( i=0; i<mdn->stash.n; i++ ) {
114     idx = mdn->stash.idx[i];
115     for ( j=0; j<size; j++ ) {
116       if (idx >= owners[j] && idx < owners[j+1]) {
117         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
118       }
119     }
120   }
121   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
122 
123   /* inform other processors of number of messages and max length*/
124   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
125   MPI_Allreduce(procs,work,size,MPI_INT,MPI_SUM,comm);
126   nreceives = work[rank];
127   if (nreceives > size) SETERRQ(1,"MatAssemblyBegin_MPIDense:Internal PETSc error");
128   MPI_Allreduce(nprocs,work,size,MPI_INT,MPI_MAX,comm);
129   nmax = work[rank];
130   PetscFree(work);
131 
132   /* post receives:
133        1) each message will consist of ordered triples
134      (global row index, global column index, value); we store the
135      indices as Scalars to simplify the message passing.
136        2) since we don't know how long each individual message is we
137      allocate the largest needed buffer for each receive. Potentially
138      this is a lot of wasted space.
139 
140        This could be done better.
141   */
142   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
143   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
144   for ( i=0; i<nreceives; i++ ) {
145     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
146   }
147 
148   /* do sends:
149       1) starts[i] gives the starting index in svalues for stuff going to
150          the ith processor
151   */
152   svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
153   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
154   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
155   starts[0] = 0;
156   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
157   for ( i=0; i<mdn->stash.n; i++ ) {
158     svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
159     svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
160     svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
161   }
162   PetscFree(owner);
163   starts[0] = 0;
164   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
165   count = 0;
166   for ( i=0; i<size; i++ ) {
167     if (procs[i]) {
168       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);
169     }
170   }
171   PetscFree(starts); PetscFree(nprocs);
172 
173   /* Free cache space */
174   ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);
175 
176   mdn->svalues    = svalues;    mdn->rvalues = rvalues;
177   mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
178   mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
179   mdn->rmax       = nmax;
180 
181   return 0;
182 }
183 extern int MatSetUpMultiply_MPIDense(Mat);
184 
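/*
   MatAssemblyEnd_MPIDense - Completes the receives posted in
   MatAssemblyBegin(), unpacks the (row,column,value) triples into the
   local matrix, waits on the outstanding sends, and assembles the local
   SeqDense matrix.  The scatter context used by MatMult() is built on
   the first final assembly.
*/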
185 static int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
186 {
187   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
188   MPI_Status   *send_status,recv_status;
189   int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
190   Scalar       *values,val;
191   InsertMode   addv = mdn->insertmode;
192 
193   /*  wait on receives */
194   while (count) {
195     MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);
196     /* unpack receives into our local space */
197     values = mdn->rvalues + 3*imdex*mdn->rmax;
198     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
199     n = n/3;
200     for ( i=0; i<n; i++ ) {
201       row = (int) PetscReal(values[3*i]) - mdn->rstart;
202       col = (int) PetscReal(values[3*i+1]);
203       val = values[3*i+2];
204       if (col >= 0 && col < mdn->N) {
205         MatSetValues(mdn->A,1,&row,1,&col,&val,addv);
206       }
207       else {SETERRQ(1,"MatAssemblyEnd_MPIDense:Invalid column");}
208     }
209     count--;
210   }
211   PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);
212 
213   /* wait on sends */
214   if (mdn->nsends) {
215     send_status = (MPI_Status *) PetscMalloc(mdn->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
216     MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);
217     PetscFree(send_status);
218   }
219   PetscFree(mdn->send_waits); PetscFree(mdn->svalues);
220 
221   mdn->insertmode = NOT_SET_VALUES;
222   ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
223   ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);
224 
225   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
226     ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
227   }
228   return 0;
229 }
230 
231 static int MatZeroEntries_MPIDense(Mat A)
232 {
233   Mat_MPIDense *l = (Mat_MPIDense *) A->data;
234   return MatZeroEntries(l->A);
235 }
236 
237 static int MatGetBlockSize_MPIDense(Mat A,int *bs)
238 {
239   *bs = 1;
240   return 0;
241 }
242 
243 /* The code does not handle the diagonal entries correctly unless the
244    matrix is square and the column and row ownerships are identical.
245    This is a BUG. The only way to fix it seems to be to access the
246    local matrix mdn->A directly rather than going through the
247    MatZeroRows() routine.
248 */
249 static int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
250 {
251   Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
252   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
253   int            *procs,*nprocs,j,found,idx,nsends,*work;
254   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
255   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
256   int            *lens,imdex,*lrows,*values;
257   MPI_Comm       comm = A->comm;
258   MPI_Request    *send_waits,*recv_waits;
259   MPI_Status     recv_status,*send_status;
260   IS             istmp;
261 
262   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
263   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
264 
265   /*  first count number of contributors to each processor */
266   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
267   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
268   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
269   for ( i=0; i<N; i++ ) {
270     idx = rows[i];
271     found = 0;
272     for ( j=0; j<size; j++ ) {
273       if (idx >= owners[j] && idx < owners[j+1]) {
274         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
275       }
276     }
277     if (!found) SETERRQ(1,"MatZeroRows_MPIDense:Index out of range");
278   }
279   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
280 
281   /* inform other processors of number of messages and max length*/
282   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
283   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
284   nrecvs = work[rank];
285   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
286   nmax = work[rank];
287   PetscFree(work);
288 
289   /* post receives:   */
290   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
291   CHKPTRQ(rvalues);
292   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
293   CHKPTRQ(recv_waits);
294   for ( i=0; i<nrecvs; i++ ) {
295     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
296   }
297 
298   /* do sends:
299       1) starts[i] gives the starting index in svalues for stuff going to
300          the ith processor
301   */
302   svalues    = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
303   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
304   starts     = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
305   starts[0]  = 0;
306   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
307   for ( i=0; i<N; i++ ) {
308     svalues[starts[owner[i]]++] = rows[i];
309   }
310   ISRestoreIndices(is,&rows);
311 
312   starts[0] = 0;
313   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
314   count = 0;
315   for ( i=0; i<size; i++ ) {
316     if (procs[i]) {
317       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
318     }
319   }
320   PetscFree(starts);
321 
322   base = owners[rank];
323 
324   /*  wait on receives */
325   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
326   source = lens + nrecvs;
327   count  = nrecvs; slen = 0;
328   while (count) {
329     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
330     /* unpack receives into our local space */
331     MPI_Get_count(&recv_status,MPI_INT,&n);
332     source[imdex]  = recv_status.MPI_SOURCE;
333     lens[imdex]  = n;
334     slen += n;
335     count--;
336   }
337   PetscFree(recv_waits);
338 
339   /* move the data into the send scatter */
340   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
341   count = 0;
342   for ( i=0; i<nrecvs; i++ ) {
343     values = rvalues + i*nmax;
344     for ( j=0; j<lens[i]; j++ ) {
345       lrows[count++] = values[j] - base;
346     }
347   }
348   PetscFree(rvalues); PetscFree(lens);
349   PetscFree(owner); PetscFree(nprocs);
350 
351   /* actually zap the local rows */
352   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
353   PLogObjectParent(A,istmp);
354   PetscFree(lrows);
355   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
356   ierr = ISDestroy(istmp); CHKERRQ(ierr);
357 
358   /* wait on sends */
359   if (nsends) {
360     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
361     MPI_Waitall(nsends,send_waits,send_status);
362     PetscFree(send_status);
363   }
364   PetscFree(send_waits); PetscFree(svalues);
365 
366   return 0;
367 }
368 
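/*
   MatMult_MPIDense - Gathers the needed entries of the global input
   vector into the local work vector lvec with a VecScatter (each
   processor stores all N columns, so it needs all N entries of x), then
   performs the local dense matrix-vector product.  The result is
   distributed by rows, so no further communication is required.
*/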
369 static int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
370 {
371   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
372   int          ierr;
373 
374   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
375   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
376   ierr = MatMult_SeqDense(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
377   return 0;
378 }
379 
380 static int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
381 {
382   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
383   int          ierr;
384 
385   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
386   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
387   ierr = MatMultAdd_SeqDense(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
388   return 0;
389 }
390 
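/*
   MatMultTrans_MPIDense - Forms the local contribution A_loc^T x in the
   work vector lvec and then accumulates the contributions from all
   processors into the distributed result with a reverse-mode scatter
   using ADD_VALUES.
*/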
391 static int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
392 {
393   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
394   int          ierr;
395   Scalar       zero = 0.0;
396 
397   ierr = VecSet(&zero,yy); CHKERRQ(ierr);
398   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec); CHKERRQ(ierr);
399   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
400   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
401   return 0;
402 }
403 
404 static int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
405 {
406   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
407   int          ierr;
408 
409   ierr = VecCopy(yy,zz); CHKERRQ(ierr);
410   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec); CHKERRQ(ierr);
411   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
412   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
413   return 0;
414 }
415 
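/*
   MatGetDiagonal_MPIDense - The local dense array is stored in
   column-major (Fortran) order with m local rows, so the diagonal entry
   of global row rstart+i is located at aloc->v[rstart*m + i*m + i].
   Only the locally owned part of the diagonal is filled; the vector is
   first zeroed.
*/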
416 static int MatGetDiagonal_MPIDense(Mat A,Vec v)
417 {
418   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
419   Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
420   int          ierr, len, i, n, m = a->m, radd;
421   Scalar       *x, zero = 0.0;
422 
423   VecSet(&zero,v);
424   ierr = VecGetArray(v,&x); CHKERRQ(ierr);
425   ierr = VecGetSize(v,&n); CHKERRQ(ierr);
426   if (n != a->M) SETERRQ(1,"MatGetDiagonal_MPIDense:Nonconforming mat and vec");
427   len = PetscMin(aloc->m,aloc->n);
428   radd = a->rstart*m;
429   for ( i=0; i<len; i++ ) {
430     x[i] = aloc->v[radd + i*m + i];
431   }
432   return 0;
433 }
434 
435 static int MatDestroy_MPIDense(PetscObject obj)
436 {
437   Mat          mat = (Mat) obj;
438   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
439   int          ierr;
440 
441 #if defined(PETSC_LOG)
442   PLogObjectState(obj,"Rows=%d, Cols=%d",mdn->M,mdn->N);
443 #endif
444   PetscFree(mdn->rowners);
445   ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
446   if (mdn->lvec)   VecDestroy(mdn->lvec);
447   if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
448   if (mdn->factor) {
449     if (mdn->factor->temp)   PetscFree(mdn->factor->temp);
450     if (mdn->factor->tag)    PetscFree(mdn->factor->tag);
451     if (mdn->factor->pivots) PetscFree(mdn->factor->pivots);
452     PetscFree(mdn->factor);
453   }
454   PetscFree(mdn);
455   if (mat->mapping) {
456     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
457   }
458   PLogObjectDestroy(mat);
459   PetscHeaderDestroy(mat);
460   return 0;
461 }
462 
463 #include "pinclude/pviewer.h"
464 
465 static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
466 {
467   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
468   int          ierr;
469 
470   if (mdn->size == 1) {
471     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
472   }
473   else SETERRQ(1,"MatView_MPIDense_Binary:Only uniprocessor output supported");
474   return 0;
475 }
476 
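/*
   MatView_MPIDense_ASCII - For the info formats only per-processor
   statistics are printed.  For an ASCII file viewer each processor
   prints its own rows in sequence; for other ASCII viewers the entire
   matrix is first assembled onto processor 0 and viewed there.
*/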
477 static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
478 {
479   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
480   int          ierr, format;
481   FILE         *fd;
482   ViewerType   vtype;
483 
484   ViewerGetType(viewer,&vtype);
485   ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
486   ierr = ViewerGetFormat(viewer,&format); CHKERRQ(ierr);
487   if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
488     int rank;
489     MatInfo info;
490     MPI_Comm_rank(mat->comm,&rank);
491     ierr = MatGetInfo(mat,MAT_LOCAL,&info); CHKERRQ(ierr);
492     PetscSequentialPhaseBegin(mat->comm,1);
493       fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",rank,mdn->m,
494          (int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
495       fflush(fd);
496     PetscSequentialPhaseEnd(mat->comm,1);
497     ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
498     return 0;
499   }
500   else if (format == VIEWER_FORMAT_ASCII_INFO) {
501     return 0;
502   }
503   if (vtype == ASCII_FILE_VIEWER) {
504     PetscSequentialPhaseBegin(mat->comm,1);
505     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
506              mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
507     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
508     fflush(fd);
509     PetscSequentialPhaseEnd(mat->comm,1);
510   }
511   else {
512     int size = mdn->size, rank = mdn->rank;
513     if (size == 1) {
514       ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
515     }
516     else {
517       /* assemble the entire matrix onto first processor. */
518       Mat          A;
519       int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
520       Scalar       *vals;
521       Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;
522 
523       if (!rank) {
524         ierr = MatCreateMPIDense(mat->comm,M,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
525       }
526       else {
527         ierr = MatCreateMPIDense(mat->comm,0,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
528       }
529       PLogObjectParent(mat,A);
530 
531       /* Copy the matrix ... This isn't the most efficient means,
532          but it's quick for now */
533       row = mdn->rstart; m = Amdn->m;
534       for ( i=0; i<m; i++ ) {
535         ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
536         ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
537         ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
538         row++;
539       }
540 
541       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
542       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
543       if (!rank) {
544         ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
545       }
546       ierr = MatDestroy(A); CHKERRQ(ierr);
547     }
548   }
549   return 0;
550 }
551 
552 static int MatView_MPIDense(PetscObject obj,Viewer viewer)
553 {
554   Mat          mat = (Mat) obj;
555   int          ierr;
556   ViewerType   vtype;
557 
558   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
559   if (vtype == ASCII_FILE_VIEWER) {
560     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
561   }
562   else if (vtype == ASCII_FILES_VIEWER) {
563     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
564   }
565   else if (vtype == BINARY_FILE_VIEWER) {
566     return MatView_MPIDense_Binary(mat,viewer);
567   }
568   return 0;
569 }
570 
571 static int MatGetInfo_MPIDense(Mat A,MatInfoType flag,MatInfo *info)
572 {
573   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
574   Mat          mdn = mat->A;
575   int          ierr;
576   double       isend[5], irecv[5];
577 
578   info->rows_global    = (double)mat->M;
579   info->columns_global = (double)mat->N;
580   info->rows_local     = (double)mat->m;
581   info->columns_local  = (double)mat->N;
582   info->block_size     = 1.0;
583   ierr = MatGetInfo(mdn,MAT_LOCAL,info); CHKERRQ(ierr);
584   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
585   isend[3] = info->memory;  isend[4] = info->mallocs;
586   if (flag == MAT_LOCAL) {
587     info->nz_used      = isend[0];
588     info->nz_allocated = isend[1];
589     info->nz_unneeded  = isend[2];
590     info->memory       = isend[3];
591     info->mallocs      = isend[4];
592   } else if (flag == MAT_GLOBAL_MAX) {
593     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,A->comm);
594     info->nz_used      = irecv[0];
595     info->nz_allocated = irecv[1];
596     info->nz_unneeded  = irecv[2];
597     info->memory       = irecv[3];
598     info->mallocs      = irecv[4];
599   } else if (flag == MAT_GLOBAL_SUM) {
600     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,A->comm);
601     info->nz_used      = irecv[0];
602     info->nz_allocated = irecv[1];
603     info->nz_unneeded  = irecv[2];
604     info->memory       = irecv[3];
605     info->mallocs      = irecv[4];
606   }
607   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
608   info->fill_ratio_needed = 0;
609   info->factor_mallocs    = 0;
610   return 0;
611 }
612 
613 /* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
614    extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
615    extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
616    extern int MatSolve_MPIDense(Mat,Vec,Vec);
617    extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
618    extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
619    extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */
620 
621 static int MatSetOption_MPIDense(Mat A,MatOption op)
622 {
623   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
624 
625   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
626       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
627       op == MAT_COLUMNS_SORTED ||
628       op == MAT_COLUMNS_UNSORTED) {
629         MatSetOption(a->A,op);
630   } else if (op == MAT_ROW_ORIENTED) {
631         a->roworiented = 1;
632         MatSetOption(a->A,op);
633   } else if (op == MAT_ROWS_SORTED ||
634              op == MAT_ROWS_UNSORTED ||
635              op == MAT_SYMMETRIC ||
636              op == MAT_STRUCTURALLY_SYMMETRIC ||
637              op == MAT_YES_NEW_DIAGONALS)
638     PLogInfo(A,"Info:MatSetOption_MPIDense:Option ignored\n");
639   else if (op == MAT_COLUMN_ORIENTED)
640     {a->roworiented = 0; MatSetOption(a->A,op);}
641   else if (op == MAT_NO_NEW_DIAGONALS)
642     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:MAT_NO_NEW_DIAGONALS");}
643   else
644     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:unknown option");}
645   return 0;
646 }
647 
648 static int MatGetSize_MPIDense(Mat A,int *m,int *n)
649 {
650   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
651   *m = mat->M; *n = mat->N;
652   return 0;
653 }
654 
655 static int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
656 {
657   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
658   *m = mat->m; *n = mat->N;
659   return 0;
660 }
661 
662 static int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
663 {
664   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
665   *m = mat->rstart; *n = mat->rend;
666   return 0;
667 }
668 
669 static int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
670 {
671   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
672   int          lrow, rstart = mat->rstart, rend = mat->rend;
673 
674   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIDense:only local rows");
675   lrow = row - rstart;
676   return MatGetRow(mat->A,lrow,nz,idx,v);
677 }
678 
679 static int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
680 {
681   if (idx) PetscFree(*idx);
682   if (v) PetscFree(*v);
683   return 0;
684 }
685 
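/*
   MatNorm_MPIDense - The Frobenius norm sums the squares of the local
   entries and then does a global sum; the 1-norm accumulates the
   absolute values down each global column, reduces with MPI_SUM, and
   takes the maximum column sum; the infinity norm takes the local
   maximum row sum (each row is stored entirely on one processor) and
   reduces with MPI_MAX.  The 2-norm is not supported in parallel.
*/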
686 static int MatNorm_MPIDense(Mat A,NormType type,double *norm)
687 {
688   Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
689   Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
690   int          ierr, i, j;
691   double       sum = 0.0;
692   Scalar       *v = mat->v;
693 
694   if (mdn->size == 1) {
695     ierr =  MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
696   } else {
697     if (type == NORM_FROBENIUS) {
698       for (i=0; i<mat->n*mat->m; i++ ) {
699 #if defined(PETSC_COMPLEX)
700         sum += real(conj(*v)*(*v)); v++;
701 #else
702         sum += (*v)*(*v); v++;
703 #endif
704       }
705       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,A->comm);
706       *norm = sqrt(*norm);
707       PLogFlops(2*mat->n*mat->m);
708     }
709     else if (type == NORM_1) {
710       double *tmp, *tmp2;
711       tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
712       tmp2 = tmp + mdn->N;
713       PetscMemzero(tmp,2*mdn->N*sizeof(double));
714       *norm = 0.0;
715       v = mat->v;
716       for ( j=0; j<mat->n; j++ ) {
717         for ( i=0; i<mat->m; i++ ) {
718           tmp[j] += PetscAbsScalar(*v);  v++;
719         }
720       }
721       MPI_Allreduce(tmp,tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);
722       for ( j=0; j<mdn->N; j++ ) {
723         if (tmp2[j] > *norm) *norm = tmp2[j];
724       }
725       PetscFree(tmp);
726       PLogFlops(mat->n*mat->m);
727     }
728     else if (type == NORM_INFINITY) { /* max row norm */
729       double ntemp;
730       ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
731       MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,A->comm);
732     }
733     else {
734       SETERRQ(1,"MatNorm_MPIDense:No support for two norm");
735     }
736   }
737   return 0;
738 }
739 
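/*
   MatTranspose_MPIDense - Builds the transpose as a new parallel dense
   matrix B: each local column j of A (stored column-major) supplies the
   entries of global row j of B in columns rstart..rstart+m-1, which are
   inserted with MatSetValues() and assembled.  The "in-place" variant
   simply replaces A's data structures with those of B.
*/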
740 static int MatTranspose_MPIDense(Mat A,Mat *matout)
741 {
742   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
743   Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
744   Mat          B;
745   int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
746   int          j, i, ierr;
747   Scalar       *v;
748 
749   if (matout == PETSC_NULL && M != N) {
750     SETERRQ(1,"MatTranspose_MPIDense:Supports square matrix only in-place");
751   }
752   ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);CHKERRQ(ierr);
753 
754   m = Aloc->m; n = Aloc->n; v = Aloc->v;
755   rwork = (int *) PetscMalloc(m*sizeof(int)); CHKPTRQ(rwork);
756   for ( j=0; j<n; j++ ) {
757     for (i=0; i<m; i++) rwork[i] = rstart + i;
758     ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
759     v   += m;
760   }
761   PetscFree(rwork);
762   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
763   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
764   if (matout != PETSC_NULL) {
765     *matout = B;
766   } else {
767     /* This isn't really an in-place transpose; free a's data structures and overwrite A's header with B's */
768     PetscFree(a->rowners);
769     ierr = MatDestroy(a->A); CHKERRQ(ierr);
770     if (a->lvec) VecDestroy(a->lvec);
771     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
772     PetscFree(a);
773     PetscMemcpy(A,B,sizeof(struct _Mat));
774     PetscHeaderDestroy(B);
775   }
776   return 0;
777 }
778 
779 #include "pinclude/plapack.h"
780 static int MatScale_MPIDense(Scalar *alpha,Mat inA)
781 {
782   Mat_MPIDense *A = (Mat_MPIDense *) inA->data;
783   Mat_SeqDense *a = (Mat_SeqDense *) A->A->data;
784   int          one = 1, nz;
785 
786   nz = a->m*a->n;
787   BLscal_( &nz, alpha, a->v, &one );
788   PLogFlops(nz);
789   return 0;
790 }
791 
792 static int MatConvertSameType_MPIDense(Mat,Mat *,int);
793 
794 /* -------------------------------------------------------------------*/
795 static struct _MatOps MatOps = {MatSetValues_MPIDense,
796        MatGetRow_MPIDense,MatRestoreRow_MPIDense,
797        MatMult_MPIDense,MatMultAdd_MPIDense,
798        MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
799 /*       MatSolve_MPIDense,0, */
800        0,0,
801        0,0,
802        0,0,
803 /*       MatLUFactor_MPIDense,0, */
804        0,MatTranspose_MPIDense,
805        MatGetInfo_MPIDense,0,
806        MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
807        MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
808        0,
809        MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
810        0,0,
811 /*       0,MatLUFactorSymbolic_MPIDense,MatLUFactorNumeric_MPIDense, */
812        0,0,
813        MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
814        MatGetOwnershipRange_MPIDense,
815        0,0, MatGetArray_MPIDense, MatRestoreArray_MPIDense,
816        0,MatConvertSameType_MPIDense,
817        0,0,0,0,0,
818        0,0,MatGetValues_MPIDense,0,0,MatScale_MPIDense,
819        0,0,0,MatGetBlockSize_MPIDense};
820 
821 /*@C
822    MatCreateMPIDense - Creates a parallel matrix in dense format.
823 
824    Input Parameters:
825 .  comm - MPI communicator
826 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
827 .  n - number of local columns (or PETSC_DECIDE to have calculated
828            if N is given)
829 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
830 .  N - number of global columns (or PETSC_DECIDE to have calculated
831            if n is given)
832 .  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
833    to control all matrix memory allocation.
834 
835    Output Parameter:
836 .  A - the matrix
837 
838    Notes:
839    The dense format is fully compatible with standard Fortran 77
840    storage by columns.
841 
842    The data input variable is intended primarily for Fortran programmers
843    who wish to allocate their own matrix memory space.  Most users should
844    set data=PETSC_NULL.
845 
846    The user MUST specify either the local or global matrix dimensions
847    (possibly both).
848 
849    Currently, the only parallel dense matrix decomposition is by rows,
850    so that n=N and each submatrix owns all of the global columns.
851 
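   Example (a minimal usage sketch, for illustration only; assumes PETSc
   and MPI have already been initialized):

      Mat    A;
      int    ierr, row = 0, col = 0;
      Scalar value = 1.0;

      ierr = MatCreateMPIDense(MPI_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,
                               10,10,PETSC_NULL,&A); CHKERRQ(ierr);
      ierr = MatSetValues(A,1,&row,1,&col,&value,INSERT_VALUES); CHKERRQ(ierr);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatDestroy(A); CHKERRQ(ierr);
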
852 .keywords: matrix, dense, parallel
853 
854 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
855 @*/
856 int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *A)
857 {
858   Mat          mat;
859   Mat_MPIDense *a;
860   int          ierr, i,flg;
861 
862   /* Note:  For now, when data is specified above, this assumes the user correctly
863    allocates the local dense storage space.  We should add error checking. */
864 
865   *A = 0;
866   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,comm);
867   PLogObjectCreate(mat);
868   mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
869   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
870   mat->destroy    = MatDestroy_MPIDense;
871   mat->view       = MatView_MPIDense;
872   mat->factor     = 0;
873   mat->mapping    = 0;
874 
875   a->factor       = 0;
876   a->insertmode   = NOT_SET_VALUES;
877   MPI_Comm_rank(comm,&a->rank);
878   MPI_Comm_size(comm,&a->size);
879 
880   if (M == PETSC_DECIDE) MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);
881   if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}
882 
883   /* each row stores all columns */
884   if (N == PETSC_DECIDE) N = n;
885   if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
886   /*  if (n != N) SETERRQ(1,"MatCreateMPIDense:For now, only n=N is supported"); */
887   a->N = mat->N = N;
888   a->M = mat->M = M;
889   a->m = mat->m = m;
890   a->n = mat->n = n;
891 
892   /* build local table of row and column ownerships */
893   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
894   a->cowners = a->rowners + a->size + 1;
895   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
896   MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);
897   a->rowners[0] = 0;
898   for ( i=2; i<=a->size; i++ ) {
899     a->rowners[i] += a->rowners[i-1];
900   }
901   a->rstart = a->rowners[a->rank];
902   a->rend   = a->rowners[a->rank+1];
903   MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);
904   a->cowners[0] = 0;
905   for ( i=2; i<=a->size; i++ ) {
906     a->cowners[i] += a->cowners[i-1];
907   }
908 
909   ierr = MatCreateSeqDense(MPI_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
910   PLogObjectParent(mat,a->A);
911 
912   /* build cache for off array entries formed */
913   ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);
914 
915   /* stuff used for matrix vector multiply */
916   a->lvec        = 0;
917   a->Mvctx       = 0;
918   a->roworiented = 1;
919 
920   *A = mat;
921   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
922   if (flg) {
923     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
924   }
925   return 0;
926 }
927 
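/*
   MatConvertSameType_MPIDense - Duplicates a parallel dense matrix:
   creates a new header with the same layout, copies the row-ownership
   table, duplicates the multiply work vector and scatter context, and
   copies the local SeqDense matrix with MatConvert(...,MATSAME,...).
*/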
928 static int MatConvertSameType_MPIDense(Mat A,Mat *newmat,int cpvalues)
929 {
930   Mat          mat;
931   Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
932   int          ierr;
933   FactorCtx    *factor;
934 
935   *newmat       = 0;
936   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,A->comm);
937   PLogObjectCreate(mat);
938   mat->data      = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
939   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
940   mat->destroy   = MatDestroy_MPIDense;
941   mat->view      = MatView_MPIDense;
942   mat->factor    = A->factor;
943   mat->assembled = PETSC_TRUE;
944 
945   a->m = mat->m = oldmat->m;
946   a->n = mat->n = oldmat->n;
947   a->M = mat->M = oldmat->M;
948   a->N = mat->N = oldmat->N;
949   if (oldmat->factor) {
950     a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx)); CHKPTRQ(factor);
951     /* copy factor contents ... add this code! */
952   } else a->factor = 0;
953 
954   a->rstart     = oldmat->rstart;
955   a->rend       = oldmat->rend;
956   a->size       = oldmat->size;
957   a->rank       = oldmat->rank;
958   a->insertmode = NOT_SET_VALUES;
959 
960   a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
961   PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
962   PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
963   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
964 
965   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
966   PLogObjectParent(mat,a->lvec);
967   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
968   PLogObjectParent(mat,a->Mvctx);
969   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
970   PLogObjectParent(mat,a->A);
971   *newmat = mat;
972   return 0;
973 }
974 
975 #include "sys.h"
976 
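/*
   MatLoad_MPIDense_DenseInFile - Loads a matrix stored in the binary
   file as a dense array of values.  Processor 0 reads its own block of
   rows, then reads and ships each remaining processor's block; every
   processor copies its (row-major) block into the column-major local
   array obtained from MatGetArray().
*/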
977 int MatLoad_MPIDense_DenseInFile(MPI_Comm comm,int fd,int M, int N, Mat *newmat)
978 {
979   int        *rowners, i,size,rank,m,rstart,rend,ierr,nz,j;
980   Scalar     *array,*vals,*vals_ptr;
981   MPI_Status status;
982 
983   MPI_Comm_rank(comm,&rank);
984   MPI_Comm_size(comm,&size);
985 
986   /* determine ownership of all rows */
987   m = M/size + ((M % size) > rank);
988   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
989   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
990   rowners[0] = 0;
991   for ( i=2; i<=size; i++ ) {
992     rowners[i] += rowners[i-1];
993   }
994   rstart = rowners[rank];
995   rend   = rowners[rank+1];
996 
997   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
998   ierr = MatGetArray(*newmat,&array); CHKERRQ(ierr);
999 
1000   if (!rank) {
1001     vals = (Scalar *) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);
1002 
1003     /* read in my part of the matrix numerical values  */
1004     ierr = PetscBinaryRead(fd,vals,m*N,BINARY_SCALAR); CHKERRQ(ierr);
1005 
1006     /* insert into the matrix by row (this is why we cannot read directly into the array) */
1007     vals_ptr = vals;
1008     for ( i=0; i<m; i++ ) {
1009       for ( j=0; j<N; j++ ) {
1010         array[i + j*m] = *vals_ptr++;
1011       }
1012     }
1013 
1014     /* read in the other processors' parts and ship them out */
1015     for ( i=1; i<size; i++ ) {
1016       nz   = (rowners[i+1] - rowners[i])*N;
1017       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1018       MPI_Send(vals,nz,MPIU_SCALAR,i,(*newmat)->tag,comm);
1019     }
1020   }
1021   else {
1022     /* receive numeric values */
1023     vals = (Scalar*) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);
1024 
1025     /* receive message of values*/
1026     MPI_Recv(vals,m*N,MPIU_SCALAR,0,(*newmat)->tag,comm,&status);
1027 
1028     /* insert into the matrix by row (this is why we cannot read directly into the array) */
1029     vals_ptr = vals;
1030     for ( i=0; i<m; i++ ) {
1031       for ( j=0; j<N; j++ ) {
1032         array[i + j*m] = *vals_ptr++;
1033       }
1034     }
1035   }
1036   PetscFree(rowners);
1037   PetscFree(vals);
1038   ierr = MatAssemblyBegin(*newmat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1039   ierr = MatAssemblyEnd(*newmat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1040   return 0;
1041 }
1042 
1043 
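/*
   MatLoad_MPIDense - Loads a matrix stored in PETSc binary format (or
   dispatches to MatLoad_MPIDense_DenseInFile() when the header marks the
   file as dense).  Processor 0 reads the header and row lengths,
   scatters the lengths, then reads and distributes the column indices
   and values; each processor inserts its rows with MatSetValues() and
   the matrix is assembled.
*/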
1044 int MatLoad_MPIDense(Viewer viewer,MatType type,Mat *newmat)
1045 {
1046   Mat          A;
1047   int          i, nz, ierr, j,rstart, rend, fd;
1048   Scalar       *vals,*svals;
1049   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1050   MPI_Status   status;
1051   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1052   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1053   int          tag = ((PetscObject)viewer)->tag;
1054 
1055   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1056   if (!rank) {
1057     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1058     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1059     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIDense:Not a matrix object");
1060   }
1061 
1062   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1063   M = header[1]; N = header[2]; nz = header[3];
1064 
1065   /*
1066        Handle case where matrix is stored on disk as a dense matrix
1067   */
1068   if (nz == MATRIX_BINARY_FORMAT_DENSE) {
1069     return MatLoad_MPIDense_DenseInFile(comm,fd,M,N,newmat);
1070   }
1071 
1072   /* determine ownership of all rows */
1073   m = M/size + ((M % size) > rank);
1074   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1075   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1076   rowners[0] = 0;
1077   for ( i=2; i<=size; i++ ) {
1078     rowners[i] += rowners[i-1];
1079   }
1080   rstart = rowners[rank];
1081   rend   = rowners[rank+1];
1082 
1083   /* distribute row lengths to all processors */
1084   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1085   offlens = ourlens + (rend-rstart);
1086   if (!rank) {
1087     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1088     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1089     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1090     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1091     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
1092     PetscFree(sndcounts);
1093   }
1094   else {
1095     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
1096   }
1097 
1098   if (!rank) {
1099     /* calculate the number of nonzeros on each processor */
1100     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1101     PetscMemzero(procsnz,size*sizeof(int));
1102     for ( i=0; i<size; i++ ) {
1103       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1104         procsnz[i] += rowlengths[j];
1105       }
1106     }
1107     PetscFree(rowlengths);
1108 
1109     /* determine max buffer needed and allocate it */
1110     maxnz = 0;
1111     for ( i=0; i<size; i++ ) {
1112       maxnz = PetscMax(maxnz,procsnz[i]);
1113     }
1114     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1115 
1116     /* read in my part of the matrix column indices  */
1117     nz = procsnz[0];
1118     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1119     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1120 
1121     /* read in everyone else's part and ship it off */
1122     for ( i=1; i<size; i++ ) {
1123       nz = procsnz[i];
1124       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1125       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1126     }
1127     PetscFree(cols);
1128   }
1129   else {
1130     /* determine buffer space needed for message */
1131     nz = 0;
1132     for ( i=0; i<m; i++ ) {
1133       nz += ourlens[i];
1134     }
1135     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1136 
1137     /* receive message of column indices*/
1138     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1139     MPI_Get_count(&status,MPI_INT,&maxnz);
1140     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:Something is wrong with the file");
1141   }
1142 
1143   /* loop over local rows, determining number of off diagonal entries */
1144   PetscMemzero(offlens,m*sizeof(int));
1145   jj = 0;
1146   for ( i=0; i<m; i++ ) {
1147     for ( j=0; j<ourlens[i]; j++ ) {
1148       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1149       jj++;
1150     }
1151   }
1152 
1153   /* create our matrix */
1154   for ( i=0; i<m; i++ ) {
1155     ourlens[i] -= offlens[i];
1156   }
1157   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1158   A = *newmat;
1159   for ( i=0; i<m; i++ ) {
1160     ourlens[i] += offlens[i];
1161   }
1162 
1163   if (!rank) {
1164     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1165 
1166     /* read in my part of the matrix numerical values  */
1167     nz = procsnz[0];
1168     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1169 
1170     /* insert into matrix */
1171     jj      = rstart;
1172     smycols = mycols;
1173     svals   = vals;
1174     for ( i=0; i<m; i++ ) {
1175       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1176       smycols += ourlens[i];
1177       svals   += ourlens[i];
1178       jj++;
1179     }
1180 
1181     /* read in the other processors' parts and ship them out */
1182     for ( i=1; i<size; i++ ) {
1183       nz = procsnz[i];
1184       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1185       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1186     }
1187     PetscFree(procsnz);
1188   }
1189   else {
1190     /* receive numeric values */
1191     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1192 
1193     /* receive message of values*/
1194     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1195     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1196     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:Something is wrong with the file");
1197 
1198     /* insert into matrix */
1199     jj      = rstart;
1200     smycols = mycols;
1201     svals   = vals;
1202     for ( i=0; i<m; i++ ) {
1203       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1204       smycols += ourlens[i];
1205       svals   += ourlens[i];
1206       jj++;
1207     }
1208   }
1209   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1210 
1211   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1212   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1213   return 0;
1214 }
1215 
1216 
1217 
1218 
1219 
1220