#define PETSCKSP_DLL

/***********************************gs.c***************************************

Author: Henry M. Tufo III

e-mail: hmt@cs.brown.edu

snail-mail:
Division of Applied Mathematics
Brown University
Providence, RI 02912

Last Modification: 
6.21.97
************************************gs.c**************************************/

/***********************************gs.c***************************************
File Description:
-----------------

************************************gs.c**************************************/

#include "../src/ksp/pc/impls/tfs/tfs.h"

/* default length of number of items via tree - doubles if exceeded */
/* NOTE: no trailing semicolon -- a semicolon here would be pasted into
   every expansion site (e.g. "tree_buf_sz = 2048;;") and would break any
   use of the macro inside an expression */
#define TREE_BUF_SZ 2048
#define GS_VEC_SZ   1



/***********************************gs.c***************************************
Type: struct gather_scatter_id 
------------------------------

************************************gs.c**************************************/
/* Gather/scatter template: all state needed to perform repeated
   gather-scatter (assembly) operations on one index set.  Built once by
   gs_init(), consumed by the gs_gop_*() flavors, destroyed by gs_free(). */
typedef struct gather_scatter_id {
  PetscInt id;          /* unique template id (1-based, from num_gs_ids) */
  PetscInt nel_min;     /* global min/max/sum of per-processor element counts */
  PetscInt nel_max;
  PetscInt nel_sum;
  PetscInt negl;        /* global id range size: gl_max-gl_min+1 */
  PetscInt gl_max;      /* largest global id seen anywhere */
  PetscInt gl_min;      /* smallest global id seen anywhere */
  PetscInt repeats;     /* number of locally repeated entries */
  PetscInt ordered;
  PetscInt positive;
  PetscScalar *vals;

  /* bit mask info */
  PetscInt *my_proc_mask;
  PetscInt mask_sz;     /* processor bit-mask length in ints */
  PetscInt *ngh_buf;    /* per-element sharing masks (freed after setup) */
  PetscInt ngh_buf_sz;
  PetscInt *nghs;       /* expanded int list of neighbor ranks */
  PetscInt num_nghs;
  PetscInt max_nghs;    /* global max of num_nghs */
  PetscInt *pw_nghs;    /* bit mask of pairwise neighbors */
  PetscInt num_pw_nghs;
  PetscInt *tree_nghs;
  PetscInt num_tree_nghs;

  PetscInt num_loads;

  /* repeats == true -> local info */
  PetscInt nel;         /* number of unique elememts */
  PetscInt *elms;       /* of size nel */
  PetscInt nel_total;
  PetscInt *local_elms; /* of size nel_total */
  PetscInt *companion;  /* of size nel_total */

  /* local info */
  PetscInt num_local_total;
  PetscInt local_strength;       /* NONE / FULL / PARTIAL */
  PetscInt num_local;            /* purely-local reduction groups */
  PetscInt *num_local_reduce;    /* group sizes (negated during setup to mark
                                    groups that also participate globally) */
  PetscInt **local_reduce;       /* -1 terminated index lists per group */
  PetscInt num_local_gop;        /* groups intersecting pairwise/tree work */
  PetscInt *num_gop_local_reduce;
  PetscInt **gop_local_reduce;

  /* pairwise info */
  PetscInt level;          /* max #sharers handled pairwise (vs. tree) */
  PetscInt num_pairs;      /* #neighbors exchanged with */
  PetscInt max_pairs;      /* global max of num_pairs */
  PetscInt loc_node_pairs; /* local/max/min/avg message statistics */
  PetscInt max_node_pairs;
  PetscInt min_node_pairs;
  PetscInt avg_node_pairs;
  PetscInt *pair_list;     /* neighbor ranks */
  PetscInt *msg_sizes;     /* #elements per neighbor */
  PetscInt **node_list;    /* -1 terminated element lists per neighbor */
  PetscInt len_pw_list;
  PetscInt *pw_elm_list;   /* -1 terminated list of pairwise elements */
  PetscScalar *pw_vals;

  MPI_Request *msg_ids_in;  /* persistent request arrays for gs_gop */
  MPI_Request *msg_ids_out;

  PetscScalar *out;        /* staging buffers sized msg_total*vec_sz */
  PetscScalar *in;
  PetscInt msg_total;

  /* tree - crystal accumulator info */
  PetscInt max_left_over;  /* #elements handled by the tree phase */
  PetscInt *pre;
  PetscInt *in_num;
  PetscInt *out_num;
  PetscInt **in_list;
  PetscInt **out_list;

  /* new tree work*/
  PetscInt  tree_nel;
  PetscInt *tree_elms;
  PetscScalar *tree_buf;
  PetscScalar *tree_work;

  PetscInt  tree_map_sz;
  PetscInt *tree_map_in;   /* local index <-> tree slot maps, -1 terminated */
  PetscInt *tree_map_out;

  /* current memory status */
  PetscInt gl_bss_min;
  PetscInt gl_perm_min;

  /* max segment size for gs_gop_vec() */
  PetscInt vec_sz;

  /* hack to make paul happy */
  MPI_Comm gs_comm;

} gs_id;

static gs_id *gsi_check_args(PetscInt *elms, PetscInt nel, PetscInt level);
static PetscErrorCode gsi_via_bit_mask(gs_id *gs);
static PetscErrorCode get_ngh_buf(gs_id *gs);
static PetscErrorCode set_pairwise(gs_id *gs);
static gs_id * gsi_new(void);
static PetscErrorCode set_tree(gs_id *gs);

/* same for all but vector flavor */
static PetscErrorCode gs_gop_local_out(gs_id *gs, PetscScalar *vals);
/* vector flavor */
static PetscErrorCode gs_gop_vec_local_out(gs_id *gs, PetscScalar *vals, PetscInt step);

static PetscErrorCode gs_gop_vec_plus(gs_id *gs, PetscScalar *in_vals, PetscInt step);
static PetscErrorCode gs_gop_vec_pairwise_plus(gs_id *gs, PetscScalar *in_vals, PetscInt step);
static PetscErrorCode gs_gop_vec_local_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
static PetscErrorCode gs_gop_vec_local_in_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
static PetscErrorCode gs_gop_vec_tree_plus(gs_id *gs, PetscScalar *vals, PetscInt step);


static PetscErrorCode gs_gop_local_plus(gs_id *gs, PetscScalar *vals);
static PetscErrorCode gs_gop_local_in_plus(gs_id *gs, PetscScalar *vals);

static PetscErrorCode gs_gop_plus_hc(gs_id *gs, PetscScalar *in_vals, PetscInt dim);
static PetscErrorCode gs_gop_pairwise_plus_hc(gs_id *gs, PetscScalar *in_vals, PetscInt dim);
static PetscErrorCode gs_gop_tree_plus_hc(gs_id *gs, PetscScalar *vals, PetscInt dim);

/* global vars */
/* from comm.c module */

static PetscInt num_gs_ids = 0;

/* should make this dynamic ... later */
static PetscInt msg_buf=MAX_MSG_BUF;
static PetscInt vec_sz=GS_VEC_SZ;
static PetscInt *tree_buf=NULL;
static PetscInt tree_buf_sz=0;
static PetscInt ntree=0;

/***************************************************************************/
/* Set the segment size used when allocating gs_gop_vec() work buffers.
   Must be called before gs_init() to take effect for a template. */
PetscErrorCode gs_init_vec_sz(PetscInt size)
{
  PetscFunctionBegin;
  vec_sz = size;
  PetscFunctionReturn(0);
}

/******************************************************************************/
/* Set the communication buffer size (bytes) used by get_ngh_buf()'s
   giop exchanges.  Must be called before gs_init() to take effect. */
PetscErrorCode gs_init_msg_buf_sz(PetscInt buf_size)
{
  PetscFunctionBegin;
  msg_buf = buf_size;
  PetscFunctionReturn(0);
}

/******************************************************************************/
/* Build a gather-scatter template for the global ids in elms[0..nel-1].
   Zero ids are ignored; negative ids are invalid.  level bounds how many
   sharers are handled pairwise (the rest go through the tree).
   Returns a template that the caller must release with gs_free(). */
