/*
   master.c
   by Hongsuda Tangmunarunkit, 10/28/96
   master.c is the program for the master process.
*/

#include "app.h"
#include "mw.h"


/* Absolute path of the slave executable that main() hands to pvm_spawn(). */
#define SLAVENAME "/afs/cs/usr/hongsuda/pvm3/source/stest"
/* Presumably the PVM host file; not referenced in the visible part of
   this file -- confirm before removing. */
#define HOSTFILE "/afs/cs/usr/hongsuda/pvm3/hosts/hostfile"
/* SEE and ADDHOST are not referenced in the visible part of this file. */
#define SEE 0
#define ADDHOST 0
/* Compile-time switches: set one to 1 to compile in the printouts guarded
   by the matching #if (VERBOSE / DEBUG trace progress, SHOW prints the
   final result block in main()). */
#define VERBOSE 0
#define DEBUG  0
#define SHOW 0

/* -------------------------- global ----------------------------- */


/* 
   list of machines 
   apollo.nectar (mdg)
   brandon.nectar
   chiron.nectar
   cronus.nectar
   hades.nectar
   helios.nectar
   hera.nectar
   selene.nectar

   rhea.nectar.cs.cmu.edu
   asclepius.nectar.cs.cmu.edu
   mnemosyne.nectar.cs.cmu.edu
   argus.nectar.cs.cmu.edu
   */

/* Number of slave tasks requested from pvm_spawn(); main() overrides the
   default with argv[1] when it is given. */
int gnum_shosts = 2;    /* number of slave hosts */
/* Default only: main() resets this to (number of spawned slaves + 1), the
   extra one being the master itself. */
int gnum_processes = 3; /* should it be a program argument?? */
/* Side length passed to init_block()/init_result_block() for the square
   master and result blocks; main() overrides it with argv[2]. */
int gdimension = DIMENSION;
/* Not referenced in the visible part of this file. */
char **ghost_list;
/* Task ids of the spawned slaves, filled in by main() from pvm_spawn(). */
int *gstid;
/* Not referenced in the visible part of this file. */
char **ginit_host_list;
/* Set by main() to gdimension / gnum_processes (integer division). */
int gblocksize;

/* Full input block, created by init_block() in main(). */
block_type gmaster_block;
/* Result block, created by init_result_block() in main() and updated by
   compute_one_block() / fill_master_block(). */
block_type gresult_block;
/* The master's own share: main() cuts it out of gmaster_block as rows
   0 .. gblocksize-1, columns 0 .. gdimension-1. */
block_type gsubblock1;

/* ------------------------------------------------------------ */
/* Return how many len-by-width tiles are needed to cover block1.
   The extent reported by get_length() is divided by len and the extent
   reported by get_width() by width; each quotient is rounded up so a
   partial tile at the edge counts as a whole one.  The two counts are
   multiplied and the product is truncated to int. */
int cal_num_subblocks(block_type block1, int len, int width){
  int extent_len = get_length(block1);
  int extent_width = get_width(block1);
  double tiles_along_len = ceil(extent_len / (double)len);
  double tiles_along_width = ceil(extent_width / (double)width);

  return (int)(tiles_along_len * tiles_along_width);
}

/* ------------------------------------------------------------- */
/* Append the subblock (i1,j1)-(i2,j2) of block1 to the active PVM send
   buffer.

   Layout written to the buffer:
     - the four corner coordinates i1, j1, i2, j2 as ints, in that order;
     - then one run of (j2-j1+1)*sizeof(element_type) raw bytes for each
       of the (i2-i1+1) rows, top row first.

   The coordinates are in the same coordinate system as
   block1.upper_left; they are translated to indices into
   block1.data_block by subtracting that corner. */
void pack_subblock(block_type block1, int i1, int j1, int i2, int j2){
  int nrows = i2 - i1 + 1;
  int ncols = j2 - j1 + 1;
  int first_row = i1 - block1.upper_left.i;
  int first_col = j1 - block1.upper_left.j;
  int row_bytes = sizeof(element_type) * ncols;
  int r;

  /* coordinate header */
  pvm_pkint(&i1, 1, 1);
  pvm_pkint(&j1, 1, 1);
  pvm_pkint(&i2, 1, 1);
  pvm_pkint(&j2, 1, 1);

  /* payload: one contiguous row segment at a time */
  for (r = first_row; r < first_row + nrows; r++){
    pvm_pkbyte((char *)&block1.data_block[r][first_col], row_bytes, 1);
  }
}

