1 #ifndef lint
2 static char vcid[] = "$Id: mpidense.c,v 1.25 1996/01/12 22:07:01 bsmith Exp bsmith $";
3 #endif
4 
5 /*
6    Basic functions for parallel dense matrices.
7 */
8 
9 #include "mpidense.h"
10 #include "vec/vecimpl.h"
11 #include "inline/spops.h"
12 
13 static int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,
14                                int *idxn,Scalar *v,InsertMode addv)
15 {
16   Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
17   int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
18   int          roworiented = A->roworiented;
19 
20   if (A->insertmode != NOT_SET_VALUES && A->insertmode != addv) {
21     SETERRQ(1,"MatSetValues_MPIDense:Cannot mix inserts and adds");
22   }
23   A->insertmode = addv;
24   for ( i=0; i<m; i++ ) {
25     if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative row");
26     if (idxm[i] >= A->M) SETERRQ(1,"MatSetValues_MPIDense:Row too large");
27     if (idxm[i] >= rstart && idxm[i] < rend) {
28       row = idxm[i] - rstart;
29       if (roworiented) {
30         ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv); CHKERRQ(ierr);
31       }
32       else {
33         for ( j=0; j<n; j++ ) {
34           if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative column");
35           if (idxn[j] >= A->N) SETERRQ(1,"MatSetValues_MPIDense:Column too large");
36           ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv); CHKERRQ(ierr);
37         }
38       }
39     }
40     else {
41       if (roworiented) {
42         ierr = StashValues_Private(&A->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
43       }
44       else { /* must stash each separately */
45         row = idxm[i];
46         for ( j=0; j<n; j++ ) {
47           ierr = StashValues_Private(&A->stash,row,1,&idxn[j],v+i+j*m,addv);
48                  CHKERRQ(ierr);
49         }
50       }
51     }
52   }
53   return 0;
54 }
55 
56 static int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
57 {
58   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
59   int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;
60 
61   for ( i=0; i<m; i++ ) {
62     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative row");
63     if (idxm[i] >= mdn->M) SETERRQ(1,"MatGetValues_MPIDense:Row too large");
64     if (idxm[i] >= rstart && idxm[i] < rend) {
65       row = idxm[i] - rstart;
66       for ( j=0; j<n; j++ ) {
67         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative column");
68         if (idxn[j] >= mdn->N)
69           SETERRQ(1,"MatGetValues_MPIDense:Column too large");
70         ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
71       }
72     }
73     else {
74       SETERRQ(1,"MatGetValues_MPIDense:Only local values currently supported");
75     }
76   }
77   return 0;
78 }
79 
80 static int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
81 {
82   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
83   MPI_Comm     comm = mat->comm;
84   int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
85   int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
86   int          tag = mat->tag, *owner,*starts,count,ierr;
87   InsertMode   addv;
88   MPI_Request  *send_waits,*recv_waits;
89   Scalar       *rvalues,*svalues;
90 
91   /* make sure all processors are in the same mode, either INSERT_VALUES or ADD_VALUES */
92   MPI_Allreduce((void *) &mdn->insertmode,(void *) &addv,1,MPI_INT,
93                 MPI_BOR,comm);
94   if (addv == (ADD_VALUES|INSERT_VALUES)) {
95     SETERRQ(1,"MatAssemblyBegin_MPIDense:Cannot mix adds/inserts on different procs");
96   }
97   mdn->insertmode = addv; /* in case this processor had no cache */
98 
99   /*  first count number of contributors to each processor */
100   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
101   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
102   owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
103   for ( i=0; i<mdn->stash.n; i++ ) {
104     idx = mdn->stash.idx[i];
105     for ( j=0; j<size; j++ ) {
106       if (idx >= owners[j] && idx < owners[j+1]) {
107         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
108       }
109     }
110   }
111   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
112 
113   /* inform other processors of number of messages and max length*/
114   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
115   MPI_Allreduce((void *) procs,(void *) work,size,MPI_INT,MPI_SUM,comm);
116   nreceives = work[rank];
117   MPI_Allreduce((void *) nprocs,(void *) work,size,MPI_INT,MPI_MAX,comm);
118   nmax = work[rank];
119   PetscFree(work);
120 
121   /* post receives:
122        1) each message will consist of ordered triples
123      (global row, global column, value); the indices are stored as Scalars
124      to simplify the message passing.
125        2) since we don't know how long each individual message is we
126      allocate the largest needed buffer for each receive. Potentially
127      this is a lot of wasted space.
128 
129        This could be done better.
130   */
131   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
132   CHKPTRQ(rvalues);
133   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
134   CHKPTRQ(recv_waits);
135   for ( i=0; i<nreceives; i++ ) {
136     MPI_Irecv((void *)(rvalues+3*nmax*i),3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
137               comm,recv_waits+i);
138   }
139 
140   /* do sends:
141       1) starts[i] gives the starting index in svalues for stuff going to
142          the ith processor
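      2) as an illustration (an example, not from the original source): with
         three processors and nprocs = {2,0,1}, starts = {0,2,2}; the two
         entries bound for processor 0 occupy svalues[0..5] (3 Scalars per
         entry) and the single entry for processor 2 occupies svalues[6..8].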
143   */
144   svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar) );
145   CHKPTRQ(svalues);
146   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
147   CHKPTRQ(send_waits);
148   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
149   starts[0] = 0;
150   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
151   for ( i=0; i<mdn->stash.n; i++ ) {
152     svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
153     svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
154     svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
155   }
156   PetscFree(owner);
157   starts[0] = 0;
158   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
159   count = 0;
160   for ( i=0; i<size; i++ ) {
161     if (procs[i]) {
162       MPI_Isend((void*)(svalues+3*starts[i]),3*nprocs[i],MPIU_SCALAR,i,tag,
163                 comm,send_waits+count++);
164     }
165   }
166   PetscFree(starts); PetscFree(nprocs);
167 
168   /* Free cache space */
169   ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);
170 
171   mdn->svalues    = svalues;    mdn->rvalues = rvalues;
172   mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
173   mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
174   mdn->rmax       = nmax;
175 
176   return 0;
177 }
178 extern int MatSetUpMultiply_MPIDense(Mat);
179 
180 static int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
181 {
182   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
183   MPI_Status   *send_status,recv_status;
184   int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
185   Scalar       *values,val;
186   InsertMode   addv = mdn->insertmode;
187 
188   /*  wait on receives */
189   while (count) {
190     MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);
191     /* unpack receives into our local space */
192     values = mdn->rvalues + 3*imdex*mdn->rmax;
193     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
194     n = n/3;
195     for ( i=0; i<n; i++ ) {
196       row = (int) PETSCREAL(values[3*i]) - mdn->rstart;
197       col = (int) PETSCREAL(values[3*i+1]);
198       val = values[3*i+2];
199       if (col >= 0 && col < mdn->N) {
200         MatSetValues(mdn->A,1,&row,1,&col,&val,addv);
201       }
202       else {SETERRQ(1,"MatAssemblyEnd_MPIDense:Invalid column");}
203     }
204     count--;
205   }
206   PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);
207 
208   /* wait on sends */
209   if (mdn->nsends) {
210     send_status = (MPI_Status *) PetscMalloc( mdn->nsends*sizeof(MPI_Status) );
211     CHKPTRQ(send_status);
212     MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);
213     PetscFree(send_status);
214   }
215   PetscFree(mdn->send_waits); PetscFree(mdn->svalues);
216 
217   mdn->insertmode = NOT_SET_VALUES;
218   ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
219   ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);
220 
221   if (!mat->assembled && mode == FINAL_ASSEMBLY) {
222     ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
223   }
224   return 0;
225 }
226 
227 static int MatZeroEntries_MPIDense(Mat A)
228 {
229   Mat_MPIDense *l = (Mat_MPIDense *) A->data;
230   return MatZeroEntries(l->A);
231 }
232 
233 /* the code does not do the diagonal entries correctly unless the
234    matrix is square and the column and row ownerships are identical.
235    This is a BUG. The only way to fix it seems to be to access
236    the local matrix (mdn->A) directly and not through the MatZeroRows()
237    routine.
238 */
239 static int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
240 {
241   Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
242   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
243   int            *procs,*nprocs,j,found,idx,nsends,*work;
244   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
245   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
246   int            *lens,imdex,*lrows,*values;
247   MPI_Comm       comm = A->comm;
248   MPI_Request    *send_waits,*recv_waits;
249   MPI_Status     recv_status,*send_status;
250   IS             istmp;
251 
252   ierr = ISGetLocalSize(is,&N); CHKERRQ(ierr);
253   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
254 
255   /*  first count number of contributors to each processor */
256   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
257   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
258   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
259   for ( i=0; i<N; i++ ) {
260     idx = rows[i];
261     found = 0;
262     for ( j=0; j<size; j++ ) {
263       if (idx >= owners[j] && idx < owners[j+1]) {
264         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
265       }
266     }
267     if (!found) SETERRQ(1,"MatZeroRows_MPIDense:Index out of range");
268   }
269   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
270 
271   /* inform other processors of number of messages and max length*/
272   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
273   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
274   nrecvs = work[rank];
275   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
276   nmax = work[rank];
277   PetscFree(work);
278 
279   /* post receives:   */
280   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
281   CHKPTRQ(rvalues);
282   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
283   CHKPTRQ(recv_waits);
284   for ( i=0; i<nrecvs; i++ ) {
285     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
286   }
287 
288   /* do sends:
289       1) starts[i] gives the starting index in svalues for stuff going to
290          the ith processor
291   */
292   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
293   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
294   CHKPTRQ(send_waits);
295   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
296   starts[0] = 0;
297   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
298   for ( i=0; i<N; i++ ) {
299     svalues[starts[owner[i]]++] = rows[i];
300   }
301   ISRestoreIndices(is,&rows);
302 
303   starts[0] = 0;
304   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
305   count = 0;
306   for ( i=0; i<size; i++ ) {
307     if (procs[i]) {
308       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
309     }
310   }
311   PetscFree(starts);
312 
313   base = owners[rank];
314 
315   /*  wait on receives */
316   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
317   source = lens + nrecvs;
318   count  = nrecvs; slen = 0;
319   while (count) {
320     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
321     /* unpack receives into our local space */
322     MPI_Get_count(&recv_status,MPI_INT,&n);
323     source[imdex]  = recv_status.MPI_SOURCE;
324     lens[imdex]  = n;
325     slen += n;
326     count--;
327   }
328   PetscFree(recv_waits);
329 
330   /* move the received row indices into a single local list */
331   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
332   count = 0;
333   for ( i=0; i<nrecvs; i++ ) {
334     values = rvalues + i*nmax;
335     for ( j=0; j<lens[i]; j++ ) {
336       lrows[count++] = values[j] - base;
337     }
338   }
339   PetscFree(rvalues); PetscFree(lens);
340   PetscFree(owner); PetscFree(nprocs);
341 
342   /* actually zap the local rows */
343   ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
344   PLogObjectParent(A,istmp);
345   PetscFree(lrows);
346   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
347   ierr = ISDestroy(istmp); CHKERRQ(ierr);
348 
349   /* wait on sends */
350   if (nsends) {
351     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
352     CHKPTRQ(send_status);
353     MPI_Waitall(nsends,send_waits,send_status);
354     PetscFree(send_status);
355   }
356   PetscFree(send_waits); PetscFree(svalues);
357 
358   return 0;
359 }
360 
361 static int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
362 {
363   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
364   int          ierr;
365 
366   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
367   CHKERRQ(ierr);
368   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
369   CHKERRQ(ierr);
370   ierr = MatMult(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
371   return 0;
372 }
373 
374 static int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
375 {
376   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
377   int          ierr;
378 
379   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
380   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
381   ierr = MatMultAdd(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
382   return 0;
383 }
384 
385 static int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
386 {
387   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
388   int          ierr;
389   Scalar       zero = 0.0;
390 
391   ierr = VecSet(&zero,yy); CHKERRQ(ierr);
392   ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
393   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
394          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
395   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
396          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
397   return 0;
398 }
399 
400 static int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
401 {
402   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
403   int          ierr;
404 
405   ierr = VecCopy(yy,zz); CHKERRQ(ierr);
406   ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
407   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
408          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
409   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
410          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
411   return 0;
412 }
413 
414 static int MatGetDiagonal_MPIDense(Mat A,Vec v)
415 {
416   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
417   Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
418   int          ierr, i, n, m = a->m, radd;
419   Scalar       *x;
420 
421   ierr = VecGetArray(v,&x); CHKERRQ(ierr);
422   ierr = VecGetSize(v,&n); CHKERRQ(ierr);
423   if (n != a->M) SETERRQ(1,"MatGetDiagonal_MPIDense:Nonconforming mat and vec");
424   radd = a->rstart*m*m;
425   for ( i=0; i<m; i++ ) {
426     x[i] = aloc->v[radd + i*m + i];
427   }
428   return 0;
429 }
430 
431 static int MatDestroy_MPIDense(PetscObject obj)
432 {
433   Mat          mat = (Mat) obj;
434   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
435   int          ierr;
436 
437 #if defined(PETSC_LOG)
438   PLogObjectState(obj,"Rows=%d, Cols=%d",mdn->M,mdn->N);
439 #endif
440   PetscFree(mdn->rowners);
441   ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
442   if (mdn->lvec)   VecDestroy(mdn->lvec);
443   if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
444   if (mdn->factor) {
445     if (mdn->factor->temp)   PetscFree(mdn->factor->temp);
446     if (mdn->factor->tag)    PetscFree(mdn->factor->tag);
447     if (mdn->factor->pivots) PetscFree(mdn->factor->pivots);
448     PetscFree(mdn->factor);
449   }
450   PetscFree(mdn);
451   PLogObjectDestroy(mat);
452   PetscHeaderDestroy(mat);
453   return 0;
454 }
455 
456 #include "pinclude/pviewer.h"
457 
458 static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
459 {
460   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
461   int          ierr;
462   if (mdn->size == 1) {
463     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
464   }
465   else SETERRQ(1,"MatView_MPIDense_Binary:Only uniprocessor output supported");
466   return 0;
467 }
468 
469 static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
470 {
471   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
472   int          ierr, format;
473   PetscObject  vobj = (PetscObject) viewer;
474   FILE         *fd;
475 
476   ierr = ViewerFileGetPointer_Private(viewer,&fd); CHKERRQ(ierr);
477   if (vobj->type == ASCII_FILE_VIEWER || vobj->type == ASCII_FILES_VIEWER) {
478     ierr = ViewerFileGetFormat_Private(viewer,&format);
479     if (format == FILE_FORMAT_INFO_DETAILED) {
480       int nz, nzalloc, mem, rank;
481       MPI_Comm_rank(mat->comm,&rank);
482       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
483       MPIU_Seq_begin(mat->comm,1);
484         fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",
485             rank,mdn->m,nz,nzalloc,mem);
486       fflush(fd);
487       MPIU_Seq_end(mat->comm,1);
488       ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
489       return 0;
490     }
491     else if (format == FILE_FORMAT_INFO) {
492       return 0;
493     }
494   }
495   if (vobj->type == ASCII_FILE_VIEWER) {
496     MPIU_Seq_begin(mat->comm,1);
497     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
498              mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
499     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
500     fflush(fd);
501     MPIU_Seq_end(mat->comm,1);
502   }
503   else {
504     int size = mdn->size, rank = mdn->rank;
505     if (size == 1) {
506       ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
507     }
508     else {
509       /* assemble the entire matrix onto first processor. */
510       Mat          A;
511       int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
512       Scalar       *vals;
513       Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;
514 
515       if (!rank) {
516         ierr = MatCreateMPIDense(mat->comm,M,M,N,N,PETSC_NULL,&A); CHKERRQ(ierr);
517       }
518       else {
519         ierr = MatCreateMPIDense(mat->comm,0,M,N,N,PETSC_NULL,&A); CHKERRQ(ierr);
520       }
521       PLogObjectParent(mat,A);
522 
523       /* Copy the matrix ... This isn't the most efficient means,
524          but it's quick for now */
525       row = mdn->rstart; m = Amdn->m;
526       for ( i=0; i<m; i++ ) {
527         ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
528         ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
529         ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
530         row++;
531       }
532 
533       ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
534       ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
535       if (!rank) {
536         ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
537       }
538       ierr = MatDestroy(A); CHKERRQ(ierr);
539     }
540   }
541   return 0;
542 }
543 
544 static int MatView_MPIDense(PetscObject obj,Viewer viewer)
545 {
546   Mat          mat = (Mat) obj;
547   PetscObject  vobj = (PetscObject) viewer;
548   int          ierr;
549 
550   if (!viewer) {
551     viewer = STDOUT_VIEWER_SELF; vobj = (PetscObject) viewer;
552   }
553   if (vobj->cookie == VIEWER_COOKIE && vobj->type == ASCII_FILE_VIEWER) {
554     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
555   }
556   else if (vobj->cookie == VIEWER_COOKIE && vobj->type == ASCII_FILES_VIEWER) {
557     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
558   }
559   else if (vobj->type == BINARY_FILE_VIEWER) {
560     return MatView_MPIDense_Binary(mat,viewer);
561   }
562   return 0;
563 }
564 
565 static int MatGetInfo_MPIDense(Mat A,MatInfoType flag,int *nz,
566                              int *nzalloc,int *mem)
567 {
568   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
569   Mat          mdn = mat->A;
570   int          ierr, isend[3], irecv[3];
571 
572   ierr = MatGetInfo(mdn,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
573   if (flag == MAT_LOCAL) {
574     *nz = isend[0]; *nzalloc = isend[1]; *mem = isend[2];
575   } else if (flag == MAT_GLOBAL_MAX) {
576     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_MAX,A->comm);
577     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
578   } else if (flag == MAT_GLOBAL_SUM) {
579     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_SUM,A->comm);
580     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
581   }
582   return 0;
583 }
584 
585 /* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
586    extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
587    extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
588    extern int MatSolve_MPIDense(Mat,Vec,Vec);
589    extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
590    extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
591    extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */
592 
593 static int MatSetOption_MPIDense(Mat A,MatOption op)
594 {
595   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
596 
597   if (op == NO_NEW_NONZERO_LOCATIONS ||
598       op == YES_NEW_NONZERO_LOCATIONS ||
599       op == COLUMNS_SORTED ||
600       op == ROW_ORIENTED) {
601         MatSetOption(a->A,op);
602   }
603   else if (op == ROWS_SORTED ||
604            op == SYMMETRIC_MATRIX ||
605            op == STRUCTURALLY_SYMMETRIC_MATRIX ||
606            op == YES_NEW_DIAGONALS)
607     PLogInfo((PetscObject)A,"Info:MatSetOption_MPIDense:Option ignored\n");
608   else if (op == COLUMN_ORIENTED)
609     { a->roworiented = 0; MatSetOption(a->A,op);}
610   else if (op == NO_NEW_DIAGONALS)
611     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:NO_NEW_DIAGONALS");}
612   else
613     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:unknown option");}
614   return 0;
615 }
616 
617 static int MatGetSize_MPIDense(Mat A,int *m,int *n)
618 {
619   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
620   *m = mat->M; *n = mat->N;
621   return 0;
622 }
623 
624 static int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
625 {
626   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
627   *m = mat->m; *n = mat->N;
628   return 0;
629 }
630 
631 static int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
632 {
633   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
634   *m = mat->rstart; *n = mat->rend;
635   return 0;
636 }
637 
638 static int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
639 {
640   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
641   int          lrow, rstart = mat->rstart, rend = mat->rend;
642 
643   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIDense:Only local rows");
644   lrow = row - rstart;
645   return MatGetRow(mat->A,lrow,nz,idx,v);
646 }
647 
648 static int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
649 {
650   if (idx) PetscFree(*idx);
651   if (v) PetscFree(*v);
652   return 0;
653 }
654 
655 static int MatNorm_MPIDense(Mat A,NormType type,double *norm)
656 {
657   Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
658   Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
659   int          ierr, i, j;
660   double       sum = 0.0;
661   Scalar       *v = mat->v;
662 
663   if (mdn->size == 1) {
664     ierr =  MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
665   } else {
666     if (type == NORM_FROBENIUS) {
667       for (i=0; i<mat->n*mat->m; i++ ) {
668 #if defined(PETSC_COMPLEX)
669         sum += real(conj(*v)*(*v)); v++;
670 #else
671         sum += (*v)*(*v); v++;
672 #endif
673       }
674       MPI_Allreduce((void*)&sum,(void*)norm,1,MPI_DOUBLE,MPI_SUM,A->comm);
675       *norm = sqrt(*norm);
676       PLogFlops(2*mat->n*mat->m);
677     }
678     else if (type == NORM_1) {
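      /* 1-norm = max column sum: accumulate local column sums, add them
         across processors with MPI_Allreduce, then take the maximum */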
679       double *tmp, *tmp2;
680       tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
681       tmp2 = tmp + mdn->N;
682       PetscMemzero(tmp,2*mdn->N*sizeof(double));
683       *norm = 0.0;
684       v = mat->v;
685       for ( j=0; j<mat->n; j++ ) {
686         for ( i=0; i<mat->m; i++ ) {
687           tmp[j] += PetscAbsScalar(*v);  v++;
688         }
689       }
690       MPI_Allreduce((void*)tmp,(void*)tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);
691       for ( j=0; j<mdn->N; j++ ) {
692         if (tmp2[j] > *norm) *norm = tmp2[j];
693       }
694       PetscFree(tmp);
695       PLogFlops(mat->n*mat->m);
696     }
697     else if (type == NORM_INFINITY) { /* max row norm */
698       double ntemp;
699       ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
700       MPI_Allreduce((void*)&ntemp,(void*)norm,1,MPI_DOUBLE,MPI_MAX,A->comm);
701     }
702     else {
703       SETERRQ(1,"MatNorm_MPIDense:No support for two norm");
704     }
705   }
706   return 0;
707 }
708 
709 static int MatTranspose_MPIDense(Mat A,Mat *matout)
710 {
711   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
712   Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
713   Mat          B;
714   int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
715   int          j, i, ierr;
716   Scalar       *v;
717 
718   if (!matout && M != N)
719     SETERRQ(1,"MatTranspose_MPIDense:In-place transpose requires a square matrix");
720   ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);
721          CHKERRQ(ierr);
722 
723   m = Aloc->m; n = Aloc->n; v = Aloc->v;
724   rwork = (int *) PetscMalloc(n*sizeof(int)); CHKPTRQ(rwork);
725   for ( j=0; j<n; j++ ) {
726     for (i=0; i<m; i++) rwork[i] = rstart + i;
727     ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
728     v += m;
729   }
730   PetscFree(rwork);
731   ierr = MatAssemblyBegin(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
732   ierr = MatAssemblyEnd(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
733   if (matout) {
734     *matout = B;
735   } else {
736     /* This isn't really an in-place transpose: free a's data structures and overwrite A with the new matrix */
737     PetscFree(a->rowners);
738     ierr = MatDestroy(a->A); CHKERRQ(ierr);
739     if (a->lvec) VecDestroy(a->lvec);
740     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
741     PetscFree(a);
742     PetscMemcpy(A,B,sizeof(struct _Mat));
743     PetscHeaderDestroy(B);
744   }
745   return 0;
746 }
747 
748 static int MatConvertSameType_MPIDense(Mat,Mat *,int);
749 
750 /* -------------------------------------------------------------------*/
751 static struct _MatOps MatOps = {MatSetValues_MPIDense,
752        MatGetRow_MPIDense,MatRestoreRow_MPIDense,
753        MatMult_MPIDense,MatMultAdd_MPIDense,
754        MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
755 /*       MatSolve_MPIDense,0, */
756        0,0,
757        0,0,
758        0,0,
759 /*       MatLUFactor_MPIDense,0, */
760        0,MatTranspose_MPIDense,
761        MatGetInfo_MPIDense,0,
762        MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
763        MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
764        0,
765        MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
766        0,0,0,
767 /*       0,MatLUFactorSymbolic_MPIDense,MatLUFactorNumeric_MPIDense, */
768        0,0,
769        MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
770        MatGetOwnershipRange_MPIDense,
771        0,0,
772        0,0,0,0,0,MatConvertSameType_MPIDense,
773        0,0,0,0,0,
774        0,0,MatGetValues_MPIDense};
775 
776 /*@C
777    MatCreateMPIDense - Creates a parallel matrix in dense format.
778 
779    Input Parameters:
780 .  comm - MPI communicator
781 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
782 .  n - number of local columns (or PETSC_DECIDE to have it calculated
783            if N is given)
784 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
785 .  N - number of global columns (or PETSC_DECIDE to have it calculated
786            if n is given)
787 .  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
788    to control all matrix memory allocation.
789 
790    Output Parameter:
791 .  newmat - the matrix
792 
793    Notes:
794    The dense format is fully compatible with standard Fortran 77
795    storage by columns.
796 
797    The data input variable is intended primarily for Fortran programmers
798    who wish to allocate their own matrix memory space.  Most users should
799    set data=PETSC_NULL.
800 
801    The user MUST specify either the local or global matrix dimensions
802    (possibly both).
803 
804    Currently, the only parallel dense matrix decomposition is by rows,
805    so that n=N and each submatrix owns all of the global columns.
806 
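   Example (a minimal sketch, not from the original source; M, N, i, j, and v
   are assumed to be declared and set by the caller):

     Mat A;
     int ierr;
     ierr = MatCreateMPIDense(MPI_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,M,N,
                              PETSC_NULL,&A); CHKERRQ(ierr);
     ierr = MatSetValues(A,1,&i,1,&j,&v,INSERT_VALUES); CHKERRQ(ierr);
     ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
     ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
     ierr = MatDestroy(A); CHKERRQ(ierr);
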
807 .keywords: matrix, dense, parallel
808 
809 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
810 @*/
811 int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *newmat)
812 {
813   Mat          mat;
814   Mat_MPIDense *a;
815   int          ierr, i,flg;
816 
817 /* Note:  For now, when data is specified above, this assumes the user correctly
818    allocates the local dense storage space.  We should add error checking. */
819 
820   *newmat         = 0;
821   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,comm);
822   PLogObjectCreate(mat);
823   mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
824   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
825   mat->destroy    = MatDestroy_MPIDense;
826   mat->view       = MatView_MPIDense;
827   mat->factor     = 0;
828 
829   a->factor = 0;
830   a->insertmode = NOT_SET_VALUES;
831   MPI_Comm_rank(comm,&a->rank);
832   MPI_Comm_size(comm,&a->size);
833 
834   if (M == PETSC_DECIDE) MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);
835   if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}
836 
837   /* each row stores all columns */
838   if (N == PETSC_DECIDE) N = n;
839   if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
840   /*  if (n != N) SETERRQ(1,"MatCreateMPIDense:For now, only n=N is supported"); */
841   a->N = N;
842   a->M = M;
843   a->m = m;
844   a->n = n;
845 
846   /* build local table of row and column ownerships */
847   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
848   a->cowners = a->rowners + a->size + 1;
849   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+
850                        sizeof(Mat_MPIDense));
851   MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);
852   a->rowners[0] = 0;
853   for ( i=2; i<=a->size; i++ ) {
854     a->rowners[i] += a->rowners[i-1];
855   }
856   a->rstart = a->rowners[a->rank];
857   a->rend   = a->rowners[a->rank+1];
858   MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);
859   a->cowners[0] = 0;
860   for ( i=2; i<=a->size; i++ ) {
861     a->cowners[i] += a->cowners[i-1];
862   }
863 
864   ierr = MatCreateSeqDense(MPI_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
865   PLogObjectParent(mat,a->A);
866 
867   /* build cache for off array entries formed */
868   ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);
869 
870   /* stuff used for matrix vector multiply */
871   a->lvec        = 0;
872   a->Mvctx       = 0;
873   a->roworiented = 1;
874 
875   *newmat = mat;
876   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
877   if (flg) {
878     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
879   }
880   return 0;
881 }
882 
883 static int MatConvertSameType_MPIDense(Mat A,Mat *newmat,int cpvalues)
884 {
885   Mat          mat;
886   Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
887   int          ierr;
888   FactorCtx    *factor;
889 
890   *newmat       = 0;
891   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,A->comm);
892   PLogObjectCreate(mat);
893   mat->data     = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
894   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
895   mat->destroy  = MatDestroy_MPIDense;
896   mat->view     = MatView_MPIDense;
897   mat->factor   = A->factor;
898   mat->assembled  = PETSC_TRUE;
899 
900   a->m          = oldmat->m;
901   a->n          = oldmat->n;
902   a->M          = oldmat->M;
903   a->N          = oldmat->N;
904   if (oldmat->factor) {
905     a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx)); CHKPTRQ(factor);
906     /* copy factor contents ... add this code! */
907   } else a->factor = 0;
908 
909   a->rstart     = oldmat->rstart;
910   a->rend       = oldmat->rend;
911   a->size       = oldmat->size;
912   a->rank       = oldmat->rank;
913   a->insertmode = NOT_SET_VALUES;
914 
915   a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
916   PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
917   PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
918   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
919 
920   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
921   PLogObjectParent(mat,a->lvec);
922   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
923   PLogObjectParent(mat,a->Mvctx);
924   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
925   PLogObjectParent(mat,a->A);
926   *newmat = mat;
927   return 0;
928 }
929 
930 #include "sysio.h"
931 
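/*
   MatLoad_MPIDense - Loads a matrix stored in PETSc binary format.

   The layout sketched here simply restates what the code below reads; it is
   not an independent format specification:
     header[0..3]   - 4 ints; header[0] must be MAT_COOKIE, header[1] = M
                      (global rows), header[2] = N (global columns);
                      header[3] is read but not used by this routine
     rowlengths[M]  - number of stored entries in each row
     column indices - for all rows, grouped by owning processor
     values         - for all rows, in the same order as the column indices
   Processor 0 reads the file and ships each processor its portion.
*/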
932 int MatLoad_MPIDense(Viewer bview,MatType type,Mat *newmat)
933 {
934   Mat          A;
935   int          i, nz, ierr, j,rstart, rend, fd;
936   Scalar       *vals,*svals;
937   PetscObject  vobj = (PetscObject) bview;
938   MPI_Comm     comm = vobj->comm;
939   MPI_Status   status;
940   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
941   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
942   int          tag = ((PetscObject)bview)->tag;
943 
944   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
945   if (!rank) {
946     ierr = ViewerFileGetDescriptor_Private(bview,&fd); CHKERRQ(ierr);
947     ierr = SYRead(fd,(char *)header,4,SYINT); CHKERRQ(ierr);
948     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIDense:Not a matrix object");
949   }
950 
951   MPI_Bcast(header+1,3,MPI_INT,0,comm);
952   M = header[1]; N = header[2];
953   /* determine ownership of all rows */
954   m = M/size + ((M % size) > rank);
955   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
956   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
957   rowners[0] = 0;
958   for ( i=2; i<=size; i++ ) {
959     rowners[i] += rowners[i-1];
960   }
961   rstart = rowners[rank];
962   rend   = rowners[rank+1];
963 
964   /* distribute row lengths to all processors */
965   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
966   offlens = ourlens + (rend-rstart);
967   if (!rank) {
968     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
969     ierr = SYRead(fd,rowlengths,M,SYINT); CHKERRQ(ierr);
970     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
971     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
972     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
973     PetscFree(sndcounts);
974   }
975   else {
976     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
977   }
978 
979   if (!rank) {
980     /* calculate the number of nonzeros on each processor */
981     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
982     PetscMemzero(procsnz,size*sizeof(int));
983     for ( i=0; i<size; i++ ) {
984       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
985         procsnz[i] += rowlengths[j];
986       }
987     }
988     PetscFree(rowlengths);
989 
990     /* determine max buffer needed and allocate it */
991     maxnz = 0;
992     for ( i=0; i<size; i++ ) {
993       maxnz = PetscMax(maxnz,procsnz[i]);
994     }
995     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
996 
997     /* read in my part of the matrix column indices  */
998     nz = procsnz[0];
999     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1000     ierr = SYRead(fd,mycols,nz,SYINT); CHKERRQ(ierr);
1001 
1002     /* read in everyone else's and ship it off */
1003     for ( i=1; i<size; i++ ) {
1004       nz = procsnz[i];
1005       ierr = SYRead(fd,cols,nz,SYINT); CHKERRQ(ierr);
1006       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1007     }
1008     PetscFree(cols);
1009   }
1010   else {
1011     /* determine buffer space needed for message */
1012     nz = 0;
1013     for ( i=0; i<m; i++ ) {
1014       nz += ourlens[i];
1015     }
1016     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1017 
1018     /* receive message of column indices*/
1019     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1020     MPI_Get_count(&status,MPI_INT,&maxnz);
1021     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:Something is wrong with the file");
1022   }
1023 
1024   /* loop over local rows, determining number of off diagonal entries */
1025   PetscMemzero(offlens,m*sizeof(int));
1026   jj = 0;
1027   for ( i=0; i<m; i++ ) {
1028     for ( j=0; j<ourlens[i]; j++ ) {
1029       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1030       jj++;
1031     }
1032   }
1033 
1034   /* create our matrix */
1035   for ( i=0; i<m; i++ ) {
1036     ourlens[i] -= offlens[i];
1037   }
1038   if (type == MATMPIDENSE) {
1039     ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat); CHKERRQ(ierr);
1040   }
1041   A = *newmat;
1042   for ( i=0; i<m; i++ ) {
1043     ourlens[i] += offlens[i];
1044   }
1045 
1046   if (!rank) {
1047     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1048 
1049     /* read in my part of the matrix numerical values  */
1050     nz = procsnz[0];
1051     ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
1052 
1053     /* insert into matrix */
1054     jj      = rstart;
1055     smycols = mycols;
1056     svals   = vals;
1057     for ( i=0; i<m; i++ ) {
1058       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1059       smycols += ourlens[i];
1060       svals   += ourlens[i];
1061       jj++;
1062     }
1063 
1064     /* read in other processors and ship out */
1065     for ( i=1; i<size; i++ ) {
1066       nz = procsnz[i];
1067       ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
1068       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1069     }
1070     PetscFree(procsnz);
1071   }
1072   else {
1073     /* receive numeric values */
1074     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1075 
1076     /* receive message of values*/
1077     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1078     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1079     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:Something is wrong with the file");
1080 
1081     /* insert into matrix */
1082     jj      = rstart;
1083     smycols = mycols;
1084     svals   = vals;
1085     for ( i=0; i<m; i++ ) {
1086       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1087       smycols += ourlens[i];
1088       svals   += ourlens[i];
1089       jj++;
1090     }
1091   }
1092   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1093 
1094   ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1095   ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1096   return 0;
1097 }
1098