gs_id *gs_init( PetscInt *elms, PetscInt nel, PetscInt level)
{
   gs_id *gs;
  MPI_Group gs_group;
  MPI_Comm  gs_comm;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  /* ensure that communication package has been initialized */
  comm_init();


  /* determines if we have enough dynamic/semi-static memory */
  /* checks input, allocs and sets gd_id template            */
  gs = gsi_check_args(elms,nel,level);

  /* only bit mask version up and working for the moment    */
  /* LATER :: get int list version working for sparse pblms */
  ierr = gsi_via_bit_mask(gs);CHKERRABORT(PETSC_COMM_WORLD,ierr);


  /* give the template its own communicator */
  ierr = MPI_Comm_group(MPI_COMM_WORLD,&gs_group);CHKERRABORT(PETSC_COMM_WORLD,ierr);
  ierr = MPI_Comm_create(MPI_COMM_WORLD,gs_group,&gs_comm);CHKERRABORT(PETSC_COMM_WORLD,ierr);
  /* the group handle must be freed once the communicator exists,
     otherwise the MPI group object leaks (MPI-2.2 sec. 6.3.2) */
  ierr = MPI_Group_free(&gs_group);CHKERRABORT(PETSC_COMM_WORLD,ierr);
  gs->gs_comm=gs_comm;

  return(gs);
}

/******************************************************************************/
static gs_id *gsi_new(void)
{
  PetscErrorCode ierr;
  gs_id *gs;
  gs = (gs_id *) malloc(sizeof(gs_id));
  ierr = PetscMemzero(gs,sizeof(gs_id));CHKERRABORT(PETSC_COMM_WORLD,ierr);
  return(gs);
}

/******************************************************************************/
/* Validate the caller's element list, strip zeros, sort, compress repeats,
   and exchange global statistics (nel min/max/sum, gl id range, level).
   Allocates and returns the partially-initialized template; the companion
   array maps each surviving element back to its slot in in_elms. */
static gs_id * gsi_check_args(PetscInt *in_elms, PetscInt nel, PetscInt level)
{
   PetscInt i, j, k, t2;
  PetscInt *companion, *elms, *unique, *iptr;
  PetscInt num_local=0, *num_to_reduce, **local_reduce;
  PetscInt oprs[] = {NON_UNIFORM,GL_MIN,GL_MAX,GL_ADD,GL_MIN,GL_MAX,GL_MIN,GL_B_AND};
  PetscInt vals[sizeof(oprs)/sizeof(oprs[0])-1];
  PetscInt work[sizeof(oprs)/sizeof(oprs[0])-1];
  gs_id *gs;
  PetscErrorCode ierr;


  if (!in_elms)
    {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"elms point to nothing!!!\n");}

  if (nel<0)
    {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"can't have fewer than 0 elms!!!\n");}

  if (nel==0)
    {ierr = PetscInfo(0,"I don't have any elements!!!\n");CHKERRABORT(PETSC_COMM_WORLD,ierr);}

  /* get space for gs template */
  gs = gsi_new();
  gs->id = ++num_gs_ids;

  /* hmt 6.4.99                                            */
  /* caller can set global ids that don't participate to 0 */
  /* gs_init ignores all zeros in elm list                 */
  /* negative global ids are still invalid                 */
  for (i=j=0;i<nel;i++)
    {if (in_elms[i]!=0) {j++;}}

  k=nel; nel=j;

  /* copy over in_elms list and create inverse map */
  elms = (PetscInt*) malloc((nel+1)*sizeof(PetscInt));
  companion = (PetscInt*) malloc(nel*sizeof(PetscInt));

  for (i=j=0;i<k;i++)
    {
      if (in_elms[i]!=0)
        {elms[j] = in_elms[i]; companion[j++] = i;}
    }

  if (j!=nel)
    {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"nel j mismatch!\n");}

  /* pre-pass ... check to see if sorted */
  elms[nel] = INT_MAX;
  iptr = elms;
  unique = elms+1;
  j=0;
  while (*iptr!=INT_MAX)
    {
      if (*iptr++>*unique++) 
        {j=1; break;}
    }

  /* set up inverse map */  
  if (j)
    {
      ierr = PetscInfo(0,"gsi_check_args() :: elm list *not* sorted!\n");CHKERRABORT(PETSC_COMM_WORLD,ierr);
      ierr = SMI_sort((void*)elms, (void*)companion, nel, SORT_INTEGER);CHKERRABORT(PETSC_COMM_WORLD,ierr);
    }
  else
    {ierr = PetscInfo(0,"gsi_check_args() :: elm list sorted!\n");CHKERRABORT(PETSC_COMM_WORLD,ierr);}
  elms[nel] = INT_MIN;

  /* first pass */
  /* determine number of unique elements, check pd */
  for (i=k=0;i<nel;i+=j)
    {
      t2 = elms[i];
      j=++i;
     
      /* clump 'em for now */ 
      while (elms[j]==t2) {j++;}
      
      /* how many together and num local */
      if (j-=i)
        {num_local++; k+=j;}
    }

  /* how many unique elements? */
  gs->repeats=k;
  gs->nel = nel-k;


  /* number of repeats? */
  gs->num_local = num_local;
  num_local+=2;
  gs->local_reduce=local_reduce=(PetscInt **)malloc(num_local*sizeof(PetscInt*));
  gs->num_local_reduce=num_to_reduce=(PetscInt*) malloc(num_local*sizeof(PetscInt));

  unique = (PetscInt*) malloc((gs->nel+1)*sizeof(PetscInt));
  gs->elms = unique; 
  gs->nel_total = nel;
  gs->local_elms = elms;
  gs->companion = companion;

  /* compess map as well as keep track of local ops */
  for (num_local=i=j=0;i<gs->nel;i++)
    {
      k=j;
      t2 = unique[i] = elms[j];
      companion[i] = companion[j];
     
      while (elms[j]==t2) {j++;}

      if ((t2=(j-k))>1)
        {
          /* number together */
          num_to_reduce[num_local] = t2++;
          iptr = local_reduce[num_local++] = (PetscInt*)malloc(t2*sizeof(PetscInt));

          /* to use binary searching don't remap until we check intersection */
          *iptr++ = i;
          
          /* note that we're skipping the first one */
          while (++k<j)
            {*(iptr++) = companion[k];}
          *iptr = -1;
        }
    }

  /* sentinel for ngh_buf */
  unique[gs->nel]=INT_MAX;

  /* for two partition sort hack */
  num_to_reduce[num_local] = 0;
  local_reduce[num_local] = NULL;
  num_to_reduce[++num_local] = 0;
  local_reduce[num_local] = NULL;

  /* load 'em up */
  /* note one extra to hold NON_UNIFORM flag!!! */
  vals[2] = vals[1] = vals[0] = nel;
  if (gs->nel>0)
    {
       vals[3] = unique[0];           
       vals[4] = unique[gs->nel-1];   
    }
  else
    {
       vals[3] = INT_MAX;             
       vals[4] = INT_MIN;             
    }
  vals[5] = level;
  vals[6] = num_gs_ids;

  /* GLOBAL: send 'em out */
  ierr = giop(vals,work,sizeof(oprs)/sizeof(oprs[0])-1,oprs);CHKERRABORT(PETSC_COMM_WORLD,ierr);

  /* must be semi-pos def - only pairwise depends on this */
  /* LATER - remove this restriction */
  if (vals[3]<0)
    {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"gsi_check_args() :: system not semi-pos def \n");}

  if (vals[4]==INT_MAX)
    {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"gsi_check_args() :: system ub too large !\n");}

  gs->nel_min = vals[0];
  gs->nel_max = vals[1];
  gs->nel_sum = vals[2];
  gs->gl_min  = vals[3];
  gs->gl_max  = vals[4];
  gs->negl    = vals[4]-vals[3]+1;

  /* SETERRABORT takes no format arguments, so the message must not
     contain an unfilled "%d" conversion */
  if (gs->negl<=0)
    {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"gsi_check_args() :: system empty or neg!\n");}
  
  /* LATER :: add level == -1 -> program selects level */
  if (vals[5]<0)
    {vals[5]=0;}
  else if (vals[5]>num_nodes)
    {vals[5]=num_nodes;}
  gs->level = vals[5];

  return(gs);
}

