diff --git a/CMakeLists.txt b/CMakeLists.txt index 528ae92a..4bcd75c9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,7 +11,7 @@ cmake_minimum_required(VERSION 3.9) set(ALUMINUM_VERSION_MAJOR 0) set(ALUMINUM_VERSION_MINOR 3) -set(ALUMINUM_VERSION_PATCH 1) +set(ALUMINUM_VERSION_PATCH 2) set(ALUMINUM_VERSION "${ALUMINUM_VERSION_MAJOR}.${ALUMINUM_VERSION_MINOR}.${ALUMINUM_VERSION_PATCH}") project(ALUMINUM VERSION ${ALUMINUM_VERSION} LANGUAGES CXX) diff --git a/src/mpi_cuda/allgather.hpp b/src/mpi_cuda/allgather.hpp index 9e96ceb8..1581a262 100644 --- a/src/mpi_cuda/allgather.hpp +++ b/src/mpi_cuda/allgather.hpp @@ -76,18 +76,20 @@ class AllgatherAlState : public AlState { release_pinned_memory(host_mem_); } - bool step() override { - if (!ag_started_) { - // Check if mem xfer complete + PEAction step() override { + if (!mem_xfer_done_) { if (d2h_event_.query()) { - MPI_Iallgather(MPI_IN_PLACE, count_, mpi::TypeMap(), - host_mem_, count_, mpi::TypeMap(), comm_, &req_); - ag_started_ = true; - } - else { - return false; + mem_xfer_done_ = true; + return PEAction::advance; + } else { + return PEAction::cont; } } + if (!ag_started_) { + MPI_Iallgather(MPI_IN_PLACE, count_, mpi::TypeMap(), + host_mem_, count_, mpi::TypeMap(), comm_, &req_); + ag_started_ = true; + } if (!ag_done_) { // Wait for the all2all to complete @@ -98,16 +100,16 @@ class AllgatherAlState : public AlState { gpuwait_.signal(); } else { - return false; + return PEAction::cont; } } // Wait for host-to-device memcopy; cleanup if (h2d_event_.query()) { - return true; + return PEAction::complete; } - return false; + return PEAction::cont; } bool needs_completion() const override { return false; } @@ -126,6 +128,7 @@ class AllgatherAlState : public AlState { MPI_Comm comm_; MPI_Request req_ = MPI_REQUEST_NULL; + bool mem_xfer_done_ = false; bool ag_started_ = false; bool ag_done_ = false; diff --git a/src/mpi_cuda/allreduce.hpp b/src/mpi_cuda/allreduce.hpp index ad6961d6..ae1e2c1e 100644 --- 
a/src/mpi_cuda/allreduce.hpp +++ b/src/mpi_cuda/allreduce.hpp @@ -86,31 +86,34 @@ class HostTransferState : public AlState { release_pinned_memory(host_mem); } - bool step() override { - if (!ar_started) { - // Wait for memory to get to the host. + PEAction step() override { + if (!mem_xfer_done) { if (d2h_event.query()) { - host_ar->setup(); - ar_started = true; + mem_xfer_done = true; + return PEAction::advance; } else { - return false; + return PEAction::cont; } } + if (!ar_started) { + host_ar->setup(); + ar_started = true; + } if (!ar_done) { // Wait for the allreduce to complete. - if (host_ar->step()) { + if (host_ar->step() == PEAction::complete) { ar_done = true; // Mark the sync as done to wake up the device. gpu_wait.signal(); } else { - return false; + return PEAction::cont; } } // Wait for the memcpy back to device to complete so we can clean up. if (h2d_event.query()) { - return true; + return PEAction::complete; } - return false; + return PEAction::cont; } bool needs_completion() const override { return false; } void* get_compute_stream() const override { return compute_stream; } @@ -124,6 +127,7 @@ class HostTransferState : public AlState { mpi::MPIAlState* host_ar; cuda::FastEvent d2h_event, h2d_event; cuda::GPUWait gpu_wait; + bool mem_xfer_done = false; bool ar_started = false; bool ar_done = false; cudaStream_t compute_stream; diff --git a/src/mpi_cuda/alltoall.hpp b/src/mpi_cuda/alltoall.hpp index 1ef3b56e..b37e3983 100644 --- a/src/mpi_cuda/alltoall.hpp +++ b/src/mpi_cuda/alltoall.hpp @@ -65,18 +65,20 @@ class AlltoallAlState : public AlState { release_pinned_memory(host_mem_); } - bool step() override { - if (!a2a_started_) { - // Check if mem xfer complete + PEAction step() override { + if (!mem_xfer_done_) { if (d2h_event_.query()) { - MPI_Ialltoall(MPI_IN_PLACE, count_, mpi::TypeMap(), - host_mem_, count_, mpi::TypeMap(), comm_, &req_); - a2a_started_ = true; - } - else { - return false; + mem_xfer_done_ = true; + return PEAction::advance; 
+ } else { + return PEAction::cont; } } + if (!a2a_started_) { + MPI_Ialltoall(MPI_IN_PLACE, count_, mpi::TypeMap(), + host_mem_, count_, mpi::TypeMap(), comm_, &req_); + a2a_started_ = true; + } if (!a2a_done_) { // Wait for the all2all to complete @@ -87,16 +89,16 @@ class AlltoallAlState : public AlState { gpuwait_.signal(); } else { - return false; + return PEAction::cont; } } // Wait for host-to-device memcopy; cleanup if (h2d_event_.query()) { - return true; + return PEAction::complete; } - return false; + return PEAction::cont; } bool needs_completion() const override { return false; } @@ -115,6 +117,7 @@ class AlltoallAlState : public AlState { MPI_Comm comm_; MPI_Request req_ = MPI_REQUEST_NULL; + bool mem_xfer_done_ = false; bool a2a_started_ = false; bool a2a_done_ = false; diff --git a/src/mpi_cuda/bcast.hpp b/src/mpi_cuda/bcast.hpp index 26863605..d06cf779 100644 --- a/src/mpi_cuda/bcast.hpp +++ b/src/mpi_cuda/bcast.hpp @@ -70,18 +70,22 @@ class BcastAlState : public AlState { release_pinned_memory(host_mem_); } - bool step() override { - if (!bcast_started_) { + PEAction step() override { + if (!mem_xfer_done_) { if (d2h_event_.query()) { - MPI_Ibcast(host_mem_, count_, mpi::TypeMap(), - root_, comm_, &req_); - if (rank_ == root_) { - gpuwait_.signal(); - } - bcast_started_ = true; + mem_xfer_done_ = true; + return PEAction::advance; } else { - return false; + return PEAction::cont; + } + } + if (!bcast_started_) { + MPI_Ibcast(host_mem_, count_, mpi::TypeMap(), + root_, comm_, &req_); + if (rank_ == root_) { + gpuwait_.signal(); } + bcast_started_ = true; } if (!bcast_done_) { @@ -93,19 +97,19 @@ class BcastAlState : public AlState { if (rank_ != root_) { gpuwait_.signal(); } else { - return true; + return PEAction::complete; } } else { - return false; + return PEAction::cont; } } // Wait for host-to-device memcopy; cleanup if (h2d_event_.query()) { - return true; + return PEAction::complete; } - return false; + return PEAction::cont; } bool 
needs_completion() const override { return false; } @@ -126,6 +130,7 @@ class BcastAlState : public AlState { MPI_Comm comm_; MPI_Request req_ = MPI_REQUEST_NULL; + bool mem_xfer_done_ = false; bool bcast_started_ = false; bool bcast_done_ = false; diff --git a/src/mpi_cuda/gather.hpp b/src/mpi_cuda/gather.hpp index 753c5c35..737e7d3f 100644 --- a/src/mpi_cuda/gather.hpp +++ b/src/mpi_cuda/gather.hpp @@ -74,25 +74,27 @@ class GatherAlState : public AlState { release_pinned_memory(host_mem_); } - bool step() override { - if (!gather_started_) { - // Check if mem xfer complete + PEAction step() override { + if (!mem_xfer_done_) { if (d2h_event_.query()) { - if (root_ == rank_) { - MPI_Igather(MPI_IN_PLACE, count_, mpi::TypeMap(), - host_mem_, count_, mpi::TypeMap(), - root_, comm_, &req_); - } else { - MPI_Igather(host_mem_, count_, mpi::TypeMap(), - host_mem_, count_, mpi::TypeMap(), - root_, comm_, &req_); - gpuwait_.signal(); - } - gather_started_ = true; + mem_xfer_done_ = true; + return PEAction::advance; + } else { + return PEAction::cont; } - else { - return false; + } + if (!gather_started_) { + if (root_ == rank_) { + MPI_Igather(MPI_IN_PLACE, count_, mpi::TypeMap(), + host_mem_, count_, mpi::TypeMap(), + root_, comm_, &req_); + } else { + MPI_Igather(host_mem_, count_, mpi::TypeMap(), + host_mem_, count_, mpi::TypeMap(), + root_, comm_, &req_); + gpuwait_.signal(); } + gather_started_ = true; } if (!gather_done_) { @@ -104,24 +106,24 @@ class GatherAlState : public AlState { if (rank_ == root_) gpuwait_.signal(); else - return true; + return PEAction::complete; } else { - return false; + return PEAction::cont; } } else if (rank_ != root_) { // Paranoia, in case step() is ever called again after returning // 'true' for the first time. 
- return true; + return PEAction::complete; } // Wait for host-to-device memcopy; cleanup if (h2d_event_.query()) { - return true; + return PEAction::complete; } - return false; + return PEAction::cont; } bool needs_completion() const override { return false; } @@ -142,6 +144,7 @@ class GatherAlState : public AlState { MPI_Comm comm_; MPI_Request req_ = MPI_REQUEST_NULL; + bool mem_xfer_done_ = false; bool gather_started_ = false; bool gather_done_ = false; diff --git a/src/mpi_cuda/pt2pt.hpp b/src/mpi_cuda/pt2pt.hpp index 2153c8fc..76f1cd01 100644 --- a/src/mpi_cuda/pt2pt.hpp +++ b/src/mpi_cuda/pt2pt.hpp @@ -58,19 +58,16 @@ class SendAlState : public AlState { prof_range = profiling::prof_start("HTSend d2h"); } #endif - bool step() override { + PEAction step() override { if (!mem_transfer_done) { if (sync_event.query()) { mem_transfer_done = true; #ifdef AL_HAS_PROF profiling::prof_end(prof_range); #endif + return PEAction::advance; } - - // Always return false here so the send is not started until the next - // pass through the in-progress requests. - // This ensures that sends always start in the order they were posted. - return false; + return PEAction::cont; } if (!send_started) { #ifdef AL_HAS_PROF @@ -86,7 +83,7 @@ class SendAlState : public AlState { profiling::prof_end(prof_range); } #endif - return flag; + return flag ? PEAction::complete : PEAction::cont; } bool needs_completion() const override { return false; } void* get_compute_stream() const override { return compute_stream; } @@ -133,7 +130,7 @@ class RecvAlState : public AlState { #endif MPI_Irecv(mem, count, mpi::TypeMap(), src, pt2pt_tag, comm, &req); } - bool step() override { + PEAction step() override { if (!recv_done) { int flag; MPI_Test(&req, &flag, MPI_STATUS_IGNORE); @@ -153,9 +150,9 @@ class RecvAlState : public AlState { if (r) { profiling::prof_end(prof_range); } - return r; + return r ? 
PEAction::complete : PEAction::cont; #else - return sync_event.query(); + return sync_event.query() ? PEAction::complete : PEAction::cont; #endif } bool needs_completion() const override { return false; } @@ -194,14 +191,18 @@ class SendRecvAlState : public AlState { send_state.start(); recv_state.start(); } - bool step() override { + PEAction step() override { if (!send_done) { - send_done = send_state.step(); + PEAction send_action = send_state.step(); + if (send_action == PEAction::advance) { + return send_action; + } + send_done = send_action == PEAction::complete; } if (!recv_done) { - recv_done = recv_state.step(); + recv_done = recv_state.step() == PEAction::complete; } - return send_done && recv_done; + return send_done && recv_done ? PEAction::complete : PEAction::cont; } bool needs_completion() const override { return false; } void* get_compute_stream() const override { @@ -215,6 +216,7 @@ class SendRecvAlState : public AlState { private: SendAlState send_state; RecvAlState recv_state; + bool send_advanced = false; bool send_done = false; bool recv_done = false; }; diff --git a/src/mpi_cuda/reduce.hpp b/src/mpi_cuda/reduce.hpp index e7e91fbc..88b10d36 100644 --- a/src/mpi_cuda/reduce.hpp +++ b/src/mpi_cuda/reduce.hpp @@ -68,23 +68,25 @@ class ReduceAlState : public AlState { release_pinned_memory(host_mem_); } - bool step() override { - if (!reduce_started_) { - // Check if mem xfer complete + PEAction step() override { + if (!mem_xfer_done_) { if (d2h_event_.query()) { - if (root_ == rank_) { - MPI_Ireduce(MPI_IN_PLACE, host_mem_, count_, mpi::TypeMap(), - op_, root_, comm_, &req_); - } else { - MPI_Ireduce(host_mem_, host_mem_, count_, mpi::TypeMap(), - op_, root_, comm_, &req_); - gpuwait_.signal(); - } - reduce_started_ = true; + mem_xfer_done_ = true; + return PEAction::advance; + } else { + return PEAction::cont; } - else { - return false; + } + if (!reduce_started_) { + if (root_ == rank_) { + MPI_Ireduce(MPI_IN_PLACE, host_mem_, count_, 
mpi::TypeMap(), + op_, root_, comm_, &req_); + } else { + MPI_Ireduce(host_mem_, host_mem_, count_, mpi::TypeMap(), + op_, root_, comm_, &req_); + gpuwait_.signal(); } + reduce_started_ = true; } if (!reduce_done_) { @@ -96,20 +98,20 @@ class ReduceAlState : public AlState { if (rank_ == root_) { gpuwait_.signal(); } else { - return true; + return PEAction::complete; } } else { - return false; + return PEAction::cont; } } // Wait for host-to-device memcopy; cleanup if (h2d_event_.query()) { - return true; + return PEAction::complete; } - return false; + return PEAction::cont; } bool needs_completion() const override { return false; } @@ -130,6 +132,7 @@ class ReduceAlState : public AlState { MPI_Comm comm_; MPI_Request req_ = MPI_REQUEST_NULL; + bool mem_xfer_done_ = false; bool reduce_started_ = false; bool reduce_done_ = false; diff --git a/src/mpi_cuda/reduce_scatter.hpp b/src/mpi_cuda/reduce_scatter.hpp index 95856178..e5e10c08 100644 --- a/src/mpi_cuda/reduce_scatter.hpp +++ b/src/mpi_cuda/reduce_scatter.hpp @@ -67,17 +67,20 @@ class ReduceScatterAlState : public AlState { release_pinned_memory(host_mem_); } - bool step() override { - if (!rs_started_) { - // Wait for memory to get to the host. + PEAction step() override { + if (!mem_xfer_done_) { if (d2h_event_.query()) { - MPI_Ireduce_scatter_block(MPI_IN_PLACE, host_mem_, count_, - mpi::TypeMap(), op_, comm_, &req_); - rs_started_ = true; + mem_xfer_done_ = true; + return PEAction::advance; } else { - return false; + return PEAction::cont; } } + if (!rs_started_) { + MPI_Ireduce_scatter_block(MPI_IN_PLACE, host_mem_, count_, + mpi::TypeMap(), op_, comm_, &req_); + rs_started_ = true; + } if (!rs_done_) { // Wait for the RS to complete. 
int flag; @@ -86,14 +89,14 @@ class ReduceScatterAlState : public AlState { rs_done_ = true; gpuwait_.signal(); } else { - return false; + return PEAction::cont; } } // Wait for host-to-device memcopy; cleanup if (h2d_event_.query()) { - return true; + return PEAction::complete; } - return false; + return PEAction::cont; } bool needs_completion() const override { return false; } @@ -108,6 +111,7 @@ class ReduceScatterAlState : public AlState { MPI_Op op_; MPI_Comm comm_; MPI_Request req_ = MPI_REQUEST_NULL; + bool mem_xfer_done_ = false; bool rs_started_ = false; bool rs_done_ = false; cudaStream_t compute_stream; diff --git a/src/mpi_cuda/rma_ipc.hpp b/src/mpi_cuda/rma_ipc.hpp index a06b46f9..ecd19d2b 100644 --- a/src/mpi_cuda/rma_ipc.hpp +++ b/src/mpi_cuda/rma_ipc.hpp @@ -50,10 +50,10 @@ class NotifyState: public AlState { MPI_Irecv(&key, 1, MPI_INT, m_peer, 0, m_comm.get_comm(), &m_requests[1]); } - bool step() override { + PEAction step() override { int flag; MPI_Testall(2, m_requests, &flag, MPI_STATUS_IGNORE); - return flag; + return flag ? 
PEAction::complete : PEAction::cont; } private: int key = 0; @@ -75,7 +75,7 @@ class WaitState: public AlState { MPI_Irecv(&key, 1, MPI_INT, m_peer, 0, m_comm.get_comm(), &m_req); } - bool step() override { + PEAction step() override { int flag; MPI_Test(&m_req, &flag, MPI_STATUS_IGNORE); if (flag) { @@ -85,12 +85,12 @@ class WaitState: public AlState { MPI_Isend(&key, 1, MPI_INT, m_peer, 0, m_comm.get_comm(), &m_req); m_stream_wait_set = true; - return false; + return PEAction::cont; } else { - return true; + return PEAction::complete; } } - return false; + return PEAction::cont; } private: int key = 0; @@ -116,7 +116,7 @@ class SyncState: public AlState { MPI_Irecv(&key, 1, MPI_INT, m_peer, 0, m_comm.get_comm(), &m_requests[1]); } - bool step() override { + PEAction step() override { int flag; MPI_Testall(2, m_requests, &flag, MPI_STATUS_IGNORE); if (flag) { @@ -129,10 +129,10 @@ class SyncState: public AlState { m_comm.get_comm(), &m_requests[1]); m_stream_wait_set = true; } else { - return true; + return PEAction::complete; } } - return false; + return PEAction::cont; } private: int key = 0; diff --git a/src/mpi_cuda/scatter.hpp b/src/mpi_cuda/scatter.hpp index 87a719d8..7ad5525e 100644 --- a/src/mpi_cuda/scatter.hpp +++ b/src/mpi_cuda/scatter.hpp @@ -79,26 +79,28 @@ class ScatterAlState : public AlState { release_pinned_memory(host_mem_); } - bool step() override { - if (!scatter_started_) { - // Check if mem xfer complete + PEAction step() override { + if (!mem_xfer_done_) { if (d2h_event_.query()) { - if (root_ == rank_) { - MPI_Iscatter(host_mem_, count_, mpi::TypeMap(), - MPI_IN_PLACE, count_, mpi::TypeMap(), - root_, comm_, &req_); - // Root can unblock the device here. 
- gpuwait_.signal(); - } else { - MPI_Iscatter(host_mem_, count_, mpi::TypeMap(), - host_mem_, count_, mpi::TypeMap(), - root_, comm_, &req_); - } - scatter_started_ = true; + mem_xfer_done_ = true; + return PEAction::advance; + } else { + return PEAction::cont; } - else { - return false; + } + if (!scatter_started_) { + if (root_ == rank_) { + MPI_Iscatter(host_mem_, count_, mpi::TypeMap(), + MPI_IN_PLACE, count_, mpi::TypeMap(), + root_, comm_, &req_); + // Root can unblock the device here. + gpuwait_.signal(); + } else { + MPI_Iscatter(host_mem_, count_, mpi::TypeMap(), + host_mem_, count_, mpi::TypeMap(), + root_, comm_, &req_); } + scatter_started_ = true; } if (!scatter_done_) { @@ -108,21 +110,21 @@ class ScatterAlState : public AlState { if (flag) { scatter_done_ = true; if (rank_ == root_) { - return true; + return PEAction::complete; } else { gpuwait_.signal(); } } else { - return false; + return PEAction::cont; } } // Wait for host-to-device memcopy; cleanup if (h2d_event_.query()) { - return true; + return PEAction::complete; } - return false; + return PEAction::cont; } bool needs_completion() const override { return false; } @@ -142,6 +144,7 @@ class ScatterAlState : public AlState { MPI_Comm comm_; MPI_Request req_ = MPI_REQUEST_NULL; + bool mem_xfer_done_ = false; bool scatter_started_ = false; bool scatter_done_ = false; diff --git a/src/mpi_impl.hpp b/src/mpi_impl.hpp index d6f5773d..bee6d165 100644 --- a/src/mpi_impl.hpp +++ b/src/mpi_impl.hpp @@ -572,13 +572,13 @@ class MPIPassthroughAlState : public MPIAlState { } return false; } - bool step() override { + PEAction step() override { int flag; MPI_Test(&mpi_req, &flag, MPI_STATUS_IGNORE); if (flag) { - return true; + return PEAction::complete; } - return false; + return PEAction::cont; } std::string get_name() const override { return "MPIPassthrough"; } private: @@ -702,7 +702,7 @@ class MPIRecursiveDoublingAlState : public MPIAlState { } return r; } - bool step() override { + PEAction step() 
override { // Check the send/recv from setup, if any. if (!setup_comm_done) { if (this->test_send_recv()) { @@ -712,19 +712,19 @@ class MPIRecursiveDoublingAlState : public MPIAlState { this->reduction_op(this->recv_to, this->recvbuf, this->count); } } else { - return false; + return PEAction::cont; } } // Complete final communication in the non-power-of-2 case. if (final_comm_started) { - return this->test_send_recv(); + return this->test_send_recv() ? PEAction::complete : PEAction::cont; } if (adjusted_rank == -1) { // Just need to wait for data. MPI_Irecv(this->recvbuf, this->count, this->type, this->rank + 1, this->tag, this->comm, &(this->send_recv_reqs[0])); final_comm_started = true; - return false; + return PEAction::cont; } bool test = this->test_send_recv(); if (started && test) { @@ -737,9 +737,9 @@ class MPIRecursiveDoublingAlState : public MPIAlState { MPI_Isend(this->recvbuf, this->count, this->type, this->rank - 1, this->tag, this->comm, &(this->send_recv_reqs[0])); final_comm_started = true; - return false; + return PEAction::cont; } else { - return true; + return PEAction::complete; } } } @@ -753,7 +753,7 @@ class MPIRecursiveDoublingAlState : public MPIAlState { this->recv_to, this->count, partner); started = true; } - return false; + return PEAction::cont; } std::string get_name() const override { return "MPIRecursiveDoubling"; } private: @@ -1020,7 +1020,7 @@ class MPIRingAlState : public MPIAlState { } return r; } - bool step() override { + PEAction step() override { if (phase == 0) { if (rs_step()) { // Switch to allgather for next step. @@ -1028,9 +1028,9 @@ class MPIRingAlState : public MPIAlState { started = false; cur_step = 0; } - return false; + return PEAction::cont; } else { - return ag_step(); + return ag_step() ? 
PEAction::complete : PEAction::cont; } } std::string get_name() const override { return "MPIRing"; } @@ -1330,7 +1330,7 @@ class MPIRabenseifnerAlState : public MPIAlState { } return r; } - bool step() override { + PEAction step() override { // Complete setup communication, if any. if (!setup_comm_done) { if (this->test_send_recv()) { @@ -1340,19 +1340,19 @@ class MPIRabenseifnerAlState : public MPIAlState { this->reduction_op(this->recv_to, this->recvbuf, this->count); } } else { - return false; + return PEAction::cont; } } // Complete final communication in the non-power-of-2 case. if (final_comm_started) { - return this->test_send_recv(); + return this->test_send_recv() ? PEAction::complete : PEAction::cont; } if (adjusted_rank == -1) { // Just need to wait for data. MPI_Irecv(this->recvbuf, this->count, this->type, this->rank + 1, this->tag, this->comm, &(this->send_recv_reqs[0])); final_comm_started = true; - return false; + return PEAction::cont; } if (phase == 0) { if (rs_step()) { @@ -1362,7 +1362,7 @@ class MPIRabenseifnerAlState : public MPIAlState { slice_mask >>= 1; partner_mask = 1; } - return false; + return PEAction::cont; } else { if (ag_step()) { // Done, but in the non-power-of-2 case we need to send to our partner. @@ -1370,12 +1370,12 @@ class MPIRabenseifnerAlState : public MPIAlState { MPI_Isend(this->recvbuf, this->count, this->type, this->rank - 1, this->tag, this->comm, &(this->send_recv_reqs[0])); final_comm_started = true; - return false; + return PEAction::cont; } else { - return true; + return PEAction::complete; } } else { - return false; + return PEAction::cont; } } } diff --git a/src/progress.cpp b/src/progress.cpp index 66f91e93..d6829e0a 100644 --- a/src/progress.cpp +++ b/src/progress.cpp @@ -137,14 +137,21 @@ std::ostream& ProgressEngine::dump_state(std::ostream& ss) { // Note: This pulls *directly from internal state*. // This is *not* thread safe, and stuff might blow up. 
// You should only be dumping state where you don't care about that anyway. - const size_t run_queue_size = run_queue.size(); - ss << "Run queue (" << run_queue_size << "):\n"; - for (size_t i = 0; i < run_queue_size; ++i) { - ss << i << ": "; - if (run_queue[i]) { - ss << run_queue[i]->get_name() << " " << run_queue[i]->get_desc() << "\n"; - } else { - ss << "(unknown)\n"; + for (auto&& stream_pipeline_pair : run_queues) { + ss << "Pipelined run queue for stream " << stream_pipeline_pair.first << ":\n"; + auto&& pipeline = stream_pipeline_pair.second; + for (size_t stage = 0; stage < AL_PE_NUM_PIPELINE_STAGES; ++stage) { + const size_t stage_queue_size = pipeline[stage].size(); + ss << "Stage " << stage << " run queue (" << stage_queue_size << "):\n"; + for (size_t i = 0; i < stage_queue_size; ++i) { + ss << i << ": "; + if (pipeline[stage][i]) { + ss << pipeline[stage][i]->get_name() << " " + << pipeline[stage][i]->get_desc() << "\n"; + } else { + ss << "(unknown)\n"; + } + } } } const size_t req_queue_size = num_input_streams.load(); @@ -264,7 +271,13 @@ void ProgressEngine::engine() { break; } if (do_start) { - run_queue.push_back(req); + // Add to end of first pipeline stage. + // Create run queues if needed. + if (!run_queues.count(req->get_compute_stream())) { + run_queues.emplace(req->get_compute_stream(), + decltype(run_queues)::mapped_type{}); + } + run_queues[req->get_compute_stream()][0].push_back(req); req->start(); #ifdef AL_DEBUG_HANG_CHECK req->start_time = get_time(); @@ -282,42 +295,90 @@ void ProgressEngine::engine() { } } // Process one step of each in-progress request. - for (auto i = run_queue.begin(); i != run_queue.end();) { - AlState* req = *i; - if (req->step()) { - if (req->needs_completion()) { - req->get_req()->store(true, std::memory_order_release); - } - if (req->get_run_type() == RunType::bounded) { - --num_bounded; - } - if (req->blocks()) { - // Unblock the associated input queue. 
- request_queues[blocking_reqs[req]].blocked = false; - blocking_reqs.erase(req); - } + for (auto&& stream_pipeline_pair : run_queues) { + auto&& pipeline = stream_pipeline_pair.second; + for (size_t stage = 0; stage < AL_PE_NUM_PIPELINE_STAGES; ++stage) { + // Process this stage of the pipeline. + for (auto i = pipeline[stage].begin(); i != pipeline[stage].end();) { + AlState* req = *i; + // Simply skip over paused states. + if (req->paused_for_advance) { + ++i; + } else { + PEAction action = req->step(); + switch (action) { + case PEAction::cont: + // Nothing to do here. +#ifdef AL_DEBUG_HANG_CHECK + // Check whether we have hung. + if (!req->hang_reported) { + double t = get_time(); + if (t - req->start_time > 10.0 + world_comm->rank()) { + std::cout << world_comm->rank() + << ": Progress engine detected a possible hang" + << " state=" << req << " " << req->get_name() + << " compute_stream=" << req->get_compute_stream() + << " run_type=" + << (req->get_run_type() == RunType::bounded ? "bounded" : "unbounded") + << " blocks=" << req->blocks() << std::endl; + req->hang_reported = true; + } + } +#endif + ++i; + break; + case PEAction::advance: +#ifdef AL_DEBUG + // Ensure we don't advance too far. + if (stage + 1 >= AL_PE_NUM_PIPELINE_STAGES) { + throw_al_exception("Trying to advance pipeline stage too far"); + } +#endif + // Only move if this is the head of the pipeline stage. + if (i == pipeline[stage].begin()) { + pipeline[stage+1].push_back(req); + i = pipeline[stage].erase(i); + } else { + req->paused_for_advance = true; + ++i; + } + break; + case PEAction::complete: + if (req->needs_completion()) { + req->get_req()->store(true, std::memory_order_release); + } + if (req->get_run_type() == RunType::bounded) { + --num_bounded; + } + if (req->blocks()) { + // Unblock the associated input queue. 
+ request_queues[blocking_reqs[req]].blocked = false; + blocking_reqs.erase(req); + } #ifdef AL_TRACE - trace::record_pe_done(*req); + trace::record_pe_done(*req); #endif - delete req; - i = run_queue.erase(i); - } else { -#ifdef AL_DEBUG_HANG_CHECK - if (!req->hang_reported) { - double t = get_time(); - if (t - req->start_time > 10.0 + world_comm->rank()) { - std::cout << world_comm->rank() - << ": Progress engine detected a possible hang" - << " state=" << req << " " << req->get_name() - << " compute_stream=" << req->get_compute_stream() - << " run_type=" - << (req->get_run_type() == RunType::bounded ? "bounded" : "unbounded") - << " blocks=" << req->blocks() << std::endl; - req->hang_reported = true; + delete req; + i = pipeline[stage].erase(i); + break; + default: + throw_al_exception("Unknown PEAction"); + break; + } + } + } + // Check whether we can advance paused states. + for (auto i = pipeline[stage].begin(); i != pipeline[stage].end();) { + AlState* req = *i; + if (req->paused_for_advance) { + // Move to the next stage. + req->paused_for_advance = false; + pipeline[stage+1].push_back(req); + i = pipeline[stage].erase(i); + } else { + break; // Nothing at the head to advance. } } -#endif - ++i; } } } diff --git a/src/progress.hpp b/src/progress.hpp index 5f41450b..8523a9c3 100644 --- a/src/progress.hpp +++ b/src/progress.hpp @@ -40,6 +40,7 @@ #include #include #include +#include namespace Al { @@ -71,15 +72,33 @@ enum class RunType { unbounded }; +/** Actions a state can ask the progress engine to do. */ +enum class PEAction { + /** Do nothing (i.e. keep running as it is now). */ + cont, + /** Advance the state to the next pipeline stage. */ + advance, + /** Operation is complete. */ + complete +}; + /** * Represents the state and algorithm for an asynchronous operation. * A non-blocking operation should create one of these and enqueue it for * execution by the progress thread. Specific implementations can override * as needed. 
+ * * An algorithm should be broken up into steps which execute some small, * discrete operation. Steps from different operations may be interleaved. * Note that the memory pool is not thread-safe, so memory from it should be * pre-allocated before enqueueing. + * + * The steps are run through a simple pipeline. The algorithm can request it + * advance to the next stage by returning PEAction::advance. Operations enqueued + * on the same compute stream will only be advanced in the order they were + * enqueued. If a state asks to advance but it is not at the head of its + * pipeline stage, step will not be called again until it has successfully + * advanced. */ class AlState { friend class ProgressEngine; @@ -94,10 +113,9 @@ class AlState { virtual void start(); /** * Run one step of the algorithm. - * Return true if the operation has completed, false if it has more steps - * remaining. + * Return the action the algorithm wishes the progress engine to take. */ - virtual bool step() = 0; + virtual PEAction step() = 0; /** Return the associated request. */ AlRequest& get_req() { return req; } /** True if this is meant to be waited on by the user. */ @@ -119,6 +137,8 @@ class AlState { double start_time = std::numeric_limits::max(); #endif profiling::ProfileRange prof_range; + /** Whether execution of this operation is paused on pipeline advancement. */ + bool paused_for_advance = false; }; /** @@ -332,12 +352,12 @@ class ProgressEngine { */ std::unordered_map stream_to_queue; /** - * Arbitrary-length run queue. + * Per-stream pipelined run queues. * This should be accessed only by the progress engine. * Using a vector for compactness and to avoid repeated memory allocations. - * @todo May extend OrderedArray to handle this. + * @todo May extend OrderedArray / make a new class to handle this. */ - std::vector run_queue; + std::unordered_map, AL_PE_NUM_PIPELINE_STAGES>> run_queues; /** Number of currently-active bounded-length operations. 
*/ size_t num_bounded = 0; /** diff --git a/src/tuning_params.hpp b/src/tuning_params.hpp index 9b17ff95..da01d391 100644 --- a/src/tuning_params.hpp +++ b/src/tuning_params.hpp @@ -51,7 +51,10 @@ * This must be a positive number. */ #define AL_PE_NUM_CONCURRENT_OPS 4 +/** Max number of streams the progress engine supports. */ #define AL_PE_NUM_STREAMS 64 +/** Max number of pipeline stages the progress engine supports. */ +#define AL_PE_NUM_PIPELINE_STAGES 2 /** Whether to protect memory pools with locks. */ #define AL_LOCK_MEMPOOL 1 diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index f1b1399d..7bb6e910 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -28,6 +28,9 @@ set(TEST_SRCS test_gather.cpp test_scatter.cpp test_pt2pt.cpp + test_exchange.cpp + test_transfer_to_one.cpp + test_transfer_from_one.cpp test_multi_nballreduces.cpp) foreach(src ${TEST_SRCS}) diff --git a/test/test_exchange.cpp b/test/test_exchange.cpp new file mode 100644 index 00000000..124c7bb3 --- /dev/null +++ b/test/test_exchange.cpp @@ -0,0 +1,117 @@ +#include +#include "Al.hpp" +#include "test_utils.hpp" +#ifdef AL_HAS_MPI_CUDA +#include "test_utils_mpi_cuda.hpp" +#endif + +#include +#include +#include +#include + +size_t start_size = 1; +size_t max_size = 1<<30; +const int num_peers = 4; // Should be even, but will not wrap ranks. 
+
+template <typename Backend>
+void do_transfer(const typename VectorType<Backend>::type& expected_recv,
+                 typename VectorType<Backend>::type& to_send,
+                 const std::vector<int> peers,
+                 typename Backend::comm_type& comm) {
+  auto recv = get_vector<Backend>(expected_recv.size());
+  std::vector<typename Backend::req_type> requests;
+  const size_t chunk_per_peer = to_send.size() / peers.size();
+  for (size_t i = 0; i < peers.size(); ++i) {
+    int peer = peers[i];
+    requests.push_back(Backend::null_req);
+    auto& req = requests.back();
+    Al::NonblockingSendRecv<Backend, float>(
+      to_send.data() + i*chunk_per_peer, chunk_per_peer, peer,
+      recv.data() + i*chunk_per_peer, chunk_per_peer, peer,
+      comm, req);
+  }
+  for (auto& req : requests) {
+    Al::Wait<Backend>(req);
+  }
+  if (!check_vector(expected_recv, recv)) {
+    std::cout << comm.rank() << ": sendrecv does not match" << std::endl;
+    std::abort();
+  }
+}
+
+template <typename Backend>
+void test_correctness() {
+  typename Backend::comm_type comm = get_comm_with_stream<Backend>(MPI_COMM_WORLD);
+  if (comm.size() <= num_peers) {
+    std::cout << "Communicator size " << comm.size() << " too small for "
+              << num_peers << " peers" << std::endl;
+    std::abort();
+  }
+  const int peers_per_side = num_peers / 2;
+  std::vector<int> peers;
+  for (int peer = comm.rank() - peers_per_side;
+       peer <= comm.rank() + peers_per_side;
+       ++peer) {
+    if (peer < 0 || peer >= comm.size() || peer == comm.rank()) {
+      continue;
+    }
+    peers.push_back(peer);
+  }
+  {
+    std::stringstream ss;
+    ss << comm.rank() << ": Peers: ";
+    for (const auto& peer : peers) ss << peer << " ";
+    std::cout << ss.str() << std::endl;
+  }
+  // Compute sizes to test.
+  std::vector<size_t> sizes = get_sizes(start_size, max_size, true);
+  for (const auto& size : sizes) {
+    if (comm.rank() == 0) {
+      std::cout << "Testing size " << human_readable_size(size) << std::endl;
+    }
+    for (size_t trial = 0; trial < 1; ++trial) {
+      const size_t total_size = size*peers.size();
+      std::vector<float> host_to_send(total_size, comm.rank());
+      std::vector<float> host_to_recv(total_size);
+      // Compute correct values.
+      for (size_t i = 0; i < peers.size(); ++i) {
+        int peer = peers[i];
+        std::fill_n(host_to_recv.data() + i*size, size, peer);
+      }
+      typename VectorType<Backend>::type to_send(host_to_send);
+      typename VectorType<Backend>::type to_recv(host_to_recv);
+      MPI_Barrier(MPI_COMM_WORLD);
+      do_transfer<Backend>(to_recv, to_send, peers, comm);
+    }
+  }
+  free_comm_with_stream<Backend>(comm);
+}
+
+int main(int argc, char** argv) {
+  // Need to set the CUDA device before initializing Aluminum.
+#ifdef AL_HAS_CUDA
+  set_device();
+#endif
+  Al::Initialize(argc, argv);
+
+  std::string backend = "MPI";
+  parse_args(argc, argv, backend, start_size, max_size);
+
+  if (backend == "MPI") {
+    std::cerr << "Point-to-point not supported on MPI backend." << std::endl;
+    std::abort();
+#ifdef AL_HAS_NCCL
+  } else if (backend == "NCCL") {
+    std::cerr << "Point-to-point not supported on NCCL backend." << std::endl;
+    std::abort();
+#endif
+#ifdef AL_HAS_MPI_CUDA
+  } else if (backend == "MPI-CUDA") {
+    test_correctness<Al::MPICUDABackend>();
+#endif
+  }
+
+  Al::Finalize();
+  return 0;
+}
diff --git a/test/test_transfer_from_one.cpp b/test/test_transfer_from_one.cpp
new file mode 100644
index 00000000..2b1a93a6
--- /dev/null
+++ b/test/test_transfer_from_one.cpp
@@ -0,0 +1,134 @@
+#include <iostream>
+#include "Al.hpp"
+#include "test_utils.hpp"
+#ifdef AL_HAS_MPI_CUDA
+#include "test_utils_mpi_cuda.hpp"
+#endif
+
+#include <stdlib.h>
+#include <math.h>
+#include <string>
+#include <sstream>
+
+size_t start_size = 1;
+size_t max_size = 1<<30;
+// One rank will receive from all the others.
+const int num_peers = 4;
+
+template <typename Backend>
+void do_recv(const typename VectorType<Backend>::type& expected_recv,
+             int src,
+             typename Backend::comm_type& comm) {
+  auto recv = get_vector<Backend>(expected_recv.size());
+  typename Backend::req_type req;
+  Al::NonblockingSendRecv<Backend, float>(
+    nullptr, 0, src,
+    recv.data(), recv.size(), src,
+    comm, req);
+  Al::Wait<Backend>(req);
+  if (!check_vector(expected_recv, recv)) {
+    std::cout << comm.rank() << ": recv does not match" << std::endl;
+    std::abort();
+  }
+}
+
+template <typename Backend>
+void do_send(typename VectorType<Backend>::type& to_send,
+             const std::vector<int> dests,
+             typename Backend::comm_type& comm) {
+  std::vector<typename Backend::req_type> requests;
+  const size_t chunk_per_peer = to_send.size() / dests.size();
+  for (size_t i = 0; i < dests.size(); ++i) {
+    int dest = dests[i];
+    requests.push_back(Backend::null_req);
+    auto& req = requests.back();
+    Al::NonblockingSendRecv<Backend, float>(
+      to_send.data() + i*chunk_per_peer, chunk_per_peer, dest,
+      nullptr, 0, dest,
+      comm, req);
+  }
+  for (auto& req : requests) {
+    Al::Wait<Backend>(req);
+  }
+}
+
+template <typename Backend>
+void test_correctness() {
+  typename Backend::comm_type comm = get_comm_with_stream<Backend>(MPI_COMM_WORLD);
+  if (comm.size() % num_peers != 0) {
+    std::cout << "Communicator size " << comm.size()
+              << " not evenly divisible by " << num_peers
+              << " peers" << std::endl;
+    std::abort();
+  }
+  std::vector<size_t> sizes = get_sizes(start_size, max_size, true);
+  if (comm.rank() % num_peers == 0) {
+    // This rank sends everything.
+    std::vector<int> peers;
+    for (int peer = comm.rank() + 1; peer < comm.rank() + num_peers; ++peer) {
+      peers.push_back(peer);
+    }
+    {
+      std::stringstream ss;
+      ss << comm.rank() << ": Peers: ";
+      for (const auto& peer : peers) ss << peer << " ";
+      std::cout << ss.str() << std::endl;
+    }
+    for (const auto& size : sizes) {
+      if (comm.rank() == 0) {
+        std::cout << "Testing size " << human_readable_size(size) << std::endl;
+      }
+      for (size_t trial = 0; trial < 1000; ++trial) {
+        if (comm.rank() == 0) {
+          std::cout << "Trial " << trial << std::endl;
+        }
+        const size_t total_size = size*peers.size();
+        std::vector<float> host_to_send(total_size, comm.rank());
+        typename VectorType<Backend>::type to_send(host_to_send);
+        MPI_Barrier(MPI_COMM_WORLD);
+        do_send<Backend>(to_send, peers, comm);
+      }
+    }
+  } else {
+    // Receive from sending rank.
+    int peer = comm.rank() - (comm.rank() % num_peers);
+    std::cout << comm.rank() << " Peers: " << peer << std::endl;
+    for (const auto& size : sizes) {
+      for (size_t trial = 0; trial < 1000; ++trial) {
+        std::vector<float> host_to_recv(size, peer);
+        typename VectorType<Backend>::type to_recv(host_to_recv);
+        MPI_Barrier(MPI_COMM_WORLD);
+        do_recv<Backend>(to_recv, peer, comm);
+      }
+    }
+  }
+  free_comm_with_stream<Backend>(comm);
+}
+
+int main(int argc, char** argv) {
+  // Need to set the CUDA device before initializing Aluminum.
+#ifdef AL_HAS_CUDA
+  set_device();
+#endif
+  Al::Initialize(argc, argv);
+
+  std::string backend = "MPI";
+  parse_args(argc, argv, backend, start_size, max_size);
+
+  if (backend == "MPI") {
+    std::cerr << "Point-to-point not supported on MPI backend." << std::endl;
+    std::abort();
+#ifdef AL_HAS_NCCL
+  } else if (backend == "NCCL") {
+    std::cerr << "Point-to-point not supported on NCCL backend."
+              << std::endl;
+    std::abort();
+#endif
+#ifdef AL_HAS_MPI_CUDA
+  } else if (backend == "MPI-CUDA") {
+    test_correctness<Al::MPICUDABackend>();
+#endif
+  }
+
+  Al::Finalize();
+  return 0;
+}
diff --git a/test/test_transfer_to_one.cpp b/test/test_transfer_to_one.cpp
new file mode 100644
index 00000000..9900df53
--- /dev/null
+++ b/test/test_transfer_to_one.cpp
@@ -0,0 +1,138 @@
+#include <iostream>
+#include "Al.hpp"
+#include "test_utils.hpp"
+#ifdef AL_HAS_MPI_CUDA
+#include "test_utils_mpi_cuda.hpp"
+#endif
+
+#include <stdlib.h>
+#include <math.h>
+#include <string>
+#include <sstream>
+
+size_t start_size = 1;
+size_t max_size = 1<<30;
+// One rank will receive from all the others.
+const int num_peers = 4;
+
+template <typename Backend>
+void do_recv(const typename VectorType<Backend>::type& expected_recv,
+             const std::vector<int> srcs,
+             typename Backend::comm_type& comm) {
+  auto recv = get_vector<Backend>(expected_recv.size());
+  std::vector<typename Backend::req_type> requests;
+  const size_t chunk_per_peer = expected_recv.size() / srcs.size();
+  for (size_t i = 0; i < srcs.size(); ++i) {
+    int src = srcs[i];
+    requests.push_back(Backend::null_req);
+    auto& req = requests.back();
+    Al::NonblockingSendRecv<Backend, float>(
+      nullptr, 0, src,
+      recv.data() + i*chunk_per_peer, chunk_per_peer, src,
+      comm, req);
+  }
+  for (auto& req : requests) {
+    Al::Wait<Backend>(req);
+  }
+  if (!check_vector(expected_recv, recv)) {
+    std::cout << comm.rank() << ": recv does not match" << std::endl;
+    std::abort();
+  }
+}
+
+template <typename Backend>
+void do_send(typename VectorType<Backend>::type& to_send,
+             const int dest,
+             typename Backend::comm_type& comm) {
+  typename Backend::req_type req;
+  Al::NonblockingSendRecv<Backend, float>(
+    to_send.data(), to_send.size(), dest,
+    nullptr, 0, dest,
+    comm, req);
+  Al::Wait<Backend>(req);
+}
+
+template <typename Backend>
+void test_correctness() {
+  typename Backend::comm_type comm = get_comm_with_stream<Backend>(MPI_COMM_WORLD);
+  if (comm.size() % num_peers != 0) {
+    std::cout << "Communicator size " << comm.size()
+              << " not evenly divisible by " << num_peers
+              << " peers" << std::endl;
+    std::abort();
+  }
+  std::vector<size_t> sizes = get_sizes(start_size, max_size,
+                                        true);
+  if (comm.rank() % num_peers == 0) {
+    // This rank receives everything.
+    std::vector<int> peers;
+    for (int peer = comm.rank() + 1; peer < comm.rank() + num_peers; ++peer) {
+      peers.push_back(peer);
+    }
+    {
+      std::stringstream ss;
+      ss << comm.rank() << ": Peers: ";
+      for (const auto& peer : peers) ss << peer << " ";
+      std::cout << ss.str() << std::endl;
+    }
+    for (const auto& size : sizes) {
+      if (comm.rank() == 0) {
+        std::cout << "Testing size " << human_readable_size(size) << std::endl;
+      }
+      for (size_t trial = 0; trial < 1000; ++trial) {
+        if (comm.rank() == 0) {
+          std::cout << "Trial " << trial << std::endl;
+        }
+        const size_t total_size = size*peers.size();
+        std::vector<float> host_to_recv(total_size);
+        // Compute correct values.
+        for (size_t i = 0; i < peers.size(); ++i) {
+          std::fill_n(host_to_recv.data() + i*size, size, peers[i]);
+        }
+        typename VectorType<Backend>::type to_recv(host_to_recv);
+        MPI_Barrier(MPI_COMM_WORLD);
+        do_recv<Backend>(to_recv, peers, comm);
+      }
+    }
+  } else {
+    // Sends to receiving rank.
+    int peer = comm.rank() - (comm.rank() % num_peers);
+    std::cout << comm.rank() << " Peers: " << peer << std::endl;
+    for (const auto& size : sizes) {
+      for (size_t trial = 0; trial < 1000; ++trial) {
+        std::vector<float> host_to_send(size, comm.rank());
+        typename VectorType<Backend>::type to_send(host_to_send);
+        MPI_Barrier(MPI_COMM_WORLD);
+        do_send<Backend>(to_send, peer, comm);
+      }
+    }
+  }
+  free_comm_with_stream<Backend>(comm);
+}
+
+int main(int argc, char** argv) {
+  // Need to set the CUDA device before initializing Aluminum.
+#ifdef AL_HAS_CUDA
+  set_device();
+#endif
+  Al::Initialize(argc, argv);
+
+  std::string backend = "MPI";
+  parse_args(argc, argv, backend, start_size, max_size);
+
+  if (backend == "MPI") {
+    std::cerr << "Point-to-point not supported on MPI backend." << std::endl;
+    std::abort();
+#ifdef AL_HAS_NCCL
+  } else if (backend == "NCCL") {
+    std::cerr << "Point-to-point not supported on NCCL backend."
+              << std::endl;
+    std::abort();
+#endif
+#ifdef AL_HAS_MPI_CUDA
+  } else if (backend == "MPI-CUDA") {
+    test_correctness<Al::MPICUDABackend>();
+#endif
+  }
+
+  Al::Finalize();
+  return 0;
+}