#ifndef lint
static char vcid[] = "$Id: mpidense.c,v 1.18 1995/12/23 04:53:03 bsmith Exp bsmith $";
#endif

/*
   Basic functions for parallel dense matrices.
*/

#include "mpidense.h"
#include "vec/vecimpl.h"
#include "inline/spops.h"

static int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,
                                 int *idxn,Scalar *v,InsertMode addv)
{
  Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
  int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
  int          roworiented = A->roworiented;

  if (A->insertmode != NOT_SET_VALUES && A->insertmode != addv) {
    SETERRQ(1,"MatSetValues_MPIDense:Cannot mix inserts and adds");
  }
  A->insertmode = addv;
  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative row");
    if (idxm[i] >= A->M) SETERRQ(1,"MatSetValues_MPIDense:Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      row = idxm[i] - rstart;
      if (roworiented) {
        ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv); CHKERRQ(ierr);
      }
      else {
        for ( j=0; j<n; j++ ) {
          if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative column");
          if (idxn[j] >= A->N) SETERRQ(1,"MatSetValues_MPIDense:Column too large");
          ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv); CHKERRQ(ierr);
        }
      }
    }
    else {
      if (roworiented) {
        ierr = StashValues_Private(&A->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
      }
      else { /* must stash each separately */
        row = idxm[i];
        for ( j=0; j<n; j++ ) {
          ierr = StashValues_Private(&A->stash,row,1,&idxn[j],v+i+j*m,addv);
                 CHKERRQ(ierr);
        }
      }
    }
  }
  return 0;
}
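/*
   Illustrative usage sketch (not part of the library; kept under #if 0):
   how the roworiented flag above changes the layout of the values array v
   passed to MatSetValues().  The matrix A is assumed created elsewhere.
*/
#if 0
static int ExampleSetValuesOrientation(Mat A)
{
  int    ierr, rows[2] = {0,1}, cols[2] = {4,5};
  /* row-oriented (the default): v[i*n+j] holds the entry for (rows[i],cols[j]) */
  Scalar vrow[4] = {1.0, 2.0,    /* row 0: A(0,4) A(0,5) */
                    3.0, 4.0};   /* row 1: A(1,4) A(1,5) */
  ierr = MatSetValues(A,2,rows,2,cols,vrow,INSERT_VALUES); CHKERRQ(ierr);
  /* after MatSetOption(A,COLUMN_ORIENTED) the same entries would be passed
     as v[i+j*m], i.e. {1.0, 3.0, 2.0, 4.0} */
  return 0;
}
#endif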

static int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;

  if (!mdn->assembled) SETERRQ(1,"MatGetValues_MPIDense:Not for unassembled matrix");
  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative row");
    if (idxm[i] >= mdn->M) SETERRQ(1,"MatGetValues_MPIDense:Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      row = idxm[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative column");
        if (idxn[j] >= mdn->N)
          SETERRQ(1,"MatGetValues_MPIDense:Column too large");
        ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
      }
    }
    else {
      SETERRQ(1,"MatGetValues_MPIDense:Only local values currently supported");
    }
  }
  return 0;
}

static int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  MPI_Comm     comm = mat->comm;
  int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
  int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  int          tag = mat->tag, *owner,*starts,count,ierr;
  InsertMode   addv;
  MPI_Request  *send_waits,*recv_waits;
  Scalar       *rvalues,*svalues;

  /* make sure all processors are in either INSERT_VALUES or ADD_VALUES mode */
  MPI_Allreduce((void *) &mdn->insertmode,(void *) &addv,1,MPI_INT,
                MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) { SETERRQ(1,
    "MatAssemblyBegin_MPIDense:Cannot mix adds/inserts on different procs");
  }
  mdn->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<mdn->stash.n; i++ ) {
    idx = mdn->stash.idx[i];
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length */
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce((void *) procs,(void *) work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce((void *) nprocs,(void *) work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered triples
     (global row,global column,value); we store the global indices as
     Scalars to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.

       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv((void *)(rvalues+3*nmax*i),3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar) );
  CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<mdn->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
  }
  PetscFree(owner);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend((void*)(svalues+3*starts[i]),3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);

  mdn->svalues    = svalues;    mdn->rvalues = rvalues;
  mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
  mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
  mdn->rmax       = nmax;

  return 0;
}
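/*
   A minimal sketch (under #if 0; assumes only a communicator and an
   owners[] range table like mdn->rowners) of the message-counting idiom
   used in MatAssemblyBegin_MPIDense() above: tally how many stashed
   entries go to each owner, then let two MPI_Allreduce calls tell every
   process how many messages it will receive and the longest message.
*/
#if 0
static int ExampleCountMessages(MPI_Comm comm,int size,int rank,int nstash,
                                int *idx,int *owners,int *nreceives,int *nmax)
{
  int i, j, *nprocs, *procs, *work;

  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  for ( i=0; i<nstash; i++ ) {
    for ( j=0; j<size; j++ ) {
      if (idx[i] >= owners[j] && idx[i] < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; break;  /* count entry; flag owner j */
      }
    }
  }
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs,work,size,MPI_INT,MPI_SUM,comm);  /* messages to me */
  *nreceives = work[rank];
  MPI_Allreduce(nprocs,work,size,MPI_INT,MPI_MAX,comm); /* longest message */
  *nmax = work[rank];
  PetscFree(work); PetscFree(nprocs);
  return 0;
}
#endif
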
extern int MatSetUpMultiply_MPIDense(Mat);

static int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  MPI_Status   *send_status,recv_status;
  int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
  Scalar       *values,val;
  InsertMode   addv = mdn->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = mdn->rvalues + 3*imdex*mdn->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;
    for ( i=0; i<n; i++ ) {
      row = (int) PETSCREAL(values[3*i]) - mdn->rstart;
      col = (int) PETSCREAL(values[3*i+1]);
      val = values[3*i+2];
      if (col >= 0 && col < mdn->N) {
        MatSetValues(mdn->A,1,&row,1,&col,&val,addv);
      }
      else {SETERRQ(1,"MatAssemblyEnd_MPIDense:Invalid column");}
    }
    count--;
  }
  PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);

  /* wait on sends */
  if (mdn->nsends) {
    send_status = (MPI_Status *) PetscMalloc( mdn->nsends*sizeof(MPI_Status) );
    CHKPTRQ(send_status);
    MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(mdn->send_waits); PetscFree(mdn->svalues);

  mdn->insertmode = NOT_SET_VALUES;
  ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);

  if (!mdn->assembled && mode == FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
  }
  mdn->assembled = 1;
  return 0;
}
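/*
   Usage sketch (illustrative, under #if 0): any process may set any global
   entry; the stash-and-exchange machinery above routes off-process values
   to their owners during assembly.  A is assumed created with
   MatCreateMPIDense().
*/
#if 0
static int ExampleAssemble(Mat A)
{
  int    ierr, row = 0, col = 0;   /* global indices; may be off-process */
  Scalar v = 1.0;

  ierr = MatSetValues(A,1,&row,1,&col,&v,ADD_VALUES); CHKERRQ(ierr);
  ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr); /* posts sends/receives */
  ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);   /* drains and inserts */
  return 0;
}
#endif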

static int MatZeroEntries_MPIDense(Mat A)
{
  Mat_MPIDense *l = (Mat_MPIDense *) A->data;
  return MatZeroEntries(l->A);
}

/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   mdn->A directly and not through the MatZeroRows()
   routine.
*/
static int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
{
  Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  if (!l->assembled) SETERRQ(1,"MatZeroRows_MPIDense:Must assemble matrix");
  ierr = ISGetLocalSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note */
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,"MatZeroRows_MPIDense:Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length */
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /* see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  base = owners[rank];

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex] = recv_status.MPI_SOURCE;
    lens[imdex]   = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp); CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}
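/*
   Usage sketch (illustrative, under #if 0): zero two global rows and put
   1.0 on their diagonals.  Each process may list any global rows; the
   exchange above routes them to their owners.
*/
#if 0
static int ExampleZeroRows(Mat A)
{
  int    ierr, rows[2] = {0,3};   /* global row numbers */
  Scalar diag = 1.0;
  IS     is;

  ierr = ISCreateSeq(MPI_COMM_SELF,2,rows,&is); CHKERRQ(ierr);
  ierr = MatZeroRows(A,is,&diag); CHKERRQ(ierr);
  ierr = ISDestroy(is); CHKERRQ(ierr);
  return 0;
}
#endif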

static int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;
  if (!mdn->assembled)
    SETERRQ(1,"MatMult_MPIDense:Must assemble matrix first");
  ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
  CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
  CHKERRQ(ierr);
  ierr = MatMult(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
  return 0;
}
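/*
   Usage sketch (illustrative, under #if 0): y = A*x.  The vectors are
   assumed created elsewhere with layouts conforming to the matrix; the
   routine above first scatters x into the local work vector lvec, then
   performs a purely local dense multiply.
*/
#if 0
static int ExampleMult(Mat A,Vec x,Vec y)
{
  int ierr;
  ierr = MatMult(A,x,y); CHKERRQ(ierr);
  return 0;
}
#endif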

static int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;
  if (!mdn->assembled)
    SETERRQ(1,"MatMultAdd_MPIDense:Must assemble matrix first");
  ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
  CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
  CHKERRQ(ierr);
  ierr = MatMultAdd(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
  return 0;
}

static int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  int          ierr;
  Scalar       zero = 0.0;

  if (!a->assembled) SETERRQ(1,"MatMultTrans_MPIDense:must assemble matrix");
  ierr = VecSet(&zero,yy); CHKERRQ(ierr);
  ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
  ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
         (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
         (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}

static int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  int          ierr;

  if (!a->assembled) SETERRQ(1,"MatMultTransAdd_MPIDense:must assemble matrix");
  ierr = VecCopy(yy,zz); CHKERRQ(ierr);
  ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
         (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
         (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}

static int MatGetDiagonal_MPIDense(Mat A,Vec v)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
  int          ierr, i, n, m = a->m, radd;
  Scalar       *x;

  if (!a->assembled) SETERRQ(1,"MatGetDiagonal_MPIDense:must assemble matrix");
  ierr = VecGetArray(v,&x); CHKERRQ(ierr);
  ierr = VecGetSize(v,&n); CHKERRQ(ierr);
  if (n != a->M) SETERRQ(1,"MatGetDiagonal_MPIDense:Nonconforming mat and vec");
  radd = a->rstart*m;
  for ( i=0; i<m; i++ ) {
    x[i] = aloc->v[radd + i*m + i];
  }
  return 0;
}
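/*
   A worked note on the indexing above: the local dense block aloc->v is
   stored column-major with leading dimension m, so entry (i,jcol) lives at
   v[jcol*m + i].  Local row i is global row rstart+i, whose diagonal entry
   sits in global column rstart+i; its offset is therefore
   (rstart+i)*m + i = rstart*m + i*m + i, i.e. radd + i*m + i.
*/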

static int MatDestroy_MPIDense(PetscObject obj)
{
  Mat          mat = (Mat) obj;
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;

#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",mdn->M,mdn->N);
#endif
  PetscFree(mdn->rowners);
  ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
  if (mdn->lvec)   VecDestroy(mdn->lvec);
  if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
  if (mdn->factor) PetscFree(mdn->factor);
  PetscFree(mdn);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}

#include "pinclude/pviewer.h"

static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;
  if (mdn->size == 1) {
    ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
  }
  else SETERRQ(1,"MatView_MPIDense_Binary:Only uniprocessor output supported");
  return 0;
}

static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr, format;
  PetscObject  vobj = (PetscObject) viewer;
  FILE         *fd;

  ierr = ViewerFileGetPointer_Private(viewer,&fd); CHKERRQ(ierr);
  if (vobj->type == ASCII_FILE_VIEWER || vobj->type == ASCII_FILES_VIEWER) {
    ierr = ViewerFileGetFormat_Private(viewer,&format);
    if (format == FILE_FORMAT_INFO_DETAILED) {
      int nz, nzalloc, mem, rank;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
      MPIU_Seq_begin(mat->comm,1);
        fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",
            rank,mdn->m,nz,nzalloc,mem);
      fflush(fd);
      MPIU_Seq_end(mat->comm,1);
      ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == FILE_FORMAT_INFO) {
      return 0;
    }
  }
  if (vobj->type == ASCII_FILE_VIEWER) {
    MPIU_Seq_begin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
             mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
    ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
    fflush(fd);
    MPIU_Seq_end(mat->comm,1);
  }
  else {
    int size = mdn->size, rank = mdn->rank;
    if (size == 1) {
      ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat          A;
      int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
      Scalar       *vals;
      Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;

      if (!rank) {
        ierr = MatCreateMPIDense(mat->comm,M,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIDense(mat->comm,0,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* Copy the matrix ... This isn't the most efficient means,
         but it's quick for now */
      row = mdn->rstart; m = Amdn->m;
      for ( i=0; i<m; i++ ) {
        ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
        ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
        ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
        row++;
      }

      ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}

static int MatView_MPIDense(PetscObject obj,Viewer viewer)
{
  Mat          mat = (Mat) obj;
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  PetscObject  vobj = (PetscObject) viewer;
  int          ierr;

  if (!mdn->assembled) SETERRQ(1,"MatView_MPIDense:must assemble matrix");
  if (!viewer) {
    viewer = STDOUT_VIEWER_SELF; vobj = (PetscObject) viewer;
  }
  if (vobj->cookie == VIEWER_COOKIE && vobj->type == ASCII_FILE_VIEWER) {
    ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
  }
  else if (vobj->cookie == VIEWER_COOKIE && vobj->type == ASCII_FILES_VIEWER) {
    ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
  }
  else if (vobj->type == BINARY_FILE_VIEWER) {
    return MatView_MPIDense_Binary(mat,viewer);
  }
  return 0;
}
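/*
   Usage sketch (illustrative, under #if 0): passing a null viewer falls
   back to STDOUT_VIEWER_SELF (see above); with more than one process the
   ASCII path gathers the whole matrix onto process 0 before printing.
*/
#if 0
static int ExampleView(Mat A)
{
  int ierr;
  ierr = MatView(A,(Viewer)0); CHKERRQ(ierr);
  return 0;
}
#endif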

static int MatGetInfo_MPIDense(Mat A,MatInfoType flag,int *nz,
                               int *nzalloc,int *mem)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  Mat          mdn = mat->A;
  int          ierr, isend[3], irecv[3];

  ierr = MatGetInfo(mdn,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
  if (flag == MAT_LOCAL) {
    *nz = isend[0]; *nzalloc = isend[1]; *mem = isend[2];
  } else if (flag == MAT_GLOBAL_MAX) {
    MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_MAX,A->comm);
    *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
  } else if (flag == MAT_GLOBAL_SUM) {
    MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_SUM,A->comm);
    *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
  }
  return 0;
}

static int MatSetOption_MPIDense(Mat A,MatOption op)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;

  if (op == NO_NEW_NONZERO_LOCATIONS ||
      op == YES_NEW_NONZERO_LOCATIONS ||
      op == COLUMNS_SORTED ||
      op == ROW_ORIENTED) {
        MatSetOption(a->A,op);
  }
  else if (op == ROWS_SORTED ||
           op == SYMMETRIC_MATRIX ||
           op == STRUCTURALLY_SYMMETRIC_MATRIX ||
           op == YES_NEW_DIAGONALS)
    PLogInfo((PetscObject)A,"Info:MatSetOption_MPIDense:Option ignored\n");
  else if (op == COLUMN_ORIENTED)
    { a->roworiented = 0; MatSetOption(a->A,op);}
  else if (op == NO_NEW_DIAGONALS)
    {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:NO_NEW_DIAGONALS");}
  else
    {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:unknown option");}
  return 0;
}

static int MatGetSize_MPIDense(Mat A,int *m,int *n)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  *m = mat->M; *n = mat->N;
  return 0;
}

static int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  *m = mat->m; *n = mat->N;
  return 0;
}

static int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  *m = mat->rstart; *n = mat->rend;
  return 0;
}

static int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  int          lrow, rstart = mat->rstart, rend = mat->rend;

  if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIDense:only local rows")
  lrow = row - rstart;
  return MatGetRow(mat->A,lrow,nz,idx,v);
}

static int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
{
  if (idx) PetscFree(*idx);
  if (v) PetscFree(*v);
  return 0;
}

static int MatNorm_MPIDense(Mat A,NormType type,double *norm)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
  Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
  int          ierr, i, j;
  double       sum = 0.0;
  Scalar       *v = mat->v;

  if (!mdn->assembled) SETERRQ(1,"MatNorm_MPIDense:Must assemble matrix");
  if (mdn->size == 1) {
    ierr = MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      for ( i=0; i<mat->n*mat->m; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      MPI_Allreduce((void*)&sum,(void*)norm,1,MPI_DOUBLE,MPI_SUM,A->comm);
      *norm = sqrt(*norm);
      PLogFlops(2*mat->n*mat->m);
    }
    else if (type == NORM_1) {
      double *tmp, *tmp2;
      tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
      tmp2 = tmp + mdn->N;
      PetscMemzero(tmp,2*mdn->N*sizeof(double));
      *norm = 0.0;
      v = mat->v;
      for ( j=0; j<mat->n; j++ ) {
        for ( i=0; i<mat->m; i++ ) {
          tmp[j] += PetscAbsScalar(*v);  v++;
        }
      }
      MPI_Allreduce((void*)tmp,(void*)tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);
      for ( j=0; j<mdn->N; j++ ) {
        if (tmp2[j] > *norm) *norm = tmp2[j];
      }
      PetscFree(tmp);
      PLogFlops(mat->n*mat->m);
    }
    else if (type == NORM_INFINITY) { /* max row norm */
      double ntemp;
      ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
      MPI_Allreduce((void*)&ntemp,(void*)norm,1,MPI_DOUBLE,MPI_MAX,A->comm);
    }
    else {
      SETERRQ(1,"MatNorm_MPIDense:No support for two norm");
    }
  }
  return 0;
}
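/*
   Usage sketch (illustrative, under #if 0): the three supported norms.
   NORM_FROBENIUS and NORM_1 reduce per-process partial sums across the
   communicator; NORM_INFINITY takes the max of per-process row sums.
*/
#if 0
static int ExampleNorms(Mat A)
{
  int    ierr;
  double nrm;

  ierr = MatNorm(A,NORM_FROBENIUS,&nrm); CHKERRQ(ierr);
  ierr = MatNorm(A,NORM_1,&nrm);         CHKERRQ(ierr);
  ierr = MatNorm(A,NORM_INFINITY,&nrm);  CHKERRQ(ierr);
  return 0;
}
#endif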

static int MatTranspose_MPIDense(Mat A,Mat *matout)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
  Mat          B;
  int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
  int          j, i, ierr;
  Scalar       *v;

  if (!matout && M != N)
    SETERRQ(1,"MatTranspose_MPIDense:Supports square matrix only in-place");
  ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);
         CHKERRQ(ierr);

  m = Aloc->m; n = Aloc->n; v = Aloc->v;
  rwork = (int *) PetscMalloc(m*sizeof(int)); CHKPTRQ(rwork);
  for ( i=0; i<m; i++ ) rwork[i] = rstart + i;
  for ( j=0; j<n; j++ ) {
    ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
    v += m;
  }
  PetscFree(rwork);
  ierr = MatAssemblyBegin(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
  if (matout) {
    *matout = B;
  } else {
    /* This isn't really an in-place transpose, but free data struct from a */
    PetscFree(a->rowners);
    ierr = MatDestroy(a->A); CHKERRQ(ierr);
    if (a->lvec) VecDestroy(a->lvec);
    if (a->Mvctx) VecScatterDestroy(a->Mvctx);
    PetscFree(a);
    PetscMemcpy(A,B,sizeof(struct _Mat));
    PetscHeaderDestroy(B);
  }
  return 0;
}
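/*
   Usage sketch (illustrative, under #if 0): out-of-place transpose; a null
   result pointer requests the (pseudo) in-place form, which the routine
   above only permits for square matrices.
*/
#if 0
static int ExampleTranspose(Mat A,Mat *At)
{
  int ierr;
  ierr = MatTranspose(A,At); CHKERRQ(ierr);          /* *At = A^T */
  /* ierr = MatTranspose(A,PETSC_NULL);  square A only */
  return 0;
}
#endif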

static int MatConvertSameType_MPIDense(Mat,Mat *,int);

/* -------------------------------------------------------------------*/
static struct _MatOps MatOps = {MatSetValues_MPIDense,
       MatGetRow_MPIDense,MatRestoreRow_MPIDense,
       MatMult_MPIDense,MatMultAdd_MPIDense,
       MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
       0,0,
       0,0,0,
       0,0,MatTranspose_MPIDense,
       MatGetInfo_MPIDense,0,
       MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
       MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
       0,
       MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
       0,
       0,0,0,0,
       MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
       MatGetOwnershipRange_MPIDense,
       0,0,
       0,0,0,0,0,MatConvertSameType_MPIDense,
       0,0,0,0,0,
       0,0,MatGetValues_MPIDense};

/*@C
   MatCreateMPIDense - Creates a parallel matrix in dense format.

   Input Parameters:
.  comm - MPI communicator
.  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
.  n - number of local columns (or PETSC_DECIDE to have it calculated
           if N is given)
.  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
.  N - number of global columns (or PETSC_DECIDE to have it calculated
           if n is given)
.  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
   to control all matrix memory allocation.

   Output Parameter:
.  newmat - the matrix

   Notes:
   The dense format is fully compatible with standard Fortran 77
   storage by columns.

   The data input variable is intended primarily for Fortran programmers
   who wish to allocate their own matrix memory space.  Most users should
   set data=PETSC_NULL.

   The user MUST specify either the local or global matrix dimensions
   (possibly both).

   Currently, the only parallel dense matrix decomposition is by rows,
   so that n=N and each submatrix owns all of the global columns.

.keywords: matrix, dense, parallel

.seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
@*/
int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *newmat)
{
  Mat          mat;
  Mat_MPIDense *a;
  int          ierr, i;

/* Note:  For now, when data is specified above, this assumes the user correctly
   allocates the local dense storage space.  We should add error checking. */

  *newmat         = 0;
  PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy    = MatDestroy_MPIDense;
  mat->view       = MatView_MPIDense;
  mat->factor     = 0;

  a->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&a->rank);
  MPI_Comm_size(comm,&a->size);

  if (M == PETSC_DECIDE) MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);
  if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}

  /* each row stores all columns */
  if (N == PETSC_DECIDE) N = n;
  if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
  /*  if (n != N) SETERRQ(1,"MatCreateMPIDense:For now, only n=N is supported"); */
  a->N = N;
  a->M = M;
  a->m = m;
  a->n = n;

  /* build local table of row and column ownerships */
  a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
  a->cowners = a->rowners + a->size + 1;
  PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+
                       sizeof(Mat_MPIDense));
  MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);
  a->rowners[0] = 0;
  for ( i=2; i<=a->size; i++ ) {
    a->rowners[i] += a->rowners[i-1];
  }
  a->rstart = a->rowners[a->rank];
  a->rend   = a->rowners[a->rank+1];
  MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);
  a->cowners[0] = 0;
  for ( i=2; i<=a->size; i++ ) {
    a->cowners[i] += a->cowners[i-1];
  }

  ierr = MatCreateSeqDense(MPI_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);

  /* stuff used for matrix vector multiply */
  a->lvec        = 0;
  a->Mvctx       = 0;
  a->assembled   = 0;
  a->roworiented = 1;

  *newmat = mat;
  return 0;
}
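/*
   Usage sketch (illustrative, under #if 0): create an M x N parallel dense
   matrix letting PETSc choose the row distribution, fill the locally owned
   rows, and assemble.  The value 1.0 is just a placeholder.
*/
#if 0
static int ExampleCreate(MPI_Comm comm,int M,int N,Mat *newA)
{
  int    ierr, i, j, rstart, rend;
  Scalar v = 1.0;

  ierr = MatCreateMPIDense(comm,PETSC_DECIDE,PETSC_DECIDE,M,N,PETSC_NULL,newA);
  CHKERRQ(ierr);
  ierr = MatGetOwnershipRange(*newA,&rstart,&rend); CHKERRQ(ierr);
  for ( i=rstart; i<rend; i++ ) {
    for ( j=0; j<N; j++ ) {
      ierr = MatSetValues(*newA,1,&i,1,&j,&v,INSERT_VALUES); CHKERRQ(ierr);
    }
  }
  ierr = MatAssemblyBegin(*newA,FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(*newA,FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}
#endif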

static int MatConvertSameType_MPIDense(Mat A,Mat *newmat,int cpvalues)
{
  Mat          mat;
  Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
  int          ierr;

  if (!oldmat->assembled) SETERRQ(1,"MatConvertSameType_MPIDense:Must assemble matrix");
  *newmat       = 0;
  PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,A->comm);
  PLogObjectCreate(mat);
  mat->data     = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy  = MatDestroy_MPIDense;
  mat->view     = MatView_MPIDense;
  mat->factor   = A->factor;

  a->m          = oldmat->m;
  a->n          = oldmat->n;
  a->M          = oldmat->M;
  a->N          = oldmat->N;

  a->assembled  = 1;
  a->rstart     = oldmat->rstart;
  a->rend       = oldmat->rend;
  a->size       = oldmat->size;
  a->rank       = oldmat->rank;
  a->insertmode = NOT_SET_VALUES;

  a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
  PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
  PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
  ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);

  ierr = VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
  PLogObjectParent(mat,a->lvec);
  ierr = VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
  PLogObjectParent(mat,a->Mvctx);
  ierr = MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);
  *newmat = mat;
  return 0;
}

#include "sysio.h"

int MatLoad_MPIDense(Viewer bview,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j, rstart, rend, fd;
  Scalar       *vals,*svals;
  PetscObject  vobj = (PetscObject) bview;
  MPI_Comm     comm = vobj->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
  int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
  int          tag = ((PetscObject)bview)->tag;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    ierr = ViewerFileGetDescriptor_Private(bview,&fd); CHKERRQ(ierr);
    ierr = SYRead(fd,(char *)header,4,SYINT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIDense:not matrix object");
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];
  /* determine ownership of all rows */
  m = M/size + ((M % size) > rank);
  rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
  MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) {
    rowners[i] += rowners[i-1];
  }
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
  offlens = ourlens + (rend-rstart);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = SYRead(fd,rowlengths,M,SYINT); CHKERRQ(ierr);
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
    MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]; j<rowners[i+1]; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices */
    nz = procsnz[0];
    mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
    ierr = SYRead(fd,mycols,nz,SYINT); CHKERRQ(ierr);

    /* read in everyone else's part and ship it off */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = SYRead(fd,cols,nz,SYINT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += ourlens[i];
    }
    mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);

    /* receive message of column indices */
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries */
  PetscMemzero(offlens,m*sizeof(int));
  jj = 0;
  for ( i=0; i<m; i++ ) {
    for ( j=0; j<ourlens[i]; j++ ) {
      if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
      jj++;
    }
  }

  /* create our matrix */
  for ( i=0; i<m; i++ ) {
    ourlens[i] -= offlens[i];
  }
  if (type == MATMPIDENSE) {
    ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat); CHKERRQ(ierr);
  }
  A = *newmat;
  for ( i=0; i<m; i++ ) {
    ourlens[i] += offlens[i];
  }

  if (!rank) {
    vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* read in my part of the matrix numerical values */
    nz = procsnz[0];
    ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);

    /* insert into matrix */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES); CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }

    /* read in other processors and ship out */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* receive message of values */
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:something is wrong with file");

    /* insert into matrix */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES); CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }
  }
  PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);

  ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}
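/*
   Usage sketch (illustrative, under #if 0): load a dense matrix from a
   binary viewer.  How the viewer is opened is omitted; it is assumed to
   wrap a file written in the format read above (header, row lengths,
   column indices, values).
*/
#if 0
static int ExampleLoad(Viewer bview,Mat *A)
{
  int ierr;
  ierr = MatLoad_MPIDense(bview,MATMPIDENSE,A); CHKERRQ(ierr);
  return 0;
}
#endif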