xref: /petsc/src/ksp/pc/impls/tfs/gs.c (revision 52f87cdab2191b5e933e56fedaec062d60660400)
1 #define PETSCKSP_DLL
2 
3 /***********************************gs.c***************************************
4 
5 Author: Henry M. Tufo III
6 
7 e-mail: hmt@cs.brown.edu
8 
9 snail-mail:
10 Division of Applied Mathematics
11 Brown University
12 Providence, RI 02912
13 
14 Last Modification:
15 6.21.97
16 ************************************gs.c**************************************/
17 
18 /***********************************gs.c***************************************
19 File Description:
20 -----------------
21 
22 ************************************gs.c**************************************/
23 
24 #include "src/ksp/pc/impls/tfs/tfs.h"
25 
/* default length of number of items via tree - doubles if exceeded */
/* fix: the trailing ';' made TREE_BUF_SZ expand to "2048;" - harmless in the
   single statement-position use below, but a syntax error in any expression
   context (e.g. `x = TREE_BUF_SZ + 1`).  Object-like macros take no ';'. */
#define TREE_BUF_SZ 2048
#define GS_VEC_SZ   1
29 
30 
31 
/***********************************gs.c***************************************
Type: struct gather_scatter_id
------------------------------
State for one gather/scatter context: global element statistics, processor
bit-mask neighbor info, purely-local reduction lists, pairwise (point-to-point)
exchange plans, and tree (crystal accumulator) fallback data.
************************************gs.c**************************************/
typedef struct gather_scatter_id {
  PetscInt id;          /* unique context id (sequence number of gs_init() calls) */
  PetscInt nel_min;     /* min #elements over all processors */
  PetscInt nel_max;     /* max #elements over all processors */
  PetscInt nel_sum;     /* total #elements over all processors */
  PetscInt negl;        /* global id span: gl_max - gl_min + 1 */
  PetscInt gl_max;      /* largest global id on any processor */
  PetscInt gl_min;      /* smallest global id on any processor */
  PetscInt repeats;     /* #locally repeated elements (duplicates removed) */
  PetscInt ordered;     /* set by code outside this chunk - TODO confirm semantics */
  PetscInt positive;    /* set by code outside this chunk - TODO confirm semantics */
  PetscScalar *vals;

  /* bit mask info */
  PetscInt *my_proc_mask;   /* bit mask with this rank's bit set */
  PetscInt mask_sz;         /* processor mask length in PetscInts */
  PetscInt *ngh_buf;        /* per-shared-element sharing masks (freed after setup) */
  PetscInt ngh_buf_sz;      /* allocated size of ngh_buf (bytes at alloc time) */
  PetscInt *nghs;           /* neighbor processor ids (int list after get_ngh_buf()) */
  PetscInt num_nghs;        /* #neighbors sharing any element with me */
  PetscInt max_nghs;        /* global max of num_nghs */
  PetscInt *pw_nghs;        /* bit mask of neighbors handled pairwise */
  PetscInt num_pw_nghs;     /* popcount of pw_nghs */
  PetscInt *tree_nghs;
  PetscInt num_tree_nghs;

  PetscInt num_loads;       /* #giop passes needed in get_ngh_buf() */

  /* repeats == true -> local info */
  PetscInt nel;         /* number of unique elememts */
  PetscInt *elms;       /* of size nel */
  PetscInt nel_total;
  PetscInt *local_elms; /* of size nel_total */
  PetscInt *companion;  /* of size nel_total */

  /* local info */
  PetscInt num_local_total;        /* #local reduction groups before intersection split */
  PetscInt local_strength;         /* NONE / FULL / PARTIAL (overlap with pw/tree sets) */
  PetscInt num_local;              /* #purely-local reduction groups */
  PetscInt *num_local_reduce;      /* group sizes, parallel to local_reduce */
  PetscInt **local_reduce;         /* -1 terminated index lists, NULL terminated table */
  PetscInt num_local_gop;          /* #groups that also appear in pw/tree exchange */
  PetscInt *num_gop_local_reduce;  /* sizes for the gop (shared) groups */
  PetscInt **gop_local_reduce;     /* index lists for the gop (shared) groups */

  /* pairwise info */
  PetscInt level;           /* elements shared by <= level procs go pairwise */
  PetscInt num_pairs;       /* #processors I exchange messages with */
  PetscInt max_pairs;       /* global max of num_pairs */
  PetscInt loc_node_pairs;  /* largest single-message node count locally */
  PetscInt max_node_pairs;  /* global max of loc_node_pairs */
  PetscInt min_node_pairs;  /* global min of loc_node_pairs */
  PetscInt avg_node_pairs;  /* global average (rounded up) */
  PetscInt *pair_list;      /* partner processor ids, length num_pairs */
  PetscInt *msg_sizes;      /* #values exchanged with each partner */
  PetscInt **node_list;     /* per-partner -1 terminated element-index lists */
  PetscInt len_pw_list;     /* #elements handled pairwise */
  PetscInt *pw_elm_list;    /* their indices, -1 terminated */
  PetscScalar *pw_vals;     /* staging buffer, len_pw_list*vec_sz values */

  MPI_Request *msg_ids_in;  /* num_pairs+1 requests, MPI_REQUEST_NULL terminated */
  MPI_Request *msg_ids_out;

  PetscScalar *out;         /* packed send buffer, msg_total*vec_sz values */
  PetscScalar *in;          /* packed recv buffer, msg_total*vec_sz values */
  PetscInt msg_total;       /* sum of msg_sizes */

  /* tree - crystal accumulator info */
  PetscInt max_left_over;   /* #elements that fell through to the tree */
  PetscInt *pre;
  PetscInt *in_num;
  PetscInt *out_num;
  PetscInt **in_list;
  PetscInt **out_list;

  /* new tree work*/
  PetscInt  tree_nel;          /* #tree elements (== max_left_over at setup) */
  PetscInt *tree_elms;         /* global ids routed via tree */
  PetscScalar *tree_buf;       /* tree accumulation buffer, tree_nel*vec_sz */
  PetscScalar *tree_work;      /* tree scratch buffer, tree_nel*vec_sz */

  PetscInt  tree_map_sz;       /* #local elements participating in tree */
  PetscInt *tree_map_in;       /* local index -> position, -1 terminated */
  PetscInt *tree_map_out;      /* matching tree positions, -1 terminated */

  /* current memory status */
  PetscInt gl_bss_min;
  PetscInt gl_perm_min;

  /* max segment size for gs_gop_vec() */
  PetscInt vec_sz;

  /* hack to make paul happy */
  MPI_Comm gs_comm;            /* private communicator for this context */

} gs_id;
133 
134 static gs_id *gsi_check_args(PetscInt *elms, PetscInt nel, PetscInt level);
135 static PetscErrorCode gsi_via_bit_mask(gs_id *gs);
136 static PetscErrorCode get_ngh_buf(gs_id *gs);
137 static PetscErrorCode set_pairwise(gs_id *gs);
138 static gs_id * gsi_new(void);
139 static PetscErrorCode set_tree(gs_id *gs);
140 
141 /* same for all but vector flavor */
142 static PetscErrorCode gs_gop_local_out(gs_id *gs, PetscScalar *vals);
143 /* vector flavor */
144 static PetscErrorCode gs_gop_vec_local_out(gs_id *gs, PetscScalar *vals, PetscInt step);
145 
146 static PetscErrorCode gs_gop_vec_plus(gs_id *gs, PetscScalar *in_vals, PetscInt step);
147 static PetscErrorCode gs_gop_vec_pairwise_plus(gs_id *gs, PetscScalar *in_vals, PetscInt step);
148 static PetscErrorCode gs_gop_vec_local_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
149 static PetscErrorCode gs_gop_vec_local_in_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
150 static PetscErrorCode gs_gop_vec_tree_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
151 
152 
153 static PetscErrorCode gs_gop_plus(gs_id *gs, PetscScalar *in_vals);
154 static PetscErrorCode gs_gop_pairwise_plus(gs_id *gs, PetscScalar *in_vals);
155 static PetscErrorCode gs_gop_local_plus(gs_id *gs, PetscScalar *vals);
156 static PetscErrorCode gs_gop_local_in_plus(gs_id *gs, PetscScalar *vals);
157 static PetscErrorCode gs_gop_tree_plus(gs_id *gs, PetscScalar *vals);
158 
159 static PetscErrorCode gs_gop_plus_hc(gs_id *gs, PetscScalar *in_vals, PetscInt dim);
160 static PetscErrorCode gs_gop_pairwise_plus_hc(gs_id *gs, PetscScalar *in_vals, PetscInt dim);
161 static PetscErrorCode gs_gop_tree_plus_hc(gs_id *gs, PetscScalar *vals, PetscInt dim);
162 
163 static PetscErrorCode gs_gop_times(gs_id *gs, PetscScalar *in_vals);
164 static PetscErrorCode gs_gop_pairwise_times(gs_id *gs, PetscScalar *in_vals);
165 static PetscErrorCode gs_gop_local_times(gs_id *gs, PetscScalar *vals);
166 static PetscErrorCode gs_gop_local_in_times(gs_id *gs, PetscScalar *vals);
167 static PetscErrorCode gs_gop_tree_times(gs_id *gs, PetscScalar *vals);
168 
169 static PetscErrorCode gs_gop_min(gs_id *gs, PetscScalar *in_vals);
170 static PetscErrorCode gs_gop_pairwise_min(gs_id *gs, PetscScalar *in_vals);
171 static PetscErrorCode gs_gop_local_min(gs_id *gs, PetscScalar *vals);
172 static PetscErrorCode gs_gop_local_in_min(gs_id *gs, PetscScalar *vals);
173 static PetscErrorCode gs_gop_tree_min(gs_id *gs, PetscScalar *vals);
174 
175 static PetscErrorCode gs_gop_min_abs(gs_id *gs, PetscScalar *in_vals);
176 static PetscErrorCode gs_gop_pairwise_min_abs(gs_id *gs, PetscScalar *in_vals);
177 static PetscErrorCode gs_gop_local_min_abs(gs_id *gs, PetscScalar *vals);
178 static PetscErrorCode gs_gop_local_in_min_abs(gs_id *gs, PetscScalar *vals);
179 static PetscErrorCode gs_gop_tree_min_abs(gs_id *gs, PetscScalar *vals);
180 
181 static PetscErrorCode gs_gop_max(gs_id *gs, PetscScalar *in_vals);
182 static PetscErrorCode gs_gop_pairwise_max(gs_id *gs, PetscScalar *in_vals);
183 static PetscErrorCode gs_gop_local_max(gs_id *gs, PetscScalar *vals);
184 static PetscErrorCode gs_gop_local_in_max(gs_id *gs, PetscScalar *vals);
185 static PetscErrorCode gs_gop_tree_max(gs_id *gs, PetscScalar *vals);
186 
187 static PetscErrorCode gs_gop_max_abs(gs_id *gs, PetscScalar *in_vals);
188 static PetscErrorCode gs_gop_pairwise_max_abs(gs_id *gs, PetscScalar *in_vals);
189 static PetscErrorCode gs_gop_local_max_abs(gs_id *gs, PetscScalar *vals);
190 static PetscErrorCode gs_gop_local_in_max_abs(gs_id *gs, PetscScalar *vals);
191 static PetscErrorCode gs_gop_tree_max_abs(gs_id *gs, PetscScalar *vals);
192 
193 static PetscErrorCode gs_gop_exists(gs_id *gs, PetscScalar *in_vals);
194 static PetscErrorCode gs_gop_pairwise_exists(gs_id *gs, PetscScalar *in_vals);
195 static PetscErrorCode gs_gop_local_exists(gs_id *gs, PetscScalar *vals);
196 static PetscErrorCode gs_gop_local_in_exists(gs_id *gs, PetscScalar *vals);
197 static PetscErrorCode gs_gop_tree_exists(gs_id *gs, PetscScalar *vals);
198 
199 static PetscErrorCode gs_gop_pairwise_binary(gs_id *gs, PetscScalar *in_vals, rbfp fct);
200 static PetscErrorCode gs_gop_local_binary(gs_id *gs, PetscScalar *vals, rbfp fct);
201 static PetscErrorCode gs_gop_local_in_binary(gs_id *gs, PetscScalar *vals, rbfp fct);
202 static PetscErrorCode gs_gop_tree_binary(gs_id *gs, PetscScalar *vals, rbfp fct);
203 
204 
205 
/* global vars */
/* from comm.c module */

/* #gs contexts created so far; ++'d to produce the id of the next one */
static PetscInt num_gs_ids = 0;

