xref: /petsc/src/ksp/pc/impls/tfs/gs.c (revision 827bd09bbb551290dd2e51d0997dc3a77eecb2e6)
1 
2 /***********************************gs.c***************************************
3 SPARSE GATHER-SCATTER PACKAGE: bss_malloc bss_malloc ivec error comm gs queue
4 
5 Author: Henry M. Tufo III
6 
7 e-mail: hmt@cs.brown.edu
8 
9 snail-mail:
10 Division of Applied Mathematics
11 Brown University
12 Providence, RI 02912
13 
14 Last Modification:
15 6.21.97
16 ************************************gs.c**************************************/
17 
18 /***********************************gs.c***************************************
19 File Description:
20 -----------------
21 
22 ************************************gs.c**************************************/
23 #include <stdio.h>
24 #include <math.h>
25 #include <float.h>
26 #include <limits.h>
27 
28 
29 #if   defined NXSRC
30 #ifndef DELTA
31 #include <nx.h>
32 #endif
33 
34 #elif defined MPISRC
35 #include <mpi.h>
36 #endif
37 
38 #include "petscconf.h"
39 #if defined(PETSC_HAVE_STRINGS_H)
40 #include <strings.h>
41 #endif
42 #if defined(PETSC_HAVE_STRING_H)
43 #include <string.h>
44 #endif
45 
46 #include "const.h"
47 #include "types.h"
48 #include "comm.h"
49 #include "ivec.h"
50 #include "bss_malloc.h"
51 #include "bit_mask.h"
52 #include "error.h"
53 #include "queue.h"
54 #include "blas.h"
55 #include "gs.h"
56 
57 /* default length of number of items via tree - doubles if exceeded */
58 #define TREE_BUF_SZ 2048;
59 #define GS_VEC_SZ   1
60 
61 
62 
63 /***********************************gs.c***************************************
64 Type: struct gather_scatter_id
65 ------------------------------
66 
67 ************************************gs.c**************************************/
68 typedef struct gather_scatter_id {
69   int id;
70   int nel_min;
71   int nel_max;
72   int nel_sum;
73   int negl;
74   int gl_max;
75   int gl_min;
76   int repeats;
77   int ordered;
78   int positive;
79   REAL *vals;
80 
81   /* bit mask info */
82   int *my_proc_mask;
83   int mask_sz;
84   int *ngh_buf;
85   int ngh_buf_sz;
86   int *nghs;
87   int num_nghs;
88   int max_nghs;
89   int *pw_nghs;
90   int num_pw_nghs;
91   int *tree_nghs;
92   int num_tree_nghs;
93 
94   int num_loads;
95 
96   /* repeats == true -> local info */
97   int nel;         /* number of unique elememts */
98   int *elms;       /* of size nel */
99   int nel_total;
100   int *local_elms; /* of size nel_total */
101   int *companion;  /* of size nel_total */
102 
103   /* local info */
104   int num_local_total;
105   int local_strength;
106   int num_local;
107   int *num_local_reduce;
108   int **local_reduce;
109   int num_local_gop;
110   int *num_gop_local_reduce;
111   int **gop_local_reduce;
112 
113   /* pairwise info */
114   int level;
115   int num_pairs;
116   int max_pairs;
117   int loc_node_pairs;
118   int max_node_pairs;
119   int min_node_pairs;
120   int avg_node_pairs;
121   int *pair_list;
122   int *msg_sizes;
123   int **node_list;
124   int len_pw_list;
125   int *pw_elm_list;
126   REAL *pw_vals;
127 
128 #ifdef MPISRC
129   MPI_Request *msg_ids_in;
130   MPI_Request *msg_ids_out;
131 
132 #else
133   int *msg_ids_in;
134   int *msg_ids_out;
135 
136 #endif
137 
138   REAL *out;
139   REAL *in;
140   int msg_total;
141 
142   /* tree - crystal accumulator info */
143   int max_left_over;
144   int *pre;
145   int *in_num;
146   int *out_num;
147   int **in_list;
148   int **out_list;
149 
150   /* new tree work*/
151   int  tree_nel;
152   int *tree_elms;
153   REAL *tree_buf;
154   REAL *tree_work;
155 
156   int  tree_map_sz;
157   int *tree_map_in;
158   int *tree_map_out;
159 
160   /* current memory status */
161   int gl_bss_min;
162   int gl_perm_min;
163 
164   /* max segment size for gs_gop_vec() */
165   int vec_sz;
166 
167   /* hack to make paul happy */
168 #ifdef MPISRC
169   MPI_Comm gs_comm;
170 #endif
171 
172 } gs_id;
173 
174 
175 /* to be made public */
176 #if defined(not_used)
177 static int  gs_dump_ngh(gs_id *id, int loc_num, int *num, int *ngh_list);
178 static void gsi_via_int_list(gs_id *gs);
179 static int in_sub_tree(int *ptr3, int p_mask_size, int *buf2, int buf_size);
180 #endif
181 
182 /* PRIVATE - and definitely not exported */
183 /*static void gs_print_template(register gs_id* gs, int who);*/
184 /*static void gs_print_stemplate(register gs_id* gs, int who);*/
185 
186 static gs_id *gsi_check_args(int *elms, int nel, int level);
187 static void gsi_via_bit_mask(gs_id *gs);
188 static void get_ngh_buf(gs_id *gs);
189 static void set_pairwise(gs_id *gs);
190 static gs_id * gsi_new(void);
191 static void set_tree(gs_id *gs);
192 
193 /* same for all but vector flavor */
194 static void gs_gop_local_out(gs_id *gs, REAL *vals);
195 /* vector flavor */
196 static void gs_gop_vec_local_out(gs_id *gs, REAL *vals, int step);
197 
198 static void gs_gop_vec_plus(gs_id *gs, REAL *in_vals, int step);
199 static void gs_gop_vec_pairwise_plus(gs_id *gs, REAL *in_vals, int step);
200 static void gs_gop_vec_local_plus(gs_id *gs, REAL *vals, int step);
201 static void gs_gop_vec_local_in_plus(gs_id *gs, REAL *vals, int step);
202 static void gs_gop_vec_tree_plus(gs_id *gs, REAL *vals, int step);
203 
204 
205 static void gs_gop_plus(gs_id *gs, REAL *in_vals);
206 static void gs_gop_pairwise_plus(gs_id *gs, REAL *in_vals);
207 static void gs_gop_local_plus(gs_id *gs, REAL *vals);
208 static void gs_gop_local_in_plus(gs_id *gs, REAL *vals);
209 static void gs_gop_tree_plus(gs_id *gs, REAL *vals);
210 
211 static void gs_gop_plus_hc(gs_id *gs, REAL *in_vals, int dim);
212 static void gs_gop_pairwise_plus_hc(gs_id *gs, REAL *in_vals, int dim);
213 static void gs_gop_tree_plus_hc(gs_id *gs, REAL *vals, int dim);
214 
215 static void gs_gop_times(gs_id *gs, REAL *in_vals);
216 static void gs_gop_pairwise_times(gs_id *gs, REAL *in_vals);
217 static void gs_gop_local_times(gs_id *gs, REAL *vals);
218 static void gs_gop_local_in_times(gs_id *gs, REAL *vals);
219 static void gs_gop_tree_times(gs_id *gs, REAL *vals);
220 
221 static void gs_gop_min(gs_id *gs, REAL *in_vals);
222 static void gs_gop_pairwise_min(gs_id *gs, REAL *in_vals);
223 static void gs_gop_local_min(gs_id *gs, REAL *vals);
224 static void gs_gop_local_in_min(gs_id *gs, REAL *vals);
225 static void gs_gop_tree_min(gs_id *gs, REAL *vals);
226 
227 static void gs_gop_min_abs(gs_id *gs, REAL *in_vals);
228 static void gs_gop_pairwise_min_abs(gs_id *gs, REAL *in_vals);
229 static void gs_gop_local_min_abs(gs_id *gs, REAL *vals);
230 static void gs_gop_local_in_min_abs(gs_id *gs, REAL *vals);
231 static void gs_gop_tree_min_abs(gs_id *gs, REAL *vals);
232 
233 static void gs_gop_max(gs_id *gs, REAL *in_vals);
234 static void gs_gop_pairwise_max(gs_id *gs, REAL *in_vals);
235 static void gs_gop_local_max(gs_id *gs, REAL *vals);
236 static void gs_gop_local_in_max(gs_id *gs, REAL *vals);
237 static void gs_gop_tree_max(gs_id *gs, REAL *vals);
238 
239 static void gs_gop_max_abs(gs_id *gs, REAL *in_vals);
240 static void gs_gop_pairwise_max_abs(gs_id *gs, REAL *in_vals);
241 static void gs_gop_local_max_abs(gs_id *gs, REAL *vals);
242 static void gs_gop_local_in_max_abs(gs_id *gs, REAL *vals);
243 static void gs_gop_tree_max_abs(gs_id *gs, REAL *vals);
244 
245 static void gs_gop_exists(gs_id *gs, REAL *in_vals);
246 static void gs_gop_pairwise_exists(gs_id *gs, REAL *in_vals);
247 static void gs_gop_local_exists(gs_id *gs, REAL *vals);
248 static void gs_gop_local_in_exists(gs_id *gs, REAL *vals);
249 static void gs_gop_tree_exists(gs_id *gs, REAL *vals);
250 
251 static void gs_gop_pairwise_binary(gs_id *gs, REAL *in_vals, rbfp fct);
252 static void gs_gop_local_binary(gs_id *gs, REAL *vals, rbfp fct);
253 static void gs_gop_local_in_binary(gs_id *gs, REAL *vals, rbfp fct);
254 static void gs_gop_tree_binary(gs_id *gs, REAL *vals, rbfp fct);
255 
256 
257 
258 /* global vars */
259 /* from comm.c module */
260 
261 /* module state inf and fortran interface */
262 static int num_gs_ids = 0;
263 
264 /* should make this dynamic ... later */
265 /*static queue_ADT elms_q, mask_q;*/
266 static int msg_buf=MAX_MSG_BUF;
267 /*static int msg_ch=FALSE;*/
268 
269 static int vec_sz=GS_VEC_SZ;
270 /*static int vec_ch=FALSE; */
271 
272 static int *tree_buf=NULL;
273 static int tree_buf_sz=0;
274 static int ntree=0;
275 
276 
277 /******************************************************************************
278 Function: gs_init_()
279 
280 Input :
281 Output:
282 Return:
283 Description:
284 ******************************************************************************/
285 void gs_init_vec_sz(int size)
286 {
287   /*  vec_ch = TRUE; */
288 
289   vec_sz = size;
290 }
291 
292 /******************************************************************************
293 Function: gs_init_()
294 
295 Input :
296 Output:
297 Return:
298 Description:
299 ******************************************************************************/
300 void gs_init_msg_buf_sz(int buf_size)
301 {
302   /*  msg_ch = TRUE; */
303 
304   msg_buf = buf_size;
305 }
306 
307 /******************************************************************************
308 Function: gs_init()
309 
310 Input :
311 
312 Output:
313 
314 RETURN:
315 
316 Description:
317 ******************************************************************************/
318 gs_id *
319 gs_init(register int *elms, int nel, int level)
320 {
321   register gs_id *gs;
322 #ifdef INFO1
323   int i;
324 #endif
325 #ifdef MPISRC
326   MPI_Group gs_group;
327   MPI_Comm  gs_comm;
328 #endif
329 
330 
331   bss_init();
332   perm_init();
333 
334 #ifdef DEBUG
335   error_msg_warning("gs_init() start w/(%d,%d)\n",my_id,num_nodes);
336 #endif
337 
338   /* ensure that communication package has been initialized */
339   comm_init();
340 
341 #ifdef INFO1
342   bss_stats();
343   perm_stats();
344 #endif
345 
346   /* determines if we have enough dynamic/semi-static memory */
347   /* checks input, allocs and sets gd_id template            */
348   gs = gsi_check_args(elms,nel,level);
349 
350   /* only bit mask version up and working for the moment    */
351   /* LATER :: get int list version working for sparse pblms */
352   gsi_via_bit_mask(gs);
353 
354 #ifdef INFO1
355   /* print out P0's template as well as malloc stats */
356   gs_print_template(gs,0);
357 
358 #if   defined NXSRC
359       gsync();
360 #elif defined MPISRC
361       MPI_Barrier(MPI_COMM_WORLD);
362 #endif
363 
364   for (i=1;i<num_nodes;i++)
365     {
366       gs_print_stemplate(gs,i);
367 #if   defined NXSRC
368       gsync();
369 #elif defined MPISRC
370       MPI_Barrier(MPI_COMM_WORLD);
371 #endif
372     }
373 #endif
374 
375 #ifdef DEBUG
376   error_msg_warning("gs_init() end w/(%d,%d)\n",my_id,num_nodes);
377 #endif
378 
379 #ifdef INFO
380   bss_stats();
381   perm_stats();
382 #endif
383 
384 #ifdef MPISRC
385   MPI_Comm_group(MPI_COMM_WORLD,&gs_group);
386   MPI_Comm_create(MPI_COMM_WORLD,gs_group,&gs_comm);
387   gs->gs_comm=gs_comm;
388 #endif
389 
390   return(gs);
391 }
392 
393 
394 
395 /******************************************************************************
396 Function: gsi_new()
397 
398 Input :
399 Output:
400 Return:
401 Description:
402 
403 elm list must >= 0!!!
404 elm repeats allowed
405 ******************************************************************************/
406 static
407 gs_id *
408 gsi_new(void)
409 {
410   int size=sizeof(gs_id);
411   gs_id *gs;
412 
413 
414 #ifdef DEBUG
415   error_msg_warning("gsi_new() :: size=%d\n",size);
416 #endif
417 
418   gs = (gs_id *) perm_malloc(size);
419 
420   if (!(size%REAL_LEN))
421     {rvec_zero((REAL *)gs,size/REAL_LEN);}
422   else if (!(size%INT_LEN))
423     {ivec_zero((INT *)gs,size/INT_LEN);}
424   else
425     {memset((char *)gs,0,size/sizeof(char));}
426 
427   return(gs);
428 }
429 
430 
431 
432 /******************************************************************************
433 Function: gsi_check_args()
434 
435 Input :
436 Output:
437 Return:
438 Description:
439 
440 elm list must >= 0!!!
441 elm repeats allowed
442 local working copy of elms is sorted
443 ******************************************************************************/
444 static
445 gs_id *
446 gsi_check_args(int *in_elms, int nel, int level)
447 {
448   register int i, j, k, t2;
449   int *companion, *elms, *unique, *iptr;
450   int num_local=0, *num_to_reduce, **local_reduce;
451   int oprs[] = {NON_UNIFORM,GL_MIN,GL_MAX,GL_ADD,GL_MIN,GL_MAX,GL_MIN,GL_B_AND};
452   int vals[sizeof(oprs)/sizeof(oprs[0])-1];
453   int work[sizeof(oprs)/sizeof(oprs[0])-1];
454   gs_id *gs;
455 
456 
457 #ifdef DEBUG
458   error_msg_warning("gs_check_args() begin w/(%d,%d)\n",my_id,num_nodes);
459 #endif
460 
461 #ifdef SAFE
462   if (!in_elms)
463     {error_msg_fatal("elms point to nothing!!!\n");}
464 
465   if (nel<0)
466     {error_msg_fatal("can't have fewer than 0 elms!!!\n");}
467 
468   if (nel==0)
469     {error_msg_warning("I don't have any elements!!!\n");}
470 #endif
471 
472   /* get space for gs template */
473   gs = gsi_new();
474   gs->id = ++num_gs_ids;
475 
476   /* hmt 6.4.99                                            */
477   /* caller can set global ids that don't participate to 0 */
478   /* gs_init ignores all zeros in elm list                 */
479   /* negative global ids are still invalid                 */
480   for (i=j=0;i<nel;i++)
481     {if (in_elms[i]!=0) {j++;}}
482 
483   k=nel; nel=j;
484 
485   /* copy over in_elms list and create inverse map */
486   elms = (int *) bss_malloc((nel+1)*INT_LEN);
487   companion = (int *) bss_malloc(nel*INT_LEN);
488   /* ivec_c_index(companion,nel); */
489   /* ivec_copy(elms,in_elms,nel); */
490   for (i=j=0;i<k;i++)
491     {
492       if (in_elms[i]!=0)
493 	{elms[j] = in_elms[i]; companion[j++] = i;}
494     }
495 
496   if (j!=nel)
497     {error_msg_fatal("nel j mismatch!\n");}
498 
499 #ifdef SAFE
500   /* pre-pass ... check to see if sorted */
501   elms[nel] = INT_MAX;
502   iptr = elms;
503   unique = elms+1;
504   j=0;
505   while (*iptr!=INT_MAX)
506     {
507       if (*iptr++>*unique++)
508 	{j=1; break;}
509     }
510 
511   /* set up inverse map */
512   if (j)
513     {
514       error_msg_warning("gsi_check_args() :: elm list *not* sorted!\n");
515       SMI_sort((void *)elms, (void *)companion, nel, SORT_INTEGER);
516     }
517   else
518     {error_msg_warning("gsi_check_args() :: elm list sorted!\n");}
519 #else
520   SMI_sort((void *)elms, (void *)companion, nel, SORT_INTEGER);
521 #endif
522   elms[nel] = INT_MIN;
523 
524   /* first pass */
525   /* determine number of unique elements, check pd */
526   for (i=k=0;i<nel;i+=j)
527     {
528       t2 = elms[i];
529       j=++i;
530 
531       /* clump 'em for now */
532       while (elms[j]==t2) {j++;}
533 
534       /* how many together and num local */
535       if (j-=i)
536 	{num_local++; k+=j;}
537     }
538 
539   /* how many unique elements? */
540   gs->repeats=k;
541   gs->nel = nel-k;
542 
543 
544   /* number of repeats? */
545   gs->num_local = num_local;
546   num_local+=2;
547   gs->local_reduce=local_reduce=(int **)perm_malloc(num_local*INT_PTR_LEN);
548   gs->num_local_reduce=num_to_reduce=(int *) perm_malloc(num_local*INT_LEN);
549 
550   unique = (int *) bss_malloc((gs->nel+1)*INT_LEN);
551   gs->elms = unique;
552   gs->nel_total = nel;
553   gs->local_elms = elms;
554   gs->companion = companion;
555 
556   /* compess map as well as keep track of local ops */
557   for (num_local=i=j=0;i<gs->nel;i++)
558     {
559       k=j;
560       t2 = unique[i] = elms[j];
561       companion[i] = companion[j];
562 
563       while (elms[j]==t2) {j++;}
564 
565       if ((t2=(j-k))>1)
566 	{
567 	  /* number together */
568 	  num_to_reduce[num_local] = t2++;
569 	  iptr = local_reduce[num_local++] = (int *)perm_malloc(t2*INT_LEN);
570 
571 	  /* to use binary searching don't remap until we check intersection */
572 	  *iptr++ = i;
573 
574 	  /* note that we're skipping the first one */
575 	  while (++k<j)
576 	    {*(iptr++) = companion[k];}
577 	  *iptr = -1;
578 	}
579     }
580 
581   /* sentinel for ngh_buf */
582   unique[gs->nel]=INT_MAX;
583 
584 #ifdef DEBUG
585   if (num_local!=gs->num_local)
586     {error_msg_fatal("compression of maps wrong!!!\n");}
587 #endif
588 
589   /* for two partition sort hack */
590   num_to_reduce[num_local] = 0;
591   local_reduce[num_local] = NULL;
592   num_to_reduce[++num_local] = 0;
593   local_reduce[num_local] = NULL;
594 
595   /* load 'em up */
596   /* note one extra to hold NON_UNIFORM flag!!! */
597   vals[2] = vals[1] = vals[0] = nel;
598   if (gs->nel>0)
599     {
600        vals[3] = unique[0];           /* ivec_lb(elms,nel); */
601        vals[4] = unique[gs->nel-1];   /* ivec_ub(elms,nel); */
602     }
603   else
604     {
605        vals[3] = INT_MAX;             /* ivec_lb(elms,nel); */
606        vals[4] = INT_MIN;             /* ivec_ub(elms,nel); */
607     }
608   vals[5] = level;
609   vals[6] = num_gs_ids;
610 
611   /* GLOBAL: send 'em out */
612   giop(vals,work,sizeof(oprs)/sizeof(oprs[0])-1,oprs);
613 
614   /* must be semi-pos def - only pairwise depends on this */
615   /* LATER - remove this restriction */
616   if (vals[3]<0)
617     {error_msg_fatal("gsi_check_args() :: system not semi-pos def ::%d\n",vals[3]);}
618 
619   if (vals[4]==INT_MAX)
620     {error_msg_fatal("gsi_check_args() :: system ub too large ::%d!\n",vals[4]);}
621 
622 #ifdef DEBUG
623   /* check gs template count */
624   if (vals[6] != num_gs_ids)
625     {error_msg_fatal("num_gs_ids mismatch!!!");}
626 
627   /* check all have same level threshold */
628   if (level != vals[5])
629     {error_msg_fatal("gsi_check_args() :: level not uniform across nodes!!!\n");}
630 #endif
631 
632   gs->nel_min = vals[0];
633   gs->nel_max = vals[1];
634   gs->nel_sum = vals[2];
635   gs->gl_min  = vals[3];
636   gs->gl_max  = vals[4];
637   gs->negl    = vals[4]-vals[3]+1;
638 
639 #ifdef DEBUG
640       printf("nel(unique)=%d\n", gs->nel);
641       printf("nel_max=%d\n",     gs->nel_max);
642       printf("nel_min=%d\n",     gs->nel_min);
643       printf("nel_sum=%d\n",     gs->nel_sum);
644       printf("negl=%d\n",        gs->negl);
645       printf("gl_max=%d\n",      gs->gl_max);
646       printf("gl_min=%d\n",      gs->gl_min);
647       printf("elms ordered=%d\n",gs->ordered);
648       printf("repeats=%d\n",     gs->repeats);
649       printf("positive=%d\n",    gs->positive);
650       printf("level=%d\n",       gs->level);
651 #endif
652 
653   if (gs->negl<=0)
654     {error_msg_fatal("gsi_check_args() :: system empty or neg :: %d\n",gs->negl);}
655 
656   /* LATER :: add level == -1 -> program selects level */
657   if (vals[5]<0)
658     {vals[5]=0;}
659   else if (vals[5]>num_nodes)
660     {vals[5]=num_nodes;}
661   gs->level = vals[5];
662 
663 #ifdef DEBUG
664   error_msg_warning("gs_check_args() :: end w/(%d,%d)+level=%d\n",
665 		    my_id,num_nodes,vals[5]);
666 #endif
667 
668   return(gs);
669 }
670 
671 
672 /******************************************************************************
673 Function: gsi_via_bit_mask()
674 
675 Input :
676 Output:
677 Return:
678 Description:
679 
680 
681 ******************************************************************************/
682 static
683 void
684 gsi_via_bit_mask(gs_id *gs)
685 {
686   register int i, nel, *elms;
687   int t1;
688   int **reduce;
689   int *map;
690 
691 #ifdef DEBUG
692   error_msg_warning("gsi_via_bit_mask() begin w/%d :: %d\n",my_id,num_nodes);
693 #endif
694 
695   /* totally local removes ... ct_bits == 0 */
696   get_ngh_buf(gs);
697 
698   if (gs->level)
699     {set_pairwise(gs);}
700 
701   if (gs->max_left_over)
702     {set_tree(gs);}
703 
704   /* intersection local and pairwise/tree? */
705   gs->num_local_total = gs->num_local;
706   gs->gop_local_reduce = gs->local_reduce;
707   gs->num_gop_local_reduce = gs->num_local_reduce;
708 
709   map = gs->companion;
710 
711   /* is there any local compression */
712   if (gs->num_local == 0)
713     {
714       gs->local_strength = NONE;
715       gs->num_local_gop = 0;
716     }
717   else
718     {
719       /* ok find intersection */
720       map = gs->companion;
721       reduce = gs->local_reduce;
722       for (i=0, t1=0; i<gs->num_local; i++, reduce++)
723 	{
724 	  if ((ivec_binary_search(**reduce,gs->pw_elm_list,gs->len_pw_list)>=0)
725 	      ||
726 	      ivec_binary_search(**reduce,gs->tree_map_in,gs->tree_map_sz)>=0)
727 	    {
728 	      /* printf("C%d :: i=%d, **reduce=%d\n",my_id,i,**reduce); */
729 	      t1++;
730 	      if (gs->num_local_reduce[i]<=0)
731 		{error_msg_fatal("nobody in list?");}
732 	      gs->num_local_reduce[i] *= -1;
733 	    }
734 	   **reduce=map[**reduce];
735 	}
736 
737       /* intersection is empty */
738       if (!t1)
739 	{
740 #ifdef DEBUG
741 	  error_msg_warning("gsi_check_args() :: local gs_gop w/o intersection!");
742 #endif
743 	  gs->local_strength = FULL;
744 	  gs->num_local_gop = 0;
745 	}
746       /* intersection not empty */
747       else
748 	{
749 #ifdef DEBUG
750 	  error_msg_warning("gsi_check_args() :: local gs_gop w/intersection!");
751 #endif
752 	  gs->local_strength = PARTIAL;
753 	  SMI_sort((void *)gs->num_local_reduce, (void *)gs->local_reduce,
754 		   gs->num_local + 1, SORT_INT_PTR);
755 
756 	  gs->num_local_gop = t1;
757 	  gs->num_local_total =  gs->num_local;
758 	  gs->num_local    -= t1;
759 	  gs->gop_local_reduce = gs->local_reduce;
760 	  gs->num_gop_local_reduce = gs->num_local_reduce;
761 
762 	  for (i=0; i<t1; i++)
763 	    {
764 	      if (gs->num_gop_local_reduce[i]>=0)
765 		{error_msg_fatal("they aren't negative?");}
766 	      gs->num_gop_local_reduce[i] *= -1;
767 	      gs->local_reduce++;
768 	      gs->num_local_reduce++;
769 	    }
770 	  gs->local_reduce++;
771 	  gs->num_local_reduce++;
772 	}
773     }
774 
775   elms = gs->pw_elm_list;
776   nel  = gs->len_pw_list;
777   for (i=0; i<nel; i++)
778     {elms[i] = map[elms[i]];}
779 
780   elms = gs->tree_map_in;
781   nel  = gs->tree_map_sz;
782   for (i=0; i<nel; i++)
783     {elms[i] = map[elms[i]];}
784 
785   /* clean up */
786   bss_free((void *) gs->local_elms);
787   bss_free((void *) gs->companion);
788   bss_free((void *) gs->elms);
789   bss_free((void *) gs->ngh_buf);
790   gs->local_elms = gs->companion = gs->elms = gs->ngh_buf = NULL;
791 
792 #ifdef DEBUG
793   error_msg_warning("gsi_via_bit_mask() end w/%d :: %d\n",my_id,num_nodes);
794 #endif
795 }
796 
797 
798 
799 /******************************************************************************
800 Function: place_in_tree()
801 
802 Input :
803 Output:
804 Return:
805 Description:
806 
807 
808 ******************************************************************************/
809 static
810 void
811 place_in_tree(register int elm)
812 {
813   register int *tp, n;
814 
815 
816   if (ntree==tree_buf_sz)
817     {
818       if (tree_buf_sz)
819 	{
820 	  tp = tree_buf;
821 	  n = tree_buf_sz;
822 	  tree_buf_sz<<=1;
823 	  tree_buf = (int *)bss_malloc(tree_buf_sz*INT_LEN);
824 	  ivec_copy(tree_buf,tp,n);
825 	  bss_free(tp);
826 	}
827       else
828 	{
829 	  tree_buf_sz = TREE_BUF_SZ;
830 	  tree_buf = (int *)bss_malloc(tree_buf_sz*INT_LEN);
831 	}
832     }
833 
834   tree_buf[ntree++] = elm;
835 }
836 
837 
838 
839 /******************************************************************************
840 Function: get_ngh_buf()
841 
842 Input :
843 Output:
844 Return:
845 Description:
846 
847 
848 ******************************************************************************/
849 static
850 void
851 get_ngh_buf(gs_id *gs)
852 {
853   register int i, j, npw=0, ntree_map=0;
854   int p_mask_size, ngh_buf_size, buf_size;
855   int *p_mask, *sh_proc_mask, *pw_sh_proc_mask;
856   int *ngh_buf, *buf1, *buf2;
857   int offset, per_load, num_loads, or_ct, start, end;
858   int *ptr1, *ptr2, i_start, negl, nel, *elms;
859   int oper=GL_B_OR;
860   int *ptr3, *t_mask, level, ct1, ct2;
861 
862 #ifdef DEBUG
863   error_msg_warning("get_ngh_buf() begin w/%d :: %d\n",my_id,num_nodes);
864 #endif
865 
866   /* to make life easier */
867   nel   = gs->nel;
868   elms  = gs->elms;
869   level = gs->level;
870 
871   /* det #bytes needed for processor bit masks and init w/mask cor. to my_id */
872   p_mask = (int *) bss_malloc(p_mask_size=len_bit_mask(num_nodes));
873   set_bit_mask(p_mask,p_mask_size,my_id);
874 
875   /* allocate space for masks and info bufs */
876   gs->nghs = sh_proc_mask = (int *) bss_malloc(p_mask_size);
877   gs->pw_nghs = pw_sh_proc_mask = (int *) perm_malloc(p_mask_size);
878   gs->ngh_buf_sz = ngh_buf_size = p_mask_size*nel;
879   t_mask = (int *) bss_malloc(p_mask_size);
880   gs->ngh_buf = ngh_buf = (int *) bss_malloc(ngh_buf_size);
881 
882   /* comm buffer size ... memory usage bounded by ~2*msg_buf */
883   /* had thought I could exploit rendezvous threshold */
884 
885   /* default is one pass */
886   per_load = negl  = gs->negl;
887   gs->num_loads = num_loads = 1;
888   i=p_mask_size*negl;
889 
890   /* possible overflow on buffer size */
891   /* overflow hack                    */
892   if (i<0) {i=INT_MAX;}
893 
894   buf_size = MIN(msg_buf,i);
895 
896   /* can we do it? */
897   if (p_mask_size>buf_size)
898     {error_msg_fatal("get_ngh_buf() :: buf<pms :: %d>%d\n",p_mask_size,buf_size);}
899 
900   /* get giop buf space ... make *only* one malloc */
901   buf1 = (int *) bss_malloc(buf_size<<1);
902 
903   /* more than one gior exchange needed? */
904   if (buf_size!=i)
905     {
906       per_load = buf_size/p_mask_size;
907       buf_size = per_load*p_mask_size;
908       gs->num_loads = num_loads = negl/per_load + (negl%per_load>0);
909     }
910 
911 #ifdef DEBUG
912   /* dump some basic info */
913   error_msg_warning("n_lds=%d,pms=%d,buf_sz=%d\n",num_loads,p_mask_size,buf_size);
914 #endif
915 
916   /* convert buf sizes from #bytes to #ints - 32 bit only! */
917 #ifdef SAFE
918   p_mask_size/=INT_LEN; ngh_buf_size/=INT_LEN; buf_size/=INT_LEN;
919 #else
920   p_mask_size>>=2; ngh_buf_size>>=2; buf_size>>=2;
921 #endif
922 
923   /* find giop work space */
924   buf2 = buf1+buf_size;
925 
926   /* hold #ints needed for processor masks */
927   gs->mask_sz=p_mask_size;
928 
929   /* init buffers */
930   ivec_zero(sh_proc_mask,p_mask_size);
931   ivec_zero(pw_sh_proc_mask,p_mask_size);
932   ivec_zero(ngh_buf,ngh_buf_size);
933 
934   /* HACK reset tree info */
935   tree_buf=NULL;
936   tree_buf_sz=ntree=0;
937 
938   /* queue the tree elements for now */
939   /* elms_q = new_queue(); */
940 
941   /* can also queue tree info for pruned or forest implememtation */
942   /*  mask_q = new_queue(); */
943 
944   /* ok do it */
945   for (ptr1=ngh_buf,ptr2=elms,end=gs->gl_min,or_ct=i=0; or_ct<num_loads; or_ct++)
946     {
947       /* identity for bitwise or is 000...000 */
948       ivec_zero(buf1,buf_size);
949 
950       /* load msg buffer */
951       for (start=end,end+=per_load,i_start=i; (offset=*ptr2)<end; i++, ptr2++)
952 	{
953 	  offset = (offset-start)*p_mask_size;
954 	  ivec_copy(buf1+offset,p_mask,p_mask_size);
955 	}
956 
957       /* GLOBAL: pass buffer */
958 #if   defined NXSRC
959       gior(buf1,buf_size,buf2);
960 #elif defined MPISRC
961       giop(buf1,buf2,buf_size,&oper);
962 #endif
963 
964 
965       /* unload buffer into ngh_buf */
966       ptr2=(elms+i_start);
967       for(ptr3=buf1,j=start; j<end; ptr3+=p_mask_size,j++)
968 	{
969 	  /* I own it ... may have to pairwise it */
970 	  if (j==*ptr2)
971 	    {
972 	      /* do i share it w/anyone? */
973 #ifdef SAFE
974 	      ct1 = ct_bits((char *)ptr3,p_mask_size*INT_LEN);
975 #else
976 	      ct1 = ct_bits((char *)ptr3,p_mask_size<<2);
977 #endif
978 	      /* guess not */
979 	      if (ct1<2)
980 		{ptr2++; ptr1+=p_mask_size; continue;}
981 
982 	      /* i do ... so keep info and turn off my bit */
983 	      ivec_copy(ptr1,ptr3,p_mask_size);
984 	      ivec_xor(ptr1,p_mask,p_mask_size);
985 	      ivec_or(sh_proc_mask,ptr1,p_mask_size);
986 
987 	      /* is it to be done pairwise? */
988 	      if (--ct1<=level)
989 		{
990 		  npw++;
991 
992 		  /* turn on high bit to indicate pw need to process */
993 		  *ptr2++ |= TOP_BIT;
994 		  ivec_or(pw_sh_proc_mask,ptr1,p_mask_size);
995 		  ptr1+=p_mask_size;
996 		  continue;
997 		}
998 
999 	      /* get set for next and note that I have a tree contribution */
1000 	      /* could save exact elm index for tree here -> save a search */
1001 	      ptr2++; ptr1+=p_mask_size; ntree_map++;
1002 	    }
1003 	  /* i don't but still might be involved in tree */
1004 	  else
1005 	    {
1006 
1007 	      /* shared by how many? */
1008 #ifdef SAFE
1009 	      ct1 = ct_bits((char *)ptr3,p_mask_size*INT_LEN);
1010 #else
1011 	      ct1 = ct_bits((char *)ptr3,p_mask_size<<2);
1012 #endif
1013 
1014 	      /* none! */
1015 	      if (ct1<2)
1016 		{continue;}
1017 
1018 	      /* is it going to be done pairwise? but not by me of course!*/
1019 	      if (--ct1<=level)
1020 		{continue;}
1021 	    }
1022 	  /* LATER we're going to have to process it NOW */
1023 	  /* nope ... tree it */
1024 	  place_in_tree(j);
1025 	}
1026     }
1027 
1028   bss_free((void *)t_mask);
1029   bss_free((void *)buf1);
1030 
1031   gs->len_pw_list=npw;
1032   gs->num_nghs = ct_bits((char *)sh_proc_mask,p_mask_size*INT_LEN);
1033 
1034   /* expand from bit mask list to int list and save ngh list */
1035   gs->nghs = (int *) perm_malloc(gs->num_nghs * INT_LEN);
1036   bm_to_proc((char *)sh_proc_mask,p_mask_size*INT_LEN,gs->nghs);
1037 
1038   gs->num_pw_nghs = ct_bits((char *)pw_sh_proc_mask,p_mask_size*INT_LEN);
1039 
1040   oper = GL_MAX;
1041   ct1 = gs->num_nghs;
1042   giop(&ct1,&ct2,1,&oper);
1043   gs->max_nghs = ct1;
1044 
1045   gs->tree_map_sz  = ntree_map;
1046   gs->max_left_over=ntree;
1047 
1048   bss_free((void *)p_mask);
1049   bss_free((void *)sh_proc_mask);
1050 
1051 #ifdef DEBUG
1052   error_msg_warning("get_ngh_buf() end w/%d :: %d\n",my_id,num_nodes);
1053 #endif
1054 }
1055 
1056 
1057 
1058 
1059 
1060 /******************************************************************************
1061 Function: pairwise_init()
1062 
1063 Input :
1064 Output:
1065 Return:
1066 Description:
1067 
1068 if an element is shared by fewer that level# of nodes do pairwise exch
1069 ******************************************************************************/
1070 static
1071 void
1072 set_pairwise(gs_id *gs)
1073 {
1074   register int i, j;
1075   int p_mask_size;
1076   int *p_mask, *sh_proc_mask, *tmp_proc_mask;
1077   int *ngh_buf, *buf2;
1078   int offset;
1079   int *msg_list, *msg_size, **msg_nodes, nprs;
1080   int *pairwise_elm_list, len_pair_list=0;
1081   int *iptr, t1, i_start, nel, *elms;
1082   int ct;
1083 
1084 
1085 #ifdef DEBUG
1086   error_msg_warning("set_pairwise() begin w/%d :: %d\n",my_id,num_nodes);
1087 #endif
1088 
1089   /* to make life easier */
1090   nel  = gs->nel;
1091   elms = gs->elms;
1092   ngh_buf = gs->ngh_buf;
1093   sh_proc_mask  = gs->pw_nghs;
1094 
1095   /* need a few temp masks */
1096   p_mask_size   = len_bit_mask(num_nodes);
1097   p_mask        = (int *) bss_malloc(p_mask_size);
1098   tmp_proc_mask = (int *) bss_malloc(p_mask_size);
1099 
1100   /* set mask to my my_id's bit mask */
1101   set_bit_mask(p_mask,p_mask_size,my_id);
1102 
1103 #ifdef SAFE
1104   p_mask_size /= INT_LEN;
1105 #else
1106   p_mask_size >>= 2;
1107 #endif
1108 
1109   len_pair_list=gs->len_pw_list;
1110   gs->pw_elm_list=pairwise_elm_list=(int*)perm_malloc((len_pair_list+1)*INT_LEN);
1111 
1112   /* how many processors (nghs) do we have to exchange with? */
1113   nprs=gs->num_pairs=ct_bits((char *)sh_proc_mask,p_mask_size*INT_LEN);
1114 
1115 
1116   /* allocate space for gs_gop() info */
1117   gs->pair_list = msg_list = (int *)  perm_malloc(INT_LEN*nprs);
1118   gs->msg_sizes = msg_size  = (int *)  perm_malloc(INT_LEN*nprs);
1119   gs->node_list = msg_nodes = (int **) perm_malloc(INT_PTR_LEN*(nprs+1));
1120 
1121   /* init msg_size list */
1122   ivec_zero(msg_size,nprs);
1123 
1124   /* expand from bit mask list to int list */
1125   bm_to_proc((char *)sh_proc_mask,p_mask_size*INT_LEN,msg_list);
1126 
1127   /* keep list of elements being handled pairwise */
1128   for (i=j=0;i<nel;i++)
1129     {
1130       if (elms[i] & TOP_BIT)
1131 	{elms[i] ^= TOP_BIT; pairwise_elm_list[j++] = i;}
1132     }
1133   pairwise_elm_list[j] = -1;
1134 
1135 #ifdef DEBUG
1136   if (j!=len_pair_list)
1137     {error_msg_fatal("oops ... bad paiwise list in set_pairwise!");}
1138 #endif
1139 
1140 #if defined MPISRC
1141   gs->msg_ids_out = (MPI_Request *)  perm_malloc(sizeof(MPI_Request)*(nprs+1));
1142   gs->msg_ids_out[nprs] = MPI_REQUEST_NULL;
1143   gs->msg_ids_in = (MPI_Request *)  perm_malloc(sizeof(MPI_Request)*(nprs+1));
1144   gs->msg_ids_in[nprs] = MPI_REQUEST_NULL;
1145   gs->pw_vals = (REAL *) perm_malloc(REAL_LEN*len_pair_list*vec_sz);
1146 #else
1147   gs->msg_ids_out = (int *)  perm_malloc(INT_LEN*(nprs+1));
1148   ivec_zero(gs->msg_ids_out,nprs);
1149   gs->msg_ids_out[nprs] = -1;
1150   gs->msg_ids_in = (int *)  perm_malloc(INT_LEN*(nprs+1));
1151   ivec_zero(gs->msg_ids_in,nprs);
1152   gs->msg_ids_in[nprs] = -1;
1153   gs->pw_vals = (REAL *) perm_malloc(REAL_LEN*len_pair_list*vec_sz);
1154 #endif
1155 
1156   /* find who goes to each processor */
1157   for (i_start=i=0;i<nprs;i++)
1158     {
1159       /* processor i's mask */
1160       set_bit_mask(p_mask,p_mask_size*INT_LEN,msg_list[i]);
1161 
1162       /* det # going to processor i */
1163       for (ct=j=0;j<len_pair_list;j++)
1164 	{
1165 	  buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
1166 	  ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);
1167 	  if (ct_bits((char *)tmp_proc_mask,p_mask_size*INT_LEN))
1168 	    {ct++;}
1169 	}
1170       msg_size[i] = ct;
1171       i_start = MAX(i_start,ct);
1172 
1173       /*space to hold nodes in message to first neighbor */
1174       msg_nodes[i] = iptr = (int *) perm_malloc(INT_LEN*(ct+1));
1175 
1176       for (j=0;j<len_pair_list;j++)
1177 	{
1178 	  buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
1179 	  ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);
1180 	  if (ct_bits((char *)tmp_proc_mask,p_mask_size*INT_LEN))
1181 	    {*iptr++ = j;}
1182 	}
1183       *iptr = -1;
1184     }
1185   msg_nodes[nprs] = NULL;
1186 
1187 #ifdef INFO1
1188   t1 = GL_MAX;
1189   giop(&i_start,&offset,1,&t1);
1190   gs->max_pairs = i_start;
1191 #else
1192   j=gs->loc_node_pairs=i_start;
1193   t1 = GL_MAX;
1194   giop(&i_start,&offset,1,&t1);
1195   gs->max_node_pairs = i_start;
1196 
1197   i_start=j;
1198   t1 = GL_MIN;
1199   giop(&i_start,&offset,1,&t1);
1200   gs->min_node_pairs = i_start;
1201 
1202   i_start=j;
1203   t1 = GL_ADD;
1204   giop(&i_start,&offset,1,&t1);
1205   gs->avg_node_pairs = i_start/num_nodes + 1;
1206 
1207   i_start=nprs;
1208   t1 = GL_MAX;
1209   giop(&i_start,&offset,1,&t1);
1210   gs->max_pairs = i_start;
1211 
1212   /*gs->max_pairs = -1;*/
1213 #endif
1214 
1215   /* remap pairwise in tail of gsi_via_bit_mask() */
1216   gs->msg_total = ivec_sum(gs->msg_sizes,nprs);
1217   gs->out = (REAL *) perm_malloc(REAL_LEN*gs->msg_total*vec_sz);
1218   gs->in  = (REAL *) perm_malloc(REAL_LEN*gs->msg_total*vec_sz);
1219 
1220   /* reset malloc pool */
1221   bss_free((void *)p_mask);
1222   bss_free((void *)tmp_proc_mask);
1223 
1224 #ifdef DEBUG
1225   error_msg_warning("set_pairwise() end w/%d :: %d\n",my_id,num_nodes);
1226 #endif
1227 }
1228 
1229 
1230 
1231 /******************************************************************************
1232 Function: set_tree()
1233 
1234 Input :
1235 Output:
1236 Return:
1237 Description:
1238 
1239 to do pruned tree just save ngh buf copy for each one and decode here!
1240 ******************************************************************************/
1241 static
1242 void
1243 set_tree(gs_id *gs)
1244 {
1245   register int i, j, n, nel;
1246   register int *iptr_in, *iptr_out, *tree_elms, *elms;
1247 
1248 
1249 #ifdef DEBUG
1250   error_msg_warning("set_tree() :: begin\n");
1251 #endif
1252 
1253   /* local work ptrs */
1254   elms = gs->elms;
1255   nel     = gs->nel;
1256 
1257   /* how many via tree */
1258   gs->tree_nel  = n = ntree;
1259   gs->tree_elms = tree_elms = iptr_in = tree_buf;
1260   gs->tree_buf  = (REAL *) bss_malloc(REAL_LEN*n*vec_sz);
1261   gs->tree_work = (REAL *) bss_malloc(REAL_LEN*n*vec_sz);
1262   j=gs->tree_map_sz;
1263   gs->tree_map_in = iptr_in  = (int *) bss_malloc(INT_LEN*(j+1));
1264   gs->tree_map_out = iptr_out = (int *) bss_malloc(INT_LEN*(j+1));
1265 
1266 #ifdef DEBUG
1267   error_msg_warning("num on tree=%d,%d",gs->max_left_over,gs->tree_nel);
1268 #endif
1269 
1270   /* search the longer of the two lists */
1271   /* note ... could save this info in get_ngh_buf and save searches */
1272   if (n<=nel)
1273     {
1274       /* bijective fct w/remap - search elm list */
1275       for (i=0; i<n; i++)
1276 	{
1277 	  if ((j=ivec_binary_search(*tree_elms++,elms,nel))>=0)
1278 	    {*iptr_in++ = j; *iptr_out++ = i;}
1279 	}
1280     }
1281   else
1282     {
1283       for (i=0; i<nel; i++)
1284 	{
1285 	  if ((j=ivec_binary_search(*elms++,tree_elms,n))>=0)
1286 	    {*iptr_in++ = i; *iptr_out++ = j;}
1287 	}
1288     }
1289 
1290   /* sentinel */
1291   *iptr_in = *iptr_out = -1;
1292 
1293 #ifdef DEBUG
1294   error_msg_warning("set_tree() :: end\n");
1295 #endif
1296 }
1297 
1298 
1299 /******************************************************************************
1300 Function: gsi_via_int_list()
1301 
1302 Input :
1303 Output:
1304 Return:
1305 Description:
1306 ******************************************************************************/
1307 /*static
1308 void
1309 gsi_via_int_list(gs_id *gs)
1310 {
1311 
1312    LATER: for P large the bit masks -> too many passes
1313    LATER: strategy: do gsum w/1 in position i in negl if owner
1314    LATER: then sum of entire vector 1 ... negl determines min buf len
1315    LATER: So choose min from this or mask method
1316 }*/
1317 
1318 
1319 #if defined(not_used)
1320 static
1321 int
1322 root_sub_tree(int *proc_list, int num)
1323 {
1324   register int i, j, p_or, p_and;
1325   register int root, mask;
1326 
1327 
1328   /* ceiling(log2(num_nodes)) - 1 */
1329   j = i_log2_num_nodes;
1330   if (num_nodes==floor_num_nodes)
1331     {j--;}
1332 
1333   /* set mask to msb */
1334   for(mask=1,i=0; i<j; i++)
1335     {mask<<=1;}
1336 
1337   p_or  = ivec_reduce_or(proc_list,num);
1338   p_and = ivec_reduce_and(proc_list,num);
1339   for(root=i=0; i<j; i++,mask>>=1)
1340     {
1341       /* (msb-i)'th bits on ==> root in right 1/2 tree */
1342       if (mask & p_and)
1343 	{root |= mask;}
1344 
1345       /* (msb-i)'th bits differ ==> root found */
1346       else if (mask & p_or)
1347 	{break;}
1348 
1349       /* (msb-i)'th bits off ==>root in left 1/2 tree */
1350     }
1351 
1352 #ifdef DEBUG
1353   if ((root<0) || (root>num_nodes))
1354     {error_msg_fatal("root_sub_tree() :: bad root!");}
1355 
1356   if (!my_id)
1357     {
1358       printf("num_nodes=%d, j=%d, root=%d\n",num_nodes,j,root);
1359       printf("procs: ");
1360       for(i=0;i<num;i++)
1361 	{printf("%d ",proc_list[i]);}
1362       printf("\n");
1363     }
1364 #endif
1365 
1366   return(root);
1367 }
1368 #endif
1369 
1370 
1371 #if defined(not_used)
1372 static int
1373 in_sub_tree(int *mask, int mask_size, int *work, int nw)
1374 {
1375   int ct, nb;
1376 
1377   /* mask size in bytes */
1378   nb = mask_size<<2;
1379 
1380   /* shared amoungst how many? */
1381   ct = ct_bits((char *)mask,nb);
1382 
1383   /* enough space? */
1384   if (nw<ct)
1385     {error_msg_fatal("in_sub_tree() :: not enough space to expand bit mask!");}
1386 
1387   /* expand */
1388   bm_to_proc((char *)mask,nb,work);
1389 
1390   /* find tree root */
1391   root_sub_tree(work,ct);
1392 
1393   /* am i in any of the paths? */
1394 
1395   return(TRUE);
1396 
1397   /*
1398   sh_mask = (int *)bss_malloc(nb);
1399   bss_free(sh_mask);
1400   */
1401 }
1402 #endif
1403 
1404 
1405 /******************************************************************************
1406 Function: gather_scatter
1407 
1408 Input :
1409 Output:
1410 Return:
1411 Description:
1412 ******************************************************************************/
1413 static
1414 void
1415 gs_gop_local_out(register gs_id *gs, register REAL *vals)
1416 {
1417   register int *num, *map, **reduce;
1418   register REAL tmp;
1419 
1420 
1421 #ifdef DEBUG
1422   error_msg_warning("start gs_gop_xxx()\n");
1423 #endif
1424 
1425   num    = gs->num_gop_local_reduce;
1426   reduce = gs->gop_local_reduce;
1427   while ((map = *reduce++))
1428     {
1429       /* wall */
1430       if (*num == 2)
1431 	{
1432 	  num ++;
1433 	  vals[map[1]] = vals[map[0]];
1434 	}
1435       /* corner shared by three elements */
1436       else if (*num == 3)
1437 	{
1438 	  num ++;
1439 	  vals[map[2]] = vals[map[1]] = vals[map[0]];
1440 	}
1441       /* corner shared by four elements */
1442       else if (*num == 4)
1443 	{
1444 	  num ++;
1445 	  vals[map[3]] = vals[map[2]] = vals[map[1]] = vals[map[0]];
1446 	}
1447       /* general case ... odd geoms ... 3D*/
1448       else
1449 	{
1450 	  num++;
1451 	  tmp = *(vals + *map++);
1452 	  while (*map >= 0)
1453 	    {*(vals + *map++) = tmp;}
1454 	}
1455     }
1456 }
1457 
1458 
1459 
1460 /******************************************************************************
1461 Function: gather_scatter
1462 
1463 Input :
1464 Output:
1465 Return:
1466 Description:
1467 ******************************************************************************/
1468 void
1469 gs_gop_binary(gs_ADT gs, REAL *vals, rbfp fct)
1470 {
1471 #ifdef DEBUG
1472   if (!gs)  {error_msg_fatal("gs_gop() :: passed NULL gs handle!!!");}
1473   if (!fct) {error_msg_fatal("gs_gop() :: passed NULL bin fct handle!!!");}
1474   error_msg_warning("start gs_gop_xxx()\n");
1475 #endif
1476 
1477   /* local only operations!!! */
1478   if (gs->num_local)
1479     {gs_gop_local_binary(gs,vals,fct);}
1480 
1481   /* if intersection tree/pairwise and local isn't empty */
1482   if (gs->num_local_gop)
1483     {
1484       gs_gop_local_in_binary(gs,vals,fct);
1485 
1486       /* pairwise */
1487       if (gs->num_pairs)
1488 	{gs_gop_pairwise_binary(gs,vals,fct);}
1489 
1490       /* tree */
1491       else if (gs->max_left_over)
1492 	{gs_gop_tree_binary(gs,vals,fct);}
1493 
1494       gs_gop_local_out(gs,vals);
1495     }
1496   /* if intersection tree/pairwise and local is empty */
1497   else
1498     {
1499       /* pairwise */
1500       if (gs->num_pairs)
1501 	{gs_gop_pairwise_binary(gs,vals,fct);}
1502 
1503       /* tree */
1504       else if (gs->max_left_over)
1505 	{gs_gop_tree_binary(gs,vals,fct);}
1506     }
1507 }
1508 
1509 
1510 
1511 /******************************************************************************
1512 Function: gather_scatter
1513 
1514 Input :
1515 Output:
1516 Return:
1517 Description:
1518 ******************************************************************************/
1519 static
1520 void
1521 gs_gop_local_binary(register gs_id *gs, register REAL *vals, register rbfp fct)
1522 {
1523   register int *num, *map, **reduce;
1524   REAL tmp;
1525 
1526 
1527 #ifdef DEBUG
1528   error_msg_warning("start gs_gop_xxx()\n");
1529 #endif
1530   num    = gs->num_local_reduce;
1531   reduce = gs->local_reduce;
1532   while ((map = *reduce))
1533     {
1534       num ++;
1535       (*fct)(&tmp,NULL,1);
1536       /* tmp = 0.0; */
1537       while (*map >= 0)
1538 	{(*fct)(&tmp,(vals + *map),1); map++;}
1539 	/*        {tmp = (*fct)(tmp,*(vals + *map)); map++;} */
1540 
1541       map = *reduce++;
1542       while (*map >= 0)
1543         {*(vals + *map++) = tmp;}
1544     }
1545 }
1546 
1547 
1548 
1549 /******************************************************************************
1550 Function: gather_scatter
1551 
1552 Input :
1553 Output:
1554 Return:
1555 Description:
1556 ******************************************************************************/
1557 static
1558 void
1559 gs_gop_local_in_binary(register gs_id *gs, register REAL *vals, register rbfp fct)
1560 {
1561   register int *num, *map, **reduce;
1562   register REAL *base;
1563 
1564 
1565 #ifdef DEBUG
1566   error_msg_warning("start gs_gop_xxx()\n");
1567 #endif
1568 
1569   num    = gs->num_gop_local_reduce;
1570 
1571   reduce = gs->gop_local_reduce;
1572   while ((map = *reduce++))
1573     {
1574       num++;
1575       base = vals + *map++;
1576       while (*map >= 0)
1577 	{(*fct)(base,(vals + *map),1); map++;}
1578 	/*        {*base = (*fct)(*base,*(vals + *map)); map++;} */
1579     }
1580 }
1581 
1582 
1583 
1584 /******************************************************************************
1585 Function: gather_scatter
1586 
1587 VERSION 3 ::
1588 
1589 Input :
1590 Output:
1591 Return:
1592 Description:
1593 ******************************************************************************/
1594 static
1595 void
1596 gs_gop_pairwise_binary(register gs_id *gs, register REAL *in_vals,
1597                        register rbfp fct)
1598 {
1599 #if   defined NXSRC
1600   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
1601   register int *iptr, *msg_list, *msg_size, **msg_nodes;
1602   register int *pw, *list, *size, **nodes;
1603   register int *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1604 
1605 #ifdef DEBUG
1606   error_msg_warning("start gs_gop_xxx()\n");
1607 #endif
1608 
1609   /* strip and load registers */
1610   msg_list =list         = gs->pair_list;
1611   msg_size =size         = gs->msg_sizes;
1612   msg_nodes=nodes        = gs->node_list;
1613   iptr=pw                = gs->pw_elm_list;
1614   dptr1=dptr3            = gs->pw_vals;
1615   msg_ids_in  = ids_in   = gs->msg_ids_in;
1616   msg_ids_out = ids_out  = gs->msg_ids_out;
1617   dptr2                  = gs->out;
1618   in1=in2                = gs->in;
1619 
1620   /* post the receives */
1621   do
1622     {
1623       *msg_ids_in++ = (int) irecv(MSGTAG1 + *list++,(char *)in1,*size*REAL_LEN);
1624       in1 += *size++;
1625     }
1626   while (*msg_ids_in >= 0);
1627 
1628   /* load gs values into in out gs buffers */
1629   while (*iptr >= 0)
1630     {*dptr3++ = *(in_vals + *iptr++);}
1631 
1632   /* load out buffers and post the sends */
1633   while (iptr = *msg_nodes++)
1634     {
1635       dptr3 = dptr2;
1636       while (*iptr >= 0)
1637         {*dptr2++ = *(dptr1 + *iptr++);}
1638       *msg_ids_out++ = (int) isend(MSGTAG1+my_id,(char *)dptr3,
1639 				   *(msg_size++)*REAL_LEN,*msg_list++,0);
1640     }
1641 
1642   /* post the receives ... was here*/
1643   if (gs->max_left_over)
1644     {gs_gop_tree_binary(gs,in_vals,fct);}
1645 
1646   /* process the received data */
1647   while (iptr = *nodes++)
1648     {
1649       msgwait(*ids_in++);
1650       while (*iptr >= 0)
1651 	{(*fct)((dptr1 + *iptr),in2,1); iptr++; in2++;}
1652 	/* {*(dptr1 + *iptr) = (*fct)(*(dptr1 + *iptr),*in2); iptr++; in2++;} */
1653     }
1654 
1655   /* replace vals */
1656   while (*pw >= 0)
1657     {*(in_vals + *pw++) = *dptr1++;}
1658 
1659   /* clear isend message handles */
1660   while (*ids_out >= 0)
1661     {msgwait(*ids_out++);}
1662 
1663 #elif defined MPISRC
1664   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
1665   register int *iptr, *msg_list, *msg_size, **msg_nodes;
1666   register int *pw, *list, *size, **nodes;
1667   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1668   MPI_Status status;
1669 
1670 
1671   /* strip and load registers */
1672   msg_list =list         = gs->pair_list;
1673   msg_size =size         = gs->msg_sizes;
1674   msg_nodes=nodes        = gs->node_list;
1675   iptr=pw                = gs->pw_elm_list;
1676   dptr1=dptr3            = gs->pw_vals;
1677   msg_ids_in  = ids_in   = gs->msg_ids_in;
1678   msg_ids_out = ids_out  = gs->msg_ids_out;
1679   dptr2                  = gs->out;
1680   in1=in2                = gs->in;
1681 
1682   /* post the receives */
1683   /*  msg_nodes=nodes; */
1684   do
1685     {
1686       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1687          second one *list and do list++ afterwards */
1688       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
1689                 gs->gs_comm, msg_ids_in++);
1690       in1 += *size++;
1691     }
1692   while (*++msg_nodes);
1693   msg_nodes=nodes;
1694 
1695   /* load gs values into in out gs buffers */
1696   while (*iptr >= 0)
1697     {*dptr3++ = *(in_vals + *iptr++);}
1698 
1699   /* load out buffers and post the sends */
1700   while ((iptr = *msg_nodes++))
1701     {
1702       dptr3 = dptr2;
1703       while (*iptr >= 0)
1704         {*dptr2++ = *(dptr1 + *iptr++);}
1705       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1706       /* is msg_ids_out++ correct? */
1707       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
1708                 MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
1709     }
1710 
1711   if (gs->max_left_over)
1712     {gs_gop_tree_binary(gs,in_vals,fct);}
1713 
1714   /* process the received data */
1715   msg_nodes=nodes;
1716   while ((iptr = *nodes++))
1717     {
1718       /* Should I check the return value of MPI_Wait() or status? */
1719       /* Can this loop be replaced by a call to MPI_Waitall()? */
1720       MPI_Wait(ids_in++, &status);
1721       while (*iptr >= 0)
1722 	{(*fct)((dptr1 + *iptr),in2,1); iptr++; in2++;}
1723       /* {*(dptr1 + *iptr) = (*fct)(*(dptr1 + *iptr),*in2); iptr++; in2++;} */
1724     }
1725 
1726   /* replace vals */
1727   while (*pw >= 0)
1728     {*(in_vals + *pw++) = *dptr1++;}
1729 
1730   /* clear isend message handles */
1731   /* This changed for clarity though it could be the same */
1732   while (*msg_nodes++)
1733     /* Should I check the return value of MPI_Wait() or status? */
1734     /* Can this loop be replaced by a call to MPI_Waitall()? */
1735     {MPI_Wait(ids_out++, &status);}
1736 #else
1737   return;
1738 #endif
1739 
1740 
1741 }
1742 
1743 
1744 
1745 /******************************************************************************
1746 Function: gather_scatter
1747 
1748 Input :
1749 Output:
1750 Return:
1751 Description:
1752 ******************************************************************************/
1753 static
1754 void
1755 gs_gop_tree_binary(gs_id *gs, REAL *vals, register rbfp fct)
1756 {
1757   int size;
1758   int *in, *out;
1759   REAL *buf, *work;
1760 
1761 #ifdef DEBUG
1762   error_msg_warning("gs_gop_tree_binary() :: start\n");
1763 #endif
1764 
1765   in   = gs->tree_map_in;
1766   out  = gs->tree_map_out;
1767   buf  = gs->tree_buf;
1768   work = gs->tree_work;
1769   size = gs->tree_nel;
1770 
1771   /* load vals vector w/identity */
1772   (*fct)(buf,NULL,size);
1773 
1774   /* load my contribution into val vector */
1775   while (*in >= 0)
1776     {(*fct)((buf + *out++),(vals + *in++),-1);}
1777 /*    {*(buf + *out++) = *(vals + *in++);} */
1778 
1779   gfop(buf,work,size,(vbfp)fct,REAL_TYPE,0);
1780 
1781   in   = gs->tree_map_in;
1782   out  = gs->tree_map_out;
1783   while (*in >= 0)
1784     {(*fct)((vals + *in++),(buf + *out++),-1);}
1785     /*    {*(vals + *in++) = *(buf + *out++);} */
1786 
1787 
1788 #ifdef DEBUG
1789   error_msg_warning("gs_gop_tree_binary() :: end\n");
1790 #endif
1791 
1792 }
1793 
1794 
1795 
1796 
1797 /******************************************************************************
1798 Function: gather_scatter
1799 
1800 Input :
1801 Output:
1802 Return:
1803 Description:
1804 ******************************************************************************/
1805 void
1806 gs_gop(register gs_id *gs, register REAL *vals, register const char *op)
1807 {
1808 #ifdef DEBUG
1809   error_msg_warning("start gs_gop()\n");
1810   if (!gs) {error_msg_fatal("gs_gop() :: passed NULL gs handle!!!");}
1811   if (!op) {error_msg_fatal("gs_gop() :: passed NULL operation!!!");}
1812 #endif
1813 
1814   switch (*op) {
1815   case '+':
1816     gs_gop_plus(gs,vals);
1817     break;
1818   case '*':
1819     gs_gop_times(gs,vals);
1820     break;
1821   case 'a':
1822     gs_gop_min_abs(gs,vals);
1823     break;
1824   case 'A':
1825     gs_gop_max_abs(gs,vals);
1826     break;
1827   case 'e':
1828     gs_gop_exists(gs,vals);
1829     break;
1830   case 'm':
1831     gs_gop_min(gs,vals);
1832     break;
1833   case 'M':
1834     gs_gop_max(gs,vals); break;
1835     /*
1836     if (*(op+1)=='\0')
1837       {gs_gop_max(gs,vals); break;}
1838     else if (*(op+1)=='X')
1839       {gs_gop_max_abs(gs,vals); break;}
1840     else if (*(op+1)=='N')
1841       {gs_gop_min_abs(gs,vals); break;}
1842     */
1843   default:
1844     error_msg_warning("gs_gop() :: %c is not a valid op",op[0]);
1845     error_msg_warning("gs_gop() :: default :: plus");
1846     gs_gop_plus(gs,vals);
1847     break;
1848   }
1849 #ifdef DEBUG
1850   error_msg_warning("end gs_gop()\n");
1851 #endif
1852 }
1853 
1854 
1855 /******************************************************************************
1856 Function: gather_scatter
1857 
1858 Input :
1859 Output:
1860 Return:
1861 Description:
1862 ******************************************************************************/
1863 static void
1864 gs_gop_exists(register gs_id *gs, register REAL *vals)
1865 {
1866 #ifdef DEBUG
1867   error_msg_warning("start gs_gop_xxx()\n");
1868 #endif
1869 
1870   /* local only operations!!! */
1871   if (gs->num_local)
1872     {gs_gop_local_exists(gs,vals);}
1873 
1874   /* if intersection tree/pairwise and local isn't empty */
1875   if (gs->num_local_gop)
1876     {
1877       gs_gop_local_in_exists(gs,vals);
1878 
1879       /* pairwise */
1880       if (gs->num_pairs)
1881 	{gs_gop_pairwise_exists(gs,vals);}
1882 
1883       /* tree */
1884       else if (gs->max_left_over)
1885 	{gs_gop_tree_exists(gs,vals);}
1886 
1887       gs_gop_local_out(gs,vals);
1888     }
1889   /* if intersection tree/pairwise and local is empty */
1890   else
1891     {
1892       /* pairwise */
1893       if (gs->num_pairs)
1894 	{gs_gop_pairwise_exists(gs,vals);}
1895 
1896       /* tree */
1897       else if (gs->max_left_over)
1898 	{gs_gop_tree_exists(gs,vals);}
1899     }
1900 }
1901 
1902 
1903 
1904 /******************************************************************************
1905 Function: gather_scatter
1906 
1907 Input :
1908 Output:
1909 Return:
1910 Description:
1911 ******************************************************************************/
1912 static
1913 void
1914 gs_gop_local_exists(register gs_id *gs, register REAL *vals)
1915 {
1916   register int *num, *map, **reduce;
1917   register REAL tmp;
1918 
1919 
1920 #ifdef DEBUG
1921   error_msg_warning("start gs_gop_xxx()\n");
1922 #endif
1923 
1924   num    = gs->num_local_reduce;
1925   reduce = gs->local_reduce;
1926   while ((map = *reduce))
1927     {
1928       num ++;
1929       tmp = 0.0;
1930       while (*map >= 0)
1931 	{tmp = EXISTS(tmp,*(vals + *map)); map++;}
1932 
1933       map = *reduce++;
1934       while (*map >= 0)
1935 	{*(vals + *map++) = tmp;}
1936     }
1937 }
1938 
1939 
1940 
1941 /******************************************************************************
1942 Function: gather_scatter
1943 
1944 Input :
1945 Output:
1946 Return:
1947 Description:
1948 ******************************************************************************/
1949 static
1950 void
1951 gs_gop_local_in_exists(register gs_id *gs, register REAL *vals)
1952 {
1953   register int *num, *map, **reduce;
1954   register REAL *base;
1955 
1956 
1957 #ifdef DEBUG
1958   error_msg_warning("start gs_gop_xxx()\n");
1959 #endif
1960 
1961   num    = gs->num_gop_local_reduce;
1962   reduce = gs->gop_local_reduce;
1963   while ((map = *reduce++))
1964     {
1965       num++;
1966       base = vals + *map++;
1967       while (*map >= 0)
1968 	{*base = EXISTS(*base,*(vals + *map)); map++;}
1969     }
1970 }
1971 
1972 
1973 
1974 /******************************************************************************
1975 Function: gather_scatter
1976 
1977 VERSION 3 ::
1978 
1979 Input :
1980 Output:
1981 Return:
1982 Description:
1983 ******************************************************************************/
1984 static
1985 void
1986 gs_gop_pairwise_exists(register gs_id *gs, register REAL *in_vals)
1987 {
1988 #if   defined NXSRC
1989   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
1990   register int *iptr, *msg_list, *msg_size, **msg_nodes;
1991   register int *pw, *list, *size, **nodes;
1992   register int *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1993 
1994 
1995 #ifdef DEBUG
1996   error_msg_warning("start gs_gop_xxx()\n");
1997 #endif
1998 
1999   /* strip and load registers */
2000   msg_list =list         = gs->pair_list;
2001   msg_size =size         = gs->msg_sizes;
2002   msg_nodes=nodes        = gs->node_list;
2003   iptr=pw                = gs->pw_elm_list;
2004   dptr1=dptr3            = gs->pw_vals;
2005   msg_ids_in  = ids_in   = gs->msg_ids_in;
2006   msg_ids_out = ids_out  = gs->msg_ids_out;
2007   dptr2                  = gs->out;
2008   in1=in2                = gs->in;
2009 
2010   /* post the receives */
2011   do
2012     {
2013       *msg_ids_in++ = (int) irecv(MSGTAG1 + *list++,(char *)in1,*size*REAL_LEN);
2014       in1 += *size++;
2015     }
2016   while (*msg_ids_in >= 0);
2017 
2018   /* load gs values into in out gs buffers */
2019   while (*iptr >= 0)
2020     {*dptr3++ = *(in_vals + *iptr++);}
2021 
2022   /* load out buffers and post the sends */
2023   while (iptr = *msg_nodes++)
2024     {
2025       dptr3 = dptr2;
2026       while (*iptr >= 0)
2027 	{*dptr2++ = *(dptr1 + *iptr++);}
2028       *msg_ids_out++ = (int) isend(MSGTAG1+my_id,(char *)dptr3,*(msg_size++)*REAL_LEN,
2029 			     *msg_list++,0);
2030     }
2031 
2032   /* post the receives ... was here*/
2033   if (gs->max_left_over)
2034     {gs_gop_tree_exists(gs,in_vals);}
2035 
2036   /* process the received data */
2037   while (iptr = *nodes++)
2038     {
2039       msgwait(*ids_in++);
2040       while (*iptr >= 0)
2041 	{*(dptr1 + *iptr) = EXISTS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2042     }
2043 
2044   /* replace vals */
2045   while (*pw >= 0)
2046     {*(in_vals + *pw++) = *dptr1++;}
2047 
2048   /* clear isend message handles */
2049   while (*ids_out >= 0)
2050     {msgwait(*ids_out++);}
2051 
2052 #elif defined MPISRC
2053   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
2054   register int *iptr, *msg_list, *msg_size, **msg_nodes;
2055   register int *pw, *list, *size, **nodes;
2056   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2057   MPI_Status status;
2058 
2059 
2060   /* strip and load registers */
2061   msg_list =list         = gs->pair_list;
2062   msg_size =size         = gs->msg_sizes;
2063   msg_nodes=nodes        = gs->node_list;
2064   iptr=pw                = gs->pw_elm_list;
2065   dptr1=dptr3            = gs->pw_vals;
2066   msg_ids_in  = ids_in   = gs->msg_ids_in;
2067   msg_ids_out = ids_out  = gs->msg_ids_out;
2068   dptr2                  = gs->out;
2069   in1=in2                = gs->in;
2070 
2071   /* post the receives */
2072   /*  msg_nodes=nodes; */
2073   do
2074     {
2075       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2076 	 second one *list and do list++ afterwards */
2077       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
2078 		gs->gs_comm, msg_ids_in++);
2079       in1 += *size++;
2080     }
2081   while (*++msg_nodes);
2082   msg_nodes=nodes;
2083 
2084   /* load gs values into in out gs buffers */
2085   while (*iptr >= 0)
2086     {*dptr3++ = *(in_vals + *iptr++);}
2087 
2088   /* load out buffers and post the sends */
2089   while ((iptr = *msg_nodes++))
2090     {
2091       dptr3 = dptr2;
2092       while (*iptr >= 0)
2093 	{*dptr2++ = *(dptr1 + *iptr++);}
2094       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2095       /* is msg_ids_out++ correct? */
2096       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
2097 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
2098     }
2099 
2100   if (gs->max_left_over)
2101     {gs_gop_tree_exists(gs,in_vals);}
2102 
2103   /* process the received data */
2104   msg_nodes=nodes;
2105   while ((iptr = *nodes++))
2106     {
2107       /* Should I check the return value of MPI_Wait() or status? */
2108       /* Can this loop be replaced by a call to MPI_Waitall()? */
2109       MPI_Wait(ids_in++, &status);
2110       while (*iptr >= 0)
2111 	{*(dptr1 + *iptr) = EXISTS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2112     }
2113 
2114   /* replace vals */
2115   while (*pw >= 0)
2116     {*(in_vals + *pw++) = *dptr1++;}
2117 
2118   /* clear isend message handles */
2119   /* This changed for clarity though it could be the same */
2120   while (*msg_nodes++)
2121     /* Should I check the return value of MPI_Wait() or status? */
2122     /* Can this loop be replaced by a call to MPI_Waitall()? */
2123     {MPI_Wait(ids_out++, &status);}
2124 #else
2125   return;
2126 #endif
2127 }
2128 
2129 
2130 
2131 /******************************************************************************
2132 Function: gather_scatter
2133 
2134 Input :
2135 Output:
2136 Return:
2137 Description:
2138 ******************************************************************************/
2139 static
2140 void
2141 gs_gop_tree_exists(gs_id *gs, REAL *vals)
2142 {
2143   int size;
2144   int *in, *out;
2145   REAL *buf, *work;
2146   int op[] = {GL_EXISTS,0};
2147 
2148 
2149 #ifdef DEBUG
2150   error_msg_warning("start gs_gop_tree_exists()");
2151 #endif
2152 
2153   in   = gs->tree_map_in;
2154   out  = gs->tree_map_out;
2155   buf  = gs->tree_buf;
2156   work = gs->tree_work;
2157   size = gs->tree_nel;
2158 
2159 #if defined  BLAS||CBLAS
2160   *work = 0.0;
2161   copy(size,work,0,buf,1);
2162 #else
2163   rvec_zero(buf,size);
2164 #endif
2165 
2166   while (*in >= 0)
2167     {
2168       /*
2169       printf("%d :: out=%d\n",my_id,*out);
2170       printf("%d :: in=%d\n",my_id,*in);
2171       */
2172       *(buf + *out++) = *(vals + *in++);
2173     }
2174 
2175   grop(buf,work,size,op);
2176 
2177   in   = gs->tree_map_in;
2178   out  = gs->tree_map_out;
2179 
2180   while (*in >= 0)
2181     {*(vals + *in++) = *(buf + *out++);}
2182 
2183 #ifdef DEBUG
2184   error_msg_warning("start gs_gop_tree_exists()");
2185 #endif
2186 }
2187 
2188 
2189 
2190 /******************************************************************************
2191 Function: gather_scatter
2192 
2193 Input :
2194 Output:
2195 Return:
2196 Description:
2197 ******************************************************************************/
2198 static void
2199 gs_gop_max_abs(register gs_id *gs, register REAL *vals)
2200 {
2201 #ifdef DEBUG
2202   error_msg_warning("start gs_gop_xxx()\n");
2203 #endif
2204 
2205   /* local only operations!!! */
2206   if (gs->num_local)
2207     {gs_gop_local_max_abs(gs,vals);}
2208 
2209   /* if intersection tree/pairwise and local isn't empty */
2210   if (gs->num_local_gop)
2211     {
2212       gs_gop_local_in_max_abs(gs,vals);
2213 
2214       /* pairwise */
2215       if (gs->num_pairs)
2216 	{gs_gop_pairwise_max_abs(gs,vals);}
2217 
2218       /* tree */
2219       else if (gs->max_left_over)
2220 	{gs_gop_tree_max_abs(gs,vals);}
2221 
2222       gs_gop_local_out(gs,vals);
2223     }
2224   /* if intersection tree/pairwise and local is empty */
2225   else
2226     {
2227       /* pairwise */
2228       if (gs->num_pairs)
2229 	{gs_gop_pairwise_max_abs(gs,vals);}
2230 
2231       /* tree */
2232       else if (gs->max_left_over)
2233 	{gs_gop_tree_max_abs(gs,vals);}
2234     }
2235 }
2236 
2237 
2238 
2239 /******************************************************************************
2240 Function: gather_scatter
2241 
2242 Input :
2243 Output:
2244 Return:
2245 Description:
2246 ******************************************************************************/
2247 static
2248 void
2249 gs_gop_local_max_abs(register gs_id *gs, register REAL *vals)
2250 {
2251   register int *num, *map, **reduce;
2252   register REAL tmp;
2253 
2254 
2255 #ifdef DEBUG
2256   error_msg_warning("start gs_gop_xxx()\n");
2257 #endif
2258 
2259   num    = gs->num_local_reduce;
2260   reduce = gs->local_reduce;
2261   while ((map = *reduce))
2262     {
2263       num ++;
2264       tmp = 0.0;
2265       while (*map >= 0)
2266 	{tmp = MAX_FABS(tmp,*(vals + *map)); map++;}
2267 
2268       map = *reduce++;
2269       while (*map >= 0)
2270 	{*(vals + *map++) = tmp;}
2271     }
2272 }
2273 
2274 
2275 
2276 /******************************************************************************
2277 Function: gather_scatter
2278 
2279 Input :
2280 Output:
2281 Return:
2282 Description:
2283 ******************************************************************************/
2284 static
2285 void
2286 gs_gop_local_in_max_abs(register gs_id *gs, register REAL *vals)
2287 {
2288   register int *num, *map, **reduce;
2289   register REAL *base;
2290 
2291 
2292 #ifdef DEBUG
2293   error_msg_warning("start gs_gop_xxx()\n");
2294 #endif
2295 
2296   num    = gs->num_gop_local_reduce;
2297   reduce = gs->gop_local_reduce;
2298   while ((map = *reduce++))
2299     {
2300       num++;
2301       base = vals + *map++;
2302       while (*map >= 0)
2303 	{*base = MAX_FABS(*base,*(vals + *map)); map++;}
2304     }
2305 }
2306 
2307 
2308 
2309 /******************************************************************************
2310 Function: gather_scatter
2311 
2312 VERSION 3 ::
2313 
2314 Input :
2315 Output:
2316 Return:
2317 Description:
2318 ******************************************************************************/
2319 static
2320 void
2321 gs_gop_pairwise_max_abs(register gs_id *gs, register REAL *in_vals)
2322 {
2323 #if   defined NXSRC
2324   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
2325   register int *iptr, *msg_list, *msg_size, **msg_nodes;
2326   register int *pw, *list, *size, **nodes;
2327   register int *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2328 
2329 
2330 
2331 #ifdef DEBUG
2332   error_msg_warning("start gs_gop_xxx()\n");
2333 #endif
2334 
2335   /* strip and load registers */
2336   msg_list =list         = gs->pair_list;
2337   msg_size =size         = gs->msg_sizes;
2338   msg_nodes=nodes        = gs->node_list;
2339   iptr=pw                = gs->pw_elm_list;
2340   dptr1=dptr3            = gs->pw_vals;
2341   msg_ids_in  = ids_in   = gs->msg_ids_in;
2342   msg_ids_out = ids_out  = gs->msg_ids_out;
2343   dptr2                  = gs->out;
2344   in1=in2                = gs->in;
2345 
2346   /* post the receives */
2347   do
2348     {
2349       *msg_ids_in++ = (int) irecv(MSGTAG1 + *list++,(char *)in1,*size*REAL_LEN);
2350       in1 += *size++;
2351     }
2352   while (*msg_ids_in >= 0);
2353 
2354   /* load gs values into in out gs buffers */
2355   while (*iptr >= 0)
2356     {*dptr3++ = *(in_vals + *iptr++);}
2357 
2358   /* load out buffers and post the sends */
2359   while (iptr = *msg_nodes++)
2360     {
2361       dptr3 = dptr2;
2362       while (*iptr >= 0)
2363 	{*dptr2++ = *(dptr1 + *iptr++);}
2364       *msg_ids_out++ = (int) isend(MSGTAG1+my_id,(char *)dptr3,*(msg_size++)*REAL_LEN,
2365 			     *msg_list++,0);
2366     }
2367 
2368   /* post the receives ... was here*/
2369   if (gs->max_left_over)
2370     {gs_gop_tree_max_abs(gs,in_vals);}
2371 
2372   /* process the received data */
2373   while (iptr = *nodes++)
2374     {
2375       msgwait(*ids_in++);
2376       while (*iptr >= 0)
2377 	{*(dptr1 + *iptr) = MAX_FABS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2378     }
2379 
2380   /* replace vals */
2381   while (*pw >= 0)
2382     {*(in_vals + *pw++) = *dptr1++;}
2383 
2384   /* clear isend message handles */
2385   while (*ids_out >= 0)
2386     {msgwait(*ids_out++);}
2387 
2388 #elif defined MPISRC
2389   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
2390   register int *iptr, *msg_list, *msg_size, **msg_nodes;
2391   register int *pw, *list, *size, **nodes;
2392   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2393   MPI_Status status;
2394 
2395 
2396   /* strip and load registers */
2397   msg_list =list         = gs->pair_list;
2398   msg_size =size         = gs->msg_sizes;
2399   msg_nodes=nodes        = gs->node_list;
2400   iptr=pw                = gs->pw_elm_list;
2401   dptr1=dptr3            = gs->pw_vals;
2402   msg_ids_in  = ids_in   = gs->msg_ids_in;
2403   msg_ids_out = ids_out  = gs->msg_ids_out;
2404   dptr2                  = gs->out;
2405   in1=in2                = gs->in;
2406 
2407   /* post the receives */
2408   /*  msg_nodes=nodes; */
2409   do
2410     {
2411       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2412 	 second one *list and do list++ afterwards */
2413       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
2414 		gs->gs_comm, msg_ids_in++);
2415       in1 += *size++;
2416     }
2417   while (*++msg_nodes);
2418   msg_nodes=nodes;
2419 
2420   /* load gs values into in out gs buffers */
2421   while (*iptr >= 0)
2422     {*dptr3++ = *(in_vals + *iptr++);}
2423 
2424   /* load out buffers and post the sends */
2425   while ((iptr = *msg_nodes++))
2426     {
2427       dptr3 = dptr2;
2428       while (*iptr >= 0)
2429 	{*dptr2++ = *(dptr1 + *iptr++);}
2430       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2431       /* is msg_ids_out++ correct? */
2432       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
2433 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
2434     }
2435 
2436   if (gs->max_left_over)
2437     {gs_gop_tree_max_abs(gs,in_vals);}
2438 
2439   /* process the received data */
2440   msg_nodes=nodes;
2441   while ((iptr = *nodes++))
2442     {
2443       /* Should I check the return value of MPI_Wait() or status? */
2444       /* Can this loop be replaced by a call to MPI_Waitall()? */
2445       MPI_Wait(ids_in++, &status);
2446       while (*iptr >= 0)
2447 	{*(dptr1 + *iptr) = MAX_FABS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2448     }
2449 
2450   /* replace vals */
2451   while (*pw >= 0)
2452     {*(in_vals + *pw++) = *dptr1++;}
2453 
2454   /* clear isend message handles */
2455   /* This changed for clarity though it could be the same */
2456   while (*msg_nodes++)
2457     /* Should I check the return value of MPI_Wait() or status? */
2458     /* Can this loop be replaced by a call to MPI_Waitall()? */
2459     {MPI_Wait(ids_out++, &status);}
2460 #else
2461   return;
2462 #endif
2463 }
2464 
2465 
2466 
2467 /******************************************************************************
2468 Function: gather_scatter
2469 
2470 Input :
2471 Output:
2472 Return:
2473 Description:
2474 ******************************************************************************/
2475 static
2476 void
2477 gs_gop_tree_max_abs(gs_id *gs, REAL *vals)
2478 {
2479   int size;
2480   int *in, *out;
2481   REAL *buf, *work;
2482   int op[] = {GL_MAX_ABS,0};
2483 
2484 
2485 #ifdef DEBUG
2486   error_msg_warning("start gs_gop_tree_max_abs()");
2487 #endif
2488 
2489   in   = gs->tree_map_in;
2490   out  = gs->tree_map_out;
2491   buf  = gs->tree_buf;
2492   work = gs->tree_work;
2493   size = gs->tree_nel;
2494 
2495 #if defined BLAS||CBLAS
2496   *work = 0.0;
2497   copy(size,work,0,buf,1);
2498 #else
2499   rvec_zero(buf,size);
2500 #endif
2501 
2502   while (*in >= 0)
2503     {
2504       /*
2505       printf("%d :: out=%d\n",my_id,*out);
2506       printf("%d :: in=%d\n",my_id,*in);
2507       */
2508       *(buf + *out++) = *(vals + *in++);
2509     }
2510 
2511   grop(buf,work,size,op);
2512 
2513   in   = gs->tree_map_in;
2514   out  = gs->tree_map_out;
2515 
2516   while (*in >= 0)
2517     {*(vals + *in++) = *(buf + *out++);}
2518 
2519 #ifdef DEBUG
2520   error_msg_warning("start gs_gop_tree_max_abs()");
2521 #endif
2522 }
2523 
2524 
2525 
2526 /******************************************************************************
2527 Function: gather_scatter
2528 
2529 Input :
2530 Output:
2531 Return:
2532 Description:
2533 ******************************************************************************/
2534 static void
2535 gs_gop_max(register gs_id *gs, register REAL *vals)
2536 {
2537 #ifdef DEBUG
2538   error_msg_warning("start gs_gop_xxx()\n");
2539 #endif
2540 
2541 
2542   /* local only operations!!! */
2543   if (gs->num_local)
2544     {gs_gop_local_max(gs,vals);}
2545 
2546   /* if intersection tree/pairwise and local isn't empty */
2547   if (gs->num_local_gop)
2548     {
2549       gs_gop_local_in_max(gs,vals);
2550 
2551       /* pairwise */
2552       if (gs->num_pairs)
2553 	{gs_gop_pairwise_max(gs,vals);}
2554 
2555       /* tree */
2556       else if (gs->max_left_over)
2557 	{gs_gop_tree_max(gs,vals);}
2558 
2559       gs_gop_local_out(gs,vals);
2560     }
2561   /* if intersection tree/pairwise and local is empty */
2562   else
2563     {
2564       /* pairwise */
2565       if (gs->num_pairs)
2566 	{gs_gop_pairwise_max(gs,vals);}
2567 
2568       /* tree */
2569       else if (gs->max_left_over)
2570 	{gs_gop_tree_max(gs,vals);}
2571     }
2572 }
2573 
2574 
2575 
2576 /******************************************************************************
2577 Function: gather_scatter
2578 
2579 Input :
2580 Output:
2581 Return:
2582 Description:
2583 ******************************************************************************/
2584 static
2585 void
2586 gs_gop_local_max(register gs_id *gs, register REAL *vals)
2587 {
2588   register int *num, *map, **reduce;
2589   register REAL tmp;
2590 
2591 
2592 #ifdef DEBUG
2593   error_msg_warning("start gs_gop_xxx()\n");
2594 #endif
2595 
2596   num    = gs->num_local_reduce;
2597   reduce = gs->local_reduce;
2598   while ((map = *reduce))
2599     {
2600       num ++;
2601       tmp = -REAL_MAX;
2602       while (*map >= 0)
2603 	{tmp = MAX(tmp,*(vals + *map)); map++;}
2604 
2605       map = *reduce++;
2606       while (*map >= 0)
2607 	{*(vals + *map++) = tmp;}
2608     }
2609 }
2610 
2611 
2612 
2613 /******************************************************************************
2614 Function: gather_scatter
2615 
2616 Input :
2617 Output:
2618 Return:
2619 Description:
2620 ******************************************************************************/
2621 static
2622 void
2623 gs_gop_local_in_max(register gs_id *gs, register REAL *vals)
2624 {
2625   register int *num, *map, **reduce;
2626   register REAL *base;
2627 
2628 
2629 #ifdef DEBUG
2630   error_msg_warning("start gs_gop_xxx()\n");
2631 #endif
2632 
2633   num    = gs->num_gop_local_reduce;
2634   reduce = gs->gop_local_reduce;
2635   while ((map = *reduce++))
2636     {
2637       num++;
2638       base = vals + *map++;
2639       while (*map >= 0)
2640 	{*base = MAX(*base,*(vals + *map)); map++;}
2641     }
2642 }
2643 
2644 
2645 
2646 /******************************************************************************
2647 Function: gather_scatter
2648 
2649 VERSION 3 ::
2650 
2651 Input :
2652 Output:
2653 Return:
2654 Description:
2655 ******************************************************************************/
2656 static
2657 void
2658 gs_gop_pairwise_max(register gs_id *gs, register REAL *in_vals)
2659 {
2660 #if   defined NXSRC
2661   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
2662   register int *iptr, *msg_list, *msg_size, **msg_nodes;
2663   register int *pw, *list, *size, **nodes;
2664   register int *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2665 
2666 
2667 #ifdef DEBUG
2668   error_msg_warning("start gs_gop_xxx()\n");
2669 #endif
2670 
2671   /* strip and load registers */
2672   msg_list =list         = gs->pair_list;
2673   msg_size =size         = gs->msg_sizes;
2674   msg_nodes=nodes        = gs->node_list;
2675   iptr=pw                = gs->pw_elm_list;
2676   dptr1=dptr3            = gs->pw_vals;
2677   msg_ids_in  = ids_in   = gs->msg_ids_in;
2678   msg_ids_out = ids_out  = gs->msg_ids_out;
2679   dptr2                  = gs->out;
2680   in1=in2                = gs->in;
2681 
2682   /* post the receives */
2683   do
2684     {
2685       *msg_ids_in++ = (int) irecv(MSGTAG1 + *list++,(char *)in1,*size*REAL_LEN);
2686       in1 += *size++;
2687     }
2688   while (*msg_ids_in >= 0);
2689 
2690   /* load gs values into in out gs buffers */
2691   while (*iptr >= 0)
2692     {*dptr3++ = *(in_vals + *iptr++);}
2693 
2694   /* load out buffers and post the sends */
2695   while (iptr = *msg_nodes++)
2696     {
2697       dptr3 = dptr2;
2698       while (*iptr >= 0)
2699 	{*dptr2++ = *(dptr1 + *iptr++);}
2700       *msg_ids_out++ = (int) isend(MSGTAG1+my_id,(char *)dptr3,*(msg_size++)*REAL_LEN,
2701 			     *msg_list++,0);
2702     }
2703 
2704   /* post the receives ... was here*/
2705   if (gs->max_left_over)
2706     {gs_gop_tree_max(gs,in_vals);}
2707 
2708   /* process the received data */
2709   while (iptr = *nodes++)
2710     {
2711       msgwait(*ids_in++);
2712       while (*iptr >= 0)
2713 	{*(dptr1 + *iptr) = MAX(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2714     }
2715 
2716   /* replace vals */
2717   while (*pw >= 0)
2718     {*(in_vals + *pw++) = *dptr1++;}
2719 
2720   /* clear isend message handles */
2721   while (*ids_out >= 0)
2722     {msgwait(*ids_out++);}
2723 
2724 #elif defined MPISRC
2725   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
2726   register int *iptr, *msg_list, *msg_size, **msg_nodes;
2727   register int *pw, *list, *size, **nodes;
2728   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2729   MPI_Status status;
2730 
2731 
2732   /* strip and load registers */
2733   msg_list =list         = gs->pair_list;
2734   msg_size =size         = gs->msg_sizes;
2735   msg_nodes=nodes        = gs->node_list;
2736   iptr=pw                = gs->pw_elm_list;
2737   dptr1=dptr3            = gs->pw_vals;
2738   msg_ids_in  = ids_in   = gs->msg_ids_in;
2739   msg_ids_out = ids_out  = gs->msg_ids_out;
2740   dptr2                  = gs->out;
2741   in1=in2                = gs->in;
2742 
2743   /* post the receives */
2744   /*  msg_nodes=nodes; */
2745   do
2746     {
2747       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2748 	 second one *list and do list++ afterwards */
2749       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
2750 		gs->gs_comm, msg_ids_in++);
2751       in1 += *size++;
2752     }
2753   while (*++msg_nodes);
2754   msg_nodes=nodes;
2755 
2756   /* load gs values into in out gs buffers */
2757   while (*iptr >= 0)
2758     {*dptr3++ = *(in_vals + *iptr++);}
2759 
2760   /* load out buffers and post the sends */
2761   while ((iptr = *msg_nodes++))
2762     {
2763       dptr3 = dptr2;
2764       while (*iptr >= 0)
2765 	{*dptr2++ = *(dptr1 + *iptr++);}
2766       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2767       /* is msg_ids_out++ correct? */
2768       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
2769 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
2770     }
2771 
2772   if (gs->max_left_over)
2773     {gs_gop_tree_max(gs,in_vals);}
2774 
2775   /* process the received data */
2776   msg_nodes=nodes;
2777   while ((iptr = *nodes++))
2778     {
2779       /* Should I check the return value of MPI_Wait() or status? */
2780       /* Can this loop be replaced by a call to MPI_Waitall()? */
2781       MPI_Wait(ids_in++, &status);
2782       while (*iptr >= 0)
2783 	{*(dptr1 + *iptr) = MAX(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2784     }
2785 
2786   /* replace vals */
2787   while (*pw >= 0)
2788     {*(in_vals + *pw++) = *dptr1++;}
2789 
2790   /* clear isend message handles */
2791   /* This changed for clarity though it could be the same */
2792   while (*msg_nodes++)
2793     /* Should I check the return value of MPI_Wait() or status? */
2794     /* Can this loop be replaced by a call to MPI_Waitall()? */
2795     {MPI_Wait(ids_out++, &status);}
2796 #else
2797   return;
2798 #endif
2799 }
2800 
2801 
2802 
2803 /******************************************************************************
2804 Function: gather_scatter
2805 
2806 Input :
2807 Output:
2808 Return:
2809 Description:
2810 ******************************************************************************/
2811 static
2812 void
2813 gs_gop_tree_max(gs_id *gs, REAL *vals)
2814 {
2815   int size;
2816   int *in, *out;
2817   REAL *buf, *work;
2818   /* int op[] = {GL_MAX,0}; */
2819 
2820 
2821 #ifdef DEBUG
2822   error_msg_warning("start gs_gop_tree_max()");
2823 #endif
2824 
2825   in   = gs->tree_map_in;
2826   out  = gs->tree_map_out;
2827   buf  = gs->tree_buf;
2828   work = gs->tree_work;
2829   size = gs->tree_nel;
2830 
2831 #if defined BLAS||CBLAS
2832   *work = -REAL_MAX;
2833   copy(size,work,0,buf,1);
2834 #else
2835   rvec_set(buf,-REAL_MAX,size);
2836 #endif
2837 
2838   while (*in >= 0)
2839     {*(buf + *out++) = *(vals + *in++);}
2840 
2841   in   = gs->tree_map_in;
2842   out  = gs->tree_map_out;
2843 #if defined(NXSRC) && defined(r8)
2844   gdhigh(buf,size,work);
2845   while (*in >= 0)
2846     {*(vals + *in++) = *(buf + *out++);}
2847 #elif defined MPISRC
2848   MPI_Allreduce(buf,work,size,REAL_TYPE,MPI_MAX,gs->gs_comm);
2849   while (*in >= 0)
2850     {*(vals + *in++) = *(work + *out++);}
2851 #else
2852   grop(buf,work,size,op);
2853   while (*in >= 0)
2854     {*(vals + *in++) = *(buf + *out++);}
2855 #endif
2856 
2857 #ifdef DEBUG
2858   error_msg_warning("end gs_gop_tree_max()");
2859 #endif
2860 }
2861 
2862 
2863 
2864 /******************************************************************************
2865 Function: gather_scatter
2866 
2867 Input :
2868 Output:
2869 Return:
2870 Description:
2871 ******************************************************************************/
2872 static void
2873 gs_gop_min_abs(register gs_id *gs, register REAL *vals)
2874 {
2875 #ifdef DEBUG
2876   error_msg_warning("start gs_gop_xxx()\n");
2877 #endif
2878 
2879   /* local only operations!!! */
2880   if (gs->num_local)
2881     {gs_gop_local_min_abs(gs,vals);}
2882 
2883   /* if intersection tree/pairwise and local isn't empty */
2884   if (gs->num_local_gop)
2885     {
2886       gs_gop_local_in_min_abs(gs,vals);
2887 
2888       /* pairwise */
2889       if (gs->num_pairs)
2890 	{gs_gop_pairwise_min_abs(gs,vals);}
2891 
2892       /* tree */
2893       else if (gs->max_left_over)
2894 	{gs_gop_tree_min_abs(gs,vals);}
2895 
2896       gs_gop_local_out(gs,vals);
2897     }
2898   /* if intersection tree/pairwise and local is empty */
2899   else
2900     {
2901       /* pairwise */
2902       if (gs->num_pairs)
2903 	{gs_gop_pairwise_min_abs(gs,vals);}
2904 
2905       /* tree */
2906       else if (gs->max_left_over)
2907 	{gs_gop_tree_min_abs(gs,vals);}
2908     }
2909 }
2910 
2911 
2912 
2913 /******************************************************************************
2914 Function: gather_scatter
2915 
2916 Input :
2917 Output:
2918 Return:
2919 Description:
2920 ******************************************************************************/
2921 static
2922 void
2923 gs_gop_local_min_abs(register gs_id *gs, register REAL *vals)
2924 {
2925   register int *num, *map, **reduce;
2926   register REAL tmp;
2927 
2928 
2929 #ifdef DEBUG
2930   error_msg_warning("start gs_gop_xxx()\n");
2931 #endif
2932 
2933   num    = gs->num_local_reduce;
2934   reduce = gs->local_reduce;
2935   while ((map = *reduce))
2936     {
2937       num ++;
2938       tmp = REAL_MAX;
2939       while (*map >= 0)
2940 	{tmp = MIN_FABS(tmp,*(vals + *map)); map++;}
2941 
2942       map = *reduce++;
2943       while (*map >= 0)
2944 	{*(vals + *map++) = tmp;}
2945     }
2946 }
2947 
2948 
2949 
2950 /******************************************************************************
2951 Function: gather_scatter
2952 
2953 Input :
2954 Output:
2955 Return:
2956 Description:
2957 ******************************************************************************/
2958 static
2959 void
2960 gs_gop_local_in_min_abs(register gs_id *gs, register REAL *vals)
2961 {
2962   register int *num, *map, **reduce;
2963   register REAL *base;
2964 
2965 #ifdef DEBUG
2966   error_msg_warning("start gs_gop_xxx()\n");
2967 #endif
2968 
2969   num    = gs->num_gop_local_reduce;
2970   reduce = gs->gop_local_reduce;
2971   while ((map = *reduce++))
2972     {
2973       num++;
2974       base = vals + *map++;
2975       while (*map >= 0)
2976 	{*base = MIN_FABS(*base,*(vals + *map)); map++;}
2977     }
2978 }
2979 
2980 
2981 
2982 /******************************************************************************
2983 Function: gather_scatter
2984 
2985 VERSION 3 ::
2986 
2987 Input :
2988 Output:
2989 Return:
2990 Description:
2991 ******************************************************************************/
2992 static
2993 void
2994 gs_gop_pairwise_min_abs(register gs_id *gs, register REAL *in_vals)
2995 {
2996 #if   defined NXSRC
2997   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
2998   register int *iptr, *msg_list, *msg_size, **msg_nodes;
2999   register int *pw, *list, *size, **nodes;
3000   register int *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
3001 
3002 
3003 #ifdef DEBUG
3004   error_msg_warning("start gs_gop_xxx()\n");
3005 #endif
3006 
3007   /* strip and load registers */
3008   msg_list =list         = gs->pair_list;
3009   msg_size =size         = gs->msg_sizes;
3010   msg_nodes=nodes        = gs->node_list;
3011   iptr=pw                = gs->pw_elm_list;
3012   dptr1=dptr3            = gs->pw_vals;
3013   msg_ids_in  = ids_in   = gs->msg_ids_in;
3014   msg_ids_out = ids_out  = gs->msg_ids_out;
3015   dptr2                  = gs->out;
3016   in1=in2                = gs->in;
3017 
3018   /* post the receives */
3019   do
3020     {
3021       *msg_ids_in++ = (int) irecv(MSGTAG1 + *list++,(char *)in1,*size*REAL_LEN);
3022       in1 += *size++;
3023     }
3024   while (*msg_ids_in >= 0);
3025 
3026   /* load gs values into in out gs buffers */
3027   while (*iptr >= 0)
3028     {*dptr3++ = *(in_vals + *iptr++);}
3029 
3030   /* load out buffers and post the sends */
3031   while (iptr = *msg_nodes++)
3032     {
3033       dptr3 = dptr2;
3034       while (*iptr >= 0)
3035 	{*dptr2++ = *(dptr1 + *iptr++);}
3036       *msg_ids_out++ = (int) isend(MSGTAG1+my_id,(char *)dptr3,*(msg_size++)*REAL_LEN,
3037 			     *msg_list++,0);
3038     }
3039 
3040   /* post the receives ... was here*/
3041   if (gs->max_left_over)
3042     {gs_gop_tree_min_abs(gs,in_vals);}
3043 
3044   /* process the received data */
3045   while (iptr = *nodes++)
3046     {
3047       msgwait(*ids_in++);
3048       while (*iptr >= 0)
3049 	{*(dptr1 + *iptr) = MIN_FABS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
3050     }
3051 
3052   /* replace vals */
3053   while (*pw >= 0)
3054     {*(in_vals + *pw++) = *dptr1++;}
3055 
3056   /* clear isend message handles */
3057   while (*ids_out >= 0)
3058     {msgwait(*ids_out++);}
3059 
3060 #elif defined MPISRC
3061   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
3062   register int *iptr, *msg_list, *msg_size, **msg_nodes;
3063   register int *pw, *list, *size, **nodes;
3064   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
3065   MPI_Status status;
3066 
3067 
3068   /* strip and load registers */
3069   msg_list =list         = gs->pair_list;
3070   msg_size =size         = gs->msg_sizes;
3071   msg_nodes=nodes        = gs->node_list;
3072   iptr=pw                = gs->pw_elm_list;
3073   dptr1=dptr3            = gs->pw_vals;
3074   msg_ids_in  = ids_in   = gs->msg_ids_in;
3075   msg_ids_out = ids_out  = gs->msg_ids_out;
3076   dptr2                  = gs->out;
3077   in1=in2                = gs->in;
3078 
3079   /* post the receives */
3080   /*  msg_nodes=nodes; */
3081   do
3082     {
3083       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
3084 	 second one *list and do list++ afterwards */
3085       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
3086 		gs->gs_comm, msg_ids_in++);
3087       in1 += *size++;
3088     }
3089   while (*++msg_nodes);
3090   msg_nodes=nodes;
3091 
3092   /* load gs values into in out gs buffers */
3093   while (*iptr >= 0)
3094     {*dptr3++ = *(in_vals + *iptr++);}
3095 
3096   /* load out buffers and post the sends */
3097   while ((iptr = *msg_nodes++))
3098     {
3099       dptr3 = dptr2;
3100       while (*iptr >= 0)
3101 	{*dptr2++ = *(dptr1 + *iptr++);}
3102       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
3103       /* is msg_ids_out++ correct? */
3104       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
3105 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
3106     }
3107 
3108   if (gs->max_left_over)
3109     {gs_gop_tree_min_abs(gs,in_vals);}
3110 
3111   /* process the received data */
3112   msg_nodes=nodes;
3113   while ((iptr = *nodes++))
3114     {
3115       /* Should I check the return value of MPI_Wait() or status? */
3116       /* Can this loop be replaced by a call to MPI_Waitall()? */
3117       MPI_Wait(ids_in++, &status);
3118       while (*iptr >= 0)
3119 	{*(dptr1 + *iptr) = MIN_FABS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
3120     }
3121 
3122   /* replace vals */
3123   while (*pw >= 0)
3124     {*(in_vals + *pw++) = *dptr1++;}
3125 
3126   /* clear isend message handles */
3127   /* This changed for clarity though it could be the same */
3128   while (*msg_nodes++)
3129     /* Should I check the return value of MPI_Wait() or status? */
3130     /* Can this loop be replaced by a call to MPI_Waitall()? */
3131     {MPI_Wait(ids_out++, &status);}
3132 #else
3133   return;
3134 #endif
3135 }
3136 
3137 
3138 
3139 /******************************************************************************
3140 Function: gather_scatter
3141 
3142 Input :
3143 Output:
3144 Return:
3145 Description:
3146 ******************************************************************************/
3147 static
3148 void
3149 gs_gop_tree_min_abs(gs_id *gs, REAL *vals)
3150 {
3151   int size;
3152   int *in, *out;
3153   REAL *buf, *work;
3154   int op[] = {GL_MIN_ABS,0};
3155 
3156 
3157 #ifdef DEBUG
3158   error_msg_warning("start gs_gop_tree_min_abs()");
3159 #endif
3160 
3161   in   = gs->tree_map_in;
3162   out  = gs->tree_map_out;
3163   buf  = gs->tree_buf;
3164   work = gs->tree_work;
3165   size = gs->tree_nel;
3166 
3167 #if defined  BLAS||CBLAS
3168   *work = REAL_MAX;
3169   copy(size,work,0,buf,1);
3170 #else
3171   rvec_set(buf,REAL_MAX,size);
3172 #endif
3173 
3174   while (*in >= 0)
3175     {*(buf + *out++) = *(vals + *in++);}
3176 
3177   in   = gs->tree_map_in;
3178   out  = gs->tree_map_out;
3179   grop(buf,work,size,op);
3180   while (*in >= 0)
3181     {*(vals + *in++) = *(buf + *out++);}
3182 
3183 #ifdef DEBUG
3184   error_msg_warning("end gs_gop_tree_min_abs()");
3185 #endif
3186 }
3187 
3188 
3189 
3190 /******************************************************************************
3191 Function: gather_scatter
3192 
3193 Input :
3194 Output:
3195 Return:
3196 Description:
3197 ******************************************************************************/
3198 static void
3199 gs_gop_min(register gs_id *gs, register REAL *vals)
3200 {
3201 #ifdef DEBUG
3202   error_msg_warning("start gs_gop_xxx()\n");
3203 #endif
3204 
3205   /* local only operations!!! */
3206   if (gs->num_local)
3207     {gs_gop_local_min(gs,vals);}
3208 
3209   /* if intersection tree/pairwise and local isn't empty */
3210   if (gs->num_local_gop)
3211     {
3212       gs_gop_local_in_min(gs,vals);
3213 
3214       /* pairwise */
3215       if (gs->num_pairs)
3216 	{gs_gop_pairwise_min(gs,vals);}
3217 
3218       /* tree */
3219       else if (gs->max_left_over)
3220 	{gs_gop_tree_min(gs,vals);}
3221 
3222       gs_gop_local_out(gs,vals);
3223     }
3224   /* if intersection tree/pairwise and local is empty */
3225   else
3226     {
3227       /* pairwise */
3228       if (gs->num_pairs)
3229 	{gs_gop_pairwise_min(gs,vals);}
3230 
3231       /* tree */
3232       else if (gs->max_left_over)
3233 	{gs_gop_tree_min(gs,vals);}
3234     }
3235 #ifdef DEBUG
3236   error_msg_warning("end gs_gop_xxx()\n");
3237 #endif
3238 }
3239 
3240 
3241 
3242 /******************************************************************************
3243 Function: gather_scatter
3244 
3245 Input :
3246 Output:
3247 Return:
3248 Description:
3249 ******************************************************************************/
3250 static
3251 void
3252 gs_gop_local_min(register gs_id *gs, register REAL *vals)
3253 {
3254   register int *num, *map, **reduce;
3255   register REAL tmp;
3256 
3257 
3258 #ifdef DEBUG
3259   error_msg_warning("start gs_gop_xxx()\n");
3260 #endif
3261 
3262   num    = gs->num_local_reduce;
3263   reduce = gs->local_reduce;
3264   while ((map = *reduce))
3265     {
3266       num ++;
3267       tmp = REAL_MAX;
3268       while (*map >= 0)
3269 	{tmp = MIN(tmp,*(vals + *map)); map++;}
3270 
3271       map = *reduce++;
3272       while (*map >= 0)
3273 	{*(vals + *map++) = tmp;}
3274     }
3275 }
3276 
3277 
3278 
3279 /******************************************************************************
3280 Function: gather_scatter
3281 
3282 Input :
3283 Output:
3284 Return:
3285 Description:
3286 ******************************************************************************/
3287 static
3288 void
3289 gs_gop_local_in_min(register gs_id *gs, register REAL *vals)
3290 {
3291   register int *num, *map, **reduce;
3292   register REAL *base;
3293 
3294 
3295 #ifdef DEBUG
3296   error_msg_warning("start gs_gop_xxx()\n");
3297 #endif
3298 
3299   num    = gs->num_gop_local_reduce;
3300   reduce = gs->gop_local_reduce;
3301   while ((map = *reduce++))
3302     {
3303       num++;
3304       base = vals + *map++;
3305       while (*map >= 0)
3306 	{*base = MIN(*base,*(vals + *map)); map++;}
3307     }
3308 }
3309 
3310 
3311 
3312 /******************************************************************************
3313 Function: gather_scatter
3314 
3315 VERSION 3 ::
3316 
3317 Input :
3318 Output:
3319 Return:
3320 Description:
3321 ******************************************************************************/
3322 static
3323 void
3324 gs_gop_pairwise_min(register gs_id *gs, register REAL *in_vals)
3325 {
3326 #if   defined NXSRC
3327   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
3328   register int *iptr, *msg_list, *msg_size, **msg_nodes;
3329   register int *pw, *list, *size, **nodes;
3330   register int *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
3331 
3332 
3333 #ifdef DEBUG
3334   error_msg_warning("start gs_gop_xxx()\n");
3335 #endif
3336 
3337   /* strip and load registers */
3338   msg_list =list         = gs->pair_list;
3339   msg_size =size         = gs->msg_sizes;
3340   msg_nodes=nodes        = gs->node_list;
3341   iptr=pw                = gs->pw_elm_list;
3342   dptr1=dptr3            = gs->pw_vals;
3343   msg_ids_in  = ids_in   = gs->msg_ids_in;
3344   msg_ids_out = ids_out  = gs->msg_ids_out;
3345   dptr2                  = gs->out;
3346   in1=in2                = gs->in;
3347 
3348   /* post the receives */
3349   do
3350     {
3351       *msg_ids_in++ = (int) irecv(MSGTAG1 + *list++,(char *)in1,*size*REAL_LEN);
3352       in1 += *size++;
3353     }
3354   while (*msg_ids_in >= 0);
3355 
3356   /* load gs values into in out gs buffers */
3357   while (*iptr >= 0)
3358     {*dptr3++ = *(in_vals + *iptr++);}
3359 
3360   /* load out buffers and post the sends */
3361   while (iptr = *msg_nodes++)
3362     {
3363       dptr3 = dptr2;
3364       while (*iptr >= 0)
3365 	{*dptr2++ = *(dptr1 + *iptr++);}
3366       *msg_ids_out++ = (int) isend(MSGTAG1+my_id,(char *)dptr3,*(msg_size++)*REAL_LEN,
3367 			     *msg_list++,0);
3368     }
3369 
3370   /* post the receives ... was here*/
3371   if (gs->max_left_over)
3372     {gs_gop_tree_min(gs,in_vals);}
3373 
3374   /* process the received data */
3375   while (iptr = *nodes++)
3376     {
3377       msgwait(*ids_in++);
3378       while (*iptr >= 0)
3379 	{*(dptr1 + *iptr) = MIN(*(dptr1 + *iptr),*in2); iptr++; in2++;}
3380     }
3381 
3382   /* replace vals */
3383   while (*pw >= 0)
3384     {*(in_vals + *pw++) = *dptr1++;}
3385 
3386   /* clear isend message handles */
3387   while (*ids_out >= 0)
3388     {msgwait(*ids_out++);}
3389 
3390 #elif defined MPISRC
3391   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
3392   register int *iptr, *msg_list, *msg_size, **msg_nodes;
3393   register int *pw, *list, *size, **nodes;
3394   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
3395   MPI_Status status;
3396 
3397 
3398   /* strip and load registers */
3399   msg_list =list         = gs->pair_list;
3400   msg_size =size         = gs->msg_sizes;
3401   msg_nodes=nodes        = gs->node_list;
3402   iptr=pw                = gs->pw_elm_list;
3403   dptr1=dptr3            = gs->pw_vals;
3404   msg_ids_in  = ids_in   = gs->msg_ids_in;
3405   msg_ids_out = ids_out  = gs->msg_ids_out;
3406   dptr2                  = gs->out;
3407   in1=in2                = gs->in;
3408 
3409   /* post the receives */
3410   /*  msg_nodes=nodes; */
3411   do
3412     {
3413       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
3414 	 second one *list and do list++ afterwards */
3415       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
3416 		gs->gs_comm, msg_ids_in++);
3417       in1 += *size++;
3418     }
3419   while (*++msg_nodes);
3420   msg_nodes=nodes;
3421 
3422   /* load gs values into in out gs buffers */
3423   while (*iptr >= 0)
3424     {*dptr3++ = *(in_vals + *iptr++);}
3425 
3426   /* load out buffers and post the sends */
3427   while ((iptr = *msg_nodes++))
3428     {
3429       dptr3 = dptr2;
3430       while (*iptr >= 0)
3431 	{*dptr2++ = *(dptr1 + *iptr++);}
3432       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
3433       /* is msg_ids_out++ correct? */
3434       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
3435 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
3436     }
3437 
3438   /* process the received data */
3439   if (gs->max_left_over)
3440     {gs_gop_tree_min(gs,in_vals);}
3441 
3442   msg_nodes=nodes;
3443   while ((iptr = *nodes++))
3444     {
3445       /* Should I check the return value of MPI_Wait() or status? */
3446       /* Can this loop be replaced by a call to MPI_Waitall()? */
3447       MPI_Wait(ids_in++, &status);
3448       while (*iptr >= 0)
3449 	{*(dptr1 + *iptr) = MIN(*(dptr1 + *iptr),*in2); iptr++; in2++;}
3450     }
3451 
3452   /* replace vals */
3453   while (*pw >= 0)
3454     {*(in_vals + *pw++) = *dptr1++;}
3455 
3456   /* clear isend message handles */
3457   /* This changed for clarity though it could be the same */
3458   while (*msg_nodes++)
3459     /* Should I check the return value of MPI_Wait() or status? */
3460     /* Can this loop be replaced by a call to MPI_Waitall()? */
3461     {MPI_Wait(ids_out++, &status);}
3462 #else
3463   return;
3464 #endif
3465 }
3466 
3467 
3468 
3469 /******************************************************************************
3470 Function: gather_scatter
3471 
3472 Input :
3473 Output:
3474 Return:
3475 Description:
3476 ******************************************************************************/
3477 static
3478 void
3479 gs_gop_tree_min(gs_id *gs, REAL *vals)
3480 {
3481   int size;
3482   int *in, *out;
3483   REAL *buf, *work;
3484   /*int op[] = {GL_MIN,0};*/
3485 
3486 
3487 #ifdef DEBUG
3488   error_msg_warning("start gs_gop_tree_min()");
3489 #endif
3490 
3491   in   = gs->tree_map_in;
3492   out  = gs->tree_map_out;
3493   buf  = gs->tree_buf;
3494   work = gs->tree_work;
3495   size = gs->tree_nel;
3496 
3497 #if defined  BLAS||CBLAS
3498   *work = REAL_MAX;
3499   copy(size,work,0,buf,1);
3500 #else
3501   rvec_set(buf,REAL_MAX,size);
3502 #endif
3503 
3504   while (*in >= 0)
3505     {*(buf + *out++) = *(vals + *in++);}
3506 
3507   in   = gs->tree_map_in;
3508   out  = gs->tree_map_out;
3509 #if defined(NXSRC) && defined(r8)
3510   gdlow(buf,size,work);
3511   while (*in >= 0)
3512     {*(vals + *in++) = *(buf + *out++);}
3513 #elif defined MPISRC
3514   MPI_Allreduce(buf,work,size,REAL_TYPE,MPI_MIN,gs->gs_comm);
3515   while (*in >= 0)
3516     {*(vals + *in++) = *(work + *out++);}
3517 #else
3518   grop(buf,work,size,op);
3519   while (*in >= 0)
3520     {*(vals + *in++) = *(buf + *out++);}
3521 #endif
3522 
3523 #ifdef DEBUG
3524   error_msg_warning("end gs_gop_tree_min()");
3525 #endif
3526 }
3527 
3528 
3529 
3530 /******************************************************************************
3531 Function: gather_scatter
3532 
3533 Input :
3534 Output:
3535 Return:
3536 Description:
3537 ******************************************************************************/
3538 static void
3539 gs_gop_times(register gs_id *gs, register REAL *vals)
3540 {
3541 #ifdef DEBUG
3542   error_msg_warning("start gs_gop_times()\n");
3543 #endif
3544 
3545   /* local only operations!!! */
3546   if (gs->num_local)
3547     {gs_gop_local_times(gs,vals);}
3548 
3549   /* if intersection tree/pairwise and local isn't empty */
3550   if (gs->num_local_gop)
3551     {
3552       gs_gop_local_in_times(gs,vals);
3553 
3554       /* pairwise */
3555       if (gs->num_pairs)
3556 	{gs_gop_pairwise_times(gs,vals);}
3557 
3558       /* tree */
3559       else if (gs->max_left_over)
3560 	{gs_gop_tree_times(gs,vals);}
3561 
3562       gs_gop_local_out(gs,vals);
3563     }
3564   /* if intersection tree/pairwise and local is empty */
3565   else
3566     {
3567       /* pairwise */
3568       if (gs->num_pairs)
3569 	{gs_gop_pairwise_times(gs,vals);}
3570 
3571       /* tree */
3572       else if (gs->max_left_over)
3573 	{gs_gop_tree_times(gs,vals);}
3574     }
3575 }
3576 
3577 
3578 
3579 /******************************************************************************
3580 Function: gather_scatter
3581 
3582 Input :
3583 Output:
3584 Return:
3585 Description:
3586 ******************************************************************************/
3587 static
3588 void
3589 gs_gop_local_times(register gs_id *gs, register REAL *vals)
3590 {
3591   register int *num, *map, **reduce;
3592   register REAL tmp;
3593 
3594 
3595 #ifdef DEBUG
3596   error_msg_warning("start gs_gop_xxx()\n");
3597 #endif
3598 
3599   num    = gs->num_local_reduce;
3600   reduce = gs->local_reduce;
3601   while ((map = *reduce))
3602     {
3603       /* wall */
3604       if (*num == 2)
3605 	{
3606 	  num ++; reduce++;
3607 	  vals[map[1]] = vals[map[0]] *= vals[map[1]];
3608 	}
3609       /* corner shared by three elements */
3610       else if (*num == 3)
3611 	{
3612 	  num ++; reduce++;
3613 	  vals[map[2]]=vals[map[1]]=vals[map[0]]*=(vals[map[1]]*vals[map[2]]);
3614 	}
3615       /* corner shared by four elements */
3616       else if (*num == 4)
3617 	{
3618 	  num ++; reduce++;
3619 	  vals[map[1]]=vals[map[2]]=vals[map[3]]=vals[map[0]] *=
3620 	                         (vals[map[1]] * vals[map[2]] * vals[map[3]]);
3621 	}
3622       /* general case ... odd geoms ... 3D*/
3623       else
3624 	{
3625 	  num ++;
3626  	  tmp = 1.0;
3627 	  while (*map >= 0)
3628 	    {tmp *= *(vals + *map++);}
3629 
3630 	  map = *reduce++;
3631 	  while (*map >= 0)
3632 	    {*(vals + *map++) = tmp;}
3633 	}
3634     }
3635 }
3636 
3637 
3638 
3639 /******************************************************************************
3640 Function: gather_scatter
3641 
3642 Input :
3643 Output:
3644 Return:
3645 Description:
3646 ******************************************************************************/
3647 static
3648 void
3649 gs_gop_local_in_times(register gs_id *gs, register REAL *vals)
3650 {
3651   register int *num, *map, **reduce;
3652   register REAL *base;
3653 
3654 
3655 #ifdef DEBUG
3656   error_msg_warning("start gs_gop_xxx()\n");
3657 #endif
3658 
3659   num    = gs->num_gop_local_reduce;
3660   reduce = gs->gop_local_reduce;
3661   while ((map = *reduce++))
3662     {
3663       /* wall */
3664       if (*num == 2)
3665 	{
3666 	  num ++;
3667 	  vals[map[0]] *= vals[map[1]];
3668 	}
3669       /* corner shared by three elements */
3670       else if (*num == 3)
3671 	{
3672 	  num ++;
3673 	  vals[map[0]] *= (vals[map[1]] * vals[map[2]]);
3674 	}
3675       /* corner shared by four elements */
3676       else if (*num == 4)
3677 	{
3678 	  num ++;
3679 	  vals[map[0]] *= (vals[map[1]] * vals[map[2]] * vals[map[3]]);
3680 	}
3681       /* general case ... odd geoms ... 3D*/
3682       else
3683 	{
3684 	  num++;
3685 	  base = vals + *map++;
3686 	  while (*map >= 0)
3687 	    {*base *= *(vals + *map++);}
3688 	}
3689     }
3690 }
3691 
3692 
3693 
3694 /******************************************************************************
3695 Function: gather_scatter
3696 
3697 VERSION 3 ::
3698 
3699 Input :
3700 Output:
3701 Return:
3702 Description:
3703 ******************************************************************************/
3704 static
3705 void
3706 gs_gop_pairwise_times(register gs_id *gs, register REAL *in_vals)
3707 {
3708 #if   defined NXSRC
3709   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
3710   register int *iptr, *msg_list, *msg_size, **msg_nodes;
3711   register int *pw, *list, *size, **nodes;
3712   register int *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
3713 
3714 
3715 
3716 #ifdef DEBUG
3717   error_msg_warning("start gs_gop_xxx()\n");
3718 #endif
3719 
3720   /* strip and load registers */
3721   msg_list =list         = gs->pair_list;
3722   msg_size =size         = gs->msg_sizes;
3723   msg_nodes=nodes        = gs->node_list;
3724   iptr=pw                = gs->pw_elm_list;
3725   dptr1=dptr3            = gs->pw_vals;
3726   msg_ids_in  = ids_in   = gs->msg_ids_in;
3727   msg_ids_out = ids_out  = gs->msg_ids_out;
3728   dptr2                  = gs->out;
3729   in1=in2                = gs->in;
3730 
3731   /* post the receives */
3732   do
3733     {
3734       *msg_ids_in++ = (int) irecv(MSGTAG1 + *list++,(char *)in1,*size*REAL_LEN);
3735       in1 += *size++;
3736     }
3737   while (*msg_ids_in >= 0);
3738 
3739   /* load gs values into in out gs buffers */
3740   while (*iptr >= 0)
3741     {*dptr3++ = *(in_vals + *iptr++);}
3742 
3743   /* load out buffers and post the sends */
3744   while (iptr = *msg_nodes++)
3745     {
3746       dptr3 = dptr2;
3747       while (*iptr >= 0)
3748 	{*dptr2++ = *(dptr1 + *iptr++);}
3749       *msg_ids_out++ = (int) isend(MSGTAG1+my_id,(char *)dptr3,*(msg_size++)*REAL_LEN,
3750 			     *msg_list++,0);
3751     }
3752 
3753   /* post the receives ... was here*/
3754   if (gs->max_left_over)
3755     {gs_gop_tree_times(gs,in_vals);}
3756 
3757   /* process the received data */
3758   while (iptr = *nodes++)
3759     {
3760       msgwait(*ids_in++);
3761       while (*iptr >= 0)
3762 	{*(dptr1 + *iptr++) *= *in2++;}
3763     }
3764 
3765   /* replace vals */
3766   while (*pw >= 0)
3767     {*(in_vals + *pw++) = *dptr1++;}
3768 
3769   /* clear isend message handles */
3770   while (*ids_out >= 0)
3771     {msgwait(*ids_out++);}
3772 
3773 #elif defined MPISRC
3774   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
3775   register int *iptr, *msg_list, *msg_size, **msg_nodes;
3776   register int *pw, *list, *size, **nodes;
3777   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
3778   MPI_Status status;
3779 
3780 
3781   /* strip and load registers */
3782   msg_list =list         = gs->pair_list;
3783   msg_size =size         = gs->msg_sizes;
3784   msg_nodes=nodes        = gs->node_list;
3785   iptr=pw                = gs->pw_elm_list;
3786   dptr1=dptr3            = gs->pw_vals;
3787   msg_ids_in  = ids_in   = gs->msg_ids_in;
3788   msg_ids_out = ids_out  = gs->msg_ids_out;
3789   dptr2                  = gs->out;
3790   in1=in2                = gs->in;
3791 
3792   /* post the receives */
3793   /*  msg_nodes=nodes; */
3794   do
3795     {
3796       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
3797 	 second one *list and do list++ afterwards */
3798       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
3799 		gs->gs_comm, msg_ids_in++);
3800       in1 += *size++;
3801     }
3802   while (*++msg_nodes);
3803   msg_nodes=nodes;
3804 
3805   /* load gs values into in out gs buffers */
3806   while (*iptr >= 0)
3807     {*dptr3++ = *(in_vals + *iptr++);}
3808 
3809   /* load out buffers and post the sends */
3810   while ((iptr = *msg_nodes++))
3811     {
3812       dptr3 = dptr2;
3813       while (*iptr >= 0)
3814 	{*dptr2++ = *(dptr1 + *iptr++);}
3815       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
3816       /* is msg_ids_out++ correct? */
3817       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
3818 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
3819     }
3820 
3821   if (gs->max_left_over)
3822     {gs_gop_tree_times(gs,in_vals);}
3823 
3824   /* process the received data */
3825   msg_nodes=nodes;
3826   while ((iptr = *nodes++))
3827     {
3828       /* Should I check the return value of MPI_Wait() or status? */
3829       /* Can this loop be replaced by a call to MPI_Waitall()? */
3830       MPI_Wait(ids_in++, &status);
3831       while (*iptr >= 0)
3832 	{*(dptr1 + *iptr++) *= *in2++;}
3833     }
3834 
3835   /* replace vals */
3836   while (*pw >= 0)
3837     {*(in_vals + *pw++) = *dptr1++;}
3838 
3839   /* clear isend message handles */
3840   /* This changed for clarity though it could be the same */
3841   while (*msg_nodes++)
3842     /* Should I check the return value of MPI_Wait() or status? */
3843     /* Can this loop be replaced by a call to MPI_Waitall()? */
3844     {MPI_Wait(ids_out++, &status);}
3845 #else
3846   return;
3847 #endif
3848 }
3849 
3850 
3851 
3852 /******************************************************************************
3853 Function: gather_scatter
3854 
3855 Input :
3856 Output:
3857 Return:
3858 Description:
3859 ******************************************************************************/
3860 static
3861 void
3862 gs_gop_tree_times(gs_id *gs, REAL *vals)
3863 {
3864   int size;
3865   int *in, *out;
3866   REAL *buf, *work;
3867   /*int op[] = {GL_MULT,0};*/
3868 
3869 
3870 #ifdef DEBUG
3871   error_msg_warning("start gs_gop_tree_times()");
3872 #endif
3873 
3874   in   = gs->tree_map_in;
3875   out  = gs->tree_map_out;
3876   buf  = gs->tree_buf;
3877   work = gs->tree_work;
3878   size = gs->tree_nel;
3879 
3880 #if defined  BLAS||CBLAS
3881   *work = 1.0;
3882   copy(size,work,0,buf,1);
3883 #else
3884   rvec_one(buf,size);
3885 #endif
3886 
3887   while (*in >= 0)
3888     {*(buf + *out++) = *(vals + *in++);}
3889 
3890   in   = gs->tree_map_in;
3891   out  = gs->tree_map_out;
3892 #if defined(NXSRC) && defined(r8)
3893   gdprod(buf,size,work);
3894   while (*in >= 0)
3895     {*(vals + *in++) = *(buf + *out++);}
3896 #elif defined MPISRC
3897   MPI_Allreduce(buf,work,size,REAL_TYPE,MPI_PROD,gs->gs_comm);
3898   while (*in >= 0)
3899     {*(vals + *in++) = *(work + *out++);}
3900 #else
3901   grop(buf,work,size,op);
3902   while (*in >= 0)
3903     {*(vals + *in++) = *(buf + *out++);}
3904 #endif
3905 
3906 #ifdef DEBUG
3907   error_msg_warning("end gs_gop_tree_times()");
3908 #endif
3909 }
3910 
3911 
3912 
3913 /******************************************************************************
3914 Function: gather_scatter
3915 
3916 
3917 Input :
3918 Output:
3919 Return:
3920 Description:
3921 ******************************************************************************/
3922 static void
3923 gs_gop_plus(register gs_id *gs, register REAL *vals)
3924 {
3925 #ifdef DEBUG
3926   error_msg_warning("start gs_gop_plus()\n");
3927 #endif
3928 
3929   /* local only operations!!! */
3930   if (gs->num_local)
3931     {gs_gop_local_plus(gs,vals);}
3932 
3933   /* if intersection tree/pairwise and local isn't empty */
3934   if (gs->num_local_gop)
3935     {
3936       gs_gop_local_in_plus(gs,vals);
3937 
3938       /* pairwise will NOT do tree inside ... */
3939       if (gs->num_pairs)
3940 	{gs_gop_pairwise_plus(gs,vals);}
3941 
3942       /* tree */
3943       if (gs->max_left_over)
3944 	{gs_gop_tree_plus(gs,vals);}
3945 
3946       gs_gop_local_out(gs,vals);
3947     }
3948   /* if intersection tree/pairwise and local is empty */
3949   else
3950     {
3951       /* pairwise will NOT do tree inside */
3952       if (gs->num_pairs)
3953 	{gs_gop_pairwise_plus(gs,vals);}
3954 
3955       /* tree */
3956       if (gs->max_left_over)
3957 	{gs_gop_tree_plus(gs,vals);}
3958     }
3959 
3960 #ifdef DEBUG
3961   error_msg_warning("end gs_gop_plus()\n");
3962 #endif
3963 }
3964 
3965 
3966 
3967 /******************************************************************************
3968 Function: gather_scatter
3969 
3970 Input :
3971 Output:
3972 Return:
3973 Description:
3974 ******************************************************************************/
3975 static
3976 void
3977 gs_gop_local_plus(register gs_id *gs, register REAL *vals)
3978 {
3979   register int *num, *map, **reduce;
3980   register REAL tmp;
3981 
3982 
3983 #ifdef DEBUG
3984   error_msg_warning("begin gs_gop_local_plus()\n");
3985 #endif
3986 
3987   num    = gs->num_local_reduce;
3988   reduce = gs->local_reduce;
3989   while ((map = *reduce))
3990     {
3991       /* wall */
3992       if (*num == 2)
3993 	{
3994 	  num ++; reduce++;
3995 	  vals[map[1]] = vals[map[0]] += vals[map[1]];
3996 	}
3997       /* corner shared by three elements */
3998       else if (*num == 3)
3999 	{
4000 	  num ++; reduce++;
4001 	  vals[map[2]]=vals[map[1]]=vals[map[0]]+=(vals[map[1]]+vals[map[2]]);
4002 	}
4003       /* corner shared by four elements */
4004       else if (*num == 4)
4005 	{
4006 	  num ++; reduce++;
4007 	  vals[map[1]]=vals[map[2]]=vals[map[3]]=vals[map[0]] +=
4008 	                         (vals[map[1]] + vals[map[2]] + vals[map[3]]);
4009 	}
4010       /* general case ... odd geoms ... 3D*/
4011       else
4012 	{
4013 	  num ++;
4014  	  tmp = 0.0;
4015 	  while (*map >= 0)
4016 	    {tmp += *(vals + *map++);}
4017 
4018 	  map = *reduce++;
4019 	  while (*map >= 0)
4020 	    {*(vals + *map++) = tmp;}
4021 	}
4022     }
4023 #ifdef DEBUG
4024   error_msg_warning("end gs_gop_local_plus()\n");
4025 #endif
4026 }
4027 
4028 
4029 
4030 /******************************************************************************
4031 Function: gather_scatter
4032 
4033 Input :
4034 Output:
4035 Return:
4036 Description:
4037 ******************************************************************************/
4038 static
4039 void
4040 gs_gop_local_in_plus(register gs_id *gs, register REAL *vals)
4041 {
4042   register int *num, *map, **reduce;
4043   register REAL *base;
4044 
4045 
4046 #ifdef DEBUG
4047   error_msg_warning("begin gs_gop_local_in_plus()\n");
4048 #endif
4049 
4050   num    = gs->num_gop_local_reduce;
4051   reduce = gs->gop_local_reduce;
4052   while ((map = *reduce++))
4053     {
4054       /* wall */
4055       if (*num == 2)
4056 	{
4057 	  num ++;
4058 	  vals[map[0]] += vals[map[1]];
4059 	}
4060       /* corner shared by three elements */
4061       else if (*num == 3)
4062 	{
4063 	  num ++;
4064 	  vals[map[0]] += (vals[map[1]] + vals[map[2]]);
4065 	}
4066       /* corner shared by four elements */
4067       else if (*num == 4)
4068 	{
4069 	  num ++;
4070 	  vals[map[0]] += (vals[map[1]] + vals[map[2]] + vals[map[3]]);
4071 	}
4072       /* general case ... odd geoms ... 3D*/
4073       else
4074 	{
4075 	  num++;
4076 	  base = vals + *map++;
4077 	  while (*map >= 0)
4078 	    {*base += *(vals + *map++);}
4079 	}
4080     }
4081 #ifdef DEBUG
4082   error_msg_warning("end gs_gop_local_in_plus()\n");
4083 #endif
4084 }
4085 
4086 
4087 
4088 /******************************************************************************
4089 Function: gather_scatter
4090 
4091 VERSION 3 ::
4092 
4093 Input :
4094 Output:
4095 Return:
4096 Description:
4097 ******************************************************************************/
4098 static
4099 void
4100 gs_gop_pairwise_plus(register gs_id *gs, register REAL *in_vals)
4101 {
4102 #if   defined NXSRC
4103   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
4104   register int *iptr, *msg_list, *msg_size, **msg_nodes;
4105   register int *pw, *list, *size, **nodes;
4106   register int *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
4107 
4108 
4109 #ifdef DEBUG
4110   error_msg_warning("gs_gop_pairwise_plus() start\n");
4111 #endif
4112 
4113   /* strip and load registers */
4114   msg_list =list         = gs->pair_list;
4115   msg_size =size         = gs->msg_sizes;
4116   msg_nodes=nodes        = gs->node_list;
4117   iptr=pw                = gs->pw_elm_list;
4118   dptr1=dptr3            = gs->pw_vals;
4119   msg_ids_in  = ids_in   = gs->msg_ids_in;
4120   msg_ids_out = ids_out  = gs->msg_ids_out;
4121   dptr2                  = gs->out;
4122   in1=in2                = gs->in;
4123 
4124   /* post the receives */
4125   do
4126     {
4127       *msg_ids_in++ = (int) irecv(MSGTAG1 + *list++,(char *)in1,*size*REAL_LEN);
4128       in1 += *size++;
4129     }
4130   while (*msg_ids_in >= 0);
4131 
4132   /* load gs values into in out gs buffers */
4133   while (*iptr >= 0)
4134     {*dptr3++ = *(in_vals + *iptr++);}
4135 
4136   /* load out buffers and post the sends */
4137   while (iptr = *msg_nodes++)
4138     {
4139       dptr3 = dptr2;
4140       while (*iptr >= 0)
4141 	{*dptr2++ = *(dptr1 + *iptr++);}
4142       *msg_ids_out++ = (int) isend(MSGTAG1+my_id,(char *)dptr3,*(msg_size++)*REAL_LEN,
4143 			     *msg_list++,0);
4144     }
4145 
4146   /* post the receives ... was here*/
4147 
4148   /* do the tree while we're waiting */
4149   /*
4150   if (gs->max_left_over)
4151     {gs_gop_tree_plus(gs,in_vals);}
4152   */
4153 
4154   /* process the received data */
4155   while (iptr = *nodes++)
4156     {
4157       msgwait(*ids_in++);
4158       while (*iptr >= 0)
4159 	{*(dptr1 + *iptr++) += *in2++;}
4160     }
4161 
4162   /* replace vals */
4163   while (*pw >= 0)
4164     {*(in_vals + *pw++) = *dptr1++;}
4165 
4166   /* clear isend message handles */
4167   while (*ids_out >= 0)
4168     {msgwait(*ids_out++);}
4169 
4170 #ifdef DEBUG
4171   error_msg_warning("gs_gop_pairwise_plus() end\n");
4172 #endif
4173 
4174 #elif defined MPISRC
4175   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
4176   register int *iptr, *msg_list, *msg_size, **msg_nodes;
4177   register int *pw, *list, *size, **nodes;
4178   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
4179   MPI_Status status;
4180 
4181 
4182 #ifdef DEBUG
4183   error_msg_warning("gs_gop_pairwise_plus() start\n");
4184 #endif
4185 
4186   /* strip and load registers */
4187   msg_list =list         = gs->pair_list;
4188   msg_size =size         = gs->msg_sizes;
4189   msg_nodes=nodes        = gs->node_list;
4190   iptr=pw                = gs->pw_elm_list;
4191   dptr1=dptr3            = gs->pw_vals;
4192   msg_ids_in  = ids_in   = gs->msg_ids_in;
4193   msg_ids_out = ids_out  = gs->msg_ids_out;
4194   dptr2                  = gs->out;
4195   in1=in2                = gs->in;
4196 
4197   /* post the receives */
4198   /*  msg_nodes=nodes; */
4199   do
4200     {
4201       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
4202 	 second one *list and do list++ afterwards */
4203       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
4204 		gs->gs_comm, msg_ids_in++);
4205       in1 += *size++;
4206     }
4207   while (*++msg_nodes);
4208   msg_nodes=nodes;
4209 
4210   /* load gs values into in out gs buffers */
4211   while (*iptr >= 0)
4212     {*dptr3++ = *(in_vals + *iptr++);}
4213 
4214   /* load out buffers and post the sends */
4215   while ((iptr = *msg_nodes++))
4216     {
4217       dptr3 = dptr2;
4218       while (*iptr >= 0)
4219 	{*dptr2++ = *(dptr1 + *iptr++);}
4220       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
4221       /* is msg_ids_out++ correct? */
4222       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
4223 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
4224     }
4225 
4226   /* do the tree while we're waiting */
4227   if (gs->max_left_over)
4228     {gs_gop_tree_plus(gs,in_vals);}
4229 
4230   /* process the received data */
4231   msg_nodes=nodes;
4232   while ((iptr = *nodes++))
4233     {
4234       /* Should I check the return value of MPI_Wait() or status? */
4235       /* Can this loop be replaced by a call to MPI_Waitall()? */
4236       MPI_Wait(ids_in++, &status);
4237       while (*iptr >= 0)
4238 	{*(dptr1 + *iptr++) += *in2++;}
4239     }
4240 
4241   /* replace vals */
4242   while (*pw >= 0)
4243     {*(in_vals + *pw++) = *dptr1++;}
4244 
4245   /* clear isend message handles */
4246   /* This changed for clarity though it could be the same */
4247   while (*msg_nodes++)
4248     /* Should I check the return value of MPI_Wait() or status? */
4249     /* Can this loop be replaced by a call to MPI_Waitall()? */
4250     {MPI_Wait(ids_out++, &status);}
4251 
4252 #ifdef DEBUG
4253   error_msg_warning("gs_gop_pairwise_plus() end\n");
4254 #endif
4255 
4256 #else
4257   return;
4258 #endif
4259 }
4260 
4261 
4262 
4263 /******************************************************************************
4264 Function: gather_scatter
4265 
4266 Input :
4267 Output:
4268 Return:
4269 Description:
4270 ******************************************************************************/
4271 static
4272 void
4273 gs_gop_tree_plus(gs_id *gs, REAL *vals)
4274 {
4275   int size;
4276   int *in, *out;
4277   REAL *buf, *work;
4278   /*int op[] = {GL_ADD,0}; */
4279 
4280 
4281 #ifdef DEBUG
4282   error_msg_warning("start gs_gop_tree_plus()\n");
4283 #endif
4284 
4285   in   = gs->tree_map_in;
4286   out  = gs->tree_map_out;
4287   buf  = gs->tree_buf;
4288   work = gs->tree_work;
4289   size = gs->tree_nel;
4290 
4291 #if defined  BLAS||CBLAS
4292   *work = 0.0;
4293   copy(size,work,0,buf,1);
4294 #else
4295   rvec_zero(buf,size);
4296 #endif
4297 
4298   while (*in >= 0)
4299     {*(buf + *out++) = *(vals + *in++);}
4300 
4301   in   = gs->tree_map_in;
4302   out  = gs->tree_map_out;
4303 #if defined(NXSRC) && defined(r8)
4304   gdsum(buf,size,work);
4305 
4306   /* grop(buf,work,size,op);  */
4307   while (*in >= 0)
4308     {*(vals + *in++) = *(buf + *out++);}
4309 #elif defined MPISRC
4310   MPI_Allreduce(buf,work,size,REAL_TYPE,MPI_SUM,gs->gs_comm);
4311   while (*in >= 0)
4312     {*(vals + *in++) = *(work + *out++);}
4313 #else
4314   grop(buf,work,size,op);
4315   while (*in >= 0)
4316     {*(vals + *in++) = *(buf + *out++);}
4317 #endif
4318 
4319 #ifdef DEBUG
4320   error_msg_warning("end gs_gop_tree_plus()\n");
4321 #endif
4322 }
4323 
4324 
4325 
4326 /******************************************************************************
4327 Function: level_best_guess()
4328 
4329 Input :
4330 Output:
4331 Return:
4332 Description:
4333 ******************************************************************************/
4334 #if defined(not_used)
4335 static
4336 int
4337 level_best_guess(void)
4338 {
4339   /* full pairwise for now */
4340   return(num_nodes);
4341 }
4342 #endif
4343 
4344 
4345 /******************************************************************************
4346 Function: gs_print_template()
4347 
4348 Input :
4349 
4350 Output:
4351 
4352 Return:
4353 
4354 Description:
4355 ******************************************************************************/
4356 #if defined(not_used)
4357 static
4358 void
4359 gs_print_template(register gs_id* gs, int who)
4360 {
4361   register int j, k, *iptr, *iptr2;
4362 
4363 
4364   if ((my_id == who) && (num_gs_ids))
4365     {
4366       printf("\n\nP#%d's GS#%d template:\n", my_id, gs->id);
4367       printf("id=%d\n",          gs->id);
4368       printf("nel(unique)=%d\n", gs->nel);
4369       printf("nel_max=%d\n",     gs->nel_max);
4370       printf("nel_min=%d\n",     gs->nel_min);
4371       printf("nel_sum=%d\n",     gs->nel_sum);
4372       printf("negl=%d\n",        gs->negl);
4373       printf("gl_max=%d\n",      gs->gl_max);
4374       printf("gl_min=%d\n",      gs->gl_min);
4375       printf("elms ordered=%d\n",gs->ordered);
4376       printf("repeats=%d\n",     gs->repeats);
4377       printf("positive=%d\n",    gs->positive);
4378       printf("elms=%ld\n",        (PTRINT) gs->elms);
4379       printf("elms(total)=%ld\n", (PTRINT) gs->local_elms);
4380       printf("vals=%ld\n",        (PTRINT) gs->vals);
4381       printf("gl_bss_min=%d\n",  gs->gl_bss_min);
4382       printf("gl_perm_min=%d\n", gs->gl_perm_min);
4383       printf("level=%d\n",       gs->level);
4384       printf("proc_mask_sz=%d\n",gs->mask_sz);
4385       printf("sh_proc_mask=%ld\n",(PTRINT) gs->nghs);
4386       printf("ngh_buf_size=%d\n",gs->ngh_buf_sz);
4387       printf("ngh_buf=%ld\n",     (PTRINT) gs->ngh_buf);
4388       printf("num_nghs=%d\n",    gs->num_nghs);
4389       printf("max_nghs=%d\n",    gs->max_nghs);
4390 
4391       /* pairwise exchange information */
4392       printf("\nPaiwise Info:\n");
4393       printf("num_pairs=%d\n",   gs->num_pairs);
4394       printf("max_pairs=%d\n",   gs->max_pairs);
4395       printf("len_pw_list=%d\n", gs->len_pw_list);
4396       printf("pair_list=%ld\n",   (PTRINT) gs->pair_list);
4397       printf("msg_sizes=%ld\n",   (PTRINT) gs->msg_sizes);
4398       printf("node_list=%ld\n",   (PTRINT) gs->node_list);
4399       printf("pw_elm_list=%ld\n", (PTRINT) gs->pw_elm_list);
4400 
4401       printf("pw_elm_list: ");
4402       if ((iptr = gs->pw_elm_list))
4403 	{
4404 	  for (j=0;j<gs->len_pw_list;j++)
4405 	    {printf("%d ", *iptr); iptr++;}
4406 	}
4407       printf("\n");
4408 
4409       printf("processor_list: ");
4410       if ((iptr = gs->pair_list))
4411 	{
4412 	  for (j=0;j<gs->num_pairs;j++)
4413 	    {printf("%d ", *iptr); iptr++;}
4414 	}
4415       printf("\n");
4416 
4417       printf("loc_node_pairs=%d\n",   gs->loc_node_pairs);
4418       printf("max_node_pairs=%d\n",   gs->max_node_pairs);
4419       printf("min_node_pairs=%d\n",   gs->min_node_pairs);
4420       printf("avg_node_pairs=%d\n",   gs->avg_node_pairs);
4421 
4422       printf("size_list: ");
4423       if ((iptr = gs->msg_sizes))
4424 	{
4425 	  for (j=0;j<gs->num_pairs;j++)
4426 	    {printf("%d ", *iptr); iptr++;}
4427 	}
4428       printf("\n");
4429       if ((iptr = gs->pair_list))
4430 	{
4431 	  for (j=0;j<gs->num_pairs;j++)
4432 	    {
4433 	      printf("node_list %d: ", *iptr);
4434 	      if ((iptr2 = (gs->node_list)[j]))
4435 		{
4436 		  for (k=0;k<(gs->msg_sizes)[j];k++)
4437 		    {printf("%d ", *iptr2); iptr2++;}
4438 		}
4439 	      iptr++;
4440 	      printf("\n");
4441 	    }
4442 	}
4443       printf("\n");
4444 
4445       printf("elm_list(U): ");
4446       if ((iptr = gs->elms))
4447 	{
4448 	  for (j=0;j<gs->nel;j++)
4449 	    {printf("%d ", *iptr); iptr++;}
4450 	}
4451       printf("\n");
4452       printf("\n");
4453 
4454       printf("elm_list(T): ");
4455       if ((iptr = gs->local_elms))
4456 	{
4457 	  for (j=0;j<gs->nel_total;j++)
4458 	    {printf("%d ", *iptr); iptr++;}
4459 	}
4460       printf("\n");
4461       printf("\n");
4462 
4463       printf("map_list(T): ");
4464       if ((iptr = gs->companion))
4465 	{
4466 	  for (j=0;j<gs->nel;j++)
4467 	    {printf("%d ", *iptr); iptr++;}
4468 	}
4469       printf("\n");
4470       printf("\n");
4471 
4472 
4473       /* local exchange information */
4474       printf("\nLocal Info:\n");
4475       printf("local_strength=%d\n",   gs->local_strength);
4476       printf("num_local_total=%d\n",  gs->num_local_total);
4477       printf("num_local=%d\n",        gs->num_local);
4478       printf("num_local_gop=%d\n",    gs->num_local_gop);
4479       printf("num_local_reduce=%ld\n", (PTRINT) gs->num_local_reduce);
4480       printf("local_reduce=%ld\n",     (PTRINT) gs->local_reduce);
4481       printf("num_gop_local_reduce=%ld\n", (PTRINT) gs->num_gop_local_reduce);
4482       printf("gop_local_reduce=%ld\n",     (PTRINT) gs->gop_local_reduce);
4483       printf("\n");
4484 
4485       for (j=0;j<gs->num_local;j++)
4486 	{
4487 	  printf("local reduce_list %d: ", j);
4488 	  if ((iptr2 = (gs->local_reduce)[j]))
4489 	    {
4490 	      if ((gs->num_local_reduce)[j] <= 0)
4491 		{printf("oops");}
4492 
4493 	      for (k=0;k<(gs->num_local_reduce)[j];k++)
4494 		{printf("%d ", *iptr2); iptr2++;}
4495 	    }
4496 	  printf("\n");
4497 	}
4498 
4499       printf("\n");
4500       printf("\n");
4501 
4502       for (j=0;j<gs->num_local_gop;j++)
4503 	{
4504 	  printf("gop reduce_list %d: ", j);
4505 	  iptr2 = (gs->gop_local_reduce)[j];
4506 
4507 	  if ((gs->num_gop_local_reduce)[j] <= 0)
4508 	    {printf("oops");}
4509 
4510 
4511 	  for (k=0;k<(gs->num_gop_local_reduce)[j];k++)
4512 	    {printf("%d ", *iptr2); iptr2++;}
4513 	  printf("\n");
4514 	}
4515       printf("\n");
4516       printf("\n");
4517 
4518       /* crystal router information */
4519       printf("\n\n");
4520       printf("Tree Info:\n");
4521       printf("max_left_over=%d\n",   gs->max_left_over);
4522       printf("num_in_list=%ld\n",    (PTRINT) gs->in_num);
4523       printf("in_list=%ld\n",        (PTRINT) gs->in_list);
4524       printf("num_out_list=%ld\n",   (PTRINT) gs->out_num);
4525       printf("out_list=%ld\n",       (PTRINT) gs->out_list);
4526 
4527       printf("\n\n");
4528     }
4529   fflush(stdout);
4530 }
4531 #endif
4532 
4533 
4534 
4535 /******************************************************************************
4536 Function: gs_free()
4537 
4538 Input :
4539 
4540 Output:
4541 
4542 Return:
4543 
4544 Description:
4545   if (gs->sss) {perm_free((void*) gs->sss);}
4546 ******************************************************************************/
4547 void
4548 gs_free(register gs_id *gs)
4549 {
4550   register int i;
4551 
4552 
4553 #ifdef DEBUG
4554   error_msg_warning("start gs_gop_xxx()\n");
4555   if (!gs) {error_msg_warning("NULL ptr passed to gs_free()"); return;}
4556 #endif
4557 
4558   if (gs->nghs) {perm_free((void*) gs->nghs);}
4559   if (gs->pw_nghs) {perm_free((void*) gs->pw_nghs);}
4560 
4561   /* tree */
4562   if (gs->max_left_over)
4563     {
4564       if (gs->tree_elms) {bss_free((void*) gs->tree_elms);}
4565       if (gs->tree_buf) {bss_free((void*) gs->tree_buf);}
4566       if (gs->tree_work) {bss_free((void*) gs->tree_work);}
4567       if (gs->tree_map_in) {bss_free((void*) gs->tree_map_in);}
4568       if (gs->tree_map_out) {bss_free((void*) gs->tree_map_out);}
4569     }
4570 
4571   /* pairwise info */
4572   if (gs->num_pairs)
4573     {
4574       /* should be NULL already */
4575       if (gs->ngh_buf) {bss_free((void*) gs->ngh_buf);}
4576       if (gs->elms) {bss_free((void*) gs->elms);}
4577       if (gs->local_elms) {bss_free((void*) gs->local_elms);}
4578       if (gs->companion) {bss_free((void*) gs->companion);}
4579 
4580       /* only set if pairwise */
4581       if (gs->vals) {perm_free((void*) gs->vals);}
4582       if (gs->in) {perm_free((void*) gs->in);}
4583       if (gs->out) {perm_free((void*) gs->out);}
4584       if (gs->msg_ids_in) {perm_free((void*) gs->msg_ids_in);}
4585       if (gs->msg_ids_out) {perm_free((void*) gs->msg_ids_out);}
4586       if (gs->pw_vals) {perm_free((void*) gs->pw_vals);}
4587       if (gs->pw_elm_list) {perm_free((void*) gs->pw_elm_list);}
4588       if (gs->node_list)
4589 	{
4590 	  for (i=0;i<gs->num_pairs;i++)
4591 	    {if (gs->node_list[i]) {perm_free((void*) gs->node_list[i]);}}
4592 	  perm_free((void*) gs->node_list);
4593 	}
4594       if (gs->msg_sizes) {perm_free((void*) gs->msg_sizes);}
4595       if (gs->pair_list) {perm_free((void*) gs->pair_list);}
4596     }
4597 
4598   /* local info */
4599   if (gs->num_local_total>=0)
4600     {
4601       for (i=0;i<gs->num_local_total+1;i++)
4602 	/*      for (i=0;i<gs->num_local_total;i++) */
4603 	{
4604 	  if (gs->num_gop_local_reduce[i])
4605 	    {perm_free((void*) gs->gop_local_reduce[i]);}
4606 	}
4607     }
4608 
4609   /* if intersection tree/pairwise and local isn't empty */
4610   if (gs->gop_local_reduce) {perm_free((void*) gs->gop_local_reduce);}
4611   if (gs->num_gop_local_reduce) {perm_free((void*) gs->num_gop_local_reduce);}
4612 
4613   perm_free((void *) gs);
4614 }
4615 
4616 
4617 
4618 
4619 
4620 
4621 /******************************************************************************
4622 Function: gather_scatter
4623 
4624 Input :
4625 Output:
4626 Return:
4627 Description:
4628 ******************************************************************************/
4629 void
4630 gs_gop_vec(register gs_id *gs, register REAL *vals, register const char *op, register int step)
4631 {
4632 #ifdef DEBUG
4633   error_msg_warning("gs_gop_vec() start");
4634   if (!gs) {error_msg_fatal("gs_gop_vec() :: passed NULL gs handle!!!");}
4635   if (!op) {error_msg_fatal("gs_gop_vec() :: passed NULL operation!!!");}
4636 
4637   /* check top make sure that segments being requested aren't larger */
4638   /* then what I reserved earlier ... fix is to allow user to reset  */
4639   if (step>gs->vec_sz)
4640     {error_msg_fatal("gs_gop_vec() :: %d > %d!\n",step,gs->vec_sz);}
4641 #endif
4642 
4643   switch (*op) {
4644   case '+':
4645     gs_gop_vec_plus(gs,vals,step);
4646     break;
4647 #ifdef NOT_YET
4648   case '*':
4649     gs_gop_times(gs,vals);
4650     break;
4651   case 'a':
4652     gs_gop_min_abs(gs,vals);
4653     break;
4654   case 'A':
4655     gs_gop_max_abs(gs,vals);
4656     break;
4657   case 'e':
4658     gs_gop_exists(gs,vals);
4659     break;
4660   case 'm':
4661     gs_gop_min(gs,vals);
4662     break;
4663   case 'M':
4664     gs_gop_max(gs,vals); break;
4665     /*
4666     if (*(op+1)=='\0')
4667       {gs_gop_max(gs,vals); break;}
4668     else if (*(op+1)=='X')
4669       {gs_gop_max_abs(gs,vals); break;}
4670     else if (*(op+1)=='N')
4671       {gs_gop_min_abs(gs,vals); break;}
4672     */
4673 #endif
4674   default:
4675     error_msg_warning("gs_gop_vec() :: %c is not a valid op",op[0]);
4676     error_msg_warning("gs_gop_vec() :: default :: plus");
4677     gs_gop_vec_plus(gs,vals,step);
4678     break;
4679   }
4680 #ifdef DEBUG
4681   error_msg_warning("gs_gop_vec() end");
4682 #endif
4683 }
4684 
4685 
4686 
4687 /******************************************************************************
4688 Function: gather_scatter
4689 
4690 Input :
4691 Output:
4692 Return:
4693 Description:
4694 ******************************************************************************/
4695 static void
4696 gs_gop_vec_plus(register gs_id *gs, register REAL *vals, register int step)
4697 {
4698 #ifdef DEBUG
4699   error_msg_warning("gs_gop_vec_plus() start");
4700 #endif
4701 
4702   if (!gs) {error_msg_fatal("gs_gop_vec() passed NULL gs handle!!!");}
4703 
4704   /* local only operations!!! */
4705   if (gs->num_local)
4706     {gs_gop_vec_local_plus(gs,vals,step);}
4707 
4708   /* if intersection tree/pairwise and local isn't empty */
4709   if (gs->num_local_gop)
4710     {
4711       gs_gop_vec_local_in_plus(gs,vals,step);
4712 
4713       /* pairwise */
4714       if (gs->num_pairs)
4715 	{gs_gop_vec_pairwise_plus(gs,vals,step);}
4716 
4717       /* tree */
4718       else if (gs->max_left_over)
4719 	{gs_gop_vec_tree_plus(gs,vals,step);}
4720 
4721       gs_gop_vec_local_out(gs,vals,step);
4722     }
4723   /* if intersection tree/pairwise and local is empty */
4724   else
4725     {
4726       /* pairwise */
4727       if (gs->num_pairs)
4728 	{gs_gop_vec_pairwise_plus(gs,vals,step);}
4729 
4730       /* tree */
4731       else if (gs->max_left_over)
4732 	{gs_gop_vec_tree_plus(gs,vals,step);}
4733     }
4734 #ifdef DEBUG
4735   error_msg_warning("gs_gop_vec_plus() end");
4736 #endif
4737 }
4738 
4739 
4740 
4741 /******************************************************************************
4742 Function: gather_scatter
4743 
4744 Input :
4745 Output:
4746 Return:
4747 Description:
4748 ******************************************************************************/
4749 static
4750 void
4751 gs_gop_vec_local_plus(register gs_id *gs, register REAL *vals,
4752 		      register int step)
4753 {
4754   register int *num, *map, **reduce;
4755   register REAL *base;
4756 
4757 
4758 #ifdef DEBUG
4759   error_msg_warning("gs_gop_vec_local_plus() start");
4760 #endif
4761 
4762   num    = gs->num_local_reduce;
4763   reduce = gs->local_reduce;
4764   while ((map = *reduce))
4765     {
4766       base = vals + map[0] * step;
4767 
4768       /* wall */
4769       if (*num == 2)
4770 	{
4771 	  num++; reduce++;
4772 	  rvec_add (base,vals+map[1]*step,step);
4773 	  rvec_copy(vals+map[1]*step,base,step);
4774 	}
4775       /* corner shared by three elements */
4776       else if (*num == 3)
4777 	{
4778 	  num++; reduce++;
4779 	  rvec_add (base,vals+map[1]*step,step);
4780 	  rvec_add (base,vals+map[2]*step,step);
4781 	  rvec_copy(vals+map[2]*step,base,step);
4782 	  rvec_copy(vals+map[1]*step,base,step);
4783 	}
4784       /* corner shared by four elements */
4785       else if (*num == 4)
4786 	{
4787 	  num++; reduce++;
4788 	  rvec_add (base,vals+map[1]*step,step);
4789 	  rvec_add (base,vals+map[2]*step,step);
4790 	  rvec_add (base,vals+map[3]*step,step);
4791 	  rvec_copy(vals+map[3]*step,base,step);
4792 	  rvec_copy(vals+map[2]*step,base,step);
4793 	  rvec_copy(vals+map[1]*step,base,step);
4794 	}
4795       /* general case ... odd geoms ... 3D */
4796       else
4797 	{
4798 	  num++;
4799 	  while (*++map >= 0)
4800 	    {rvec_add (base,vals+*map*step,step);}
4801 
4802 	  map = *reduce;
4803 	  while (*++map >= 0)
4804 	    {rvec_copy(vals+*map*step,base,step);}
4805 
4806 	  reduce++;
4807 	}
4808     }
4809 #ifdef DEBUG
4810   error_msg_warning("gs_gop_vec_local_plus() end");
4811 #endif
4812 }
4813 
4814 
4815 
4816 /******************************************************************************
4817 Function: gather_scatter
4818 
4819 Input :
4820 Output:
4821 Return:
4822 Description:
4823 ******************************************************************************/
4824 static
4825 void
4826 gs_gop_vec_local_in_plus(register gs_id *gs, register REAL *vals,
4827 			 register int step)
4828 {
4829   register int  *num, *map, **reduce;
4830   register REAL *base;
4831 
4832 
4833 #ifdef DEBUG
4834   error_msg_warning("gs_gop_vec_locel_in_plus() start");
4835 #endif
4836 
4837   num    = gs->num_gop_local_reduce;
4838   reduce = gs->gop_local_reduce;
4839   while ((map = *reduce++))
4840     {
4841       base = vals + map[0] * step;
4842 
4843       /* wall */
4844       if (*num == 2)
4845 	{
4846 	  num ++;
4847 	  rvec_add(base,vals+map[1]*step,step);
4848 	}
4849       /* corner shared by three elements */
4850       else if (*num == 3)
4851 	{
4852 	  num ++;
4853 	  rvec_add(base,vals+map[1]*step,step);
4854 	  rvec_add(base,vals+map[2]*step,step);
4855 	}
4856       /* corner shared by four elements */
4857       else if (*num == 4)
4858 	{
4859 	  num ++;
4860 	  rvec_add(base,vals+map[1]*step,step);
4861 	  rvec_add(base,vals+map[2]*step,step);
4862 	  rvec_add(base,vals+map[3]*step,step);
4863 	}
4864       /* general case ... odd geoms ... 3D*/
4865       else
4866 	{
4867 	  num++;
4868 	  while (*++map >= 0)
4869 	    {rvec_add(base,vals+*map*step,step);}
4870 	}
4871     }
4872 #ifdef DEBUG
4873   error_msg_warning("gs_gop_vec_local_in_plus() end");
4874 #endif
4875 }
4876 
4877 
4878 /******************************************************************************
4879 Function: gather_scatter
4880 
4881 Input :
4882 Output:
4883 Return:
4884 Description:
4885 ******************************************************************************/
4886 static
4887 void
4888 gs_gop_vec_local_out(register gs_id *gs, register REAL *vals,
4889 		     register int step)
4890 {
4891   register int *num, *map, **reduce;
4892   register REAL *base;
4893 
4894 
4895 #ifdef DEBUG
4896   error_msg_warning("gs_gop_vec_local_out() start");
4897 #endif
4898 
4899   num    = gs->num_gop_local_reduce;
4900   reduce = gs->gop_local_reduce;
4901   while ((map = *reduce++))
4902     {
4903       base = vals + map[0] * step;
4904 
4905       /* wall */
4906       if (*num == 2)
4907 	{
4908 	  num ++;
4909 	  rvec_copy(vals+map[1]*step,base,step);
4910 	}
4911       /* corner shared by three elements */
4912       else if (*num == 3)
4913 	{
4914 	  num ++;
4915 	  rvec_copy(vals+map[1]*step,base,step);
4916 	  rvec_copy(vals+map[2]*step,base,step);
4917 	}
4918       /* corner shared by four elements */
4919       else if (*num == 4)
4920 	{
4921 	  num ++;
4922 	  rvec_copy(vals+map[1]*step,base,step);
4923 	  rvec_copy(vals+map[2]*step,base,step);
4924 	  rvec_copy(vals+map[3]*step,base,step);
4925 	}
4926       /* general case ... odd geoms ... 3D*/
4927       else
4928 	{
4929 	  num++;
4930 	  while (*++map >= 0)
4931 	    {rvec_copy(vals+*map*step,base,step);}
4932 	}
4933     }
4934 #ifdef DEBUG
4935   error_msg_warning("gs_gop_vec_local_out() end");
4936 #endif
4937 }
4938 
4939 
4940 
4941 /******************************************************************************
4942 Function: gather_scatter
4943 
4944 VERSION 3 ::
4945 
4946 Input :
4947 Output:
4948 Return:
4949 Description:
4950 ******************************************************************************/
4951 static
4952 void
4953 gs_gop_vec_pairwise_plus(register gs_id *gs, register REAL *in_vals,
4954 			 register int step)
4955 {
4956 #if   defined NXSRC
4957   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
4958   register int *iptr, *msg_list, *msg_size, **msg_nodes;
4959   register int *pw, *list, *size, **nodes;
4960   register int *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
4961   register int i;
4962 
4963 #ifdef DEBUG
4964   error_msg_warning("gs_gop_vec_pairwise_plus() start");
4965 #endif
4966 
4967   /* strip and load registers */
4968   msg_list =list         = gs->pair_list;
4969   msg_size =size         = gs->msg_sizes;
4970   msg_nodes=nodes        = gs->node_list;
4971   iptr=pw                = gs->pw_elm_list;
4972   dptr1=dptr3            = gs->pw_vals;
4973   msg_ids_in  = ids_in   = gs->msg_ids_in;
4974   msg_ids_out = ids_out  = gs->msg_ids_out;
4975   dptr2                  = gs->out;
4976   in1=in2                = gs->in;
4977 
4978   /* post the receives */
4979   do
4980     {
4981       *msg_ids_in++ = (int) irecv(MSGTAG1 + *list++,(char *)in1,*size*REAL_LEN*step);
4982       in1 += *size++ * step;
4983     }
4984   while (*msg_ids_in >= 0);
4985 
4986   /* load gs values into in out gs buffers */
4987   while (*iptr >= 0)
4988     {
4989       rvec_copy(dptr3,in_vals + *iptr*step,step);
4990       dptr3+=step;
4991       iptr++;
4992     }
4993 
4994   /* load out buffers and post the sends */
4995   while (iptr = *msg_nodes++)
4996     {
4997       dptr3 = dptr2;
4998       while (*iptr >= 0)
4999 	{
5000 	  rvec_copy(dptr2,dptr1 + *iptr*step,step);
5001 	  dptr2+=step;
5002 	  iptr++;
5003 	}
5004       *msg_ids_out++ = (int) isend(MSGTAG1+my_id,(char *)dptr3,
5005 				   *(msg_size++)*REAL_LEN*step,*msg_list++,0);
5006     }
5007 
5008   /* post the receives ... was here*/
5009   /* tree */
5010   if (gs->max_left_over)
5011     {gs_gop_vec_tree_plus(gs,in_vals,step);}
5012 
5013   /* process the received data */
5014   while (iptr = *nodes++)
5015     {
5016       msgwait(*ids_in++);
5017       while (*iptr >= 0)
5018 	{
5019 #if defined BLAS||CBLAS
5020 	  axpy(step,1.0,in2,1,dptr1 + *iptr*step,1);
5021 #else
5022 	  rvec_add(dptr1 + *iptr*step,in2,step);
5023 #endif
5024 	  in2+=step;
5025 	  iptr++;
5026 	}
5027     }
5028 
5029   /* replace vals */
5030   while (*pw >= 0)
5031     {
5032       rvec_copy(in_vals + *pw*step,dptr1,step);
5033       dptr1+=step;
5034       pw++;
5035     }
5036 
5037   /* clear isend message handles */
5038   while (*ids_out >= 0)
5039     {msgwait(*ids_out++);}
5040 
5041 #ifdef DEBUG
5042   error_msg_warning("gs_gop_vec_pairwise_plus() end");
5043 #endif
5044 
5045 #elif defined MPISRC
5046   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
5047   register int *iptr, *msg_list, *msg_size, **msg_nodes;
5048   register int *pw, *list, *size, **nodes;
5049   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
5050   MPI_Status status;
5051 
5052 
5053 #ifdef DEBUG
5054   error_msg_warning("gs_gop_vec_pairwise_plus() start");
5055 #endif
5056 
5057   /* strip and load registers */
5058   msg_list =list         = gs->pair_list;
5059   msg_size =size         = gs->msg_sizes;
5060   msg_nodes=nodes        = gs->node_list;
5061   iptr=pw                = gs->pw_elm_list;
5062   dptr1=dptr3            = gs->pw_vals;
5063   msg_ids_in  = ids_in   = gs->msg_ids_in;
5064   msg_ids_out = ids_out  = gs->msg_ids_out;
5065   dptr2                  = gs->out;
5066   in1=in2                = gs->in;
5067 
5068   /* post the receives */
5069   /*  msg_nodes=nodes; */
5070   do
5071     {
5072       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
5073 	 second one *list and do list++ afterwards */
5074       MPI_Irecv(in1, *size *step, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
5075 		gs->gs_comm, msg_ids_in++);
5076       in1 += *size++ *step;
5077     }
5078   while (*++msg_nodes);
5079   msg_nodes=nodes;
5080 
5081   /* load gs values into in out gs buffers */
5082   while (*iptr >= 0)
5083     {
5084       rvec_copy(dptr3,in_vals + *iptr*step,step);
5085       dptr3+=step;
5086       iptr++;
5087     }
5088 
5089   /* load out buffers and post the sends */
5090   while ((iptr = *msg_nodes++))
5091     {
5092       dptr3 = dptr2;
5093       while (*iptr >= 0)
5094 	{
5095 	  rvec_copy(dptr2,dptr1 + *iptr*step,step);
5096 	  dptr2+=step;
5097 	  iptr++;
5098 	}
5099       MPI_Isend(dptr3, *msg_size++ *step, REAL_TYPE, *msg_list++,
5100 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
5101     }
5102 
5103   /* tree */
5104   if (gs->max_left_over)
5105     {gs_gop_vec_tree_plus(gs,in_vals,step);}
5106 
5107   /* process the received data */
5108   msg_nodes=nodes;
5109   while ((iptr = *nodes++))
5110     {
5111       /* Should I check the return value of MPI_Wait() or status? */
5112       /* Can this loop be replaced by a call to MPI_Waitall()? */
5113       MPI_Wait(ids_in++, &status);
5114       while (*iptr >= 0)
5115 	{
5116 #if defined BLAS||CBLAS
5117 	  axpy(step,1.0,in2,1,dptr1 + *iptr*step,1);
5118 #else
5119 	  rvec_add(dptr1 + *iptr*step,in2,step);
5120 #endif
5121 	  in2+=step;
5122 	  iptr++;
5123 	}
5124     }
5125 
5126   /* replace vals */
5127   while (*pw >= 0)
5128     {
5129       rvec_copy(in_vals + *pw*step,dptr1,step);
5130       dptr1+=step;
5131       pw++;
5132     }
5133 
5134   /* clear isend message handles */
5135   /* This changed for clarity though it could be the same */
5136   while (*msg_nodes++)
5137     /* Should I check the return value of MPI_Wait() or status? */
5138     /* Can this loop be replaced by a call to MPI_Waitall()? */
5139     {MPI_Wait(ids_out++, &status);}
5140 
5141 #ifdef DEBUG
5142   error_msg_warning("gs_gop_vec_pairwise_plus() end");
5143 #endif
5144 
5145 #else
5146   return;
5147 #endif
5148 }
5149 
5150 
5151 
5152 /******************************************************************************
5153 Function: gather_scatter
5154 
5155 Input :
5156 Output:
5157 Return:
5158 Description:
5159 ******************************************************************************/
5160 static
5161 void
5162 gs_gop_vec_tree_plus(register gs_id *gs, register REAL *vals, register int step)
5163 {
5164   register int size, *in, *out;
5165   register REAL *buf, *work;
5166   int op[] = {GL_ADD,0};
5167 
5168 #ifdef DEBUG
5169   error_msg_warning("start gs_gop_vec_tree_plus()");
5170 #endif
5171 
5172   /* copy over to local variables */
5173   in   = gs->tree_map_in;
5174   out  = gs->tree_map_out;
5175   buf  = gs->tree_buf;
5176   work = gs->tree_work;
5177   size = gs->tree_nel*step;
5178 
5179   /* zero out collection buffer */
5180 #if defined  BLAS||CBLAS
5181   *work = 0.0;
5182   copy(size,work,0,buf,1);
5183 #else
5184   rvec_zero(buf,size);
5185 #endif
5186 
5187 
5188   /* copy over my contributions */
5189   while (*in >= 0)
5190     {
5191 #if defined  BLAS||CBLAS
5192       copy(step,vals + *in++*step,1,buf + *out++*step,1);
5193 #else
5194       rvec_copy(buf + *out++*step,vals + *in++*step,step);
5195 #endif
5196     }
5197 
5198   /* perform fan in/out on full buffer */
5199   /* must change grop to handle the blas */
5200   grop(buf,work,size,op);
5201 
5202   /* reset */
5203   in   = gs->tree_map_in;
5204   out  = gs->tree_map_out;
5205 
5206   /* get the portion of the results I need */
5207   while (*in >= 0)
5208     {
5209 #if defined  BLAS||CBLAS
5210       copy(step,buf + *out++*step,1,vals + *in++*step,1);
5211 #else
5212       rvec_copy(vals + *in++*step,buf + *out++*step,step);
5213 #endif
5214     }
5215 
5216 #ifdef DEBUG
5217   error_msg_warning("start gs_gop_vec_tree_plus()");
5218 #endif
5219 }
5220 
5221 
5222 
5223 /******************************************************************************
5224 Function: gather_scatter
5225 
5226 Input :
5227 Output:
5228 Return:
5229 Description:
5230 ******************************************************************************/
5231 void
5232 gs_gop_hc(register gs_id *gs, register REAL *vals, register const char *op, register int dim)
5233 {
5234 #ifdef DEBUG
5235   error_msg_warning("gs_gop_hc() start\n");
5236   if (!gs) {error_msg_fatal("gs_gop_vec() :: passed NULL gs handle!!!\n");}
5237   if (!op) {error_msg_fatal("gs_gop_vec() :: passed NULL operation!!!\n");}
5238 #endif
5239 
5240   switch (*op) {
5241   case '+':
5242     gs_gop_plus_hc(gs,vals,dim);
5243     break;
5244 #ifdef NOT_YET
5245   case '*':
5246     gs_gop_times(gs,vals);
5247     break;
5248   case 'a':
5249     gs_gop_min_abs(gs,vals);
5250     break;
5251   case 'A':
5252     gs_gop_max_abs(gs,vals);
5253     break;
5254   case 'e':
5255     gs_gop_exists(gs,vals);
5256     break;
5257   case 'm':
5258     gs_gop_min(gs,vals);
5259     break;
5260   case 'M':
5261     gs_gop_max(gs,vals); break;
5262     /*
5263     if (*(op+1)=='\0')
5264       {gs_gop_max(gs,vals); break;}
5265     else if (*(op+1)=='X')
5266       {gs_gop_max_abs(gs,vals); break;}
5267     else if (*(op+1)=='N')
5268       {gs_gop_min_abs(gs,vals); break;}
5269     */
5270 #endif
5271   default:
5272     error_msg_warning("gs_gop_hc() :: %c is not a valid op",op[0]);
5273     error_msg_warning("gs_gop_hc() :: default :: plus\n");
5274     gs_gop_plus_hc(gs,vals,dim);
5275     break;
5276   }
5277 #ifdef DEBUG
5278   error_msg_warning("gs_gop_hc() end\n");
5279 #endif
5280 }
5281 
5282 
5283 
5284 /******************************************************************************
5285 Function: gather_scatter
5286 
5287 Input :
5288 Output:
5289 Return:
5290 Description:
5291 ******************************************************************************/
5292 static void
5293 gs_gop_plus_hc(register gs_id *gs, register REAL *vals, int dim)
5294 {
5295 #ifdef DEBUG
5296   error_msg_warning("start gs_gop_hc()\n");
5297   if (!gs) {error_msg_fatal("gs_gop_hc() passed NULL gs handle!!!\n");}
5298 #endif
5299 
5300   /* if there's nothing to do return */
5301   if (dim<=0)
5302     {return;}
5303 
5304   /* can't do more dimensions then exist */
5305   dim = MIN(dim,i_log2_num_nodes);
5306 
5307   /* local only operations!!! */
5308   if (gs->num_local)
5309     {gs_gop_local_plus(gs,vals);}
5310 
5311   /* if intersection tree/pairwise and local isn't empty */
5312   if (gs->num_local_gop)
5313     {
5314       gs_gop_local_in_plus(gs,vals);
5315 
5316       /* pairwise will do tree inside ... */
5317       if (gs->num_pairs)
5318 	{gs_gop_pairwise_plus_hc(gs,vals,dim);}
5319 
5320       /* tree only */
5321       else if (gs->max_left_over)
5322 	{gs_gop_tree_plus_hc(gs,vals,dim);}
5323 
5324       gs_gop_local_out(gs,vals);
5325     }
5326   /* if intersection tree/pairwise and local is empty */
5327   else
5328     {
5329       /* pairwise will do tree inside */
5330       if (gs->num_pairs)
5331 	{gs_gop_pairwise_plus_hc(gs,vals,dim);}
5332 
5333       /* tree */
5334       else if (gs->max_left_over)
5335 	{gs_gop_tree_plus_hc(gs,vals,dim);}
5336     }
5337 
5338 #ifdef DEBUG
5339   error_msg_warning("end gs_gop_hc()\n");
5340 #endif
5341 }
5342 
5343 
5344 /******************************************************************************
5345 VERSION 3 ::
5346 
5347 Input :
5348 Output:
5349 Return:
5350 Description:
5351 ******************************************************************************/
5352 static
5353 void
5354 gs_gop_pairwise_plus_hc(register gs_id *gs, register REAL *in_vals, int dim)
5355 {
5356 #if   defined NXSRC
5357   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
5358   register int *iptr, *msg_list, *msg_size, **msg_nodes;
5359   register int *pw, *list, *size, **nodes;
5360   register int *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
5361   int i, mask=1;
5362 
5363   for (i=1; i<dim; i++)
5364     {mask<<=1; mask++;}
5365 
5366 
5367 #ifdef DEBUG
5368   error_msg_warning("gs_gop_pairwise_hc() start\n");
5369 #endif
5370 
5371   /* strip and load registers */
5372   msg_list =list         = gs->pair_list;
5373   msg_size =size         = gs->msg_sizes;
5374   msg_nodes=nodes        = gs->node_list;
5375   iptr=pw                = gs->pw_elm_list;
5376   dptr1=dptr3            = gs->pw_vals;
5377   msg_ids_in  = ids_in   = gs->msg_ids_in;
5378   msg_ids_out = ids_out  = gs->msg_ids_out;
5379   dptr2                  = gs->out;
5380   in1=in2                = gs->in;
5381 
5382   /* post the receives */
5383   do
5384     {
5385       if ((my_id|mask)==(*list|mask))
5386 	{
5387 	  *msg_ids_in++ = (int) irecv(MSGTAG1 + *list++,(char *)in1,*size*REAL_LEN);
5388 	  in1 += *size++;
5389 	}
5390       else
5391 	{list++; size++;}
5392     }
5393   while (*++msg_nodes);
5394 
5395   /* load gs values into in out gs buffers */
5396   while (*iptr >= 0)
5397     {*dptr3++ = *(in_vals + *iptr++);}
5398 
5399   /* load out buffers and post the sends */
5400   list = msg_list;
5401   msg_nodes=nodes;
5402   while (iptr = *msg_nodes++)
5403     {
5404       if ((my_id|mask)==(*list|mask))
5405 	{
5406 	  dptr3 = dptr2;
5407 	  while (*iptr >= 0)
5408 	    {*dptr2++ = *(dptr1 + *iptr++);}
5409 	  *msg_ids_out++ = (int) isend(MSGTAG1+my_id,(char *)dptr3,
5410 				       *(msg_size++)*REAL_LEN,*list++,0);
5411 	}
5412       else
5413 	{msg_size++; list++;}
5414     }
5415   /* post the receives ... was here*/
5416 
5417   /* do the tree while we're waiting */
5418   if (gs->max_left_over)
5419     {gs_gop_tree_plus_hc(gs,in_vals,dim);}
5420 
5421   /* process the received data */
5422   list = msg_list;
5423   msg_nodes=nodes;
5424   while (iptr = *msg_nodes++)
5425     {
5426       if ((my_id|mask)==(*list|mask))
5427 	{
5428 	  msgwait(*ids_in++);
5429 	  while (*iptr >= 0)
5430 	    {*(dptr1 + *iptr++) += *in2++;}
5431 	}
5432       list++;
5433     }
5434 
5435   /* replace vals */
5436   while (*pw >= 0)
5437     {*(in_vals + *pw++) = *dptr1++;}
5438 
5439   /* clear isend message handles */
5440   while (iptr = *nodes++)
5441     {
5442       if ((my_id|mask)==(*msg_list|mask))
5443 	{msgwait(*ids_out++);}
5444       msg_list++;
5445     }
5446 
5447 #ifdef DEBUG
5448   error_msg_warning("gs_gop_pairwise_hc() end\n");
5449 #endif
5450 
5451 #elif defined MPISRC
5452   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
5453   register int *iptr, *msg_list, *msg_size, **msg_nodes;
5454   register int *pw, *list, *size, **nodes;
5455   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
5456   MPI_Status status;
5457   int i, mask=1;
5458 
5459   for (i=1; i<dim; i++)
5460     {mask<<=1; mask++;}
5461 
5462 
5463 #ifdef DEBUG
5464   error_msg_warning("gs_gop_pairwise_hc() start\n");
5465 #endif
5466 
5467   /* strip and load registers */
5468   msg_list =list         = gs->pair_list;
5469   msg_size =size         = gs->msg_sizes;
5470   msg_nodes=nodes        = gs->node_list;
5471   iptr=pw                = gs->pw_elm_list;
5472   dptr1=dptr3            = gs->pw_vals;
5473   msg_ids_in  = ids_in   = gs->msg_ids_in;
5474   msg_ids_out = ids_out  = gs->msg_ids_out;
5475   dptr2                  = gs->out;
5476   in1=in2                = gs->in;
5477 
5478   /* post the receives */
5479   /*  msg_nodes=nodes; */
5480   do
5481     {
5482       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
5483 	 second one *list and do list++ afterwards */
5484       if ((my_id|mask)==(*list|mask))
5485 	{
5486 	  MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
5487 		    gs->gs_comm, msg_ids_in++);
5488 	  in1 += *size++;
5489 	}
5490       else
5491 	{list++; size++;}
5492     }
5493   while (*++msg_nodes);
5494 
5495   /* load gs values into in out gs buffers */
5496   while (*iptr >= 0)
5497     {*dptr3++ = *(in_vals + *iptr++);}
5498 
5499   /* load out buffers and post the sends */
5500   msg_nodes=nodes;
5501   list = msg_list;
5502   while ((iptr = *msg_nodes++))
5503     {
5504       if ((my_id|mask)==(*list|mask))
5505 	{
5506 	  dptr3 = dptr2;
5507 	  while (*iptr >= 0)
5508 	    {*dptr2++ = *(dptr1 + *iptr++);}
5509 	  /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
5510 	  /* is msg_ids_out++ correct? */
5511 	  MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *list++,
5512 		    MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
5513 	}
5514       else
5515 	{list++; msg_size++;}
5516     }
5517 
5518   /* do the tree while we're waiting */
5519   if (gs->max_left_over)
5520     {gs_gop_tree_plus_hc(gs,in_vals,dim);}
5521 
5522   /* process the received data */
5523   msg_nodes=nodes;
5524   list = msg_list;
5525   while ((iptr = *nodes++))
5526     {
5527       if ((my_id|mask)==(*list|mask))
5528 	{
5529 	  /* Should I check the return value of MPI_Wait() or status? */
5530 	  /* Can this loop be replaced by a call to MPI_Waitall()? */
5531 	  MPI_Wait(ids_in++, &status);
5532 	  while (*iptr >= 0)
5533 	    {*(dptr1 + *iptr++) += *in2++;}
5534 	}
5535       list++;
5536     }
5537 
5538   /* replace vals */
5539   while (*pw >= 0)
5540     {*(in_vals + *pw++) = *dptr1++;}
5541 
5542   /* clear isend message handles */
5543   /* This changed for clarity though it could be the same */
5544   while (*msg_nodes++)
5545     {
5546       if ((my_id|mask)==(*msg_list|mask))
5547 	{
5548 	  /* Should I check the return value of MPI_Wait() or status? */
5549 	  /* Can this loop be replaced by a call to MPI_Waitall()? */
5550 	  MPI_Wait(ids_out++, &status);
5551 	}
5552       msg_list++;
5553     }
5554 
5555 #ifdef DEBUG
5556   error_msg_warning("gs_gop_pairwise_hc() end\n");
5557 #endif
5558 
5559 #else
5560   return;
5561 #endif
5562 }
5563 
5564 
5565 
5566 /******************************************************************************
5567 Function: gather_scatter
5568 
5569 Input :
5570 Output:
5571 Return:
5572 Description:
5573 ******************************************************************************/
5574 static
5575 void
5576 gs_gop_tree_plus_hc(gs_id *gs, REAL *vals, int dim)
5577 {
5578   int size;
5579   int *in, *out;
5580   REAL *buf, *work;
5581   int op[] = {GL_ADD,0};
5582 
5583 #ifdef DEBUG
5584   error_msg_warning("start gs_gop_tree_plus_hc()\n");
5585 #endif
5586 
5587   in   = gs->tree_map_in;
5588   out  = gs->tree_map_out;
5589   buf  = gs->tree_buf;
5590   work = gs->tree_work;
5591   size = gs->tree_nel;
5592 
5593 #if defined  BLAS||CBLAS
5594   *work = 0.0;
5595   copy(size,work,0,buf,1);
5596 #else
5597   rvec_zero(buf,size);
5598 #endif
5599 
5600   while (*in >= 0)
5601     {*(buf + *out++) = *(vals + *in++);}
5602 
5603   in   = gs->tree_map_in;
5604   out  = gs->tree_map_out;
5605 
5606   grop_hc(buf,work,size,op,dim);
5607 
5608   while (*in >= 0)
5609     {*(vals + *in++) = *(buf + *out++);}
5610 
5611 #ifdef DEBUG
5612   error_msg_warning("end gs_gop_tree_plus_hc()\n");
5613 #endif
5614 }
5615 
5616 
5617 
5618