/* ------------------------------------------------------------- */
/* Unpack one subblock from the active PVM receive buffer and store its
   elements into block1.

   The buffer layout mirrors what pack_subblock() writes: the corner
   coordinates i1, j1, i2, j2 as ints, followed by one run of
   (j2-j1+1)*sizeof(element_type) raw bytes per row, top row first.

   block1 is passed by value, but the rows are written through
   block1.data_block, so the caller's storage is what gets filled
   (assumes data_block refers to storage shared with the caller's block
   -- TODO confirm against the block_type definition).

   Fixes relative to the previous version:
     - width was computed as j2 - j2 + 1 (always 1), so only one element
       per row was unpacked;
     - the row loop started counting at `row` instead of 0, so a subblock
       that does not start at the top of block1 had too few (or no) rows
       unpacked. */
void unpack_subblock(block_type block1){
  int i1, j1, i2, j2;
  int i, row, col;
  int len, width;
  int size;

  pvm_upkint(&i1, 1, 1);
  pvm_upkint(&j1, 1, 1);
  pvm_upkint(&i2, 1, 1);
  pvm_upkint(&j2, 1, 1);

  len = i2 - i1 + 1;
  width = j2 - j1 + 1;

  /* translate the coordinates into indices relative to block1 */
  row = i1 - block1.upper_left.i;
  col = j1 - block1.upper_left.j;

  size = sizeof(element_type) * width;
  for (i = 0; i < len; i++){
    pvm_upkbyte((char *)&block1.data_block[row + i][col], size, 1);
  }
}

/* ------------------------------------------------------------- */

/* Send block1 to the first slave (gstid[0]) as a sequence of tiles of at
   most len rows by width columns, all tagged EXCHANGE_DATA_TAG.

   A header message goes first: the tile count from cal_num_subblocks()
   followed by the four corner coordinates of block1.  Then each tile is
   packed with pack_subblock() and sent as its own message, walking the
   block row band by row band and left to right within a band; tiles at
   the bottom/right edge are clipped to the block boundary. */
void send_subblock_orig(block_type block1, int len, int width){
  const int last_i = block1.lower_right.i;
  const int last_j = block1.lower_right.j;
  int total_bytes;   /* only used by the disabled whole-block path below */
  int num_tiles;
  int i1, j1, i2, j2;

  /* header message ---- */

  pvm_initsend(ENCODING);
  num_tiles = cal_num_subblocks(block1, len, width);
#if DEBUG
  printf("number of subblocks = %d \n", num_tiles);
#endif
  pvm_pkint(&num_tiles, 1, 1);
  pvm_pkint(&block1.upper_left.i, 1, 1);
  pvm_pkint(&block1.upper_left.j, 1, 1);
  pvm_pkint(&block1.lower_right.i, 1, 1);
  pvm_pkint(&block1.lower_right.j, 1, 1);
  pvm_send(gstid[0], EXCHANGE_DATA_TAG);

  /* tile messages ----- */

#if 1
  for (i1 = block1.upper_left.i; i1 <= last_i; i1 = i2 + 1){
    i2 = i1 + len - 1;
    if (i2 > last_i)
      i2 = last_i;

    for (j1 = block1.upper_left.j; j1 <= last_j; j1 = j2 + 1){
      j2 = j1 + width - 1;
      if (j2 > last_j)
        j2 = last_j;

      pvm_initsend(ENCODING);
      pack_subblock(block1, i1, j1, i2, j2);
      pvm_send(gstid[0], EXCHANGE_DATA_TAG);
    }
  }
#else

  pvm_initsend(ENCODING);
  pack_for_send(block1, &total_bytes);
  pvm_send(gstid[0], EXCHANGE_DATA_TAG);

#endif

}

/* ------------------------------------------------------------- */

/* Cut the tile (i1,j1)-(i2,j2) out of block1, send it to the first slave
   (gstid[0]) with EXCHANGE_DATA_TAG, feed it to compute_one_block()
   together with gsubblock1 to update gresult_block, and release the
   temporary tile. */
