xref: /petsc/src/mat/impls/dense/mpi/mpidense.c (revision 8c4694699a036658cda99e99adbdf64e3eeac914)
1 #ifndef lint
2 static char vcid[] = "$Id: mpidense.c,v 1.22 1996/01/01 22:32:54 curfman Exp curfman $";
3 #endif
4 
5 /*
6    Basic functions for basic parallel dense matrices.
7 */
8 
9 #include "mpidense.h"
10 #include "vec/vecimpl.h"
11 #include "inline/spops.h"
12 
13 static int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,
14                                int *idxn,Scalar *v,InsertMode addv)
15 {
16   Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
17   int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
18   int          roworiented = A->roworiented;
19 
20   if (A->insertmode != NOT_SET_VALUES && A->insertmode != addv) {
21     SETERRQ(1,"MatSetValues_MPIDense:Cannot mix inserts and adds");
22   }
23   A->insertmode = addv;
24   for ( i=0; i<m; i++ ) {
25     if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative row");
26     if (idxm[i] >= A->M) SETERRQ(1,"MatSetValues_MPIDense:Row too large");
27     if (idxm[i] >= rstart && idxm[i] < rend) {
28       row = idxm[i] - rstart;
29       if (roworiented) {
30         ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv); CHKERRQ(ierr);
31       }
32       else {
33         for ( j=0; j<n; j++ ) {
34           if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative column");
35           if (idxn[j] >= A->N) SETERRQ(1,"MatSetValues_MPIDense:Column too large");
36           ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv); CHKERRQ(ierr);
37         }
38       }
39     }
40     else {
41       if (roworiented) {
42         ierr = StashValues_Private(&A->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
43       }
44       else { /* must stash each separately */
45         row = idxm[i];
46         for ( j=0; j<n; j++ ) {
47           ierr = StashValues_Private(&A->stash,row,1,&idxn[j],v+i+j*m,addv);
48                  CHKERRQ(ierr);
49         }
50       }
51     }
52   }
53   return 0;
54 }
55 
56 static int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
57 {
58   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
59   int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;
60 
61   if (!mdn->assembled) SETERRQ(1,"MatGetValues_MPIDense:Not for unassembled matrix");
62   for ( i=0; i<m; i++ ) {
63     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative row");
64     if (idxm[i] >= mdn->M) SETERRQ(1,"MatGetValues_MPIDense:Row too large");
65     if (idxm[i] >= rstart && idxm[i] < rend) {
66       row = idxm[i] - rstart;
67       for ( j=0; j<n; j++ ) {
68         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative column");
69         if (idxn[j] >= mdn->N)
70           SETERRQ(1,"MatGetValues_MPIDense:Column too large");
71         ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
72       }
73     }
74     else {
75       SETERRQ(1,"MatGetValues_MPIDense:Only local values currently supported");
76     }
77   }
78   return 0;
79 }
80 
81 static int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
82 {
83   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
84   MPI_Comm     comm = mat->comm;
85   int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
86   int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
87   int          tag = mat->tag, *owner,*starts,count,ierr;
88   InsertMode   addv;
89   MPI_Request  *send_waits,*recv_waits;
90   Scalar       *rvalues,*svalues;
91 
92   /* make sure all processors are either in INSERT_VALUES or ADD_VALUES mode */
93   MPI_Allreduce((void *) &mdn->insertmode,(void *) &addv,1,MPI_INT,
94                 MPI_BOR,comm);
95   if (addv == (ADD_VALUES|INSERT_VALUES)) { SETERRQ(1,
96     "MatAssemblyBegin_MPIDense:Cannot mix adds/inserts on different procs");
97     }
98   mdn->insertmode = addv; /* in case this processor had no cache */
99 
100   /*  first count number of contributors to each processor */
101   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
102   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
103   owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
104   for ( i=0; i<mdn->stash.n; i++ ) {
105     idx = mdn->stash.idx[i];
106     for ( j=0; j<size; j++ ) {
107       if (idx >= owners[j] && idx < owners[j+1]) {
108         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
109       }
110     }
111   }
112   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
113 
114   /* inform other processors of number of messages and max length*/
115   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
116   MPI_Allreduce((void *) procs,(void *) work,size,MPI_INT,MPI_SUM,comm);
117   nreceives = work[rank];
118   MPI_Allreduce((void *) nprocs,(void *) work,size,MPI_INT,MPI_MAX,comm);
119   nmax = work[rank];
120   PetscFree(work);
121 
122   /* post receives:
123        1) each message will consist of ordered triples
124      (global row index,global column index,value); the indices are stored
125      as Scalars to simplify the message passing.
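         For illustration, a message carrying k stashed entries is packed as
            row_0, col_0, val_0, row_1, col_1, val_1, ..., row_k-1, col_k-1, val_k-1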
126        2) since we don't know how long each individual message is we
127      allocate the largest needed buffer for each receive. Potentially
128      this is a lot of wasted space.
129 
130        This could be done better.
131   */
132   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
133   CHKPTRQ(rvalues);
134   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
135   CHKPTRQ(recv_waits);
136   for ( i=0; i<nreceives; i++ ) {
137     MPI_Irecv((void *)(rvalues+3*nmax*i),3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
138               comm,recv_waits+i);
139   }
140 
141   /* do sends:
142       1) starts[i] gives the starting index in svalues for stuff going to
143          the ith processor
144   */
145   svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar) );
146   CHKPTRQ(svalues);
147   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
148   CHKPTRQ(send_waits);
149   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
150   starts[0] = 0;
151   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
152   for ( i=0; i<mdn->stash.n; i++ ) {
153     svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
154     svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
155     svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
156   }
157   PetscFree(owner);
158   starts[0] = 0;
159   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
160   count = 0;
161   for ( i=0; i<size; i++ ) {
162     if (procs[i]) {
163       MPI_Isend((void*)(svalues+3*starts[i]),3*nprocs[i],MPIU_SCALAR,i,tag,
164                 comm,send_waits+count++);
165     }
166   }
167   PetscFree(starts); PetscFree(nprocs);
168 
169   /* Free cache space */
170   ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);
171 
172   mdn->svalues    = svalues;    mdn->rvalues = rvalues;
173   mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
174   mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
175   mdn->rmax       = nmax;
176 
177   return 0;
178 }
179 extern int MatSetUpMultiply_MPIDense(Mat);
180 
181 static int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
182 {
183   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
184   MPI_Status   *send_status,recv_status;
185   int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
186   Scalar       *values,val;
187   InsertMode   addv = mdn->insertmode;
188 
189   /*  wait on receives */
190   while (count) {
191     MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);
192     /* unpack receives into our local space */
193     values = mdn->rvalues + 3*imdex*mdn->rmax;
194     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
195     n = n/3;
196     for ( i=0; i<n; i++ ) {
197       row = (int) PETSCREAL(values[3*i]) - mdn->rstart;
198       col = (int) PETSCREAL(values[3*i+1]);
199       val = values[3*i+2];
200       if (col >= 0 && col < mdn->N) {
201         MatSetValues(mdn->A,1,&row,1,&col,&val,addv);
202       }
203       else {SETERRQ(1,"MatAssemblyEnd_MPIDense:Invalid column");}
204     }
205     count--;
206   }
207   PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);
208 
209   /* wait on sends */
210   if (mdn->nsends) {
211     send_status = (MPI_Status *) PetscMalloc( mdn->nsends*sizeof(MPI_Status) );
212     CHKPTRQ(send_status);
213     MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);
214     PetscFree(send_status);
215   }
216   PetscFree(mdn->send_waits); PetscFree(mdn->svalues);
217 
218   mdn->insertmode = NOT_SET_VALUES;
219   ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
220   ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);
221 
222   if (!mdn->assembled && mode == FINAL_ASSEMBLY) {
223     ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
224   }
225   mdn->assembled = 1;
226   return 0;
227 }
228 
229 static int MatZeroEntries_MPIDense(Mat A)
230 {
231   Mat_MPIDense *l = (Mat_MPIDense *) A->data;
232   return MatZeroEntries(l->A);
233 }
234 
235 /* The code does not handle the diagonal entries correctly unless the
236    matrix is square and the column and row ownerships are identical.
237    This is a BUG. The only way to fix it seems to be to access the
238    local matrix mdn->A directly rather than going through the
239    MatZeroRows() routine.
240 */
241 static int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
242 {
243   Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
244   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
245   int            *procs,*nprocs,j,found,idx,nsends,*work;
246   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
247   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
248   int            *lens,imdex,*lrows,*values;
249   MPI_Comm       comm = A->comm;
250   MPI_Request    *send_waits,*recv_waits;
251   MPI_Status     recv_status,*send_status;
252   IS             istmp;
253 
254   if (!l->assembled) SETERRQ(1,"MatZeroRows_MPIDense:Must assemble matrix");
255   ierr = ISGetLocalSize(is,&N); CHKERRQ(ierr);
256   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
257 
258   /*  first count number of contributors to each processor */
259   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
260   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
261   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
262   for ( i=0; i<N; i++ ) {
263     idx = rows[i];
264     found = 0;
265     for ( j=0; j<size; j++ ) {
266       if (idx >= owners[j] && idx < owners[j+1]) {
267         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
268       }
269     }
270     if (!found) SETERRQ(1,"MatZeroRows_MPIDense:Index out of range");
271   }
272   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
273 
274   /* inform other processors of number of messages and max length*/
275   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
276   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
277   nrecvs = work[rank];
278   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
279   nmax = work[rank];
280   PetscFree(work);
281 
282   /* post receives:   */
283   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
284   CHKPTRQ(rvalues);
285   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
286   CHKPTRQ(recv_waits);
287   for ( i=0; i<nrecvs; i++ ) {
288     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
289   }
290 
291   /* do sends:
292       1) starts[i] gives the starting index in svalues for stuff going to
293          the ith processor
294   */
295   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
296   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
297   CHKPTRQ(send_waits);
298   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
299   starts[0] = 0;
300   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
301   for ( i=0; i<N; i++ ) {
302     svalues[starts[owner[i]]++] = rows[i];
303   }
304   ISRestoreIndices(is,&rows);
305 
306   starts[0] = 0;
307   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
308   count = 0;
309   for ( i=0; i<size; i++ ) {
310     if (procs[i]) {
311       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
312     }
313   }
314   PetscFree(starts);
315 
316   base = owners[rank];
317 
318   /*  wait on receives */
319   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
320   source = lens + nrecvs;
321   count  = nrecvs; slen = 0;
322   while (count) {
323     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
324     /* unpack receives into our local space */
325     MPI_Get_count(&recv_status,MPI_INT,&n);
326     source[imdex]  = recv_status.MPI_SOURCE;
327     lens[imdex]  = n;
328     slen += n;
329     count--;
330   }
331   PetscFree(recv_waits);
332 
333   /* move the data into the send scatter */
334   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
335   count = 0;
336   for ( i=0; i<nrecvs; i++ ) {
337     values = rvalues + i*nmax;
338     for ( j=0; j<lens[i]; j++ ) {
339       lrows[count++] = values[j] - base;
340     }
341   }
342   PetscFree(rvalues); PetscFree(lens);
343   PetscFree(owner); PetscFree(nprocs);
344 
345   /* actually zap the local rows */
346   ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
347   PLogObjectParent(A,istmp);
348   PetscFree(lrows);
349   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
350   ierr = ISDestroy(istmp); CHKERRQ(ierr);
351 
352   /* wait on sends */
353   if (nsends) {
354     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
355     CHKPTRQ(send_status);
356     MPI_Waitall(nsends,send_waits,send_status);
357     PetscFree(send_status);
358   }
359   PetscFree(send_waits); PetscFree(svalues);
360 
361   return 0;
362 }
363 
364 static int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
365 {
366   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
367   int          ierr;
368   if (!mdn->assembled)
369     SETERRQ(1,"MatMult_MPIDense:Must assemble matrix first");
370   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
371   CHKERRQ(ierr);
372   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
373   CHKERRQ(ierr);
374   ierr = MatMult(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
375   return 0;
376 }
377 
378 static int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
379 {
380   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
381   int          ierr;
382   if (!mdn->assembled)
383     SETERRQ(1,"MatMultAdd_MPIDense:Must assemble matrix first");
384   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
385   CHKERRQ(ierr);
386   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
387   CHKERRQ(ierr);
388   ierr = MatMultAdd(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
389   return 0;
390 }
391 
392 static int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
393 {
394   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
395   int          ierr;
396   Scalar       zero = 0.0;
397 
398   if (!a->assembled) SETERRQ(1,"MatMultTrans_MPIDense:Must assemble matrix");
399   ierr = VecSet(&zero,yy); CHKERRQ(ierr);
400   ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
401   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
402          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
403   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
404          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
405   return 0;
406 }
407 
408 static int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
409 {
410   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
411   int          ierr;
412 
413   if (!a->assembled) SETERRQ(1,"MatMultTransAdd_MPIDense:Must assemble matrix");
414   ierr = VecCopy(yy,zz); CHKERRQ(ierr);
415   ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
416   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
417          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
418   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
419          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
420   return 0;
421 }
422 
423 static int MatGetDiagonal_MPIDense(Mat A,Vec v)
424 {
425   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
426   Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
427   int          ierr, i, n, m = a->m, radd;
428   Scalar       *x;
429 
430   if (!a->assembled) SETERRQ(1,"MatGetDiagonal_MPIDense:Must assemble matrix");
431   ierr = VecGetArray(v,&x); CHKERRQ(ierr);
432   ierr = VecGetSize(v,&n); CHKERRQ(ierr);
433   if (n != a->M) SETERRQ(1,"MatGetDiagonal_MPIDense:Nonconforming mat and vec");
434   radd = a->rstart*m*m;
435   for ( i=0; i<m; i++ ) {
436     x[i] = aloc->v[radd + i*m + i];
437   }
438   return 0;
439 }
440 
441 static int MatDestroy_MPIDense(PetscObject obj)
442 {
443   Mat          mat = (Mat) obj;
444   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
445   int          ierr;
446 
447 #if defined(PETSC_LOG)
448   PLogObjectState(obj,"Rows=%d, Cols=%d",mdn->M,mdn->N);
449 #endif
450   PetscFree(mdn->rowners);
451   ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
452   if (mdn->lvec)   VecDestroy(mdn->lvec);
453   if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
454   if (mdn->factor) {
455     if (mdn->factor->temp)   PetscFree(mdn->factor->temp);
456     if (mdn->factor->tag)    PetscFree(mdn->factor->tag);
457     if (mdn->factor->pivots) PetscFree(mdn->factor->pivots);
458     PetscFree(mdn->factor);
459   }
460   PetscFree(mdn);
461   PLogObjectDestroy(mat);
462   PetscHeaderDestroy(mat);
463   return 0;
464 }
465 
466 #include "pinclude/pviewer.h"
467 
468 static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
469 {
470   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
471   int          ierr;
472   if (mdn->size == 1) {
473     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
474   }
475   else SETERRQ(1,"MatView_MPIDense_Binary:Only uniprocessor output supported");
476   return 0;
477 }
478 
479 static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
480 {
481   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
482   int          ierr, format;
483   PetscObject  vobj = (PetscObject) viewer;
484   FILE         *fd;
485 
486   ierr = ViewerFileGetPointer_Private(viewer,&fd); CHKERRQ(ierr);
487   if (vobj->type == ASCII_FILE_VIEWER || vobj->type == ASCII_FILES_VIEWER) {
488     ierr = ViewerFileGetFormat_Private(viewer,&format);
489     if (format == FILE_FORMAT_INFO_DETAILED) {
490       int nz, nzalloc, mem, rank;
491       MPI_Comm_rank(mat->comm,&rank);
492       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
493       MPIU_Seq_begin(mat->comm,1);
494         fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",
495             rank,mdn->m,nz,nzalloc,mem);
496       fflush(fd);
497       MPIU_Seq_end(mat->comm,1);
498       ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
499       return 0;
500     }
501     else if (format == FILE_FORMAT_INFO) {
502       return 0;
503     }
504   }
505   if (vobj->type == ASCII_FILE_VIEWER) {
506     MPIU_Seq_begin(mat->comm,1);
507     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
508              mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
509     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
510     fflush(fd);
511     MPIU_Seq_end(mat->comm,1);
512   }
513   else {
514     int size = mdn->size, rank = mdn->rank;
515     if (size == 1) {
516       ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
517     }
518     else {
519       /* assemble the entire matrix onto first processor. */
520       Mat          A;
521       int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
522       Scalar       *vals;
523       Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;
524 
525       if (!rank) {
526         ierr = MatCreateMPIDense(mat->comm,M,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
527       }
528       else {
529         ierr = MatCreateMPIDense(mat->comm,0,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
530       }
531       PLogObjectParent(mat,A);
532 
533       /* Copy the matrix ... This isn't the most efficient means,
534          but it's quick for now */
535       row = mdn->rstart; m = Amdn->m;
536       for ( i=0; i<m; i++ ) {
537         ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
538         ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
539         ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
540         row++;
541       }
542 
543       ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
544       ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
545       if (!rank) {
546         ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
547       }
548       ierr = MatDestroy(A); CHKERRQ(ierr);
549     }
550   }
551   return 0;
552 }
553 
554 static int MatView_MPIDense(PetscObject obj,Viewer viewer)
555 {
556   Mat          mat = (Mat) obj;
557   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
558   PetscObject  vobj = (PetscObject) viewer;
559   int          ierr;
560 
561   if (!mdn->assembled) SETERRQ(1,"MatView_MPIDense:must assemble matrix");
562   if (!viewer) {
563     viewer = STDOUT_VIEWER_SELF; vobj = (PetscObject) viewer;
564   }
565   if (vobj->cookie == VIEWER_COOKIE && vobj->type == ASCII_FILE_VIEWER) {
566     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
567   }
568   else if (vobj->cookie == VIEWER_COOKIE && vobj->type == ASCII_FILES_VIEWER) {
569     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
570   }
571   else if (vobj->type == BINARY_FILE_VIEWER) {
572     return MatView_MPIDense_Binary(mat,viewer);
573   }
574   return 0;
575 }
576 
577 static int MatGetInfo_MPIDense(Mat A,MatInfoType flag,int *nz,
578                              int *nzalloc,int *mem)
579 {
580   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
581   Mat          mdn = mat->A;
582   int          ierr, isend[3], irecv[3];
583 
584   ierr = MatGetInfo(mdn,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
585   if (flag == MAT_LOCAL) {
586     *nz = isend[0]; *nzalloc = isend[1]; *mem = isend[2];
587   } else if (flag == MAT_GLOBAL_MAX) {
588     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_MAX,A->comm);
589     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
590   } else if (flag == MAT_GLOBAL_SUM) {
591     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_SUM,A->comm);
592     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
593   }
594   return 0;
595 }
596 
597 /* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
598    extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
599    extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
600    extern int MatSolve_MPIDense(Mat,Vec,Vec);
601    extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
602    extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
603    extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */
604 
605 static int MatSetOption_MPIDense(Mat A,MatOption op)
606 {
607   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
608 
609   if (op == NO_NEW_NONZERO_LOCATIONS ||
610       op == YES_NEW_NONZERO_LOCATIONS ||
611       op == COLUMNS_SORTED ||
612       op == ROW_ORIENTED) {
613         MatSetOption(a->A,op);
614   }
615   else if (op == ROWS_SORTED ||
616            op == SYMMETRIC_MATRIX ||
617            op == STRUCTURALLY_SYMMETRIC_MATRIX ||
618            op == YES_NEW_DIAGONALS)
619     PLogInfo((PetscObject)A,"Info:MatSetOption_MPIDense:Option ignored\n");
620   else if (op == COLUMN_ORIENTED)
621     { a->roworiented = 0; MatSetOption(a->A,op);}
622   else if (op == NO_NEW_DIAGONALS)
623     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:NO_NEW_DIAGONALS");}
624   else
625     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:unknown option");}
626   return 0;
627 }
628 
629 static int MatGetSize_MPIDense(Mat A,int *m,int *n)
630 {
631   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
632   *m = mat->M; *n = mat->N;
633   return 0;
634 }
635 
636 static int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
637 {
638   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
639   *m = mat->m; *n = mat->N;
640   return 0;
641 }
642 
643 static int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
644 {
645   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
646   *m = mat->rstart; *n = mat->rend;
647   return 0;
648 }
649 
650 static int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
651 {
652   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
653   int          lrow, rstart = mat->rstart, rend = mat->rend;
654 
655   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIDense:Only local rows");
656   lrow = row - rstart;
657   return MatGetRow(mat->A,lrow,nz,idx,v);
658 }
659 
660 static int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
661 {
662   if (idx) PetscFree(*idx);
663   if (v) PetscFree(*v);
664   return 0;
665 }
666 
667 static int MatNorm_MPIDense(Mat A,NormType type,double *norm)
668 {
669   Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
670   Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
671   int          ierr, i, j;
672   double       sum = 0.0;
673   Scalar       *v = mat->v;
674 
675   if (!mdn->assembled) SETERRQ(1,"MatNorm_MPIDense:Must assemble matrix");
676   if (mdn->size == 1) {
677     ierr =  MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
678   } else {
679     if (type == NORM_FROBENIUS) {
680       for (i=0; i<mat->n*mat->m; i++ ) {
681 #if defined(PETSC_COMPLEX)
682         sum += real(conj(*v)*(*v)); v++;
683 #else
684         sum += (*v)*(*v); v++;
685 #endif
686       }
687       MPI_Allreduce((void*)&sum,(void*)norm,1,MPI_DOUBLE,MPI_SUM,A->comm);
688       *norm = sqrt(*norm);
689       PLogFlops(2*mat->n*mat->m);
690     }
691     else if (type == NORM_1) {
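      /* 1-norm = largest column sum of |a_ij|: sum the absolute values over
         the local rows for each column, add the partial sums across all
         processors, then take the maximum over the columns */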
692       double *tmp, *tmp2;
693       tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
694       tmp2 = tmp + mdn->N;
695       PetscMemzero(tmp,2*mdn->N*sizeof(double));
696       *norm = 0.0;
697       v = mat->v;
698       for ( j=0; j<mat->n; j++ ) {
699         for ( i=0; i<mat->m; i++ ) {
700           tmp[j] += PetscAbsScalar(*v);  v++;
701         }
702       }
703       MPI_Allreduce((void*)tmp,(void*)tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);
704       for ( j=0; j<mdn->N; j++ ) {
705         if (tmp2[j] > *norm) *norm = tmp2[j];
706       }
707       PetscFree(tmp);
708       PLogFlops(mat->n*mat->m);
709     }
710     else if (type == NORM_INFINITY) { /* max row norm */
711       double ntemp;
712       ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
713       MPI_Allreduce((void*)&ntemp,(void*)norm,1,MPI_DOUBLE,MPI_MAX,A->comm);
714     }
715     else {
716       SETERRQ(1,"MatNorm_MPIDense:No support for two norm");
717     }
718   }
719   return 0;
720 }
721 
722 static int MatTranspose_MPIDense(Mat A,Mat *matout)
723 {
724   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
725   Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
726   Mat          B;
727   int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
728   int          j, i, ierr;
729   Scalar       *v;
730 
731   if (!matout && M != N)
732     SETERRQ(1,"MatTranspose_MPIDense:In-place transpose requires a square matrix");
733   ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);
734          CHKERRQ(ierr);
735 
736   m = Aloc->m; n = Aloc->n; v = Aloc->v;
737   rwork = (int *) PetscMalloc(n*sizeof(int)); CHKPTRQ(rwork);
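  /* each global column j of the local block is inserted as the segment
     (columns rstart .. rstart+m-1) of row j of the transpose B */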
738   for ( j=0; j<n; j++ ) {
739     for (i=0; i<m; i++) rwork[i] = rstart + i;
740     ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
741     v += m;
742   }
743   PetscFree(rwork);
744   ierr = MatAssemblyBegin(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
745   ierr = MatAssemblyEnd(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
746   if (matout) {
747     *matout = B;
748   } else {
749     /* Not a true in-place transpose: free a's data structures, then overwrite A's header with B's */
750     PetscFree(a->rowners);
751     ierr = MatDestroy(a->A); CHKERRQ(ierr);
752     if (a->lvec) VecDestroy(a->lvec);
753     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
754     PetscFree(a);
755     PetscMemcpy(A,B,sizeof(struct _Mat));
756     PetscHeaderDestroy(B);
757   }
758   return 0;
759 }
760 
761 static int MatConvertSameType_MPIDense(Mat,Mat *,int);
762 
763 /* -------------------------------------------------------------------*/
764 static struct _MatOps MatOps = {MatSetValues_MPIDense,
765        MatGetRow_MPIDense,MatRestoreRow_MPIDense,
766        MatMult_MPIDense,MatMultAdd_MPIDense,
767        MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
768        MatSolve_MPIDense,0,
769        0,0,
770        0,0,
771 /*       MatLUFactor_MPIDense,0, */
772        0,MatTranspose_MPIDense,
773        MatGetInfo_MPIDense,0,
774        MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
775        MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
776        0,
777        MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
778        0,0,0,
779 /*       0,MatLUFactorSymbolic_MPIDense,MatLUFactorNumeric_MPIDense, */
780        0,0,
781        MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
782        MatGetOwnershipRange_MPIDense,
783        0,0,
784        0,0,0,0,0,MatConvertSameType_MPIDense,
785        0,0,0,0,0,
786        0,0,MatGetValues_MPIDense};
787 
788 /*@C
789    MatCreateMPIDense - Creates a parallel matrix in dense format.
790 
791    Input Parameters:
792 .  comm - MPI communicator
793 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
794 .  n - number of local columns (or PETSC_DECIDE to have calculated
795            if N is given)
796 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
797 .  N - number of global columns (or PETSC_DECIDE to have calculated
798            if n is given)
799 .  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
800    to control all matrix memory allocation.
801 
802    Output Parameter:
803 .  newmat - the matrix
804 
805    Notes:
806    The dense format is fully compatible with standard Fortran 77
807    storage by columns.
808 
809    The data input variable is intended primarily for Fortran programmers
810    who wish to allocate their own matrix memory space.  Most users should
811    set data=PETSC_NULL.
812 
813    The user MUST specify either the local or global matrix dimensions
814    (possibly both).
815 
816    Currently, the only parallel dense matrix decomposition is by rows,
817    so that n=N and each submatrix owns all of the global columns.
818 
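   Example:
   A minimal usage sketch (the communicator, dimensions, and variable names
   below are illustrative only):

      Mat A;
      int ierr;
      ierr = MatCreateMPIDense(MPI_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,
                               100,50,PETSC_NULL,&A); CHKERRQ(ierr);
      ... fill the matrix with MatSetValues() ...
      ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatDestroy(A); CHKERRQ(ierr);
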
819 .keywords: matrix, dense, parallel
820 
821 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
822 @*/
823 int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *newmat)
824 {
825   Mat          mat;
826   Mat_MPIDense *a;
827   int          ierr, i;
828 
829 /* Note:  For now, when data is specified above, this assumes the user correctly
830    allocates the local dense storage space.  We should add error checking. */
831 
832   *newmat         = 0;
833   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,comm);
834   PLogObjectCreate(mat);
835   mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
836   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
837   mat->destroy    = MatDestroy_MPIDense;
838   mat->view       = MatView_MPIDense;
839   mat->factor     = 0;
840 
841   a->factor = 0;
842   a->insertmode = NOT_SET_VALUES;
843   MPI_Comm_rank(comm,&a->rank);
844   MPI_Comm_size(comm,&a->size);
845 
846   if (M == PETSC_DECIDE) MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);
847   if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}
848 
849   /* each row stores all columns */
850   if (N == PETSC_DECIDE) N = n;
851   if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
852   /*  if (n != N) SETERRQ(1,"MatCreateMPIDense:For now, only n=N is supported"); */
853   a->N = N;
854   a->M = M;
855   a->m = m;
856   a->n = n;
857 
858   /* build local table of row and column ownerships */
859   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
860   a->cowners = a->rowners + a->size + 1;
861   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+
862                        sizeof(Mat_MPIDense));
863   MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);
864   a->rowners[0] = 0;
865   for ( i=2; i<=a->size; i++ ) {
866     a->rowners[i] += a->rowners[i-1];
867   }
868   a->rstart = a->rowners[a->rank];
869   a->rend   = a->rowners[a->rank+1];
870   MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);
871   a->cowners[0] = 0;
872   for ( i=2; i<=a->size; i++ ) {
873     a->cowners[i] += a->cowners[i-1];
874   }
875 
876   ierr = MatCreateSeqDense(MPI_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
877   PLogObjectParent(mat,a->A);
878 
879   /* build cache (stash) for entries destined for rows owned by other processors */
880   ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);
881 
882   /* stuff used for matrix vector multiply */
883   a->lvec        = 0;
884   a->Mvctx       = 0;
885   a->assembled   = 0;
886   a->roworiented = 1;
887 
888   *newmat = mat;
889   if (OptionsHasName(PETSC_NULL,"-help")) {
890     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
891   }
892   return 0;
893 }
894 
895 static int MatConvertSameType_MPIDense(Mat A,Mat *newmat,int cpvalues)
896 {
897   Mat          mat;
898   Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
899   int          ierr;
900   FactorCtx    *factor;
901 
902   if (!oldmat->assembled) SETERRQ(1,"MatConvertSameType_MPIDense:Must assemble matrix");
903   *newmat       = 0;
904   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,A->comm);
905   PLogObjectCreate(mat);
906   mat->data     = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
907   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
908   mat->destroy  = MatDestroy_MPIDense;
909   mat->view     = MatView_MPIDense;
910   mat->factor   = A->factor;
911 
912   a->m          = oldmat->m;
913   a->n          = oldmat->n;
914   a->M          = oldmat->M;
915   a->N          = oldmat->N;
916   if (oldmat->factor) {
917     a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx)); CHKPTRQ(factor);
918     /* copy factor contents ... add this code! */
919   } else a->factor = 0;
920 
921   a->assembled  = 1;
922   a->rstart     = oldmat->rstart;
923   a->rend       = oldmat->rend;
924   a->size       = oldmat->size;
925   a->rank       = oldmat->rank;
926   a->insertmode = NOT_SET_VALUES;
927 
928   a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
929   PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
930   PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
931   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
932 
933   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
934   PLogObjectParent(mat,a->lvec);
935   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
936   PLogObjectParent(mat,a->Mvctx);
937   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
938   PLogObjectParent(mat,a->A);
939   *newmat = mat;
940   return 0;
941 }
942 
943 #include "sysio.h"
944 
945 int MatLoad_MPIDense(Viewer bview,MatType type,Mat *newmat)
946 {
947   Mat          A;
948   int          i, nz, ierr, j,rstart, rend, fd;
949   Scalar       *vals,*svals;
950   PetscObject  vobj = (PetscObject) bview;
951   MPI_Comm     comm = vobj->comm;
952   MPI_Status   status;
953   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
954   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
955   int          tag = ((PetscObject)bview)->tag;
956 
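  /* Process 0 reads the header, row lengths, column indices, and numerical
     values from the binary file and ships each processor its own rows;
     every processor then inserts its rows with MatSetValues() and the
     matrix is assembled. */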
957   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
958   if (!rank) {
959     ierr = ViewerFileGetDescriptor_Private(bview,&fd); CHKERRQ(ierr);
960     ierr = SYRead(fd,(char *)header,4,SYINT); CHKERRQ(ierr);
961     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIDense:Not a matrix object");
962   }
963 
964   MPI_Bcast(header+1,3,MPI_INT,0,comm);
965   M = header[1]; N = header[2];
966   /* determine ownership of all rows */
967   m = M/size + ((M % size) > rank);
968   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
969   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
970   rowners[0] = 0;
971   for ( i=2; i<=size; i++ ) {
972     rowners[i] += rowners[i-1];
973   }
974   rstart = rowners[rank];
975   rend   = rowners[rank+1];
976 
977   /* distribute row lengths to all processors */
978   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
979   offlens = ourlens + (rend-rstart);
980   if (!rank) {
981     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
982     ierr = SYRead(fd,rowlengths,M,SYINT); CHKERRQ(ierr);
983     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
984     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
985     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
986     PetscFree(sndcounts);
987   }
988   else {
989     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
990   }
991 
992   if (!rank) {
993     /* calculate the number of nonzeros on each processor */
994     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
995     PetscMemzero(procsnz,size*sizeof(int));
996     for ( i=0; i<size; i++ ) {
997       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
998         procsnz[i] += rowlengths[j];
999       }
1000     }
1001     PetscFree(rowlengths);
1002 
1003     /* determine max buffer needed and allocate it */
1004     maxnz = 0;
1005     for ( i=0; i<size; i++ ) {
1006       maxnz = PetscMax(maxnz,procsnz[i]);
1007     }
1008     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1009 
1010     /* read in my part of the matrix column indices  */
1011     nz = procsnz[0];
1012     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1013     ierr = SYRead(fd,mycols,nz,SYINT); CHKERRQ(ierr);
1014 
1015     /* read in everyone else's column indices and ship them off */
1016     for ( i=1; i<size; i++ ) {
1017       nz = procsnz[i];
1018       ierr = SYRead(fd,cols,nz,SYINT); CHKERRQ(ierr);
1019       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1020     }
1021     PetscFree(cols);
1022   }
1023   else {
1024     /* determine buffer space needed for message */
1025     nz = 0;
1026     for ( i=0; i<m; i++ ) {
1027       nz += ourlens[i];
1028     }
1029     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1030 
1031     /* receive message of column indices*/
1032     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1033     MPI_Get_count(&status,MPI_INT,&maxnz);
1034     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:Something is wrong with the file");
1035   }
1036 
1037   /* loop over local rows, determining number of off diagonal entries */
1038   PetscMemzero(offlens,m*sizeof(int));
1039   jj = 0;
1040   for ( i=0; i<m; i++ ) {
1041     for ( j=0; j<ourlens[i]; j++ ) {
1042       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1043       jj++;
1044     }
1045   }
1046 
1047   /* create our matrix */
1048   for ( i=0; i<m; i++ ) {
1049     ourlens[i] -= offlens[i];
1050   }
1051   if (type == MATMPIDENSE) {
1052     ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat); CHKERRQ(ierr);
1053   }
1054   A = *newmat;
1055   for ( i=0; i<m; i++ ) {
1056     ourlens[i] += offlens[i];
1057   }
1058 
1059   if (!rank) {
1060     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1061 
1062     /* read in my part of the matrix numerical values  */
1063     nz = procsnz[0];
1064     ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
1065 
1066     /* insert into matrix */
1067     jj      = rstart;
1068     smycols = mycols;
1069     svals   = vals;
1070     for ( i=0; i<m; i++ ) {
1071       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1072       smycols += ourlens[i];
1073       svals   += ourlens[i];
1074       jj++;
1075     }
1076 
1077     /* read in other processors and ship out */
1078     for ( i=1; i<size; i++ ) {
1079       nz = procsnz[i];
1080       ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
1081       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1082     }
1083     PetscFree(procsnz);
1084   }
1085   else {
1086     /* receive numeric values */
1087     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1088 
1089     /* receive message of values*/
1090     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1091     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1092     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:Something is wrong with the file");
1093 
1094     /* insert into matrix */
1095     jj      = rstart;
1096     smycols = mycols;
1097     svals   = vals;
1098     for ( i=0; i<m; i++ ) {
1099       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1100       smycols += ourlens[i];
1101       svals   += ourlens[i];
1102       jj++;
1103     }
1104   }
1105   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1106 
1107   ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1108   ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1109   return 0;
1110 }
1111