/******************************************************************************/
/* Main setup driver after gsi_check_args(): computes the neighbor bit
   masks, sets up the pairwise and tree phases, splits the local reduction
   groups into purely-local vs. global-intersecting ones, remaps element
   indices via companion[], and releases the setup-only arrays. */
static PetscErrorCode gsi_via_bit_mask(gs_id *gs)
{
   PetscInt i, nel, *elms;
  PetscInt t1;
  PetscInt **reduce;
  PetscInt *map;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  /* totally local removes ... ct_bits == 0 */
  /* these helpers return PetscErrorCode; propagate failures */
  ierr = get_ngh_buf(gs);CHKERRQ(ierr);

  if (gs->level)
    {ierr = set_pairwise(gs);CHKERRQ(ierr);}

  if (gs->max_left_over)
    {ierr = set_tree(gs);CHKERRQ(ierr);}

  /* intersection local and pairwise/tree? */
  gs->num_local_total = gs->num_local;
  gs->gop_local_reduce = gs->local_reduce;
  gs->num_gop_local_reduce = gs->num_local_reduce;

  map = gs->companion;

  /* is there any local compression */
  if (!gs->num_local) {
    gs->local_strength = NONE;
    gs->num_local_gop = 0;
  } else {
      /* ok find intersection */
      map = gs->companion;
      reduce = gs->local_reduce;  
      for (i=0, t1=0; i<gs->num_local; i++, reduce++)
        {
          /* group leader also pairwise or tree? mark group by negating size */
          if ((ivec_binary_search(**reduce,gs->pw_elm_list,gs->len_pw_list)>=0)
              ||
              ivec_binary_search(**reduce,gs->tree_map_in,gs->tree_map_sz)>=0)
            {
              t1++; 
              if (gs->num_local_reduce[i]<=0) SETERRQ(PETSC_ERR_PLIB,"nobody in list?");
              gs->num_local_reduce[i] *= -1;
            }
           **reduce=map[**reduce];
        }

      /* intersection is empty */
      if (!t1)
        {
          gs->local_strength = FULL;
          gs->num_local_gop = 0;          
        }
      /* intersection not empty */
      else
        {
          gs->local_strength = PARTIAL;
          /* sort so the t1 marked (negative-size) groups come first */
          ierr = SMI_sort((void*)gs->num_local_reduce, (void*)gs->local_reduce, gs->num_local + 1, SORT_INT_PTR);CHKERRQ(ierr);

          gs->num_local_gop = t1;
          gs->num_local_total =  gs->num_local;
          gs->num_local    -= t1;
          gs->gop_local_reduce = gs->local_reduce;
          gs->num_gop_local_reduce = gs->num_local_reduce;

          /* restore the marked sizes and advance the purely-local view
             past the shared groups */
          for (i=0; i<t1; i++)
            {
              if (gs->num_gop_local_reduce[i]>=0) SETERRQ(PETSC_ERR_PLIB,"they aren't negative?");
              gs->num_gop_local_reduce[i] *= -1;
              gs->local_reduce++;
              gs->num_local_reduce++;
            }
          gs->local_reduce++;
          gs->num_local_reduce++;
        }
    }

  /* remap pairwise and tree element lists to companion (caller) indices */
  elms = gs->pw_elm_list;
  nel  = gs->len_pw_list;
  for (i=0; i<nel; i++)
    {elms[i] = map[elms[i]];}

  elms = gs->tree_map_in;
  nel  = gs->tree_map_sz;
  for (i=0; i<nel; i++)
    {elms[i] = map[elms[i]];}

  /* clean up */
  free((void*) gs->local_elms);
  free((void*) gs->companion);
  free((void*) gs->elms);
  free((void*) gs->ngh_buf);
  gs->local_elms = gs->companion = gs->elms = gs->ngh_buf = NULL;
  PetscFunctionReturn(0);
}

/******************************************************************************/
/* Append elm to the module-level tree accumulator, growing the buffer
   geometrically (doubling) on demand.  Allocation failures are now
   reported instead of dereferencing a NULL buffer. */
static PetscErrorCode place_in_tree( PetscInt elm)
{
   PetscInt *tp, n;

  PetscFunctionBegin;
  if (ntree==tree_buf_sz)
    {
      if (tree_buf_sz)
        {
          /* full ... double capacity and copy the old contents over */
          tp = tree_buf;
          n = tree_buf_sz;
          tree_buf_sz<<=1;
          tree_buf = (PetscInt*)malloc(tree_buf_sz*sizeof(PetscInt));
          if (!tree_buf) SETERRQ(PETSC_ERR_MEM,"place_in_tree() :: malloc failed!");
          ivec_copy(tree_buf,tp,n);
          free(tp);
        }
      else
        {
          /* first use ... start at the default capacity */
          tree_buf_sz = TREE_BUF_SZ;
          tree_buf = (PetscInt*)malloc(tree_buf_sz*sizeof(PetscInt));
          if (!tree_buf) SETERRQ(PETSC_ERR_MEM,"place_in_tree() :: malloc failed!");
        }
    }

  tree_buf[ntree++] = elm;
  PetscFunctionReturn(0);
}

/******************************************************************************/
/* Determine, for every unique local element, which processors share it.
   Works in one or more "loads": each load giop-ORs a window of the global
   id range, where each processor contributes its bit mask for ids it owns.
   On return: gs->nghs is the neighbor rank list, gs->pw_nghs the pairwise
   neighbor mask, gs->ngh_buf the per-element sharing masks, and the
   module-level tree accumulator holds the ids to be handled by the tree. */
