/* Copyright 1995 Jonathan C. Hardwick */ #include #include #include "globals.h" #include "main.h" #include "leader.h" #include "teams.h" #include "utils.h" /* Remember that group constructors are local operations, while * communicator constructors are collective operations. */ /* Structure of buffer is: * * Offset Length What * 0 nprocOld Absolute ranks of current group members. * nprocOld nprocNew Absolute ranks of new members to add. * MAXPROCS 1 -1 (flag value PROCS_NOT_BUFFERS). * MAXPROCS+1 1 nprocOld. * MAXPROCS+2 1 nprocNew. * MAXPROCS+3 1 nelt. */ /* Put given variables into abuf[]. */ static void fill_buf (int nprocOld, int nprocNew, int arnkOld [MAXPROCS], int arnkNew [MAXPROCS], int nelt, int abuf [BUFSIZE]) { int i; for (i = 0; i < nprocOld; i++) { abuf [i] = arnkOld [i]; } for (i = 0; i < nprocNew; i++) { abuf [i + nprocOld] = arnkNew [i]; } abuf [MAXPROCS] = PROCS_NOT_CHUNKS; abuf [MAXPROCS + 1] = nprocOld; abuf [MAXPROCS + 2] = nprocNew; abuf [MAXPROCS + 3] = nelt; } /* Extract contents of abuf[]. */ static void extract_from_buf (int abuf [BUFSIZE], int *pnprocOld, int *pnprocNew, int **parnkOld, int **parnkNew, int *pnelt) { int nprocOld, nprocNew, nelt; nprocOld = abuf [MAXPROCS + 1]; nprocNew = abuf [MAXPROCS + 2]; nelt = abuf [MAXPROCS + 3]; *parnkOld = &abuf [0]; *parnkNew = &abuf [nprocOld]; *pnprocOld = nprocOld; *pnprocNew = nprocNew; *pnelt = nelt; } static int is_group_buf (int abuf [BUFSIZE]) { return (abuf [MAXPROCS] == PROCS_NOT_CHUNKS); } /* This is called by the members of the current group at the same * time as the singleton procs are calling singleton_group_update(). * * Using information from the message buffer, add one or more singleton * processors to the current group, update the current communicator, and * get new group size and our rank within it. 
* * Mechanism for updating communicator is tricky: form the new * processors into an intracommunicator of their own, set up an * intercommunicator between the new intracomm and the old one (using * 0th process in each group as the local_leader for this call), and * merge the intercommunicator to form an intracommunicator. Note that * the 0th process in the new group is just aNewWkr[0]. */ void worker_group_update (int abuf [BUFSIZE]) { int nprocOld, nprocNew; int *arnkOld, *arnkNew; int nelt; MPI_Comm comTmp; MPI_Group grpTmp; extract_from_buf (abuf, &nprocOld, &nprocNew, &arnkOld, &arnkNew, &nelt); assert (nprocNew >= 1); assert (nprocOld == gnprocCur); /* Create temporary group containing new workers. */ MPI_Group_incl (ggrpWorld, nprocNew, arnkNew, &grpTmp); /* Union it with current group (our group goes first). */ MPI_Group_union (ggrpCur, grpTmp, &ggrpCur); /* This section of code executes at the same time as the equivalent * section in singleton_group_update (). */ /* Create intercommunicator from the intracomm for our group, and * whichever intracom arnkNew [0] is the leader of. Use the * world intracomm as the peer. */ MPI_Intercomm_create (gcomCur, 0, gcomWorld, arnkNew [0], GROUP_INTER_TAG, &comTmp); /* Merge the new intercomm to form a new current intracomm. Our group * is the non-high group, i.e. our processors go first. */ MPI_Intercomm_merge (comTmp, FALSE, &gcomCur); /* Take stats of the new intracomm */ MPI_Comm_size (gcomCur, &gnprocCur); MPI_Comm_rank (gcomCur, &grnkCur); MPI_Group_free (&grpTmp); MPI_Comm_free (&comTmp); } /* This is called by the singleton processors at the same time as * the members of the current group are calling worker_group_update(). * * Sequentially add the singleton processors to an intracommunicator. * Then use it to create an intercommunicator, matching the function * call in worker_group_update (), and merge it to form an intracomm for * the joint group. 
*/
/* abuf:    a group-update buffer as laid out by fill_buf().
 * Returns: the nelt value carried in the buffer.
 *
 * Every temporary intercommunicator is freed as soon as it has been
 * merged, matching what worker_group_update() does on the other side.
 *
 * NOTE(review): the intracomm handles that gcomCur held before each
 * merge, and the previous ggrpCur, are overwritten without being freed;
 * their ownership cannot be determined from this file alone. */
