xref: /petsc/src/ksp/pc/impls/tfs/gs.c (revision d1528f56e7efe90e41719e8af1e4ce61b838432e)
1 #define PETSCKSP_DLL
2 
3 /***********************************gs.c***************************************
4 
5 Author: Henry M. Tufo III
6 
7 e-mail: hmt@cs.brown.edu
8 
9 snail-mail:
10 Division of Applied Mathematics
11 Brown University
12 Providence, RI 02912
13 
14 Last Modification:
15 6.21.97
16 ************************************gs.c**************************************/
17 
18 /***********************************gs.c***************************************
19 File Description:
20 -----------------
21 
22 ************************************gs.c**************************************/
23 
24 #include "src/ksp/pc/impls/tfs/tfs.h"
25 
26 /* default length of number of items via tree - doubles if exceeded */
27 #define TREE_BUF_SZ 2048;
28 #define GS_VEC_SZ   1
29 
30 
31 
32 /***********************************gs.c***************************************
33 Type: struct gather_scatter_id
34 ------------------------------
35 
36 ************************************gs.c**************************************/
37 typedef struct gather_scatter_id {
38   PetscInt id;
39   PetscInt nel_min;
40   PetscInt nel_max;
41   PetscInt nel_sum;
42   PetscInt negl;
43   PetscInt gl_max;
44   PetscInt gl_min;
45   PetscInt repeats;
46   PetscInt ordered;
47   PetscInt positive;
48   PetscScalar *vals;
49 
50   /* bit mask info */
51   PetscInt *my_proc_mask;
52   PetscInt mask_sz;
53   PetscInt *ngh_buf;
54   PetscInt ngh_buf_sz;
55   PetscInt *nghs;
56   PetscInt num_nghs;
57   PetscInt max_nghs;
58   PetscInt *pw_nghs;
59   PetscInt num_pw_nghs;
60   PetscInt *tree_nghs;
61   PetscInt num_tree_nghs;
62 
63   PetscInt num_loads;
64 
65   /* repeats == true -> local info */
66   PetscInt nel;         /* number of unique elememts */
67   PetscInt *elms;       /* of size nel */
68   PetscInt nel_total;
69   PetscInt *local_elms; /* of size nel_total */
70   PetscInt *companion;  /* of size nel_total */
71 
72   /* local info */
73   PetscInt num_local_total;
74   PetscInt local_strength;
75   PetscInt num_local;
76   PetscInt *num_local_reduce;
77   PetscInt **local_reduce;
78   PetscInt num_local_gop;
79   PetscInt *num_gop_local_reduce;
80   PetscInt **gop_local_reduce;
81 
82   /* pairwise info */
83   PetscInt level;
84   PetscInt num_pairs;
85   PetscInt max_pairs;
86   PetscInt loc_node_pairs;
87   PetscInt max_node_pairs;
88   PetscInt min_node_pairs;
89   PetscInt avg_node_pairs;
90   PetscInt *pair_list;
91   PetscInt *msg_sizes;
92   PetscInt **node_list;
93   PetscInt len_pw_list;
94   PetscInt *pw_elm_list;
95   PetscScalar *pw_vals;
96 
97   MPI_Request *msg_ids_in;
98   MPI_Request *msg_ids_out;
99 
100   PetscScalar *out;
101   PetscScalar *in;
102   PetscInt msg_total;
103 
104   /* tree - crystal accumulator info */
105   PetscInt max_left_over;
106   PetscInt *pre;
107   PetscInt *in_num;
108   PetscInt *out_num;
109   PetscInt **in_list;
110   PetscInt **out_list;
111 
112   /* new tree work*/
113   PetscInt  tree_nel;
114   PetscInt *tree_elms;
115   PetscScalar *tree_buf;
116   PetscScalar *tree_work;
117 
118   PetscInt  tree_map_sz;
119   PetscInt *tree_map_in;
120   PetscInt *tree_map_out;
121 
122   /* current memory status */
123   PetscInt gl_bss_min;
124   PetscInt gl_perm_min;
125 
126   /* max segment size for gs_gop_vec() */
127   PetscInt vec_sz;
128 
129   /* hack to make paul happy */
130   MPI_Comm gs_comm;
131 
132 } gs_id;
133 
134 static gs_id *gsi_check_args(PetscInt *elms, PetscInt nel, PetscInt level);
135 static PetscErrorCode gsi_via_bit_mask(gs_id *gs);
136 static PetscErrorCode get_ngh_buf(gs_id *gs);
137 static PetscErrorCode set_pairwise(gs_id *gs);
138 static gs_id * gsi_new(void);
139 static PetscErrorCode set_tree(gs_id *gs);
140 
141 /* same for all but vector flavor */
142 static PetscErrorCode gs_gop_local_out(gs_id *gs, PetscScalar *vals);
143 /* vector flavor */
144 static PetscErrorCode gs_gop_vec_local_out(gs_id *gs, PetscScalar *vals, PetscInt step);
145 
146 static PetscErrorCode gs_gop_vec_plus(gs_id *gs, PetscScalar *in_vals, PetscInt step);
147 static PetscErrorCode gs_gop_vec_pairwise_plus(gs_id *gs, PetscScalar *in_vals, PetscInt step);
148 static PetscErrorCode gs_gop_vec_local_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
149 static PetscErrorCode gs_gop_vec_local_in_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
150 static PetscErrorCode gs_gop_vec_tree_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
151 
152 
153 static PetscErrorCode gs_gop_plus(gs_id *gs, PetscScalar *in_vals);
154 static PetscErrorCode gs_gop_pairwise_plus(gs_id *gs, PetscScalar *in_vals);
155 static PetscErrorCode gs_gop_local_plus(gs_id *gs, PetscScalar *vals);
156 static PetscErrorCode gs_gop_local_in_plus(gs_id *gs, PetscScalar *vals);
157 static PetscErrorCode gs_gop_tree_plus(gs_id *gs, PetscScalar *vals);
158 
159 static PetscErrorCode gs_gop_plus_hc(gs_id *gs, PetscScalar *in_vals, PetscInt dim);
160 static PetscErrorCode gs_gop_pairwise_plus_hc(gs_id *gs, PetscScalar *in_vals, PetscInt dim);
161 static PetscErrorCode gs_gop_tree_plus_hc(gs_id *gs, PetscScalar *vals, PetscInt dim);
162 
163 static PetscErrorCode gs_gop_times(gs_id *gs, PetscScalar *in_vals);
164 static PetscErrorCode gs_gop_pairwise_times(gs_id *gs, PetscScalar *in_vals);
165 static PetscErrorCode gs_gop_local_times(gs_id *gs, PetscScalar *vals);
166 static PetscErrorCode gs_gop_local_in_times(gs_id *gs, PetscScalar *vals);
167 static PetscErrorCode gs_gop_tree_times(gs_id *gs, PetscScalar *vals);
168 
169 static PetscErrorCode gs_gop_min(gs_id *gs, PetscScalar *in_vals);
170 static PetscErrorCode gs_gop_pairwise_min(gs_id *gs, PetscScalar *in_vals);
171 static PetscErrorCode gs_gop_local_min(gs_id *gs, PetscScalar *vals);
172 static PetscErrorCode gs_gop_local_in_min(gs_id *gs, PetscScalar *vals);
173 static PetscErrorCode gs_gop_tree_min(gs_id *gs, PetscScalar *vals);
174 
175 static PetscErrorCode gs_gop_min_abs(gs_id *gs, PetscScalar *in_vals);
176 static PetscErrorCode gs_gop_pairwise_min_abs(gs_id *gs, PetscScalar *in_vals);
177 static PetscErrorCode gs_gop_local_min_abs(gs_id *gs, PetscScalar *vals);
178 static PetscErrorCode gs_gop_local_in_min_abs(gs_id *gs, PetscScalar *vals);
179 static PetscErrorCode gs_gop_tree_min_abs(gs_id *gs, PetscScalar *vals);
180 
181 static PetscErrorCode gs_gop_max(gs_id *gs, PetscScalar *in_vals);
182 static PetscErrorCode gs_gop_pairwise_max(gs_id *gs, PetscScalar *in_vals);
183 static PetscErrorCode gs_gop_local_max(gs_id *gs, PetscScalar *vals);
184 static PetscErrorCode gs_gop_local_in_max(gs_id *gs, PetscScalar *vals);
185 static PetscErrorCode gs_gop_tree_max(gs_id *gs, PetscScalar *vals);
186 
187 static PetscErrorCode gs_gop_max_abs(gs_id *gs, PetscScalar *in_vals);
188 static PetscErrorCode gs_gop_pairwise_max_abs(gs_id *gs, PetscScalar *in_vals);
189 static PetscErrorCode gs_gop_local_max_abs(gs_id *gs, PetscScalar *vals);
190 static PetscErrorCode gs_gop_local_in_max_abs(gs_id *gs, PetscScalar *vals);
191 static PetscErrorCode gs_gop_tree_max_abs(gs_id *gs, PetscScalar *vals);
192 
193 static PetscErrorCode gs_gop_exists(gs_id *gs, PetscScalar *in_vals);
194 static PetscErrorCode gs_gop_pairwise_exists(gs_id *gs, PetscScalar *in_vals);
195 static PetscErrorCode gs_gop_local_exists(gs_id *gs, PetscScalar *vals);
196 static PetscErrorCode gs_gop_local_in_exists(gs_id *gs, PetscScalar *vals);
197 static PetscErrorCode gs_gop_tree_exists(gs_id *gs, PetscScalar *vals);
198 
199 static PetscErrorCode gs_gop_pairwise_binary(gs_id *gs, PetscScalar *in_vals, rbfp fct);
200 static PetscErrorCode gs_gop_local_binary(gs_id *gs, PetscScalar *vals, rbfp fct);
201 static PetscErrorCode gs_gop_local_in_binary(gs_id *gs, PetscScalar *vals, rbfp fct);
202 static PetscErrorCode gs_gop_tree_binary(gs_id *gs, PetscScalar *vals, rbfp fct);
203 
204 
205 
206 /* global vars */
207 /* from comm.c module */
208 
209 static PetscInt num_gs_ids = 0;
210 
211 /* should make this dynamic ... later */
212 static PetscInt msg_buf=MAX_MSG_BUF;
213 static PetscInt vec_sz=GS_VEC_SZ;
214 static PetscInt *tree_buf=NULL;
215 static PetscInt tree_buf_sz=0;
216 static PetscInt ntree=0;
217 
218 
219 /******************************************************************************
220 Function: gs_init_()
221 
222 Input :
223 Output:
224 Return:
225 Description:
226 ******************************************************************************/
227 PetscErrorCode gs_init_vec_sz(PetscInt size)
228 {
229   PetscFunctionBegin;
230   vec_sz = size;
231   PetscFunctionReturn(0);
232 }
233 
234 /******************************************************************************
235 Function: gs_init_()
236 
237 Input :
238 Output:
239 Return:
240 Description:
241 ******************************************************************************/
242 PetscErrorCode gs_init_msg_buf_sz(PetscInt buf_size)
243 {
244   PetscFunctionBegin;
245   msg_buf = buf_size;
246   PetscFunctionReturn(0);
247 }
248 
249 /******************************************************************************
250 Function: gs_init()
251 
252 Input :
253 
254 Output:
255 
256 RETURN:
257 
258 Description:
259 ******************************************************************************/
260 gs_id *gs_init( PetscInt *elms, PetscInt nel, PetscInt level)
261 {
262    gs_id *gs;
263   MPI_Group gs_group;
264   MPI_Comm  gs_comm;
265 
266   PetscFunctionBegin;
267   /* ensure that communication package has been initialized */
268   comm_init();
269 
270 
271   /* determines if we have enough dynamic/semi-static memory */
272   /* checks input, allocs and sets gd_id template            */
273   gs = gsi_check_args(elms,nel,level);
274 
275   /* only bit mask version up and working for the moment    */
276   /* LATER :: get int list version working for sparse pblms */
277   gsi_via_bit_mask(gs);
278 
279 
280   MPI_Comm_group(MPI_COMM_WORLD,&gs_group);
281   MPI_Comm_create(MPI_COMM_WORLD,gs_group,&gs_comm);
282   gs->gs_comm=gs_comm;
283 
284   return(gs);
285 }
286 
287 
288 
289 /******************************************************************************
290 Function: gsi_new()
291 
292 Input :
293 Output:
294 Return:
295 Description:
296 
297 elm list must >= 0!!!
298 elm repeats allowed
299 ******************************************************************************/
300 static gs_id *gsi_new(void)
301 {
302   gs_id *gs;
303   gs = (gs_id *) malloc(sizeof(gs_id));
304   PetscMemzero(gs,sizeof(gs_id));
305   return(gs);
306 }
307 
308 
309 
310 /******************************************************************************
311 Function: gsi_check_args()
312 
313 Input :
314 Output:
315 Return:
316 Description:
317 
318 elm list must >= 0!!!
319 elm repeats allowed
320 local working copy of elms is sorted
321 ******************************************************************************/
322 static gs_id * gsi_check_args(PetscInt *in_elms, PetscInt nel, PetscInt level)
323 {
324    PetscInt i, j, k, t2;
325   PetscInt *companion, *elms, *unique, *iptr;
326   PetscInt num_local=0, *num_to_reduce, **local_reduce;
327   PetscInt oprs[] = {NON_UNIFORM,GL_MIN,GL_MAX,GL_ADD,GL_MIN,GL_MAX,GL_MIN,GL_B_AND};
328   PetscInt vals[sizeof(oprs)/sizeof(oprs[0])-1];
329   PetscInt work[sizeof(oprs)/sizeof(oprs[0])-1];
330   gs_id *gs;
331   PetscErrorCode ierr;
332 
333 
334   if (!in_elms)
335     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"elms point to nothing!!!\n");}
336 
337   if (nel<0)
338     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"can't have fewer than 0 elms!!!\n");}
339 
340   if (nel==0)
341     {ierr = PetscInfo(0,"I don't have any elements!!!\n");}
342 
343   /* get space for gs template */
344   gs = gsi_new();
345   gs->id = ++num_gs_ids;
346 
347   /* hmt 6.4.99                                            */
348   /* caller can set global ids that don't participate to 0 */
349   /* gs_init ignores all zeros in elm list                 */
350   /* negative global ids are still invalid                 */
351   for (i=j=0;i<nel;i++)
352     {if (in_elms[i]!=0) {j++;}}
353 
354   k=nel; nel=j;
355 
356   /* copy over in_elms list and create inverse map */
357   elms = (PetscInt*) malloc((nel+1)*sizeof(PetscInt));
358   companion = (PetscInt*) malloc(nel*sizeof(PetscInt));
359 
360   for (i=j=0;i<k;i++)
361     {
362       if (in_elms[i]!=0)
363         {elms[j] = in_elms[i]; companion[j++] = i;}
364     }
365 
366   if (j!=nel)
367     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"nel j mismatch!\n");}
368 
369   /* pre-pass ... check to see if sorted */
370   elms[nel] = INT_MAX;
371   iptr = elms;
372   unique = elms+1;
373   j=0;
374   while (*iptr!=INT_MAX)
375     {
376       if (*iptr++>*unique++)
377         {j=1; break;}
378     }
379 
380   /* set up inverse map */
381   if (j)
382     {
383       ierr = PetscInfo(0,"gsi_check_args() :: elm list *not* sorted!\n");
384       SMI_sort((void*)elms, (void*)companion, nel, SORT_INTEGER);
385     }
386   else
387     {ierr = PetscInfo(0,"gsi_check_args() :: elm list sorted!\n");}
388   elms[nel] = INT_MIN;
389 
390   /* first pass */
391   /* determine number of unique elements, check pd */
392   for (i=k=0;i<nel;i+=j)
393     {
394       t2 = elms[i];
395       j=++i;
396 
397       /* clump 'em for now */
398       while (elms[j]==t2) {j++;}
399 
400       /* how many together and num local */
401       if (j-=i)
402         {num_local++; k+=j;}
403     }
404 
405   /* how many unique elements? */
406   gs->repeats=k;
407   gs->nel = nel-k;
408 
409 
410   /* number of repeats? */
411   gs->num_local = num_local;
412   num_local+=2;
413   gs->local_reduce=local_reduce=(PetscInt **)malloc(num_local*sizeof(PetscInt*));
414   gs->num_local_reduce=num_to_reduce=(PetscInt*) malloc(num_local*sizeof(PetscInt));
415 
416   unique = (PetscInt*) malloc((gs->nel+1)*sizeof(PetscInt));
417   gs->elms = unique;
418   gs->nel_total = nel;
419   gs->local_elms = elms;
420   gs->companion = companion;
421 
422   /* compess map as well as keep track of local ops */
423   for (num_local=i=j=0;i<gs->nel;i++)
424     {
425       k=j;
426       t2 = unique[i] = elms[j];
427       companion[i] = companion[j];
428 
429       while (elms[j]==t2) {j++;}
430 
431       if ((t2=(j-k))>1)
432         {
433           /* number together */
434           num_to_reduce[num_local] = t2++;
435           iptr = local_reduce[num_local++] = (PetscInt*)malloc(t2*sizeof(PetscInt));
436 
437           /* to use binary searching don't remap until we check intersection */
438           *iptr++ = i;
439 
440           /* note that we're skipping the first one */
441           while (++k<j)
442             {*(iptr++) = companion[k];}
443           *iptr = -1;
444         }
445     }
446 
447   /* sentinel for ngh_buf */
448   unique[gs->nel]=INT_MAX;
449 
450   /* for two partition sort hack */
451   num_to_reduce[num_local] = 0;
452   local_reduce[num_local] = NULL;
453   num_to_reduce[++num_local] = 0;
454   local_reduce[num_local] = NULL;
455 
456   /* load 'em up */
457   /* note one extra to hold NON_UNIFORM flag!!! */
458   vals[2] = vals[1] = vals[0] = nel;
459   if (gs->nel>0)
460     {
461        vals[3] = unique[0];
462        vals[4] = unique[gs->nel-1];
463     }
464   else
465     {
466        vals[3] = INT_MAX;
467        vals[4] = INT_MIN;
468     }
469   vals[5] = level;
470   vals[6] = num_gs_ids;
471 
472   /* GLOBAL: send 'em out */
473   giop(vals,work,sizeof(oprs)/sizeof(oprs[0])-1,oprs);
474 
475   /* must be semi-pos def - only pairwise depends on this */
476   /* LATER - remove this restriction */
477   if (vals[3]<0)
478     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"gsi_check_args() :: system not semi-pos def \n");}
479 
480   if (vals[4]==INT_MAX)
481     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"gsi_check_args() :: system ub too large !\n");}
482 
483   gs->nel_min = vals[0];
484   gs->nel_max = vals[1];
485   gs->nel_sum = vals[2];
486   gs->gl_min  = vals[3];
487   gs->gl_max  = vals[4];
488   gs->negl    = vals[4]-vals[3]+1;
489 
490   if (gs->negl<=0)
491     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"gsi_check_args() :: system empty or neg :: %d\n");}
492 
493   /* LATER :: add level == -1 -> program selects level */
494   if (vals[5]<0)
495     {vals[5]=0;}
496   else if (vals[5]>num_nodes)
497     {vals[5]=num_nodes;}
498   gs->level = vals[5];
499 
500   return(gs);
501 }
502 
503 
504 /******************************************************************************
505 Function: gsi_via_bit_mask()
506 
507 Input :
508 Output:
509 Return:
510 Description:
511 
512 
513 ******************************************************************************/
514 static PetscErrorCode gsi_via_bit_mask(gs_id *gs)
515 {
516    PetscInt i, nel, *elms;
517   PetscInt t1;
518   PetscInt **reduce;
519   PetscInt *map;
520 
521   /* totally local removes ... ct_bits == 0 */
522   get_ngh_buf(gs);
523 
524   if (gs->level)
525     {set_pairwise(gs);}
526 
527   if (gs->max_left_over)
528     {set_tree(gs);}
529 
530   /* intersection local and pairwise/tree? */
531   gs->num_local_total = gs->num_local;
532   gs->gop_local_reduce = gs->local_reduce;
533   gs->num_gop_local_reduce = gs->num_local_reduce;
534 
535   map = gs->companion;
536 
537   /* is there any local compression */
538   if (!gs->num_local) {
539     gs->local_strength = NONE;
540     gs->num_local_gop = 0;
541   } else {
542       /* ok find intersection */
543       map = gs->companion;
544       reduce = gs->local_reduce;
545       for (i=0, t1=0; i<gs->num_local; i++, reduce++)
546         {
547           if ((ivec_binary_search(**reduce,gs->pw_elm_list,gs->len_pw_list)>=0)
548               ||
549               ivec_binary_search(**reduce,gs->tree_map_in,gs->tree_map_sz)>=0)
550             {
551               /* printf("C%d :: i=%d, **reduce=%d\n",my_id,i,**reduce); */
552               t1++;
553               if (gs->num_local_reduce[i]<=0)
554                 {SETERRQ(PETSC_ERR_PLIB,"nobody in list?");}
555               gs->num_local_reduce[i] *= -1;
556             }
557            **reduce=map[**reduce];
558         }
559 
560       /* intersection is empty */
561       if (!t1)
562         {
563           gs->local_strength = FULL;
564           gs->num_local_gop = 0;
565         }
566       /* intersection not empty */
567       else
568         {
569           gs->local_strength = PARTIAL;
570           SMI_sort((void*)gs->num_local_reduce, (void*)gs->local_reduce,
571                    gs->num_local + 1, SORT_INT_PTR);
572 
573           gs->num_local_gop = t1;
574           gs->num_local_total =  gs->num_local;
575           gs->num_local    -= t1;
576           gs->gop_local_reduce = gs->local_reduce;
577           gs->num_gop_local_reduce = gs->num_local_reduce;
578 
579           for (i=0; i<t1; i++)
580             {
581               if (gs->num_gop_local_reduce[i]>=0)
582                 {SETERRQ(PETSC_ERR_PLIB,"they aren't negative?");}
583               gs->num_gop_local_reduce[i] *= -1;
584               gs->local_reduce++;
585               gs->num_local_reduce++;
586             }
587           gs->local_reduce++;
588           gs->num_local_reduce++;
589         }
590     }
591 
592   elms = gs->pw_elm_list;
593   nel  = gs->len_pw_list;
594   for (i=0; i<nel; i++)
595     {elms[i] = map[elms[i]];}
596 
597   elms = gs->tree_map_in;
598   nel  = gs->tree_map_sz;
599   for (i=0; i<nel; i++)
600     {elms[i] = map[elms[i]];}
601 
602   /* clean up */
603   free((void*) gs->local_elms);
604   free((void*) gs->companion);
605   free((void*) gs->elms);
606   free((void*) gs->ngh_buf);
607   gs->local_elms = gs->companion = gs->elms = gs->ngh_buf = NULL;
608   PetscFunctionReturn(0);
609 }
610 
611 
612 
613 /******************************************************************************
614 Function: place_in_tree()
615 
616 Input :
617 Output:
618 Return:
619 Description:
620 
621 
622 ******************************************************************************/
623 static PetscErrorCode place_in_tree( PetscInt elm)
624 {
625    PetscInt *tp, n;
626 
627   PetscFunctionBegin;
628   if (ntree==tree_buf_sz)
629     {
630       if (tree_buf_sz)
631         {
632           tp = tree_buf;
633           n = tree_buf_sz;
634           tree_buf_sz<<=1;
635           tree_buf = (PetscInt*)malloc(tree_buf_sz*sizeof(PetscInt));
636           ivec_copy(tree_buf,tp,n);
637           free(tp);
638         }
639       else
640         {
641           tree_buf_sz = TREE_BUF_SZ;
642           tree_buf = (PetscInt*)malloc(tree_buf_sz*sizeof(PetscInt));
643         }
644     }
645 
646   tree_buf[ntree++] = elm;
647   PetscFunctionReturn(0);
648 }
649 
650 
651 
652 /******************************************************************************
653 Function: get_ngh_buf()
654 
655 Input :
656 Output:
657 Return:
658 Description:
659 
660 
661 ******************************************************************************/
662 static PetscErrorCode get_ngh_buf(gs_id *gs)
663 {
664    PetscInt i, j, npw=0, ntree_map=0;
665   PetscInt p_mask_size, ngh_buf_size, buf_size;
666   PetscInt *p_mask, *sh_proc_mask, *pw_sh_proc_mask;
667   PetscInt *ngh_buf, *buf1, *buf2;
668   PetscInt offset, per_load, num_loads, or_ct, start, end;
669   PetscInt *ptr1, *ptr2, i_start, negl, nel, *elms;
670   PetscInt oper=GL_B_OR;
671   PetscInt *ptr3, *t_mask, level, ct1, ct2;
672 
673   PetscFunctionBegin;
674   /* to make life easier */
675   nel   = gs->nel;
676   elms  = gs->elms;
677   level = gs->level;
678 
679   /* det #bytes needed for processor bit masks and init w/mask cor. to my_id */
680   p_mask = (PetscInt*) malloc(p_mask_size=len_bit_mask(num_nodes));
681   set_bit_mask(p_mask,p_mask_size,my_id);
682 
683   /* allocate space for masks and info bufs */
684   gs->nghs = sh_proc_mask = (PetscInt*) malloc(p_mask_size);
685   gs->pw_nghs = pw_sh_proc_mask = (PetscInt*) malloc(p_mask_size);
686   gs->ngh_buf_sz = ngh_buf_size = p_mask_size*nel;
687   t_mask = (PetscInt*) malloc(p_mask_size);
688   gs->ngh_buf = ngh_buf = (PetscInt*) malloc(ngh_buf_size);
689 
690   /* comm buffer size ... memory usage bounded by ~2*msg_buf */
691   /* had thought I could exploit rendezvous threshold */
692 
693   /* default is one pass */
694   per_load = negl  = gs->negl;
695   gs->num_loads = num_loads = 1;
696   i=p_mask_size*negl;
697 
698   /* possible overflow on buffer size */
699   /* overflow hack                    */
700   if (i<0) {i=INT_MAX;}
701 
702   buf_size = PetscMin(msg_buf,i);
703 
704   /* can we do it? */
705   if (p_mask_size>buf_size)
706     {SETERRQ2(PETSC_ERR_PLIB,"get_ngh_buf() :: buf<pms :: %d>%d\n",p_mask_size,buf_size);}
707 
708   /* get giop buf space ... make *only* one malloc */
709   buf1 = (PetscInt*) malloc(buf_size<<1);
710 
711   /* more than one gior exchange needed? */
712   if (buf_size!=i)
713     {
714       per_load = buf_size/p_mask_size;
715       buf_size = per_load*p_mask_size;
716       gs->num_loads = num_loads = negl/per_load + (negl%per_load>0);
717     }
718 
719 
720   /* convert buf sizes from #bytes to #ints - 32 bit only! */
721   p_mask_size/=sizeof(PetscInt); ngh_buf_size/=sizeof(PetscInt); buf_size/=sizeof(PetscInt);
722 
723   /* find giop work space */
724   buf2 = buf1+buf_size;
725 
726   /* hold #ints needed for processor masks */
727   gs->mask_sz=p_mask_size;
728 
729   /* init buffers */
730   ivec_zero(sh_proc_mask,p_mask_size);
731   ivec_zero(pw_sh_proc_mask,p_mask_size);
732   ivec_zero(ngh_buf,ngh_buf_size);
733 
734   /* HACK reset tree info */
735   tree_buf=NULL;
736   tree_buf_sz=ntree=0;
737 
738   /* ok do it */
739   for (ptr1=ngh_buf,ptr2=elms,end=gs->gl_min,or_ct=i=0; or_ct<num_loads; or_ct++)
740     {
741       /* identity for bitwise or is 000...000 */
742       ivec_zero(buf1,buf_size);
743 
744       /* load msg buffer */
745       for (start=end,end+=per_load,i_start=i; (offset=*ptr2)<end; i++, ptr2++)
746         {
747           offset = (offset-start)*p_mask_size;
748           ivec_copy(buf1+offset,p_mask,p_mask_size);
749         }
750 
751       /* GLOBAL: pass buffer */
752       giop(buf1,buf2,buf_size,&oper);
753 
754 
755       /* unload buffer into ngh_buf */
756       ptr2=(elms+i_start);
757       for(ptr3=buf1,j=start; j<end; ptr3+=p_mask_size,j++)
758         {
759           /* I own it ... may have to pairwise it */
760           if (j==*ptr2)
761             {
762               /* do i share it w/anyone? */
763               ct1 = ct_bits((char *)ptr3,p_mask_size*sizeof(PetscInt));
764               /* guess not */
765               if (ct1<2)
766                 {ptr2++; ptr1+=p_mask_size; continue;}
767 
768               /* i do ... so keep info and turn off my bit */
769               ivec_copy(ptr1,ptr3,p_mask_size);
770               ivec_xor(ptr1,p_mask,p_mask_size);
771               ivec_or(sh_proc_mask,ptr1,p_mask_size);
772 
773               /* is it to be done pairwise? */
774               if (--ct1<=level)
775                 {
776                   npw++;
777 
778                   /* turn on high bit to indicate pw need to process */
779                   *ptr2++ |= TOP_BIT;
780                   ivec_or(pw_sh_proc_mask,ptr1,p_mask_size);
781                   ptr1+=p_mask_size;
782                   continue;
783                 }
784 
785               /* get set for next and note that I have a tree contribution */
786               /* could save exact elm index for tree here -> save a search */
787               ptr2++; ptr1+=p_mask_size; ntree_map++;
788             }
789           /* i don't but still might be involved in tree */
790           else
791             {
792 
793               /* shared by how many? */
794               ct1 = ct_bits((char *)ptr3,p_mask_size*sizeof(PetscInt));
795 
796               /* none! */
797               if (ct1<2)
798                 {continue;}
799 
800               /* is it going to be done pairwise? but not by me of course!*/
801               if (--ct1<=level)
802                 {continue;}
803             }
804           /* LATER we're going to have to process it NOW */
805           /* nope ... tree it */
806           place_in_tree(j);
807         }
808     }
809 
810   free((void*)t_mask);
811   free((void*)buf1);
812 
813   gs->len_pw_list=npw;
814   gs->num_nghs = ct_bits((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt));
815 
816   /* expand from bit mask list to int list and save ngh list */
817   gs->nghs = (PetscInt*) malloc(gs->num_nghs * sizeof(PetscInt));
818   bm_to_proc((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt),gs->nghs);
819 
820   gs->num_pw_nghs = ct_bits((char *)pw_sh_proc_mask,p_mask_size*sizeof(PetscInt));
821 
822   oper = GL_MAX;
823   ct1 = gs->num_nghs;
824   giop(&ct1,&ct2,1,&oper);
825   gs->max_nghs = ct1;
826 
827   gs->tree_map_sz  = ntree_map;
828   gs->max_left_over=ntree;
829 
830   free((void*)p_mask);
831   free((void*)sh_proc_mask);
832   PetscFunctionReturn(0);
833 }
834 
835 
836 
837 
838 
839 /******************************************************************************
840 Function: pairwise_init()
841 
842 Input :
843 Output:
844 Return:
845 Description:
846 
847 if an element is shared by fewer that level# of nodes do pairwise exch
848 ******************************************************************************/
849 static PetscErrorCode set_pairwise(gs_id *gs)
850 {
851    PetscInt i, j;
852   PetscInt p_mask_size;
853   PetscInt *p_mask, *sh_proc_mask, *tmp_proc_mask;
854   PetscInt *ngh_buf, *buf2;
855   PetscInt offset;
856   PetscInt *msg_list, *msg_size, **msg_nodes, nprs;
857   PetscInt *pairwise_elm_list, len_pair_list=0;
858   PetscInt *iptr, t1, i_start, nel, *elms;
859   PetscInt ct;
860 
861 
862   PetscFunctionBegin;
863   /* to make life easier */
864   nel  = gs->nel;
865   elms = gs->elms;
866   ngh_buf = gs->ngh_buf;
867   sh_proc_mask  = gs->pw_nghs;
868 
869   /* need a few temp masks */
870   p_mask_size   = len_bit_mask(num_nodes);
871   p_mask        = (PetscInt*) malloc(p_mask_size);
872   tmp_proc_mask = (PetscInt*) malloc(p_mask_size);
873 
874   /* set mask to my my_id's bit mask */
875   set_bit_mask(p_mask,p_mask_size,my_id);
876 
877   p_mask_size /= sizeof(PetscInt);
878 
879   len_pair_list=gs->len_pw_list;
880   gs->pw_elm_list=pairwise_elm_list=(PetscInt*)malloc((len_pair_list+1)*sizeof(PetscInt));
881 
882   /* how many processors (nghs) do we have to exchange with? */
883   nprs=gs->num_pairs=ct_bits((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt));
884 
885 
886   /* allocate space for gs_gop() info */
887   gs->pair_list = msg_list = (PetscInt *)  malloc(sizeof(PetscInt)*nprs);
888   gs->msg_sizes = msg_size  = (PetscInt *)  malloc(sizeof(PetscInt)*nprs);
889   gs->node_list = msg_nodes = (PetscInt **) malloc(sizeof(PetscInt*)*(nprs+1));
890 
891   /* init msg_size list */
892   ivec_zero(msg_size,nprs);
893 
894   /* expand from bit mask list to int list */
895   bm_to_proc((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt),msg_list);
896 
897   /* keep list of elements being handled pairwise */
898   for (i=j=0;i<nel;i++)
899     {
900       if (elms[i] & TOP_BIT)
901         {elms[i] ^= TOP_BIT; pairwise_elm_list[j++] = i;}
902     }
903   pairwise_elm_list[j] = -1;
904 
905   gs->msg_ids_out = (MPI_Request *)  malloc(sizeof(MPI_Request)*(nprs+1));
906   gs->msg_ids_out[nprs] = MPI_REQUEST_NULL;
907   gs->msg_ids_in = (MPI_Request *)  malloc(sizeof(MPI_Request)*(nprs+1));
908   gs->msg_ids_in[nprs] = MPI_REQUEST_NULL;
909   gs->pw_vals = (PetscScalar *) malloc(sizeof(PetscScalar)*len_pair_list*vec_sz);
910 
911   /* find who goes to each processor */
912   for (i_start=i=0;i<nprs;i++)
913     {
914       /* processor i's mask */
915       set_bit_mask(p_mask,p_mask_size*sizeof(PetscInt),msg_list[i]);
916 
917       /* det # going to processor i */
918       for (ct=j=0;j<len_pair_list;j++)
919         {
920           buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
921           ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);
922           if (ct_bits((char *)tmp_proc_mask,p_mask_size*sizeof(PetscInt)))
923             {ct++;}
924         }
925       msg_size[i] = ct;
926       i_start = PetscMax(i_start,ct);
927 
928       /*space to hold nodes in message to first neighbor */
929       msg_nodes[i] = iptr = (PetscInt*) malloc(sizeof(PetscInt)*(ct+1));
930 
931       for (j=0;j<len_pair_list;j++)
932         {
933           buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
934           ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);
935           if (ct_bits((char *)tmp_proc_mask,p_mask_size*sizeof(PetscInt)))
936             {*iptr++ = j;}
937         }
938       *iptr = -1;
939     }
940   msg_nodes[nprs] = NULL;
941 
942   j=gs->loc_node_pairs=i_start;
943   t1 = GL_MAX;
944   giop(&i_start,&offset,1,&t1);
945   gs->max_node_pairs = i_start;
946 
947   i_start=j;
948   t1 = GL_MIN;
949   giop(&i_start,&offset,1,&t1);
950   gs->min_node_pairs = i_start;
951 
952   i_start=j;
953   t1 = GL_ADD;
954   giop(&i_start,&offset,1,&t1);
955   gs->avg_node_pairs = i_start/num_nodes + 1;
956 
957   i_start=nprs;
958   t1 = GL_MAX;
959   giop(&i_start,&offset,1,&t1);
960   gs->max_pairs = i_start;
961 
962 
963   /* remap pairwise in tail of gsi_via_bit_mask() */
964   gs->msg_total = ivec_sum(gs->msg_sizes,nprs);
965   gs->out = (PetscScalar *) malloc(sizeof(PetscScalar)*gs->msg_total*vec_sz);
966   gs->in  = (PetscScalar *) malloc(sizeof(PetscScalar)*gs->msg_total*vec_sz);
967 
968   /* reset malloc pool */
969   free((void*)p_mask);
970   free((void*)tmp_proc_mask);
971   PetscFunctionReturn(0);
972 }
973 
974 
975 
976 /******************************************************************************
977 Function: set_tree()
978 
979 Input :
980 Output:
981 Return:
982 Description:
983 
984 to do pruned tree just save ngh buf copy for each one and decode here!
985 ******************************************************************************/
986 static PetscErrorCode set_tree(gs_id *gs)
987 {
988    PetscInt i, j, n, nel;
989    PetscInt *iptr_in, *iptr_out, *tree_elms, *elms;
990 
991   PetscFunctionBegin;
992   /* local work ptrs */
993   elms = gs->elms;
994   nel     = gs->nel;
995 
996   /* how many via tree */
997   gs->tree_nel  = n = ntree;
998   gs->tree_elms = tree_elms = iptr_in = tree_buf;
999   gs->tree_buf  = (PetscScalar *) malloc(sizeof(PetscScalar)*n*vec_sz);
1000   gs->tree_work = (PetscScalar *) malloc(sizeof(PetscScalar)*n*vec_sz);
1001   j=gs->tree_map_sz;
1002   gs->tree_map_in = iptr_in  = (PetscInt*) malloc(sizeof(PetscInt)*(j+1));
1003   gs->tree_map_out = iptr_out = (PetscInt*) malloc(sizeof(PetscInt)*(j+1));
1004 
1005   /* search the longer of the two lists */
1006   /* note ... could save this info in get_ngh_buf and save searches */
1007   if (n<=nel)
1008     {
1009       /* bijective fct w/remap - search elm list */
1010       for (i=0; i<n; i++)
1011         {
1012           if ((j=ivec_binary_search(*tree_elms++,elms,nel))>=0)
1013             {*iptr_in++ = j; *iptr_out++ = i;}
1014         }
1015     }
1016   else
1017     {
1018       for (i=0; i<nel; i++)
1019         {
1020           if ((j=ivec_binary_search(*elms++,tree_elms,n))>=0)
1021             {*iptr_in++ = i; *iptr_out++ = j;}
1022         }
1023     }
1024 
1025   /* sentinel */
1026   *iptr_in = *iptr_out = -1;
1027   PetscFunctionReturn(0);
1028 }
1029 
1030 
1031 /******************************************************************************
1032 Function: gather_scatter
1033 
1034 Input :
1035 Output:
1036 Return:
1037 Description:
1038 ******************************************************************************/
1039 static PetscErrorCode gs_gop_local_out( gs_id *gs,  PetscScalar *vals)
1040 {
1041    PetscInt *num, *map, **reduce;
1042    PetscScalar tmp;
1043 
1044   PetscFunctionBegin;
1045   num    = gs->num_gop_local_reduce;
1046   reduce = gs->gop_local_reduce;
1047   while ((map = *reduce++))
1048     {
1049       /* wall */
1050       if (*num == 2)
1051         {
1052           num ++;
1053           vals[map[1]] = vals[map[0]];
1054         }
1055       /* corner shared by three elements */
1056       else if (*num == 3)
1057         {
1058           num ++;
1059           vals[map[2]] = vals[map[1]] = vals[map[0]];
1060         }
1061       /* corner shared by four elements */
1062       else if (*num == 4)
1063         {
1064           num ++;
1065           vals[map[3]] = vals[map[2]] = vals[map[1]] = vals[map[0]];
1066         }
1067       /* general case ... odd geoms ... 3D*/
1068       else
1069         {
1070           num++;
1071           tmp = *(vals + *map++);
1072           while (*map >= 0)
1073             {*(vals + *map++) = tmp;}
1074         }
1075     }
1076   PetscFunctionReturn(0);
1077 }
1078 
1079 
1080 
1081 /******************************************************************************
1082 Function: gather_scatter
1083 
1084 Input :
1085 Output:
1086 Return:
1087 Description:
1088 ******************************************************************************/
1089 PetscErrorCode gs_gop_binary(gs_ADT gs, PetscScalar *vals, rbfp fct)
1090 {
1091   PetscFunctionBegin;
1092   /* local only operations!!! */
1093   if (gs->num_local)
1094     {gs_gop_local_binary(gs,vals,fct);}
1095 
1096   /* if intersection tree/pairwise and local isn't empty */
1097   if (gs->num_local_gop)
1098     {
1099       gs_gop_local_in_binary(gs,vals,fct);
1100 
1101       /* pairwise */
1102       if (gs->num_pairs)
1103         {gs_gop_pairwise_binary(gs,vals,fct);}
1104 
1105       /* tree */
1106       else if (gs->max_left_over)
1107         {gs_gop_tree_binary(gs,vals,fct);}
1108 
1109       gs_gop_local_out(gs,vals);
1110     }
1111   /* if intersection tree/pairwise and local is empty */
1112   else
1113     {
1114       /* pairwise */
1115       if (gs->num_pairs)
1116         {gs_gop_pairwise_binary(gs,vals,fct);}
1117 
1118       /* tree */
1119       else if (gs->max_left_over)
1120         {gs_gop_tree_binary(gs,vals,fct);}
1121     }
1122   PetscFunctionReturn(0);
1123 }
1124 
1125 
1126 
1127 /******************************************************************************
1128 Function: gather_scatter
1129 
1130 Input :
1131 Output:
1132 Return:
1133 Description:
1134 ******************************************************************************/
1135 static PetscErrorCode gs_gop_local_binary( gs_id *gs,  PetscScalar *vals,  rbfp fct)
1136 {
1137    PetscInt *num, *map, **reduce;
1138   PetscScalar tmp;
1139 
1140   PetscFunctionBegin;
1141   num    = gs->num_local_reduce;
1142   reduce = gs->local_reduce;
1143   while ((map = *reduce))
1144     {
1145       num ++;
1146       (*fct)(&tmp,NULL,1);
1147       /* tmp = 0.0; */
1148       while (*map >= 0)
1149         {(*fct)(&tmp,(vals + *map),1); map++;}
1150         /*        {tmp = (*fct)(tmp,*(vals + *map)); map++;} */
1151 
1152       map = *reduce++;
1153       while (*map >= 0)
1154         {*(vals + *map++) = tmp;}
1155     }
1156   PetscFunctionReturn(0);
1157 }
1158 
1159 
1160 
1161 /******************************************************************************
1162 Function: gather_scatter
1163 
1164 Input :
1165 Output:
1166 Return:
1167 Description:
1168 ******************************************************************************/
1169 static PetscErrorCode gs_gop_local_in_binary( gs_id *gs,  PetscScalar *vals,  rbfp fct)
1170 {
1171    PetscInt *num, *map, **reduce;
1172    PetscScalar *base;
1173 
1174   PetscFunctionBegin;
1175   num    = gs->num_gop_local_reduce;
1176 
1177   reduce = gs->gop_local_reduce;
1178   while ((map = *reduce++))
1179     {
1180       num++;
1181       base = vals + *map++;
1182       while (*map >= 0)
1183         {(*fct)(base,(vals + *map),1); map++;}
1184     }
1185   PetscFunctionReturn(0);
1186 }
1187 
1188 
1189 
1190 /******************************************************************************
1191 Function: gather_scatter
1192 
1193 VERSION 3 ::
1194 
1195 Input :
1196 Output:
1197 Return:
1198 Description:
1199 ******************************************************************************/
1200 static PetscErrorCode gs_gop_pairwise_binary( gs_id *gs,  PetscScalar *in_vals,rbfp fct)
1201 {
1202   PetscScalar    *dptr1, *dptr2, *dptr3, *in1, *in2;
1203   PetscInt            *iptr, *msg_list, *msg_size, **msg_nodes;
1204   PetscInt            *pw, *list, *size, **nodes;
1205   MPI_Request    *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1206   MPI_Status     status;
1207   PetscErrorCode ierr;
1208 
1209   PetscFunctionBegin;
1210   /* strip and load s */
1211   msg_list =list         = gs->pair_list;
1212   msg_size =size         = gs->msg_sizes;
1213   msg_nodes=nodes        = gs->node_list;
1214   iptr=pw                = gs->pw_elm_list;
1215   dptr1=dptr3            = gs->pw_vals;
1216   msg_ids_in  = ids_in   = gs->msg_ids_in;
1217   msg_ids_out = ids_out  = gs->msg_ids_out;
1218   dptr2                  = gs->out;
1219   in1=in2                = gs->in;
1220 
1221   /* post the receives */
1222   /*  msg_nodes=nodes; */
1223   do
1224     {
1225       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1226          second one *list and do list++ afterwards */
1227       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
1228       in1 += *size++;
1229     }
1230   while (*++msg_nodes);
1231   msg_nodes=nodes;
1232 
1233   /* load gs values into in out gs buffers */
1234   while (*iptr >= 0)
1235     {*dptr3++ = *(in_vals + *iptr++);}
1236 
1237   /* load out buffers and post the sends */
1238   while ((iptr = *msg_nodes++))
1239     {
1240       dptr3 = dptr2;
1241       while (*iptr >= 0)
1242         {*dptr2++ = *(dptr1 + *iptr++);}
1243       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1244       /* is msg_ids_out++ correct? */
1245       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
1246     }
1247 
1248   if (gs->max_left_over)
1249     {gs_gop_tree_binary(gs,in_vals,fct);}
1250 
1251   /* process the received data */
1252   msg_nodes=nodes;
1253   while ((iptr = *nodes++))
1254     {
1255       /* Should I check the return value of MPI_Wait() or status? */
1256       /* Can this loop be replaced by a call to MPI_Waitall()? */
1257       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
1258       while (*iptr >= 0)
1259         {(*fct)((dptr1 + *iptr),in2,1); iptr++; in2++;}
1260       /* {*(dptr1 + *iptr) = (*fct)(*(dptr1 + *iptr),*in2); iptr++; in2++;} */
1261     }
1262 
1263   /* replace vals */
1264   while (*pw >= 0)
1265     {*(in_vals + *pw++) = *dptr1++;}
1266 
1267   /* clear isend message handles */
1268   /* This changed for clarity though it could be the same */
1269   while (*msg_nodes++)
1270     /* Should I check the return value of MPI_Wait() or status? */
1271     /* Can this loop be replaced by a call to MPI_Waitall()? */
1272     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
1273   PetscFunctionReturn(0);
1274 }
1275 
1276 
1277 
1278 /******************************************************************************
1279 Function: gather_scatter
1280 
1281 Input :
1282 Output:
1283 Return:
1284 Description:
1285 ******************************************************************************/
1286 static PetscErrorCode gs_gop_tree_binary(gs_id *gs, PetscScalar *vals,  rbfp fct)
1287 {
1288   PetscInt         size;
1289   PetscInt         *in,    *out;
1290   PetscScalar *buf, *work;
1291 
1292   PetscFunctionBegin;
1293   in   = gs->tree_map_in;
1294   out  = gs->tree_map_out;
1295   buf  = gs->tree_buf;
1296   work = gs->tree_work;
1297   size = gs->tree_nel;
1298 
1299   /* load vals vector w/identity */
1300   (*fct)(buf,NULL,size);
1301 
1302   /* load my contribution into val vector */
1303   while (*in >= 0) {
1304     (*fct)((buf + *out++),(vals + *in++),-1);
1305   }
1306 
1307   gfop(buf,work,size,(vbfp)fct,MPIU_SCALAR,0);
1308 
1309   in   = gs->tree_map_in;
1310   out  = gs->tree_map_out;
1311   while (*in >= 0) {
1312     (*fct)((vals + *in++),(buf + *out++),-1);
1313   }
1314   PetscFunctionReturn(0);
1315 }
1316 
1317 
1318 
1319 
1320 /******************************************************************************
1321 Function: gather_scatter
1322 
1323 Input :
1324 Output:
1325 Return:
1326 Description:
1327 ******************************************************************************/
1328 PetscErrorCode gs_gop( gs_id *gs,  PetscScalar *vals,  const char *op)
1329 {
1330   PetscErrorCode ierr;
1331 
1332   PetscFunctionBegin;
1333   switch (*op) {
1334   case '+':
1335     gs_gop_plus(gs,vals);
1336     break;
1337   case '*':
1338     gs_gop_times(gs,vals);
1339     break;
1340   case 'a':
1341     gs_gop_min_abs(gs,vals);
1342     break;
1343   case 'A':
1344     gs_gop_max_abs(gs,vals);
1345     break;
1346   case 'e':
1347     gs_gop_exists(gs,vals);
1348     break;
1349   case 'm':
1350     gs_gop_min(gs,vals);
1351     break;
1352   case 'M':
1353     gs_gop_max(gs,vals); break;
1354   default:
1355     ierr = PetscInfo1(0,"gs_gop() :: %c is not a valid op",op[0]);
1356     ierr = PetscInfo(0,"gs_gop() :: default :: plus");
1357     gs_gop_plus(gs,vals);
1358     break;
1359   }
1360   PetscFunctionReturn(0);
1361 }
1362 
1363 
1364 /******************************************************************************
1365 Function: gather_scatter
1366 
1367 Input :
1368 Output:
1369 Return:
1370 Description:
1371 ******************************************************************************/
1372 static PetscErrorCode gs_gop_exists( gs_id *gs,  PetscScalar *vals)
1373 {
1374   PetscFunctionBegin;
1375   /* local only operations!!! */
1376   if (gs->num_local)
1377     {gs_gop_local_exists(gs,vals);}
1378 
1379   /* if intersection tree/pairwise and local isn't empty */
1380   if (gs->num_local_gop)
1381     {
1382       gs_gop_local_in_exists(gs,vals);
1383 
1384       /* pairwise */
1385       if (gs->num_pairs)
1386         {gs_gop_pairwise_exists(gs,vals);}
1387 
1388       /* tree */
1389       else if (gs->max_left_over)
1390         {gs_gop_tree_exists(gs,vals);}
1391 
1392       gs_gop_local_out(gs,vals);
1393     }
1394   /* if intersection tree/pairwise and local is empty */
1395   else
1396     {
1397       /* pairwise */
1398       if (gs->num_pairs)
1399         {gs_gop_pairwise_exists(gs,vals);}
1400 
1401       /* tree */
1402       else if (gs->max_left_over)
1403         {gs_gop_tree_exists(gs,vals);}
1404     }
1405   PetscFunctionReturn(0);
1406 }
1407 
1408 
1409 
1410 /******************************************************************************
1411 Function: gather_scatter
1412 
1413 Input :
1414 Output:
1415 Return:
1416 Description:
1417 ******************************************************************************/
1418 static PetscErrorCode gs_gop_local_exists( gs_id *gs,  PetscScalar *vals)
1419 {
1420    PetscInt         *num, *map, **reduce;
1421    PetscScalar tmp;
1422 
1423   PetscFunctionBegin;
1424   num    = gs->num_local_reduce;
1425   reduce = gs->local_reduce;
1426   while ((map = *reduce))
1427     {
1428       num ++;
1429       tmp = 0.0;
1430       while (*map >= 0)
1431         {tmp = EXISTS(tmp,*(vals + *map)); map++;}
1432 
1433       map = *reduce++;
1434       while (*map >= 0)
1435         {*(vals + *map++) = tmp;}
1436     }
1437   PetscFunctionReturn(0);
1438 }
1439 
1440 /******************************************************************************/
1441 static PetscErrorCode gs_gop_local_in_exists( gs_id *gs,  PetscScalar *vals)
1442 {
1443   PetscInt         *num, *map, **reduce;
1444   PetscScalar *base;
1445 
1446   PetscFunctionBegin;
1447   num    = gs->num_gop_local_reduce;
1448   reduce = gs->gop_local_reduce;
1449   while ((map = *reduce++))
1450     {
1451       num++;
1452       base = vals + *map++;
1453       while (*map >= 0)
1454         {*base = EXISTS(*base,*(vals + *map)); map++;}
1455     }
1456   PetscFunctionReturn(0);
1457 }
1458 
1459 static PetscErrorCode gs_gop_pairwise_exists( gs_id *gs,  PetscScalar *in_vals)
1460 {
1461   PetscScalar    *dptr1, *dptr2, *dptr3, *in1, *in2;
1462   PetscInt            *iptr, *msg_list, *msg_size, **msg_nodes;
1463   PetscInt            *pw, *list, *size, **nodes;
1464   MPI_Request    *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1465   MPI_Status     status;
1466   PetscErrorCode ierr;
1467 
1468   PetscFunctionBegin;
1469   /* strip and load s */
1470   msg_list =list         = gs->pair_list;
1471   msg_size =size         = gs->msg_sizes;
1472   msg_nodes=nodes        = gs->node_list;
1473   iptr=pw                = gs->pw_elm_list;
1474   dptr1=dptr3            = gs->pw_vals;
1475   msg_ids_in  = ids_in   = gs->msg_ids_in;
1476   msg_ids_out = ids_out  = gs->msg_ids_out;
1477   dptr2                  = gs->out;
1478   in1=in2                = gs->in;
1479 
1480   /* post the receives */
1481   do
1482     {
1483       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1484          second one *list and do list++ afterwards */
1485       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
1486       in1 += *size++;
1487     }
1488   while (*++msg_nodes);
1489   msg_nodes=nodes;
1490 
1491   /* load gs values into in out gs buffers */
1492   while (*iptr >= 0)
1493     {*dptr3++ = *(in_vals + *iptr++);}
1494 
1495   /* load out buffers and post the sends */
1496   while ((iptr = *msg_nodes++))
1497     {
1498       dptr3 = dptr2;
1499       while (*iptr >= 0)
1500         {*dptr2++ = *(dptr1 + *iptr++);}
1501       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1502       /* is msg_ids_out++ correct? */
1503       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
1504     }
1505 
1506   if (gs->max_left_over)
1507     {gs_gop_tree_exists(gs,in_vals);}
1508 
1509   /* process the received data */
1510   msg_nodes=nodes;
1511   while ((iptr = *nodes++))
1512     {
1513       /* Should I check the return value of MPI_Wait() or status? */
1514       /* Can this loop be replaced by a call to MPI_Waitall()? */
1515       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
1516       while (*iptr >= 0)
1517         {*(dptr1 + *iptr) = EXISTS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
1518     }
1519 
1520   /* replace vals */
1521   while (*pw >= 0)
1522     {*(in_vals + *pw++) = *dptr1++;}
1523 
1524   /* clear isend message handles */
1525   /* This changed for clarity though it could be the same */
1526   while (*msg_nodes++)
1527     /* Should I check the return value of MPI_Wait() or status? */
1528     /* Can this loop be replaced by a call to MPI_Waitall()? */
1529     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
1530   PetscFunctionReturn(0);
1531 }
1532 /******************************************************************************/
1533 static PetscErrorCode gs_gop_tree_exists(gs_id *gs, PetscScalar *vals)
1534 {
1535   PetscInt         size;
1536   PetscInt         *in, *out;
1537   PetscScalar *buf, *work;
1538   PetscInt         op[] = {GL_EXISTS,0};
1539 
1540   PetscFunctionBegin;
1541   in   = gs->tree_map_in;
1542   out  = gs->tree_map_out;
1543   buf  = gs->tree_buf;
1544   work = gs->tree_work;
1545   size = gs->tree_nel;
1546 
1547   rvec_zero(buf,size);
1548 
1549   while (*in >= 0)
1550     {
1551       /*
1552       printf("%d :: out=%d\n",my_id,*out);
1553       printf("%d :: in=%d\n",my_id,*in);
1554       */
1555       *(buf + *out++) = *(vals + *in++);
1556     }
1557 
1558   grop(buf,work,size,op);
1559 
1560   in   = gs->tree_map_in;
1561   out  = gs->tree_map_out;
1562 
1563   while (*in >= 0)
1564     {*(vals + *in++) = *(buf + *out++);}
1565   PetscFunctionReturn(0);
1566 }
1567 
1568 /*******************************************************************************/
1569 static PetscErrorCode gs_gop_max_abs( gs_id *gs,  PetscScalar *vals)
1570 {
1571   PetscFunctionBegin;
1572   /* local only operations!!! */
1573   if (gs->num_local)
1574     {gs_gop_local_max_abs(gs,vals);}
1575 
1576   /* if intersection tree/pairwise and local isn't empty */
1577   if (gs->num_local_gop)
1578     {
1579       gs_gop_local_in_max_abs(gs,vals);
1580 
1581       /* pairwise */
1582       if (gs->num_pairs)
1583         {gs_gop_pairwise_max_abs(gs,vals);}
1584 
1585       /* tree */
1586       else if (gs->max_left_over)
1587         {gs_gop_tree_max_abs(gs,vals);}
1588 
1589       gs_gop_local_out(gs,vals);
1590     }
1591   /* if intersection tree/pairwise and local is empty */
1592   else
1593     {
1594       /* pairwise */
1595       if (gs->num_pairs)
1596         {gs_gop_pairwise_max_abs(gs,vals);}
1597 
1598       /* tree */
1599       else if (gs->max_left_over)
1600         {gs_gop_tree_max_abs(gs,vals);}
1601     }
1602   PetscFunctionReturn(0);
1603 }
1604 
1605 /******************************************************************************/
1606 static PetscErrorCode gs_gop_local_max_abs( gs_id *gs,  PetscScalar *vals)
1607 {
1608   PetscInt         *num, *map, **reduce;
1609   PetscScalar tmp;
1610 
1611   PetscFunctionBegin;
1612   num    = gs->num_local_reduce;
1613   reduce = gs->local_reduce;
1614   while ((map = *reduce))
1615     {
1616       num ++;
1617       tmp = 0.0;
1618       while (*map >= 0)
1619         {tmp = MAX_FABS(tmp,*(vals + *map)); map++;}
1620 
1621       map = *reduce++;
1622       while (*map >= 0)
1623         {*(vals + *map++) = tmp;}
1624     }
1625   PetscFunctionReturn(0);
1626 }
1627 
1628 /******************************************************************************/
1629 static PetscErrorCode gs_gop_local_in_max_abs( gs_id *gs,  PetscScalar *vals)
1630 {
1631   PetscInt         *num, *map, **reduce;
1632   PetscScalar *base;
1633 
1634   PetscFunctionBegin;
1635   num    = gs->num_gop_local_reduce;
1636   reduce = gs->gop_local_reduce;
1637   while ((map = *reduce++))
1638     {
1639       num++;
1640       base = vals + *map++;
1641       while (*map >= 0)
1642         {*base = MAX_FABS(*base,*(vals + *map)); map++;}
1643     }
1644   PetscFunctionReturn(0);
1645 }
1646 
1647 /******************************************************************************/
1648 static PetscErrorCode gs_gop_pairwise_max_abs( gs_id *gs,  PetscScalar *in_vals)
1649 {
1650   PetscScalar    *dptr1, *dptr2, *dptr3, *in1, *in2;
1651   PetscInt            *iptr, *msg_list, *msg_size, **msg_nodes;
1652   PetscInt            *pw, *list, *size, **nodes;
1653   MPI_Request    *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1654   MPI_Status     status;
1655   PetscErrorCode ierr;
1656 
1657   PetscFunctionBegin;
1658   /* strip and load s */
1659   msg_list =list         = gs->pair_list;
1660   msg_size =size         = gs->msg_sizes;
1661   msg_nodes=nodes        = gs->node_list;
1662   iptr=pw                = gs->pw_elm_list;
1663   dptr1=dptr3            = gs->pw_vals;
1664   msg_ids_in  = ids_in   = gs->msg_ids_in;
1665   msg_ids_out = ids_out  = gs->msg_ids_out;
1666   dptr2                  = gs->out;
1667   in1=in2                = gs->in;
1668 
1669   /* post the receives */
1670   /*  msg_nodes=nodes; */
1671   do
1672     {
1673       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1674          second one *list and do list++ afterwards */
1675       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
1676       in1 += *size++;
1677     }
1678   while (*++msg_nodes);
1679   msg_nodes=nodes;
1680 
1681   /* load gs values into in out gs buffers */
1682   while (*iptr >= 0)
1683     {*dptr3++ = *(in_vals + *iptr++);}
1684 
1685   /* load out buffers and post the sends */
1686   while ((iptr = *msg_nodes++))
1687     {
1688       dptr3 = dptr2;
1689       while (*iptr >= 0)
1690         {*dptr2++ = *(dptr1 + *iptr++);}
1691       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1692       /* is msg_ids_out++ correct? */
1693       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
1694     }
1695 
1696   if (gs->max_left_over)
1697     {gs_gop_tree_max_abs(gs,in_vals);}
1698 
1699   /* process the received data */
1700   msg_nodes=nodes;
1701   while ((iptr = *nodes++))
1702     {
1703       /* Should I check the return value of MPI_Wait() or status? */
1704       /* Can this loop be replaced by a call to MPI_Waitall()? */
1705       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
1706       while (*iptr >= 0)
1707         {*(dptr1 + *iptr) = MAX_FABS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
1708     }
1709 
1710   /* replace vals */
1711   while (*pw >= 0)
1712     {*(in_vals + *pw++) = *dptr1++;}
1713 
1714   /* clear isend message handles */
1715   /* This changed for clarity though it could be the same */
1716   while (*msg_nodes++)
1717     /* Should I check the return value of MPI_Wait() or status? */
1718     /* Can this loop be replaced by a call to MPI_Waitall()? */
1719     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
1720   PetscFunctionReturn(0);
1721 }
1722 
1723 /******************************************************************************/
1724 static PetscErrorCode gs_gop_tree_max_abs(gs_id *gs, PetscScalar *vals)
1725 {
1726   PetscInt         size;
1727   PetscInt         *in, *out;
1728   PetscScalar *buf, *work;
1729   PetscInt         op[] = {GL_MAX_ABS,0};
1730 
1731   PetscFunctionBegin;
1732   in   = gs->tree_map_in;
1733   out  = gs->tree_map_out;
1734   buf  = gs->tree_buf;
1735   work = gs->tree_work;
1736   size = gs->tree_nel;
1737 
1738   rvec_zero(buf,size);
1739 
1740   while (*in >= 0)
1741     {
1742       /*
1743       printf("%d :: out=%d\n",my_id,*out);
1744       printf("%d :: in=%d\n",my_id,*in);
1745       */
1746       *(buf + *out++) = *(vals + *in++);
1747     }
1748 
1749   grop(buf,work,size,op);
1750 
1751   in   = gs->tree_map_in;
1752   out  = gs->tree_map_out;
1753 
1754   while (*in >= 0)
1755     {*(vals + *in++) = *(buf + *out++);}
1756   PetscFunctionReturn(0);
1757 }
1758 
1759 /******************************************************************************/
1760 static PetscErrorCode gs_gop_max( gs_id *gs,  PetscScalar *vals)
1761 {
1762   PetscFunctionBegin;
1763   /* local only operations!!! */
1764   if (gs->num_local)
1765     {gs_gop_local_max(gs,vals);}
1766 
1767   /* if intersection tree/pairwise and local isn't empty */
1768   if (gs->num_local_gop)
1769     {
1770       gs_gop_local_in_max(gs,vals);
1771 
1772       /* pairwise */
1773       if (gs->num_pairs)
1774         {gs_gop_pairwise_max(gs,vals);}
1775 
1776       /* tree */
1777       else if (gs->max_left_over)
1778         {gs_gop_tree_max(gs,vals);}
1779 
1780       gs_gop_local_out(gs,vals);
1781     }
1782   /* if intersection tree/pairwise and local is empty */
1783   else
1784     {
1785       /* pairwise */
1786       if (gs->num_pairs)
1787         {gs_gop_pairwise_max(gs,vals);}
1788 
1789       /* tree */
1790       else if (gs->max_left_over)
1791         {gs_gop_tree_max(gs,vals);}
1792     }
1793   PetscFunctionReturn(0);
1794 }
1795 
1796 /******************************************************************************/
1797 static PetscErrorCode gs_gop_local_max( gs_id *gs,  PetscScalar *vals)
1798 {
1799   PetscInt         *num, *map, **reduce;
1800   PetscScalar tmp;
1801 
1802   PetscFunctionBegin;
1803   num    = gs->num_local_reduce;
1804   reduce = gs->local_reduce;
1805   while ((map = *reduce))
1806     {
1807       num ++;
1808       tmp = -REAL_MAX;
1809       while (*map >= 0)
1810         {tmp = PetscMax(tmp,*(vals + *map)); map++;}
1811 
1812       map = *reduce++;
1813       while (*map >= 0)
1814         {*(vals + *map++) = tmp;}
1815     }
1816   PetscFunctionReturn(0);
1817 }
1818 
1819 /******************************************************************************/
1820 static PetscErrorCode gs_gop_local_in_max( gs_id *gs,  PetscScalar *vals)
1821 {
1822   PetscInt         *num, *map, **reduce;
1823   PetscScalar *base;
1824 
1825   PetscFunctionBegin;
1826   num    = gs->num_gop_local_reduce;
1827   reduce = gs->gop_local_reduce;
1828   while ((map = *reduce++))
1829     {
1830       num++;
1831       base = vals + *map++;
1832       while (*map >= 0)
1833         {*base = PetscMax(*base,*(vals + *map)); map++;}
1834     }
1835   PetscFunctionReturn(0);
1836 }
1837 
1838 /******************************************************************************/
1839 static PetscErrorCode gs_gop_pairwise_max( gs_id *gs,  PetscScalar *in_vals)
1840 {
1841   PetscScalar    *dptr1, *dptr2, *dptr3, *in1, *in2;
1842   PetscInt            *iptr, *msg_list, *msg_size, **msg_nodes;
1843   PetscInt            *pw, *list, *size, **nodes;
1844   MPI_Request    *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1845   MPI_Status     status;
1846   PetscErrorCode ierr;
1847 
1848   PetscFunctionBegin;
1849   /* strip and load s */
1850   msg_list =list         = gs->pair_list;
1851   msg_size =size         = gs->msg_sizes;
1852   msg_nodes=nodes        = gs->node_list;
1853   iptr=pw                = gs->pw_elm_list;
1854   dptr1=dptr3            = gs->pw_vals;
1855   msg_ids_in  = ids_in   = gs->msg_ids_in;
1856   msg_ids_out = ids_out  = gs->msg_ids_out;
1857   dptr2                  = gs->out;
1858   in1=in2                = gs->in;
1859 
1860   /* post the receives */
1861   /*  msg_nodes=nodes; */
1862   do
1863     {
1864       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1865          second one *list and do list++ afterwards */
1866       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
1867       in1 += *size++;
1868     }
1869   while (*++msg_nodes);
1870   msg_nodes=nodes;
1871 
1872   /* load gs values into in out gs buffers */
1873   while (*iptr >= 0)
1874     {*dptr3++ = *(in_vals + *iptr++);}
1875 
1876   /* load out buffers and post the sends */
1877   while ((iptr = *msg_nodes++))
1878     {
1879       dptr3 = dptr2;
1880       while (*iptr >= 0)
1881         {*dptr2++ = *(dptr1 + *iptr++);}
1882       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1883       /* is msg_ids_out++ correct? */
1884       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
1885     }
1886 
1887   if (gs->max_left_over)
1888     {gs_gop_tree_max(gs,in_vals);}
1889 
1890   /* process the received data */
1891   msg_nodes=nodes;
1892   while ((iptr = *nodes++))
1893     {
1894       /* Should I check the return value of MPI_Wait() or status? */
1895       /* Can this loop be replaced by a call to MPI_Waitall()? */
1896       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
1897       while (*iptr >= 0)
1898         {*(dptr1 + *iptr) = PetscMax(*(dptr1 + *iptr),*in2); iptr++; in2++;}
1899     }
1900 
1901   /* replace vals */
1902   while (*pw >= 0)
1903     {*(in_vals + *pw++) = *dptr1++;}
1904 
1905   /* clear isend message handles */
1906   /* This changed for clarity though it could be the same */
1907   while (*msg_nodes++)
1908     /* Should I check the return value of MPI_Wait() or status? */
1909     /* Can this loop be replaced by a call to MPI_Waitall()? */
1910     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
1911   PetscFunctionReturn(0);
1912 }
1913 
1914 /******************************************************************************/
1915 static PetscErrorCode gs_gop_tree_max(gs_id *gs, PetscScalar *vals)
1916 {
1917   PetscInt            size;
1918   PetscInt            *in, *out;
1919   PetscScalar    *buf, *work;
1920   PetscErrorCode ierr;
1921 
1922   PetscFunctionBegin;
1923   in   = gs->tree_map_in;
1924   out  = gs->tree_map_out;
1925   buf  = gs->tree_buf;
1926   work = gs->tree_work;
1927   size = gs->tree_nel;
1928 
1929   rvec_set(buf,-REAL_MAX,size);
1930 
1931   while (*in >= 0)
1932     {*(buf + *out++) = *(vals + *in++);}
1933 
1934   in   = gs->tree_map_in;
1935   out  = gs->tree_map_out;
1936   ierr = MPI_Allreduce(buf,work,size,MPIU_SCALAR,MPI_MAX,gs->gs_comm);CHKERRQ(ierr);
1937   while (*in >= 0)
1938     {*(vals + *in++) = *(work + *out++);}
1939   PetscFunctionReturn(0);
1940 }
1941 /******************************************************************************/
1942 static PetscErrorCode gs_gop_min_abs( gs_id *gs,  PetscScalar *vals)
1943 {
1944   PetscFunctionBegin;
1945   /* local only operations!!! */
1946   if (gs->num_local)
1947     {gs_gop_local_min_abs(gs,vals);}
1948 
1949   /* if intersection tree/pairwise and local isn't empty */
1950   if (gs->num_local_gop)
1951     {
1952       gs_gop_local_in_min_abs(gs,vals);
1953 
1954       /* pairwise */
1955       if (gs->num_pairs)
1956         {gs_gop_pairwise_min_abs(gs,vals);}
1957 
1958       /* tree */
1959       else if (gs->max_left_over)
1960         {gs_gop_tree_min_abs(gs,vals);}
1961 
1962       gs_gop_local_out(gs,vals);
1963     }
1964   /* if intersection tree/pairwise and local is empty */
1965   else
1966     {
1967       /* pairwise */
1968       if (gs->num_pairs)
1969         {gs_gop_pairwise_min_abs(gs,vals);}
1970 
1971       /* tree */
1972       else if (gs->max_left_over)
1973         {gs_gop_tree_min_abs(gs,vals);}
1974     }
1975   PetscFunctionReturn(0);
1976 }
1977 
1978 /******************************************************************************/
1979 static PetscErrorCode gs_gop_local_min_abs( gs_id *gs,  PetscScalar *vals)
1980 {
1981    PetscInt *num, *map, **reduce;
1982    PetscScalar tmp;
1983 
1984   PetscFunctionBegin;
1985   num    = gs->num_local_reduce;
1986   reduce = gs->local_reduce;
1987   while ((map = *reduce))
1988     {
1989       num ++;
1990       tmp = REAL_MAX;
1991       while (*map >= 0)
1992         {tmp = MIN_FABS(tmp,*(vals + *map)); map++;}
1993 
1994       map = *reduce++;
1995       while (*map >= 0)
1996         {*(vals + *map++) = tmp;}
1997     }
1998   PetscFunctionReturn(0);
1999 }
2000 
2001 /******************************************************************************/
2002 static PetscErrorCode gs_gop_local_in_min_abs( gs_id *gs,  PetscScalar *vals)
2003 {
2004    PetscInt *num, *map, **reduce;
2005    PetscScalar *base;
2006 
2007   PetscFunctionBegin;
2008   num    = gs->num_gop_local_reduce;
2009   reduce = gs->gop_local_reduce;
2010   while ((map = *reduce++))
2011     {
2012       num++;
2013       base = vals + *map++;
2014       while (*map >= 0)
2015         {*base = MIN_FABS(*base,*(vals + *map)); map++;}
2016     }
2017   PetscFunctionReturn(0);
2018 }
2019 
2020 /******************************************************************************/
2021 static PetscErrorCode gs_gop_pairwise_min_abs( gs_id *gs,  PetscScalar *in_vals)
2022 {
2023    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
2024    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
2025    PetscInt *pw, *list, *size, **nodes;
2026   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2027   MPI_Status status;
2028   PetscErrorCode ierr;
2029 
2030   PetscFunctionBegin;
2031   /* strip and load s */
2032   msg_list =list         = gs->pair_list;
2033   msg_size =size         = gs->msg_sizes;
2034   msg_nodes=nodes        = gs->node_list;
2035   iptr=pw                = gs->pw_elm_list;
2036   dptr1=dptr3            = gs->pw_vals;
2037   msg_ids_in  = ids_in   = gs->msg_ids_in;
2038   msg_ids_out = ids_out  = gs->msg_ids_out;
2039   dptr2                  = gs->out;
2040   in1=in2                = gs->in;
2041 
2042   /* post the receives */
2043   /*  msg_nodes=nodes; */
2044   do
2045     {
2046       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2047          second one *list and do list++ afterwards */
2048       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
2049       in1 += *size++;
2050     }
2051   while (*++msg_nodes);
2052   msg_nodes=nodes;
2053 
2054   /* load gs values into in out gs buffers */
2055   while (*iptr >= 0)
2056     {*dptr3++ = *(in_vals + *iptr++);}
2057 
2058   /* load out buffers and post the sends */
2059   while ((iptr = *msg_nodes++))
2060     {
2061       dptr3 = dptr2;
2062       while (*iptr >= 0)
2063         {*dptr2++ = *(dptr1 + *iptr++);}
2064       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2065       /* is msg_ids_out++ correct? */
2066       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
2067     }
2068 
2069   if (gs->max_left_over)
2070     {gs_gop_tree_min_abs(gs,in_vals);}
2071 
2072   /* process the received data */
2073   msg_nodes=nodes;
2074   while ((iptr = *nodes++))
2075     {
2076       /* Should I check the return value of MPI_Wait() or status? */
2077       /* Can this loop be replaced by a call to MPI_Waitall()? */
2078       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
2079       while (*iptr >= 0)
2080         {*(dptr1 + *iptr) = MIN_FABS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2081     }
2082 
2083   /* replace vals */
2084   while (*pw >= 0)
2085     {*(in_vals + *pw++) = *dptr1++;}
2086 
2087   /* clear isend message handles */
2088   /* This changed for clarity though it could be the same */
2089   while (*msg_nodes++)
2090     /* Should I check the return value of MPI_Wait() or status? */
2091     /* Can this loop be replaced by a call to MPI_Waitall()? */
2092     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
2093   PetscFunctionReturn(0);
2094 }
2095 
2096 /******************************************************************************/
2097 static PetscErrorCode gs_gop_tree_min_abs(gs_id *gs, PetscScalar *vals)
2098 {
2099   PetscInt size;
2100   PetscInt *in, *out;
2101   PetscScalar *buf, *work;
2102   PetscInt op[] = {GL_MIN_ABS,0};
2103 
2104   PetscFunctionBegin;
2105   in   = gs->tree_map_in;
2106   out  = gs->tree_map_out;
2107   buf  = gs->tree_buf;
2108   work = gs->tree_work;
2109   size = gs->tree_nel;
2110 
2111   rvec_set(buf,REAL_MAX,size);
2112 
2113   while (*in >= 0)
2114     {*(buf + *out++) = *(vals + *in++);}
2115 
2116   in   = gs->tree_map_in;
2117   out  = gs->tree_map_out;
2118   grop(buf,work,size,op);
2119   while (*in >= 0)
2120     {*(vals + *in++) = *(buf + *out++);}
2121   PetscFunctionReturn(0);
2122 }
2123 
2124 /******************************************************************************/
2125 static PetscErrorCode gs_gop_min( gs_id *gs,  PetscScalar *vals)
2126 {
2127   PetscFunctionBegin;
2128   /* local only operations!!! */
2129   if (gs->num_local)
2130     {gs_gop_local_min(gs,vals);}
2131 
2132   /* if intersection tree/pairwise and local isn't empty */
2133   if (gs->num_local_gop)
2134     {
2135       gs_gop_local_in_min(gs,vals);
2136 
2137       /* pairwise */
2138       if (gs->num_pairs)
2139         {gs_gop_pairwise_min(gs,vals);}
2140 
2141       /* tree */
2142       else if (gs->max_left_over)
2143         {gs_gop_tree_min(gs,vals);}
2144 
2145       gs_gop_local_out(gs,vals);
2146     }
2147   /* if intersection tree/pairwise and local is empty */
2148   else
2149     {
2150       /* pairwise */
2151       if (gs->num_pairs)
2152         {gs_gop_pairwise_min(gs,vals);}
2153 
2154       /* tree */
2155       else if (gs->max_left_over)
2156         {gs_gop_tree_min(gs,vals);}
2157     }
2158   PetscFunctionReturn(0);
2159 }
2160 
2161 /******************************************************************************/
2162 static PetscErrorCode gs_gop_local_min( gs_id *gs,  PetscScalar *vals)
2163 {
2164    PetscInt *num, *map, **reduce;
2165    PetscScalar tmp;
2166   PetscFunctionBegin;
2167   num    = gs->num_local_reduce;
2168   reduce = gs->local_reduce;
2169   while ((map = *reduce))
2170     {
2171       num ++;
2172       tmp = REAL_MAX;
2173       while (*map >= 0)
2174         {tmp = PetscMin(tmp,*(vals + *map)); map++;}
2175 
2176       map = *reduce++;
2177       while (*map >= 0)
2178         {*(vals + *map++) = tmp;}
2179     }
2180   PetscFunctionReturn(0);
2181 }
2182 
2183 /******************************************************************************/
2184 static PetscErrorCode gs_gop_local_in_min( gs_id *gs,  PetscScalar *vals)
2185 {
2186    PetscInt *num, *map, **reduce;
2187    PetscScalar *base;
2188 
2189   PetscFunctionBegin;
2190   num    = gs->num_gop_local_reduce;
2191   reduce = gs->gop_local_reduce;
2192   while ((map = *reduce++))
2193     {
2194       num++;
2195       base = vals + *map++;
2196       while (*map >= 0)
2197         {*base = PetscMin(*base,*(vals + *map)); map++;}
2198     }
2199   PetscFunctionReturn(0);
2200 }
2201 
2202 /******************************************************************************/
2203 static PetscErrorCode gs_gop_pairwise_min( gs_id *gs,  PetscScalar *in_vals)
2204 {
2205    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
2206    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
2207    PetscInt *pw, *list, *size, **nodes;
2208   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2209   MPI_Status status;
2210   PetscErrorCode ierr;
2211 
2212   PetscFunctionBegin;
2213   /* strip and load s */
2214   msg_list =list         = gs->pair_list;
2215   msg_size =size         = gs->msg_sizes;
2216   msg_nodes=nodes        = gs->node_list;
2217   iptr=pw                = gs->pw_elm_list;
2218   dptr1=dptr3            = gs->pw_vals;
2219   msg_ids_in  = ids_in   = gs->msg_ids_in;
2220   msg_ids_out = ids_out  = gs->msg_ids_out;
2221   dptr2                  = gs->out;
2222   in1=in2                = gs->in;
2223 
2224   /* post the receives */
2225   /*  msg_nodes=nodes; */
2226   do
2227     {
2228       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2229          second one *list and do list++ afterwards */
2230       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
2231       in1 += *size++;
2232     }
2233   while (*++msg_nodes);
2234   msg_nodes=nodes;
2235 
2236   /* load gs values into in out gs buffers */
2237   while (*iptr >= 0)
2238     {*dptr3++ = *(in_vals + *iptr++);}
2239 
2240   /* load out buffers and post the sends */
2241   while ((iptr = *msg_nodes++))
2242     {
2243       dptr3 = dptr2;
2244       while (*iptr >= 0)
2245         {*dptr2++ = *(dptr1 + *iptr++);}
2246       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2247       /* is msg_ids_out++ correct? */
2248       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
2249     }
2250 
2251   /* process the received data */
2252   if (gs->max_left_over)
2253     {gs_gop_tree_min(gs,in_vals);}
2254 
2255   msg_nodes=nodes;
2256   while ((iptr = *nodes++))
2257     {
2258       /* Should I check the return value of MPI_Wait() or status? */
2259       /* Can this loop be replaced by a call to MPI_Waitall()? */
2260       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
2261       while (*iptr >= 0)
2262         {*(dptr1 + *iptr) = PetscMin(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2263     }
2264 
2265   /* replace vals */
2266   while (*pw >= 0)
2267     {*(in_vals + *pw++) = *dptr1++;}
2268 
2269   /* clear isend message handles */
2270   /* This changed for clarity though it could be the same */
2271   while (*msg_nodes++)
2272     /* Should I check the return value of MPI_Wait() or status? */
2273     /* Can this loop be replaced by a call to MPI_Waitall()? */
2274     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
2275   PetscFunctionReturn(0);
2276 }
2277 
2278 /******************************************************************************/
2279 static PetscErrorCode gs_gop_tree_min(gs_id *gs, PetscScalar *vals)
2280 {
2281   PetscInt size;
2282   PetscInt *in, *out;
2283   PetscScalar *buf, *work;
2284   PetscErrorCode ierr;
2285 
2286   PetscFunctionBegin;
2287   in   = gs->tree_map_in;
2288   out  = gs->tree_map_out;
2289   buf  = gs->tree_buf;
2290   work = gs->tree_work;
2291   size = gs->tree_nel;
2292 
2293   rvec_set(buf,REAL_MAX,size);
2294 
2295   while (*in >= 0)
2296     {*(buf + *out++) = *(vals + *in++);}
2297 
2298   in   = gs->tree_map_in;
2299   out  = gs->tree_map_out;
2300   ierr = MPI_Allreduce(buf,work,size,MPIU_SCALAR,MPI_MIN,gs->gs_comm);CHKERRQ(ierr);
2301   while (*in >= 0)
2302     {*(vals + *in++) = *(work + *out++);}
2303   PetscFunctionReturn(0);
2304 }
2305 
2306 /******************************************************************************/
2307 static PetscErrorCode gs_gop_times( gs_id *gs,  PetscScalar *vals)
2308 {
2309   PetscFunctionBegin;
2310   /* local only operations!!! */
2311   if (gs->num_local)
2312     {gs_gop_local_times(gs,vals);}
2313 
2314   /* if intersection tree/pairwise and local isn't empty */
2315   if (gs->num_local_gop)
2316     {
2317       gs_gop_local_in_times(gs,vals);
2318 
2319       /* pairwise */
2320       if (gs->num_pairs)
2321         {gs_gop_pairwise_times(gs,vals);}
2322 
2323       /* tree */
2324       else if (gs->max_left_over)
2325         {gs_gop_tree_times(gs,vals);}
2326 
2327       gs_gop_local_out(gs,vals);
2328     }
2329   /* if intersection tree/pairwise and local is empty */
2330   else
2331     {
2332       /* pairwise */
2333       if (gs->num_pairs)
2334         {gs_gop_pairwise_times(gs,vals);}
2335 
2336       /* tree */
2337       else if (gs->max_left_over)
2338         {gs_gop_tree_times(gs,vals);}
2339     }
2340   PetscFunctionReturn(0);
2341 }
2342 
2343 /******************************************************************************/
2344 static PetscErrorCode gs_gop_local_times( gs_id *gs,  PetscScalar *vals)
2345 {
2346    PetscInt *num, *map, **reduce;
2347    PetscScalar tmp;
2348 
2349   PetscFunctionBegin;
2350   num    = gs->num_local_reduce;
2351   reduce = gs->local_reduce;
2352   while ((map = *reduce))
2353     {
2354       /* wall */
2355       if (*num == 2)
2356         {
2357           num ++; reduce++;
2358           vals[map[1]] = vals[map[0]] *= vals[map[1]];
2359         }
2360       /* corner shared by three elements */
2361       else if (*num == 3)
2362         {
2363           num ++; reduce++;
2364           vals[map[2]]=vals[map[1]]=vals[map[0]]*=(vals[map[1]]*vals[map[2]]);
2365         }
2366       /* corner shared by four elements */
2367       else if (*num == 4)
2368         {
2369           num ++; reduce++;
2370           vals[map[1]]=vals[map[2]]=vals[map[3]]=vals[map[0]] *=
2371                                  (vals[map[1]] * vals[map[2]] * vals[map[3]]);
2372         }
2373       /* general case ... odd geoms ... 3D*/
2374       else
2375         {
2376           num ++;
2377           tmp = 1.0;
2378           while (*map >= 0)
2379             {tmp *= *(vals + *map++);}
2380 
2381           map = *reduce++;
2382           while (*map >= 0)
2383             {*(vals + *map++) = tmp;}
2384         }
2385     }
2386   PetscFunctionReturn(0);
2387 }
2388 
2389 /******************************************************************************/
2390 static PetscErrorCode gs_gop_local_in_times( gs_id *gs,  PetscScalar *vals)
2391 {
2392    PetscInt *num, *map, **reduce;
2393    PetscScalar *base;
2394 
2395   PetscFunctionBegin;
2396   num    = gs->num_gop_local_reduce;
2397   reduce = gs->gop_local_reduce;
2398   while ((map = *reduce++))
2399     {
2400       /* wall */
2401       if (*num == 2)
2402         {
2403           num ++;
2404           vals[map[0]] *= vals[map[1]];
2405         }
2406       /* corner shared by three elements */
2407       else if (*num == 3)
2408         {
2409           num ++;
2410           vals[map[0]] *= (vals[map[1]] * vals[map[2]]);
2411         }
2412       /* corner shared by four elements */
2413       else if (*num == 4)
2414         {
2415           num ++;
2416           vals[map[0]] *= (vals[map[1]] * vals[map[2]] * vals[map[3]]);
2417         }
2418       /* general case ... odd geoms ... 3D*/
2419       else
2420         {
2421           num++;
2422           base = vals + *map++;
2423           while (*map >= 0)
2424             {*base *= *(vals + *map++);}
2425         }
2426     }
2427   PetscFunctionReturn(0);
2428 }
2429 
2430 /******************************************************************************/
2431 static PetscErrorCode gs_gop_pairwise_times( gs_id *gs,  PetscScalar *in_vals)
2432 {
2433    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
2434    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
2435    PetscInt *pw, *list, *size, **nodes;
2436   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2437   MPI_Status status;
2438   PetscErrorCode ierr;
2439 
2440   PetscFunctionBegin;
2441   /* strip and load s */
2442   msg_list =list         = gs->pair_list;
2443   msg_size =size         = gs->msg_sizes;
2444   msg_nodes=nodes        = gs->node_list;
2445   iptr=pw                = gs->pw_elm_list;
2446   dptr1=dptr3            = gs->pw_vals;
2447   msg_ids_in  = ids_in   = gs->msg_ids_in;
2448   msg_ids_out = ids_out  = gs->msg_ids_out;
2449   dptr2                  = gs->out;
2450   in1=in2                = gs->in;
2451 
2452   /* post the receives */
2453   /*  msg_nodes=nodes; */
2454   do
2455     {
2456       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2457          second one *list and do list++ afterwards */
2458       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
2459       in1 += *size++;
2460     }
2461   while (*++msg_nodes);
2462   msg_nodes=nodes;
2463 
2464   /* load gs values into in out gs buffers */
2465   while (*iptr >= 0)
2466     {*dptr3++ = *(in_vals + *iptr++);}
2467 
2468   /* load out buffers and post the sends */
2469   while ((iptr = *msg_nodes++))
2470     {
2471       dptr3 = dptr2;
2472       while (*iptr >= 0)
2473         {*dptr2++ = *(dptr1 + *iptr++);}
2474       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2475       /* is msg_ids_out++ correct? */
2476       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
2477     }
2478 
2479   if (gs->max_left_over)
2480     {gs_gop_tree_times(gs,in_vals);}
2481 
2482   /* process the received data */
2483   msg_nodes=nodes;
2484   while ((iptr = *nodes++))
2485     {
2486       /* Should I check the return value of MPI_Wait() or status? */
2487       /* Can this loop be replaced by a call to MPI_Waitall()? */
2488       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
2489       while (*iptr >= 0)
2490         {*(dptr1 + *iptr++) *= *in2++;}
2491     }
2492 
2493   /* replace vals */
2494   while (*pw >= 0)
2495     {*(in_vals + *pw++) = *dptr1++;}
2496 
2497   /* clear isend message handles */
2498   /* This changed for clarity though it could be the same */
2499   while (*msg_nodes++)
2500     /* Should I check the return value of MPI_Wait() or status? */
2501     /* Can this loop be replaced by a call to MPI_Waitall()? */
2502     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
2503   PetscFunctionReturn(0);
2504 }
2505 
2506 /******************************************************************************/
2507 static PetscErrorCode gs_gop_tree_times(gs_id *gs, PetscScalar *vals)
2508 {
2509   PetscInt size;
2510   PetscInt *in, *out;
2511   PetscScalar *buf, *work;
2512   PetscErrorCode ierr;
2513 
2514   PetscFunctionBegin;
2515   in   = gs->tree_map_in;
2516   out  = gs->tree_map_out;
2517   buf  = gs->tree_buf;
2518   work = gs->tree_work;
2519   size = gs->tree_nel;
2520 
2521   rvec_one(buf,size);
2522 
2523   while (*in >= 0)
2524     {*(buf + *out++) = *(vals + *in++);}
2525 
2526   in   = gs->tree_map_in;
2527   out  = gs->tree_map_out;
2528   ierr = MPI_Allreduce(buf,work,size,MPIU_SCALAR,MPI_PROD,gs->gs_comm);CHKERRQ(ierr);
2529   while (*in >= 0)
2530     {*(vals + *in++) = *(work + *out++);}
2531   PetscFunctionReturn(0);
2532 }
2533 
2534 /******************************************************************************/
2535 static PetscErrorCode gs_gop_plus( gs_id *gs,  PetscScalar *vals)
2536 {
2537   PetscFunctionBegin;
2538   /* local only operations!!! */
2539   if (gs->num_local)
2540     {gs_gop_local_plus(gs,vals);}
2541 
2542   /* if intersection tree/pairwise and local isn't empty */
2543   if (gs->num_local_gop)
2544     {
2545       gs_gop_local_in_plus(gs,vals);
2546 
2547       /* pairwise will NOT do tree inside ... */
2548       if (gs->num_pairs)
2549         {gs_gop_pairwise_plus(gs,vals);}
2550 
2551       /* tree */
2552       if (gs->max_left_over)
2553         {gs_gop_tree_plus(gs,vals);}
2554 
2555       gs_gop_local_out(gs,vals);
2556     }
2557   /* if intersection tree/pairwise and local is empty */
2558   else
2559     {
2560       /* pairwise will NOT do tree inside */
2561       if (gs->num_pairs)
2562         {gs_gop_pairwise_plus(gs,vals);}
2563 
2564       /* tree */
2565       if (gs->max_left_over)
2566         {gs_gop_tree_plus(gs,vals);}
2567     }
2568   PetscFunctionReturn(0);
2569 }
2570 
2571 /******************************************************************************/
2572 static PetscErrorCode gs_gop_local_plus( gs_id *gs,  PetscScalar *vals)
2573 {
2574    PetscInt *num, *map, **reduce;
2575    PetscScalar tmp;
2576 
2577   PetscFunctionBegin;
2578   num    = gs->num_local_reduce;
2579   reduce = gs->local_reduce;
2580   while ((map = *reduce))
2581     {
2582       /* wall */
2583       if (*num == 2)
2584         {
2585           num ++; reduce++;
2586           vals[map[1]] = vals[map[0]] += vals[map[1]];
2587         }
2588       /* corner shared by three elements */
2589       else if (*num == 3)
2590         {
2591           num ++; reduce++;
2592           vals[map[2]]=vals[map[1]]=vals[map[0]]+=(vals[map[1]]+vals[map[2]]);
2593         }
2594       /* corner shared by four elements */
2595       else if (*num == 4)
2596         {
2597           num ++; reduce++;
2598           vals[map[1]]=vals[map[2]]=vals[map[3]]=vals[map[0]] +=
2599                                  (vals[map[1]] + vals[map[2]] + vals[map[3]]);
2600         }
2601       /* general case ... odd geoms ... 3D*/
2602       else
2603         {
2604           num ++;
2605           tmp = 0.0;
2606           while (*map >= 0)
2607             {tmp += *(vals + *map++);}
2608 
2609           map = *reduce++;
2610           while (*map >= 0)
2611             {*(vals + *map++) = tmp;}
2612         }
2613     }
2614   PetscFunctionReturn(0);
2615 }
2616 
2617 /******************************************************************************/
2618 static PetscErrorCode gs_gop_local_in_plus( gs_id *gs,  PetscScalar *vals)
2619 {
2620    PetscInt *num, *map, **reduce;
2621    PetscScalar *base;
2622 
2623   PetscFunctionBegin;
2624   num    = gs->num_gop_local_reduce;
2625   reduce = gs->gop_local_reduce;
2626   while ((map = *reduce++))
2627     {
2628       /* wall */
2629       if (*num == 2)
2630         {
2631           num ++;
2632           vals[map[0]] += vals[map[1]];
2633         }
2634       /* corner shared by three elements */
2635       else if (*num == 3)
2636         {
2637           num ++;
2638           vals[map[0]] += (vals[map[1]] + vals[map[2]]);
2639         }
2640       /* corner shared by four elements */
2641       else if (*num == 4)
2642         {
2643           num ++;
2644           vals[map[0]] += (vals[map[1]] + vals[map[2]] + vals[map[3]]);
2645         }
2646       /* general case ... odd geoms ... 3D*/
2647       else
2648         {
2649           num++;
2650           base = vals + *map++;
2651           while (*map >= 0)
2652             {*base += *(vals + *map++);}
2653         }
2654     }
2655   PetscFunctionReturn(0);
2656 }
2657 
2658 /******************************************************************************/
2659 static PetscErrorCode gs_gop_pairwise_plus( gs_id *gs,  PetscScalar *in_vals)
2660 {
2661    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
2662    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
2663    PetscInt *pw, *list, *size, **nodes;
2664   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2665   MPI_Status status;
2666   PetscErrorCode ierr;
2667 
2668   PetscFunctionBegin;
2669   /* strip and load s */
2670   msg_list =list         = gs->pair_list;
2671   msg_size =size         = gs->msg_sizes;
2672   msg_nodes=nodes        = gs->node_list;
2673   iptr=pw                = gs->pw_elm_list;
2674   dptr1=dptr3            = gs->pw_vals;
2675   msg_ids_in  = ids_in   = gs->msg_ids_in;
2676   msg_ids_out = ids_out  = gs->msg_ids_out;
2677   dptr2                  = gs->out;
2678   in1=in2                = gs->in;
2679 
2680   /* post the receives */
2681   /*  msg_nodes=nodes; */
2682   do
2683     {
2684       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2685          second one *list and do list++ afterwards */
2686       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
2687       in1 += *size++;
2688     }
2689   while (*++msg_nodes);
2690   msg_nodes=nodes;
2691 
2692   /* load gs values into in out gs buffers */
2693   while (*iptr >= 0)
2694     {*dptr3++ = *(in_vals + *iptr++);}
2695 
2696   /* load out buffers and post the sends */
2697   while ((iptr = *msg_nodes++))
2698     {
2699       dptr3 = dptr2;
2700       while (*iptr >= 0)
2701         {*dptr2++ = *(dptr1 + *iptr++);}
2702       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2703       /* is msg_ids_out++ correct? */
2704       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
2705     }
2706 
2707   /* do the tree while we're waiting */
2708   if (gs->max_left_over)
2709     {gs_gop_tree_plus(gs,in_vals);}
2710 
2711   /* process the received data */
2712   msg_nodes=nodes;
2713   while ((iptr = *nodes++))
2714     {
2715       /* Should I check the return value of MPI_Wait() or status? */
2716       /* Can this loop be replaced by a call to MPI_Waitall()? */
2717       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
2718       while (*iptr >= 0)
2719         {*(dptr1 + *iptr++) += *in2++;}
2720     }
2721 
2722   /* replace vals */
2723   while (*pw >= 0)
2724     {*(in_vals + *pw++) = *dptr1++;}
2725 
2726   /* clear isend message handles */
2727   /* This changed for clarity though it could be the same */
2728   while (*msg_nodes++)
2729     /* Should I check the return value of MPI_Wait() or status? */
2730     /* Can this loop be replaced by a call to MPI_Waitall()? */
2731     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
2732   PetscFunctionReturn(0);
2733 }
2734 
2735 /******************************************************************************/
2736 static PetscErrorCode gs_gop_tree_plus(gs_id *gs, PetscScalar *vals)
2737 {
2738   PetscInt size;
2739   PetscInt *in, *out;
2740   PetscScalar *buf, *work;
2741   PetscErrorCode ierr;
2742 
2743   PetscFunctionBegin;
2744   in   = gs->tree_map_in;
2745   out  = gs->tree_map_out;
2746   buf  = gs->tree_buf;
2747   work = gs->tree_work;
2748   size = gs->tree_nel;
2749 
2750   rvec_zero(buf,size);
2751 
2752   while (*in >= 0)
2753     {*(buf + *out++) = *(vals + *in++);}
2754 
2755   in   = gs->tree_map_in;
2756   out  = gs->tree_map_out;
2757   ierr = MPI_Allreduce(buf,work,size,MPIU_SCALAR,MPI_SUM,gs->gs_comm);CHKERRQ(ierr);
2758   while (*in >= 0)
2759     {*(vals + *in++) = *(work + *out++);}
2760   PetscFunctionReturn(0);
2761 }
2762 
2763 /******************************************************************************/
2764 PetscErrorCode gs_free( gs_id *gs)
2765 {
2766    PetscInt i;
2767 
2768   PetscFunctionBegin;
2769   if (gs->nghs) {free((void*) gs->nghs);}
2770   if (gs->pw_nghs) {free((void*) gs->pw_nghs);}
2771 
2772   /* tree */
2773   if (gs->max_left_over)
2774     {
2775       if (gs->tree_elms) {free((void*) gs->tree_elms);}
2776       if (gs->tree_buf) {free((void*) gs->tree_buf);}
2777       if (gs->tree_work) {free((void*) gs->tree_work);}
2778       if (gs->tree_map_in) {free((void*) gs->tree_map_in);}
2779       if (gs->tree_map_out) {free((void*) gs->tree_map_out);}
2780     }
2781 
2782   /* pairwise info */
2783   if (gs->num_pairs)
2784     {
2785       /* should be NULL already */
2786       if (gs->ngh_buf) {free((void*) gs->ngh_buf);}
2787       if (gs->elms) {free((void*) gs->elms);}
2788       if (gs->local_elms) {free((void*) gs->local_elms);}
2789       if (gs->companion) {free((void*) gs->companion);}
2790 
2791       /* only set if pairwise */
2792       if (gs->vals) {free((void*) gs->vals);}
2793       if (gs->in) {free((void*) gs->in);}
2794       if (gs->out) {free((void*) gs->out);}
2795       if (gs->msg_ids_in) {free((void*) gs->msg_ids_in);}
2796       if (gs->msg_ids_out) {free((void*) gs->msg_ids_out);}
2797       if (gs->pw_vals) {free((void*) gs->pw_vals);}
2798       if (gs->pw_elm_list) {free((void*) gs->pw_elm_list);}
2799       if (gs->node_list)
2800         {
2801           for (i=0;i<gs->num_pairs;i++)
2802             {if (gs->node_list[i]) {free((void*) gs->node_list[i]);}}
2803           free((void*) gs->node_list);
2804         }
2805       if (gs->msg_sizes) {free((void*) gs->msg_sizes);}
2806       if (gs->pair_list) {free((void*) gs->pair_list);}
2807     }
2808 
2809   /* local info */
2810   if (gs->num_local_total>=0)
2811     {
2812       for (i=0;i<gs->num_local_total+1;i++)
2813         /*      for (i=0;i<gs->num_local_total;i++) */
2814         {
2815           if (gs->num_gop_local_reduce[i])
2816             {free((void*) gs->gop_local_reduce[i]);}
2817         }
2818     }
2819 
2820   /* if intersection tree/pairwise and local isn't empty */
2821   if (gs->gop_local_reduce) {free((void*) gs->gop_local_reduce);}
2822   if (gs->num_gop_local_reduce) {free((void*) gs->num_gop_local_reduce);}
2823 
2824   free((void*) gs);
2825   PetscFunctionReturn(0);
2826 }
2827 
2828 /******************************************************************************/
2829 PetscErrorCode gs_gop_vec( gs_id *gs,  PetscScalar *vals,  const char *op,  PetscInt step)
2830 {
2831   PetscErrorCode ierr;
2832 
2833   PetscFunctionBegin;
2834   switch (*op) {
2835   case '+':
2836     gs_gop_vec_plus(gs,vals,step);
2837     break;
2838   default:
2839     ierr = PetscInfo1(0,"gs_gop_vec() :: %c is not a valid op",op[0]);
2840     ierr = PetscInfo(0,"gs_gop_vec() :: default :: plus");
2841     gs_gop_vec_plus(gs,vals,step);
2842     break;
2843   }
2844   PetscFunctionReturn(0);
2845 }
2846 
2847 /******************************************************************************/
2848 static PetscErrorCode gs_gop_vec_plus( gs_id *gs,  PetscScalar *vals,  PetscInt step)
2849 {
2850   PetscFunctionBegin;
2851   if (!gs) {SETERRQ(PETSC_ERR_PLIB,"gs_gop_vec() passed NULL gs handle!!!");}
2852 
2853   /* local only operations!!! */
2854   if (gs->num_local)
2855     {gs_gop_vec_local_plus(gs,vals,step);}
2856 
2857   /* if intersection tree/pairwise and local isn't empty */
2858   if (gs->num_local_gop)
2859     {
2860       gs_gop_vec_local_in_plus(gs,vals,step);
2861 
2862       /* pairwise */
2863       if (gs->num_pairs)
2864         {gs_gop_vec_pairwise_plus(gs,vals,step);}
2865 
2866       /* tree */
2867       else if (gs->max_left_over)
2868         {gs_gop_vec_tree_plus(gs,vals,step);}
2869 
2870       gs_gop_vec_local_out(gs,vals,step);
2871     }
2872   /* if intersection tree/pairwise and local is empty */
2873   else
2874     {
2875       /* pairwise */
2876       if (gs->num_pairs)
2877         {gs_gop_vec_pairwise_plus(gs,vals,step);}
2878 
2879       /* tree */
2880       else if (gs->max_left_over)
2881         {gs_gop_vec_tree_plus(gs,vals,step);}
2882     }
2883   PetscFunctionReturn(0);
2884 }
2885 
2886 /******************************************************************************/
2887 static PetscErrorCode gs_gop_vec_local_plus( gs_id *gs,  PetscScalar *vals, PetscInt step)
2888 {
2889    PetscInt *num, *map, **reduce;
2890    PetscScalar *base;
2891 
2892   PetscFunctionBegin;
2893   num    = gs->num_local_reduce;
2894   reduce = gs->local_reduce;
2895   while ((map = *reduce))
2896     {
2897       base = vals + map[0] * step;
2898 
2899       /* wall */
2900       if (*num == 2)
2901         {
2902           num++; reduce++;
2903           rvec_add (base,vals+map[1]*step,step);
2904           rvec_copy(vals+map[1]*step,base,step);
2905         }
2906       /* corner shared by three elements */
2907       else if (*num == 3)
2908         {
2909           num++; reduce++;
2910           rvec_add (base,vals+map[1]*step,step);
2911           rvec_add (base,vals+map[2]*step,step);
2912           rvec_copy(vals+map[2]*step,base,step);
2913           rvec_copy(vals+map[1]*step,base,step);
2914         }
2915       /* corner shared by four elements */
2916       else if (*num == 4)
2917         {
2918           num++; reduce++;
2919           rvec_add (base,vals+map[1]*step,step);
2920           rvec_add (base,vals+map[2]*step,step);
2921           rvec_add (base,vals+map[3]*step,step);
2922           rvec_copy(vals+map[3]*step,base,step);
2923           rvec_copy(vals+map[2]*step,base,step);
2924           rvec_copy(vals+map[1]*step,base,step);
2925         }
2926       /* general case ... odd geoms ... 3D */
2927       else
2928         {
2929           num++;
2930           while (*++map >= 0)
2931             {rvec_add (base,vals+*map*step,step);}
2932 
2933           map = *reduce;
2934           while (*++map >= 0)
2935             {rvec_copy(vals+*map*step,base,step);}
2936 
2937           reduce++;
2938         }
2939     }
2940   PetscFunctionReturn(0);
2941 }
2942 
2943 /******************************************************************************/
2944 static PetscErrorCode gs_gop_vec_local_in_plus( gs_id *gs,  PetscScalar *vals, PetscInt step)
2945 {
2946    PetscInt  *num, *map, **reduce;
2947    PetscScalar *base;
2948   PetscFunctionBegin;
2949   num    = gs->num_gop_local_reduce;
2950   reduce = gs->gop_local_reduce;
2951   while ((map = *reduce++))
2952     {
2953       base = vals + map[0] * step;
2954 
2955       /* wall */
2956       if (*num == 2)
2957         {
2958           num ++;
2959           rvec_add(base,vals+map[1]*step,step);
2960         }
2961       /* corner shared by three elements */
2962       else if (*num == 3)
2963         {
2964           num ++;
2965           rvec_add(base,vals+map[1]*step,step);
2966           rvec_add(base,vals+map[2]*step,step);
2967         }
2968       /* corner shared by four elements */
2969       else if (*num == 4)
2970         {
2971           num ++;
2972           rvec_add(base,vals+map[1]*step,step);
2973           rvec_add(base,vals+map[2]*step,step);
2974           rvec_add(base,vals+map[3]*step,step);
2975         }
2976       /* general case ... odd geoms ... 3D*/
2977       else
2978         {
2979           num++;
2980           while (*++map >= 0)
2981             {rvec_add(base,vals+*map*step,step);}
2982         }
2983     }
2984   PetscFunctionReturn(0);
2985 }
2986 
2987 /******************************************************************************/
2988 static PetscErrorCode gs_gop_vec_local_out( gs_id *gs,  PetscScalar *vals, PetscInt step)
2989 {
2990    PetscInt *num, *map, **reduce;
2991    PetscScalar *base;
2992 
2993   PetscFunctionBegin;
2994   num    = gs->num_gop_local_reduce;
2995   reduce = gs->gop_local_reduce;
2996   while ((map = *reduce++))
2997     {
2998       base = vals + map[0] * step;
2999 
3000       /* wall */
3001       if (*num == 2)
3002         {
3003           num ++;
3004           rvec_copy(vals+map[1]*step,base,step);
3005         }
3006       /* corner shared by three elements */
3007       else if (*num == 3)
3008         {
3009           num ++;
3010           rvec_copy(vals+map[1]*step,base,step);
3011           rvec_copy(vals+map[2]*step,base,step);
3012         }
3013       /* corner shared by four elements */
3014       else if (*num == 4)
3015         {
3016           num ++;
3017           rvec_copy(vals+map[1]*step,base,step);
3018           rvec_copy(vals+map[2]*step,base,step);
3019           rvec_copy(vals+map[3]*step,base,step);
3020         }
3021       /* general case ... odd geoms ... 3D*/
3022       else
3023         {
3024           num++;
3025           while (*++map >= 0)
3026             {rvec_copy(vals+*map*step,base,step);}
3027         }
3028     }
3029   PetscFunctionReturn(0);
3030 }
3031 
3032 /******************************************************************************/
3033 static PetscErrorCode gs_gop_vec_pairwise_plus( gs_id *gs,  PetscScalar *in_vals, PetscInt step)
3034 {
3035    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
3036    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
3037    PetscInt *pw, *list, *size, **nodes;
3038   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
3039   MPI_Status status;
3040   PetscBLASInt i1;
3041   PetscErrorCode ierr;
3042 
3043   PetscFunctionBegin;
3044   /* strip and load s */
3045   msg_list =list         = gs->pair_list;
3046   msg_size =size         = gs->msg_sizes;
3047   msg_nodes=nodes        = gs->node_list;
3048   iptr=pw                = gs->pw_elm_list;
3049   dptr1=dptr3            = gs->pw_vals;
3050   msg_ids_in  = ids_in   = gs->msg_ids_in;
3051   msg_ids_out = ids_out  = gs->msg_ids_out;
3052   dptr2                  = gs->out;
3053   in1=in2                = gs->in;
3054 
3055   /* post the receives */
3056   /*  msg_nodes=nodes; */
3057   do
3058     {
3059       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
3060          second one *list and do list++ afterwards */
3061       ierr = MPI_Irecv(in1, *size *step, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
3062       in1 += *size++ *step;
3063     }
3064   while (*++msg_nodes);
3065   msg_nodes=nodes;
3066 
3067   /* load gs values into in out gs buffers */
3068   while (*iptr >= 0)
3069     {
3070       rvec_copy(dptr3,in_vals + *iptr*step,step);
3071       dptr3+=step;
3072       iptr++;
3073     }
3074 
3075   /* load out buffers and post the sends */
3076   while ((iptr = *msg_nodes++))
3077     {
3078       dptr3 = dptr2;
3079       while (*iptr >= 0)
3080         {
3081           rvec_copy(dptr2,dptr1 + *iptr*step,step);
3082           dptr2+=step;
3083           iptr++;
3084         }
3085       ierr = MPI_Isend(dptr3, *msg_size++ *step, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
3086     }
3087 
3088   /* tree */
3089   if (gs->max_left_over)
3090     {gs_gop_vec_tree_plus(gs,in_vals,step);}
3091 
3092   /* process the received data */
3093   msg_nodes=nodes;
3094   while ((iptr = *nodes++)){
3095     PetscScalar d1 = 1.0;
3096       /* Should I check the return value of MPI_Wait() or status? */
3097       /* Can this loop be replaced by a call to MPI_Waitall()? */
3098       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
3099       while (*iptr >= 0) {
3100           BLASaxpy_(&step,&d1,in2,&i1,dptr1 + *iptr*step,&i1);
3101           in2+=step;
3102           iptr++;
3103       }
3104   }
3105 
3106   /* replace vals */
3107   while (*pw >= 0)
3108     {
3109       rvec_copy(in_vals + *pw*step,dptr1,step);
3110       dptr1+=step;
3111       pw++;
3112     }
3113 
3114   /* clear isend message handles */
3115   /* This changed for clarity though it could be the same */
3116   while (*msg_nodes++)
3117     /* Should I check the return value of MPI_Wait() or status? */
3118     /* Can this loop be replaced by a call to MPI_Waitall()? */
3119     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
3120 
3121   PetscFunctionReturn(0);
3122 }
3123 
3124 /******************************************************************************/
3125 static PetscErrorCode gs_gop_vec_tree_plus( gs_id *gs,  PetscScalar *vals,  PetscInt step)
3126 {
3127   PetscInt size, *in, *out;
3128   PetscScalar *buf, *work;
3129   PetscInt op[] = {GL_ADD,0};
3130   PetscBLASInt i1 = 1;
3131 
3132   PetscFunctionBegin;
3133   /* copy over to local variables */
3134   in   = gs->tree_map_in;
3135   out  = gs->tree_map_out;
3136   buf  = gs->tree_buf;
3137   work = gs->tree_work;
3138   size = gs->tree_nel*step;
3139 
3140   /* zero out collection buffer */
3141   rvec_zero(buf,size);
3142 
3143 
3144   /* copy over my contributions */
3145   while (*in >= 0)
3146     {
3147       BLAScopy_(&step,vals + *in++*step,&i1,buf + *out++*step,&i1);
3148     }
3149 
3150   /* perform fan in/out on full buffer */
3151   /* must change grop to handle the blas */
3152   grop(buf,work,size,op);
3153 
3154   /* reset */
3155   in   = gs->tree_map_in;
3156   out  = gs->tree_map_out;
3157 
3158   /* get the portion of the results I need */
3159   while (*in >= 0)
3160     {
3161       BLAScopy_(&step,buf + *out++*step,&i1,vals + *in++*step,&i1);
3162     }
3163   PetscFunctionReturn(0);
3164 }
3165 
3166 /******************************************************************************/
3167 PetscErrorCode gs_gop_hc( gs_id *gs,  PetscScalar *vals,  const char *op,  PetscInt dim)
3168 {
3169   PetscErrorCode ierr;
3170 
3171   PetscFunctionBegin;
3172   switch (*op) {
3173   case '+':
3174     gs_gop_plus_hc(gs,vals,dim);
3175     break;
3176   default:
3177     ierr = PetscInfo1(0,"gs_gop_hc() :: %c is not a valid op",op[0]);
3178     ierr = PetscInfo(0,"gs_gop_hc() :: default :: plus\n");
3179     gs_gop_plus_hc(gs,vals,dim);
3180     break;
3181   }
3182   PetscFunctionReturn(0);
3183 }
3184 
3185 /******************************************************************************/
3186 static PetscErrorCode gs_gop_plus_hc( gs_id *gs,  PetscScalar *vals, PetscInt dim)
3187 {
3188   PetscFunctionBegin;
3189   /* if there's nothing to do return */
3190   if (dim<=0)
3191     {  PetscFunctionReturn(0);}
3192 
3193   /* can't do more dimensions then exist */
3194   dim = PetscMin(dim,i_log2_num_nodes);
3195 
3196   /* local only operations!!! */
3197   if (gs->num_local)
3198     {gs_gop_local_plus(gs,vals);}
3199 
3200   /* if intersection tree/pairwise and local isn't empty */
3201   if (gs->num_local_gop)
3202     {
3203       gs_gop_local_in_plus(gs,vals);
3204 
3205       /* pairwise will do tree inside ... */
3206       if (gs->num_pairs)
3207         {gs_gop_pairwise_plus_hc(gs,vals,dim);}
3208 
3209       /* tree only */
3210       else if (gs->max_left_over)
3211         {gs_gop_tree_plus_hc(gs,vals,dim);}
3212 
3213       gs_gop_local_out(gs,vals);
3214     }
3215   /* if intersection tree/pairwise and local is empty */
3216   else
3217     {
3218       /* pairwise will do tree inside */
3219       if (gs->num_pairs)
3220         {gs_gop_pairwise_plus_hc(gs,vals,dim);}
3221 
3222       /* tree */
3223       else if (gs->max_left_over)
3224         {gs_gop_tree_plus_hc(gs,vals,dim);}
3225     }
3226   PetscFunctionReturn(0);
3227 }
3228 
3229 /******************************************************************************/
3230 static PetscErrorCode gs_gop_pairwise_plus_hc( gs_id *gs,  PetscScalar *in_vals, PetscInt dim)
3231 {
3232    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
3233    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
3234    PetscInt *pw, *list, *size, **nodes;
3235   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
3236   MPI_Status status;
3237   PetscInt i, mask=1;
3238   PetscErrorCode ierr;
3239 
3240   PetscFunctionBegin;
3241   for (i=1; i<dim; i++)
3242     {mask<<=1; mask++;}
3243 
3244 
3245   /* strip and load s */
3246   msg_list =list         = gs->pair_list;
3247   msg_size =size         = gs->msg_sizes;
3248   msg_nodes=nodes        = gs->node_list;
3249   iptr=pw                = gs->pw_elm_list;
3250   dptr1=dptr3            = gs->pw_vals;
3251   msg_ids_in  = ids_in   = gs->msg_ids_in;
3252   msg_ids_out = ids_out  = gs->msg_ids_out;
3253   dptr2                  = gs->out;
3254   in1=in2                = gs->in;
3255 
3256   /* post the receives */
3257   /*  msg_nodes=nodes; */
3258   do
3259     {
3260       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
3261          second one *list and do list++ afterwards */
3262       if ((my_id|mask)==(*list|mask))
3263         {
3264           ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
3265           in1 += *size++;
3266         }
3267       else
3268         {list++; size++;}
3269     }
3270   while (*++msg_nodes);
3271 
3272   /* load gs values into in out gs buffers */
3273   while (*iptr >= 0)
3274     {*dptr3++ = *(in_vals + *iptr++);}
3275 
3276   /* load out buffers and post the sends */
3277   msg_nodes=nodes;
3278   list = msg_list;
3279   while ((iptr = *msg_nodes++))
3280     {
3281       if ((my_id|mask)==(*list|mask))
3282         {
3283           dptr3 = dptr2;
3284           while (*iptr >= 0)
3285             {*dptr2++ = *(dptr1 + *iptr++);}
3286           /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
3287           /* is msg_ids_out++ correct? */
3288           ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
3289         }
3290       else
3291         {list++; msg_size++;}
3292     }
3293 
3294   /* do the tree while we're waiting */
3295   if (gs->max_left_over)
3296     {gs_gop_tree_plus_hc(gs,in_vals,dim);}
3297 
3298   /* process the received data */
3299   msg_nodes=nodes;
3300   list = msg_list;
3301   while ((iptr = *nodes++))
3302     {
3303       if ((my_id|mask)==(*list|mask))
3304         {
3305           /* Should I check the return value of MPI_Wait() or status? */
3306           /* Can this loop be replaced by a call to MPI_Waitall()? */
3307           ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
3308           while (*iptr >= 0)
3309             {*(dptr1 + *iptr++) += *in2++;}
3310         }
3311       list++;
3312     }
3313 
3314   /* replace vals */
3315   while (*pw >= 0)
3316     {*(in_vals + *pw++) = *dptr1++;}
3317 
3318   /* clear isend message handles */
3319   /* This changed for clarity though it could be the same */
3320   while (*msg_nodes++)
3321     {
3322       if ((my_id|mask)==(*msg_list|mask))
3323         {
3324           /* Should I check the return value of MPI_Wait() or status? */
3325           /* Can this loop be replaced by a call to MPI_Waitall()? */
3326           ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);
3327         }
3328       msg_list++;
3329     }
3330 
3331   PetscFunctionReturn(0);
3332 }
3333 
3334 /******************************************************************************/
3335 static PetscErrorCode gs_gop_tree_plus_hc(gs_id *gs, PetscScalar *vals, PetscInt dim)
3336 {
3337   PetscInt size;
3338   PetscInt *in, *out;
3339   PetscScalar *buf, *work;
3340   PetscInt op[] = {GL_ADD,0};
3341 
3342   PetscFunctionBegin;
3343   in   = gs->tree_map_in;
3344   out  = gs->tree_map_out;
3345   buf  = gs->tree_buf;
3346   work = gs->tree_work;
3347   size = gs->tree_nel;
3348 
3349   rvec_zero(buf,size);
3350 
3351   while (*in >= 0)
3352     {*(buf + *out++) = *(vals + *in++);}
3353 
3354   in   = gs->tree_map_in;
3355   out  = gs->tree_map_out;
3356 
3357   grop_hc(buf,work,size,op,dim);
3358 
3359   while (*in >= 0)
3360     {*(vals + *in++) = *(buf + *out++);}
3361   PetscFunctionReturn(0);
3362 }
3363 
3364 
3365 
3366