static PetscErrorCode get_ngh_buf(gs_id *gs)
{
   PetscInt i, j, npw=0, ntree_map=0;
  PetscInt p_mask_size, ngh_buf_size, buf_size;
  PetscInt *p_mask, *sh_proc_mask, *pw_sh_proc_mask;
  PetscInt *ngh_buf, *buf1, *buf2;
  PetscInt offset, per_load, num_loads, or_ct, start, end;
  PetscInt *ptr1, *ptr2, i_start, negl, nel, *elms;
  PetscInt oper=GL_B_OR;
  PetscInt *ptr3, *t_mask, level, ct1, ct2;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  /* to make life easier */
  nel   = gs->nel;
  elms  = gs->elms;
  level = gs->level;
  
  /* det #bytes needed for processor bit masks and init w/mask cor. to my_id */
  /* NOTE: p_mask_size is in BYTES here; it is converted to ints below */
  p_mask = (PetscInt*) malloc(p_mask_size=len_bit_mask(num_nodes));
  ierr = set_bit_mask(p_mask,p_mask_size,my_id);CHKERRQ(ierr);

  /* allocate space for masks and info bufs */
  /* gs->nghs is reassigned to an int list below; this mask is freed at exit */
  gs->nghs = sh_proc_mask = (PetscInt*) malloc(p_mask_size);
  gs->pw_nghs = pw_sh_proc_mask = (PetscInt*) malloc(p_mask_size);
  gs->ngh_buf_sz = ngh_buf_size = p_mask_size*nel;
  /* NOTE(review): t_mask is allocated and freed but never used here */
  t_mask = (PetscInt*) malloc(p_mask_size);
  gs->ngh_buf = ngh_buf = (PetscInt*) malloc(ngh_buf_size);

  /* comm buffer size ... memory usage bounded by ~2*msg_buf */
  /* had thought I could exploit rendezvous threshold */

  /* default is one pass */
  per_load = negl  = gs->negl;
  gs->num_loads = num_loads = 1;
  i=p_mask_size*negl;

  /* possible overflow on buffer size */
  /* overflow hack                    */
  if (i<0) {i=INT_MAX;}

  buf_size = PetscMin(msg_buf,i);

  /* can we do it? */
  if (p_mask_size>buf_size) SETERRQ2(PETSC_ERR_PLIB,"get_ngh_buf() :: buf<pms :: %d>%d\n",p_mask_size,buf_size);

  /* get giop buf space ... make *only* one malloc */
  buf1 = (PetscInt*) malloc(buf_size<<1);

  /* more than one gior exchange needed? */
  if (buf_size!=i)
    {
      per_load = buf_size/p_mask_size;
      buf_size = per_load*p_mask_size;
      gs->num_loads = num_loads = negl/per_load + (negl%per_load>0);
    }


  /* convert buf sizes from #bytes to #ints - 32 bit only! */
  p_mask_size/=sizeof(PetscInt); ngh_buf_size/=sizeof(PetscInt); buf_size/=sizeof(PetscInt);
  
  /* find giop work space */
  buf2 = buf1+buf_size;

  /* hold #ints needed for processor masks */
  gs->mask_sz=p_mask_size;

  /* init buffers */ 
  ierr = ivec_zero(sh_proc_mask,p_mask_size);CHKERRQ(ierr);
  ierr = ivec_zero(pw_sh_proc_mask,p_mask_size);CHKERRQ(ierr);
  ierr = ivec_zero(ngh_buf,ngh_buf_size);CHKERRQ(ierr);

  /* HACK reset tree info */
  tree_buf=NULL;
  tree_buf_sz=ntree=0;

  /* ok do it */
  for (ptr1=ngh_buf,ptr2=elms,end=gs->gl_min,or_ct=i=0; or_ct<num_loads; or_ct++)
    {
      /* identity for bitwise or is 000...000 */
      ivec_zero(buf1,buf_size);

      /* load msg buffer: place my mask at the slot for each id I own
         inside this load's [start,end) window */
      for (start=end,end+=per_load,i_start=i; (offset=*ptr2)<end; i++, ptr2++)
        {
          offset = (offset-start)*p_mask_size;
          ivec_copy(buf1+offset,p_mask,p_mask_size);
        }

      /* GLOBAL: pass buffer */
      ierr = giop(buf1,buf2,buf_size,&oper);CHKERRQ(ierr);


      /* unload buffer into ngh_buf */
      ptr2=(elms+i_start);
      for(ptr3=buf1,j=start; j<end; ptr3+=p_mask_size,j++)
        {
          /* I own it ... may have to pairwise it */
          if (j==*ptr2)
            {
              /* do i share it w/anyone? */
              ct1 = ct_bits((char *)ptr3,p_mask_size*sizeof(PetscInt));
              /* guess not */
              if (ct1<2)
                {ptr2++; ptr1+=p_mask_size; continue;}

              /* i do ... so keep info and turn off my bit */
              ivec_copy(ptr1,ptr3,p_mask_size);
              ierr = ivec_xor(ptr1,p_mask,p_mask_size);CHKERRQ(ierr);
              ierr = ivec_or(sh_proc_mask,ptr1,p_mask_size);CHKERRQ(ierr);
              
              /* is it to be done pairwise? */
              if (--ct1<=level)
                {
                  npw++;
                  
                  /* turn on high bit to indicate pw need to process */
                  *ptr2++ |= TOP_BIT; 
                  ierr = ivec_or(pw_sh_proc_mask,ptr1,p_mask_size);CHKERRQ(ierr);
                  ptr1+=p_mask_size; 
                  continue;
                }

              /* get set for next and note that I have a tree contribution */
              /* could save exact elm index for tree here -> save a search */
              ptr2++; ptr1+=p_mask_size; ntree_map++;
            }
          /* i don't but still might be involved in tree */
          else
            {

              /* shared by how many? */
              ct1 = ct_bits((char *)ptr3,p_mask_size*sizeof(PetscInt));

              /* none! */
              if (ct1<2) continue;

              /* is it going to be done pairwise? but not by me of course!*/
              if (--ct1<=level) continue;
            }
          /* LATER we're going to have to process it NOW */
          /* nope ... tree it */
          ierr = place_in_tree(j);CHKERRQ(ierr);
        }
    }

  free((void*)t_mask);
  free((void*)buf1);

  gs->len_pw_list=npw;
  gs->num_nghs = ct_bits((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt));

  /* expand from bit mask list to int list and save ngh list */
  gs->nghs = (PetscInt*) malloc(gs->num_nghs * sizeof(PetscInt));
  bm_to_proc((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt),gs->nghs);

  gs->num_pw_nghs = ct_bits((char *)pw_sh_proc_mask,p_mask_size*sizeof(PetscInt));

  /* global max of the neighbor count */
  oper = GL_MAX;
  ct1 = gs->num_nghs;
  ierr = giop(&ct1,&ct2,1,&oper);CHKERRQ(ierr);
  gs->max_nghs = ct1;

  gs->tree_map_sz  = ntree_map;
  gs->max_left_over=ntree;

  free((void*)p_mask);
  free((void*)sh_proc_mask);
  PetscFunctionReturn(0);
}

/******************************************************************************/
/* Build the pairwise-exchange phase from the sharing masks computed in
   get_ngh_buf(): the neighbor rank list, the per-neighbor message sizes
   and element lists, the persistent MPI request arrays, and the staging
   buffers.  Elements marked with TOP_BIT in gs->elms are the pairwise
   ones; the bit is cleared here. */
