/* Copyright 1995 Jonathan C. Hardwick */ #include #include #include #include #include "globals.h" #include "main.h" #include "serial.h" #include "utils.h" #if defined (TEAMS) || defined (STEAL) #include "manager.h" #endif #ifdef TEAMS #include "leader.h" #include "teams.h" #endif static double get_pivot (double asrc[], int neltCur) { #ifdef PIVOT double pivot, apivot [3]; int i, index, proc, offset; /* MPE_Log_event (PIVOT_START, 0, NULL); */ for (i = 0; i < 3; i++) { index = (i * neltCur) / 4; proc_and_offset (index, neltCur, gnprocCur, &proc, &offset); if (proc == grnkCur) { apivot [i] = asrc [offset]; } MPI_Bcast (&apivot[i], 1, MPI_DOUBLE, proc, gcomCur); } if (apivot [0] < apivot [1]) { if (apivot [1] < apivot [2]) { pivot = apivot [1]; } else { if (apivot [0] < apivot [2]) { pivot = apivot [2]; } else { pivot = apivot [0]; } } } else { if (apivot [0] < apivot [2]) { pivot = apivot [0]; } else { if (apivot [1] < apivot [2]) { pivot = apivot [2]; } else { pivot = apivot [1]; } } } /* MPE_Log_event (PIVOT_END, 0, NULL); */ #else /* !PIVOT */ double pivot; int index, proc, offset; /* MPE_Log_event (PIVOT_START, 0, NULL); */ index = neltCur / 2; proc_and_offset (index, neltCur, gnprocCur, &proc, &offset); if (proc == grnkCur) { pivot = asrc [offset]; } MPI_Bcast (&pivot, 1, MPI_DOUBLE, proc, gcomCur); /* MPE_Log_event (PIVOT_END, 0, NULL); */ #endif /* PIVOT */ return pivot; } /* Partition the data in asrc into two chunks in adst, according to * whether each item is less than or greater than the pivot. * "Returns" neltLess and neltGreater. oddeven controls which partition * gets elements equal to the pivot. */ static void local_partition (double asrc[], double adst[], int neltHere, double pivot, int *pneltLess, int *pneltGreater, int oddeven) { double elt; int neltLess, neltGreater; int celtLess, celtGreater; int i; MPE_Log_event (PARTN_START, 0, NULL); /* Count elements less than the pivot. 
*/ neltLess = 0; if (oddeven) { for (i = 0; i < neltHere; i++) { if (asrc [i] <= pivot) neltLess++; } } else { for (i = 0; i < neltHere; i++) { if (asrc [i] < pivot) neltLess++; } } neltGreater = neltHere - neltLess; *pneltLess = neltLess; *pneltGreater = neltGreater; /* Second pass over data, partitioning it into adst. */ celtLess = 0; celtGreater = neltLess; if (oddeven) { for (i = 0; i < neltHere; i++) { elt = asrc [i]; if (elt <= pivot) { adst [celtLess++] = elt; } else { adst [celtGreater++] = elt; } } } else { for (i = 0; i < neltHere; i++) { elt = asrc [i]; if (elt < pivot) { adst [celtLess++] = elt; } else { adst [celtGreater++] = elt; } } } MPE_Log_event (PARTN_END, 0, NULL); } /* Tell everyone in the team everyone else's chunk sizes. */ static void exchange_chunk_sizes (int achunk [BUFSIZE], int neltTotal, int neltLess, int neltGreater) { int achunksHere [2] = { neltLess, neltGreater }; /* MPE_Log_event (GATHER_START, 0, NULL); */ #ifdef TEAMS /* Leader zeroes out where new proc chunk sizes might go. */ if (grnkCur == 0) { int i; for (i = 2 * gnprocCur; i < 2 * gnprocWorld; i++) { achunk [i] = 0; } } /* Gather chunk sizes to leader. */ MPI_Gather (achunksHere, 2, MPI_INT, achunk, 2, MPI_INT, 0, gcomCur); /* Leader broadcasts chunk sizes to rest of team, after possibly * getting new procs from manager and adding them to team. */ get_chunk_sizes (achunk, neltTotal); #else /* !TEAMS */ MPI_Allgather (achunksHere, 2, MPI_INT, achunk, 2, MPI_INT, gcomCur); #endif /* TEAMS */ /* MPE_Log_event (GATHER_END, 0, NULL); */ } /* Works out the redistribution of data. Gets: the array of * per-processor chunk sizes, and the number of elements. Returns: the * number of processors to put the "less than" and "greater than" data * on, and the total number of "less than" and "greater than" * elements. 
*/
/* achunk is read as (less, greater) pairs, one pair per rank of the
 * current team.  The team is split in proportion to the two element
 * totals, but each side always receives at least one processor.
 */
static void proc_split (int achunk [BUFSIZE], int neltTotal,
                        int *pnprocLess, int *pneltLess,
                        int *pnprocGreater, int *pneltGreater)
{
  double exact, threshold;
  int rank, whole, nless;
  int sumLess = 0;
  int sumGreater = 0;

  /* Total up both kinds of element over every rank in the team. */
  for (rank = 0; rank < gnprocCur; rank++) {
    sumLess += achunk [2 * rank];
    sumGreater += achunk [2 * rank + 1];
  }
  assert (neltTotal == sumLess + sumGreater);
  *pneltLess = sumLess;
  *pneltGreater = sumGreater;

  /* Exact (fractional) share of the team owed to the "less" data, and
   * its integer part clamped to a valid rank. */
  exact = ((double) sumLess * gnprocCur) / neltTotal;
  whole = (int) exact;
  if (whole == gnprocCur) {
    whole = gnprocCur - 1;
  }
  assert ((whole >= 0) && (whole < gnprocCur));

  if (whole == 0) {
    /* The "less" side would get nobody: give it one processor. */
    nless = 1;
  } else if (whole == gnprocCur - 1) {
    /* Leave exactly one processor for the "greater" side. */
    nless = gnprocCur - 1;
  } else {
    /* Interior case: round up only when the exact share lies beyond a
     * threshold that moves further into the interval as the split
     * point moves towards the top of the team. */
    threshold = whole + ((double) whole / (gnprocCur - 1));
    nless = (exact > threshold) ? whole + 1 : whole;
  }
  assert ((nless >= 1) && (nless <= gnprocCur - 1));

  *pnprocLess = nless;
  *pnprocGreater = gnprocCur - nless;
}

/* Fills in send and receive arrays in correct format for MPI_Alltoallv
 * to redistribute data within current group.  Called twice per
 * recursion, once each for the less than and greater than data.
* * asize: appropriate set of chunk sizes (&chunks[0] or &chunks[1]) * offsetProc: first proc to send to (0 or nprocsLess) * offsetElt: offset into dst buffer (0 or neltsLess) * neltDst : number of elements in destination vector * nprocDst: number of processors holding destination vector */ static void setup_send (int asize[], int procOffset, int eltOffset, int neltDst, int nprocDst, int acountSend [MAXPROCS], int adispSend [MAXPROCS], int acountRecv [MAXPROCS], int adispRecv [MAXPROCS]) { int procSrc, procTarget, offsetTarget, neltTarget; int neltTosend, neltSent, neltCur; int i; /* Loop over source processors and the chunks they hold (less than and * greater than chunk sizes are interleaved, so step by two). */ neltCur = 0; for (i = 0, procSrc = 0; procSrc < gnprocCur; i += 2, procSrc++) { /* Keep going until we've accounted for all of the current chunk. */ neltSent = 0; while (neltSent < asize [i]) { /* Recalculate the following from neltSeen every time. * XXX: possible target for optimization. * * procTarget: absolute rank of the receiving processor * offsetDst : offset on procTarget * neltTarget: number of elements on procTarget * neltTosend: amount of data to send (minimum of what procSrc has * left and what procTarget has room for). */ proc_and_offset (neltCur, neltDst, nprocDst, &procTarget, &offsetTarget); /* Find number of elements target processor can hold. */ neltTarget = nelt_on_proc (procTarget, neltDst, nprocDst); /* Now turn rank of target processor within nprocsDst to rank * within current group, for MPI_Alltoallv to use. */ procTarget += procOffset; /* Figure out how much data to send. */ neltTosend = min ((asize [i] - neltSent), (neltTarget - offsetTarget)); /* Fill in send arrays if we're the sender. */ if (procSrc == grnkCur) { acountSend [procTarget] = neltTosend; adispSend [procTarget] = neltSent + eltOffset; } /* Fill in receive arrays if we're the receiver. 
*/ if (procTarget == grnkCur) { acountRecv [procSrc] = neltTosend; adispRecv [procSrc] = offsetTarget; } neltSent += neltTosend; neltCur += neltTosend; } } } /* Given total number of elements, and the number of elements on this * processor less than and greater than the pivot, redistribute the data * from asrc using MPI_All_to_allv. "Return" total number of elements * in the section of vector we recursed on, and a pointer to our new * chunk of data. */ static double * global_redistribute (double asrc[], int neltLessHere, int neltGreaterHere, int *pneltTotal) { double *adst; int i, nprocLess, nprocGreater; int neltLess, neltGreater, neltNew, neltHere; int arange [1][3]; int acountSend [MAXPROCS], adispSend [MAXPROCS]; int acountRecv [MAXPROCS], adispRecv [MAXPROCS]; int achunk [MAXPROCS*2]; /* Get everyone else's chunk sizes. */ exchange_chunk_sizes (achunk, *pneltTotal, neltLessHere, neltGreaterHere); /* Work out the redistribution of data. */ proc_split (achunk, *pneltTotal, &nprocLess, &neltLess, &nprocGreater, &neltGreater); #ifdef TEAMS /* Leader sends message to manager describing team split. */ if (grnkCur == 0) { tell_mgr_about_split (nprocLess, neltLess, nprocGreater, neltGreater); } #endif /* Which group will this processor end up in? */ arange [0][2] = 1; if (grnkCur < nprocLess) { arange [0][0] = 0; arange [0][1] = nprocLess - 1; neltNew = neltLess; neltHere = nelt_on_proc (grnkCur, neltLess, nprocLess); } else { arange [0][0] = nprocLess; arange [0][1] = gnprocCur - 1; neltNew = neltGreater; neltHere = nelt_on_proc (grnkCur - nprocLess, neltGreater, nprocGreater); } /* Make a new vector to store data into. */ adst = malloc_double (neltHere); for (i = 0; i < MAXPROCS; i++) { acountSend[i] = 0; adispSend[i] = 0; acountRecv[i] = 0; acountRecv[i] = 0; } /* Set up redistribution for less than and greater than data. 
*/ setup_send (&achunk [0], 0, 0, neltLess, nprocLess, acountSend, adispSend, acountRecv, adispRecv); setup_send (&achunk [1], nprocLess, neltLessHere, neltGreater, nprocGreater, acountSend, adispSend, acountRecv, adispRecv); /* Do the data redistribution. */ MPE_Log_event (COMM_START, 0, NULL); MPI_Alltoallv (asrc, acountSend, adispSend, MPI_DOUBLE, adst, acountRecv, adispRecv, MPI_DOUBLE, gcomCur); MPE_Log_event (COMM_END, 0, NULL); /* Split communicator and group. */ MPI_Comm_split (gcomCur, (grnkCur < nprocLess), 0, &gcomCur); MPI_Comm_group (gcomCur, &ggrpCur); MPI_Comm_size (gcomCur, &gnprocCur); MPI_Comm_rank (gcomCur, &grnkCur); *pneltTotal = neltNew; return adst; } /* The function that puts it all together. */ static void parallel_quicksort (double *asrc, int neltTotal) { #if defined (STEAL) || defined (TEAMS) MPI_Status status; int dummy; #endif #ifdef TEAMS int abuf [BUFSIZE]; #endif double pivot; double *atmp; int neltHere, neltLessHere, neltGreaterHere; int oddeven = 0; #ifdef STEAL /* Use team sorting until everyone is in a team of their own. */ while (gnprocCur > 1) { pivot = get_pivot (asrc, neltTotal); neltHere = nelt_on_proc (grnkCur, neltTotal, gnprocCur); atmp = malloc_double (neltHere); local_partition (asrc, atmp, neltHere, pivot, &neltLessHere, &neltGreaterHere, oddeven); oddeven = (oddeven == 0); free (asrc); asrc = global_redistribute (atmp, neltLessHere, neltGreaterHere, &neltTotal); free (atmp); } /* We alternate between sorting the data that we've got (and asking * the manager if we can send some of it to someone else if we've got * too much), and waiting to receive more data from someone else. */ while (1) { atmp = malloc_double (neltTotal); MPE_Log_event (SERIAL_START, 0, NULL); inplace_sort (asrc, atmp, neltTotal, oddeven); MPE_Log_event (SERIAL_END, 0, NULL); /* Tell the manager that we're done. 
*/ MPI_Send (&dummy, 0, MPI_INT, 0, WKR_MGR_DONE_TAG, gcomWorld); /* Block till we receive a message from the manager: it'll either * tell us to quit, or who to receive data from (and how much). */ MPI_Recv (&neltTotal, 1, MPI_INT, 0, MPI_ANY_TAG, gcomWorld, &status); if (status.MPI_TAG == MGR_WKR_QUIT_TAG) { break; } assert (status.MPI_TAG == MGR_WKR_FROM_TAG); assert ((neltTotal > 0) && (neltTotal < 1000000)); asrc = malloc_double (neltTotal); MPE_Log_event (RECV_START, 0, NULL); MPI_Recv (asrc, neltTotal, MPI_DOUBLE, MPI_ANY_SOURCE, WKR_WKR_DATA_TAG, gcomWorld, &status); MPE_Log_event (RECV_END, 0, NULL); } #elif TEAMS while (1) { while (gnprocCur > 1) { pivot = get_pivot (asrc, neltTotal); neltHere = nelt_on_proc (grnkCur, neltTotal, gnprocCur); atmp = malloc_double (neltHere); local_partition (asrc, atmp, neltHere, pivot, &neltLessHere, &neltGreaterHere, oddeven); oddeven = (oddeven == 0); free (asrc); asrc = global_redistribute (atmp, neltLessHere, neltGreaterHere, &neltTotal); free (atmp); } atmp = malloc_double (neltTotal); MPE_Log_event (SERIAL_START, 0, NULL); inplace_sort (asrc, atmp, neltTotal, oddeven); MPE_Log_event (SERIAL_END, 0, NULL); /* Tell the manager that we're done. */ MPI_Send (&dummy, 0, MPI_INT, 0, WKR_MGR_DONE_TAG, gcomWorld); /* Block till we receive a message from a leader telling us to join * its group, or a message from the manager telling us to quit. 
*/ MPI_Recv (abuf, BUFSIZE, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, gcomWorld, &status); if (status.MPI_TAG == MGR_WKR_QUIT_TAG) { break; } assert (status.MPI_TAG == LDR_WKR_ADD_TAG); MPE_Log_event (ASSIGN_START, 0, NULL); neltTotal = singleton_group_update (abuf); MPE_Log_event (ASSIGN_END, 0, NULL); neltLessHere = 0; neltGreaterHere = 0; asrc = global_redistribute (NULL, neltLessHere, neltGreaterHere, &neltTotal); } #elif NAIVE while (gnprocCur > 1) { pivot = get_pivot (asrc, neltTotal); neltHere = nelt_on_proc (grnkCur, neltTotal, gnprocCur); atmp = malloc_double (neltHere); local_partition (asrc, atmp, neltHere, pivot, &neltLessHere, &neltGreaterHere, oddeven); oddeven = (oddeven == 0); free (asrc); asrc = global_redistribute (atmp, neltLessHere, neltGreaterHere, &neltTotal); free (atmp); } atmp = malloc_double (neltTotal); MPE_Log_event (SERIAL_START, 0, NULL); inplace_sort (asrc, atmp, neltTotal, oddeven); MPE_Log_event (SERIAL_END, 0, NULL); #endif /* TEAMS, STEAL, NAIVE */ } void main (int argc, char **argv) { double *asrc; int nelt, neltHere, seed; MPI_Init (&argc, &argv); if (getopt (argc, argv, "l:") == EOF) { printf ("Missing length: -l xxx"); MPI_Abort (MPI_COMM_WORLD, 1); } nelt = atoi (optarg); assert (nelt > 0); if (getopt (argc, argv, "s:") == EOF) { printf ("Missing seed: -s xxx"); MPI_Abort (MPI_COMM_WORLD, 1); } seed = atoi (optarg); #ifdef STEAL if (getopt (argc, argv, "c:") == EOF) { printf ("Missing cutoff: -c xxx"); MPI_Abort (MPI_COMM_WORLD, 1); } gcutoff = atoi (optarg); #endif /* Set our communicators, groups, sizes, and ranks, for the world, * workers, and current group. 
*/ MPI_Comm_dup (MPI_COMM_WORLD, &gcomWorld); MPI_Comm_group (gcomWorld, &ggrpWorld); MPI_Comm_size (gcomWorld, &gnprocWorld); MPI_Comm_rank (gcomWorld, &grnkWorld); #ifdef NAIVE MPI_Comm_dup (gcomWorld, &gcomWkr); #else MPI_Comm_split (gcomWorld, (grnkWorld == 0), 0, &gcomWkr); #endif MPI_Comm_group (gcomWkr, &ggrpWkr); MPI_Comm_size (gcomWkr, &gnprocWkr); MPI_Comm_rank (gcomWkr, &grnkWkr); /* Then the current group (also unused by the manager). */ MPI_Comm_dup (gcomWkr, &gcomCur); MPI_Comm_group (gcomCur, &ggrpCur); MPI_Comm_size (gcomCur, &gnprocCur); MPI_Comm_rank (gcomCur, &grnkCur); #ifdef NAIVE neltHere = nelt_on_proc (grnkWkr, nelt, gnprocWkr); asrc = malloc_double (neltHere); setup_sort (seed, asrc, nelt); start_logging (); parallel_quicksort (asrc, nelt); stop_logging (seed); #else if (grnkWorld != 0) { neltHere = nelt_on_proc (grnkWkr, nelt, gnprocWkr); asrc = malloc_double (neltHere); setup_sort (seed, asrc, nelt); } start_logging (); if (grnkWorld == 0) { gnprocWkr = gnprocWorld - 1; manager_main (nelt); } else { parallel_quicksort (asrc, nelt); } stop_logging (seed); #endif /* Free up our resources in reverse order, just to be neat. */ MPI_Group_free (&ggrpCur); MPI_Comm_free (&gcomCur); MPI_Group_free (&ggrpWkr); MPI_Comm_free (&gcomWkr); MPI_Group_free (&ggrpWorld); MPI_Comm_free (&gcomWorld); MPI_Finalize (); exit (0); }