xref: /petsc/src/ksp/pc/impls/redundant/redundant.c (revision 2982eefb9df4848397e1414226a1a537681c401e)
1 #define PETSCKSP_DLL
2 
3 /*
4   This file defines a "solve the problem redundantly on each subgroup of processor" preconditioner.
5 */
6 #include "private/pcimpl.h"     /*I "petscpc.h" I*/
7 #include "petscksp.h"
8 
9 typedef struct {
10   MPI_Comm   parent;      /* parent communicator */
11   MPI_Comm   dupparent;   /* duplicate parent communicator, under which the processors of this subcomm have contiguous rank */
12   MPI_Comm   comm;        /* this communicator */
13   PetscInt   n;           /* num of subcommunicators under the parent communicator */
14   PetscInt   color;       /* color of processors belong to this communicator */
15 } PetscSubcomm;
16 
17 #undef __FUNCT__
18 #define __FUNCT__ "PetscSubcommDestroy"
19 PetscErrorCode PetscSubcommDestroy(PetscSubcomm *psubcomm)
20 {
21   PetscErrorCode ierr;
22 
23   PetscFunctionBegin;
24   ierr = PetscFree(psubcomm);CHKERRQ(ierr);
25   PetscFunctionReturn(0);
26 }
27 
28 /*--------------------------------------------------------------------------------------------------
29     To avoid data scattering from subcomm back to original comm, we create subcomm by iteratively taking a
30      processe into a subcomm.
31      An example: size=4, nsubcomm=3
32       pc->comm:
33       rank:     [0]  [1]  [2]  [3]
34       color:     0    1    2    0
35 
36      subcomm:
37       subrank:  [0]  [0]  [0]  [1]
38 
39      dupcomm:
40       duprank:  [0]  [2]  [3]  [1]
41 
42      Here, subcomm[color = 0] has subsize=2, owns process [0] and [3]
43            subcomm[color = 1] has subsize=1, owns process [1]
44            subcomm[color = 2] has subsize=1, owns process [2]
45           dupcomm has same number of processes as pc->comm, and its duprank maps
46           processes in subcomm contiguously into a 1d array:
47             duprank: [0] [1]      [2]         [3]
48             rank:    [0] [3]      [1]         [2]
49                     subcomm[0] subcomm[1]  subcomm[2]
50      ----------------------------------------------------------------------------------------*/
51 #undef __FUNCT__
52 #define __FUNCT__ "PetscSubcommCreate"
53 PetscErrorCode PetscSubcommCreate(MPI_Comm comm,PetscInt nsubcomm,PetscSubcomm **psubcomm)
54 {
55   PetscErrorCode ierr;
56   PetscMPIInt    rank,size,*subsize,duprank,subrank;
57   PetscInt       np_subcomm,nleftover,i,j,color;
58   MPI_Comm       subcomm=0,dupcomm=0;
59   const char     *prefix;
60   PetscSubcomm   *psubcomm_tmp;
61 
62   PetscFunctionBegin;
63   ierr = MPI_Comm_rank(comm,&rank);CHKERRQ(ierr);
64   ierr = MPI_Comm_size(comm,&size);CHKERRQ(ierr);
65   if (nsubcomm < 1 || nsubcomm > size) SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE, "Num of subcommunicators %D cannot be < 1 or > input comm size %D",nsubcomm,size);
66 
67   /* get size of each subcommunicator */
68   ierr = PetscMalloc((1+nsubcomm)*sizeof(PetscMPIInt),&subsize);CHKERRQ(ierr);
69   np_subcomm = size/nsubcomm;
70   nleftover  = size - nsubcomm*np_subcomm;
71   for (i=0; i<nsubcomm; i++){
72     subsize[i] = np_subcomm;
73     if (i<nleftover) subsize[i]++;
74   }
75 
76   /* find color for this proc */
77   color   = rank%nsubcomm;
78   subrank = rank/nsubcomm;
79 
80   ierr = MPI_Comm_split(comm,color,subrank,&subcomm);CHKERRQ(ierr);
81 
82   j = 0; duprank = 0;
83   for (i=0; i<nsubcomm; i++){
84     if (j == color){
85       duprank += subrank;
86       break;
87     }
88     duprank += subsize[i]; j++;
89   }
90 
91   /* create dupcomm with same size as comm, but its rank, duprank, maps subcomm's contiguously into dupcomm */
92   ierr = MPI_Comm_split(comm,0,duprank,&dupcomm);CHKERRQ(ierr);
93   ierr = PetscFree(subsize);CHKERRQ(ierr);
94 
95   ierr = PetscNew(PetscSubcomm,&psubcomm_tmp);CHKERRQ(ierr);
96   psubcomm_tmp->parent    = comm;
97   psubcomm_tmp->dupparent = dupcomm;
98   psubcomm_tmp->comm      = subcomm;
99   psubcomm_tmp->n         = nsubcomm;
100   psubcomm_tmp->color     = color;
101   *psubcomm = psubcomm_tmp;
102   PetscFunctionReturn(0);
103 }
104 
105 typedef struct {
106   PC           pc;                   /* actual preconditioner used on each processor */
107   Vec          xsub,ysub;            /* vectors of a subcommunicator to hold parallel vectors of pc->comm */
108   Vec          xdup,ydup;            /* parallel vector that congregates xsub or ysub facilitating vector scattering */
109   Mat          pmats;                /* matrix and optional preconditioner matrix belong to a subcommunicator */
110   VecScatter   scatterin,scatterout; /* scatter used to move all values to each processor group (subcommunicator) */
111   PetscTruth   useparallelmat;
112   PetscSubcomm *psubcomm;
113   PetscInt     nsubcomm;           /* num of data structure PetscSubcomm */
114 } PC_Redundant;
115 
116 #undef __FUNCT__
117 #define __FUNCT__ "PCView_Redundant"
118 static PetscErrorCode PCView_Redundant(PC pc,PetscViewer viewer)
119 {
120   PC_Redundant   *red = (PC_Redundant*)pc->data;
121   PetscErrorCode ierr;
122   PetscMPIInt    rank;
123   PetscTruth     iascii,isstring;
124   PetscViewer    sviewer,subviewer;
125   PetscInt       color = red->psubcomm->color;
126 
127   PetscFunctionBegin;
128   ierr = MPI_Comm_rank(pc->comm,&rank);CHKERRQ(ierr);
129   ierr = PetscTypeCompare((PetscObject)viewer,PETSC_VIEWER_ASCII,&iascii);CHKERRQ(ierr);
130   ierr = PetscTypeCompare((PetscObject)viewer,PETSC_VIEWER_STRING,&isstring);CHKERRQ(ierr);
131   if (iascii) {
132     ierr = PetscViewerASCIIPrintf(viewer,"  Redundant solver preconditioner: First PC (color=0) follows\n");CHKERRQ(ierr);
133     ierr = PetscViewerGetSubcomm(viewer,red->pc->comm,&subviewer);CHKERRQ(ierr);
134     if (!color) { /* only view first redundant pc */
135       ierr = PetscViewerASCIIPushTab(viewer);CHKERRQ(ierr);
136       ierr = PCView(red->pc,subviewer);CHKERRQ(ierr);
137       ierr = PetscViewerASCIIPopTab(viewer);CHKERRQ(ierr);
138     }
139     ierr = PetscViewerRestoreSubcomm(viewer,red->pc->comm,&subviewer);CHKERRQ(ierr);
140   } else if (isstring) { /* not test it yet! */
141     ierr = PetscViewerStringSPrintf(viewer," Redundant solver preconditioner");CHKERRQ(ierr);
142     ierr = PetscViewerGetSingleton(viewer,&sviewer);CHKERRQ(ierr);
143     if (!rank) {
144       ierr = PCView(red->pc,sviewer);CHKERRQ(ierr);
145     }
146     ierr = PetscViewerRestoreSingleton(viewer,&sviewer);CHKERRQ(ierr);
147   } else {
148     SETERRQ1(PETSC_ERR_SUP,"Viewer type %s not supported for PC redundant",((PetscObject)viewer)->type_name);
149   }
150   PetscFunctionReturn(0);
151 }
152 
153 #include "include/private/matimpl.h"        /*I "petscmat.h" I*/
154 #include "private/vecimpl.h"
155 #include "src/mat/impls/aij/mpi/mpiaij.h"   /*I "petscmat.h" I*/
156 #include "src/mat/impls/aij/seq/aij.h"      /*I "petscmat.h" I*/
157 
158 typedef struct { /* used by MatGetRedundantMatrix() for reusing matredundant */
159   PetscInt       nzlocal,nsends,nrecvs;
160   PetscInt       *send_rank,*sbuf_nz,*sbuf_j,**rbuf_j;
161   PetscScalar    *sbuf_a,**rbuf_a;
162   PetscErrorCode (*MatDestroy)(Mat);
163 } Mat_Redundant;
164 
165 #undef __FUNCT__
166 #define __FUNCT__ "PetscContainerDestroy_MatRedundant"
167 PetscErrorCode PetscContainerDestroy_MatRedundant(void *ptr)
168 {
169   PetscErrorCode       ierr;
170   Mat_Redundant        *redund=(Mat_Redundant*)ptr;
171   PetscInt             i;
172 
173   PetscFunctionBegin;
174   ierr = PetscFree(redund->send_rank);CHKERRQ(ierr);
175   ierr = PetscFree(redund->sbuf_j);CHKERRQ(ierr);
176   ierr = PetscFree(redund->sbuf_a);CHKERRQ(ierr);
177   for (i=0; i<redund->nrecvs; i++){
178     ierr = PetscFree(redund->rbuf_j[i]);CHKERRQ(ierr);
179     ierr = PetscFree(redund->rbuf_a[i]);CHKERRQ(ierr);
180   }
181   ierr = PetscFree3(redund->sbuf_nz,redund->rbuf_j,redund->rbuf_a);CHKERRQ(ierr);
182   ierr = PetscFree(redund);CHKERRQ(ierr);
183   PetscFunctionReturn(0);
184 }
185 
186 #undef __FUNCT__
187 #define __FUNCT__ "MatDestroy_MatRedundant"
188 PetscErrorCode MatDestroy_MatRedundant(Mat A)
189 {
190   PetscErrorCode  ierr;
191   PetscContainer  container;
192   Mat_Redundant   *redund=PETSC_NULL;
193 
194   PetscFunctionBegin;
195   ierr = PetscObjectQuery((PetscObject)A,"Mat_Redundant",(PetscObject *)&container);CHKERRQ(ierr);
196   if (container) {
197     ierr = PetscContainerGetPointer(container,(void **)&redund);CHKERRQ(ierr);
198   } else {
199     SETERRQ(PETSC_ERR_PLIB,"Container does not exit");
200   }
201   A->ops->destroy = redund->MatDestroy;
202   ierr = PetscObjectCompose((PetscObject)A,"Mat_Redundant",0);CHKERRQ(ierr);
203   ierr = (*A->ops->destroy)(A);CHKERRQ(ierr);
204   ierr = PetscContainerDestroy(container);CHKERRQ(ierr);
205   PetscFunctionReturn(0);
206 }
207 
208 #undef __FUNCT__
209 #define __FUNCT__ "MatGetRedundantMatrix"
210 PetscErrorCode MatGetRedundantMatrix_AIJ(Mat mat,PetscInt nsubcomm,MPI_Comm subcomm,PetscInt mlocal_sub,MatReuse reuse,Mat *matredundant)
211 {
212   PetscMPIInt    rank,size;
213   MPI_Comm       comm=mat->comm;
214   PetscErrorCode ierr;
215   PetscInt       nsends=0,nrecvs=0,i,rownz_max=0;
216   PetscMPIInt    *send_rank=PETSC_NULL,*recv_rank=PETSC_NULL;
217   PetscInt       *rowrange=mat->rmap.range;
218   Mat_MPIAIJ     *aij = (Mat_MPIAIJ*)mat->data;
219   Mat            A=aij->A,B=aij->B,C=*matredundant;
220   Mat_SeqAIJ     *a=(Mat_SeqAIJ*)A->data,*b=(Mat_SeqAIJ*)B->data;
221   PetscScalar    *sbuf_a;
222   PetscInt       nzlocal=a->nz+b->nz;
223   PetscInt       j,cstart=mat->cmap.rstart,cend=mat->cmap.rend,row,nzA,nzB,ncols,*cworkA,*cworkB;
224   PetscInt       rstart=mat->rmap.rstart,rend=mat->rmap.rend,*bmap=aij->garray,M,N;
225   PetscInt       *cols,ctmp,lwrite,*rptr,l,*sbuf_j;
226   PetscScalar    *vals,*aworkA,*aworkB;
227   PetscMPIInt    tag1,tag2,tag3,imdex;
228   MPI_Request    *s_waits1=PETSC_NULL,*s_waits2=PETSC_NULL,*s_waits3=PETSC_NULL,
229                  *r_waits1=PETSC_NULL,*r_waits2=PETSC_NULL,*r_waits3=PETSC_NULL;
230   MPI_Status     recv_status,*send_status;
231   PetscInt       *sbuf_nz=PETSC_NULL,*rbuf_nz=PETSC_NULL,count;
232   PetscInt       **rbuf_j=PETSC_NULL;
233   PetscScalar    **rbuf_a=PETSC_NULL;
234   Mat_Redundant  *redund=PETSC_NULL;
235   PetscContainer container;
236 
237   PetscFunctionBegin;
238   ierr = MPI_Comm_rank(comm,&rank);CHKERRQ(ierr);
239   ierr = MPI_Comm_size(comm,&size);CHKERRQ(ierr);
240 
241   if (reuse == MAT_REUSE_MATRIX) {
242     ierr = MatGetSize(C,&M,&N);CHKERRQ(ierr);
243     if (M != N || M != mat->rmap.N) SETERRQ(PETSC_ERR_ARG_SIZ,"Cannot reuse matrix. Wrong global size");
244     ierr = MatGetLocalSize(C,&M,&N);CHKERRQ(ierr);
245     if (M != N || M != mlocal_sub) SETERRQ(PETSC_ERR_ARG_SIZ,"Cannot reuse matrix. Wrong local size");
246     ierr = PetscObjectQuery((PetscObject)C,"Mat_Redundant",(PetscObject *)&container);CHKERRQ(ierr);
247     if (container) {
248       ierr = PetscContainerGetPointer(container,(void **)&redund);CHKERRQ(ierr);
249     } else {
250       SETERRQ(PETSC_ERR_PLIB,"Container does not exit");
251     }
252     if (nzlocal != redund->nzlocal) SETERRQ(PETSC_ERR_ARG_SIZ,"Cannot reuse matrix. Wrong nzlocal");
253 
254     nsends    = redund->nsends;
255     nrecvs    = redund->nrecvs;
256     send_rank = redund->send_rank; recv_rank = send_rank + size;
257     sbuf_nz   = redund->sbuf_nz;     rbuf_nz = sbuf_nz + nsends;
258     sbuf_j    = redund->sbuf_j;
259     sbuf_a    = redund->sbuf_a;
260     rbuf_j    = redund->rbuf_j;
261     rbuf_a    = redund->rbuf_a;
262   }
263 
264   if (reuse == MAT_INITIAL_MATRIX){
265     PetscMPIInt  subrank,subsize;
266     PetscInt     nleftover,np_subcomm;
267     /* get the destination processors' id send_rank, nsends and nrecvs */
268     ierr = MPI_Comm_rank(subcomm,&subrank);CHKERRQ(ierr);
269     ierr = MPI_Comm_size(subcomm,&subsize);CHKERRQ(ierr);
270     ierr = PetscMalloc((2*size+1)*sizeof(PetscMPIInt),&send_rank);
271     recv_rank = send_rank + size;
272     np_subcomm = size/nsubcomm;
273     nleftover  = size - nsubcomm*np_subcomm;
274     nsends = 0; nrecvs = 0;
275     for (i=0; i<size; i++){ /* i=rank*/
276       if (subrank == i/nsubcomm && rank != i){ /* my_subrank == other's subrank */
277         send_rank[nsends] = i; nsends++;
278         recv_rank[nrecvs++] = i;
279       }
280     }
281     if (rank >= size - nleftover){/* this proc is a leftover processor */
282       i = size-nleftover-1;
283       j = 0;
284       while (j < nsubcomm - nleftover){
285         send_rank[nsends++] = i;
286         i--; j++;
287       }
288     }
289 
290     if (nleftover && subsize == size/nsubcomm && subrank==subsize-1){ /* this proc recvs from leftover processors */
291       for (i=0; i<nleftover; i++){
292         recv_rank[nrecvs++] = size-nleftover+i;
293       }
294     }
295 
296     /* allocate sbuf_j, sbuf_a */
297     i = nzlocal + rowrange[rank+1] - rowrange[rank] + 2;
298     ierr = PetscMalloc(i*sizeof(PetscInt),&sbuf_j);CHKERRQ(ierr);
299     ierr = PetscMalloc((nzlocal+1)*sizeof(PetscScalar),&sbuf_a);CHKERRQ(ierr);
300   } /* endof if (reuse == MAT_INITIAL_MATRIX) */
301 
302   /* copy mat's local entries into the buffers */
303   if (reuse == MAT_INITIAL_MATRIX){
304     rownz_max = 0;
305     rptr = sbuf_j;
306     cols = sbuf_j + rend-rstart + 1;
307     vals = sbuf_a;
308     rptr[0] = 0;
309     for (i=0; i<rend-rstart; i++){
310       row = i + rstart;
311       nzA    = a->i[i+1] - a->i[i]; nzB = b->i[i+1] - b->i[i];
312       ncols  = nzA + nzB;
313       cworkA = a->j + a->i[i]; cworkB = b->j + b->i[i];
314       aworkA = a->a + a->i[i]; aworkB = b->a + b->i[i];
315       /* load the column indices for this row into cols */
316       lwrite = 0;
317       for (l=0; l<nzB; l++) {
318         if ((ctmp = bmap[cworkB[l]]) < cstart){
319           vals[lwrite]   = aworkB[l];
320           cols[lwrite++] = ctmp;
321         }
322       }
323       for (l=0; l<nzA; l++){
324         vals[lwrite]   = aworkA[l];
325         cols[lwrite++] = cstart + cworkA[l];
326       }
327       for (l=0; l<nzB; l++) {
328         if ((ctmp = bmap[cworkB[l]]) >= cend){
329           vals[lwrite]   = aworkB[l];
330           cols[lwrite++] = ctmp;
331         }
332       }
333       vals += ncols;
334       cols += ncols;
335       rptr[i+1] = rptr[i] + ncols;
336       if (rownz_max < ncols) rownz_max = ncols;
337     }
338     if (rptr[rend-rstart] != a->nz + b->nz) SETERRQ4(1, "rptr[%d] %d != %d + %d",rend-rstart,rptr[rend-rstart+1],a->nz,b->nz);
339   } else { /* only copy matrix values into sbuf_a */
340     rptr = sbuf_j;
341     vals = sbuf_a;
342     rptr[0] = 0;
343     for (i=0; i<rend-rstart; i++){
344       row = i + rstart;
345       nzA    = a->i[i+1] - a->i[i]; nzB = b->i[i+1] - b->i[i];
346       ncols  = nzA + nzB;
347       cworkA = a->j + a->i[i]; cworkB = b->j + b->i[i];
348       aworkA = a->a + a->i[i]; aworkB = b->a + b->i[i];
349       lwrite = 0;
350       for (l=0; l<nzB; l++) {
351         if ((ctmp = bmap[cworkB[l]]) < cstart) vals[lwrite++] = aworkB[l];
352       }
353       for (l=0; l<nzA; l++) vals[lwrite++] = aworkA[l];
354       for (l=0; l<nzB; l++) {
355         if ((ctmp = bmap[cworkB[l]]) >= cend) vals[lwrite++] = aworkB[l];
356       }
357       vals += ncols;
358       rptr[i+1] = rptr[i] + ncols;
359     }
360   } /* endof if (reuse == MAT_INITIAL_MATRIX) */
361 
362   /* send nzlocal to others, and recv other's nzlocal */
363   /*--------------------------------------------------*/
364   if (reuse == MAT_INITIAL_MATRIX){
365     ierr = PetscMalloc2(3*(nsends + nrecvs)+1,MPI_Request,&s_waits3,nsends+1,MPI_Status,&send_status);CHKERRQ(ierr);
366     s_waits2 = s_waits3 + nsends;
367     s_waits1 = s_waits2 + nsends;
368     r_waits1 = s_waits1 + nsends;
369     r_waits2 = r_waits1 + nrecvs;
370     r_waits3 = r_waits2 + nrecvs;
371   } else {
372     ierr = PetscMalloc2(nsends + nrecvs +1,MPI_Request,&s_waits3,nsends+1,MPI_Status,&send_status);CHKERRQ(ierr);
373     r_waits3 = s_waits3 + nsends;
374   }
375 
376   ierr = PetscObjectGetNewTag((PetscObject)mat,&tag3);CHKERRQ(ierr);
377   if (reuse == MAT_INITIAL_MATRIX){
378     /* get new tags to keep the communication clean */
379     ierr = PetscObjectGetNewTag((PetscObject)mat,&tag1);CHKERRQ(ierr);
380     ierr = PetscObjectGetNewTag((PetscObject)mat,&tag2);CHKERRQ(ierr);
381     ierr = PetscMalloc3(nsends+nrecvs+1,PetscInt,&sbuf_nz,nrecvs,PetscInt*,&rbuf_j,nrecvs,PetscScalar*,&rbuf_a);CHKERRQ(ierr);
382     rbuf_nz = sbuf_nz + nsends;
383 
384     /* post receives of other's nzlocal */
385     for (i=0; i<nrecvs; i++){
386       ierr = MPI_Irecv(rbuf_nz+i,1,MPIU_INT,MPI_ANY_SOURCE,tag1,comm,r_waits1+i);CHKERRQ(ierr);
387     }
388     /* send nzlocal to others */
389     for (i=0; i<nsends; i++){
390       sbuf_nz[i] = nzlocal;
391       ierr = MPI_Isend(sbuf_nz+i,1,MPIU_INT,send_rank[i],tag1,comm,s_waits1+i);CHKERRQ(ierr);
392     }
393     /* wait on receives of nzlocal; allocate space for rbuf_j, rbuf_a */
394     count = nrecvs;
395     while (count) {
396       ierr = MPI_Waitany(nrecvs,r_waits1,&imdex,&recv_status);CHKERRQ(ierr);
397       recv_rank[imdex] = recv_status.MPI_SOURCE;
398       /* allocate rbuf_a and rbuf_j; then post receives of rbuf_j */
399       ierr = PetscMalloc((rbuf_nz[imdex]+1)*sizeof(PetscScalar),&rbuf_a[imdex]);CHKERRQ(ierr);
400 
401       i = rowrange[recv_status.MPI_SOURCE+1] - rowrange[recv_status.MPI_SOURCE]; /* number of expected mat->i */
402       rbuf_nz[imdex] += i + 2;
403       ierr = PetscMalloc(rbuf_nz[imdex]*sizeof(PetscInt),&rbuf_j[imdex]);CHKERRQ(ierr);
404       ierr = MPI_Irecv(rbuf_j[imdex],rbuf_nz[imdex],MPIU_INT,recv_status.MPI_SOURCE,tag2,comm,r_waits2+imdex);CHKERRQ(ierr);
405       count--;
406     }
407     /* wait on sends of nzlocal */
408     if (nsends) {ierr = MPI_Waitall(nsends,s_waits1,send_status);CHKERRQ(ierr);}
409     /* send mat->i,j to others, and recv from other's */
410     /*------------------------------------------------*/
411     for (i=0; i<nsends; i++){
412       j = nzlocal + rowrange[rank+1] - rowrange[rank] + 1;
413       ierr = MPI_Isend(sbuf_j,j,MPIU_INT,send_rank[i],tag2,comm,s_waits2+i);CHKERRQ(ierr);
414     }
415     /* wait on receives of mat->i,j */
416     /*------------------------------*/
417     count = nrecvs;
418     while (count) {
419       ierr = MPI_Waitany(nrecvs,r_waits2,&imdex,&recv_status);CHKERRQ(ierr);
420       if (recv_rank[imdex] != recv_status.MPI_SOURCE) SETERRQ2(1, "recv_rank %d != MPI_SOURCE %d",recv_rank[imdex],recv_status.MPI_SOURCE);
421       count--;
422     }
423     /* wait on sends of mat->i,j */
424     /*---------------------------*/
425     if (nsends) {
426       ierr = MPI_Waitall(nsends,s_waits2,send_status);CHKERRQ(ierr);
427     }
428   } /* endof if (reuse == MAT_INITIAL_MATRIX) */
429 
430   /* post receives, send and receive mat->a */
431   /*----------------------------------------*/
432   for (imdex=0; imdex<nrecvs; imdex++) {
433     ierr = MPI_Irecv(rbuf_a[imdex],rbuf_nz[imdex],MPIU_SCALAR,recv_rank[imdex],tag3,comm,r_waits3+imdex);CHKERRQ(ierr);
434   }
435   for (i=0; i<nsends; i++){
436     ierr = MPI_Isend(sbuf_a,nzlocal,MPIU_SCALAR,send_rank[i],tag3,comm,s_waits3+i);CHKERRQ(ierr);
437   }
438   count = nrecvs;
439   while (count) {
440     ierr = MPI_Waitany(nrecvs,r_waits3,&imdex,&recv_status);CHKERRQ(ierr);
441     if (recv_rank[imdex] != recv_status.MPI_SOURCE) SETERRQ2(1, "recv_rank %d != MPI_SOURCE %d",recv_rank[imdex],recv_status.MPI_SOURCE);
442     count--;
443   }
444   if (nsends) {
445     ierr = MPI_Waitall(nsends,s_waits3,send_status);CHKERRQ(ierr);
446   }
447 
448   ierr = PetscFree2(s_waits3,send_status);CHKERRQ(ierr);
449 
450   /* create redundant matrix */
451   /*-------------------------*/
452   if (reuse == MAT_INITIAL_MATRIX){
453     /* compute rownz_max for preallocation */
454     for (imdex=0; imdex<nrecvs; imdex++){
455       j = rowrange[recv_rank[imdex]+1] - rowrange[recv_rank[imdex]];
456       rptr = rbuf_j[imdex];
457       for (i=0; i<j; i++){
458         ncols = rptr[i+1] - rptr[i];
459         if (rownz_max < ncols) rownz_max = ncols;
460       }
461     }
462 
463     ierr = MatCreate(subcomm,&C);CHKERRQ(ierr);
464     ierr = MatSetSizes(C,mlocal_sub,mlocal_sub,PETSC_DECIDE,PETSC_DECIDE);CHKERRQ(ierr);
465     ierr = MatSetFromOptions(C);CHKERRQ(ierr);
466     ierr = MatSeqAIJSetPreallocation(C,rownz_max,PETSC_NULL);CHKERRQ(ierr);
467     ierr = MatMPIAIJSetPreallocation(C,rownz_max,PETSC_NULL,rownz_max,PETSC_NULL);CHKERRQ(ierr);
468   } else {
469     C = *matredundant;
470   }
471 
472   /* insert local matrix entries */
473   rptr = sbuf_j;
474   cols = sbuf_j + rend-rstart + 1;
475   vals = sbuf_a;
476   for (i=0; i<rend-rstart; i++){
477     row   = i + rstart;
478     ncols = rptr[i+1] - rptr[i];
479     ierr = MatSetValues(C,1,&row,ncols,cols,vals,INSERT_VALUES);CHKERRQ(ierr);
480     vals += ncols;
481     cols += ncols;
482   }
483   /* insert received matrix entries */
484   for (imdex=0; imdex<nrecvs; imdex++){
485     rstart = rowrange[recv_rank[imdex]];
486     rend   = rowrange[recv_rank[imdex]+1];
487     rptr = rbuf_j[imdex];
488     cols = rbuf_j[imdex] + rend-rstart + 1;
489     vals = rbuf_a[imdex];
490     for (i=0; i<rend-rstart; i++){
491       row   = i + rstart;
492       ncols = rptr[i+1] - rptr[i];
493       ierr = MatSetValues(C,1,&row,ncols,cols,vals,INSERT_VALUES);CHKERRQ(ierr);
494       vals += ncols;
495       cols += ncols;
496     }
497   }
498   ierr = MatAssemblyBegin(C,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
499   ierr = MatAssemblyEnd(C,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
500   ierr = MatGetSize(C,&M,&N);CHKERRQ(ierr);
501   if (M != mat->rmap.N || N != mat->cmap.N) SETERRQ2(PETSC_ERR_ARG_INCOMP,"redundant mat size %d != input mat size %d",M,mat->rmap.N);
502   if (reuse == MAT_INITIAL_MATRIX){
503     PetscContainer container;
504     *matredundant = C;
505     /* create a supporting struct and attach it to C for reuse */
506     ierr = PetscNew(Mat_Redundant,&redund);CHKERRQ(ierr);
507     ierr = PetscContainerCreate(PETSC_COMM_SELF,&container);CHKERRQ(ierr);
508     ierr = PetscContainerSetPointer(container,redund);CHKERRQ(ierr);
509     ierr = PetscObjectCompose((PetscObject)C,"Mat_Redundant",(PetscObject)container);CHKERRQ(ierr);
510     ierr = PetscContainerSetUserDestroy(container,PetscContainerDestroy_MatRedundant);CHKERRQ(ierr);
511 
512     redund->nzlocal = nzlocal;
513     redund->nsends  = nsends;
514     redund->nrecvs  = nrecvs;
515     redund->send_rank = send_rank;
516     redund->sbuf_nz = sbuf_nz;
517     redund->sbuf_j  = sbuf_j;
518     redund->sbuf_a  = sbuf_a;
519     redund->rbuf_j  = rbuf_j;
520     redund->rbuf_a  = rbuf_a;
521 
522     redund->MatDestroy = C->ops->destroy;
523     C->ops->destroy    = MatDestroy_MatRedundant;
524   }
525   PetscFunctionReturn(0);
526 }
527 
528 #undef __FUNCT__
529 #define __FUNCT__ "PCSetUp_Redundant"
530 static PetscErrorCode PCSetUp_Redundant(PC pc)
531 {
532   PC_Redundant   *red = (PC_Redundant*)pc->data;
533   PetscErrorCode ierr;
534   PetscInt       mstart,mend,mlocal,m;
535   PetscMPIInt    size;
536   MatReuse       reuse = MAT_INITIAL_MATRIX;
537   MatStructure   str   = DIFFERENT_NONZERO_PATTERN;
538   MPI_Comm       comm = pc->comm;
539   Vec            vec;
540   PetscInt       mlocal_sub;
541   PetscMPIInt    subsize,subrank;
542   PetscInt       rstart_sub,rend_sub,mloc_sub,nsubcomm;
543   const char     *prefix;
544 
545   PetscFunctionBegin;
546   ierr = MatGetVecs(pc->pmat,&vec,0);CHKERRQ(ierr);
547   ierr = VecGetSize(vec,&m);CHKERRQ(ierr);
548 
549   if (!pc->setupcalled) {
550     ierr = PetscSubcommCreate(comm,red->nsubcomm,&red->psubcomm);CHKERRQ(ierr);
551     ierr = PetscLogObjectMemory(pc,sizeof(PetscSubcomm));CHKERRQ(ierr);
552 
553     /* create a new PC that processors in each subcomm have copy of */
554     MPI_Comm subcomm = red->psubcomm->comm;
555     ierr = PCCreate(subcomm,&red->pc);CHKERRQ(ierr);
556     ierr = PCSetType(red->pc,PCLU);CHKERRQ(ierr);
557     ierr = PCGetOptionsPrefix(pc,&prefix);CHKERRQ(ierr);
558     ierr = PCSetOptionsPrefix(red->pc,prefix);CHKERRQ(ierr);
559     ierr = PCAppendOptionsPrefix(red->pc,"redundant_");CHKERRQ(ierr);
560     ierr = PCSetFromOptions(red->pc);CHKERRQ(ierr);
561 
562     /* create working vectors xsub/ysub and xdup/ydup */
563     ierr = VecGetLocalSize(vec,&mlocal);CHKERRQ(ierr);
564     ierr = VecGetOwnershipRange(vec,&mstart,&mend);CHKERRQ(ierr);
565 
566     /* get local size of xsub/ysub */
567     ierr = MPI_Comm_size(subcomm,&subsize);CHKERRQ(ierr);
568     ierr = MPI_Comm_rank(subcomm,&subrank);CHKERRQ(ierr);
569     rstart_sub = pc->pmat->rmap.range[red->psubcomm->n*subrank]; /* rstart in xsub/ysub */
570     if (subrank+1 < subsize){
571       rend_sub = pc->pmat->rmap.range[red->psubcomm->n*(subrank+1)];
572     } else {
573       rend_sub = m;
574     }
575     mloc_sub = rend_sub - rstart_sub;
576     ierr = VecCreateMPI(subcomm,mloc_sub,PETSC_DECIDE,&red->ysub);CHKERRQ(ierr);
577     /* create xsub with empty local arrays, because xdup's arrays will be placed into it */
578     ierr = VecCreateMPIWithArray(subcomm,mloc_sub,PETSC_DECIDE,PETSC_NULL,&red->xsub);CHKERRQ(ierr);
579 
580     /* create xdup and ydup. ydup has empty local arrays because ysub's arrays will be place into it.
581        Note: we use communicator dupcomm, not pc->comm! */
582     ierr = VecCreateMPI(red->psubcomm->dupparent,mloc_sub,PETSC_DECIDE,&red->xdup);CHKERRQ(ierr);
583     ierr = VecCreateMPIWithArray(red->psubcomm->dupparent,mloc_sub,PETSC_DECIDE,PETSC_NULL,&red->ydup);CHKERRQ(ierr);
584 
585     /* create vec scatters */
586     if (!red->scatterin){
587       IS       is1,is2;
588       PetscInt *idx1,*idx2,i,j,k;
589       ierr = PetscMalloc(2*red->psubcomm->n*mlocal*sizeof(PetscInt),&idx1);CHKERRQ(ierr);
590       idx2 = idx1 + red->psubcomm->n*mlocal;
591       j = 0;
592       for (k=0; k<red->psubcomm->n; k++){
593         for (i=mstart; i<mend; i++){
594           idx1[j]   = i;
595           idx2[j++] = i + m*k;
596         }
597       }
598       ierr = ISCreateGeneral(comm,red->psubcomm->n*mlocal,idx1,&is1);CHKERRQ(ierr);
599       ierr = ISCreateGeneral(comm,red->psubcomm->n*mlocal,idx2,&is2);CHKERRQ(ierr);
600       ierr = VecScatterCreate(vec,is1,red->xdup,is2,&red->scatterin);CHKERRQ(ierr);
601       ierr = ISDestroy(is1);CHKERRQ(ierr);
602       ierr = ISDestroy(is2);CHKERRQ(ierr);
603 
604       ierr = ISCreateStride(comm,mlocal,mstart+ red->psubcomm->color*m,1,&is1);CHKERRQ(ierr);
605       ierr = ISCreateStride(comm,mlocal,mstart,1,&is2);CHKERRQ(ierr);
606       ierr = VecScatterCreate(red->xdup,is1,vec,is2,&red->scatterout);CHKERRQ(ierr);
607       ierr = ISDestroy(is1);CHKERRQ(ierr);
608       ierr = ISDestroy(is2);CHKERRQ(ierr);
609       ierr = PetscFree(idx1);CHKERRQ(ierr);
610     }
611   }
612   ierr = VecDestroy(vec);CHKERRQ(ierr);
613 
614   /* if pmatrix set by user is sequential then we do not need to gather the parallel matrix */
615   ierr = MPI_Comm_size(comm,&size);CHKERRQ(ierr);
616   if (size == 1) {
617     red->useparallelmat = PETSC_FALSE;
618   }
619 
620   if (red->useparallelmat) {
621     if (pc->setupcalled == 1 && pc->flag == DIFFERENT_NONZERO_PATTERN) {
622       /* destroy old matrices */
623       if (red->pmats) {
624         ierr = MatDestroy(red->pmats);CHKERRQ(ierr);
625       }
626     } else if (pc->setupcalled == 1) {
627       reuse = MAT_REUSE_MATRIX;
628       str   = SAME_NONZERO_PATTERN;
629     }
630 
631     /* grab the parallel matrix and put it into processors of a subcomminicator */
632     /*--------------------------------------------------------------------------*/
633     ierr = VecGetLocalSize(red->ysub,&mlocal_sub);CHKERRQ(ierr);
634     ierr = MatGetRedundantMatrix_AIJ(pc->pmat,red->psubcomm->n,red->psubcomm->comm,mlocal_sub,reuse,&red->pmats);CHKERRQ(ierr);
635     /* tell PC of the subcommunicator its operators */
636     ierr = PCSetOperators(red->pc,red->pmats,red->pmats,str);CHKERRQ(ierr);
637   } else {
638     ierr = PCSetOperators(red->pc,pc->mat,pc->pmat,pc->flag);CHKERRQ(ierr);
639   }
640   ierr = PCSetFromOptions(red->pc);CHKERRQ(ierr);
641   ierr = PCSetUp(red->pc);CHKERRQ(ierr);
642   PetscFunctionReturn(0);
643 }
644 
645 #undef __FUNCT__
646 #define __FUNCT__ "PCApply_Redundant"
647 static PetscErrorCode PCApply_Redundant(PC pc,Vec x,Vec y)
648 {
649   PC_Redundant   *red = (PC_Redundant*)pc->data;
650   PetscErrorCode ierr;
651   PetscScalar    *array;
652 
653   PetscFunctionBegin;
654   /* scatter x to xdup */
655   ierr = VecScatterBegin(x,red->xdup,INSERT_VALUES,SCATTER_FORWARD,red->scatterin);CHKERRQ(ierr);
656   ierr = VecScatterEnd(x,red->xdup,INSERT_VALUES,SCATTER_FORWARD,red->scatterin);CHKERRQ(ierr);
657 
658   /* place xdup's local array into xsub */
659   ierr = VecGetArray(red->xdup,&array);CHKERRQ(ierr);
660   ierr = VecPlaceArray(red->xsub,(const PetscScalar*)array);CHKERRQ(ierr);
661 
662   /* apply preconditioner on each processor */
663   ierr = PCApply(red->pc,red->xsub,red->ysub);CHKERRQ(ierr);
664   ierr = VecResetArray(red->xsub);CHKERRQ(ierr);
665   ierr = VecRestoreArray(red->xdup,&array);CHKERRQ(ierr);
666 
667   /* place ysub's local array into ydup */
668   ierr = VecGetArray(red->ysub,&array);CHKERRQ(ierr);
669   ierr = VecPlaceArray(red->ydup,(const PetscScalar*)array);CHKERRQ(ierr);
670 
671   /* scatter ydup to y */
672   ierr = VecScatterBegin(red->ydup,y,INSERT_VALUES,SCATTER_FORWARD,red->scatterout);CHKERRQ(ierr);
673   ierr = VecScatterEnd(red->ydup,y,INSERT_VALUES,SCATTER_FORWARD,red->scatterout);CHKERRQ(ierr);
674   ierr = VecResetArray(red->ydup);CHKERRQ(ierr);
675   ierr = VecRestoreArray(red->ysub,&array);CHKERRQ(ierr);
676   PetscFunctionReturn(0);
677 }
678 
679 #undef __FUNCT__
680 #define __FUNCT__ "PCDestroy_Redundant"
681 static PetscErrorCode PCDestroy_Redundant(PC pc)
682 {
683   PC_Redundant   *red = (PC_Redundant*)pc->data;
684   PetscErrorCode ierr;
685 
686   PetscFunctionBegin;
687   if (red->scatterin)  {ierr = VecScatterDestroy(red->scatterin);CHKERRQ(ierr);}
688   if (red->scatterout) {ierr = VecScatterDestroy(red->scatterout);CHKERRQ(ierr);}
689   if (red->ysub)       {ierr = VecDestroy(red->ysub);CHKERRQ(ierr);}
690   if (red->xsub)       {ierr = VecDestroy(red->xsub);CHKERRQ(ierr);}
691   if (red->xdup)       {ierr = VecDestroy(red->xdup);CHKERRQ(ierr);}
692   if (red->ydup)       {ierr = VecDestroy(red->ydup);CHKERRQ(ierr);}
693   if (red->pmats) {
694     ierr = MatDestroy(red->pmats);CHKERRQ(ierr);
695   }
696   ierr = PetscSubcommDestroy(red->psubcomm);CHKERRQ(ierr);
697   ierr = PCDestroy(red->pc);CHKERRQ(ierr);
698   ierr = PetscFree(red);CHKERRQ(ierr);
699   PetscFunctionReturn(0);
700 }
701 
702 #undef __FUNCT__
703 #define __FUNCT__ "PCSetFromOptions_Redundant"
704 static PetscErrorCode PCSetFromOptions_Redundant(PC pc)
705 {
706   PetscErrorCode ierr;
707   PC_Redundant   *red = (PC_Redundant*)pc->data;
708   PetscMPIInt    size;
709 
710   PetscFunctionBegin;
711   ierr = PetscOptionsHead("Redundant options");CHKERRQ(ierr);
712   ierr = MPI_Comm_size(pc->comm,&size);CHKERRQ(ierr);
713   ierr = PetscOptionsInt("-pc_redundant_number_comm","Number of subcommunicators","PCRedundantSetNumComm",size,&red->nsubcomm,0);CHKERRQ(ierr);
714   ierr = PetscOptionsTail();CHKERRQ(ierr);
715   PetscFunctionReturn(0);
716 }
717 
718 EXTERN_C_BEGIN
719 #undef __FUNCT__
720 #define __FUNCT__ "PCRedundantSetScatter_Redundant"
721 PetscErrorCode PETSCKSP_DLLEXPORT PCRedundantSetScatter_Redundant(PC pc,VecScatter in,VecScatter out)
722 {
723   PC_Redundant   *red = (PC_Redundant*)pc->data;
724   PetscErrorCode ierr;
725 
726   PetscFunctionBegin;
727   red->scatterin  = in;
728   red->scatterout = out;
729   ierr = PetscObjectReference((PetscObject)in);CHKERRQ(ierr);
730   ierr = PetscObjectReference((PetscObject)out);CHKERRQ(ierr);
731   PetscFunctionReturn(0);
732 }
733 EXTERN_C_END
734 
735 #undef __FUNCT__
736 #define __FUNCT__ "PCRedundantSetScatter"
737 /*@
738    PCRedundantSetScatter - Sets the scatter used to copy values into the
739      redundant local solve and the scatter to move them back into the global
740      vector.
741 
742    Collective on PC
743 
744    Input Parameters:
745 +  pc - the preconditioner context
746 .  in - the scatter to move the values in
747 -  out - the scatter to move them out
748 
749    Level: advanced
750 
751 .keywords: PC, redundant solve
752 @*/
753 PetscErrorCode PETSCKSP_DLLEXPORT PCRedundantSetScatter(PC pc,VecScatter in,VecScatter out)
754 {
755   PetscErrorCode ierr,(*f)(PC,VecScatter,VecScatter);
756 
757   PetscFunctionBegin;
758   PetscValidHeaderSpecific(pc,PC_COOKIE,1);
759   PetscValidHeaderSpecific(in,VEC_SCATTER_COOKIE,2);
760   PetscValidHeaderSpecific(out,VEC_SCATTER_COOKIE,3);
761   ierr = PetscObjectQueryFunction((PetscObject)pc,"PCRedundantSetScatter_C",(void (**)(void))&f);CHKERRQ(ierr);
762   if (f) {
763     ierr = (*f)(pc,in,out);CHKERRQ(ierr);
764   }
765   PetscFunctionReturn(0);
766 }
767 
768 EXTERN_C_BEGIN
769 #undef __FUNCT__
770 #define __FUNCT__ "PCRedundantGetPC_Redundant"
771 PetscErrorCode PETSCKSP_DLLEXPORT PCRedundantGetPC_Redundant(PC pc,PC *innerpc)
772 {
773   PC_Redundant *red = (PC_Redundant*)pc->data;
774 
775   PetscFunctionBegin;
776   *innerpc = red->pc;
777   PetscFunctionReturn(0);
778 }
779 EXTERN_C_END
780 
781 #undef __FUNCT__
782 #define __FUNCT__ "PCRedundantGetPC"
783 /*@
784    PCRedundantGetPC - Gets the sequential PC created by the redundant PC.
785 
786    Not Collective
787 
788    Input Parameter:
789 .  pc - the preconditioner context
790 
791    Output Parameter:
792 .  innerpc - the sequential PC
793 
794    Level: advanced
795 
796 .keywords: PC, redundant solve
797 @*/
798 PetscErrorCode PETSCKSP_DLLEXPORT PCRedundantGetPC(PC pc,PC *innerpc)
799 {
800   PetscErrorCode ierr,(*f)(PC,PC*);
801 
802   PetscFunctionBegin;
803   PetscValidHeaderSpecific(pc,PC_COOKIE,1);
804   PetscValidPointer(innerpc,2);
805   ierr = PetscObjectQueryFunction((PetscObject)pc,"PCRedundantGetPC_C",(void (**)(void))&f);CHKERRQ(ierr);
806   if (f) {
807     ierr = (*f)(pc,innerpc);CHKERRQ(ierr);
808   }
809   PetscFunctionReturn(0);
810 }
811 
812 EXTERN_C_BEGIN
813 #undef __FUNCT__
814 #define __FUNCT__ "PCRedundantGetOperators_Redundant"
815 PetscErrorCode PETSCKSP_DLLEXPORT PCRedundantGetOperators_Redundant(PC pc,Mat *mat,Mat *pmat)
816 {
817   PC_Redundant *red = (PC_Redundant*)pc->data;
818 
819   PetscFunctionBegin;
820   if (mat)  *mat  = red->pmats;
821   if (pmat) *pmat = red->pmats;
822   PetscFunctionReturn(0);
823 }
824 EXTERN_C_END
825 
826 #undef __FUNCT__
827 #define __FUNCT__ "PCRedundantGetOperators"
828 /*@
829    PCRedundantGetOperators - gets the sequential matrix and preconditioner matrix
830 
831    Not Collective
832 
833    Input Parameter:
834 .  pc - the preconditioner context
835 
836    Output Parameters:
837 +  mat - the matrix
838 -  pmat - the (possibly different) preconditioner matrix
839 
840    Level: advanced
841 
842 .keywords: PC, redundant solve
843 @*/
844 PetscErrorCode PETSCKSP_DLLEXPORT PCRedundantGetOperators(PC pc,Mat *mat,Mat *pmat)
845 {
846   PetscErrorCode ierr,(*f)(PC,Mat*,Mat*);
847 
848   PetscFunctionBegin;
849   PetscValidHeaderSpecific(pc,PC_COOKIE,1);
850   if (mat)  PetscValidPointer(mat,2);
851   if (pmat) PetscValidPointer(pmat,3);
852   ierr = PetscObjectQueryFunction((PetscObject)pc,"PCRedundantGetOperators_C",(void (**)(void))&f);CHKERRQ(ierr);
853   if (f) {
854     ierr = (*f)(pc,mat,pmat);CHKERRQ(ierr);
855   }
856   PetscFunctionReturn(0);
857 }
858 
859 /* -------------------------------------------------------------------------------------*/
860 /*MC
861      PCREDUNDANT - Runs a preconditioner for the entire problem on subgroups of processors
862 
863      Options for the redundant preconditioners can be set with -redundant_pc_xxx
864 
865   Options Database:
866 .  -pc_redundant_number_comm - number of sub communicators to use
867 
868    Level: intermediate
869 
870 .seealso:  PCCreate(), PCSetType(), PCType (for list of available types), PCRedundantSetScatter(),
871            PCRedundantGetPC(), PCRedundantGetOperators()
872 M*/
873 
874 EXTERN_C_BEGIN
875 #undef __FUNCT__
876 #define __FUNCT__ "PCCreate_Redundant"
877 PetscErrorCode PETSCKSP_DLLEXPORT PCCreate_Redundant(PC pc)
878 {
879   PetscErrorCode ierr;
880   PC_Redundant   *red;
881 
882   PetscFunctionBegin;
883   ierr = PetscNew(PC_Redundant,&red);CHKERRQ(ierr);
884   ierr = PetscLogObjectMemory(pc,sizeof(PC_Redundant));CHKERRQ(ierr);
885   red->useparallelmat = PETSC_TRUE;
886   pc->data            = (void*)red;
887 
888   pc->ops->apply           = PCApply_Redundant;
889   pc->ops->applytranspose  = 0;
890   pc->ops->setup           = PCSetUp_Redundant;
891   pc->ops->destroy         = PCDestroy_Redundant;
892   pc->ops->setfromoptions  = PCSetFromOptions_Redundant;
893   pc->ops->view            = PCView_Redundant;
894   ierr = PetscObjectComposeFunctionDynamic((PetscObject)pc,"PCRedundantSetScatter_C","PCRedundantSetScatter_Redundant",
895                     PCRedundantSetScatter_Redundant);CHKERRQ(ierr);
896   ierr = PetscObjectComposeFunctionDynamic((PetscObject)pc,"PCRedundantGetPC_C","PCRedundantGetPC_Redundant",
897                     PCRedundantGetPC_Redundant);CHKERRQ(ierr);
898   ierr = PetscObjectComposeFunctionDynamic((PetscObject)pc,"PCRedundantGetOperators_C","PCRedundantGetOperators_Redundant",
899                     PCRedundantGetOperators_Redundant);CHKERRQ(ierr);
900   PetscFunctionReturn(0);
901 }
902 EXTERN_C_END
903