static PetscErrorCode set_pairwise(gs_id *gs)
{
   PetscInt i, j;
  PetscInt p_mask_size;
  PetscInt *p_mask, *sh_proc_mask, *tmp_proc_mask;
  PetscInt *ngh_buf, *buf2;
  PetscInt offset;
  PetscInt *msg_list, *msg_size, **msg_nodes, nprs;
  PetscInt *pairwise_elm_list, len_pair_list=0;
  PetscInt *iptr, t1, i_start, nel, *elms;
  PetscInt ct;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  /* to make life easier */
  nel  = gs->nel;
  elms = gs->elms;
  ngh_buf = gs->ngh_buf;
  sh_proc_mask  = gs->pw_nghs;

  /* need a few temp masks */
  /* NOTE: p_mask_size is in BYTES until the division below */
  p_mask_size   = len_bit_mask(num_nodes);
  p_mask        = (PetscInt*) malloc(p_mask_size);
  tmp_proc_mask = (PetscInt*) malloc(p_mask_size);

  /* set mask to my my_id's bit mask */
  ierr = set_bit_mask(p_mask,p_mask_size,my_id);CHKERRQ(ierr);

  p_mask_size /= sizeof(PetscInt);
          
  len_pair_list=gs->len_pw_list;
  gs->pw_elm_list=pairwise_elm_list=(PetscInt*)malloc((len_pair_list+1)*sizeof(PetscInt));

  /* how many processors (nghs) do we have to exchange with? */
  nprs=gs->num_pairs=ct_bits((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt));


  /* allocate space for gs_gop() info */
  gs->pair_list = msg_list = (PetscInt *)  malloc(sizeof(PetscInt)*nprs);
  gs->msg_sizes = msg_size  = (PetscInt *)  malloc(sizeof(PetscInt)*nprs);
  gs->node_list = msg_nodes = (PetscInt **) malloc(sizeof(PetscInt*)*(nprs+1));

  /* init msg_size list */
  ierr = ivec_zero(msg_size,nprs);CHKERRQ(ierr);

  /* expand from bit mask list to int list */
  ierr = bm_to_proc((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt),msg_list);CHKERRQ(ierr);
  
  /* keep list of elements being handled pairwise */
  for (i=j=0;i<nel;i++)
    {
      if (elms[i] & TOP_BIT)
        {elms[i] ^= TOP_BIT; pairwise_elm_list[j++] = i;}
    }
  pairwise_elm_list[j] = -1;

  /* one extra slot holds the MPI_REQUEST_NULL sentinel */
  gs->msg_ids_out = (MPI_Request *)  malloc(sizeof(MPI_Request)*(nprs+1));
  gs->msg_ids_out[nprs] = MPI_REQUEST_NULL;
  gs->msg_ids_in = (MPI_Request *)  malloc(sizeof(MPI_Request)*(nprs+1));
  gs->msg_ids_in[nprs] = MPI_REQUEST_NULL;
  gs->pw_vals = (PetscScalar *) malloc(sizeof(PetscScalar)*len_pair_list*vec_sz);

  /* find who goes to each processor */
  for (i_start=i=0;i<nprs;i++)
    {
      /* processor i's mask */
      ierr = set_bit_mask(p_mask,p_mask_size*sizeof(PetscInt),msg_list[i]);CHKERRQ(ierr);

      /* det # going to processor i */
      for (ct=j=0;j<len_pair_list;j++)
        {
          buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
          ierr = ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);CHKERRQ(ierr);
          if (ct_bits((char *)tmp_proc_mask,p_mask_size*sizeof(PetscInt)))
            {ct++;}
        }
      msg_size[i] = ct;
      i_start = PetscMax(i_start,ct);

      /*space to hold nodes in message to first neighbor */
      msg_nodes[i] = iptr = (PetscInt*) malloc(sizeof(PetscInt)*(ct+1));

      /* second sweep records which pairwise slots go to neighbor i */
      for (j=0;j<len_pair_list;j++)
        {
          buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
          ierr = ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);CHKERRQ(ierr);
          if (ct_bits((char *)tmp_proc_mask,p_mask_size*sizeof(PetscInt)))
            {*iptr++ = j;}
        }
      *iptr = -1;
    }
  msg_nodes[nprs] = NULL;

  /* gather global message statistics (max/min/avg node pairs) */
  j=gs->loc_node_pairs=i_start;
  t1 = GL_MAX;
  ierr = giop(&i_start,&offset,1,&t1);CHKERRQ(ierr);
  gs->max_node_pairs = i_start;

  i_start=j;
  t1 = GL_MIN;
  ierr = giop(&i_start,&offset,1,&t1);CHKERRQ(ierr);
  gs->min_node_pairs = i_start;

  i_start=j;
  t1 = GL_ADD;
  ierr = giop(&i_start,&offset,1,&t1);CHKERRQ(ierr);
  gs->avg_node_pairs = i_start/num_nodes + 1;

  i_start=nprs;
  t1 = GL_MAX;
  giop(&i_start,&offset,1,&t1);
  gs->max_pairs = i_start;


  /* remap pairwise in tail of gsi_via_bit_mask() */
  gs->msg_total = ivec_sum(gs->msg_sizes,nprs);
  gs->out = (PetscScalar *) malloc(sizeof(PetscScalar)*gs->msg_total*vec_sz);
  gs->in  = (PetscScalar *) malloc(sizeof(PetscScalar)*gs->msg_total*vec_sz);

  /* reset malloc pool */
  free((void*)p_mask);
  free((void*)tmp_proc_mask);
  PetscFunctionReturn(0);
}    

/* to do pruned tree just save ngh buf copy for each one and decode here!
******************************************************************************/
/* Build the tree phase: take ownership of the module-level accumulator
   (tree_buf) as gs->tree_elms, allocate the tree work buffers, and build
   the -1 terminated in/out maps between local element slots and tree
   slots by binary-searching the shorter list against the longer one. */
static PetscErrorCode set_tree(gs_id *gs)
{
  PetscInt i, j, n, nel;
  PetscInt *iptr_in, *iptr_out, *tree_elms, *elms;

  PetscFunctionBegin;
  /* local work ptrs */
  elms = gs->elms;
  nel     = gs->nel;

  /* how many via tree */
  gs->tree_nel  = n = ntree;
  gs->tree_elms = tree_elms = iptr_in = tree_buf;
  gs->tree_buf  = (PetscScalar *) malloc(sizeof(PetscScalar)*n*vec_sz);
  gs->tree_work = (PetscScalar *) malloc(sizeof(PetscScalar)*n*vec_sz);
  j=gs->tree_map_sz;
  gs->tree_map_in = iptr_in  = (PetscInt*) malloc(sizeof(PetscInt)*(j+1));
  gs->tree_map_out = iptr_out = (PetscInt*) malloc(sizeof(PetscInt)*(j+1));

  /* search the longer of the two lists */
  /* note ... could save this info in get_ngh_buf and save searches */
  /* NOTE: tree_elms / elms are advanced destructively below; the
     gs->tree_elms / gs->elms fields keep the original base pointers */
  if (n<=nel)
    {
      /* bijective fct w/remap - search elm list */
      for (i=0; i<n; i++)
        {
          if ((j=ivec_binary_search(*tree_elms++,elms,nel))>=0)
            {*iptr_in++ = j; *iptr_out++ = i;} 
        }
    }
  else
    {
      for (i=0; i<nel; i++)
        {
          if ((j=ivec_binary_search(*elms++,tree_elms,n))>=0) 
            {*iptr_in++ = i; *iptr_out++ = j;}
        }
    }

  /* sentinel */
  *iptr_in = *iptr_out = -1;
  PetscFunctionReturn(0);
}

/******************************************************************************/
/* Scatter phase of the local reduction groups that intersect the global
   (pairwise/tree) work: broadcast each group leader's value, vals[idx[0]],
   to every other member of the group.  Group sizes 2/3/4 are unrolled;
   the general case follows the -1 terminated index list. */
static PetscErrorCode gs_gop_local_out( gs_id *gs,  PetscScalar *vals)
{
  PetscInt *sz, *idx, **rdc;
  PetscScalar src;

  PetscFunctionBegin;
  sz  = gs->num_gop_local_reduce;
  rdc = gs->gop_local_reduce;
  while ((idx = *rdc++)) {
    switch (*sz++) {
    case 2:
      /* wall */
      vals[idx[1]] = vals[idx[0]];
      break;
    case 3:
      /* corner shared by three elements */
      vals[idx[2]] = vals[idx[1]] = vals[idx[0]];
      break;
    case 4:
      /* corner shared by four elements */
      vals[idx[3]] = vals[idx[2]] = vals[idx[1]] = vals[idx[0]];
      break;
    default:
      /* general case ... odd geoms ... 3D */
      src = *(vals + *idx++);
      while (*idx >= 0) {
        *(vals + *idx++) = src;
      }
      break;
    }
  }
  PetscFunctionReturn(0);
}

/******************************************************************************/
/* Full local reduction: for every purely-local group, sum the members'
   values and write the sum back to every member.  Group sizes 2/3/4 are
   unrolled; the general case reads the -1 terminated list twice (sum,
   then scatter).  Matches the original's pointer stepping exactly. */
static PetscErrorCode gs_gop_local_plus( gs_id *gs,  PetscScalar *vals)
{
  PetscInt *sz, *idx, **rdc;
  PetscScalar sum;

  PetscFunctionBegin;
  sz  = gs->num_local_reduce;
  rdc = gs->local_reduce;
  while ((idx = *rdc)) {
    switch (*sz++) {
    case 2:
      /* wall */
      rdc++;
      sum = vals[idx[0]] + vals[idx[1]];
      vals[idx[0]] = sum;
      vals[idx[1]] = sum;
      break;
    case 3:
      /* corner shared by three elements */
      rdc++;
      sum = vals[idx[0]] + vals[idx[1]] + vals[idx[2]];
      vals[idx[0]] = sum;
      vals[idx[1]] = sum;
      vals[idx[2]] = sum;
      break;
    case 4:
      /* corner shared by four elements */
      rdc++;
      sum = vals[idx[0]] + (vals[idx[1]] + vals[idx[2]] + vals[idx[3]]);
      vals[idx[0]] = sum;
      vals[idx[1]] = sum;
      vals[idx[2]] = sum;
      vals[idx[3]] = sum;
      break;
    default:
      /* general case ... odd geoms ... 3D */
      sum = 0.0;
      while (*idx >= 0) {
        sum += *(vals + *idx++);
      }
      idx = *rdc++;
      while (*idx >= 0) {
        *(vals + *idx++) = sum;
      }
      break;
    }
  }
  PetscFunctionReturn(0);
}

