xref: /petsc/src/ksp/pc/impls/tfs/gs.c (revision 4c8c32f384357201972f9dd67b0f2e901704fd35)
1 
2 /***********************************gs.c***************************************
3 SPARSE GATHER-SCATTER PACKAGE: bss_malloc bss_malloc ivec error comm gs queue
4 
5 Author: Henry M. Tufo III
6 
7 e-mail: hmt@cs.brown.edu
8 
9 snail-mail:
10 Division of Applied Mathematics
11 Brown University
12 Providence, RI 02912
13 
14 Last Modification:
15 6.21.97
16 ************************************gs.c**************************************/
17 
18 /***********************************gs.c***************************************
19 File Description:
20 -----------------
21 
22 ************************************gs.c**************************************/
23 #include <stdio.h>
24 #include <math.h>
25 #include <float.h>
26 #include <limits.h>
27 
28 
29 #if   defined NXSRC
30 #ifndef DELTA
31 #include <nx.h>
32 #endif
33 
34 #elif defined MPISRC
35 #include <mpi.h>
36 #endif
37 
38 #include "petscconf.h"
39 #if defined(PETSC_HAVE_STRINGS_H)
40 #include <strings.h>
41 #endif
42 #if defined(PETSC_HAVE_STRING_H)
43 #include <string.h>
44 #endif
45 
46 #include "const.h"
47 #include "types.h"
48 #include "comm.h"
49 #include "ivec.h"
50 #include "bss_malloc.h"
51 #include "bit_mask.h"
52 #include "error.h"
53 #include "queue.h"
54 #include "blas.h"
55 #include "gs.h"
56 
57 /* default length of number of items via tree - doubles if exceeded */
58 #define TREE_BUF_SZ 2048;
59 #define GS_VEC_SZ   1
60 
61 
62 
63 /***********************************gs.c***************************************
64 Type: struct gather_scatter_id
65 ------------------------------
66 
67 ************************************gs.c**************************************/
68 typedef struct gather_scatter_id {
69   int id;
70   int nel_min;
71   int nel_max;
72   int nel_sum;
73   int negl;
74   int gl_max;
75   int gl_min;
76   int repeats;
77   int ordered;
78   int positive;
79   REAL *vals;
80 
81   /* bit mask info */
82   int *my_proc_mask;
83   int mask_sz;
84   int *ngh_buf;
85   int ngh_buf_sz;
86   int *nghs;
87   int num_nghs;
88   int max_nghs;
89   int *pw_nghs;
90   int num_pw_nghs;
91   int *tree_nghs;
92   int num_tree_nghs;
93 
94   int num_loads;
95 
96   /* repeats == true -> local info */
97   int nel;         /* number of unique elememts */
98   int *elms;       /* of size nel */
99   int nel_total;
100   int *local_elms; /* of size nel_total */
101   int *companion;  /* of size nel_total */
102 
103   /* local info */
104   int num_local_total;
105   int local_strength;
106   int num_local;
107   int *num_local_reduce;
108   int **local_reduce;
109   int num_local_gop;
110   int *num_gop_local_reduce;
111   int **gop_local_reduce;
112 
113   /* pairwise info */
114   int level;
115   int num_pairs;
116   int max_pairs;
117   int loc_node_pairs;
118   int max_node_pairs;
119   int min_node_pairs;
120   int avg_node_pairs;
121   int *pair_list;
122   int *msg_sizes;
123   int **node_list;
124   int len_pw_list;
125   int *pw_elm_list;
126   REAL *pw_vals;
127 
128 #ifdef MPISRC
129   MPI_Request *msg_ids_in;
130   MPI_Request *msg_ids_out;
131 
132 #else
133   int *msg_ids_in;
134   int *msg_ids_out;
135 
136 #endif
137 
138   REAL *out;
139   REAL *in;
140   int msg_total;
141 
142   /* tree - crystal accumulator info */
143   int max_left_over;
144   int *pre;
145   int *in_num;
146   int *out_num;
147   int **in_list;
148   int **out_list;
149 
150   /* new tree work*/
151   int  tree_nel;
152   int *tree_elms;
153   REAL *tree_buf;
154   REAL *tree_work;
155 
156   int  tree_map_sz;
157   int *tree_map_in;
158   int *tree_map_out;
159 
160   /* current memory status */
161   int gl_bss_min;
162   int gl_perm_min;
163 
164   /* max segment size for gs_gop_vec() */
165   int vec_sz;
166 
167   /* hack to make paul happy */
168 #ifdef MPISRC
169   MPI_Comm gs_comm;
170 #endif
171 
172 } gs_id;
173 
174 
175 /* to be made public */
176 #if defined(not_used)
177 static PetscErrorCode  gs_dump_ngh(gs_id *id, int loc_num, int *num, int *ngh_list);
178 static void gsi_via_int_list(gs_id *gs);
179 static PetscErrorCode in_sub_tree(int *ptr3, int p_mask_size, int *buf2, int buf_size);
180 #endif
181 
182 /* PRIVATE - and definitely not exported */
183 /*static void gs_print_template(register gs_id* gs, int who);*/
184 /*static void gs_print_stemplate(register gs_id* gs, int who);*/
185 
186 static gs_id *gsi_check_args(int *elms, int nel, int level);
187 static void gsi_via_bit_mask(gs_id *gs);
188 static void get_ngh_buf(gs_id *gs);
189 static void set_pairwise(gs_id *gs);
190 static gs_id * gsi_new(void);
191 static void set_tree(gs_id *gs);
192 
193 /* same for all but vector flavor */
194 static void gs_gop_local_out(gs_id *gs, REAL *vals);
195 /* vector flavor */
196 static void gs_gop_vec_local_out(gs_id *gs, REAL *vals, int step);
197 
198 static void gs_gop_vec_plus(gs_id *gs, REAL *in_vals, int step);
199 static void gs_gop_vec_pairwise_plus(gs_id *gs, REAL *in_vals, int step);
200 static void gs_gop_vec_local_plus(gs_id *gs, REAL *vals, int step);
201 static void gs_gop_vec_local_in_plus(gs_id *gs, REAL *vals, int step);
202 static void gs_gop_vec_tree_plus(gs_id *gs, REAL *vals, int step);
203 
204 
205 static void gs_gop_plus(gs_id *gs, REAL *in_vals);
206 static void gs_gop_pairwise_plus(gs_id *gs, REAL *in_vals);
207 static void gs_gop_local_plus(gs_id *gs, REAL *vals);
208 static void gs_gop_local_in_plus(gs_id *gs, REAL *vals);
209 static void gs_gop_tree_plus(gs_id *gs, REAL *vals);
210 
211 static void gs_gop_plus_hc(gs_id *gs, REAL *in_vals, int dim);
212 static void gs_gop_pairwise_plus_hc(gs_id *gs, REAL *in_vals, int dim);
213 static void gs_gop_tree_plus_hc(gs_id *gs, REAL *vals, int dim);
214 
215 static void gs_gop_times(gs_id *gs, REAL *in_vals);
216 static void gs_gop_pairwise_times(gs_id *gs, REAL *in_vals);
217 static void gs_gop_local_times(gs_id *gs, REAL *vals);
218 static void gs_gop_local_in_times(gs_id *gs, REAL *vals);
219 static void gs_gop_tree_times(gs_id *gs, REAL *vals);
220 
221 static void gs_gop_min(gs_id *gs, REAL *in_vals);
222 static void gs_gop_pairwise_min(gs_id *gs, REAL *in_vals);
223 static void gs_gop_local_min(gs_id *gs, REAL *vals);
224 static void gs_gop_local_in_min(gs_id *gs, REAL *vals);
225 static void gs_gop_tree_min(gs_id *gs, REAL *vals);
226 
227 static void gs_gop_min_abs(gs_id *gs, REAL *in_vals);
228 static void gs_gop_pairwise_min_abs(gs_id *gs, REAL *in_vals);
229 static void gs_gop_local_min_abs(gs_id *gs, REAL *vals);
230 static void gs_gop_local_in_min_abs(gs_id *gs, REAL *vals);
231 static void gs_gop_tree_min_abs(gs_id *gs, REAL *vals);
232 
233 static void gs_gop_max(gs_id *gs, REAL *in_vals);
234 static void gs_gop_pairwise_max(gs_id *gs, REAL *in_vals);
235 static void gs_gop_local_max(gs_id *gs, REAL *vals);
236 static void gs_gop_local_in_max(gs_id *gs, REAL *vals);
237 static void gs_gop_tree_max(gs_id *gs, REAL *vals);
238 
239 static void gs_gop_max_abs(gs_id *gs, REAL *in_vals);
240 static void gs_gop_pairwise_max_abs(gs_id *gs, REAL *in_vals);
241 static void gs_gop_local_max_abs(gs_id *gs, REAL *vals);
242 static void gs_gop_local_in_max_abs(gs_id *gs, REAL *vals);
243 static void gs_gop_tree_max_abs(gs_id *gs, REAL *vals);
244 
245 static void gs_gop_exists(gs_id *gs, REAL *in_vals);
246 static void gs_gop_pairwise_exists(gs_id *gs, REAL *in_vals);
247 static void gs_gop_local_exists(gs_id *gs, REAL *vals);
248 static void gs_gop_local_in_exists(gs_id *gs, REAL *vals);
249 static void gs_gop_tree_exists(gs_id *gs, REAL *vals);
250 
251 static void gs_gop_pairwise_binary(gs_id *gs, REAL *in_vals, rbfp fct);
252 static void gs_gop_local_binary(gs_id *gs, REAL *vals, rbfp fct);
253 static void gs_gop_local_in_binary(gs_id *gs, REAL *vals, rbfp fct);
254 static void gs_gop_tree_binary(gs_id *gs, REAL *vals, rbfp fct);
255 
256 
257 
258 /* global vars */
259 /* from comm.c module */
260 
261 /* module state inf and fortran interface */
262 static int num_gs_ids = 0;
263 
264 /* should make this dynamic ... later */
265 /*static queue_ADT elms_q, mask_q;*/
266 static int msg_buf=MAX_MSG_BUF;
267 /*static int msg_ch=FALSE;*/
268 
269 static int vec_sz=GS_VEC_SZ;
270 /*static int vec_ch=FALSE; */
271 
272 static int *tree_buf=NULL;
273 static int tree_buf_sz=0;
274 static int ntree=0;
275 
276 
277 /******************************************************************************
278 Function: gs_init_()
279 
280 Input :
281 Output:
282 Return:
283 Description:
284 ******************************************************************************/
285 void gs_init_vec_sz(int size)
286 {
287   /*  vec_ch = TRUE; */
288 
289   vec_sz = size;
290 }
291 
292 /******************************************************************************
293 Function: gs_init_()
294 
295 Input :
296 Output:
297 Return:
298 Description:
299 ******************************************************************************/
300 void gs_init_msg_buf_sz(int buf_size)
301 {
302   /*  msg_ch = TRUE; */
303 
304   msg_buf = buf_size;
305 }
306 
307 /******************************************************************************
308 Function: gs_init()
309 
310 Input :
311 
312 Output:
313 
314 RETURN:
315 
316 Description:
317 ******************************************************************************/
318 gs_id *
319 gs_init(register int *elms, int nel, int level)
320 {
321   register gs_id *gs;
322 #ifdef INFO1
323   int i;
324 #endif
325 #ifdef MPISRC
326   MPI_Group gs_group;
327   MPI_Comm  gs_comm;
328 #endif
329 
330 
331   bss_init();
332   perm_init();
333 
334 #ifdef DEBUG
335   error_msg_warning("gs_init() start w/(%d,%d)\n",my_id,num_nodes);
336 #endif
337 
338   /* ensure that communication package has been initialized */
339   comm_init();
340 
341 #ifdef INFO1
342   bss_stats();
343   perm_stats();
344 #endif
345 
346   /* determines if we have enough dynamic/semi-static memory */
347   /* checks input, allocs and sets gd_id template            */
348   gs = gsi_check_args(elms,nel,level);
349 
350   /* only bit mask version up and working for the moment    */
351   /* LATER :: get int list version working for sparse pblms */
352   gsi_via_bit_mask(gs);
353 
354 #ifdef INFO1
355   /* print out P0's template as well as malloc stats */
356   gs_print_template(gs,0);
357 
358 #if   defined NXSRC
359       gsync();
360 #elif defined MPISRC
361       MPI_Barrier(MPI_COMM_WORLD);
362 #endif
363 
364   for (i=1;i<num_nodes;i++)
365     {
366       gs_print_stemplate(gs,i);
367 #if   defined NXSRC
368       gsync();
369 #elif defined MPISRC
370       MPI_Barrier(MPI_COMM_WORLD);
371 #endif
372     }
373 #endif
374 
375 #ifdef DEBUG
376   error_msg_warning("gs_init() end w/(%d,%d)\n",my_id,num_nodes);
377 #endif
378 
379 #ifdef INFO
380   bss_stats();
381   perm_stats();
382 #endif
383 
384 #ifdef MPISRC
385   MPI_Comm_group(MPI_COMM_WORLD,&gs_group);
386   MPI_Comm_create(MPI_COMM_WORLD,gs_group,&gs_comm);
387   gs->gs_comm=gs_comm;
388 #endif
389 
390   return(gs);
391 }
392 
393 
394 
395 /******************************************************************************
396 Function: gsi_new()
397 
398 Input :
399 Output:
400 Return:
401 Description:
402 
403 elm list must >= 0!!!
404 elm repeats allowed
405 ******************************************************************************/
406 static
407 gs_id *
408 gsi_new(void)
409 {
410   int size=sizeof(gs_id);
411   gs_id *gs;
412 
413 
414 #ifdef DEBUG
415   error_msg_warning("gsi_new() :: size=%d\n",size);
416 #endif
417 
418   gs = (gs_id *) perm_malloc(size);
419 
420   if (!(size%REAL_LEN))
421     {rvec_zero((REAL *)gs,size/REAL_LEN);}
422   else if (!(size%INT_LEN))
423     {ivec_zero((INT*)gs,size/INT_LEN);}
424   else
425     {memset((char *)gs,0,size/sizeof(char));}
426 
427   return(gs);
428 }
429 
430 
431 
432 /******************************************************************************
433 Function: gsi_check_args()
434 
435 Input :
436 Output:
437 Return:
438 Description:
439 
440 elm list must >= 0!!!
441 elm repeats allowed
442 local working copy of elms is sorted
443 ******************************************************************************/
444 static
445 gs_id *
446 gsi_check_args(int *in_elms, int nel, int level)
447 {
448   register int i, j, k, t2;
449   int *companion, *elms, *unique, *iptr;
450   int num_local=0, *num_to_reduce, **local_reduce;
451   int oprs[] = {NON_UNIFORM,GL_MIN,GL_MAX,GL_ADD,GL_MIN,GL_MAX,GL_MIN,GL_B_AND};
452   int vals[sizeof(oprs)/sizeof(oprs[0])-1];
453   int work[sizeof(oprs)/sizeof(oprs[0])-1];
454   gs_id *gs;
455 
456 
457 #ifdef DEBUG
458   error_msg_warning("gs_check_args() begin w/(%d,%d)\n",my_id,num_nodes);
459 #endif
460 
461 #ifdef SAFE
462   if (!in_elms)
463     {error_msg_fatal("elms point to nothing!!!\n");}
464 
465   if (nel<0)
466     {error_msg_fatal("can't have fewer than 0 elms!!!\n");}
467 
468   if (nel==0)
469     {error_msg_warning("I don't have any elements!!!\n");}
470 #endif
471 
472   /* get space for gs template */
473   gs = gsi_new();
474   gs->id = ++num_gs_ids;
475 
476   /* hmt 6.4.99                                            */
477   /* caller can set global ids that don't participate to 0 */
478   /* gs_init ignores all zeros in elm list                 */
479   /* negative global ids are still invalid                 */
480   for (i=j=0;i<nel;i++)
481     {if (in_elms[i]!=0) {j++;}}
482 
483   k=nel; nel=j;
484 
485   /* copy over in_elms list and create inverse map */
486   elms = (int*) bss_malloc((nel+1)*INT_LEN);
487   companion = (int*) bss_malloc(nel*INT_LEN);
488   /* ivec_c_index(companion,nel); */
489   /* ivec_copy(elms,in_elms,nel); */
490   for (i=j=0;i<k;i++)
491     {
492       if (in_elms[i]!=0)
493 	{elms[j] = in_elms[i]; companion[j++] = i;}
494     }
495 
496   if (j!=nel)
497     {error_msg_fatal("nel j mismatch!\n");}
498 
499 #ifdef SAFE
500   /* pre-pass ... check to see if sorted */
501   elms[nel] = INT_MAX;
502   iptr = elms;
503   unique = elms+1;
504   j=0;
505   while (*iptr!=INT_MAX)
506     {
507       if (*iptr++>*unique++)
508 	{j=1; break;}
509     }
510 
511   /* set up inverse map */
512   if (j)
513     {
514       error_msg_warning("gsi_check_args() :: elm list *not* sorted!\n");
515       SMI_sort((void*)elms, (void*)companion, nel, SORT_INTEGER);
516     }
517   else
518     {error_msg_warning("gsi_check_args() :: elm list sorted!\n");}
519 #else
520   SMI_sort((void*)elms, (void*)companion, nel, SORT_INTEGER);
521 #endif
522   elms[nel] = INT_MIN;
523 
524   /* first pass */
525   /* determine number of unique elements, check pd */
526   for (i=k=0;i<nel;i+=j)
527     {
528       t2 = elms[i];
529       j=++i;
530 
531       /* clump 'em for now */
532       while (elms[j]==t2) {j++;}
533 
534       /* how many together and num local */
535       if (j-=i)
536 	{num_local++; k+=j;}
537     }
538 
539   /* how many unique elements? */
540   gs->repeats=k;
541   gs->nel = nel-k;
542 
543 
544   /* number of repeats? */
545   gs->num_local = num_local;
546   num_local+=2;
547   gs->local_reduce=local_reduce=(int **)perm_malloc(num_local*INT_PTR_LEN);
548   gs->num_local_reduce=num_to_reduce=(int*) perm_malloc(num_local*INT_LEN);
549 
550   unique = (int*) bss_malloc((gs->nel+1)*INT_LEN);
551   gs->elms = unique;
552   gs->nel_total = nel;
553   gs->local_elms = elms;
554   gs->companion = companion;
555 
556   /* compess map as well as keep track of local ops */
557   for (num_local=i=j=0;i<gs->nel;i++)
558     {
559       k=j;
560       t2 = unique[i] = elms[j];
561       companion[i] = companion[j];
562 
563       while (elms[j]==t2) {j++;}
564 
565       if ((t2=(j-k))>1)
566 	{
567 	  /* number together */
568 	  num_to_reduce[num_local] = t2++;
569 	  iptr = local_reduce[num_local++] = (int*)perm_malloc(t2*INT_LEN);
570 
571 	  /* to use binary searching don't remap until we check intersection */
572 	  *iptr++ = i;
573 
574 	  /* note that we're skipping the first one */
575 	  while (++k<j)
576 	    {*(iptr++) = companion[k];}
577 	  *iptr = -1;
578 	}
579     }
580 
581   /* sentinel for ngh_buf */
582   unique[gs->nel]=INT_MAX;
583 
584 #ifdef DEBUG
585   if (num_local!=gs->num_local)
586     {error_msg_fatal("compression of maps wrong!!!\n");}
587 #endif
588 
589   /* for two partition sort hack */
590   num_to_reduce[num_local] = 0;
591   local_reduce[num_local] = NULL;
592   num_to_reduce[++num_local] = 0;
593   local_reduce[num_local] = NULL;
594 
595   /* load 'em up */
596   /* note one extra to hold NON_UNIFORM flag!!! */
597   vals[2] = vals[1] = vals[0] = nel;
598   if (gs->nel>0)
599     {
600        vals[3] = unique[0];           /* ivec_lb(elms,nel); */
601        vals[4] = unique[gs->nel-1];   /* ivec_ub(elms,nel); */
602     }
603   else
604     {
605        vals[3] = INT_MAX;             /* ivec_lb(elms,nel); */
606        vals[4] = INT_MIN;             /* ivec_ub(elms,nel); */
607     }
608   vals[5] = level;
609   vals[6] = num_gs_ids;
610 
611   /* GLOBAL: send 'em out */
612   giop(vals,work,sizeof(oprs)/sizeof(oprs[0])-1,oprs);
613 
614   /* must be semi-pos def - only pairwise depends on this */
615   /* LATER - remove this restriction */
616   if (vals[3]<0)
617     {error_msg_fatal("gsi_check_args() :: system not semi-pos def ::%d\n",vals[3]);}
618 
619   if (vals[4]==INT_MAX)
620     {error_msg_fatal("gsi_check_args() :: system ub too large ::%d!\n",vals[4]);}
621 
622 #ifdef DEBUG
623   /* check gs template count */
624   if (vals[6] != num_gs_ids)
625     {error_msg_fatal("num_gs_ids mismatch!!!");}
626 
627   /* check all have same level threshold */
628   if (level != vals[5])
629     {error_msg_fatal("gsi_check_args() :: level not uniform across nodes!!!\n");}
630 #endif
631 
632   gs->nel_min = vals[0];
633   gs->nel_max = vals[1];
634   gs->nel_sum = vals[2];
635   gs->gl_min  = vals[3];
636   gs->gl_max  = vals[4];
637   gs->negl    = vals[4]-vals[3]+1;
638 
639 #ifdef DEBUG
640       printf("nel(unique)=%d\n", gs->nel);
641       printf("nel_max=%d\n",     gs->nel_max);
642       printf("nel_min=%d\n",     gs->nel_min);
643       printf("nel_sum=%d\n",     gs->nel_sum);
644       printf("negl=%d\n",        gs->negl);
645       printf("gl_max=%d\n",      gs->gl_max);
646       printf("gl_min=%d\n",      gs->gl_min);
647       printf("elms ordered=%d\n",gs->ordered);
648       printf("repeats=%d\n",     gs->repeats);
649       printf("positive=%d\n",    gs->positive);
650       printf("level=%d\n",       gs->level);
651 #endif
652 
653   if (gs->negl<=0)
654     {error_msg_fatal("gsi_check_args() :: system empty or neg :: %d\n",gs->negl);}
655 
656   /* LATER :: add level == -1 -> program selects level */
657   if (vals[5]<0)
658     {vals[5]=0;}
659   else if (vals[5]>num_nodes)
660     {vals[5]=num_nodes;}
661   gs->level = vals[5];
662 
663 #ifdef DEBUG
664   error_msg_warning("gs_check_args() :: end w/(%d,%d)+level=%d\n",
665 		    my_id,num_nodes,vals[5]);
666 #endif
667 
668   return(gs);
669 }
670 
671 
672 /******************************************************************************
673 Function: gsi_via_bit_mask()
674 
675 Input :
676 Output:
677 Return:
678 Description:
679 
680 
681 ******************************************************************************/
682 static
683 void
684 gsi_via_bit_mask(gs_id *gs)
685 {
686   register int i, nel, *elms;
687   int t1;
688   int **reduce;
689   int *map;
690 
691 #ifdef DEBUG
692   error_msg_warning("gsi_via_bit_mask() begin w/%d :: %d\n",my_id,num_nodes);
693 #endif
694 
695   /* totally local removes ... ct_bits == 0 */
696   get_ngh_buf(gs);
697 
698   if (gs->level)
699     {set_pairwise(gs);}
700 
701   if (gs->max_left_over)
702     {set_tree(gs);}
703 
704   /* intersection local and pairwise/tree? */
705   gs->num_local_total = gs->num_local;
706   gs->gop_local_reduce = gs->local_reduce;
707   gs->num_gop_local_reduce = gs->num_local_reduce;
708 
709   map = gs->companion;
710 
711   /* is there any local compression */
712   if (!gs->num_local) {
713     gs->local_strength = NONE;
714     gs->num_local_gop = 0;
715   } else {
716       /* ok find intersection */
717       map = gs->companion;
718       reduce = gs->local_reduce;
719       for (i=0, t1=0; i<gs->num_local; i++, reduce++)
720 	{
721 	  if ((ivec_binary_search(**reduce,gs->pw_elm_list,gs->len_pw_list)>=0)
722 	      ||
723 	      ivec_binary_search(**reduce,gs->tree_map_in,gs->tree_map_sz)>=0)
724 	    {
725 	      /* printf("C%d :: i=%d, **reduce=%d\n",my_id,i,**reduce); */
726 	      t1++;
727 	      if (gs->num_local_reduce[i]<=0)
728 		{error_msg_fatal("nobody in list?");}
729 	      gs->num_local_reduce[i] *= -1;
730 	    }
731 	   **reduce=map[**reduce];
732 	}
733 
734       /* intersection is empty */
735       if (!t1)
736 	{
737 #ifdef DEBUG
738 	  error_msg_warning("gsi_check_args() :: local gs_gop w/o intersection!");
739 #endif
740 	  gs->local_strength = FULL;
741 	  gs->num_local_gop = 0;
742 	}
743       /* intersection not empty */
744       else
745 	{
746 #ifdef DEBUG
747 	  error_msg_warning("gsi_check_args() :: local gs_gop w/intersection!");
748 #endif
749 	  gs->local_strength = PARTIAL;
750 	  SMI_sort((void*)gs->num_local_reduce, (void*)gs->local_reduce,
751 		   gs->num_local + 1, SORT_INT_PTR);
752 
753 	  gs->num_local_gop = t1;
754 	  gs->num_local_total =  gs->num_local;
755 	  gs->num_local    -= t1;
756 	  gs->gop_local_reduce = gs->local_reduce;
757 	  gs->num_gop_local_reduce = gs->num_local_reduce;
758 
759 	  for (i=0; i<t1; i++)
760 	    {
761 	      if (gs->num_gop_local_reduce[i]>=0)
762 		{error_msg_fatal("they aren't negative?");}
763 	      gs->num_gop_local_reduce[i] *= -1;
764 	      gs->local_reduce++;
765 	      gs->num_local_reduce++;
766 	    }
767 	  gs->local_reduce++;
768 	  gs->num_local_reduce++;
769 	}
770     }
771 
772   elms = gs->pw_elm_list;
773   nel  = gs->len_pw_list;
774   for (i=0; i<nel; i++)
775     {elms[i] = map[elms[i]];}
776 
777   elms = gs->tree_map_in;
778   nel  = gs->tree_map_sz;
779   for (i=0; i<nel; i++)
780     {elms[i] = map[elms[i]];}
781 
782   /* clean up */
783   bss_free((void*) gs->local_elms);
784   bss_free((void*) gs->companion);
785   bss_free((void*) gs->elms);
786   bss_free((void*) gs->ngh_buf);
787   gs->local_elms = gs->companion = gs->elms = gs->ngh_buf = NULL;
788 
789 #ifdef DEBUG
790   error_msg_warning("gsi_via_bit_mask() end w/%d :: %d\n",my_id,num_nodes);
791 #endif
792 }
793 
794 
795 
796 /******************************************************************************
797 Function: place_in_tree()
798 
799 Input :
800 Output:
801 Return:
802 Description:
803 
804 
805 ******************************************************************************/
806 static
807 void
808 place_in_tree(register int elm)
809 {
810   register int *tp, n;
811 
812 
813   if (ntree==tree_buf_sz)
814     {
815       if (tree_buf_sz)
816 	{
817 	  tp = tree_buf;
818 	  n = tree_buf_sz;
819 	  tree_buf_sz<<=1;
820 	  tree_buf = (int*)bss_malloc(tree_buf_sz*INT_LEN);
821 	  ivec_copy(tree_buf,tp,n);
822 	  bss_free(tp);
823 	}
824       else
825 	{
826 	  tree_buf_sz = TREE_BUF_SZ;
827 	  tree_buf = (int*)bss_malloc(tree_buf_sz*INT_LEN);
828 	}
829     }
830 
831   tree_buf[ntree++] = elm;
832 }
833 
834 
835 
836 /******************************************************************************
837 Function: get_ngh_buf()
838 
839 Input :
840 Output:
841 Return:
842 Description:
843 
844 
845 ******************************************************************************/
846 static
847 void
848 get_ngh_buf(gs_id *gs)
849 {
850   register int i, j, npw=0, ntree_map=0;
851   int p_mask_size, ngh_buf_size, buf_size;
852   int *p_mask, *sh_proc_mask, *pw_sh_proc_mask;
853   int *ngh_buf, *buf1, *buf2;
854   int offset, per_load, num_loads, or_ct, start, end;
855   int *ptr1, *ptr2, i_start, negl, nel, *elms;
856   int oper=GL_B_OR;
857   int *ptr3, *t_mask, level, ct1, ct2;
858 
859 #ifdef DEBUG
860   error_msg_warning("get_ngh_buf() begin w/%d :: %d\n",my_id,num_nodes);
861 #endif
862 
863   /* to make life easier */
864   nel   = gs->nel;
865   elms  = gs->elms;
866   level = gs->level;
867 
868   /* det #bytes needed for processor bit masks and init w/mask cor. to my_id */
869   p_mask = (int*) bss_malloc(p_mask_size=len_bit_mask(num_nodes));
870   set_bit_mask(p_mask,p_mask_size,my_id);
871 
872   /* allocate space for masks and info bufs */
873   gs->nghs = sh_proc_mask = (int*) bss_malloc(p_mask_size);
874   gs->pw_nghs = pw_sh_proc_mask = (int*) perm_malloc(p_mask_size);
875   gs->ngh_buf_sz = ngh_buf_size = p_mask_size*nel;
876   t_mask = (int*) bss_malloc(p_mask_size);
877   gs->ngh_buf = ngh_buf = (int*) bss_malloc(ngh_buf_size);
878 
879   /* comm buffer size ... memory usage bounded by ~2*msg_buf */
880   /* had thought I could exploit rendezvous threshold */
881 
882   /* default is one pass */
883   per_load = negl  = gs->negl;
884   gs->num_loads = num_loads = 1;
885   i=p_mask_size*negl;
886 
887   /* possible overflow on buffer size */
888   /* overflow hack                    */
889   if (i<0) {i=INT_MAX;}
890 
891   buf_size = MIN(msg_buf,i);
892 
893   /* can we do it? */
894   if (p_mask_size>buf_size)
895     {error_msg_fatal("get_ngh_buf() :: buf<pms :: %d>%d\n",p_mask_size,buf_size);}
896 
897   /* get giop buf space ... make *only* one malloc */
898   buf1 = (int*) bss_malloc(buf_size<<1);
899 
900   /* more than one gior exchange needed? */
901   if (buf_size!=i)
902     {
903       per_load = buf_size/p_mask_size;
904       buf_size = per_load*p_mask_size;
905       gs->num_loads = num_loads = negl/per_load + (negl%per_load>0);
906     }
907 
908 #ifdef DEBUG
909   /* dump some basic info */
910   error_msg_warning("n_lds=%d,pms=%d,buf_sz=%d\n",num_loads,p_mask_size,buf_size);
911 #endif
912 
913   /* convert buf sizes from #bytes to #ints - 32 bit only! */
914 #ifdef SAFE
915   p_mask_size/=INT_LEN; ngh_buf_size/=INT_LEN; buf_size/=INT_LEN;
916 #else
917   p_mask_size>>=2; ngh_buf_size>>=2; buf_size>>=2;
918 #endif
919 
920   /* find giop work space */
921   buf2 = buf1+buf_size;
922 
923   /* hold #ints needed for processor masks */
924   gs->mask_sz=p_mask_size;
925 
926   /* init buffers */
927   ivec_zero(sh_proc_mask,p_mask_size);
928   ivec_zero(pw_sh_proc_mask,p_mask_size);
929   ivec_zero(ngh_buf,ngh_buf_size);
930 
931   /* HACK reset tree info */
932   tree_buf=NULL;
933   tree_buf_sz=ntree=0;
934 
935   /* queue the tree elements for now */
936   /* elms_q = new_queue(); */
937 
938   /* can also queue tree info for pruned or forest implememtation */
939   /*  mask_q = new_queue(); */
940 
941   /* ok do it */
942   for (ptr1=ngh_buf,ptr2=elms,end=gs->gl_min,or_ct=i=0; or_ct<num_loads; or_ct++)
943     {
944       /* identity for bitwise or is 000...000 */
945       ivec_zero(buf1,buf_size);
946 
947       /* load msg buffer */
948       for (start=end,end+=per_load,i_start=i; (offset=*ptr2)<end; i++, ptr2++)
949 	{
950 	  offset = (offset-start)*p_mask_size;
951 	  ivec_copy(buf1+offset,p_mask,p_mask_size);
952 	}
953 
954       /* GLOBAL: pass buffer */
955 #if   defined NXSRC
956       gior(buf1,buf_size,buf2);
957 #elif defined MPISRC
958       giop(buf1,buf2,buf_size,&oper);
959 #endif
960 
961 
962       /* unload buffer into ngh_buf */
963       ptr2=(elms+i_start);
964       for(ptr3=buf1,j=start; j<end; ptr3+=p_mask_size,j++)
965 	{
966 	  /* I own it ... may have to pairwise it */
967 	  if (j==*ptr2)
968 	    {
969 	      /* do i share it w/anyone? */
970 #ifdef SAFE
971 	      ct1 = ct_bits((char *)ptr3,p_mask_size*INT_LEN);
972 #else
973 	      ct1 = ct_bits((char *)ptr3,p_mask_size<<2);
974 #endif
975 	      /* guess not */
976 	      if (ct1<2)
977 		{ptr2++; ptr1+=p_mask_size; continue;}
978 
979 	      /* i do ... so keep info and turn off my bit */
980 	      ivec_copy(ptr1,ptr3,p_mask_size);
981 	      ivec_xor(ptr1,p_mask,p_mask_size);
982 	      ivec_or(sh_proc_mask,ptr1,p_mask_size);
983 
984 	      /* is it to be done pairwise? */
985 	      if (--ct1<=level)
986 		{
987 		  npw++;
988 
989 		  /* turn on high bit to indicate pw need to process */
990 		  *ptr2++ |= TOP_BIT;
991 		  ivec_or(pw_sh_proc_mask,ptr1,p_mask_size);
992 		  ptr1+=p_mask_size;
993 		  continue;
994 		}
995 
996 	      /* get set for next and note that I have a tree contribution */
997 	      /* could save exact elm index for tree here -> save a search */
998 	      ptr2++; ptr1+=p_mask_size; ntree_map++;
999 	    }
1000 	  /* i don't but still might be involved in tree */
1001 	  else
1002 	    {
1003 
1004 	      /* shared by how many? */
1005 #ifdef SAFE
1006 	      ct1 = ct_bits((char *)ptr3,p_mask_size*INT_LEN);
1007 #else
1008 	      ct1 = ct_bits((char *)ptr3,p_mask_size<<2);
1009 #endif
1010 
1011 	      /* none! */
1012 	      if (ct1<2)
1013 		{continue;}
1014 
1015 	      /* is it going to be done pairwise? but not by me of course!*/
1016 	      if (--ct1<=level)
1017 		{continue;}
1018 	    }
1019 	  /* LATER we're going to have to process it NOW */
1020 	  /* nope ... tree it */
1021 	  place_in_tree(j);
1022 	}
1023     }
1024 
1025   bss_free((void*)t_mask);
1026   bss_free((void*)buf1);
1027 
1028   gs->len_pw_list=npw;
1029   gs->num_nghs = ct_bits((char *)sh_proc_mask,p_mask_size*INT_LEN);
1030 
1031   /* expand from bit mask list to int list and save ngh list */
1032   gs->nghs = (int*) perm_malloc(gs->num_nghs * INT_LEN);
1033   bm_to_proc((char *)sh_proc_mask,p_mask_size*INT_LEN,gs->nghs);
1034 
1035   gs->num_pw_nghs = ct_bits((char *)pw_sh_proc_mask,p_mask_size*INT_LEN);
1036 
1037   oper = GL_MAX;
1038   ct1 = gs->num_nghs;
1039   giop(&ct1,&ct2,1,&oper);
1040   gs->max_nghs = ct1;
1041 
1042   gs->tree_map_sz  = ntree_map;
1043   gs->max_left_over=ntree;
1044 
1045   bss_free((void*)p_mask);
1046   bss_free((void*)sh_proc_mask);
1047 
1048 #ifdef DEBUG
1049   error_msg_warning("get_ngh_buf() end w/%d :: %d\n",my_id,num_nodes);
1050 #endif
1051 }
1052 
1053 
1054 
1055 
1056 
1057 /******************************************************************************
1058 Function: pairwise_init()
1059 
1060 Input :
1061 Output:
1062 Return:
1063 Description:
1064 
1065 if an element is shared by fewer that level# of nodes do pairwise exch
1066 ******************************************************************************/
1067 static
1068 void
1069 set_pairwise(gs_id *gs)
1070 {
1071   register int i, j;
1072   int p_mask_size;
1073   int *p_mask, *sh_proc_mask, *tmp_proc_mask;
1074   int *ngh_buf, *buf2;
1075   int offset;
1076   int *msg_list, *msg_size, **msg_nodes, nprs;
1077   int *pairwise_elm_list, len_pair_list=0;
1078   int *iptr, t1, i_start, nel, *elms;
1079   int ct;
1080 
1081 
1082 #ifdef DEBUG
1083   error_msg_warning("set_pairwise() begin w/%d :: %d\n",my_id,num_nodes);
1084 #endif
1085 
1086   /* to make life easier */
1087   nel  = gs->nel;
1088   elms = gs->elms;
1089   ngh_buf = gs->ngh_buf;
1090   sh_proc_mask  = gs->pw_nghs;
1091 
1092   /* need a few temp masks */
1093   p_mask_size   = len_bit_mask(num_nodes);
1094   p_mask        = (int*) bss_malloc(p_mask_size);
1095   tmp_proc_mask = (int*) bss_malloc(p_mask_size);
1096 
1097   /* set mask to my my_id's bit mask */
1098   set_bit_mask(p_mask,p_mask_size,my_id);
1099 
1100 #ifdef SAFE
1101   p_mask_size /= INT_LEN;
1102 #else
1103   p_mask_size >>= 2;
1104 #endif
1105 
1106   len_pair_list=gs->len_pw_list;
1107   gs->pw_elm_list=pairwise_elm_list=(int*)perm_malloc((len_pair_list+1)*INT_LEN);
1108 
1109   /* how many processors (nghs) do we have to exchange with? */
1110   nprs=gs->num_pairs=ct_bits((char *)sh_proc_mask,p_mask_size*INT_LEN);
1111 
1112 
1113   /* allocate space for gs_gop() info */
1114   gs->pair_list = msg_list = (int*)  perm_malloc(INT_LEN*nprs);
1115   gs->msg_sizes = msg_size  = (int*)  perm_malloc(INT_LEN*nprs);
1116   gs->node_list = msg_nodes = (int **) perm_malloc(INT_PTR_LEN*(nprs+1));
1117 
1118   /* init msg_size list */
1119   ivec_zero(msg_size,nprs);
1120 
1121   /* expand from bit mask list to int list */
1122   bm_to_proc((char *)sh_proc_mask,p_mask_size*INT_LEN,msg_list);
1123 
1124   /* keep list of elements being handled pairwise */
1125   for (i=j=0;i<nel;i++)
1126     {
1127       if (elms[i] & TOP_BIT)
1128 	{elms[i] ^= TOP_BIT; pairwise_elm_list[j++] = i;}
1129     }
1130   pairwise_elm_list[j] = -1;
1131 
1132 #ifdef DEBUG
1133   if (j!=len_pair_list)
1134     {error_msg_fatal("oops ... bad paiwise list in set_pairwise!");}
1135 #endif
1136 
1137 #if defined MPISRC
1138   gs->msg_ids_out = (MPI_Request *)  perm_malloc(sizeof(MPI_Request)*(nprs+1));
1139   gs->msg_ids_out[nprs] = MPI_REQUEST_NULL;
1140   gs->msg_ids_in = (MPI_Request *)  perm_malloc(sizeof(MPI_Request)*(nprs+1));
1141   gs->msg_ids_in[nprs] = MPI_REQUEST_NULL;
1142   gs->pw_vals = (REAL *) perm_malloc(REAL_LEN*len_pair_list*vec_sz);
1143 #else
1144   gs->msg_ids_out = (int*)  perm_malloc(INT_LEN*(nprs+1));
1145   ivec_zero(gs->msg_ids_out,nprs);
1146   gs->msg_ids_out[nprs] = -1;
1147   gs->msg_ids_in = (int*)  perm_malloc(INT_LEN*(nprs+1));
1148   ivec_zero(gs->msg_ids_in,nprs);
1149   gs->msg_ids_in[nprs] = -1;
1150   gs->pw_vals = (REAL *) perm_malloc(REAL_LEN*len_pair_list*vec_sz);
1151 #endif
1152 
1153   /* find who goes to each processor */
1154   for (i_start=i=0;i<nprs;i++)
1155     {
1156       /* processor i's mask */
1157       set_bit_mask(p_mask,p_mask_size*INT_LEN,msg_list[i]);
1158 
1159       /* det # going to processor i */
1160       for (ct=j=0;j<len_pair_list;j++)
1161 	{
1162 	  buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
1163 	  ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);
1164 	  if (ct_bits((char *)tmp_proc_mask,p_mask_size*INT_LEN))
1165 	    {ct++;}
1166 	}
1167       msg_size[i] = ct;
1168       i_start = MAX(i_start,ct);
1169 
1170       /*space to hold nodes in message to first neighbor */
1171       msg_nodes[i] = iptr = (int*) perm_malloc(INT_LEN*(ct+1));
1172 
1173       for (j=0;j<len_pair_list;j++)
1174 	{
1175 	  buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
1176 	  ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);
1177 	  if (ct_bits((char *)tmp_proc_mask,p_mask_size*INT_LEN))
1178 	    {*iptr++ = j;}
1179 	}
1180       *iptr = -1;
1181     }
1182   msg_nodes[nprs] = NULL;
1183 
1184 #ifdef INFO1
1185   t1 = GL_MAX;
1186   giop(&i_start,&offset,1,&t1);
1187   gs->max_pairs = i_start;
1188 #else
1189   j=gs->loc_node_pairs=i_start;
1190   t1 = GL_MAX;
1191   giop(&i_start,&offset,1,&t1);
1192   gs->max_node_pairs = i_start;
1193 
1194   i_start=j;
1195   t1 = GL_MIN;
1196   giop(&i_start,&offset,1,&t1);
1197   gs->min_node_pairs = i_start;
1198 
1199   i_start=j;
1200   t1 = GL_ADD;
1201   giop(&i_start,&offset,1,&t1);
1202   gs->avg_node_pairs = i_start/num_nodes + 1;
1203 
1204   i_start=nprs;
1205   t1 = GL_MAX;
1206   giop(&i_start,&offset,1,&t1);
1207   gs->max_pairs = i_start;
1208 
1209   /*gs->max_pairs = -1;*/
1210 #endif
1211 
1212   /* remap pairwise in tail of gsi_via_bit_mask() */
1213   gs->msg_total = ivec_sum(gs->msg_sizes,nprs);
1214   gs->out = (REAL *) perm_malloc(REAL_LEN*gs->msg_total*vec_sz);
1215   gs->in  = (REAL *) perm_malloc(REAL_LEN*gs->msg_total*vec_sz);
1216 
1217   /* reset malloc pool */
1218   bss_free((void*)p_mask);
1219   bss_free((void*)tmp_proc_mask);
1220 
1221 #ifdef DEBUG
1222   error_msg_warning("set_pairwise() end w/%d :: %d\n",my_id,num_nodes);
1223 #endif
1224 }
1225 
1226 
1227 
1228 /******************************************************************************
1229 Function: set_tree()
1230 
1231 Input :
1232 Output:
1233 Return:
1234 Description:
1235 
1236 to do pruned tree just save ngh buf copy for each one and decode here!
1237 ******************************************************************************/
1238 static
1239 void
1240 set_tree(gs_id *gs)
1241 {
1242   register int i, j, n, nel;
1243   register int *iptr_in, *iptr_out, *tree_elms, *elms;
1244 
1245 
1246 #ifdef DEBUG
1247   error_msg_warning("set_tree() :: begin\n");
1248 #endif
1249 
1250   /* local work ptrs */
1251   elms = gs->elms;
1252   nel     = gs->nel;
1253 
1254   /* how many via tree */
1255   gs->tree_nel  = n = ntree;
1256   gs->tree_elms = tree_elms = iptr_in = tree_buf;
1257   gs->tree_buf  = (REAL *) bss_malloc(REAL_LEN*n*vec_sz);
1258   gs->tree_work = (REAL *) bss_malloc(REAL_LEN*n*vec_sz);
1259   j=gs->tree_map_sz;
1260   gs->tree_map_in = iptr_in  = (int*) bss_malloc(INT_LEN*(j+1));
1261   gs->tree_map_out = iptr_out = (int*) bss_malloc(INT_LEN*(j+1));
1262 
1263 #ifdef DEBUG
1264   error_msg_warning("num on tree=%d,%d",gs->max_left_over,gs->tree_nel);
1265 #endif
1266 
1267   /* search the longer of the two lists */
1268   /* note ... could save this info in get_ngh_buf and save searches */
1269   if (n<=nel)
1270     {
1271       /* bijective fct w/remap - search elm list */
1272       for (i=0; i<n; i++)
1273 	{
1274 	  if ((j=ivec_binary_search(*tree_elms++,elms,nel))>=0)
1275 	    {*iptr_in++ = j; *iptr_out++ = i;}
1276 	}
1277     }
1278   else
1279     {
1280       for (i=0; i<nel; i++)
1281 	{
1282 	  if ((j=ivec_binary_search(*elms++,tree_elms,n))>=0)
1283 	    {*iptr_in++ = i; *iptr_out++ = j;}
1284 	}
1285     }
1286 
1287   /* sentinel */
1288   *iptr_in = *iptr_out = -1;
1289 
1290 #ifdef DEBUG
1291   error_msg_warning("set_tree() :: end\n");
1292 #endif
1293 }
1294 
1295 
1296 /******************************************************************************
1297 Function: gsi_via_int_list()
1298 
1299 Input :
1300 Output:
1301 Return:
1302 Description:
1303 ******************************************************************************/
1304 /*static
1305 void
1306 gsi_via_int_list(gs_id *gs)
1307 {
1308 
1309    LATER: for P large the bit masks -> too many passes
1310    LATER: strategy: do gsum w/1 in position i in negl if owner
1311    LATER: then sum of entire vector 1 ... negl determines min buf len
1312    LATER: So choose min from this or mask method
1313 }*/
1314 
1315 
1316 #if defined(not_used)
1317 static
1318 int
1319 root_sub_tree(int *proc_list, int num)
1320 {
1321   register int i, j, p_or, p_and;
1322   register int root, mask;
1323 
1324 
1325   /* ceiling(log2(num_nodes)) - 1 */
1326   j = i_log2_num_nodes;
1327   if (num_nodes==floor_num_nodes)
1328     {j--;}
1329 
1330   /* set mask to msb */
1331   for(mask=1,i=0; i<j; i++)
1332     {mask<<=1;}
1333 
1334   p_or  = ivec_reduce_or(proc_list,num);
1335   p_and = ivec_reduce_and(proc_list,num);
1336   for(root=i=0; i<j; i++,mask>>=1)
1337     {
1338       /* (msb-i)'th bits on ==> root in right 1/2 tree */
1339       if (mask & p_and)
1340 	{root |= mask;}
1341 
1342       /* (msb-i)'th bits differ ==> root found */
1343       else if (mask & p_or)
1344 	{break;}
1345 
1346       /* (msb-i)'th bits off ==>root in left 1/2 tree */
1347     }
1348 
1349 #ifdef DEBUG
1350   if ((root<0) || (root>num_nodes))
1351     {error_msg_fatal("root_sub_tree() :: bad root!");}
1352 
1353   if (!my_id)
1354     {
1355       printf("num_nodes=%d, j=%d, root=%d\n",num_nodes,j,root);
1356       printf("procs: ");
1357       for(i=0;i<num;i++)
1358 	{printf("%d ",proc_list[i]);}
1359       printf("\n");
1360     }
1361 #endif
1362 
1363   return(root);
1364 }
1365 #endif
1366 
1367 
1368 #if defined(not_used)
1369 static int
1370 in_sub_tree(int *mask, int mask_size, int *work, int nw)
1371 {
1372   int ct, nb;
1373 
1374   /* mask size in bytes */
1375   nb = mask_size<<2;
1376 
1377   /* shared amoungst how many? */
1378   ct = ct_bits((char *)mask,nb);
1379 
1380   /* enough space? */
1381   if (nw<ct)
1382     {error_msg_fatal("in_sub_tree() :: not enough space to expand bit mask!");}
1383 
1384   /* expand */
1385   bm_to_proc((char *)mask,nb,work);
1386 
1387   /* find tree root */
1388   root_sub_tree(work,ct);
1389 
1390   /* am i in any of the paths? */
1391 
1392   return(TRUE);
1393 
1394   /*
1395   sh_mask = (int*)bss_malloc(nb);
1396   bss_free(sh_mask);
1397   */
1398 }
1399 #endif
1400 
1401 
1402 /******************************************************************************
1403 Function: gather_scatter
1404 
1405 Input :
1406 Output:
1407 Return:
1408 Description:
1409 ******************************************************************************/
1410 static
1411 void
1412 gs_gop_local_out(register gs_id *gs, register REAL *vals)
1413 {
1414   register int *num, *map, **reduce;
1415   register REAL tmp;
1416 
1417 
1418 #ifdef DEBUG
1419   error_msg_warning("start gs_gop_xxx()\n");
1420 #endif
1421 
1422   num    = gs->num_gop_local_reduce;
1423   reduce = gs->gop_local_reduce;
1424   while ((map = *reduce++))
1425     {
1426       /* wall */
1427       if (*num == 2)
1428 	{
1429 	  num ++;
1430 	  vals[map[1]] = vals[map[0]];
1431 	}
1432       /* corner shared by three elements */
1433       else if (*num == 3)
1434 	{
1435 	  num ++;
1436 	  vals[map[2]] = vals[map[1]] = vals[map[0]];
1437 	}
1438       /* corner shared by four elements */
1439       else if (*num == 4)
1440 	{
1441 	  num ++;
1442 	  vals[map[3]] = vals[map[2]] = vals[map[1]] = vals[map[0]];
1443 	}
1444       /* general case ... odd geoms ... 3D*/
1445       else
1446 	{
1447 	  num++;
1448 	  tmp = *(vals + *map++);
1449 	  while (*map >= 0)
1450 	    {*(vals + *map++) = tmp;}
1451 	}
1452     }
1453 }
1454 
1455 
1456 
1457 /******************************************************************************
1458 Function: gather_scatter
1459 
1460 Input :
1461 Output:
1462 Return:
1463 Description:
1464 ******************************************************************************/
1465 void
1466 gs_gop_binary(gs_ADT gs, REAL *vals, rbfp fct)
1467 {
1468 #ifdef DEBUG
1469   if (!gs)  {error_msg_fatal("gs_gop() :: passed NULL gs handle!!!");}
1470   if (!fct) {error_msg_fatal("gs_gop() :: passed NULL bin fct handle!!!");}
1471   error_msg_warning("start gs_gop_xxx()\n");
1472 #endif
1473 
1474   /* local only operations!!! */
1475   if (gs->num_local)
1476     {gs_gop_local_binary(gs,vals,fct);}
1477 
1478   /* if intersection tree/pairwise and local isn't empty */
1479   if (gs->num_local_gop)
1480     {
1481       gs_gop_local_in_binary(gs,vals,fct);
1482 
1483       /* pairwise */
1484       if (gs->num_pairs)
1485 	{gs_gop_pairwise_binary(gs,vals,fct);}
1486 
1487       /* tree */
1488       else if (gs->max_left_over)
1489 	{gs_gop_tree_binary(gs,vals,fct);}
1490 
1491       gs_gop_local_out(gs,vals);
1492     }
1493   /* if intersection tree/pairwise and local is empty */
1494   else
1495     {
1496       /* pairwise */
1497       if (gs->num_pairs)
1498 	{gs_gop_pairwise_binary(gs,vals,fct);}
1499 
1500       /* tree */
1501       else if (gs->max_left_over)
1502 	{gs_gop_tree_binary(gs,vals,fct);}
1503     }
1504 }
1505 
1506 
1507 
1508 /******************************************************************************
1509 Function: gather_scatter
1510 
1511 Input :
1512 Output:
1513 Return:
1514 Description:
1515 ******************************************************************************/
1516 static
1517 void
1518 gs_gop_local_binary(register gs_id *gs, register REAL *vals, register rbfp fct)
1519 {
1520   register int *num, *map, **reduce;
1521   REAL tmp;
1522 
1523 
1524 #ifdef DEBUG
1525   error_msg_warning("start gs_gop_xxx()\n");
1526 #endif
1527   num    = gs->num_local_reduce;
1528   reduce = gs->local_reduce;
1529   while ((map = *reduce))
1530     {
1531       num ++;
1532       (*fct)(&tmp,NULL,1);
1533       /* tmp = 0.0; */
1534       while (*map >= 0)
1535 	{(*fct)(&tmp,(vals + *map),1); map++;}
1536 	/*        {tmp = (*fct)(tmp,*(vals + *map)); map++;} */
1537 
1538       map = *reduce++;
1539       while (*map >= 0)
1540         {*(vals + *map++) = tmp;}
1541     }
1542 }
1543 
1544 
1545 
1546 /******************************************************************************
1547 Function: gather_scatter
1548 
1549 Input :
1550 Output:
1551 Return:
1552 Description:
1553 ******************************************************************************/
1554 static
1555 void
1556 gs_gop_local_in_binary(register gs_id *gs, register REAL *vals, register rbfp fct)
1557 {
1558   register int *num, *map, **reduce;
1559   register REAL *base;
1560 
1561 
1562 #ifdef DEBUG
1563   error_msg_warning("start gs_gop_xxx()\n");
1564 #endif
1565 
1566   num    = gs->num_gop_local_reduce;
1567 
1568   reduce = gs->gop_local_reduce;
1569   while ((map = *reduce++))
1570     {
1571       num++;
1572       base = vals + *map++;
1573       while (*map >= 0)
1574 	{(*fct)(base,(vals + *map),1); map++;}
1575 	/*        {*base = (*fct)(*base,*(vals + *map)); map++;} */
1576     }
1577 }
1578 
1579 
1580 
1581 /******************************************************************************
1582 Function: gather_scatter
1583 
1584 VERSION 3 ::
1585 
1586 Input :
1587 Output:
1588 Return:
1589 Description:
1590 ******************************************************************************/
1591 static
1592 void
1593 gs_gop_pairwise_binary(register gs_id *gs, register REAL *in_vals,
1594                        register rbfp fct)
1595 {
1596 #if   defined NXSRC
1597   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
1598   register int *iptr, *msg_list, *msg_size, **msg_nodes;
1599   register int *pw, *list, *size, **nodes;
1600   register int *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1601 
1602 #ifdef DEBUG
1603   error_msg_warning("start gs_gop_xxx()\n");
1604 #endif
1605 
1606   /* strip and load registers */
1607   msg_list =list         = gs->pair_list;
1608   msg_size =size         = gs->msg_sizes;
1609   msg_nodes=nodes        = gs->node_list;
1610   iptr=pw                = gs->pw_elm_list;
1611   dptr1=dptr3            = gs->pw_vals;
1612   msg_ids_in  = ids_in   = gs->msg_ids_in;
1613   msg_ids_out = ids_out  = gs->msg_ids_out;
1614   dptr2                  = gs->out;
1615   in1=in2                = gs->in;
1616 
1617   /* post the receives */
1618   do
1619     {
1620       *msg_ids_in++ = (int) irecv(MSGTAG1 + *list++,(char *)in1,*size*REAL_LEN);
1621       in1 += *size++;
1622     }
1623   while (*msg_ids_in >= 0);
1624 
1625   /* load gs values into in out gs buffers */
1626   while (*iptr >= 0)
1627     {*dptr3++ = *(in_vals + *iptr++);}
1628 
1629   /* load out buffers and post the sends */
1630   while (iptr = *msg_nodes++)
1631     {
1632       dptr3 = dptr2;
1633       while (*iptr >= 0)
1634         {*dptr2++ = *(dptr1 + *iptr++);}
1635       *msg_ids_out++ = (int) isend(MSGTAG1+my_id,(char *)dptr3,
1636 				   *(msg_size++)*REAL_LEN,*msg_list++,0);
1637     }
1638 
1639   /* post the receives ... was here*/
1640   if (gs->max_left_over)
1641     {gs_gop_tree_binary(gs,in_vals,fct);}
1642 
1643   /* process the received data */
1644   while (iptr = *nodes++)
1645     {
1646       msgwait(*ids_in++);
1647       while (*iptr >= 0)
1648 	{(*fct)((dptr1 + *iptr),in2,1); iptr++; in2++;}
1649 	/* {*(dptr1 + *iptr) = (*fct)(*(dptr1 + *iptr),*in2); iptr++; in2++;} */
1650     }
1651 
1652   /* replace vals */
1653   while (*pw >= 0)
1654     {*(in_vals + *pw++) = *dptr1++;}
1655 
1656   /* clear isend message handles */
1657   while (*ids_out >= 0)
1658     {msgwait(*ids_out++);}
1659 
1660 #elif defined MPISRC
1661   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
1662   register int *iptr, *msg_list, *msg_size, **msg_nodes;
1663   register int *pw, *list, *size, **nodes;
1664   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1665   MPI_Status status;
1666 
1667 
1668   /* strip and load registers */
1669   msg_list =list         = gs->pair_list;
1670   msg_size =size         = gs->msg_sizes;
1671   msg_nodes=nodes        = gs->node_list;
1672   iptr=pw                = gs->pw_elm_list;
1673   dptr1=dptr3            = gs->pw_vals;
1674   msg_ids_in  = ids_in   = gs->msg_ids_in;
1675   msg_ids_out = ids_out  = gs->msg_ids_out;
1676   dptr2                  = gs->out;
1677   in1=in2                = gs->in;
1678 
1679   /* post the receives */
1680   /*  msg_nodes=nodes; */
1681   do
1682     {
1683       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1684          second one *list and do list++ afterwards */
1685       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
1686                 gs->gs_comm, msg_ids_in++);
1687       in1 += *size++;
1688     }
1689   while (*++msg_nodes);
1690   msg_nodes=nodes;
1691 
1692   /* load gs values into in out gs buffers */
1693   while (*iptr >= 0)
1694     {*dptr3++ = *(in_vals + *iptr++);}
1695 
1696   /* load out buffers and post the sends */
1697   while ((iptr = *msg_nodes++))
1698     {
1699       dptr3 = dptr2;
1700       while (*iptr >= 0)
1701         {*dptr2++ = *(dptr1 + *iptr++);}
1702       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1703       /* is msg_ids_out++ correct? */
1704       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
1705                 MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
1706     }
1707 
1708   if (gs->max_left_over)
1709     {gs_gop_tree_binary(gs,in_vals,fct);}
1710 
1711   /* process the received data */
1712   msg_nodes=nodes;
1713   while ((iptr = *nodes++))
1714     {
1715       /* Should I check the return value of MPI_Wait() or status? */
1716       /* Can this loop be replaced by a call to MPI_Waitall()? */
1717       MPI_Wait(ids_in++, &status);
1718       while (*iptr >= 0)
1719 	{(*fct)((dptr1 + *iptr),in2,1); iptr++; in2++;}
1720       /* {*(dptr1 + *iptr) = (*fct)(*(dptr1 + *iptr),*in2); iptr++; in2++;} */
1721     }
1722 
1723   /* replace vals */
1724   while (*pw >= 0)
1725     {*(in_vals + *pw++) = *dptr1++;}
1726 
1727   /* clear isend message handles */
1728   /* This changed for clarity though it could be the same */
1729   while (*msg_nodes++)
1730     /* Should I check the return value of MPI_Wait() or status? */
1731     /* Can this loop be replaced by a call to MPI_Waitall()? */
1732     {MPI_Wait(ids_out++, &status);}
1733 #else
1734   return;
1735 #endif
1736 
1737 
1738 }
1739 
1740 
1741 
1742 /******************************************************************************
1743 Function: gather_scatter
1744 
1745 Input :
1746 Output:
1747 Return:
1748 Description:
1749 ******************************************************************************/
1750 static
1751 void
1752 gs_gop_tree_binary(gs_id *gs, REAL *vals, register rbfp fct)
1753 {
1754   int size;
1755   int *in, *out;
1756   REAL *buf, *work;
1757 
1758 #ifdef DEBUG
1759   error_msg_warning("gs_gop_tree_binary() :: start\n");
1760 #endif
1761 
1762   in   = gs->tree_map_in;
1763   out  = gs->tree_map_out;
1764   buf  = gs->tree_buf;
1765   work = gs->tree_work;
1766   size = gs->tree_nel;
1767 
1768   /* load vals vector w/identity */
1769   (*fct)(buf,NULL,size);
1770 
1771   /* load my contribution into val vector */
1772   while (*in >= 0)
1773     {(*fct)((buf + *out++),(vals + *in++),-1);}
1774 /*    {*(buf + *out++) = *(vals + *in++);} */
1775 
1776   gfop(buf,work,size,(vbfp)fct,REAL_TYPE,0);
1777 
1778   in   = gs->tree_map_in;
1779   out  = gs->tree_map_out;
1780   while (*in >= 0)
1781     {(*fct)((vals + *in++),(buf + *out++),-1);}
1782     /*    {*(vals + *in++) = *(buf + *out++);} */
1783 
1784 
1785 #ifdef DEBUG
1786   error_msg_warning("gs_gop_tree_binary() :: end\n");
1787 #endif
1788 
1789 }
1790 
1791 
1792 
1793 
1794 /******************************************************************************
1795 Function: gather_scatter
1796 
1797 Input :
1798 Output:
1799 Return:
1800 Description:
1801 ******************************************************************************/
1802 void
1803 gs_gop(register gs_id *gs, register REAL *vals, register const char *op)
1804 {
1805 #ifdef DEBUG
1806   error_msg_warning("start gs_gop()\n");
1807   if (!gs) {error_msg_fatal("gs_gop() :: passed NULL gs handle!!!");}
1808   if (!op) {error_msg_fatal("gs_gop() :: passed NULL operation!!!");}
1809 #endif
1810 
1811   switch (*op) {
1812   case '+':
1813     gs_gop_plus(gs,vals);
1814     break;
1815   case '*':
1816     gs_gop_times(gs,vals);
1817     break;
1818   case 'a':
1819     gs_gop_min_abs(gs,vals);
1820     break;
1821   case 'A':
1822     gs_gop_max_abs(gs,vals);
1823     break;
1824   case 'e':
1825     gs_gop_exists(gs,vals);
1826     break;
1827   case 'm':
1828     gs_gop_min(gs,vals);
1829     break;
1830   case 'M':
1831     gs_gop_max(gs,vals); break;
1832     /*
1833     if (*(op+1)=='\0')
1834       {gs_gop_max(gs,vals); break;}
1835     else if (*(op+1)=='X')
1836       {gs_gop_max_abs(gs,vals); break;}
1837     else if (*(op+1)=='N')
1838       {gs_gop_min_abs(gs,vals); break;}
1839     */
1840   default:
1841     error_msg_warning("gs_gop() :: %c is not a valid op",op[0]);
1842     error_msg_warning("gs_gop() :: default :: plus");
1843     gs_gop_plus(gs,vals);
1844     break;
1845   }
1846 #ifdef DEBUG
1847   error_msg_warning("end gs_gop()\n");
1848 #endif
1849 }
1850 
1851 
1852 /******************************************************************************
1853 Function: gather_scatter
1854 
1855 Input :
1856 Output:
1857 Return:
1858 Description:
1859 ******************************************************************************/
1860 static void
1861 gs_gop_exists(register gs_id *gs, register REAL *vals)
1862 {
1863 #ifdef DEBUG
1864   error_msg_warning("start gs_gop_xxx()\n");
1865 #endif
1866 
1867   /* local only operations!!! */
1868   if (gs->num_local)
1869     {gs_gop_local_exists(gs,vals);}
1870 
1871   /* if intersection tree/pairwise and local isn't empty */
1872   if (gs->num_local_gop)
1873     {
1874       gs_gop_local_in_exists(gs,vals);
1875 
1876       /* pairwise */
1877       if (gs->num_pairs)
1878 	{gs_gop_pairwise_exists(gs,vals);}
1879 
1880       /* tree */
1881       else if (gs->max_left_over)
1882 	{gs_gop_tree_exists(gs,vals);}
1883 
1884       gs_gop_local_out(gs,vals);
1885     }
1886   /* if intersection tree/pairwise and local is empty */
1887   else
1888     {
1889       /* pairwise */
1890       if (gs->num_pairs)
1891 	{gs_gop_pairwise_exists(gs,vals);}
1892 
1893       /* tree */
1894       else if (gs->max_left_over)
1895 	{gs_gop_tree_exists(gs,vals);}
1896     }
1897 }
1898 
1899 
1900 
1901 /******************************************************************************
1902 Function: gather_scatter
1903 
1904 Input :
1905 Output:
1906 Return:
1907 Description:
1908 ******************************************************************************/
1909 static
1910 void
1911 gs_gop_local_exists(register gs_id *gs, register REAL *vals)
1912 {
1913   register int *num, *map, **reduce;
1914   register REAL tmp;
1915 
1916 
1917 #ifdef DEBUG
1918   error_msg_warning("start gs_gop_xxx()\n");
1919 #endif
1920 
1921   num    = gs->num_local_reduce;
1922   reduce = gs->local_reduce;
1923   while ((map = *reduce))
1924     {
1925       num ++;
1926       tmp = 0.0;
1927       while (*map >= 0)
1928 	{tmp = EXISTS(tmp,*(vals + *map)); map++;}
1929 
1930       map = *reduce++;
1931       while (*map >= 0)
1932 	{*(vals + *map++) = tmp;}
1933     }
1934 }
1935 
1936 
1937 
1938 /******************************************************************************
1939 Function: gather_scatter
1940 
1941 Input :
1942 Output:
1943 Return:
1944 Description:
1945 ******************************************************************************/
1946 static
1947 void
1948 gs_gop_local_in_exists(register gs_id *gs, register REAL *vals)
1949 {
1950   register int *num, *map, **reduce;
1951   register REAL *base;
1952 
1953 
1954 #ifdef DEBUG
1955   error_msg_warning("start gs_gop_xxx()\n");
1956 #endif
1957 
1958   num    = gs->num_gop_local_reduce;
1959   reduce = gs->gop_local_reduce;
1960   while ((map = *reduce++))
1961     {
1962       num++;
1963       base = vals + *map++;
1964       while (*map >= 0)
1965 	{*base = EXISTS(*base,*(vals + *map)); map++;}
1966     }
1967 }
1968 
1969 
1970 
1971 /******************************************************************************
1972 Function: gather_scatter
1973 
1974 VERSION 3 ::
1975 
1976 Input :
1977 Output:
1978 Return:
1979 Description:
1980 ******************************************************************************/
1981 static
1982 void
1983 gs_gop_pairwise_exists(register gs_id *gs, register REAL *in_vals)
1984 {
1985 #if   defined NXSRC
1986   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
1987   register int *iptr, *msg_list, *msg_size, **msg_nodes;
1988   register int *pw, *list, *size, **nodes;
1989   register int *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1990 
1991 
1992 #ifdef DEBUG
1993   error_msg_warning("start gs_gop_xxx()\n");
1994 #endif
1995 
1996   /* strip and load registers */
1997   msg_list =list         = gs->pair_list;
1998   msg_size =size         = gs->msg_sizes;
1999   msg_nodes=nodes        = gs->node_list;
2000   iptr=pw                = gs->pw_elm_list;
2001   dptr1=dptr3            = gs->pw_vals;
2002   msg_ids_in  = ids_in   = gs->msg_ids_in;
2003   msg_ids_out = ids_out  = gs->msg_ids_out;
2004   dptr2                  = gs->out;
2005   in1=in2                = gs->in;
2006 
2007   /* post the receives */
2008   do
2009     {
2010       *msg_ids_in++ = (int) irecv(MSGTAG1 + *list++,(char *)in1,*size*REAL_LEN);
2011       in1 += *size++;
2012     }
2013   while (*msg_ids_in >= 0);
2014 
2015   /* load gs values into in out gs buffers */
2016   while (*iptr >= 0)
2017     {*dptr3++ = *(in_vals + *iptr++);}
2018 
2019   /* load out buffers and post the sends */
2020   while (iptr = *msg_nodes++)
2021     {
2022       dptr3 = dptr2;
2023       while (*iptr >= 0)
2024 	{*dptr2++ = *(dptr1 + *iptr++);}
2025       *msg_ids_out++ = (int) isend(MSGTAG1+my_id,(char *)dptr3,*(msg_size++)*REAL_LEN,
2026 			     *msg_list++,0);
2027     }
2028 
2029   /* post the receives ... was here*/
2030   if (gs->max_left_over)
2031     {gs_gop_tree_exists(gs,in_vals);}
2032 
2033   /* process the received data */
2034   while (iptr = *nodes++)
2035     {
2036       msgwait(*ids_in++);
2037       while (*iptr >= 0)
2038 	{*(dptr1 + *iptr) = EXISTS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2039     }
2040 
2041   /* replace vals */
2042   while (*pw >= 0)
2043     {*(in_vals + *pw++) = *dptr1++;}
2044 
2045   /* clear isend message handles */
2046   while (*ids_out >= 0)
2047     {msgwait(*ids_out++);}
2048 
2049 #elif defined MPISRC
2050   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
2051   register int *iptr, *msg_list, *msg_size, **msg_nodes;
2052   register int *pw, *list, *size, **nodes;
2053   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2054   MPI_Status status;
2055 
2056 
2057   /* strip and load registers */
2058   msg_list =list         = gs->pair_list;
2059   msg_size =size         = gs->msg_sizes;
2060   msg_nodes=nodes        = gs->node_list;
2061   iptr=pw                = gs->pw_elm_list;
2062   dptr1=dptr3            = gs->pw_vals;
2063   msg_ids_in  = ids_in   = gs->msg_ids_in;
2064   msg_ids_out = ids_out  = gs->msg_ids_out;
2065   dptr2                  = gs->out;
2066   in1=in2                = gs->in;
2067 
2068   /* post the receives */
2069   /*  msg_nodes=nodes; */
2070   do
2071     {
2072       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2073 	 second one *list and do list++ afterwards */
2074       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
2075 		gs->gs_comm, msg_ids_in++);
2076       in1 += *size++;
2077     }
2078   while (*++msg_nodes);
2079   msg_nodes=nodes;
2080 
2081   /* load gs values into in out gs buffers */
2082   while (*iptr >= 0)
2083     {*dptr3++ = *(in_vals + *iptr++);}
2084 
2085   /* load out buffers and post the sends */
2086   while ((iptr = *msg_nodes++))
2087     {
2088       dptr3 = dptr2;
2089       while (*iptr >= 0)
2090 	{*dptr2++ = *(dptr1 + *iptr++);}
2091       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2092       /* is msg_ids_out++ correct? */
2093       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
2094 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
2095     }
2096 
2097   if (gs->max_left_over)
2098     {gs_gop_tree_exists(gs,in_vals);}
2099 
2100   /* process the received data */
2101   msg_nodes=nodes;
2102   while ((iptr = *nodes++))
2103     {
2104       /* Should I check the return value of MPI_Wait() or status? */
2105       /* Can this loop be replaced by a call to MPI_Waitall()? */
2106       MPI_Wait(ids_in++, &status);
2107       while (*iptr >= 0)
2108 	{*(dptr1 + *iptr) = EXISTS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2109     }
2110 
2111   /* replace vals */
2112   while (*pw >= 0)
2113     {*(in_vals + *pw++) = *dptr1++;}
2114 
2115   /* clear isend message handles */
2116   /* This changed for clarity though it could be the same */
2117   while (*msg_nodes++)
2118     /* Should I check the return value of MPI_Wait() or status? */
2119     /* Can this loop be replaced by a call to MPI_Waitall()? */
2120     {MPI_Wait(ids_out++, &status);}
2121 #else
2122   return;
2123 #endif
2124 }
2125 
2126 
2127 
2128 /******************************************************************************
2129 Function: gather_scatter
2130 
2131 Input :
2132 Output:
2133 Return:
2134 Description:
2135 ******************************************************************************/
2136 static
2137 void
2138 gs_gop_tree_exists(gs_id *gs, REAL *vals)
2139 {
2140   int size;
2141   int *in, *out;
2142   REAL *buf, *work;
2143   int op[] = {GL_EXISTS,0};
2144 
2145 
2146 #ifdef DEBUG
2147   error_msg_warning("start gs_gop_tree_exists()");
2148 #endif
2149 
2150   in   = gs->tree_map_in;
2151   out  = gs->tree_map_out;
2152   buf  = gs->tree_buf;
2153   work = gs->tree_work;
2154   size = gs->tree_nel;
2155 
2156 #if defined  BLAS||CBLAS
2157   *work = 0.0;
2158   copy(size,work,0,buf,1);
2159 #else
2160   rvec_zero(buf,size);
2161 #endif
2162 
2163   while (*in >= 0)
2164     {
2165       /*
2166       printf("%d :: out=%d\n",my_id,*out);
2167       printf("%d :: in=%d\n",my_id,*in);
2168       */
2169       *(buf + *out++) = *(vals + *in++);
2170     }
2171 
2172   grop(buf,work,size,op);
2173 
2174   in   = gs->tree_map_in;
2175   out  = gs->tree_map_out;
2176 
2177   while (*in >= 0)
2178     {*(vals + *in++) = *(buf + *out++);}
2179 
2180 #ifdef DEBUG
2181   error_msg_warning("start gs_gop_tree_exists()");
2182 #endif
2183 }
2184 
2185 
2186 
2187 /******************************************************************************
2188 Function: gather_scatter
2189 
2190 Input :
2191 Output:
2192 Return:
2193 Description:
2194 ******************************************************************************/
2195 static void
2196 gs_gop_max_abs(register gs_id *gs, register REAL *vals)
2197 {
2198 #ifdef DEBUG
2199   error_msg_warning("start gs_gop_xxx()\n");
2200 #endif
2201 
2202   /* local only operations!!! */
2203   if (gs->num_local)
2204     {gs_gop_local_max_abs(gs,vals);}
2205 
2206   /* if intersection tree/pairwise and local isn't empty */
2207   if (gs->num_local_gop)
2208     {
2209       gs_gop_local_in_max_abs(gs,vals);
2210 
2211       /* pairwise */
2212       if (gs->num_pairs)
2213 	{gs_gop_pairwise_max_abs(gs,vals);}
2214 
2215       /* tree */
2216       else if (gs->max_left_over)
2217 	{gs_gop_tree_max_abs(gs,vals);}
2218 
2219       gs_gop_local_out(gs,vals);
2220     }
2221   /* if intersection tree/pairwise and local is empty */
2222   else
2223     {
2224       /* pairwise */
2225       if (gs->num_pairs)
2226 	{gs_gop_pairwise_max_abs(gs,vals);}
2227 
2228       /* tree */
2229       else if (gs->max_left_over)
2230 	{gs_gop_tree_max_abs(gs,vals);}
2231     }
2232 }
2233 
2234 
2235 
2236 /******************************************************************************
2237 Function: gather_scatter
2238 
2239 Input :
2240 Output:
2241 Return:
2242 Description:
2243 ******************************************************************************/
2244 static
2245 void
2246 gs_gop_local_max_abs(register gs_id *gs, register REAL *vals)
2247 {
2248   register int *num, *map, **reduce;
2249   register REAL tmp;
2250 
2251 
2252 #ifdef DEBUG
2253   error_msg_warning("start gs_gop_xxx()\n");
2254 #endif
2255 
2256   num    = gs->num_local_reduce;
2257   reduce = gs->local_reduce;
2258   while ((map = *reduce))
2259     {
2260       num ++;
2261       tmp = 0.0;
2262       while (*map >= 0)
2263 	{tmp = MAX_FABS(tmp,*(vals + *map)); map++;}
2264 
2265       map = *reduce++;
2266       while (*map >= 0)
2267 	{*(vals + *map++) = tmp;}
2268     }
2269 }
2270 
2271 
2272 
2273 /******************************************************************************
2274 Function: gather_scatter
2275 
2276 Input :
2277 Output:
2278 Return:
2279 Description:
2280 ******************************************************************************/
2281 static
2282 void
2283 gs_gop_local_in_max_abs(register gs_id *gs, register REAL *vals)
2284 {
2285   register int *num, *map, **reduce;
2286   register REAL *base;
2287 
2288 
2289 #ifdef DEBUG
2290   error_msg_warning("start gs_gop_xxx()\n");
2291 #endif
2292 
2293   num    = gs->num_gop_local_reduce;
2294   reduce = gs->gop_local_reduce;
2295   while ((map = *reduce++))
2296     {
2297       num++;
2298       base = vals + *map++;
2299       while (*map >= 0)
2300 	{*base = MAX_FABS(*base,*(vals + *map)); map++;}
2301     }
2302 }
2303 
2304 
2305 
2306 /******************************************************************************
2307 Function: gather_scatter
2308 
2309 VERSION 3 ::
2310 
2311 Input :
2312 Output:
2313 Return:
2314 Description:
2315 ******************************************************************************/
2316 static
2317 void
2318 gs_gop_pairwise_max_abs(register gs_id *gs, register REAL *in_vals)
2319 {
2320 #if   defined NXSRC
2321   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
2322   register int *iptr, *msg_list, *msg_size, **msg_nodes;
2323   register int *pw, *list, *size, **nodes;
2324   register int *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2325 
2326 
2327 
2328 #ifdef DEBUG
2329   error_msg_warning("start gs_gop_xxx()\n");
2330 #endif
2331 
2332   /* strip and load registers */
2333   msg_list =list         = gs->pair_list;
2334   msg_size =size         = gs->msg_sizes;
2335   msg_nodes=nodes        = gs->node_list;
2336   iptr=pw                = gs->pw_elm_list;
2337   dptr1=dptr3            = gs->pw_vals;
2338   msg_ids_in  = ids_in   = gs->msg_ids_in;
2339   msg_ids_out = ids_out  = gs->msg_ids_out;
2340   dptr2                  = gs->out;
2341   in1=in2                = gs->in;
2342 
2343   /* post the receives */
2344   do
2345     {
2346       *msg_ids_in++ = (int) irecv(MSGTAG1 + *list++,(char *)in1,*size*REAL_LEN);
2347       in1 += *size++;
2348     }
2349   while (*msg_ids_in >= 0);
2350 
2351   /* load gs values into in out gs buffers */
2352   while (*iptr >= 0)
2353     {*dptr3++ = *(in_vals + *iptr++);}
2354 
2355   /* load out buffers and post the sends */
2356   while (iptr = *msg_nodes++)
2357     {
2358       dptr3 = dptr2;
2359       while (*iptr >= 0)
2360 	{*dptr2++ = *(dptr1 + *iptr++);}
2361       *msg_ids_out++ = (int) isend(MSGTAG1+my_id,(char *)dptr3,*(msg_size++)*REAL_LEN,
2362 			     *msg_list++,0);
2363     }
2364 
2365   /* post the receives ... was here*/
2366   if (gs->max_left_over)
2367     {gs_gop_tree_max_abs(gs,in_vals);}
2368 
2369   /* process the received data */
2370   while (iptr = *nodes++)
2371     {
2372       msgwait(*ids_in++);
2373       while (*iptr >= 0)
2374 	{*(dptr1 + *iptr) = MAX_FABS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2375     }
2376 
2377   /* replace vals */
2378   while (*pw >= 0)
2379     {*(in_vals + *pw++) = *dptr1++;}
2380 
2381   /* clear isend message handles */
2382   while (*ids_out >= 0)
2383     {msgwait(*ids_out++);}
2384 
2385 #elif defined MPISRC
2386   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
2387   register int *iptr, *msg_list, *msg_size, **msg_nodes;
2388   register int *pw, *list, *size, **nodes;
2389   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2390   MPI_Status status;
2391 
2392 
2393   /* strip and load registers */
2394   msg_list =list         = gs->pair_list;
2395   msg_size =size         = gs->msg_sizes;
2396   msg_nodes=nodes        = gs->node_list;
2397   iptr=pw                = gs->pw_elm_list;
2398   dptr1=dptr3            = gs->pw_vals;
2399   msg_ids_in  = ids_in   = gs->msg_ids_in;
2400   msg_ids_out = ids_out  = gs->msg_ids_out;
2401   dptr2                  = gs->out;
2402   in1=in2                = gs->in;
2403 
2404   /* post the receives */
2405   /*  msg_nodes=nodes; */
2406   do
2407     {
2408       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2409 	 second one *list and do list++ afterwards */
2410       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
2411 		gs->gs_comm, msg_ids_in++);
2412       in1 += *size++;
2413     }
2414   while (*++msg_nodes);
2415   msg_nodes=nodes;
2416 
2417   /* load gs values into in out gs buffers */
2418   while (*iptr >= 0)
2419     {*dptr3++ = *(in_vals + *iptr++);}
2420 
2421   /* load out buffers and post the sends */
2422   while ((iptr = *msg_nodes++))
2423     {
2424       dptr3 = dptr2;
2425       while (*iptr >= 0)
2426 	{*dptr2++ = *(dptr1 + *iptr++);}
2427       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2428       /* is msg_ids_out++ correct? */
2429       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
2430 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
2431     }
2432 
2433   if (gs->max_left_over)
2434     {gs_gop_tree_max_abs(gs,in_vals);}
2435 
2436   /* process the received data */
2437   msg_nodes=nodes;
2438   while ((iptr = *nodes++))
2439     {
2440       /* Should I check the return value of MPI_Wait() or status? */
2441       /* Can this loop be replaced by a call to MPI_Waitall()? */
2442       MPI_Wait(ids_in++, &status);
2443       while (*iptr >= 0)
2444 	{*(dptr1 + *iptr) = MAX_FABS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2445     }
2446 
2447   /* replace vals */
2448   while (*pw >= 0)
2449     {*(in_vals + *pw++) = *dptr1++;}
2450 
2451   /* clear isend message handles */
2452   /* This changed for clarity though it could be the same */
2453   while (*msg_nodes++)
2454     /* Should I check the return value of MPI_Wait() or status? */
2455     /* Can this loop be replaced by a call to MPI_Waitall()? */
2456     {MPI_Wait(ids_out++, &status);}
2457 #else
2458   return;
2459 #endif
2460 }
2461 
2462 
2463 
2464 /******************************************************************************
2465 Function: gather_scatter
2466 
2467 Input :
2468 Output:
2469 Return:
2470 Description:
2471 ******************************************************************************/
2472 static
2473 void
2474 gs_gop_tree_max_abs(gs_id *gs, REAL *vals)
2475 {
2476   int size;
2477   int *in, *out;
2478   REAL *buf, *work;
2479   int op[] = {GL_MAX_ABS,0};
2480 
2481 
2482 #ifdef DEBUG
2483   error_msg_warning("start gs_gop_tree_max_abs()");
2484 #endif
2485 
2486   in   = gs->tree_map_in;
2487   out  = gs->tree_map_out;
2488   buf  = gs->tree_buf;
2489   work = gs->tree_work;
2490   size = gs->tree_nel;
2491 
2492 #if defined BLAS||CBLAS
2493   *work = 0.0;
2494   copy(size,work,0,buf,1);
2495 #else
2496   rvec_zero(buf,size);
2497 #endif
2498 
2499   while (*in >= 0)
2500     {
2501       /*
2502       printf("%d :: out=%d\n",my_id,*out);
2503       printf("%d :: in=%d\n",my_id,*in);
2504       */
2505       *(buf + *out++) = *(vals + *in++);
2506     }
2507 
2508   grop(buf,work,size,op);
2509 
2510   in   = gs->tree_map_in;
2511   out  = gs->tree_map_out;
2512 
2513   while (*in >= 0)
2514     {*(vals + *in++) = *(buf + *out++);}
2515 
2516 #ifdef DEBUG
2517   error_msg_warning("start gs_gop_tree_max_abs()");
2518 #endif
2519 }
2520 
2521 
2522 
2523 /******************************************************************************
2524 Function: gather_scatter
2525 
2526 Input :
2527 Output:
2528 Return:
2529 Description:
2530 ******************************************************************************/
2531 static void
2532 gs_gop_max(register gs_id *gs, register REAL *vals)
2533 {
2534 #ifdef DEBUG
2535   error_msg_warning("start gs_gop_xxx()\n");
2536 #endif
2537 
2538 
2539   /* local only operations!!! */
2540   if (gs->num_local)
2541     {gs_gop_local_max(gs,vals);}
2542 
2543   /* if intersection tree/pairwise and local isn't empty */
2544   if (gs->num_local_gop)
2545     {
2546       gs_gop_local_in_max(gs,vals);
2547 
2548       /* pairwise */
2549       if (gs->num_pairs)
2550 	{gs_gop_pairwise_max(gs,vals);}
2551 
2552       /* tree */
2553       else if (gs->max_left_over)
2554 	{gs_gop_tree_max(gs,vals);}
2555 
2556       gs_gop_local_out(gs,vals);
2557     }
2558   /* if intersection tree/pairwise and local is empty */
2559   else
2560     {
2561       /* pairwise */
2562       if (gs->num_pairs)
2563 	{gs_gop_pairwise_max(gs,vals);}
2564 
2565       /* tree */
2566       else if (gs->max_left_over)
2567 	{gs_gop_tree_max(gs,vals);}
2568     }
2569 }
2570 
2571 
2572 
2573 /******************************************************************************
2574 Function: gather_scatter
2575 
2576 Input :
2577 Output:
2578 Return:
2579 Description:
2580 ******************************************************************************/
2581 static
2582 void
2583 gs_gop_local_max(register gs_id *gs, register REAL *vals)
2584 {
2585   register int *num, *map, **reduce;
2586   register REAL tmp;
2587 
2588 
2589 #ifdef DEBUG
2590   error_msg_warning("start gs_gop_xxx()\n");
2591 #endif
2592 
2593   num    = gs->num_local_reduce;
2594   reduce = gs->local_reduce;
2595   while ((map = *reduce))
2596     {
2597       num ++;
2598       tmp = -REAL_MAX;
2599       while (*map >= 0)
2600 	{tmp = MAX(tmp,*(vals + *map)); map++;}
2601 
2602       map = *reduce++;
2603       while (*map >= 0)
2604 	{*(vals + *map++) = tmp;}
2605     }
2606 }
2607 
2608 
2609 
2610 /******************************************************************************
2611 Function: gather_scatter
2612 
2613 Input :
2614 Output:
2615 Return:
2616 Description:
2617 ******************************************************************************/
2618 static
2619 void
2620 gs_gop_local_in_max(register gs_id *gs, register REAL *vals)
2621 {
2622   register int *num, *map, **reduce;
2623   register REAL *base;
2624 
2625 
2626 #ifdef DEBUG
2627   error_msg_warning("start gs_gop_xxx()\n");
2628 #endif
2629 
2630   num    = gs->num_gop_local_reduce;
2631   reduce = gs->gop_local_reduce;
2632   while ((map = *reduce++))
2633     {
2634       num++;
2635       base = vals + *map++;
2636       while (*map >= 0)
2637 	{*base = MAX(*base,*(vals + *map)); map++;}
2638     }
2639 }
2640 
2641 
2642 
2643 /******************************************************************************
2644 Function: gather_scatter
2645 
2646 VERSION 3 ::
2647 
2648 Input :
2649 Output:
2650 Return:
2651 Description:
2652 ******************************************************************************/
2653 static
2654 void
2655 gs_gop_pairwise_max(register gs_id *gs, register REAL *in_vals)
2656 {
2657 #if   defined NXSRC
2658   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
2659   register int *iptr, *msg_list, *msg_size, **msg_nodes;
2660   register int *pw, *list, *size, **nodes;
2661   register int *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2662 
2663 
2664 #ifdef DEBUG
2665   error_msg_warning("start gs_gop_xxx()\n");
2666 #endif
2667 
2668   /* strip and load registers */
2669   msg_list =list         = gs->pair_list;
2670   msg_size =size         = gs->msg_sizes;
2671   msg_nodes=nodes        = gs->node_list;
2672   iptr=pw                = gs->pw_elm_list;
2673   dptr1=dptr3            = gs->pw_vals;
2674   msg_ids_in  = ids_in   = gs->msg_ids_in;
2675   msg_ids_out = ids_out  = gs->msg_ids_out;
2676   dptr2                  = gs->out;
2677   in1=in2                = gs->in;
2678 
2679   /* post the receives */
2680   do
2681     {
2682       *msg_ids_in++ = (int) irecv(MSGTAG1 + *list++,(char *)in1,*size*REAL_LEN);
2683       in1 += *size++;
2684     }
2685   while (*msg_ids_in >= 0);
2686 
2687   /* load gs values into in out gs buffers */
2688   while (*iptr >= 0)
2689     {*dptr3++ = *(in_vals + *iptr++);}
2690 
2691   /* load out buffers and post the sends */
2692   while (iptr = *msg_nodes++)
2693     {
2694       dptr3 = dptr2;
2695       while (*iptr >= 0)
2696 	{*dptr2++ = *(dptr1 + *iptr++);}
2697       *msg_ids_out++ = (int) isend(MSGTAG1+my_id,(char *)dptr3,*(msg_size++)*REAL_LEN,
2698 			     *msg_list++,0);
2699     }
2700 
2701   /* post the receives ... was here*/
2702   if (gs->max_left_over)
2703     {gs_gop_tree_max(gs,in_vals);}
2704 
2705   /* process the received data */
2706   while (iptr = *nodes++)
2707     {
2708       msgwait(*ids_in++);
2709       while (*iptr >= 0)
2710 	{*(dptr1 + *iptr) = MAX(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2711     }
2712 
2713   /* replace vals */
2714   while (*pw >= 0)
2715     {*(in_vals + *pw++) = *dptr1++;}
2716 
2717   /* clear isend message handles */
2718   while (*ids_out >= 0)
2719     {msgwait(*ids_out++);}
2720 
2721 #elif defined MPISRC
2722   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
2723   register int *iptr, *msg_list, *msg_size, **msg_nodes;
2724   register int *pw, *list, *size, **nodes;
2725   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2726   MPI_Status status;
2727 
2728 
2729   /* strip and load registers */
2730   msg_list =list         = gs->pair_list;
2731   msg_size =size         = gs->msg_sizes;
2732   msg_nodes=nodes        = gs->node_list;
2733   iptr=pw                = gs->pw_elm_list;
2734   dptr1=dptr3            = gs->pw_vals;
2735   msg_ids_in  = ids_in   = gs->msg_ids_in;
2736   msg_ids_out = ids_out  = gs->msg_ids_out;
2737   dptr2                  = gs->out;
2738   in1=in2                = gs->in;
2739 
2740   /* post the receives */
2741   /*  msg_nodes=nodes; */
2742   do
2743     {
2744       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
2745 	 second one *list and do list++ afterwards */
2746       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
2747 		gs->gs_comm, msg_ids_in++);
2748       in1 += *size++;
2749     }
2750   while (*++msg_nodes);
2751   msg_nodes=nodes;
2752 
2753   /* load gs values into in out gs buffers */
2754   while (*iptr >= 0)
2755     {*dptr3++ = *(in_vals + *iptr++);}
2756 
2757   /* load out buffers and post the sends */
2758   while ((iptr = *msg_nodes++))
2759     {
2760       dptr3 = dptr2;
2761       while (*iptr >= 0)
2762 	{*dptr2++ = *(dptr1 + *iptr++);}
2763       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
2764       /* is msg_ids_out++ correct? */
2765       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
2766 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
2767     }
2768 
2769   if (gs->max_left_over)
2770     {gs_gop_tree_max(gs,in_vals);}
2771 
2772   /* process the received data */
2773   msg_nodes=nodes;
2774   while ((iptr = *nodes++))
2775     {
2776       /* Should I check the return value of MPI_Wait() or status? */
2777       /* Can this loop be replaced by a call to MPI_Waitall()? */
2778       MPI_Wait(ids_in++, &status);
2779       while (*iptr >= 0)
2780 	{*(dptr1 + *iptr) = MAX(*(dptr1 + *iptr),*in2); iptr++; in2++;}
2781     }
2782 
2783   /* replace vals */
2784   while (*pw >= 0)
2785     {*(in_vals + *pw++) = *dptr1++;}
2786 
2787   /* clear isend message handles */
2788   /* This changed for clarity though it could be the same */
2789   while (*msg_nodes++)
2790     /* Should I check the return value of MPI_Wait() or status? */
2791     /* Can this loop be replaced by a call to MPI_Waitall()? */
2792     {MPI_Wait(ids_out++, &status);}
2793 #else
2794   return;
2795 #endif
2796 }
2797 
2798 
2799 
2800 /******************************************************************************
2801 Function: gather_scatter
2802 
2803 Input :
2804 Output:
2805 Return:
2806 Description:
2807 ******************************************************************************/
2808 static
2809 void
2810 gs_gop_tree_max(gs_id *gs, REAL *vals)
2811 {
2812   int size;
2813   int *in, *out;
2814   REAL *buf, *work;
2815   /* int op[] = {GL_MAX,0}; */
2816 
2817 
2818 #ifdef DEBUG
2819   error_msg_warning("start gs_gop_tree_max()");
2820 #endif
2821 
2822   in   = gs->tree_map_in;
2823   out  = gs->tree_map_out;
2824   buf  = gs->tree_buf;
2825   work = gs->tree_work;
2826   size = gs->tree_nel;
2827 
2828 #if defined BLAS||CBLAS
2829   *work = -REAL_MAX;
2830   copy(size,work,0,buf,1);
2831 #else
2832   rvec_set(buf,-REAL_MAX,size);
2833 #endif
2834 
2835   while (*in >= 0)
2836     {*(buf + *out++) = *(vals + *in++);}
2837 
2838   in   = gs->tree_map_in;
2839   out  = gs->tree_map_out;
2840 #if defined(NXSRC) && defined(r8)
2841   gdhigh(buf,size,work);
2842   while (*in >= 0)
2843     {*(vals + *in++) = *(buf + *out++);}
2844 #elif defined MPISRC
2845   MPI_Allreduce(buf,work,size,REAL_TYPE,MPI_MAX,gs->gs_comm);
2846   while (*in >= 0)
2847     {*(vals + *in++) = *(work + *out++);}
2848 #else
2849   grop(buf,work,size,op);
2850   while (*in >= 0)
2851     {*(vals + *in++) = *(buf + *out++);}
2852 #endif
2853 
2854 #ifdef DEBUG
2855   error_msg_warning("end gs_gop_tree_max()");
2856 #endif
2857 }
2858 
2859 
2860 
2861 /******************************************************************************
2862 Function: gather_scatter
2863 
2864 Input :
2865 Output:
2866 Return:
2867 Description:
2868 ******************************************************************************/
2869 static void
2870 gs_gop_min_abs(register gs_id *gs, register REAL *vals)
2871 {
2872 #ifdef DEBUG
2873   error_msg_warning("start gs_gop_xxx()\n");
2874 #endif
2875 
2876   /* local only operations!!! */
2877   if (gs->num_local)
2878     {gs_gop_local_min_abs(gs,vals);}
2879 
2880   /* if intersection tree/pairwise and local isn't empty */
2881   if (gs->num_local_gop)
2882     {
2883       gs_gop_local_in_min_abs(gs,vals);
2884 
2885       /* pairwise */
2886       if (gs->num_pairs)
2887 	{gs_gop_pairwise_min_abs(gs,vals);}
2888 
2889       /* tree */
2890       else if (gs->max_left_over)
2891 	{gs_gop_tree_min_abs(gs,vals);}
2892 
2893       gs_gop_local_out(gs,vals);
2894     }
2895   /* if intersection tree/pairwise and local is empty */
2896   else
2897     {
2898       /* pairwise */
2899       if (gs->num_pairs)
2900 	{gs_gop_pairwise_min_abs(gs,vals);}
2901 
2902       /* tree */
2903       else if (gs->max_left_over)
2904 	{gs_gop_tree_min_abs(gs,vals);}
2905     }
2906 }
2907 
2908 
2909 
2910 /******************************************************************************
2911 Function: gather_scatter
2912 
2913 Input :
2914 Output:
2915 Return:
2916 Description:
2917 ******************************************************************************/
2918 static
2919 void
2920 gs_gop_local_min_abs(register gs_id *gs, register REAL *vals)
2921 {
2922   register int *num, *map, **reduce;
2923   register REAL tmp;
2924 
2925 
2926 #ifdef DEBUG
2927   error_msg_warning("start gs_gop_xxx()\n");
2928 #endif
2929 
2930   num    = gs->num_local_reduce;
2931   reduce = gs->local_reduce;
2932   while ((map = *reduce))
2933     {
2934       num ++;
2935       tmp = REAL_MAX;
2936       while (*map >= 0)
2937 	{tmp = MIN_FABS(tmp,*(vals + *map)); map++;}
2938 
2939       map = *reduce++;
2940       while (*map >= 0)
2941 	{*(vals + *map++) = tmp;}
2942     }
2943 }
2944 
2945 
2946 
2947 /******************************************************************************
2948 Function: gather_scatter
2949 
2950 Input :
2951 Output:
2952 Return:
2953 Description:
2954 ******************************************************************************/
2955 static
2956 void
2957 gs_gop_local_in_min_abs(register gs_id *gs, register REAL *vals)
2958 {
2959   register int *num, *map, **reduce;
2960   register REAL *base;
2961 
2962 #ifdef DEBUG
2963   error_msg_warning("start gs_gop_xxx()\n");
2964 #endif
2965 
2966   num    = gs->num_gop_local_reduce;
2967   reduce = gs->gop_local_reduce;
2968   while ((map = *reduce++))
2969     {
2970       num++;
2971       base = vals + *map++;
2972       while (*map >= 0)
2973 	{*base = MIN_FABS(*base,*(vals + *map)); map++;}
2974     }
2975 }
2976 
2977 
2978 
2979 /******************************************************************************
2980 Function: gather_scatter
2981 
2982 VERSION 3 ::
2983 
2984 Input :
2985 Output:
2986 Return:
2987 Description:
2988 ******************************************************************************/
2989 static
2990 void
2991 gs_gop_pairwise_min_abs(register gs_id *gs, register REAL *in_vals)
2992 {
2993 #if   defined NXSRC
2994   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
2995   register int *iptr, *msg_list, *msg_size, **msg_nodes;
2996   register int *pw, *list, *size, **nodes;
2997   register int *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
2998 
2999 
3000 #ifdef DEBUG
3001   error_msg_warning("start gs_gop_xxx()\n");
3002 #endif
3003 
3004   /* strip and load registers */
3005   msg_list =list         = gs->pair_list;
3006   msg_size =size         = gs->msg_sizes;
3007   msg_nodes=nodes        = gs->node_list;
3008   iptr=pw                = gs->pw_elm_list;
3009   dptr1=dptr3            = gs->pw_vals;
3010   msg_ids_in  = ids_in   = gs->msg_ids_in;
3011   msg_ids_out = ids_out  = gs->msg_ids_out;
3012   dptr2                  = gs->out;
3013   in1=in2                = gs->in;
3014 
3015   /* post the receives */
3016   do
3017     {
3018       *msg_ids_in++ = (int) irecv(MSGTAG1 + *list++,(char *)in1,*size*REAL_LEN);
3019       in1 += *size++;
3020     }
3021   while (*msg_ids_in >= 0);
3022 
3023   /* load gs values into in out gs buffers */
3024   while (*iptr >= 0)
3025     {*dptr3++ = *(in_vals + *iptr++);}
3026 
3027   /* load out buffers and post the sends */
3028   while (iptr = *msg_nodes++)
3029     {
3030       dptr3 = dptr2;
3031       while (*iptr >= 0)
3032 	{*dptr2++ = *(dptr1 + *iptr++);}
3033       *msg_ids_out++ = (int) isend(MSGTAG1+my_id,(char *)dptr3,*(msg_size++)*REAL_LEN,
3034 			     *msg_list++,0);
3035     }
3036 
3037   /* post the receives ... was here*/
3038   if (gs->max_left_over)
3039     {gs_gop_tree_min_abs(gs,in_vals);}
3040 
3041   /* process the received data */
3042   while (iptr = *nodes++)
3043     {
3044       msgwait(*ids_in++);
3045       while (*iptr >= 0)
3046 	{*(dptr1 + *iptr) = MIN_FABS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
3047     }
3048 
3049   /* replace vals */
3050   while (*pw >= 0)
3051     {*(in_vals + *pw++) = *dptr1++;}
3052 
3053   /* clear isend message handles */
3054   while (*ids_out >= 0)
3055     {msgwait(*ids_out++);}
3056 
3057 #elif defined MPISRC
3058   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
3059   register int *iptr, *msg_list, *msg_size, **msg_nodes;
3060   register int *pw, *list, *size, **nodes;
3061   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
3062   MPI_Status status;
3063 
3064 
3065   /* strip and load registers */
3066   msg_list =list         = gs->pair_list;
3067   msg_size =size         = gs->msg_sizes;
3068   msg_nodes=nodes        = gs->node_list;
3069   iptr=pw                = gs->pw_elm_list;
3070   dptr1=dptr3            = gs->pw_vals;
3071   msg_ids_in  = ids_in   = gs->msg_ids_in;
3072   msg_ids_out = ids_out  = gs->msg_ids_out;
3073   dptr2                  = gs->out;
3074   in1=in2                = gs->in;
3075 
3076   /* post the receives */
3077   /*  msg_nodes=nodes; */
3078   do
3079     {
3080       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
3081 	 second one *list and do list++ afterwards */
3082       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
3083 		gs->gs_comm, msg_ids_in++);
3084       in1 += *size++;
3085     }
3086   while (*++msg_nodes);
3087   msg_nodes=nodes;
3088 
3089   /* load gs values into in out gs buffers */
3090   while (*iptr >= 0)
3091     {*dptr3++ = *(in_vals + *iptr++);}
3092 
3093   /* load out buffers and post the sends */
3094   while ((iptr = *msg_nodes++))
3095     {
3096       dptr3 = dptr2;
3097       while (*iptr >= 0)
3098 	{*dptr2++ = *(dptr1 + *iptr++);}
3099       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
3100       /* is msg_ids_out++ correct? */
3101       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
3102 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
3103     }
3104 
3105   if (gs->max_left_over)
3106     {gs_gop_tree_min_abs(gs,in_vals);}
3107 
3108   /* process the received data */
3109   msg_nodes=nodes;
3110   while ((iptr = *nodes++))
3111     {
3112       /* Should I check the return value of MPI_Wait() or status? */
3113       /* Can this loop be replaced by a call to MPI_Waitall()? */
3114       MPI_Wait(ids_in++, &status);
3115       while (*iptr >= 0)
3116 	{*(dptr1 + *iptr) = MIN_FABS(*(dptr1 + *iptr),*in2); iptr++; in2++;}
3117     }
3118 
3119   /* replace vals */
3120   while (*pw >= 0)
3121     {*(in_vals + *pw++) = *dptr1++;}
3122 
3123   /* clear isend message handles */
3124   /* This changed for clarity though it could be the same */
3125   while (*msg_nodes++)
3126     /* Should I check the return value of MPI_Wait() or status? */
3127     /* Can this loop be replaced by a call to MPI_Waitall()? */
3128     {MPI_Wait(ids_out++, &status);}
3129 #else
3130   return;
3131 #endif
3132 }
3133 
3134 
3135 
3136 /******************************************************************************
3137 Function: gather_scatter
3138 
3139 Input :
3140 Output:
3141 Return:
3142 Description:
3143 ******************************************************************************/
3144 static
3145 void
3146 gs_gop_tree_min_abs(gs_id *gs, REAL *vals)
3147 {
3148   int size;
3149   int *in, *out;
3150   REAL *buf, *work;
3151   int op[] = {GL_MIN_ABS,0};
3152 
3153 
3154 #ifdef DEBUG
3155   error_msg_warning("start gs_gop_tree_min_abs()");
3156 #endif
3157 
3158   in   = gs->tree_map_in;
3159   out  = gs->tree_map_out;
3160   buf  = gs->tree_buf;
3161   work = gs->tree_work;
3162   size = gs->tree_nel;
3163 
3164 #if defined  BLAS||CBLAS
3165   *work = REAL_MAX;
3166   copy(size,work,0,buf,1);
3167 #else
3168   rvec_set(buf,REAL_MAX,size);
3169 #endif
3170 
3171   while (*in >= 0)
3172     {*(buf + *out++) = *(vals + *in++);}
3173 
3174   in   = gs->tree_map_in;
3175   out  = gs->tree_map_out;
3176   grop(buf,work,size,op);
3177   while (*in >= 0)
3178     {*(vals + *in++) = *(buf + *out++);}
3179 
3180 #ifdef DEBUG
3181   error_msg_warning("end gs_gop_tree_min_abs()");
3182 #endif
3183 }
3184 
3185 
3186 
3187 /******************************************************************************
3188 Function: gather_scatter
3189 
3190 Input :
3191 Output:
3192 Return:
3193 Description:
3194 ******************************************************************************/
3195 static void
3196 gs_gop_min(register gs_id *gs, register REAL *vals)
3197 {
3198 #ifdef DEBUG
3199   error_msg_warning("start gs_gop_xxx()\n");
3200 #endif
3201 
3202   /* local only operations!!! */
3203   if (gs->num_local)
3204     {gs_gop_local_min(gs,vals);}
3205 
3206   /* if intersection tree/pairwise and local isn't empty */
3207   if (gs->num_local_gop)
3208     {
3209       gs_gop_local_in_min(gs,vals);
3210 
3211       /* pairwise */
3212       if (gs->num_pairs)
3213 	{gs_gop_pairwise_min(gs,vals);}
3214 
3215       /* tree */
3216       else if (gs->max_left_over)
3217 	{gs_gop_tree_min(gs,vals);}
3218 
3219       gs_gop_local_out(gs,vals);
3220     }
3221   /* if intersection tree/pairwise and local is empty */
3222   else
3223     {
3224       /* pairwise */
3225       if (gs->num_pairs)
3226 	{gs_gop_pairwise_min(gs,vals);}
3227 
3228       /* tree */
3229       else if (gs->max_left_over)
3230 	{gs_gop_tree_min(gs,vals);}
3231     }
3232 #ifdef DEBUG
3233   error_msg_warning("end gs_gop_xxx()\n");
3234 #endif
3235 }
3236 
3237 
3238 
3239 /******************************************************************************
3240 Function: gather_scatter
3241 
3242 Input :
3243 Output:
3244 Return:
3245 Description:
3246 ******************************************************************************/
3247 static
3248 void
3249 gs_gop_local_min(register gs_id *gs, register REAL *vals)
3250 {
3251   register int *num, *map, **reduce;
3252   register REAL tmp;
3253 
3254 
3255 #ifdef DEBUG
3256   error_msg_warning("start gs_gop_xxx()\n");
3257 #endif
3258 
3259   num    = gs->num_local_reduce;
3260   reduce = gs->local_reduce;
3261   while ((map = *reduce))
3262     {
3263       num ++;
3264       tmp = REAL_MAX;
3265       while (*map >= 0)
3266 	{tmp = MIN(tmp,*(vals + *map)); map++;}
3267 
3268       map = *reduce++;
3269       while (*map >= 0)
3270 	{*(vals + *map++) = tmp;}
3271     }
3272 }
3273 
3274 
3275 
3276 /******************************************************************************
3277 Function: gather_scatter
3278 
3279 Input :
3280 Output:
3281 Return:
3282 Description:
3283 ******************************************************************************/
3284 static
3285 void
3286 gs_gop_local_in_min(register gs_id *gs, register REAL *vals)
3287 {
3288   register int *num, *map, **reduce;
3289   register REAL *base;
3290 
3291 
3292 #ifdef DEBUG
3293   error_msg_warning("start gs_gop_xxx()\n");
3294 #endif
3295 
3296   num    = gs->num_gop_local_reduce;
3297   reduce = gs->gop_local_reduce;
3298   while ((map = *reduce++))
3299     {
3300       num++;
3301       base = vals + *map++;
3302       while (*map >= 0)
3303 	{*base = MIN(*base,*(vals + *map)); map++;}
3304     }
3305 }
3306 
3307 
3308 
3309 /******************************************************************************
3310 Function: gather_scatter
3311 
3312 VERSION 3 ::
3313 
3314 Input :
3315 Output:
3316 Return:
3317 Description:
3318 ******************************************************************************/
3319 static
3320 void
3321 gs_gop_pairwise_min(register gs_id *gs, register REAL *in_vals)
3322 {
3323 #if   defined NXSRC
3324   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
3325   register int *iptr, *msg_list, *msg_size, **msg_nodes;
3326   register int *pw, *list, *size, **nodes;
3327   register int *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
3328 
3329 
3330 #ifdef DEBUG
3331   error_msg_warning("start gs_gop_xxx()\n");
3332 #endif
3333 
3334   /* strip and load registers */
3335   msg_list =list         = gs->pair_list;
3336   msg_size =size         = gs->msg_sizes;
3337   msg_nodes=nodes        = gs->node_list;
3338   iptr=pw                = gs->pw_elm_list;
3339   dptr1=dptr3            = gs->pw_vals;
3340   msg_ids_in  = ids_in   = gs->msg_ids_in;
3341   msg_ids_out = ids_out  = gs->msg_ids_out;
3342   dptr2                  = gs->out;
3343   in1=in2                = gs->in;
3344 
3345   /* post the receives */
3346   do
3347     {
3348       *msg_ids_in++ = (int) irecv(MSGTAG1 + *list++,(char *)in1,*size*REAL_LEN);
3349       in1 += *size++;
3350     }
3351   while (*msg_ids_in >= 0);
3352 
3353   /* load gs values into in out gs buffers */
3354   while (*iptr >= 0)
3355     {*dptr3++ = *(in_vals + *iptr++);}
3356 
3357   /* load out buffers and post the sends */
3358   while (iptr = *msg_nodes++)
3359     {
3360       dptr3 = dptr2;
3361       while (*iptr >= 0)
3362 	{*dptr2++ = *(dptr1 + *iptr++);}
3363       *msg_ids_out++ = (int) isend(MSGTAG1+my_id,(char *)dptr3,*(msg_size++)*REAL_LEN,
3364 			     *msg_list++,0);
3365     }
3366 
3367   /* post the receives ... was here*/
3368   if (gs->max_left_over)
3369     {gs_gop_tree_min(gs,in_vals);}
3370 
3371   /* process the received data */
3372   while (iptr = *nodes++)
3373     {
3374       msgwait(*ids_in++);
3375       while (*iptr >= 0)
3376 	{*(dptr1 + *iptr) = MIN(*(dptr1 + *iptr),*in2); iptr++; in2++;}
3377     }
3378 
3379   /* replace vals */
3380   while (*pw >= 0)
3381     {*(in_vals + *pw++) = *dptr1++;}
3382 
3383   /* clear isend message handles */
3384   while (*ids_out >= 0)
3385     {msgwait(*ids_out++);}
3386 
3387 #elif defined MPISRC
3388   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
3389   register int *iptr, *msg_list, *msg_size, **msg_nodes;
3390   register int *pw, *list, *size, **nodes;
3391   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
3392   MPI_Status status;
3393 
3394 
3395   /* strip and load registers */
3396   msg_list =list         = gs->pair_list;
3397   msg_size =size         = gs->msg_sizes;
3398   msg_nodes=nodes        = gs->node_list;
3399   iptr=pw                = gs->pw_elm_list;
3400   dptr1=dptr3            = gs->pw_vals;
3401   msg_ids_in  = ids_in   = gs->msg_ids_in;
3402   msg_ids_out = ids_out  = gs->msg_ids_out;
3403   dptr2                  = gs->out;
3404   in1=in2                = gs->in;
3405 
3406   /* post the receives */
3407   /*  msg_nodes=nodes; */
3408   do
3409     {
3410       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
3411 	 second one *list and do list++ afterwards */
3412       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
3413 		gs->gs_comm, msg_ids_in++);
3414       in1 += *size++;
3415     }
3416   while (*++msg_nodes);
3417   msg_nodes=nodes;
3418 
3419   /* load gs values into in out gs buffers */
3420   while (*iptr >= 0)
3421     {*dptr3++ = *(in_vals + *iptr++);}
3422 
3423   /* load out buffers and post the sends */
3424   while ((iptr = *msg_nodes++))
3425     {
3426       dptr3 = dptr2;
3427       while (*iptr >= 0)
3428 	{*dptr2++ = *(dptr1 + *iptr++);}
3429       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
3430       /* is msg_ids_out++ correct? */
3431       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
3432 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
3433     }
3434 
3435   /* process the received data */
3436   if (gs->max_left_over)
3437     {gs_gop_tree_min(gs,in_vals);}
3438 
3439   msg_nodes=nodes;
3440   while ((iptr = *nodes++))
3441     {
3442       /* Should I check the return value of MPI_Wait() or status? */
3443       /* Can this loop be replaced by a call to MPI_Waitall()? */
3444       MPI_Wait(ids_in++, &status);
3445       while (*iptr >= 0)
3446 	{*(dptr1 + *iptr) = MIN(*(dptr1 + *iptr),*in2); iptr++; in2++;}
3447     }
3448 
3449   /* replace vals */
3450   while (*pw >= 0)
3451     {*(in_vals + *pw++) = *dptr1++;}
3452 
3453   /* clear isend message handles */
3454   /* This changed for clarity though it could be the same */
3455   while (*msg_nodes++)
3456     /* Should I check the return value of MPI_Wait() or status? */
3457     /* Can this loop be replaced by a call to MPI_Waitall()? */
3458     {MPI_Wait(ids_out++, &status);}
3459 #else
3460   return;
3461 #endif
3462 }
3463 
3464 
3465 
3466 /******************************************************************************
3467 Function: gather_scatter
3468 
3469 Input :
3470 Output:
3471 Return:
3472 Description:
3473 ******************************************************************************/
3474 static
3475 void
3476 gs_gop_tree_min(gs_id *gs, REAL *vals)
3477 {
3478   int size;
3479   int *in, *out;
3480   REAL *buf, *work;
3481   /*int op[] = {GL_MIN,0};*/
3482 
3483 
3484 #ifdef DEBUG
3485   error_msg_warning("start gs_gop_tree_min()");
3486 #endif
3487 
3488   in   = gs->tree_map_in;
3489   out  = gs->tree_map_out;
3490   buf  = gs->tree_buf;
3491   work = gs->tree_work;
3492   size = gs->tree_nel;
3493 
3494 #if defined  BLAS||CBLAS
3495   *work = REAL_MAX;
3496   copy(size,work,0,buf,1);
3497 #else
3498   rvec_set(buf,REAL_MAX,size);
3499 #endif
3500 
3501   while (*in >= 0)
3502     {*(buf + *out++) = *(vals + *in++);}
3503 
3504   in   = gs->tree_map_in;
3505   out  = gs->tree_map_out;
3506 #if defined(NXSRC) && defined(r8)
3507   gdlow(buf,size,work);
3508   while (*in >= 0)
3509     {*(vals + *in++) = *(buf + *out++);}
3510 #elif defined MPISRC
3511   MPI_Allreduce(buf,work,size,REAL_TYPE,MPI_MIN,gs->gs_comm);
3512   while (*in >= 0)
3513     {*(vals + *in++) = *(work + *out++);}
3514 #else
3515   grop(buf,work,size,op);
3516   while (*in >= 0)
3517     {*(vals + *in++) = *(buf + *out++);}
3518 #endif
3519 
3520 #ifdef DEBUG
3521   error_msg_warning("end gs_gop_tree_min()");
3522 #endif
3523 }
3524 
3525 
3526 
3527 /******************************************************************************
3528 Function: gather_scatter
3529 
3530 Input :
3531 Output:
3532 Return:
3533 Description:
3534 ******************************************************************************/
3535 static void
3536 gs_gop_times(register gs_id *gs, register REAL *vals)
3537 {
3538 #ifdef DEBUG
3539   error_msg_warning("start gs_gop_times()\n");
3540 #endif
3541 
3542   /* local only operations!!! */
3543   if (gs->num_local)
3544     {gs_gop_local_times(gs,vals);}
3545 
3546   /* if intersection tree/pairwise and local isn't empty */
3547   if (gs->num_local_gop)
3548     {
3549       gs_gop_local_in_times(gs,vals);
3550 
3551       /* pairwise */
3552       if (gs->num_pairs)
3553 	{gs_gop_pairwise_times(gs,vals);}
3554 
3555       /* tree */
3556       else if (gs->max_left_over)
3557 	{gs_gop_tree_times(gs,vals);}
3558 
3559       gs_gop_local_out(gs,vals);
3560     }
3561   /* if intersection tree/pairwise and local is empty */
3562   else
3563     {
3564       /* pairwise */
3565       if (gs->num_pairs)
3566 	{gs_gop_pairwise_times(gs,vals);}
3567 
3568       /* tree */
3569       else if (gs->max_left_over)
3570 	{gs_gop_tree_times(gs,vals);}
3571     }
3572 }
3573 
3574 
3575 
3576 /******************************************************************************
3577 Function: gather_scatter
3578 
3579 Input :
3580 Output:
3581 Return:
3582 Description:
3583 ******************************************************************************/
3584 static
3585 void
3586 gs_gop_local_times(register gs_id *gs, register REAL *vals)
3587 {
3588   register int *num, *map, **reduce;
3589   register REAL tmp;
3590 
3591 
3592 #ifdef DEBUG
3593   error_msg_warning("start gs_gop_xxx()\n");
3594 #endif
3595 
3596   num    = gs->num_local_reduce;
3597   reduce = gs->local_reduce;
3598   while ((map = *reduce))
3599     {
3600       /* wall */
3601       if (*num == 2)
3602 	{
3603 	  num ++; reduce++;
3604 	  vals[map[1]] = vals[map[0]] *= vals[map[1]];
3605 	}
3606       /* corner shared by three elements */
3607       else if (*num == 3)
3608 	{
3609 	  num ++; reduce++;
3610 	  vals[map[2]]=vals[map[1]]=vals[map[0]]*=(vals[map[1]]*vals[map[2]]);
3611 	}
3612       /* corner shared by four elements */
3613       else if (*num == 4)
3614 	{
3615 	  num ++; reduce++;
3616 	  vals[map[1]]=vals[map[2]]=vals[map[3]]=vals[map[0]] *=
3617 	                         (vals[map[1]] * vals[map[2]] * vals[map[3]]);
3618 	}
3619       /* general case ... odd geoms ... 3D*/
3620       else
3621 	{
3622 	  num ++;
3623  	  tmp = 1.0;
3624 	  while (*map >= 0)
3625 	    {tmp *= *(vals + *map++);}
3626 
3627 	  map = *reduce++;
3628 	  while (*map >= 0)
3629 	    {*(vals + *map++) = tmp;}
3630 	}
3631     }
3632 }
3633 
3634 
3635 
3636 /******************************************************************************
3637 Function: gather_scatter
3638 
3639 Input :
3640 Output:
3641 Return:
3642 Description:
3643 ******************************************************************************/
3644 static
3645 void
3646 gs_gop_local_in_times(register gs_id *gs, register REAL *vals)
3647 {
3648   register int *num, *map, **reduce;
3649   register REAL *base;
3650 
3651 
3652 #ifdef DEBUG
3653   error_msg_warning("start gs_gop_xxx()\n");
3654 #endif
3655 
3656   num    = gs->num_gop_local_reduce;
3657   reduce = gs->gop_local_reduce;
3658   while ((map = *reduce++))
3659     {
3660       /* wall */
3661       if (*num == 2)
3662 	{
3663 	  num ++;
3664 	  vals[map[0]] *= vals[map[1]];
3665 	}
3666       /* corner shared by three elements */
3667       else if (*num == 3)
3668 	{
3669 	  num ++;
3670 	  vals[map[0]] *= (vals[map[1]] * vals[map[2]]);
3671 	}
3672       /* corner shared by four elements */
3673       else if (*num == 4)
3674 	{
3675 	  num ++;
3676 	  vals[map[0]] *= (vals[map[1]] * vals[map[2]] * vals[map[3]]);
3677 	}
3678       /* general case ... odd geoms ... 3D*/
3679       else
3680 	{
3681 	  num++;
3682 	  base = vals + *map++;
3683 	  while (*map >= 0)
3684 	    {*base *= *(vals + *map++);}
3685 	}
3686     }
3687 }
3688 
3689 
3690 
3691 /******************************************************************************
3692 Function: gather_scatter
3693 
3694 VERSION 3 ::
3695 
3696 Input :
3697 Output:
3698 Return:
3699 Description:
3700 ******************************************************************************/
3701 static
3702 void
3703 gs_gop_pairwise_times(register gs_id *gs, register REAL *in_vals)
3704 {
3705 #if   defined NXSRC
3706   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
3707   register int *iptr, *msg_list, *msg_size, **msg_nodes;
3708   register int *pw, *list, *size, **nodes;
3709   register int *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
3710 
3711 
3712 
3713 #ifdef DEBUG
3714   error_msg_warning("start gs_gop_xxx()\n");
3715 #endif
3716 
3717   /* strip and load registers */
3718   msg_list =list         = gs->pair_list;
3719   msg_size =size         = gs->msg_sizes;
3720   msg_nodes=nodes        = gs->node_list;
3721   iptr=pw                = gs->pw_elm_list;
3722   dptr1=dptr3            = gs->pw_vals;
3723   msg_ids_in  = ids_in   = gs->msg_ids_in;
3724   msg_ids_out = ids_out  = gs->msg_ids_out;
3725   dptr2                  = gs->out;
3726   in1=in2                = gs->in;
3727 
3728   /* post the receives */
3729   do
3730     {
3731       *msg_ids_in++ = (int) irecv(MSGTAG1 + *list++,(char *)in1,*size*REAL_LEN);
3732       in1 += *size++;
3733     }
3734   while (*msg_ids_in >= 0);
3735 
3736   /* load gs values into in out gs buffers */
3737   while (*iptr >= 0)
3738     {*dptr3++ = *(in_vals + *iptr++);}
3739 
3740   /* load out buffers and post the sends */
3741   while (iptr = *msg_nodes++)
3742     {
3743       dptr3 = dptr2;
3744       while (*iptr >= 0)
3745 	{*dptr2++ = *(dptr1 + *iptr++);}
3746       *msg_ids_out++ = (int) isend(MSGTAG1+my_id,(char *)dptr3,*(msg_size++)*REAL_LEN,
3747 			     *msg_list++,0);
3748     }
3749 
3750   /* post the receives ... was here*/
3751   if (gs->max_left_over)
3752     {gs_gop_tree_times(gs,in_vals);}
3753 
3754   /* process the received data */
3755   while (iptr = *nodes++)
3756     {
3757       msgwait(*ids_in++);
3758       while (*iptr >= 0)
3759 	{*(dptr1 + *iptr++) *= *in2++;}
3760     }
3761 
3762   /* replace vals */
3763   while (*pw >= 0)
3764     {*(in_vals + *pw++) = *dptr1++;}
3765 
3766   /* clear isend message handles */
3767   while (*ids_out >= 0)
3768     {msgwait(*ids_out++);}
3769 
3770 #elif defined MPISRC
3771   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
3772   register int *iptr, *msg_list, *msg_size, **msg_nodes;
3773   register int *pw, *list, *size, **nodes;
3774   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
3775   MPI_Status status;
3776 
3777 
3778   /* strip and load registers */
3779   msg_list =list         = gs->pair_list;
3780   msg_size =size         = gs->msg_sizes;
3781   msg_nodes=nodes        = gs->node_list;
3782   iptr=pw                = gs->pw_elm_list;
3783   dptr1=dptr3            = gs->pw_vals;
3784   msg_ids_in  = ids_in   = gs->msg_ids_in;
3785   msg_ids_out = ids_out  = gs->msg_ids_out;
3786   dptr2                  = gs->out;
3787   in1=in2                = gs->in;
3788 
3789   /* post the receives */
3790   /*  msg_nodes=nodes; */
3791   do
3792     {
3793       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
3794 	 second one *list and do list++ afterwards */
3795       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
3796 		gs->gs_comm, msg_ids_in++);
3797       in1 += *size++;
3798     }
3799   while (*++msg_nodes);
3800   msg_nodes=nodes;
3801 
3802   /* load gs values into in out gs buffers */
3803   while (*iptr >= 0)
3804     {*dptr3++ = *(in_vals + *iptr++);}
3805 
3806   /* load out buffers and post the sends */
3807   while ((iptr = *msg_nodes++))
3808     {
3809       dptr3 = dptr2;
3810       while (*iptr >= 0)
3811 	{*dptr2++ = *(dptr1 + *iptr++);}
3812       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
3813       /* is msg_ids_out++ correct? */
3814       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
3815 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
3816     }
3817 
3818   if (gs->max_left_over)
3819     {gs_gop_tree_times(gs,in_vals);}
3820 
3821   /* process the received data */
3822   msg_nodes=nodes;
3823   while ((iptr = *nodes++))
3824     {
3825       /* Should I check the return value of MPI_Wait() or status? */
3826       /* Can this loop be replaced by a call to MPI_Waitall()? */
3827       MPI_Wait(ids_in++, &status);
3828       while (*iptr >= 0)
3829 	{*(dptr1 + *iptr++) *= *in2++;}
3830     }
3831 
3832   /* replace vals */
3833   while (*pw >= 0)
3834     {*(in_vals + *pw++) = *dptr1++;}
3835 
3836   /* clear isend message handles */
3837   /* This changed for clarity though it could be the same */
3838   while (*msg_nodes++)
3839     /* Should I check the return value of MPI_Wait() or status? */
3840     /* Can this loop be replaced by a call to MPI_Waitall()? */
3841     {MPI_Wait(ids_out++, &status);}
3842 #else
3843   return;
3844 #endif
3845 }
3846 
3847 
3848 
3849 /******************************************************************************
3850 Function: gather_scatter
3851 
3852 Input :
3853 Output:
3854 Return:
3855 Description:
3856 ******************************************************************************/
3857 static
3858 void
3859 gs_gop_tree_times(gs_id *gs, REAL *vals)
3860 {
3861   int size;
3862   int *in, *out;
3863   REAL *buf, *work;
3864   /*int op[] = {GL_MULT,0};*/
3865 
3866 
3867 #ifdef DEBUG
3868   error_msg_warning("start gs_gop_tree_times()");
3869 #endif
3870 
3871   in   = gs->tree_map_in;
3872   out  = gs->tree_map_out;
3873   buf  = gs->tree_buf;
3874   work = gs->tree_work;
3875   size = gs->tree_nel;
3876 
3877 #if defined  BLAS||CBLAS
3878   *work = 1.0;
3879   copy(size,work,0,buf,1);
3880 #else
3881   rvec_one(buf,size);
3882 #endif
3883 
3884   while (*in >= 0)
3885     {*(buf + *out++) = *(vals + *in++);}
3886 
3887   in   = gs->tree_map_in;
3888   out  = gs->tree_map_out;
3889 #if defined(NXSRC) && defined(r8)
3890   gdprod(buf,size,work);
3891   while (*in >= 0)
3892     {*(vals + *in++) = *(buf + *out++);}
3893 #elif defined MPISRC
3894   MPI_Allreduce(buf,work,size,REAL_TYPE,MPI_PROD,gs->gs_comm);
3895   while (*in >= 0)
3896     {*(vals + *in++) = *(work + *out++);}
3897 #else
3898   grop(buf,work,size,op);
3899   while (*in >= 0)
3900     {*(vals + *in++) = *(buf + *out++);}
3901 #endif
3902 
3903 #ifdef DEBUG
3904   error_msg_warning("end gs_gop_tree_times()");
3905 #endif
3906 }
3907 
3908 
3909 
3910 /******************************************************************************
3911 Function: gather_scatter
3912 
3913 
3914 Input :
3915 Output:
3916 Return:
3917 Description:
3918 ******************************************************************************/
3919 static void
3920 gs_gop_plus(register gs_id *gs, register REAL *vals)
3921 {
3922 #ifdef DEBUG
3923   error_msg_warning("start gs_gop_plus()\n");
3924 #endif
3925 
3926   /* local only operations!!! */
3927   if (gs->num_local)
3928     {gs_gop_local_plus(gs,vals);}
3929 
3930   /* if intersection tree/pairwise and local isn't empty */
3931   if (gs->num_local_gop)
3932     {
3933       gs_gop_local_in_plus(gs,vals);
3934 
3935       /* pairwise will NOT do tree inside ... */
3936       if (gs->num_pairs)
3937 	{gs_gop_pairwise_plus(gs,vals);}
3938 
3939       /* tree */
3940       if (gs->max_left_over)
3941 	{gs_gop_tree_plus(gs,vals);}
3942 
3943       gs_gop_local_out(gs,vals);
3944     }
3945   /* if intersection tree/pairwise and local is empty */
3946   else
3947     {
3948       /* pairwise will NOT do tree inside */
3949       if (gs->num_pairs)
3950 	{gs_gop_pairwise_plus(gs,vals);}
3951 
3952       /* tree */
3953       if (gs->max_left_over)
3954 	{gs_gop_tree_plus(gs,vals);}
3955     }
3956 
3957 #ifdef DEBUG
3958   error_msg_warning("end gs_gop_plus()\n");
3959 #endif
3960 }
3961 
3962 
3963 
3964 /******************************************************************************
3965 Function: gather_scatter
3966 
3967 Input :
3968 Output:
3969 Return:
3970 Description:
3971 ******************************************************************************/
3972 static
3973 void
3974 gs_gop_local_plus(register gs_id *gs, register REAL *vals)
3975 {
3976   register int *num, *map, **reduce;
3977   register REAL tmp;
3978 
3979 
3980 #ifdef DEBUG
3981   error_msg_warning("begin gs_gop_local_plus()\n");
3982 #endif
3983 
3984   num    = gs->num_local_reduce;
3985   reduce = gs->local_reduce;
3986   while ((map = *reduce))
3987     {
3988       /* wall */
3989       if (*num == 2)
3990 	{
3991 	  num ++; reduce++;
3992 	  vals[map[1]] = vals[map[0]] += vals[map[1]];
3993 	}
3994       /* corner shared by three elements */
3995       else if (*num == 3)
3996 	{
3997 	  num ++; reduce++;
3998 	  vals[map[2]]=vals[map[1]]=vals[map[0]]+=(vals[map[1]]+vals[map[2]]);
3999 	}
4000       /* corner shared by four elements */
4001       else if (*num == 4)
4002 	{
4003 	  num ++; reduce++;
4004 	  vals[map[1]]=vals[map[2]]=vals[map[3]]=vals[map[0]] +=
4005 	                         (vals[map[1]] + vals[map[2]] + vals[map[3]]);
4006 	}
4007       /* general case ... odd geoms ... 3D*/
4008       else
4009 	{
4010 	  num ++;
4011  	  tmp = 0.0;
4012 	  while (*map >= 0)
4013 	    {tmp += *(vals + *map++);}
4014 
4015 	  map = *reduce++;
4016 	  while (*map >= 0)
4017 	    {*(vals + *map++) = tmp;}
4018 	}
4019     }
4020 #ifdef DEBUG
4021   error_msg_warning("end gs_gop_local_plus()\n");
4022 #endif
4023 }
4024 
4025 
4026 
4027 /******************************************************************************
4028 Function: gather_scatter
4029 
4030 Input :
4031 Output:
4032 Return:
4033 Description:
4034 ******************************************************************************/
4035 static
4036 void
4037 gs_gop_local_in_plus(register gs_id *gs, register REAL *vals)
4038 {
4039   register int *num, *map, **reduce;
4040   register REAL *base;
4041 
4042 
4043 #ifdef DEBUG
4044   error_msg_warning("begin gs_gop_local_in_plus()\n");
4045 #endif
4046 
4047   num    = gs->num_gop_local_reduce;
4048   reduce = gs->gop_local_reduce;
4049   while ((map = *reduce++))
4050     {
4051       /* wall */
4052       if (*num == 2)
4053 	{
4054 	  num ++;
4055 	  vals[map[0]] += vals[map[1]];
4056 	}
4057       /* corner shared by three elements */
4058       else if (*num == 3)
4059 	{
4060 	  num ++;
4061 	  vals[map[0]] += (vals[map[1]] + vals[map[2]]);
4062 	}
4063       /* corner shared by four elements */
4064       else if (*num == 4)
4065 	{
4066 	  num ++;
4067 	  vals[map[0]] += (vals[map[1]] + vals[map[2]] + vals[map[3]]);
4068 	}
4069       /* general case ... odd geoms ... 3D*/
4070       else
4071 	{
4072 	  num++;
4073 	  base = vals + *map++;
4074 	  while (*map >= 0)
4075 	    {*base += *(vals + *map++);}
4076 	}
4077     }
4078 #ifdef DEBUG
4079   error_msg_warning("end gs_gop_local_in_plus()\n");
4080 #endif
4081 }
4082 
4083 
4084 
4085 /******************************************************************************
4086 Function: gather_scatter
4087 
4088 VERSION 3 ::
4089 
4090 Input :
4091 Output:
4092 Return:
4093 Description:
4094 ******************************************************************************/
4095 static
4096 void
4097 gs_gop_pairwise_plus(register gs_id *gs, register REAL *in_vals)
4098 {
4099 #if   defined NXSRC
4100   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
4101   register int *iptr, *msg_list, *msg_size, **msg_nodes;
4102   register int *pw, *list, *size, **nodes;
4103   register int *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
4104 
4105 
4106 #ifdef DEBUG
4107   error_msg_warning("gs_gop_pairwise_plus() start\n");
4108 #endif
4109 
4110   /* strip and load registers */
4111   msg_list =list         = gs->pair_list;
4112   msg_size =size         = gs->msg_sizes;
4113   msg_nodes=nodes        = gs->node_list;
4114   iptr=pw                = gs->pw_elm_list;
4115   dptr1=dptr3            = gs->pw_vals;
4116   msg_ids_in  = ids_in   = gs->msg_ids_in;
4117   msg_ids_out = ids_out  = gs->msg_ids_out;
4118   dptr2                  = gs->out;
4119   in1=in2                = gs->in;
4120 
4121   /* post the receives */
4122   do
4123     {
4124       *msg_ids_in++ = (int) irecv(MSGTAG1 + *list++,(char *)in1,*size*REAL_LEN);
4125       in1 += *size++;
4126     }
4127   while (*msg_ids_in >= 0);
4128 
4129   /* load gs values into in out gs buffers */
4130   while (*iptr >= 0)
4131     {*dptr3++ = *(in_vals + *iptr++);}
4132 
4133   /* load out buffers and post the sends */
4134   while (iptr = *msg_nodes++)
4135     {
4136       dptr3 = dptr2;
4137       while (*iptr >= 0)
4138 	{*dptr2++ = *(dptr1 + *iptr++);}
4139       *msg_ids_out++ = (int) isend(MSGTAG1+my_id,(char *)dptr3,*(msg_size++)*REAL_LEN,
4140 			     *msg_list++,0);
4141     }
4142 
4143   /* post the receives ... was here*/
4144 
4145   /* do the tree while we're waiting */
4146   /*
4147   if (gs->max_left_over)
4148     {gs_gop_tree_plus(gs,in_vals);}
4149   */
4150 
4151   /* process the received data */
4152   while (iptr = *nodes++)
4153     {
4154       msgwait(*ids_in++);
4155       while (*iptr >= 0)
4156 	{*(dptr1 + *iptr++) += *in2++;}
4157     }
4158 
4159   /* replace vals */
4160   while (*pw >= 0)
4161     {*(in_vals + *pw++) = *dptr1++;}
4162 
4163   /* clear isend message handles */
4164   while (*ids_out >= 0)
4165     {msgwait(*ids_out++);}
4166 
4167 #ifdef DEBUG
4168   error_msg_warning("gs_gop_pairwise_plus() end\n");
4169 #endif
4170 
4171 #elif defined MPISRC
4172   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
4173   register int *iptr, *msg_list, *msg_size, **msg_nodes;
4174   register int *pw, *list, *size, **nodes;
4175   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
4176   MPI_Status status;
4177 
4178 
4179 #ifdef DEBUG
4180   error_msg_warning("gs_gop_pairwise_plus() start\n");
4181 #endif
4182 
4183   /* strip and load registers */
4184   msg_list =list         = gs->pair_list;
4185   msg_size =size         = gs->msg_sizes;
4186   msg_nodes=nodes        = gs->node_list;
4187   iptr=pw                = gs->pw_elm_list;
4188   dptr1=dptr3            = gs->pw_vals;
4189   msg_ids_in  = ids_in   = gs->msg_ids_in;
4190   msg_ids_out = ids_out  = gs->msg_ids_out;
4191   dptr2                  = gs->out;
4192   in1=in2                = gs->in;
4193 
4194   /* post the receives */
4195   /*  msg_nodes=nodes; */
4196   do
4197     {
4198       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
4199 	 second one *list and do list++ afterwards */
4200       MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
4201 		gs->gs_comm, msg_ids_in++);
4202       in1 += *size++;
4203     }
4204   while (*++msg_nodes);
4205   msg_nodes=nodes;
4206 
4207   /* load gs values into in out gs buffers */
4208   while (*iptr >= 0)
4209     {*dptr3++ = *(in_vals + *iptr++);}
4210 
4211   /* load out buffers and post the sends */
4212   while ((iptr = *msg_nodes++))
4213     {
4214       dptr3 = dptr2;
4215       while (*iptr >= 0)
4216 	{*dptr2++ = *(dptr1 + *iptr++);}
4217       /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
4218       /* is msg_ids_out++ correct? */
4219       MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *msg_list++,
4220 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
4221     }
4222 
4223   /* do the tree while we're waiting */
4224   if (gs->max_left_over)
4225     {gs_gop_tree_plus(gs,in_vals);}
4226 
4227   /* process the received data */
4228   msg_nodes=nodes;
4229   while ((iptr = *nodes++))
4230     {
4231       /* Should I check the return value of MPI_Wait() or status? */
4232       /* Can this loop be replaced by a call to MPI_Waitall()? */
4233       MPI_Wait(ids_in++, &status);
4234       while (*iptr >= 0)
4235 	{*(dptr1 + *iptr++) += *in2++;}
4236     }
4237 
4238   /* replace vals */
4239   while (*pw >= 0)
4240     {*(in_vals + *pw++) = *dptr1++;}
4241 
4242   /* clear isend message handles */
4243   /* This changed for clarity though it could be the same */
4244   while (*msg_nodes++)
4245     /* Should I check the return value of MPI_Wait() or status? */
4246     /* Can this loop be replaced by a call to MPI_Waitall()? */
4247     {MPI_Wait(ids_out++, &status);}
4248 
4249 #ifdef DEBUG
4250   error_msg_warning("gs_gop_pairwise_plus() end\n");
4251 #endif
4252 
4253 #else
4254   return;
4255 #endif
4256 }
4257 
4258 
4259 
4260 /******************************************************************************
4261 Function: gather_scatter
4262 
4263 Input :
4264 Output:
4265 Return:
4266 Description:
4267 ******************************************************************************/
4268 static
4269 void
4270 gs_gop_tree_plus(gs_id *gs, REAL *vals)
4271 {
4272   int size;
4273   int *in, *out;
4274   REAL *buf, *work;
4275   /*int op[] = {GL_ADD,0}; */
4276 
4277 
4278 #ifdef DEBUG
4279   error_msg_warning("start gs_gop_tree_plus()\n");
4280 #endif
4281 
4282   in   = gs->tree_map_in;
4283   out  = gs->tree_map_out;
4284   buf  = gs->tree_buf;
4285   work = gs->tree_work;
4286   size = gs->tree_nel;
4287 
4288 #if defined  BLAS||CBLAS
4289   *work = 0.0;
4290   copy(size,work,0,buf,1);
4291 #else
4292   rvec_zero(buf,size);
4293 #endif
4294 
4295   while (*in >= 0)
4296     {*(buf + *out++) = *(vals + *in++);}
4297 
4298   in   = gs->tree_map_in;
4299   out  = gs->tree_map_out;
4300 #if defined(NXSRC) && defined(r8)
4301   gdsum(buf,size,work);
4302 
4303   /* grop(buf,work,size,op);  */
4304   while (*in >= 0)
4305     {*(vals + *in++) = *(buf + *out++);}
4306 #elif defined MPISRC
4307   MPI_Allreduce(buf,work,size,REAL_TYPE,MPI_SUM,gs->gs_comm);
4308   while (*in >= 0)
4309     {*(vals + *in++) = *(work + *out++);}
4310 #else
4311   grop(buf,work,size,op);
4312   while (*in >= 0)
4313     {*(vals + *in++) = *(buf + *out++);}
4314 #endif
4315 
4316 #ifdef DEBUG
4317   error_msg_warning("end gs_gop_tree_plus()\n");
4318 #endif
4319 }
4320 
4321 
4322 
4323 /******************************************************************************
4324 Function: level_best_guess()
4325 
4326 Input :
4327 Output:
4328 Return:
4329 Description:
4330 ******************************************************************************/
4331 #if defined(not_used)
4332 static
4333 PetscErrorCode level_best_guess(void)
4334 {
4335   /* full pairwise for now */
4336   return(num_nodes);
4337 }
4338 #endif
4339 
4340 
4341 /******************************************************************************
4342 Function: gs_print_template()
4343 
4344 Input :
4345 
4346 Output:
4347 
4348 Return:
4349 
4350 Description:
4351 ******************************************************************************/
4352 #if defined(not_used)
4353 static
4354 void
4355 gs_print_template(register gs_id* gs, int who)
4356 {
4357   register int j, k, *iptr, *iptr2;
4358 
4359 
4360   if ((my_id == who) && (num_gs_ids))
4361     {
4362       printf("\n\nP#%d's GS#%d template:\n", my_id, gs->id);
4363       printf("id=%d\n",          gs->id);
4364       printf("nel(unique)=%d\n", gs->nel);
4365       printf("nel_max=%d\n",     gs->nel_max);
4366       printf("nel_min=%d\n",     gs->nel_min);
4367       printf("nel_sum=%d\n",     gs->nel_sum);
4368       printf("negl=%d\n",        gs->negl);
4369       printf("gl_max=%d\n",      gs->gl_max);
4370       printf("gl_min=%d\n",      gs->gl_min);
4371       printf("elms ordered=%d\n",gs->ordered);
4372       printf("repeats=%d\n",     gs->repeats);
4373       printf("positive=%d\n",    gs->positive);
4374       printf("elms=%ld\n",        (PTRINT) gs->elms);
4375       printf("elms(total)=%ld\n", (PTRINT) gs->local_elms);
4376       printf("vals=%ld\n",        (PTRINT) gs->vals);
4377       printf("gl_bss_min=%d\n",  gs->gl_bss_min);
4378       printf("gl_perm_min=%d\n", gs->gl_perm_min);
4379       printf("level=%d\n",       gs->level);
4380       printf("proc_mask_sz=%d\n",gs->mask_sz);
4381       printf("sh_proc_mask=%ld\n",(PTRINT) gs->nghs);
4382       printf("ngh_buf_size=%d\n",gs->ngh_buf_sz);
4383       printf("ngh_buf=%ld\n",     (PTRINT) gs->ngh_buf);
4384       printf("num_nghs=%d\n",    gs->num_nghs);
4385       printf("max_nghs=%d\n",    gs->max_nghs);
4386 
4387       /* pairwise exchange information */
4388       printf("\nPaiwise Info:\n");
4389       printf("num_pairs=%d\n",   gs->num_pairs);
4390       printf("max_pairs=%d\n",   gs->max_pairs);
4391       printf("len_pw_list=%d\n", gs->len_pw_list);
4392       printf("pair_list=%ld\n",   (PTRINT) gs->pair_list);
4393       printf("msg_sizes=%ld\n",   (PTRINT) gs->msg_sizes);
4394       printf("node_list=%ld\n",   (PTRINT) gs->node_list);
4395       printf("pw_elm_list=%ld\n", (PTRINT) gs->pw_elm_list);
4396 
4397       printf("pw_elm_list: ");
4398       if ((iptr = gs->pw_elm_list))
4399 	{
4400 	  for (j=0;j<gs->len_pw_list;j++)
4401 	    {printf("%d ", *iptr); iptr++;}
4402 	}
4403       printf("\n");
4404 
4405       printf("processor_list: ");
4406       if ((iptr = gs->pair_list))
4407 	{
4408 	  for (j=0;j<gs->num_pairs;j++)
4409 	    {printf("%d ", *iptr); iptr++;}
4410 	}
4411       printf("\n");
4412 
4413       printf("loc_node_pairs=%d\n",   gs->loc_node_pairs);
4414       printf("max_node_pairs=%d\n",   gs->max_node_pairs);
4415       printf("min_node_pairs=%d\n",   gs->min_node_pairs);
4416       printf("avg_node_pairs=%d\n",   gs->avg_node_pairs);
4417 
4418       printf("size_list: ");
4419       if ((iptr = gs->msg_sizes))
4420 	{
4421 	  for (j=0;j<gs->num_pairs;j++)
4422 	    {printf("%d ", *iptr); iptr++;}
4423 	}
4424       printf("\n");
4425       if ((iptr = gs->pair_list))
4426 	{
4427 	  for (j=0;j<gs->num_pairs;j++)
4428 	    {
4429 	      printf("node_list %d: ", *iptr);
4430 	      if ((iptr2 = (gs->node_list)[j]))
4431 		{
4432 		  for (k=0;k<(gs->msg_sizes)[j];k++)
4433 		    {printf("%d ", *iptr2); iptr2++;}
4434 		}
4435 	      iptr++;
4436 	      printf("\n");
4437 	    }
4438 	}
4439       printf("\n");
4440 
4441       printf("elm_list(U): ");
4442       if ((iptr = gs->elms))
4443 	{
4444 	  for (j=0;j<gs->nel;j++)
4445 	    {printf("%d ", *iptr); iptr++;}
4446 	}
4447       printf("\n");
4448       printf("\n");
4449 
4450       printf("elm_list(T): ");
4451       if ((iptr = gs->local_elms))
4452 	{
4453 	  for (j=0;j<gs->nel_total;j++)
4454 	    {printf("%d ", *iptr); iptr++;}
4455 	}
4456       printf("\n");
4457       printf("\n");
4458 
4459       printf("map_list(T): ");
4460       if ((iptr = gs->companion))
4461 	{
4462 	  for (j=0;j<gs->nel;j++)
4463 	    {printf("%d ", *iptr); iptr++;}
4464 	}
4465       printf("\n");
4466       printf("\n");
4467 
4468 
4469       /* local exchange information */
4470       printf("\nLocal Info:\n");
4471       printf("local_strength=%d\n",   gs->local_strength);
4472       printf("num_local_total=%d\n",  gs->num_local_total);
4473       printf("num_local=%d\n",        gs->num_local);
4474       printf("num_local_gop=%d\n",    gs->num_local_gop);
4475       printf("num_local_reduce=%ld\n", (PTRINT) gs->num_local_reduce);
4476       printf("local_reduce=%ld\n",     (PTRINT) gs->local_reduce);
4477       printf("num_gop_local_reduce=%ld\n", (PTRINT) gs->num_gop_local_reduce);
4478       printf("gop_local_reduce=%ld\n",     (PTRINT) gs->gop_local_reduce);
4479       printf("\n");
4480 
4481       for (j=0;j<gs->num_local;j++)
4482 	{
4483 	  printf("local reduce_list %d: ", j);
4484 	  if ((iptr2 = (gs->local_reduce)[j]))
4485 	    {
4486 	      if ((gs->num_local_reduce)[j] <= 0)
4487 		{printf("oops");}
4488 
4489 	      for (k=0;k<(gs->num_local_reduce)[j];k++)
4490 		{printf("%d ", *iptr2); iptr2++;}
4491 	    }
4492 	  printf("\n");
4493 	}
4494 
4495       printf("\n");
4496       printf("\n");
4497 
4498       for (j=0;j<gs->num_local_gop;j++)
4499 	{
4500 	  printf("gop reduce_list %d: ", j);
4501 	  iptr2 = (gs->gop_local_reduce)[j];
4502 
4503 	  if ((gs->num_gop_local_reduce)[j] <= 0)
4504 	    {printf("oops");}
4505 
4506 
4507 	  for (k=0;k<(gs->num_gop_local_reduce)[j];k++)
4508 	    {printf("%d ", *iptr2); iptr2++;}
4509 	  printf("\n");
4510 	}
4511       printf("\n");
4512       printf("\n");
4513 
4514       /* crystal router information */
4515       printf("\n\n");
4516       printf("Tree Info:\n");
4517       printf("max_left_over=%d\n",   gs->max_left_over);
4518       printf("num_in_list=%ld\n",    (PTRINT) gs->in_num);
4519       printf("in_list=%ld\n",        (PTRINT) gs->in_list);
4520       printf("num_out_list=%ld\n",   (PTRINT) gs->out_num);
4521       printf("out_list=%ld\n",       (PTRINT) gs->out_list);
4522 
4523       printf("\n\n");
4524     }
4525   fflush(stdout);
4526 }
4527 #endif
4528 
4529 
4530 
4531 /******************************************************************************
4532 Function: gs_free()
4533 
4534 Input :
4535 
4536 Output:
4537 
4538 Return:
4539 
4540 Description:
4541   if (gs->sss) {perm_free((void*) gs->sss);}
4542 ******************************************************************************/
4543 void
4544 gs_free(register gs_id *gs)
4545 {
4546   register int i;
4547 
4548 
4549 #ifdef DEBUG
4550   error_msg_warning("start gs_gop_xxx()\n");
4551   if (!gs) {error_msg_warning("NULL ptr passed to gs_free()"); return;}
4552 #endif
4553 
4554   if (gs->nghs) {perm_free((void*) gs->nghs);}
4555   if (gs->pw_nghs) {perm_free((void*) gs->pw_nghs);}
4556 
4557   /* tree */
4558   if (gs->max_left_over)
4559     {
4560       if (gs->tree_elms) {bss_free((void*) gs->tree_elms);}
4561       if (gs->tree_buf) {bss_free((void*) gs->tree_buf);}
4562       if (gs->tree_work) {bss_free((void*) gs->tree_work);}
4563       if (gs->tree_map_in) {bss_free((void*) gs->tree_map_in);}
4564       if (gs->tree_map_out) {bss_free((void*) gs->tree_map_out);}
4565     }
4566 
4567   /* pairwise info */
4568   if (gs->num_pairs)
4569     {
4570       /* should be NULL already */
4571       if (gs->ngh_buf) {bss_free((void*) gs->ngh_buf);}
4572       if (gs->elms) {bss_free((void*) gs->elms);}
4573       if (gs->local_elms) {bss_free((void*) gs->local_elms);}
4574       if (gs->companion) {bss_free((void*) gs->companion);}
4575 
4576       /* only set if pairwise */
4577       if (gs->vals) {perm_free((void*) gs->vals);}
4578       if (gs->in) {perm_free((void*) gs->in);}
4579       if (gs->out) {perm_free((void*) gs->out);}
4580       if (gs->msg_ids_in) {perm_free((void*) gs->msg_ids_in);}
4581       if (gs->msg_ids_out) {perm_free((void*) gs->msg_ids_out);}
4582       if (gs->pw_vals) {perm_free((void*) gs->pw_vals);}
4583       if (gs->pw_elm_list) {perm_free((void*) gs->pw_elm_list);}
4584       if (gs->node_list)
4585 	{
4586 	  for (i=0;i<gs->num_pairs;i++)
4587 	    {if (gs->node_list[i]) {perm_free((void*) gs->node_list[i]);}}
4588 	  perm_free((void*) gs->node_list);
4589 	}
4590       if (gs->msg_sizes) {perm_free((void*) gs->msg_sizes);}
4591       if (gs->pair_list) {perm_free((void*) gs->pair_list);}
4592     }
4593 
4594   /* local info */
4595   if (gs->num_local_total>=0)
4596     {
4597       for (i=0;i<gs->num_local_total+1;i++)
4598 	/*      for (i=0;i<gs->num_local_total;i++) */
4599 	{
4600 	  if (gs->num_gop_local_reduce[i])
4601 	    {perm_free((void*) gs->gop_local_reduce[i]);}
4602 	}
4603     }
4604 
4605   /* if intersection tree/pairwise and local isn't empty */
4606   if (gs->gop_local_reduce) {perm_free((void*) gs->gop_local_reduce);}
4607   if (gs->num_gop_local_reduce) {perm_free((void*) gs->num_gop_local_reduce);}
4608 
4609   perm_free((void*) gs);
4610 }
4611 
4612 
4613 
4614 
4615 
4616 
4617 /******************************************************************************
4618 Function: gather_scatter
4619 
4620 Input :
4621 Output:
4622 Return:
4623 Description:
4624 ******************************************************************************/
4625 void
4626 gs_gop_vec(register gs_id *gs, register REAL *vals, register const char *op, register int step)
4627 {
4628 #ifdef DEBUG
4629   error_msg_warning("gs_gop_vec() start");
4630   if (!gs) {error_msg_fatal("gs_gop_vec() :: passed NULL gs handle!!!");}
4631   if (!op) {error_msg_fatal("gs_gop_vec() :: passed NULL operation!!!");}
4632 
4633   /* check top make sure that segments being requested aren't larger */
4634   /* then what I reserved earlier ... fix is to allow user to reset  */
4635   if (step>gs->vec_sz)
4636     {error_msg_fatal("gs_gop_vec() :: %d > %d!\n",step,gs->vec_sz);}
4637 #endif
4638 
4639   switch (*op) {
4640   case '+':
4641     gs_gop_vec_plus(gs,vals,step);
4642     break;
4643 #ifdef NOT_YET
4644   case '*':
4645     gs_gop_times(gs,vals);
4646     break;
4647   case 'a':
4648     gs_gop_min_abs(gs,vals);
4649     break;
4650   case 'A':
4651     gs_gop_max_abs(gs,vals);
4652     break;
4653   case 'e':
4654     gs_gop_exists(gs,vals);
4655     break;
4656   case 'm':
4657     gs_gop_min(gs,vals);
4658     break;
4659   case 'M':
4660     gs_gop_max(gs,vals); break;
4661     /*
4662     if (*(op+1)=='\0')
4663       {gs_gop_max(gs,vals); break;}
4664     else if (*(op+1)=='X')
4665       {gs_gop_max_abs(gs,vals); break;}
4666     else if (*(op+1)=='N')
4667       {gs_gop_min_abs(gs,vals); break;}
4668     */
4669 #endif
4670   default:
4671     error_msg_warning("gs_gop_vec() :: %c is not a valid op",op[0]);
4672     error_msg_warning("gs_gop_vec() :: default :: plus");
4673     gs_gop_vec_plus(gs,vals,step);
4674     break;
4675   }
4676 #ifdef DEBUG
4677   error_msg_warning("gs_gop_vec() end");
4678 #endif
4679 }
4680 
4681 
4682 
4683 /******************************************************************************
4684 Function: gather_scatter
4685 
4686 Input :
4687 Output:
4688 Return:
4689 Description:
4690 ******************************************************************************/
4691 static void
4692 gs_gop_vec_plus(register gs_id *gs, register REAL *vals, register int step)
4693 {
4694 #ifdef DEBUG
4695   error_msg_warning("gs_gop_vec_plus() start");
4696 #endif
4697 
4698   if (!gs) {error_msg_fatal("gs_gop_vec() passed NULL gs handle!!!");}
4699 
4700   /* local only operations!!! */
4701   if (gs->num_local)
4702     {gs_gop_vec_local_plus(gs,vals,step);}
4703 
4704   /* if intersection tree/pairwise and local isn't empty */
4705   if (gs->num_local_gop)
4706     {
4707       gs_gop_vec_local_in_plus(gs,vals,step);
4708 
4709       /* pairwise */
4710       if (gs->num_pairs)
4711 	{gs_gop_vec_pairwise_plus(gs,vals,step);}
4712 
4713       /* tree */
4714       else if (gs->max_left_over)
4715 	{gs_gop_vec_tree_plus(gs,vals,step);}
4716 
4717       gs_gop_vec_local_out(gs,vals,step);
4718     }
4719   /* if intersection tree/pairwise and local is empty */
4720   else
4721     {
4722       /* pairwise */
4723       if (gs->num_pairs)
4724 	{gs_gop_vec_pairwise_plus(gs,vals,step);}
4725 
4726       /* tree */
4727       else if (gs->max_left_over)
4728 	{gs_gop_vec_tree_plus(gs,vals,step);}
4729     }
4730 #ifdef DEBUG
4731   error_msg_warning("gs_gop_vec_plus() end");
4732 #endif
4733 }
4734 
4735 
4736 
4737 /******************************************************************************
4738 Function: gather_scatter
4739 
4740 Input :
4741 Output:
4742 Return:
4743 Description:
4744 ******************************************************************************/
4745 static
4746 void
4747 gs_gop_vec_local_plus(register gs_id *gs, register REAL *vals,
4748 		      register int step)
4749 {
4750   register int *num, *map, **reduce;
4751   register REAL *base;
4752 
4753 
4754 #ifdef DEBUG
4755   error_msg_warning("gs_gop_vec_local_plus() start");
4756 #endif
4757 
4758   num    = gs->num_local_reduce;
4759   reduce = gs->local_reduce;
4760   while ((map = *reduce))
4761     {
4762       base = vals + map[0] * step;
4763 
4764       /* wall */
4765       if (*num == 2)
4766 	{
4767 	  num++; reduce++;
4768 	  rvec_add (base,vals+map[1]*step,step);
4769 	  rvec_copy(vals+map[1]*step,base,step);
4770 	}
4771       /* corner shared by three elements */
4772       else if (*num == 3)
4773 	{
4774 	  num++; reduce++;
4775 	  rvec_add (base,vals+map[1]*step,step);
4776 	  rvec_add (base,vals+map[2]*step,step);
4777 	  rvec_copy(vals+map[2]*step,base,step);
4778 	  rvec_copy(vals+map[1]*step,base,step);
4779 	}
4780       /* corner shared by four elements */
4781       else if (*num == 4)
4782 	{
4783 	  num++; reduce++;
4784 	  rvec_add (base,vals+map[1]*step,step);
4785 	  rvec_add (base,vals+map[2]*step,step);
4786 	  rvec_add (base,vals+map[3]*step,step);
4787 	  rvec_copy(vals+map[3]*step,base,step);
4788 	  rvec_copy(vals+map[2]*step,base,step);
4789 	  rvec_copy(vals+map[1]*step,base,step);
4790 	}
4791       /* general case ... odd geoms ... 3D */
4792       else
4793 	{
4794 	  num++;
4795 	  while (*++map >= 0)
4796 	    {rvec_add (base,vals+*map*step,step);}
4797 
4798 	  map = *reduce;
4799 	  while (*++map >= 0)
4800 	    {rvec_copy(vals+*map*step,base,step);}
4801 
4802 	  reduce++;
4803 	}
4804     }
4805 #ifdef DEBUG
4806   error_msg_warning("gs_gop_vec_local_plus() end");
4807 #endif
4808 }
4809 
4810 
4811 
4812 /******************************************************************************
4813 Function: gather_scatter
4814 
4815 Input :
4816 Output:
4817 Return:
4818 Description:
4819 ******************************************************************************/
4820 static
4821 void
4822 gs_gop_vec_local_in_plus(register gs_id *gs, register REAL *vals,
4823 			 register int step)
4824 {
4825   register int  *num, *map, **reduce;
4826   register REAL *base;
4827 
4828 
4829 #ifdef DEBUG
4830   error_msg_warning("gs_gop_vec_locel_in_plus() start");
4831 #endif
4832 
4833   num    = gs->num_gop_local_reduce;
4834   reduce = gs->gop_local_reduce;
4835   while ((map = *reduce++))
4836     {
4837       base = vals + map[0] * step;
4838 
4839       /* wall */
4840       if (*num == 2)
4841 	{
4842 	  num ++;
4843 	  rvec_add(base,vals+map[1]*step,step);
4844 	}
4845       /* corner shared by three elements */
4846       else if (*num == 3)
4847 	{
4848 	  num ++;
4849 	  rvec_add(base,vals+map[1]*step,step);
4850 	  rvec_add(base,vals+map[2]*step,step);
4851 	}
4852       /* corner shared by four elements */
4853       else if (*num == 4)
4854 	{
4855 	  num ++;
4856 	  rvec_add(base,vals+map[1]*step,step);
4857 	  rvec_add(base,vals+map[2]*step,step);
4858 	  rvec_add(base,vals+map[3]*step,step);
4859 	}
4860       /* general case ... odd geoms ... 3D*/
4861       else
4862 	{
4863 	  num++;
4864 	  while (*++map >= 0)
4865 	    {rvec_add(base,vals+*map*step,step);}
4866 	}
4867     }
4868 #ifdef DEBUG
4869   error_msg_warning("gs_gop_vec_local_in_plus() end");
4870 #endif
4871 }
4872 
4873 
4874 /******************************************************************************
4875 Function: gather_scatter
4876 
4877 Input :
4878 Output:
4879 Return:
4880 Description:
4881 ******************************************************************************/
4882 static
4883 void
4884 gs_gop_vec_local_out(register gs_id *gs, register REAL *vals,
4885 		     register int step)
4886 {
4887   register int *num, *map, **reduce;
4888   register REAL *base;
4889 
4890 
4891 #ifdef DEBUG
4892   error_msg_warning("gs_gop_vec_local_out() start");
4893 #endif
4894 
4895   num    = gs->num_gop_local_reduce;
4896   reduce = gs->gop_local_reduce;
4897   while ((map = *reduce++))
4898     {
4899       base = vals + map[0] * step;
4900 
4901       /* wall */
4902       if (*num == 2)
4903 	{
4904 	  num ++;
4905 	  rvec_copy(vals+map[1]*step,base,step);
4906 	}
4907       /* corner shared by three elements */
4908       else if (*num == 3)
4909 	{
4910 	  num ++;
4911 	  rvec_copy(vals+map[1]*step,base,step);
4912 	  rvec_copy(vals+map[2]*step,base,step);
4913 	}
4914       /* corner shared by four elements */
4915       else if (*num == 4)
4916 	{
4917 	  num ++;
4918 	  rvec_copy(vals+map[1]*step,base,step);
4919 	  rvec_copy(vals+map[2]*step,base,step);
4920 	  rvec_copy(vals+map[3]*step,base,step);
4921 	}
4922       /* general case ... odd geoms ... 3D*/
4923       else
4924 	{
4925 	  num++;
4926 	  while (*++map >= 0)
4927 	    {rvec_copy(vals+*map*step,base,step);}
4928 	}
4929     }
4930 #ifdef DEBUG
4931   error_msg_warning("gs_gop_vec_local_out() end");
4932 #endif
4933 }
4934 
4935 
4936 
4937 /******************************************************************************
4938 Function: gather_scatter
4939 
4940 VERSION 3 ::
4941 
4942 Input :
4943 Output:
4944 Return:
4945 Description:
4946 ******************************************************************************/
4947 static
4948 void
4949 gs_gop_vec_pairwise_plus(register gs_id *gs, register REAL *in_vals,
4950 			 register int step)
4951 {
4952 #if   defined NXSRC
4953   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
4954   register int *iptr, *msg_list, *msg_size, **msg_nodes;
4955   register int *pw, *list, *size, **nodes;
4956   register int *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
4957   register int i;
4958 
4959 #ifdef DEBUG
4960   error_msg_warning("gs_gop_vec_pairwise_plus() start");
4961 #endif
4962 
4963   /* strip and load registers */
4964   msg_list =list         = gs->pair_list;
4965   msg_size =size         = gs->msg_sizes;
4966   msg_nodes=nodes        = gs->node_list;
4967   iptr=pw                = gs->pw_elm_list;
4968   dptr1=dptr3            = gs->pw_vals;
4969   msg_ids_in  = ids_in   = gs->msg_ids_in;
4970   msg_ids_out = ids_out  = gs->msg_ids_out;
4971   dptr2                  = gs->out;
4972   in1=in2                = gs->in;
4973 
4974   /* post the receives */
4975   do
4976     {
4977       *msg_ids_in++ = (int) irecv(MSGTAG1 + *list++,(char *)in1,*size*REAL_LEN*step);
4978       in1 += *size++ * step;
4979     }
4980   while (*msg_ids_in >= 0);
4981 
4982   /* load gs values into in out gs buffers */
4983   while (*iptr >= 0)
4984     {
4985       rvec_copy(dptr3,in_vals + *iptr*step,step);
4986       dptr3+=step;
4987       iptr++;
4988     }
4989 
4990   /* load out buffers and post the sends */
4991   while (iptr = *msg_nodes++)
4992     {
4993       dptr3 = dptr2;
4994       while (*iptr >= 0)
4995 	{
4996 	  rvec_copy(dptr2,dptr1 + *iptr*step,step);
4997 	  dptr2+=step;
4998 	  iptr++;
4999 	}
5000       *msg_ids_out++ = (int) isend(MSGTAG1+my_id,(char *)dptr3,
5001 				   *(msg_size++)*REAL_LEN*step,*msg_list++,0);
5002     }
5003 
5004   /* post the receives ... was here*/
5005   /* tree */
5006   if (gs->max_left_over)
5007     {gs_gop_vec_tree_plus(gs,in_vals,step);}
5008 
5009   /* process the received data */
5010   while (iptr = *nodes++)
5011     {
5012       msgwait(*ids_in++);
5013       while (*iptr >= 0)
5014 	{
5015 #if defined BLAS||CBLAS
5016 	  axpy(step,1.0,in2,1,dptr1 + *iptr*step,1);
5017 #else
5018 	  rvec_add(dptr1 + *iptr*step,in2,step);
5019 #endif
5020 	  in2+=step;
5021 	  iptr++;
5022 	}
5023     }
5024 
5025   /* replace vals */
5026   while (*pw >= 0)
5027     {
5028       rvec_copy(in_vals + *pw*step,dptr1,step);
5029       dptr1+=step;
5030       pw++;
5031     }
5032 
5033   /* clear isend message handles */
5034   while (*ids_out >= 0)
5035     {msgwait(*ids_out++);}
5036 
5037 #ifdef DEBUG
5038   error_msg_warning("gs_gop_vec_pairwise_plus() end");
5039 #endif
5040 
5041 #elif defined MPISRC
5042   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
5043   register int *iptr, *msg_list, *msg_size, **msg_nodes;
5044   register int *pw, *list, *size, **nodes;
5045   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
5046   MPI_Status status;
5047 
5048 
5049 #ifdef DEBUG
5050   error_msg_warning("gs_gop_vec_pairwise_plus() start");
5051 #endif
5052 
5053   /* strip and load registers */
5054   msg_list =list         = gs->pair_list;
5055   msg_size =size         = gs->msg_sizes;
5056   msg_nodes=nodes        = gs->node_list;
5057   iptr=pw                = gs->pw_elm_list;
5058   dptr1=dptr3            = gs->pw_vals;
5059   msg_ids_in  = ids_in   = gs->msg_ids_in;
5060   msg_ids_out = ids_out  = gs->msg_ids_out;
5061   dptr2                  = gs->out;
5062   in1=in2                = gs->in;
5063 
5064   /* post the receives */
5065   /*  msg_nodes=nodes; */
5066   do
5067     {
5068       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
5069 	 second one *list and do list++ afterwards */
5070       MPI_Irecv(in1, *size *step, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
5071 		gs->gs_comm, msg_ids_in++);
5072       in1 += *size++ *step;
5073     }
5074   while (*++msg_nodes);
5075   msg_nodes=nodes;
5076 
5077   /* load gs values into in out gs buffers */
5078   while (*iptr >= 0)
5079     {
5080       rvec_copy(dptr3,in_vals + *iptr*step,step);
5081       dptr3+=step;
5082       iptr++;
5083     }
5084 
5085   /* load out buffers and post the sends */
5086   while ((iptr = *msg_nodes++))
5087     {
5088       dptr3 = dptr2;
5089       while (*iptr >= 0)
5090 	{
5091 	  rvec_copy(dptr2,dptr1 + *iptr*step,step);
5092 	  dptr2+=step;
5093 	  iptr++;
5094 	}
5095       MPI_Isend(dptr3, *msg_size++ *step, REAL_TYPE, *msg_list++,
5096 		MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
5097     }
5098 
5099   /* tree */
5100   if (gs->max_left_over)
5101     {gs_gop_vec_tree_plus(gs,in_vals,step);}
5102 
5103   /* process the received data */
5104   msg_nodes=nodes;
5105   while ((iptr = *nodes++))
5106     {
5107       /* Should I check the return value of MPI_Wait() or status? */
5108       /* Can this loop be replaced by a call to MPI_Waitall()? */
5109       MPI_Wait(ids_in++, &status);
5110       while (*iptr >= 0)
5111 	{
5112 #if defined BLAS||CBLAS
5113 	  axpy(step,1.0,in2,1,dptr1 + *iptr*step,1);
5114 #else
5115 	  rvec_add(dptr1 + *iptr*step,in2,step);
5116 #endif
5117 	  in2+=step;
5118 	  iptr++;
5119 	}
5120     }
5121 
5122   /* replace vals */
5123   while (*pw >= 0)
5124     {
5125       rvec_copy(in_vals + *pw*step,dptr1,step);
5126       dptr1+=step;
5127       pw++;
5128     }
5129 
5130   /* clear isend message handles */
5131   /* This changed for clarity though it could be the same */
5132   while (*msg_nodes++)
5133     /* Should I check the return value of MPI_Wait() or status? */
5134     /* Can this loop be replaced by a call to MPI_Waitall()? */
5135     {MPI_Wait(ids_out++, &status);}
5136 
5137 #ifdef DEBUG
5138   error_msg_warning("gs_gop_vec_pairwise_plus() end");
5139 #endif
5140 
5141 #else
5142   return;
5143 #endif
5144 }
5145 
5146 
5147 
5148 /******************************************************************************
5149 Function: gather_scatter
5150 
5151 Input :
5152 Output:
5153 Return:
5154 Description:
5155 ******************************************************************************/
5156 static
5157 void
5158 gs_gop_vec_tree_plus(register gs_id *gs, register REAL *vals, register int step)
5159 {
5160   register int size, *in, *out;
5161   register REAL *buf, *work;
5162   int op[] = {GL_ADD,0};
5163 
5164 #ifdef DEBUG
5165   error_msg_warning("start gs_gop_vec_tree_plus()");
5166 #endif
5167 
5168   /* copy over to local variables */
5169   in   = gs->tree_map_in;
5170   out  = gs->tree_map_out;
5171   buf  = gs->tree_buf;
5172   work = gs->tree_work;
5173   size = gs->tree_nel*step;
5174 
5175   /* zero out collection buffer */
5176 #if defined  BLAS||CBLAS
5177   *work = 0.0;
5178   copy(size,work,0,buf,1);
5179 #else
5180   rvec_zero(buf,size);
5181 #endif
5182 
5183 
5184   /* copy over my contributions */
5185   while (*in >= 0)
5186     {
5187 #if defined  BLAS||CBLAS
5188       copy(step,vals + *in++*step,1,buf + *out++*step,1);
5189 #else
5190       rvec_copy(buf + *out++*step,vals + *in++*step,step);
5191 #endif
5192     }
5193 
5194   /* perform fan in/out on full buffer */
5195   /* must change grop to handle the blas */
5196   grop(buf,work,size,op);
5197 
5198   /* reset */
5199   in   = gs->tree_map_in;
5200   out  = gs->tree_map_out;
5201 
5202   /* get the portion of the results I need */
5203   while (*in >= 0)
5204     {
5205 #if defined  BLAS||CBLAS
5206       copy(step,buf + *out++*step,1,vals + *in++*step,1);
5207 #else
5208       rvec_copy(vals + *in++*step,buf + *out++*step,step);
5209 #endif
5210     }
5211 
5212 #ifdef DEBUG
5213   error_msg_warning("start gs_gop_vec_tree_plus()");
5214 #endif
5215 }
5216 
5217 
5218 
5219 /******************************************************************************
5220 Function: gather_scatter
5221 
5222 Input :
5223 Output:
5224 Return:
5225 Description:
5226 ******************************************************************************/
5227 void
5228 gs_gop_hc(register gs_id *gs, register REAL *vals, register const char *op, register int dim)
5229 {
5230 #ifdef DEBUG
5231   error_msg_warning("gs_gop_hc() start\n");
5232   if (!gs) {error_msg_fatal("gs_gop_vec() :: passed NULL gs handle!!!\n");}
5233   if (!op) {error_msg_fatal("gs_gop_vec() :: passed NULL operation!!!\n");}
5234 #endif
5235 
5236   switch (*op) {
5237   case '+':
5238     gs_gop_plus_hc(gs,vals,dim);
5239     break;
5240 #ifdef NOT_YET
5241   case '*':
5242     gs_gop_times(gs,vals);
5243     break;
5244   case 'a':
5245     gs_gop_min_abs(gs,vals);
5246     break;
5247   case 'A':
5248     gs_gop_max_abs(gs,vals);
5249     break;
5250   case 'e':
5251     gs_gop_exists(gs,vals);
5252     break;
5253   case 'm':
5254     gs_gop_min(gs,vals);
5255     break;
5256   case 'M':
5257     gs_gop_max(gs,vals); break;
5258     /*
5259     if (*(op+1)=='\0')
5260       {gs_gop_max(gs,vals); break;}
5261     else if (*(op+1)=='X')
5262       {gs_gop_max_abs(gs,vals); break;}
5263     else if (*(op+1)=='N')
5264       {gs_gop_min_abs(gs,vals); break;}
5265     */
5266 #endif
5267   default:
5268     error_msg_warning("gs_gop_hc() :: %c is not a valid op",op[0]);
5269     error_msg_warning("gs_gop_hc() :: default :: plus\n");
5270     gs_gop_plus_hc(gs,vals,dim);
5271     break;
5272   }
5273 #ifdef DEBUG
5274   error_msg_warning("gs_gop_hc() end\n");
5275 #endif
5276 }
5277 
5278 
5279 
5280 /******************************************************************************
5281 Function: gather_scatter
5282 
5283 Input :
5284 Output:
5285 Return:
5286 Description:
5287 ******************************************************************************/
5288 static void
5289 gs_gop_plus_hc(register gs_id *gs, register REAL *vals, int dim)
5290 {
5291 #ifdef DEBUG
5292   error_msg_warning("start gs_gop_hc()\n");
5293   if (!gs) {error_msg_fatal("gs_gop_hc() passed NULL gs handle!!!\n");}
5294 #endif
5295 
5296   /* if there's nothing to do return */
5297   if (dim<=0)
5298     {return;}
5299 
5300   /* can't do more dimensions then exist */
5301   dim = MIN(dim,i_log2_num_nodes);
5302 
5303   /* local only operations!!! */
5304   if (gs->num_local)
5305     {gs_gop_local_plus(gs,vals);}
5306 
5307   /* if intersection tree/pairwise and local isn't empty */
5308   if (gs->num_local_gop)
5309     {
5310       gs_gop_local_in_plus(gs,vals);
5311 
5312       /* pairwise will do tree inside ... */
5313       if (gs->num_pairs)
5314 	{gs_gop_pairwise_plus_hc(gs,vals,dim);}
5315 
5316       /* tree only */
5317       else if (gs->max_left_over)
5318 	{gs_gop_tree_plus_hc(gs,vals,dim);}
5319 
5320       gs_gop_local_out(gs,vals);
5321     }
5322   /* if intersection tree/pairwise and local is empty */
5323   else
5324     {
5325       /* pairwise will do tree inside */
5326       if (gs->num_pairs)
5327 	{gs_gop_pairwise_plus_hc(gs,vals,dim);}
5328 
5329       /* tree */
5330       else if (gs->max_left_over)
5331 	{gs_gop_tree_plus_hc(gs,vals,dim);}
5332     }
5333 
5334 #ifdef DEBUG
5335   error_msg_warning("end gs_gop_hc()\n");
5336 #endif
5337 }
5338 
5339 
5340 /******************************************************************************
5341 VERSION 3 ::
5342 
5343 Input :
5344 Output:
5345 Return:
5346 Description:
5347 ******************************************************************************/
5348 static
5349 void
5350 gs_gop_pairwise_plus_hc(register gs_id *gs, register REAL *in_vals, int dim)
5351 {
5352 #if   defined NXSRC
5353   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
5354   register int *iptr, *msg_list, *msg_size, **msg_nodes;
5355   register int *pw, *list, *size, **nodes;
5356   register int *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
5357   int i, mask=1;
5358 
5359   for (i=1; i<dim; i++)
5360     {mask<<=1; mask++;}
5361 
5362 
5363 #ifdef DEBUG
5364   error_msg_warning("gs_gop_pairwise_hc() start\n");
5365 #endif
5366 
5367   /* strip and load registers */
5368   msg_list =list         = gs->pair_list;
5369   msg_size =size         = gs->msg_sizes;
5370   msg_nodes=nodes        = gs->node_list;
5371   iptr=pw                = gs->pw_elm_list;
5372   dptr1=dptr3            = gs->pw_vals;
5373   msg_ids_in  = ids_in   = gs->msg_ids_in;
5374   msg_ids_out = ids_out  = gs->msg_ids_out;
5375   dptr2                  = gs->out;
5376   in1=in2                = gs->in;
5377 
5378   /* post the receives */
5379   do
5380     {
5381       if ((my_id|mask)==(*list|mask))
5382 	{
5383 	  *msg_ids_in++ = (int) irecv(MSGTAG1 + *list++,(char *)in1,*size*REAL_LEN);
5384 	  in1 += *size++;
5385 	}
5386       else
5387 	{list++; size++;}
5388     }
5389   while (*++msg_nodes);
5390 
5391   /* load gs values into in out gs buffers */
5392   while (*iptr >= 0)
5393     {*dptr3++ = *(in_vals + *iptr++);}
5394 
5395   /* load out buffers and post the sends */
5396   list = msg_list;
5397   msg_nodes=nodes;
5398   while (iptr = *msg_nodes++)
5399     {
5400       if ((my_id|mask)==(*list|mask))
5401 	{
5402 	  dptr3 = dptr2;
5403 	  while (*iptr >= 0)
5404 	    {*dptr2++ = *(dptr1 + *iptr++);}
5405 	  *msg_ids_out++ = (int) isend(MSGTAG1+my_id,(char *)dptr3,
5406 				       *(msg_size++)*REAL_LEN,*list++,0);
5407 	}
5408       else
5409 	{msg_size++; list++;}
5410     }
5411   /* post the receives ... was here*/
5412 
5413   /* do the tree while we're waiting */
5414   if (gs->max_left_over)
5415     {gs_gop_tree_plus_hc(gs,in_vals,dim);}
5416 
5417   /* process the received data */
5418   list = msg_list;
5419   msg_nodes=nodes;
5420   while (iptr = *msg_nodes++)
5421     {
5422       if ((my_id|mask)==(*list|mask))
5423 	{
5424 	  msgwait(*ids_in++);
5425 	  while (*iptr >= 0)
5426 	    {*(dptr1 + *iptr++) += *in2++;}
5427 	}
5428       list++;
5429     }
5430 
5431   /* replace vals */
5432   while (*pw >= 0)
5433     {*(in_vals + *pw++) = *dptr1++;}
5434 
5435   /* clear isend message handles */
5436   while (iptr = *nodes++)
5437     {
5438       if ((my_id|mask)==(*msg_list|mask))
5439 	{msgwait(*ids_out++);}
5440       msg_list++;
5441     }
5442 
5443 #ifdef DEBUG
5444   error_msg_warning("gs_gop_pairwise_hc() end\n");
5445 #endif
5446 
5447 #elif defined MPISRC
5448   register REAL *dptr1, *dptr2, *dptr3, *in1, *in2;
5449   register int *iptr, *msg_list, *msg_size, **msg_nodes;
5450   register int *pw, *list, *size, **nodes;
5451   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
5452   MPI_Status status;
5453   int i, mask=1;
5454 
5455   for (i=1; i<dim; i++)
5456     {mask<<=1; mask++;}
5457 
5458 
5459 #ifdef DEBUG
5460   error_msg_warning("gs_gop_pairwise_hc() start\n");
5461 #endif
5462 
5463   /* strip and load registers */
5464   msg_list =list         = gs->pair_list;
5465   msg_size =size         = gs->msg_sizes;
5466   msg_nodes=nodes        = gs->node_list;
5467   iptr=pw                = gs->pw_elm_list;
5468   dptr1=dptr3            = gs->pw_vals;
5469   msg_ids_in  = ids_in   = gs->msg_ids_in;
5470   msg_ids_out = ids_out  = gs->msg_ids_out;
5471   dptr2                  = gs->out;
5472   in1=in2                = gs->in;
5473 
5474   /* post the receives */
5475   /*  msg_nodes=nodes; */
5476   do
5477     {
5478       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
5479 	 second one *list and do list++ afterwards */
5480       if ((my_id|mask)==(*list|mask))
5481 	{
5482 	  MPI_Irecv(in1, *size, REAL_TYPE, MPI_ANY_SOURCE, MSGTAG1 + *list++,
5483 		    gs->gs_comm, msg_ids_in++);
5484 	  in1 += *size++;
5485 	}
5486       else
5487 	{list++; size++;}
5488     }
5489   while (*++msg_nodes);
5490 
5491   /* load gs values into in out gs buffers */
5492   while (*iptr >= 0)
5493     {*dptr3++ = *(in_vals + *iptr++);}
5494 
5495   /* load out buffers and post the sends */
5496   msg_nodes=nodes;
5497   list = msg_list;
5498   while ((iptr = *msg_nodes++))
5499     {
5500       if ((my_id|mask)==(*list|mask))
5501 	{
5502 	  dptr3 = dptr2;
5503 	  while (*iptr >= 0)
5504 	    {*dptr2++ = *(dptr1 + *iptr++);}
5505 	  /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
5506 	  /* is msg_ids_out++ correct? */
5507 	  MPI_Isend(dptr3, *msg_size++, REAL_TYPE, *list++,
5508 		    MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);
5509 	}
5510       else
5511 	{list++; msg_size++;}
5512     }
5513 
5514   /* do the tree while we're waiting */
5515   if (gs->max_left_over)
5516     {gs_gop_tree_plus_hc(gs,in_vals,dim);}
5517 
5518   /* process the received data */
5519   msg_nodes=nodes;
5520   list = msg_list;
5521   while ((iptr = *nodes++))
5522     {
5523       if ((my_id|mask)==(*list|mask))
5524 	{
5525 	  /* Should I check the return value of MPI_Wait() or status? */
5526 	  /* Can this loop be replaced by a call to MPI_Waitall()? */
5527 	  MPI_Wait(ids_in++, &status);
5528 	  while (*iptr >= 0)
5529 	    {*(dptr1 + *iptr++) += *in2++;}
5530 	}
5531       list++;
5532     }
5533 
5534   /* replace vals */
5535   while (*pw >= 0)
5536     {*(in_vals + *pw++) = *dptr1++;}
5537 
5538   /* clear isend message handles */
5539   /* This changed for clarity though it could be the same */
5540   while (*msg_nodes++)
5541     {
5542       if ((my_id|mask)==(*msg_list|mask))
5543 	{
5544 	  /* Should I check the return value of MPI_Wait() or status? */
5545 	  /* Can this loop be replaced by a call to MPI_Waitall()? */
5546 	  MPI_Wait(ids_out++, &status);
5547 	}
5548       msg_list++;
5549     }
5550 
5551 #ifdef DEBUG
5552   error_msg_warning("gs_gop_pairwise_hc() end\n");
5553 #endif
5554 
5555 #else
5556   return;
5557 #endif
5558 }
5559 
5560 
5561 
5562 /******************************************************************************
5563 Function: gather_scatter
5564 
5565 Input :
5566 Output:
5567 Return:
5568 Description:
5569 ******************************************************************************/
5570 static
5571 void
5572 gs_gop_tree_plus_hc(gs_id *gs, REAL *vals, int dim)
5573 {
5574   int size;
5575   int *in, *out;
5576   REAL *buf, *work;
5577   int op[] = {GL_ADD,0};
5578 
5579 #ifdef DEBUG
5580   error_msg_warning("start gs_gop_tree_plus_hc()\n");
5581 #endif
5582 
5583   in   = gs->tree_map_in;
5584   out  = gs->tree_map_out;
5585   buf  = gs->tree_buf;
5586   work = gs->tree_work;
5587   size = gs->tree_nel;
5588 
5589 #if defined  BLAS||CBLAS
5590   *work = 0.0;
5591   copy(size,work,0,buf,1);
5592 #else
5593   rvec_zero(buf,size);
5594 #endif
5595 
5596   while (*in >= 0)
5597     {*(buf + *out++) = *(vals + *in++);}
5598 
5599   in   = gs->tree_map_in;
5600   out  = gs->tree_map_out;
5601 
5602   grop_hc(buf,work,size,op,dim);
5603 
5604   while (*in >= 0)
5605     {*(vals + *in++) = *(buf + *out++);}
5606 
5607 #ifdef DEBUG
5608   error_msg_warning("end gs_gop_tree_plus_hc()\n");
5609 #endif
5610 }
5611 
5612 
5613 
5614