/* Copyright 1995 Jonathan C. Hardwick */ #include #include #include #include "globals.h" #include "main.h" #include "manager.h" #ifdef TEAMS #include "leader.h" #define NO_TEAMS_LEFT -1 typedef struct team_t { int rnkLdr; int nelt; int nprocCur; int nprocToadd; int arnkToadd [MAXPROCS]; } team_t; static team_t mateam [MAXPROCS]; static int mnteam; /* XXX Rewrite the whole list of teams concept into a real list, even if * it's implemented using an array. */ /* Update team entry in the list. If it's not there, add it. */ static void update_team_entry (int leader, int nproc, int nelt) { int i; assert (leader > 0); for (i = 0; i < mnteam; i++) { if (mateam [i].rnkLdr == leader) break; } if (i == mnteam) mnteam++; mateam [i].rnkLdr = leader; mateam [i].nprocCur = nproc; mateam [i].nprocToadd = 0; mateam [i].nelt = nelt; } /* Delete a team entry. */ static void delete_team_entry (int leader) { int k, i, j; assert (leader > 0); /* First find the entry. */ for (k = 0; k < mnteam; k++) { if (mateam [k].rnkLdr == leader) break; } assert (k != mnteam); /* Then shift all entries above it down by 1. */ for (i = k; i < mnteam - 1; i++) { mateam [i].rnkLdr = mateam [i+1].rnkLdr; mateam [i].nprocCur = mateam [i+1].nprocCur; mateam [i].nprocToadd = mateam [i+1].nprocToadd; mateam [i].nelt = mateam [i+1].nelt; for (j = 0; j < mateam [i+1].nprocToadd; j++) { mateam [i].arnkToadd [j] = mateam [i+1]. arnkToadd[j]; } } mnteam--; } /* Called when manager receives a "here are two new teams message" from a * leader. */ static void split_team_in_two (team_split_msg_t msg, int rnkFrom) { assert (rnkFrom == msg.rnkLdrOld); /* Delete the old team if it now has <= 1 member, else update it. */ if (msg.nprocOld <= 1) { delete_team_entry (msg.rnkLdrOld); } else { assert (msg.rnkLdrOld > 0); update_team_entry (msg.rnkLdrOld, msg.nprocOld, msg.neltOld); } /* Add an entry for the new team if it has more than 1 member. */ if (msg.nprocNew > 1) { assert (msg.rnkLdrNew > 0); update_team_entry (msg.rnkLdrNew, msg.nprocNew, msg.neltNew); } } /* Return the index of the most loaded team (most loaded in terms of * amount of data per processor. */ static int most_loaded_team (void) { int i, indexMax, loadMax, loadCur; if (mnteam == 0) { return NO_TEAMS_LEFT; } indexMax = 0; loadMax = mateam [0].nelt / (mateam [0].nprocCur + mateam [0].nprocToadd); for (i = 1; i < mnteam; i++) { loadCur = mateam [i].nelt / (mateam [i].nprocCur + mateam [0].nprocToadd); if (loadCur > loadMax) { loadMax = loadCur; indexMax = i; } } return indexMax; } /* Called when manager receives a "sync: got any workers for me?" * message from a leader. */ static void sync_with_leader (int rnkFrom) { MPI_Status status; team_split_msg_t msg; int i; /* Find the team entry */ for (i = 0; i < mnteam; i++) { if (mateam [i].rnkLdr == rnkFrom) break; } assert (i != mnteam); /* Send the ranks of the processors to add to the team leader. */ MPI_Send (mateam [i].arnkToadd, mateam [i].nprocToadd, MPI_INT, rnkFrom, MGR_LDR_SYNC_TAG, gcomWorld); /* Update our concept of the team. */ mateam [i].nprocCur += mateam [i].nprocToadd; mateam [i].nprocToadd = 0; /* Wait for a split message. XXX manager could do proc_split for * itself if it knew who the leader of the new team would be. * Alternatively, put this in a LDR_MGR_SPLIT_TAG case in the main * loop, and when we're splitting a team record, reassign the * processors in arnkToadd. Current code is just the simplest (and * probably slowest) solution. */ MPI_Recv (&msg, sizeof(team_split_msg_t) / sizeof(MPI_INT), MPI_INT, rnkFrom, LDR_MGR_SPLIT_TAG, gcomWorld, &status); split_team_in_two (msg, rnkFrom); } void manager_main (int nelt) { double total = 0.0, start, stop; int nquit, dummy, team, rnkSender; MPI_Status status; update_team_entry (1, gnprocWkr, nelt); nquit = 0; do { MPI_Recv (&dummy, 0, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, gcomWorld, &status); start = MPI_Wtime (); MPE_Log_event (ASSIGN_START, 0, NULL); /* If a new team consists of a singleton proc, we don't count it as * a team, since it will be running serial code and can't be stopped * to add another proc. Thus, the number of teams will eventually * fall to zero. When the number of teams is 0, send everyone a * quit message, and wait for them to terminate. */ rnkSender = status.MPI_SOURCE; switch (status.MPI_TAG) { case WKR_MGR_DONE_TAG: /* A worker is done: find a team to add it to. */ team = most_loaded_team (); if (team == NO_TEAMS_LEFT) { /* If there are no teams left, tell the worker to quit. */ MPI_Send (&dummy, 0, MPI_INT, rnkSender, MGR_WKR_QUIT_TAG, gcomWorld); nquit++; } else { /* Put the worker on list to add to team we identified. */ mateam [team].arnkToadd [mateam[team].nprocToadd++] = rnkSender; } break; case LDR_MGR_SYNC_TAG: sync_with_leader (rnkSender); break; default: MPI_Abort (MPI_COMM_WORLD, 3); } stop = MPI_Wtime (); total += (stop - start); MPE_Log_event (ASSIGN_END, 0, NULL); } while (nquit != gnprocWkr); printf ("%g\n", total); } #elif STEAL static int marnkAvail [MAXPROCS]; static int mfirst = 0; static int mlast = 0; static int get_avail_proc (void) { int result; result = -1; if (mfirst != mlast) { result = marnkAvail [mfirst++]; if (mfirst == MAXPROCS) { mfirst = 0; } } assert (result != 0); return result; } static int add_avail_proc (int rnk) { int result; marnkAvail [mlast++] = rnk; if (mlast == MAXPROCS) { mlast = 0; } if (mlast >= mfirst) { result = mlast - mfirst; } else { result = MAXPROCS - (mfirst - mlast); } return result; } static void tell_workers_to_quit (void) { int proc, dummy; for (proc = 1; proc <= gnprocWkr; proc++) { MPI_Send (&dummy, 0, MPI_INT, proc, MGR_WKR_QUIT_TAG, gcomWorld); } } void manager_main (int nelt) { double total = 0.0, start, stop; int dummy, payload; int nprocAvail, rnkAvail, rnkSender; MPI_Status status; nprocAvail = 0; do { MPI_Recv (&payload, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, gcomWorld, &status); start = MPI_Wtime (); MPE_Log_event (ASSIGN_START, 0, NULL); rnkSender = status.MPI_SOURCE; switch (status.MPI_TAG) { case WKR_MGR_DONE_TAG: nprocAvail = add_avail_proc (rnkSender); break; case WKR_MGR_HELP_TAG: rnkAvail = get_avail_proc (); if (rnkAvail == -1) { /* No spare processors. */ MPI_Send (&dummy, 0, MPI_INT, rnkSender, MGR_WKR_TOUGH_TAG, gcomWorld); } else { /* Send nelt to rnkAvail so it knows how much to receive. */ MPI_Send (&payload, 1, MPI_INT, rnkAvail, MGR_WKR_FROM_TAG, gcomWorld); /* Send rnkAvail to rnkSender so it knows where to send to. */ MPI_Send (&rnkAvail, 1, MPI_INT, rnkSender, MGR_WKR_TO_TAG, gcomWorld); } break; default: MPI_Abort (MPI_COMM_WORLD, 3); } stop = MPI_Wtime (); total += (stop - start); MPE_Log_event (ASSIGN_END, 0, NULL); } while (nprocAvail != gnprocWkr); tell_workers_to_quit (); printf ("%g\n", total); } #endif /* TEAMS, STEAL */