static void send_subblock_tile(block_type block1, int i1, int j1, int i2, int j2){
  block_type a_block;
  int total_bytes;

  subblock(&a_block, block1, i1, j1, i2, j2);

  pvm_initsend(ENCODING);
  pack_for_send(a_block, &total_bytes);
  pvm_send(gstid[0], EXCHANGE_DATA_TAG);

  compute_one_block(&gresult_block, gsubblock1, a_block);

  free(a_block.data_block);
}

/* Process the row band i1..i2 of block1 left to right: full_tiles tiles
   of exactly w_threshold columns, then one final tile that runs up to the
   right edge of block1. */
static void send_subblock_band(block_type block1, int i1, int i2,
                               int full_tiles, int w_threshold){
  int j1 = block1.upper_left.j;
  int j2;
  int n;

  for (n = 0; n < full_tiles; n++){
    j2 = j1 + w_threshold - 1;
    send_subblock_tile(block1, i1, j1, i2, j2);
    j1 = j2 + 1;
  }

  send_subblock_tile(block1, i1, j1, i2, block1.lower_right.j);
}

/* Split block1 into tiles of r_threshold rows by w_threshold columns and,
   for every tile, send it to the first slave and fold it into
   gresult_block (see send_subblock_tile).

   The block is walked row band by row band.  ceil(length/r_threshold)-1
   bands have exactly r_threshold rows; the last band takes whatever rows
   remain down to block1.lower_right.i.  Columns are split the same way
   with w_threshold.

   Fix relative to the previous version: in the last row band the tile
   width was computed from the tile *count* (j1 + width - 1) instead of
   from w_threshold, so the tiles of that band did not line up with those
   of the bands above. */
void send_subblock(block_type block1, int r_threshold, int w_threshold){
  int full_bands = ceil(get_length(block1)/(double)r_threshold) - 1;
  int full_tiles = ceil(get_width(block1)/(double)w_threshold) - 1;
  int i1 = block1.upper_left.i;
  int i2;
  int n;

  for (n = 0; n < full_bands; n++){
    i2 = i1 + r_threshold - 1;
    send_subblock_band(block1, i1, i2, full_tiles, w_threshold);
    i1 = i2 + 1;
  }

  /* last (possibly shorter) band reaches the bottom edge of the block */
  send_subblock_band(block1, i1, block1.lower_right.i, full_tiles, w_threshold);
}

/* ------------------------------------------------------------- */
/* Send each slave its initial band of rows of iblock, tagged
   INIT_DATA_TAG.

   The first gblocksize rows are skipped (main() keeps them for the
   master).  The next (gnum_processes - 2) slaves, gstid[0] onwards, each
   get a full-width band of exactly gblocksize rows; the slave after them
   gets all remaining rows down to iblock.lower_right.i.  Every band is
   cut out with subblock(), packed with pack_for_send(), sent, and then
   its temporary storage is freed. */
void distribute_init_data(block_type iblock){
  const int band_rows = gblocksize;
  const int col_lo = iblock.upper_left.j;
  const int col_hi = iblock.lower_right.j;
  int row_lo = iblock.upper_left.i + band_rows;  /* (i,j) should be (0,0) */
  int row_hi;
  int full_bands;
  int slave = 0;
  int packed_bytes;
  block_type band;

#if VERBOSE
  printf("size = %d \n", band_rows);
  print_block(iblock);
#endif

  /* fixed-height bands for all but the last slave */
  full_bands = gnum_processes - 2;
  while (slave < full_bands){
    row_hi = row_lo + band_rows - 1;
    subblock(&band, iblock, row_lo, col_lo, row_hi, col_hi);
    row_lo = row_hi + 1;

#if VERBOSE
    printf("distribute_init_data\n");
    print_block(band);
#endif
    pvm_initsend(ENCODING);
    pack_for_send(band, &packed_bytes);
    pvm_send(gstid[slave], INIT_DATA_TAG);

    free(band.data_block);
    slave++;
  }

  /* the last slave takes everything that is left */
  row_hi = iblock.lower_right.i;
  subblock(&band, iblock, row_lo, col_lo, row_hi, col_hi);

#if VERBOSE
  printf("distribute_init_data: i=%d: almost send to %x \n", slave, gstid[slave]);
  print_block(band);
#endif

  pvm_initsend(ENCODING);
  pack_for_send(band, &packed_bytes);
  pvm_send(gstid[slave], INIT_DATA_TAG);

  free(band.data_block);
}

