xref: /petsc/src/ksp/pc/impls/tfs/gs.c (revision f1ed62a8326c47f0b5aa980386b7ed5f27a86ba7)
1 #define PETSCKSP_DLL
2 
3 /***********************************gs.c***************************************
4 
5 Author: Henry M. Tufo III
6 
7 e-mail: hmt@cs.brown.edu
8 
9 snail-mail:
10 Division of Applied Mathematics
11 Brown University
12 Providence, RI 02912
13 
14 Last Modification:
15 6.21.97
16 ************************************gs.c**************************************/
17 
18 /***********************************gs.c***************************************
19 File Description:
20 -----------------
21 
22 ************************************gs.c**************************************/
23 
24 #include "src/ksp/pc/impls/tfs/tfs.h"
25 
26 /* default length of number of items via tree - doubles if exceeded */
27 #define TREE_BUF_SZ 2048;
28 #define GS_VEC_SZ   1
29 
30 
31 
32 /***********************************gs.c***************************************
33 Type: struct gather_scatter_id
34 ------------------------------
35 
36 ************************************gs.c**************************************/
37 typedef struct gather_scatter_id {
38   PetscInt id;
39   PetscInt nel_min;
40   PetscInt nel_max;
41   PetscInt nel_sum;
42   PetscInt negl;
43   PetscInt gl_max;
44   PetscInt gl_min;
45   PetscInt repeats;
46   PetscInt ordered;
47   PetscInt positive;
48   PetscScalar *vals;
49 
50   /* bit mask info */
51   PetscInt *my_proc_mask;
52   PetscInt mask_sz;
53   PetscInt *ngh_buf;
54   PetscInt ngh_buf_sz;
55   PetscInt *nghs;
56   PetscInt num_nghs;
57   PetscInt max_nghs;
58   PetscInt *pw_nghs;
59   PetscInt num_pw_nghs;
60   PetscInt *tree_nghs;
61   PetscInt num_tree_nghs;
62 
63   PetscInt num_loads;
64 
65   /* repeats == true -> local info */
66   PetscInt nel;         /* number of unique elememts */
67   PetscInt *elms;       /* of size nel */
68   PetscInt nel_total;
69   PetscInt *local_elms; /* of size nel_total */
70   PetscInt *companion;  /* of size nel_total */
71 
72   /* local info */
73   PetscInt num_local_total;
74   PetscInt local_strength;
75   PetscInt num_local;
76   PetscInt *num_local_reduce;
77   PetscInt **local_reduce;
78   PetscInt num_local_gop;
79   PetscInt *num_gop_local_reduce;
80   PetscInt **gop_local_reduce;
81 
82   /* pairwise info */
83   PetscInt level;
84   PetscInt num_pairs;
85   PetscInt max_pairs;
86   PetscInt loc_node_pairs;
87   PetscInt max_node_pairs;
88   PetscInt min_node_pairs;
89   PetscInt avg_node_pairs;
90   PetscInt *pair_list;
91   PetscInt *msg_sizes;
92   PetscInt **node_list;
93   PetscInt len_pw_list;
94   PetscInt *pw_elm_list;
95   PetscScalar *pw_vals;
96 
97   MPI_Request *msg_ids_in;
98   MPI_Request *msg_ids_out;
99 
100   PetscScalar *out;
101   PetscScalar *in;
102   PetscInt msg_total;
103 
104   /* tree - crystal accumulator info */
105   PetscInt max_left_over;
106   PetscInt *pre;
107   PetscInt *in_num;
108   PetscInt *out_num;
109   PetscInt **in_list;
110   PetscInt **out_list;
111 
112   /* new tree work*/
113   PetscInt  tree_nel;
114   PetscInt *tree_elms;
115   PetscScalar *tree_buf;
116   PetscScalar *tree_work;
117 
118   PetscInt  tree_map_sz;
119   PetscInt *tree_map_in;
120   PetscInt *tree_map_out;
121 
122   /* current memory status */
123   PetscInt gl_bss_min;
124   PetscInt gl_perm_min;
125 
126   /* max segment size for gs_gop_vec() */
127   PetscInt vec_sz;
128 
129   /* hack to make paul happy */
130   MPI_Comm gs_comm;
131 
132 } gs_id;
133 
134 static gs_id *gsi_check_args(PetscInt *elms, PetscInt nel, PetscInt level);
135 static PetscErrorCode gsi_via_bit_mask(gs_id *gs);
136 static PetscErrorCode get_ngh_buf(gs_id *gs);
137 static PetscErrorCode set_pairwise(gs_id *gs);
138 static gs_id * gsi_new(void);
139 static PetscErrorCode set_tree(gs_id *gs);
140 
141 /* same for all but vector flavor */
142 static PetscErrorCode gs_gop_local_out(gs_id *gs, PetscScalar *vals);
143 /* vector flavor */
144 static PetscErrorCode gs_gop_vec_local_out(gs_id *gs, PetscScalar *vals, PetscInt step);
145 
146 static PetscErrorCode gs_gop_vec_plus(gs_id *gs, PetscScalar *in_vals, PetscInt step);
147 static PetscErrorCode gs_gop_vec_pairwise_plus(gs_id *gs, PetscScalar *in_vals, PetscInt step);
148 static PetscErrorCode gs_gop_vec_local_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
149 static PetscErrorCode gs_gop_vec_local_in_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
150 static PetscErrorCode gs_gop_vec_tree_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
151 
152 
153 static PetscErrorCode gs_gop_plus(gs_id *gs, PetscScalar *in_vals);
154 static PetscErrorCode gs_gop_pairwise_plus(gs_id *gs, PetscScalar *in_vals);
155 static PetscErrorCode gs_gop_local_plus(gs_id *gs, PetscScalar *vals);
156 static PetscErrorCode gs_gop_local_in_plus(gs_id *gs, PetscScalar *vals);
157 static PetscErrorCode gs_gop_tree_plus(gs_id *gs, PetscScalar *vals);
158 
159 static PetscErrorCode gs_gop_plus_hc(gs_id *gs, PetscScalar *in_vals, PetscInt dim);
160 static PetscErrorCode gs_gop_pairwise_plus_hc(gs_id *gs, PetscScalar *in_vals, PetscInt dim);
161 static PetscErrorCode gs_gop_tree_plus_hc(gs_id *gs, PetscScalar *vals, PetscInt dim);
162 
163 static PetscErrorCode gs_gop_times(gs_id *gs, PetscScalar *in_vals);
164 static PetscErrorCode gs_gop_pairwise_times(gs_id *gs, PetscScalar *in_vals);
165 static PetscErrorCode gs_gop_local_times(gs_id *gs, PetscScalar *vals);
166 static PetscErrorCode gs_gop_local_in_times(gs_id *gs, PetscScalar *vals);
167 static PetscErrorCode gs_gop_tree_times(gs_id *gs, PetscScalar *vals);
168 
169 static PetscErrorCode gs_gop_min(gs_id *gs, PetscScalar *in_vals);
170 static PetscErrorCode gs_gop_pairwise_min(gs_id *gs, PetscScalar *in_vals);
171 static PetscErrorCode gs_gop_local_min(gs_id *gs, PetscScalar *vals);
172 static PetscErrorCode gs_gop_local_in_min(gs_id *gs, PetscScalar *vals);
173 static PetscErrorCode gs_gop_tree_min(gs_id *gs, PetscScalar *vals);
174 
175 static PetscErrorCode gs_gop_min_abs(gs_id *gs, PetscScalar *in_vals);
176 static PetscErrorCode gs_gop_pairwise_min_abs(gs_id *gs, PetscScalar *in_vals);
177 static PetscErrorCode gs_gop_local_min_abs(gs_id *gs, PetscScalar *vals);
178 static PetscErrorCode gs_gop_local_in_min_abs(gs_id *gs, PetscScalar *vals);
179 static PetscErrorCode gs_gop_tree_min_abs(gs_id *gs, PetscScalar *vals);
180 
181 static PetscErrorCode gs_gop_max(gs_id *gs, PetscScalar *in_vals);
182 static PetscErrorCode gs_gop_pairwise_max(gs_id *gs, PetscScalar *in_vals);
183 static PetscErrorCode gs_gop_local_max(gs_id *gs, PetscScalar *vals);
184 static PetscErrorCode gs_gop_local_in_max(gs_id *gs, PetscScalar *vals);
185 static PetscErrorCode gs_gop_tree_max(gs_id *gs, PetscScalar *vals);
186 
187 static PetscErrorCode gs_gop_max_abs(gs_id *gs, PetscScalar *in_vals);
188 static PetscErrorCode gs_gop_pairwise_max_abs(gs_id *gs, PetscScalar *in_vals);
189 static PetscErrorCode gs_gop_local_max_abs(gs_id *gs, PetscScalar *vals);
190 static PetscErrorCode gs_gop_local_in_max_abs(gs_id *gs, PetscScalar *vals);
191 static PetscErrorCode gs_gop_tree_max_abs(gs_id *gs, PetscScalar *vals);
192 
193 static PetscErrorCode gs_gop_exists(gs_id *gs, PetscScalar *in_vals);
194 static PetscErrorCode gs_gop_pairwise_exists(gs_id *gs, PetscScalar *in_vals);
195 static PetscErrorCode gs_gop_local_exists(gs_id *gs, PetscScalar *vals);
196 static PetscErrorCode gs_gop_local_in_exists(gs_id *gs, PetscScalar *vals);
197 static PetscErrorCode gs_gop_tree_exists(gs_id *gs, PetscScalar *vals);
198 
199 static PetscErrorCode gs_gop_pairwise_binary(gs_id *gs, PetscScalar *in_vals, rbfp fct);
200 static PetscErrorCode gs_gop_local_binary(gs_id *gs, PetscScalar *vals, rbfp fct);
201 static PetscErrorCode gs_gop_local_in_binary(gs_id *gs, PetscScalar *vals, rbfp fct);
202 static PetscErrorCode gs_gop_tree_binary(gs_id *gs, PetscScalar *vals, rbfp fct);
203 
204 
205 
206 /* global vars */
207 /* from comm.c module */
208 
209 static PetscInt num_gs_ids = 0;
210 
211 /* should make this dynamic ... later */
212 static PetscInt msg_buf=MAX_MSG_BUF;
213 static PetscInt vec_sz=GS_VEC_SZ;
214 static PetscInt *tree_buf=NULL;
215 static PetscInt tree_buf_sz=0;
216 static PetscInt ntree=0;
217 
218 /***************************************************************************/
219 PetscErrorCode gs_init_vec_sz(PetscInt size)
220 {
221   PetscFunctionBegin;
222   vec_sz = size;
223   PetscFunctionReturn(0);
224 }
225 
226 /******************************************************************************/
227 PetscErrorCode gs_init_msg_buf_sz(PetscInt buf_size)
228 {
229   PetscFunctionBegin;
230   msg_buf = buf_size;
231   PetscFunctionReturn(0);
232 }
233 
234 /******************************************************************************/
235 gs_id *gs_init( PetscInt *elms, PetscInt nel, PetscInt level)
236 {
237    gs_id *gs;
238   MPI_Group gs_group;
239   MPI_Comm  gs_comm;
240   PetscErrorCode ierr;
241 
242   PetscFunctionBegin;
243   /* ensure that communication package has been initialized */
244   comm_init();
245 
246 
247   /* determines if we have enough dynamic/semi-static memory */
248   /* checks input, allocs and sets gd_id template            */
249   gs = gsi_check_args(elms,nel,level);
250 
251   /* only bit mask version up and working for the moment    */
252   /* LATER :: get int list version working for sparse pblms */
253   ierr = gsi_via_bit_mask(gs);CHKERRABORT(PETSC_COMM_WORLD,ierr);
254 
255 
256   ierr = MPI_Comm_group(MPI_COMM_WORLD,&gs_group);CHKERRABORT(PETSC_COMM_WORLD,ierr);
257   ierr = MPI_Comm_create(MPI_COMM_WORLD,gs_group,&gs_comm);CHKERRABORT(PETSC_COMM_WORLD,ierr);
258   gs->gs_comm=gs_comm;
259 
260   return(gs);
261 }
262 
263 /******************************************************************************/
264 static gs_id *gsi_new(void)
265 {
266   PetscErrorCode ierr;
267   gs_id *gs;
268   gs = (gs_id *) malloc(sizeof(gs_id));
269   ierr = PetscMemzero(gs,sizeof(gs_id));CHKERRABORT(PETSC_COMM_WORLD,ierr);
270   return(gs);
271 }
272 
273 /******************************************************************************/
274 static gs_id * gsi_check_args(PetscInt *in_elms, PetscInt nel, PetscInt level)
275 {
276    PetscInt i, j, k, t2;
277   PetscInt *companion, *elms, *unique, *iptr;
278   PetscInt num_local=0, *num_to_reduce, **local_reduce;
279   PetscInt oprs[] = {NON_UNIFORM,GL_MIN,GL_MAX,GL_ADD,GL_MIN,GL_MAX,GL_MIN,GL_B_AND};
280   PetscInt vals[sizeof(oprs)/sizeof(oprs[0])-1];
281   PetscInt work[sizeof(oprs)/sizeof(oprs[0])-1];
282   gs_id *gs;
283   PetscErrorCode ierr;
284 
285 
286   if (!in_elms)
287     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"elms point to nothing!!!\n");}
288 
289   if (nel<0)
290     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"can't have fewer than 0 elms!!!\n");}
291 
292   if (nel==0)
293     {ierr = PetscInfo(0,"I don't have any elements!!!\n");CHKERRABORT(PETSC_COMM_WORLD,ierr);}
294 
295   /* get space for gs template */
296   gs = gsi_new();
297   gs->id = ++num_gs_ids;
298 
299   /* hmt 6.4.99                                            */
300   /* caller can set global ids that don't participate to 0 */
301   /* gs_init ignores all zeros in elm list                 */
302   /* negative global ids are still invalid                 */
303   for (i=j=0;i<nel;i++)
304     {if (in_elms[i]!=0) {j++;}}
305 
306   k=nel; nel=j;
307 
308   /* copy over in_elms list and create inverse map */
309   elms = (PetscInt*) malloc((nel+1)*sizeof(PetscInt));
310   companion = (PetscInt*) malloc(nel*sizeof(PetscInt));
311 
312   for (i=j=0;i<k;i++)
313     {
314       if (in_elms[i]!=0)
315         {elms[j] = in_elms[i]; companion[j++] = i;}
316     }
317 
318   if (j!=nel)
319     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"nel j mismatch!\n");}
320 
321   /* pre-pass ... check to see if sorted */
322   elms[nel] = INT_MAX;
323   iptr = elms;
324   unique = elms+1;
325   j=0;
326   while (*iptr!=INT_MAX)
327     {
328       if (*iptr++>*unique++)
329         {j=1; break;}
330     }
331 
332   /* set up inverse map */
333   if (j)
334     {
335       ierr = PetscInfo(0,"gsi_check_args() :: elm list *not* sorted!\n");CHKERRABORT(PETSC_COMM_WORLD,ierr);
336       ierr = SMI_sort((void*)elms, (void*)companion, nel, SORT_INTEGER);CHKERRABORT(PETSC_COMM_WORLD,ierr);
337     }
338   else
339     {ierr = PetscInfo(0,"gsi_check_args() :: elm list sorted!\n");CHKERRABORT(PETSC_COMM_WORLD,ierr);}
340   elms[nel] = INT_MIN;
341 
342   /* first pass */
343   /* determine number of unique elements, check pd */
344   for (i=k=0;i<nel;i+=j)
345     {
346       t2 = elms[i];
347       j=++i;
348 
349       /* clump 'em for now */
350       while (elms[j]==t2) {j++;}
351 
352       /* how many together and num local */
353       if (j-=i)
354         {num_local++; k+=j;}
355     }
356 
357   /* how many unique elements? */
358   gs->repeats=k;
359   gs->nel = nel-k;
360 
361 
362   /* number of repeats? */
363   gs->num_local = num_local;
364   num_local+=2;
365   gs->local_reduce=local_reduce=(PetscInt **)malloc(num_local*sizeof(PetscInt*));
366   gs->num_local_reduce=num_to_reduce=(PetscInt*) malloc(num_local*sizeof(PetscInt));
367 
368   unique = (PetscInt*) malloc((gs->nel+1)*sizeof(PetscInt));
369   gs->elms = unique;
370   gs->nel_total = nel;
371   gs->local_elms = elms;
372   gs->companion = companion;
373 
374   /* compess map as well as keep track of local ops */
375   for (num_local=i=j=0;i<gs->nel;i++)
376     {
377       k=j;
378       t2 = unique[i] = elms[j];
379       companion[i] = companion[j];
380 
381       while (elms[j]==t2) {j++;}
382 
383       if ((t2=(j-k))>1)
384         {
385           /* number together */
386           num_to_reduce[num_local] = t2++;
387           iptr = local_reduce[num_local++] = (PetscInt*)malloc(t2*sizeof(PetscInt));
388 
389           /* to use binary searching don't remap until we check intersection */
390           *iptr++ = i;
391 
392           /* note that we're skipping the first one */
393           while (++k<j)
394             {*(iptr++) = companion[k];}
395           *iptr = -1;
396         }
397     }
398 
399   /* sentinel for ngh_buf */
400   unique[gs->nel]=INT_MAX;
401 
402   /* for two partition sort hack */
403   num_to_reduce[num_local] = 0;
404   local_reduce[num_local] = NULL;
405   num_to_reduce[++num_local] = 0;
406   local_reduce[num_local] = NULL;
407 
408   /* load 'em up */
409   /* note one extra to hold NON_UNIFORM flag!!! */
410   vals[2] = vals[1] = vals[0] = nel;
411   if (gs->nel>0)
412     {
413        vals[3] = unique[0];
414        vals[4] = unique[gs->nel-1];
415     }
416   else
417     {
418        vals[3] = INT_MAX;
419        vals[4] = INT_MIN;
420     }
421   vals[5] = level;
422   vals[6] = num_gs_ids;
423 
424   /* GLOBAL: send 'em out */
425   ierr = giop(vals,work,sizeof(oprs)/sizeof(oprs[0])-1,oprs);CHKERRABORT(PETSC_COMM_WORLD,ierr);
426 
427   /* must be semi-pos def - only pairwise depends on this */
428   /* LATER - remove this restriction */
429   if (vals[3]<0)
430     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"gsi_check_args() :: system not semi-pos def \n");}
431 
432   if (vals[4]==INT_MAX)
433     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"gsi_check_args() :: system ub too large !\n");}
434 
435   gs->nel_min = vals[0];
436   gs->nel_max = vals[1];
437   gs->nel_sum = vals[2];
438   gs->gl_min  = vals[3];
439   gs->gl_max  = vals[4];
440   gs->negl    = vals[4]-vals[3]+1;
441 
442   if (gs->negl<=0)
443     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"gsi_check_args() :: system empty or neg :: %d\n");}
444 
445   /* LATER :: add level == -1 -> program selects level */
446   if (vals[5]<0)
447     {vals[5]=0;}
448   else if (vals[5]>num_nodes)
449     {vals[5]=num_nodes;}
450   gs->level = vals[5];
451 
452   return(gs);
453 }
454 
455 /******************************************************************************/
456 static PetscErrorCode gsi_via_bit_mask(gs_id *gs)
457 {
458    PetscInt i, nel, *elms;
459   PetscInt t1;
460   PetscInt **reduce;
461   PetscInt *map;
462   PetscErrorCode ierr;
463 
464   PetscFunctionBegin;
465   /* totally local removes ... ct_bits == 0 */
466   get_ngh_buf(gs);
467 
468   if (gs->level)
469     {set_pairwise(gs);}
470 
471   if (gs->max_left_over)
472     {set_tree(gs);}
473 
474   /* intersection local and pairwise/tree? */
475   gs->num_local_total = gs->num_local;
476   gs->gop_local_reduce = gs->local_reduce;
477   gs->num_gop_local_reduce = gs->num_local_reduce;
478 
479   map = gs->companion;
480 
481   /* is there any local compression */
482   if (!gs->num_local) {
483     gs->local_strength = NONE;
484     gs->num_local_gop = 0;
485   } else {
486       /* ok find intersection */
487       map = gs->companion;
488       reduce = gs->local_reduce;
489       for (i=0, t1=0; i<gs->num_local; i++, reduce++)
490         {
491           if ((ivec_binary_search(**reduce,gs->pw_elm_list,gs->len_pw_list)>=0)
492               ||
493               ivec_binary_search(**reduce,gs->tree_map_in,gs->tree_map_sz)>=0)
494             {
495               t1++;
496               if (gs->num_local_reduce[i]<=0) SETERRQ(PETSC_ERR_PLIB,"nobody in list?");
497               gs->num_local_reduce[i] *= -1;
498             }
499            **reduce=map[**reduce];
500         }
501 
502       /* intersection is empty */
503       if (!t1)
504         {
505           gs->local_strength = FULL;
506           gs->num_local_gop = 0;
507         }
508       /* intersection not empty */
509       else
510         {
511           gs->local_strength = PARTIAL;
512           ierr = SMI_sort((void*)gs->num_local_reduce, (void*)gs->local_reduce, gs->num_local + 1, SORT_INT_PTR);CHKERRQ(ierr);
513 
514           gs->num_local_gop = t1;
515           gs->num_local_total =  gs->num_local;
516           gs->num_local    -= t1;
517           gs->gop_local_reduce = gs->local_reduce;
518           gs->num_gop_local_reduce = gs->num_local_reduce;
519 
520           for (i=0; i<t1; i++)
521             {
522               if (gs->num_gop_local_reduce[i]>=0) SETERRQ(PETSC_ERR_PLIB,"they aren't negative?");
523               gs->num_gop_local_reduce[i] *= -1;
524               gs->local_reduce++;
525               gs->num_local_reduce++;
526             }
527           gs->local_reduce++;
528           gs->num_local_reduce++;
529         }
530     }
531 
532   elms = gs->pw_elm_list;
533   nel  = gs->len_pw_list;
534   for (i=0; i<nel; i++)
535     {elms[i] = map[elms[i]];}
536 
537   elms = gs->tree_map_in;
538   nel  = gs->tree_map_sz;
539   for (i=0; i<nel; i++)
540     {elms[i] = map[elms[i]];}
541 
542   /* clean up */
543   free((void*) gs->local_elms);
544   free((void*) gs->companion);
545   free((void*) gs->elms);
546   free((void*) gs->ngh_buf);
547   gs->local_elms = gs->companion = gs->elms = gs->ngh_buf = NULL;
548   PetscFunctionReturn(0);
549 }
550 
551 /******************************************************************************/
552 static PetscErrorCode place_in_tree( PetscInt elm)
553 {
554    PetscInt *tp, n;
555 
556   PetscFunctionBegin;
557   if (ntree==tree_buf_sz)
558     {
559       if (tree_buf_sz)
560         {
561           tp = tree_buf;
562           n = tree_buf_sz;
563           tree_buf_sz<<=1;
564           tree_buf = (PetscInt*)malloc(tree_buf_sz*sizeof(PetscInt));
565           ivec_copy(tree_buf,tp,n);
566           free(tp);
567         }
568       else
569         {
570           tree_buf_sz = TREE_BUF_SZ;
571           tree_buf = (PetscInt*)malloc(tree_buf_sz*sizeof(PetscInt));
572         }
573     }
574 
575   tree_buf[ntree++] = elm;
576   PetscFunctionReturn(0);
577 }
578 
579 /******************************************************************************/
580 static PetscErrorCode get_ngh_buf(gs_id *gs)
581 {
582    PetscInt i, j, npw=0, ntree_map=0;
583   PetscInt p_mask_size, ngh_buf_size, buf_size;
584   PetscInt *p_mask, *sh_proc_mask, *pw_sh_proc_mask;
585   PetscInt *ngh_buf, *buf1, *buf2;
586   PetscInt offset, per_load, num_loads, or_ct, start, end;
587   PetscInt *ptr1, *ptr2, i_start, negl, nel, *elms;
588   PetscInt oper=GL_B_OR;
589   PetscInt *ptr3, *t_mask, level, ct1, ct2;
590   PetscErrorCode ierr;
591 
592   PetscFunctionBegin;
593   /* to make life easier */
594   nel   = gs->nel;
595   elms  = gs->elms;
596   level = gs->level;
597 
598   /* det #bytes needed for processor bit masks and init w/mask cor. to my_id */
599   p_mask = (PetscInt*) malloc(p_mask_size=len_bit_mask(num_nodes));
600   ierr = set_bit_mask(p_mask,p_mask_size,my_id);CHKERRQ(ierr);
601 
602   /* allocate space for masks and info bufs */
603   gs->nghs = sh_proc_mask = (PetscInt*) malloc(p_mask_size);
604   gs->pw_nghs = pw_sh_proc_mask = (PetscInt*) malloc(p_mask_size);
605   gs->ngh_buf_sz = ngh_buf_size = p_mask_size*nel;
606   t_mask = (PetscInt*) malloc(p_mask_size);
607   gs->ngh_buf = ngh_buf = (PetscInt*) malloc(ngh_buf_size);
608 
609   /* comm buffer size ... memory usage bounded by ~2*msg_buf */
610   /* had thought I could exploit rendezvous threshold */
611 
612   /* default is one pass */
613   per_load = negl  = gs->negl;
614   gs->num_loads = num_loads = 1;
615   i=p_mask_size*negl;
616 
617   /* possible overflow on buffer size */
618   /* overflow hack                    */
619   if (i<0) {i=INT_MAX;}
620 
621   buf_size = PetscMin(msg_buf,i);
622 
623   /* can we do it? */
624   if (p_mask_size>buf_size) SETERRQ2(PETSC_ERR_PLIB,"get_ngh_buf() :: buf<pms :: %d>%d\n",p_mask_size,buf_size);
625 
626   /* get giop buf space ... make *only* one malloc */
627   buf1 = (PetscInt*) malloc(buf_size<<1);
628 
629   /* more than one gior exchange needed? */
630   if (buf_size!=i)
631     {
632       per_load = buf_size/p_mask_size;
633       buf_size = per_load*p_mask_size;
634       gs->num_loads = num_loads = negl/per_load + (negl%per_load>0);
635     }
636 
637 
638   /* convert buf sizes from #bytes to #ints - 32 bit only! */
639   p_mask_size/=sizeof(PetscInt); ngh_buf_size/=sizeof(PetscInt); buf_size/=sizeof(PetscInt);
640 
641   /* find giop work space */
642   buf2 = buf1+buf_size;
643 
644   /* hold #ints needed for processor masks */
645   gs->mask_sz=p_mask_size;
646 
647   /* init buffers */
648   ierr = ivec_zero(sh_proc_mask,p_mask_size);CHKERRQ(ierr);
649   ierr = ivec_zero(pw_sh_proc_mask,p_mask_size);CHKERRQ(ierr);
650   ierr = ivec_zero(ngh_buf,ngh_buf_size);CHKERRQ(ierr);
651 
652   /* HACK reset tree info */
653   tree_buf=NULL;
654   tree_buf_sz=ntree=0;
655 
656   /* ok do it */
657   for (ptr1=ngh_buf,ptr2=elms,end=gs->gl_min,or_ct=i=0; or_ct<num_loads; or_ct++)
658     {
659       /* identity for bitwise or is 000...000 */
660       ivec_zero(buf1,buf_size);
661 
662       /* load msg buffer */
663       for (start=end,end+=per_load,i_start=i; (offset=*ptr2)<end; i++, ptr2++)
664         {
665           offset = (offset-start)*p_mask_size;
666           ivec_copy(buf1+offset,p_mask,p_mask_size);
667         }
668 
669       /* GLOBAL: pass buffer */
670       ierr = giop(buf1,buf2,buf_size,&oper);CHKERRQ(ierr);
671 
672 
673       /* unload buffer into ngh_buf */
674       ptr2=(elms+i_start);
675       for(ptr3=buf1,j=start; j<end; ptr3+=p_mask_size,j++)
676         {
677           /* I own it ... may have to pairwise it */
678           if (j==*ptr2)
679             {
680               /* do i share it w/anyone? */
681               ct1 = ct_bits((char *)ptr3,p_mask_size*sizeof(PetscInt));
682               /* guess not */
683               if (ct1<2)
684                 {ptr2++; ptr1+=p_mask_size; continue;}
685 
686               /* i do ... so keep info and turn off my bit */
687               ivec_copy(ptr1,ptr3,p_mask_size);
688               ierr = ivec_xor(ptr1,p_mask,p_mask_size);CHKERRQ(ierr);
689               ierr = ivec_or(sh_proc_mask,ptr1,p_mask_size);CHKERRQ(ierr);
690 
691               /* is it to be done pairwise? */
692               if (--ct1<=level)
693                 {
694                   npw++;
695 
696                   /* turn on high bit to indicate pw need to process */
697                   *ptr2++ |= TOP_BIT;
698                   ierr = ivec_or(pw_sh_proc_mask,ptr1,p_mask_size);CHKERRQ(ierr);
699                   ptr1+=p_mask_size;
700                   continue;
701                 }
702 
703               /* get set for next and note that I have a tree contribution */
704               /* could save exact elm index for tree here -> save a search */
705               ptr2++; ptr1+=p_mask_size; ntree_map++;
706             }
707           /* i don't but still might be involved in tree */
708           else
709             {
710 
711               /* shared by how many? */
712               ct1 = ct_bits((char *)ptr3,p_mask_size*sizeof(PetscInt));
713 
714               /* none! */
715               if (ct1<2) continue;
716 
717               /* is it going to be done pairwise? but not by me of course!*/
718               if (--ct1<=level) continue;
719             }
720           /* LATER we're going to have to process it NOW */
721           /* nope ... tree it */
722           ierr = place_in_tree(j);CHKERRQ(ierr);
723         }
724     }
725 
726   free((void*)t_mask);
727   free((void*)buf1);
728 
729   gs->len_pw_list=npw;
730   gs->num_nghs = ct_bits((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt));
731 
732   /* expand from bit mask list to int list and save ngh list */
733   gs->nghs = (PetscInt*) malloc(gs->num_nghs * sizeof(PetscInt));
734   bm_to_proc((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt),gs->nghs);
735 
736   gs->num_pw_nghs = ct_bits((char *)pw_sh_proc_mask,p_mask_size*sizeof(PetscInt));
737 
738   oper = GL_MAX;
739   ct1 = gs->num_nghs;
740   ierr = giop(&ct1,&ct2,1,&oper);CHKERRQ(ierr);
741   gs->max_nghs = ct1;
742 
743   gs->tree_map_sz  = ntree_map;
744   gs->max_left_over=ntree;
745 
746   free((void*)p_mask);
747   free((void*)sh_proc_mask);
748   PetscFunctionReturn(0);
749 }
750 
751 /******************************************************************************/
752 static PetscErrorCode set_pairwise(gs_id *gs)
753 {
754    PetscInt i, j;
755   PetscInt p_mask_size;
756   PetscInt *p_mask, *sh_proc_mask, *tmp_proc_mask;
757   PetscInt *ngh_buf, *buf2;
758   PetscInt offset;
759   PetscInt *msg_list, *msg_size, **msg_nodes, nprs;
760   PetscInt *pairwise_elm_list, len_pair_list=0;
761   PetscInt *iptr, t1, i_start, nel, *elms;
762   PetscInt ct;
763   PetscErrorCode ierr;
764 
765   PetscFunctionBegin;
766   /* to make life easier */
767   nel  = gs->nel;
768   elms = gs->elms;
769   ngh_buf = gs->ngh_buf;
770   sh_proc_mask  = gs->pw_nghs;
771 
772   /* need a few temp masks */
773   p_mask_size   = len_bit_mask(num_nodes);
774   p_mask        = (PetscInt*) malloc(p_mask_size);
775   tmp_proc_mask = (PetscInt*) malloc(p_mask_size);
776 
777   /* set mask to my my_id's bit mask */
778   ierr = set_bit_mask(p_mask,p_mask_size,my_id);CHKERRQ(ierr);
779 
780   p_mask_size /= sizeof(PetscInt);
781 
782   len_pair_list=gs->len_pw_list;
783   gs->pw_elm_list=pairwise_elm_list=(PetscInt*)malloc((len_pair_list+1)*sizeof(PetscInt));
784 
785   /* how many processors (nghs) do we have to exchange with? */
786   nprs=gs->num_pairs=ct_bits((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt));
787 
788 
789   /* allocate space for gs_gop() info */
790   gs->pair_list = msg_list = (PetscInt *)  malloc(sizeof(PetscInt)*nprs);
791   gs->msg_sizes = msg_size  = (PetscInt *)  malloc(sizeof(PetscInt)*nprs);
792   gs->node_list = msg_nodes = (PetscInt **) malloc(sizeof(PetscInt*)*(nprs+1));
793 
794   /* init msg_size list */
795   ierr = ivec_zero(msg_size,nprs);CHKERRQ(ierr);
796 
797   /* expand from bit mask list to int list */
798   ierr = bm_to_proc((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt),msg_list);CHKERRQ(ierr);
799 
800   /* keep list of elements being handled pairwise */
801   for (i=j=0;i<nel;i++)
802     {
803       if (elms[i] & TOP_BIT)
804         {elms[i] ^= TOP_BIT; pairwise_elm_list[j++] = i;}
805     }
806   pairwise_elm_list[j] = -1;
807 
808   gs->msg_ids_out = (MPI_Request *)  malloc(sizeof(MPI_Request)*(nprs+1));
809   gs->msg_ids_out[nprs] = MPI_REQUEST_NULL;
810   gs->msg_ids_in = (MPI_Request *)  malloc(sizeof(MPI_Request)*(nprs+1));
811   gs->msg_ids_in[nprs] = MPI_REQUEST_NULL;
812   gs->pw_vals = (PetscScalar *) malloc(sizeof(PetscScalar)*len_pair_list*vec_sz);
813 
814   /* find who goes to each processor */
815   for (i_start=i=0;i<nprs;i++)
816     {
817       /* processor i's mask */
818       ierr = set_bit_mask(p_mask,p_mask_size*sizeof(PetscInt),msg_list[i]);CHKERRQ(ierr);
819 
820       /* det # going to processor i */
821       for (ct=j=0;j<len_pair_list;j++)
822         {
823           buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
824           ierr = ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);CHKERRQ(ierr);
825           if (ct_bits((char *)tmp_proc_mask,p_mask_size*sizeof(PetscInt)))
826             {ct++;}
827         }
828       msg_size[i] = ct;
829       i_start = PetscMax(i_start,ct);
830 
831       /*space to hold nodes in message to first neighbor */
832       msg_nodes[i] = iptr = (PetscInt*) malloc(sizeof(PetscInt)*(ct+1));
833 
834       for (j=0;j<len_pair_list;j++)
835         {
836           buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
837           ierr = ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);CHKERRQ(ierr);
838           if (ct_bits((char *)tmp_proc_mask,p_mask_size*sizeof(PetscInt)))
839             {*iptr++ = j;}
840         }
841       *iptr = -1;
842     }
843   msg_nodes[nprs] = NULL;
844 
845   j=gs->loc_node_pairs=i_start;
846   t1 = GL_MAX;
847   ierr = giop(&i_start,&offset,1,&t1);CHKERRQ(ierr);
848   gs->max_node_pairs = i_start;
849 
850   i_start=j;
851   t1 = GL_MIN;
852   ierr = giop(&i_start,&offset,1,&t1);CHKERRQ(ierr);
853   gs->min_node_pairs = i_start;
854 
855   i_start=j;
856   t1 = GL_ADD;
857   ierr = giop(&i_start,&offset,1,&t1);CHKERRQ(ierr);
858   gs->avg_node_pairs = i_start/num_nodes + 1;
859 
860   i_start=nprs;
861   t1 = GL_MAX;
862   giop(&i_start,&offset,1,&t1);
863   gs->max_pairs = i_start;
864 
865 
866   /* remap pairwise in tail of gsi_via_bit_mask() */
867   gs->msg_total = ivec_sum(gs->msg_sizes,nprs);
868   gs->out = (PetscScalar *) malloc(sizeof(PetscScalar)*gs->msg_total*vec_sz);
869   gs->in  = (PetscScalar *) malloc(sizeof(PetscScalar)*gs->msg_total*vec_sz);
870 
871   /* reset malloc pool */
872   free((void*)p_mask);
873   free((void*)tmp_proc_mask);
874   PetscFunctionReturn(0);
875 }
876 
877 /* to do pruned tree just save ngh buf copy for each one and decode here!
878 ******************************************************************************/
879 static PetscErrorCode set_tree(gs_id *gs)
880 {
881   PetscInt i, j, n, nel;
882   PetscInt *iptr_in, *iptr_out, *tree_elms, *elms;
883 
884   PetscFunctionBegin;
885   /* local work ptrs */
886   elms = gs->elms;
887   nel     = gs->nel;
888 
889   /* how many via tree */
890   gs->tree_nel  = n = ntree;
891   gs->tree_elms = tree_elms = iptr_in = tree_buf;
892   gs->tree_buf  = (PetscScalar *) malloc(sizeof(PetscScalar)*n*vec_sz);
893   gs->tree_work = (PetscScalar *) malloc(sizeof(PetscScalar)*n*vec_sz);
894   j=gs->tree_map_sz;
895   gs->tree_map_in = iptr_in  = (PetscInt*) malloc(sizeof(PetscInt)*(j+1));
896   gs->tree_map_out = iptr_out = (PetscInt*) malloc(sizeof(PetscInt)*(j+1));
897 
898   /* search the longer of the two lists */
899   /* note ... could save this info in get_ngh_buf and save searches */
900   if (n<=nel)
901     {
902       /* bijective fct w/remap - search elm list */
903       for (i=0; i<n; i++)
904         {
905           if ((j=ivec_binary_search(*tree_elms++,elms,nel))>=0)
906             {*iptr_in++ = j; *iptr_out++ = i;}
907         }
908     }
909   else
910     {
911       for (i=0; i<nel; i++)
912         {
913           if ((j=ivec_binary_search(*elms++,tree_elms,n))>=0)
914             {*iptr_in++ = i; *iptr_out++ = j;}
915         }
916     }
917 
918   /* sentinel */
919   *iptr_in = *iptr_out = -1;
920   PetscFunctionReturn(0);
921 }
922 
923 /******************************************************************************/
924 static PetscErrorCode gs_gop_local_out( gs_id *gs,  PetscScalar *vals)
925 {
926   PetscInt *num, *map, **reduce;
927   PetscScalar tmp;
928 
929   PetscFunctionBegin;
930   num    = gs->num_gop_local_reduce;
931   reduce = gs->gop_local_reduce;
932   while ((map = *reduce++))
933     {
934       /* wall */
935       if (*num == 2)
936         {
937           num ++;
938           vals[map[1]] = vals[map[0]];
939         }
940       /* corner shared by three elements */
941       else if (*num == 3)
942         {
943           num ++;
944           vals[map[2]] = vals[map[1]] = vals[map[0]];
945         }
946       /* corner shared by four elements */
947       else if (*num == 4)
948         {
949           num ++;
950           vals[map[3]] = vals[map[2]] = vals[map[1]] = vals[map[0]];
951         }
952       /* general case ... odd geoms ... 3D*/
953       else
954         {
955           num++;
956           tmp = *(vals + *map++);
957           while (*map >= 0)
958             {*(vals + *map++) = tmp;}
959         }
960     }
961   PetscFunctionReturn(0);
962 }
963 
964 /******************************************************************************/
965 PetscErrorCode gs_gop_binary(gs_ADT gs, PetscScalar *vals, rbfp fct)
966 {
967   PetscErrorCode ierr;
968   PetscFunctionBegin;
969   /* local only operations!!! */
970   if (gs->num_local)
971     {ierr = gs_gop_local_binary(gs,vals,fct);CHKERRQ(ierr);}
972 
973   /* if intersection tree/pairwise and local isn't empty */
974   if (gs->num_local_gop)
975     {
976       ierr = gs_gop_local_in_binary(gs,vals,fct);CHKERRQ(ierr);
977 
978       /* pairwise */
979       if (gs->num_pairs)
980         {ierr = gs_gop_pairwise_binary(gs,vals,fct);CHKERRQ(ierr);}
981 
982       /* tree */
983       else if (gs->max_left_over)
984         {ierr = gs_gop_tree_binary(gs,vals,fct);CHKERRQ(ierr);}
985 
986       ierr = gs_gop_local_out(gs,vals);CHKERRQ(ierr);
987     }
988   /* if intersection tree/pairwise and local is empty */
989   else
990     {
991       /* pairwise */
992       if (gs->num_pairs)
993         {ierr = gs_gop_pairwise_binary(gs,vals,fct);CHKERRQ(ierr);}
994 
995       /* tree */
996       else if (gs->max_left_over)
997         {ierr = gs_gop_tree_binary(gs,vals,fct);CHKERRQ(ierr);}
998     }
999   PetscFunctionReturn(0);
1000 }
1001 
1002 /******************************************************************************/
1003 static PetscErrorCode gs_gop_local_binary( gs_id *gs,  PetscScalar *vals,  rbfp fct)
1004 {
1005    PetscInt *num, *map, **reduce;
1006   PetscScalar tmp;
1007 
1008   PetscFunctionBegin;
1009   num    = gs->num_local_reduce;
1010   reduce = gs->local_reduce;
1011   while ((map = *reduce))
1012     {
1013       num ++;
1014       (*fct)(&tmp,NULL,1);
1015       /* tmp = 0.0; */
1016       while (*map >= 0)
1017         {(*fct)(&tmp,(vals + *map),1); map++;}
1018         /*        {tmp = (*fct)(tmp,*(vals + *map)); map++;} */
1019 
1020       map = *reduce++;
1021       while (*map >= 0)
1022         {*(vals + *map++) = tmp;}
1023     }
1024   PetscFunctionReturn(0);
1025 }
1026 
1027 /******************************************************************************/
1028 static PetscErrorCode gs_gop_local_in_binary( gs_id *gs,  PetscScalar *vals,  rbfp fct)
1029 {
1030    PetscInt *num, *map, **reduce;
1031    PetscScalar *base;
1032 
1033   PetscFunctionBegin;
1034   num    = gs->num_gop_local_reduce;
1035 
1036   reduce = gs->gop_local_reduce;
1037   while ((map = *reduce++))
1038     {
1039       num++;
1040       base = vals + *map++;
1041       while (*map >= 0)
1042         {(*fct)(base,(vals + *map),1); map++;}
1043     }
1044   PetscFunctionReturn(0);
1045 }
1046 
1047 /******************************************************************************/
1048 static PetscErrorCode gs_gop_pairwise_binary( gs_id *gs,  PetscScalar *in_vals,rbfp fct)
1049 {
1050   PetscScalar    *dptr1, *dptr2, *dptr3, *in1, *in2;
1051   PetscInt            *iptr, *msg_list, *msg_size, **msg_nodes;
1052   PetscInt            *pw, *list, *size, **nodes;
1053   MPI_Request    *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1054   MPI_Status     status;
1055   PetscErrorCode ierr;
1056 
1057   PetscFunctionBegin;
1058   /* strip and load s */
1059   msg_list =list         = gs->pair_list;
1060   msg_size =size         = gs->msg_sizes;
1061   msg_nodes=nodes        = gs->node_list;
1062   iptr=pw                = gs->pw_elm_list;
1063   dptr1=dptr3            = gs->pw_vals;
1064   msg_ids_in  = ids_in   = gs->msg_ids_in;
1065   msg_ids_out = ids_out  = gs->msg_ids_out;
1066   dptr2                  = gs->out;
1067   in1=in2                = gs->in;
1068 
1069   /* post the receives */
1070   /*  msg_nodes=nodes; */
1071   do
1072     {
1073       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1074          second one *list and do list++ afterwards */
1075       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
1076       in1 += *size++;
1077     }
1078   while (*++msg_nodes);
1079   msg_nodes=nodes;
1080 
1081   /* load gs values into in out gs buffers */
1082   while (*iptr >= 0)
1083     {*dptr3++ = *(in_vals + *iptr++);}
1084 
1085   /* load out buffers and post the sends */
1086   while ((iptr = *msg_nodes++))
1087     {
1088       dptr3 = dptr2;
1089       while (*iptr >= 0)
1090         {*dptr2++ = *(dptr1 + *iptr++);}
1091       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1092       /* is msg_ids_out++ correct? */
1093       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
1094     }
1095 
1096   if (gs->max_left_over)
1097     {gs_gop_tree_binary(gs,in_vals,fct);}
1098 
1099   /* process the received data */
1100   msg_nodes=nodes;
1101   while ((iptr = *nodes++))
1102     {
1103       /* Should I check the return value of MPI_Wait() or status? */
1104       /* Can this loop be replaced by a call to MPI_Waitall()? */
1105       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
1106       while (*iptr >= 0)
1107         {(*fct)((dptr1 + *iptr),in2,1); iptr++; in2++;}
1108       /* {*(dptr1 + *iptr) = (*fct)(*(dptr1 + *iptr),*in2); iptr++; in2++;} */
1109     }
1110 
1111   /* replace vals */
1112   while (*pw >= 0)
1113     {*(in_vals + *pw++) = *dptr1++;}
1114 
1115   /* clear isend message handles */
1116   /* This changed for clarity though it could be the same */
1117   while (*msg_nodes++)
1118     /* Should I check the return value of MPI_Wait() or status? */
1119     /* Can this loop be replaced by a call to MPI_Waitall()? */
1120     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
1121   PetscFunctionReturn(0);
1122 }
1123 
1124 /******************************************************************************/
1125 static PetscErrorCode gs_gop_tree_binary(gs_id *gs, PetscScalar *vals,  rbfp fct)
1126 {
1127   PetscInt         size;
1128   PetscInt         *in,    *out;
1129   PetscScalar *buf, *work;
1130 
1131   PetscFunctionBegin;
1132   in   = gs->tree_map_in;
1133   out  = gs->tree_map_out;
1134   buf  = gs->tree_buf;
1135   work = gs->tree_work;
1136   size = gs->tree_nel;
1137 
1138   /* load vals vector w/identity */
1139   (*fct)(buf,NULL,size);
1140 
1141   /* load my contribution into val vector */
1142   while (*in >= 0) {
1143     (*fct)((buf + *out++),(vals + *in++),-1);
1144   }
1145 
1146   gfop(buf,work,size,(vbfp)fct,MPIU_SCALAR,0);
1147 
1148   in   = gs->tree_map_in;
1149   out  = gs->tree_map_out;
1150   while (*in >= 0) {
1151     (*fct)((vals + *in++),(buf + *out++),-1);
1152   }
1153   PetscFunctionReturn(0);
1154 }
1155 
1156 /******************************************************************************/
1157 PetscErrorCode gs_gop( gs_id *gs,  PetscScalar *vals,  const char *op)
1158 {
1159   PetscErrorCode ierr;
1160 
1161   PetscFunctionBegin;
1162   switch (*op) {
1163   case '+':
1164     gs_gop_plus(gs,vals);
1165     break;
1166   case '*':
1167     gs_gop_times(gs,vals);
1168     break;
1169   case 'a':
1170     gs_gop_min_abs(gs,vals);
1171     break;
1172   case 'A':
1173     gs_gop_max_abs(gs,vals);
1174     break;
1175   case 'e':
1176     gs_gop_exists(gs,vals);
1177     break;
1178   case 'm':
1179     gs_gop_min(gs,vals);
1180     break;
1181   case 'M':
1182     gs_gop_max(gs,vals); break;
1183   default:
1184     ierr = PetscInfo1(0,"gs_gop() :: %c is not a valid op",op[0]);CHKERRQ(ierr);
1185     ierr = PetscInfo(0,"gs_gop() :: default :: plus");CHKERRQ(ierr);
1186     gs_gop_plus(gs,vals);
1187     break;
1188   }
1189   PetscFunctionReturn(0);
1190 }
1191 
1192 /******************************************************************************/
1193 static PetscErrorCode gs_gop_exists( gs_id *gs,  PetscScalar *vals)
1194 {
1195   PetscFunctionBegin;
1196   /* local only operations!!! */
1197   if (gs->num_local)
1198     {gs_gop_local_exists(gs,vals);}
1199 
1200   /* if intersection tree/pairwise and local isn't empty */
1201   if (gs->num_local_gop)
1202     {
1203       gs_gop_local_in_exists(gs,vals);
1204 
1205       /* pairwise */
1206       if (gs->num_pairs)
1207         {gs_gop_pairwise_exists(gs,vals);}
1208 
1209       /* tree */
1210       else if (gs->max_left_over)
1211         {gs_gop_tree_exists(gs,vals);}
1212 
1213       gs_gop_local_out(gs,vals);
1214     }
1215   /* if intersection tree/pairwise and local is empty */
1216   else
1217     {
1218       /* pairwise */
1219       if (gs->num_pairs)
1220         {gs_gop_pairwise_exists(gs,vals);}
1221 
1222       /* tree */
1223       else if (gs->max_left_over)
1224         {gs_gop_tree_exists(gs,vals);}
1225     }
1226   PetscFunctionReturn(0);
1227 }
1228 
1229 /******************************************************************************/
1230 static PetscErrorCode gs_gop_local_exists( gs_id *gs,  PetscScalar *vals)
1231 {
1232    PetscInt         *num, *map, **reduce;
1233    PetscScalar tmp;
1234 
1235   PetscFunctionBegin;
1236   num    = gs->num_local_reduce;
1237   reduce = gs->local_reduce;
1238   while ((map = *reduce))
1239     {
1240       num ++;
1241       tmp = 0.0;
1242       while (*map >= 0)
1243         {tmp = EXISTS(tmp,*(vals + *map)); map++;}
1244 
1245       map = *reduce++;
1246       while (*map >= 0)
1247         {*(vals + *map++) = tmp;}
1248     }
1249   PetscFunctionReturn(0);
1250 }
1251 
1252 /******************************************************************************/
1253 static PetscErrorCode gs_gop_local_in_exists( gs_id *gs,  PetscScalar *vals)
1254 {
1255   PetscInt         *num, *map, **reduce;
1256   PetscScalar *base;
1257 
1258   PetscFunctionBegin;
1259   num    = gs->num_gop_local_reduce;
1260   reduce = gs->gop_local_reduce;
1261   while ((map = *reduce++))
1262     {
1263       num++;
1264       base = vals + *map++;
1265       while (*map >= 0)
1266         {*base = EXISTS(*base,*(vals + *map)); map++;}
1267     }
1268   PetscFunctionReturn(0);
1269 }
1270 
1271 static PetscErrorCode gs_gop_pairwise_exists( gs_id *gs,  PetscScalar *in_vals)
1272 {
1273   PetscScalar    *dptr1, *dptr2, *dptr3, *in1, *in2;
1274   PetscInt            *iptr, *msg_list, *msg_size, **msg_nodes;
1275   PetscInt            *pw, *list, *size, **nodes;
1276   MPI_Request    *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1277   MPI_Status     status;
1278   PetscErrorCode ierr;
1279 
1280   PetscFunctionBegin;
1281   /* strip and load s */
1282   msg_list =list         = gs->pair_list;
1283   msg_size =size         = gs->msg_sizes;
1284   msg_nodes=nodes        = gs->node_list;
1285   iptr=pw                = gs->pw_elm_list;
1286   dptr1=dptr3            = gs->pw_vals;
1287   msg_ids_in  = ids_in   = gs->msg_ids_in;
1288   msg_ids_out = ids_out  = gs->msg_ids_out;
1289   dptr2                  = gs->out;
1290   in1=in2                = gs->in;
1291 
1292   /* post the receives */
1293   do
1294     {
1295       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1296          second one *list and do list++ afterwards */
1297       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
1298       in1 += *size++;
1299     }
1300   while (*++msg_nodes);
1301   msg_nodes=nodes;
1302 
1303   /* load gs values into in out gs buffers */
1304   while (*iptr >= 0)
1305     {*dptr3++ = *(in_vals + *iptr++);}
1306 
1307   /* load out buffers and post the sends */
1308   while ((iptr = *msg_nodes++))
1309     {
1310       dptr3 = dptr2;
1311       while (*iptr >= 0)
1312         {*dptr2++ = *(dptr1 + *iptr++);}
1313       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1314       /* is msg_ids_out++ correct? */
1315       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
1316     }
1317 
1318   if (gs->max_left_over)
1319     {gs_gop_tree_exists(gs,in_vals);}
1320 
1321   /* process the received data */
1322   msg_nodes=nodes;
1323   while ((iptr = *nodes++))
1324     {
1325       /* Should I check the return value of MPI_Wait() or status? */
1326       /* Can this loop be replaced by a call to MPI_Waitall()? */
1327       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
1328       while (*iptr >= 0)
1329         {*(dptr1 + *iptr) = EXISTS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
1330     }
1331 
1332   /* replace vals */
1333   while (*pw >= 0)
1334     {*(in_vals + *pw++) = *dptr1++;}
1335 
1336   /* clear isend message handles */
1337   /* This changed for clarity though it could be the same */
1338   while (*msg_nodes++)
1339     /* Should I check the return value of MPI_Wait() or status? */
1340     /* Can this loop be replaced by a call to MPI_Waitall()? */
1341     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
1342   PetscFunctionReturn(0);
1343 }
1344 /******************************************************************************/
1345 static PetscErrorCode gs_gop_tree_exists(gs_id *gs, PetscScalar *vals)
1346 {
1347   PetscInt         size;
1348   PetscInt         *in, *out;
1349   PetscScalar *buf, *work;
1350   PetscInt         op[] = {GL_EXISTS,0};
1351 
1352   PetscFunctionBegin;
1353   in   = gs->tree_map_in;
1354   out  = gs->tree_map_out;
1355   buf  = gs->tree_buf;
1356   work = gs->tree_work;
1357   size = gs->tree_nel;
1358 
1359   rvec_zero(buf,size);
1360 
1361   while (*in >= 0)
1362     {
1363       /*
1364       printf("%d :: out=%d\n",my_id,*out);
1365       printf("%d :: in=%d\n",my_id,*in);
1366       */
1367       *(buf + *out++) = *(vals + *in++);
1368     }
1369 
1370   grop(buf,work,size,op);
1371 
1372   in   = gs->tree_map_in;
1373   out  = gs->tree_map_out;
1374 
1375   while (*in >= 0)
1376     {*(vals + *in++) = *(buf + *out++);}
1377   PetscFunctionReturn(0);
1378 }
1379 
1380 /*******************************************************************************/
1381 static PetscErrorCode gs_gop_max_abs( gs_id *gs,  PetscScalar *vals)
1382 {
1383   PetscFunctionBegin;
1384   /* local only operations!!! */
1385   if (gs->num_local)
1386     {gs_gop_local_max_abs(gs,vals);}
1387 
1388   /* if intersection tree/pairwise and local isn't empty */
1389   if (gs->num_local_gop)
1390     {
1391       gs_gop_local_in_max_abs(gs,vals);
1392 
1393       /* pairwise */
1394       if (gs->num_pairs)
1395         {gs_gop_pairwise_max_abs(gs,vals);}
1396 
1397       /* tree */
1398       else if (gs->max_left_over)
1399         {gs_gop_tree_max_abs(gs,vals);}
1400 
1401       gs_gop_local_out(gs,vals);
1402     }
1403   /* if intersection tree/pairwise and local is empty */
1404   else
1405     {
1406       /* pairwise */
1407       if (gs->num_pairs)
1408         {gs_gop_pairwise_max_abs(gs,vals);}
1409 
1410       /* tree */
1411       else if (gs->max_left_over)
1412         {gs_gop_tree_max_abs(gs,vals);}
1413     }
1414   PetscFunctionReturn(0);
1415 }
1416 
1417 /******************************************************************************/
1418 static PetscErrorCode gs_gop_local_max_abs( gs_id *gs,  PetscScalar *vals)
1419 {
1420   PetscInt         *num, *map, **reduce;
1421   PetscScalar tmp;
1422 
1423   PetscFunctionBegin;
1424   num    = gs->num_local_reduce;
1425   reduce = gs->local_reduce;
1426   while ((map = *reduce))
1427     {
1428       num ++;
1429       tmp = 0.0;
1430       while (*map >= 0)
1431         {tmp = MAX_FABS(tmp,*(vals + *map)); map++;}
1432 
1433       map = *reduce++;
1434       while (*map >= 0)
1435         {*(vals + *map++) = tmp;}
1436     }
1437   PetscFunctionReturn(0);
1438 }
1439 
1440 /******************************************************************************/
1441 static PetscErrorCode gs_gop_local_in_max_abs( gs_id *gs,  PetscScalar *vals)
1442 {
1443   PetscInt         *num, *map, **reduce;
1444   PetscScalar *base;
1445 
1446   PetscFunctionBegin;
1447   num    = gs->num_gop_local_reduce;
1448   reduce = gs->gop_local_reduce;
1449   while ((map = *reduce++))
1450     {
1451       num++;
1452       base = vals + *map++;
1453       while (*map >= 0)
1454         {*base = MAX_FABS(*base,*(vals + *map)); map++;}
1455     }
1456   PetscFunctionReturn(0);
1457 }
1458 
1459 /******************************************************************************/
1460 static PetscErrorCode gs_gop_pairwise_max_abs( gs_id *gs,  PetscScalar *in_vals)
1461 {
1462   PetscScalar    *dptr1, *dptr2, *dptr3, *in1, *in2;
1463   PetscInt            *iptr, *msg_list, *msg_size, **msg_nodes;
1464   PetscInt            *pw, *list, *size, **nodes;
1465   MPI_Request    *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1466   MPI_Status     status;
1467   PetscErrorCode ierr;
1468 
1469   PetscFunctionBegin;
1470   /* strip and load s */
1471   msg_list =list         = gs->pair_list;
1472   msg_size =size         = gs->msg_sizes;
1473   msg_nodes=nodes        = gs->node_list;
1474   iptr=pw                = gs->pw_elm_list;
1475   dptr1=dptr3            = gs->pw_vals;
1476   msg_ids_in  = ids_in   = gs->msg_ids_in;
1477   msg_ids_out = ids_out  = gs->msg_ids_out;
1478   dptr2                  = gs->out;
1479   in1=in2                = gs->in;
1480 
1481   /* post the receives */
1482   /*  msg_nodes=nodes; */
1483   do
1484     {
1485       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1486          second one *list and do list++ afterwards */
1487       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
1488       in1 += *size++;
1489     }
1490   while (*++msg_nodes);
1491   msg_nodes=nodes;
1492 
1493   /* load gs values into in out gs buffers */
1494   while (*iptr >= 0)
1495     {*dptr3++ = *(in_vals + *iptr++);}
1496 
1497   /* load out buffers and post the sends */
1498   while ((iptr = *msg_nodes++))
1499     {
1500       dptr3 = dptr2;
1501       while (*iptr >= 0)
1502         {*dptr2++ = *(dptr1 + *iptr++);}
1503       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1504       /* is msg_ids_out++ correct? */
1505       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
1506     }
1507 
1508   if (gs->max_left_over)
1509     {gs_gop_tree_max_abs(gs,in_vals);}
1510 
1511   /* process the received data */
1512   msg_nodes=nodes;
1513   while ((iptr = *nodes++))
1514     {
1515       /* Should I check the return value of MPI_Wait() or status? */
1516       /* Can this loop be replaced by a call to MPI_Waitall()? */
1517       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
1518       while (*iptr >= 0)
1519         {*(dptr1 + *iptr) = MAX_FABS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
1520     }
1521 
1522   /* replace vals */
1523   while (*pw >= 0)
1524     {*(in_vals + *pw++) = *dptr1++;}
1525 
1526   /* clear isend message handles */
1527   /* This changed for clarity though it could be the same */
1528   while (*msg_nodes++)
1529     /* Should I check the return value of MPI_Wait() or status? */
1530     /* Can this loop be replaced by a call to MPI_Waitall()? */
1531     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
1532   PetscFunctionReturn(0);
1533 }
1534 
1535 /******************************************************************************/
1536 static PetscErrorCode gs_gop_tree_max_abs(gs_id *gs, PetscScalar *vals)
1537 {
1538   PetscInt         size;
1539   PetscInt         *in, *out;
1540   PetscScalar *buf, *work;
1541   PetscInt         op[] = {GL_MAX_ABS,0};
1542 
1543   PetscFunctionBegin;
1544   in   = gs->tree_map_in;
1545   out  = gs->tree_map_out;
1546   buf  = gs->tree_buf;
1547   work = gs->tree_work;
1548   size = gs->tree_nel;
1549 
1550   rvec_zero(buf,size);
1551 
1552   while (*in >= 0)
1553     {
1554       /*
1555       printf("%d :: out=%d\n",my_id,*out);
1556       printf("%d :: in=%d\n",my_id,*in);
1557       */
1558       *(buf + *out++) = *(vals + *in++);
1559     }
1560 
1561   grop(buf,work,size,op);
1562 
1563   in   = gs->tree_map_in;
1564   out  = gs->tree_map_out;
1565 
1566   while (*in >= 0)
1567     {*(vals + *in++) = *(buf + *out++);}
1568   PetscFunctionReturn(0);
1569 }
1570 
1571 /******************************************************************************/
1572 static PetscErrorCode gs_gop_max( gs_id *gs,  PetscScalar *vals)
1573 {
1574   PetscFunctionBegin;
1575   /* local only operations!!! */
1576   if (gs->num_local)
1577     {gs_gop_local_max(gs,vals);}
1578 
1579   /* if intersection tree/pairwise and local isn't empty */
1580   if (gs->num_local_gop)
1581     {
1582       gs_gop_local_in_max(gs,vals);
1583 
1584       /* pairwise */
1585       if (gs->num_pairs)
1586         {gs_gop_pairwise_max(gs,vals);}
1587 
1588       /* tree */
1589       else if (gs->max_left_over)
1590         {gs_gop_tree_max(gs,vals);}
1591 
1592       gs_gop_local_out(gs,vals);
1593     }
1594   /* if intersection tree/pairwise and local is empty */
1595   else
1596     {
1597       /* pairwise */
1598       if (gs->num_pairs)
1599         {gs_gop_pairwise_max(gs,vals);}
1600 
1601       /* tree */
1602       else if (gs->max_left_over)
1603         {gs_gop_tree_max(gs,vals);}
1604     }
1605   PetscFunctionReturn(0);
1606 }
1607 
1608 /******************************************************************************/
1609 static PetscErrorCode gs_gop_local_max( gs_id *gs,  PetscScalar *vals)
1610 {
1611   PetscInt         *num, *map, **reduce;
1612   PetscScalar tmp;
1613 
1614   PetscFunctionBegin;
1615   num    = gs->num_local_reduce;
1616   reduce = gs->local_reduce;
1617   while ((map = *reduce))
1618     {
1619       num ++;
1620       tmp = -REAL_MAX;
1621       while (*map >= 0)
1622         {tmp = PetscMax(tmp,*(vals + *map)); map++;}
1623 
1624       map = *reduce++;
1625       while (*map >= 0)
1626         {*(vals + *map++) = tmp;}
1627     }
1628   PetscFunctionReturn(0);
1629 }
1630 
1631 /******************************************************************************/
1632 static PetscErrorCode gs_gop_local_in_max( gs_id *gs,  PetscScalar *vals)
1633 {
1634   PetscInt         *num, *map, **reduce;
1635   PetscScalar *base;
1636 
1637   PetscFunctionBegin;
1638   num    = gs->num_gop_local_reduce;
1639   reduce = gs->gop_local_reduce;
1640   while ((map = *reduce++))
1641     {
1642       num++;
1643       base = vals + *map++;
1644       while (*map >= 0)
1645         {*base = PetscMax(*base,*(vals + *map)); map++;}
1646     }
1647   PetscFunctionReturn(0);
1648 }
1649 
1650 /******************************************************************************/
1651 static PetscErrorCode gs_gop_pairwise_max( gs_id *gs,  PetscScalar *in_vals)
1652 {
1653   PetscScalar    *dptr1, *dptr2, *dptr3, *in1, *in2;
1654   PetscInt            *iptr, *msg_list, *msg_size, **msg_nodes;
1655   PetscInt            *pw, *list, *size, **nodes;
1656   MPI_Request    *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1657   MPI_Status     status;
1658   PetscErrorCode ierr;
1659 
1660   PetscFunctionBegin;
1661   /* strip and load s */
1662   msg_list =list         = gs->pair_list;
1663   msg_size =size         = gs->msg_sizes;
1664   msg_nodes=nodes        = gs->node_list;
1665   iptr=pw                = gs->pw_elm_list;
1666   dptr1=dptr3            = gs->pw_vals;
1667   msg_ids_in  = ids_in   = gs->msg_ids_in;
1668   msg_ids_out = ids_out  = gs->msg_ids_out;
1669   dptr2                  = gs->out;
1670   in1=in2                = gs->in;
1671 
1672   /* post the receives */
1673   /*  msg_nodes=nodes; */
1674   do
1675     {
1676       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1677          second one *list and do list++ afterwards */
1678       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
1679       in1 += *size++;
1680     }
1681   while (*++msg_nodes);
1682   msg_nodes=nodes;
1683 
1684   /* load gs values into in out gs buffers */
1685   while (*iptr >= 0)
1686     {*dptr3++ = *(in_vals + *iptr++);}
1687 
1688   /* load out buffers and post the sends */
1689   while ((iptr = *msg_nodes++))
1690     {
1691       dptr3 = dptr2;
1692       while (*iptr >= 0)
1693         {*dptr2++ = *(dptr1 + *iptr++);}
1694       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1695       /* is msg_ids_out++ correct? */
1696       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
1697     }
1698 
1699   if (gs->max_left_over)
1700     {gs_gop_tree_max(gs,in_vals);}
1701 
1702   /* process the received data */
1703   msg_nodes=nodes;
1704   while ((iptr = *nodes++))
1705     {
1706       /* Should I check the return value of MPI_Wait() or status? */
1707       /* Can this loop be replaced by a call to MPI_Waitall()? */
1708       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
1709       while (*iptr >= 0)
1710         {*(dptr1 + *iptr) = PetscMax(*(dptr1 + *iptr),*in2); iptr++; in2++;}
1711     }
1712 
1713   /* replace vals */
1714   while (*pw >= 0)
1715     {*(in_vals + *pw++) = *dptr1++;}
1716 
1717   /* clear isend message handles */
1718   /* This changed for clarity though it could be the same */
1719   while (*msg_nodes++)
1720     /* Should I check the return value of MPI_Wait() or status? */
1721     /* Can this loop be replaced by a call to MPI_Waitall()? */
1722     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
1723   PetscFunctionReturn(0);
1724 }
1725 
1726 /******************************************************************************/
1727 static PetscErrorCode gs_gop_tree_max(gs_id *gs, PetscScalar *vals)
1728 {
1729   PetscInt            size;
1730   PetscInt            *in, *out;
1731   PetscScalar    *buf, *work;
1732   PetscErrorCode ierr;
1733 
1734   PetscFunctionBegin;
1735   in   = gs->tree_map_in;
1736   out  = gs->tree_map_out;
1737   buf  = gs->tree_buf;
1738   work = gs->tree_work;
1739   size = gs->tree_nel;
1740 
1741   rvec_set(buf,-REAL_MAX,size);
1742 
1743   while (*in >= 0)
1744     {*(buf + *out++) = *(vals + *in++);}
1745 
1746   in   = gs->tree_map_in;
1747   out  = gs->tree_map_out;
1748   ierr = MPI_Allreduce(buf,work,size,MPIU_SCALAR,MPI_MAX,gs->gs_comm);CHKERRQ(ierr);
1749   while (*in >= 0)
1750     {*(vals + *in++) = *(work + *out++);}
1751   PetscFunctionReturn(0);
1752 }
1753 /******************************************************************************/
1754 static PetscErrorCode gs_gop_min_abs( gs_id *gs,  PetscScalar *vals)
1755 {
1756   PetscFunctionBegin;
1757   /* local only operations!!! */
1758   if (gs->num_local)
1759     {gs_gop_local_min_abs(gs,vals);}
1760 
1761   /* if intersection tree/pairwise and local isn't empty */
1762   if (gs->num_local_gop)
1763     {
1764       gs_gop_local_in_min_abs(gs,vals);
1765 
1766       /* pairwise */
1767       if (gs->num_pairs)
1768         {gs_gop_pairwise_min_abs(gs,vals);}
1769 
1770       /* tree */
1771       else if (gs->max_left_over)
1772         {gs_gop_tree_min_abs(gs,vals);}
1773 
1774       gs_gop_local_out(gs,vals);
1775     }
1776   /* if intersection tree/pairwise and local is empty */
1777   else
1778     {
1779       /* pairwise */
1780       if (gs->num_pairs)
1781         {gs_gop_pairwise_min_abs(gs,vals);}
1782 
1783       /* tree */
1784       else if (gs->max_left_over)
1785         {gs_gop_tree_min_abs(gs,vals);}
1786     }
1787   PetscFunctionReturn(0);
1788 }
1789 
1790 /******************************************************************************/
1791 static PetscErrorCode gs_gop_local_min_abs( gs_id *gs,  PetscScalar *vals)
1792 {
1793    PetscInt *num, *map, **reduce;
1794    PetscScalar tmp;
1795 
1796   PetscFunctionBegin;
1797   num    = gs->num_local_reduce;
1798   reduce = gs->local_reduce;
1799   while ((map = *reduce))
1800     {
1801       num ++;
1802       tmp = REAL_MAX;
1803       while (*map >= 0)
1804         {tmp = MIN_FABS(tmp,*(vals + *map)); map++;}
1805 
1806       map = *reduce++;
1807       while (*map >= 0)
1808         {*(vals + *map++) = tmp;}
1809     }
1810   PetscFunctionReturn(0);
1811 }
1812 
1813 /******************************************************************************/
1814 static PetscErrorCode gs_gop_local_in_min_abs( gs_id *gs,  PetscScalar *vals)
1815 {
1816    PetscInt *num, *map, **reduce;
1817    PetscScalar *base;
1818 
1819   PetscFunctionBegin;
1820   num    = gs->num_gop_local_reduce;
1821   reduce = gs->gop_local_reduce;
1822   while ((map = *reduce++))
1823     {
1824       num++;
1825       base = vals + *map++;
1826       while (*map >= 0)
1827         {*base = MIN_FABS(*base,*(vals + *map)); map++;}
1828     }
1829   PetscFunctionReturn(0);
1830 }
1831 
1832 /******************************************************************************/
1833 static PetscErrorCode gs_gop_pairwise_min_abs( gs_id *gs,  PetscScalar *in_vals)
1834 {
1835    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
1836    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
1837    PetscInt *pw, *list, *size, **nodes;
1838   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1839   MPI_Status status;
1840   PetscErrorCode ierr;
1841 
1842   PetscFunctionBegin;
1843   /* strip and load s */
1844   msg_list =list         = gs->pair_list;
1845   msg_size =size         = gs->msg_sizes;
1846   msg_nodes=nodes        = gs->node_list;
1847   iptr=pw                = gs->pw_elm_list;
1848   dptr1=dptr3            = gs->pw_vals;
1849   msg_ids_in  = ids_in   = gs->msg_ids_in;
1850   msg_ids_out = ids_out  = gs->msg_ids_out;
1851   dptr2                  = gs->out;
1852   in1=in2                = gs->in;
1853 
1854   /* post the receives */
1855   /*  msg_nodes=nodes; */
1856   do
1857     {
1858       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1859          second one *list and do list++ afterwards */
1860       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
1861       in1 += *size++;
1862     }
1863   while (*++msg_nodes);
1864   msg_nodes=nodes;
1865 
1866   /* load gs values into in out gs buffers */
1867   while (*iptr >= 0)
1868     {*dptr3++ = *(in_vals + *iptr++);}
1869 
1870   /* load out buffers and post the sends */
1871   while ((iptr = *msg_nodes++))
1872     {
1873       dptr3 = dptr2;
1874       while (*iptr >= 0)
1875         {*dptr2++ = *(dptr1 + *iptr++);}
1876       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1877       /* is msg_ids_out++ correct? */
1878       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
1879     }
1880 
1881   if (gs->max_left_over)
1882     {gs_gop_tree_min_abs(gs,in_vals);}
1883 
1884   /* process the received data */
1885   msg_nodes=nodes;
1886   while ((iptr = *nodes++))
1887     {
1888       /* Should I check the return value of MPI_Wait() or status? */
1889       /* Can this loop be replaced by a call to MPI_Waitall()? */
1890       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
1891       while (*iptr >= 0)
1892         {*(dptr1 + *iptr) = MIN_FABS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
1893     }
1894 
1895   /* replace vals */
1896   while (*pw >= 0)
1897     {*(in_vals + *pw++) = *dptr1++;}
1898 
1899   /* clear isend message handles */
1900   /* This changed for clarity though it could be the same */
1901   while (*msg_nodes++)
1902     /* Should I check the return value of MPI_Wait() or status? */
1903     /* Can this loop be replaced by a call to MPI_Waitall()? */
1904     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
1905   PetscFunctionReturn(0);
1906 }
1907 
1908 /******************************************************************************/
1909 static PetscErrorCode gs_gop_tree_min_abs(gs_id *gs, PetscScalar *vals)
1910 {
1911   PetscInt size;
1912   PetscInt *in, *out;
1913   PetscScalar *buf, *work;
1914   PetscInt op[] = {GL_MIN_ABS,0};
1915 
1916   PetscFunctionBegin;
1917   in   = gs->tree_map_in;
1918   out  = gs->tree_map_out;
1919   buf  = gs->tree_buf;
1920   work = gs->tree_work;
1921   size = gs->tree_nel;
1922 
1923   rvec_set(buf,REAL_MAX,size);
1924 
1925   while (*in >= 0)
1926     {*(buf + *out++) = *(vals + *in++);}
1927 
1928   in   = gs->tree_map_in;
1929   out  = gs->tree_map_out;
1930   grop(buf,work,size,op);
1931   while (*in >= 0)
1932     {*(vals + *in++) = *(buf + *out++);}
1933   PetscFunctionReturn(0);
1934 }
1935 
1936 /******************************************************************************/
1937 static PetscErrorCode gs_gop_min( gs_id *gs,  PetscScalar *vals)
1938 {
1939   PetscFunctionBegin;
1940   /* local only operations!!! */
1941   if (gs->num_local)
1942     {gs_gop_local_min(gs,vals);}
1943 
1944   /* if intersection tree/pairwise and local isn't empty */
1945   if (gs->num_local_gop)
1946     {
1947       gs_gop_local_in_min(gs,vals);
1948 
1949       /* pairwise */
1950       if (gs->num_pairs)
1951         {gs_gop_pairwise_min(gs,vals);}
1952 
1953       /* tree */
1954       else if (gs->max_left_over)
1955         {gs_gop_tree_min(gs,vals);}
1956 
1957       gs_gop_local_out(gs,vals);
1958     }
1959   /* if intersection tree/pairwise and local is empty */
1960   else
1961     {
1962       /* pairwise */
1963       if (gs->num_pairs)
1964         {gs_gop_pairwise_min(gs,vals);}
1965 
1966       /* tree */
1967       else if (gs->max_left_over)
1968         {gs_gop_tree_min(gs,vals);}
1969     }
1970   PetscFunctionReturn(0);
1971 }
1972 
1973 /******************************************************************************/
1974 static PetscErrorCode gs_gop_local_min( gs_id *gs,  PetscScalar *vals)
1975 {
1976    PetscInt *num, *map, **reduce;
1977    PetscScalar tmp;
1978   PetscFunctionBegin;
1979   num    = gs->num_local_reduce;
1980   reduce = gs->local_reduce;
1981   while ((map = *reduce))
1982     {
1983       num ++;
1984       tmp = REAL_MAX;
1985       while (*map >= 0)
1986         {tmp = PetscMin(tmp,*(vals + *map)); map++;}
1987 
1988       map = *reduce++;
1989       while (*map >= 0)
1990         {*(vals + *map++) = tmp;}
1991     }
1992   PetscFunctionReturn(0);
1993 }
1994 
1995 /******************************************************************************/
1996 static PetscErrorCode gs_gop_local_in_min( gs_id *gs,  PetscScalar *vals)
1997 {
1998    PetscInt *num, *map, **reduce;
1999    PetscScalar *base;
2000 
2001   PetscFunctionBegin;
2002   num    = gs->num_gop_local_reduce;
2003   reduce = gs->gop_local_reduce;
2004   while ((map = *reduce++))
2005     {
2006       num++;
2007       base = vals + *map++;
2008       while (*map >= 0)
2009         {*base = PetscMin(*base,*(vals + *map)); map++;}
2010     }
2011   PetscFunctionReturn(0);
2012 }
2013 
2014 /******************************************************************************/
2015 static PetscErrorCode gs_gop_pairwise_min( gs_id *gs,  PetscScalar *in_vals)
2016 {
2017    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
2018    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
2019    PetscInt *pw, *list, *size, **nodes;
2020   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2021   MPI_Status status;
2022   PetscErrorCode ierr;
2023 
2024   PetscFunctionBegin;
2025   /* strip and load s */
2026   msg_list =list         = gs->pair_list;
2027   msg_size =size         = gs->msg_sizes;
2028   msg_nodes=nodes        = gs->node_list;
2029   iptr=pw                = gs->pw_elm_list;
2030   dptr1=dptr3            = gs->pw_vals;
2031   msg_ids_in  = ids_in   = gs->msg_ids_in;
2032   msg_ids_out = ids_out  = gs->msg_ids_out;
2033   dptr2                  = gs->out;
2034   in1=in2                = gs->in;
2035 
2036   /* post the receives */
2037   /*  msg_nodes=nodes; */
2038   do
2039     {
2040       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2041          second one *list and do list++ afterwards */
2042       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
2043       in1 += *size++;
2044     }
2045   while (*++msg_nodes);
2046   msg_nodes=nodes;
2047 
2048   /* load gs values into in out gs buffers */
2049   while (*iptr >= 0)
2050     {*dptr3++ = *(in_vals + *iptr++);}
2051 
2052   /* load out buffers and post the sends */
2053   while ((iptr = *msg_nodes++))
2054     {
2055       dptr3 = dptr2;
2056       while (*iptr >= 0)
2057         {*dptr2++ = *(dptr1 + *iptr++);}
2058       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2059       /* is msg_ids_out++ correct? */
2060       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
2061     }
2062 
2063   /* process the received data */
2064   if (gs->max_left_over)
2065     {gs_gop_tree_min(gs,in_vals);}
2066 
2067   msg_nodes=nodes;
2068   while ((iptr = *nodes++))
2069     {
2070       /* Should I check the return value of MPI_Wait() or status? */
2071       /* Can this loop be replaced by a call to MPI_Waitall()? */
2072       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
2073       while (*iptr >= 0)
2074         {*(dptr1 + *iptr) = PetscMin(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2075     }
2076 
2077   /* replace vals */
2078   while (*pw >= 0)
2079     {*(in_vals + *pw++) = *dptr1++;}
2080 
2081   /* clear isend message handles */
2082   /* This changed for clarity though it could be the same */
2083   while (*msg_nodes++)
2084     /* Should I check the return value of MPI_Wait() or status? */
2085     /* Can this loop be replaced by a call to MPI_Waitall()? */
2086     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
2087   PetscFunctionReturn(0);
2088 }
2089 
2090 /******************************************************************************/
2091 static PetscErrorCode gs_gop_tree_min(gs_id *gs, PetscScalar *vals)
2092 {
2093   PetscInt size;
2094   PetscInt *in, *out;
2095   PetscScalar *buf, *work;
2096   PetscErrorCode ierr;
2097 
2098   PetscFunctionBegin;
2099   in   = gs->tree_map_in;
2100   out  = gs->tree_map_out;
2101   buf  = gs->tree_buf;
2102   work = gs->tree_work;
2103   size = gs->tree_nel;
2104 
2105   rvec_set(buf,REAL_MAX,size);
2106 
2107   while (*in >= 0)
2108     {*(buf + *out++) = *(vals + *in++);}
2109 
2110   in   = gs->tree_map_in;
2111   out  = gs->tree_map_out;
2112   ierr = MPI_Allreduce(buf,work,size,MPIU_SCALAR,MPI_MIN,gs->gs_comm);CHKERRQ(ierr);
2113   while (*in >= 0)
2114     {*(vals + *in++) = *(work + *out++);}
2115   PetscFunctionReturn(0);
2116 }
2117 
2118 /******************************************************************************/
2119 static PetscErrorCode gs_gop_times( gs_id *gs,  PetscScalar *vals)
2120 {
2121   PetscFunctionBegin;
2122   /* local only operations!!! */
2123   if (gs->num_local)
2124     {gs_gop_local_times(gs,vals);}
2125 
2126   /* if intersection tree/pairwise and local isn't empty */
2127   if (gs->num_local_gop)
2128     {
2129       gs_gop_local_in_times(gs,vals);
2130 
2131       /* pairwise */
2132       if (gs->num_pairs)
2133         {gs_gop_pairwise_times(gs,vals);}
2134 
2135       /* tree */
2136       else if (gs->max_left_over)
2137         {gs_gop_tree_times(gs,vals);}
2138 
2139       gs_gop_local_out(gs,vals);
2140     }
2141   /* if intersection tree/pairwise and local is empty */
2142   else
2143     {
2144       /* pairwise */
2145       if (gs->num_pairs)
2146         {gs_gop_pairwise_times(gs,vals);}
2147 
2148       /* tree */
2149       else if (gs->max_left_over)
2150         {gs_gop_tree_times(gs,vals);}
2151     }
2152   PetscFunctionReturn(0);
2153 }
2154 
2155 /******************************************************************************/
2156 static PetscErrorCode gs_gop_local_times( gs_id *gs,  PetscScalar *vals)
2157 {
2158    PetscInt *num, *map, **reduce;
2159    PetscScalar tmp;
2160 
2161   PetscFunctionBegin;
2162   num    = gs->num_local_reduce;
2163   reduce = gs->local_reduce;
2164   while ((map = *reduce))
2165     {
2166       /* wall */
2167       if (*num == 2)
2168         {
2169           num ++; reduce++;
2170           vals[map[1]] = vals[map[0]] *= vals[map[1]];
2171         }
2172       /* corner shared by three elements */
2173       else if (*num == 3)
2174         {
2175           num ++; reduce++;
2176           vals[map[2]]=vals[map[1]]=vals[map[0]]*=(vals[map[1]]*vals[map[2]]);
2177         }
2178       /* corner shared by four elements */
2179       else if (*num == 4)
2180         {
2181           num ++; reduce++;
2182           vals[map[1]]=vals[map[2]]=vals[map[3]]=vals[map[0]] *=
2183                                  (vals[map[1]] * vals[map[2]] * vals[map[3]]);
2184         }
2185       /* general case ... odd geoms ... 3D*/
2186       else
2187         {
2188           num ++;
2189           tmp = 1.0;
2190           while (*map >= 0)
2191             {tmp *= *(vals + *map++);}
2192 
2193           map = *reduce++;
2194           while (*map >= 0)
2195             {*(vals + *map++) = tmp;}
2196         }
2197     }
2198   PetscFunctionReturn(0);
2199 }
2200 
2201 /******************************************************************************/
2202 static PetscErrorCode gs_gop_local_in_times( gs_id *gs,  PetscScalar *vals)
2203 {
2204    PetscInt *num, *map, **reduce;
2205    PetscScalar *base;
2206 
2207   PetscFunctionBegin;
2208   num    = gs->num_gop_local_reduce;
2209   reduce = gs->gop_local_reduce;
2210   while ((map = *reduce++))
2211     {
2212       /* wall */
2213       if (*num == 2)
2214         {
2215           num ++;
2216           vals[map[0]] *= vals[map[1]];
2217         }
2218       /* corner shared by three elements */
2219       else if (*num == 3)
2220         {
2221           num ++;
2222           vals[map[0]] *= (vals[map[1]] * vals[map[2]]);
2223         }
2224       /* corner shared by four elements */
2225       else if (*num == 4)
2226         {
2227           num ++;
2228           vals[map[0]] *= (vals[map[1]] * vals[map[2]] * vals[map[3]]);
2229         }
2230       /* general case ... odd geoms ... 3D*/
2231       else
2232         {
2233           num++;
2234           base = vals + *map++;
2235           while (*map >= 0)
2236             {*base *= *(vals + *map++);}
2237         }
2238     }
2239   PetscFunctionReturn(0);
2240 }
2241 
2242 /******************************************************************************/
2243 static PetscErrorCode gs_gop_pairwise_times( gs_id *gs,  PetscScalar *in_vals)
2244 {
2245    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
2246    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
2247    PetscInt *pw, *list, *size, **nodes;
2248   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2249   MPI_Status status;
2250   PetscErrorCode ierr;
2251 
2252   PetscFunctionBegin;
2253   /* strip and load s */
2254   msg_list =list         = gs->pair_list;
2255   msg_size =size         = gs->msg_sizes;
2256   msg_nodes=nodes        = gs->node_list;
2257   iptr=pw                = gs->pw_elm_list;
2258   dptr1=dptr3            = gs->pw_vals;
2259   msg_ids_in  = ids_in   = gs->msg_ids_in;
2260   msg_ids_out = ids_out  = gs->msg_ids_out;
2261   dptr2                  = gs->out;
2262   in1=in2                = gs->in;
2263 
2264   /* post the receives */
2265   /*  msg_nodes=nodes; */
2266   do
2267     {
2268       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2269          second one *list and do list++ afterwards */
2270       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
2271       in1 += *size++;
2272     }
2273   while (*++msg_nodes);
2274   msg_nodes=nodes;
2275 
2276   /* load gs values into in out gs buffers */
2277   while (*iptr >= 0)
2278     {*dptr3++ = *(in_vals + *iptr++);}
2279 
2280   /* load out buffers and post the sends */
2281   while ((iptr = *msg_nodes++))
2282     {
2283       dptr3 = dptr2;
2284       while (*iptr >= 0)
2285         {*dptr2++ = *(dptr1 + *iptr++);}
2286       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2287       /* is msg_ids_out++ correct? */
2288       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
2289     }
2290 
2291   if (gs->max_left_over)
2292     {gs_gop_tree_times(gs,in_vals);}
2293 
2294   /* process the received data */
2295   msg_nodes=nodes;
2296   while ((iptr = *nodes++))
2297     {
2298       /* Should I check the return value of MPI_Wait() or status? */
2299       /* Can this loop be replaced by a call to MPI_Waitall()? */
2300       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
2301       while (*iptr >= 0)
2302         {*(dptr1 + *iptr++) *= *in2++;}
2303     }
2304 
2305   /* replace vals */
2306   while (*pw >= 0)
2307     {*(in_vals + *pw++) = *dptr1++;}
2308 
2309   /* clear isend message handles */
2310   /* This changed for clarity though it could be the same */
2311   while (*msg_nodes++)
2312     /* Should I check the return value of MPI_Wait() or status? */
2313     /* Can this loop be replaced by a call to MPI_Waitall()? */
2314     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
2315   PetscFunctionReturn(0);
2316 }
2317 
2318 /******************************************************************************/
2319 static PetscErrorCode gs_gop_tree_times(gs_id *gs, PetscScalar *vals)
2320 {
2321   PetscInt size;
2322   PetscInt *in, *out;
2323   PetscScalar *buf, *work;
2324   PetscErrorCode ierr;
2325 
2326   PetscFunctionBegin;
2327   in   = gs->tree_map_in;
2328   out  = gs->tree_map_out;
2329   buf  = gs->tree_buf;
2330   work = gs->tree_work;
2331   size = gs->tree_nel;
2332 
2333   rvec_one(buf,size);
2334 
2335   while (*in >= 0)
2336     {*(buf + *out++) = *(vals + *in++);}
2337 
2338   in   = gs->tree_map_in;
2339   out  = gs->tree_map_out;
2340   ierr = MPI_Allreduce(buf,work,size,MPIU_SCALAR,MPI_PROD,gs->gs_comm);CHKERRQ(ierr);
2341   while (*in >= 0)
2342     {*(vals + *in++) = *(work + *out++);}
2343   PetscFunctionReturn(0);
2344 }
2345 
2346 /******************************************************************************/
2347 static PetscErrorCode gs_gop_plus( gs_id *gs,  PetscScalar *vals)
2348 {
2349   PetscFunctionBegin;
2350   /* local only operations!!! */
2351   if (gs->num_local)
2352     {gs_gop_local_plus(gs,vals);}
2353 
2354   /* if intersection tree/pairwise and local isn't empty */
2355   if (gs->num_local_gop)
2356     {
2357       gs_gop_local_in_plus(gs,vals);
2358 
2359       /* pairwise will NOT do tree inside ... */
2360       if (gs->num_pairs)
2361         {gs_gop_pairwise_plus(gs,vals);}
2362 
2363       /* tree */
2364       if (gs->max_left_over)
2365         {gs_gop_tree_plus(gs,vals);}
2366 
2367       gs_gop_local_out(gs,vals);
2368     }
2369   /* if intersection tree/pairwise and local is empty */
2370   else
2371     {
2372       /* pairwise will NOT do tree inside */
2373       if (gs->num_pairs)
2374         {gs_gop_pairwise_plus(gs,vals);}
2375 
2376       /* tree */
2377       if (gs->max_left_over)
2378         {gs_gop_tree_plus(gs,vals);}
2379     }
2380   PetscFunctionReturn(0);
2381 }
2382 
2383 /******************************************************************************/
2384 static PetscErrorCode gs_gop_local_plus( gs_id *gs,  PetscScalar *vals)
2385 {
2386    PetscInt *num, *map, **reduce;
2387    PetscScalar tmp;
2388 
2389   PetscFunctionBegin;
2390   num    = gs->num_local_reduce;
2391   reduce = gs->local_reduce;
2392   while ((map = *reduce))
2393     {
2394       /* wall */
2395       if (*num == 2)
2396         {
2397           num ++; reduce++;
2398           vals[map[1]] = vals[map[0]] += vals[map[1]];
2399         }
2400       /* corner shared by three elements */
2401       else if (*num == 3)
2402         {
2403           num ++; reduce++;
2404           vals[map[2]]=vals[map[1]]=vals[map[0]]+=(vals[map[1]]+vals[map[2]]);
2405         }
2406       /* corner shared by four elements */
2407       else if (*num == 4)
2408         {
2409           num ++; reduce++;
2410           vals[map[1]]=vals[map[2]]=vals[map[3]]=vals[map[0]] +=
2411                                  (vals[map[1]] + vals[map[2]] + vals[map[3]]);
2412         }
2413       /* general case ... odd geoms ... 3D*/
2414       else
2415         {
2416           num ++;
2417           tmp = 0.0;
2418           while (*map >= 0)
2419             {tmp += *(vals + *map++);}
2420 
2421           map = *reduce++;
2422           while (*map >= 0)
2423             {*(vals + *map++) = tmp;}
2424         }
2425     }
2426   PetscFunctionReturn(0);
2427 }
2428 
2429 /******************************************************************************/
2430 static PetscErrorCode gs_gop_local_in_plus( gs_id *gs,  PetscScalar *vals)
2431 {
2432    PetscInt *num, *map, **reduce;
2433    PetscScalar *base;
2434 
2435   PetscFunctionBegin;
2436   num    = gs->num_gop_local_reduce;
2437   reduce = gs->gop_local_reduce;
2438   while ((map = *reduce++))
2439     {
2440       /* wall */
2441       if (*num == 2)
2442         {
2443           num ++;
2444           vals[map[0]] += vals[map[1]];
2445         }
2446       /* corner shared by three elements */
2447       else if (*num == 3)
2448         {
2449           num ++;
2450           vals[map[0]] += (vals[map[1]] + vals[map[2]]);
2451         }
2452       /* corner shared by four elements */
2453       else if (*num == 4)
2454         {
2455           num ++;
2456           vals[map[0]] += (vals[map[1]] + vals[map[2]] + vals[map[3]]);
2457         }
2458       /* general case ... odd geoms ... 3D*/
2459       else
2460         {
2461           num++;
2462           base = vals + *map++;
2463           while (*map >= 0)
2464             {*base += *(vals + *map++);}
2465         }
2466     }
2467   PetscFunctionReturn(0);
2468 }
2469 
2470 /******************************************************************************/
2471 static PetscErrorCode gs_gop_pairwise_plus( gs_id *gs,  PetscScalar *in_vals)
2472 {
2473    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
2474    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
2475    PetscInt *pw, *list, *size, **nodes;
2476   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2477   MPI_Status status;
2478   PetscErrorCode ierr;
2479 
2480   PetscFunctionBegin;
2481   /* strip and load s */
2482   msg_list =list         = gs->pair_list;
2483   msg_size =size         = gs->msg_sizes;
2484   msg_nodes=nodes        = gs->node_list;
2485   iptr=pw                = gs->pw_elm_list;
2486   dptr1=dptr3            = gs->pw_vals;
2487   msg_ids_in  = ids_in   = gs->msg_ids_in;
2488   msg_ids_out = ids_out  = gs->msg_ids_out;
2489   dptr2                  = gs->out;
2490   in1=in2                = gs->in;
2491 
2492   /* post the receives */
2493   /*  msg_nodes=nodes; */
2494   do
2495     {
2496       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2497          second one *list and do list++ afterwards */
2498       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
2499       in1 += *size++;
2500     }
2501   while (*++msg_nodes);
2502   msg_nodes=nodes;
2503 
2504   /* load gs values into in out gs buffers */
2505   while (*iptr >= 0)
2506     {*dptr3++ = *(in_vals + *iptr++);}
2507 
2508   /* load out buffers and post the sends */
2509   while ((iptr = *msg_nodes++))
2510     {
2511       dptr3 = dptr2;
2512       while (*iptr >= 0)
2513         {*dptr2++ = *(dptr1 + *iptr++);}
2514       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2515       /* is msg_ids_out++ correct? */
2516       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
2517     }
2518 
2519   /* do the tree while we're waiting */
2520   if (gs->max_left_over)
2521     {gs_gop_tree_plus(gs,in_vals);}
2522 
2523   /* process the received data */
2524   msg_nodes=nodes;
2525   while ((iptr = *nodes++))
2526     {
2527       /* Should I check the return value of MPI_Wait() or status? */
2528       /* Can this loop be replaced by a call to MPI_Waitall()? */
2529       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
2530       while (*iptr >= 0)
2531         {*(dptr1 + *iptr++) += *in2++;}
2532     }
2533 
2534   /* replace vals */
2535   while (*pw >= 0)
2536     {*(in_vals + *pw++) = *dptr1++;}
2537 
2538   /* clear isend message handles */
2539   /* This changed for clarity though it could be the same */
2540   while (*msg_nodes++)
2541     /* Should I check the return value of MPI_Wait() or status? */
2542     /* Can this loop be replaced by a call to MPI_Waitall()? */
2543     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
2544   PetscFunctionReturn(0);
2545 }
2546 
2547 /******************************************************************************/
2548 static PetscErrorCode gs_gop_tree_plus(gs_id *gs, PetscScalar *vals)
2549 {
2550   PetscInt size;
2551   PetscInt *in, *out;
2552   PetscScalar *buf, *work;
2553   PetscErrorCode ierr;
2554 
2555   PetscFunctionBegin;
2556   in   = gs->tree_map_in;
2557   out  = gs->tree_map_out;
2558   buf  = gs->tree_buf;
2559   work = gs->tree_work;
2560   size = gs->tree_nel;
2561 
2562   rvec_zero(buf,size);
2563 
2564   while (*in >= 0)
2565     {*(buf + *out++) = *(vals + *in++);}
2566 
2567   in   = gs->tree_map_in;
2568   out  = gs->tree_map_out;
2569   ierr = MPI_Allreduce(buf,work,size,MPIU_SCALAR,MPI_SUM,gs->gs_comm);CHKERRQ(ierr);
2570   while (*in >= 0)
2571     {*(vals + *in++) = *(work + *out++);}
2572   PetscFunctionReturn(0);
2573 }
2574 
2575 /******************************************************************************/
2576 PetscErrorCode gs_free( gs_id *gs)
2577 {
2578    PetscInt i;
2579 
2580   PetscFunctionBegin;
2581   if (gs->nghs) {free((void*) gs->nghs);}
2582   if (gs->pw_nghs) {free((void*) gs->pw_nghs);}
2583 
2584   /* tree */
2585   if (gs->max_left_over)
2586     {
2587       if (gs->tree_elms) {free((void*) gs->tree_elms);}
2588       if (gs->tree_buf) {free((void*) gs->tree_buf);}
2589       if (gs->tree_work) {free((void*) gs->tree_work);}
2590       if (gs->tree_map_in) {free((void*) gs->tree_map_in);}
2591       if (gs->tree_map_out) {free((void*) gs->tree_map_out);}
2592     }
2593 
2594   /* pairwise info */
2595   if (gs->num_pairs)
2596     {
2597       /* should be NULL already */
2598       if (gs->ngh_buf) {free((void*) gs->ngh_buf);}
2599       if (gs->elms) {free((void*) gs->elms);}
2600       if (gs->local_elms) {free((void*) gs->local_elms);}
2601       if (gs->companion) {free((void*) gs->companion);}
2602 
2603       /* only set if pairwise */
2604       if (gs->vals) {free((void*) gs->vals);}
2605       if (gs->in) {free((void*) gs->in);}
2606       if (gs->out) {free((void*) gs->out);}
2607       if (gs->msg_ids_in) {free((void*) gs->msg_ids_in);}
2608       if (gs->msg_ids_out) {free((void*) gs->msg_ids_out);}
2609       if (gs->pw_vals) {free((void*) gs->pw_vals);}
2610       if (gs->pw_elm_list) {free((void*) gs->pw_elm_list);}
2611       if (gs->node_list)
2612         {
2613           for (i=0;i<gs->num_pairs;i++)
2614             {if (gs->node_list[i]) {free((void*) gs->node_list[i]);}}
2615           free((void*) gs->node_list);
2616         }
2617       if (gs->msg_sizes) {free((void*) gs->msg_sizes);}
2618       if (gs->pair_list) {free((void*) gs->pair_list);}
2619     }
2620 
2621   /* local info */
2622   if (gs->num_local_total>=0)
2623     {
2624       for (i=0;i<gs->num_local_total+1;i++)
2625         /*      for (i=0;i<gs->num_local_total;i++) */
2626         {
2627           if (gs->num_gop_local_reduce[i])
2628             {free((void*) gs->gop_local_reduce[i]);}
2629         }
2630     }
2631 
2632   /* if intersection tree/pairwise and local isn't empty */
2633   if (gs->gop_local_reduce) {free((void*) gs->gop_local_reduce);}
2634   if (gs->num_gop_local_reduce) {free((void*) gs->num_gop_local_reduce);}
2635 
2636   free((void*) gs);
2637   PetscFunctionReturn(0);
2638 }
2639 
2640 /******************************************************************************/
2641 PetscErrorCode gs_gop_vec( gs_id *gs,  PetscScalar *vals,  const char *op,  PetscInt step)
2642 {
2643   PetscErrorCode ierr;
2644 
2645   PetscFunctionBegin;
2646   switch (*op) {
2647   case '+':
2648     gs_gop_vec_plus(gs,vals,step);
2649     break;
2650   default:
2651     ierr = PetscInfo1(0,"gs_gop_vec() :: %c is not a valid op",op[0]);CHKERRQ(ierr);
2652     ierr = PetscInfo(0,"gs_gop_vec() :: default :: plus");CHKERRQ(ierr);
2653     gs_gop_vec_plus(gs,vals,step);
2654     break;
2655   }
2656   PetscFunctionReturn(0);
2657 }
2658 
2659 /******************************************************************************/
2660 static PetscErrorCode gs_gop_vec_plus( gs_id *gs,  PetscScalar *vals,  PetscInt step)
2661 {
2662   PetscFunctionBegin;
2663   if (!gs) {SETERRQ(PETSC_ERR_PLIB,"gs_gop_vec() passed NULL gs handle!!!");}
2664 
2665   /* local only operations!!! */
2666   if (gs->num_local)
2667     {gs_gop_vec_local_plus(gs,vals,step);}
2668 
2669   /* if intersection tree/pairwise and local isn't empty */
2670   if (gs->num_local_gop)
2671     {
2672       gs_gop_vec_local_in_plus(gs,vals,step);
2673 
2674       /* pairwise */
2675       if (gs->num_pairs)
2676         {gs_gop_vec_pairwise_plus(gs,vals,step);}
2677 
2678       /* tree */
2679       else if (gs->max_left_over)
2680         {gs_gop_vec_tree_plus(gs,vals,step);}
2681 
2682       gs_gop_vec_local_out(gs,vals,step);
2683     }
2684   /* if intersection tree/pairwise and local is empty */
2685   else
2686     {
2687       /* pairwise */
2688       if (gs->num_pairs)
2689         {gs_gop_vec_pairwise_plus(gs,vals,step);}
2690 
2691       /* tree */
2692       else if (gs->max_left_over)
2693         {gs_gop_vec_tree_plus(gs,vals,step);}
2694     }
2695   PetscFunctionReturn(0);
2696 }
2697 
2698 /******************************************************************************/
2699 static PetscErrorCode gs_gop_vec_local_plus( gs_id *gs,  PetscScalar *vals, PetscInt step)
2700 {
2701    PetscInt *num, *map, **reduce;
2702    PetscScalar *base;
2703 
2704   PetscFunctionBegin;
2705   num    = gs->num_local_reduce;
2706   reduce = gs->local_reduce;
2707   while ((map = *reduce))
2708     {
2709       base = vals + map[0] * step;
2710 
2711       /* wall */
2712       if (*num == 2)
2713         {
2714           num++; reduce++;
2715           rvec_add (base,vals+map[1]*step,step);
2716           rvec_copy(vals+map[1]*step,base,step);
2717         }
2718       /* corner shared by three elements */
2719       else if (*num == 3)
2720         {
2721           num++; reduce++;
2722           rvec_add (base,vals+map[1]*step,step);
2723           rvec_add (base,vals+map[2]*step,step);
2724           rvec_copy(vals+map[2]*step,base,step);
2725           rvec_copy(vals+map[1]*step,base,step);
2726         }
2727       /* corner shared by four elements */
2728       else if (*num == 4)
2729         {
2730           num++; reduce++;
2731           rvec_add (base,vals+map[1]*step,step);
2732           rvec_add (base,vals+map[2]*step,step);
2733           rvec_add (base,vals+map[3]*step,step);
2734           rvec_copy(vals+map[3]*step,base,step);
2735           rvec_copy(vals+map[2]*step,base,step);
2736           rvec_copy(vals+map[1]*step,base,step);
2737         }
2738       /* general case ... odd geoms ... 3D */
2739       else
2740         {
2741           num++;
2742           while (*++map >= 0)
2743             {rvec_add (base,vals+*map*step,step);}
2744 
2745           map = *reduce;
2746           while (*++map >= 0)
2747             {rvec_copy(vals+*map*step,base,step);}
2748 
2749           reduce++;
2750         }
2751     }
2752   PetscFunctionReturn(0);
2753 }
2754 
2755 /******************************************************************************/
2756 static PetscErrorCode gs_gop_vec_local_in_plus( gs_id *gs,  PetscScalar *vals, PetscInt step)
2757 {
2758    PetscInt  *num, *map, **reduce;
2759    PetscScalar *base;
2760   PetscFunctionBegin;
2761   num    = gs->num_gop_local_reduce;
2762   reduce = gs->gop_local_reduce;
2763   while ((map = *reduce++))
2764     {
2765       base = vals + map[0] * step;
2766 
2767       /* wall */
2768       if (*num == 2)
2769         {
2770           num ++;
2771           rvec_add(base,vals+map[1]*step,step);
2772         }
2773       /* corner shared by three elements */
2774       else if (*num == 3)
2775         {
2776           num ++;
2777           rvec_add(base,vals+map[1]*step,step);
2778           rvec_add(base,vals+map[2]*step,step);
2779         }
2780       /* corner shared by four elements */
2781       else if (*num == 4)
2782         {
2783           num ++;
2784           rvec_add(base,vals+map[1]*step,step);
2785           rvec_add(base,vals+map[2]*step,step);
2786           rvec_add(base,vals+map[3]*step,step);
2787         }
2788       /* general case ... odd geoms ... 3D*/
2789       else
2790         {
2791           num++;
2792           while (*++map >= 0)
2793             {rvec_add(base,vals+*map*step,step);}
2794         }
2795     }
2796   PetscFunctionReturn(0);
2797 }
2798 
2799 /******************************************************************************/
2800 static PetscErrorCode gs_gop_vec_local_out( gs_id *gs,  PetscScalar *vals, PetscInt step)
2801 {
2802    PetscInt *num, *map, **reduce;
2803    PetscScalar *base;
2804 
2805   PetscFunctionBegin;
2806   num    = gs->num_gop_local_reduce;
2807   reduce = gs->gop_local_reduce;
2808   while ((map = *reduce++))
2809     {
2810       base = vals + map[0] * step;
2811 
2812       /* wall */
2813       if (*num == 2)
2814         {
2815           num ++;
2816           rvec_copy(vals+map[1]*step,base,step);
2817         }
2818       /* corner shared by three elements */
2819       else if (*num == 3)
2820         {
2821           num ++;
2822           rvec_copy(vals+map[1]*step,base,step);
2823           rvec_copy(vals+map[2]*step,base,step);
2824         }
2825       /* corner shared by four elements */
2826       else if (*num == 4)
2827         {
2828           num ++;
2829           rvec_copy(vals+map[1]*step,base,step);
2830           rvec_copy(vals+map[2]*step,base,step);
2831           rvec_copy(vals+map[3]*step,base,step);
2832         }
2833       /* general case ... odd geoms ... 3D*/
2834       else
2835         {
2836           num++;
2837           while (*++map >= 0)
2838             {rvec_copy(vals+*map*step,base,step);}
2839         }
2840     }
2841   PetscFunctionReturn(0);
2842 }
2843 
2844 /******************************************************************************/
2845 static PetscErrorCode gs_gop_vec_pairwise_plus( gs_id *gs,  PetscScalar *in_vals, PetscInt step)
2846 {
2847    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
2848    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
2849    PetscInt *pw, *list, *size, **nodes;
2850   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2851   MPI_Status status;
2852   PetscBLASInt i1;
2853   PetscErrorCode ierr;
2854 
2855   PetscFunctionBegin;
2856   /* strip and load s */
2857   msg_list =list         = gs->pair_list;
2858   msg_size =size         = gs->msg_sizes;
2859   msg_nodes=nodes        = gs->node_list;
2860   iptr=pw                = gs->pw_elm_list;
2861   dptr1=dptr3            = gs->pw_vals;
2862   msg_ids_in  = ids_in   = gs->msg_ids_in;
2863   msg_ids_out = ids_out  = gs->msg_ids_out;
2864   dptr2                  = gs->out;
2865   in1=in2                = gs->in;
2866 
2867   /* post the receives */
2868   /*  msg_nodes=nodes; */
2869   do
2870     {
2871       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2872          second one *list and do list++ afterwards */
2873       ierr = MPI_Irecv(in1, *size *step, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
2874       in1 += *size++ *step;
2875     }
2876   while (*++msg_nodes);
2877   msg_nodes=nodes;
2878 
2879   /* load gs values into in out gs buffers */
2880   while (*iptr >= 0)
2881     {
2882       rvec_copy(dptr3,in_vals + *iptr*step,step);
2883       dptr3+=step;
2884       iptr++;
2885     }
2886 
2887   /* load out buffers and post the sends */
2888   while ((iptr = *msg_nodes++))
2889     {
2890       dptr3 = dptr2;
2891       while (*iptr >= 0)
2892         {
2893           rvec_copy(dptr2,dptr1 + *iptr*step,step);
2894           dptr2+=step;
2895           iptr++;
2896         }
2897       ierr = MPI_Isend(dptr3, *msg_size++ *step, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
2898     }
2899 
2900   /* tree */
2901   if (gs->max_left_over)
2902     {gs_gop_vec_tree_plus(gs,in_vals,step);}
2903 
2904   /* process the received data */
2905   msg_nodes=nodes;
2906   while ((iptr = *nodes++)){
2907     PetscScalar d1 = 1.0;
2908       /* Should I check the return value of MPI_Wait() or status? */
2909       /* Can this loop be replaced by a call to MPI_Waitall()? */
2910       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
2911       while (*iptr >= 0) {
2912           BLASaxpy_(&step,&d1,in2,&i1,dptr1 + *iptr*step,&i1);
2913           in2+=step;
2914           iptr++;
2915       }
2916   }
2917 
2918   /* replace vals */
2919   while (*pw >= 0)
2920     {
2921       rvec_copy(in_vals + *pw*step,dptr1,step);
2922       dptr1+=step;
2923       pw++;
2924     }
2925 
2926   /* clear isend message handles */
2927   /* This changed for clarity though it could be the same */
2928   while (*msg_nodes++)
2929     /* Should I check the return value of MPI_Wait() or status? */
2930     /* Can this loop be replaced by a call to MPI_Waitall()? */
2931     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
2932 
2933   PetscFunctionReturn(0);
2934 }
2935 
2936 /******************************************************************************/
2937 static PetscErrorCode gs_gop_vec_tree_plus( gs_id *gs,  PetscScalar *vals,  PetscInt step)
2938 {
2939   PetscInt size, *in, *out;
2940   PetscScalar *buf, *work;
2941   PetscInt op[] = {GL_ADD,0};
2942   PetscBLASInt i1 = 1;
2943 
2944   PetscFunctionBegin;
2945   /* copy over to local variables */
2946   in   = gs->tree_map_in;
2947   out  = gs->tree_map_out;
2948   buf  = gs->tree_buf;
2949   work = gs->tree_work;
2950   size = gs->tree_nel*step;
2951 
2952   /* zero out collection buffer */
2953   rvec_zero(buf,size);
2954 
2955 
2956   /* copy over my contributions */
2957   while (*in >= 0)
2958     {
2959       BLAScopy_(&step,vals + *in++*step,&i1,buf + *out++*step,&i1);
2960     }
2961 
2962   /* perform fan in/out on full buffer */
2963   /* must change grop to handle the blas */
2964   grop(buf,work,size,op);
2965 
2966   /* reset */
2967   in   = gs->tree_map_in;
2968   out  = gs->tree_map_out;
2969 
2970   /* get the portion of the results I need */
2971   while (*in >= 0)
2972     {
2973       BLAScopy_(&step,buf + *out++*step,&i1,vals + *in++*step,&i1);
2974     }
2975   PetscFunctionReturn(0);
2976 }
2977 
2978 /******************************************************************************/
2979 PetscErrorCode gs_gop_hc( gs_id *gs,  PetscScalar *vals,  const char *op,  PetscInt dim)
2980 {
2981   PetscErrorCode ierr;
2982 
2983   PetscFunctionBegin;
2984   switch (*op) {
2985   case '+':
2986     gs_gop_plus_hc(gs,vals,dim);
2987     break;
2988   default:
2989     ierr = PetscInfo1(0,"gs_gop_hc() :: %c is not a valid op",op[0]);CHKERRQ(ierr);
2990     ierr = PetscInfo(0,"gs_gop_hc() :: default :: plus\n");CHKERRQ(ierr);
2991     gs_gop_plus_hc(gs,vals,dim);
2992     break;
2993   }
2994   PetscFunctionReturn(0);
2995 }
2996 
2997 /******************************************************************************/
2998 static PetscErrorCode gs_gop_plus_hc( gs_id *gs,  PetscScalar *vals, PetscInt dim)
2999 {
3000   PetscFunctionBegin;
3001   /* if there's nothing to do return */
3002   if (dim<=0)
3003     {  PetscFunctionReturn(0);}
3004 
3005   /* can't do more dimensions then exist */
3006   dim = PetscMin(dim,i_log2_num_nodes);
3007 
3008   /* local only operations!!! */
3009   if (gs->num_local)
3010     {gs_gop_local_plus(gs,vals);}
3011 
3012   /* if intersection tree/pairwise and local isn't empty */
3013   if (gs->num_local_gop)
3014     {
3015       gs_gop_local_in_plus(gs,vals);
3016 
3017       /* pairwise will do tree inside ... */
3018       if (gs->num_pairs)
3019         {gs_gop_pairwise_plus_hc(gs,vals,dim);}
3020 
3021       /* tree only */
3022       else if (gs->max_left_over)
3023         {gs_gop_tree_plus_hc(gs,vals,dim);}
3024 
3025       gs_gop_local_out(gs,vals);
3026     }
3027   /* if intersection tree/pairwise and local is empty */
3028   else
3029     {
3030       /* pairwise will do tree inside */
3031       if (gs->num_pairs)
3032         {gs_gop_pairwise_plus_hc(gs,vals,dim);}
3033 
3034       /* tree */
3035       else if (gs->max_left_over)
3036         {gs_gop_tree_plus_hc(gs,vals,dim);}
3037     }
3038   PetscFunctionReturn(0);
3039 }
3040 
3041 /******************************************************************************/
3042 static PetscErrorCode gs_gop_pairwise_plus_hc( gs_id *gs,  PetscScalar *in_vals, PetscInt dim)
3043 {
3044    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
3045    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
3046    PetscInt *pw, *list, *size, **nodes;
3047   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
3048   MPI_Status status;
3049   PetscInt i, mask=1;
3050   PetscErrorCode ierr;
3051 
3052   PetscFunctionBegin;
3053   for (i=1; i<dim; i++)
3054     {mask<<=1; mask++;}
3055 
3056 
3057   /* strip and load s */
3058   msg_list =list         = gs->pair_list;
3059   msg_size =size         = gs->msg_sizes;
3060   msg_nodes=nodes        = gs->node_list;
3061   iptr=pw                = gs->pw_elm_list;
3062   dptr1=dptr3            = gs->pw_vals;
3063   msg_ids_in  = ids_in   = gs->msg_ids_in;
3064   msg_ids_out = ids_out  = gs->msg_ids_out;
3065   dptr2                  = gs->out;
3066   in1=in2                = gs->in;
3067 
3068   /* post the receives */
3069   /*  msg_nodes=nodes; */
3070   do
3071     {
3072       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
3073          second one *list and do list++ afterwards */
3074       if ((my_id|mask)==(*list|mask))
3075         {
3076           ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
3077           in1 += *size++;
3078         }
3079       else
3080         {list++; size++;}
3081     }
3082   while (*++msg_nodes);
3083 
3084   /* load gs values into in out gs buffers */
3085   while (*iptr >= 0)
3086     {*dptr3++ = *(in_vals + *iptr++);}
3087 
3088   /* load out buffers and post the sends */
3089   msg_nodes=nodes;
3090   list = msg_list;
3091   while ((iptr = *msg_nodes++))
3092     {
3093       if ((my_id|mask)==(*list|mask))
3094         {
3095           dptr3 = dptr2;
3096           while (*iptr >= 0)
3097             {*dptr2++ = *(dptr1 + *iptr++);}
3098           /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
3099           /* is msg_ids_out++ correct? */
3100           ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
3101         }
3102       else
3103         {list++; msg_size++;}
3104     }
3105 
3106   /* do the tree while we're waiting */
3107   if (gs->max_left_over)
3108     {gs_gop_tree_plus_hc(gs,in_vals,dim);}
3109 
3110   /* process the received data */
3111   msg_nodes=nodes;
3112   list = msg_list;
3113   while ((iptr = *nodes++))
3114     {
3115       if ((my_id|mask)==(*list|mask))
3116         {
3117           /* Should I check the return value of MPI_Wait() or status? */
3118           /* Can this loop be replaced by a call to MPI_Waitall()? */
3119           ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
3120           while (*iptr >= 0)
3121             {*(dptr1 + *iptr++) += *in2++;}
3122         }
3123       list++;
3124     }
3125 
3126   /* replace vals */
3127   while (*pw >= 0)
3128     {*(in_vals + *pw++) = *dptr1++;}
3129 
3130   /* clear isend message handles */
3131   /* This changed for clarity though it could be the same */
3132   while (*msg_nodes++)
3133     {
3134       if ((my_id|mask)==(*msg_list|mask))
3135         {
3136           /* Should I check the return value of MPI_Wait() or status? */
3137           /* Can this loop be replaced by a call to MPI_Waitall()? */
3138           ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);
3139         }
3140       msg_list++;
3141     }
3142 
3143   PetscFunctionReturn(0);
3144 }
3145 
3146 /******************************************************************************/
3147 static PetscErrorCode gs_gop_tree_plus_hc(gs_id *gs, PetscScalar *vals, PetscInt dim)
3148 {
3149   PetscInt size;
3150   PetscInt *in, *out;
3151   PetscScalar *buf, *work;
3152   PetscInt op[] = {GL_ADD,0};
3153 
3154   PetscFunctionBegin;
3155   in   = gs->tree_map_in;
3156   out  = gs->tree_map_out;
3157   buf  = gs->tree_buf;
3158   work = gs->tree_work;
3159   size = gs->tree_nel;
3160 
3161   rvec_zero(buf,size);
3162 
3163   while (*in >= 0)
3164     {*(buf + *out++) = *(vals + *in++);}
3165 
3166   in   = gs->tree_map_in;
3167   out  = gs->tree_map_out;
3168 
3169   grop_hc(buf,work,size,op,dim);
3170 
3171   while (*in >= 0)
3172     {*(vals + *in++) = *(buf + *out++);}
3173   PetscFunctionReturn(0);
3174 }
3175 
3176 
3177 
3178