From f07bbeef00ded77a18b65d7aa591a1ca92258261 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Sat, 7 Dec 2024 00:55:39 +0000 Subject: [PATCH 1/7] WIP --- src/executor/execution_plan.cc | 25 ++++++++++++++++++------- src/executor/executor.cc | 20 +++++++++++--------- src/include/execution_plan.hpp | 1 + 3 files changed, 30 insertions(+), 16 deletions(-) diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc index 144fb4174..2d72d00ef 100644 --- a/src/executor/execution_plan.cc +++ b/src/executor/execution_plan.cc @@ -165,20 +165,31 @@ std::vector ExecutionPlan::Impl::getConnectedBufferTypes(int rank) c } return std::vector(bufferTypes.begin(), bufferTypes.end()); } + size_t ExecutionPlan::Impl::getScratchBufferSize(int rank, size_t inputSize, size_t outputSize) const { - size_t sizePerRank; + size_t sizePerChunk = 0; if (this->inputChunks.at(rank) != 0) - sizePerRank = inputSize / this->inputChunks.at(rank); - else if (this->outputChunks.at(rank) != 0) - sizePerRank = outputSize / this->outputChunks.at(rank); + sizePerChunk = inputSize / this->inputChunks.at(rank); else - throw mscclpp::Error("Output or Input chunks must be greater than 0", mscclpp::ErrorCode::ExecutorError); + throw mscclpp::Error("Input chunks must be greater than 0", mscclpp::ErrorCode::ExecutorError); if (this->isUsingPacket) { - return sizePerRank * this->scratchChunks.at(rank) * 2 /* data + flag*/ * 2 /*double buffer*/; + return sizePerChunk * this->scratchChunks.at(rank) * 2 /* data + flag*/ * 2 /*double buffer*/; } - return sizePerRank * this->scratchChunks.at(rank); + return sizePerChunk * this->scratchChunks.at(rank); } + +size_t ExecutionPlan::Impl::getMaxScratchBufferSize(int rank) const { + size_t sizePerChunk = 0; + size_t inputChunks = this->inputChunks.at(rank); + if (inputChunks != 0) + sizePerChunk = this->maxMessageSize + inputChunks - 1 / inputChunks; + else + throw mscclpp::Error("Input chunks must be greater than 0", mscclpp::ErrorCode::ExecutorError); + + 
return this->getScratchBufferSize(rank, this->maxMessageSize, sizePerChunk * this->outputChunks.at(rank)); +} + std::vector ExecutionPlan::Impl::getOperations(int rank, int threadblock) const { return this->operations.at(rank)[threadblock]; } diff --git a/src/executor/executor.cc b/src/executor/executor.cc index ad694b3f7..da7729219 100644 --- a/src/executor/executor.cc +++ b/src/executor/executor.cc @@ -140,8 +140,8 @@ struct Executor::Impl { ExecutionContext setupExecutionContext(int rank, void* sendbuff, void* recvbuff, size_t inputMessageSize, size_t outputMessageSize, size_t constSrcOffset, size_t constDstOffset, - size_t sendBufferSize, size_t recvBufferSize, const ExecutionPlan& plan) { - ExecutionContextKey key = {sendbuff, recvbuff, sendBufferSize, recvBufferSize, plan.impl_->name}; + size_t sendMemRange, size_t recvMemRange, const ExecutionPlan& plan) { + ExecutionContextKey key = {sendbuff, recvbuff, sendMemRange, recvMemRange, plan.impl_->name}; DeviceExecutionPlanKey devicePlanKey = {inputMessageSize, outputMessageSize, constSrcOffset, constDstOffset}; if (this->contexts.find(key) != this->contexts.end()) { auto& devicePlans = this->contexts[key].deviceExecutionPlans; @@ -167,7 +167,9 @@ struct Executor::Impl { plan.impl_->loadExecutionPlan(inputMessageSize, outputMessageSize, constSrcOffset, constDstOffset); ExecutionContext context; - size_t scratchBufferSize = plan.impl_->getScratchBufferSize(rank, sendBufferSize, recvBufferSize); + size_t maxScratchBufferSize = plan.impl_->getMaxScratchBufferSize(rank); + size_t scratchBufferSize = + std::min(plan.impl_->getScratchBufferSize(rank, sendMemRange, recvMemRange), maxScratchBufferSize); std::shared_ptr scratchBuffer; if (isNvlsSupported()) { scratchBuffer = allocSharedPhysicalCuda(scratchBufferSize); @@ -179,8 +181,8 @@ struct Executor::Impl { context.proxyService = std::make_shared(); context.nthreadsPerBlock = plan.impl_->getNThreadsPerBlock(); this->setupConnections(context, rank, plan); - 
this->setupRegisteredMemories(context, sendbuff, recvbuff, sendBufferSize, recvBufferSize, rank, plan); - this->setupChannels(context, sendbuff, recvbuff, sendBufferSize, recvBufferSize, rank, plan); + this->setupRegisteredMemories(context, sendbuff, recvbuff, sendMemRange, recvMemRange, rank, plan); + this->setupChannels(context, sendbuff, recvbuff, sendMemRange, recvMemRange, rank, plan); this->setupNvlsChannels(context, sendbuff, recvbuff, rank, plan); this->setupDeviceExecutionPlan(context, devicePlanKey, rank, plan); context.deviceExecutionPlansBuffers[devicePlanKey] = @@ -433,16 +435,16 @@ Executor::Executor(std::shared_ptr comm) : impl_(std::make_unique< void Executor::execute(int rank, void* sendbuff, void* recvbuff, size_t sendBuffSize, [[maybe_unused]] size_t recvBuffSize, DataType dataType, const ExecutionPlan& plan, cudaStream_t stream, PacketType packetType) { - size_t sendBytes, recvBytes; + size_t sendMemRange, recvMemRange; CUdeviceptr sendBasePtr, recvBasePtr; - MSCCLPP_CUTHROW(cuMemGetAddressRange(&sendBasePtr, &sendBytes, (CUdeviceptr)sendbuff)); - MSCCLPP_CUTHROW(cuMemGetAddressRange(&recvBasePtr, &recvBytes, (CUdeviceptr)recvbuff)); + MSCCLPP_CUTHROW(cuMemGetAddressRange(&sendBasePtr, &sendMemRange, (CUdeviceptr)sendbuff)); + MSCCLPP_CUTHROW(cuMemGetAddressRange(&recvBasePtr, &recvMemRange, (CUdeviceptr)recvbuff)); size_t offsetIn = (char*)sendbuff - (char*)sendBasePtr; size_t offsetOut = (char*)recvbuff - (char*)recvBasePtr; ExecutionContext context = this->impl_->setupExecutionContext(rank, (void*)sendBasePtr, (void*)recvBasePtr, sendBuffSize, recvBuffSize, - offsetIn, offsetOut, sendBytes, recvBytes, plan); + offsetIn, offsetOut, sendMemRange, recvMemRange, plan); this->impl_->launchKernel(context, rank, sendbuff, recvbuff, dataType, stream, packetType); } diff --git a/src/include/execution_plan.hpp b/src/include/execution_plan.hpp index 8d291f45f..3af585508 100644 --- a/src/include/execution_plan.hpp +++ b/src/include/execution_plan.hpp @@ 
-73,6 +73,7 @@ struct ExecutionPlan::Impl { std::vector getConnectedPeers(int rank) const; std::vector getConnectedBufferTypes(int rank) const; size_t getScratchBufferSize(int rank, size_t inputSize, size_t outputSize) const; + size_t getMaxScratchBufferSize(int rank) const; std::vector getOperations(int rank, int threadblock) const; int getThreadblockCount(int rank) const; int getNThreadsPerBlock() const; From dcda43bf4fecd4d4dda233c8840fb3c97a3e1f4a Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Sat, 7 Dec 2024 00:56:55 +0000 Subject: [PATCH 2/7] fix --- src/executor/execution_plan.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc index 2d72d00ef..00ee231d0 100644 --- a/src/executor/execution_plan.cc +++ b/src/executor/execution_plan.cc @@ -183,7 +183,7 @@ size_t ExecutionPlan::Impl::getMaxScratchBufferSize(int rank) const { size_t sizePerChunk = 0; size_t inputChunks = this->inputChunks.at(rank); if (inputChunks != 0) - sizePerChunk = this->maxMessageSize + inputChunks - 1 / inputChunks; + sizePerChunk = (this->maxMessageSize + inputChunks - 1) / inputChunks; else throw mscclpp::Error("Input chunks must be greater than 0", mscclpp::ErrorCode::ExecutorError); From 8cb4269c87a783200ec34705752731ff8acd4192 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Sat, 7 Dec 2024 01:01:20 +0000 Subject: [PATCH 3/7] WIp --- src/executor/execution_plan.cc | 4 ++-- src/executor/executor.cc | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc index 00ee231d0..e17e41878 100644 --- a/src/executor/execution_plan.cc +++ b/src/executor/execution_plan.cc @@ -166,7 +166,7 @@ std::vector ExecutionPlan::Impl::getConnectedBufferTypes(int rank) c return std::vector(bufferTypes.begin(), bufferTypes.end()); } -size_t ExecutionPlan::Impl::getScratchBufferSize(int rank, size_t inputSize, size_t outputSize) const { +size_t 
ExecutionPlan::Impl::getScratchBufferSize(int rank, size_t inputSize) const { size_t sizePerChunk = 0; if (this->inputChunks.at(rank) != 0) sizePerChunk = inputSize / this->inputChunks.at(rank); @@ -187,7 +187,7 @@ size_t ExecutionPlan::Impl::getMaxScratchBufferSize(int rank) const { else throw mscclpp::Error("Input chunks must be greater than 0", mscclpp::ErrorCode::ExecutorError); - return this->getScratchBufferSize(rank, this->maxMessageSize, sizePerChunk * this->outputChunks.at(rank)); + return this->getScratchBufferSize(rank, sizePerChunk * this->inputChunks.at(rank)); } std::vector ExecutionPlan::Impl::getOperations(int rank, int threadblock) const { diff --git a/src/executor/executor.cc b/src/executor/executor.cc index da7729219..f960fa0f8 100644 --- a/src/executor/executor.cc +++ b/src/executor/executor.cc @@ -169,7 +169,7 @@ struct Executor::Impl { ExecutionContext context; size_t maxScratchBufferSize = plan.impl_->getMaxScratchBufferSize(rank); size_t scratchBufferSize = - std::min(plan.impl_->getScratchBufferSize(rank, sendMemRange, recvMemRange), maxScratchBufferSize); + std::min(plan.impl_->getScratchBufferSize(rank, sendMemRange), maxScratchBufferSize); std::shared_ptr scratchBuffer; if (isNvlsSupported()) { scratchBuffer = allocSharedPhysicalCuda(scratchBufferSize); From 06f58e316470504a6f3917bec44a80348aa2e413 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Sat, 7 Dec 2024 01:07:29 +0000 Subject: [PATCH 4/7] WIP --- src/executor/execution_plan.cc | 5 +++-- src/executor/executor.cc | 3 +-- src/include/execution_plan.hpp | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc index e17e41878..22b8b85d5 100644 --- a/src/executor/execution_plan.cc +++ b/src/executor/execution_plan.cc @@ -168,8 +168,9 @@ std::vector ExecutionPlan::Impl::getConnectedBufferTypes(int rank) c size_t ExecutionPlan::Impl::getScratchBufferSize(int rank, size_t inputSize) const { size_t sizePerChunk = 
0; - if (this->inputChunks.at(rank) != 0) - sizePerChunk = inputSize / this->inputChunks.at(rank); + size_t inputChunks = this->inputChunks.at(rank); + if (inputChunks != 0) + sizePerChunk = (inputSize + inputChunks - 1) / this->inputChunks.at(rank); else throw mscclpp::Error("Input chunks must be greater than 0", mscclpp::ErrorCode::ExecutorError); diff --git a/src/executor/executor.cc b/src/executor/executor.cc index f960fa0f8..5cb958028 100644 --- a/src/executor/executor.cc +++ b/src/executor/executor.cc @@ -168,8 +168,7 @@ struct Executor::Impl { ExecutionContext context; size_t maxScratchBufferSize = plan.impl_->getMaxScratchBufferSize(rank); - size_t scratchBufferSize = - std::min(plan.impl_->getScratchBufferSize(rank, sendMemRange), maxScratchBufferSize); + size_t scratchBufferSize = std::min(plan.impl_->getScratchBufferSize(rank, sendMemRange), maxScratchBufferSize); std::shared_ptr scratchBuffer; if (isNvlsSupported()) { scratchBuffer = allocSharedPhysicalCuda(scratchBufferSize); diff --git a/src/include/execution_plan.hpp b/src/include/execution_plan.hpp index 3af585508..13815a69e 100644 --- a/src/include/execution_plan.hpp +++ b/src/include/execution_plan.hpp @@ -72,7 +72,7 @@ struct ExecutionPlan::Impl { std::vector getNvlsInfos(int rank) const; std::vector getConnectedPeers(int rank) const; std::vector getConnectedBufferTypes(int rank) const; - size_t getScratchBufferSize(int rank, size_t inputSize, size_t outputSize) const; + size_t getScratchBufferSize(int rank, size_t inputSize) const; size_t getMaxScratchBufferSize(int rank) const; std::vector getOperations(int rank, int threadblock) const; int getThreadblockCount(int rank) const; From d9ec000f1e6dcea03f30fb792c3d214b90590c67 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Sat, 7 Dec 2024 07:44:26 +0000 Subject: [PATCH 5/7] fix CI --- src/executor/execution_plan.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc index 
22b8b85d5..4540518d3 100644 --- a/src/executor/execution_plan.cc +++ b/src/executor/execution_plan.cc @@ -181,6 +181,9 @@ size_t ExecutionPlan::Impl::getScratchBufferSize(int rank, size_t inputSize) con } size_t ExecutionPlan::Impl::getMaxScratchBufferSize(int rank) const { + if (this->maxMessageSize == std::numeric_limits::max()) { + return std::numeric_limits::max(); + } size_t sizePerChunk = 0; size_t inputChunks = this->inputChunks.at(rank); if (inputChunks != 0) From 9a168acb01da97909e1b2023e3fb1b4dd865a265 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Fri, 13 Dec 2024 03:20:29 +0000 Subject: [PATCH 6/7] update --- src/executor/execution_plan.cc | 29 ++++++++++++++++------------- src/executor/executor.cc | 3 ++- src/include/execution_plan.hpp | 2 +- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc index 4540518d3..162c3ef1c 100644 --- a/src/executor/execution_plan.cc +++ b/src/executor/execution_plan.cc @@ -166,18 +166,19 @@ std::vector ExecutionPlan::Impl::getConnectedBufferTypes(int rank) c return std::vector(bufferTypes.begin(), bufferTypes.end()); } -size_t ExecutionPlan::Impl::getScratchBufferSize(int rank, size_t inputSize) const { - size_t sizePerChunk = 0; - size_t inputChunks = this->inputChunks.at(rank); - if (inputChunks != 0) - sizePerChunk = (inputSize + inputChunks - 1) / this->inputChunks.at(rank); +size_t ExecutionPlan::Impl::getScratchBufferSize(int rank, size_t inputSize, size_t outputSize) const { + size_t sizePerRank = 0; + if (this->inputChunks.at(rank) != 0) + sizePerRank = inputSize / this->inputChunks.at(rank); + else if (this->outputChunks.at(rank) != 0) + sizePerRank = outputSize / this->outputChunks.at(rank); else - throw mscclpp::Error("Input chunks must be greater than 0", mscclpp::ErrorCode::ExecutorError); + throw mscclpp::Error("Output or Input chunks must be greater than 0", mscclpp::ErrorCode::ExecutorError); if (this->isUsingPacket) { - return 
sizePerChunk * this->scratchChunks.at(rank) * 2 /* data + flag*/ * 2 /*double buffer*/; + return sizePerRank * this->scratchChunks.at(rank) * 2 /* data + flag*/ * 2 /*double buffer*/; } - return sizePerChunk * this->scratchChunks.at(rank); + return sizePerRank * this->scratchChunks.at(rank); } size_t ExecutionPlan::Impl::getMaxScratchBufferSize(int rank) const { @@ -185,13 +186,15 @@ size_t ExecutionPlan::Impl::getMaxScratchBufferSize(int rank) const { return std::numeric_limits::max(); } size_t sizePerChunk = 0; - size_t inputChunks = this->inputChunks.at(rank); - if (inputChunks != 0) - sizePerChunk = (this->maxMessageSize + inputChunks - 1) / inputChunks; + if (this->inputChunks.at(rank) != 0) + sizePerChunk = maxMessageSize / this->inputChunks.at(rank); + else if (this->outputChunks.at(rank) != 0) + sizePerChunk = maxMessageSize / this->outputChunks.at(rank); else - throw mscclpp::Error("Input chunks must be greater than 0", mscclpp::ErrorCode::ExecutorError); + throw mscclpp::Error("Output or Input chunks must be greater than 0", mscclpp::ErrorCode::ExecutorError); - return this->getScratchBufferSize(rank, sizePerChunk * this->inputChunks.at(rank)); + return this->getScratchBufferSize(rank, sizePerChunk * this->inputChunks.at(rank), + sizePerChunk * this->outputChunks.at(rank)); } std::vector ExecutionPlan::Impl::getOperations(int rank, int threadblock) const { diff --git a/src/executor/executor.cc b/src/executor/executor.cc index faddc5035..b8e8c6af3 100644 --- a/src/executor/executor.cc +++ b/src/executor/executor.cc @@ -168,7 +168,8 @@ struct Executor::Impl { ExecutionContext context; size_t maxScratchBufferSize = plan.impl_->getMaxScratchBufferSize(rank); - size_t scratchBufferSize = std::min(plan.impl_->getScratchBufferSize(rank, sendMemRange), maxScratchBufferSize); + size_t scratchBufferSize = + std::min(plan.impl_->getScratchBufferSize(rank, sendMemRange, recvMemRange), maxScratchBufferSize); std::shared_ptr scratchBuffer; if (isNvlsSupported()) { 
scratchBuffer = allocSharedPhysicalCuda(scratchBufferSize); diff --git a/src/include/execution_plan.hpp b/src/include/execution_plan.hpp index 13815a69e..3af585508 100644 --- a/src/include/execution_plan.hpp +++ b/src/include/execution_plan.hpp @@ -72,7 +72,7 @@ struct ExecutionPlan::Impl { std::vector getNvlsInfos(int rank) const; std::vector getConnectedPeers(int rank) const; std::vector getConnectedBufferTypes(int rank) const; - size_t getScratchBufferSize(int rank, size_t inputSize) const; + size_t getScratchBufferSize(int rank, size_t inputSize, size_t outputSize) const; size_t getMaxScratchBufferSize(int rank) const; std::vector getOperations(int rank, int threadblock) const; int getThreadblockCount(int rank) const; From b91da26247c729837487e09f1f77550a1832934f Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Fri, 13 Dec 2024 03:36:33 +0000 Subject: [PATCH 7/7] update --- .azure-pipelines/nccl-api-test.yaml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/.azure-pipelines/nccl-api-test.yaml b/.azure-pipelines/nccl-api-test.yaml index e3d537fe4..c9e2fcc1c 100644 --- a/.azure-pipelines/nccl-api-test.yaml +++ b/.azure-pipelines/nccl-api-test.yaml @@ -156,6 +156,24 @@ jobs: mpirun -np 8 --bind-to numa --allow-run-as-root -x LD_PRELOAD=/root/mscclpp/build/apps/nccl/libmscclpp_nccl.so -x NCCL_DEBUG=WARN -x MSCCLPP_EXECUTION_PLAN_DIR=/root/mscclpp/msccl-users/execution-files /root/nccl-tests/build/all_reduce_perf -b 1K -e 1G -f 2 -d half -G 20 -w 10 -n 20"' workingDirectory: '$(System.DefaultWorkingDirectory)' + - task: Bash@3 + name: RunNcclGatherTest + displayName: Run NCCL Allgather Test + inputs: + targetType: 'inline' + script: | + set -e + HOSTFILE=$(System.DefaultWorkingDirectory)/mscclpp/test/deploy/hostfile_ci + ROOT_DIR=$(System.DefaultWorkingDirectory)/mscclpp + SSH_OPTION="StrictHostKeyChecking=no" + KeyFilePath=${SSHKEYFILE_SECUREFILEPATH} + parallel-ssh -i -t 0 -h ${HOSTFILE} -x "-i ${KeyFilePath}" \ + -O $SSH_OPTION 'sudo docker 
exec -t mscclpp-test bash -c "\ + cd /root/mscclpp; \ + mpirun -np 8 --bind-to numa --allow-run-as-root -x LD_PRELOAD=/root/mscclpp/build/apps/nccl/libmscclpp_nccl.so -x NCCL_DEBUG=WARN -x MSCCLPP_EXECUTION_PLAN_DIR=/root/mscclpp/msccl-users/execution-files /root/nccl-tests/build/all_gather_perf -b 1K -e 1G -f 2 -d half -G 20 -w 10 -n 20"' + workingDirectory: '$(System.DefaultWorkingDirectory)' + + - task: AzureCLI@2 name: StopVMSS displayName: Deallocate VMSS