/* ------------------------------------------------------------- */
/* Send the whole of iblock to the first slave in tiles of
   THRESHOLD_LENGTH by THRESHOLD_WIDTH via send_subblock_orig(). */
void compute_phase(block_type iblock){
  const int tile_len = THRESHOLD_LENGTH;
  const int tile_width = THRESHOLD_WIDTH;

  send_subblock_orig(iblock, tile_len, tile_width);
}
/* ------------------------------------------------------------- */


/* ------------------------------------------------------------- */
/* Walk iblock strip by strip, left to right.  Each strip spans all rows
   of iblock; the first (gnum_processes - 1) strips are gblocksize columns
   wide and a final strip takes the remaining columns up to
   iblock.lower_right.j.

   For every strip: cut it out with subblock(), send it to the first
   slave with send_subblock_orig() in THRESHOLD_LENGTH x THRESHOLD_WIDTH
   tiles, pass it to compute_one_block() with gsubblock1 to update
   gresult_block, then free the temporary strip. */
void compute_phase_orig(block_type iblock){
  const int strip_cols = gblocksize;
  const int row_lo = iblock.upper_left.i;
  const int row_hi = iblock.lower_right.i;
  int col_lo = iblock.upper_left.j;
  int col_hi;
  int full_strips;
  int sent = 0;
  block_type strip_block;

#if DEBUG
  printf("compute_phase: size = %d \n", strip_cols);
#endif

  /* fixed-width strips */
  full_strips = gnum_processes - 1;
  while (sent < full_strips){
    col_hi = col_lo + strip_cols - 1;
    subblock(&strip_block, iblock, row_lo, col_lo, row_hi, col_hi);
    col_lo = col_hi + 1;

#if DEBUG
    printf("will send %d \n", sent);
    print_block(strip_block);
#endif

    send_subblock_orig(strip_block, THRESHOLD_LENGTH, THRESHOLD_WIDTH);

    compute_one_block(&gresult_block, gsubblock1, strip_block);

#if VERBOSE
    printf("compute phase: i=%d: almost send to %x \n", sent, gstid[0]);
    print_block(gresult_block);
#endif

    free(strip_block.data_block);
    sent++;
  }

  /* final strip: whatever columns remain */
  col_hi = iblock.lower_right.j;
  subblock(&strip_block, iblock, row_lo, col_lo, row_hi, col_hi);

#if DEBUG
  printf("will send %d \n", sent);
  print_block(strip_block);
#endif

  send_subblock_orig(strip_block, THRESHOLD_LENGTH, THRESHOLD_WIDTH);

  compute_one_block(&gresult_block, gsubblock1, strip_block);

#if VERBOSE
  printf("compute_phase: i=%d: almost send to %x \n", sent, gstid[0]);
  print_block(gresult_block);
#endif

  free(strip_block.data_block);

}

/* ------------------------------------------------------------ */
/* Return the number of len-by-width pieces block1 splits into:
   ceil(get_length(block1)/len) * ceil(get_width(block1)/width),
   truncated to int.  main() forwards this count to every slave. */