/******************************************************************************/
/* Gather phase of the local reduction groups that intersect the global
   work: accumulate every member's value into the group leader slot,
   vals[idx[0]], leaving the other members untouched (they are refreshed
   later by gs_gop_local_out()). */
static PetscErrorCode gs_gop_local_in_plus( gs_id *gs,  PetscScalar *vals)
{
  PetscInt *sz, *idx, **rdc;
  PetscScalar *acc;

  PetscFunctionBegin;
  sz  = gs->num_gop_local_reduce;
  rdc = gs->gop_local_reduce;
  while ((idx = *rdc++)) {
    switch (*sz++) {
    case 2:
      /* wall */
      vals[idx[0]] += vals[idx[1]];
      break;
    case 3:
      /* corner shared by three elements */
      vals[idx[0]] += (vals[idx[1]] + vals[idx[2]]);
      break;
    case 4:
      /* corner shared by four elements */
      vals[idx[0]] += (vals[idx[1]] + vals[idx[2]] + vals[idx[3]]);
      break;
    default:
      /* general case ... odd geoms ... 3D */
      acc = vals + *idx++;
      while (*idx >= 0) {
        *acc += *(vals + *idx++);
      }
      break;
    }
  }
  PetscFunctionReturn(0);
}

/******************************************************************************/
/* Release every allocation owned by a gs template, in the reverse order
   of the setup phases (tree, pairwise, local), then the template itself.
   NOTE: gop_local_reduce / num_gop_local_reduce are the ORIGINAL base
   pointers saved before gsi_via_bit_mask() advanced local_reduce /
   num_local_reduce past the shared groups, so they are the ones freed. */
PetscErrorCode gs_free( gs_id *gs)
{
   PetscInt i;

  PetscFunctionBegin;
  if (gs->nghs) {free((void*) gs->nghs);}
  if (gs->pw_nghs) {free((void*) gs->pw_nghs);}

  /* tree */
  if (gs->max_left_over)
    {
      if (gs->tree_elms) {free((void*) gs->tree_elms);}
      if (gs->tree_buf) {free((void*) gs->tree_buf);}
      if (gs->tree_work) {free((void*) gs->tree_work);}
      if (gs->tree_map_in) {free((void*) gs->tree_map_in);}
      if (gs->tree_map_out) {free((void*) gs->tree_map_out);}
    }

  /* pairwise info */
  if (gs->num_pairs)
    {    
      /* should be NULL already */
      if (gs->ngh_buf) {free((void*) gs->ngh_buf);}
      if (gs->elms) {free((void*) gs->elms);}
      if (gs->local_elms) {free((void*) gs->local_elms);}
      if (gs->companion) {free((void*) gs->companion);}
      
      /* only set if pairwise */
      if (gs->vals) {free((void*) gs->vals);}
      if (gs->in) {free((void*) gs->in);}
      if (gs->out) {free((void*) gs->out);}
      if (gs->msg_ids_in) {free((void*) gs->msg_ids_in);}  
      if (gs->msg_ids_out) {free((void*) gs->msg_ids_out);}
      if (gs->pw_vals) {free((void*) gs->pw_vals);}
      if (gs->pw_elm_list) {free((void*) gs->pw_elm_list);}
      if (gs->node_list) 
        {
          /* per-neighbor element lists first, then the table itself */
          for (i=0;i<gs->num_pairs;i++)
            {if (gs->node_list[i]) {free((void*) gs->node_list[i]);}}
          free((void*) gs->node_list);
        }
      if (gs->msg_sizes) {free((void*) gs->msg_sizes);}
      if (gs->pair_list) {free((void*) gs->pair_list);}
    }

  /* local info */
  if (gs->num_local_total>=0)
    {
      /* +1 picks up the sort-hack sentinel slot added in gsi_check_args */
      for (i=0;i<gs->num_local_total+1;i++)
        /*      for (i=0;i<gs->num_local_total;i++) */
        {
          if (gs->num_gop_local_reduce[i]) 
            {free((void*) gs->gop_local_reduce[i]);}
        }
    }

  /* if intersection tree/pairwise and local isn't empty */
  if (gs->gop_local_reduce) {free((void*) gs->gop_local_reduce);}
  if (gs->num_gop_local_reduce) {free((void*) gs->num_gop_local_reduce);}

  free((void*) gs);
  PetscFunctionReturn(0);
}

/******************************************************************************/
/* Public vector gather-scatter entry point.  Only '+' is implemented;
   any other op is logged and falls back to the plus flavor.  step is the
   per-element segment length. */
PetscErrorCode gs_gop_vec( gs_id *gs,  PetscScalar *vals,  const char *op,  PetscInt step)
{
  PetscErrorCode ierr;

  PetscFunctionBegin;
  if (op[0] == '+') {
    gs_gop_vec_plus(gs,vals,step);
  } else {
    ierr = PetscInfo1(0,"gs_gop_vec() :: %c is not a valid op",op[0]);CHKERRQ(ierr);
    ierr = PetscInfo(0,"gs_gop_vec() :: default :: plus");CHKERRQ(ierr);
    gs_gop_vec_plus(gs,vals,step);
  }
  PetscFunctionReturn(0);
}

/******************************************************************************/
/* Vector "+" gather-scatter driver: run the purely-local reductions,
   gather locally-shared entries, perform the off-processor exchange
   (pairwise if configured, otherwise tree), then scatter the combined
   results back to the local copies.  step is the block size. */
static PetscErrorCode gs_gop_vec_plus( gs_id *gs,  PetscScalar *vals,  PetscInt step)
{
  PetscFunctionBegin;
  if (!gs) {SETERRQ(PETSC_ERR_PLIB,"gs_gop_vec() passed NULL gs handle!!!");}

  /* reductions that involve only on-processor elements */
  if (gs->num_local) {gs_gop_vec_local_plus(gs,vals,step);}

  /* gather local contributions of elements that also live off-processor */
  if (gs->num_local_gop) {gs_gop_vec_local_in_plus(gs,vals,step);}

  /* off-processor exchange: pairwise takes precedence over tree */
  if (gs->num_pairs)          {gs_gop_vec_pairwise_plus(gs,vals,step);}
  else if (gs->max_left_over) {gs_gop_vec_tree_plus(gs,vals,step);}

  /* scatter the combined values back out to the local copies */
  if (gs->num_local_gop) {gs_gop_vec_local_out(gs,vals,step);}
  PetscFunctionReturn(0);
}

/******************************************************************************/
/* Sum-reduce the purely-local repeated elements of vals: for each
   reduction list, add the blocks of all listed elements into the list
   head's block and copy the sum back to every listed element.
   Lists of length 2/3/4 are unrolled; longer lists take the general
   path.  Lists are negative-terminated; a NULL list pointer ends the
   iteration.  num walks gs->num_local_reduce in lockstep with reduce.
   step is the vector block size per element. */
static PetscErrorCode gs_gop_vec_local_plus( gs_id *gs,  PetscScalar *vals, PetscInt step)
{
   PetscInt *num, *map, **reduce;
   PetscScalar *base;

  PetscFunctionBegin;
  num    = gs->num_local_reduce;  
  reduce = gs->local_reduce;  
  while ((map = *reduce))
    {
      /* map[0] is the list head; its block is the accumulator */
      base = vals + map[0] * step;

      /* wall */
      if (*num == 2)
        {
          num++; reduce++;
          rvec_add (base,vals+map[1]*step,step);
          rvec_copy(vals+map[1]*step,base,step);
        }
      /* corner shared by three elements */
      else if (*num == 3)
        {
          num++; reduce++;
          rvec_add (base,vals+map[1]*step,step);
          rvec_add (base,vals+map[2]*step,step);
          rvec_copy(vals+map[2]*step,base,step);
          rvec_copy(vals+map[1]*step,base,step);
        }
      /* corner shared by four elements */
      else if (*num == 4)
        {
          num++; reduce++;
          rvec_add (base,vals+map[1]*step,step);
          rvec_add (base,vals+map[2]*step,step);
          rvec_add (base,vals+map[3]*step,step);
          rvec_copy(vals+map[3]*step,base,step);
          rvec_copy(vals+map[2]*step,base,step);
          rvec_copy(vals+map[1]*step,base,step);
        }
      /* general case ... odd geoms ... 3D */
      else
        {
          num++;
          /* accumulate every remaining element of the list into base */
          while (*++map >= 0)
            {rvec_add (base,vals+*map*step,step);}
              
          /* rewind the list and broadcast the sum back to each element */
          map = *reduce;
          while (*++map >= 0)
            {rvec_copy(vals+*map*step,base,step);}
          
          reduce++;
        }
    }
  PetscFunctionReturn(0);
}