int singleton_group_update (int abuf [BUFSIZE])
{
  int nprocOld, nprocNew;
  int *arnkOld, *arnkNew;
  int nelt;
  int rnkRemoteLdr;
  int myindex = -1;
  int i;
  MPI_Comm comTmp;

  extract_from_buf (abuf, &nprocOld, &nprocNew, &arnkOld, &arnkNew, &nelt);

  assert (nprocOld >= 1);       /* arnkOld [0] is read below */
  assert (nprocNew >= 1);

  rnkRemoteLdr = arnkOld [0];   /* leader of current group */

  /* XXX Rather than doing this sequentially for every processor we want
   * to add, we could do it in log time using a tree, or alternatively
   * get a better feature added to MPI2... */

  if (nprocNew > 1) {
    /* Find where this processor is in buffer of new ranks. */
    for (i = 0; i < nprocNew; i++) {
      if (arnkNew [i] == grnkWorld) {
        myindex = i;
        break;
      }
    }
    assert (myindex != -1);

    /* Everyone but the leader of the new processors (index 0) first
     * adds itself to the end of the intracomm the leader is building. */
    if (myindex != 0) {
      MPI_Intercomm_create (gcomCur, 0, gcomWorld, arnkNew [0],
                            SINGLE_INTER_TAG, &comTmp);
      MPI_Intercomm_merge (comTmp, TRUE, &gcomCur);
      MPI_Comm_free (&comTmp);
    }

    /* Then, together with the leader, add everyone listed after us.
     * For the leader itself this starts at index 1. */
    for (i = myindex + 1; i < nprocNew; i++) {
      MPI_Intercomm_create (gcomCur, 0, gcomWorld, arnkNew [i],
                            SINGLE_INTER_TAG, &comTmp);
      MPI_Intercomm_merge (comTmp, FALSE, &gcomCur);
      MPI_Comm_free (&comTmp);
    }
  }

  /* This section of code executes at the same time as the equivalent
   * section in worker_group_update().  We are the high side, so the
   * existing group's processors keep the leading ranks. */
  MPI_Intercomm_create (gcomCur, 0, gcomWorld, rnkRemoteLdr,
                        GROUP_INTER_TAG, &comTmp);
  MPI_Intercomm_merge (comTmp, TRUE, &gcomCur);
  MPI_Comm_free (&comTmp);

  /* Update current group, group size, and rank, all taken from the
   * merged communicator. */
  MPI_Comm_group (gcomCur, &ggrpCur);
  MPI_Comm_size (gcomCur, &gnprocCur);
  MPI_Comm_rank (gcomCur, &grnkCur);

  return nelt;
}

/* Called only by the leader.
* * Add new processors to the group if the manager has told us about them. * Involves making sure everyone has enough information to maintain * their own private (but consistent) idea of who is in their group. * * 1) Send each new processor the absolute ranks of the current members * of the group, and the absolute ranks of all the new processors. * * 2) Broadcast the same buffer to the current members of the group * (including ourselves), marking the broadcast with a tag to * distinguish it from the normal buffer-size-broadcast. */ static void leader_group_update (int neltTotal) { int arnkNew [MAXPROCS], arnkRel [MAXPROCS], arnkAbs [MAXPROCS]; int abuf [BUFSIZE]; int nprocNew, nprocTotal; int i; /* Do we have to add any new processors? */ nprocNew = recv_msg_from_mgr (arnkNew); if (nprocNew != 0) { /* Get absolute ranks of current group members. */ for (i = 0; i < gnprocCur; i++) { arnkRel [i] = i; } MPI_Group_translate_ranks (ggrpCur, gnprocCur, arnkRel, ggrpWorld, arnkAbs); fill_buf (gnprocCur, nprocNew, arnkAbs, arnkNew, neltTotal, abuf); nprocTotal = gnprocCur + nprocNew; assert (nprocTotal < MAXPROCS); /* Send each of the new processors the buffer we just constructed. */ for (i = 0; i < nprocNew; i++) { MPI_Send (abuf, BUFSIZE, MPI_INT, arnkNew [i], LDR_WKR_ADD_TAG, gcomWorld); } /* Broadcast it to the other current members of the group. */ MPI_Bcast (abuf, BUFSIZE, MPI_INT, 0, gcomCur); /* Join the workers in adding the new processors to the group */ worker_group_update (abuf); } } /* The leader has everyone's chunk sizes in abuf, and needs to broadcast * them to the group. However, it might also broadcast a list of new * processors to add to the group. */ void get_chunk_sizes (int abuf [BUFSIZE], int neltTotal) { /* Am I the leader? */ if (grnkCur == 0) { /* Add new procs to the group if we've received any. */ leader_group_update (neltTotal); /* And broadcast the chunk sizes. 
*/ MPI_Bcast (abuf, BUFSIZE, MPI_INT, 0, gcomCur); } else { /* Receive a broadcast from the leader. */ MPI_Bcast (abuf, BUFSIZE, MPI_INT, 0, gcomCur); /* Work out whether it's a new processor broadcast or a chunk sizes * broadcast. */ if (is_group_buf (abuf)) { worker_group_update (abuf); /* The next broadcast should be the chunk sizes we expected in the * first place. */ MPI_Bcast (abuf, BUFSIZE, MPI_INT, 0, gcomCur); assert (!is_group_buf (abuf)); } } }