/* should make this dynamic ... later */
static PetscInt msg_buf=MAX_MSG_BUF;  /* comm buffer budget (bytes) for get_ngh_buf() */
static PetscInt vec_sz=GS_VEC_SZ;     /* segment size multiplier for gs_gop_vec() buffers */
static PetscInt *tree_buf=NULL;       /* scratch list of global ids routed via tree */
static PetscInt tree_buf_sz=0;        /* allocated length of tree_buf (doubles on demand) */
static PetscInt ntree=0;              /* #entries currently in tree_buf */
217 
218 
/******************************************************************************
Function: gs_init_vec_sz()

Input : size - new segment size for the gs_gop_vec() routines
Output: sets the file-scope vec_sz used when sizing buffers of
        subsequently created gs contexts
Return: 0
Description: module-level setter; call before gs_init() to take effect.
******************************************************************************/
PetscErrorCode gs_init_vec_sz(PetscInt size)
{
  PetscFunctionBegin;
  vec_sz = size;
  PetscFunctionReturn(0);
}
233 
/******************************************************************************
Function: gs_init_msg_buf_sz()

Input : buf_size - new communication buffer budget in bytes
Output: sets the file-scope msg_buf consulted by get_ngh_buf()
Return: 0
Description: module-level setter; call before gs_init() to take effect.
******************************************************************************/
PetscErrorCode gs_init_msg_buf_sz(PetscInt buf_size)
{
  PetscFunctionBegin;
  msg_buf = buf_size;
  PetscFunctionReturn(0);
}
248 
249 /******************************************************************************
250 Function: gs_init()
251 
252 Input :
253 
254 Output:
255 
256 RETURN:
257 
258 Description:
259 ******************************************************************************/
260 gs_id *gs_init( PetscInt *elms, PetscInt nel, PetscInt level)
261 {
262    gs_id *gs;
263   MPI_Group gs_group;
264   MPI_Comm  gs_comm;
265 
266   PetscFunctionBegin;
267   /* ensure that communication package has been initialized */
268   comm_init();
269 
270 
271   /* determines if we have enough dynamic/semi-static memory */
272   /* checks input, allocs and sets gd_id template            */
273   gs = gsi_check_args(elms,nel,level);
274 
275   /* only bit mask version up and working for the moment    */
276   /* LATER :: get int list version working for sparse pblms */
277   gsi_via_bit_mask(gs);
278 
279 
280   MPI_Comm_group(MPI_COMM_WORLD,&gs_group);
281   MPI_Comm_create(MPI_COMM_WORLD,gs_group,&gs_comm);
282   gs->gs_comm=gs_comm;
283 
284   return(gs);
285 }
286 
287 
288 
289 /******************************************************************************
290 Function: gsi_new()
291 
292 Input :
293 Output:
294 Return:
295 Description:
296 
297 elm list must >= 0!!!
298 elm repeats allowed
299 ******************************************************************************/
300 static gs_id *gsi_new(void)
301 {
302   gs_id *gs;
303   gs = (gs_id *) malloc(sizeof(gs_id));
304   PetscMemzero(gs,sizeof(gs_id));
305   return(gs);
306 }
307 
308 
309 
/******************************************************************************
Function: gsi_check_args()

Input : in_elms - caller's global id list (must be non-NULL; ids >= 0;
                  zeros mean "does not participate" and are dropped)
        nel     - length of in_elms (>= 0)
        level   - pairwise/tree cutoff, clamped to [0,num_nodes]

Output: allocated gs template with the local element lists, companion
        (inverse) map, per-group local reduction lists, and the globally
        reduced statistics (nel_min/max/sum, gl_min/max, negl, level) filled in
Return: the new gs_id (fatal error on invalid input)

Description:
elm list must >= 0!!!
elm repeats allowed
local working copy of elms is sorted
******************************************************************************/
static gs_id * gsi_check_args(PetscInt *in_elms, PetscInt nel, PetscInt level)
{
   PetscInt i, j, k, t2;
  PetscInt *companion, *elms, *unique, *iptr;
  PetscInt num_local=0, *num_to_reduce, **local_reduce;
  /* oprs[0] is the NON_UNIFORM flag; oprs[1..7] are the per-slot giop ops
     applied to vals[0..6] below: min/max/add of nel, min of first id,
     max of last id, min of level, b_and of num_gs_ids */
  PetscInt oprs[] = {NON_UNIFORM,GL_MIN,GL_MAX,GL_ADD,GL_MIN,GL_MAX,GL_MIN,GL_B_AND};
  PetscInt vals[sizeof(oprs)/sizeof(oprs[0])-1];
  PetscInt work[sizeof(oprs)/sizeof(oprs[0])-1];
  gs_id *gs;



  if (!in_elms)
    {error_msg_fatal("elms point to nothing!!!\n");}

  if (nel<0)
    {error_msg_fatal("can't have fewer than 0 elms!!!\n");}

  if (nel==0)
    {error_msg_warning("I don't have any elements!!!\n");}

  /* get space for gs template */
  gs = gsi_new();
  gs->id = ++num_gs_ids;

  /* hmt 6.4.99                                            */
  /* caller can set global ids that don't participate to 0 */
  /* gs_init ignores all zeros in elm list                 */
  /* negative global ids are still invalid                 */
  for (i=j=0;i<nel;i++)
    {if (in_elms[i]!=0) {j++;}}

  /* k keeps the original length; nel shrinks to the #nonzero entries */
  k=nel; nel=j;

  /* copy over in_elms list and create inverse map */
  /* (+1 slot in elms for the sentinel used below) */
  elms = (PetscInt*) malloc((nel+1)*sizeof(PetscInt));
  companion = (PetscInt*) malloc(nel*sizeof(PetscInt));

  for (i=j=0;i<k;i++)
    {
      if (in_elms[i]!=0)
        {elms[j] = in_elms[i]; companion[j++] = i;}
    }

  if (j!=nel)
    {error_msg_fatal("nel j mismatch!\n");}

  /* pre-pass ... check to see if sorted */
  /* INT_MAX sentinel lets the adjacent-pair scan run off the real data */
  elms[nel] = INT_MAX;
  iptr = elms;
  unique = elms+1;
  j=0;
  while (*iptr!=INT_MAX)
    {
      if (*iptr++>*unique++)
        {j=1; break;}
    }

  /* set up inverse map */
  if (j)
    {
      error_msg_warning("gsi_check_args() :: elm list *not* sorted!\n");
      /* companion is carried along so companion[i] still maps sorted
         position i back to the caller's original index */
      SMI_sort((void*)elms, (void*)companion, nel, SORT_INTEGER);
    }
  else
    {error_msg_warning("gsi_check_args() :: elm list sorted!\n");}
  /* switch sentinel so no real (positive) id can match past the end */
  elms[nel] = INT_MIN;

  /* first pass */
  /* determine number of unique elements, check pd */
  for (i=k=0;i<nel;i+=j)
    {
      t2 = elms[i];
      j=++i;

      /* clump 'em for now */
      while (elms[j]==t2) {j++;}

      /* how many together and num local */
      /* j-=i leaves j = #extra copies of t2 beyond the first */
      if (j-=i)
        {num_local++; k+=j;}
    }

  /* how many unique elements? */
  gs->repeats=k;
  gs->nel = nel-k;


  /* number of repeats? */
  gs->num_local = num_local;
  /* +2: room for the two NULL/0 sentinel slots appended below */
  num_local+=2;
  gs->local_reduce=local_reduce=(PetscInt **)malloc(num_local*sizeof(PetscInt*));
  gs->num_local_reduce=num_to_reduce=(PetscInt*) malloc(num_local*sizeof(PetscInt));

  unique = (PetscInt*) malloc((gs->nel+1)*sizeof(PetscInt));
  gs->elms = unique;
  gs->nel_total = nel;
  gs->local_elms = elms;
  gs->companion = companion;

  /* compess map as well as keep track of local ops */
  /* second pass: write one entry per unique id into unique[]/companion[]
     and record a -1 terminated index list for each repeated id */
  for (num_local=i=j=0;i<gs->nel;i++)
    {
      k=j;
      t2 = unique[i] = elms[j];
      companion[i] = companion[j];

      while (elms[j]==t2) {j++;}

      if ((t2=(j-k))>1)
        {
          /* number together */
          num_to_reduce[num_local] = t2++;
          iptr = local_reduce[num_local++] = (PetscInt*)malloc(t2*sizeof(PetscInt));

          /* to use binary searching don't remap until we check intersection */
          /* first entry is the compressed index i, not a companion value;
             gsi_via_bit_mask() remaps it later */
          *iptr++ = i;

          /* note that we're skipping the first one */
          while (++k<j)
            {*(iptr++) = companion[k];}
          *iptr = -1;
        }
    }

  /* sentinel for ngh_buf */
  unique[gs->nel]=INT_MAX;

  /* for two partition sort hack */
  num_to_reduce[num_local] = 0;
  local_reduce[num_local] = NULL;
  num_to_reduce[++num_local] = 0;
  local_reduce[num_local] = NULL;

  /* load 'em up */
  /* note one extra to hold NON_UNIFORM flag!!! */
  vals[2] = vals[1] = vals[0] = nel;
  if (gs->nel>0)
    {
       vals[3] = unique[0];
       vals[4] = unique[gs->nel-1];
    }
  else
    {
       /* identities for min/max so an empty rank doesn't skew the reduce */
       vals[3] = INT_MAX;
       vals[4] = INT_MIN;
    }
  vals[5] = level;
  vals[6] = num_gs_ids;

  /* GLOBAL: send 'em out */
  giop(vals,work,sizeof(oprs)/sizeof(oprs[0])-1,oprs);

  /* must be semi-pos def - only pairwise depends on this */
  /* LATER - remove this restriction */
  /* NOTE(review): %d with PetscInt may mismatch when PetscInt is 64-bit */
  if (vals[3]<0)
    {error_msg_fatal("gsi_check_args() :: system not semi-pos def ::%d\n",vals[3]);}

  if (vals[4]==INT_MAX)
    {error_msg_fatal("gsi_check_args() :: system ub too large ::%d!\n",vals[4]);}

  gs->nel_min = vals[0];
  gs->nel_max = vals[1];
  gs->nel_sum = vals[2];
  gs->gl_min  = vals[3];
  gs->gl_max  = vals[4];
  gs->negl    = vals[4]-vals[3]+1;

  if (gs->negl<=0)
    {error_msg_fatal("gsi_check_args() :: system empty or neg :: %d\n",gs->negl);}

  /* LATER :: add level == -1 -> program selects level */
  if (vals[5]<0)
    {vals[5]=0;}
  else if (vals[5]>num_nodes)
    {vals[5]=num_nodes;}
  gs->level = vals[5];

  return(gs);
}
502 
503 
504 /******************************************************************************
505 Function: gsi_via_bit_mask()
506 
507 Input :
508 Output:
509 Return:
510 Description:
511 
512 
513 ******************************************************************************/
514 static PetscErrorCode gsi_via_bit_mask(gs_id *gs)
515 {
516    PetscInt i, nel, *elms;
517   PetscInt t1;
518   PetscInt **reduce;
519   PetscInt *map;
520 
521   /* totally local removes ... ct_bits == 0 */
522   get_ngh_buf(gs);
523 
524   if (gs->level)
525     {set_pairwise(gs);}
526 
527   if (gs->max_left_over)
528     {set_tree(gs);}
529 
530   /* intersection local and pairwise/tree? */
531   gs->num_local_total = gs->num_local;
532   gs->gop_local_reduce = gs->local_reduce;
533   gs->num_gop_local_reduce = gs->num_local_reduce;
534 
535   map = gs->companion;
536 
537   /* is there any local compression */
538   if (!gs->num_local) {
539     gs->local_strength = NONE;
540     gs->num_local_gop = 0;
541   } else {
542       /* ok find intersection */
543       map = gs->companion;
544       reduce = gs->local_reduce;
545       for (i=0, t1=0; i<gs->num_local; i++, reduce++)
546         {
547           if ((ivec_binary_search(**reduce,gs->pw_elm_list,gs->len_pw_list)>=0)
548               ||
549               ivec_binary_search(**reduce,gs->tree_map_in,gs->tree_map_sz)>=0)
550             {
551               /* printf("C%d :: i=%d, **reduce=%d\n",my_id,i,**reduce); */
552               t1++;
553               if (gs->num_local_reduce[i]<=0)
554                 {error_msg_fatal("nobody in list?");}
555               gs->num_local_reduce[i] *= -1;
556             }
557            **reduce=map[**reduce];
558         }
559 
560       /* intersection is empty */
561       if (!t1)
562         {
563           gs->local_strength = FULL;
564           gs->num_local_gop = 0;
565         }
566       /* intersection not empty */
567       else
568         {
569           gs->local_strength = PARTIAL;
570           SMI_sort((void*)gs->num_local_reduce, (void*)gs->local_reduce,
571                    gs->num_local + 1, SORT_INT_PTR);
572 
573           gs->num_local_gop = t1;
574           gs->num_local_total =  gs->num_local;
575           gs->num_local    -= t1;
576           gs->gop_local_reduce = gs->local_reduce;
577           gs->num_gop_local_reduce = gs->num_local_reduce;
578 
579           for (i=0; i<t1; i++)
580             {
581               if (gs->num_gop_local_reduce[i]>=0)
582                 {error_msg_fatal("they aren't negative?");}
583               gs->num_gop_local_reduce[i] *= -1;
584               gs->local_reduce++;
585               gs->num_local_reduce++;
586             }
587           gs->local_reduce++;
588           gs->num_local_reduce++;
589         }
590     }
591 
592   elms = gs->pw_elm_list;
593   nel  = gs->len_pw_list;
594   for (i=0; i<nel; i++)
595     {elms[i] = map[elms[i]];}
596 
597   elms = gs->tree_map_in;
598   nel  = gs->tree_map_sz;
599   for (i=0; i<nel; i++)
600     {elms[i] = map[elms[i]];}
601 
602   /* clean up */
603   free((void*) gs->local_elms);
604   free((void*) gs->companion);
605   free((void*) gs->elms);
606   free((void*) gs->ngh_buf);
607   gs->local_elms = gs->companion = gs->elms = gs->ngh_buf = NULL;
608   PetscFunctionReturn(0);
609 }
610 
611 
612 
613 /******************************************************************************
614 Function: place_in_tree()
615 
616 Input :
617 Output:
618 Return:
619 Description:
620 
621 
622 ******************************************************************************/
623 static PetscErrorCode place_in_tree( PetscInt elm)
624 {
625    PetscInt *tp, n;
626 
627   PetscFunctionBegin;
628   if (ntree==tree_buf_sz)
629     {
630       if (tree_buf_sz)
631         {
632           tp = tree_buf;
633           n = tree_buf_sz;
634           tree_buf_sz<<=1;
635           tree_buf = (PetscInt*)malloc(tree_buf_sz*sizeof(PetscInt));
636           ivec_copy(tree_buf,tp,n);
637           free(tp);
638         }
639       else
640         {
641           tree_buf_sz = TREE_BUF_SZ;
642           tree_buf = (PetscInt*)malloc(tree_buf_sz*sizeof(PetscInt));
643         }
644     }
645 
646   tree_buf[ntree++] = elm;
647   PetscFunctionReturn(0);
648 }
649 
650 
651 
/******************************************************************************
Function: get_ngh_buf()

Input : gs - template with elms/nel/negl/level filled in
Output: per-element sharing bit masks (gs->ngh_buf), neighbor lists
        (gs->nghs as an int list, gs->pw_nghs as a bit mask), the count of
        elements to exchange pairwise (gs->len_pw_list, flagged in elms via
        TOP_BIT), and the tree fallback list (via place_in_tree)
Return: 0
Description: every processor contributes a bit mask per global id it holds;
a global bitwise-OR (giop) then tells each processor exactly who shares each
of its ids.  The global id range is processed in num_loads chunks so memory
stays bounded by ~2*msg_buf.
******************************************************************************/
static PetscErrorCode get_ngh_buf(gs_id *gs)
{
   PetscInt i, j, npw=0, ntree_map=0;
  PetscInt p_mask_size, ngh_buf_size, buf_size;
  PetscInt *p_mask, *sh_proc_mask, *pw_sh_proc_mask;
  PetscInt *ngh_buf, *buf1, *buf2;
  PetscInt offset, per_load, num_loads, or_ct, start, end;
  PetscInt *ptr1, *ptr2, i_start, negl, nel, *elms;
  PetscInt oper=GL_B_OR;
  PetscInt *ptr3, *t_mask, level, ct1, ct2;

  PetscFunctionBegin;
  /* to make life easier */
  nel   = gs->nel;
  elms  = gs->elms;
  level = gs->level;

  /* det #bytes needed for processor bit masks and init w/mask cor. to my_id */
  /* note: p_mask_size is in BYTES here; converted to #ints further down */
  p_mask = (PetscInt*) malloc(p_mask_size=len_bit_mask(num_nodes));
  set_bit_mask(p_mask,p_mask_size,my_id);

  /* allocate space for masks and info bufs */
  gs->nghs = sh_proc_mask = (PetscInt*) malloc(p_mask_size);
  gs->pw_nghs = pw_sh_proc_mask = (PetscInt*) malloc(p_mask_size);
  gs->ngh_buf_sz = ngh_buf_size = p_mask_size*nel;
  /* NOTE(review): t_mask is allocated and freed but never used here */
  t_mask = (PetscInt*) malloc(p_mask_size);
  gs->ngh_buf = ngh_buf = (PetscInt*) malloc(ngh_buf_size);

  /* comm buffer size ... memory usage bounded by ~2*msg_buf */
  /* had thought I could exploit rendezvous threshold */

  /* default is one pass */
  per_load = negl  = gs->negl;
  gs->num_loads = num_loads = 1;
  i=p_mask_size*negl;

  /* possible overflow on buffer size */
  /* overflow hack                    */
  if (i<0) {i=INT_MAX;}

  buf_size = PetscMin(msg_buf,i);

  /* can we do it? */
  if (p_mask_size>buf_size)
    {error_msg_fatal("get_ngh_buf() :: buf<pms :: %d>%d\n",p_mask_size,buf_size);}

  /* get giop buf space ... make *only* one malloc */
  buf1 = (PetscInt*) malloc(buf_size<<1);

  /* more than one gior exchange needed? */
  if (buf_size!=i)
    {
      /* round buf_size down to a whole number of masks per pass */
      per_load = buf_size/p_mask_size;
      buf_size = per_load*p_mask_size;
      gs->num_loads = num_loads = negl/per_load + (negl%per_load>0);
    }


  /* convert buf sizes from #bytes to #ints - 32 bit only! */
  p_mask_size/=sizeof(PetscInt); ngh_buf_size/=sizeof(PetscInt); buf_size/=sizeof(PetscInt);

  /* find giop work space */
  buf2 = buf1+buf_size;

  /* hold #ints needed for processor masks */
  gs->mask_sz=p_mask_size;

  /* init buffers */
  ivec_zero(sh_proc_mask,p_mask_size);
  ivec_zero(pw_sh_proc_mask,p_mask_size);
  ivec_zero(ngh_buf,ngh_buf_size);

  /* HACK reset tree info */
  tree_buf=NULL;
  tree_buf_sz=ntree=0;

  /* ok do it */
  /* outer loop: one giop pass per chunk [start,end) of the global id range */
  for (ptr1=ngh_buf,ptr2=elms,end=gs->gl_min,or_ct=i=0; or_ct<num_loads; or_ct++)
    {
      /* identity for bitwise or is 000...000 */
      ivec_zero(buf1,buf_size);

      /* load msg buffer */
      /* stamp my bit mask at the slot of every id I hold in this chunk;
         relies on elms being sorted and INT_MAX-terminated */
      for (start=end,end+=per_load,i_start=i; (offset=*ptr2)<end; i++, ptr2++)
        {
          offset = (offset-start)*p_mask_size;
          ivec_copy(buf1+offset,p_mask,p_mask_size);
        }

      /* GLOBAL: pass buffer */
      giop(buf1,buf2,buf_size,&oper);


      /* unload buffer into ngh_buf */
      ptr2=(elms+i_start);
      for(ptr3=buf1,j=start; j<end; ptr3+=p_mask_size,j++)
        {
          /* I own it ... may have to pairwise it */
          if (j==*ptr2)
            {
              /* do i share it w/anyone? */
              ct1 = ct_bits((char *)ptr3,p_mask_size*sizeof(PetscInt));
              /* guess not */
              if (ct1<2)
                {ptr2++; ptr1+=p_mask_size; continue;}

              /* i do ... so keep info and turn off my bit */
              ivec_copy(ptr1,ptr3,p_mask_size);
              ivec_xor(ptr1,p_mask,p_mask_size);
              ivec_or(sh_proc_mask,ptr1,p_mask_size);

              /* is it to be done pairwise? */
              /* ct1-1 == #OTHER processors sharing it; pairwise iff <= level */
              if (--ct1<=level)
                {
                  npw++;

                  /* turn on high bit to indicate pw need to process */
                  *ptr2++ |= TOP_BIT;
                  ivec_or(pw_sh_proc_mask,ptr1,p_mask_size);
                  ptr1+=p_mask_size;
                  continue;
                }

              /* get set for next and note that I have a tree contribution */
              /* could save exact elm index for tree here -> save a search */
              ptr2++; ptr1+=p_mask_size; ntree_map++;
            }
          /* i don't but still might be involved in tree */
          else
            {

              /* shared by how many? */
              ct1 = ct_bits((char *)ptr3,p_mask_size*sizeof(PetscInt));

              /* none! */
              if (ct1<2)
                {continue;}

              /* is it going to be done pairwise? but not by me of course!*/
              if (--ct1<=level)
                {continue;}
            }
          /* LATER we're going to have to process it NOW */
          /* nope ... tree it */
          place_in_tree(j);
        }
    }

  free((void*)t_mask);
  free((void*)buf1);

  gs->len_pw_list=npw;
  gs->num_nghs = ct_bits((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt));

  /* expand from bit mask list to int list and save ngh list */
  /* gs->nghs is deliberately re-pointed here; the old bit mask is still
     reachable through sh_proc_mask and freed below */
  gs->nghs = (PetscInt*) malloc(gs->num_nghs * sizeof(PetscInt));
  bm_to_proc((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt),gs->nghs);

  gs->num_pw_nghs = ct_bits((char *)pw_sh_proc_mask,p_mask_size*sizeof(PetscInt));

  /* GLOBAL: max #neighbors over all processors */
  oper = GL_MAX;
  ct1 = gs->num_nghs;
  giop(&ct1,&ct2,1,&oper);
  gs->max_nghs = ct1;

  gs->tree_map_sz  = ntree_map;
  gs->max_left_over=ntree;

  free((void*)p_mask);
  free((void*)sh_proc_mask);
  PetscFunctionReturn(0);
}
834 
835 
836 
837 
838 
/******************************************************************************
Function: set_pairwise()

Input : gs - template with ngh_buf/pw_nghs/len_pw_list from get_ngh_buf()
Output: pairwise exchange plan: partner list (pair_list), per-partner message
        sizes and -1 terminated node lists, MPI request arrays, and the
        staging buffers pw_vals/out/in; also strips the TOP_BIT tags from
        gs->elms and records element indices in pw_elm_list
Return: 0
Description:
if an element is shared by fewer that level# of nodes do pairwise exch
******************************************************************************/
static PetscErrorCode set_pairwise(gs_id *gs)
{
   PetscInt i, j;
  PetscInt p_mask_size;
  PetscInt *p_mask, *sh_proc_mask, *tmp_proc_mask;
  PetscInt *ngh_buf, *buf2;
  PetscInt offset;
  PetscInt *msg_list, *msg_size, **msg_nodes, nprs;
  PetscInt *pairwise_elm_list, len_pair_list=0;
  PetscInt *iptr, t1, i_start, nel, *elms;
  PetscInt ct;


  PetscFunctionBegin;
  /* to make life easier */
  nel  = gs->nel;
  elms = gs->elms;
  ngh_buf = gs->ngh_buf;
  sh_proc_mask  = gs->pw_nghs;

  /* need a few temp masks */
  /* p_mask_size is in BYTES here; converted to #ints below */
  p_mask_size   = len_bit_mask(num_nodes);
  p_mask        = (PetscInt*) malloc(p_mask_size);
  tmp_proc_mask = (PetscInt*) malloc(p_mask_size);

  /* set mask to my my_id's bit mask */
  set_bit_mask(p_mask,p_mask_size,my_id);

  p_mask_size /= sizeof(PetscInt);

  len_pair_list=gs->len_pw_list;
  gs->pw_elm_list=pairwise_elm_list=(PetscInt*)malloc((len_pair_list+1)*sizeof(PetscInt));

  /* how many processors (nghs) do we have to exchange with? */
  nprs=gs->num_pairs=ct_bits((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt));


  /* allocate space for gs_gop() info */
  gs->pair_list = msg_list = (PetscInt *)  malloc(sizeof(PetscInt)*nprs);
  gs->msg_sizes = msg_size  = (PetscInt *)  malloc(sizeof(PetscInt)*nprs);
  gs->node_list = msg_nodes = (PetscInt **) malloc(sizeof(PetscInt*)*(nprs+1));

  /* init msg_size list */
  ivec_zero(msg_size,nprs);

  /* expand from bit mask list to int list */
  bm_to_proc((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt),msg_list);

  /* keep list of elements being handled pairwise */
  /* get_ngh_buf() tagged them with TOP_BIT; strip the tag here */
  for (i=j=0;i<nel;i++)
    {
      if (elms[i] & TOP_BIT)
        {elms[i] ^= TOP_BIT; pairwise_elm_list[j++] = i;}
    }
  pairwise_elm_list[j] = -1;

  /* +1 slot holds MPI_REQUEST_NULL as a terminator */
  gs->msg_ids_out = (MPI_Request *)  malloc(sizeof(MPI_Request)*(nprs+1));
  gs->msg_ids_out[nprs] = MPI_REQUEST_NULL;
  gs->msg_ids_in = (MPI_Request *)  malloc(sizeof(MPI_Request)*(nprs+1));
  gs->msg_ids_in[nprs] = MPI_REQUEST_NULL;
  gs->pw_vals = (PetscScalar *) malloc(sizeof(PetscScalar)*len_pair_list*vec_sz);

  /* find who goes to each processor */
  for (i_start=i=0;i<nprs;i++)
    {
      /* processor i's mask */
      /* set_bit_mask takes a byte count, hence *sizeof(PetscInt) */
      set_bit_mask(p_mask,p_mask_size*sizeof(PetscInt),msg_list[i]);

      /* det # going to processor i */
      /* an element goes to partner i iff partner i's bit is set in its mask */
      for (ct=j=0;j<len_pair_list;j++)
        {
          buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
          ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);
          if (ct_bits((char *)tmp_proc_mask,p_mask_size*sizeof(PetscInt)))
            {ct++;}
        }
      msg_size[i] = ct;
      /* i_start doubles as a running max of per-partner counts */
      i_start = PetscMax(i_start,ct);

      /*space to hold nodes in message to first neighbor */
      msg_nodes[i] = iptr = (PetscInt*) malloc(sizeof(PetscInt)*(ct+1));

      /* second sweep records WHICH pairwise elements go to partner i */
      for (j=0;j<len_pair_list;j++)
        {
          buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
          ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);
          if (ct_bits((char *)tmp_proc_mask,p_mask_size*sizeof(PetscInt)))
            {*iptr++ = j;}
        }
      *iptr = -1;
    }
  msg_nodes[nprs] = NULL;

  /* GLOBAL: gather max/min/avg message-node counts and max partner count */
  j=gs->loc_node_pairs=i_start;
  t1 = GL_MAX;
  giop(&i_start,&offset,1,&t1);
  gs->max_node_pairs = i_start;

  i_start=j;
  t1 = GL_MIN;
  giop(&i_start,&offset,1,&t1);
  gs->min_node_pairs = i_start;

  i_start=j;
  t1 = GL_ADD;
  giop(&i_start,&offset,1,&t1);
  gs->avg_node_pairs = i_start/num_nodes + 1;

  i_start=nprs;
  t1 = GL_MAX;
  giop(&i_start,&offset,1,&t1);
  gs->max_pairs = i_start;


  /* remap pairwise in tail of gsi_via_bit_mask() */
  gs->msg_total = ivec_sum(gs->msg_sizes,nprs);
  gs->out = (PetscScalar *) malloc(sizeof(PetscScalar)*gs->msg_total*vec_sz);
  gs->in  = (PetscScalar *) malloc(sizeof(PetscScalar)*gs->msg_total*vec_sz);

  /* reset malloc pool */
  free((void*)p_mask);
  free((void*)tmp_proc_mask);
  PetscFunctionReturn(0);
}
973 
974 
975 
976 /******************************************************************************
977 Function: set_tree()
978 
979 Input :
980 Output:
981 Return:
982 Description:
983 
984 to do pruned tree just save ngh buf copy for each one and decode here!
985 ******************************************************************************/
986 static PetscErrorCode set_tree(gs_id *gs)
987 {
988    PetscInt i, j, n, nel;
989    PetscInt *iptr_in, *iptr_out, *tree_elms, *elms;
990 
991   PetscFunctionBegin;
992   /* local work ptrs */
993   elms = gs->elms;
994   nel     = gs->nel;
995 
996   /* how many via tree */
997   gs->tree_nel  = n = ntree;
998   gs->tree_elms = tree_elms = iptr_in = tree_buf;
999   gs->tree_buf  = (PetscScalar *) malloc(sizeof(PetscScalar)*n*vec_sz);
1000   gs->tree_work = (PetscScalar *) malloc(sizeof(PetscScalar)*n*vec_sz);
1001   j=gs->tree_map_sz;
1002   gs->tree_map_in = iptr_in  = (PetscInt*) malloc(sizeof(PetscInt)*(j+1));
1003   gs->tree_map_out = iptr_out = (PetscInt*) malloc(sizeof(PetscInt)*(j+1));
1004 
1005   /* search the longer of the two lists */
1006   /* note ... could save this info in get_ngh_buf and save searches */
1007   if (n<=nel)
1008     {
1009       /* bijective fct w/remap - search elm list */
1010       for (i=0; i<n; i++)
1011         {
1012           if ((j=ivec_binary_search(*tree_elms++,elms,nel))>=0)
1013             {*iptr_in++ = j; *iptr_out++ = i;}
1014         }
1015     }
1016   else
1017     {
1018       for (i=0; i<nel; i++)
1019         {
1020           if ((j=ivec_binary_search(*elms++,tree_elms,n))>=0)
1021             {*iptr_in++ = i; *iptr_out++ = j;}
1022         }
1023     }
1024 
1025   /* sentinel */
1026   *iptr_in = *iptr_out = -1;
1027   PetscFunctionReturn(0);
1028 }
1029 
1030 
1031 /******************************************************************************
1032 Function: gather_scatter
1033 
1034 Input :
1035 Output:
1036 Return:
1037 Description:
1038 ******************************************************************************/
1039 static PetscErrorCode gs_gop_local_out( gs_id *gs,  PetscScalar *vals)
1040 {
1041    PetscInt *num, *map, **reduce;
1042    PetscScalar tmp;
1043 
1044   PetscFunctionBegin;
1045   num    = gs->num_gop_local_reduce;
1046   reduce = gs->gop_local_reduce;
1047   while ((map = *reduce++))
1048     {
1049       /* wall */
1050       if (*num == 2)
1051         {
1052           num ++;
1053           vals[map[1]] = vals[map[0]];
1054         }
1055       /* corner shared by three elements */
1056       else if (*num == 3)
1057         {
1058           num ++;
1059           vals[map[2]] = vals[map[1]] = vals[map[0]];
1060         }
1061       /* corner shared by four elements */
1062       else if (*num == 4)
1063         {
1064           num ++;
1065           vals[map[3]] = vals[map[2]] = vals[map[1]] = vals[map[0]];
1066         }
1067       /* general case ... odd geoms ... 3D*/
1068       else
1069         {
1070           num++;
1071           tmp = *(vals + *map++);
1072           while (*map >= 0)
1073             {*(vals + *map++) = tmp;}
1074         }
1075     }
1076   PetscFunctionReturn(0);
1077 }
1078 
1079 
1080 
1081 /******************************************************************************
1082 Function: gather_scatter
1083 
1084 Input :
1085 Output:
1086 Return:
1087 Description:
1088 ******************************************************************************/
1089 PetscErrorCode gs_gop_binary(gs_ADT gs, PetscScalar *vals, rbfp fct)
1090 {
1091   PetscFunctionBegin;
1092   /* local only operations!!! */
1093   if (gs->num_local)
1094     {gs_gop_local_binary(gs,vals,fct);}
1095 
1096   /* if intersection tree/pairwise and local isn't empty */
1097   if (gs->num_local_gop)
1098     {
1099       gs_gop_local_in_binary(gs,vals,fct);
1100 
1101       /* pairwise */
1102       if (gs->num_pairs)
1103         {gs_gop_pairwise_binary(gs,vals,fct);}
1104 
1105       /* tree */
1106       else if (gs->max_left_over)
1107         {gs_gop_tree_binary(gs,vals,fct);}
1108 
1109       gs_gop_local_out(gs,vals);
1110     }
1111   /* if intersection tree/pairwise and local is empty */
1112   else
1113     {
1114       /* pairwise */
1115       if (gs->num_pairs)
1116         {gs_gop_pairwise_binary(gs,vals,fct);}
1117 
1118       /* tree */
1119       else if (gs->max_left_over)
1120         {gs_gop_tree_binary(gs,vals,fct);}
1121     }
1122   PetscFunctionReturn(0);
1123 }
1124 
1125 
1126 
1127 /******************************************************************************
1128 Function: gather_scatter
1129 
1130 Input :
1131 Output:
1132 Return:
1133 Description:
1134 ******************************************************************************/
1135 static PetscErrorCode gs_gop_local_binary( gs_id *gs,  PetscScalar *vals,  rbfp fct)
1136 {
1137    PetscInt *num, *map, **reduce;
1138   PetscScalar tmp;
1139 
1140   PetscFunctionBegin;
1141   num    = gs->num_local_reduce;
1142   reduce = gs->local_reduce;
1143   while ((map = *reduce))
1144     {
1145       num ++;
1146       (*fct)(&tmp,NULL,1);
1147       /* tmp = 0.0; */
1148       while (*map >= 0)
1149         {(*fct)(&tmp,(vals + *map),1); map++;}
1150         /*        {tmp = (*fct)(tmp,*(vals + *map)); map++;} */
1151 
1152       map = *reduce++;
1153       while (*map >= 0)
1154         {*(vals + *map++) = tmp;}
1155     }
1156   PetscFunctionReturn(0);
1157 }
1158 
1159 
1160 
1161 /******************************************************************************
1162 Function: gather_scatter
1163 
1164 Input :
1165 Output:
1166 Return:
1167 Description:
1168 ******************************************************************************/
1169 static PetscErrorCode gs_gop_local_in_binary( gs_id *gs,  PetscScalar *vals,  rbfp fct)
1170 {
1171    PetscInt *num, *map, **reduce;
1172    PetscScalar *base;
1173 
1174   PetscFunctionBegin;
1175   num    = gs->num_gop_local_reduce;
1176 
1177   reduce = gs->gop_local_reduce;
1178   while ((map = *reduce++))
1179     {
1180       num++;
1181       base = vals + *map++;
1182       while (*map >= 0)
1183         {(*fct)(base,(vals + *map),1); map++;}
1184     }
1185   PetscFunctionReturn(0);
1186 }
1187 
1188 
1189 
1190 /******************************************************************************
1191 Function: gather_scatter
1192 
1193 VERSION 3 ::
1194 
1195 Input :
1196 Output:
1197 Return:
1198 Description:
1199 ******************************************************************************/
/* Pairwise (point-to-point) exchange applying the user binary operator 'fct'
   to shared values: post nonblocking receives, pack and send the outgoing
   buffers, overlap the tree contribution (if any) with communication, then
   fold each received buffer into the shared-value workspace and scatter the
   results back into in_vals. */