/******************************************************************************/
/* Gather half of the local reduction for elements that also
   participate off-processor: add the blocks of every element in each
   list into the list head (map[0]).  The matching scatter back is
   gs_gop_vec_local_out().  Lists are negative-terminated; a NULL
   list pointer ends the iteration.  step is the block size. */
static PetscErrorCode gs_gop_vec_local_in_plus( gs_id *gs,  PetscScalar *vals, PetscInt step)
{
   PetscInt  *num, *map, **reduce;
   PetscScalar *base;
  PetscFunctionBegin;
  num    = gs->num_gop_local_reduce;  
  reduce = gs->gop_local_reduce;  
  while ((map = *reduce++))
    {
      /* map[0] is the accumulator element for this list */
      base = vals + map[0] * step;

      /* wall */
      if (*num == 2)
        {
          num ++;
          rvec_add(base,vals+map[1]*step,step);
        }
      /* corner shared by three elements */
      else if (*num == 3)
        {
          num ++;
          rvec_add(base,vals+map[1]*step,step);
          rvec_add(base,vals+map[2]*step,step);
        }
      /* corner shared by four elements */
      else if (*num == 4)
        {
          num ++;
          rvec_add(base,vals+map[1]*step,step);
          rvec_add(base,vals+map[2]*step,step);
          rvec_add(base,vals+map[3]*step,step);
        }
      /* general case ... odd geoms ... 3D*/
      else
        {
          num++;
          while (*++map >= 0)
            {rvec_add(base,vals+*map*step,step);}
        }
    }
  PetscFunctionReturn(0);
}

/******************************************************************************/
/* Scatter half of the local reduction for elements that also
   participate off-processor: copy the combined block held in each
   list head (map[0]) back to every other element of the list.
   Complements gs_gop_vec_local_in_plus().  Lists are
   negative-terminated; a NULL list pointer ends the iteration. */
static PetscErrorCode gs_gop_vec_local_out( gs_id *gs,  PetscScalar *vals, PetscInt step)
{
   PetscInt *num, *map, **reduce;
   PetscScalar *base;

  PetscFunctionBegin;
  num    = gs->num_gop_local_reduce;  
  reduce = gs->gop_local_reduce;  
  while ((map = *reduce++))
    {
      /* map[0] holds the already-combined value for this list */
      base = vals + map[0] * step;

      /* wall */
      if (*num == 2)
        {
          num ++;
          rvec_copy(vals+map[1]*step,base,step);
        }
      /* corner shared by three elements */
      else if (*num == 3)
        {
          num ++;
          rvec_copy(vals+map[1]*step,base,step);
          rvec_copy(vals+map[2]*step,base,step);
        }
      /* corner shared by four elements */
      else if (*num == 4)
        {
          num ++;
          rvec_copy(vals+map[1]*step,base,step);
          rvec_copy(vals+map[2]*step,base,step);
          rvec_copy(vals+map[3]*step,base,step);
        }
      /* general case ... odd geoms ... 3D*/
      else
        {
          num++;
          while (*++map >= 0)
            {rvec_copy(vals+*map*step,base,step);}
        }
    }
  PetscFunctionReturn(0);
}

/******************************************************************************/
/* Pairwise (point-to-point) phase of the vector "+" gather-scatter.
   Stages the shared entries of in_vals into gs->pw_vals, posts
   non-blocking receives and sends for every partner in gs->pair_list,
   overlaps the tree phase (if any) with the communication, sums each
   partner's contribution into the staged values via BLAS axpy, and
   finally copies the combined values back into in_vals.
   step is the vector block size per element. */
