xref: /petsc/src/ksp/pc/impls/tfs/gs.c (revision be1d678a52e6eff2808b2fa31ae986cdbf03c9fe)
1 
2 /***********************************gs.c***************************************
3 
4 Author: Henry M. Tufo III
5 
6 e-mail: hmt@cs.brown.edu
7 
8 snail-mail:
9 Division of Applied Mathematics
10 Brown University
11 Providence, RI 02912
12 
13 Last Modification:
14 6.21.97
15 ************************************gs.c**************************************/
16 
17 /***********************************gs.c***************************************
18 File Description:
19 -----------------
20 
21 ************************************gs.c**************************************/
22 
23 #include "src/ksp/pc/impls/tfs/tfs.h"
24 
25 /* default length of number of items via tree - doubles if exceeded */
26 #define TREE_BUF_SZ 2048;
27 #define GS_VEC_SZ   1
28 
29 
30 
31 /***********************************gs.c***************************************
32 Type: struct gather_scatter_id
33 ------------------------------
34 
35 ************************************gs.c**************************************/
36 typedef struct gather_scatter_id {
37   int id;
38   int nel_min;
39   int nel_max;
40   int nel_sum;
41   int negl;
42   int gl_max;
43   int gl_min;
44   int repeats;
45   int ordered;
46   int positive;
47   PetscScalar *vals;
48 
49   /* bit mask info */
50   int *my_proc_mask;
51   int mask_sz;
52   int *ngh_buf;
53   int ngh_buf_sz;
54   int *nghs;
55   int num_nghs;
56   int max_nghs;
57   int *pw_nghs;
58   int num_pw_nghs;
59   int *tree_nghs;
60   int num_tree_nghs;
61 
62   int num_loads;
63 
64   /* repeats == true -> local info */
65   int nel;         /* number of unique elememts */
66   int *elms;       /* of size nel */
67   int nel_total;
68   int *local_elms; /* of size nel_total */
69   int *companion;  /* of size nel_total */
70 
71   /* local info */
72   int num_local_total;
73   int local_strength;
74   int num_local;
75   int *num_local_reduce;
76   int **local_reduce;
77   int num_local_gop;
78   int *num_gop_local_reduce;
79   int **gop_local_reduce;
80 
81   /* pairwise info */
82   int level;
83   int num_pairs;
84   int max_pairs;
85   int loc_node_pairs;
86   int max_node_pairs;
87   int min_node_pairs;
88   int avg_node_pairs;
89   int *pair_list;
90   int *msg_sizes;
91   int **node_list;
92   int len_pw_list;
93   int *pw_elm_list;
94   PetscScalar *pw_vals;
95 
96   MPI_Request *msg_ids_in;
97   MPI_Request *msg_ids_out;
98 
99   PetscScalar *out;
100   PetscScalar *in;
101   int msg_total;
102 
103   /* tree - crystal accumulator info */
104   int max_left_over;
105   int *pre;
106   int *in_num;
107   int *out_num;
108   int **in_list;
109   int **out_list;
110 
111   /* new tree work*/
112   int  tree_nel;
113   int *tree_elms;
114   PetscScalar *tree_buf;
115   PetscScalar *tree_work;
116 
117   int  tree_map_sz;
118   int *tree_map_in;
119   int *tree_map_out;
120 
121   /* current memory status */
122   int gl_bss_min;
123   int gl_perm_min;
124 
125   /* max segment size for gs_gop_vec() */
126   int vec_sz;
127 
128   /* hack to make paul happy */
129   MPI_Comm gs_comm;
130 
131 } gs_id;
132 
133 
134 /* to be made public */
135 
136 /* PRIVATE - and definitely not exported */
137 /*static void gs_print_template( gs_id* gs, int who);*/
138 /*static void gs_print_stemplate( gs_id* gs, int who);*/
139 
140 static gs_id *gsi_check_args(int *elms, int nel, int level);
141 static void gsi_via_bit_mask(gs_id *gs);
142 static void get_ngh_buf(gs_id *gs);
143 static void set_pairwise(gs_id *gs);
144 static gs_id * gsi_new(void);
145 static void set_tree(gs_id *gs);
146 
147 /* same for all but vector flavor */
148 static void gs_gop_local_out(gs_id *gs, PetscScalar *vals);
149 /* vector flavor */
150 static void gs_gop_vec_local_out(gs_id *gs, PetscScalar *vals, int step);
151 
152 static void gs_gop_vec_plus(gs_id *gs, PetscScalar *in_vals, int step);
153 static void gs_gop_vec_pairwise_plus(gs_id *gs, PetscScalar *in_vals, int step);
154 static void gs_gop_vec_local_plus(gs_id *gs, PetscScalar *vals, int step);
155 static void gs_gop_vec_local_in_plus(gs_id *gs, PetscScalar *vals, int step);
156 static void gs_gop_vec_tree_plus(gs_id *gs, PetscScalar *vals, int step);
157 
158 
159 static void gs_gop_plus(gs_id *gs, PetscScalar *in_vals);
160 static void gs_gop_pairwise_plus(gs_id *gs, PetscScalar *in_vals);
161 static void gs_gop_local_plus(gs_id *gs, PetscScalar *vals);
162 static void gs_gop_local_in_plus(gs_id *gs, PetscScalar *vals);
163 static void gs_gop_tree_plus(gs_id *gs, PetscScalar *vals);
164 
165 static void gs_gop_plus_hc(gs_id *gs, PetscScalar *in_vals, int dim);
166 static void gs_gop_pairwise_plus_hc(gs_id *gs, PetscScalar *in_vals, int dim);
167 static void gs_gop_tree_plus_hc(gs_id *gs, PetscScalar *vals, int dim);
168 
169 static void gs_gop_times(gs_id *gs, PetscScalar *in_vals);
170 static void gs_gop_pairwise_times(gs_id *gs, PetscScalar *in_vals);
171 static void gs_gop_local_times(gs_id *gs, PetscScalar *vals);
172 static void gs_gop_local_in_times(gs_id *gs, PetscScalar *vals);
173 static void gs_gop_tree_times(gs_id *gs, PetscScalar *vals);
174 
175 static void gs_gop_min(gs_id *gs, PetscScalar *in_vals);
176 static void gs_gop_pairwise_min(gs_id *gs, PetscScalar *in_vals);
177 static void gs_gop_local_min(gs_id *gs, PetscScalar *vals);
178 static void gs_gop_local_in_min(gs_id *gs, PetscScalar *vals);
179 static void gs_gop_tree_min(gs_id *gs, PetscScalar *vals);
180 
181 static void gs_gop_min_abs(gs_id *gs, PetscScalar *in_vals);
182 static void gs_gop_pairwise_min_abs(gs_id *gs, PetscScalar *in_vals);
183 static void gs_gop_local_min_abs(gs_id *gs, PetscScalar *vals);
184 static void gs_gop_local_in_min_abs(gs_id *gs, PetscScalar *vals);
185 static void gs_gop_tree_min_abs(gs_id *gs, PetscScalar *vals);
186 
187 static void gs_gop_max(gs_id *gs, PetscScalar *in_vals);
188 static void gs_gop_pairwise_max(gs_id *gs, PetscScalar *in_vals);
189 static void gs_gop_local_max(gs_id *gs, PetscScalar *vals);
190 static void gs_gop_local_in_max(gs_id *gs, PetscScalar *vals);
191 static void gs_gop_tree_max(gs_id *gs, PetscScalar *vals);
192 
193 static void gs_gop_max_abs(gs_id *gs, PetscScalar *in_vals);
194 static void gs_gop_pairwise_max_abs(gs_id *gs, PetscScalar *in_vals);
195 static void gs_gop_local_max_abs(gs_id *gs, PetscScalar *vals);
196 static void gs_gop_local_in_max_abs(gs_id *gs, PetscScalar *vals);
197 static void gs_gop_tree_max_abs(gs_id *gs, PetscScalar *vals);
198 
199 static void gs_gop_exists(gs_id *gs, PetscScalar *in_vals);
200 static void gs_gop_pairwise_exists(gs_id *gs, PetscScalar *in_vals);
201 static void gs_gop_local_exists(gs_id *gs, PetscScalar *vals);
202 static void gs_gop_local_in_exists(gs_id *gs, PetscScalar *vals);
203 static void gs_gop_tree_exists(gs_id *gs, PetscScalar *vals);
204 
205 static void gs_gop_pairwise_binary(gs_id *gs, PetscScalar *in_vals, rbfp fct);
206 static void gs_gop_local_binary(gs_id *gs, PetscScalar *vals, rbfp fct);
207 static void gs_gop_local_in_binary(gs_id *gs, PetscScalar *vals, rbfp fct);
208 static void gs_gop_tree_binary(gs_id *gs, PetscScalar *vals, rbfp fct);
209 
210 
211 
212 /* global vars */
213 /* from comm.c module */
214 
215 /* module state inf and fortran interface */
216 static int num_gs_ids = 0;
217 
218 /* should make this dynamic ... later */
219 static int msg_buf=MAX_MSG_BUF;
220 static int vec_sz=GS_VEC_SZ;
221 static int *tree_buf=NULL;
222 static int tree_buf_sz=0;
223 static int ntree=0;
224 
225 
226 /******************************************************************************
227 Function: gs_init_()
228 
229 Input :
230 Output:
231 Return:
232 Description:
233 ******************************************************************************/
234 void gs_init_vec_sz(int size)
235 {
236   /*  vec_ch = TRUE; */
237 
238   vec_sz = size;
239 }
240 
241 /******************************************************************************
242 Function: gs_init_()
243 
244 Input :
245 Output:
246 Return:
247 Description:
248 ******************************************************************************/
249 void gs_init_msg_buf_sz(int buf_size)
250 {
251   /*  msg_ch = TRUE; */
252 
253   msg_buf = buf_size;
254 }
255 
256 /******************************************************************************
257 Function: gs_init()
258 
259 Input :
260 
261 Output:
262 
263 RETURN:
264 
265 Description:
266 ******************************************************************************/
267 gs_id *
268 gs_init( int *elms, int nel, int level)
269 {
270    gs_id *gs;
271   MPI_Group gs_group;
272   MPI_Comm  gs_comm;
273 
274   /* ensure that communication package has been initialized */
275   comm_init();
276 
277 
278   /* determines if we have enough dynamic/semi-static memory */
279   /* checks input, allocs and sets gd_id template            */
280   gs = gsi_check_args(elms,nel,level);
281 
282   /* only bit mask version up and working for the moment    */
283   /* LATER :: get int list version working for sparse pblms */
284   gsi_via_bit_mask(gs);
285 
286 
287   MPI_Comm_group(MPI_COMM_WORLD,&gs_group);
288   MPI_Comm_create(MPI_COMM_WORLD,gs_group,&gs_comm);
289   gs->gs_comm=gs_comm;
290 
291   return(gs);
292 }
293 
294 
295 
296 /******************************************************************************
297 Function: gsi_new()
298 
299 Input :
300 Output:
301 Return:
302 Description:
303 
304 elm list must >= 0!!!
305 elm repeats allowed
306 ******************************************************************************/
307 static
308 gs_id *
309 gsi_new(void)
310 {
311   gs_id *gs;
312   gs = (gs_id *) malloc(sizeof(gs_id));
313   PetscMemzero(gs,sizeof(gs_id));
314   return(gs);
315 }
316 
317 
318 
319 /******************************************************************************
320 Function: gsi_check_args()
321 
322 Input :
323 Output:
324 Return:
325 Description:
326 
327 elm list must >= 0!!!
328 elm repeats allowed
329 local working copy of elms is sorted
330 ******************************************************************************/
331 static
332 gs_id *
333 gsi_check_args(int *in_elms, int nel, int level)
334 {
335    int i, j, k, t2;
336   int *companion, *elms, *unique, *iptr;
337   int num_local=0, *num_to_reduce, **local_reduce;
338   int oprs[] = {NON_UNIFORM,GL_MIN,GL_MAX,GL_ADD,GL_MIN,GL_MAX,GL_MIN,GL_B_AND};
339   int vals[sizeof(oprs)/sizeof(oprs[0])-1];
340   int work[sizeof(oprs)/sizeof(oprs[0])-1];
341   gs_id *gs;
342 
343 
344 
345 #ifdef SAFE
346   if (!in_elms)
347     {error_msg_fatal("elms point to nothing!!!\n");}
348 
349   if (nel<0)
350     {error_msg_fatal("can't have fewer than 0 elms!!!\n");}
351 
352   if (nel==0)
353     {error_msg_warning("I don't have any elements!!!\n");}
354 #endif
355 
356   /* get space for gs template */
357   gs = gsi_new();
358   gs->id = ++num_gs_ids;
359 
360   /* hmt 6.4.99                                            */
361   /* caller can set global ids that don't participate to 0 */
362   /* gs_init ignores all zeros in elm list                 */
363   /* negative global ids are still invalid                 */
364   for (i=j=0;i<nel;i++)
365     {if (in_elms[i]!=0) {j++;}}
366 
367   k=nel; nel=j;
368 
369   /* copy over in_elms list and create inverse map */
370   elms = (int*) malloc((nel+1)*sizeof(PetscInt));
371   companion = (int*) malloc(nel*sizeof(PetscInt));
372   /* ivec_c_index(companion,nel); */
373   /* ivec_copy(elms,in_elms,nel); */
374   for (i=j=0;i<k;i++)
375     {
376       if (in_elms[i]!=0)
377         {elms[j] = in_elms[i]; companion[j++] = i;}
378     }
379 
380   if (j!=nel)
381     {error_msg_fatal("nel j mismatch!\n");}
382 
383 #ifdef SAFE
384   /* pre-pass ... check to see if sorted */
385   elms[nel] = INT_MAX;
386   iptr = elms;
387   unique = elms+1;
388   j=0;
389   while (*iptr!=INT_MAX)
390     {
391       if (*iptr++>*unique++)
392         {j=1; break;}
393     }
394 
395   /* set up inverse map */
396   if (j)
397     {
398       error_msg_warning("gsi_check_args() :: elm list *not* sorted!\n");
399       SMI_sort((void*)elms, (void*)companion, nel, SORT_INTEGER);
400     }
401   else
402     {error_msg_warning("gsi_check_args() :: elm list sorted!\n");}
403 #else
404   SMI_sort((void*)elms, (void*)companion, nel, SORT_INTEGER);
405 #endif
406   elms[nel] = INT_MIN;
407 
408   /* first pass */
409   /* determine number of unique elements, check pd */
410   for (i=k=0;i<nel;i+=j)
411     {
412       t2 = elms[i];
413       j=++i;
414 
415       /* clump 'em for now */
416       while (elms[j]==t2) {j++;}
417 
418       /* how many together and num local */
419       if (j-=i)
420         {num_local++; k+=j;}
421     }
422 
423   /* how many unique elements? */
424   gs->repeats=k;
425   gs->nel = nel-k;
426 
427 
428   /* number of repeats? */
429   gs->num_local = num_local;
430   num_local+=2;
431   gs->local_reduce=local_reduce=(int **)malloc(num_local*sizeof(PetscInt*));
432   gs->num_local_reduce=num_to_reduce=(int*) malloc(num_local*sizeof(PetscInt));
433 
434   unique = (int*) malloc((gs->nel+1)*sizeof(PetscInt));
435   gs->elms = unique;
436   gs->nel_total = nel;
437   gs->local_elms = elms;
438   gs->companion = companion;
439 
440   /* compess map as well as keep track of local ops */
441   for (num_local=i=j=0;i<gs->nel;i++)
442     {
443       k=j;
444       t2 = unique[i] = elms[j];
445       companion[i] = companion[j];
446 
447       while (elms[j]==t2) {j++;}
448 
449       if ((t2=(j-k))>1)
450         {
451           /* number together */
452           num_to_reduce[num_local] = t2++;
453           iptr = local_reduce[num_local++] = (int*)malloc(t2*sizeof(PetscInt));
454 
455           /* to use binary searching don't remap until we check intersection */
456           *iptr++ = i;
457 
458           /* note that we're skipping the first one */
459           while (++k<j)
460             {*(iptr++) = companion[k];}
461           *iptr = -1;
462         }
463     }
464 
465   /* sentinel for ngh_buf */
466   unique[gs->nel]=INT_MAX;
467 
468   /* for two partition sort hack */
469   num_to_reduce[num_local] = 0;
470   local_reduce[num_local] = NULL;
471   num_to_reduce[++num_local] = 0;
472   local_reduce[num_local] = NULL;
473 
474   /* load 'em up */
475   /* note one extra to hold NON_UNIFORM flag!!! */
476   vals[2] = vals[1] = vals[0] = nel;
477   if (gs->nel>0)
478     {
479        vals[3] = unique[0];           /* ivec_lb(elms,nel); */
480        vals[4] = unique[gs->nel-1];   /* ivec_ub(elms,nel); */
481     }
482   else
483     {
484        vals[3] = INT_MAX;             /* ivec_lb(elms,nel); */
485        vals[4] = INT_MIN;             /* ivec_ub(elms,nel); */
486     }
487   vals[5] = level;
488   vals[6] = num_gs_ids;
489 
490   /* GLOBAL: send 'em out */
491   giop(vals,work,sizeof(oprs)/sizeof(oprs[0])-1,oprs);
492 
493   /* must be semi-pos def - only pairwise depends on this */
494   /* LATER - remove this restriction */
495   if (vals[3]<0)
496     {error_msg_fatal("gsi_check_args() :: system not semi-pos def ::%d\n",vals[3]);}
497 
498   if (vals[4]==INT_MAX)
499     {error_msg_fatal("gsi_check_args() :: system ub too large ::%d!\n",vals[4]);}
500 
501   gs->nel_min = vals[0];
502   gs->nel_max = vals[1];
503   gs->nel_sum = vals[2];
504   gs->gl_min  = vals[3];
505   gs->gl_max  = vals[4];
506   gs->negl    = vals[4]-vals[3]+1;
507 
508   if (gs->negl<=0)
509     {error_msg_fatal("gsi_check_args() :: system empty or neg :: %d\n",gs->negl);}
510 
511   /* LATER :: add level == -1 -> program selects level */
512   if (vals[5]<0)
513     {vals[5]=0;}
514   else if (vals[5]>num_nodes)
515     {vals[5]=num_nodes;}
516   gs->level = vals[5];
517 
518   return(gs);
519 }
520 
521 
522 /******************************************************************************
523 Function: gsi_via_bit_mask()
524 
525 Input :
526 Output:
527 Return:
528 Description:
529 
530 
531 ******************************************************************************/
532 static
533 void
534 gsi_via_bit_mask(gs_id *gs)
535 {
536    int i, nel, *elms;
537   int t1;
538   int **reduce;
539   int *map;
540 
541   /* totally local removes ... ct_bits == 0 */
542   get_ngh_buf(gs);
543 
544   if (gs->level)
545     {set_pairwise(gs);}
546 
547   if (gs->max_left_over)
548     {set_tree(gs);}
549 
550   /* intersection local and pairwise/tree? */
551   gs->num_local_total = gs->num_local;
552   gs->gop_local_reduce = gs->local_reduce;
553   gs->num_gop_local_reduce = gs->num_local_reduce;
554 
555   map = gs->companion;
556 
557   /* is there any local compression */
558   if (!gs->num_local) {
559     gs->local_strength = NONE;
560     gs->num_local_gop = 0;
561   } else {
562       /* ok find intersection */
563       map = gs->companion;
564       reduce = gs->local_reduce;
565       for (i=0, t1=0; i<gs->num_local; i++, reduce++)
566         {
567           if ((ivec_binary_search(**reduce,gs->pw_elm_list,gs->len_pw_list)>=0)
568               ||
569               ivec_binary_search(**reduce,gs->tree_map_in,gs->tree_map_sz)>=0)
570             {
571               /* printf("C%d :: i=%d, **reduce=%d\n",my_id,i,**reduce); */
572               t1++;
573               if (gs->num_local_reduce[i]<=0)
574                 {error_msg_fatal("nobody in list?");}
575               gs->num_local_reduce[i] *= -1;
576             }
577            **reduce=map[**reduce];
578         }
579 
580       /* intersection is empty */
581       if (!t1)
582         {
583           gs->local_strength = FULL;
584           gs->num_local_gop = 0;
585         }
586       /* intersection not empty */
587       else
588         {
589           gs->local_strength = PARTIAL;
590           SMI_sort((void*)gs->num_local_reduce, (void*)gs->local_reduce,
591                    gs->num_local + 1, SORT_INT_PTR);
592 
593           gs->num_local_gop = t1;
594           gs->num_local_total =  gs->num_local;
595           gs->num_local    -= t1;
596           gs->gop_local_reduce = gs->local_reduce;
597           gs->num_gop_local_reduce = gs->num_local_reduce;
598 
599           for (i=0; i<t1; i++)
600             {
601               if (gs->num_gop_local_reduce[i]>=0)
602                 {error_msg_fatal("they aren't negative?");}
603               gs->num_gop_local_reduce[i] *= -1;
604               gs->local_reduce++;
605               gs->num_local_reduce++;
606             }
607           gs->local_reduce++;
608           gs->num_local_reduce++;
609         }
610     }
611 
612   elms = gs->pw_elm_list;
613   nel  = gs->len_pw_list;
614   for (i=0; i<nel; i++)
615     {elms[i] = map[elms[i]];}
616 
617   elms = gs->tree_map_in;
618   nel  = gs->tree_map_sz;
619   for (i=0; i<nel; i++)
620     {elms[i] = map[elms[i]];}
621 
622   /* clean up */
623   free((void*) gs->local_elms);
624   free((void*) gs->companion);
625   free((void*) gs->elms);
626   free((void*) gs->ngh_buf);
627   gs->local_elms = gs->companion = gs->elms = gs->ngh_buf = NULL;
628 }
629 
630 
631 
632 /******************************************************************************
633 Function: place_in_tree()
634 
635 Input :
636 Output:
637 Return:
638 Description:
639 
640 
641 ******************************************************************************/
642 static
643 void
644 place_in_tree( int elm)
645 {
646    int *tp, n;
647 
648 
649   if (ntree==tree_buf_sz)
650     {
651       if (tree_buf_sz)
652         {
653           tp = tree_buf;
654           n = tree_buf_sz;
655           tree_buf_sz<<=1;
656           tree_buf = (int*)malloc(tree_buf_sz*sizeof(PetscInt));
657           ivec_copy(tree_buf,tp,n);
658           free(tp);
659         }
660       else
661         {
662           tree_buf_sz = TREE_BUF_SZ;
663           tree_buf = (int*)malloc(tree_buf_sz*sizeof(PetscInt));
664         }
665     }
666 
667   tree_buf[ntree++] = elm;
668 }
669 
670 
671 
672 /******************************************************************************
673 Function: get_ngh_buf()
674 
675 Input :
676 Output:
677 Return:
678 Description:
679 
680 
681 ******************************************************************************/
682 static
683 void
684 get_ngh_buf(gs_id *gs)
685 {
686    int i, j, npw=0, ntree_map=0;
687   int p_mask_size, ngh_buf_size, buf_size;
688   int *p_mask, *sh_proc_mask, *pw_sh_proc_mask;
689   int *ngh_buf, *buf1, *buf2;
690   int offset, per_load, num_loads, or_ct, start, end;
691   int *ptr1, *ptr2, i_start, negl, nel, *elms;
692   int oper=GL_B_OR;
693   int *ptr3, *t_mask, level, ct1, ct2;
694 
695   /* to make life easier */
696   nel   = gs->nel;
697   elms  = gs->elms;
698   level = gs->level;
699 
700   /* det #bytes needed for processor bit masks and init w/mask cor. to my_id */
701   p_mask = (int*) malloc(p_mask_size=len_bit_mask(num_nodes));
702   set_bit_mask(p_mask,p_mask_size,my_id);
703 
704   /* allocate space for masks and info bufs */
705   gs->nghs = sh_proc_mask = (int*) malloc(p_mask_size);
706   gs->pw_nghs = pw_sh_proc_mask = (int*) malloc(p_mask_size);
707   gs->ngh_buf_sz = ngh_buf_size = p_mask_size*nel;
708   t_mask = (int*) malloc(p_mask_size);
709   gs->ngh_buf = ngh_buf = (int*) malloc(ngh_buf_size);
710 
711   /* comm buffer size ... memory usage bounded by ~2*msg_buf */
712   /* had thought I could exploit rendezvous threshold */
713 
714   /* default is one pass */
715   per_load = negl  = gs->negl;
716   gs->num_loads = num_loads = 1;
717   i=p_mask_size*negl;
718 
719   /* possible overflow on buffer size */
720   /* overflow hack                    */
721   if (i<0) {i=INT_MAX;}
722 
723   buf_size = PetscMin(msg_buf,i);
724 
725   /* can we do it? */
726   if (p_mask_size>buf_size)
727     {error_msg_fatal("get_ngh_buf() :: buf<pms :: %d>%d\n",p_mask_size,buf_size);}
728 
729   /* get giop buf space ... make *only* one malloc */
730   buf1 = (int*) malloc(buf_size<<1);
731 
732   /* more than one gior exchange needed? */
733   if (buf_size!=i)
734     {
735       per_load = buf_size/p_mask_size;
736       buf_size = per_load*p_mask_size;
737       gs->num_loads = num_loads = negl/per_load + (negl%per_load>0);
738     }
739 
740 
741   /* convert buf sizes from #bytes to #ints - 32 bit only! */
742 #ifdef SAFE
743   p_mask_size/=sizeof(PetscInt); ngh_buf_size/=sizeof(PetscInt); buf_size/=sizeof(PetscInt);
744 #else
745   p_mask_size>>=2; ngh_buf_size>>=2; buf_size>>=2;
746 #endif
747 
748   /* find giop work space */
749   buf2 = buf1+buf_size;
750 
751   /* hold #ints needed for processor masks */
752   gs->mask_sz=p_mask_size;
753 
754   /* init buffers */
755   ivec_zero(sh_proc_mask,p_mask_size);
756   ivec_zero(pw_sh_proc_mask,p_mask_size);
757   ivec_zero(ngh_buf,ngh_buf_size);
758 
759   /* HACK reset tree info */
760   tree_buf=NULL;
761   tree_buf_sz=ntree=0;
762 
763   /* queue the tree elements for now */
764   /* elms_q = new_queue(); */
765 
766   /* can also queue tree info for pruned or forest implememtation */
767   /*  mask_q = new_queue(); */
768 
769   /* ok do it */
770   for (ptr1=ngh_buf,ptr2=elms,end=gs->gl_min,or_ct=i=0; or_ct<num_loads; or_ct++)
771     {
772       /* identity for bitwise or is 000...000 */
773       ivec_zero(buf1,buf_size);
774 
775       /* load msg buffer */
776       for (start=end,end+=per_load,i_start=i; (offset=*ptr2)<end; i++, ptr2++)
777         {
778           offset = (offset-start)*p_mask_size;
779           ivec_copy(buf1+offset,p_mask,p_mask_size);
780         }
781 
782       /* GLOBAL: pass buffer */
783       giop(buf1,buf2,buf_size,&oper);
784 
785 
786       /* unload buffer into ngh_buf */
787       ptr2=(elms+i_start);
788       for(ptr3=buf1,j=start; j<end; ptr3+=p_mask_size,j++)
789         {
790           /* I own it ... may have to pairwise it */
791           if (j==*ptr2)
792             {
793               /* do i share it w/anyone? */
794 #ifdef SAFE
795               ct1 = ct_bits((char *)ptr3,p_mask_size*sizeof(PetscInt));
796 #else
797               ct1 = ct_bits((char *)ptr3,p_mask_size<<2);
798 #endif
799               /* guess not */
800               if (ct1<2)
801                 {ptr2++; ptr1+=p_mask_size; continue;}
802 
803               /* i do ... so keep info and turn off my bit */
804               ivec_copy(ptr1,ptr3,p_mask_size);
805               ivec_xor(ptr1,p_mask,p_mask_size);
806               ivec_or(sh_proc_mask,ptr1,p_mask_size);
807 
808               /* is it to be done pairwise? */
809               if (--ct1<=level)
810                 {
811                   npw++;
812 
813                   /* turn on high bit to indicate pw need to process */
814                   *ptr2++ |= TOP_BIT;
815                   ivec_or(pw_sh_proc_mask,ptr1,p_mask_size);
816                   ptr1+=p_mask_size;
817                   continue;
818                 }
819 
820               /* get set for next and note that I have a tree contribution */
821               /* could save exact elm index for tree here -> save a search */
822               ptr2++; ptr1+=p_mask_size; ntree_map++;
823             }
824           /* i don't but still might be involved in tree */
825           else
826             {
827 
828               /* shared by how many? */
829 #ifdef SAFE
830               ct1 = ct_bits((char *)ptr3,p_mask_size*sizeof(PetscInt));
831 #else
832               ct1 = ct_bits((char *)ptr3,p_mask_size<<2);
833 #endif
834 
835               /* none! */
836               if (ct1<2)
837                 {continue;}
838 
839               /* is it going to be done pairwise? but not by me of course!*/
840               if (--ct1<=level)
841                 {continue;}
842             }
843           /* LATER we're going to have to process it NOW */
844           /* nope ... tree it */
845           place_in_tree(j);
846         }
847     }
848 
849   free((void*)t_mask);
850   free((void*)buf1);
851 
852   gs->len_pw_list=npw;
853   gs->num_nghs = ct_bits((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt));
854 
855   /* expand from bit mask list to int list and save ngh list */
856   gs->nghs = (int*) malloc(gs->num_nghs * sizeof(PetscInt));
857   bm_to_proc((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt),gs->nghs);
858 
859   gs->num_pw_nghs = ct_bits((char *)pw_sh_proc_mask,p_mask_size*sizeof(PetscInt));
860 
861   oper = GL_MAX;
862   ct1 = gs->num_nghs;
863   giop(&ct1,&ct2,1,&oper);
864   gs->max_nghs = ct1;
865 
866   gs->tree_map_sz  = ntree_map;
867   gs->max_left_over=ntree;
868 
869   free((void*)p_mask);
870   free((void*)sh_proc_mask);
871 
872 }
873 
874 
875 
876 
877 
878 /******************************************************************************
879 Function: pairwise_init()
880 
881 Input :
882 Output:
883 Return:
884 Description:
885 
886 if an element is shared by fewer that level# of nodes do pairwise exch
887 ******************************************************************************/
888 static
889 void
890 set_pairwise(gs_id *gs)
891 {
892    int i, j;
893   int p_mask_size;
894   int *p_mask, *sh_proc_mask, *tmp_proc_mask;
895   int *ngh_buf, *buf2;
896   int offset;
897   int *msg_list, *msg_size, **msg_nodes, nprs;
898   int *pairwise_elm_list, len_pair_list=0;
899   int *iptr, t1, i_start, nel, *elms;
900   int ct;
901 
902 
903 
904   /* to make life easier */
905   nel  = gs->nel;
906   elms = gs->elms;
907   ngh_buf = gs->ngh_buf;
908   sh_proc_mask  = gs->pw_nghs;
909 
910   /* need a few temp masks */
911   p_mask_size   = len_bit_mask(num_nodes);
912   p_mask        = (int*) malloc(p_mask_size);
913   tmp_proc_mask = (int*) malloc(p_mask_size);
914 
915   /* set mask to my my_id's bit mask */
916   set_bit_mask(p_mask,p_mask_size,my_id);
917 
918 #ifdef SAFE
919   p_mask_size /= sizeof(PetscInt);
920 #else
921   p_mask_size >>= 2;
922 #endif
923 
924   len_pair_list=gs->len_pw_list;
925   gs->pw_elm_list=pairwise_elm_list=(int*)malloc((len_pair_list+1)*sizeof(PetscInt));
926 
927   /* how many processors (nghs) do we have to exchange with? */
928   nprs=gs->num_pairs=ct_bits((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt));
929 
930 
931   /* allocate space for gs_gop() info */
932   gs->pair_list = msg_list = (int*)  malloc(sizeof(PetscInt)*nprs);
933   gs->msg_sizes = msg_size  = (int*)  malloc(sizeof(PetscInt)*nprs);
934   gs->node_list = msg_nodes = (int **) malloc(sizeof(PetscInt*)*(nprs+1));
935 
936   /* init msg_size list */
937   ivec_zero(msg_size,nprs);
938 
939   /* expand from bit mask list to int list */
940   bm_to_proc((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt),msg_list);
941 
942   /* keep list of elements being handled pairwise */
943   for (i=j=0;i<nel;i++)
944     {
945       if (elms[i] & TOP_BIT)
946         {elms[i] ^= TOP_BIT; pairwise_elm_list[j++] = i;}
947     }
948   pairwise_elm_list[j] = -1;
949 
950   gs->msg_ids_out = (MPI_Request *)  malloc(sizeof(MPI_Request)*(nprs+1));
951   gs->msg_ids_out[nprs] = MPI_REQUEST_NULL;
952   gs->msg_ids_in = (MPI_Request *)  malloc(sizeof(MPI_Request)*(nprs+1));
953   gs->msg_ids_in[nprs] = MPI_REQUEST_NULL;
954   gs->pw_vals = (PetscScalar *) malloc(sizeof(PetscScalar)*len_pair_list*vec_sz);
955 
956   /* find who goes to each processor */
957   for (i_start=i=0;i<nprs;i++)
958     {
959       /* processor i's mask */
960       set_bit_mask(p_mask,p_mask_size*sizeof(PetscInt),msg_list[i]);
961 
962       /* det # going to processor i */
963       for (ct=j=0;j<len_pair_list;j++)
964         {
965           buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
966           ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);
967           if (ct_bits((char *)tmp_proc_mask,p_mask_size*sizeof(PetscInt)))
968             {ct++;}
969         }
970       msg_size[i] = ct;
971       i_start = PetscMax(i_start,ct);
972 
973       /*space to hold nodes in message to first neighbor */
974       msg_nodes[i] = iptr = (int*) malloc(sizeof(PetscInt)*(ct+1));
975 
976       for (j=0;j<len_pair_list;j++)
977         {
978           buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
979           ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);
980           if (ct_bits((char *)tmp_proc_mask,p_mask_size*sizeof(PetscInt)))
981             {*iptr++ = j;}
982         }
983       *iptr = -1;
984     }
985   msg_nodes[nprs] = NULL;
986 
987   j=gs->loc_node_pairs=i_start;
988   t1 = GL_MAX;
989   giop(&i_start,&offset,1,&t1);
990   gs->max_node_pairs = i_start;
991 
992   i_start=j;
993   t1 = GL_MIN;
994   giop(&i_start,&offset,1,&t1);
995   gs->min_node_pairs = i_start;
996 
997   i_start=j;
998   t1 = GL_ADD;
999   giop(&i_start,&offset,1,&t1);
1000   gs->avg_node_pairs = i_start/num_nodes + 1;
1001 
1002   i_start=nprs;
1003   t1 = GL_MAX;
1004   giop(&i_start,&offset,1,&t1);
1005   gs->max_pairs = i_start;
1006 
1007 
1008   /* remap pairwise in tail of gsi_via_bit_mask() */
1009   gs->msg_total = ivec_sum(gs->msg_sizes,nprs);
1010   gs->out = (PetscScalar *) malloc(sizeof(PetscScalar)*gs->msg_total*vec_sz);
1011   gs->in  = (PetscScalar *) malloc(sizeof(PetscScalar)*gs->msg_total*vec_sz);
1012 
1013   /* reset malloc pool */
1014   free((void*)p_mask);
1015   free((void*)tmp_proc_mask);
1016 
1017 }
1018 
1019 
1020 
1021 /******************************************************************************
1022 Function: set_tree()
1023 
1024 Input :
1025 Output:
1026 Return:
1027 Description:
1028 
1029 to do pruned tree just save ngh buf copy for each one and decode here!
1030 ******************************************************************************/
1031 static
1032 void
1033 set_tree(gs_id *gs)
1034 {
1035    int i, j, n, nel;
1036    int *iptr_in, *iptr_out, *tree_elms, *elms;
1037 
1038 
1039   /* local work ptrs */
1040   elms = gs->elms;
1041   nel     = gs->nel;
1042 
1043   /* how many via tree */
1044   gs->tree_nel  = n = ntree;
1045   gs->tree_elms = tree_elms = iptr_in = tree_buf;
1046   gs->tree_buf  = (PetscScalar *) malloc(sizeof(PetscScalar)*n*vec_sz);
1047   gs->tree_work = (PetscScalar *) malloc(sizeof(PetscScalar)*n*vec_sz);
1048   j=gs->tree_map_sz;
1049   gs->tree_map_in = iptr_in  = (int*) malloc(sizeof(PetscInt)*(j+1));
1050   gs->tree_map_out = iptr_out = (int*) malloc(sizeof(PetscInt)*(j+1));
1051 
1052   /* search the longer of the two lists */
1053   /* note ... could save this info in get_ngh_buf and save searches */
1054   if (n<=nel)
1055     {
1056       /* bijective fct w/remap - search elm list */
1057       for (i=0; i<n; i++)
1058         {
1059           if ((j=ivec_binary_search(*tree_elms++,elms,nel))>=0)
1060             {*iptr_in++ = j; *iptr_out++ = i;}
1061         }
1062     }
1063   else
1064     {
1065       for (i=0; i<nel; i++)
1066         {
1067           if ((j=ivec_binary_search(*elms++,tree_elms,n))>=0)
1068             {*iptr_in++ = i; *iptr_out++ = j;}
1069         }
1070     }
1071 
1072   /* sentinel */
1073   *iptr_in = *iptr_out = -1;
1074 
1075 }
1076 
1077 
1078 /******************************************************************************
1079 Function: gather_scatter
1080 
1081 Input :
1082 Output:
1083 Return:
1084 Description:
1085 ******************************************************************************/
1086 static
1087 void
1088 gs_gop_local_out( gs_id *gs,  PetscScalar *vals)
1089 {
1090    int *num, *map, **reduce;
1091    PetscScalar tmp;
1092 
1093 
1094   num    = gs->num_gop_local_reduce;
1095   reduce = gs->gop_local_reduce;
1096   while ((map = *reduce++))
1097     {
1098       /* wall */
1099       if (*num == 2)
1100         {
1101           num ++;
1102           vals[map[1]] = vals[map[0]];
1103         }
1104       /* corner shared by three elements */
1105       else if (*num == 3)
1106         {
1107           num ++;
1108           vals[map[2]] = vals[map[1]] = vals[map[0]];
1109         }
1110       /* corner shared by four elements */
1111       else if (*num == 4)
1112         {
1113           num ++;
1114           vals[map[3]] = vals[map[2]] = vals[map[1]] = vals[map[0]];
1115         }
1116       /* general case ... odd geoms ... 3D*/
1117       else
1118         {
1119           num++;
1120           tmp = *(vals + *map++);
1121           while (*map >= 0)
1122             {*(vals + *map++) = tmp;}
1123         }
1124     }
1125 }
1126 
1127 
1128 
1129 /******************************************************************************
1130 Function: gather_scatter
1131 
1132 Input :
1133 Output:
1134 Return:
1135 Description:
1136 ******************************************************************************/
1137 void
1138 gs_gop_binary(gs_ADT gs, PetscScalar *vals, rbfp fct)
1139 {
1140   /* local only operations!!! */
1141   if (gs->num_local)
1142     {gs_gop_local_binary(gs,vals,fct);}
1143 
1144   /* if intersection tree/pairwise and local isn't empty */
1145   if (gs->num_local_gop)
1146     {
1147       gs_gop_local_in_binary(gs,vals,fct);
1148 
1149       /* pairwise */
1150       if (gs->num_pairs)
1151         {gs_gop_pairwise_binary(gs,vals,fct);}
1152 
1153       /* tree */
1154       else if (gs->max_left_over)
1155         {gs_gop_tree_binary(gs,vals,fct);}
1156 
1157       gs_gop_local_out(gs,vals);
1158     }
1159   /* if intersection tree/pairwise and local is empty */
1160   else
1161     {
1162       /* pairwise */
1163       if (gs->num_pairs)
1164         {gs_gop_pairwise_binary(gs,vals,fct);}
1165 
1166       /* tree */
1167       else if (gs->max_left_over)
1168         {gs_gop_tree_binary(gs,vals,fct);}
1169     }
1170 }
1171 
1172 
1173 
1174 /******************************************************************************
1175 Function: gather_scatter
1176 
1177 Input :
1178 Output:
1179 Return:
1180 Description:
1181 ******************************************************************************/
1182 static
1183 void
1184 gs_gop_local_binary( gs_id *gs,  PetscScalar *vals,  rbfp fct)
1185 {
1186    int *num, *map, **reduce;
1187   PetscScalar tmp;
1188 
1189 
1190   num    = gs->num_local_reduce;
1191   reduce = gs->local_reduce;
1192   while ((map = *reduce))
1193     {
1194       num ++;
1195       (*fct)(&tmp,NULL,1);
1196       /* tmp = 0.0; */
1197       while (*map >= 0)
1198         {(*fct)(&tmp,(vals + *map),1); map++;}
1199         /*        {tmp = (*fct)(tmp,*(vals + *map)); map++;} */
1200 
1201       map = *reduce++;
1202       while (*map >= 0)
1203         {*(vals + *map++) = tmp;}
1204     }
1205 }
1206 
1207 
1208 
1209 /******************************************************************************
1210 Function: gather_scatter
1211 
1212 Input :
1213 Output:
1214 Return:
1215 Description:
1216 ******************************************************************************/
1217 static
1218 void
1219 gs_gop_local_in_binary( gs_id *gs,  PetscScalar *vals,  rbfp fct)
1220 {
1221    int *num, *map, **reduce;
1222    PetscScalar *base;
1223 
1224 
1225   num    = gs->num_gop_local_reduce;
1226 
1227   reduce = gs->gop_local_reduce;
1228   while ((map = *reduce++))
1229     {
1230       num++;
1231       base = vals + *map++;
1232       while (*map >= 0)
1233         {(*fct)(base,(vals + *map),1); map++;}
1234     }
1235 }
1236 
1237 
1238 
1239 /******************************************************************************
1240 Function: gather_scatter
1241 
1242 VERSION 3 ::
1243 
1244 Input :
1245 Output:
1246 Return:
1247 Description:
1248 ******************************************************************************/
1249 static
1250 void
1251 gs_gop_pairwise_binary( gs_id *gs,  PetscScalar *in_vals,
1252                         rbfp fct)
1253 {
1254    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
1255    int *iptr, *msg_list, *msg_size, **msg_nodes;
1256    int *pw, *list, *size, **nodes;
1257   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1258   MPI_Status status;
1259 
1260 
1261   /* strip and load s */
1262   msg_list =list         = gs->pair_list;
1263   msg_size =size         = gs->msg_sizes;
1264   msg_nodes=nodes        = gs->node_list;
1265   iptr=pw                = gs->pw_elm_list;
1266   dptr1=dptr3            = gs->pw_vals;
1267   msg_ids_in  = ids_in   = gs->msg_ids_in;
1268   msg_ids_out = ids_out  = gs->msg_ids_out;
1269   dptr2                  = gs->out;
1270   in1=in2                = gs->in;
1271 
1272   /* post the receives */
1273   /*  msg_nodes=nodes; */
1274   do
1275     {
1276       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1277          second one *list and do list++ afterwards */
1278       MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++,
1279                 gs->gs_comm, msg_ids_in++);
1280       in1 += *size++;
1281     }
1282   while (*++msg_nodes);
1283   msg_nodes=nodes;
1284 
1285   /* load gs values into in out gs buffers */
1286   while (*iptr >= 0)
1287     {*dptr3++ = *(in_vals + *iptr++);}
1288 
1289   /* load out buffers and post the sends */
1290   while ((iptr = *msg_nodes++))
1291     {
1292       dptr3 = dptr2;
1293       while (*iptr >= 0)
1294         {*dptr2++ = *(dptr1 + *iptr++);}
1295       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1296       /* is msg_ids_out++ correct? */
1297       MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++,
1298                 MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
1299     }
1300 
1301   if (gs->max_left_over)
1302     {gs_gop_tree_binary(gs,in_vals,fct);}
1303 
1304   /* process the received data */
1305   msg_nodes=nodes;
1306   while ((iptr = *nodes++))
1307     {
1308       /* Should I check the return value of MPI_Wait() or status? */
1309       /* Can this loop be replaced by a call to MPI_Waitall()? */
1310       MPI_Wait(ids_in++, &status);
1311       while (*iptr >= 0)
1312         {(*fct)((dptr1 + *iptr),in2,1); iptr++; in2++;}
1313       /* {*(dptr1 + *iptr) = (*fct)(*(dptr1 + *iptr),*in2); iptr++; in2++;} */
1314     }
1315 
1316   /* replace vals */
1317   while (*pw >= 0)
1318     {*(in_vals + *pw++) = *dptr1++;}
1319 
1320   /* clear isend message handles */
1321   /* This changed for clarity though it could be the same */
1322   while (*msg_nodes++)
1323     /* Should I check the return value of MPI_Wait() or status? */
1324     /* Can this loop be replaced by a call to MPI_Waitall()? */
1325     {MPI_Wait(ids_out++, &status);}
1326 }
1327 
1328 
1329 
1330 /******************************************************************************
1331 Function: gather_scatter
1332 
1333 Input :
1334 Output:
1335 Return:
1336 Description:
1337 ******************************************************************************/
1338 static
1339 void
1340 gs_gop_tree_binary(gs_id *gs, PetscScalar *vals,  rbfp fct)
1341 {
1342   int size;
1343   int *in, *out;
1344   PetscScalar *buf, *work;
1345 
1346   in   = gs->tree_map_in;
1347   out  = gs->tree_map_out;
1348   buf  = gs->tree_buf;
1349   work = gs->tree_work;
1350   size = gs->tree_nel;
1351 
1352   /* load vals vector w/identity */
1353   (*fct)(buf,NULL,size);
1354 
1355   /* load my contribution into val vector */
1356   while (*in >= 0)
1357     {(*fct)((buf + *out++),(vals + *in++),-1);}
1358 
1359   gfop(buf,work,size,(vbfp)fct,MPIU_SCALAR,0);
1360 
1361   in   = gs->tree_map_in;
1362   out  = gs->tree_map_out;
1363   while (*in >= 0)
1364     {(*fct)((vals + *in++),(buf + *out++),-1);}
1365 
1366 }
1367 
1368 
1369 
1370 
1371 /******************************************************************************
1372 Function: gather_scatter
1373 
1374 Input :
1375 Output:
1376 Return:
1377 Description:
1378 ******************************************************************************/
1379 void
1380 gs_gop( gs_id *gs,  PetscScalar *vals,  const char *op)
1381 {
1382   switch (*op) {
1383   case '+':
1384     gs_gop_plus(gs,vals);
1385     break;
1386   case '*':
1387     gs_gop_times(gs,vals);
1388     break;
1389   case 'a':
1390     gs_gop_min_abs(gs,vals);
1391     break;
1392   case 'A':
1393     gs_gop_max_abs(gs,vals);
1394     break;
1395   case 'e':
1396     gs_gop_exists(gs,vals);
1397     break;
1398   case 'm':
1399     gs_gop_min(gs,vals);
1400     break;
1401   case 'M':
1402     gs_gop_max(gs,vals); break;
1403     /*
1404     if (*(op+1)=='\0')
1405       {gs_gop_max(gs,vals); break;}
1406     else if (*(op+1)=='X')
1407       {gs_gop_max_abs(gs,vals); break;}
1408     else if (*(op+1)=='N')
1409       {gs_gop_min_abs(gs,vals); break;}
1410     */
1411   default:
1412     error_msg_warning("gs_gop() :: %c is not a valid op",op[0]);
1413     error_msg_warning("gs_gop() :: default :: plus");
1414     gs_gop_plus(gs,vals);
1415     break;
1416   }
1417 }
1418 
1419 
1420 /******************************************************************************
1421 Function: gather_scatter
1422 
1423 Input :
1424 Output:
1425 Return:
1426 Description:
1427 ******************************************************************************/
1428 static void
1429 gs_gop_exists( gs_id *gs,  PetscScalar *vals)
1430 {
1431   /* local only operations!!! */
1432   if (gs->num_local)
1433     {gs_gop_local_exists(gs,vals);}
1434 
1435   /* if intersection tree/pairwise and local isn't empty */
1436   if (gs->num_local_gop)
1437     {
1438       gs_gop_local_in_exists(gs,vals);
1439 
1440       /* pairwise */
1441       if (gs->num_pairs)
1442         {gs_gop_pairwise_exists(gs,vals);}
1443 
1444       /* tree */
1445       else if (gs->max_left_over)
1446         {gs_gop_tree_exists(gs,vals);}
1447 
1448       gs_gop_local_out(gs,vals);
1449     }
1450   /* if intersection tree/pairwise and local is empty */
1451   else
1452     {
1453       /* pairwise */
1454       if (gs->num_pairs)
1455         {gs_gop_pairwise_exists(gs,vals);}
1456 
1457       /* tree */
1458       else if (gs->max_left_over)
1459         {gs_gop_tree_exists(gs,vals);}
1460     }
1461 }
1462 
1463 
1464 
1465 /******************************************************************************
1466 Function: gather_scatter
1467 
1468 Input :
1469 Output:
1470 Return:
1471 Description:
1472 ******************************************************************************/
1473 static
1474 void
1475 gs_gop_local_exists( gs_id *gs,  PetscScalar *vals)
1476 {
1477    int *num, *map, **reduce;
1478    PetscScalar tmp;
1479 
1480 
1481   num    = gs->num_local_reduce;
1482   reduce = gs->local_reduce;
1483   while ((map = *reduce))
1484     {
1485       num ++;
1486       tmp = 0.0;
1487       while (*map >= 0)
1488         {tmp = EXISTS(tmp,*(vals + *map)); map++;}
1489 
1490       map = *reduce++;
1491       while (*map >= 0)
1492         {*(vals + *map++) = tmp;}
1493     }
1494 }
1495 
1496 
1497 
1498 /******************************************************************************
1499 Function: gather_scatter
1500 
1501 Input :
1502 Output:
1503 Return:
1504 Description:
1505 ******************************************************************************/
1506 static
1507 void
1508 gs_gop_local_in_exists( gs_id *gs,  PetscScalar *vals)
1509 {
1510    int *num, *map, **reduce;
1511    PetscScalar *base;
1512 
1513 
1514   num    = gs->num_gop_local_reduce;
1515   reduce = gs->gop_local_reduce;
1516   while ((map = *reduce++))
1517     {
1518       num++;
1519       base = vals + *map++;
1520       while (*map >= 0)
1521         {*base = EXISTS(*base,*(vals + *map)); map++;}
1522     }
1523 }
1524 
1525 
1526 
1527 /******************************************************************************
1528 Function: gather_scatter
1529 
1530 VERSION 3 ::
1531 
1532 Input :
1533 Output:
1534 Return:
1535 Description:
1536 ******************************************************************************/
1537 static
1538 void
1539 gs_gop_pairwise_exists( gs_id *gs,  PetscScalar *in_vals)
1540 {
1541    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
1542    int *iptr, *msg_list, *msg_size, **msg_nodes;
1543    int *pw, *list, *size, **nodes;
1544   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1545   MPI_Status status;
1546 
1547 
1548   /* strip and load s */
1549   msg_list =list         = gs->pair_list;
1550   msg_size =size         = gs->msg_sizes;
1551   msg_nodes=nodes        = gs->node_list;
1552   iptr=pw                = gs->pw_elm_list;
1553   dptr1=dptr3            = gs->pw_vals;
1554   msg_ids_in  = ids_in   = gs->msg_ids_in;
1555   msg_ids_out = ids_out  = gs->msg_ids_out;
1556   dptr2                  = gs->out;
1557   in1=in2                = gs->in;
1558 
1559   /* post the receives */
1560   /*  msg_nodes=nodes; */
1561   do
1562     {
1563       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1564          second one *list and do list++ afterwards */
1565       MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++,
1566                 gs->gs_comm, msg_ids_in++);
1567       in1 += *size++;
1568     }
1569   while (*++msg_nodes);
1570   msg_nodes=nodes;
1571 
1572   /* load gs values into in out gs buffers */
1573   while (*iptr >= 0)
1574     {*dptr3++ = *(in_vals + *iptr++);}
1575 
1576   /* load out buffers and post the sends */
1577   while ((iptr = *msg_nodes++))
1578     {
1579       dptr3 = dptr2;
1580       while (*iptr >= 0)
1581         {*dptr2++ = *(dptr1 + *iptr++);}
1582       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1583       /* is msg_ids_out++ correct? */
1584       MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++,
1585                 MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
1586     }
1587 
1588   if (gs->max_left_over)
1589     {gs_gop_tree_exists(gs,in_vals);}
1590 
1591   /* process the received data */
1592   msg_nodes=nodes;
1593   while ((iptr = *nodes++))
1594     {
1595       /* Should I check the return value of MPI_Wait() or status? */
1596       /* Can this loop be replaced by a call to MPI_Waitall()? */
1597       MPI_Wait(ids_in++, &status);
1598       while (*iptr >= 0)
1599         {*(dptr1 + *iptr) = EXISTS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
1600     }
1601 
1602   /* replace vals */
1603   while (*pw >= 0)
1604     {*(in_vals + *pw++) = *dptr1++;}
1605 
1606   /* clear isend message handles */
1607   /* This changed for clarity though it could be the same */
1608   while (*msg_nodes++)
1609     /* Should I check the return value of MPI_Wait() or status? */
1610     /* Can this loop be replaced by a call to MPI_Waitall()? */
1611     {MPI_Wait(ids_out++, &status);}
1612 }
1613 
1614 
1615 
1616 /******************************************************************************
1617 Function: gather_scatter
1618 
1619 Input :
1620 Output:
1621 Return:
1622 Description:
1623 ******************************************************************************/
1624 static
1625 void
1626 gs_gop_tree_exists(gs_id *gs, PetscScalar *vals)
1627 {
1628   int size;
1629   int *in, *out;
1630   PetscScalar *buf, *work;
1631   int op[] = {GL_EXISTS,0};
1632 
1633 
1634   in   = gs->tree_map_in;
1635   out  = gs->tree_map_out;
1636   buf  = gs->tree_buf;
1637   work = gs->tree_work;
1638   size = gs->tree_nel;
1639 
1640   rvec_zero(buf,size);
1641 
1642   while (*in >= 0)
1643     {
1644       /*
1645       printf("%d :: out=%d\n",my_id,*out);
1646       printf("%d :: in=%d\n",my_id,*in);
1647       */
1648       *(buf + *out++) = *(vals + *in++);
1649     }
1650 
1651   grop(buf,work,size,op);
1652 
1653   in   = gs->tree_map_in;
1654   out  = gs->tree_map_out;
1655 
1656   while (*in >= 0)
1657     {*(vals + *in++) = *(buf + *out++);}
1658 
1659 }
1660 
1661 
1662 
1663 /******************************************************************************
1664 Function: gather_scatter
1665 
1666 Input :
1667 Output:
1668 Return:
1669 Description:
1670 ******************************************************************************/
1671 static void
1672 gs_gop_max_abs( gs_id *gs,  PetscScalar *vals)
1673 {
1674   /* local only operations!!! */
1675   if (gs->num_local)
1676     {gs_gop_local_max_abs(gs,vals);}
1677 
1678   /* if intersection tree/pairwise and local isn't empty */
1679   if (gs->num_local_gop)
1680     {
1681       gs_gop_local_in_max_abs(gs,vals);
1682 
1683       /* pairwise */
1684       if (gs->num_pairs)
1685         {gs_gop_pairwise_max_abs(gs,vals);}
1686 
1687       /* tree */
1688       else if (gs->max_left_over)
1689         {gs_gop_tree_max_abs(gs,vals);}
1690 
1691       gs_gop_local_out(gs,vals);
1692     }
1693   /* if intersection tree/pairwise and local is empty */
1694   else
1695     {
1696       /* pairwise */
1697       if (gs->num_pairs)
1698         {gs_gop_pairwise_max_abs(gs,vals);}
1699 
1700       /* tree */
1701       else if (gs->max_left_over)
1702         {gs_gop_tree_max_abs(gs,vals);}
1703     }
1704 }
1705 
1706 
1707 
1708 /******************************************************************************
1709 Function: gather_scatter
1710 
1711 Input :
1712 Output:
1713 Return:
1714 Description:
1715 ******************************************************************************/
1716 static
1717 void
1718 gs_gop_local_max_abs( gs_id *gs,  PetscScalar *vals)
1719 {
1720    int *num, *map, **reduce;
1721    PetscScalar tmp;
1722 
1723 
1724   num    = gs->num_local_reduce;
1725   reduce = gs->local_reduce;
1726   while ((map = *reduce))
1727     {
1728       num ++;
1729       tmp = 0.0;
1730       while (*map >= 0)
1731         {tmp = MAX_FABS(tmp,*(vals + *map)); map++;}
1732 
1733       map = *reduce++;
1734       while (*map >= 0)
1735         {*(vals + *map++) = tmp;}
1736     }
1737 }
1738 
1739 
1740 
1741 /******************************************************************************
1742 Function: gather_scatter
1743 
1744 Input :
1745 Output:
1746 Return:
1747 Description:
1748 ******************************************************************************/
1749 static
1750 void
1751 gs_gop_local_in_max_abs( gs_id *gs,  PetscScalar *vals)
1752 {
1753    int *num, *map, **reduce;
1754    PetscScalar *base;
1755 
1756 
1757   num    = gs->num_gop_local_reduce;
1758   reduce = gs->gop_local_reduce;
1759   while ((map = *reduce++))
1760     {
1761       num++;
1762       base = vals + *map++;
1763       while (*map >= 0)
1764         {*base = MAX_FABS(*base,*(vals + *map)); map++;}
1765     }
1766 }
1767 
1768 
1769 
1770 /******************************************************************************
1771 Function: gather_scatter
1772 
1773 VERSION 3 ::
1774 
1775 Input :
1776 Output:
1777 Return:
1778 Description:
1779 ******************************************************************************/
1780 static
1781 void
1782 gs_gop_pairwise_max_abs( gs_id *gs,  PetscScalar *in_vals)
1783 {
1784    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
1785    int *iptr, *msg_list, *msg_size, **msg_nodes;
1786    int *pw, *list, *size, **nodes;
1787   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1788   MPI_Status status;
1789 
1790 
1791   /* strip and load s */
1792   msg_list =list         = gs->pair_list;
1793   msg_size =size         = gs->msg_sizes;
1794   msg_nodes=nodes        = gs->node_list;
1795   iptr=pw                = gs->pw_elm_list;
1796   dptr1=dptr3            = gs->pw_vals;
1797   msg_ids_in  = ids_in   = gs->msg_ids_in;
1798   msg_ids_out = ids_out  = gs->msg_ids_out;
1799   dptr2                  = gs->out;
1800   in1=in2                = gs->in;
1801 
1802   /* post the receives */
1803   /*  msg_nodes=nodes; */
1804   do
1805     {
1806       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1807          second one *list and do list++ afterwards */
1808       MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++,
1809                 gs->gs_comm, msg_ids_in++);
1810       in1 += *size++;
1811     }
1812   while (*++msg_nodes);
1813   msg_nodes=nodes;
1814 
1815   /* load gs values into in out gs buffers */
1816   while (*iptr >= 0)
1817     {*dptr3++ = *(in_vals + *iptr++);}
1818 
1819   /* load out buffers and post the sends */
1820   while ((iptr = *msg_nodes++))
1821     {
1822       dptr3 = dptr2;
1823       while (*iptr >= 0)
1824         {*dptr2++ = *(dptr1 + *iptr++);}
1825       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1826       /* is msg_ids_out++ correct? */
1827       MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++,
1828                 MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
1829     }
1830 
1831   if (gs->max_left_over)
1832     {gs_gop_tree_max_abs(gs,in_vals);}
1833 
1834   /* process the received data */
1835   msg_nodes=nodes;
1836   while ((iptr = *nodes++))
1837     {
1838       /* Should I check the return value of MPI_Wait() or status? */
1839       /* Can this loop be replaced by a call to MPI_Waitall()? */
1840       MPI_Wait(ids_in++, &status);
1841       while (*iptr >= 0)
1842         {*(dptr1 + *iptr) = MAX_FABS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
1843     }
1844 
1845   /* replace vals */
1846   while (*pw >= 0)
1847     {*(in_vals + *pw++) = *dptr1++;}
1848 
1849   /* clear isend message handles */
1850   /* This changed for clarity though it could be the same */
1851   while (*msg_nodes++)
1852     /* Should I check the return value of MPI_Wait() or status? */
1853     /* Can this loop be replaced by a call to MPI_Waitall()? */
1854     {MPI_Wait(ids_out++, &status);}
1855 }
1856 
1857 
1858 
1859 /******************************************************************************
1860 Function: gather_scatter
1861 
1862 Input :
1863 Output:
1864 Return:
1865 Description:
1866 ******************************************************************************/
1867 static
1868 void
1869 gs_gop_tree_max_abs(gs_id *gs, PetscScalar *vals)
1870 {
1871   int size;
1872   int *in, *out;
1873   PetscScalar *buf, *work;
1874   int op[] = {GL_MAX_ABS,0};
1875 
1876 
1877   in   = gs->tree_map_in;
1878   out  = gs->tree_map_out;
1879   buf  = gs->tree_buf;
1880   work = gs->tree_work;
1881   size = gs->tree_nel;
1882 
1883   rvec_zero(buf,size);
1884 
1885   while (*in >= 0)
1886     {
1887       /*
1888       printf("%d :: out=%d\n",my_id,*out);
1889       printf("%d :: in=%d\n",my_id,*in);
1890       */
1891       *(buf + *out++) = *(vals + *in++);
1892     }
1893 
1894   grop(buf,work,size,op);
1895 
1896   in   = gs->tree_map_in;
1897   out  = gs->tree_map_out;
1898 
1899   while (*in >= 0)
1900     {*(vals + *in++) = *(buf + *out++);}
1901 
1902 }
1903 
1904 
1905 
1906 /******************************************************************************
1907 Function: gather_scatter
1908 
1909 Input :
1910 Output:
1911 Return:
1912 Description:
1913 ******************************************************************************/
1914 static void
1915 gs_gop_max( gs_id *gs,  PetscScalar *vals)
1916 {
1917 
1918   /* local only operations!!! */
1919   if (gs->num_local)
1920     {gs_gop_local_max(gs,vals);}
1921 
1922   /* if intersection tree/pairwise and local isn't empty */
1923   if (gs->num_local_gop)
1924     {
1925       gs_gop_local_in_max(gs,vals);
1926 
1927       /* pairwise */
1928       if (gs->num_pairs)
1929         {gs_gop_pairwise_max(gs,vals);}
1930 
1931       /* tree */
1932       else if (gs->max_left_over)
1933         {gs_gop_tree_max(gs,vals);}
1934 
1935       gs_gop_local_out(gs,vals);
1936     }
1937   /* if intersection tree/pairwise and local is empty */
1938   else
1939     {
1940       /* pairwise */
1941       if (gs->num_pairs)
1942         {gs_gop_pairwise_max(gs,vals);}
1943 
1944       /* tree */
1945       else if (gs->max_left_over)
1946         {gs_gop_tree_max(gs,vals);}
1947     }
1948 }
1949 
1950 
1951 
1952 /******************************************************************************
1953 Function: gather_scatter
1954 
1955 Input :
1956 Output:
1957 Return:
1958 Description:
1959 ******************************************************************************/
1960 static
1961 void
1962 gs_gop_local_max( gs_id *gs,  PetscScalar *vals)
1963 {
1964    int *num, *map, **reduce;
1965    PetscScalar tmp;
1966 
1967 
1968   num    = gs->num_local_reduce;
1969   reduce = gs->local_reduce;
1970   while ((map = *reduce))
1971     {
1972       num ++;
1973       tmp = -REAL_MAX;
1974       while (*map >= 0)
1975         {tmp = PetscMax(tmp,*(vals + *map)); map++;}
1976 
1977       map = *reduce++;
1978       while (*map >= 0)
1979         {*(vals + *map++) = tmp;}
1980     }
1981 }
1982 
1983 
1984 
1985 /******************************************************************************
1986 Function: gather_scatter
1987 
1988 Input :
1989 Output:
1990 Return:
1991 Description:
1992 ******************************************************************************/
1993 static
1994 void
1995 gs_gop_local_in_max( gs_id *gs,  PetscScalar *vals)
1996 {
1997    int *num, *map, **reduce;
1998    PetscScalar *base;
1999 
2000 
2001   num    = gs->num_gop_local_reduce;
2002   reduce = gs->gop_local_reduce;
2003   while ((map = *reduce++))
2004     {
2005       num++;
2006       base = vals + *map++;
2007       while (*map >= 0)
2008         {*base = PetscMax(*base,*(vals + *map)); map++;}
2009     }
2010 }
2011 
2012 
2013 
2014 /******************************************************************************
2015 Function: gather_scatter
2016 
2017 VERSION 3 ::
2018 
2019 Input :
2020 Output:
2021 Return:
2022 Description:
2023 ******************************************************************************/
2024 static
2025 void
2026 gs_gop_pairwise_max( gs_id *gs,  PetscScalar *in_vals)
2027 {
2028    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
2029    int *iptr, *msg_list, *msg_size, **msg_nodes;
2030    int *pw, *list, *size, **nodes;
2031   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2032   MPI_Status status;
2033 
2034 
2035   /* strip and load s */
2036   msg_list =list         = gs->pair_list;
2037   msg_size =size         = gs->msg_sizes;
2038   msg_nodes=nodes        = gs->node_list;
2039   iptr=pw                = gs->pw_elm_list;
2040   dptr1=dptr3            = gs->pw_vals;
2041   msg_ids_in  = ids_in   = gs->msg_ids_in;
2042   msg_ids_out = ids_out  = gs->msg_ids_out;
2043   dptr2                  = gs->out;
2044   in1=in2                = gs->in;
2045 
2046   /* post the receives */
2047   /*  msg_nodes=nodes; */
2048   do
2049     {
2050       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2051          second one *list and do list++ afterwards */
2052       MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++,
2053                 gs->gs_comm, msg_ids_in++);
2054       in1 += *size++;
2055     }
2056   while (*++msg_nodes);
2057   msg_nodes=nodes;
2058 
2059   /* load gs values into in out gs buffers */
2060   while (*iptr >= 0)
2061     {*dptr3++ = *(in_vals + *iptr++);}
2062 
2063   /* load out buffers and post the sends */
2064   while ((iptr = *msg_nodes++))
2065     {
2066       dptr3 = dptr2;
2067       while (*iptr >= 0)
2068         {*dptr2++ = *(dptr1 + *iptr++);}
2069       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2070       /* is msg_ids_out++ correct? */
2071       MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++,
2072                 MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
2073     }
2074 
2075   if (gs->max_left_over)
2076     {gs_gop_tree_max(gs,in_vals);}
2077 
2078   /* process the received data */
2079   msg_nodes=nodes;
2080   while ((iptr = *nodes++))
2081     {
2082       /* Should I check the return value of MPI_Wait() or status? */
2083       /* Can this loop be replaced by a call to MPI_Waitall()? */
2084       MPI_Wait(ids_in++, &status);
2085       while (*iptr >= 0)
2086         {*(dptr1 + *iptr) = PetscMax(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2087     }
2088 
2089   /* replace vals */
2090   while (*pw >= 0)
2091     {*(in_vals + *pw++) = *dptr1++;}
2092 
2093   /* clear isend message handles */
2094   /* This changed for clarity though it could be the same */
2095   while (*msg_nodes++)
2096     /* Should I check the return value of MPI_Wait() or status? */
2097     /* Can this loop be replaced by a call to MPI_Waitall()? */
2098     {MPI_Wait(ids_out++, &status);}
2099 }
2100 
2101 
2102 
2103 /******************************************************************************
2104 Function: gather_scatter
2105 
2106 Input :
2107 Output:
2108 Return:
2109 Description:
2110 ******************************************************************************/
2111 static
2112 void
2113 gs_gop_tree_max(gs_id *gs, PetscScalar *vals)
2114 {
2115   int size;
2116   int *in, *out;
2117   PetscScalar *buf, *work;
2118 
2119   in   = gs->tree_map_in;
2120   out  = gs->tree_map_out;
2121   buf  = gs->tree_buf;
2122   work = gs->tree_work;
2123   size = gs->tree_nel;
2124 
2125   rvec_set(buf,-REAL_MAX,size);
2126 
2127   while (*in >= 0)
2128     {*(buf + *out++) = *(vals + *in++);}
2129 
2130   in   = gs->tree_map_in;
2131   out  = gs->tree_map_out;
2132   MPI_Allreduce(buf,work,size,MPIU_SCALAR,MPI_MAX,gs->gs_comm);
2133   while (*in >= 0)
2134     {*(vals + *in++) = *(work + *out++);}
2135 
2136 }
2137 
2138 
2139 
2140 /******************************************************************************
2141 Function: gather_scatter
2142 
2143 Input :
2144 Output:
2145 Return:
2146 Description:
2147 ******************************************************************************/
2148 static void
2149 gs_gop_min_abs( gs_id *gs,  PetscScalar *vals)
2150 {
2151 
2152   /* local only operations!!! */
2153   if (gs->num_local)
2154     {gs_gop_local_min_abs(gs,vals);}
2155 
2156   /* if intersection tree/pairwise and local isn't empty */
2157   if (gs->num_local_gop)
2158     {
2159       gs_gop_local_in_min_abs(gs,vals);
2160 
2161       /* pairwise */
2162       if (gs->num_pairs)
2163         {gs_gop_pairwise_min_abs(gs,vals);}
2164 
2165       /* tree */
2166       else if (gs->max_left_over)
2167         {gs_gop_tree_min_abs(gs,vals);}
2168 
2169       gs_gop_local_out(gs,vals);
2170     }
2171   /* if intersection tree/pairwise and local is empty */
2172   else
2173     {
2174       /* pairwise */
2175       if (gs->num_pairs)
2176         {gs_gop_pairwise_min_abs(gs,vals);}
2177 
2178       /* tree */
2179       else if (gs->max_left_over)
2180         {gs_gop_tree_min_abs(gs,vals);}
2181     }
2182 }
2183 
2184 
2185 
2186 /******************************************************************************
2187 Function: gather_scatter
2188 
2189 Input :
2190 Output:
2191 Return:
2192 Description:
2193 ******************************************************************************/
2194 static
2195 void
2196 gs_gop_local_min_abs( gs_id *gs,  PetscScalar *vals)
2197 {
2198    int *num, *map, **reduce;
2199    PetscScalar tmp;
2200 
2201 
2202   num    = gs->num_local_reduce;
2203   reduce = gs->local_reduce;
2204   while ((map = *reduce))
2205     {
2206       num ++;
2207       tmp = REAL_MAX;
2208       while (*map >= 0)
2209         {tmp = MIN_FABS(tmp,*(vals + *map)); map++;}
2210 
2211       map = *reduce++;
2212       while (*map >= 0)
2213         {*(vals + *map++) = tmp;}
2214     }
2215 }
2216 
2217 
2218 
2219 /******************************************************************************
2220 Function: gather_scatter
2221 
2222 Input :
2223 Output:
2224 Return:
2225 Description:
2226 ******************************************************************************/
2227 static
2228 void
2229 gs_gop_local_in_min_abs( gs_id *gs,  PetscScalar *vals)
2230 {
2231    int *num, *map, **reduce;
2232    PetscScalar *base;
2233 
2234   num    = gs->num_gop_local_reduce;
2235   reduce = gs->gop_local_reduce;
2236   while ((map = *reduce++))
2237     {
2238       num++;
2239       base = vals + *map++;
2240       while (*map >= 0)
2241         {*base = MIN_FABS(*base,*(vals + *map)); map++;}
2242     }
2243 }
2244 
2245 
2246 
2247 /******************************************************************************
2248 Function: gather_scatter
2249 
2250 VERSION 3 ::
2251 
2252 Input :
2253 Output:
2254 Return:
2255 Description:
2256 ******************************************************************************/
2257 static
2258 void
2259 gs_gop_pairwise_min_abs( gs_id *gs,  PetscScalar *in_vals)
2260 {
2261    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
2262    int *iptr, *msg_list, *msg_size, **msg_nodes;
2263    int *pw, *list, *size, **nodes;
2264   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2265   MPI_Status status;
2266 
2267 
2268   /* strip and load s */
2269   msg_list =list         = gs->pair_list;
2270   msg_size =size         = gs->msg_sizes;
2271   msg_nodes=nodes        = gs->node_list;
2272   iptr=pw                = gs->pw_elm_list;
2273   dptr1=dptr3            = gs->pw_vals;
2274   msg_ids_in  = ids_in   = gs->msg_ids_in;
2275   msg_ids_out = ids_out  = gs->msg_ids_out;
2276   dptr2                  = gs->out;
2277   in1=in2                = gs->in;
2278 
2279   /* post the receives */
2280   /*  msg_nodes=nodes; */
2281   do
2282     {
2283       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2284          second one *list and do list++ afterwards */
2285       MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++,
2286                 gs->gs_comm, msg_ids_in++);
2287       in1 += *size++;
2288     }
2289   while (*++msg_nodes);
2290   msg_nodes=nodes;
2291 
2292   /* load gs values into in out gs buffers */
2293   while (*iptr >= 0)
2294     {*dptr3++ = *(in_vals + *iptr++);}
2295 
2296   /* load out buffers and post the sends */
2297   while ((iptr = *msg_nodes++))
2298     {
2299       dptr3 = dptr2;
2300       while (*iptr >= 0)
2301         {*dptr2++ = *(dptr1 + *iptr++);}
2302       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2303       /* is msg_ids_out++ correct? */
2304       MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++,
2305                 MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
2306     }
2307 
2308   if (gs->max_left_over)
2309     {gs_gop_tree_min_abs(gs,in_vals);}
2310 
2311   /* process the received data */
2312   msg_nodes=nodes;
2313   while ((iptr = *nodes++))
2314     {
2315       /* Should I check the return value of MPI_Wait() or status? */
2316       /* Can this loop be replaced by a call to MPI_Waitall()? */
2317       MPI_Wait(ids_in++, &status);
2318       while (*iptr >= 0)
2319         {*(dptr1 + *iptr) = MIN_FABS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2320     }
2321 
2322   /* replace vals */
2323   while (*pw >= 0)
2324     {*(in_vals + *pw++) = *dptr1++;}
2325 
2326   /* clear isend message handles */
2327   /* This changed for clarity though it could be the same */
2328   while (*msg_nodes++)
2329     /* Should I check the return value of MPI_Wait() or status? */
2330     /* Can this loop be replaced by a call to MPI_Waitall()? */
2331     {MPI_Wait(ids_out++, &status);}
2332 }
2333 
2334 
2335 
2336 /******************************************************************************
2337 Function: gather_scatter
2338 
2339 Input :
2340 Output:
2341 Return:
2342 Description:
2343 ******************************************************************************/
2344 static
2345 void
2346 gs_gop_tree_min_abs(gs_id *gs, PetscScalar *vals)
2347 {
2348   int size;
2349   int *in, *out;
2350   PetscScalar *buf, *work;
2351   int op[] = {GL_MIN_ABS,0};
2352 
2353 
2354   in   = gs->tree_map_in;
2355   out  = gs->tree_map_out;
2356   buf  = gs->tree_buf;
2357   work = gs->tree_work;
2358   size = gs->tree_nel;
2359 
2360   rvec_set(buf,REAL_MAX,size);
2361 
2362   while (*in >= 0)
2363     {*(buf + *out++) = *(vals + *in++);}
2364 
2365   in   = gs->tree_map_in;
2366   out  = gs->tree_map_out;
2367   grop(buf,work,size,op);
2368   while (*in >= 0)
2369     {*(vals + *in++) = *(buf + *out++);}
2370 
2371 }
2372 
2373 
2374 
2375 /******************************************************************************
2376 Function: gather_scatter
2377 
2378 Input :
2379 Output:
2380 Return:
2381 Description:
2382 ******************************************************************************/
2383 static void
2384 gs_gop_min( gs_id *gs,  PetscScalar *vals)
2385 {
2386 
2387   /* local only operations!!! */
2388   if (gs->num_local)
2389     {gs_gop_local_min(gs,vals);}
2390 
2391   /* if intersection tree/pairwise and local isn't empty */
2392   if (gs->num_local_gop)
2393     {
2394       gs_gop_local_in_min(gs,vals);
2395 
2396       /* pairwise */
2397       if (gs->num_pairs)
2398         {gs_gop_pairwise_min(gs,vals);}
2399 
2400       /* tree */
2401       else if (gs->max_left_over)
2402         {gs_gop_tree_min(gs,vals);}
2403 
2404       gs_gop_local_out(gs,vals);
2405     }
2406   /* if intersection tree/pairwise and local is empty */
2407   else
2408     {
2409       /* pairwise */
2410       if (gs->num_pairs)
2411         {gs_gop_pairwise_min(gs,vals);}
2412 
2413       /* tree */
2414       else if (gs->max_left_over)
2415         {gs_gop_tree_min(gs,vals);}
2416     }
2417 }
2418 
2419 
2420 
2421 /******************************************************************************
2422 Function: gather_scatter
2423 
2424 Input :
2425 Output:
2426 Return:
2427 Description:
2428 ******************************************************************************/
2429 static
2430 void
2431 gs_gop_local_min( gs_id *gs,  PetscScalar *vals)
2432 {
2433    int *num, *map, **reduce;
2434    PetscScalar tmp;
2435 
2436   num    = gs->num_local_reduce;
2437   reduce = gs->local_reduce;
2438   while ((map = *reduce))
2439     {
2440       num ++;
2441       tmp = REAL_MAX;
2442       while (*map >= 0)
2443         {tmp = PetscMin(tmp,*(vals + *map)); map++;}
2444 
2445       map = *reduce++;
2446       while (*map >= 0)
2447         {*(vals + *map++) = tmp;}
2448     }
2449 }
2450 
2451 
2452 
2453 /******************************************************************************
2454 Function: gather_scatter
2455 
2456 Input :
2457 Output:
2458 Return:
2459 Description:
2460 ******************************************************************************/
2461 static
2462 void
2463 gs_gop_local_in_min( gs_id *gs,  PetscScalar *vals)
2464 {
2465    int *num, *map, **reduce;
2466    PetscScalar *base;
2467 
2468   num    = gs->num_gop_local_reduce;
2469   reduce = gs->gop_local_reduce;
2470   while ((map = *reduce++))
2471     {
2472       num++;
2473       base = vals + *map++;
2474       while (*map >= 0)
2475         {*base = PetscMin(*base,*(vals + *map)); map++;}
2476     }
2477 }
2478 
2479 
2480 
2481 /******************************************************************************
2482 Function: gather_scatter
2483 
2484 VERSION 3 ::
2485 
2486 Input :
2487 Output:
2488 Return:
2489 Description:
2490 ******************************************************************************/
2491 static
2492 void
2493 gs_gop_pairwise_min( gs_id *gs,  PetscScalar *in_vals)
2494 {
2495    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
2496    int *iptr, *msg_list, *msg_size, **msg_nodes;
2497    int *pw, *list, *size, **nodes;
2498   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2499   MPI_Status status;
2500 
2501 
2502   /* strip and load s */
2503   msg_list =list         = gs->pair_list;
2504   msg_size =size         = gs->msg_sizes;
2505   msg_nodes=nodes        = gs->node_list;
2506   iptr=pw                = gs->pw_elm_list;
2507   dptr1=dptr3            = gs->pw_vals;
2508   msg_ids_in  = ids_in   = gs->msg_ids_in;
2509   msg_ids_out = ids_out  = gs->msg_ids_out;
2510   dptr2                  = gs->out;
2511   in1=in2                = gs->in;
2512 
2513   /* post the receives */
2514   /*  msg_nodes=nodes; */
2515   do
2516     {
2517       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2518          second one *list and do list++ afterwards */
2519       MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++,
2520                 gs->gs_comm, msg_ids_in++);
2521       in1 += *size++;
2522     }
2523   while (*++msg_nodes);
2524   msg_nodes=nodes;
2525 
2526   /* load gs values into in out gs buffers */
2527   while (*iptr >= 0)
2528     {*dptr3++ = *(in_vals + *iptr++);}
2529 
2530   /* load out buffers and post the sends */
2531   while ((iptr = *msg_nodes++))
2532     {
2533       dptr3 = dptr2;
2534       while (*iptr >= 0)
2535         {*dptr2++ = *(dptr1 + *iptr++);}
2536       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2537       /* is msg_ids_out++ correct? */
2538       MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++,
2539                 MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
2540     }
2541 
2542   /* process the received data */
2543   if (gs->max_left_over)
2544     {gs_gop_tree_min(gs,in_vals);}
2545 
2546   msg_nodes=nodes;
2547   while ((iptr = *nodes++))
2548     {
2549       /* Should I check the return value of MPI_Wait() or status? */
2550       /* Can this loop be replaced by a call to MPI_Waitall()? */
2551       MPI_Wait(ids_in++, &status);
2552       while (*iptr >= 0)
2553         {*(dptr1 + *iptr) = PetscMin(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2554     }
2555 
2556   /* replace vals */
2557   while (*pw >= 0)
2558     {*(in_vals + *pw++) = *dptr1++;}
2559 
2560   /* clear isend message handles */
2561   /* This changed for clarity though it could be the same */
2562   while (*msg_nodes++)
2563     /* Should I check the return value of MPI_Wait() or status? */
2564     /* Can this loop be replaced by a call to MPI_Waitall()? */
2565     {MPI_Wait(ids_out++, &status);}
2566 }
2567 
2568 
2569 
2570 /******************************************************************************
2571 Function: gather_scatter
2572 
2573 Input :
2574 Output:
2575 Return:
2576 Description:
2577 ******************************************************************************/
2578 static
2579 void
2580 gs_gop_tree_min(gs_id *gs, PetscScalar *vals)
2581 {
2582   int size;
2583   int *in, *out;
2584   PetscScalar *buf, *work;
2585 
2586   in   = gs->tree_map_in;
2587   out  = gs->tree_map_out;
2588   buf  = gs->tree_buf;
2589   work = gs->tree_work;
2590   size = gs->tree_nel;
2591 
2592   rvec_set(buf,REAL_MAX,size);
2593 
2594   while (*in >= 0)
2595     {*(buf + *out++) = *(vals + *in++);}
2596 
2597   in   = gs->tree_map_in;
2598   out  = gs->tree_map_out;
2599   MPI_Allreduce(buf,work,size,MPIU_SCALAR,MPI_MIN,gs->gs_comm);
2600   while (*in >= 0)
2601     {*(vals + *in++) = *(work + *out++);}
2602 }
2603 
2604 
2605 
2606 /******************************************************************************
2607 Function: gather_scatter
2608 
2609 Input :
2610 Output:
2611 Return:
2612 Description:
2613 ******************************************************************************/
2614 static void
2615 gs_gop_times( gs_id *gs,  PetscScalar *vals)
2616 {
2617 
2618   /* local only operations!!! */
2619   if (gs->num_local)
2620     {gs_gop_local_times(gs,vals);}
2621 
2622   /* if intersection tree/pairwise and local isn't empty */
2623   if (gs->num_local_gop)
2624     {
2625       gs_gop_local_in_times(gs,vals);
2626 
2627       /* pairwise */
2628       if (gs->num_pairs)
2629         {gs_gop_pairwise_times(gs,vals);}
2630 
2631       /* tree */
2632       else if (gs->max_left_over)
2633         {gs_gop_tree_times(gs,vals);}
2634 
2635       gs_gop_local_out(gs,vals);
2636     }
2637   /* if intersection tree/pairwise and local is empty */
2638   else
2639     {
2640       /* pairwise */
2641       if (gs->num_pairs)
2642         {gs_gop_pairwise_times(gs,vals);}
2643 
2644       /* tree */
2645       else if (gs->max_left_over)
2646         {gs_gop_tree_times(gs,vals);}
2647     }
2648 }
2649 
2650 
2651 
2652 /******************************************************************************
2653 Function: gather_scatter
2654 
2655 Input :
2656 Output:
2657 Return:
2658 Description:
2659 ******************************************************************************/
2660 static
2661 void
2662 gs_gop_local_times( gs_id *gs,  PetscScalar *vals)
2663 {
2664    int *num, *map, **reduce;
2665    PetscScalar tmp;
2666 
2667   num    = gs->num_local_reduce;
2668   reduce = gs->local_reduce;
2669   while ((map = *reduce))
2670     {
2671       /* wall */
2672       if (*num == 2)
2673         {
2674           num ++; reduce++;
2675           vals[map[1]] = vals[map[0]] *= vals[map[1]];
2676         }
2677       /* corner shared by three elements */
2678       else if (*num == 3)
2679         {
2680           num ++; reduce++;
2681           vals[map[2]]=vals[map[1]]=vals[map[0]]*=(vals[map[1]]*vals[map[2]]);
2682         }
2683       /* corner shared by four elements */
2684       else if (*num == 4)
2685         {
2686           num ++; reduce++;
2687           vals[map[1]]=vals[map[2]]=vals[map[3]]=vals[map[0]] *=
2688                                  (vals[map[1]] * vals[map[2]] * vals[map[3]]);
2689         }
2690       /* general case ... odd geoms ... 3D*/
2691       else
2692         {
2693           num ++;
2694           tmp = 1.0;
2695           while (*map >= 0)
2696             {tmp *= *(vals + *map++);}
2697 
2698           map = *reduce++;
2699           while (*map >= 0)
2700             {*(vals + *map++) = tmp;}
2701         }
2702     }
2703 }
2704 
2705 
2706 
2707 /******************************************************************************
2708 Function: gather_scatter
2709 
2710 Input :
2711 Output:
2712 Return:
2713 Description:
2714 ******************************************************************************/
2715 static
2716 void
2717 gs_gop_local_in_times( gs_id *gs,  PetscScalar *vals)
2718 {
2719    int *num, *map, **reduce;
2720    PetscScalar *base;
2721 
2722   num    = gs->num_gop_local_reduce;
2723   reduce = gs->gop_local_reduce;
2724   while ((map = *reduce++))
2725     {
2726       /* wall */
2727       if (*num == 2)
2728         {
2729           num ++;
2730           vals[map[0]] *= vals[map[1]];
2731         }
2732       /* corner shared by three elements */
2733       else if (*num == 3)
2734         {
2735           num ++;
2736           vals[map[0]] *= (vals[map[1]] * vals[map[2]]);
2737         }
2738       /* corner shared by four elements */
2739       else if (*num == 4)
2740         {
2741           num ++;
2742           vals[map[0]] *= (vals[map[1]] * vals[map[2]] * vals[map[3]]);
2743         }
2744       /* general case ... odd geoms ... 3D*/
2745       else
2746         {
2747           num++;
2748           base = vals + *map++;
2749           while (*map >= 0)
2750             {*base *= *(vals + *map++);}
2751         }
2752     }
2753 }
2754 
2755 
2756 
2757 /******************************************************************************
2758 Function: gather_scatter
2759 
2760 VERSION 3 ::
2761 
2762 Input :
2763 Output:
2764 Return:
2765 Description:
2766 ******************************************************************************/
2767 static
2768 void
2769 gs_gop_pairwise_times( gs_id *gs,  PetscScalar *in_vals)
2770 {
2771    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
2772    int *iptr, *msg_list, *msg_size, **msg_nodes;
2773    int *pw, *list, *size, **nodes;
2774   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2775   MPI_Status status;
2776 
2777 
2778   /* strip and load s */
2779   msg_list =list         = gs->pair_list;
2780   msg_size =size         = gs->msg_sizes;
2781   msg_nodes=nodes        = gs->node_list;
2782   iptr=pw                = gs->pw_elm_list;
2783   dptr1=dptr3            = gs->pw_vals;
2784   msg_ids_in  = ids_in   = gs->msg_ids_in;
2785   msg_ids_out = ids_out  = gs->msg_ids_out;
2786   dptr2                  = gs->out;
2787   in1=in2                = gs->in;
2788 
2789   /* post the receives */
2790   /*  msg_nodes=nodes; */
2791   do
2792     {
2793       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2794          second one *list and do list++ afterwards */
2795       MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++,
2796                 gs->gs_comm, msg_ids_in++);
2797       in1 += *size++;
2798     }
2799   while (*++msg_nodes);
2800   msg_nodes=nodes;
2801 
2802   /* load gs values into in out gs buffers */
2803   while (*iptr >= 0)
2804     {*dptr3++ = *(in_vals + *iptr++);}
2805 
2806   /* load out buffers and post the sends */
2807   while ((iptr = *msg_nodes++))
2808     {
2809       dptr3 = dptr2;
2810       while (*iptr >= 0)
2811         {*dptr2++ = *(dptr1 + *iptr++);}
2812       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2813       /* is msg_ids_out++ correct? */
2814       MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++,
2815                 MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
2816     }
2817 
2818   if (gs->max_left_over)
2819     {gs_gop_tree_times(gs,in_vals);}
2820 
2821   /* process the received data */
2822   msg_nodes=nodes;
2823   while ((iptr = *nodes++))
2824     {
2825       /* Should I check the return value of MPI_Wait() or status? */
2826       /* Can this loop be replaced by a call to MPI_Waitall()? */
2827       MPI_Wait(ids_in++, &status);
2828       while (*iptr >= 0)
2829         {*(dptr1 + *iptr++) *= *in2++;}
2830     }
2831 
2832   /* replace vals */
2833   while (*pw >= 0)
2834     {*(in_vals + *pw++) = *dptr1++;}
2835 
2836   /* clear isend message handles */
2837   /* This changed for clarity though it could be the same */
2838   while (*msg_nodes++)
2839     /* Should I check the return value of MPI_Wait() or status? */
2840     /* Can this loop be replaced by a call to MPI_Waitall()? */
2841     {MPI_Wait(ids_out++, &status);}
2842 }
2843 
2844 
2845 
2846 /******************************************************************************
2847 Function: gather_scatter
2848 
2849 Input :
2850 Output:
2851 Return:
2852 Description:
2853 ******************************************************************************/
2854 static
2855 void
2856 gs_gop_tree_times(gs_id *gs, PetscScalar *vals)
2857 {
2858   int size;
2859   int *in, *out;
2860   PetscScalar *buf, *work;
2861 
2862   in   = gs->tree_map_in;
2863   out  = gs->tree_map_out;
2864   buf  = gs->tree_buf;
2865   work = gs->tree_work;
2866   size = gs->tree_nel;
2867 
2868   rvec_one(buf,size);
2869 
2870   while (*in >= 0)
2871     {*(buf + *out++) = *(vals + *in++);}
2872 
2873   in   = gs->tree_map_in;
2874   out  = gs->tree_map_out;
2875   MPI_Allreduce(buf,work,size,MPIU_SCALAR,MPI_PROD,gs->gs_comm);
2876   while (*in >= 0)
2877     {*(vals + *in++) = *(work + *out++);}
2878 
2879 }
2880 
2881 
2882 
2883 /******************************************************************************
2884 Function: gather_scatter
2885 
2886 
2887 Input :
2888 Output:
2889 Return:
2890 Description:
2891 ******************************************************************************/
2892 static void
2893 gs_gop_plus( gs_id *gs,  PetscScalar *vals)
2894 {
2895 
2896   /* local only operations!!! */
2897   if (gs->num_local)
2898     {gs_gop_local_plus(gs,vals);}
2899 
2900   /* if intersection tree/pairwise and local isn't empty */
2901   if (gs->num_local_gop)
2902     {
2903       gs_gop_local_in_plus(gs,vals);
2904 
2905       /* pairwise will NOT do tree inside ... */
2906       if (gs->num_pairs)
2907         {gs_gop_pairwise_plus(gs,vals);}
2908 
2909       /* tree */
2910       if (gs->max_left_over)
2911         {gs_gop_tree_plus(gs,vals);}
2912 
2913       gs_gop_local_out(gs,vals);
2914     }
2915   /* if intersection tree/pairwise and local is empty */
2916   else
2917     {
2918       /* pairwise will NOT do tree inside */
2919       if (gs->num_pairs)
2920         {gs_gop_pairwise_plus(gs,vals);}
2921 
2922       /* tree */
2923       if (gs->max_left_over)
2924         {gs_gop_tree_plus(gs,vals);}
2925     }
2926 
2927 }
2928 
2929 
2930 
2931 /******************************************************************************
2932 Function: gather_scatter
2933 
2934 Input :
2935 Output:
2936 Return:
2937 Description:
2938 ******************************************************************************/
2939 static
2940 void
2941 gs_gop_local_plus( gs_id *gs,  PetscScalar *vals)
2942 {
2943    int *num, *map, **reduce;
2944    PetscScalar tmp;
2945 
2946 
2947   num    = gs->num_local_reduce;
2948   reduce = gs->local_reduce;
2949   while ((map = *reduce))
2950     {
2951       /* wall */
2952       if (*num == 2)
2953         {
2954           num ++; reduce++;
2955           vals[map[1]] = vals[map[0]] += vals[map[1]];
2956         }
2957       /* corner shared by three elements */
2958       else if (*num == 3)
2959         {
2960           num ++; reduce++;
2961           vals[map[2]]=vals[map[1]]=vals[map[0]]+=(vals[map[1]]+vals[map[2]]);
2962         }
2963       /* corner shared by four elements */
2964       else if (*num == 4)
2965         {
2966           num ++; reduce++;
2967           vals[map[1]]=vals[map[2]]=vals[map[3]]=vals[map[0]] +=
2968                                  (vals[map[1]] + vals[map[2]] + vals[map[3]]);
2969         }
2970       /* general case ... odd geoms ... 3D*/
2971       else
2972         {
2973           num ++;
2974           tmp = 0.0;
2975           while (*map >= 0)
2976             {tmp += *(vals + *map++);}
2977 
2978           map = *reduce++;
2979           while (*map >= 0)
2980             {*(vals + *map++) = tmp;}
2981         }
2982     }
2983 }
2984 
2985 
2986 
2987 /******************************************************************************
2988 Function: gather_scatter
2989 
2990 Input :
2991 Output:
2992 Return:
2993 Description:
2994 ******************************************************************************/
2995 static
2996 void
2997 gs_gop_local_in_plus( gs_id *gs,  PetscScalar *vals)
2998 {
2999    int *num, *map, **reduce;
3000    PetscScalar *base;
3001 
3002 
3003   num    = gs->num_gop_local_reduce;
3004   reduce = gs->gop_local_reduce;
3005   while ((map = *reduce++))
3006     {
3007       /* wall */
3008       if (*num == 2)
3009         {
3010           num ++;
3011           vals[map[0]] += vals[map[1]];
3012         }
3013       /* corner shared by three elements */
3014       else if (*num == 3)
3015         {
3016           num ++;
3017           vals[map[0]] += (vals[map[1]] + vals[map[2]]);
3018         }
3019       /* corner shared by four elements */
3020       else if (*num == 4)
3021         {
3022           num ++;
3023           vals[map[0]] += (vals[map[1]] + vals[map[2]] + vals[map[3]]);
3024         }
3025       /* general case ... odd geoms ... 3D*/
3026       else
3027         {
3028           num++;
3029           base = vals + *map++;
3030           while (*map >= 0)
3031             {*base += *(vals + *map++);}
3032         }
3033     }
3034 }
3035 
3036 
3037 
3038 /******************************************************************************
3039 Function: gather_scatter
3040 
3041 VERSION 3 ::
3042 
3043 Input :
3044 Output:
3045 Return:
3046 Description:
3047 ******************************************************************************/
3048 static
3049 void
3050 gs_gop_pairwise_plus( gs_id *gs,  PetscScalar *in_vals)
3051 {
3052    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
3053    int *iptr, *msg_list, *msg_size, **msg_nodes;
3054    int *pw, *list, *size, **nodes;
3055   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
3056   MPI_Status status;
3057 
3058 
3059   /* strip and load s */
3060   msg_list =list         = gs->pair_list;
3061   msg_size =size         = gs->msg_sizes;
3062   msg_nodes=nodes        = gs->node_list;
3063   iptr=pw                = gs->pw_elm_list;
3064   dptr1=dptr3            = gs->pw_vals;
3065   msg_ids_in  = ids_in   = gs->msg_ids_in;
3066   msg_ids_out = ids_out  = gs->msg_ids_out;
3067   dptr2                  = gs->out;
3068   in1=in2                = gs->in;
3069 
3070   /* post the receives */
3071   /*  msg_nodes=nodes; */
3072   do
3073     {
3074       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
3075          second one *list and do list++ afterwards */
3076       MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++,
3077                 gs->gs_comm, msg_ids_in++);
3078       in1 += *size++;
3079     }
3080   while (*++msg_nodes);
3081   msg_nodes=nodes;
3082 
3083   /* load gs values into in out gs buffers */
3084   while (*iptr >= 0)
3085     {*dptr3++ = *(in_vals + *iptr++);}
3086 
3087   /* load out buffers and post the sends */
3088   while ((iptr = *msg_nodes++))
3089     {
3090       dptr3 = dptr2;
3091       while (*iptr >= 0)
3092         {*dptr2++ = *(dptr1 + *iptr++);}
3093       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
3094       /* is msg_ids_out++ correct? */
3095       MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *msg_list++,
3096                 MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
3097     }
3098 
3099   /* do the tree while we're waiting */
3100   if (gs->max_left_over)
3101     {gs_gop_tree_plus(gs,in_vals);}
3102 
3103   /* process the received data */
3104   msg_nodes=nodes;
3105   while ((iptr = *nodes++))
3106     {
3107       /* Should I check the return value of MPI_Wait() or status? */
3108       /* Can this loop be replaced by a call to MPI_Waitall()? */
3109       MPI_Wait(ids_in++, &status);
3110       while (*iptr >= 0)
3111         {*(dptr1 + *iptr++) += *in2++;}
3112     }
3113 
3114   /* replace vals */
3115   while (*pw >= 0)
3116     {*(in_vals + *pw++) = *dptr1++;}
3117 
3118   /* clear isend message handles */
3119   /* This changed for clarity though it could be the same */
3120   while (*msg_nodes++)
3121     /* Should I check the return value of MPI_Wait() or status? */
3122     /* Can this loop be replaced by a call to MPI_Waitall()? */
3123     {MPI_Wait(ids_out++, &status);}
3124 
3125 }
3126 
3127 
3128 
3129 /******************************************************************************
3130 Function: gather_scatter
3131 
3132 Input :
3133 Output:
3134 Return:
3135 Description:
3136 ******************************************************************************/
3137 static
3138 void
3139 gs_gop_tree_plus(gs_id *gs, PetscScalar *vals)
3140 {
3141   int size;
3142   int *in, *out;
3143   PetscScalar *buf, *work;
3144 
3145   in   = gs->tree_map_in;
3146   out  = gs->tree_map_out;
3147   buf  = gs->tree_buf;
3148   work = gs->tree_work;
3149   size = gs->tree_nel;
3150 
3151   rvec_zero(buf,size);
3152 
3153   while (*in >= 0)
3154     {*(buf + *out++) = *(vals + *in++);}
3155 
3156   in   = gs->tree_map_in;
3157   out  = gs->tree_map_out;
3158   MPI_Allreduce(buf,work,size,MPIU_SCALAR,MPI_SUM,gs->gs_comm);
3159   while (*in >= 0)
3160     {*(vals + *in++) = *(work + *out++);}
3161 
3162 }
3163 
3164 /******************************************************************************
3165 Function: gs_free()
3166 
3167 Input :
3168 
3169 Output:
3170 
3171 Return:
3172 
3173 Description:
3174   if (gs->sss) {free((void*) gs->sss);}
3175 ******************************************************************************/
3176 void
3177 gs_free( gs_id *gs)
3178 {
3179    int i;
3180 
3181 
3182   if (gs->nghs) {free((void*) gs->nghs);}
3183   if (gs->pw_nghs) {free((void*) gs->pw_nghs);}
3184 
3185   /* tree */
3186   if (gs->max_left_over)
3187     {
3188       if (gs->tree_elms) {free((void*) gs->tree_elms);}
3189       if (gs->tree_buf) {free((void*) gs->tree_buf);}
3190       if (gs->tree_work) {free((void*) gs->tree_work);}
3191       if (gs->tree_map_in) {free((void*) gs->tree_map_in);}
3192       if (gs->tree_map_out) {free((void*) gs->tree_map_out);}
3193     }
3194 
3195   /* pairwise info */
3196   if (gs->num_pairs)
3197     {
3198       /* should be NULL already */
3199       if (gs->ngh_buf) {free((void*) gs->ngh_buf);}
3200       if (gs->elms) {free((void*) gs->elms);}
3201       if (gs->local_elms) {free((void*) gs->local_elms);}
3202       if (gs->companion) {free((void*) gs->companion);}
3203 
3204       /* only set if pairwise */
3205       if (gs->vals) {free((void*) gs->vals);}
3206       if (gs->in) {free((void*) gs->in);}
3207       if (gs->out) {free((void*) gs->out);}
3208       if (gs->msg_ids_in) {free((void*) gs->msg_ids_in);}
3209       if (gs->msg_ids_out) {free((void*) gs->msg_ids_out);}
3210       if (gs->pw_vals) {free((void*) gs->pw_vals);}
3211       if (gs->pw_elm_list) {free((void*) gs->pw_elm_list);}
3212       if (gs->node_list)
3213         {
3214           for (i=0;i<gs->num_pairs;i++)
3215             {if (gs->node_list[i]) {free((void*) gs->node_list[i]);}}
3216           free((void*) gs->node_list);
3217         }
3218       if (gs->msg_sizes) {free((void*) gs->msg_sizes);}
3219       if (gs->pair_list) {free((void*) gs->pair_list);}
3220     }
3221 
3222   /* local info */
3223   if (gs->num_local_total>=0)
3224     {
3225       for (i=0;i<gs->num_local_total+1;i++)
3226         /*      for (i=0;i<gs->num_local_total;i++) */
3227         {
3228           if (gs->num_gop_local_reduce[i])
3229             {free((void*) gs->gop_local_reduce[i]);}
3230         }
3231     }
3232 
3233   /* if intersection tree/pairwise and local isn't empty */
3234   if (gs->gop_local_reduce) {free((void*) gs->gop_local_reduce);}
3235   if (gs->num_gop_local_reduce) {free((void*) gs->num_gop_local_reduce);}
3236 
3237   free((void*) gs);
3238 }
3239 
3240 
3241 
3242 
3243 
3244 
3245 /******************************************************************************
3246 Function: gather_scatter
3247 
3248 Input :
3249 Output:
3250 Return:
3251 Description:
3252 ******************************************************************************/
3253 void
3254 gs_gop_vec( gs_id *gs,  PetscScalar *vals,  const char *op,  int step)
3255 {
3256 
3257   switch (*op) {
3258   case '+':
3259     gs_gop_vec_plus(gs,vals,step);
3260     break;
3261   default:
3262     error_msg_warning("gs_gop_vec() :: %c is not a valid op",op[0]);
3263     error_msg_warning("gs_gop_vec() :: default :: plus");
3264     gs_gop_vec_plus(gs,vals,step);
3265     break;
3266   }
3267 }
3268 
3269 
3270 
3271 /******************************************************************************
3272 Function: gather_scatter
3273 
3274 Input :
3275 Output:
3276 Return:
3277 Description:
3278 ******************************************************************************/
3279 static void
3280 gs_gop_vec_plus( gs_id *gs,  PetscScalar *vals,  int step)
3281 {
3282   if (!gs) {error_msg_fatal("gs_gop_vec() passed NULL gs handle!!!");}
3283 
3284   /* local only operations!!! */
3285   if (gs->num_local)
3286     {gs_gop_vec_local_plus(gs,vals,step);}
3287 
3288   /* if intersection tree/pairwise and local isn't empty */
3289   if (gs->num_local_gop)
3290     {
3291       gs_gop_vec_local_in_plus(gs,vals,step);
3292 
3293       /* pairwise */
3294       if (gs->num_pairs)
3295         {gs_gop_vec_pairwise_plus(gs,vals,step);}
3296 
3297       /* tree */
3298       else if (gs->max_left_over)
3299         {gs_gop_vec_tree_plus(gs,vals,step);}
3300 
3301       gs_gop_vec_local_out(gs,vals,step);
3302     }
3303   /* if intersection tree/pairwise and local is empty */
3304   else
3305     {
3306       /* pairwise */
3307       if (gs->num_pairs)
3308         {gs_gop_vec_pairwise_plus(gs,vals,step);}
3309 
3310       /* tree */
3311       else if (gs->max_left_over)
3312         {gs_gop_vec_tree_plus(gs,vals,step);}
3313     }
3314 }
3315 
3316 
3317 
3318 /******************************************************************************
3319 Function: gather_scatter
3320 
3321 Input :
3322 Output:
3323 Return:
3324 Description:
3325 ******************************************************************************/
3326 static
3327 void
3328 gs_gop_vec_local_plus( gs_id *gs,  PetscScalar *vals,
3329                        int step)
3330 {
3331    int *num, *map, **reduce;
3332    PetscScalar *base;
3333 
3334 
3335   num    = gs->num_local_reduce;
3336   reduce = gs->local_reduce;
3337   while ((map = *reduce))
3338     {
3339       base = vals + map[0] * step;
3340 
3341       /* wall */
3342       if (*num == 2)
3343         {
3344           num++; reduce++;
3345           rvec_add (base,vals+map[1]*step,step);
3346           rvec_copy(vals+map[1]*step,base,step);
3347         }
3348       /* corner shared by three elements */
3349       else if (*num == 3)
3350         {
3351           num++; reduce++;
3352           rvec_add (base,vals+map[1]*step,step);
3353           rvec_add (base,vals+map[2]*step,step);
3354           rvec_copy(vals+map[2]*step,base,step);
3355           rvec_copy(vals+map[1]*step,base,step);
3356         }
3357       /* corner shared by four elements */
3358       else if (*num == 4)
3359         {
3360           num++; reduce++;
3361           rvec_add (base,vals+map[1]*step,step);
3362           rvec_add (base,vals+map[2]*step,step);
3363           rvec_add (base,vals+map[3]*step,step);
3364           rvec_copy(vals+map[3]*step,base,step);
3365           rvec_copy(vals+map[2]*step,base,step);
3366           rvec_copy(vals+map[1]*step,base,step);
3367         }
3368       /* general case ... odd geoms ... 3D */
3369       else
3370         {
3371           num++;
3372           while (*++map >= 0)
3373             {rvec_add (base,vals+*map*step,step);}
3374 
3375           map = *reduce;
3376           while (*++map >= 0)
3377             {rvec_copy(vals+*map*step,base,step);}
3378 
3379           reduce++;
3380         }
3381     }
3382 }
3383 
3384 
3385 
3386 /******************************************************************************
3387 Function: gather_scatter
3388 
3389 Input :
3390 Output:
3391 Return:
3392 Description:
3393 ******************************************************************************/
3394 static
3395 void
3396 gs_gop_vec_local_in_plus( gs_id *gs,  PetscScalar *vals,
3397                           int step)
3398 {
3399    int  *num, *map, **reduce;
3400    PetscScalar *base;
3401 
3402   num    = gs->num_gop_local_reduce;
3403   reduce = gs->gop_local_reduce;
3404   while ((map = *reduce++))
3405     {
3406       base = vals + map[0] * step;
3407 
3408       /* wall */
3409       if (*num == 2)
3410         {
3411           num ++;
3412           rvec_add(base,vals+map[1]*step,step);
3413         }
3414       /* corner shared by three elements */
3415       else if (*num == 3)
3416         {
3417           num ++;
3418           rvec_add(base,vals+map[1]*step,step);
3419           rvec_add(base,vals+map[2]*step,step);
3420         }
3421       /* corner shared by four elements */
3422       else if (*num == 4)
3423         {
3424           num ++;
3425           rvec_add(base,vals+map[1]*step,step);
3426           rvec_add(base,vals+map[2]*step,step);
3427           rvec_add(base,vals+map[3]*step,step);
3428         }
3429       /* general case ... odd geoms ... 3D*/
3430       else
3431         {
3432           num++;
3433           while (*++map >= 0)
3434             {rvec_add(base,vals+*map*step,step);}
3435         }
3436     }
3437 }
3438 
3439 
3440 /******************************************************************************
3441 Function: gather_scatter
3442 
3443 Input :
3444 Output:
3445 Return:
3446 Description:
3447 ******************************************************************************/
3448 static
3449 void
3450 gs_gop_vec_local_out( gs_id *gs,  PetscScalar *vals,
3451                       int step)
3452 {
3453    int *num, *map, **reduce;
3454    PetscScalar *base;
3455 
3456 
3457   num    = gs->num_gop_local_reduce;
3458   reduce = gs->gop_local_reduce;
3459   while ((map = *reduce++))
3460     {
3461       base = vals + map[0] * step;
3462 
3463       /* wall */
3464       if (*num == 2)
3465         {
3466           num ++;
3467           rvec_copy(vals+map[1]*step,base,step);
3468         }
3469       /* corner shared by three elements */
3470       else if (*num == 3)
3471         {
3472           num ++;
3473           rvec_copy(vals+map[1]*step,base,step);
3474           rvec_copy(vals+map[2]*step,base,step);
3475         }
3476       /* corner shared by four elements */
3477       else if (*num == 4)
3478         {
3479           num ++;
3480           rvec_copy(vals+map[1]*step,base,step);
3481           rvec_copy(vals+map[2]*step,base,step);
3482           rvec_copy(vals+map[3]*step,base,step);
3483         }
3484       /* general case ... odd geoms ... 3D*/
3485       else
3486         {
3487           num++;
3488           while (*++map >= 0)
3489             {rvec_copy(vals+*map*step,base,step);}
3490         }
3491     }
3492 }
3493 
3494 
3495 
3496 /******************************************************************************
3497 Function: gather_scatter
3498 
3499 VERSION 3 ::
3500 
3501 Input :
3502 Output:
3503 Return:
3504 Description:
3505 ******************************************************************************/
3506 static
3507 void
3508 gs_gop_vec_pairwise_plus( gs_id *gs,  PetscScalar *in_vals,
3509                           int step)
3510 {
3511    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
3512    int *iptr, *msg_list, *msg_size, **msg_nodes;
3513    int *pw, *list, *size, **nodes;
3514   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
3515   MPI_Status status;
3516   PetscBLASInt i1;
3517 
3518 
3519   /* strip and load s */
3520   msg_list =list         = gs->pair_list;
3521   msg_size =size         = gs->msg_sizes;
3522   msg_nodes=nodes        = gs->node_list;
3523   iptr=pw                = gs->pw_elm_list;
3524   dptr1=dptr3            = gs->pw_vals;
3525   msg_ids_in  = ids_in   = gs->msg_ids_in;
3526   msg_ids_out = ids_out  = gs->msg_ids_out;
3527   dptr2                  = gs->out;
3528   in1=in2                = gs->in;
3529 
3530   /* post the receives */
3531   /*  msg_nodes=nodes; */
3532   do
3533     {
3534       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
3535          second one *list and do list++ afterwards */
3536       MPI_Irecv(in1, *size *step, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++,
3537                 gs->gs_comm, msg_ids_in++);
3538       in1 += *size++ *step;
3539     }
3540   while (*++msg_nodes);
3541   msg_nodes=nodes;
3542 
3543   /* load gs values into in out gs buffers */
3544   while (*iptr >= 0)
3545     {
3546       rvec_copy(dptr3,in_vals + *iptr*step,step);
3547       dptr3+=step;
3548       iptr++;
3549     }
3550 
3551   /* load out buffers and post the sends */
3552   while ((iptr = *msg_nodes++))
3553     {
3554       dptr3 = dptr2;
3555       while (*iptr >= 0)
3556         {
3557           rvec_copy(dptr2,dptr1 + *iptr*step,step);
3558           dptr2+=step;
3559           iptr++;
3560         }
3561       MPI_Isend(dptr3, *msg_size++ *step, MPIU_SCALAR, *msg_list++,
3562                 MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
3563     }
3564 
3565   /* tree */
3566   if (gs->max_left_over)
3567     {gs_gop_vec_tree_plus(gs,in_vals,step);}
3568 
3569   /* process the received data */
3570   msg_nodes=nodes;
3571   while ((iptr = *nodes++)){
3572     PetscScalar d1 = 1.0;
3573       /* Should I check the return value of MPI_Wait() or status? */
3574       /* Can this loop be replaced by a call to MPI_Waitall()? */
3575       MPI_Wait(ids_in++, &status);
3576       while (*iptr >= 0) {
3577           BLASaxpy_(&step,&d1,in2,&i1,dptr1 + *iptr*step,&i1);
3578           in2+=step;
3579           iptr++;
3580       }
3581   }
3582 
3583   /* replace vals */
3584   while (*pw >= 0)
3585     {
3586       rvec_copy(in_vals + *pw*step,dptr1,step);
3587       dptr1+=step;
3588       pw++;
3589     }
3590 
3591   /* clear isend message handles */
3592   /* This changed for clarity though it could be the same */
3593   while (*msg_nodes++)
3594     /* Should I check the return value of MPI_Wait() or status? */
3595     /* Can this loop be replaced by a call to MPI_Waitall()? */
3596     {MPI_Wait(ids_out++, &status);}
3597 
3598 
3599 }
3600 
3601 
3602 
3603 /******************************************************************************
3604 Function: gather_scatter
3605 
3606 Input :
3607 Output:
3608 Return:
3609 Description:
3610 ******************************************************************************/
3611 static
3612 void
3613 gs_gop_vec_tree_plus( gs_id *gs,  PetscScalar *vals,  int step)
3614 {
3615   int size, *in, *out;
3616   PetscScalar *buf, *work;
3617   int op[] = {GL_ADD,0};
3618   PetscBLASInt i1 = 1;
3619 
3620 
3621   /* copy over to local variables */
3622   in   = gs->tree_map_in;
3623   out  = gs->tree_map_out;
3624   buf  = gs->tree_buf;
3625   work = gs->tree_work;
3626   size = gs->tree_nel*step;
3627 
3628   /* zero out collection buffer */
3629   rvec_zero(buf,size);
3630 
3631 
3632   /* copy over my contributions */
3633   while (*in >= 0)
3634     {
3635       BLAScopy_(&step,vals + *in++*step,&i1,buf + *out++*step,&i1);
3636     }
3637 
3638   /* perform fan in/out on full buffer */
3639   /* must change grop to handle the blas */
3640   grop(buf,work,size,op);
3641 
3642   /* reset */
3643   in   = gs->tree_map_in;
3644   out  = gs->tree_map_out;
3645 
3646   /* get the portion of the results I need */
3647   while (*in >= 0)
3648     {
3649       BLAScopy_(&step,buf + *out++*step,&i1,vals + *in++*step,&i1);
3650     }
3651 
3652 }
3653 
3654 
3655 
3656 /******************************************************************************
3657 Function: gather_scatter
3658 
3659 Input :
3660 Output:
3661 Return:
3662 Description:
3663 ******************************************************************************/
3664 void
3665 gs_gop_hc( gs_id *gs,  PetscScalar *vals,  const char *op,  int dim)
3666 {
3667 
3668   switch (*op) {
3669   case '+':
3670     gs_gop_plus_hc(gs,vals,dim);
3671     break;
3672   default:
3673     error_msg_warning("gs_gop_hc() :: %c is not a valid op",op[0]);
3674     error_msg_warning("gs_gop_hc() :: default :: plus\n");
3675     gs_gop_plus_hc(gs,vals,dim);
3676     break;
3677   }
3678 }
3679 
3680 
3681 
3682 /******************************************************************************
3683 Function: gather_scatter
3684 
3685 Input :
3686 Output:
3687 Return:
3688 Description:
3689 ******************************************************************************/
3690 static void
3691 gs_gop_plus_hc( gs_id *gs,  PetscScalar *vals, int dim)
3692 {
3693   /* if there's nothing to do return */
3694   if (dim<=0)
3695     {return;}
3696 
3697   /* can't do more dimensions then exist */
3698   dim = PetscMin(dim,i_log2_num_nodes);
3699 
3700   /* local only operations!!! */
3701   if (gs->num_local)
3702     {gs_gop_local_plus(gs,vals);}
3703 
3704   /* if intersection tree/pairwise and local isn't empty */
3705   if (gs->num_local_gop)
3706     {
3707       gs_gop_local_in_plus(gs,vals);
3708 
3709       /* pairwise will do tree inside ... */
3710       if (gs->num_pairs)
3711         {gs_gop_pairwise_plus_hc(gs,vals,dim);}
3712 
3713       /* tree only */
3714       else if (gs->max_left_over)
3715         {gs_gop_tree_plus_hc(gs,vals,dim);}
3716 
3717       gs_gop_local_out(gs,vals);
3718     }
3719   /* if intersection tree/pairwise and local is empty */
3720   else
3721     {
3722       /* pairwise will do tree inside */
3723       if (gs->num_pairs)
3724         {gs_gop_pairwise_plus_hc(gs,vals,dim);}
3725 
3726       /* tree */
3727       else if (gs->max_left_over)
3728         {gs_gop_tree_plus_hc(gs,vals,dim);}
3729     }
3730 
3731 }
3732 
3733 
3734 /******************************************************************************
3735 VERSION 3 ::
3736 
3737 Input :
3738 Output:
3739 Return:
3740 Description:
3741 ******************************************************************************/
3742 static
3743 void
3744 gs_gop_pairwise_plus_hc( gs_id *gs,  PetscScalar *in_vals, int dim)
3745 {
3746    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
3747    int *iptr, *msg_list, *msg_size, **msg_nodes;
3748    int *pw, *list, *size, **nodes;
3749   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
3750   MPI_Status status;
3751   int i, mask=1;
3752 
3753   for (i=1; i<dim; i++)
3754     {mask<<=1; mask++;}
3755 
3756 
3757   /* strip and load s */
3758   msg_list =list         = gs->pair_list;
3759   msg_size =size         = gs->msg_sizes;
3760   msg_nodes=nodes        = gs->node_list;
3761   iptr=pw                = gs->pw_elm_list;
3762   dptr1=dptr3            = gs->pw_vals;
3763   msg_ids_in  = ids_in   = gs->msg_ids_in;
3764   msg_ids_out = ids_out  = gs->msg_ids_out;
3765   dptr2                  = gs->out;
3766   in1=in2                = gs->in;
3767 
3768   /* post the receives */
3769   /*  msg_nodes=nodes; */
3770   do
3771     {
3772       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
3773          second one *list and do list++ afterwards */
3774       if ((my_id|mask)==(*list|mask))
3775         {
3776           MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++,
3777                     gs->gs_comm, msg_ids_in++);
3778           in1 += *size++;
3779         }
3780       else
3781         {list++; size++;}
3782     }
3783   while (*++msg_nodes);
3784 
3785   /* load gs values into in out gs buffers */
3786   while (*iptr >= 0)
3787     {*dptr3++ = *(in_vals + *iptr++);}
3788 
3789   /* load out buffers and post the sends */
3790   msg_nodes=nodes;
3791   list = msg_list;
3792   while ((iptr = *msg_nodes++))
3793     {
3794       if ((my_id|mask)==(*list|mask))
3795         {
3796           dptr3 = dptr2;
3797           while (*iptr >= 0)
3798             {*dptr2++ = *(dptr1 + *iptr++);}
3799           /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
3800           /* is msg_ids_out++ correct? */
3801           MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *list++,
3802                     MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
3803         }
3804       else
3805         {list++; msg_size++;}
3806     }
3807 
3808   /* do the tree while we're waiting */
3809   if (gs->max_left_over)
3810     {gs_gop_tree_plus_hc(gs,in_vals,dim);}
3811 
3812   /* process the received data */
3813   msg_nodes=nodes;
3814   list = msg_list;
3815   while ((iptr = *nodes++))
3816     {
3817       if ((my_id|mask)==(*list|mask))
3818         {
3819           /* Should I check the return value of MPI_Wait() or status? */
3820           /* Can this loop be replaced by a call to MPI_Waitall()? */
3821           MPI_Wait(ids_in++, &status);
3822           while (*iptr >= 0)
3823             {*(dptr1 + *iptr++) += *in2++;}
3824         }
3825       list++;
3826     }
3827 
3828   /* replace vals */
3829   while (*pw >= 0)
3830     {*(in_vals + *pw++) = *dptr1++;}
3831 
3832   /* clear isend message handles */
3833   /* This changed for clarity though it could be the same */
3834   while (*msg_nodes++)
3835     {
3836       if ((my_id|mask)==(*msg_list|mask))
3837         {
3838           /* Should I check the return value of MPI_Wait() or status? */
3839           /* Can this loop be replaced by a call to MPI_Waitall()? */
3840           MPI_Wait(ids_out++, &status);
3841         }
3842       msg_list++;
3843     }
3844 
3845 
3846 }
3847 
3848 
3849 
3850 /******************************************************************************
3851 Function: gather_scatter
3852 
3853 Input :
3854 Output:
3855 Return:
3856 Description:
3857 ******************************************************************************/
3858 static
3859 void
3860 gs_gop_tree_plus_hc(gs_id *gs, PetscScalar *vals, int dim)
3861 {
3862   int size;
3863   int *in, *out;
3864   PetscScalar *buf, *work;
3865   int op[] = {GL_ADD,0};
3866 
3867   in   = gs->tree_map_in;
3868   out  = gs->tree_map_out;
3869   buf  = gs->tree_buf;
3870   work = gs->tree_work;
3871   size = gs->tree_nel;
3872 
3873   rvec_zero(buf,size);
3874 
3875   while (*in >= 0)
3876     {*(buf + *out++) = *(vals + *in++);}
3877 
3878   in   = gs->tree_map_in;
3879   out  = gs->tree_map_out;
3880 
3881   grop_hc(buf,work,size,op,dim);
3882 
3883   while (*in >= 0)
3884     {*(vals + *in++) = *(buf + *out++);}
3885 
3886 }
3887 
3888 
3889 
3890