xref: /petsc/src/ksp/pc/impls/tfs/gs.c (revision 8bc8193efbc389280f83b3d41dffa9e2d23e2ace)
1 
2 /***********************************gs.c***************************************
3 SPARSE GATHER-SCATTER PACKAGE: bss_malloc bss_malloc ivec error comm gs queue
4 
5 Author: Henry M. Tufo III
6 
7 e-mail: hmt@cs.brown.edu
8 
9 snail-mail:
10 Division of Applied Mathematics
11 Brown University
12 Providence, RI 02912
13 
14 Last Modification:
15 6.21.97
16 ************************************gs.c**************************************/
17 
18 /***********************************gs.c***************************************
19 File Description:
20 -----------------
21 
22 ************************************gs.c**************************************/
23 
24 #include "petsc.h"
25 #if defined(PETSC_HAVE_STRINGS_H)
26 #include <strings.h>
27 #endif
28 #if defined(PETSC_HAVE_STRING_H)
29 #include <string.h>
30 #endif
31 
32 #include <float.h>
33 #include <limits.h>
34 
35 #include "const.h"
36 #include "types.h"
37 #include "comm.h"
38 #include "ivec.h"
39 #include "bss_malloc.h"
40 #include "bit_mask.h"
41 #include "error.h"
42 #include "queue.h"
43 #include "blas.h"
44 #include "gs.h"
45 
46 /* default length of number of items via tree - doubles if exceeded */
47 #define TREE_BUF_SZ 2048;
48 #define GS_VEC_SZ   1
49 
50 
51 
52 /***********************************gs.c***************************************
53 Type: struct gather_scatter_id
54 ------------------------------
55 
56 ************************************gs.c**************************************/
57 typedef struct gather_scatter_id {
58   int id;
59   int nel_min;
60   int nel_max;
61   int nel_sum;
62   int negl;
63   int gl_max;
64   int gl_min;
65   int repeats;
66   int ordered;
67   int positive;
68   REAL *vals;
69 
70   /* bit mask info */
71   int *my_proc_mask;
72   int mask_sz;
73   int *ngh_buf;
74   int ngh_buf_sz;
75   int *nghs;
76   int num_nghs;
77   int max_nghs;
78   int *pw_nghs;
79   int num_pw_nghs;
80   int *tree_nghs;
81   int num_tree_nghs;
82 
83   int num_loads;
84 
85   /* repeats == true -> local info */
86   int nel;         /* number of unique elememts */
87   int *elms;       /* of size nel */
88   int nel_total;
89   int *local_elms; /* of size nel_total */
90   int *companion;  /* of size nel_total */
91 
92   /* local info */
93   int num_local_total;
94   int local_strength;
95   int num_local;
96   int *num_local_reduce;
97   int **local_reduce;
98   int num_local_gop;
99   int *num_gop_local_reduce;
100   int **gop_local_reduce;
101 
102   /* pairwise info */
103   int level;
104   int num_pairs;
105   int max_pairs;
106   int loc_node_pairs;
107   int max_node_pairs;
108   int min_node_pairs;
109   int avg_node_pairs;
110   int *pair_list;
111   int *msg_sizes;
112   int **node_list;
113   int len_pw_list;
114   int *pw_elm_list;
115   REAL *pw_vals;
116 
117   MPI_Request *msg_ids_in;
118   MPI_Request *msg_ids_out;
119 
120   REAL *out;
121   REAL *in;
122   int msg_total;
123 
124   /* tree - crystal accumulator info */
125   int max_left_over;
126   int *pre;
127   int *in_num;
128   int *out_num;
129   int **in_list;
130   int **out_list;
131 
132   /* new tree work*/
133   int  tree_nel;
134   int *tree_elms;
135   REAL *tree_buf;
136   REAL *tree_work;
137 
138   int  tree_map_sz;
139   int *tree_map_in;
140   int *tree_map_out;
141 
142   /* current memory status */
143   int gl_bss_min;
144   int gl_perm_min;
145 
146   /* max segment size for gs_gop_vec() */
147   int vec_sz;
148 
149   /* hack to make paul happy */
150   MPI_Comm gs_comm;
151 
152 } gs_id;
153 
154 
155 /* to be made public */
156 #if defined(not_used)
157 static PetscErrorCode  gs_dump_ngh(gs_id *id, int loc_num, int *num, int *ngh_list);
158 static void gsi_via_int_list(gs_id *gs);
159 static PetscErrorCode in_sub_tree(int *ptr3, int p_mask_size, int *buf2, int buf_size);
160 #endif
161 
162 /* PRIVATE - and definitely not exported */
163 /*static void gs_print_template(register gs_id* gs, int who);*/
164 /*static void gs_print_stemplate(register gs_id* gs, int who);*/
165 
166 static gs_id *gsi_check_args(int *elms, int nel, int level);
167 static void gsi_via_bit_mask(gs_id *gs);
168 static void get_ngh_buf(gs_id *gs);
169 static void set_pairwise(gs_id *gs);
170 static gs_id * gsi_new(void);
171 static void set_tree(gs_id *gs);
172 
173 /* same for all but vector flavor */
174 static void gs_gop_local_out(gs_id *gs, REAL *vals);
175 /* vector flavor */
176 static void gs_gop_vec_local_out(gs_id *gs, REAL *vals, int step);
177 
178 static void gs_gop_vec_plus(gs_id *gs, REAL *in_vals, int step);
179 static void gs_gop_vec_pairwise_plus(gs_id *gs, REAL *in_vals, int step);
180 static void gs_gop_vec_local_plus(gs_id *gs, REAL *vals, int step);
181 static void gs_gop_vec_local_in_plus(gs_id *gs, REAL *vals, int step);
182 static void gs_gop_vec_tree_plus(gs_id *gs, REAL *vals, int step);
183 
184 
185 static void gs_gop_plus(gs_id *gs, REAL *in_vals);
186 static void gs_gop_pairwise_plus(gs_id *gs, REAL *in_vals);
187 static void gs_gop_local_plus(gs_id *gs, REAL *vals);
188 static void gs_gop_local_in_plus(gs_id *gs, REAL *vals);
189 static void gs_gop_tree_plus(gs_id *gs, REAL *vals);
190 
191 static void gs_gop_plus_hc(gs_id *gs, REAL *in_vals, int dim);
192 static void gs_gop_pairwise_plus_hc(gs_id *gs, REAL *in_vals, int dim);
193 static void gs_gop_tree_plus_hc(gs_id *gs, REAL *vals, int dim);
194 
195 static void gs_gop_times(gs_id *gs, REAL *in_vals);
196 static void gs_gop_pairwise_times(gs_id *gs, REAL *in_vals);
197 static void gs_gop_local_times(gs_id *gs, REAL *vals);
198 static void gs_gop_local_in_times(gs_id *gs, REAL *vals);
199 static void gs_gop_tree_times(gs_id *gs, REAL *vals);
200 
201 static void gs_gop_min(gs_id *gs, REAL *in_vals);
202 static void gs_gop_pairwise_min(gs_id *gs, REAL *in_vals);
203 static void gs_gop_local_min(gs_id *gs, REAL *vals);
204 static void gs_gop_local_in_min(gs_id *gs, REAL *vals);
205 static void gs_gop_tree_min(gs_id *gs, REAL *vals);
206 
207 static void gs_gop_min_abs(gs_id *gs, REAL *in_vals);
208 static void gs_gop_pairwise_min_abs(gs_id *gs, REAL *in_vals);
209 static void gs_gop_local_min_abs(gs_id *gs, REAL *vals);
210 static void gs_gop_local_in_min_abs(gs_id *gs, REAL *vals);
211 static void gs_gop_tree_min_abs(gs_id *gs, REAL *vals);
212 
213 static void gs_gop_max(gs_id *gs, REAL *in_vals);
214 static void gs_gop_pairwise_max(gs_id *gs, REAL *in_vals);
215 static void gs_gop_local_max(gs_id *gs, REAL *vals);
216 static void gs_gop_local_in_max(gs_id *gs, REAL *vals);
217 static void gs_gop_tree_max(gs_id *gs, REAL *vals);
218 
219 static void gs_gop_max_abs(gs_id *gs, REAL *in_vals);
220 static void gs_gop_pairwise_max_abs(gs_id *gs, REAL *in_vals);
221 static void gs_gop_local_max_abs(gs_id *gs, REAL *vals);
222 static void gs_gop_local_in_max_abs(gs_id *gs, REAL *vals);
223 static void gs_gop_tree_max_abs(gs_id *gs, REAL *vals);
224 
225 static void gs_gop_exists(gs_id *gs, REAL *in_vals);
226 static void gs_gop_pairwise_exists(gs_id *gs, REAL *in_vals);
227 static void gs_gop_local_exists(gs_id *gs, REAL *vals);
228 static void gs_gop_local_in_exists(gs_id *gs, REAL *vals);
229 static void gs_gop_tree_exists(gs_id *gs, REAL *vals);
230 
231 static void gs_gop_pairwise_binary(gs_id *gs, REAL *in_vals, rbfp fct);
232 static void gs_gop_local_binary(gs_id *gs, REAL *vals, rbfp fct);
233 static void gs_gop_local_in_binary(gs_id *gs, REAL *vals, rbfp fct);
234 static void gs_gop_tree_binary(gs_id *gs, REAL *vals, rbfp fct);
235 
236 
237 
238 /* global vars */
239 /* from comm.c module */
240 
241 /* module state inf and fortran interface */
242 static int num_gs_ids = 0;
243 
244 /* should make this dynamic ... later */
245 /*static queue_ADT elms_q, mask_q;*/
246 static int msg_buf=MAX_MSG_BUF;
247 /*static int msg_ch=FALSE;*/
248 
249 static int vec_sz=GS_VEC_SZ;
250 /*static int vec_ch=FALSE; */
251 
252 static int *tree_buf=NULL;
253 static int tree_buf_sz=0;
254 static int ntree=0;
255 
256 
257 /******************************************************************************
258 Function: gs_init_()
259 
260 Input :
261 Output:
262 Return:
263 Description:
264 ******************************************************************************/
265 void gs_init_vec_sz(int size)
266 {
267   /*  vec_ch = TRUE; */
268 
269   vec_sz = size;
270 }
271 
272 /******************************************************************************
273 Function: gs_init_()
274 
275 Input :
276 Output:
277 Return:
278 Description:
279 ******************************************************************************/
280 void gs_init_msg_buf_sz(int buf_size)
281 {
282   /*  msg_ch = TRUE; */
283 
284   msg_buf = buf_size;
285 }
286 
287 /******************************************************************************
288 Function: gs_init()
289 
290 Input :
291 
292 Output:
293 
294 RETURN:
295 
296 Description:
297 ******************************************************************************/
298 gs_id *
299 gs_init(register int *elms, int nel, int level)
300 {
301   register gs_id *gs;
302 #ifdef INFO1
303   int i;
304 #endif
305   MPI_Group gs_group;
306   MPI_Comm  gs_comm;
307 
308 
309   bss_init();
310   perm_init();
311 
312 #ifdef DEBUG
313   error_msg_warning("gs_init() start w/(%d,%d)\n",my_id,num_nodes);
314 #endif
315 
316   /* ensure that communication package has been initialized */
317   comm_init();
318 
319 #ifdef INFO1
320   bss_stats();
321   perm_stats();
322 #endif
323 
324   /* determines if we have enough dynamic/semi-static memory */
325   /* checks input, allocs and sets gd_id template            */
326   gs = gsi_check_args(elms,nel,level);
327 
328   /* only bit mask version up and working for the moment    */
329   /* LATER :: get int list version working for sparse pblms */
330   gsi_via_bit_mask(gs);
331 
332 #ifdef INFO1
333   /* print out P0's template as well as malloc stats */
334   gs_print_template(gs,0);
335 
336   MPI_Barrier(MPI_COMM_WORLD);
337 
338   for (i=1;i<num_nodes;i++)
339     {
340       gs_print_stemplate(gs,i);
341       MPI_Barrier(MPI_COMM_WORLD);
342     }
343 #endif
344 
345 #ifdef DEBUG
346   error_msg_warning("gs_init() end w/(%d,%d)\n",my_id,num_nodes);
347 #endif
348 
349 #ifdef INFO
350   bss_stats();
351   perm_stats();
352 #endif
353 
354   MPI_Comm_group(MPI_COMM_WORLD,&gs_group);
355   MPI_Comm_create(MPI_COMM_WORLD,gs_group,&gs_comm);
356   gs->gs_comm=gs_comm;
357 
358   return(gs);
359 }
360 
361 
362 
363 /******************************************************************************
364 Function: gsi_new()
365 
366 Input :
367 Output:
368 Return:
369 Description:
370 
371 elm list must >= 0!!!
372 elm repeats allowed
373 ******************************************************************************/
374 static
375 gs_id *
376 gsi_new(void)
377 {
378   int size=sizeof(gs_id);
379   gs_id *gs;
380 
381 
382 #ifdef DEBUG
383   error_msg_warning("gsi_new() :: size=%D\n",size);
384 #endif
385 
386   gs = (gs_id *) perm_malloc(size);
387 
388   if (!(size%REAL_LEN))
389     {rvec_zero((REAL *)gs,size/REAL_LEN);}
390   else if (!(size%INT_LEN))
391     {ivec_zero((INT*)gs,size/INT_LEN);}
392   else
393     {memset((char *)gs,0,size/sizeof(char));}
394 
395   return(gs);
396 }
397 
398 
399 
400 /******************************************************************************
401 Function: gsi_check_args()
402 
403 Input :
404 Output:
405 Return:
406 Description:
407 
408 elm list must >= 0!!!
409 elm repeats allowed
410 local working copy of elms is sorted
411 ******************************************************************************/
412 static
413 gs_id *
414 gsi_check_args(int *in_elms, int nel, int level)
415 {
416   register int i, j, k, t2;
417   int *companion, *elms, *unique, *iptr;
418   int num_local=0, *num_to_reduce, **local_reduce;
419   int oprs[] = {NON_UNIFORM,GL_MIN,GL_MAX,GL_ADD,GL_MIN,GL_MAX,GL_MIN,GL_B_AND};
420   int vals[sizeof(oprs)/sizeof(oprs[0])-1];
421   int work[sizeof(oprs)/sizeof(oprs[0])-1];
422   gs_id *gs;
423 
424 
425 #ifdef DEBUG
426   error_msg_warning("gs_check_args() begin w/(%d,%d)\n",my_id,num_nodes);
427 #endif
428 
429 #ifdef SAFE
430   if (!in_elms)
431     {error_msg_fatal("elms point to nothing!!!\n");}
432 
433   if (nel<0)
434     {error_msg_fatal("can't have fewer than 0 elms!!!\n");}
435 
436   if (nel==0)
437     {error_msg_warning("I don't have any elements!!!\n");}
438 #endif
439 
440   /* get space for gs template */
441   gs = gsi_new();
442   gs->id = ++num_gs_ids;
443 
444   /* hmt 6.4.99                                            */
445   /* caller can set global ids that don't participate to 0 */
446   /* gs_init ignores all zeros in elm list                 */
447   /* negative global ids are still invalid                 */
448   for (i=j=0;i<nel;i++)
449     {if (in_elms[i]!=0) {j++;}}
450 
451   k=nel; nel=j;
452 
453   /* copy over in_elms list and create inverse map */
454   elms = (int*) bss_malloc((nel+1)*INT_LEN);
455   companion = (int*) bss_malloc(nel*INT_LEN);
456   /* ivec_c_index(companion,nel); */
457   /* ivec_copy(elms,in_elms,nel); */
458   for (i=j=0;i<k;i++)
459     {
460       if (in_elms[i]!=0)
461 	{elms[j] = in_elms[i]; companion[j++] = i;}
462     }
463 
464   if (j!=nel)
465     {error_msg_fatal("nel j mismatch!\n");}
466 
467 #ifdef SAFE
468   /* pre-pass ... check to see if sorted */
469   elms[nel] = INT_MAX;
470   iptr = elms;
471   unique = elms+1;
472   j=0;
473   while (*iptr!=INT_MAX)
474     {
475       if (*iptr++>*unique++)
476 	{j=1; break;}
477     }
478 
479   /* set up inverse map */
480   if (j)
481     {
482       error_msg_warning("gsi_check_args() :: elm list *not* sorted!\n");
483       SMI_sort((void*)elms, (void*)companion, nel, SORT_INTEGER);
484     }
485   else
486     {error_msg_warning("gsi_check_args() :: elm list sorted!\n");}
487 #else
488   SMI_sort((void*)elms, (void*)companion, nel, SORT_INTEGER);
489 #endif
490   elms[nel] = INT_MIN;
491 
492   /* first pass */
493   /* determine number of unique elements, check pd */
494   for (i=k=0;i<nel;i+=j)
495     {
496       t2 = elms[i];
497       j=++i;
498 
499       /* clump 'em for now */
500       while (elms[j]==t2) {j++;}
501 
502       /* how many together and num local */
503       if (j-=i)
504 	{num_local++; k+=j;}
505     }
506 
507   /* how many unique elements? */
508   gs->repeats=k;
509   gs->nel = nel-k;
510 
511 
512   /* number of repeats? */
513   gs->num_local = num_local;
514   num_local+=2;
515   gs->local_reduce=local_reduce=(int **)perm_malloc(num_local*INT_PTR_LEN);
516   gs->num_local_reduce=num_to_reduce=(int*) perm_malloc(num_local*INT_LEN);
517 
518   unique = (int*) bss_malloc((gs->nel+1)*INT_LEN);
519   gs->elms = unique;
520   gs->nel_total = nel;
521   gs->local_elms = elms;
522   gs->companion = companion;
523 
524   /* compess map as well as keep track of local ops */
525   for (num_local=i=j=0;i<gs->nel;i++)
526     {
527       k=j;
528       t2 = unique[i] = elms[j];
529       companion[i] = companion[j];
530 
531       while (elms[j]==t2) {j++;}
532 
533       if ((t2=(j-k))>1)
534 	{
535 	  /* number together */
536 	  num_to_reduce[num_local] = t2++;
537 	  iptr = local_reduce[num_local++] = (int*)perm_malloc(t2*INT_LEN);
538 
539 	  /* to use binary searching don't remap until we check intersection */
540 	  *iptr++ = i;
541 
542 	  /* note that we're skipping the first one */
543 	  while (++k<j)
544 	    {*(iptr++) = companion[k];}
545 	  *iptr = -1;
546 	}
547     }
548 
549   /* sentinel for ngh_buf */
550   unique[gs->nel]=INT_MAX;
551 
552 #ifdef DEBUG
553   if (num_local!=gs->num_local)
554     {error_msg_fatal("compression of maps wrong!!!\n");}
555 #endif
556 
557   /* for two partition sort hack */
558   num_to_reduce[num_local] = 0;
559   local_reduce[num_local] = NULL;
560   num_to_reduce[++num_local] = 0;
561   local_reduce[num_local] = NULL;
562 
563   /* load 'em up */
564   /* note one extra to hold NON_UNIFORM flag!!! */
565   vals[2] = vals[1] = vals[0] = nel;
566   if (gs->nel>0)
567     {
568        vals[3] = unique[0];           /* ivec_lb(elms,nel); */
569        vals[4] = unique[gs->nel-1];   /* ivec_ub(elms,nel); */
570     }
571   else
572     {
573        vals[3] = INT_MAX;             /* ivec_lb(elms,nel); */
574        vals[4] = INT_MIN;             /* ivec_ub(elms,nel); */
575     }
576   vals[5] = level;
577   vals[6] = num_gs_ids;
578 
579   /* GLOBAL: send 'em out */
580   giop(vals,work,sizeof(oprs)/sizeof(oprs[0])-1,oprs);
581 
582   /* must be semi-pos def - only pairwise depends on this */
583   /* LATER - remove this restriction */
584   if (vals[3]<0)
585     {error_msg_fatal("gsi_check_args() :: system not semi-pos def ::%d\n",vals[3]);}
586 
587   if (vals[4]==INT_MAX)
588     {error_msg_fatal("gsi_check_args() :: system ub too large ::%d!\n",vals[4]);}
589 
590 #ifdef DEBUG
591   /* check gs template count */
592   if (vals[6] != num_gs_ids)
593     {error_msg_fatal("num_gs_ids mismatch!!!");}
594 
595   /* check all have same level threshold */
596   if (level != vals[5])
597     {error_msg_fatal("gsi_check_args() :: level not uniform across nodes!!!\n");}
598 #endif
599 
600   gs->nel_min = vals[0];
601   gs->nel_max = vals[1];
602   gs->nel_sum = vals[2];
603   gs->gl_min  = vals[3];
604   gs->gl_max  = vals[4];
605   gs->negl    = vals[4]-vals[3]+1;
606 
607 #ifdef DEBUG
608       printf("nel(unique)=%d\n", gs->nel);
609       printf("nel_max=%d\n",     gs->nel_max);
610       printf("nel_min=%d\n",     gs->nel_min);
611       printf("nel_sum=%d\n",     gs->nel_sum);
612       printf("negl=%d\n",        gs->negl);
613       printf("gl_max=%d\n",      gs->gl_max);
614       printf("gl_min=%d\n",      gs->gl_min);
615       printf("elms ordered=%d\n",gs->ordered);
616       printf("repeats=%d\n",     gs->repeats);
617       printf("positive=%d\n",    gs->positive);
618       printf("level=%d\n",       gs->level);
619 #endif
620 
621   if (gs->negl<=0)
622     {error_msg_fatal("gsi_check_args() :: system empty or neg :: %d\n",gs->negl);}
623 
624   /* LATER :: add level == -1 -> program selects level */
625   if (vals[5]<0)
626     {vals[5]=0;}
627   else if (vals[5]>num_nodes)
628     {vals[5]=num_nodes;}
629   gs->level = vals[5];
630 
631 #ifdef DEBUG
632   error_msg_warning("gs_check_args() :: end w/(%d,%d)+level=%d\n",
633 		    my_id,num_nodes,vals[5]);
634 #endif
635 
636   return(gs);
637 }
638 
639 
640 /******************************************************************************
641 Function: gsi_via_bit_mask()
642 
643 Input :
644 Output:
645 Return:
646 Description:
647 
648 
649 ******************************************************************************/
650 static
651 void
652 gsi_via_bit_mask(gs_id *gs)
653 {
654   register int i, nel, *elms;
655   int t1;
656   int **reduce;
657   int *map;
658 
659 #ifdef DEBUG
660   error_msg_warning("gsi_via_bit_mask() begin w/%d :: %d\n",my_id,num_nodes);
661 #endif
662 
663   /* totally local removes ... ct_bits == 0 */
664   get_ngh_buf(gs);
665 
666   if (gs->level)
667     {set_pairwise(gs);}
668 
669   if (gs->max_left_over)
670     {set_tree(gs);}
671 
672   /* intersection local and pairwise/tree? */
673   gs->num_local_total = gs->num_local;
674   gs->gop_local_reduce = gs->local_reduce;
675   gs->num_gop_local_reduce = gs->num_local_reduce;
676 
677   map = gs->companion;
678 
679   /* is there any local compression */
680   if (!gs->num_local) {
681     gs->local_strength = NONE;
682     gs->num_local_gop = 0;
683   } else {
684       /* ok find intersection */
685       map = gs->companion;
686       reduce = gs->local_reduce;
687       for (i=0, t1=0; i<gs->num_local; i++, reduce++)
688 	{
689 	  if ((ivec_binary_search(**reduce,gs->pw_elm_list,gs->len_pw_list)>=0)
690 	      ||
691 	      ivec_binary_search(**reduce,gs->tree_map_in,gs->tree_map_sz)>=0)
692 	    {
693 	      /* printf("C%d :: i=%d, **reduce=%d\n",my_id,i,**reduce); */
694 	      t1++;
695 	      if (gs->num_local_reduce[i]<=0)
696 		{error_msg_fatal("nobody in list?");}
697 	      gs->num_local_reduce[i] *= -1;
698 	    }
699 	   **reduce=map[**reduce];
700 	}
701 
702       /* intersection is empty */
703       if (!t1)
704 	{
705 #ifdef DEBUG
706 	  error_msg_warning("gsi_check_args() :: local gs_gop w/o intersection!");
707 #endif
708 	  gs->local_strength = FULL;
709 	  gs->num_local_gop = 0;
710 	}
711       /* intersection not empty */
712       else
713 	{
714 #ifdef DEBUG
715 	  error_msg_warning("gsi_check_args() :: local gs_gop w/intersection!");
716 #endif
717 	  gs->local_strength = PARTIAL;
718 	  SMI_sort((void*)gs->num_local_reduce, (void*)gs->local_reduce,
719 		   gs->num_local + 1, SORT_INT_PTR);
720 
721 	  gs->num_local_gop = t1;
722 	  gs->num_local_total =  gs->num_local;
723 	  gs->num_local    -= t1;
724 	  gs->gop_local_reduce = gs->local_reduce;
725 	  gs->num_gop_local_reduce = gs->num_local_reduce;
726 
727 	  for (i=0; i<t1; i++)
728 	    {
729 	      if (gs->num_gop_local_reduce[i]>=0)
730 		{error_msg_fatal("they aren't negative?");}
731 	      gs->num_gop_local_reduce[i] *= -1;
732 	      gs->local_reduce++;
733 	      gs->num_local_reduce++;
734 	    }
735 	  gs->local_reduce++;
736 	  gs->num_local_reduce++;
737 	}
738     }
739 
740   elms = gs->pw_elm_list;
741   nel  = gs->len_pw_list;
742   for (i=0; i<nel; i++)
743     {elms[i] = map[elms[i]];}
744 
745   elms = gs->tree_map_in;
746   nel  = gs->tree_map_sz;
747   for (i=0; i<nel; i++)
748     {elms[i] = map[elms[i]];}
749 
750   /* clean up */
751   bss_free((void*) gs->local_elms);
752   bss_free((void*) gs->companion);
753   bss_free((void*) gs->elms);
754   bss_free((void*) gs->ngh_buf);
755   gs->local_elms = gs->companion = gs->elms = gs->ngh_buf = NULL;
756 
757 #ifdef DEBUG
758   error_msg_warning("gsi_via_bit_mask() end w/%d :: %d\n",my_id,num_nodes);
759 #endif
760 }
761 
762 
763 
764 /******************************************************************************
765 Function: place_in_tree()
766 
767 Input :
768 Output:
769 Return:
770 Description:
771 
772 
773 ******************************************************************************/
774 static
775 void
776 place_in_tree(register int elm)
777 {
778   register int *tp, n;
779 
780 
781   if (ntree==tree_buf_sz)
782     {
783       if (tree_buf_sz)
784 	{
785 	  tp = tree_buf;
786 	  n = tree_buf_sz;
787 	  tree_buf_sz<<=1;
788 	  tree_buf = (int*)bss_malloc(tree_buf_sz*INT_LEN);
789 	  ivec_copy(tree_buf,tp,n);
790 	  bss_free(tp);
791 	}
792       else
793 	{
794 	  tree_buf_sz = TREE_BUF_SZ;
795 	  tree_buf = (int*)bss_malloc(tree_buf_sz*INT_LEN);
796 	}
797     }
798 
799   tree_buf[ntree++] = elm;
800 }
801 
802 
803 
804 /******************************************************************************
805 Function: get_ngh_buf()
806 
807 Input :
808 Output:
809 Return:
810 Description:
811 
812 
813 ******************************************************************************/
814 static
815 void
816 get_ngh_buf(gs_id *gs)
817 {
818   register int i, j, npw=0, ntree_map=0;
819   int p_mask_size, ngh_buf_size, buf_size;
820   int *p_mask, *sh_proc_mask, *pw_sh_proc_mask;
821   int *ngh_buf, *buf1, *buf2;
822   int offset, per_load, num_loads, or_ct, start, end;
823   int *ptr1, *ptr2, i_start, negl, nel, *elms;
824   int oper=GL_B_OR;
825   int *ptr3, *t_mask, level, ct1, ct2;
826 
827 #ifdef DEBUG
828   error_msg_warning("get_ngh_buf() begin w/%d :: %d\n",my_id,num_nodes);
829 #endif
830 
831   /* to make life easier */
832   nel   = gs->nel;
833   elms  = gs->elms;
834   level = gs->level;
835 
836   /* det #bytes needed for processor bit masks and init w/mask cor. to my_id */
837   p_mask = (int*) bss_malloc(p_mask_size=len_bit_mask(num_nodes));
838   set_bit_mask(p_mask,p_mask_size,my_id);
839 
840   /* allocate space for masks and info bufs */
841   gs->nghs = sh_proc_mask = (int*) bss_malloc(p_mask_size);
842   gs->pw_nghs = pw_sh_proc_mask = (int*) perm_malloc(p_mask_size);
843   gs->ngh_buf_sz = ngh_buf_size = p_mask_size*nel;
844   t_mask = (int*) bss_malloc(p_mask_size);
845   gs->ngh_buf = ngh_buf = (int*) bss_malloc(ngh_buf_size);
846 
847   /* comm buffer size ... memory usage bounded by ~2*msg_buf */
848   /* had thought I could exploit rendezvous threshold */
849 
850   /* default is one pass */
851   per_load = negl  = gs->negl;
852   gs->num_loads = num_loads = 1;
853   i=p_mask_size*negl;
854 
855   /* possible overflow on buffer size */
856   /* overflow hack                    */
857   if (i<0) {i=INT_MAX;}
858 
859   buf_size = PetscMin(msg_buf,i);
860 
861   /* can we do it? */
862   if (p_mask_size>buf_size)
863     {error_msg_fatal("get_ngh_buf() :: buf<pms :: %d>%d\n",p_mask_size,buf_size);}
864 
865   /* get giop buf space ... make *only* one malloc */
866   buf1 = (int*) bss_malloc(buf_size<<1);
867 
868   /* more than one gior exchange needed? */
869   if (buf_size!=i)
870     {
871       per_load = buf_size/p_mask_size;
872       buf_size = per_load*p_mask_size;
873       gs->num_loads = num_loads = negl/per_load + (negl%per_load>0);
874     }
875 
876 #ifdef DEBUG
877   /* dump some basic info */
878   error_msg_warning("n_lds=%d,pms=%d,buf_sz=%d\n",num_loads,p_mask_size,buf_size);
879 #endif
880 
881   /* convert buf sizes from #bytes to #ints - 32 bit only! */
882 #ifdef SAFE
883   p_mask_size/=INT_LEN; ngh_buf_size/=INT_LEN; buf_size/=INT_LEN;
884 #else
885   p_mask_size>>=2; ngh_buf_size>>=2; buf_size>>=2;
886 #endif
887 
888   /* find giop work space */
889   buf2 = buf1+buf_size;
890 
891   /* hold #ints needed for processor masks */
892   gs->mask_sz=p_mask_size;
893 
894   /* init buffers */
895   ivec_zero(sh_proc_mask,p_mask_size);
896   ivec_zero(pw_sh_proc_mask,p_mask_size);
897   ivec_zero(ngh_buf,ngh_buf_size);
898 
899   /* HACK reset tree info */
900   tree_buf=NULL;
901   tree_buf_sz=ntree=0;
902 
903   /* queue the tree elements for now */
904   /* elms_q = new_queue(); */
905 
906   /* can also queue tree info for pruned or forest implememtation */
907   /*  mask_q = new_queue(); */
908 
909   /* ok do it */
910   for (ptr1=ngh_buf,ptr2=elms,end=gs->gl_min,or_ct=i=0; or_ct<num_loads; or_ct++)
911     {
912       /* identity for bitwise or is 000...000 */
913       ivec_zero(buf1,buf_size);
914 
915       /* load msg buffer */
916       for (start=end,end+=per_load,i_start=i; (offset=*ptr2)<end; i++, ptr2++)
917 	{
918 	  offset = (offset-start)*p_mask_size;
919 	  ivec_copy(buf1+offset,p_mask,p_mask_size);
920 	}
921 
922       /* GLOBAL: pass buffer */
923       giop(buf1,buf2,buf_size,&oper);
924 
925 
926       /* unload buffer into ngh_buf */
927       ptr2=(elms+i_start);
928       for(ptr3=buf1,j=start; j<end; ptr3+=p_mask_size,j++)
929 	{
930 	  /* I own it ... may have to pairwise it */
931 	  if (j==*ptr2)
932 	    {
933 	      /* do i share it w/anyone? */
934 #ifdef SAFE
935 	      ct1 = ct_bits((char *)ptr3,p_mask_size*INT_LEN);
936 #else
937 	      ct1 = ct_bits((char *)ptr3,p_mask_size<<2);
938 #endif
939 	      /* guess not */
940 	      if (ct1<2)
941 		{ptr2++; ptr1+=p_mask_size; continue;}
942 
943 	      /* i do ... so keep info and turn off my bit */
944 	      ivec_copy(ptr1,ptr3,p_mask_size);
945 	      ivec_xor(ptr1,p_mask,p_mask_size);
946 	      ivec_or(sh_proc_mask,ptr1,p_mask_size);
947 
948 	      /* is it to be done pairwise? */
949 	      if (--ct1<=level)
950 		{
951 		  npw++;
952 
953 		  /* turn on high bit to indicate pw need to process */
954 		  *ptr2++ |= TOP_BIT;
955 		  ivec_or(pw_sh_proc_mask,ptr1,p_mask_size);
956 		  ptr1+=p_mask_size;
957 		  continue;
958 		}
959 
960 	      /* get set for next and note that I have a tree contribution */
961 	      /* could save exact elm index for tree here -> save a search */
962 	      ptr2++; ptr1+=p_mask_size; ntree_map++;
963 	    }
964 	  /* i don't but still might be involved in tree */
965 	  else
966 	    {
967 
968 	      /* shared by how many? */
969 #ifdef SAFE
970 	      ct1 = ct_bits((char *)ptr3,p_mask_size*INT_LEN);
971 #else
972 	      ct1 = ct_bits((char *)ptr3,p_mask_size<<2);
973 #endif
974 
975 	      /* none! */
976 	      if (ct1<2)
977 		{continue;}
978 
979 	      /* is it going to be done pairwise? but not by me of course!*/
980 	      if (--ct1<=level)
981 		{continue;}
982 	    }
983 	  /* LATER we're going to have to process it NOW */
984 	  /* nope ... tree it */
985 	  place_in_tree(j);
986 	}
987     }
988 
989   bss_free((void*)t_mask);
990   bss_free((void*)buf1);
991 
992   gs->len_pw_list=npw;
993   gs->num_nghs = ct_bits((char *)sh_proc_mask,p_mask_size*INT_LEN);
994 
995   /* expand from bit mask list to int list and save ngh list */
996   gs->nghs = (int*) perm_malloc(gs->num_nghs * INT_LEN);
997   bm_to_proc((char *)sh_proc_mask,p_mask_size*INT_LEN,gs->nghs);
998 
999   gs->num_pw_nghs = ct_bits((char *)pw_sh_proc_mask,p_mask_size*INT_LEN);
1000 
1001   oper = GL_MAX;
1002   ct1 = gs->num_nghs;
1003   giop(&ct1,&ct2,1,&oper);
1004   gs->max_nghs = ct1;
1005 
1006   gs->tree_map_sz  = ntree_map;
1007   gs->max_left_over=ntree;
1008 
1009   bss_free((void*)p_mask);
1010   bss_free((void*)sh_proc_mask);
1011 
1012 #ifdef DEBUG
1013   error_msg_warning("get_ngh_buf() end w/%d :: %d\n",my_id,num_nodes);
1014 #endif
1015 }
1016 
1017 
1018 
1019 
1020 
1021 /******************************************************************************
1022 Function: pairwise_init()
1023 
1024 Input :
1025 Output:
1026 Return:
1027 Description:
1028 
1029 if an element is shared by fewer that level# of nodes do pairwise exch
1030 ******************************************************************************/
1031 static
1032 void
1033 set_pairwise(gs_id *gs)
1034 {
1035   register int i, j;
1036   int p_mask_size;
1037   int *p_mask, *sh_proc_mask, *tmp_proc_mask;
1038   int *ngh_buf, *buf2;
1039   int offset;
1040   int *msg_list, *msg_size, **msg_nodes, nprs;
1041   int *pairwise_elm_list, len_pair_list=0;
1042   int *iptr, t1, i_start, nel, *elms;
1043   int ct;
1044 
1045 
1046 #ifdef DEBUG
1047   error_msg_warning("set_pairwise() begin w/%d :: %d\n",my_id,num_nodes);
1048 #endif
1049 
1050   /* to make life easier */
1051   nel  = gs->nel;
1052   elms = gs->elms;
1053   ngh_buf = gs->ngh_buf;
1054   sh_proc_mask  = gs->pw_nghs;
1055 
1056   /* need a few temp masks */
1057   p_mask_size   = len_bit_mask(num_nodes);
1058   p_mask        = (int*) bss_malloc(p_mask_size);
1059   tmp_proc_mask = (int*) bss_malloc(p_mask_size);
1060 
1061   /* set mask to my my_id's bit mask */
1062   set_bit_mask(p_mask,p_mask_size,my_id);
1063 
1064 #ifdef SAFE
1065   p_mask_size /= INT_LEN;
1066 #else
1067   p_mask_size >>= 2;
1068 #endif
1069 
1070   len_pair_list=gs->len_pw_list;
1071   gs->pw_elm_list=pairwise_elm_list=(int*)perm_malloc((len_pair_list+1)*INT_LEN);
1072 
1073   /* how many processors (nghs) do we have to exchange with? */
1074   nprs=gs->num_pairs=ct_bits((char *)sh_proc_mask,p_mask_size*INT_LEN);
1075 
1076 
1077   /* allocate space for gs_gop() info */
1078   gs->pair_list = msg_list = (int*)  perm_malloc(INT_LEN*nprs);
1079   gs->msg_sizes = msg_size  = (int*)  perm_malloc(INT_LEN*nprs);
1080   gs->node_list = msg_nodes = (int **) perm_malloc(INT_PTR_LEN*(nprs+1));
1081 
1082   /* init msg_size list */
1083   ivec_zero(msg_size,nprs);
1084 
1085   /* expand from bit mask list to int list */
1086   bm_to_proc((char *)sh_proc_mask,p_mask_size*INT_LEN,msg_list);
1087 
1088   /* keep list of elements being handled pairwise */
1089   for (i=j=0;i<nel;i++)
1090     {
1091       if (elms[i] & TOP_BIT)
1092 	{elms[i] ^= TOP_BIT; pairwise_elm_list[j++] = i;}
1093     }
1094   pairwise_elm_list[j] = -1;
1095 
1096 #ifdef DEBUG
1097   if (j!=len_pair_list)
1098     {error_msg_fatal("oops ... bad paiwise list in set_pairwise!");}
1099 #endif
1100 
1101   gs->msg_ids_out = (MPI_Request *)  perm_malloc(sizeof(MPI_Request)*(nprs+1));
1102   gs->msg_ids_out[nprs] = MPI_REQUEST_NULL;
1103   gs->msg_ids_in = (MPI_Request *)  perm_malloc(sizeof(MPI_Request)*(nprs+1));
1104   gs->msg_ids_in[nprs] = MPI_REQUEST_NULL;
1105   gs->pw_vals = (REAL *) perm_malloc(REAL_LEN*len_pair_list*vec_sz);
1106 
1107   /* find who goes to each processor */
1108   for (i_start=i=0;i<nprs;i++)
1109     {
1110       /* processor i's mask */
1111       set_bit_mask(p_mask,p_mask_size*INT_LEN,msg_list[i]);
1112 
1113       /* det # going to processor i */
1114       for (ct=j=0;j<len_pair_list;j++)
1115 	{
1116 	  buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
1117 	  ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);
1118 	  if (ct_bits((char *)tmp_proc_mask,p_mask_size*INT_LEN))
1119 	    {ct++;}
1120 	}
1121       msg_size[i] = ct;
1122       i_start = PetscMax(i_start,ct);
1123 
1124       /*space to hold nodes in message to first neighbor */
1125       msg_nodes[i] = iptr = (int*) perm_malloc(INT_LEN*(ct+1));
1126 
1127       for (j=0;j<len_pair_list;j++)
1128 	{
1129 	  buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
1130 	  ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);
1131 	  if (ct_bits((char *)tmp_proc_mask,p_mask_size*INT_LEN))
1132 	    {*iptr++ = j;}
1133 	}
1134       *iptr = -1;
1135     }
1136   msg_nodes[nprs] = NULL;
1137 
1138 #ifdef INFO1
1139   t1 = GL_MAX;
1140   giop(&i_start,&offset,1,&t1);
1141   gs->max_pairs = i_start;
1142 #else
1143   j=gs->loc_node_pairs=i_start;
1144   t1 = GL_MAX;
1145   giop(&i_start,&offset,1,&t1);
1146   gs->max_node_pairs = i_start;
1147 
1148   i_start=j;
1149   t1 = GL_MIN;
1150   giop(&i_start,&offset,1,&t1);
1151   gs->min_node_pairs = i_start;
1152 
1153   i_start=j;
1154   t1 = GL_ADD;
1155   giop(&i_start,&offset,1,&t1);
1156   gs->avg_node_pairs = i_start/num_nodes + 1;
1157 
1158   i_start=nprs;
1159   t1 = GL_MAX;
1160   giop(&i_start,&offset,1,&t1);
1161   gs->max_pairs = i_start;
1162 
1163   /*gs->max_pairs = -1;*/
1164 #endif
1165 
1166   /* remap pairwise in tail of gsi_via_bit_mask() */
1167   gs->msg_total = ivec_sum(gs->msg_sizes,nprs);
1168   gs->out = (REAL *) perm_malloc(REAL_LEN*gs->msg_total*vec_sz);
1169   gs->in  = (REAL *) perm_malloc(REAL_LEN*gs->msg_total*vec_sz);
1170 
1171   /* reset malloc pool */
1172   bss_free((void*)p_mask);
1173   bss_free((void*)tmp_proc_mask);
1174 
1175 #ifdef DEBUG
1176   error_msg_warning("set_pairwise() end w/%d :: %d\n",my_id,num_nodes);
1177 #endif
1178 }
1179 
1180 
1181 
1182 /******************************************************************************
1183 Function: set_tree()
1184 
1185 Input :
1186 Output:
1187 Return:
1188 Description:
1189 
1190 to do pruned tree just save ngh buf copy for each one and decode here!
1191 ******************************************************************************/
1192 static
1193 void
1194 set_tree(gs_id *gs)
1195 {
1196   register int i, j, n, nel;
1197   register int *iptr_in, *iptr_out, *tree_elms, *elms;
1198 
1199 
1200 #ifdef DEBUG
1201   error_msg_warning("set_tree() :: begin\n");
1202 #endif
1203 
1204   /* local work ptrs */
1205   elms = gs->elms;
1206   nel     = gs->nel;
1207 
1208   /* how many via tree */
1209   gs->tree_nel  = n = ntree;
1210   gs->tree_elms = tree_elms = iptr_in = tree_buf;
1211   gs->tree_buf  = (REAL *) bss_malloc(REAL_LEN*n*vec_sz);
1212   gs->tree_work = (REAL *) bss_malloc(REAL_LEN*n*vec_sz);
1213   j=gs->tree_map_sz;
1214   gs->tree_map_in = iptr_in  = (int*) bss_malloc(INT_LEN*(j+1));
1215   gs->tree_map_out = iptr_out = (int*) bss_malloc(INT_LEN*(j+1));
1216 
1217 #ifdef DEBUG
1218   error_msg_warning("num on tree=%d,%d",gs->max_left_over,gs->tree_nel);
1219 #endif
1220 
1221   /* search the longer of the two lists */
1222   /* note ... could save this info in get_ngh_buf and save searches */
1223   if (n<=nel)
1224     {
1225       /* bijective fct w/remap - search elm list */
1226       for (i=0; i<n; i++)
1227 	{
1228 	  if ((j=ivec_binary_search(*tree_elms++,elms,nel))>=0)
1229 	    {*iptr_in++ = j; *iptr_out++ = i;}
1230 	}
1231     }
1232   else
1233     {
1234       for (i=0; i<nel; i++)
1235 	{
1236 	  if ((j=ivec_binary_search(*elms++,tree_elms,n))>=0)
1237 	    {*iptr_in++ = i; *iptr_out++ = j;}
1238 	}
1239     }
1240 
1241   /* sentinel */
1242   *iptr_in = *iptr_out = -1;
1243 
1244 #ifdef DEBUG
1245   error_msg_warning("set_tree() :: end\n");
1246 #endif
1247 }
1248 
1249 
1250 /******************************************************************************
1251 Function: gsi_via_int_list()
1252 
1253 Input :
1254 Output:
1255 Return:
1256 Description:
1257 ******************************************************************************/
1258 /*static
1259 void
1260 gsi_via_int_list(gs_id *gs)
1261 {
1262 
1263    LATER: for P large the bit masks -> too many passes
1264    LATER: strategy: do gsum w/1 in position i in negl if owner
1265    LATER: then sum of entire vector 1 ... negl determines min buf len
1266    LATER: So choose min from this or mask method
1267 }*/
1268 
1269 
1270 #if defined(not_used)
1271 static
1272 int
1273 root_sub_tree(int *proc_list, int num)
1274 {
1275   register int i, j, p_or, p_and;
1276   register int root, mask;
1277 
1278 
1279   /* ceiling(log2(num_nodes)) - 1 */
1280   j = i_log2_num_nodes;
1281   if (num_nodes==floor_num_nodes)
1282     {j--;}
1283 
1284   /* set mask to msb */
1285   for(mask=1,i=0; i<j; i++)
1286     {mask<<=1;}
1287 
1288   p_or  = ivec_reduce_or(proc_list,num);
1289   p_and = ivec_reduce_and(proc_list,num);
1290   for(root=i=0; i<j; i++,mask>>=1)
1291     {
1292       /* (msb-i)'th bits on ==> root in right 1/2 tree */
1293       if (mask & p_and)
1294 	{root |= mask;}
1295 
1296       /* (msb-i)'th bits differ ==> root found */
1297       else if (mask & p_or)
1298 	{break;}
1299 
1300       /* (msb-i)'th bits off ==>root in left 1/2 tree */
1301     }
1302 
1303 #ifdef DEBUG
1304   if ((root<0) || (root>num_nodes))
1305     {error_msg_fatal("root_sub_tree() :: bad root!");}
1306 
1307   if (!my_id)
1308     {
1309       printf("num_nodes=%d, j=%d, root=%d\n",num_nodes,j,root);
1310       printf("procs: ");
1311       for(i=0;i<num;i++)
1312 	{printf("%d ",proc_list[i]);}
1313       printf("\n");
1314     }
1315 #endif
1316 
1317   return(root);
1318 }
1319 #endif
1320 
1321 
1322 #if defined(not_used)
1323 static int
1324 in_sub_tree(int *mask, int mask_size, int *work, int nw)
1325 {
1326   int ct, nb;
1327 
1328   /* mask size in bytes */
1329   nb = mask_size<<2;
1330 
1331   /* shared amoungst how many? */
1332   ct = ct_bits((char *)mask,nb);
1333 
1334   /* enough space? */
1335   if (nw<ct)
1336     {error_msg_fatal("in_sub_tree() :: not enough space to expand bit mask!");}
1337 
1338   /* expand */
1339   bm_to_proc((char *)mask,nb,work);
1340 
1341   /* find tree root */
1342   root_sub_tree(work,ct);
1343 
1344   /* am i in any of the paths? */
1345 
1346   return(TRUE);
1347 
1348   /*
1349   sh_mask = (int*)bss_malloc(nb);
1350   bss_free(sh_mask);
1351   */
1352 }
1353 #endif
1354 
1355 
1356 /******************************************************************************
1357 Function: gather_scatter
1358 
1359 Input :
1360 Output:
1361 Return:
1362 Description:
1363 ******************************************************************************/
1364 static
1365 void
1366 gs_gop_local_out(register gs_id *gs, register REAL *vals)
1367 {
1368   register int *num, *map, **reduce;
1369   register REAL tmp;
1370 
1371 
1372 #ifdef DEBUG
1373   error_msg_warning("start gs_gop_xxx()\n");
1374 #endif
1375 
1376   num    = gs->num_gop_local_reduce;
1377   reduce = gs->gop_local_reduce;
1378   while ((map = *reduce++))
1379     {
1380       /* wall */
1381       if (*num == 2)
1382 	{
1383 	  num ++;
1384 	  vals[map[1]] = vals[map[0]];
1385 	}
1386       /* corner shared by three elements */
1387       else if (*num == 3)
1388 	{
1389 	  num ++;
1390 	  vals[map[2]] = vals[map[1]] = vals[map[0]];
1391 	}
1392       /* corner shared by four elements */
1393       else if (*num == 4)
1394 	{
1395 	  num ++;
1396 	  vals[map[3]] = vals[map[2]] = vals[map[1]] = vals[map[0]];
1397 	}
1398       /* general case ... odd geoms ... 3D*/
1399       else
1400 	{
1401 	  num++;
1402 	  tmp = *(vals + *map++);
1403 	  while (*map >= 0)
1404 	    {*(vals + *map++) = tmp;}
1405 	}
1406     }
1407 }
1408 
1409 
1410 
1411 /******************************************************************************
1412 Function: gather_scatter
1413 
1414 Input :
1415 Output:
1416 Return:
1417 Description:
1418 ******************************************************************************/
1419 void
1420 gs_gop_binary(gs_ADT gs, REAL *vals, rbfp fct)
1421 {
1422 #ifdef DEBUG
1423   if (!gs)  {error_msg_fatal("gs_gop() :: passed NULL gs handle!!!");}
1424   if (!fct) {error_msg_fatal("gs_gop() :: passed NULL bin fct handle!!!");}
1425   error_msg_warning("start gs_gop_xxx()\n");
1426 #endif
1427 
1428   /* local only operations!!! */
1429   if (gs->num_local)
1430     {gs_gop_local_binary(gs,vals,fct);}
1431 
1432   /* if intersection tree/pairwise and local isn't empty */
1433   if (gs->num_local_gop)
1434     {
1435       gs_gop_local_in_binary(gs,vals,fct);
1436 
1437       /* pairwise */
1438       if (gs->num_pairs)
1439 	{gs_gop_pairwise_binary(gs,vals,fct);}
1440 
1441       /* tree */
1442       else if (gs->max_left_over)
1443 	{gs_gop_tree_binary(gs,vals,fct);}
1444 
1445       gs_gop_local_out(gs,vals);
1446     }
1447   /* if intersection tree/pairwise and local is empty */
1448   else
1449     {
1450       /* pairwise */
1451       if (gs->num_pairs)
1452 	{gs_gop_pairwise_binary(gs,vals,fct);}
1453 
1454       /* tree */
1455       else if (gs->max_left_over)
1456 	{gs_gop_tree_binary(gs,vals,fct);}
1457     }
1458 }
1459 
1460 
1461 
1462 /******************************************************************************
1463 Function: gather_scatter
1464 
1465 Input :
1466 Output:
1467 Return:
1468 Description:
1469 ******************************************************************************/
1470 static
1471 void
1472 gs_gop_local_binary(register gs_id *gs, register REAL *vals, register rbfp fct)
1473 {
1474   register int *num, *map, **reduce;
1475   REAL tmp;
1476 
1477 
1478 #ifdef DEBUG
1479   error_msg_warning("start gs_gop_xxx()\n");
1480 #endif
1481   num    = gs->num_local_reduce;
1482   reduce = gs->local_reduce;
1483   while ((map = *reduce))
1484     {
1485       num ++;
1486       (*fct)(&tmp,NULL,1);
1487       /* tmp = 0.0; */
1488       while (*map >= 0)
1489 	{(*fct)(&tmp,(vals + *map),1); map++;}
1490 	/*        {tmp = (*fct)(tmp,*(vals + *map)); map++;} */
1491 
1492       map = *reduce++;
1493       while (*map >= 0)
1494         {*(vals + *map++) = tmp;}
1495     }
1496 }
1497 
1498 
1499 
1500 /******************************************************************************
1501 Function: gather_scatter
1502 
1503 Input :
1504 Output:
1505 Return:
1506 Description:
1507 ******************************************************************************/
1508 static
1509 void
1510 gs_gop_local_in_binary(register gs_id *gs, register REAL *vals, register rbfp fct)
1511 {
1512   register int *num, *map, **reduce;
1513   register REAL *base;
1514 
1515 
1516 #ifdef DEBUG
1517   error_msg_warning("start gs_gop_xxx()\n");
1518 #endif
1519 
1520   num    = gs->num_gop_local_reduce;
1521 
1522   reduce = gs->gop_local_reduce;
1523   while ((map = *reduce++))
1524     {
1525       num++;
1526       base = vals + *map++;
1527       while (*map >= 0)
1528 	{(*fct)(base,(vals + *map),1); map++;}
1529 	/*        {*base = (*fct)(*base,*(vals + *map)); map++;} */
1530     }
1531 }
1532 
1533 
1534 
1535 /******************************************************************************
1536 Function: gather_scatter
1537 
1538 VERSION 3 ::
1539 
1540 Input :
1541 Output:
1542 Return:
1543 Description:
1544 ******************************************************************************/
1545 static
1546 void
1547 gs_gop_pairwise_binary(register gs_id *gs, register REAL *in_vals,
1548                        register rbfp fct)
1549 {
1550   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
1551   register int *iptr, *msg_list, *msg_size, **msg_nodes;
1552   register int *pw, *list, *size, **nodes;
1553   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1554   MPI_Status status;
1555 
1556 
1557   /* strip and load registers */
1558   msg_list =list         = gs->pair_list;
1559   msg_size =size         = gs->msg_sizes;
1560   msg_nodes=nodes        = gs->node_list;
1561   iptr=pw                = gs->pw_elm_list;
1562   dptr1=dptr3            = gs->pw_vals;
1563   msg_ids_in  = ids_in   = gs->msg_ids_in;
1564   msg_ids_out = ids_out  = gs->msg_ids_out;
1565   dptr2                  = gs->out;
1566   in1=in2                = gs->in;
1567 
1568   /* post the receives */
1569   /*  msg_nodes=nodes; */
1570   do
1571     {
1572       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1573          second one *list and do list++ afterwards */
1574       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
1575                 gs->gs_comm, msg_ids_in++);
1576       in1 += *size++;
1577     }
1578   while (*++msg_nodes);
1579   msg_nodes=nodes;
1580 
1581   /* load gs values into in out gs buffers */
1582   while (*iptr >= 0)
1583     {*dptr3++ = *(in_vals + *iptr++);}
1584 
1585   /* load out buffers and post the sends */
1586   while ((iptr = *msg_nodes++))
1587     {
1588       dptr3 = dptr2;
1589       while (*iptr >= 0)
1590         {*dptr2++ = *(dptr1 + *iptr++);}
1591       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1592       /* is msg_ids_out++ correct? */
1593       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
1594                 MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
1595     }
1596 
1597   if (gs->max_left_over)
1598     {gs_gop_tree_binary(gs,in_vals,fct);}
1599 
1600   /* process the received data */
1601   msg_nodes=nodes;
1602   while ((iptr = *nodes++))
1603     {
1604       /* Should I check the return value of MPI_Wait() or status? */
1605       /* Can this loop be replaced by a call to MPI_Waitall()? */
1606       MPI_Wait(ids_in++, &status);
1607       while (*iptr >= 0)
1608 	{(*fct)((dptr1 + *iptr),in2,1); iptr++; in2++;}
1609       /* {*(dptr1 + *iptr) = (*fct)(*(dptr1 + *iptr),*in2); iptr++; in2++;} */
1610     }
1611 
1612   /* replace vals */
1613   while (*pw >= 0)
1614     {*(in_vals + *pw++) = *dptr1++;}
1615 
1616   /* clear isend message handles */
1617   /* This changed for clarity though it could be the same */
1618   while (*msg_nodes++)
1619     /* Should I check the return value of MPI_Wait() or status? */
1620     /* Can this loop be replaced by a call to MPI_Waitall()? */
1621     {MPI_Wait(ids_out++, &status);}
1622 }
1623 
1624 
1625 
1626 /******************************************************************************
1627 Function: gather_scatter
1628 
1629 Input :
1630 Output:
1631 Return:
1632 Description:
1633 ******************************************************************************/
1634 static
1635 void
1636 gs_gop_tree_binary(gs_id *gs, REAL *vals, register rbfp fct)
1637 {
1638   int size;
1639   int *in, *out;
1640   REAL *buf, *work;
1641 
1642 #ifdef DEBUG
1643   error_msg_warning("gs_gop_tree_binary() :: start\n");
1644 #endif
1645 
1646   in   = gs->tree_map_in;
1647   out  = gs->tree_map_out;
1648   buf  = gs->tree_buf;
1649   work = gs->tree_work;
1650   size = gs->tree_nel;
1651 
1652   /* load vals vector w/identity */
1653   (*fct)(buf,NULL,size);
1654 
1655   /* load my contribution into val vector */
1656   while (*in >= 0)
1657     {(*fct)((buf + *out++),(vals + *in++),-1);}
1658 /*    {*(buf + *out++) = *(vals + *in++);} */
1659 
1660   gfop(buf,work,size,(vbfp)fct,REAL_TYPE,0);
1661 
1662   in   = gs->tree_map_in;
1663   out  = gs->tree_map_out;
1664   while (*in >= 0)
1665     {(*fct)((vals + *in++),(buf + *out++),-1);}
1666     /*    {*(vals + *in++) = *(buf + *out++);} */
1667 
1668 
1669 #ifdef DEBUG
1670   error_msg_warning("gs_gop_tree_binary() :: end\n");
1671 #endif
1672 
1673 }
1674 
1675 
1676 
1677 
1678 /******************************************************************************
1679 Function: gather_scatter
1680 
1681 Input :
1682 Output:
1683 Return:
1684 Description:
1685 ******************************************************************************/
1686 void
1687 gs_gop(register gs_id *gs, register REAL *vals, register const char *op)
1688 {
1689 #ifdef DEBUG
1690   error_msg_warning("start gs_gop()\n");
1691   if (!gs) {error_msg_fatal("gs_gop() :: passed NULL gs handle!!!");}
1692   if (!op) {error_msg_fatal("gs_gop() :: passed NULL operation!!!");}
1693 #endif
1694 
1695   switch (*op) {
1696   case '+':
1697     gs_gop_plus(gs,vals);
1698     break;
1699   case '*':
1700     gs_gop_times(gs,vals);
1701     break;
1702   case 'a':
1703     gs_gop_min_abs(gs,vals);
1704     break;
1705   case 'A':
1706     gs_gop_max_abs(gs,vals);
1707     break;
1708   case 'e':
1709     gs_gop_exists(gs,vals);
1710     break;
1711   case 'm':
1712     gs_gop_min(gs,vals);
1713     break;
1714   case 'M':
1715     gs_gop_max(gs,vals); break;
1716     /*
1717     if (*(op+1)=='\0')
1718       {gs_gop_max(gs,vals); break;}
1719     else if (*(op+1)=='X')
1720       {gs_gop_max_abs(gs,vals); break;}
1721     else if (*(op+1)=='N')
1722       {gs_gop_min_abs(gs,vals); break;}
1723     */
1724   default:
1725     error_msg_warning("gs_gop() :: %c is not a valid op",op[0]);
1726     error_msg_warning("gs_gop() :: default :: plus");
1727     gs_gop_plus(gs,vals);
1728     break;
1729   }
1730 #ifdef DEBUG
1731   error_msg_warning("end gs_gop()\n");
1732 #endif
1733 }
1734 
1735 
1736 /******************************************************************************
1737 Function: gather_scatter
1738 
1739 Input :
1740 Output:
1741 Return:
1742 Description:
1743 ******************************************************************************/
1744 static void
1745 gs_gop_exists(register gs_id *gs, register REAL *vals)
1746 {
1747 #ifdef DEBUG
1748   error_msg_warning("start gs_gop_xxx()\n");
1749 #endif
1750 
1751   /* local only operations!!! */
1752   if (gs->num_local)
1753     {gs_gop_local_exists(gs,vals);}
1754 
1755   /* if intersection tree/pairwise and local isn't empty */
1756   if (gs->num_local_gop)
1757     {
1758       gs_gop_local_in_exists(gs,vals);
1759 
1760       /* pairwise */
1761       if (gs->num_pairs)
1762 	{gs_gop_pairwise_exists(gs,vals);}
1763 
1764       /* tree */
1765       else if (gs->max_left_over)
1766 	{gs_gop_tree_exists(gs,vals);}
1767 
1768       gs_gop_local_out(gs,vals);
1769     }
1770   /* if intersection tree/pairwise and local is empty */
1771   else
1772     {
1773       /* pairwise */
1774       if (gs->num_pairs)
1775 	{gs_gop_pairwise_exists(gs,vals);}
1776 
1777       /* tree */
1778       else if (gs->max_left_over)
1779 	{gs_gop_tree_exists(gs,vals);}
1780     }
1781 }
1782 
1783 
1784 
1785 /******************************************************************************
1786 Function: gather_scatter
1787 
1788 Input :
1789 Output:
1790 Return:
1791 Description:
1792 ******************************************************************************/
1793 static
1794 void
1795 gs_gop_local_exists(register gs_id *gs, register REAL *vals)
1796 {
1797   register int *num, *map, **reduce;
1798   register REAL tmp;
1799 
1800 
1801 #ifdef DEBUG
1802   error_msg_warning("start gs_gop_xxx()\n");
1803 #endif
1804 
1805   num    = gs->num_local_reduce;
1806   reduce = gs->local_reduce;
1807   while ((map = *reduce))
1808     {
1809       num ++;
1810       tmp = 0.0;
1811       while (*map >= 0)
1812 	{tmp = EXISTS(tmp,*(vals + *map)); map++;}
1813 
1814       map = *reduce++;
1815       while (*map >= 0)
1816 	{*(vals + *map++) = tmp;}
1817     }
1818 }
1819 
1820 
1821 
1822 /******************************************************************************
1823 Function: gather_scatter
1824 
1825 Input :
1826 Output:
1827 Return:
1828 Description:
1829 ******************************************************************************/
1830 static
1831 void
1832 gs_gop_local_in_exists(register gs_id *gs, register REAL *vals)
1833 {
1834   register int *num, *map, **reduce;
1835   register REAL *base;
1836 
1837 
1838 #ifdef DEBUG
1839   error_msg_warning("start gs_gop_xxx()\n");
1840 #endif
1841 
1842   num    = gs->num_gop_local_reduce;
1843   reduce = gs->gop_local_reduce;
1844   while ((map = *reduce++))
1845     {
1846       num++;
1847       base = vals + *map++;
1848       while (*map >= 0)
1849 	{*base = EXISTS(*base,*(vals + *map)); map++;}
1850     }
1851 }
1852 
1853 
1854 
1855 /******************************************************************************
1856 Function: gather_scatter
1857 
1858 VERSION 3 ::
1859 
1860 Input :
1861 Output:
1862 Return:
1863 Description:
1864 ******************************************************************************/
1865 static
1866 void
1867 gs_gop_pairwise_exists(register gs_id *gs, register REAL *in_vals)
1868 {
1869   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
1870   register int *iptr, *msg_list, *msg_size, **msg_nodes;
1871   register int *pw, *list, *size, **nodes;
1872   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1873   MPI_Status status;
1874 
1875 
1876   /* strip and load registers */
1877   msg_list =list         = gs->pair_list;
1878   msg_size =size         = gs->msg_sizes;
1879   msg_nodes=nodes        = gs->node_list;
1880   iptr=pw                = gs->pw_elm_list;
1881   dptr1=dptr3            = gs->pw_vals;
1882   msg_ids_in  = ids_in   = gs->msg_ids_in;
1883   msg_ids_out = ids_out  = gs->msg_ids_out;
1884   dptr2                  = gs->out;
1885   in1=in2                = gs->in;
1886 
1887   /* post the receives */
1888   /*  msg_nodes=nodes; */
1889   do
1890     {
1891       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1892 	 second one *list and do list++ afterwards */
1893       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
1894 		gs->gs_comm, msg_ids_in++);
1895       in1 += *size++;
1896     }
1897   while (*++msg_nodes);
1898   msg_nodes=nodes;
1899 
1900   /* load gs values into in out gs buffers */
1901   while (*iptr >= 0)
1902     {*dptr3++ = *(in_vals + *iptr++);}
1903 
1904   /* load out buffers and post the sends */
1905   while ((iptr = *msg_nodes++))
1906     {
1907       dptr3 = dptr2;
1908       while (*iptr >= 0)
1909 	{*dptr2++ = *(dptr1 + *iptr++);}
1910       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1911       /* is msg_ids_out++ correct? */
1912       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
1913 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
1914     }
1915 
1916   if (gs->max_left_over)
1917     {gs_gop_tree_exists(gs,in_vals);}
1918 
1919   /* process the received data */
1920   msg_nodes=nodes;
1921   while ((iptr = *nodes++))
1922     {
1923       /* Should I check the return value of MPI_Wait() or status? */
1924       /* Can this loop be replaced by a call to MPI_Waitall()? */
1925       MPI_Wait(ids_in++, &status);
1926       while (*iptr >= 0)
1927 	{*(dptr1 + *iptr) = EXISTS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
1928     }
1929 
1930   /* replace vals */
1931   while (*pw >= 0)
1932     {*(in_vals + *pw++) = *dptr1++;}
1933 
1934   /* clear isend message handles */
1935   /* This changed for clarity though it could be the same */
1936   while (*msg_nodes++)
1937     /* Should I check the return value of MPI_Wait() or status? */
1938     /* Can this loop be replaced by a call to MPI_Waitall()? */
1939     {MPI_Wait(ids_out++, &status);}
1940 }
1941 
1942 
1943 
1944 /******************************************************************************
1945 Function: gather_scatter
1946 
1947 Input :
1948 Output:
1949 Return:
1950 Description:
1951 ******************************************************************************/
1952 static
1953 void
1954 gs_gop_tree_exists(gs_id *gs, REAL *vals)
1955 {
1956   int size;
1957   int *in, *out;
1958   REAL *buf, *work;
1959   int op[] = {GL_EXISTS,0};
1960 
1961 
1962 #ifdef DEBUG
1963   error_msg_warning("start gs_gop_tree_exists()");
1964 #endif
1965 
1966   in   = gs->tree_map_in;
1967   out  = gs->tree_map_out;
1968   buf  = gs->tree_buf;
1969   work = gs->tree_work;
1970   size = gs->tree_nel;
1971 
1972 #if defined  BLAS||CBLAS
1973   *work = 0.0;
1974   copy(size,work,0,buf,1);
1975 #else
1976   rvec_zero(buf,size);
1977 #endif
1978 
1979   while (*in >= 0)
1980     {
1981       /*
1982       printf("%d :: out=%d\n",my_id,*out);
1983       printf("%d :: in=%d\n",my_id,*in);
1984       */
1985       *(buf + *out++) = *(vals + *in++);
1986     }
1987 
1988   grop(buf,work,size,op);
1989 
1990   in   = gs->tree_map_in;
1991   out  = gs->tree_map_out;
1992 
1993   while (*in >= 0)
1994     {*(vals + *in++) = *(buf + *out++);}
1995 
1996 #ifdef DEBUG
1997   error_msg_warning("start gs_gop_tree_exists()");
1998 #endif
1999 }
2000 
2001 
2002 
2003 /******************************************************************************
2004 Function: gather_scatter
2005 
2006 Input :
2007 Output:
2008 Return:
2009 Description:
2010 ******************************************************************************/
2011 static void
2012 gs_gop_max_abs(register gs_id *gs, register REAL *vals)
2013 {
2014 #ifdef DEBUG
2015   error_msg_warning("start gs_gop_xxx()\n");
2016 #endif
2017 
2018   /* local only operations!!! */
2019   if (gs->num_local)
2020     {gs_gop_local_max_abs(gs,vals);}
2021 
2022   /* if intersection tree/pairwise and local isn't empty */
2023   if (gs->num_local_gop)
2024     {
2025       gs_gop_local_in_max_abs(gs,vals);
2026 
2027       /* pairwise */
2028       if (gs->num_pairs)
2029 	{gs_gop_pairwise_max_abs(gs,vals);}
2030 
2031       /* tree */
2032       else if (gs->max_left_over)
2033 	{gs_gop_tree_max_abs(gs,vals);}
2034 
2035       gs_gop_local_out(gs,vals);
2036     }
2037   /* if intersection tree/pairwise and local is empty */
2038   else
2039     {
2040       /* pairwise */
2041       if (gs->num_pairs)
2042 	{gs_gop_pairwise_max_abs(gs,vals);}
2043 
2044       /* tree */
2045       else if (gs->max_left_over)
2046 	{gs_gop_tree_max_abs(gs,vals);}
2047     }
2048 }
2049 
2050 
2051 
2052 /******************************************************************************
2053 Function: gather_scatter
2054 
2055 Input :
2056 Output:
2057 Return:
2058 Description:
2059 ******************************************************************************/
2060 static
2061 void
2062 gs_gop_local_max_abs(register gs_id *gs, register REAL *vals)
2063 {
2064   register int *num, *map, **reduce;
2065   register REAL tmp;
2066 
2067 
2068 #ifdef DEBUG
2069   error_msg_warning("start gs_gop_xxx()\n");
2070 #endif
2071 
2072   num    = gs->num_local_reduce;
2073   reduce = gs->local_reduce;
2074   while ((map = *reduce))
2075     {
2076       num ++;
2077       tmp = 0.0;
2078       while (*map >= 0)
2079 	{tmp = MAX_FABS(tmp,*(vals + *map)); map++;}
2080 
2081       map = *reduce++;
2082       while (*map >= 0)
2083 	{*(vals + *map++) = tmp;}
2084     }
2085 }
2086 
2087 
2088 
2089 /******************************************************************************
2090 Function: gather_scatter
2091 
2092 Input :
2093 Output:
2094 Return:
2095 Description:
2096 ******************************************************************************/
2097 static
2098 void
2099 gs_gop_local_in_max_abs(register gs_id *gs, register REAL *vals)
2100 {
2101   register int *num, *map, **reduce;
2102   register REAL *base;
2103 
2104 
2105 #ifdef DEBUG
2106   error_msg_warning("start gs_gop_xxx()\n");
2107 #endif
2108 
2109   num    = gs->num_gop_local_reduce;
2110   reduce = gs->gop_local_reduce;
2111   while ((map = *reduce++))
2112     {
2113       num++;
2114       base = vals + *map++;
2115       while (*map >= 0)
2116 	{*base = MAX_FABS(*base,*(vals + *map)); map++;}
2117     }
2118 }
2119 
2120 
2121 
2122 /******************************************************************************
2123 Function: gather_scatter
2124 
2125 VERSION 3 ::
2126 
2127 Input :
2128 Output:
2129 Return:
2130 Description:
2131 ******************************************************************************/
2132 static
2133 void
2134 gs_gop_pairwise_max_abs(register gs_id *gs, register REAL *in_vals)
2135 {
2136   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
2137   register int *iptr, *msg_list, *msg_size, **msg_nodes;
2138   register int *pw, *list, *size, **nodes;
2139   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2140   MPI_Status status;
2141 
2142 
2143   /* strip and load registers */
2144   msg_list =list         = gs->pair_list;
2145   msg_size =size         = gs->msg_sizes;
2146   msg_nodes=nodes        = gs->node_list;
2147   iptr=pw                = gs->pw_elm_list;
2148   dptr1=dptr3            = gs->pw_vals;
2149   msg_ids_in  = ids_in   = gs->msg_ids_in;
2150   msg_ids_out = ids_out  = gs->msg_ids_out;
2151   dptr2                  = gs->out;
2152   in1=in2                = gs->in;
2153 
2154   /* post the receives */
2155   /*  msg_nodes=nodes; */
2156   do
2157     {
2158       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2159 	 second one *list and do list++ afterwards */
2160       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
2161 		gs->gs_comm, msg_ids_in++);
2162       in1 += *size++;
2163     }
2164   while (*++msg_nodes);
2165   msg_nodes=nodes;
2166 
2167   /* load gs values into in out gs buffers */
2168   while (*iptr >= 0)
2169     {*dptr3++ = *(in_vals + *iptr++);}
2170 
2171   /* load out buffers and post the sends */
2172   while ((iptr = *msg_nodes++))
2173     {
2174       dptr3 = dptr2;
2175       while (*iptr >= 0)
2176 	{*dptr2++ = *(dptr1 + *iptr++);}
2177       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2178       /* is msg_ids_out++ correct? */
2179       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
2180 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
2181     }
2182 
2183   if (gs->max_left_over)
2184     {gs_gop_tree_max_abs(gs,in_vals);}
2185 
2186   /* process the received data */
2187   msg_nodes=nodes;
2188   while ((iptr = *nodes++))
2189     {
2190       /* Should I check the return value of MPI_Wait() or status? */
2191       /* Can this loop be replaced by a call to MPI_Waitall()? */
2192       MPI_Wait(ids_in++, &status);
2193       while (*iptr >= 0)
2194 	{*(dptr1 + *iptr) = MAX_FABS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2195     }
2196 
2197   /* replace vals */
2198   while (*pw >= 0)
2199     {*(in_vals + *pw++) = *dptr1++;}
2200 
2201   /* clear isend message handles */
2202   /* This changed for clarity though it could be the same */
2203   while (*msg_nodes++)
2204     /* Should I check the return value of MPI_Wait() or status? */
2205     /* Can this loop be replaced by a call to MPI_Waitall()? */
2206     {MPI_Wait(ids_out++, &status);}
2207 }
2208 
2209 
2210 
2211 /******************************************************************************
2212 Function: gather_scatter
2213 
2214 Input :
2215 Output:
2216 Return:
2217 Description:
2218 ******************************************************************************/
2219 static
2220 void
2221 gs_gop_tree_max_abs(gs_id *gs, REAL *vals)
2222 {
2223   int size;
2224   int *in, *out;
2225   REAL *buf, *work;
2226   int op[] = {GL_MAX_ABS,0};
2227 
2228 
2229 #ifdef DEBUG
2230   error_msg_warning("start gs_gop_tree_max_abs()");
2231 #endif
2232 
2233   in   = gs->tree_map_in;
2234   out  = gs->tree_map_out;
2235   buf  = gs->tree_buf;
2236   work = gs->tree_work;
2237   size = gs->tree_nel;
2238 
2239 #if defined BLAS||CBLAS
2240   *work = 0.0;
2241   copy(size,work,0,buf,1);
2242 #else
2243   rvec_zero(buf,size);
2244 #endif
2245 
2246   while (*in >= 0)
2247     {
2248       /*
2249       printf("%d :: out=%d\n",my_id,*out);
2250       printf("%d :: in=%d\n",my_id,*in);
2251       */
2252       *(buf + *out++) = *(vals + *in++);
2253     }
2254 
2255   grop(buf,work,size,op);
2256 
2257   in   = gs->tree_map_in;
2258   out  = gs->tree_map_out;
2259 
2260   while (*in >= 0)
2261     {*(vals + *in++) = *(buf + *out++);}
2262 
2263 #ifdef DEBUG
2264   error_msg_warning("start gs_gop_tree_max_abs()");
2265 #endif
2266 }
2267 
2268 
2269 
2270 /******************************************************************************
2271 Function: gather_scatter
2272 
2273 Input :
2274 Output:
2275 Return:
2276 Description:
2277 ******************************************************************************/
2278 static void
2279 gs_gop_max(register gs_id *gs, register REAL *vals)
2280 {
2281 #ifdef DEBUG
2282   error_msg_warning("start gs_gop_xxx()\n");
2283 #endif
2284 
2285 
2286   /* local only operations!!! */
2287   if (gs->num_local)
2288     {gs_gop_local_max(gs,vals);}
2289 
2290   /* if intersection tree/pairwise and local isn't empty */
2291   if (gs->num_local_gop)
2292     {
2293       gs_gop_local_in_max(gs,vals);
2294 
2295       /* pairwise */
2296       if (gs->num_pairs)
2297 	{gs_gop_pairwise_max(gs,vals);}
2298 
2299       /* tree */
2300       else if (gs->max_left_over)
2301 	{gs_gop_tree_max(gs,vals);}
2302 
2303       gs_gop_local_out(gs,vals);
2304     }
2305   /* if intersection tree/pairwise and local is empty */
2306   else
2307     {
2308       /* pairwise */
2309       if (gs->num_pairs)
2310 	{gs_gop_pairwise_max(gs,vals);}
2311 
2312       /* tree */
2313       else if (gs->max_left_over)
2314 	{gs_gop_tree_max(gs,vals);}
2315     }
2316 }
2317 
2318 
2319 
2320 /******************************************************************************
2321 Function: gather_scatter
2322 
2323 Input :
2324 Output:
2325 Return:
2326 Description:
2327 ******************************************************************************/
2328 static
2329 void
2330 gs_gop_local_max(register gs_id *gs, register REAL *vals)
2331 {
2332   register int *num, *map, **reduce;
2333   register REAL tmp;
2334 
2335 
2336 #ifdef DEBUG
2337   error_msg_warning("start gs_gop_xxx()\n");
2338 #endif
2339 
2340   num    = gs->num_local_reduce;
2341   reduce = gs->local_reduce;
2342   while ((map = *reduce))
2343     {
2344       num ++;
2345       tmp = -REAL_MAX;
2346       while (*map >= 0)
2347 	{tmp = PetscMax(tmp,*(vals + *map)); map++;}
2348 
2349       map = *reduce++;
2350       while (*map >= 0)
2351 	{*(vals + *map++) = tmp;}
2352     }
2353 }
2354 
2355 
2356 
2357 /******************************************************************************
2358 Function: gather_scatter
2359 
2360 Input :
2361 Output:
2362 Return:
2363 Description:
2364 ******************************************************************************/
2365 static
2366 void
2367 gs_gop_local_in_max(register gs_id *gs, register REAL *vals)
2368 {
2369   register int *num, *map, **reduce;
2370   register REAL *base;
2371 
2372 
2373 #ifdef DEBUG
2374   error_msg_warning("start gs_gop_xxx()\n");
2375 #endif
2376 
2377   num    = gs->num_gop_local_reduce;
2378   reduce = gs->gop_local_reduce;
2379   while ((map = *reduce++))
2380     {
2381       num++;
2382       base = vals + *map++;
2383       while (*map >= 0)
2384 	{*base = PetscMax(*base,*(vals + *map)); map++;}
2385     }
2386 }
2387 
2388 
2389 
2390 /******************************************************************************
2391 Function: gather_scatter
2392 
2393 VERSION 3 ::
2394 
2395 Input :
2396 Output:
2397 Return:
2398 Description:
2399 ******************************************************************************/
2400 static
2401 void
2402 gs_gop_pairwise_max(register gs_id *gs, register REAL *in_vals)
2403 {
2404   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
2405   register int *iptr, *msg_list, *msg_size, **msg_nodes;
2406   register int *pw, *list, *size, **nodes;
2407   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2408   MPI_Status status;
2409 
2410 
2411   /* strip and load registers */
2412   msg_list =list         = gs->pair_list;
2413   msg_size =size         = gs->msg_sizes;
2414   msg_nodes=nodes        = gs->node_list;
2415   iptr=pw                = gs->pw_elm_list;
2416   dptr1=dptr3            = gs->pw_vals;
2417   msg_ids_in  = ids_in   = gs->msg_ids_in;
2418   msg_ids_out = ids_out  = gs->msg_ids_out;
2419   dptr2                  = gs->out;
2420   in1=in2                = gs->in;
2421 
2422   /* post the receives */
2423   /*  msg_nodes=nodes; */
2424   do
2425     {
2426       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2427 	 second one *list and do list++ afterwards */
2428       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
2429 		gs->gs_comm, msg_ids_in++);
2430       in1 += *size++;
2431     }
2432   while (*++msg_nodes);
2433   msg_nodes=nodes;
2434 
2435   /* load gs values into in out gs buffers */
2436   while (*iptr >= 0)
2437     {*dptr3++ = *(in_vals + *iptr++);}
2438 
2439   /* load out buffers and post the sends */
2440   while ((iptr = *msg_nodes++))
2441     {
2442       dptr3 = dptr2;
2443       while (*iptr >= 0)
2444 	{*dptr2++ = *(dptr1 + *iptr++);}
2445       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2446       /* is msg_ids_out++ correct? */
2447       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
2448 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
2449     }
2450 
2451   if (gs->max_left_over)
2452     {gs_gop_tree_max(gs,in_vals);}
2453 
2454   /* process the received data */
2455   msg_nodes=nodes;
2456   while ((iptr = *nodes++))
2457     {
2458       /* Should I check the return value of MPI_Wait() or status? */
2459       /* Can this loop be replaced by a call to MPI_Waitall()? */
2460       MPI_Wait(ids_in++, &status);
2461       while (*iptr >= 0)
2462 	{*(dptr1 + *iptr) = PetscMax(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2463     }
2464 
2465   /* replace vals */
2466   while (*pw >= 0)
2467     {*(in_vals + *pw++) = *dptr1++;}
2468 
2469   /* clear isend message handles */
2470   /* This changed for clarity though it could be the same */
2471   while (*msg_nodes++)
2472     /* Should I check the return value of MPI_Wait() or status? */
2473     /* Can this loop be replaced by a call to MPI_Waitall()? */
2474     {MPI_Wait(ids_out++, &status);}
2475 }
2476 
2477 
2478 
2479 /******************************************************************************
2480 Function: gather_scatter
2481 
2482 Input :
2483 Output:
2484 Return:
2485 Description:
2486 ******************************************************************************/
2487 static
2488 void
2489 gs_gop_tree_max(gs_id *gs, REAL *vals)
2490 {
2491   int size;
2492   int *in, *out;
2493   REAL *buf, *work;
2494   /* int op[] = {GL_MAX,0}; */
2495 
2496 
2497 #ifdef DEBUG
2498   error_msg_warning("start gs_gop_tree_max()");
2499 #endif
2500 
2501   in   = gs->tree_map_in;
2502   out  = gs->tree_map_out;
2503   buf  = gs->tree_buf;
2504   work = gs->tree_work;
2505   size = gs->tree_nel;
2506 
2507 #if defined BLAS||CBLAS
2508   *work = -REAL_MAX;
2509   copy(size,work,0,buf,1);
2510 #else
2511   rvec_set(buf,-REAL_MAX,size);
2512 #endif
2513 
2514   while (*in >= 0)
2515     {*(buf + *out++) = *(vals + *in++);}
2516 
2517   in   = gs->tree_map_in;
2518   out  = gs->tree_map_out;
2519   MPI_Allreduce(buf,work,size,REAL_TYPE,MPI_MAX,gs->gs_comm);
2520   while (*in >= 0)
2521     {*(vals + *in++) = *(work + *out++);}
2522 
2523 #ifdef DEBUG
2524   error_msg_warning("end gs_gop_tree_max()");
2525 #endif
2526 }
2527 
2528 
2529 
2530 /******************************************************************************
2531 Function: gather_scatter
2532 
2533 Input :
2534 Output:
2535 Return:
2536 Description:
2537 ******************************************************************************/
2538 static void
2539 gs_gop_min_abs(register gs_id *gs, register REAL *vals)
2540 {
2541 #ifdef DEBUG
2542   error_msg_warning("start gs_gop_xxx()\n");
2543 #endif
2544 
2545   /* local only operations!!! */
2546   if (gs->num_local)
2547     {gs_gop_local_min_abs(gs,vals);}
2548 
2549   /* if intersection tree/pairwise and local isn't empty */
2550   if (gs->num_local_gop)
2551     {
2552       gs_gop_local_in_min_abs(gs,vals);
2553 
2554       /* pairwise */
2555       if (gs->num_pairs)
2556 	{gs_gop_pairwise_min_abs(gs,vals);}
2557 
2558       /* tree */
2559       else if (gs->max_left_over)
2560 	{gs_gop_tree_min_abs(gs,vals);}
2561 
2562       gs_gop_local_out(gs,vals);
2563     }
2564   /* if intersection tree/pairwise and local is empty */
2565   else
2566     {
2567       /* pairwise */
2568       if (gs->num_pairs)
2569 	{gs_gop_pairwise_min_abs(gs,vals);}
2570 
2571       /* tree */
2572       else if (gs->max_left_over)
2573 	{gs_gop_tree_min_abs(gs,vals);}
2574     }
2575 }
2576 
2577 
2578 
2579 /******************************************************************************
2580 Function: gather_scatter
2581 
2582 Input :
2583 Output:
2584 Return:
2585 Description:
2586 ******************************************************************************/
2587 static
2588 void
2589 gs_gop_local_min_abs(register gs_id *gs, register REAL *vals)
2590 {
2591   register int *num, *map, **reduce;
2592   register REAL tmp;
2593 
2594 
2595 #ifdef DEBUG
2596   error_msg_warning("start gs_gop_xxx()\n");
2597 #endif
2598 
2599   num    = gs->num_local_reduce;
2600   reduce = gs->local_reduce;
2601   while ((map = *reduce))
2602     {
2603       num ++;
2604       tmp = REAL_MAX;
2605       while (*map >= 0)
2606 	{tmp = MIN_FABS(tmp,*(vals + *map)); map++;}
2607 
2608       map = *reduce++;
2609       while (*map >= 0)
2610 	{*(vals + *map++) = tmp;}
2611     }
2612 }
2613 
2614 
2615 
2616 /******************************************************************************
2617 Function: gather_scatter
2618 
2619 Input :
2620 Output:
2621 Return:
2622 Description:
2623 ******************************************************************************/
2624 static
2625 void
2626 gs_gop_local_in_min_abs(register gs_id *gs, register REAL *vals)
2627 {
2628   register int *num, *map, **reduce;
2629   register REAL *base;
2630 
2631 #ifdef DEBUG
2632   error_msg_warning("start gs_gop_xxx()\n");
2633 #endif
2634 
2635   num    = gs->num_gop_local_reduce;
2636   reduce = gs->gop_local_reduce;
2637   while ((map = *reduce++))
2638     {
2639       num++;
2640       base = vals + *map++;
2641       while (*map >= 0)
2642 	{*base = MIN_FABS(*base,*(vals + *map)); map++;}
2643     }
2644 }
2645 
2646 
2647 
2648 /******************************************************************************
2649 Function: gather_scatter
2650 
2651 VERSION 3 ::
2652 
2653 Input :
2654 Output:
2655 Return:
2656 Description:
2657 ******************************************************************************/
2658 static
2659 void
2660 gs_gop_pairwise_min_abs(register gs_id *gs, register REAL *in_vals)
2661 {
2662   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
2663   register int *iptr, *msg_list, *msg_size, **msg_nodes;
2664   register int *pw, *list, *size, **nodes;
2665   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2666   MPI_Status status;
2667 
2668 
2669   /* strip and load registers */
2670   msg_list =list         = gs->pair_list;
2671   msg_size =size         = gs->msg_sizes;
2672   msg_nodes=nodes        = gs->node_list;
2673   iptr=pw                = gs->pw_elm_list;
2674   dptr1=dptr3            = gs->pw_vals;
2675   msg_ids_in  = ids_in   = gs->msg_ids_in;
2676   msg_ids_out = ids_out  = gs->msg_ids_out;
2677   dptr2                  = gs->out;
2678   in1=in2                = gs->in;
2679 
2680   /* post the receives */
2681   /*  msg_nodes=nodes; */
2682   do
2683     {
2684       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2685 	 second one *list and do list++ afterwards */
2686       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
2687 		gs->gs_comm, msg_ids_in++);
2688       in1 += *size++;
2689     }
2690   while (*++msg_nodes);
2691   msg_nodes=nodes;
2692 
2693   /* load gs values into in out gs buffers */
2694   while (*iptr >= 0)
2695     {*dptr3++ = *(in_vals + *iptr++);}
2696 
2697   /* load out buffers and post the sends */
2698   while ((iptr = *msg_nodes++))
2699     {
2700       dptr3 = dptr2;
2701       while (*iptr >= 0)
2702 	{*dptr2++ = *(dptr1 + *iptr++);}
2703       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2704       /* is msg_ids_out++ correct? */
2705       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
2706 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
2707     }
2708 
2709   if (gs->max_left_over)
2710     {gs_gop_tree_min_abs(gs,in_vals);}
2711 
2712   /* process the received data */
2713   msg_nodes=nodes;
2714   while ((iptr = *nodes++))
2715     {
2716       /* Should I check the return value of MPI_Wait() or status? */
2717       /* Can this loop be replaced by a call to MPI_Waitall()? */
2718       MPI_Wait(ids_in++, &status);
2719       while (*iptr >= 0)
2720 	{*(dptr1 + *iptr) = MIN_FABS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2721     }
2722 
2723   /* replace vals */
2724   while (*pw >= 0)
2725     {*(in_vals + *pw++) = *dptr1++;}
2726 
2727   /* clear isend message handles */
2728   /* This changed for clarity though it could be the same */
2729   while (*msg_nodes++)
2730     /* Should I check the return value of MPI_Wait() or status? */
2731     /* Can this loop be replaced by a call to MPI_Waitall()? */
2732     {MPI_Wait(ids_out++, &status);}
2733 }
2734 
2735 
2736 
2737 /******************************************************************************
2738 Function: gather_scatter
2739 
2740 Input :
2741 Output:
2742 Return:
2743 Description:
2744 ******************************************************************************/
2745 static
2746 void
2747 gs_gop_tree_min_abs(gs_id *gs, REAL *vals)
2748 {
2749   int size;
2750   int *in, *out;
2751   REAL *buf, *work;
2752   int op[] = {GL_MIN_ABS,0};
2753 
2754 
2755 #ifdef DEBUG
2756   error_msg_warning("start gs_gop_tree_min_abs()");
2757 #endif
2758 
2759   in   = gs->tree_map_in;
2760   out  = gs->tree_map_out;
2761   buf  = gs->tree_buf;
2762   work = gs->tree_work;
2763   size = gs->tree_nel;
2764 
2765 #if defined  BLAS||CBLAS
2766   *work = REAL_MAX;
2767   copy(size,work,0,buf,1);
2768 #else
2769   rvec_set(buf,REAL_MAX,size);
2770 #endif
2771 
2772   while (*in >= 0)
2773     {*(buf + *out++) = *(vals + *in++);}
2774 
2775   in   = gs->tree_map_in;
2776   out  = gs->tree_map_out;
2777   grop(buf,work,size,op);
2778   while (*in >= 0)
2779     {*(vals + *in++) = *(buf + *out++);}
2780 
2781 #ifdef DEBUG
2782   error_msg_warning("end gs_gop_tree_min_abs()");
2783 #endif
2784 }
2785 
2786 
2787 
2788 /******************************************************************************
2789 Function: gather_scatter
2790 
2791 Input :
2792 Output:
2793 Return:
2794 Description:
2795 ******************************************************************************/
2796 static void
2797 gs_gop_min(register gs_id *gs, register REAL *vals)
2798 {
2799 #ifdef DEBUG
2800   error_msg_warning("start gs_gop_xxx()\n");
2801 #endif
2802 
2803   /* local only operations!!! */
2804   if (gs->num_local)
2805     {gs_gop_local_min(gs,vals);}
2806 
2807   /* if intersection tree/pairwise and local isn't empty */
2808   if (gs->num_local_gop)
2809     {
2810       gs_gop_local_in_min(gs,vals);
2811 
2812       /* pairwise */
2813       if (gs->num_pairs)
2814 	{gs_gop_pairwise_min(gs,vals);}
2815 
2816       /* tree */
2817       else if (gs->max_left_over)
2818 	{gs_gop_tree_min(gs,vals);}
2819 
2820       gs_gop_local_out(gs,vals);
2821     }
2822   /* if intersection tree/pairwise and local is empty */
2823   else
2824     {
2825       /* pairwise */
2826       if (gs->num_pairs)
2827 	{gs_gop_pairwise_min(gs,vals);}
2828 
2829       /* tree */
2830       else if (gs->max_left_over)
2831 	{gs_gop_tree_min(gs,vals);}
2832     }
2833 #ifdef DEBUG
2834   error_msg_warning("end gs_gop_xxx()\n");
2835 #endif
2836 }
2837 
2838 
2839 
2840 /******************************************************************************
2841 Function: gather_scatter
2842 
2843 Input :
2844 Output:
2845 Return:
2846 Description:
2847 ******************************************************************************/
2848 static
2849 void
2850 gs_gop_local_min(register gs_id *gs, register REAL *vals)
2851 {
2852   register int *num, *map, **reduce;
2853   register REAL tmp;
2854 
2855 
2856 #ifdef DEBUG
2857   error_msg_warning("start gs_gop_xxx()\n");
2858 #endif
2859 
2860   num    = gs->num_local_reduce;
2861   reduce = gs->local_reduce;
2862   while ((map = *reduce))
2863     {
2864       num ++;
2865       tmp = REAL_MAX;
2866       while (*map >= 0)
2867 	{tmp = PetscMin(tmp,*(vals + *map)); map++;}
2868 
2869       map = *reduce++;
2870       while (*map >= 0)
2871 	{*(vals + *map++) = tmp;}
2872     }
2873 }
2874 
2875 
2876 
2877 /******************************************************************************
2878 Function: gather_scatter
2879 
2880 Input :
2881 Output:
2882 Return:
2883 Description:
2884 ******************************************************************************/
2885 static
2886 void
2887 gs_gop_local_in_min(register gs_id *gs, register REAL *vals)
2888 {
2889   register int *num, *map, **reduce;
2890   register REAL *base;
2891 
2892 
2893 #ifdef DEBUG
2894   error_msg_warning("start gs_gop_xxx()\n");
2895 #endif
2896 
2897   num    = gs->num_gop_local_reduce;
2898   reduce = gs->gop_local_reduce;
2899   while ((map = *reduce++))
2900     {
2901       num++;
2902       base = vals + *map++;
2903       while (*map >= 0)
2904 	{*base = PetscMin(*base,*(vals + *map)); map++;}
2905     }
2906 }
2907 
2908 
2909 
2910 /******************************************************************************
2911 Function: gather_scatter
2912 
2913 VERSION 3 ::
2914 
2915 Input :
2916 Output:
2917 Return:
2918 Description:
2919 ******************************************************************************/
2920 static
2921 void
2922 gs_gop_pairwise_min(register gs_id *gs, register REAL *in_vals)
2923 {
2924   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
2925   register int *iptr, *msg_list, *msg_size, **msg_nodes;
2926   register int *pw, *list, *size, **nodes;
2927   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2928   MPI_Status status;
2929 
2930 
2931   /* strip and load registers */
2932   msg_list =list         = gs->pair_list;
2933   msg_size =size         = gs->msg_sizes;
2934   msg_nodes=nodes        = gs->node_list;
2935   iptr=pw                = gs->pw_elm_list;
2936   dptr1=dptr3            = gs->pw_vals;
2937   msg_ids_in  = ids_in   = gs->msg_ids_in;
2938   msg_ids_out = ids_out  = gs->msg_ids_out;
2939   dptr2                  = gs->out;
2940   in1=in2                = gs->in;
2941 
2942   /* post the receives */
2943   /*  msg_nodes=nodes; */
2944   do
2945     {
2946       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2947 	 second one *list and do list++ afterwards */
2948       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
2949 		gs->gs_comm, msg_ids_in++);
2950       in1 += *size++;
2951     }
2952   while (*++msg_nodes);
2953   msg_nodes=nodes;
2954 
2955   /* load gs values into in out gs buffers */
2956   while (*iptr >= 0)
2957     {*dptr3++ = *(in_vals + *iptr++);}
2958 
2959   /* load out buffers and post the sends */
2960   while ((iptr = *msg_nodes++))
2961     {
2962       dptr3 = dptr2;
2963       while (*iptr >= 0)
2964 	{*dptr2++ = *(dptr1 + *iptr++);}
2965       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2966       /* is msg_ids_out++ correct? */
2967       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
2968 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
2969     }
2970 
2971   /* process the received data */
2972   if (gs->max_left_over)
2973     {gs_gop_tree_min(gs,in_vals);}
2974 
2975   msg_nodes=nodes;
2976   while ((iptr = *nodes++))
2977     {
2978       /* Should I check the return value of MPI_Wait() or status? */
2979       /* Can this loop be replaced by a call to MPI_Waitall()? */
2980       MPI_Wait(ids_in++, &status);
2981       while (*iptr >= 0)
2982 	{*(dptr1 + *iptr) = PetscMin(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2983     }
2984 
2985   /* replace vals */
2986   while (*pw >= 0)
2987     {*(in_vals + *pw++) = *dptr1++;}
2988 
2989   /* clear isend message handles */
2990   /* This changed for clarity though it could be the same */
2991   while (*msg_nodes++)
2992     /* Should I check the return value of MPI_Wait() or status? */
2993     /* Can this loop be replaced by a call to MPI_Waitall()? */
2994     {MPI_Wait(ids_out++, &status);}
2995 }
2996 
2997 
2998 
2999 /******************************************************************************
3000 Function: gather_scatter
3001 
3002 Input :
3003 Output:
3004 Return:
3005 Description:
3006 ******************************************************************************/
3007 static
3008 void
3009 gs_gop_tree_min(gs_id *gs, REAL *vals)
3010 {
3011   int size;
3012   int *in, *out;
3013   REAL *buf, *work;
3014   /*int op[] = {GL_MIN,0};*/
3015 
3016 
3017 #ifdef DEBUG
3018   error_msg_warning("start gs_gop_tree_min()");
3019 #endif
3020 
3021   in   = gs->tree_map_in;
3022   out  = gs->tree_map_out;
3023   buf  = gs->tree_buf;
3024   work = gs->tree_work;
3025   size = gs->tree_nel;
3026 
3027 #if defined  BLAS||CBLAS
3028   *work = REAL_MAX;
3029   copy(size,work,0,buf,1);
3030 #else
3031   rvec_set(buf,REAL_MAX,size);
3032 #endif
3033 
3034   while (*in >= 0)
3035     {*(buf + *out++) = *(vals + *in++);}
3036 
3037   in   = gs->tree_map_in;
3038   out  = gs->tree_map_out;
3039   MPI_Allreduce(buf,work,size,REAL_TYPE,MPI_MIN,gs->gs_comm);
3040   while (*in >= 0)
3041     {*(vals + *in++) = *(work + *out++);}
3042 
3043 #ifdef DEBUG
3044   error_msg_warning("end gs_gop_tree_min()");
3045 #endif
3046 }
3047 
3048 
3049 
3050 /******************************************************************************
3051 Function: gather_scatter
3052 
3053 Input :
3054 Output:
3055 Return:
3056 Description:
3057 ******************************************************************************/
3058 static void
3059 gs_gop_times(register gs_id *gs, register REAL *vals)
3060 {
3061 #ifdef DEBUG
3062   error_msg_warning("start gs_gop_times()\n");
3063 #endif
3064 
3065   /* local only operations!!! */
3066   if (gs->num_local)
3067     {gs_gop_local_times(gs,vals);}
3068 
3069   /* if intersection tree/pairwise and local isn't empty */
3070   if (gs->num_local_gop)
3071     {
3072       gs_gop_local_in_times(gs,vals);
3073 
3074       /* pairwise */
3075       if (gs->num_pairs)
3076 	{gs_gop_pairwise_times(gs,vals);}
3077 
3078       /* tree */
3079       else if (gs->max_left_over)
3080 	{gs_gop_tree_times(gs,vals);}
3081 
3082       gs_gop_local_out(gs,vals);
3083     }
3084   /* if intersection tree/pairwise and local is empty */
3085   else
3086     {
3087       /* pairwise */
3088       if (gs->num_pairs)
3089 	{gs_gop_pairwise_times(gs,vals);}
3090 
3091       /* tree */
3092       else if (gs->max_left_over)
3093 	{gs_gop_tree_times(gs,vals);}
3094     }
3095 }
3096 
3097 
3098 
3099 /******************************************************************************
3100 Function: gather_scatter
3101 
3102 Input :
3103 Output:
3104 Return:
3105 Description:
3106 ******************************************************************************/
3107 static
3108 void
3109 gs_gop_local_times(register gs_id *gs, register REAL *vals)
3110 {
3111   register int *num, *map, **reduce;
3112   register REAL tmp;
3113 
3114 
3115 #ifdef DEBUG
3116   error_msg_warning("start gs_gop_xxx()\n");
3117 #endif
3118 
3119   num    = gs->num_local_reduce;
3120   reduce = gs->local_reduce;
3121   while ((map = *reduce))
3122     {
3123       /* wall */
3124       if (*num == 2)
3125 	{
3126 	  num ++; reduce++;
3127 	  vals[map[1]] = vals[map[0]] *= vals[map[1]];
3128 	}
3129       /* corner shared by three elements */
3130       else if (*num == 3)
3131 	{
3132 	  num ++; reduce++;
3133 	  vals[map[2]]=vals[map[1]]=vals[map[0]]*=(vals[map[1]]*vals[map[2]]);
3134 	}
3135       /* corner shared by four elements */
3136       else if (*num == 4)
3137 	{
3138 	  num ++; reduce++;
3139 	  vals[map[1]]=vals[map[2]]=vals[map[3]]=vals[map[0]] *=
3140 	                         (vals[map[1]] * vals[map[2]] * vals[map[3]]);
3141 	}
3142       /* general case ... odd geoms ... 3D*/
3143       else
3144 	{
3145 	  num ++;
3146  	  tmp = 1.0;
3147 	  while (*map >= 0)
3148 	    {tmp *= *(vals + *map++);}
3149 
3150 	  map = *reduce++;
3151 	  while (*map >= 0)
3152 	    {*(vals + *map++) = tmp;}
3153 	}
3154     }
3155 }
3156 
3157 
3158 
3159 /******************************************************************************
3160 Function: gather_scatter
3161 
3162 Input :
3163 Output:
3164 Return:
3165 Description:
3166 ******************************************************************************/
3167 static
3168 void
3169 gs_gop_local_in_times(register gs_id *gs, register REAL *vals)
3170 {
3171   register int *num, *map, **reduce;
3172   register REAL *base;
3173 
3174 
3175 #ifdef DEBUG
3176   error_msg_warning("start gs_gop_xxx()\n");
3177 #endif
3178 
3179   num    = gs->num_gop_local_reduce;
3180   reduce = gs->gop_local_reduce;
3181   while ((map = *reduce++))
3182     {
3183       /* wall */
3184       if (*num == 2)
3185 	{
3186 	  num ++;
3187 	  vals[map[0]] *= vals[map[1]];
3188 	}
3189       /* corner shared by three elements */
3190       else if (*num == 3)
3191 	{
3192 	  num ++;
3193 	  vals[map[0]] *= (vals[map[1]] * vals[map[2]]);
3194 	}
3195       /* corner shared by four elements */
3196       else if (*num == 4)
3197 	{
3198 	  num ++;
3199 	  vals[map[0]] *= (vals[map[1]] * vals[map[2]] * vals[map[3]]);
3200 	}
3201       /* general case ... odd geoms ... 3D*/
3202       else
3203 	{
3204 	  num++;
3205 	  base = vals + *map++;
3206 	  while (*map >= 0)
3207 	    {*base *= *(vals + *map++);}
3208 	}
3209     }
3210 }
3211 
3212 
3213 
3214 /******************************************************************************
3215 Function: gather_scatter
3216 
3217 VERSION 3 ::
3218 
3219 Input :
3220 Output:
3221 Return:
3222 Description:
3223 ******************************************************************************/
3224 static
3225 void
3226 gs_gop_pairwise_times(register gs_id *gs, register REAL *in_vals)
3227 {
3228   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
3229   register int *iptr, *msg_list, *msg_size, **msg_nodes;
3230   register int *pw, *list, *size, **nodes;
3231   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
3232   MPI_Status status;
3233 
3234 
3235   /* strip and load registers */
3236   msg_list =list         = gs->pair_list;
3237   msg_size =size         = gs->msg_sizes;
3238   msg_nodes=nodes        = gs->node_list;
3239   iptr=pw                = gs->pw_elm_list;
3240   dptr1=dptr3            = gs->pw_vals;
3241   msg_ids_in  = ids_in   = gs->msg_ids_in;
3242   msg_ids_out = ids_out  = gs->msg_ids_out;
3243   dptr2                  = gs->out;
3244   in1=in2                = gs->in;
3245 
3246   /* post the receives */
3247   /*  msg_nodes=nodes; */
3248   do
3249     {
3250       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
3251 	 second one *list and do list++ afterwards */
3252       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
3253 		gs->gs_comm, msg_ids_in++);
3254       in1 += *size++;
3255     }
3256   while (*++msg_nodes);
3257   msg_nodes=nodes;
3258 
3259   /* load gs values into in out gs buffers */
3260   while (*iptr >= 0)
3261     {*dptr3++ = *(in_vals + *iptr++);}
3262 
3263   /* load out buffers and post the sends */
3264   while ((iptr = *msg_nodes++))
3265     {
3266       dptr3 = dptr2;
3267       while (*iptr >= 0)
3268 	{*dptr2++ = *(dptr1 + *iptr++);}
3269       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
3270       /* is msg_ids_out++ correct? */
3271       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
3272 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
3273     }
3274 
3275   if (gs->max_left_over)
3276     {gs_gop_tree_times(gs,in_vals);}
3277 
3278   /* process the received data */
3279   msg_nodes=nodes;
3280   while ((iptr = *nodes++))
3281     {
3282       /* Should I check the return value of MPI_Wait() or status? */
3283       /* Can this loop be replaced by a call to MPI_Waitall()? */
3284       MPI_Wait(ids_in++, &status);
3285       while (*iptr >= 0)
3286 	{*(dptr1 + *iptr++) *= *in2++;}
3287     }
3288 
3289   /* replace vals */
3290   while (*pw >= 0)
3291     {*(in_vals + *pw++) = *dptr1++;}
3292 
3293   /* clear isend message handles */
3294   /* This changed for clarity though it could be the same */
3295   while (*msg_nodes++)
3296     /* Should I check the return value of MPI_Wait() or status? */
3297     /* Can this loop be replaced by a call to MPI_Waitall()? */
3298     {MPI_Wait(ids_out++, &status);}
3299 }
3300 
3301 
3302 
3303 /******************************************************************************
3304 Function: gather_scatter
3305 
3306 Input :
3307 Output:
3308 Return:
3309 Description:
3310 ******************************************************************************/
3311 static
3312 void
3313 gs_gop_tree_times(gs_id *gs, REAL *vals)
3314 {
3315   int size;
3316   int *in, *out;
3317   REAL *buf, *work;
3318   /*int op[] = {GL_MULT,0};*/
3319 
3320 
3321 #ifdef DEBUG
3322   error_msg_warning("start gs_gop_tree_times()");
3323 #endif
3324 
3325   in   = gs->tree_map_in;
3326   out  = gs->tree_map_out;
3327   buf  = gs->tree_buf;
3328   work = gs->tree_work;
3329   size = gs->tree_nel;
3330 
3331 #if defined  BLAS||CBLAS
3332   *work = 1.0;
3333   copy(size,work,0,buf,1);
3334 #else
3335   rvec_one(buf,size);
3336 #endif
3337 
3338   while (*in >= 0)
3339     {*(buf + *out++) = *(vals + *in++);}
3340 
3341   in   = gs->tree_map_in;
3342   out  = gs->tree_map_out;
3343   MPI_Allreduce(buf,work,size,REAL_TYPE,MPI_PROD,gs->gs_comm);
3344   while (*in >= 0)
3345     {*(vals + *in++) = *(work + *out++);}
3346 
3347 #ifdef DEBUG
3348   error_msg_warning("end gs_gop_tree_times()");
3349 #endif
3350 }
3351 
3352 
3353 
3354 /******************************************************************************
3355 Function: gather_scatter
3356 
3357 
3358 Input :
3359 Output:
3360 Return:
3361 Description:
3362 ******************************************************************************/
3363 static void
3364 gs_gop_plus(register gs_id *gs, register REAL *vals)
3365 {
3366 #ifdef DEBUG
3367   error_msg_warning("start gs_gop_plus()\n");
3368 #endif
3369 
3370   /* local only operations!!! */
3371   if (gs->num_local)
3372     {gs_gop_local_plus(gs,vals);}
3373 
3374   /* if intersection tree/pairwise and local isn't empty */
3375   if (gs->num_local_gop)
3376     {
3377       gs_gop_local_in_plus(gs,vals);
3378 
3379       /* pairwise will NOT do tree inside ... */
3380       if (gs->num_pairs)
3381 	{gs_gop_pairwise_plus(gs,vals);}
3382 
3383       /* tree */
3384       if (gs->max_left_over)
3385 	{gs_gop_tree_plus(gs,vals);}
3386 
3387       gs_gop_local_out(gs,vals);
3388     }
3389   /* if intersection tree/pairwise and local is empty */
3390   else
3391     {
3392       /* pairwise will NOT do tree inside */
3393       if (gs->num_pairs)
3394 	{gs_gop_pairwise_plus(gs,vals);}
3395 
3396       /* tree */
3397       if (gs->max_left_over)
3398 	{gs_gop_tree_plus(gs,vals);}
3399     }
3400 
3401 #ifdef DEBUG
3402   error_msg_warning("end gs_gop_plus()\n");
3403 #endif
3404 }
3405 
3406 
3407 
3408 /******************************************************************************
3409 Function: gather_scatter
3410 
3411 Input :
3412 Output:
3413 Return:
3414 Description:
3415 ******************************************************************************/
3416 static
3417 void
3418 gs_gop_local_plus(register gs_id *gs, register REAL *vals)
3419 {
3420   register int *num, *map, **reduce;
3421   register REAL tmp;
3422 
3423 
3424 #ifdef DEBUG
3425   error_msg_warning("begin gs_gop_local_plus()\n");
3426 #endif
3427 
3428   num    = gs->num_local_reduce;
3429   reduce = gs->local_reduce;
3430   while ((map = *reduce))
3431     {
3432       /* wall */
3433       if (*num == 2)
3434 	{
3435 	  num ++; reduce++;
3436 	  vals[map[1]] = vals[map[0]] += vals[map[1]];
3437 	}
3438       /* corner shared by three elements */
3439       else if (*num == 3)
3440 	{
3441 	  num ++; reduce++;
3442 	  vals[map[2]]=vals[map[1]]=vals[map[0]]+=(vals[map[1]]+vals[map[2]]);
3443 	}
3444       /* corner shared by four elements */
3445       else if (*num == 4)
3446 	{
3447 	  num ++; reduce++;
3448 	  vals[map[1]]=vals[map[2]]=vals[map[3]]=vals[map[0]] +=
3449 	                         (vals[map[1]] + vals[map[2]] + vals[map[3]]);
3450 	}
3451       /* general case ... odd geoms ... 3D*/
3452       else
3453 	{
3454 	  num ++;
3455  	  tmp = 0.0;
3456 	  while (*map >= 0)
3457 	    {tmp += *(vals + *map++);}
3458 
3459 	  map = *reduce++;
3460 	  while (*map >= 0)
3461 	    {*(vals + *map++) = tmp;}
3462 	}
3463     }
3464 #ifdef DEBUG
3465   error_msg_warning("end gs_gop_local_plus()\n");
3466 #endif
3467 }
3468 
3469 
3470 
3471 /******************************************************************************
3472 Function: gather_scatter
3473 
3474 Input :
3475 Output:
3476 Return:
3477 Description:
3478 ******************************************************************************/
3479 static
3480 void
3481 gs_gop_local_in_plus(register gs_id *gs, register REAL *vals)
3482 {
3483   register int *num, *map, **reduce;
3484   register REAL *base;
3485 
3486 
3487 #ifdef DEBUG
3488   error_msg_warning("begin gs_gop_local_in_plus()\n");
3489 #endif
3490 
3491   num    = gs->num_gop_local_reduce;
3492   reduce = gs->gop_local_reduce;
3493   while ((map = *reduce++))
3494     {
3495       /* wall */
3496       if (*num == 2)
3497 	{
3498 	  num ++;
3499 	  vals[map[0]] += vals[map[1]];
3500 	}
3501       /* corner shared by three elements */
3502       else if (*num == 3)
3503 	{
3504 	  num ++;
3505 	  vals[map[0]] += (vals[map[1]] + vals[map[2]]);
3506 	}
3507       /* corner shared by four elements */
3508       else if (*num == 4)
3509 	{
3510 	  num ++;
3511 	  vals[map[0]] += (vals[map[1]] + vals[map[2]] + vals[map[3]]);
3512 	}
3513       /* general case ... odd geoms ... 3D*/
3514       else
3515 	{
3516 	  num++;
3517 	  base = vals + *map++;
3518 	  while (*map >= 0)
3519 	    {*base += *(vals + *map++);}
3520 	}
3521     }
3522 #ifdef DEBUG
3523   error_msg_warning("end gs_gop_local_in_plus()\n");
3524 #endif
3525 }
3526 
3527 
3528 
3529 /******************************************************************************
3530 Function: gather_scatter
3531 
3532 VERSION 3 ::
3533 
3534 Input :
3535 Output:
3536 Return:
3537 Description:
3538 ******************************************************************************/
3539 static
3540 void
3541 gs_gop_pairwise_plus(register gs_id *gs, register REAL *in_vals)
3542 {
3543   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
3544   register int *iptr, *msg_list, *msg_size, **msg_nodes;
3545   register int *pw, *list, *size, **nodes;
3546   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
3547   MPI_Status status;
3548 
3549 
3550 #ifdef DEBUG
3551   error_msg_warning("gs_gop_pairwise_plus() start\n");
3552 #endif
3553 
3554   /* strip and load registers */
3555   msg_list =list         = gs->pair_list;
3556   msg_size =size         = gs->msg_sizes;
3557   msg_nodes=nodes        = gs->node_list;
3558   iptr=pw                = gs->pw_elm_list;
3559   dptr1=dptr3            = gs->pw_vals;
3560   msg_ids_in  = ids_in   = gs->msg_ids_in;
3561   msg_ids_out = ids_out  = gs->msg_ids_out;
3562   dptr2                  = gs->out;
3563   in1=in2                = gs->in;
3564 
3565   /* post the receives */
3566   /*  msg_nodes=nodes; */
3567   do
3568     {
3569       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
3570 	 second one *list and do list++ afterwards */
3571       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
3572 		gs->gs_comm, msg_ids_in++);
3573       in1 += *size++;
3574     }
3575   while (*++msg_nodes);
3576   msg_nodes=nodes;
3577 
3578   /* load gs values into in out gs buffers */
3579   while (*iptr >= 0)
3580     {*dptr3++ = *(in_vals + *iptr++);}
3581 
3582   /* load out buffers and post the sends */
3583   while ((iptr = *msg_nodes++))
3584     {
3585       dptr3 = dptr2;
3586       while (*iptr >= 0)
3587 	{*dptr2++ = *(dptr1 + *iptr++);}
3588       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
3589       /* is msg_ids_out++ correct? */
3590       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
3591 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
3592     }
3593 
3594   /* do the tree while we're waiting */
3595   if (gs->max_left_over)
3596     {gs_gop_tree_plus(gs,in_vals);}
3597 
3598   /* process the received data */
3599   msg_nodes=nodes;
3600   while ((iptr = *nodes++))
3601     {
3602       /* Should I check the return value of MPI_Wait() or status? */
3603       /* Can this loop be replaced by a call to MPI_Waitall()? */
3604       MPI_Wait(ids_in++, &status);
3605       while (*iptr >= 0)
3606 	{*(dptr1 + *iptr++) += *in2++;}
3607     }
3608 
3609   /* replace vals */
3610   while (*pw >= 0)
3611     {*(in_vals + *pw++) = *dptr1++;}
3612 
3613   /* clear isend message handles */
3614   /* This changed for clarity though it could be the same */
3615   while (*msg_nodes++)
3616     /* Should I check the return value of MPI_Wait() or status? */
3617     /* Can this loop be replaced by a call to MPI_Waitall()? */
3618     {MPI_Wait(ids_out++, &status);}
3619 
3620 #ifdef DEBUG
3621   error_msg_warning("gs_gop_pairwise_plus() end\n");
3622 #endif
3623 
3624 }
3625 
3626 
3627 
3628 /******************************************************************************
3629 Function: gather_scatter
3630 
3631 Input :
3632 Output:
3633 Return:
3634 Description:
3635 ******************************************************************************/
3636 static
3637 void
3638 gs_gop_tree_plus(gs_id *gs, REAL *vals)
3639 {
3640   int size;
3641   int *in, *out;
3642   REAL *buf, *work;
3643   /*int op[] = {GL_ADD,0}; */
3644 
3645 
3646 #ifdef DEBUG
3647   error_msg_warning("start gs_gop_tree_plus()\n");
3648 #endif
3649 
3650   in   = gs->tree_map_in;
3651   out  = gs->tree_map_out;
3652   buf  = gs->tree_buf;
3653   work = gs->tree_work;
3654   size = gs->tree_nel;
3655 
3656 #if defined  BLAS||CBLAS
3657   *work = 0.0;
3658   copy(size,work,0,buf,1);
3659 #else
3660   rvec_zero(buf,size);
3661 #endif
3662 
3663   while (*in >= 0)
3664     {*(buf + *out++) = *(vals + *in++);}
3665 
3666   in   = gs->tree_map_in;
3667   out  = gs->tree_map_out;
3668   MPI_Allreduce(buf,work,size,REAL_TYPE,MPI_SUM,gs->gs_comm);
3669   while (*in >= 0)
3670     {*(vals + *in++) = *(work + *out++);}
3671 
3672 #ifdef DEBUG
3673   error_msg_warning("end gs_gop_tree_plus()\n");
3674 #endif
3675 }
3676 
3677 
3678 
3679 /******************************************************************************
3680 Function: level_best_guess()
3681 
3682 Input :
3683 Output:
3684 Return:
3685 Description:
3686 ******************************************************************************/
3687 #if defined(not_used)
3688 static
3689 PetscErrorCode level_best_guess(void)
3690 {
3691   /* full pairwise for now */
3692   return(num_nodes);
3693 }
3694 #endif
3695 
3696 
3697 /******************************************************************************
3698 Function: gs_print_template()
3699 
3700 Input :
3701 
3702 Output:
3703 
3704 Return:
3705 
3706 Description:
3707 ******************************************************************************/
3708 #if defined(not_used)
3709 static
3710 void
3711 gs_print_template(register gs_id* gs, int who)
3712 {
3713   register int j, k, *iptr, *iptr2;
3714 
3715 
3716   if ((my_id == who) && (num_gs_ids))
3717     {
3718       printf("\n\nP#%d's GS#%d template:\n", my_id, gs->id);
3719       printf("id=%d\n",          gs->id);
3720       printf("nel(unique)=%d\n", gs->nel);
3721       printf("nel_max=%d\n",     gs->nel_max);
3722       printf("nel_min=%d\n",     gs->nel_min);
3723       printf("nel_sum=%d\n",     gs->nel_sum);
3724       printf("negl=%d\n",        gs->negl);
3725       printf("gl_max=%d\n",      gs->gl_max);
3726       printf("gl_min=%d\n",      gs->gl_min);
3727       printf("elms ordered=%d\n",gs->ordered);
3728       printf("repeats=%d\n",     gs->repeats);
3729       printf("positive=%d\n",    gs->positive);
3730       printf("elms=%ld\n",        (PTRINT) gs->elms);
3731       printf("elms(total)=%ld\n", (PTRINT) gs->local_elms);
3732       printf("vals=%ld\n",        (PTRINT) gs->vals);
3733       printf("gl_bss_min=%d\n",  gs->gl_bss_min);
3734       printf("gl_perm_min=%d\n", gs->gl_perm_min);
3735       printf("level=%d\n",       gs->level);
3736       printf("proc_mask_sz=%d\n",gs->mask_sz);
3737       printf("sh_proc_mask=%ld\n",(PTRINT) gs->nghs);
3738       printf("ngh_buf_size=%d\n",gs->ngh_buf_sz);
3739       printf("ngh_buf=%ld\n",     (PTRINT) gs->ngh_buf);
3740       printf("num_nghs=%d\n",    gs->num_nghs);
3741       printf("max_nghs=%d\n",    gs->max_nghs);
3742 
3743       /* pairwise exchange information */
3744       printf("\nPaiwise Info:\n");
3745       printf("num_pairs=%d\n",   gs->num_pairs);
3746       printf("max_pairs=%d\n",   gs->max_pairs);
3747       printf("len_pw_list=%d\n", gs->len_pw_list);
3748       printf("pair_list=%ld\n",   (PTRINT) gs->pair_list);
3749       printf("msg_sizes=%ld\n",   (PTRINT) gs->msg_sizes);
3750       printf("node_list=%ld\n",   (PTRINT) gs->node_list);
3751       printf("pw_elm_list=%ld\n", (PTRINT) gs->pw_elm_list);
3752 
3753       printf("pw_elm_list: ");
3754       if ((iptr = gs->pw_elm_list))
3755 	{
3756 	  for (j=0;j<gs->len_pw_list;j++)
3757 	    {printf("%d ", *iptr); iptr++;}
3758 	}
3759       printf("\n");
3760 
3761       printf("processor_list: ");
3762       if ((iptr = gs->pair_list))
3763 	{
3764 	  for (j=0;j<gs->num_pairs;j++)
3765 	    {printf("%d ", *iptr); iptr++;}
3766 	}
3767       printf("\n");
3768 
3769       printf("loc_node_pairs=%d\n",   gs->loc_node_pairs);
3770       printf("max_node_pairs=%d\n",   gs->max_node_pairs);
3771       printf("min_node_pairs=%d\n",   gs->min_node_pairs);
3772       printf("avg_node_pairs=%d\n",   gs->avg_node_pairs);
3773 
3774       printf("size_list: ");
3775       if ((iptr = gs->msg_sizes))
3776 	{
3777 	  for (j=0;j<gs->num_pairs;j++)
3778 	    {printf("%d ", *iptr); iptr++;}
3779 	}
3780       printf("\n");
3781       if ((iptr = gs->pair_list))
3782 	{
3783 	  for (j=0;j<gs->num_pairs;j++)
3784 	    {
3785 	      printf("node_list %d: ", *iptr);
3786 	      if ((iptr2 = (gs->node_list)[j]))
3787 		{
3788 		  for (k=0;k<(gs->msg_sizes)[j];k++)
3789 		    {printf("%d ", *iptr2); iptr2++;}
3790 		}
3791 	      iptr++;
3792 	      printf("\n");
3793 	    }
3794 	}
3795       printf("\n");
3796 
3797       printf("elm_list(U): ");
3798       if ((iptr = gs->elms))
3799 	{
3800 	  for (j=0;j<gs->nel;j++)
3801 	    {printf("%d ", *iptr); iptr++;}
3802 	}
3803       printf("\n");
3804       printf("\n");
3805 
3806       printf("elm_list(T): ");
3807       if ((iptr = gs->local_elms))
3808 	{
3809 	  for (j=0;j<gs->nel_total;j++)
3810 	    {printf("%d ", *iptr); iptr++;}
3811 	}
3812       printf("\n");
3813       printf("\n");
3814 
3815       printf("map_list(T): ");
3816       if ((iptr = gs->companion))
3817 	{
3818 	  for (j=0;j<gs->nel;j++)
3819 	    {printf("%d ", *iptr); iptr++;}
3820 	}
3821       printf("\n");
3822       printf("\n");
3823 
3824 
3825       /* local exchange information */
3826       printf("\nLocal Info:\n");
3827       printf("local_strength=%d\n",   gs->local_strength);
3828       printf("num_local_total=%d\n",  gs->num_local_total);
3829       printf("num_local=%d\n",        gs->num_local);
3830       printf("num_local_gop=%d\n",    gs->num_local_gop);
3831       printf("num_local_reduce=%ld\n", (PTRINT) gs->num_local_reduce);
3832       printf("local_reduce=%ld\n",     (PTRINT) gs->local_reduce);
3833       printf("num_gop_local_reduce=%ld\n", (PTRINT) gs->num_gop_local_reduce);
3834       printf("gop_local_reduce=%ld\n",     (PTRINT) gs->gop_local_reduce);
3835       printf("\n");
3836 
3837       for (j=0;j<gs->num_local;j++)
3838 	{
3839 	  printf("local reduce_list %d: ", j);
3840 	  if ((iptr2 = (gs->local_reduce)[j]))
3841 	    {
3842 	      if ((gs->num_local_reduce)[j] <= 0)
3843 		{printf("oops");}
3844 
3845 	      for (k=0;k<(gs->num_local_reduce)[j];k++)
3846 		{printf("%d ", *iptr2); iptr2++;}
3847 	    }
3848 	  printf("\n");
3849 	}
3850 
3851       printf("\n");
3852       printf("\n");
3853 
3854       for (j=0;j<gs->num_local_gop;j++)
3855 	{
3856 	  printf("gop reduce_list %d: ", j);
3857 	  iptr2 = (gs->gop_local_reduce)[j];
3858 
3859 	  if ((gs->num_gop_local_reduce)[j] <= 0)
3860 	    {printf("oops");}
3861 
3862 
3863 	  for (k=0;k<(gs->num_gop_local_reduce)[j];k++)
3864 	    {printf("%d ", *iptr2); iptr2++;}
3865 	  printf("\n");
3866 	}
3867       printf("\n");
3868       printf("\n");
3869 
3870       /* crystal router information */
3871       printf("\n\n");
3872       printf("Tree Info:\n");
3873       printf("max_left_over=%d\n",   gs->max_left_over);
3874       printf("num_in_list=%ld\n",    (PTRINT) gs->in_num);
3875       printf("in_list=%ld\n",        (PTRINT) gs->in_list);
3876       printf("num_out_list=%ld\n",   (PTRINT) gs->out_num);
3877       printf("out_list=%ld\n",       (PTRINT) gs->out_list);
3878 
3879       printf("\n\n");
3880     }
3881   fflush(stdout);
3882 }
3883 #endif
3884 
3885 
3886 
3887 /******************************************************************************
3888 Function: gs_free()
3889 
3890 Input :
3891 
3892 Output:
3893 
3894 Return:
3895 
3896 Description:
3897   if (gs->sss) {perm_free((void*) gs->sss);}
3898 ******************************************************************************/
3899 void
3900 gs_free(register gs_id *gs)
3901 {
3902   register int i;
3903 
3904 
3905 #ifdef DEBUG
3906   error_msg_warning("start gs_gop_xxx()\n");
3907   if (!gs) {error_msg_warning("NULL ptr passed to gs_free()"); return;}
3908 #endif
3909 
3910   if (gs->nghs) {perm_free((void*) gs->nghs);}
3911   if (gs->pw_nghs) {perm_free((void*) gs->pw_nghs);}
3912 
3913   /* tree */
3914   if (gs->max_left_over)
3915     {
3916       if (gs->tree_elms) {bss_free((void*) gs->tree_elms);}
3917       if (gs->tree_buf) {bss_free((void*) gs->tree_buf);}
3918       if (gs->tree_work) {bss_free((void*) gs->tree_work);}
3919       if (gs->tree_map_in) {bss_free((void*) gs->tree_map_in);}
3920       if (gs->tree_map_out) {bss_free((void*) gs->tree_map_out);}
3921     }
3922 
3923   /* pairwise info */
3924   if (gs->num_pairs)
3925     {
3926       /* should be NULL already */
3927       if (gs->ngh_buf) {bss_free((void*) gs->ngh_buf);}
3928       if (gs->elms) {bss_free((void*) gs->elms);}
3929       if (gs->local_elms) {bss_free((void*) gs->local_elms);}
3930       if (gs->companion) {bss_free((void*) gs->companion);}
3931 
3932       /* only set if pairwise */
3933       if (gs->vals) {perm_free((void*) gs->vals);}
3934       if (gs->in) {perm_free((void*) gs->in);}
3935       if (gs->out) {perm_free((void*) gs->out);}
3936       if (gs->msg_ids_in) {perm_free((void*) gs->msg_ids_in);}
3937       if (gs->msg_ids_out) {perm_free((void*) gs->msg_ids_out);}
3938       if (gs->pw_vals) {perm_free((void*) gs->pw_vals);}
3939       if (gs->pw_elm_list) {perm_free((void*) gs->pw_elm_list);}
3940       if (gs->node_list)
3941 	{
3942 	  for (i=0;i<gs->num_pairs;i++)
3943 	    {if (gs->node_list[i]) {perm_free((void*) gs->node_list[i]);}}
3944 	  perm_free((void*) gs->node_list);
3945 	}
3946       if (gs->msg_sizes) {perm_free((void*) gs->msg_sizes);}
3947       if (gs->pair_list) {perm_free((void*) gs->pair_list);}
3948     }
3949 
3950   /* local info */
3951   if (gs->num_local_total>=0)
3952     {
3953       for (i=0;i<gs->num_local_total+1;i++)
3954 	/*      for (i=0;i<gs->num_local_total;i++) */
3955 	{
3956 	  if (gs->num_gop_local_reduce[i])
3957 	    {perm_free((void*) gs->gop_local_reduce[i]);}
3958 	}
3959     }
3960 
3961   /* if intersection tree/pairwise and local isn't empty */
3962   if (gs->gop_local_reduce) {perm_free((void*) gs->gop_local_reduce);}
3963   if (gs->num_gop_local_reduce) {perm_free((void*) gs->num_gop_local_reduce);}
3964 
3965   perm_free((void*) gs);
3966 }
3967 
3968 
3969 
3970 
3971 
3972 
3973 /******************************************************************************
3974 Function: gather_scatter
3975 
3976 Input :
3977 Output:
3978 Return:
3979 Description:
3980 ******************************************************************************/
3981 void
3982 gs_gop_vec(register gs_id *gs, register REAL *vals, register const char *op, register int step)
3983 {
3984 #ifdef DEBUG
3985   error_msg_warning("gs_gop_vec() start");
3986   if (!gs) {error_msg_fatal("gs_gop_vec() :: passed NULL gs handle!!!");}
3987   if (!op) {error_msg_fatal("gs_gop_vec() :: passed NULL operation!!!");}
3988 
3989   /* check top make sure that segments being requested aren't larger */
3990   /* then what I reserved earlier ... fix is to allow user to reset  */
3991   if (step>gs->vec_sz)
3992     {error_msg_fatal("gs_gop_vec() :: %d > %d!\n",step,gs->vec_sz);}
3993 #endif
3994 
3995   switch (*op) {
3996   case '+':
3997     gs_gop_vec_plus(gs,vals,step);
3998     break;
3999 #ifdef NOT_YET
4000   case '*':
4001     gs_gop_times(gs,vals);
4002     break;
4003   case 'a':
4004     gs_gop_min_abs(gs,vals);
4005     break;
4006   case 'A':
4007     gs_gop_max_abs(gs,vals);
4008     break;
4009   case 'e':
4010     gs_gop_exists(gs,vals);
4011     break;
4012   case 'm':
4013     gs_gop_min(gs,vals);
4014     break;
4015   case 'M':
4016     gs_gop_max(gs,vals); break;
4017     /*
4018     if (*(op+1)=='\0')
4019       {gs_gop_max(gs,vals); break;}
4020     else if (*(op+1)=='X')
4021       {gs_gop_max_abs(gs,vals); break;}
4022     else if (*(op+1)=='N')
4023       {gs_gop_min_abs(gs,vals); break;}
4024     */
4025 #endif
4026   default:
4027     error_msg_warning("gs_gop_vec() :: %c is not a valid op",op[0]);
4028     error_msg_warning("gs_gop_vec() :: default :: plus");
4029     gs_gop_vec_plus(gs,vals,step);
4030     break;
4031   }
4032 #ifdef DEBUG
4033   error_msg_warning("gs_gop_vec() end");
4034 #endif
4035 }
4036 
4037 
4038 
4039 /******************************************************************************
4040 Function: gather_scatter
4041 
4042 Input :
4043 Output:
4044 Return:
4045 Description:
4046 ******************************************************************************/
4047 static void
4048 gs_gop_vec_plus(register gs_id *gs, register REAL *vals, register int step)
4049 {
4050 #ifdef DEBUG
4051   error_msg_warning("gs_gop_vec_plus() start");
4052 #endif
4053 
4054   if (!gs) {error_msg_fatal("gs_gop_vec() passed NULL gs handle!!!");}
4055 
4056   /* local only operations!!! */
4057   if (gs->num_local)
4058     {gs_gop_vec_local_plus(gs,vals,step);}
4059 
4060   /* if intersection tree/pairwise and local isn't empty */
4061   if (gs->num_local_gop)
4062     {
4063       gs_gop_vec_local_in_plus(gs,vals,step);
4064 
4065       /* pairwise */
4066       if (gs->num_pairs)
4067 	{gs_gop_vec_pairwise_plus(gs,vals,step);}
4068 
4069       /* tree */
4070       else if (gs->max_left_over)
4071 	{gs_gop_vec_tree_plus(gs,vals,step);}
4072 
4073       gs_gop_vec_local_out(gs,vals,step);
4074     }
4075   /* if intersection tree/pairwise and local is empty */
4076   else
4077     {
4078       /* pairwise */
4079       if (gs->num_pairs)
4080 	{gs_gop_vec_pairwise_plus(gs,vals,step);}
4081 
4082       /* tree */
4083       else if (gs->max_left_over)
4084 	{gs_gop_vec_tree_plus(gs,vals,step);}
4085     }
4086 #ifdef DEBUG
4087   error_msg_warning("gs_gop_vec_plus() end");
4088 #endif
4089 }
4090 
4091 
4092 
4093 /******************************************************************************
4094 Function: gather_scatter
4095 
4096 Input :
4097 Output:
4098 Return:
4099 Description:
4100 ******************************************************************************/
4101 static
4102 void
4103 gs_gop_vec_local_plus(register gs_id *gs, register REAL *vals,
4104 		      register int step)
4105 {
4106   register int *num, *map, **reduce;
4107   register REAL *base;
4108 
4109 
4110 #ifdef DEBUG
4111   error_msg_warning("gs_gop_vec_local_plus() start");
4112 #endif
4113 
4114   num    = gs->num_local_reduce;
4115   reduce = gs->local_reduce;
4116   while ((map = *reduce))
4117     {
4118       base = vals + map[0] * step;
4119 
4120       /* wall */
4121       if (*num == 2)
4122 	{
4123 	  num++; reduce++;
4124 	  rvec_add (base,vals+map[1]*step,step);
4125 	  rvec_copy(vals+map[1]*step,base,step);
4126 	}
4127       /* corner shared by three elements */
4128       else if (*num == 3)
4129 	{
4130 	  num++; reduce++;
4131 	  rvec_add (base,vals+map[1]*step,step);
4132 	  rvec_add (base,vals+map[2]*step,step);
4133 	  rvec_copy(vals+map[2]*step,base,step);
4134 	  rvec_copy(vals+map[1]*step,base,step);
4135 	}
4136       /* corner shared by four elements */
4137       else if (*num == 4)
4138 	{
4139 	  num++; reduce++;
4140 	  rvec_add (base,vals+map[1]*step,step);
4141 	  rvec_add (base,vals+map[2]*step,step);
4142 	  rvec_add (base,vals+map[3]*step,step);
4143 	  rvec_copy(vals+map[3]*step,base,step);
4144 	  rvec_copy(vals+map[2]*step,base,step);
4145 	  rvec_copy(vals+map[1]*step,base,step);
4146 	}
4147       /* general case ... odd geoms ... 3D */
4148       else
4149 	{
4150 	  num++;
4151 	  while (*++map >= 0)
4152 	    {rvec_add (base,vals+*map*step,step);}
4153 
4154 	  map = *reduce;
4155 	  while (*++map >= 0)
4156 	    {rvec_copy(vals+*map*step,base,step);}
4157 
4158 	  reduce++;
4159 	}
4160     }
4161 #ifdef DEBUG
4162   error_msg_warning("gs_gop_vec_local_plus() end");
4163 #endif
4164 }
4165 
4166 
4167 
4168 /******************************************************************************
4169 Function: gather_scatter
4170 
4171 Input :
4172 Output:
4173 Return:
4174 Description:
4175 ******************************************************************************/
4176 static
4177 void
4178 gs_gop_vec_local_in_plus(register gs_id *gs, register REAL *vals,
4179 			 register int step)
4180 {
4181   register int  *num, *map, **reduce;
4182   register REAL *base;
4183 
4184 
4185 #ifdef DEBUG
4186   error_msg_warning("gs_gop_vec_locel_in_plus() start");
4187 #endif
4188 
4189   num    = gs->num_gop_local_reduce;
4190   reduce = gs->gop_local_reduce;
4191   while ((map = *reduce++))
4192     {
4193       base = vals + map[0] * step;
4194 
4195       /* wall */
4196       if (*num == 2)
4197 	{
4198 	  num ++;
4199 	  rvec_add(base,vals+map[1]*step,step);
4200 	}
4201       /* corner shared by three elements */
4202       else if (*num == 3)
4203 	{
4204 	  num ++;
4205 	  rvec_add(base,vals+map[1]*step,step);
4206 	  rvec_add(base,vals+map[2]*step,step);
4207 	}
4208       /* corner shared by four elements */
4209       else if (*num == 4)
4210 	{
4211 	  num ++;
4212 	  rvec_add(base,vals+map[1]*step,step);
4213 	  rvec_add(base,vals+map[2]*step,step);
4214 	  rvec_add(base,vals+map[3]*step,step);
4215 	}
4216       /* general case ... odd geoms ... 3D*/
4217       else
4218 	{
4219 	  num++;
4220 	  while (*++map >= 0)
4221 	    {rvec_add(base,vals+*map*step,step);}
4222 	}
4223     }
4224 #ifdef DEBUG
4225   error_msg_warning("gs_gop_vec_local_in_plus() end");
4226 #endif
4227 }
4228 
4229 
4230 /******************************************************************************
4231 Function: gather_scatter
4232 
4233 Input :
4234 Output:
4235 Return:
4236 Description:
4237 ******************************************************************************/
4238 static
4239 void
4240 gs_gop_vec_local_out(register gs_id *gs, register REAL *vals,
4241 		     register int step)
4242 {
4243   register int *num, *map, **reduce;
4244   register REAL *base;
4245 
4246 
4247 #ifdef DEBUG
4248   error_msg_warning("gs_gop_vec_local_out() start");
4249 #endif
4250 
4251   num    = gs->num_gop_local_reduce;
4252   reduce = gs->gop_local_reduce;
4253   while ((map = *reduce++))
4254     {
4255       base = vals + map[0] * step;
4256 
4257       /* wall */
4258       if (*num == 2)
4259 	{
4260 	  num ++;
4261 	  rvec_copy(vals+map[1]*step,base,step);
4262 	}
4263       /* corner shared by three elements */
4264       else if (*num == 3)
4265 	{
4266 	  num ++;
4267 	  rvec_copy(vals+map[1]*step,base,step);
4268 	  rvec_copy(vals+map[2]*step,base,step);
4269 	}
4270       /* corner shared by four elements */
4271       else if (*num == 4)
4272 	{
4273 	  num ++;
4274 	  rvec_copy(vals+map[1]*step,base,step);
4275 	  rvec_copy(vals+map[2]*step,base,step);
4276 	  rvec_copy(vals+map[3]*step,base,step);
4277 	}
4278       /* general case ... odd geoms ... 3D*/
4279       else
4280 	{
4281 	  num++;
4282 	  while (*++map >= 0)
4283 	    {rvec_copy(vals+*map*step,base,step);}
4284 	}
4285     }
4286 #ifdef DEBUG
4287   error_msg_warning("gs_gop_vec_local_out() end");
4288 #endif
4289 }
4290 
4291 
4292 
4293 /******************************************************************************
4294 Function: gather_scatter
4295 
4296 VERSION 3 ::
4297 
4298 Input :
4299 Output:
4300 Return:
4301 Description:
4302 ******************************************************************************/
4303 static
4304 void
4305 gs_gop_vec_pairwise_plus(register gs_id *gs, register REAL *in_vals,
4306 			 register int step)
4307 {
4308   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
4309   register int *iptr, *msg_list, *msg_size, **msg_nodes;
4310   register int *pw, *list, *size, **nodes;
4311   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
4312   MPI_Status status;
4313 
4314 
4315 #ifdef DEBUG
4316   error_msg_warning("gs_gop_vec_pairwise_plus() start");
4317 #endif
4318 
4319   /* strip and load registers */
4320   msg_list =list         = gs->pair_list;
4321   msg_size =size         = gs->msg_sizes;
4322   msg_nodes=nodes        = gs->node_list;
4323   iptr=pw                = gs->pw_elm_list;
4324   dptr1=dptr3            = gs->pw_vals;
4325   msg_ids_in  = ids_in   = gs->msg_ids_in;
4326   msg_ids_out = ids_out  = gs->msg_ids_out;
4327   dptr2                  = gs->out;
4328   in1=in2                = gs->in;
4329 
4330   /* post the receives */
4331   /*  msg_nodes=nodes; */
4332   do
4333     {
4334       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
4335 	 second one *list and do list++ afterwards */
4336       MPI_Irecv(in1, *size *step, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
4337 		gs->gs_comm, msg_ids_in++);
4338       in1 += *size++ *step;
4339     }
4340   while (*++msg_nodes);
4341   msg_nodes=nodes;
4342 
4343   /* load gs values into in out gs buffers */
4344   while (*iptr >= 0)
4345     {
4346       rvec_copy(dptr3,in_vals + *iptr*step,step);
4347       dptr3+=step;
4348       iptr++;
4349     }
4350 
4351   /* load out buffers and post the sends */
4352   while ((iptr = *msg_nodes++))
4353     {
4354       dptr3 = dptr2;
4355       while (*iptr >= 0)
4356 	{
4357 	  rvec_copy(dptr2,dptr1 + *iptr*step,step);
4358 	  dptr2+=step;
4359 	  iptr++;
4360 	}
4361       MPI_Isend(dptr3, *msg_size++ *step, REAL_TYPE, *msg_list++,
4362 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
4363     }
4364 
4365   /* tree */
4366   if (gs->max_left_over)
4367     {gs_gop_vec_tree_plus(gs,in_vals,step);}
4368 
4369   /* process the received data */
4370   msg_nodes=nodes;
4371   while ((iptr = *nodes++))
4372     {
4373       /* Should I check the return value of MPI_Wait() or status? */
4374       /* Can this loop be replaced by a call to MPI_Waitall()? */
4375       MPI_Wait(ids_in++, &status);
4376       while (*iptr >= 0)
4377 	{
4378 #if defined BLAS||CBLAS
4379 	  axpy(step,1.0,in2,1,dptr1 + *iptr*step,1);
4380 #else
4381 	  rvec_add(dptr1 + *iptr*step,in2,step);
4382 #endif
4383 	  in2+=step;
4384 	  iptr++;
4385 	}
4386     }
4387 
4388   /* replace vals */
4389   while (*pw >= 0)
4390     {
4391       rvec_copy(in_vals + *pw*step,dptr1,step);
4392       dptr1+=step;
4393       pw++;
4394     }
4395 
4396   /* clear isend message handles */
4397   /* This changed for clarity though it could be the same */
4398   while (*msg_nodes++)
4399     /* Should I check the return value of MPI_Wait() or status? */
4400     /* Can this loop be replaced by a call to MPI_Waitall()? */
4401     {MPI_Wait(ids_out++, &status);}
4402 
4403 #ifdef DEBUG
4404   error_msg_warning("gs_gop_vec_pairwise_plus() end");
4405 #endif
4406 
4407 }
4408 
4409 
4410 
4411 /******************************************************************************
4412 Function: gather_scatter
4413 
4414 Input :
4415 Output:
4416 Return:
4417 Description:
4418 ******************************************************************************/
4419 static
4420 void
4421 gs_gop_vec_tree_plus(register gs_id *gs, register REAL *vals, register int step)
4422 {
4423   register int size, *in, *out;
4424   register REAL *buf, *work;
4425   int op[] = {GL_ADD,0};
4426 
4427 #ifdef DEBUG
4428   error_msg_warning("start gs_gop_vec_tree_plus()");
4429 #endif
4430 
4431   /* copy over to local variables */
4432   in   = gs->tree_map_in;
4433   out  = gs->tree_map_out;
4434   buf  = gs->tree_buf;
4435   work = gs->tree_work;
4436   size = gs->tree_nel*step;
4437 
4438   /* zero out collection buffer */
4439 #if defined  BLAS||CBLAS
4440   *work = 0.0;
4441   copy(size,work,0,buf,1);
4442 #else
4443   rvec_zero(buf,size);
4444 #endif
4445 
4446 
4447   /* copy over my contributions */
4448   while (*in >= 0)
4449     {
4450 #if defined  BLAS||CBLAS
4451       copy(step,vals + *in++*step,1,buf + *out++*step,1);
4452 #else
4453       rvec_copy(buf + *out++*step,vals + *in++*step,step);
4454 #endif
4455     }
4456 
4457   /* perform fan in/out on full buffer */
4458   /* must change grop to handle the blas */
4459   grop(buf,work,size,op);
4460 
4461   /* reset */
4462   in   = gs->tree_map_in;
4463   out  = gs->tree_map_out;
4464 
4465   /* get the portion of the results I need */
4466   while (*in >= 0)
4467     {
4468 #if defined  BLAS||CBLAS
4469       copy(step,buf + *out++*step,1,vals + *in++*step,1);
4470 #else
4471       rvec_copy(vals + *in++*step,buf + *out++*step,step);
4472 #endif
4473     }
4474 
4475 #ifdef DEBUG
4476   error_msg_warning("start gs_gop_vec_tree_plus()");
4477 #endif
4478 }
4479 
4480 
4481 
4482 /******************************************************************************
4483 Function: gather_scatter
4484 
4485 Input :
4486 Output:
4487 Return:
4488 Description:
4489 ******************************************************************************/
4490 void
4491 gs_gop_hc(register gs_id *gs, register REAL *vals, register const char *op, register int dim)
4492 {
4493 #ifdef DEBUG
4494   error_msg_warning("gs_gop_hc() start\n");
4495   if (!gs) {error_msg_fatal("gs_gop_vec() :: passed NULL gs handle!!!\n");}
4496   if (!op) {error_msg_fatal("gs_gop_vec() :: passed NULL operation!!!\n");}
4497 #endif
4498 
4499   switch (*op) {
4500   case '+':
4501     gs_gop_plus_hc(gs,vals,dim);
4502     break;
4503 #ifdef NOT_YET
4504   case '*':
4505     gs_gop_times(gs,vals);
4506     break;
4507   case 'a':
4508     gs_gop_min_abs(gs,vals);
4509     break;
4510   case 'A':
4511     gs_gop_max_abs(gs,vals);
4512     break;
4513   case 'e':
4514     gs_gop_exists(gs,vals);
4515     break;
4516   case 'm':
4517     gs_gop_min(gs,vals);
4518     break;
4519   case 'M':
4520     gs_gop_max(gs,vals); break;
4521     /*
4522     if (*(op+1)=='\0')
4523       {gs_gop_max(gs,vals); break;}
4524     else if (*(op+1)=='X')
4525       {gs_gop_max_abs(gs,vals); break;}
4526     else if (*(op+1)=='N')
4527       {gs_gop_min_abs(gs,vals); break;}
4528     */
4529 #endif
4530   default:
4531     error_msg_warning("gs_gop_hc() :: %c is not a valid op",op[0]);
4532     error_msg_warning("gs_gop_hc() :: default :: plus\n");
4533     gs_gop_plus_hc(gs,vals,dim);
4534     break;
4535   }
4536 #ifdef DEBUG
4537   error_msg_warning("gs_gop_hc() end\n");
4538 #endif
4539 }
4540 
4541 
4542 
4543 /******************************************************************************
4544 Function: gather_scatter
4545 
4546 Input :
4547 Output:
4548 Return:
4549 Description:
4550 ******************************************************************************/
4551 static void
4552 gs_gop_plus_hc(register gs_id *gs, register REAL *vals, int dim)
4553 {
4554 #ifdef DEBUG
4555   error_msg_warning("start gs_gop_hc()\n");
4556   if (!gs) {error_msg_fatal("gs_gop_hc() passed NULL gs handle!!!\n");}
4557 #endif
4558 
4559   /* if there's nothing to do return */
4560   if (dim<=0)
4561     {return;}
4562 
4563   /* can't do more dimensions then exist */
4564   dim = PetscMin(dim,i_log2_num_nodes);
4565 
4566   /* local only operations!!! */
4567   if (gs->num_local)
4568     {gs_gop_local_plus(gs,vals);}
4569 
4570   /* if intersection tree/pairwise and local isn't empty */
4571   if (gs->num_local_gop)
4572     {
4573       gs_gop_local_in_plus(gs,vals);
4574 
4575       /* pairwise will do tree inside ... */
4576       if (gs->num_pairs)
4577 	{gs_gop_pairwise_plus_hc(gs,vals,dim);}
4578 
4579       /* tree only */
4580       else if (gs->max_left_over)
4581 	{gs_gop_tree_plus_hc(gs,vals,dim);}
4582 
4583       gs_gop_local_out(gs,vals);
4584     }
4585   /* if intersection tree/pairwise and local is empty */
4586   else
4587     {
4588       /* pairwise will do tree inside */
4589       if (gs->num_pairs)
4590 	{gs_gop_pairwise_plus_hc(gs,vals,dim);}
4591 
4592       /* tree */
4593       else if (gs->max_left_over)
4594 	{gs_gop_tree_plus_hc(gs,vals,dim);}
4595     }
4596 
4597 #ifdef DEBUG
4598   error_msg_warning("end gs_gop_hc()\n");
4599 #endif
4600 }
4601 
4602 
4603 /******************************************************************************
4604 VERSION 3 ::
4605 
4606 Input :
4607 Output:
4608 Return:
4609 Description:
4610 ******************************************************************************/
4611 static
4612 void
4613 gs_gop_pairwise_plus_hc(register gs_id *gs, register REAL *in_vals, int dim)
4614 {
4615   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
4616   register int *iptr, *msg_list, *msg_size, **msg_nodes;
4617   register int *pw, *list, *size, **nodes;
4618   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
4619   MPI_Status status;
4620   int i, mask=1;
4621 
4622   for (i=1; i<dim; i++)
4623     {mask<<=1; mask++;}
4624 
4625 
4626 #ifdef DEBUG
4627   error_msg_warning("gs_gop_pairwise_hc() start\n");
4628 #endif
4629 
4630   /* strip and load registers */
4631   msg_list =list         = gs->pair_list;
4632   msg_size =size         = gs->msg_sizes;
4633   msg_nodes=nodes        = gs->node_list;
4634   iptr=pw                = gs->pw_elm_list;
4635   dptr1=dptr3            = gs->pw_vals;
4636   msg_ids_in  = ids_in   = gs->msg_ids_in;
4637   msg_ids_out = ids_out  = gs->msg_ids_out;
4638   dptr2                  = gs->out;
4639   in1=in2                = gs->in;
4640 
4641   /* post the receives */
4642   /*  msg_nodes=nodes; */
4643   do
4644     {
4645       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
4646 	 second one *list and do list++ afterwards */
4647       if ((my_id|mask)==(*list|mask))
4648 	{
4649 	  MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
4650 		    gs->gs_comm, msg_ids_in++);
4651 	  in1 += *size++;
4652 	}
4653       else
4654 	{list++; size++;}
4655     }
4656   while (*++msg_nodes);
4657 
4658   /* load gs values into in out gs buffers */
4659   while (*iptr >= 0)
4660     {*dptr3++ = *(in_vals + *iptr++);}
4661 
4662   /* load out buffers and post the sends */
4663   msg_nodes=nodes;
4664   list = msg_list;
4665   while ((iptr = *msg_nodes++))
4666     {
4667       if ((my_id|mask)==(*list|mask))
4668 	{
4669 	  dptr3 = dptr2;
4670 	  while (*iptr >= 0)
4671 	    {*dptr2++ = *(dptr1 + *iptr++);}
4672 	  /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
4673 	  /* is msg_ids_out++ correct? */
4674 	  MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *list++,
4675 		    MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
4676 	}
4677       else
4678 	{list++; msg_size++;}
4679     }
4680 
4681   /* do the tree while we're waiting */
4682   if (gs->max_left_over)
4683     {gs_gop_tree_plus_hc(gs,in_vals,dim);}
4684 
4685   /* process the received data */
4686   msg_nodes=nodes;
4687   list = msg_list;
4688   while ((iptr = *nodes++))
4689     {
4690       if ((my_id|mask)==(*list|mask))
4691 	{
4692 	  /* Should I check the return value of MPI_Wait() or status? */
4693 	  /* Can this loop be replaced by a call to MPI_Waitall()? */
4694 	  MPI_Wait(ids_in++, &status);
4695 	  while (*iptr >= 0)
4696 	    {*(dptr1 + *iptr++) += *in2++;}
4697 	}
4698       list++;
4699     }
4700 
4701   /* replace vals */
4702   while (*pw >= 0)
4703     {*(in_vals + *pw++) = *dptr1++;}
4704 
4705   /* clear isend message handles */
4706   /* This changed for clarity though it could be the same */
4707   while (*msg_nodes++)
4708     {
4709       if ((my_id|mask)==(*msg_list|mask))
4710 	{
4711 	  /* Should I check the return value of MPI_Wait() or status? */
4712 	  /* Can this loop be replaced by a call to MPI_Waitall()? */
4713 	  MPI_Wait(ids_out++, &status);
4714 	}
4715       msg_list++;
4716     }
4717 
4718 #ifdef DEBUG
4719   error_msg_warning("gs_gop_pairwise_hc() end\n");
4720 #endif
4721 
4722 }
4723 
4724 
4725 
4726 /******************************************************************************
4727 Function: gather_scatter
4728 
4729 Input :
4730 Output:
4731 Return:
4732 Description:
4733 ******************************************************************************/
4734 static
4735 void
4736 gs_gop_tree_plus_hc(gs_id *gs, REAL *vals, int dim)
4737 {
4738   int size;
4739   int *in, *out;
4740   REAL *buf, *work;
4741   int op[] = {GL_ADD,0};
4742 
4743 #ifdef DEBUG
4744   error_msg_warning("start gs_gop_tree_plus_hc()\n");
4745 #endif
4746 
4747   in   = gs->tree_map_in;
4748   out  = gs->tree_map_out;
4749   buf  = gs->tree_buf;
4750   work = gs->tree_work;
4751   size = gs->tree_nel;
4752 
4753 #if defined  BLAS||CBLAS
4754   *work = 0.0;
4755   copy(size,work,0,buf,1);
4756 #else
4757   rvec_zero(buf,size);
4758 #endif
4759 
4760   while (*in >= 0)
4761     {*(buf + *out++) = *(vals + *in++);}
4762 
4763   in   = gs->tree_map_in;
4764   out  = gs->tree_map_out;
4765 
4766   grop_hc(buf,work,size,op,dim);
4767 
4768   while (*in >= 0)
4769     {*(vals + *in++) = *(buf + *out++);}
4770 
4771 #ifdef DEBUG
4772   error_msg_warning("end gs_gop_tree_plus_hc()\n");
4773 #endif
4774 }
4775 
4776 
4777 
4778