static PetscErrorCode gs_gop_vec_pairwise_plus( gs_id *gs,  PetscScalar *in_vals, PetscInt step)
{
  PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
  PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
  PetscInt *pw, *list, *size, **nodes;
  MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
  MPI_Status status;
  PetscBLASInt i1 = 1,dstep;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  /* strip and load s */
  msg_list =list         = gs->pair_list;
  msg_size =size         = gs->msg_sizes;
  msg_nodes=nodes        = gs->node_list;
  iptr=pw                = gs->pw_elm_list;
  dptr1=dptr3            = gs->pw_vals;
  msg_ids_in  = ids_in   = gs->msg_ids_in;
  msg_ids_out = ids_out  = gs->msg_ids_out;
  dptr2                  = gs->out;
  in1=in2                = gs->in;

  /* step never changes below: cast to a BLAS int once, not per axpy */
  dstep = PetscBLASIntCast(step);

  /* post the receives */
  /*  msg_nodes=nodes; */
  do
    {
      /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
         second one *list and do list++ afterwards */
      ierr = MPI_Irecv(in1, *size *step, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
      in1 += *size++ *step;
    }
  while (*++msg_nodes);
  msg_nodes=nodes;

  /* load gs values into in out gs buffers */
  while (*iptr >= 0)
    {
      rvec_copy(dptr3,in_vals + *iptr*step,step);
      dptr3+=step;
      iptr++;
    }

  /* load out buffers and post the sends */
  while ((iptr = *msg_nodes++))
    {
      dptr3 = dptr2;
      while (*iptr >= 0)
        {
          rvec_copy(dptr2,dptr1 + *iptr*step,step);
          dptr2+=step;
          iptr++;
        }
      ierr = MPI_Isend(dptr3, *msg_size++ *step, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
    }

  /* tree: overlapped with the in-flight pairwise messages */
  if (gs->max_left_over)
    {gs_gop_vec_tree_plus(gs,in_vals,step);}

  /* process the received data */
  msg_nodes=nodes;
  while ((iptr = *nodes++)){
    PetscScalar d1 = 1.0;
      /* Should I check the return value of MPI_Wait() or status? */
      /* Can this loop be replaced by a call to MPI_Waitall()? */
      ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
      /* accumulate the partner's contribution into the staged values */
      while (*iptr >= 0) {
        BLASaxpy_(&dstep,&d1,in2,&i1,dptr1 + *iptr*step,&i1);
        in2+=step;
        iptr++;
      }
  }

  /* replace vals */
  while (*pw >= 0)
    {
      rvec_copy(in_vals + *pw*step,dptr1,step);
      dptr1+=step;
      pw++;
    }

  /* clear isend message handles */
  /* This changed for clarity though it could be the same */
  while (*msg_nodes++)
    /* Should I check the return value of MPI_Wait() or status? */
    /* Can this loop be replaced by a call to MPI_Waitall()? */
    {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}

  PetscFunctionReturn(0);
}

/******************************************************************************/
/* Tree phase of the vector "+" gather-scatter: scatter my blocks into
   the shared tree buffer, sum the whole buffer with a fan-in/fan-out
   (grop), then gather my portion of the combined result back into
   vals.  step is the vector block size per element. */
static PetscErrorCode gs_gop_vec_tree_plus( gs_id *gs,  PetscScalar *vals,  PetscInt step)
{
  PetscInt size, *in, *out;
  PetscScalar *buf, *work;
  PetscInt op[] = {GL_ADD,0};
  PetscBLASInt i1 = 1,dstep;

  PetscFunctionBegin;
  /* copy over to local variables */
  in   = gs->tree_map_in;
  out  = gs->tree_map_out;
  buf  = gs->tree_buf;
  work = gs->tree_work;
  size = gs->tree_nel*step;
  /* step never changes: cast to a BLAS int once, not per loop iteration */
  dstep = PetscBLASIntCast(step);

  /* zero out collection buffer */
  rvec_zero(buf,size);

  /* copy over my contributions; maps are negative-terminated */
  while (*in >= 0)
    {BLAScopy_(&dstep,vals + *in++*step,&i1,buf + *out++*step,&i1);}

  /* perform fan in/out on full buffer */
  /* must change grop to handle the blas */
  grop(buf,work,size,op);

  /* reset */
  in   = gs->tree_map_in;
  out  = gs->tree_map_out;

  /* get the portion of the results I need */
  while (*in >= 0)
    {BLAScopy_(&dstep,buf + *out++*step,&i1,vals + *in++*step,&i1);}
  PetscFunctionReturn(0);
}

/******************************************************************************/
/* Hypercube-restricted scalar gather-scatter entry point: dispatch on
   the first character of op.  Only "+" is implemented; any other op
   falls back to plus after logging a warning.  dim is the number of
   hypercube dimensions to operate over. */
PetscErrorCode gs_gop_hc( gs_id *gs,  PetscScalar *vals,  const char *op,  PetscInt dim)
{
  PetscErrorCode ierr;

  PetscFunctionBegin;
  switch (*op) {
  case '+':
    gs_gop_plus_hc(gs,vals,dim);
    break;
  default:
    /* newline added for consistency with the message below */
    ierr = PetscInfo1(0,"gs_gop_hc() :: %c is not a valid op\n",op[0]);CHKERRQ(ierr);
    ierr = PetscInfo(0,"gs_gop_hc() :: default :: plus\n");CHKERRQ(ierr);
    gs_gop_plus_hc(gs,vals,dim);
    break;
  }
  PetscFunctionReturn(0);
}

/******************************************************************************/
/* Hypercube-restricted scalar "+" gather-scatter driver: clamp dim,
   run the purely-local reductions, gather locally-shared entries,
   perform the off-processor exchange (pairwise does the tree
   internally; otherwise tree only), then scatter the combined
   results back to the local copies. */
static PetscErrorCode gs_gop_plus_hc( gs_id *gs,  PetscScalar *vals, PetscInt dim)
{
  PetscFunctionBegin;
  /* nothing to do for a non-positive dimension */
  if (dim<=0) {PetscFunctionReturn(0);}

  /* can't do more dimensions than exist */
  dim = PetscMin(dim,i_log2_num_nodes);

  /* reductions that involve only on-processor elements */
  if (gs->num_local) {gs_gop_local_plus(gs,vals);}

  /* gather local contributions of elements that also live off-processor */
  if (gs->num_local_gop) {gs_gop_local_in_plus(gs,vals);}

  /* off-processor exchange: pairwise (which runs the tree inside) or tree */
  if (gs->num_pairs)          {gs_gop_pairwise_plus_hc(gs,vals,dim);}
  else if (gs->max_left_over) {gs_gop_tree_plus_hc(gs,vals,dim);}

  /* scatter the combined values back out to the local copies */
  if (gs->num_local_gop) {gs_gop_local_out(gs,vals);}
  PetscFunctionReturn(0);
}

/******************************************************************************/
/* Pairwise phase of the hypercube-restricted scalar "+" gather-scatter.
   mask gets the low dim bits set, so (my_id|mask)==(*list|mask) holds
   exactly when the partner's id differs from mine only inside those
   bits, i.e. lies in my dim-dimensional subcube; all other partners in
   the pair list are skipped (their list/size slots are still advanced
   to keep the walk in sync).  The tree phase is overlapped with the
   in-flight messages. */
static PetscErrorCode gs_gop_pairwise_plus_hc( gs_id *gs,  PetscScalar *in_vals, PetscInt dim)
{
   PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
   PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
   PetscInt *pw, *list, *size, **nodes;
  MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
  MPI_Status status;
  PetscInt i, mask=1;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  /* build mask with the low dim bits set */
  for (i=1; i<dim; i++)
    {mask<<=1; mask++;}


  /* strip and load s */
  msg_list =list         = gs->pair_list;
  msg_size =size         = gs->msg_sizes;
  msg_nodes=nodes        = gs->node_list;
  iptr=pw                = gs->pw_elm_list;  
  dptr1=dptr3            = gs->pw_vals;
  msg_ids_in  = ids_in   = gs->msg_ids_in;
  msg_ids_out = ids_out  = gs->msg_ids_out;
  dptr2                  = gs->out;
  in1=in2                = gs->in;

  /* post the receives */
  /*  msg_nodes=nodes; */
  do 
    {
      /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
         second one *list and do list++ afterwards */
      if ((my_id|mask)==(*list|mask))
        {
          ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
          in1 += *size++;
        }
      else
        {list++; size++;}
    }
  while (*++msg_nodes);

  /* load gs values into in out gs buffers */  
  while (*iptr >= 0)
    {*dptr3++ = *(in_vals + *iptr++);}

  /* load out buffers and post the sends */
  msg_nodes=nodes;
  list = msg_list;
  while ((iptr = *msg_nodes++))
    {
      if ((my_id|mask)==(*list|mask))
        {
          dptr3 = dptr2;
          while (*iptr >= 0)
            {*dptr2++ = *(dptr1 + *iptr++);}
          /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
          /* is msg_ids_out++ correct? */
          ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
        }
      else
        {list++; msg_size++;}
    }

  /* do the tree while we're waiting */
  if (gs->max_left_over)
    {gs_gop_tree_plus_hc(gs,in_vals,dim);}

  /* process the received data */
  msg_nodes=nodes;
  list = msg_list;
  while ((iptr = *nodes++))
    {
      if ((my_id|mask)==(*list|mask))
        {
          /* Should I check the return value of MPI_Wait() or status? */
          /* Can this loop be replaced by a call to MPI_Waitall()? */
          ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
          /* accumulate the partner's contribution into the staged values */
          while (*iptr >= 0)
            {*(dptr1 + *iptr++) += *in2++;}
        }
      list++;
    }

  /* replace vals */
  while (*pw >= 0)
    {*(in_vals + *pw++) = *dptr1++;}

  /* clear isend message handles */
  /* This changed for clarity though it could be the same */
  while (*msg_nodes++)
    {
      if ((my_id|mask)==(*msg_list|mask))
        {
          /* Should I check the return value of MPI_Wait() or status? */
          /* Can this loop be replaced by a call to MPI_Waitall()? */
          ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);
        }
      msg_list++;
    }

  PetscFunctionReturn(0);
}

/******************************************************************************/
/* Tree phase of the hypercube-restricted scalar "+" gather-scatter:
   scatter my entries into the shared tree buffer, sum it with a
   fan-in/fan-out restricted to dim dimensions (grop_hc), then gather
   my portion of the combined result back into vals.  The in/out maps
   are negative-terminated. */
static PetscErrorCode gs_gop_tree_plus_hc(gs_id *gs, PetscScalar *vals, PetscInt dim)
{
  PetscInt nel;
  PetscInt *map_in, *map_out;
  PetscScalar *tbuf, *twork;
  PetscInt op[] = {GL_ADD,0};

  PetscFunctionBegin;
  tbuf  = gs->tree_buf;
  twork = gs->tree_work;
  nel   = gs->tree_nel;

  /* clear the staging buffer, then scatter my values into it */
  rvec_zero(tbuf,nel);
  for (map_in=gs->tree_map_in, map_out=gs->tree_map_out; *map_in >= 0; map_in++, map_out++)
    {tbuf[*map_out] = vals[*map_in];}

  /* dim-dimensional fan-in/fan-out sum over the full buffer */
  grop_hc(tbuf,twork,nel,op,dim);

  /* gather my portion of the combined result */
  for (map_in=gs->tree_map_in, map_out=gs->tree_map_out; *map_in >= 0; map_in++, map_out++)
    {vals[*map_in] = tbuf[*map_out];}
  PetscFunctionReturn(0);
}