static PetscErrorCode gs_gop_pairwise_binary( gs_id *gs,  PetscScalar *in_vals,rbfp fct)
{
  PetscScalar    *dptr1, *dptr2, *dptr3, *in1, *in2;
  PetscInt            *iptr, *msg_list, *msg_size, **msg_nodes;
  PetscInt            *pw, *list, *size, **nodes;
  MPI_Request    *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
  MPI_Status     status;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  /* strip and load s */
  /* two aliases per array: one to advance, one to keep the start */
  msg_list =list         = gs->pair_list;
  msg_size =size         = gs->msg_sizes;
  msg_nodes=nodes        = gs->node_list;
  iptr=pw                = gs->pw_elm_list;
  dptr1=dptr3            = gs->pw_vals;
  msg_ids_in  = ids_in   = gs->msg_ids_in;
  msg_ids_out = ids_out  = gs->msg_ids_out;
  dptr2                  = gs->out;
  in1=in2                = gs->in;

  /* post the receives */
  /*  msg_nodes=nodes; */
  do
    {
      /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
         second one *list and do list++ afterwards */
      ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
      in1 += *size++;
    }
  while (*++msg_nodes);
  msg_nodes=nodes;

  /* load gs values into in out gs buffers */
  while (*iptr >= 0)
    {*dptr3++ = *(in_vals + *iptr++);}

  /* load out buffers and post the sends */
  while ((iptr = *msg_nodes++))
    {
      dptr3 = dptr2;
      while (*iptr >= 0)
        {*dptr2++ = *(dptr1 + *iptr++);}
      /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
      /* is msg_ids_out++ correct? */
      ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
    }

  /* tree part runs here so it overlaps with the pending communication */
  if (gs->max_left_over)
    {gs_gop_tree_binary(gs,in_vals,fct);}

  /* process the received data */
  msg_nodes=nodes;
  while ((iptr = *nodes++))
    {
      /* Should I check the return value of MPI_Wait() or status? */
      /* Can this loop be replaced by a call to MPI_Waitall()? */
      ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
      while (*iptr >= 0)
        {(*fct)((dptr1 + *iptr),in2,1); iptr++; in2++;}
      /* {*(dptr1 + *iptr) = (*fct)(*(dptr1 + *iptr),*in2); iptr++; in2++;} */
    }

  /* replace vals */
  while (*pw >= 0)
    {*(in_vals + *pw++) = *dptr1++;}

  /* clear isend message handles */
  /* This changed for clarity though it could be the same */
  while (*msg_nodes++)
    /* Should I check the return value of MPI_Wait() or status? */
    /* Can this loop be replaced by a call to MPI_Waitall()? */
    {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
  PetscFunctionReturn(0);
}
1275 
1276 
1277 
1278 /******************************************************************************
1279 Function: gather_scatter
1280 
1281 Input :
1282 Output:
1283 Return:
1284 Description:
1285 ******************************************************************************/
1286 static PetscErrorCode gs_gop_tree_binary(gs_id *gs, PetscScalar *vals,  rbfp fct)
1287 {
1288   PetscInt         size;
1289   PetscInt         *in,    *out;
1290   PetscScalar *buf, *work;
1291 
1292   PetscFunctionBegin;
1293   in   = gs->tree_map_in;
1294   out  = gs->tree_map_out;
1295   buf  = gs->tree_buf;
1296   work = gs->tree_work;
1297   size = gs->tree_nel;
1298 
1299   /* load vals vector w/identity */
1300   (*fct)(buf,NULL,size);
1301 
1302   /* load my contribution into val vector */
1303   while (*in >= 0) {
1304     (*fct)((buf + *out++),(vals + *in++),-1);
1305   }
1306 
1307   gfop(buf,work,size,(vbfp)fct,MPIU_SCALAR,0);
1308 
1309   in   = gs->tree_map_in;
1310   out  = gs->tree_map_out;
1311   while (*in >= 0) {
1312     (*fct)((vals + *in++),(buf + *out++),-1);
1313   }
1314   PetscFunctionReturn(0);
1315 }
1316 
1317 
1318 
1319 
1320 /******************************************************************************
1321 Function: gather_scatter
1322 
1323 Input :
1324 Output:
1325 Return:
1326 Description:
1327 ******************************************************************************/
1328 PetscErrorCode gs_gop( gs_id *gs,  PetscScalar *vals,  const char *op)
1329 {
1330   PetscFunctionBegin;
1331 
1332   switch (*op) {
1333   case '+':
1334     gs_gop_plus(gs,vals);
1335     break;
1336   case '*':
1337     gs_gop_times(gs,vals);
1338     break;
1339   case 'a':
1340     gs_gop_min_abs(gs,vals);
1341     break;
1342   case 'A':
1343     gs_gop_max_abs(gs,vals);
1344     break;
1345   case 'e':
1346     gs_gop_exists(gs,vals);
1347     break;
1348   case 'm':
1349     gs_gop_min(gs,vals);
1350     break;
1351   case 'M':
1352     gs_gop_max(gs,vals); break;
1353   default:
1354     error_msg_warning("gs_gop() :: %c is not a valid op",op[0]);
1355     error_msg_warning("gs_gop() :: default :: plus");
1356     gs_gop_plus(gs,vals);
1357     break;
1358   }
1359   PetscFunctionReturn(0);
1360 }
1361 
1362 
1363 /******************************************************************************
1364 Function: gather_scatter
1365 
1366 Input :
1367 Output:
1368 Return:
1369 Description:
1370 ******************************************************************************/
1371 static PetscErrorCode gs_gop_exists( gs_id *gs,  PetscScalar *vals)
1372 {
1373   PetscFunctionBegin;
1374   /* local only operations!!! */
1375   if (gs->num_local)
1376     {gs_gop_local_exists(gs,vals);}
1377 
1378   /* if intersection tree/pairwise and local isn't empty */
1379   if (gs->num_local_gop)
1380     {
1381       gs_gop_local_in_exists(gs,vals);
1382 
1383       /* pairwise */
1384       if (gs->num_pairs)
1385         {gs_gop_pairwise_exists(gs,vals);}
1386 
1387       /* tree */
1388       else if (gs->max_left_over)
1389         {gs_gop_tree_exists(gs,vals);}
1390 
1391       gs_gop_local_out(gs,vals);
1392     }
1393   /* if intersection tree/pairwise and local is empty */
1394   else
1395     {
1396       /* pairwise */
1397       if (gs->num_pairs)
1398         {gs_gop_pairwise_exists(gs,vals);}
1399 
1400       /* tree */
1401       else if (gs->max_left_over)
1402         {gs_gop_tree_exists(gs,vals);}
1403     }
1404   PetscFunctionReturn(0);
1405 }
1406 
1407 
1408 
1409 /******************************************************************************
1410 Function: gather_scatter
1411 
1412 Input :
1413 Output:
1414 Return:
1415 Description:
1416 ******************************************************************************/
1417 static PetscErrorCode gs_gop_local_exists( gs_id *gs,  PetscScalar *vals)
1418 {
1419    PetscInt         *num, *map, **reduce;
1420    PetscScalar tmp;
1421 
1422   PetscFunctionBegin;
1423   num    = gs->num_local_reduce;
1424   reduce = gs->local_reduce;
1425   while ((map = *reduce))
1426     {
1427       num ++;
1428       tmp = 0.0;
1429       while (*map >= 0)
1430         {tmp = EXISTS(tmp,*(vals + *map)); map++;}
1431 
1432       map = *reduce++;
1433       while (*map >= 0)
1434         {*(vals + *map++) = tmp;}
1435     }
1436   PetscFunctionReturn(0);
1437 }
1438 
1439 /******************************************************************************/
1440 static PetscErrorCode gs_gop_local_in_exists( gs_id *gs,  PetscScalar *vals)
1441 {
1442   PetscInt         *num, *map, **reduce;
1443   PetscScalar *base;
1444 
1445   PetscFunctionBegin;
1446   num    = gs->num_gop_local_reduce;
1447   reduce = gs->gop_local_reduce;
1448   while ((map = *reduce++))
1449     {
1450       num++;
1451       base = vals + *map++;
1452       while (*map >= 0)
1453         {*base = EXISTS(*base,*(vals + *map)); map++;}
1454     }
1455   PetscFunctionReturn(0);
1456 }
1457 
/* Pairwise (point-to-point) exchange with the EXISTS reduction: post
   nonblocking receives, pack and send the outgoing buffers, overlap the
   tree contribution (if any) with communication, then fold each received
   buffer into the shared-value workspace and scatter back into in_vals. */
static PetscErrorCode gs_gop_pairwise_exists( gs_id *gs,  PetscScalar *in_vals)
{
  PetscScalar    *dptr1, *dptr2, *dptr3, *in1, *in2;
  PetscInt            *iptr, *msg_list, *msg_size, **msg_nodes;
  PetscInt            *pw, *list, *size, **nodes;
  MPI_Request    *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
  MPI_Status     status;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  /* strip and load s */
  /* two aliases per array: one to advance, one to keep the start */
  msg_list =list         = gs->pair_list;
  msg_size =size         = gs->msg_sizes;
  msg_nodes=nodes        = gs->node_list;
  iptr=pw                = gs->pw_elm_list;
  dptr1=dptr3            = gs->pw_vals;
  msg_ids_in  = ids_in   = gs->msg_ids_in;
  msg_ids_out = ids_out  = gs->msg_ids_out;
  dptr2                  = gs->out;
  in1=in2                = gs->in;

  /* post the receives */
  do
    {
      /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
         second one *list and do list++ afterwards */
      ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
      in1 += *size++;
    }
  while (*++msg_nodes);
  msg_nodes=nodes;

  /* load gs values into in out gs buffers */
  while (*iptr >= 0)
    {*dptr3++ = *(in_vals + *iptr++);}

  /* load out buffers and post the sends */
  while ((iptr = *msg_nodes++))
    {
      dptr3 = dptr2;
      while (*iptr >= 0)
        {*dptr2++ = *(dptr1 + *iptr++);}
      /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
      /* is msg_ids_out++ correct? */
      ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
    }

  /* tree part runs here so it overlaps with the pending communication */
  if (gs->max_left_over)
    {gs_gop_tree_exists(gs,in_vals);}

  /* process the received data */
  msg_nodes=nodes;
  while ((iptr = *nodes++))
    {
      /* Should I check the return value of MPI_Wait() or status? */
      /* Can this loop be replaced by a call to MPI_Waitall()? */
      ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
      while (*iptr >= 0)
        {*(dptr1 + *iptr) = EXISTS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
    }

  /* replace vals */
  while (*pw >= 0)
    {*(in_vals + *pw++) = *dptr1++;}

  /* clear isend message handles */
  /* This changed for clarity though it could be the same */
  while (*msg_nodes++)
    /* Should I check the return value of MPI_Wait() or status? */
    /* Can this loop be replaced by a call to MPI_Waitall()? */
    {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
  PetscFunctionReturn(0);
}
1531 /******************************************************************************/
1532 static PetscErrorCode gs_gop_tree_exists(gs_id *gs, PetscScalar *vals)
1533 {
1534   PetscInt         size;
1535   PetscInt         *in, *out;
1536   PetscScalar *buf, *work;
1537   PetscInt         op[] = {GL_EXISTS,0};
1538 
1539   PetscFunctionBegin;
1540   in   = gs->tree_map_in;
1541   out  = gs->tree_map_out;
1542   buf  = gs->tree_buf;
1543   work = gs->tree_work;
1544   size = gs->tree_nel;
1545 
1546   rvec_zero(buf,size);
1547 
1548   while (*in >= 0)
1549     {
1550       /*
1551       printf("%d :: out=%d\n",my_id,*out);
1552       printf("%d :: in=%d\n",my_id,*in);
1553       */
1554       *(buf + *out++) = *(vals + *in++);
1555     }
1556 
1557   grop(buf,work,size,op);
1558 
1559   in   = gs->tree_map_in;
1560   out  = gs->tree_map_out;
1561 
1562   while (*in >= 0)
1563     {*(vals + *in++) = *(buf + *out++);}
1564   PetscFunctionReturn(0);
1565 }
1566 
1567 /*******************************************************************************/
1568 static PetscErrorCode gs_gop_max_abs( gs_id *gs,  PetscScalar *vals)
1569 {
1570   PetscFunctionBegin;
1571   /* local only operations!!! */
1572   if (gs->num_local)
1573     {gs_gop_local_max_abs(gs,vals);}
1574 
1575   /* if intersection tree/pairwise and local isn't empty */
1576   if (gs->num_local_gop)
1577     {
1578       gs_gop_local_in_max_abs(gs,vals);
1579 
1580       /* pairwise */
1581       if (gs->num_pairs)
1582         {gs_gop_pairwise_max_abs(gs,vals);}
1583 
1584       /* tree */
1585       else if (gs->max_left_over)
1586         {gs_gop_tree_max_abs(gs,vals);}
1587 
1588       gs_gop_local_out(gs,vals);
1589     }
1590   /* if intersection tree/pairwise and local is empty */
1591   else
1592     {
1593       /* pairwise */
1594       if (gs->num_pairs)
1595         {gs_gop_pairwise_max_abs(gs,vals);}
1596 
1597       /* tree */
1598       else if (gs->max_left_over)
1599         {gs_gop_tree_max_abs(gs,vals);}
1600     }
1601   PetscFunctionReturn(0);
1602 }
1603 
1604 /******************************************************************************/
1605 static PetscErrorCode gs_gop_local_max_abs( gs_id *gs,  PetscScalar *vals)
1606 {
1607   PetscInt         *num, *map, **reduce;
1608   PetscScalar tmp;
1609 
1610   PetscFunctionBegin;
1611   num    = gs->num_local_reduce;
1612   reduce = gs->local_reduce;
1613   while ((map = *reduce))
1614     {
1615       num ++;
1616       tmp = 0.0;
1617       while (*map >= 0)
1618         {tmp = MAX_FABS(tmp,*(vals + *map)); map++;}
1619 
1620       map = *reduce++;
1621       while (*map >= 0)
1622         {*(vals + *map++) = tmp;}
1623     }
1624   PetscFunctionReturn(0);
1625 }
1626 
1627 /******************************************************************************/
1628 static PetscErrorCode gs_gop_local_in_max_abs( gs_id *gs,  PetscScalar *vals)
1629 {
1630   PetscInt         *num, *map, **reduce;
1631   PetscScalar *base;
1632 
1633   PetscFunctionBegin;
1634   num    = gs->num_gop_local_reduce;
1635   reduce = gs->gop_local_reduce;
1636   while ((map = *reduce++))
1637     {
1638       num++;
1639       base = vals + *map++;
1640       while (*map >= 0)
1641         {*base = MAX_FABS(*base,*(vals + *map)); map++;}
1642     }
1643   PetscFunctionReturn(0);
1644 }
1645 
1646 /******************************************************************************/
/* Pairwise (point-to-point) exchange with the max-|.| reduction: post
   nonblocking receives, pack and send the outgoing buffers, overlap the
   tree contribution (if any) with communication, then fold each received
   buffer into the shared-value workspace and scatter back into in_vals. */
static PetscErrorCode gs_gop_pairwise_max_abs( gs_id *gs,  PetscScalar *in_vals)
{
  PetscScalar    *dptr1, *dptr2, *dptr3, *in1, *in2;
  PetscInt            *iptr, *msg_list, *msg_size, **msg_nodes;
  PetscInt            *pw, *list, *size, **nodes;
  MPI_Request    *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
  MPI_Status     status;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  /* strip and load s */
  /* two aliases per array: one to advance, one to keep the start */
  msg_list =list         = gs->pair_list;
  msg_size =size         = gs->msg_sizes;
  msg_nodes=nodes        = gs->node_list;
  iptr=pw                = gs->pw_elm_list;
  dptr1=dptr3            = gs->pw_vals;
  msg_ids_in  = ids_in   = gs->msg_ids_in;
  msg_ids_out = ids_out  = gs->msg_ids_out;
  dptr2                  = gs->out;
  in1=in2                = gs->in;

  /* post the receives */
  /*  msg_nodes=nodes; */
  do
    {
      /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
         second one *list and do list++ afterwards */
      ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
      in1 += *size++;
    }
  while (*++msg_nodes);
  msg_nodes=nodes;

  /* load gs values into in out gs buffers */
  while (*iptr >= 0)
    {*dptr3++ = *(in_vals + *iptr++);}

  /* load out buffers and post the sends */
  while ((iptr = *msg_nodes++))
    {
      dptr3 = dptr2;
      while (*iptr >= 0)
        {*dptr2++ = *(dptr1 + *iptr++);}
      /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
      /* is msg_ids_out++ correct? */
      ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
    }

  /* tree part runs here so it overlaps with the pending communication */
  if (gs->max_left_over)
    {gs_gop_tree_max_abs(gs,in_vals);}

  /* process the received data */
  msg_nodes=nodes;
  while ((iptr = *nodes++))
    {
      /* Should I check the return value of MPI_Wait() or status? */
      /* Can this loop be replaced by a call to MPI_Waitall()? */
      ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
      while (*iptr >= 0)
        {*(dptr1 + *iptr) = MAX_FABS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
    }

  /* replace vals */
  while (*pw >= 0)
    {*(in_vals + *pw++) = *dptr1++;}

  /* clear isend message handles */
  /* This changed for clarity though it could be the same */
  while (*msg_nodes++)
    /* Should I check the return value of MPI_Wait() or status? */
    /* Can this loop be replaced by a call to MPI_Waitall()? */
    {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
  PetscFunctionReturn(0);
}
1721 
1722 /******************************************************************************/
1723 static PetscErrorCode gs_gop_tree_max_abs(gs_id *gs, PetscScalar *vals)
1724 {
1725   PetscInt         size;
1726   PetscInt         *in, *out;
1727   PetscScalar *buf, *work;
1728   PetscInt         op[] = {GL_MAX_ABS,0};
1729 
1730   PetscFunctionBegin;
1731   in   = gs->tree_map_in;
1732   out  = gs->tree_map_out;
1733   buf  = gs->tree_buf;
1734   work = gs->tree_work;
1735   size = gs->tree_nel;
1736 
1737   rvec_zero(buf,size);
1738 
1739   while (*in >= 0)
1740     {
1741       /*
1742       printf("%d :: out=%d\n",my_id,*out);
1743       printf("%d :: in=%d\n",my_id,*in);
1744       */
1745       *(buf + *out++) = *(vals + *in++);
1746     }
1747 
1748   grop(buf,work,size,op);
1749 
1750   in   = gs->tree_map_in;
1751   out  = gs->tree_map_out;
1752 
1753   while (*in >= 0)
1754     {*(vals + *in++) = *(buf + *out++);}
1755   PetscFunctionReturn(0);
1756 }
1757 
1758 /******************************************************************************/
1759 static PetscErrorCode gs_gop_max( gs_id *gs,  PetscScalar *vals)
1760 {
1761   PetscFunctionBegin;
1762   /* local only operations!!! */
1763   if (gs->num_local)
1764     {gs_gop_local_max(gs,vals);}
1765 
1766   /* if intersection tree/pairwise and local isn't empty */
1767   if (gs->num_local_gop)
1768     {
1769       gs_gop_local_in_max(gs,vals);
1770 
1771       /* pairwise */
1772       if (gs->num_pairs)
1773         {gs_gop_pairwise_max(gs,vals);}
1774 
1775       /* tree */
1776       else if (gs->max_left_over)
1777         {gs_gop_tree_max(gs,vals);}
1778 
1779       gs_gop_local_out(gs,vals);
1780     }
1781   /* if intersection tree/pairwise and local is empty */
1782   else
1783     {
1784       /* pairwise */
1785       if (gs->num_pairs)
1786         {gs_gop_pairwise_max(gs,vals);}
1787 
1788       /* tree */
1789       else if (gs->max_left_over)
1790         {gs_gop_tree_max(gs,vals);}
1791     }
1792   PetscFunctionReturn(0);
1793 }
1794 
1795 /******************************************************************************/
1796 static PetscErrorCode gs_gop_local_max( gs_id *gs,  PetscScalar *vals)
1797 {
1798   PetscInt         *num, *map, **reduce;
1799   PetscScalar tmp;
1800 
1801   PetscFunctionBegin;
1802   num    = gs->num_local_reduce;
1803   reduce = gs->local_reduce;
1804   while ((map = *reduce))
1805     {
1806       num ++;
1807       tmp = -REAL_MAX;
1808       while (*map >= 0)
1809         {tmp = PetscMax(tmp,*(vals + *map)); map++;}
1810 
1811       map = *reduce++;
1812       while (*map >= 0)
1813         {*(vals + *map++) = tmp;}
1814     }
1815   PetscFunctionReturn(0);
1816 }
1817 
1818 /******************************************************************************/
1819 static PetscErrorCode gs_gop_local_in_max( gs_id *gs,  PetscScalar *vals)
1820 {
1821   PetscInt         *num, *map, **reduce;
1822   PetscScalar *base;
1823 
1824   PetscFunctionBegin;
1825   num    = gs->num_gop_local_reduce;
1826   reduce = gs->gop_local_reduce;
1827   while ((map = *reduce++))
1828     {
1829       num++;
1830       base = vals + *map++;
1831       while (*map >= 0)
1832         {*base = PetscMax(*base,*(vals + *map)); map++;}
1833     }
1834   PetscFunctionReturn(0);
1835 }
1836 
1837 /******************************************************************************/
/* Pairwise (point-to-point) exchange with the max reduction: post
   nonblocking receives, pack and send the outgoing buffers, overlap the
   tree contribution (if any) with communication, then fold each received
   buffer into the shared-value workspace and scatter back into in_vals. */
static PetscErrorCode gs_gop_pairwise_max( gs_id *gs,  PetscScalar *in_vals)
{
  PetscScalar    *dptr1, *dptr2, *dptr3, *in1, *in2;
  PetscInt            *iptr, *msg_list, *msg_size, **msg_nodes;
  PetscInt            *pw, *list, *size, **nodes;
  MPI_Request    *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
  MPI_Status     status;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  /* strip and load s */
  /* two aliases per array: one to advance, one to keep the start */
  msg_list =list         = gs->pair_list;
  msg_size =size         = gs->msg_sizes;
  msg_nodes=nodes        = gs->node_list;
  iptr=pw                = gs->pw_elm_list;
  dptr1=dptr3            = gs->pw_vals;
  msg_ids_in  = ids_in   = gs->msg_ids_in;
  msg_ids_out = ids_out  = gs->msg_ids_out;
  dptr2                  = gs->out;
  in1=in2                = gs->in;

  /* post the receives */
  /*  msg_nodes=nodes; */
  do
    {
      /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
         second one *list and do list++ afterwards */
      ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
      in1 += *size++;
    }
  while (*++msg_nodes);
  msg_nodes=nodes;

  /* load gs values into in out gs buffers */
  while (*iptr >= 0)
    {*dptr3++ = *(in_vals + *iptr++);}

  /* load out buffers and post the sends */
  while ((iptr = *msg_nodes++))
    {
      dptr3 = dptr2;
      while (*iptr >= 0)
        {*dptr2++ = *(dptr1 + *iptr++);}
      /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
      /* is msg_ids_out++ correct? */
      ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
    }

  /* tree part runs here so it overlaps with the pending communication */
  if (gs->max_left_over)
    {gs_gop_tree_max(gs,in_vals);}

  /* process the received data */
  msg_nodes=nodes;
  while ((iptr = *nodes++))
    {
      /* Should I check the return value of MPI_Wait() or status? */
      /* Can this loop be replaced by a call to MPI_Waitall()? */
      ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
      while (*iptr >= 0)
        {*(dptr1 + *iptr) = PetscMax(*(dptr1 + *iptr),*in2); iptr++; in2++;}
    }

  /* replace vals */
  while (*pw >= 0)
    {*(in_vals + *pw++) = *dptr1++;}

  /* clear isend message handles */
  /* This changed for clarity though it could be the same */
  while (*msg_nodes++)
    /* Should I check the return value of MPI_Wait() or status? */
    /* Can this loop be replaced by a call to MPI_Waitall()? */
    {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
  PetscFunctionReturn(0);
}
1912 
1913 /******************************************************************************/
1914 static PetscErrorCode gs_gop_tree_max(gs_id *gs, PetscScalar *vals)
1915 {
1916   PetscInt            size;
1917   PetscInt            *in, *out;
1918   PetscScalar    *buf, *work;
1919   PetscErrorCode ierr;
1920 
1921   PetscFunctionBegin;
1922   in   = gs->tree_map_in;
1923   out  = gs->tree_map_out;
1924   buf  = gs->tree_buf;
1925   work = gs->tree_work;
1926   size = gs->tree_nel;
1927 
1928   rvec_set(buf,-REAL_MAX,size);
1929 
1930   while (*in >= 0)
1931     {*(buf + *out++) = *(vals + *in++);}
1932 
1933   in   = gs->tree_map_in;
1934   out  = gs->tree_map_out;
1935   ierr = MPI_Allreduce(buf,work,size,MPIU_SCALAR,MPI_MAX,gs->gs_comm);CHKERRQ(ierr);
1936   while (*in >= 0)
1937     {*(vals + *in++) = *(work + *out++);}
1938   PetscFunctionReturn(0);
1939 }
1940 /******************************************************************************/
1941 static PetscErrorCode gs_gop_min_abs( gs_id *gs,  PetscScalar *vals)
1942 {
1943   PetscFunctionBegin;
1944   /* local only operations!!! */
1945   if (gs->num_local)
1946     {gs_gop_local_min_abs(gs,vals);}
1947 
1948   /* if intersection tree/pairwise and local isn't empty */
1949   if (gs->num_local_gop)
1950     {
1951       gs_gop_local_in_min_abs(gs,vals);
1952 
1953       /* pairwise */
1954       if (gs->num_pairs)
1955         {gs_gop_pairwise_min_abs(gs,vals);}
1956 
1957       /* tree */
1958       else if (gs->max_left_over)
1959         {gs_gop_tree_min_abs(gs,vals);}
1960 
1961       gs_gop_local_out(gs,vals);
1962     }
1963   /* if intersection tree/pairwise and local is empty */
1964   else
1965     {
1966       /* pairwise */
1967       if (gs->num_pairs)
1968         {gs_gop_pairwise_min_abs(gs,vals);}
1969 
1970       /* tree */
1971       else if (gs->max_left_over)
1972         {gs_gop_tree_min_abs(gs,vals);}
1973     }
1974   PetscFunctionReturn(0);
1975 }
1976 
1977 /******************************************************************************/
/* On-process min-|value| reduction over the purely local sets:
   for each (-1)-terminated index map in the NULL-terminated list
   gs->local_reduce, find the entry of minimal magnitude and write it
   back to every member of the set.  The companion counter array `num`
   is advanced in lock-step but its contents are not consulted here. */
static PetscErrorCode gs_gop_local_min_abs( gs_id *gs,  PetscScalar *vals)
{
   PetscInt *num, *map, **reduce;
   PetscScalar tmp;

  PetscFunctionBegin;
  num    = gs->num_local_reduce;
  reduce = gs->local_reduce;
  while ((map = *reduce))
    {
      num ++;
      /* REAL_MAX is the identity element for a magnitude minimum */
      tmp = REAL_MAX;
      /* pass 1: reduce the set (MIN_FABS keeps the smaller |value|) */
      while (*map >= 0)
        {tmp = MIN_FABS(tmp,*(vals + *map)); map++;}

      /* pass 2: re-walk the same map and scatter the result back */
      map = *reduce++;
      while (*map >= 0)
        {*(vals + *map++) = tmp;}
    }
  PetscFunctionReturn(0);
}
1999 
2000 /******************************************************************************/
/* "In" phase of the min-|value| reduction for sets that also span
   processes: fold every local member of each set into the set's first
   (representative) entry map[0].  The representative later feeds the
   pairwise/tree exchange; gs_gop_local_out() scatters it back.  `num`
   is advanced in lock-step with `reduce` but never read here. */
static PetscErrorCode gs_gop_local_in_min_abs( gs_id *gs,  PetscScalar *vals)
{
   PetscInt *num, *map, **reduce;
   PetscScalar *base;

  PetscFunctionBegin;
  num    = gs->num_gop_local_reduce;
  reduce = gs->gop_local_reduce;
  /* gop_local_reduce: NULL-terminated list of (-1)-terminated maps */
  while ((map = *reduce++))
    {
      num++;
      /* base points at the representative entry vals[map[0]] */
      base = vals + *map++;
      while (*map >= 0)
        {*base = MIN_FABS(*base,*(vals + *map)); map++;}
    }
  PetscFunctionReturn(0);
}
2018 
2019 /******************************************************************************/
/* Pairwise (point-to-point) stage of the min-|value| reduction.
   Protocol: post one nonblocking receive per neighbor, pack the shared
   entries of in_vals into the pairwise buffer, send each neighbor its
   slice, overlap the tree stage with the traffic, then fold each
   received slice in with MIN_FABS and copy the reduced values back.
   NOTE(review): gs->in / gs->out are assumed sized during setup for all
   incoming/outgoing slices -- not checked here. */
static PetscErrorCode gs_gop_pairwise_min_abs( gs_id *gs,  PetscScalar *in_vals)
{
   PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
   PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
   PetscInt *pw, *list, *size, **nodes;
  MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
  MPI_Status status;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  /* strip and load s: each cursor gets a second copy because the lists
     are consumed twice (post/send phase, then wait/unpack phase) */
  msg_list =list         = gs->pair_list;
  msg_size =size         = gs->msg_sizes;
  msg_nodes=nodes        = gs->node_list;
  iptr=pw                = gs->pw_elm_list;
  dptr1=dptr3            = gs->pw_vals;
  msg_ids_in  = ids_in   = gs->msg_ids_in;
  msg_ids_out = ids_out  = gs->msg_ids_out;
  dptr2                  = gs->out;
  in1=in2                = gs->in;

  /* post the receives; sender encodes itself in the tag (MSGTAG1+rank) */
  /*  msg_nodes=nodes; */
  do
    {
      /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
         second one *list and do list++ afterwards */
      ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
      in1 += *size++;
    }
  while (*++msg_nodes);
  msg_nodes=nodes;

  /* load gs values into in out gs buffers */
  while (*iptr >= 0)
    {*dptr3++ = *(in_vals + *iptr++);}

  /* load out buffers and post the sends */
  while ((iptr = *msg_nodes++))
    {
      dptr3 = dptr2;
      while (*iptr >= 0)
        {*dptr2++ = *(dptr1 + *iptr++);}
      /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
      /* is msg_ids_out++ correct? */
      ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
    }

  /* overlap the tree reduction with the outstanding message traffic */
  if (gs->max_left_over)
    {gs_gop_tree_min_abs(gs,in_vals);}

  /* process the received data */
  msg_nodes=nodes;
  while ((iptr = *nodes++))
    {
      /* Should I check the return value of MPI_Wait() or status? */
      /* Can this loop be replaced by a call to MPI_Waitall()? */
      ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
      while (*iptr >= 0)
        {*(dptr1 + *iptr) = MIN_FABS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
    }

  /* replace vals: copy reduced pairwise entries back into in_vals */
  while (*pw >= 0)
    {*(in_vals + *pw++) = *dptr1++;}

  /* clear isend message handles */
  /* This changed for clarity though it could be the same */
  while (*msg_nodes++)
    /* Should I check the return value of MPI_Wait() or status? */
    /* Can this loop be replaced by a call to MPI_Waitall()? */
    {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
  PetscFunctionReturn(0);
}
2094 
2095 /******************************************************************************/
/* Tree (fan-in/fan-out) stage of the min-|value| reduction for elements
   not covered by pairwise exchange.  Entries are packed into a dense
   buffer, globally reduced via grop() with the GL_MIN_ABS operator, and
   unpacked.  Unlike the MPI_Allreduce-based tree routines below, grop()
   leaves its result in the FIRST argument (buf); work is scratch. */
static PetscErrorCode gs_gop_tree_min_abs(gs_id *gs, PetscScalar *vals)
{
  PetscInt size;
  PetscInt *in, *out;
  PetscScalar *buf, *work;
  PetscInt op[] = {GL_MIN_ABS,0};  /* zero-terminated op list for grop() */

  PetscFunctionBegin;
  in   = gs->tree_map_in;
  out  = gs->tree_map_out;
  buf  = gs->tree_buf;
  work = gs->tree_work;
  size = gs->tree_nel;

  /* identity element for min-|.| so slots this process doesn't own can't win */
  rvec_set(buf,REAL_MAX,size);

  /* pack: vals[tree_map_in[k]] -> buf[tree_map_out[k]] */
  while (*in >= 0)
    {*(buf + *out++) = *(vals + *in++);}

  in   = gs->tree_map_in;
  out  = gs->tree_map_out;
  grop(buf,work,size,op);
  /* unpack the globally reduced values (result is in buf) */
  while (*in >= 0)
    {*(vals + *in++) = *(buf + *out++);}
  PetscFunctionReturn(0);
}
2122 
2123 /******************************************************************************/
2124 static PetscErrorCode gs_gop_min( gs_id *gs,  PetscScalar *vals)
2125 {
2126   PetscFunctionBegin;
2127   /* local only operations!!! */
2128   if (gs->num_local)
2129     {gs_gop_local_min(gs,vals);}
2130 
2131   /* if intersection tree/pairwise and local isn't empty */
2132   if (gs->num_local_gop)
2133     {
2134       gs_gop_local_in_min(gs,vals);
2135 
2136       /* pairwise */
2137       if (gs->num_pairs)
2138         {gs_gop_pairwise_min(gs,vals);}
2139 
2140       /* tree */
2141       else if (gs->max_left_over)
2142         {gs_gop_tree_min(gs,vals);}
2143 
2144       gs_gop_local_out(gs,vals);
2145     }
2146   /* if intersection tree/pairwise and local is empty */
2147   else
2148     {
2149       /* pairwise */
2150       if (gs->num_pairs)
2151         {gs_gop_pairwise_min(gs,vals);}
2152 
2153       /* tree */
2154       else if (gs->max_left_over)
2155         {gs_gop_tree_min(gs,vals);}
2156     }
2157   PetscFunctionReturn(0);
2158 }
2159 
2160 /******************************************************************************/
/* On-process signed-minimum reduction over the purely local sets:
   take PetscMin over each (-1)-terminated index map and write the
   minimum back to every member.  `num` is advanced but never read. */
static PetscErrorCode gs_gop_local_min( gs_id *gs,  PetscScalar *vals)
{
   PetscInt *num, *map, **reduce;
   PetscScalar tmp;
  PetscFunctionBegin;
  num    = gs->num_local_reduce;
  reduce = gs->local_reduce;
  while ((map = *reduce))
    {
      num ++;
      /* REAL_MAX is the identity element for min */
      tmp = REAL_MAX;
      while (*map >= 0)
        {tmp = PetscMin(tmp,*(vals + *map)); map++;}

      /* second sweep over the same map: scatter the minimum back */
      map = *reduce++;
      while (*map >= 0)
        {*(vals + *map++) = tmp;}
    }
  PetscFunctionReturn(0);
}
2181 
2182 /******************************************************************************/
/* "In" phase of the signed-minimum reduction for sets that also span
   processes: fold every local member of each set into the set's
   representative entry map[0] using PetscMin.  The representative feeds
   the subsequent pairwise/tree exchange; gs_gop_local_out() scatters
   the result back.  `num` is advanced but never read here. */
static PetscErrorCode gs_gop_local_in_min( gs_id *gs,  PetscScalar *vals)
{
   PetscInt *num, *map, **reduce;
   PetscScalar *base;

  PetscFunctionBegin;
  num    = gs->num_gop_local_reduce;
  reduce = gs->gop_local_reduce;
  while ((map = *reduce++))
    {
      num++;
      /* base points at the representative entry vals[map[0]] */
      base = vals + *map++;
      while (*map >= 0)
        {*base = PetscMin(*base,*(vals + *map)); map++;}
    }
  PetscFunctionReturn(0);
}
2200 
2201 /******************************************************************************/
/* Pairwise (point-to-point) stage of the signed-minimum reduction.
   Same protocol as gs_gop_pairwise_min_abs(): post nonblocking
   receives, pack and send shared entries, overlap the tree stage, then
   fold each received slice in with PetscMin and copy the reduced
   pairwise values back into in_vals. */
static PetscErrorCode gs_gop_pairwise_min( gs_id *gs,  PetscScalar *in_vals)
{
   PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
   PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
   PetscInt *pw, *list, *size, **nodes;
  MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
  MPI_Status status;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  /* strip and load s: each cursor gets a second copy because the lists
     are consumed twice (post/send phase, then wait/unpack phase) */
  msg_list =list         = gs->pair_list;
  msg_size =size         = gs->msg_sizes;
  msg_nodes=nodes        = gs->node_list;
  iptr=pw                = gs->pw_elm_list;
  dptr1=dptr3            = gs->pw_vals;
  msg_ids_in  = ids_in   = gs->msg_ids_in;
  msg_ids_out = ids_out  = gs->msg_ids_out;
  dptr2                  = gs->out;
  in1=in2                = gs->in;

  /* post the receives; sender encodes itself in the tag (MSGTAG1+rank) */
  /*  msg_nodes=nodes; */
  do
    {
      /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
         second one *list and do list++ afterwards */
      ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
      in1 += *size++;
    }
  while (*++msg_nodes);
  msg_nodes=nodes;

  /* load gs values into in out gs buffers */
  while (*iptr >= 0)
    {*dptr3++ = *(in_vals + *iptr++);}

  /* load out buffers and post the sends */
  while ((iptr = *msg_nodes++))
    {
      dptr3 = dptr2;
      while (*iptr >= 0)
        {*dptr2++ = *(dptr1 + *iptr++);}
      /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
      /* is msg_ids_out++ correct? */
      ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
    }

  /* overlap the tree reduction with the outstanding message traffic */
  if (gs->max_left_over)
    {gs_gop_tree_min(gs,in_vals);}

  /* process the received data */
  msg_nodes=nodes;
  while ((iptr = *nodes++))
    {
      /* Should I check the return value of MPI_Wait() or status? */
      /* Can this loop be replaced by a call to MPI_Waitall()? */
      ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
      while (*iptr >= 0)
        {*(dptr1 + *iptr) = PetscMin(*(dptr1 + *iptr),*in2); iptr++; in2++;}
    }

  /* replace vals: copy reduced pairwise entries back into in_vals */
  while (*pw >= 0)
    {*(in_vals + *pw++) = *dptr1++;}

  /* clear isend message handles */
  /* This changed for clarity though it could be the same */
  while (*msg_nodes++)
    /* Should I check the return value of MPI_Wait() or status? */
    /* Can this loop be replaced by a call to MPI_Waitall()? */
    {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
  PetscFunctionReturn(0);
}
2276 
2277 /******************************************************************************/
/* Tree stage of the signed-minimum reduction: pack leftover entries into
   a dense buffer, MPI_Allreduce with MPI_MIN, and unpack from the result
   buffer (work).
   NOTE(review): MPI_MIN with MPIU_SCALAR assumes real scalars; a complex
   build would need a different reduction -- confirm configuration. */
static PetscErrorCode gs_gop_tree_min(gs_id *gs, PetscScalar *vals)
{
  PetscInt size;
  PetscInt *in, *out;
  PetscScalar *buf, *work;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  in   = gs->tree_map_in;
  out  = gs->tree_map_out;
  buf  = gs->tree_buf;
  work = gs->tree_work;
  size = gs->tree_nel;

  /* identity element for min so slots this process doesn't own can't win */
  rvec_set(buf,REAL_MAX,size);

  /* pack: vals[tree_map_in[k]] -> buf[tree_map_out[k]] */
  while (*in >= 0)
    {*(buf + *out++) = *(vals + *in++);}

  in   = gs->tree_map_in;
  out  = gs->tree_map_out;
  ierr = MPI_Allreduce(buf,work,size,MPIU_SCALAR,MPI_MIN,gs->gs_comm);CHKERRQ(ierr);
  /* unpack the globally reduced values from the Allreduce result (work) */
  while (*in >= 0)
    {*(vals + *in++) = *(work + *out++);}
  PetscFunctionReturn(0);
}
2304 
2305 /******************************************************************************/
2306 static PetscErrorCode gs_gop_times( gs_id *gs,  PetscScalar *vals)
2307 {
2308   PetscFunctionBegin;
2309   /* local only operations!!! */
2310   if (gs->num_local)
2311     {gs_gop_local_times(gs,vals);}
2312 
2313   /* if intersection tree/pairwise and local isn't empty */
2314   if (gs->num_local_gop)
2315     {
2316       gs_gop_local_in_times(gs,vals);
2317 
2318       /* pairwise */
2319       if (gs->num_pairs)
2320         {gs_gop_pairwise_times(gs,vals);}
2321 
2322       /* tree */
2323       else if (gs->max_left_over)
2324         {gs_gop_tree_times(gs,vals);}
2325 
2326       gs_gop_local_out(gs,vals);
2327     }
2328   /* if intersection tree/pairwise and local is empty */
2329   else
2330     {
2331       /* pairwise */
2332       if (gs->num_pairs)
2333         {gs_gop_pairwise_times(gs,vals);}
2334 
2335       /* tree */
2336       else if (gs->max_left_over)
2337         {gs_gop_tree_times(gs,vals);}
2338     }
2339   PetscFunctionReturn(0);
2340 }
2341 
2342 /******************************************************************************/
/* On-process product reduction over the purely local sets.  Sets of
   size 2/3/4 (the common wall/corner cases from spectral-element
   meshes) are unrolled: the product is accumulated into vals[map[0]]
   and then copied to the other members.  Larger sets fall through to a
   generic two-pass loop. */
static PetscErrorCode gs_gop_local_times( gs_id *gs,  PetscScalar *vals)
{
   PetscInt *num, *map, **reduce;
   PetscScalar tmp;

  PetscFunctionBegin;
  num    = gs->num_local_reduce;
  reduce = gs->local_reduce;
  while ((map = *reduce))
    {
      /* wall: two members share the value */
      if (*num == 2)
        {
          num ++; reduce++;
          vals[map[1]] = vals[map[0]] *= vals[map[1]];
        }
      /* corner shared by three elements */
      else if (*num == 3)
        {
          num ++; reduce++;
          vals[map[2]]=vals[map[1]]=vals[map[0]]*=(vals[map[1]]*vals[map[2]]);
        }
      /* corner shared by four elements */
      else if (*num == 4)
        {
          num ++; reduce++;
          vals[map[1]]=vals[map[2]]=vals[map[3]]=vals[map[0]] *=
                                 (vals[map[1]] * vals[map[2]] * vals[map[3]]);
        }
      /* general case ... odd geoms ... 3D*/
      else
        {
          num ++;
          /* 1.0 is the identity element for the product */
          tmp = 1.0;
          while (*map >= 0)
            {tmp *= *(vals + *map++);}

          /* second sweep over the same map: scatter the product back */
          map = *reduce++;
          while (*map >= 0)
            {*(vals + *map++) = tmp;}
        }
    }
  PetscFunctionReturn(0);
}
2387 
2388 /******************************************************************************/
/* "In" phase of the product reduction for sets that also span
   processes: fold every local member into the representative entry
   vals[map[0]], which feeds the subsequent pairwise/tree exchange.
   Sizes 2/3/4 are unrolled; larger sets use the generic loop. */
static PetscErrorCode gs_gop_local_in_times( gs_id *gs,  PetscScalar *vals)
{
   PetscInt *num, *map, **reduce;
   PetscScalar *base;

  PetscFunctionBegin;
  num    = gs->num_gop_local_reduce;
  reduce = gs->gop_local_reduce;
  while ((map = *reduce++))
    {
      /* wall */
      if (*num == 2)
        {
          num ++;
          vals[map[0]] *= vals[map[1]];
        }
      /* corner shared by three elements */
      else if (*num == 3)
        {
          num ++;
          vals[map[0]] *= (vals[map[1]] * vals[map[2]]);
        }
      /* corner shared by four elements */
      else if (*num == 4)
        {
          num ++;
          vals[map[0]] *= (vals[map[1]] * vals[map[2]] * vals[map[3]]);
        }
      /* general case ... odd geoms ... 3D*/
      else
        {
          num++;
          /* base points at the representative entry vals[map[0]] */
          base = vals + *map++;
          while (*map >= 0)
            {*base *= *(vals + *map++);}
        }
    }
  PetscFunctionReturn(0);
}
2428 
2429 /******************************************************************************/
/* Pairwise (point-to-point) stage of the product reduction.  Same
   protocol as the other pairwise routines: post nonblocking receives,
   pack and send shared entries, overlap the tree stage with the
   communication, multiply each received slice in, and copy the reduced
   pairwise values back into in_vals. */
static PetscErrorCode gs_gop_pairwise_times( gs_id *gs,  PetscScalar *in_vals)
{
   PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
   PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
   PetscInt *pw, *list, *size, **nodes;
  MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
  MPI_Status status;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  /* strip and load s: each cursor gets a second copy because the lists
     are consumed twice (post/send phase, then wait/unpack phase) */
  msg_list =list         = gs->pair_list;
  msg_size =size         = gs->msg_sizes;
  msg_nodes=nodes        = gs->node_list;
  iptr=pw                = gs->pw_elm_list;
  dptr1=dptr3            = gs->pw_vals;
  msg_ids_in  = ids_in   = gs->msg_ids_in;
  msg_ids_out = ids_out  = gs->msg_ids_out;
  dptr2                  = gs->out;
  in1=in2                = gs->in;

  /* post the receives; sender encodes itself in the tag (MSGTAG1+rank) */
  /*  msg_nodes=nodes; */
  do
    {
      /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
         second one *list and do list++ afterwards */
      ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
      in1 += *size++;
    }
  while (*++msg_nodes);
  msg_nodes=nodes;

  /* load gs values into in out gs buffers */
  while (*iptr >= 0)
    {*dptr3++ = *(in_vals + *iptr++);}

  /* load out buffers and post the sends */
  while ((iptr = *msg_nodes++))
    {
      dptr3 = dptr2;
      while (*iptr >= 0)
        {*dptr2++ = *(dptr1 + *iptr++);}
      /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
      /* is msg_ids_out++ correct? */
      ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
    }

  /* overlap the tree reduction with the outstanding message traffic */
  if (gs->max_left_over)
    {gs_gop_tree_times(gs,in_vals);}

  /* process the received data */
  msg_nodes=nodes;
  while ((iptr = *nodes++))
    {
      /* Should I check the return value of MPI_Wait() or status? */
      /* Can this loop be replaced by a call to MPI_Waitall()? */
      ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
      while (*iptr >= 0)
        {*(dptr1 + *iptr++) *= *in2++;}
    }

  /* replace vals: copy reduced pairwise entries back into in_vals */
  while (*pw >= 0)
    {*(in_vals + *pw++) = *dptr1++;}

  /* clear isend message handles */
  /* This changed for clarity though it could be the same */
  while (*msg_nodes++)
    /* Should I check the return value of MPI_Wait() or status? */
    /* Can this loop be replaced by a call to MPI_Waitall()? */
    {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
  PetscFunctionReturn(0);
}
2504 
2505 /******************************************************************************/
/* Tree stage of the product reduction: pack leftover entries into a
   dense buffer initialized to 1.0 (the product identity), MPI_Allreduce
   with MPI_PROD, and unpack from the result buffer (work). */
static PetscErrorCode gs_gop_tree_times(gs_id *gs, PetscScalar *vals)
{
  PetscInt size;
  PetscInt *in, *out;
  PetscScalar *buf, *work;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  in   = gs->tree_map_in;
  out  = gs->tree_map_out;
  buf  = gs->tree_buf;
  work = gs->tree_work;
  size = gs->tree_nel;

  /* identity element for the product */
  rvec_one(buf,size);

  /* pack: vals[tree_map_in[k]] -> buf[tree_map_out[k]] */
  while (*in >= 0)
    {*(buf + *out++) = *(vals + *in++);}

  in   = gs->tree_map_in;
  out  = gs->tree_map_out;
  ierr = MPI_Allreduce(buf,work,size,MPIU_SCALAR,MPI_PROD,gs->gs_comm);CHKERRQ(ierr);
  /* unpack the globally reduced values from the Allreduce result (work) */
  while (*in >= 0)
    {*(vals + *in++) = *(work + *out++);}
  PetscFunctionReturn(0);
}
2532 
2533 /******************************************************************************/
2534 static PetscErrorCode gs_gop_plus( gs_id *gs,  PetscScalar *vals)
2535 {
2536   PetscFunctionBegin;
2537   /* local only operations!!! */
2538   if (gs->num_local)
2539     {gs_gop_local_plus(gs,vals);}
2540 
2541   /* if intersection tree/pairwise and local isn't empty */
2542   if (gs->num_local_gop)
2543     {
2544       gs_gop_local_in_plus(gs,vals);
2545 
2546       /* pairwise will NOT do tree inside ... */
2547       if (gs->num_pairs)
2548         {gs_gop_pairwise_plus(gs,vals);}
2549 
2550       /* tree */
2551       if (gs->max_left_over)
2552         {gs_gop_tree_plus(gs,vals);}
2553 
2554       gs_gop_local_out(gs,vals);
2555     }
2556   /* if intersection tree/pairwise and local is empty */
2557   else
2558     {
2559       /* pairwise will NOT do tree inside */
2560       if (gs->num_pairs)
2561         {gs_gop_pairwise_plus(gs,vals);}
2562 
2563       /* tree */
2564       if (gs->max_left_over)
2565         {gs_gop_tree_plus(gs,vals);}
2566     }
2567   PetscFunctionReturn(0);
2568 }
2569 
2570 /******************************************************************************/
/* On-process sum reduction over the purely local sets.  Sets of size
   2/3/4 are unrolled: the sum is accumulated into vals[map[0]] and then
   copied to the other members.  Larger sets use a generic two-pass
   loop (reduce, then scatter). */
static PetscErrorCode gs_gop_local_plus( gs_id *gs,  PetscScalar *vals)
{
   PetscInt *num, *map, **reduce;
   PetscScalar tmp;

  PetscFunctionBegin;
  num    = gs->num_local_reduce;
  reduce = gs->local_reduce;
  while ((map = *reduce))
    {
      /* wall: two members share the value */
      if (*num == 2)
        {
          num ++; reduce++;
          vals[map[1]] = vals[map[0]] += vals[map[1]];
        }
      /* corner shared by three elements */
      else if (*num == 3)
        {
          num ++; reduce++;
          vals[map[2]]=vals[map[1]]=vals[map[0]]+=(vals[map[1]]+vals[map[2]]);
        }
      /* corner shared by four elements */
      else if (*num == 4)
        {
          num ++; reduce++;
          vals[map[1]]=vals[map[2]]=vals[map[3]]=vals[map[0]] +=
                                 (vals[map[1]] + vals[map[2]] + vals[map[3]]);
        }
      /* general case ... odd geoms ... 3D*/
      else
        {
          num ++;
          /* 0.0 is the identity element for the sum */
          tmp = 0.0;
          while (*map >= 0)
            {tmp += *(vals + *map++);}

          /* second sweep over the same map: scatter the sum back */
          map = *reduce++;
          while (*map >= 0)
            {*(vals + *map++) = tmp;}
        }
    }
  PetscFunctionReturn(0);
}
2615 
2616 /******************************************************************************/
/* "In" phase of the sum reduction for sets that also span processes:
   fold every local member into the representative entry vals[map[0]],
   which feeds the subsequent pairwise/tree exchange.  Sizes 2/3/4 are
   unrolled; larger sets use the generic loop. */
static PetscErrorCode gs_gop_local_in_plus( gs_id *gs,  PetscScalar *vals)
{
   PetscInt *num, *map, **reduce;
   PetscScalar *base;

  PetscFunctionBegin;
  num    = gs->num_gop_local_reduce;
  reduce = gs->gop_local_reduce;
  while ((map = *reduce++))
    {
      /* wall */
      if (*num == 2)
        {
          num ++;
          vals[map[0]] += vals[map[1]];
        }
      /* corner shared by three elements */
      else if (*num == 3)
        {
          num ++;
          vals[map[0]] += (vals[map[1]] + vals[map[2]]);
        }
      /* corner shared by four elements */
      else if (*num == 4)
        {
          num ++;
          vals[map[0]] += (vals[map[1]] + vals[map[2]] + vals[map[3]]);
        }
      /* general case ... odd geoms ... 3D*/
      else
        {
          num++;
          /* base points at the representative entry vals[map[0]] */
          base = vals + *map++;
          while (*map >= 0)
            {*base += *(vals + *map++);}
        }
    }
  PetscFunctionReturn(0);
}
2656 
2657 /******************************************************************************/
/* Pairwise (point-to-point) stage of the sum reduction.  Same protocol
   as the other pairwise routines: post nonblocking receives, pack and
   send shared entries, overlap the tree stage with the communication,
   add each received slice in, and copy the reduced pairwise values back
   into in_vals.  Because the tree runs here, the gs_gop_plus() driver
   must not invoke it again when this routine was called. */
static PetscErrorCode gs_gop_pairwise_plus( gs_id *gs,  PetscScalar *in_vals)
{
   PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
   PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
   PetscInt *pw, *list, *size, **nodes;
  MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
  MPI_Status status;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  /* strip and load s: each cursor gets a second copy because the lists
     are consumed twice (post/send phase, then wait/unpack phase) */
  msg_list =list         = gs->pair_list;
  msg_size =size         = gs->msg_sizes;
  msg_nodes=nodes        = gs->node_list;
  iptr=pw                = gs->pw_elm_list;
  dptr1=dptr3            = gs->pw_vals;
  msg_ids_in  = ids_in   = gs->msg_ids_in;
  msg_ids_out = ids_out  = gs->msg_ids_out;
  dptr2                  = gs->out;
  in1=in2                = gs->in;

  /* post the receives; sender encodes itself in the tag (MSGTAG1+rank) */
  /*  msg_nodes=nodes; */
  do
    {
      /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
         second one *list and do list++ afterwards */
      ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
      in1 += *size++;
    }
  while (*++msg_nodes);
  msg_nodes=nodes;

  /* load gs values into in out gs buffers */
  while (*iptr >= 0)
    {*dptr3++ = *(in_vals + *iptr++);}

  /* load out buffers and post the sends */
  while ((iptr = *msg_nodes++))
    {
      dptr3 = dptr2;
      while (*iptr >= 0)
        {*dptr2++ = *(dptr1 + *iptr++);}
      /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
      /* is msg_ids_out++ correct? */
      ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
    }

  /* do the tree while we're waiting */
  if (gs->max_left_over)
    {gs_gop_tree_plus(gs,in_vals);}

  /* process the received data */
  msg_nodes=nodes;
  while ((iptr = *nodes++))
    {
      /* Should I check the return value of MPI_Wait() or status? */
      /* Can this loop be replaced by a call to MPI_Waitall()? */
      ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
      while (*iptr >= 0)
        {*(dptr1 + *iptr++) += *in2++;}
    }

  /* replace vals: copy reduced pairwise entries back into in_vals */
  while (*pw >= 0)
    {*(in_vals + *pw++) = *dptr1++;}

  /* clear isend message handles */
  /* This changed for clarity though it could be the same */
  while (*msg_nodes++)
    /* Should I check the return value of MPI_Wait() or status? */
    /* Can this loop be replaced by a call to MPI_Waitall()? */
    {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
  PetscFunctionReturn(0);
}
2733 
2734 /******************************************************************************/
/* Tree stage of the sum reduction: pack leftover entries into a dense
   buffer initialized to 0.0 (the sum identity), MPI_Allreduce with
   MPI_SUM, and unpack from the result buffer (work). */
static PetscErrorCode gs_gop_tree_plus(gs_id *gs, PetscScalar *vals)
{
  PetscInt size;
  PetscInt *in, *out;
  PetscScalar *buf, *work;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  in   = gs->tree_map_in;
  out  = gs->tree_map_out;
  buf  = gs->tree_buf;
  work = gs->tree_work;
  size = gs->tree_nel;

  /* identity element for the sum */
  rvec_zero(buf,size);

  /* pack: vals[tree_map_in[k]] -> buf[tree_map_out[k]] */
  while (*in >= 0)
    {*(buf + *out++) = *(vals + *in++);}

  in   = gs->tree_map_in;
  out  = gs->tree_map_out;
  ierr = MPI_Allreduce(buf,work,size,MPIU_SCALAR,MPI_SUM,gs->gs_comm);CHKERRQ(ierr);
  /* unpack the globally reduced values from the Allreduce result (work) */
  while (*in >= 0)
    {*(vals + *in++) = *(work + *out++);}
  PetscFunctionReturn(0);
}
2761 
2762 /******************************************************************************/
/* Release every buffer owned by a gs_id handle, then the handle itself.
   The groups of frees mirror the allocations made during setup and are
   guarded by the same flags (max_left_over, num_pairs, num_local_total)
   the gop routines test.  free(NULL) is a no-op, so the pointer guards
   are defensive only.
   NOTE(review): gs->local_reduce / gs->num_local_reduce are not freed
   here -- presumably they alias (or are released with) gop_local_reduce;
   verify against the setup code. */
PetscErrorCode gs_free( gs_id *gs)
{
   PetscInt i;

  PetscFunctionBegin;
  if (gs->nghs) {free((void*) gs->nghs);}
  if (gs->pw_nghs) {free((void*) gs->pw_nghs);}

  /* tree buffers: allocated only when elements were left to the tree */
  if (gs->max_left_over)
    {
      if (gs->tree_elms) {free((void*) gs->tree_elms);}
      if (gs->tree_buf) {free((void*) gs->tree_buf);}
      if (gs->tree_work) {free((void*) gs->tree_work);}
      if (gs->tree_map_in) {free((void*) gs->tree_map_in);}
      if (gs->tree_map_out) {free((void*) gs->tree_map_out);}
    }

  /* pairwise info */
  if (gs->num_pairs)
    {
      /* should be NULL already */
      if (gs->ngh_buf) {free((void*) gs->ngh_buf);}
      if (gs->elms) {free((void*) gs->elms);}
      if (gs->local_elms) {free((void*) gs->local_elms);}
      if (gs->companion) {free((void*) gs->companion);}

      /* only set if pairwise */
      if (gs->vals) {free((void*) gs->vals);}
      if (gs->in) {free((void*) gs->in);}
      if (gs->out) {free((void*) gs->out);}
      if (gs->msg_ids_in) {free((void*) gs->msg_ids_in);}
      if (gs->msg_ids_out) {free((void*) gs->msg_ids_out);}
      if (gs->pw_vals) {free((void*) gs->pw_vals);}
      if (gs->pw_elm_list) {free((void*) gs->pw_elm_list);}
      /* node_list holds one index map per neighboring process */
      if (gs->node_list)
        {
          for (i=0;i<gs->num_pairs;i++)
            {if (gs->node_list[i]) {free((void*) gs->node_list[i]);}}
          free((void*) gs->node_list);
        }
      if (gs->msg_sizes) {free((void*) gs->msg_sizes);}
      if (gs->pair_list) {free((void*) gs->pair_list);}
    }

  /* local info: per-set maps, freed only where a map was allocated */
  if (gs->num_local_total>=0)
    {
      for (i=0;i<gs->num_local_total+1;i++)
        /*      for (i=0;i<gs->num_local_total;i++) */
        {
          if (gs->num_gop_local_reduce[i])
            {free((void*) gs->gop_local_reduce[i]);}
        }
    }

  /* if intersection tree/pairwise and local isn't empty */
  if (gs->gop_local_reduce) {free((void*) gs->gop_local_reduce);}
  if (gs->num_gop_local_reduce) {free((void*) gs->num_gop_local_reduce);}

  free((void*) gs);
  PetscFunctionReturn(0);
}
2826 
2827 /******************************************************************************/
2828 PetscErrorCode gs_gop_vec( gs_id *gs,  PetscScalar *vals,  const char *op,  PetscInt step)
2829 {
2830   PetscFunctionBegin;
2831   switch (*op) {
2832   case '+':
2833     gs_gop_vec_plus(gs,vals,step);
2834     break;
2835   default:
2836     error_msg_warning("gs_gop_vec() :: %c is not a valid op",op[0]);
2837     error_msg_warning("gs_gop_vec() :: default :: plus");
2838     gs_gop_vec_plus(gs,vals,step);
2839     break;
2840   }
2841   PetscFunctionReturn(0);
2842 }
2843 
2844 /******************************************************************************/
2845 static PetscErrorCode gs_gop_vec_plus( gs_id *gs,  PetscScalar *vals,  PetscInt step)
2846 {
2847   PetscFunctionBegin;
2848   if (!gs) {error_msg_fatal("gs_gop_vec() passed NULL gs handle!!!");}
2849 
2850   /* local only operations!!! */
2851   if (gs->num_local)
2852     {gs_gop_vec_local_plus(gs,vals,step);}
2853 
2854   /* if intersection tree/pairwise and local isn't empty */
2855   if (gs->num_local_gop)
2856     {
2857       gs_gop_vec_local_in_plus(gs,vals,step);
2858 
2859       /* pairwise */
2860       if (gs->num_pairs)
2861         {gs_gop_vec_pairwise_plus(gs,vals,step);}
2862 
2863       /* tree */
2864       else if (gs->max_left_over)
2865         {gs_gop_vec_tree_plus(gs,vals,step);}
2866 
2867       gs_gop_vec_local_out(gs,vals,step);
2868     }
2869   /* if intersection tree/pairwise and local is empty */
2870   else
2871     {
2872       /* pairwise */
2873       if (gs->num_pairs)
2874         {gs_gop_vec_pairwise_plus(gs,vals,step);}
2875 
2876       /* tree */
2877       else if (gs->max_left_over)
2878         {gs_gop_vec_tree_plus(gs,vals,step);}
2879     }
2880   PetscFunctionReturn(0);
2881 }
2882 
2883 /******************************************************************************/
/* On-process blocked sum reduction over the purely local sets: each
   element is `step` contiguous scalars.  The sum is accumulated into
   the block at map[0] via rvec_add and then copied to the other
   members via rvec_copy.  Sizes 2/3/4 are unrolled; larger sets use a
   generic two-pass loop. */
static PetscErrorCode gs_gop_vec_local_plus( gs_id *gs,  PetscScalar *vals, PetscInt step)
{
   PetscInt *num, *map, **reduce;
   PetscScalar *base;

  PetscFunctionBegin;
  num    = gs->num_local_reduce;
  reduce = gs->local_reduce;
  while ((map = *reduce))
    {
      /* base: the representative block for this set */
      base = vals + map[0] * step;

      /* wall */
      if (*num == 2)
        {
          num++; reduce++;
          rvec_add (base,vals+map[1]*step,step);
          rvec_copy(vals+map[1]*step,base,step);
        }
      /* corner shared by three elements */
      else if (*num == 3)
        {
          num++; reduce++;
          rvec_add (base,vals+map[1]*step,step);
          rvec_add (base,vals+map[2]*step,step);
          rvec_copy(vals+map[2]*step,base,step);
          rvec_copy(vals+map[1]*step,base,step);
        }
      /* corner shared by four elements */
      else if (*num == 4)
        {
          num++; reduce++;
          rvec_add (base,vals+map[1]*step,step);
          rvec_add (base,vals+map[2]*step,step);
          rvec_add (base,vals+map[3]*step,step);
          rvec_copy(vals+map[3]*step,base,step);
          rvec_copy(vals+map[2]*step,base,step);
          rvec_copy(vals+map[1]*step,base,step);
        }
      /* general case ... odd geoms ... 3D */
      else
        {
          num++;
          /* pass 1: accumulate all other members into base (pre-increment
             skips map[0], which is base itself) */
          while (*++map >= 0)
            {rvec_add (base,vals+*map*step,step);}

          /* pass 2: re-walk the same map and copy the sum back out */
          map = *reduce;
          while (*++map >= 0)
            {rvec_copy(vals+*map*step,base,step);}

          reduce++;
        }
    }
  PetscFunctionReturn(0);
}
2939 
2940 /******************************************************************************/
2941 static PetscErrorCode gs_gop_vec_local_in_plus( gs_id *gs,  PetscScalar *vals, PetscInt step)
2942 {
2943    PetscInt  *num, *map, **reduce;
2944    PetscScalar *base;
2945   PetscFunctionBegin;
2946   num    = gs->num_gop_local_reduce;
2947   reduce = gs->gop_local_reduce;
2948   while ((map = *reduce++))
2949     {
2950       base = vals + map[0] * step;
2951 
2952       /* wall */
2953       if (*num == 2)
2954         {
2955           num ++;
2956           rvec_add(base,vals+map[1]*step,step);
2957         }
2958       /* corner shared by three elements */
2959       else if (*num == 3)
2960         {
2961           num ++;
2962           rvec_add(base,vals+map[1]*step,step);
2963           rvec_add(base,vals+map[2]*step,step);
2964         }
2965       /* corner shared by four elements */
2966       else if (*num == 4)
2967         {
2968           num ++;
2969           rvec_add(base,vals+map[1]*step,step);
2970           rvec_add(base,vals+map[2]*step,step);
2971           rvec_add(base,vals+map[3]*step,step);
2972         }
2973       /* general case ... odd geoms ... 3D*/
2974       else
2975         {
2976           num++;
2977           while (*++map >= 0)
2978             {rvec_add(base,vals+*map*step,step);}
2979         }
2980     }
2981   PetscFunctionReturn(0);
2982 }
2983 
2984 /******************************************************************************/
2985 static PetscErrorCode gs_gop_vec_local_out( gs_id *gs,  PetscScalar *vals, PetscInt step)
2986 {
2987    PetscInt *num, *map, **reduce;
2988    PetscScalar *base;
2989 
2990   PetscFunctionBegin;
2991   num    = gs->num_gop_local_reduce;
2992   reduce = gs->gop_local_reduce;
2993   while ((map = *reduce++))
2994     {
2995       base = vals + map[0] * step;
2996 
2997       /* wall */
2998       if (*num == 2)
2999         {
3000           num ++;
3001           rvec_copy(vals+map[1]*step,base,step);
3002         }
3003       /* corner shared by three elements */
3004       else if (*num == 3)
3005         {
3006           num ++;
3007           rvec_copy(vals+map[1]*step,base,step);
3008           rvec_copy(vals+map[2]*step,base,step);
3009         }
3010       /* corner shared by four elements */
3011       else if (*num == 4)
3012         {
3013           num ++;
3014           rvec_copy(vals+map[1]*step,base,step);
3015           rvec_copy(vals+map[2]*step,base,step);
3016           rvec_copy(vals+map[3]*step,base,step);
3017         }
3018       /* general case ... odd geoms ... 3D*/
3019       else
3020         {
3021           num++;
3022           while (*++map >= 0)
3023             {rvec_copy(vals+*map*step,base,step);}
3024         }
3025     }
3026   PetscFunctionReturn(0);
3027 }
3028 
3029 /******************************************************************************/
3030 static PetscErrorCode gs_gop_vec_pairwise_plus( gs_id *gs,  PetscScalar *in_vals, PetscInt step)
3031 {
3032    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
3033    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
3034    PetscInt *pw, *list, *size, **nodes;
3035   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
3036   MPI_Status status;
3037   PetscBLASInt i1;
3038   PetscErrorCode ierr;
3039 
3040   PetscFunctionBegin;
3041   /* strip and load s */
3042   msg_list =list         = gs->pair_list;
3043   msg_size =size         = gs->msg_sizes;
3044   msg_nodes=nodes        = gs->node_list;
3045   iptr=pw                = gs->pw_elm_list;
3046   dptr1=dptr3            = gs->pw_vals;
3047   msg_ids_in  = ids_in   = gs->msg_ids_in;
3048   msg_ids_out = ids_out  = gs->msg_ids_out;
3049   dptr2                  = gs->out;
3050   in1=in2                = gs->in;
3051 
3052   /* post the receives */
3053   /*  msg_nodes=nodes; */
3054   do
3055     {
3056       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
3057          second one *list and do list++ afterwards */
3058       ierr = MPI_Irecv(in1, *size *step, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
3059       in1 += *size++ *step;
3060     }
3061   while (*++msg_nodes);
3062   msg_nodes=nodes;
3063 
3064   /* load gs values into in out gs buffers */
3065   while (*iptr >= 0)
3066     {
3067       rvec_copy(dptr3,in_vals + *iptr*step,step);
3068       dptr3+=step;
3069       iptr++;
3070     }
3071 
3072   /* load out buffers and post the sends */
3073   while ((iptr = *msg_nodes++))
3074     {
3075       dptr3 = dptr2;
3076       while (*iptr >= 0)
3077         {
3078           rvec_copy(dptr2,dptr1 + *iptr*step,step);
3079           dptr2+=step;
3080           iptr++;
3081         }
3082       ierr = MPI_Isend(dptr3, *msg_size++ *step, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
3083     }
3084 
3085   /* tree */
3086   if (gs->max_left_over)
3087     {gs_gop_vec_tree_plus(gs,in_vals,step);}
3088 
3089   /* process the received data */
3090   msg_nodes=nodes;
3091   while ((iptr = *nodes++)){
3092     PetscScalar d1 = 1.0;
3093       /* Should I check the return value of MPI_Wait() or status? */
3094       /* Can this loop be replaced by a call to MPI_Waitall()? */
3095       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
3096       while (*iptr >= 0) {
3097           BLASaxpy_(&step,&d1,in2,&i1,dptr1 + *iptr*step,&i1);
3098           in2+=step;
3099           iptr++;
3100       }
3101   }
3102 
3103   /* replace vals */
3104   while (*pw >= 0)
3105     {
3106       rvec_copy(in_vals + *pw*step,dptr1,step);
3107       dptr1+=step;
3108       pw++;
3109     }
3110 
3111   /* clear isend message handles */
3112   /* This changed for clarity though it could be the same */
3113   while (*msg_nodes++)
3114     /* Should I check the return value of MPI_Wait() or status? */
3115     /* Can this loop be replaced by a call to MPI_Waitall()? */
3116     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
3117 
3118   PetscFunctionReturn(0);
3119 }
3120 
3121 /******************************************************************************/
3122 static PetscErrorCode gs_gop_vec_tree_plus( gs_id *gs,  PetscScalar *vals,  PetscInt step)
3123 {
3124   PetscInt size, *in, *out;
3125   PetscScalar *buf, *work;
3126   PetscInt op[] = {GL_ADD,0};
3127   PetscBLASInt i1 = 1;
3128 
3129   PetscFunctionBegin;
3130   /* copy over to local variables */
3131   in   = gs->tree_map_in;
3132   out  = gs->tree_map_out;
3133   buf  = gs->tree_buf;
3134   work = gs->tree_work;
3135   size = gs->tree_nel*step;
3136 
3137   /* zero out collection buffer */
3138   rvec_zero(buf,size);
3139 
3140 
3141   /* copy over my contributions */
3142   while (*in >= 0)
3143     {
3144       BLAScopy_(&step,vals + *in++*step,&i1,buf + *out++*step,&i1);
3145     }
3146 
3147   /* perform fan in/out on full buffer */
3148   /* must change grop to handle the blas */
3149   grop(buf,work,size,op);
3150 
3151   /* reset */
3152   in   = gs->tree_map_in;
3153   out  = gs->tree_map_out;
3154 
3155   /* get the portion of the results I need */
3156   while (*in >= 0)
3157     {
3158       BLAScopy_(&step,buf + *out++*step,&i1,vals + *in++*step,&i1);
3159     }
3160   PetscFunctionReturn(0);
3161 }
3162 
3163 /******************************************************************************/
3164 PetscErrorCode gs_gop_hc( gs_id *gs,  PetscScalar *vals,  const char *op,  PetscInt dim)
3165 {
3166   PetscFunctionBegin;
3167   switch (*op) {
3168   case '+':
3169     gs_gop_plus_hc(gs,vals,dim);
3170     break;
3171   default:
3172     error_msg_warning("gs_gop_hc() :: %c is not a valid op",op[0]);
3173     error_msg_warning("gs_gop_hc() :: default :: plus\n");
3174     gs_gop_plus_hc(gs,vals,dim);
3175     break;
3176   }
3177   PetscFunctionReturn(0);
3178 }
3179 
3180 /******************************************************************************/
3181 static PetscErrorCode gs_gop_plus_hc( gs_id *gs,  PetscScalar *vals, PetscInt dim)
3182 {
3183   PetscFunctionBegin;
3184   /* if there's nothing to do return */
3185   if (dim<=0)
3186     {  PetscFunctionReturn(0);}
3187 
3188   /* can't do more dimensions then exist */
3189   dim = PetscMin(dim,i_log2_num_nodes);
3190 
3191   /* local only operations!!! */
3192   if (gs->num_local)
3193     {gs_gop_local_plus(gs,vals);}
3194 
3195   /* if intersection tree/pairwise and local isn't empty */
3196   if (gs->num_local_gop)
3197     {
3198       gs_gop_local_in_plus(gs,vals);
3199 
3200       /* pairwise will do tree inside ... */
3201       if (gs->num_pairs)
3202         {gs_gop_pairwise_plus_hc(gs,vals,dim);}
3203 
3204       /* tree only */
3205       else if (gs->max_left_over)
3206         {gs_gop_tree_plus_hc(gs,vals,dim);}
3207 
3208       gs_gop_local_out(gs,vals);
3209     }
3210   /* if intersection tree/pairwise and local is empty */
3211   else
3212     {
3213       /* pairwise will do tree inside */
3214       if (gs->num_pairs)
3215         {gs_gop_pairwise_plus_hc(gs,vals,dim);}
3216 
3217       /* tree */
3218       else if (gs->max_left_over)
3219         {gs_gop_tree_plus_hc(gs,vals,dim);}
3220     }
3221   PetscFunctionReturn(0);
3222 }
3223 
/******************************************************************************/
/* gs_gop_pairwise_plus_hc - scalar pairwise gather-scatter sum restricted to
   a dim-dimensional hypercube of processors.  mask collects the low dim bits;
   a neighbor takes part in the exchange only when (my_id|mask)==(*list|mask),
   i.e. the two ids agree on every bit outside the hypercube.  Non-hypercube
   neighbors are skipped but their list/size slots are still stepped over so
   the parallel arrays stay in sync.                                          */
static PetscErrorCode gs_gop_pairwise_plus_hc( gs_id *gs,  PetscScalar *in_vals, PetscInt dim)
{
   PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
   PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
   PetscInt *pw, *list, *size, **nodes;
  MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
  MPI_Status status;
  PetscInt i, mask=1;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  /* build the dim low-order bits of the hypercube mask */
  for (i=1; i<dim; i++)
    {mask<<=1; mask++;}


  /* strip and load s */
  msg_list =list         = gs->pair_list;
  msg_size =size         = gs->msg_sizes;
  msg_nodes=nodes        = gs->node_list;
  iptr=pw                = gs->pw_elm_list;
  dptr1=dptr3            = gs->pw_vals;
  msg_ids_in  = ids_in   = gs->msg_ids_in;
  msg_ids_out = ids_out  = gs->msg_ids_out;
  dptr2                  = gs->out;
  in1=in2                = gs->in;

  /* post the receives -- only from neighbors inside the hypercube */
  /*  msg_nodes=nodes; */
  do
    {
      /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
         second one *list and do list++ afterwards */
      if ((my_id|mask)==(*list|mask))
        {
          ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
          in1 += *size++;
        }
      else
        {list++; size++;}  /* skipped neighbor: keep the parallel arrays in step */
    }
  while (*++msg_nodes);

  /* load gs values into in out gs buffers */
  while (*iptr >= 0)
    {*dptr3++ = *(in_vals + *iptr++);}

  /* load out buffers and post the sends (same hypercube filter as above) */
  msg_nodes=nodes;
  list = msg_list;
  while ((iptr = *msg_nodes++))
    {
      if ((my_id|mask)==(*list|mask))
        {
          dptr3 = dptr2;
          while (*iptr >= 0)
            {*dptr2++ = *(dptr1 + *iptr++);}
          /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
          /* is msg_ids_out++ correct? */
          ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
        }
      else
        {list++; msg_size++;}
    }

  /* do the tree while we're waiting */
  if (gs->max_left_over)
    {gs_gop_tree_plus_hc(gs,in_vals,dim);}

  /* process the received data: add each hypercube neighbor's values in */
  msg_nodes=nodes;
  list = msg_list;
  while ((iptr = *nodes++))
    {
      if ((my_id|mask)==(*list|mask))
        {
          /* Should I check the return value of MPI_Wait() or status? */
          /* Can this loop be replaced by a call to MPI_Waitall()? */
          ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
          while (*iptr >= 0)
            {*(dptr1 + *iptr++) += *in2++;}
        }
      list++;
    }

  /* replace vals: copy summed pairwise values back to the caller's array */
  while (*pw >= 0)
    {*(in_vals + *pw++) = *dptr1++;}

  /* clear isend message handles */
  /* This changed for clarity though it could be the same */
  while (*msg_nodes++)
    {
      if ((my_id|mask)==(*msg_list|mask))
        {
          /* Should I check the return value of MPI_Wait() or status? */
          /* Can this loop be replaced by a call to MPI_Waitall()? */
          ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);
        }
      msg_list++;
    }

  PetscFunctionReturn(0);
}
3328 
3329 /******************************************************************************/
3330 static PetscErrorCode gs_gop_tree_plus_hc(gs_id *gs, PetscScalar *vals, PetscInt dim)
3331 {
3332   PetscInt size;
3333   PetscInt *in, *out;
3334   PetscScalar *buf, *work;
3335   PetscInt op[] = {GL_ADD,0};
3336 
3337   PetscFunctionBegin;
3338   in   = gs->tree_map_in;
3339   out  = gs->tree_map_out;
3340   buf  = gs->tree_buf;
3341   work = gs->tree_work;
3342   size = gs->tree_nel;
3343 
3344   rvec_zero(buf,size);
3345 
3346   while (*in >= 0)
3347     {*(buf + *out++) = *(vals + *in++);}
3348 
3349   in   = gs->tree_map_in;
3350   out  = gs->tree_map_out;
3351 
3352   grop_hc(buf,work,size,op,dim);
3353 
3354   while (*in >= 0)
3355     {*(vals + *in++) = *(buf + *out++);}
3356   PetscFunctionReturn(0);
3357 }
3358 
3359 
3360 
3361