xref: /petsc/src/ksp/pc/impls/tfs/gs.c (revision 6e4f4d19663409d37b5f4f7e6013e1d3e9a3a59d)
1 #define PETSCKSP_DLL
2 
3 /***********************************gs.c***************************************
4 
5 Author: Henry M. Tufo III
6 
7 e-mail: hmt@cs.brown.edu
8 
9 snail-mail:
10 Division of Applied Mathematics
11 Brown University
12 Providence, RI 02912
13 
14 Last Modification:
15 6.21.97
16 ************************************gs.c**************************************/
17 
18 /***********************************gs.c***************************************
19 File Description:
20 -----------------
21 
22 ************************************gs.c**************************************/
23 
24 #include "src/ksp/pc/impls/tfs/tfs.h"
25 
26 /* default length of number of items via tree - doubles if exceeded */
27 #define TREE_BUF_SZ 2048;
28 #define GS_VEC_SZ   1
29 
30 
31 
32 /***********************************gs.c***************************************
33 Type: struct gather_scatter_id
34 ------------------------------
35 
36 ************************************gs.c**************************************/
37 typedef struct gather_scatter_id {
38   PetscInt id;
39   PetscInt nel_min;
40   PetscInt nel_max;
41   PetscInt nel_sum;
42   PetscInt negl;
43   PetscInt gl_max;
44   PetscInt gl_min;
45   PetscInt repeats;
46   PetscInt ordered;
47   PetscInt positive;
48   PetscScalar *vals;
49 
50   /* bit mask info */
51   PetscInt *my_proc_mask;
52   PetscInt mask_sz;
53   PetscInt *ngh_buf;
54   PetscInt ngh_buf_sz;
55   PetscInt *nghs;
56   PetscInt num_nghs;
57   PetscInt max_nghs;
58   PetscInt *pw_nghs;
59   PetscInt num_pw_nghs;
60   PetscInt *tree_nghs;
61   PetscInt num_tree_nghs;
62 
63   PetscInt num_loads;
64 
65   /* repeats == true -> local info */
66   PetscInt nel;         /* number of unique elememts */
67   PetscInt *elms;       /* of size nel */
68   PetscInt nel_total;
69   PetscInt *local_elms; /* of size nel_total */
70   PetscInt *companion;  /* of size nel_total */
71 
72   /* local info */
73   PetscInt num_local_total;
74   PetscInt local_strength;
75   PetscInt num_local;
76   PetscInt *num_local_reduce;
77   PetscInt **local_reduce;
78   PetscInt num_local_gop;
79   PetscInt *num_gop_local_reduce;
80   PetscInt **gop_local_reduce;
81 
82   /* pairwise info */
83   PetscInt level;
84   PetscInt num_pairs;
85   PetscInt max_pairs;
86   PetscInt loc_node_pairs;
87   PetscInt max_node_pairs;
88   PetscInt min_node_pairs;
89   PetscInt avg_node_pairs;
90   PetscInt *pair_list;
91   PetscInt *msg_sizes;
92   PetscInt **node_list;
93   PetscInt len_pw_list;
94   PetscInt *pw_elm_list;
95   PetscScalar *pw_vals;
96 
97   MPI_Request *msg_ids_in;
98   MPI_Request *msg_ids_out;
99 
100   PetscScalar *out;
101   PetscScalar *in;
102   PetscInt msg_total;
103 
104   /* tree - crystal accumulator info */
105   PetscInt max_left_over;
106   PetscInt *pre;
107   PetscInt *in_num;
108   PetscInt *out_num;
109   PetscInt **in_list;
110   PetscInt **out_list;
111 
112   /* new tree work*/
113   PetscInt  tree_nel;
114   PetscInt *tree_elms;
115   PetscScalar *tree_buf;
116   PetscScalar *tree_work;
117 
118   PetscInt  tree_map_sz;
119   PetscInt *tree_map_in;
120   PetscInt *tree_map_out;
121 
122   /* current memory status */
123   PetscInt gl_bss_min;
124   PetscInt gl_perm_min;
125 
126   /* max segment size for gs_gop_vec() */
127   PetscInt vec_sz;
128 
129   /* hack to make paul happy */
130   MPI_Comm gs_comm;
131 
132 } gs_id;
133 
134 static gs_id *gsi_check_args(PetscInt *elms, PetscInt nel, PetscInt level);
135 static PetscErrorCode gsi_via_bit_mask(gs_id *gs);
136 static PetscErrorCode get_ngh_buf(gs_id *gs);
137 static PetscErrorCode set_pairwise(gs_id *gs);
138 static gs_id * gsi_new(void);
139 static PetscErrorCode set_tree(gs_id *gs);
140 
141 /* same for all but vector flavor */
142 static PetscErrorCode gs_gop_local_out(gs_id *gs, PetscScalar *vals);
143 /* vector flavor */
144 static PetscErrorCode gs_gop_vec_local_out(gs_id *gs, PetscScalar *vals, PetscInt step);
145 
146 static PetscErrorCode gs_gop_vec_plus(gs_id *gs, PetscScalar *in_vals, PetscInt step);
147 static PetscErrorCode gs_gop_vec_pairwise_plus(gs_id *gs, PetscScalar *in_vals, PetscInt step);
148 static PetscErrorCode gs_gop_vec_local_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
149 static PetscErrorCode gs_gop_vec_local_in_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
150 static PetscErrorCode gs_gop_vec_tree_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
151 
152 
153 static PetscErrorCode gs_gop_plus(gs_id *gs, PetscScalar *in_vals);
154 static PetscErrorCode gs_gop_pairwise_plus(gs_id *gs, PetscScalar *in_vals);
155 static PetscErrorCode gs_gop_local_plus(gs_id *gs, PetscScalar *vals);
156 static PetscErrorCode gs_gop_local_in_plus(gs_id *gs, PetscScalar *vals);
157 static PetscErrorCode gs_gop_tree_plus(gs_id *gs, PetscScalar *vals);
158 
159 static PetscErrorCode gs_gop_plus_hc(gs_id *gs, PetscScalar *in_vals, PetscInt dim);
160 static PetscErrorCode gs_gop_pairwise_plus_hc(gs_id *gs, PetscScalar *in_vals, PetscInt dim);
161 static PetscErrorCode gs_gop_tree_plus_hc(gs_id *gs, PetscScalar *vals, PetscInt dim);
162 
163 static PetscErrorCode gs_gop_times(gs_id *gs, PetscScalar *in_vals);
164 static PetscErrorCode gs_gop_pairwise_times(gs_id *gs, PetscScalar *in_vals);
165 static PetscErrorCode gs_gop_local_times(gs_id *gs, PetscScalar *vals);
166 static PetscErrorCode gs_gop_local_in_times(gs_id *gs, PetscScalar *vals);
167 static PetscErrorCode gs_gop_tree_times(gs_id *gs, PetscScalar *vals);
168 
169 static PetscErrorCode gs_gop_min(gs_id *gs, PetscScalar *in_vals);
170 static PetscErrorCode gs_gop_pairwise_min(gs_id *gs, PetscScalar *in_vals);
171 static PetscErrorCode gs_gop_local_min(gs_id *gs, PetscScalar *vals);
172 static PetscErrorCode gs_gop_local_in_min(gs_id *gs, PetscScalar *vals);
173 static PetscErrorCode gs_gop_tree_min(gs_id *gs, PetscScalar *vals);
174 
175 static PetscErrorCode gs_gop_min_abs(gs_id *gs, PetscScalar *in_vals);
176 static PetscErrorCode gs_gop_pairwise_min_abs(gs_id *gs, PetscScalar *in_vals);
177 static PetscErrorCode gs_gop_local_min_abs(gs_id *gs, PetscScalar *vals);
178 static PetscErrorCode gs_gop_local_in_min_abs(gs_id *gs, PetscScalar *vals);
179 static PetscErrorCode gs_gop_tree_min_abs(gs_id *gs, PetscScalar *vals);
180 
181 static PetscErrorCode gs_gop_max(gs_id *gs, PetscScalar *in_vals);
182 static PetscErrorCode gs_gop_pairwise_max(gs_id *gs, PetscScalar *in_vals);
183 static PetscErrorCode gs_gop_local_max(gs_id *gs, PetscScalar *vals);
184 static PetscErrorCode gs_gop_local_in_max(gs_id *gs, PetscScalar *vals);
185 static PetscErrorCode gs_gop_tree_max(gs_id *gs, PetscScalar *vals);
186 
187 static PetscErrorCode gs_gop_max_abs(gs_id *gs, PetscScalar *in_vals);
188 static PetscErrorCode gs_gop_pairwise_max_abs(gs_id *gs, PetscScalar *in_vals);
189 static PetscErrorCode gs_gop_local_max_abs(gs_id *gs, PetscScalar *vals);
190 static PetscErrorCode gs_gop_local_in_max_abs(gs_id *gs, PetscScalar *vals);
191 static PetscErrorCode gs_gop_tree_max_abs(gs_id *gs, PetscScalar *vals);
192 
193 static PetscErrorCode gs_gop_exists(gs_id *gs, PetscScalar *in_vals);
194 static PetscErrorCode gs_gop_pairwise_exists(gs_id *gs, PetscScalar *in_vals);
195 static PetscErrorCode gs_gop_local_exists(gs_id *gs, PetscScalar *vals);
196 static PetscErrorCode gs_gop_local_in_exists(gs_id *gs, PetscScalar *vals);
197 static PetscErrorCode gs_gop_tree_exists(gs_id *gs, PetscScalar *vals);
198 
199 /* global vars */
200 /* from comm.c module */
201 
202 static PetscInt num_gs_ids = 0;
203 
204 /* should make this dynamic ... later */
205 static PetscInt msg_buf=MAX_MSG_BUF;
206 static PetscInt vec_sz=GS_VEC_SZ;
207 static PetscInt *tree_buf=NULL;
208 static PetscInt tree_buf_sz=0;
209 static PetscInt ntree=0;
210 
211 /***************************************************************************/
212 PetscErrorCode gs_init_vec_sz(PetscInt size)
213 {
214   PetscFunctionBegin;
215   vec_sz = size;
216   PetscFunctionReturn(0);
217 }
218 
219 /******************************************************************************/
220 PetscErrorCode gs_init_msg_buf_sz(PetscInt buf_size)
221 {
222   PetscFunctionBegin;
223   msg_buf = buf_size;
224   PetscFunctionReturn(0);
225 }
226 
227 /******************************************************************************/
228 gs_id *gs_init( PetscInt *elms, PetscInt nel, PetscInt level)
229 {
230    gs_id *gs;
231   MPI_Group gs_group;
232   MPI_Comm  gs_comm;
233   PetscErrorCode ierr;
234 
235   PetscFunctionBegin;
236   /* ensure that communication package has been initialized */
237   comm_init();
238 
239 
240   /* determines if we have enough dynamic/semi-static memory */
241   /* checks input, allocs and sets gd_id template            */
242   gs = gsi_check_args(elms,nel,level);
243 
244   /* only bit mask version up and working for the moment    */
245   /* LATER :: get int list version working for sparse pblms */
246   ierr = gsi_via_bit_mask(gs);CHKERRABORT(PETSC_COMM_WORLD,ierr);
247 
248 
249   ierr = MPI_Comm_group(MPI_COMM_WORLD,&gs_group);CHKERRABORT(PETSC_COMM_WORLD,ierr);
250   ierr = MPI_Comm_create(MPI_COMM_WORLD,gs_group,&gs_comm);CHKERRABORT(PETSC_COMM_WORLD,ierr);
251   gs->gs_comm=gs_comm;
252 
253   return(gs);
254 }
255 
256 /******************************************************************************/
257 static gs_id *gsi_new(void)
258 {
259   PetscErrorCode ierr;
260   gs_id *gs;
261   gs = (gs_id *) malloc(sizeof(gs_id));
262   ierr = PetscMemzero(gs,sizeof(gs_id));CHKERRABORT(PETSC_COMM_WORLD,ierr);
263   return(gs);
264 }
265 
266 /******************************************************************************/
267 static gs_id * gsi_check_args(PetscInt *in_elms, PetscInt nel, PetscInt level)
268 {
269    PetscInt i, j, k, t2;
270   PetscInt *companion, *elms, *unique, *iptr;
271   PetscInt num_local=0, *num_to_reduce, **local_reduce;
272   PetscInt oprs[] = {NON_UNIFORM,GL_MIN,GL_MAX,GL_ADD,GL_MIN,GL_MAX,GL_MIN,GL_B_AND};
273   PetscInt vals[sizeof(oprs)/sizeof(oprs[0])-1];
274   PetscInt work[sizeof(oprs)/sizeof(oprs[0])-1];
275   gs_id *gs;
276   PetscErrorCode ierr;
277 
278 
279   if (!in_elms)
280     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"elms point to nothing!!!\n");}
281 
282   if (nel<0)
283     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"can't have fewer than 0 elms!!!\n");}
284 
285   if (nel==0)
286     {ierr = PetscInfo(0,"I don't have any elements!!!\n");CHKERRABORT(PETSC_COMM_WORLD,ierr);}
287 
288   /* get space for gs template */
289   gs = gsi_new();
290   gs->id = ++num_gs_ids;
291 
292   /* hmt 6.4.99                                            */
293   /* caller can set global ids that don't participate to 0 */
294   /* gs_init ignores all zeros in elm list                 */
295   /* negative global ids are still invalid                 */
296   for (i=j=0;i<nel;i++)
297     {if (in_elms[i]!=0) {j++;}}
298 
299   k=nel; nel=j;
300 
301   /* copy over in_elms list and create inverse map */
302   elms = (PetscInt*) malloc((nel+1)*sizeof(PetscInt));
303   companion = (PetscInt*) malloc(nel*sizeof(PetscInt));
304 
305   for (i=j=0;i<k;i++)
306     {
307       if (in_elms[i]!=0)
308         {elms[j] = in_elms[i]; companion[j++] = i;}
309     }
310 
311   if (j!=nel)
312     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"nel j mismatch!\n");}
313 
314   /* pre-pass ... check to see if sorted */
315   elms[nel] = INT_MAX;
316   iptr = elms;
317   unique = elms+1;
318   j=0;
319   while (*iptr!=INT_MAX)
320     {
321       if (*iptr++>*unique++)
322         {j=1; break;}
323     }
324 
325   /* set up inverse map */
326   if (j)
327     {
328       ierr = PetscInfo(0,"gsi_check_args() :: elm list *not* sorted!\n");CHKERRABORT(PETSC_COMM_WORLD,ierr);
329       ierr = SMI_sort((void*)elms, (void*)companion, nel, SORT_INTEGER);CHKERRABORT(PETSC_COMM_WORLD,ierr);
330     }
331   else
332     {ierr = PetscInfo(0,"gsi_check_args() :: elm list sorted!\n");CHKERRABORT(PETSC_COMM_WORLD,ierr);}
333   elms[nel] = INT_MIN;
334 
335   /* first pass */
336   /* determine number of unique elements, check pd */
337   for (i=k=0;i<nel;i+=j)
338     {
339       t2 = elms[i];
340       j=++i;
341 
342       /* clump 'em for now */
343       while (elms[j]==t2) {j++;}
344 
345       /* how many together and num local */
346       if (j-=i)
347         {num_local++; k+=j;}
348     }
349 
350   /* how many unique elements? */
351   gs->repeats=k;
352   gs->nel = nel-k;
353 
354 
355   /* number of repeats? */
356   gs->num_local = num_local;
357   num_local+=2;
358   gs->local_reduce=local_reduce=(PetscInt **)malloc(num_local*sizeof(PetscInt*));
359   gs->num_local_reduce=num_to_reduce=(PetscInt*) malloc(num_local*sizeof(PetscInt));
360 
361   unique = (PetscInt*) malloc((gs->nel+1)*sizeof(PetscInt));
362   gs->elms = unique;
363   gs->nel_total = nel;
364   gs->local_elms = elms;
365   gs->companion = companion;
366 
367   /* compess map as well as keep track of local ops */
368   for (num_local=i=j=0;i<gs->nel;i++)
369     {
370       k=j;
371       t2 = unique[i] = elms[j];
372       companion[i] = companion[j];
373 
374       while (elms[j]==t2) {j++;}
375 
376       if ((t2=(j-k))>1)
377         {
378           /* number together */
379           num_to_reduce[num_local] = t2++;
380           iptr = local_reduce[num_local++] = (PetscInt*)malloc(t2*sizeof(PetscInt));
381 
382           /* to use binary searching don't remap until we check intersection */
383           *iptr++ = i;
384 
385           /* note that we're skipping the first one */
386           while (++k<j)
387             {*(iptr++) = companion[k];}
388           *iptr = -1;
389         }
390     }
391 
392   /* sentinel for ngh_buf */
393   unique[gs->nel]=INT_MAX;
394 
395   /* for two partition sort hack */
396   num_to_reduce[num_local] = 0;
397   local_reduce[num_local] = NULL;
398   num_to_reduce[++num_local] = 0;
399   local_reduce[num_local] = NULL;
400 
401   /* load 'em up */
402   /* note one extra to hold NON_UNIFORM flag!!! */
403   vals[2] = vals[1] = vals[0] = nel;
404   if (gs->nel>0)
405     {
406        vals[3] = unique[0];
407        vals[4] = unique[gs->nel-1];
408     }
409   else
410     {
411        vals[3] = INT_MAX;
412        vals[4] = INT_MIN;
413     }
414   vals[5] = level;
415   vals[6] = num_gs_ids;
416 
417   /* GLOBAL: send 'em out */
418   ierr = giop(vals,work,sizeof(oprs)/sizeof(oprs[0])-1,oprs);CHKERRABORT(PETSC_COMM_WORLD,ierr);
419 
420   /* must be semi-pos def - only pairwise depends on this */
421   /* LATER - remove this restriction */
422   if (vals[3]<0)
423     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"gsi_check_args() :: system not semi-pos def \n");}
424 
425   if (vals[4]==INT_MAX)
426     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"gsi_check_args() :: system ub too large !\n");}
427 
428   gs->nel_min = vals[0];
429   gs->nel_max = vals[1];
430   gs->nel_sum = vals[2];
431   gs->gl_min  = vals[3];
432   gs->gl_max  = vals[4];
433   gs->negl    = vals[4]-vals[3]+1;
434 
435   if (gs->negl<=0)
436     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"gsi_check_args() :: system empty or neg :: %d\n");}
437 
438   /* LATER :: add level == -1 -> program selects level */
439   if (vals[5]<0)
440     {vals[5]=0;}
441   else if (vals[5]>num_nodes)
442     {vals[5]=num_nodes;}
443   gs->level = vals[5];
444 
445   return(gs);
446 }
447 
448 /******************************************************************************/
449 static PetscErrorCode gsi_via_bit_mask(gs_id *gs)
450 {
451    PetscInt i, nel, *elms;
452   PetscInt t1;
453   PetscInt **reduce;
454   PetscInt *map;
455   PetscErrorCode ierr;
456 
457   PetscFunctionBegin;
458   /* totally local removes ... ct_bits == 0 */
459   get_ngh_buf(gs);
460 
461   if (gs->level)
462     {set_pairwise(gs);}
463 
464   if (gs->max_left_over)
465     {set_tree(gs);}
466 
467   /* intersection local and pairwise/tree? */
468   gs->num_local_total = gs->num_local;
469   gs->gop_local_reduce = gs->local_reduce;
470   gs->num_gop_local_reduce = gs->num_local_reduce;
471 
472   map = gs->companion;
473 
474   /* is there any local compression */
475   if (!gs->num_local) {
476     gs->local_strength = NONE;
477     gs->num_local_gop = 0;
478   } else {
479       /* ok find intersection */
480       map = gs->companion;
481       reduce = gs->local_reduce;
482       for (i=0, t1=0; i<gs->num_local; i++, reduce++)
483         {
484           if ((ivec_binary_search(**reduce,gs->pw_elm_list,gs->len_pw_list)>=0)
485               ||
486               ivec_binary_search(**reduce,gs->tree_map_in,gs->tree_map_sz)>=0)
487             {
488               t1++;
489               if (gs->num_local_reduce[i]<=0) SETERRQ(PETSC_ERR_PLIB,"nobody in list?");
490               gs->num_local_reduce[i] *= -1;
491             }
492            **reduce=map[**reduce];
493         }
494 
495       /* intersection is empty */
496       if (!t1)
497         {
498           gs->local_strength = FULL;
499           gs->num_local_gop = 0;
500         }
501       /* intersection not empty */
502       else
503         {
504           gs->local_strength = PARTIAL;
505           ierr = SMI_sort((void*)gs->num_local_reduce, (void*)gs->local_reduce, gs->num_local + 1, SORT_INT_PTR);CHKERRQ(ierr);
506 
507           gs->num_local_gop = t1;
508           gs->num_local_total =  gs->num_local;
509           gs->num_local    -= t1;
510           gs->gop_local_reduce = gs->local_reduce;
511           gs->num_gop_local_reduce = gs->num_local_reduce;
512 
513           for (i=0; i<t1; i++)
514             {
515               if (gs->num_gop_local_reduce[i]>=0) SETERRQ(PETSC_ERR_PLIB,"they aren't negative?");
516               gs->num_gop_local_reduce[i] *= -1;
517               gs->local_reduce++;
518               gs->num_local_reduce++;
519             }
520           gs->local_reduce++;
521           gs->num_local_reduce++;
522         }
523     }
524 
525   elms = gs->pw_elm_list;
526   nel  = gs->len_pw_list;
527   for (i=0; i<nel; i++)
528     {elms[i] = map[elms[i]];}
529 
530   elms = gs->tree_map_in;
531   nel  = gs->tree_map_sz;
532   for (i=0; i<nel; i++)
533     {elms[i] = map[elms[i]];}
534 
535   /* clean up */
536   free((void*) gs->local_elms);
537   free((void*) gs->companion);
538   free((void*) gs->elms);
539   free((void*) gs->ngh_buf);
540   gs->local_elms = gs->companion = gs->elms = gs->ngh_buf = NULL;
541   PetscFunctionReturn(0);
542 }
543 
544 /******************************************************************************/
545 static PetscErrorCode place_in_tree( PetscInt elm)
546 {
547    PetscInt *tp, n;
548 
549   PetscFunctionBegin;
550   if (ntree==tree_buf_sz)
551     {
552       if (tree_buf_sz)
553         {
554           tp = tree_buf;
555           n = tree_buf_sz;
556           tree_buf_sz<<=1;
557           tree_buf = (PetscInt*)malloc(tree_buf_sz*sizeof(PetscInt));
558           ivec_copy(tree_buf,tp,n);
559           free(tp);
560         }
561       else
562         {
563           tree_buf_sz = TREE_BUF_SZ;
564           tree_buf = (PetscInt*)malloc(tree_buf_sz*sizeof(PetscInt));
565         }
566     }
567 
568   tree_buf[ntree++] = elm;
569   PetscFunctionReturn(0);
570 }
571 
572 /******************************************************************************/
573 static PetscErrorCode get_ngh_buf(gs_id *gs)
574 {
575    PetscInt i, j, npw=0, ntree_map=0;
576   PetscInt p_mask_size, ngh_buf_size, buf_size;
577   PetscInt *p_mask, *sh_proc_mask, *pw_sh_proc_mask;
578   PetscInt *ngh_buf, *buf1, *buf2;
579   PetscInt offset, per_load, num_loads, or_ct, start, end;
580   PetscInt *ptr1, *ptr2, i_start, negl, nel, *elms;
581   PetscInt oper=GL_B_OR;
582   PetscInt *ptr3, *t_mask, level, ct1, ct2;
583   PetscErrorCode ierr;
584 
585   PetscFunctionBegin;
586   /* to make life easier */
587   nel   = gs->nel;
588   elms  = gs->elms;
589   level = gs->level;
590 
591   /* det #bytes needed for processor bit masks and init w/mask cor. to my_id */
592   p_mask = (PetscInt*) malloc(p_mask_size=len_bit_mask(num_nodes));
593   ierr = set_bit_mask(p_mask,p_mask_size,my_id);CHKERRQ(ierr);
594 
595   /* allocate space for masks and info bufs */
596   gs->nghs = sh_proc_mask = (PetscInt*) malloc(p_mask_size);
597   gs->pw_nghs = pw_sh_proc_mask = (PetscInt*) malloc(p_mask_size);
598   gs->ngh_buf_sz = ngh_buf_size = p_mask_size*nel;
599   t_mask = (PetscInt*) malloc(p_mask_size);
600   gs->ngh_buf = ngh_buf = (PetscInt*) malloc(ngh_buf_size);
601 
602   /* comm buffer size ... memory usage bounded by ~2*msg_buf */
603   /* had thought I could exploit rendezvous threshold */
604 
605   /* default is one pass */
606   per_load = negl  = gs->negl;
607   gs->num_loads = num_loads = 1;
608   i=p_mask_size*negl;
609 
610   /* possible overflow on buffer size */
611   /* overflow hack                    */
612   if (i<0) {i=INT_MAX;}
613 
614   buf_size = PetscMin(msg_buf,i);
615 
616   /* can we do it? */
617   if (p_mask_size>buf_size) SETERRQ2(PETSC_ERR_PLIB,"get_ngh_buf() :: buf<pms :: %d>%d\n",p_mask_size,buf_size);
618 
619   /* get giop buf space ... make *only* one malloc */
620   buf1 = (PetscInt*) malloc(buf_size<<1);
621 
622   /* more than one gior exchange needed? */
623   if (buf_size!=i)
624     {
625       per_load = buf_size/p_mask_size;
626       buf_size = per_load*p_mask_size;
627       gs->num_loads = num_loads = negl/per_load + (negl%per_load>0);
628     }
629 
630 
631   /* convert buf sizes from #bytes to #ints - 32 bit only! */
632   p_mask_size/=sizeof(PetscInt); ngh_buf_size/=sizeof(PetscInt); buf_size/=sizeof(PetscInt);
633 
634   /* find giop work space */
635   buf2 = buf1+buf_size;
636 
637   /* hold #ints needed for processor masks */
638   gs->mask_sz=p_mask_size;
639 
640   /* init buffers */
641   ierr = ivec_zero(sh_proc_mask,p_mask_size);CHKERRQ(ierr);
642   ierr = ivec_zero(pw_sh_proc_mask,p_mask_size);CHKERRQ(ierr);
643   ierr = ivec_zero(ngh_buf,ngh_buf_size);CHKERRQ(ierr);
644 
645   /* HACK reset tree info */
646   tree_buf=NULL;
647   tree_buf_sz=ntree=0;
648 
649   /* ok do it */
650   for (ptr1=ngh_buf,ptr2=elms,end=gs->gl_min,or_ct=i=0; or_ct<num_loads; or_ct++)
651     {
652       /* identity for bitwise or is 000...000 */
653       ivec_zero(buf1,buf_size);
654 
655       /* load msg buffer */
656       for (start=end,end+=per_load,i_start=i; (offset=*ptr2)<end; i++, ptr2++)
657         {
658           offset = (offset-start)*p_mask_size;
659           ivec_copy(buf1+offset,p_mask,p_mask_size);
660         }
661 
662       /* GLOBAL: pass buffer */
663       ierr = giop(buf1,buf2,buf_size,&oper);CHKERRQ(ierr);
664 
665 
666       /* unload buffer into ngh_buf */
667       ptr2=(elms+i_start);
668       for(ptr3=buf1,j=start; j<end; ptr3+=p_mask_size,j++)
669         {
670           /* I own it ... may have to pairwise it */
671           if (j==*ptr2)
672             {
673               /* do i share it w/anyone? */
674               ct1 = ct_bits((char *)ptr3,p_mask_size*sizeof(PetscInt));
675               /* guess not */
676               if (ct1<2)
677                 {ptr2++; ptr1+=p_mask_size; continue;}
678 
679               /* i do ... so keep info and turn off my bit */
680               ivec_copy(ptr1,ptr3,p_mask_size);
681               ierr = ivec_xor(ptr1,p_mask,p_mask_size);CHKERRQ(ierr);
682               ierr = ivec_or(sh_proc_mask,ptr1,p_mask_size);CHKERRQ(ierr);
683 
684               /* is it to be done pairwise? */
685               if (--ct1<=level)
686                 {
687                   npw++;
688 
689                   /* turn on high bit to indicate pw need to process */
690                   *ptr2++ |= TOP_BIT;
691                   ierr = ivec_or(pw_sh_proc_mask,ptr1,p_mask_size);CHKERRQ(ierr);
692                   ptr1+=p_mask_size;
693                   continue;
694                 }
695 
696               /* get set for next and note that I have a tree contribution */
697               /* could save exact elm index for tree here -> save a search */
698               ptr2++; ptr1+=p_mask_size; ntree_map++;
699             }
700           /* i don't but still might be involved in tree */
701           else
702             {
703 
704               /* shared by how many? */
705               ct1 = ct_bits((char *)ptr3,p_mask_size*sizeof(PetscInt));
706 
707               /* none! */
708               if (ct1<2) continue;
709 
710               /* is it going to be done pairwise? but not by me of course!*/
711               if (--ct1<=level) continue;
712             }
713           /* LATER we're going to have to process it NOW */
714           /* nope ... tree it */
715           ierr = place_in_tree(j);CHKERRQ(ierr);
716         }
717     }
718 
719   free((void*)t_mask);
720   free((void*)buf1);
721 
722   gs->len_pw_list=npw;
723   gs->num_nghs = ct_bits((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt));
724 
725   /* expand from bit mask list to int list and save ngh list */
726   gs->nghs = (PetscInt*) malloc(gs->num_nghs * sizeof(PetscInt));
727   bm_to_proc((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt),gs->nghs);
728 
729   gs->num_pw_nghs = ct_bits((char *)pw_sh_proc_mask,p_mask_size*sizeof(PetscInt));
730 
731   oper = GL_MAX;
732   ct1 = gs->num_nghs;
733   ierr = giop(&ct1,&ct2,1,&oper);CHKERRQ(ierr);
734   gs->max_nghs = ct1;
735 
736   gs->tree_map_sz  = ntree_map;
737   gs->max_left_over=ntree;
738 
739   free((void*)p_mask);
740   free((void*)sh_proc_mask);
741   PetscFunctionReturn(0);
742 }
743 
744 /******************************************************************************/
745 static PetscErrorCode set_pairwise(gs_id *gs)
746 {
747    PetscInt i, j;
748   PetscInt p_mask_size;
749   PetscInt *p_mask, *sh_proc_mask, *tmp_proc_mask;
750   PetscInt *ngh_buf, *buf2;
751   PetscInt offset;
752   PetscInt *msg_list, *msg_size, **msg_nodes, nprs;
753   PetscInt *pairwise_elm_list, len_pair_list=0;
754   PetscInt *iptr, t1, i_start, nel, *elms;
755   PetscInt ct;
756   PetscErrorCode ierr;
757 
758   PetscFunctionBegin;
759   /* to make life easier */
760   nel  = gs->nel;
761   elms = gs->elms;
762   ngh_buf = gs->ngh_buf;
763   sh_proc_mask  = gs->pw_nghs;
764 
765   /* need a few temp masks */
766   p_mask_size   = len_bit_mask(num_nodes);
767   p_mask        = (PetscInt*) malloc(p_mask_size);
768   tmp_proc_mask = (PetscInt*) malloc(p_mask_size);
769 
770   /* set mask to my my_id's bit mask */
771   ierr = set_bit_mask(p_mask,p_mask_size,my_id);CHKERRQ(ierr);
772 
773   p_mask_size /= sizeof(PetscInt);
774 
775   len_pair_list=gs->len_pw_list;
776   gs->pw_elm_list=pairwise_elm_list=(PetscInt*)malloc((len_pair_list+1)*sizeof(PetscInt));
777 
778   /* how many processors (nghs) do we have to exchange with? */
779   nprs=gs->num_pairs=ct_bits((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt));
780 
781 
782   /* allocate space for gs_gop() info */
783   gs->pair_list = msg_list = (PetscInt *)  malloc(sizeof(PetscInt)*nprs);
784   gs->msg_sizes = msg_size  = (PetscInt *)  malloc(sizeof(PetscInt)*nprs);
785   gs->node_list = msg_nodes = (PetscInt **) malloc(sizeof(PetscInt*)*(nprs+1));
786 
787   /* init msg_size list */
788   ierr = ivec_zero(msg_size,nprs);CHKERRQ(ierr);
789 
790   /* expand from bit mask list to int list */
791   ierr = bm_to_proc((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt),msg_list);CHKERRQ(ierr);
792 
793   /* keep list of elements being handled pairwise */
794   for (i=j=0;i<nel;i++)
795     {
796       if (elms[i] & TOP_BIT)
797         {elms[i] ^= TOP_BIT; pairwise_elm_list[j++] = i;}
798     }
799   pairwise_elm_list[j] = -1;
800 
801   gs->msg_ids_out = (MPI_Request *)  malloc(sizeof(MPI_Request)*(nprs+1));
802   gs->msg_ids_out[nprs] = MPI_REQUEST_NULL;
803   gs->msg_ids_in = (MPI_Request *)  malloc(sizeof(MPI_Request)*(nprs+1));
804   gs->msg_ids_in[nprs] = MPI_REQUEST_NULL;
805   gs->pw_vals = (PetscScalar *) malloc(sizeof(PetscScalar)*len_pair_list*vec_sz);
806 
807   /* find who goes to each processor */
808   for (i_start=i=0;i<nprs;i++)
809     {
810       /* processor i's mask */
811       ierr = set_bit_mask(p_mask,p_mask_size*sizeof(PetscInt),msg_list[i]);CHKERRQ(ierr);
812 
813       /* det # going to processor i */
814       for (ct=j=0;j<len_pair_list;j++)
815         {
816           buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
817           ierr = ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);CHKERRQ(ierr);
818           if (ct_bits((char *)tmp_proc_mask,p_mask_size*sizeof(PetscInt)))
819             {ct++;}
820         }
821       msg_size[i] = ct;
822       i_start = PetscMax(i_start,ct);
823 
824       /*space to hold nodes in message to first neighbor */
825       msg_nodes[i] = iptr = (PetscInt*) malloc(sizeof(PetscInt)*(ct+1));
826 
827       for (j=0;j<len_pair_list;j++)
828         {
829           buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
830           ierr = ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);CHKERRQ(ierr);
831           if (ct_bits((char *)tmp_proc_mask,p_mask_size*sizeof(PetscInt)))
832             {*iptr++ = j;}
833         }
834       *iptr = -1;
835     }
836   msg_nodes[nprs] = NULL;
837 
838   j=gs->loc_node_pairs=i_start;
839   t1 = GL_MAX;
840   ierr = giop(&i_start,&offset,1,&t1);CHKERRQ(ierr);
841   gs->max_node_pairs = i_start;
842 
843   i_start=j;
844   t1 = GL_MIN;
845   ierr = giop(&i_start,&offset,1,&t1);CHKERRQ(ierr);
846   gs->min_node_pairs = i_start;
847 
848   i_start=j;
849   t1 = GL_ADD;
850   ierr = giop(&i_start,&offset,1,&t1);CHKERRQ(ierr);
851   gs->avg_node_pairs = i_start/num_nodes + 1;
852 
853   i_start=nprs;
854   t1 = GL_MAX;
855   giop(&i_start,&offset,1,&t1);
856   gs->max_pairs = i_start;
857 
858 
859   /* remap pairwise in tail of gsi_via_bit_mask() */
860   gs->msg_total = ivec_sum(gs->msg_sizes,nprs);
861   gs->out = (PetscScalar *) malloc(sizeof(PetscScalar)*gs->msg_total*vec_sz);
862   gs->in  = (PetscScalar *) malloc(sizeof(PetscScalar)*gs->msg_total*vec_sz);
863 
864   /* reset malloc pool */
865   free((void*)p_mask);
866   free((void*)tmp_proc_mask);
867   PetscFunctionReturn(0);
868 }
869 
870 /* to do pruned tree just save ngh buf copy for each one and decode here!
871 ******************************************************************************/
872 static PetscErrorCode set_tree(gs_id *gs)
873 {
874   PetscInt i, j, n, nel;
875   PetscInt *iptr_in, *iptr_out, *tree_elms, *elms;
876 
877   PetscFunctionBegin;
878   /* local work ptrs */
879   elms = gs->elms;
880   nel     = gs->nel;
881 
882   /* how many via tree */
883   gs->tree_nel  = n = ntree;
884   gs->tree_elms = tree_elms = iptr_in = tree_buf;
885   gs->tree_buf  = (PetscScalar *) malloc(sizeof(PetscScalar)*n*vec_sz);
886   gs->tree_work = (PetscScalar *) malloc(sizeof(PetscScalar)*n*vec_sz);
887   j=gs->tree_map_sz;
888   gs->tree_map_in = iptr_in  = (PetscInt*) malloc(sizeof(PetscInt)*(j+1));
889   gs->tree_map_out = iptr_out = (PetscInt*) malloc(sizeof(PetscInt)*(j+1));
890 
891   /* search the longer of the two lists */
892   /* note ... could save this info in get_ngh_buf and save searches */
893   if (n<=nel)
894     {
895       /* bijective fct w/remap - search elm list */
896       for (i=0; i<n; i++)
897         {
898           if ((j=ivec_binary_search(*tree_elms++,elms,nel))>=0)
899             {*iptr_in++ = j; *iptr_out++ = i;}
900         }
901     }
902   else
903     {
904       for (i=0; i<nel; i++)
905         {
906           if ((j=ivec_binary_search(*elms++,tree_elms,n))>=0)
907             {*iptr_in++ = i; *iptr_out++ = j;}
908         }
909     }
910 
911   /* sentinel */
912   *iptr_in = *iptr_out = -1;
913   PetscFunctionReturn(0);
914 }
915 
916 /******************************************************************************/
917 static PetscErrorCode gs_gop_local_out( gs_id *gs,  PetscScalar *vals)
918 {
919   PetscInt *num, *map, **reduce;
920   PetscScalar tmp;
921 
922   PetscFunctionBegin;
923   num    = gs->num_gop_local_reduce;
924   reduce = gs->gop_local_reduce;
925   while ((map = *reduce++))
926     {
927       /* wall */
928       if (*num == 2)
929         {
930           num ++;
931           vals[map[1]] = vals[map[0]];
932         }
933       /* corner shared by three elements */
934       else if (*num == 3)
935         {
936           num ++;
937           vals[map[2]] = vals[map[1]] = vals[map[0]];
938         }
939       /* corner shared by four elements */
940       else if (*num == 4)
941         {
942           num ++;
943           vals[map[3]] = vals[map[2]] = vals[map[1]] = vals[map[0]];
944         }
945       /* general case ... odd geoms ... 3D*/
946       else
947         {
948           num++;
949           tmp = *(vals + *map++);
950           while (*map >= 0)
951             {*(vals + *map++) = tmp;}
952         }
953     }
954   PetscFunctionReturn(0);
955 }
956 
957 
958 /******************************************************************************/
959 PetscErrorCode gs_gop( gs_id *gs,  PetscScalar *vals,  const char *op)
960 {
961   PetscErrorCode ierr;
962 
963   PetscFunctionBegin;
964   switch (*op) {
965   case '+':
966     gs_gop_plus(gs,vals);
967     break;
968   case '*':
969     gs_gop_times(gs,vals);
970     break;
971   case 'a':
972     gs_gop_min_abs(gs,vals);
973     break;
974   case 'A':
975     gs_gop_max_abs(gs,vals);
976     break;
977   case 'e':
978     gs_gop_exists(gs,vals);
979     break;
980   case 'm':
981     gs_gop_min(gs,vals);
982     break;
983   case 'M':
984     gs_gop_max(gs,vals); break;
985   default:
986     ierr = PetscInfo1(0,"gs_gop() :: %c is not a valid op",op[0]);CHKERRQ(ierr);
987     ierr = PetscInfo(0,"gs_gop() :: default :: plus");CHKERRQ(ierr);
988     gs_gop_plus(gs,vals);
989     break;
990   }
991   PetscFunctionReturn(0);
992 }
993 
994 /******************************************************************************/
995 static PetscErrorCode gs_gop_exists( gs_id *gs,  PetscScalar *vals)
996 {
997   PetscFunctionBegin;
998   /* local only operations!!! */
999   if (gs->num_local)
1000     {gs_gop_local_exists(gs,vals);}
1001 
1002   /* if intersection tree/pairwise and local isn't empty */
1003   if (gs->num_local_gop)
1004     {
1005       gs_gop_local_in_exists(gs,vals);
1006 
1007       /* pairwise */
1008       if (gs->num_pairs)
1009         {gs_gop_pairwise_exists(gs,vals);}
1010 
1011       /* tree */
1012       else if (gs->max_left_over)
1013         {gs_gop_tree_exists(gs,vals);}
1014 
1015       gs_gop_local_out(gs,vals);
1016     }
1017   /* if intersection tree/pairwise and local is empty */
1018   else
1019     {
1020       /* pairwise */
1021       if (gs->num_pairs)
1022         {gs_gop_pairwise_exists(gs,vals);}
1023 
1024       /* tree */
1025       else if (gs->max_left_over)
1026         {gs_gop_tree_exists(gs,vals);}
1027     }
1028   PetscFunctionReturn(0);
1029 }
1030 
1031 /******************************************************************************/
1032 static PetscErrorCode gs_gop_local_exists( gs_id *gs,  PetscScalar *vals)
1033 {
1034    PetscInt         *num, *map, **reduce;
1035    PetscScalar tmp;
1036 
1037   PetscFunctionBegin;
1038   num    = gs->num_local_reduce;
1039   reduce = gs->local_reduce;
1040   while ((map = *reduce))
1041     {
1042       num ++;
1043       tmp = 0.0;
1044       while (*map >= 0)
1045         {tmp = EXISTS(tmp,*(vals + *map)); map++;}
1046 
1047       map = *reduce++;
1048       while (*map >= 0)
1049         {*(vals + *map++) = tmp;}
1050     }
1051   PetscFunctionReturn(0);
1052 }
1053 
1054 /******************************************************************************/
1055 static PetscErrorCode gs_gop_local_in_exists( gs_id *gs,  PetscScalar *vals)
1056 {
1057   PetscInt         *num, *map, **reduce;
1058   PetscScalar *base;
1059 
1060   PetscFunctionBegin;
1061   num    = gs->num_gop_local_reduce;
1062   reduce = gs->gop_local_reduce;
1063   while ((map = *reduce++))
1064     {
1065       num++;
1066       base = vals + *map++;
1067       while (*map >= 0)
1068         {*base = EXISTS(*base,*(vals + *map)); map++;}
1069     }
1070   PetscFunctionReturn(0);
1071 }
1072 
1073 static PetscErrorCode gs_gop_pairwise_exists( gs_id *gs,  PetscScalar *in_vals)
1074 {
1075   PetscScalar    *dptr1, *dptr2, *dptr3, *in1, *in2;
1076   PetscInt            *iptr, *msg_list, *msg_size, **msg_nodes;
1077   PetscInt            *pw, *list, *size, **nodes;
1078   MPI_Request    *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1079   MPI_Status     status;
1080   PetscErrorCode ierr;
1081 
1082   PetscFunctionBegin;
1083   /* strip and load s */
1084   msg_list =list         = gs->pair_list;
1085   msg_size =size         = gs->msg_sizes;
1086   msg_nodes=nodes        = gs->node_list;
1087   iptr=pw                = gs->pw_elm_list;
1088   dptr1=dptr3            = gs->pw_vals;
1089   msg_ids_in  = ids_in   = gs->msg_ids_in;
1090   msg_ids_out = ids_out  = gs->msg_ids_out;
1091   dptr2                  = gs->out;
1092   in1=in2                = gs->in;
1093 
1094   /* post the receives */
1095   do
1096     {
1097       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1098          second one *list and do list++ afterwards */
1099       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
1100       in1 += *size++;
1101     }
1102   while (*++msg_nodes);
1103   msg_nodes=nodes;
1104 
1105   /* load gs values into in out gs buffers */
1106   while (*iptr >= 0)
1107     {*dptr3++ = *(in_vals + *iptr++);}
1108 
1109   /* load out buffers and post the sends */
1110   while ((iptr = *msg_nodes++))
1111     {
1112       dptr3 = dptr2;
1113       while (*iptr >= 0)
1114         {*dptr2++ = *(dptr1 + *iptr++);}
1115       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1116       /* is msg_ids_out++ correct? */
1117       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
1118     }
1119 
1120   if (gs->max_left_over)
1121     {gs_gop_tree_exists(gs,in_vals);}
1122 
1123   /* process the received data */
1124   msg_nodes=nodes;
1125   while ((iptr = *nodes++))
1126     {
1127       /* Should I check the return value of MPI_Wait() or status? */
1128       /* Can this loop be replaced by a call to MPI_Waitall()? */
1129       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
1130       while (*iptr >= 0)
1131         {*(dptr1 + *iptr) = EXISTS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
1132     }
1133 
1134   /* replace vals */
1135   while (*pw >= 0)
1136     {*(in_vals + *pw++) = *dptr1++;}
1137 
1138   /* clear isend message handles */
1139   /* This changed for clarity though it could be the same */
1140   while (*msg_nodes++)
1141     /* Should I check the return value of MPI_Wait() or status? */
1142     /* Can this loop be replaced by a call to MPI_Waitall()? */
1143     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
1144   PetscFunctionReturn(0);
1145 }
1146 /******************************************************************************/
1147 static PetscErrorCode gs_gop_tree_exists(gs_id *gs, PetscScalar *vals)
1148 {
1149   PetscInt         size;
1150   PetscInt         *in, *out;
1151   PetscScalar *buf, *work;
1152   PetscInt         op[] = {GL_EXISTS,0};
1153 
1154   PetscFunctionBegin;
1155   in   = gs->tree_map_in;
1156   out  = gs->tree_map_out;
1157   buf  = gs->tree_buf;
1158   work = gs->tree_work;
1159   size = gs->tree_nel;
1160 
1161   rvec_zero(buf,size);
1162 
1163   while (*in >= 0)
1164     {
1165       /*
1166       printf("%d :: out=%d\n",my_id,*out);
1167       printf("%d :: in=%d\n",my_id,*in);
1168       */
1169       *(buf + *out++) = *(vals + *in++);
1170     }
1171 
1172   grop(buf,work,size,op);
1173 
1174   in   = gs->tree_map_in;
1175   out  = gs->tree_map_out;
1176 
1177   while (*in >= 0)
1178     {*(vals + *in++) = *(buf + *out++);}
1179   PetscFunctionReturn(0);
1180 }
1181 
1182 /*******************************************************************************/
1183 static PetscErrorCode gs_gop_max_abs( gs_id *gs,  PetscScalar *vals)
1184 {
1185   PetscFunctionBegin;
1186   /* local only operations!!! */
1187   if (gs->num_local)
1188     {gs_gop_local_max_abs(gs,vals);}
1189 
1190   /* if intersection tree/pairwise and local isn't empty */
1191   if (gs->num_local_gop)
1192     {
1193       gs_gop_local_in_max_abs(gs,vals);
1194 
1195       /* pairwise */
1196       if (gs->num_pairs)
1197         {gs_gop_pairwise_max_abs(gs,vals);}
1198 
1199       /* tree */
1200       else if (gs->max_left_over)
1201         {gs_gop_tree_max_abs(gs,vals);}
1202 
1203       gs_gop_local_out(gs,vals);
1204     }
1205   /* if intersection tree/pairwise and local is empty */
1206   else
1207     {
1208       /* pairwise */
1209       if (gs->num_pairs)
1210         {gs_gop_pairwise_max_abs(gs,vals);}
1211 
1212       /* tree */
1213       else if (gs->max_left_over)
1214         {gs_gop_tree_max_abs(gs,vals);}
1215     }
1216   PetscFunctionReturn(0);
1217 }
1218 
1219 /******************************************************************************/
1220 static PetscErrorCode gs_gop_local_max_abs( gs_id *gs,  PetscScalar *vals)
1221 {
1222   PetscInt         *num, *map, **reduce;
1223   PetscScalar tmp;
1224 
1225   PetscFunctionBegin;
1226   num    = gs->num_local_reduce;
1227   reduce = gs->local_reduce;
1228   while ((map = *reduce))
1229     {
1230       num ++;
1231       tmp = 0.0;
1232       while (*map >= 0)
1233         {tmp = MAX_FABS(tmp,*(vals + *map)); map++;}
1234 
1235       map = *reduce++;
1236       while (*map >= 0)
1237         {*(vals + *map++) = tmp;}
1238     }
1239   PetscFunctionReturn(0);
1240 }
1241 
1242 /******************************************************************************/
1243 static PetscErrorCode gs_gop_local_in_max_abs( gs_id *gs,  PetscScalar *vals)
1244 {
1245   PetscInt         *num, *map, **reduce;
1246   PetscScalar *base;
1247 
1248   PetscFunctionBegin;
1249   num    = gs->num_gop_local_reduce;
1250   reduce = gs->gop_local_reduce;
1251   while ((map = *reduce++))
1252     {
1253       num++;
1254       base = vals + *map++;
1255       while (*map >= 0)
1256         {*base = MAX_FABS(*base,*(vals + *map)); map++;}
1257     }
1258   PetscFunctionReturn(0);
1259 }
1260 
1261 /******************************************************************************/
1262 static PetscErrorCode gs_gop_pairwise_max_abs( gs_id *gs,  PetscScalar *in_vals)
1263 {
1264   PetscScalar    *dptr1, *dptr2, *dptr3, *in1, *in2;
1265   PetscInt            *iptr, *msg_list, *msg_size, **msg_nodes;
1266   PetscInt            *pw, *list, *size, **nodes;
1267   MPI_Request    *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1268   MPI_Status     status;
1269   PetscErrorCode ierr;
1270 
1271   PetscFunctionBegin;
1272   /* strip and load s */
1273   msg_list =list         = gs->pair_list;
1274   msg_size =size         = gs->msg_sizes;
1275   msg_nodes=nodes        = gs->node_list;
1276   iptr=pw                = gs->pw_elm_list;
1277   dptr1=dptr3            = gs->pw_vals;
1278   msg_ids_in  = ids_in   = gs->msg_ids_in;
1279   msg_ids_out = ids_out  = gs->msg_ids_out;
1280   dptr2                  = gs->out;
1281   in1=in2                = gs->in;
1282 
1283   /* post the receives */
1284   /*  msg_nodes=nodes; */
1285   do
1286     {
1287       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1288          second one *list and do list++ afterwards */
1289       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
1290       in1 += *size++;
1291     }
1292   while (*++msg_nodes);
1293   msg_nodes=nodes;
1294 
1295   /* load gs values into in out gs buffers */
1296   while (*iptr >= 0)
1297     {*dptr3++ = *(in_vals + *iptr++);}
1298 
1299   /* load out buffers and post the sends */
1300   while ((iptr = *msg_nodes++))
1301     {
1302       dptr3 = dptr2;
1303       while (*iptr >= 0)
1304         {*dptr2++ = *(dptr1 + *iptr++);}
1305       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1306       /* is msg_ids_out++ correct? */
1307       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
1308     }
1309 
1310   if (gs->max_left_over)
1311     {gs_gop_tree_max_abs(gs,in_vals);}
1312 
1313   /* process the received data */
1314   msg_nodes=nodes;
1315   while ((iptr = *nodes++))
1316     {
1317       /* Should I check the return value of MPI_Wait() or status? */
1318       /* Can this loop be replaced by a call to MPI_Waitall()? */
1319       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
1320       while (*iptr >= 0)
1321         {*(dptr1 + *iptr) = MAX_FABS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
1322     }
1323 
1324   /* replace vals */
1325   while (*pw >= 0)
1326     {*(in_vals + *pw++) = *dptr1++;}
1327 
1328   /* clear isend message handles */
1329   /* This changed for clarity though it could be the same */
1330   while (*msg_nodes++)
1331     /* Should I check the return value of MPI_Wait() or status? */
1332     /* Can this loop be replaced by a call to MPI_Waitall()? */
1333     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
1334   PetscFunctionReturn(0);
1335 }
1336 
1337 /******************************************************************************/
1338 static PetscErrorCode gs_gop_tree_max_abs(gs_id *gs, PetscScalar *vals)
1339 {
1340   PetscInt         size;
1341   PetscInt         *in, *out;
1342   PetscScalar *buf, *work;
1343   PetscInt         op[] = {GL_MAX_ABS,0};
1344 
1345   PetscFunctionBegin;
1346   in   = gs->tree_map_in;
1347   out  = gs->tree_map_out;
1348   buf  = gs->tree_buf;
1349   work = gs->tree_work;
1350   size = gs->tree_nel;
1351 
1352   rvec_zero(buf,size);
1353 
1354   while (*in >= 0)
1355     {
1356       /*
1357       printf("%d :: out=%d\n",my_id,*out);
1358       printf("%d :: in=%d\n",my_id,*in);
1359       */
1360       *(buf + *out++) = *(vals + *in++);
1361     }
1362 
1363   grop(buf,work,size,op);
1364 
1365   in   = gs->tree_map_in;
1366   out  = gs->tree_map_out;
1367 
1368   while (*in >= 0)
1369     {*(vals + *in++) = *(buf + *out++);}
1370   PetscFunctionReturn(0);
1371 }
1372 
1373 /******************************************************************************/
1374 static PetscErrorCode gs_gop_max( gs_id *gs,  PetscScalar *vals)
1375 {
1376   PetscFunctionBegin;
1377   /* local only operations!!! */
1378   if (gs->num_local)
1379     {gs_gop_local_max(gs,vals);}
1380 
1381   /* if intersection tree/pairwise and local isn't empty */
1382   if (gs->num_local_gop)
1383     {
1384       gs_gop_local_in_max(gs,vals);
1385 
1386       /* pairwise */
1387       if (gs->num_pairs)
1388         {gs_gop_pairwise_max(gs,vals);}
1389 
1390       /* tree */
1391       else if (gs->max_left_over)
1392         {gs_gop_tree_max(gs,vals);}
1393 
1394       gs_gop_local_out(gs,vals);
1395     }
1396   /* if intersection tree/pairwise and local is empty */
1397   else
1398     {
1399       /* pairwise */
1400       if (gs->num_pairs)
1401         {gs_gop_pairwise_max(gs,vals);}
1402 
1403       /* tree */
1404       else if (gs->max_left_over)
1405         {gs_gop_tree_max(gs,vals);}
1406     }
1407   PetscFunctionReturn(0);
1408 }
1409 
1410 /******************************************************************************/
1411 static PetscErrorCode gs_gop_local_max( gs_id *gs,  PetscScalar *vals)
1412 {
1413   PetscInt         *num, *map, **reduce;
1414   PetscScalar tmp;
1415 
1416   PetscFunctionBegin;
1417   num    = gs->num_local_reduce;
1418   reduce = gs->local_reduce;
1419   while ((map = *reduce))
1420     {
1421       num ++;
1422       tmp = -REAL_MAX;
1423       while (*map >= 0)
1424         {tmp = PetscMax(tmp,*(vals + *map)); map++;}
1425 
1426       map = *reduce++;
1427       while (*map >= 0)
1428         {*(vals + *map++) = tmp;}
1429     }
1430   PetscFunctionReturn(0);
1431 }
1432 
1433 /******************************************************************************/
1434 static PetscErrorCode gs_gop_local_in_max( gs_id *gs,  PetscScalar *vals)
1435 {
1436   PetscInt         *num, *map, **reduce;
1437   PetscScalar *base;
1438 
1439   PetscFunctionBegin;
1440   num    = gs->num_gop_local_reduce;
1441   reduce = gs->gop_local_reduce;
1442   while ((map = *reduce++))
1443     {
1444       num++;
1445       base = vals + *map++;
1446       while (*map >= 0)
1447         {*base = PetscMax(*base,*(vals + *map)); map++;}
1448     }
1449   PetscFunctionReturn(0);
1450 }
1451 
1452 /******************************************************************************/
1453 static PetscErrorCode gs_gop_pairwise_max( gs_id *gs,  PetscScalar *in_vals)
1454 {
1455   PetscScalar    *dptr1, *dptr2, *dptr3, *in1, *in2;
1456   PetscInt            *iptr, *msg_list, *msg_size, **msg_nodes;
1457   PetscInt            *pw, *list, *size, **nodes;
1458   MPI_Request    *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1459   MPI_Status     status;
1460   PetscErrorCode ierr;
1461 
1462   PetscFunctionBegin;
1463   /* strip and load s */
1464   msg_list =list         = gs->pair_list;
1465   msg_size =size         = gs->msg_sizes;
1466   msg_nodes=nodes        = gs->node_list;
1467   iptr=pw                = gs->pw_elm_list;
1468   dptr1=dptr3            = gs->pw_vals;
1469   msg_ids_in  = ids_in   = gs->msg_ids_in;
1470   msg_ids_out = ids_out  = gs->msg_ids_out;
1471   dptr2                  = gs->out;
1472   in1=in2                = gs->in;
1473 
1474   /* post the receives */
1475   /*  msg_nodes=nodes; */
1476   do
1477     {
1478       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1479          second one *list and do list++ afterwards */
1480       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
1481       in1 += *size++;
1482     }
1483   while (*++msg_nodes);
1484   msg_nodes=nodes;
1485 
1486   /* load gs values into in out gs buffers */
1487   while (*iptr >= 0)
1488     {*dptr3++ = *(in_vals + *iptr++);}
1489 
1490   /* load out buffers and post the sends */
1491   while ((iptr = *msg_nodes++))
1492     {
1493       dptr3 = dptr2;
1494       while (*iptr >= 0)
1495         {*dptr2++ = *(dptr1 + *iptr++);}
1496       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1497       /* is msg_ids_out++ correct? */
1498       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
1499     }
1500 
1501   if (gs->max_left_over)
1502     {gs_gop_tree_max(gs,in_vals);}
1503 
1504   /* process the received data */
1505   msg_nodes=nodes;
1506   while ((iptr = *nodes++))
1507     {
1508       /* Should I check the return value of MPI_Wait() or status? */
1509       /* Can this loop be replaced by a call to MPI_Waitall()? */
1510       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
1511       while (*iptr >= 0)
1512         {*(dptr1 + *iptr) = PetscMax(*(dptr1 + *iptr),*in2); iptr++; in2++;}
1513     }
1514 
1515   /* replace vals */
1516   while (*pw >= 0)
1517     {*(in_vals + *pw++) = *dptr1++;}
1518 
1519   /* clear isend message handles */
1520   /* This changed for clarity though it could be the same */
1521   while (*msg_nodes++)
1522     /* Should I check the return value of MPI_Wait() or status? */
1523     /* Can this loop be replaced by a call to MPI_Waitall()? */
1524     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
1525   PetscFunctionReturn(0);
1526 }
1527 
1528 /******************************************************************************/
1529 static PetscErrorCode gs_gop_tree_max(gs_id *gs, PetscScalar *vals)
1530 {
1531   PetscInt            size;
1532   PetscInt            *in, *out;
1533   PetscScalar    *buf, *work;
1534   PetscErrorCode ierr;
1535 
1536   PetscFunctionBegin;
1537   in   = gs->tree_map_in;
1538   out  = gs->tree_map_out;
1539   buf  = gs->tree_buf;
1540   work = gs->tree_work;
1541   size = gs->tree_nel;
1542 
1543   rvec_set(buf,-REAL_MAX,size);
1544 
1545   while (*in >= 0)
1546     {*(buf + *out++) = *(vals + *in++);}
1547 
1548   in   = gs->tree_map_in;
1549   out  = gs->tree_map_out;
1550   ierr = MPI_Allreduce(buf,work,size,MPIU_SCALAR,MPI_MAX,gs->gs_comm);CHKERRQ(ierr);
1551   while (*in >= 0)
1552     {*(vals + *in++) = *(work + *out++);}
1553   PetscFunctionReturn(0);
1554 }
1555 /******************************************************************************/
1556 static PetscErrorCode gs_gop_min_abs( gs_id *gs,  PetscScalar *vals)
1557 {
1558   PetscFunctionBegin;
1559   /* local only operations!!! */
1560   if (gs->num_local)
1561     {gs_gop_local_min_abs(gs,vals);}
1562 
1563   /* if intersection tree/pairwise and local isn't empty */
1564   if (gs->num_local_gop)
1565     {
1566       gs_gop_local_in_min_abs(gs,vals);
1567 
1568       /* pairwise */
1569       if (gs->num_pairs)
1570         {gs_gop_pairwise_min_abs(gs,vals);}
1571 
1572       /* tree */
1573       else if (gs->max_left_over)
1574         {gs_gop_tree_min_abs(gs,vals);}
1575 
1576       gs_gop_local_out(gs,vals);
1577     }
1578   /* if intersection tree/pairwise and local is empty */
1579   else
1580     {
1581       /* pairwise */
1582       if (gs->num_pairs)
1583         {gs_gop_pairwise_min_abs(gs,vals);}
1584 
1585       /* tree */
1586       else if (gs->max_left_over)
1587         {gs_gop_tree_min_abs(gs,vals);}
1588     }
1589   PetscFunctionReturn(0);
1590 }
1591 
1592 /******************************************************************************/
1593 static PetscErrorCode gs_gop_local_min_abs( gs_id *gs,  PetscScalar *vals)
1594 {
1595    PetscInt *num, *map, **reduce;
1596    PetscScalar tmp;
1597 
1598   PetscFunctionBegin;
1599   num    = gs->num_local_reduce;
1600   reduce = gs->local_reduce;
1601   while ((map = *reduce))
1602     {
1603       num ++;
1604       tmp = REAL_MAX;
1605       while (*map >= 0)
1606         {tmp = MIN_FABS(tmp,*(vals + *map)); map++;}
1607 
1608       map = *reduce++;
1609       while (*map >= 0)
1610         {*(vals + *map++) = tmp;}
1611     }
1612   PetscFunctionReturn(0);
1613 }
1614 
1615 /******************************************************************************/
1616 static PetscErrorCode gs_gop_local_in_min_abs( gs_id *gs,  PetscScalar *vals)
1617 {
1618    PetscInt *num, *map, **reduce;
1619    PetscScalar *base;
1620 
1621   PetscFunctionBegin;
1622   num    = gs->num_gop_local_reduce;
1623   reduce = gs->gop_local_reduce;
1624   while ((map = *reduce++))
1625     {
1626       num++;
1627       base = vals + *map++;
1628       while (*map >= 0)
1629         {*base = MIN_FABS(*base,*(vals + *map)); map++;}
1630     }
1631   PetscFunctionReturn(0);
1632 }
1633 
1634 /******************************************************************************/
1635 static PetscErrorCode gs_gop_pairwise_min_abs( gs_id *gs,  PetscScalar *in_vals)
1636 {
1637    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
1638    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
1639    PetscInt *pw, *list, *size, **nodes;
1640   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1641   MPI_Status status;
1642   PetscErrorCode ierr;
1643 
1644   PetscFunctionBegin;
1645   /* strip and load s */
1646   msg_list =list         = gs->pair_list;
1647   msg_size =size         = gs->msg_sizes;
1648   msg_nodes=nodes        = gs->node_list;
1649   iptr=pw                = gs->pw_elm_list;
1650   dptr1=dptr3            = gs->pw_vals;
1651   msg_ids_in  = ids_in   = gs->msg_ids_in;
1652   msg_ids_out = ids_out  = gs->msg_ids_out;
1653   dptr2                  = gs->out;
1654   in1=in2                = gs->in;
1655 
1656   /* post the receives */
1657   /*  msg_nodes=nodes; */
1658   do
1659     {
1660       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1661          second one *list and do list++ afterwards */
1662       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
1663       in1 += *size++;
1664     }
1665   while (*++msg_nodes);
1666   msg_nodes=nodes;
1667 
1668   /* load gs values into in out gs buffers */
1669   while (*iptr >= 0)
1670     {*dptr3++ = *(in_vals + *iptr++);}
1671 
1672   /* load out buffers and post the sends */
1673   while ((iptr = *msg_nodes++))
1674     {
1675       dptr3 = dptr2;
1676       while (*iptr >= 0)
1677         {*dptr2++ = *(dptr1 + *iptr++);}
1678       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1679       /* is msg_ids_out++ correct? */
1680       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
1681     }
1682 
1683   if (gs->max_left_over)
1684     {gs_gop_tree_min_abs(gs,in_vals);}
1685 
1686   /* process the received data */
1687   msg_nodes=nodes;
1688   while ((iptr = *nodes++))
1689     {
1690       /* Should I check the return value of MPI_Wait() or status? */
1691       /* Can this loop be replaced by a call to MPI_Waitall()? */
1692       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
1693       while (*iptr >= 0)
1694         {*(dptr1 + *iptr) = MIN_FABS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
1695     }
1696 
1697   /* replace vals */
1698   while (*pw >= 0)
1699     {*(in_vals + *pw++) = *dptr1++;}
1700 
1701   /* clear isend message handles */
1702   /* This changed for clarity though it could be the same */
1703   while (*msg_nodes++)
1704     /* Should I check the return value of MPI_Wait() or status? */
1705     /* Can this loop be replaced by a call to MPI_Waitall()? */
1706     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
1707   PetscFunctionReturn(0);
1708 }
1709 
1710 /******************************************************************************/
1711 static PetscErrorCode gs_gop_tree_min_abs(gs_id *gs, PetscScalar *vals)
1712 {
1713   PetscInt size;
1714   PetscInt *in, *out;
1715   PetscScalar *buf, *work;
1716   PetscInt op[] = {GL_MIN_ABS,0};
1717 
1718   PetscFunctionBegin;
1719   in   = gs->tree_map_in;
1720   out  = gs->tree_map_out;
1721   buf  = gs->tree_buf;
1722   work = gs->tree_work;
1723   size = gs->tree_nel;
1724 
1725   rvec_set(buf,REAL_MAX,size);
1726 
1727   while (*in >= 0)
1728     {*(buf + *out++) = *(vals + *in++);}
1729 
1730   in   = gs->tree_map_in;
1731   out  = gs->tree_map_out;
1732   grop(buf,work,size,op);
1733   while (*in >= 0)
1734     {*(vals + *in++) = *(buf + *out++);}
1735   PetscFunctionReturn(0);
1736 }
1737 
1738 /******************************************************************************/
1739 static PetscErrorCode gs_gop_min( gs_id *gs,  PetscScalar *vals)
1740 {
1741   PetscFunctionBegin;
1742   /* local only operations!!! */
1743   if (gs->num_local)
1744     {gs_gop_local_min(gs,vals);}
1745 
1746   /* if intersection tree/pairwise and local isn't empty */
1747   if (gs->num_local_gop)
1748     {
1749       gs_gop_local_in_min(gs,vals);
1750 
1751       /* pairwise */
1752       if (gs->num_pairs)
1753         {gs_gop_pairwise_min(gs,vals);}
1754 
1755       /* tree */
1756       else if (gs->max_left_over)
1757         {gs_gop_tree_min(gs,vals);}
1758 
1759       gs_gop_local_out(gs,vals);
1760     }
1761   /* if intersection tree/pairwise and local is empty */
1762   else
1763     {
1764       /* pairwise */
1765       if (gs->num_pairs)
1766         {gs_gop_pairwise_min(gs,vals);}
1767 
1768       /* tree */
1769       else if (gs->max_left_over)
1770         {gs_gop_tree_min(gs,vals);}
1771     }
1772   PetscFunctionReturn(0);
1773 }
1774 
1775 /******************************************************************************/
1776 static PetscErrorCode gs_gop_local_min( gs_id *gs,  PetscScalar *vals)
1777 {
1778    PetscInt *num, *map, **reduce;
1779    PetscScalar tmp;
1780   PetscFunctionBegin;
1781   num    = gs->num_local_reduce;
1782   reduce = gs->local_reduce;
1783   while ((map = *reduce))
1784     {
1785       num ++;
1786       tmp = REAL_MAX;
1787       while (*map >= 0)
1788         {tmp = PetscMin(tmp,*(vals + *map)); map++;}
1789 
1790       map = *reduce++;
1791       while (*map >= 0)
1792         {*(vals + *map++) = tmp;}
1793     }
1794   PetscFunctionReturn(0);
1795 }
1796 
1797 /******************************************************************************/
1798 static PetscErrorCode gs_gop_local_in_min( gs_id *gs,  PetscScalar *vals)
1799 {
1800    PetscInt *num, *map, **reduce;
1801    PetscScalar *base;
1802 
1803   PetscFunctionBegin;
1804   num    = gs->num_gop_local_reduce;
1805   reduce = gs->gop_local_reduce;
1806   while ((map = *reduce++))
1807     {
1808       num++;
1809       base = vals + *map++;
1810       while (*map >= 0)
1811         {*base = PetscMin(*base,*(vals + *map)); map++;}
1812     }
1813   PetscFunctionReturn(0);
1814 }
1815 
1816 /******************************************************************************/
1817 static PetscErrorCode gs_gop_pairwise_min( gs_id *gs,  PetscScalar *in_vals)
1818 {
1819    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
1820    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
1821    PetscInt *pw, *list, *size, **nodes;
1822   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1823   MPI_Status status;
1824   PetscErrorCode ierr;
1825 
1826   PetscFunctionBegin;
1827   /* strip and load s */
1828   msg_list =list         = gs->pair_list;
1829   msg_size =size         = gs->msg_sizes;
1830   msg_nodes=nodes        = gs->node_list;
1831   iptr=pw                = gs->pw_elm_list;
1832   dptr1=dptr3            = gs->pw_vals;
1833   msg_ids_in  = ids_in   = gs->msg_ids_in;
1834   msg_ids_out = ids_out  = gs->msg_ids_out;
1835   dptr2                  = gs->out;
1836   in1=in2                = gs->in;
1837 
1838   /* post the receives */
1839   /*  msg_nodes=nodes; */
1840   do
1841     {
1842       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1843          second one *list and do list++ afterwards */
1844       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
1845       in1 += *size++;
1846     }
1847   while (*++msg_nodes);
1848   msg_nodes=nodes;
1849 
1850   /* load gs values into in out gs buffers */
1851   while (*iptr >= 0)
1852     {*dptr3++ = *(in_vals + *iptr++);}
1853 
1854   /* load out buffers and post the sends */
1855   while ((iptr = *msg_nodes++))
1856     {
1857       dptr3 = dptr2;
1858       while (*iptr >= 0)
1859         {*dptr2++ = *(dptr1 + *iptr++);}
1860       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1861       /* is msg_ids_out++ correct? */
1862       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
1863     }
1864 
1865   /* process the received data */
1866   if (gs->max_left_over)
1867     {gs_gop_tree_min(gs,in_vals);}
1868 
1869   msg_nodes=nodes;
1870   while ((iptr = *nodes++))
1871     {
1872       /* Should I check the return value of MPI_Wait() or status? */
1873       /* Can this loop be replaced by a call to MPI_Waitall()? */
1874       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
1875       while (*iptr >= 0)
1876         {*(dptr1 + *iptr) = PetscMin(*(dptr1 + *iptr),*in2); iptr++; in2++;}
1877     }
1878 
1879   /* replace vals */
1880   while (*pw >= 0)
1881     {*(in_vals + *pw++) = *dptr1++;}
1882 
1883   /* clear isend message handles */
1884   /* This changed for clarity though it could be the same */
1885   while (*msg_nodes++)
1886     /* Should I check the return value of MPI_Wait() or status? */
1887     /* Can this loop be replaced by a call to MPI_Waitall()? */
1888     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
1889   PetscFunctionReturn(0);
1890 }
1891 
1892 /******************************************************************************/
1893 static PetscErrorCode gs_gop_tree_min(gs_id *gs, PetscScalar *vals)
1894 {
1895   PetscInt size;
1896   PetscInt *in, *out;
1897   PetscScalar *buf, *work;
1898   PetscErrorCode ierr;
1899 
1900   PetscFunctionBegin;
1901   in   = gs->tree_map_in;
1902   out  = gs->tree_map_out;
1903   buf  = gs->tree_buf;
1904   work = gs->tree_work;
1905   size = gs->tree_nel;
1906 
1907   rvec_set(buf,REAL_MAX,size);
1908 
1909   while (*in >= 0)
1910     {*(buf + *out++) = *(vals + *in++);}
1911 
1912   in   = gs->tree_map_in;
1913   out  = gs->tree_map_out;
1914   ierr = MPI_Allreduce(buf,work,size,MPIU_SCALAR,MPI_MIN,gs->gs_comm);CHKERRQ(ierr);
1915   while (*in >= 0)
1916     {*(vals + *in++) = *(work + *out++);}
1917   PetscFunctionReturn(0);
1918 }
1919 
1920 /******************************************************************************/
1921 static PetscErrorCode gs_gop_times( gs_id *gs,  PetscScalar *vals)
1922 {
1923   PetscFunctionBegin;
1924   /* local only operations!!! */
1925   if (gs->num_local)
1926     {gs_gop_local_times(gs,vals);}
1927 
1928   /* if intersection tree/pairwise and local isn't empty */
1929   if (gs->num_local_gop)
1930     {
1931       gs_gop_local_in_times(gs,vals);
1932 
1933       /* pairwise */
1934       if (gs->num_pairs)
1935         {gs_gop_pairwise_times(gs,vals);}
1936 
1937       /* tree */
1938       else if (gs->max_left_over)
1939         {gs_gop_tree_times(gs,vals);}
1940 
1941       gs_gop_local_out(gs,vals);
1942     }
1943   /* if intersection tree/pairwise and local is empty */
1944   else
1945     {
1946       /* pairwise */
1947       if (gs->num_pairs)
1948         {gs_gop_pairwise_times(gs,vals);}
1949 
1950       /* tree */
1951       else if (gs->max_left_over)
1952         {gs_gop_tree_times(gs,vals);}
1953     }
1954   PetscFunctionReturn(0);
1955 }
1956 
1957 /******************************************************************************/
1958 static PetscErrorCode gs_gop_local_times( gs_id *gs,  PetscScalar *vals)
1959 {
1960    PetscInt *num, *map, **reduce;
1961    PetscScalar tmp;
1962 
1963   PetscFunctionBegin;
1964   num    = gs->num_local_reduce;
1965   reduce = gs->local_reduce;
1966   while ((map = *reduce))
1967     {
1968       /* wall */
1969       if (*num == 2)
1970         {
1971           num ++; reduce++;
1972           vals[map[1]] = vals[map[0]] *= vals[map[1]];
1973         }
1974       /* corner shared by three elements */
1975       else if (*num == 3)
1976         {
1977           num ++; reduce++;
1978           vals[map[2]]=vals[map[1]]=vals[map[0]]*=(vals[map[1]]*vals[map[2]]);
1979         }
1980       /* corner shared by four elements */
1981       else if (*num == 4)
1982         {
1983           num ++; reduce++;
1984           vals[map[1]]=vals[map[2]]=vals[map[3]]=vals[map[0]] *=
1985                                  (vals[map[1]] * vals[map[2]] * vals[map[3]]);
1986         }
1987       /* general case ... odd geoms ... 3D*/
1988       else
1989         {
1990           num ++;
1991           tmp = 1.0;
1992           while (*map >= 0)
1993             {tmp *= *(vals + *map++);}
1994 
1995           map = *reduce++;
1996           while (*map >= 0)
1997             {*(vals + *map++) = tmp;}
1998         }
1999     }
2000   PetscFunctionReturn(0);
2001 }
2002 
2003 /******************************************************************************/
2004 static PetscErrorCode gs_gop_local_in_times( gs_id *gs,  PetscScalar *vals)
2005 {
2006    PetscInt *num, *map, **reduce;
2007    PetscScalar *base;
2008 
2009   PetscFunctionBegin;
2010   num    = gs->num_gop_local_reduce;
2011   reduce = gs->gop_local_reduce;
2012   while ((map = *reduce++))
2013     {
2014       /* wall */
2015       if (*num == 2)
2016         {
2017           num ++;
2018           vals[map[0]] *= vals[map[1]];
2019         }
2020       /* corner shared by three elements */
2021       else if (*num == 3)
2022         {
2023           num ++;
2024           vals[map[0]] *= (vals[map[1]] * vals[map[2]]);
2025         }
2026       /* corner shared by four elements */
2027       else if (*num == 4)
2028         {
2029           num ++;
2030           vals[map[0]] *= (vals[map[1]] * vals[map[2]] * vals[map[3]]);
2031         }
2032       /* general case ... odd geoms ... 3D*/
2033       else
2034         {
2035           num++;
2036           base = vals + *map++;
2037           while (*map >= 0)
2038             {*base *= *(vals + *map++);}
2039         }
2040     }
2041   PetscFunctionReturn(0);
2042 }
2043 
2044 /******************************************************************************/
2045 static PetscErrorCode gs_gop_pairwise_times( gs_id *gs,  PetscScalar *in_vals)
2046 {
2047    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
2048    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
2049    PetscInt *pw, *list, *size, **nodes;
2050   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2051   MPI_Status status;
2052   PetscErrorCode ierr;
2053 
2054   PetscFunctionBegin;
2055   /* strip and load s */
2056   msg_list =list         = gs->pair_list;
2057   msg_size =size         = gs->msg_sizes;
2058   msg_nodes=nodes        = gs->node_list;
2059   iptr=pw                = gs->pw_elm_list;
2060   dptr1=dptr3            = gs->pw_vals;
2061   msg_ids_in  = ids_in   = gs->msg_ids_in;
2062   msg_ids_out = ids_out  = gs->msg_ids_out;
2063   dptr2                  = gs->out;
2064   in1=in2                = gs->in;
2065 
2066   /* post the receives */
2067   /*  msg_nodes=nodes; */
2068   do
2069     {
2070       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2071          second one *list and do list++ afterwards */
2072       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
2073       in1 += *size++;
2074     }
2075   while (*++msg_nodes);
2076   msg_nodes=nodes;
2077 
2078   /* load gs values into in out gs buffers */
2079   while (*iptr >= 0)
2080     {*dptr3++ = *(in_vals + *iptr++);}
2081 
2082   /* load out buffers and post the sends */
2083   while ((iptr = *msg_nodes++))
2084     {
2085       dptr3 = dptr2;
2086       while (*iptr >= 0)
2087         {*dptr2++ = *(dptr1 + *iptr++);}
2088       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2089       /* is msg_ids_out++ correct? */
2090       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
2091     }
2092 
2093   if (gs->max_left_over)
2094     {gs_gop_tree_times(gs,in_vals);}
2095 
2096   /* process the received data */
2097   msg_nodes=nodes;
2098   while ((iptr = *nodes++))
2099     {
2100       /* Should I check the return value of MPI_Wait() or status? */
2101       /* Can this loop be replaced by a call to MPI_Waitall()? */
2102       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
2103       while (*iptr >= 0)
2104         {*(dptr1 + *iptr++) *= *in2++;}
2105     }
2106 
2107   /* replace vals */
2108   while (*pw >= 0)
2109     {*(in_vals + *pw++) = *dptr1++;}
2110 
2111   /* clear isend message handles */
2112   /* This changed for clarity though it could be the same */
2113   while (*msg_nodes++)
2114     /* Should I check the return value of MPI_Wait() or status? */
2115     /* Can this loop be replaced by a call to MPI_Waitall()? */
2116     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
2117   PetscFunctionReturn(0);
2118 }
2119 
2120 /******************************************************************************/
2121 static PetscErrorCode gs_gop_tree_times(gs_id *gs, PetscScalar *vals)
2122 {
2123   PetscInt size;
2124   PetscInt *in, *out;
2125   PetscScalar *buf, *work;
2126   PetscErrorCode ierr;
2127 
2128   PetscFunctionBegin;
2129   in   = gs->tree_map_in;
2130   out  = gs->tree_map_out;
2131   buf  = gs->tree_buf;
2132   work = gs->tree_work;
2133   size = gs->tree_nel;
2134 
2135   rvec_one(buf,size);
2136 
2137   while (*in >= 0)
2138     {*(buf + *out++) = *(vals + *in++);}
2139 
2140   in   = gs->tree_map_in;
2141   out  = gs->tree_map_out;
2142   ierr = MPI_Allreduce(buf,work,size,MPIU_SCALAR,MPI_PROD,gs->gs_comm);CHKERRQ(ierr);
2143   while (*in >= 0)
2144     {*(vals + *in++) = *(work + *out++);}
2145   PetscFunctionReturn(0);
2146 }
2147 
2148 /******************************************************************************/
2149 static PetscErrorCode gs_gop_plus( gs_id *gs,  PetscScalar *vals)
2150 {
2151   PetscFunctionBegin;
2152   /* local only operations!!! */
2153   if (gs->num_local)
2154     {gs_gop_local_plus(gs,vals);}
2155 
2156   /* if intersection tree/pairwise and local isn't empty */
2157   if (gs->num_local_gop)
2158     {
2159       gs_gop_local_in_plus(gs,vals);
2160 
2161       /* pairwise will NOT do tree inside ... */
2162       if (gs->num_pairs)
2163         {gs_gop_pairwise_plus(gs,vals);}
2164 
2165       /* tree */
2166       if (gs->max_left_over)
2167         {gs_gop_tree_plus(gs,vals);}
2168 
2169       gs_gop_local_out(gs,vals);
2170     }
2171   /* if intersection tree/pairwise and local is empty */
2172   else
2173     {
2174       /* pairwise will NOT do tree inside */
2175       if (gs->num_pairs)
2176         {gs_gop_pairwise_plus(gs,vals);}
2177 
2178       /* tree */
2179       if (gs->max_left_over)
2180         {gs_gop_tree_plus(gs,vals);}
2181     }
2182   PetscFunctionReturn(0);
2183 }
2184 
2185 /******************************************************************************/
2186 static PetscErrorCode gs_gop_local_plus( gs_id *gs,  PetscScalar *vals)
2187 {
2188    PetscInt *num, *map, **reduce;
2189    PetscScalar tmp;
2190 
2191   PetscFunctionBegin;
2192   num    = gs->num_local_reduce;
2193   reduce = gs->local_reduce;
2194   while ((map = *reduce))
2195     {
2196       /* wall */
2197       if (*num == 2)
2198         {
2199           num ++; reduce++;
2200           vals[map[1]] = vals[map[0]] += vals[map[1]];
2201         }
2202       /* corner shared by three elements */
2203       else if (*num == 3)
2204         {
2205           num ++; reduce++;
2206           vals[map[2]]=vals[map[1]]=vals[map[0]]+=(vals[map[1]]+vals[map[2]]);
2207         }
2208       /* corner shared by four elements */
2209       else if (*num == 4)
2210         {
2211           num ++; reduce++;
2212           vals[map[1]]=vals[map[2]]=vals[map[3]]=vals[map[0]] +=
2213                                  (vals[map[1]] + vals[map[2]] + vals[map[3]]);
2214         }
2215       /* general case ... odd geoms ... 3D*/
2216       else
2217         {
2218           num ++;
2219           tmp = 0.0;
2220           while (*map >= 0)
2221             {tmp += *(vals + *map++);}
2222 
2223           map = *reduce++;
2224           while (*map >= 0)
2225             {*(vals + *map++) = tmp;}
2226         }
2227     }
2228   PetscFunctionReturn(0);
2229 }
2230 
2231 /******************************************************************************/
2232 static PetscErrorCode gs_gop_local_in_plus( gs_id *gs,  PetscScalar *vals)
2233 {
2234    PetscInt *num, *map, **reduce;
2235    PetscScalar *base;
2236 
2237   PetscFunctionBegin;
2238   num    = gs->num_gop_local_reduce;
2239   reduce = gs->gop_local_reduce;
2240   while ((map = *reduce++))
2241     {
2242       /* wall */
2243       if (*num == 2)
2244         {
2245           num ++;
2246           vals[map[0]] += vals[map[1]];
2247         }
2248       /* corner shared by three elements */
2249       else if (*num == 3)
2250         {
2251           num ++;
2252           vals[map[0]] += (vals[map[1]] + vals[map[2]]);
2253         }
2254       /* corner shared by four elements */
2255       else if (*num == 4)
2256         {
2257           num ++;
2258           vals[map[0]] += (vals[map[1]] + vals[map[2]] + vals[map[3]]);
2259         }
2260       /* general case ... odd geoms ... 3D*/
2261       else
2262         {
2263           num++;
2264           base = vals + *map++;
2265           while (*map >= 0)
2266             {*base += *(vals + *map++);}
2267         }
2268     }
2269   PetscFunctionReturn(0);
2270 }
2271 
2272 /******************************************************************************/
2273 static PetscErrorCode gs_gop_pairwise_plus( gs_id *gs,  PetscScalar *in_vals)
2274 {
2275    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
2276    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
2277    PetscInt *pw, *list, *size, **nodes;
2278   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2279   MPI_Status status;
2280   PetscErrorCode ierr;
2281 
2282   PetscFunctionBegin;
2283   /* strip and load s */
2284   msg_list =list         = gs->pair_list;
2285   msg_size =size         = gs->msg_sizes;
2286   msg_nodes=nodes        = gs->node_list;
2287   iptr=pw                = gs->pw_elm_list;
2288   dptr1=dptr3            = gs->pw_vals;
2289   msg_ids_in  = ids_in   = gs->msg_ids_in;
2290   msg_ids_out = ids_out  = gs->msg_ids_out;
2291   dptr2                  = gs->out;
2292   in1=in2                = gs->in;
2293 
2294   /* post the receives */
2295   /*  msg_nodes=nodes; */
2296   do
2297     {
2298       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2299          second one *list and do list++ afterwards */
2300       ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
2301       in1 += *size++;
2302     }
2303   while (*++msg_nodes);
2304   msg_nodes=nodes;
2305 
2306   /* load gs values into in out gs buffers */
2307   while (*iptr >= 0)
2308     {*dptr3++ = *(in_vals + *iptr++);}
2309 
2310   /* load out buffers and post the sends */
2311   while ((iptr = *msg_nodes++))
2312     {
2313       dptr3 = dptr2;
2314       while (*iptr >= 0)
2315         {*dptr2++ = *(dptr1 + *iptr++);}
2316       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2317       /* is msg_ids_out++ correct? */
2318       ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
2319     }
2320 
2321   /* do the tree while we're waiting */
2322   if (gs->max_left_over)
2323     {gs_gop_tree_plus(gs,in_vals);}
2324 
2325   /* process the received data */
2326   msg_nodes=nodes;
2327   while ((iptr = *nodes++))
2328     {
2329       /* Should I check the return value of MPI_Wait() or status? */
2330       /* Can this loop be replaced by a call to MPI_Waitall()? */
2331       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
2332       while (*iptr >= 0)
2333         {*(dptr1 + *iptr++) += *in2++;}
2334     }
2335 
2336   /* replace vals */
2337   while (*pw >= 0)
2338     {*(in_vals + *pw++) = *dptr1++;}
2339 
2340   /* clear isend message handles */
2341   /* This changed for clarity though it could be the same */
2342   while (*msg_nodes++)
2343     /* Should I check the return value of MPI_Wait() or status? */
2344     /* Can this loop be replaced by a call to MPI_Waitall()? */
2345     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
2346   PetscFunctionReturn(0);
2347 }
2348 
2349 /******************************************************************************/
2350 static PetscErrorCode gs_gop_tree_plus(gs_id *gs, PetscScalar *vals)
2351 {
2352   PetscInt size;
2353   PetscInt *in, *out;
2354   PetscScalar *buf, *work;
2355   PetscErrorCode ierr;
2356 
2357   PetscFunctionBegin;
2358   in   = gs->tree_map_in;
2359   out  = gs->tree_map_out;
2360   buf  = gs->tree_buf;
2361   work = gs->tree_work;
2362   size = gs->tree_nel;
2363 
2364   rvec_zero(buf,size);
2365 
2366   while (*in >= 0)
2367     {*(buf + *out++) = *(vals + *in++);}
2368 
2369   in   = gs->tree_map_in;
2370   out  = gs->tree_map_out;
2371   ierr = MPI_Allreduce(buf,work,size,MPIU_SCALAR,MPI_SUM,gs->gs_comm);CHKERRQ(ierr);
2372   while (*in >= 0)
2373     {*(vals + *in++) = *(work + *out++);}
2374   PetscFunctionReturn(0);
2375 }
2376 
2377 /******************************************************************************/
2378 PetscErrorCode gs_free( gs_id *gs)
2379 {
2380    PetscInt i;
2381 
2382   PetscFunctionBegin;
2383   if (gs->nghs) {free((void*) gs->nghs);}
2384   if (gs->pw_nghs) {free((void*) gs->pw_nghs);}
2385 
2386   /* tree */
2387   if (gs->max_left_over)
2388     {
2389       if (gs->tree_elms) {free((void*) gs->tree_elms);}
2390       if (gs->tree_buf) {free((void*) gs->tree_buf);}
2391       if (gs->tree_work) {free((void*) gs->tree_work);}
2392       if (gs->tree_map_in) {free((void*) gs->tree_map_in);}
2393       if (gs->tree_map_out) {free((void*) gs->tree_map_out);}
2394     }
2395 
2396   /* pairwise info */
2397   if (gs->num_pairs)
2398     {
2399       /* should be NULL already */
2400       if (gs->ngh_buf) {free((void*) gs->ngh_buf);}
2401       if (gs->elms) {free((void*) gs->elms);}
2402       if (gs->local_elms) {free((void*) gs->local_elms);}
2403       if (gs->companion) {free((void*) gs->companion);}
2404 
2405       /* only set if pairwise */
2406       if (gs->vals) {free((void*) gs->vals);}
2407       if (gs->in) {free((void*) gs->in);}
2408       if (gs->out) {free((void*) gs->out);}
2409       if (gs->msg_ids_in) {free((void*) gs->msg_ids_in);}
2410       if (gs->msg_ids_out) {free((void*) gs->msg_ids_out);}
2411       if (gs->pw_vals) {free((void*) gs->pw_vals);}
2412       if (gs->pw_elm_list) {free((void*) gs->pw_elm_list);}
2413       if (gs->node_list)
2414         {
2415           for (i=0;i<gs->num_pairs;i++)
2416             {if (gs->node_list[i]) {free((void*) gs->node_list[i]);}}
2417           free((void*) gs->node_list);
2418         }
2419       if (gs->msg_sizes) {free((void*) gs->msg_sizes);}
2420       if (gs->pair_list) {free((void*) gs->pair_list);}
2421     }
2422 
2423   /* local info */
2424   if (gs->num_local_total>=0)
2425     {
2426       for (i=0;i<gs->num_local_total+1;i++)
2427         /*      for (i=0;i<gs->num_local_total;i++) */
2428         {
2429           if (gs->num_gop_local_reduce[i])
2430             {free((void*) gs->gop_local_reduce[i]);}
2431         }
2432     }
2433 
2434   /* if intersection tree/pairwise and local isn't empty */
2435   if (gs->gop_local_reduce) {free((void*) gs->gop_local_reduce);}
2436   if (gs->num_gop_local_reduce) {free((void*) gs->num_gop_local_reduce);}
2437 
2438   free((void*) gs);
2439   PetscFunctionReturn(0);
2440 }
2441 
2442 /******************************************************************************/
2443 PetscErrorCode gs_gop_vec( gs_id *gs,  PetscScalar *vals,  const char *op,  PetscInt step)
2444 {
2445   PetscErrorCode ierr;
2446 
2447   PetscFunctionBegin;
2448   switch (*op) {
2449   case '+':
2450     gs_gop_vec_plus(gs,vals,step);
2451     break;
2452   default:
2453     ierr = PetscInfo1(0,"gs_gop_vec() :: %c is not a valid op",op[0]);CHKERRQ(ierr);
2454     ierr = PetscInfo(0,"gs_gop_vec() :: default :: plus");CHKERRQ(ierr);
2455     gs_gop_vec_plus(gs,vals,step);
2456     break;
2457   }
2458   PetscFunctionReturn(0);
2459 }
2460 
2461 /******************************************************************************/
2462 static PetscErrorCode gs_gop_vec_plus( gs_id *gs,  PetscScalar *vals,  PetscInt step)
2463 {
2464   PetscFunctionBegin;
2465   if (!gs) {SETERRQ(PETSC_ERR_PLIB,"gs_gop_vec() passed NULL gs handle!!!");}
2466 
2467   /* local only operations!!! */
2468   if (gs->num_local)
2469     {gs_gop_vec_local_plus(gs,vals,step);}
2470 
2471   /* if intersection tree/pairwise and local isn't empty */
2472   if (gs->num_local_gop)
2473     {
2474       gs_gop_vec_local_in_plus(gs,vals,step);
2475 
2476       /* pairwise */
2477       if (gs->num_pairs)
2478         {gs_gop_vec_pairwise_plus(gs,vals,step);}
2479 
2480       /* tree */
2481       else if (gs->max_left_over)
2482         {gs_gop_vec_tree_plus(gs,vals,step);}
2483 
2484       gs_gop_vec_local_out(gs,vals,step);
2485     }
2486   /* if intersection tree/pairwise and local is empty */
2487   else
2488     {
2489       /* pairwise */
2490       if (gs->num_pairs)
2491         {gs_gop_vec_pairwise_plus(gs,vals,step);}
2492 
2493       /* tree */
2494       else if (gs->max_left_over)
2495         {gs_gop_vec_tree_plus(gs,vals,step);}
2496     }
2497   PetscFunctionReturn(0);
2498 }
2499 
2500 /******************************************************************************/
2501 static PetscErrorCode gs_gop_vec_local_plus( gs_id *gs,  PetscScalar *vals, PetscInt step)
2502 {
2503    PetscInt *num, *map, **reduce;
2504    PetscScalar *base;
2505 
2506   PetscFunctionBegin;
2507   num    = gs->num_local_reduce;
2508   reduce = gs->local_reduce;
2509   while ((map = *reduce))
2510     {
2511       base = vals + map[0] * step;
2512 
2513       /* wall */
2514       if (*num == 2)
2515         {
2516           num++; reduce++;
2517           rvec_add (base,vals+map[1]*step,step);
2518           rvec_copy(vals+map[1]*step,base,step);
2519         }
2520       /* corner shared by three elements */
2521       else if (*num == 3)
2522         {
2523           num++; reduce++;
2524           rvec_add (base,vals+map[1]*step,step);
2525           rvec_add (base,vals+map[2]*step,step);
2526           rvec_copy(vals+map[2]*step,base,step);
2527           rvec_copy(vals+map[1]*step,base,step);
2528         }
2529       /* corner shared by four elements */
2530       else if (*num == 4)
2531         {
2532           num++; reduce++;
2533           rvec_add (base,vals+map[1]*step,step);
2534           rvec_add (base,vals+map[2]*step,step);
2535           rvec_add (base,vals+map[3]*step,step);
2536           rvec_copy(vals+map[3]*step,base,step);
2537           rvec_copy(vals+map[2]*step,base,step);
2538           rvec_copy(vals+map[1]*step,base,step);
2539         }
2540       /* general case ... odd geoms ... 3D */
2541       else
2542         {
2543           num++;
2544           while (*++map >= 0)
2545             {rvec_add (base,vals+*map*step,step);}
2546 
2547           map = *reduce;
2548           while (*++map >= 0)
2549             {rvec_copy(vals+*map*step,base,step);}
2550 
2551           reduce++;
2552         }
2553     }
2554   PetscFunctionReturn(0);
2555 }
2556 
2557 /******************************************************************************/
2558 static PetscErrorCode gs_gop_vec_local_in_plus( gs_id *gs,  PetscScalar *vals, PetscInt step)
2559 {
2560    PetscInt  *num, *map, **reduce;
2561    PetscScalar *base;
2562   PetscFunctionBegin;
2563   num    = gs->num_gop_local_reduce;
2564   reduce = gs->gop_local_reduce;
2565   while ((map = *reduce++))
2566     {
2567       base = vals + map[0] * step;
2568 
2569       /* wall */
2570       if (*num == 2)
2571         {
2572           num ++;
2573           rvec_add(base,vals+map[1]*step,step);
2574         }
2575       /* corner shared by three elements */
2576       else if (*num == 3)
2577         {
2578           num ++;
2579           rvec_add(base,vals+map[1]*step,step);
2580           rvec_add(base,vals+map[2]*step,step);
2581         }
2582       /* corner shared by four elements */
2583       else if (*num == 4)
2584         {
2585           num ++;
2586           rvec_add(base,vals+map[1]*step,step);
2587           rvec_add(base,vals+map[2]*step,step);
2588           rvec_add(base,vals+map[3]*step,step);
2589         }
2590       /* general case ... odd geoms ... 3D*/
2591       else
2592         {
2593           num++;
2594           while (*++map >= 0)
2595             {rvec_add(base,vals+*map*step,step);}
2596         }
2597     }
2598   PetscFunctionReturn(0);
2599 }
2600 
2601 /******************************************************************************/
2602 static PetscErrorCode gs_gop_vec_local_out( gs_id *gs,  PetscScalar *vals, PetscInt step)
2603 {
2604    PetscInt *num, *map, **reduce;
2605    PetscScalar *base;
2606 
2607   PetscFunctionBegin;
2608   num    = gs->num_gop_local_reduce;
2609   reduce = gs->gop_local_reduce;
2610   while ((map = *reduce++))
2611     {
2612       base = vals + map[0] * step;
2613 
2614       /* wall */
2615       if (*num == 2)
2616         {
2617           num ++;
2618           rvec_copy(vals+map[1]*step,base,step);
2619         }
2620       /* corner shared by three elements */
2621       else if (*num == 3)
2622         {
2623           num ++;
2624           rvec_copy(vals+map[1]*step,base,step);
2625           rvec_copy(vals+map[2]*step,base,step);
2626         }
2627       /* corner shared by four elements */
2628       else if (*num == 4)
2629         {
2630           num ++;
2631           rvec_copy(vals+map[1]*step,base,step);
2632           rvec_copy(vals+map[2]*step,base,step);
2633           rvec_copy(vals+map[3]*step,base,step);
2634         }
2635       /* general case ... odd geoms ... 3D*/
2636       else
2637         {
2638           num++;
2639           while (*++map >= 0)
2640             {rvec_copy(vals+*map*step,base,step);}
2641         }
2642     }
2643   PetscFunctionReturn(0);
2644 }
2645 
2646 /******************************************************************************/
2647 static PetscErrorCode gs_gop_vec_pairwise_plus( gs_id *gs,  PetscScalar *in_vals, PetscInt step)
2648 {
2649    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
2650    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
2651    PetscInt *pw, *list, *size, **nodes;
2652   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2653   MPI_Status status;
2654   PetscBLASInt i1,dstep;
2655   PetscErrorCode ierr;
2656 
2657   PetscFunctionBegin;
2658   /* strip and load s */
2659   msg_list =list         = gs->pair_list;
2660   msg_size =size         = gs->msg_sizes;
2661   msg_nodes=nodes        = gs->node_list;
2662   iptr=pw                = gs->pw_elm_list;
2663   dptr1=dptr3            = gs->pw_vals;
2664   msg_ids_in  = ids_in   = gs->msg_ids_in;
2665   msg_ids_out = ids_out  = gs->msg_ids_out;
2666   dptr2                  = gs->out;
2667   in1=in2                = gs->in;
2668 
2669   /* post the receives */
2670   /*  msg_nodes=nodes; */
2671   do
2672     {
2673       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2674          second one *list and do list++ afterwards */
2675       ierr = MPI_Irecv(in1, *size *step, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
2676       in1 += *size++ *step;
2677     }
2678   while (*++msg_nodes);
2679   msg_nodes=nodes;
2680 
2681   /* load gs values into in out gs buffers */
2682   while (*iptr >= 0)
2683     {
2684       rvec_copy(dptr3,in_vals + *iptr*step,step);
2685       dptr3+=step;
2686       iptr++;
2687     }
2688 
2689   /* load out buffers and post the sends */
2690   while ((iptr = *msg_nodes++))
2691     {
2692       dptr3 = dptr2;
2693       while (*iptr >= 0)
2694         {
2695           rvec_copy(dptr2,dptr1 + *iptr*step,step);
2696           dptr2+=step;
2697           iptr++;
2698         }
2699       ierr = MPI_Isend(dptr3, *msg_size++ *step, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
2700     }
2701 
2702   /* tree */
2703   if (gs->max_left_over)
2704     {gs_gop_vec_tree_plus(gs,in_vals,step);}
2705 
2706   /* process the received data */
2707   msg_nodes=nodes;
2708   while ((iptr = *nodes++)){
2709     PetscScalar d1 = 1.0;
2710       /* Should I check the return value of MPI_Wait() or status? */
2711       /* Can this loop be replaced by a call to MPI_Waitall()? */
2712       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
2713       while (*iptr >= 0) {
2714 	dstep = step;
2715         BLASaxpy_(&step,&d1,in2,&i1,dptr1 + *iptr*step,&i1);
2716 	in2+=step;
2717 	iptr++;
2718       }
2719   }
2720 
2721   /* replace vals */
2722   while (*pw >= 0)
2723     {
2724       rvec_copy(in_vals + *pw*step,dptr1,step);
2725       dptr1+=step;
2726       pw++;
2727     }
2728 
2729   /* clear isend message handles */
2730   /* This changed for clarity though it could be the same */
2731   while (*msg_nodes++)
2732     /* Should I check the return value of MPI_Wait() or status? */
2733     /* Can this loop be replaced by a call to MPI_Waitall()? */
2734     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
2735 
2736   PetscFunctionReturn(0);
2737 }
2738 
2739 /******************************************************************************/
2740 static PetscErrorCode gs_gop_vec_tree_plus( gs_id *gs,  PetscScalar *vals,  PetscInt step)
2741 {
2742   PetscInt size, *in, *out;
2743   PetscScalar *buf, *work;
2744   PetscInt op[] = {GL_ADD,0};
2745   PetscBLASInt i1 = 1;
2746 
2747   PetscFunctionBegin;
2748   /* copy over to local variables */
2749   in   = gs->tree_map_in;
2750   out  = gs->tree_map_out;
2751   buf  = gs->tree_buf;
2752   work = gs->tree_work;
2753   size = gs->tree_nel*step;
2754 
2755   /* zero out collection buffer */
2756   rvec_zero(buf,size);
2757 
2758 
2759   /* copy over my contributions */
2760   while (*in >= 0)
2761     {
2762       PetscBLASInt dstep = step;
2763       BLAScopy_(&dstep,vals + *in++*step,&i1,buf + *out++*step,&i1);
2764     }
2765 
2766   /* perform fan in/out on full buffer */
2767   /* must change grop to handle the blas */
2768   grop(buf,work,size,op);
2769 
2770   /* reset */
2771   in   = gs->tree_map_in;
2772   out  = gs->tree_map_out;
2773 
2774   /* get the portion of the results I need */
2775   while (*in >= 0)
2776     {
2777       PetscBLASInt dstep = step;
2778       BLAScopy_(&dstep,buf + *out++*step,&i1,vals + *in++*step,&i1);
2779     }
2780   PetscFunctionReturn(0);
2781 }
2782 
2783 /******************************************************************************/
2784 PetscErrorCode gs_gop_hc( gs_id *gs,  PetscScalar *vals,  const char *op,  PetscInt dim)
2785 {
2786   PetscErrorCode ierr;
2787 
2788   PetscFunctionBegin;
2789   switch (*op) {
2790   case '+':
2791     gs_gop_plus_hc(gs,vals,dim);
2792     break;
2793   default:
2794     ierr = PetscInfo1(0,"gs_gop_hc() :: %c is not a valid op",op[0]);CHKERRQ(ierr);
2795     ierr = PetscInfo(0,"gs_gop_hc() :: default :: plus\n");CHKERRQ(ierr);
2796     gs_gop_plus_hc(gs,vals,dim);
2797     break;
2798   }
2799   PetscFunctionReturn(0);
2800 }
2801 
2802 /******************************************************************************/
2803 static PetscErrorCode gs_gop_plus_hc( gs_id *gs,  PetscScalar *vals, PetscInt dim)
2804 {
2805   PetscFunctionBegin;
2806   /* if there's nothing to do return */
2807   if (dim<=0)
2808     {  PetscFunctionReturn(0);}
2809 
2810   /* can't do more dimensions then exist */
2811   dim = PetscMin(dim,i_log2_num_nodes);
2812 
2813   /* local only operations!!! */
2814   if (gs->num_local)
2815     {gs_gop_local_plus(gs,vals);}
2816 
2817   /* if intersection tree/pairwise and local isn't empty */
2818   if (gs->num_local_gop)
2819     {
2820       gs_gop_local_in_plus(gs,vals);
2821 
2822       /* pairwise will do tree inside ... */
2823       if (gs->num_pairs)
2824         {gs_gop_pairwise_plus_hc(gs,vals,dim);}
2825 
2826       /* tree only */
2827       else if (gs->max_left_over)
2828         {gs_gop_tree_plus_hc(gs,vals,dim);}
2829 
2830       gs_gop_local_out(gs,vals);
2831     }
2832   /* if intersection tree/pairwise and local is empty */
2833   else
2834     {
2835       /* pairwise will do tree inside */
2836       if (gs->num_pairs)
2837         {gs_gop_pairwise_plus_hc(gs,vals,dim);}
2838 
2839       /* tree */
2840       else if (gs->max_left_over)
2841         {gs_gop_tree_plus_hc(gs,vals,dim);}
2842     }
2843   PetscFunctionReturn(0);
2844 }
2845 
2846 /******************************************************************************/
2847 static PetscErrorCode gs_gop_pairwise_plus_hc( gs_id *gs,  PetscScalar *in_vals, PetscInt dim)
2848 {
2849    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
2850    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
2851    PetscInt *pw, *list, *size, **nodes;
2852   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2853   MPI_Status status;
2854   PetscInt i, mask=1;
2855   PetscErrorCode ierr;
2856 
2857   PetscFunctionBegin;
2858   for (i=1; i<dim; i++)
2859     {mask<<=1; mask++;}
2860 
2861 
2862   /* strip and load s */
2863   msg_list =list         = gs->pair_list;
2864   msg_size =size         = gs->msg_sizes;
2865   msg_nodes=nodes        = gs->node_list;
2866   iptr=pw                = gs->pw_elm_list;
2867   dptr1=dptr3            = gs->pw_vals;
2868   msg_ids_in  = ids_in   = gs->msg_ids_in;
2869   msg_ids_out = ids_out  = gs->msg_ids_out;
2870   dptr2                  = gs->out;
2871   in1=in2                = gs->in;
2872 
2873   /* post the receives */
2874   /*  msg_nodes=nodes; */
2875   do
2876     {
2877       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2878          second one *list and do list++ afterwards */
2879       if ((my_id|mask)==(*list|mask))
2880         {
2881           ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
2882           in1 += *size++;
2883         }
2884       else
2885         {list++; size++;}
2886     }
2887   while (*++msg_nodes);
2888 
2889   /* load gs values into in out gs buffers */
2890   while (*iptr >= 0)
2891     {*dptr3++ = *(in_vals + *iptr++);}
2892 
2893   /* load out buffers and post the sends */
2894   msg_nodes=nodes;
2895   list = msg_list;
2896   while ((iptr = *msg_nodes++))
2897     {
2898       if ((my_id|mask)==(*list|mask))
2899         {
2900           dptr3 = dptr2;
2901           while (*iptr >= 0)
2902             {*dptr2++ = *(dptr1 + *iptr++);}
2903           /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2904           /* is msg_ids_out++ correct? */
2905           ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
2906         }
2907       else
2908         {list++; msg_size++;}
2909     }
2910 
2911   /* do the tree while we're waiting */
2912   if (gs->max_left_over)
2913     {gs_gop_tree_plus_hc(gs,in_vals,dim);}
2914 
2915   /* process the received data */
2916   msg_nodes=nodes;
2917   list = msg_list;
2918   while ((iptr = *nodes++))
2919     {
2920       if ((my_id|mask)==(*list|mask))
2921         {
2922           /* Should I check the return value of MPI_Wait() or status? */
2923           /* Can this loop be replaced by a call to MPI_Waitall()? */
2924           ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
2925           while (*iptr >= 0)
2926             {*(dptr1 + *iptr++) += *in2++;}
2927         }
2928       list++;
2929     }
2930 
2931   /* replace vals */
2932   while (*pw >= 0)
2933     {*(in_vals + *pw++) = *dptr1++;}
2934 
2935   /* clear isend message handles */
2936   /* This changed for clarity though it could be the same */
2937   while (*msg_nodes++)
2938     {
2939       if ((my_id|mask)==(*msg_list|mask))
2940         {
2941           /* Should I check the return value of MPI_Wait() or status? */
2942           /* Can this loop be replaced by a call to MPI_Waitall()? */
2943           ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);
2944         }
2945       msg_list++;
2946     }
2947 
2948   PetscFunctionReturn(0);
2949 }
2950 
2951 /******************************************************************************/
2952 static PetscErrorCode gs_gop_tree_plus_hc(gs_id *gs, PetscScalar *vals, PetscInt dim)
2953 {
2954   PetscInt size;
2955   PetscInt *in, *out;
2956   PetscScalar *buf, *work;
2957   PetscInt op[] = {GL_ADD,0};
2958 
2959   PetscFunctionBegin;
2960   in   = gs->tree_map_in;
2961   out  = gs->tree_map_out;
2962   buf  = gs->tree_buf;
2963   work = gs->tree_work;
2964   size = gs->tree_nel;
2965 
2966   rvec_zero(buf,size);
2967 
2968   while (*in >= 0)
2969     {*(buf + *out++) = *(vals + *in++);}
2970 
2971   in   = gs->tree_map_in;
2972   out  = gs->tree_map_out;
2973 
2974   grop_hc(buf,work,size,op,dim);
2975 
2976   while (*in >= 0)
2977     {*(vals + *in++) = *(buf + *out++);}
2978   PetscFunctionReturn(0);
2979 }
2980 
2981 
2982 
2983