int cal_num_operations(block_type block1, int len, int width){
  int total_len = get_length(block1);
  int total_width = get_width(block1);
  double per_len, per_width;

  per_len = ceil(total_len / (double)len);
  per_width = ceil(total_width / (double)width);

  return (int)(per_len * per_width);
}
/* -------------------------- main ----------------------------- */
main(argc, argv)
     int argc;
     char **argv;
{
  int mytid;                  /* my task id */
  int i, j;
  int slave_argv[NUM_ARGC];
  int *r_buf;
  int *stid_list;
  int num_slaves;
  int prev_stid;
  int bufid, tmp_id;
  int total_bytes;
  char *data_buffer;
  block_type v_block1;
  block_type tmp_block;
  struct timeval tv1, tv2;  /* for timing */
  double dt1;
  int num_operations;

  if (argc > 1){
    gnum_shosts = atoi(argv[1]);
    gdimension = atoi(argv[2]);
  }
  stid_list = (int *)malloc(gnum_shosts*sizeof(int));

  /* start the timing */
  gettimeofday(&tv1, (struct timezone *)0);

  /* enroll in pvm */
  if ((mytid = pvm_mytid()) < 0) {
    exit(1);
  }
#if DEBUG
  printf("i'm t%x\n", mytid);
#endif


  for (i=0; i<gnum_shosts; i++)
    stid_list[i] = 0;
  
  /* start up slave task */
  num_slaves = pvm_spawn(SLAVENAME, (char **)0, 0, "", gnum_shosts, stid_list);
#if DEBUG
  printf("after spawning -- gnum_shosts = %d \n", gnum_shosts);
#endif

  if (num_slaves <= 0){
    fputs("Can't initiate slave \n", stderr);
    goto bail;
  }

#if DEBUG
  printf("num_slave = %d \n", num_slaves);
#endif  

  
  /* get a list of slaves' id */
  gnum_processes = num_slaves + 1; /* including the master process */
  gblocksize = gdimension/gnum_processes;
  gstid = (int *)malloc(num_slaves*sizeof(int));
  for (i=0, j=0;i<num_slaves; i++){
    if (stid_list[i] > 0){
      gstid[j++] = stid_list[i];
#if DEBUG
      printf("gstid[%d] = %x \n", i, stid_list[i]);
#endif
    }
  }
  
  /* initialize master block */
  init_block(&gmaster_block, gdimension, gdimension);
  init_result_block(&gresult_block, gdimension, gdimension);
  num_operations = cal_num_operations(gmaster_block, THRESHOLD_LENGTH, THRESHOLD_WIDTH );  

/*
  THRESHOLD_LENGTH = gdimension;
  THRESHOLD_WIDTH = ceil(gdimension/(double)gnum_processes);
  num_operations = gnum_processes;
  */

  /* Wait for slave task to start up */
  pvm_setopt(PvmRoute, PvmRouteDirect);
  for (i=0; i<num_slaves; i++){
    bufid = pvm_recv( -1, SLAVE_TAG );
    pvm_bufinfo(bufid, (int *)0, (int *)0, &tmp_id);
#if DEBUG
    printf("bufid = %x: slave is task t%x\n", bufid, tmp_id);
#endif
  }
  
  /* send the arguments to slaves */
  prev_stid = mytid;
  for (i=0; i<num_slaves; i++){
    slave_argv[0] = prev_stid;
    slave_argv[1] = gstid[(i+1)%num_slaves];
    slave_argv[2] = gnum_processes;
    slave_argv[3] = i;
    slave_argv[4] = num_operations;
    prev_stid = gstid[i];
    
    pvm_initsend(ENCODING);
    pvm_pkint(slave_argv, NUM_ARGC, 1);
        
    if (pvm_send(gstid[i], ARGV_TAG)){
      printf("Can't send data -- will force to go to bail\n");
      goto bail;
    }
#if DEBUG
    printf("NUM_ARGC=%d, id = %x: argv are %x, %x, %d, %d \n", NUM_ARGC, gstid[i], slave_argv[0], slave_argv[1], slave_argv[2], slave_argv[3]); 
#endif
  }
  
  /* master process distributes the initial data to slaves */
  distribute_init_data(gmaster_block); 
  
#if DEBUG
  printf("after districuting init data \n");
#endif
  
  /* create the first subblock for itself */
  subblock(&gsubblock1, gmaster_block, 0, 0, gblocksize-1, gdimension-1);
  
#if DEBUG
  printf("after calculate the first subblock \n");
#endif

  compute_phase_orig(gmaster_block);

#if DEBUG
  printf("after the compute_phase \n");
  print_block(gresult_block);
  printf("after printting result block\n");
#endif
  
  for (i=0; i<num_slaves; i++){
    bufid = pvm_recv(-1, SLAVE_TAG); 
    tmp_block = unpack_for_recv(&total_bytes);
    fill_master_block(gresult_block, tmp_block);  
    free(tmp_block.data_block);
  }
  
#if SHOW
  puts("\ndone");
  print_block(gresult_block);
#endif
  
  /* end the timing */
  gettimeofday(&tv2, (struct timezone *)0);
  
  dt1 = ((tv2.tv_sec - tv1.tv_sec) * 1000000) + tv2.tv_usec - tv1.tv_usec;
  dt1 = (double)dt1 / (double) 1000000 ;
  
  printf("the time used to compute A^2 (A:%dx%d) on %d hosts = %.10f sec\n", gdimension, gdimension, gnum_shosts+1, dt1);

 bail:
  for (i=0; i<num_slaves; i++){
    if (gstid[0] > 0)
      pvm_kill(gstid[0]);
  }
  
  pvm_exit();
  exit(1);
}


