From 931c1cd11c30bac2ec08d06b0b305d6eba814e4e Mon Sep 17 00:00:00 2001
From: Balaram Buddharaju <169953907+brb-nv@users.noreply.github.com>
Date: Thu, 18 Sep 2025 23:02:02 +0000
Subject: [PATCH 1/4] Switch to contiguous block dist among CP ranks

Signed-off-by: Balaram Buddharaju <169953907+brb-nv@users.noreply.github.com>
---
 .../batch_manager/cacheFormatter.cpp          |  19 +-
 .../batch_manager/cacheFormatter.h            |   6 +-
 .../batch_manager/mlaCacheFormatter.cpp       |  34 +---
 .../batch_manager/mlaCacheFormatter.h         |  18 --
 cpp/tensorrt_llm/common/envUtils.cpp          |   6 +
 cpp/tensorrt_llm/common/envUtils.h            |   2 +
 .../cache_transmission/cacheSplitConcat.cu    | 108 +++++++++--
 .../cache_transmission/cacheSplitConcat.h     |  30 +++
 .../multi_gpu/cacheTransceiverTest.cpp        | 172 +++++++++++++-----
 9 files changed, 278 insertions(+), 117 deletions(-)

diff --git a/cpp/tensorrt_llm/batch_manager/cacheFormatter.cpp b/cpp/tensorrt_llm/batch_manager/cacheFormatter.cpp
index e7520d47fc8..61466aac7de 100644
--- a/cpp/tensorrt_llm/batch_manager/cacheFormatter.cpp
+++ b/cpp/tensorrt_llm/batch_manager/cacheFormatter.cpp
@@ -42,21 +42,22 @@ namespace tensorrt_llm::batch_manager::kv_cache_manager
 {
 
-BlockRange getBlockRangeForSending(
-    BaseKVCacheManager* cacheManager, LlmRequest const& llmRequest, BlockKey const& lastBlockKey, int32_t indexFromEnd)
+BlockRange getBlockRangeForSending(BaseKVCacheManager* cacheManager, LlmRequest const& llmRequest,
+    BlockKey const& lastBlockKey, int32_t indexFromEnd, bool recvSideHasCP)
 {
     auto poolNum = cacheManager->getBlockManager().getNumPools();
-    if (poolNum > 1 || !cacheManager->isEnableBlockReuse() || lastBlockKey.uniqueTokens.size() == 0)
+    // Note: When the recv side has CP, the requested seqLen is less than the seqLen on the sender side, as the
+    // seqLen is distributed among CP ranks. So, we transfer all blocks from the send side.
+    if (poolNum > 1 || !cacheManager->isEnableBlockReuse() || lastBlockKey.uniqueTokens.size() == 0 || recvSideHasCP)
     {
         // disable reuse path, and vwsa don't support reuse.
         bool needSendAllForWindow = common::getEnvKVCacheTransferAllBlocksForWindow();
         auto blockRange = BlockRange::fromAllBlockIds(*cacheManager, llmRequest.mRequestId);
-        // auto inputLen = llmRequest.getPromptLen();
         auto const& windowsMetadata = cacheManager->getBlockManager().getWindowSizesMetadata();
-        if ((windowsMetadata.size() == 1 || needSendAllForWindow))
+        if (windowsMetadata.size() == 1 || needSendAllForWindow || recvSideHasCP)
         {
             return blockRange;
         }
@@ -85,10 +86,11 @@ BlockRange getBlockRangeForSending(
 }
 
 BlockRange getBlockRangeForReceiving(
-    BaseKVCacheManager* cacheManager, LlmRequest const& llmRequest, bool srcEnableBlockReuse)
+    BaseKVCacheManager* cacheManager, LlmRequest const& llmRequest, bool srcEnableBlockReuse, bool recvSideHasCP)
 {
     auto poolNum = cacheManager->getBlockManager().getNumPools();
-    if (poolNum == 1 && srcEnableBlockReuse)
+    // Note: When the recv side has CP, we currently request all blocks from the send side.
+    if (poolNum == 1 && srcEnableBlockReuse && !recvSideHasCP)
     {
         // Build from all block ids, then slice off the reused blocks so we only transfer newly allocated ones.
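+        // (Illustrative example, not part of the original change: if 5 blocks are allocated in total and the
+        // first 2 were reused from the cache, only the 3 newly allocated blocks need to be transferred.)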
         auto windowSize = cacheManager->getBlockManager().getWindowSizesMetadata().begin()->first;
@@ -121,9 +123,9 @@ BlockRange getBlockRangeForReceiving(
     }
 
     auto const& windowsMetadata = cacheManager->getBlockManager().getWindowSizesMetadata();
-    if (windowsMetadata.size() == 1 || common::getEnvKVCacheTransferAllBlocksForWindow())
+    if (windowsMetadata.size() == 1 || common::getEnvKVCacheTransferAllBlocksForWindow() || recvSideHasCP)
     {
         return BlockRange::fromAllBlockIds(*cacheManager, llmRequest.mRequestId);
     }
 
     auto blockRange = BlockRange::fromAllBlockIds(*cacheManager, llmRequest.mRequestId);
diff --git a/cpp/tensorrt_llm/batch_manager/cacheFormatter.h b/cpp/tensorrt_llm/batch_manager/cacheFormatter.h
index c6c69000f0c..8c2a979d8e2 100644
--- a/cpp/tensorrt_llm/batch_manager/cacheFormatter.h
+++ b/cpp/tensorrt_llm/batch_manager/cacheFormatter.h
@@ -43,7 +43,7 @@ class TransferSession;
 namespace tensorrt_llm::batch_manager::kv_cache_manager
 {
 BlockRange getBlockRangeForSending(BaseKVCacheManager* cacheManager, LlmRequest const& llmRequest,
-    BlockKey const& lastBlockKey, SizeType32 indexFromEnd);
+    BlockKey const& lastBlockKey, SizeType32 indexFromEnd, bool recvSideHasCP = false);
 
 using DataContext = tensorrt_llm::executor::kv_cache::DataContext;
 using Connection = tensorrt_llm::executor::kv_cache::Connection;
@@ -52,8 +52,8 @@ using BaseKVCacheManager = kv_cache_manager::BaseKVCacheManager;
 using CacheTransBufferManager = kv_cache_manager::CacheTransBufferManager;
 using BlockRange = kv_cache_manager::BlockRange;
 
-BlockRange getBlockRangeForReceiving(
-    BaseKVCacheManager* cacheManager, LlmRequest const& llmRequest, bool srcEnableBlockReuse);
+BlockRange getBlockRangeForReceiving(BaseKVCacheManager* cacheManager, LlmRequest const& llmRequest,
+    bool srcEnableBlockReuse, bool recvSideHasCP = false);
 
 // Used to support the cache transmission with different layouts and different protocols.
 class BaseCacheFormatter
diff --git a/cpp/tensorrt_llm/batch_manager/mlaCacheFormatter.cpp b/cpp/tensorrt_llm/batch_manager/mlaCacheFormatter.cpp
index 8e50ebb1a11..8a684bbf6ed 100644
--- a/cpp/tensorrt_llm/batch_manager/mlaCacheFormatter.cpp
+++ b/cpp/tensorrt_llm/batch_manager/mlaCacheFormatter.cpp
@@ -37,31 +37,6 @@
 namespace tensorrt_llm::batch_manager::kv_cache_manager
 {
 
-int getBlockNumAccountingForCP(int cpRank, int cpSize, int numTotalBlocks, bool strict)
-{
-    TLLM_CHECK(cpRank >= 0 && cpRank < cpSize);
-    if (cpSize == 1)
-    {
-        return numTotalBlocks;
-    }
-    // NOTE: Non-strict mode may over-allocate blocks when numTotalBlocks is not divisible by cpSize.
-    // This is a known limitation and will be addressed in a future MR.
-    if (!strict)
-    {
-        // Simple ceiling division.
-        return (numTotalBlocks + cpSize - 1) / cpSize;
-    }
-    // In strict mode, blocks are distributed among CP ranks in a round-robin fashion as evenly as possible.
-    // When the number of blocks is not divisible by cpSize, the remainder shall be distributed evenly among
-    // lowest-indexed CP ranks (let's call them overflow ranks).
-    int numBlocksCurrRank = numTotalBlocks / cpSize;
-    if (numTotalBlocks % cpSize > cpRank)
-    {
-        numBlocksCurrRank++;
-    }
-    return numBlocksCurrRank;
-}
-
 // some context rank in connection
 std::vector<size_t> MLACacheFormatter::pickRecvConnections(
     size_t numConnections, CacheState const& selfConfig, SizeType32 selfIdx, CacheState const& destConfig) const
@@ -145,7 +120,8 @@ void MLACacheFormatter::format(tensorrt_llm::batch_manager::TransferSession& ses
     int blockNum = 0;
     std::vector<runtime::ITensor::SharedPtr> inputKvCacheBlocks;
     auto const numPools = mCacheManager->getBlockManager().getNumPools();
-    auto blockRange = getBlockRangeForSending(mCacheManager, llmRequest, lastBlockKey, indexFromEnd);
+    bool const recvSideHasCP = destConfig.getParallelConfig().mContextParallelism > 1;
+    auto blockRange = getBlockRangeForSending(mCacheManager, llmRequest, lastBlockKey, indexFromEnd, recvSideHasCP);
 
     auto const& windowSizes = blockRange.getWindowSizes();
     TLLM_CHECK_WITH_INFO(
         static_cast(windowSizes.size()) == numPools, "window sizes should be the same as numPools");
@@ -204,7 +180,7 @@ void MLACacheFormatter::format(tensorrt_llm::batch_manager::TransferSession& ses
                 auto const idx = cpDomainIdx * pPDomainSize + ppDomainIdx;
                 // Note: contextCP is always 1. So, cpDomainSize == genCPSize and cpDomainIdx == genCPRank.
                 auto const peerBlockNum
-                    = getBlockNumAccountingForCP(cpDomainIdx, cPDomainSize, blockNum, /*strict=*/false);
+                    = executor::kv_cache::getBlockNumAccountingForCP(cpDomainIdx, cPDomainSize, blockNum);
                 bufferSizeForTarget[idx] = blockSizePerLayer * peerAttentionLayerNum * peerBlockNum;
             }
         }
@@ -346,7 +322,9 @@ void MLACacheFormatter::unformat(tensorrt_llm::batch_manager::TransferSession& s
     auto const& connections = session.getConnections();
     auto& bufferManager = session.getBufferManager();
     auto pickUpConnections = pickRecvConnections(connections.size(), selfConfig, selfIdx, destConfig);
-    auto blockRange = getBlockRangeForReceiving(mCacheManager, llmRequest, destConfig.getEnableBlockReuse());
+    bool const recvSideHasCP = selfConfig.getParallelConfig().mContextParallelism > 1;
+    auto blockRange
+        = getBlockRangeForReceiving(mCacheManager, llmRequest, destConfig.getEnableBlockReuse(), recvSideHasCP);
     std::vector<runtime::ITensor::SharedPtr> recvBufferTmps;
     std::vector<runtime::ITensor::SharedPtr> outputBuffers;
     auto const numPools = mCacheManager->getBlockManager().getNumPools();
diff --git a/cpp/tensorrt_llm/batch_manager/mlaCacheFormatter.h b/cpp/tensorrt_llm/batch_manager/mlaCacheFormatter.h
index d352e49d78f..acaf231363d 100644
--- a/cpp/tensorrt_llm/batch_manager/mlaCacheFormatter.h
+++ b/cpp/tensorrt_llm/batch_manager/mlaCacheFormatter.h
@@ -22,24 +22,6 @@
 namespace tensorrt_llm::batch_manager::kv_cache_manager
 {
 
-/**
- * @brief Calculate the number of blocks allocated to a specific Context Parallelism (CP) rank.
- *
- * This function determines how many blocks should be allocated to a given CP rank when
- * distributing a total number of blocks across multiple CP ranks. It supports two distribution
- * modes: strict and non-strict.
- *
- * @param cpRank The rank (index) of the current CP process. Must be in range [0, cpSize).
- * @param cpSize The total number of CP ranks/processes in the parallel group.
- * @param numTotalBlocks The total number of blocks to be distributed across all CP ranks.
- * @param strict Flag controlling the distribution strategy:
- *               - true: Use strict round-robin distribution with exact allocation
- *               - false: Use ceiling division which may over-allocate
- *
- * @return The number of blocks allocated to the specified CP rank.
- */
-int getBlockNumAccountingForCP(int cpRank, int cpSize, int numTotalBlocks, bool strict);
-
 // Simple cache block copy. Because it does not involve data splitting or merging, it performs best when the
 // parallel topology is completely identical, making it the preferred method.
 class MLACacheFormatter final : public BaseCacheFormatter
diff --git a/cpp/tensorrt_llm/common/envUtils.cpp b/cpp/tensorrt_llm/common/envUtils.cpp
index f4449473a56..42241242d41 100644
--- a/cpp/tensorrt_llm/common/envUtils.cpp
+++ b/cpp/tensorrt_llm/common/envUtils.cpp
@@ -278,6 +278,12 @@ bool getEnvUseNixlKvCache()
     return useNixlKvCache;
 }
 
+bool getEnvUseRoundRobinBlockDistForCP()
+{
+    static bool const useRoundRobinBlockDistForCP = getBoolEnv("TRTLLM_USE_ROUND_ROBIN_BLOCK_DIST_FOR_CP");
+    return useRoundRobinBlockDistForCP;
+}
+
 std::string getEnvUCXInterface()
 {
     static std::once_flag flag;
diff --git a/cpp/tensorrt_llm/common/envUtils.h b/cpp/tensorrt_llm/common/envUtils.h
index 9e43a142438..7296c84ef1d 100644
--- a/cpp/tensorrt_llm/common/envUtils.h
+++ b/cpp/tensorrt_llm/common/envUtils.h
@@ -82,6 +82,8 @@ bool getEnvUseUCXKvCache();
 bool getEnvUseMPIKvCache();
 bool getEnvUseNixlKvCache();
 
+bool getEnvUseRoundRobinBlockDistForCP();
+
 std::string getEnvUCXInterface();
 
 std::string getEnvNixlInterface();
diff --git a/cpp/tensorrt_llm/executor/cache_transmission/cacheSplitConcat.cu b/cpp/tensorrt_llm/executor/cache_transmission/cacheSplitConcat.cu
index 524c067b746..8c85d5f680c 100644
--- a/cpp/tensorrt_llm/executor/cache_transmission/cacheSplitConcat.cu
+++ b/cpp/tensorrt_llm/executor/cache_transmission/cacheSplitConcat.cu
@@ -19,6 +19,7 @@
 #include "tensorrt_llm/common/assert.h"
 #include "tensorrt_llm/common/cudaFp8Utils.h"
 #include "tensorrt_llm/common/cudaUtils.h"
+#include "tensorrt_llm/common/envUtils.h"
 #include "tensorrt_llm/common/reduceKernelUtils.cuh"
 #include "tensorrt_llm/executor/dataTransceiverState.h"
 #include "tensorrt_llm/executor/tensor.h"
@@ -45,6 +46,41 @@ inline bool isPowerOfTwo(int n)
 }
 } // namespace
 
+int getBlockNumAccountingForCP(int cpRank, int cpSize, int numTotalBlocks)
+{
+    TLLM_CHECK(cpRank >= 0 && cpRank < cpSize);
+    if (cpSize == 1)
+    {
+        return numTotalBlocks;
+    }
+    // Blocks are distributed among CP ranks as evenly as possible. When the number of blocks is not
+    // divisible by cpSize, the remainder shall be distributed evenly among lowest-indexed CP ranks
+    // (let's call them overflow ranks).
+    int numBlocksCurrRank = numTotalBlocks / cpSize;
+    if (numTotalBlocks % cpSize > cpRank)
+    {
+        numBlocksCurrRank++;
+    }
+    return numBlocksCurrRank;
+}
+
+int getGlobalBlockIdAccountingForCP(int localBlockIdx, int cpSize, int cpRank, int numTotalBlocks)
+{
+    if (tensorrt_llm::common::getEnvUseRoundRobinBlockDistForCP())
+    {
+        return localBlockIdx * cpSize + cpRank;
+    }
+    else
+    {
+        int const minBlocksPerCPRank = numTotalBlocks / cpSize;
+        int const minBlocksOnPrevCPRanks = minBlocksPerCPRank * cpRank;
+        int const numOverflowRanks = numTotalBlocks % cpSize;
+        // Each previous overflow rank has one more block than minBlocksPerCPRank.
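+        // Worked example (illustrative numbers): numTotalBlocks = 10 and cpSize = 4 give minBlocksPerCPRank = 2
+        // and numOverflowRanks = 2, so ranks own blocks {0,1,2}, {3,4,5}, {6,7}, {8,9}. For cpRank = 2 and
+        // localBlockIdx = 1: minBlocksOnPrevCPRanks = 4, overflow = min(2, 2) = 2, global id = 4 + 2 + 1 = 7.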
+        int const overflowBlocksOnPrevCPRanks = std::min(cpRank, numOverflowRanks);
+        return minBlocksOnPrevCPRanks + overflowBlocksOnPrevCPRanks + localBlockIdx;
+    }
+}
+
 // inputBlockNums: [outputBlockNum, inputRanks.size]
 // [PP, TP]
 TargetRanksInfo TargetRanksInfoForDP(
@@ -568,12 +604,35 @@ __device__ __forceinline__ void getLayerIdInDomainPPandRankInDomainPP(int layerI
     layerNumInSpecPP = sharedLayerNumInSpecPP;
 }
 
+__device__ __forceinline__ void getBlockIdInDomainCPandRankInDomainCP(
+    int blockId, int domainCPSize, uint64_t* prefixBlockNumDevPtr, int& blockIdInDomainCP, int& rankInDomainCP)
+{
+    __shared__ int sharedBlockIdInDomainCP;
+    __shared__ int sharedRankInDomainCP;
+
+#pragma unroll 1
+    for (int cpRank = threadIdx.x; cpRank < domainCPSize; cpRank += blockDim.x)
+    {
+        if (blockId >= prefixBlockNumDevPtr[cpRank] && blockId < prefixBlockNumDevPtr[cpRank + 1])
+        {
+            sharedBlockIdInDomainCP = blockId - prefixBlockNumDevPtr[cpRank];
+            sharedRankInDomainCP = cpRank;
+            break;
+        }
+    }
+
+    __syncthreads();
+    blockIdInDomainCP = sharedBlockIdInDomainCP;
+    rankInDomainCP = sharedRankInDomainCP;
+}
+
 // MLA Head 1: One thread block per [(2), tokens, dimsPerHead]
 template <typename T, int subWarpSize>
 __global__ void splitKVCacheForMLAKernel(T const** __restrict__ inputBlocks, T** __restrict__ outputCaches,
     int tokensPerBlock, int numLayers, int headNum, int dimsPerHead, int inputBlockNum, int domainPPSize,
-    int domainTPSize, int domainCPSize, int kvFactor, uint64_t* prefixLayerNumDevPtr)
+    int domainTPSize, int domainCPSize, int kvFactor, uint64_t* prefixLayerNumDevPtr, bool isCPRoundRobin,
+    uint64_t* prefixBlockNumDevPtr)
 {
     int const subWarpId = threadIdx.x / subWarpSize;
     int const laneId = threadIdx.x % subWarpSize;
@@ -586,8 +645,18 @@ __global__ void splitKVCacheForMLAKernel(T const** __restrict__ inputBlocks, T**
 
     for (int64_t blockId = blockIdx.y; blockId < inputBlockNum; blockId += gridDim.y)
     {
+        // Default to CP round-robin.
+        int rankInDomainCP = blockId % domainCPSize;    // genCPRank to which this block belongs.
+        int blockIdInDomainCP = blockId / domainCPSize; // localBlockId on genCPRank.
+        if (domainCPSize > 1 && !isCPRoundRobin)
+        {
+            // NOTE: domainCPSize is the same as genCPSize as contextCP is always 1 currently. So,
+            // - rankInDomainCP is the same as genCPRank to which this block belongs.
+            // - blockIdInDomainCP is the localBlockId on this genCPRank.
+            getBlockIdInDomainCPandRankInDomainCP(
+                blockId, domainCPSize, prefixBlockNumDevPtr, blockIdInDomainCP, rankInDomainCP);
+        }
 #pragma unroll 1
-
         for (int layerId = blockIdx.x; layerId < numLayers; layerId += gridDim.x)
         {
             int layerIdInDomainPP{};
@@ -611,11 +680,10 @@ __global__ void splitKVCacheForMLAKernel(T const** __restrict__ inputBlocks, T**
             // {pp1cp0}, {pp0cp1}, {pp1cp1}}. So, outputCaches of all ppRanks corresponding to a given cpRank are
             // grouped together. We do blockId % domainCPSize because blocks are distributed among cpRanks in a
             // round-robin fashion.
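+            // (Illustrative example: with domainCPSize = 2 and domainPPSize = 2, a block owned by cpRank 1
+            // whose layer maps to ppRank 0 is written to outputCaches[1 * 2 + 0] = outputCaches[2].)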
-            int outputCacheIdx = (blockId % domainCPSize) * domainPPSize + rankInDomainPP;
+            int outputCacheIdx = rankInDomainCP * domainPPSize + rankInDomainPP;
             T* outputCachePtr = outputCaches[outputCacheIdx];
 
             int const headIdInDomainTP = headId;
-            int64_t const blockIdInDomainCP = blockId / domainCPSize;
 
             T* kOutputPtr = outputCachePtr
                 + blockIdInDomainCP
@@ -1092,6 +1160,15 @@ void splitKVCache(std::map>
     cachePtrs.insert(cachePtrs.end(), prefixLayerNum.begin(), prefixLayerNum.end());
     bool const isWindow = windowSizes.size() > 1;
 
+    std::vector<uint64_t> prefixBlockNum(targetRankInfo.mDomainCPSize + 1, 0);
+    prefixBlockNum[0] = 0;
+    for (int i = 0; i < targetRankInfo.mDomainCPSize; i++)
+    {
+        prefixBlockNum[i + 1]
+            = prefixBlockNum[i] + getBlockNumAccountingForCP(i, targetRankInfo.mDomainCPSize, inputBlockNumSum);
+    }
+    cachePtrs.insert(cachePtrs.end(), prefixBlockNum.begin(), prefixBlockNum.end());
+
     runtime::BufferManager::IBufferPtr PtrsDeviceBuffer
         = bufferManager.gpu(cachePtrs.size(), nvinfer1::DataType::kINT64);
     TLLM_CHECK(PtrsDeviceBuffer->getSizeInBytes() == cachePtrs.size() * sizeof(T*));
@@ -1152,6 +1229,8 @@ void splitKVCache(std::map>
     T** outputCachePtrsDev = static_cast<T**>(PtrsDeviceBuffer->data()) + inputBlockNumSum;
     uint64_t* prefixLayerNumDevPtr
         = static_cast<uint64_t*>(PtrsDeviceBuffer->data()) + inputBlockNumSum + outputSplitBlocks.size();
+    uint64_t* prefixBlockNumDevPtr = static_cast<uint64_t*>(PtrsDeviceBuffer->data()) + inputBlockNumSum
+        + outputSplitBlocks.size() + prefixLayerNum.size();
 
     int const tokensPerBlock = selfModelConfig.mTokensPerBlock;
     int const selfPPRank = selfIdx / (selfParallelConfig.mTensorParallelism * selfParallelConfig.mContextParallelism);
@@ -1164,6 +1243,7 @@ void splitKVCache(std::map>
     int const headNumDomainTP = headNum / (domainTPSize / targetRankInfo.mPeerDupHeadFactor);
     int const kvFactor = selfAttentionConfig.mKvFactor;
     bool const isMLA = selfAttentionConfig.mAttentionType == CacheState::AttentionType::kMLA;
+    bool const isCPRoundRobin = tensorrt_llm::common::getEnvUseRoundRobinBlockDistForCP();
 
     constexpr int mlaSubWarpSize = 16;
     TLLM_LOG_DEBUG(
@@ -1178,9 +1258,10 @@ void splitKVCache(std::map>
     {
         if (isMLA)
         {
-            splitKVCacheForMLAKernel<<>>(
-                inputBlockPtrsDev, outputCachePtrsDev, tokensPerBlock, numLayers, headNum, dimsPerHead,
-                inputBlockNumSum, domainPPSize, domainTPSize, domainCPSize, kvFactor, prefixLayerNumDevPtr);
+            splitKVCacheForMLAKernel
+                <<>>(inputBlockPtrsDev, outputCachePtrsDev,
+                    tokensPerBlock, numLayers, headNum, dimsPerHead, inputBlockNumSum, domainPPSize, domainTPSize,
+                    domainCPSize, kvFactor, prefixLayerNumDevPtr, isCPRoundRobin, prefixBlockNumDevPtr);
         }
         else if (isWindow)
         {
@@ -1202,9 +1283,10 @@ void splitKVCache(std::map>
     {
         if (isMLA)
         {
-            splitKVCacheForMLAKernel<<>>(
-                inputBlockPtrsDev, outputCachePtrsDev, tokensPerBlock, numLayers, headNum, dimsPerHead,
-                inputBlockNumSum, domainPPSize, domainTPSize, domainCPSize, kvFactor, prefixLayerNumDevPtr);
+            splitKVCacheForMLAKernel
+                <<>>(inputBlockPtrsDev, outputCachePtrsDev,
+                    tokensPerBlock, numLayers, headNum, dimsPerHead, inputBlockNumSum, domainPPSize, domainTPSize,
+                    domainCPSize, kvFactor, prefixLayerNumDevPtr, isCPRoundRobin, prefixBlockNumDevPtr);
         }
         else if (isWindow)
         {
@@ -1232,7 +1314,7 @@ void splitKVCache(std::map>
                 splitKVCacheForMLAKernel
                     <<>>(inputBlockPtrsDev, outputCachePtrsDev,
                         tokensPerBlock, numLayers, headNum, dimsPerHead, inputBlockNumSum, domainPPSize, domainTPSize,
-                        domainCPSize, kvFactor, prefixLayerNumDevPtr);
+                        domainCPSize, kvFactor, prefixLayerNumDevPtr, isCPRoundRobin, prefixBlockNumDevPtr);
                 }
                 else if (isWindow)
                 {
@@ -1265,7 +1347,7 @@ void splitKVCache(std::map>
                 splitKVCacheForMLAKernel
                     <<>>(inputBlockPtrsDev, outputCachePtrsDev,
                         tokensPerBlock, numLayers, headNum, dimsPerHead, inputBlockNumSum, domainPPSize, domainTPSize,
-                        domainCPSize, kvFactor, prefixLayerNumDevPtr);
+                        domainCPSize, kvFactor, prefixLayerNumDevPtr, isCPRoundRobin, prefixBlockNumDevPtr);
                 }
                 else if (isWindow)
                 {
@@ -1294,7 +1376,7 @@ void splitKVCache(std::map>
                 splitKVCacheForMLAKernel
                     <<>>(inputBlockPtrsDev, outputCachePtrsDev,
                         tokensPerBlock, numLayers, headNum, dimsPerHead, inputBlockNumSum, domainPPSize, domainTPSize,
-                        domainCPSize, kvFactor, prefixLayerNumDevPtr);
+                        domainCPSize, kvFactor, prefixLayerNumDevPtr, isCPRoundRobin, prefixBlockNumDevPtr);
                 }
                 else if (isWindow)
                 {
diff --git a/cpp/tensorrt_llm/executor/cache_transmission/cacheSplitConcat.h b/cpp/tensorrt_llm/executor/cache_transmission/cacheSplitConcat.h
index e932e3ccf38..4e9446b4d1f 100644
--- a/cpp/tensorrt_llm/executor/cache_transmission/cacheSplitConcat.h
+++ b/cpp/tensorrt_llm/executor/cache_transmission/cacheSplitConcat.h
@@ -58,6 +58,36 @@ TargetRanksInfo targetIRanks(
 TargetRanksInfo TargetRanksInfoForDP(
     kv_cache::CacheState const& peerCacheState, kv_cache::CacheState const& selfCacheState, int selfRank);
 
+/**
+ * @brief Calculate the number of blocks allocated to a specific Context Parallelism (CP) rank.
+ *
+ * This function determines how many blocks should be allocated to a given CP rank when
+ * distributing a total number of blocks across multiple CP ranks.
+ *
+ * @param cpRank The rank (index) of the current CP process. Must be in range [0, cpSize).
+ * @param cpSize The total number of CP ranks/processes in the parallel group.
+ * @param numTotalBlocks The total number of blocks to be distributed across all CP ranks.
+ *
+ * @return The number of blocks allocated to the specified CP rank.
+ */
+int getBlockNumAccountingForCP(int cpRank, int cpSize, int numTotalBlocks);
+
+/**
+ * @brief Convert a local block index to a global block ID when Context Parallelism (CP) is enabled.
+ *
+ * This function maps a local block index (within a specific CP rank) to its corresponding
+ * global block ID across all CP ranks. It supports two distribution strategies controlled
+ * by the environment variable TRTLLM_USE_ROUND_ROBIN_BLOCK_DIST_FOR_CP.
+ *
+ * @param localBlockIdx The local block index within the current CP rank (0-based).
+ * @param cpSize The total number of CP ranks in the parallel group.
+ * @param cpRank The rank of the current CP process. Must be in range [0, cpSize).
+ * @param numTotalBlocks The total number of blocks distributed across all CP ranks.
+ *
+ * @return The global block ID corresponding to the local block index.
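+ *
+ * Example (illustrative numbers): with numTotalBlocks = 10 and cpSize = 4, the contiguous (default)
+ * distribution assigns blocks {0,1,2}, {3,4,5}, {6,7}, {8,9} to ranks 0..3, so localBlockIdx 0 on
+ * cpRank 3 maps to global block 8; round-robin instead maps it to localBlockIdx * cpSize + cpRank.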
+ */
+int getGlobalBlockIdAccountingForCP(int localBlockIdx, int cpSize, int cpRank, int numTotalBlocks);
+
 void concatKVCacheDispatch(runtime::ITensor::SharedPtr* inputBlocks, int inputBlockNum,
     std::vector const& inputRanks, kv_cache::CacheState const& peerCacheState,
     runtime::ITensor::SharedPtr* outputBlocks, int outputBlockNum, int selfRank,
diff --git a/cpp/tests/unit_tests/multi_gpu/cacheTransceiverTest.cpp b/cpp/tests/unit_tests/multi_gpu/cacheTransceiverTest.cpp
index 4796fed2c70..a5e0d080202 100644
--- a/cpp/tests/unit_tests/multi_gpu/cacheTransceiverTest.cpp
+++ b/cpp/tests/unit_tests/multi_gpu/cacheTransceiverTest.cpp
@@ -419,6 +419,57 @@ TEST_F(SymmetricalCacheTest, SimpleTest)
 
 using AsymmetricTestParam = std::tuple;
 
+// CPMetaData struct to hold CP-specific information
+struct CPMetaData
+{
+    int mTotalSeqLenAcrossCPRanks{0};
+    int mTotalNumBlocksAcrossCPRanks{0};
+    int mNumBlocksThisCPRank{0};
+    int mSeqLenOnThisCPRank{0};
+    std::vector<int> mGlobalBlockIds{};
+
+    CPMetaData() = default;
+
+    CPMetaData(int totalSeqLen, int numTokensPerBlock, int cpRank, int cpSize)
+    {
+        mTotalSeqLenAcrossCPRanks = totalSeqLen;
+        mTotalNumBlocksAcrossCPRanks = (totalSeqLen + numTokensPerBlock - 1) / numTokensPerBlock;
+        mNumBlocksThisCPRank = tensorrt_llm::executor::kv_cache::getBlockNumAccountingForCP(
+            cpRank, cpSize, mTotalNumBlocksAcrossCPRanks);
+        mSeqLenOnThisCPRank = totalSeqLen;
+        int numPaddedTokensLastBlock = 0;
+        TLLM_CHECK_WITH_INFO(!tensorrt_llm::common::getEnvUseRoundRobinBlockDistForCP(),
+            "Round-robin block distribution for CP needs further adjustments.");
+        // If there are any padded tokens, they will be in the last block of the last CP rank under the
+        // contiguous block distribution.
+        if (cpRank == cpSize - 1 && totalSeqLen % numTokensPerBlock != 0)
+        {
+            numPaddedTokensLastBlock = numTokensPerBlock - (totalSeqLen % numTokensPerBlock);
+        }
+        mSeqLenOnThisCPRank = mNumBlocksThisCPRank * numTokensPerBlock - numPaddedTokensLastBlock;
+        mGlobalBlockIds = std::vector<int>(mNumBlocksThisCPRank);
+        for (int i = 0; i < mNumBlocksThisCPRank; i++)
+        {
+            mGlobalBlockIds[i] = tensorrt_llm::executor::kv_cache::getGlobalBlockIdAccountingForCP(
+                i, cpSize, cpRank, mTotalNumBlocksAcrossCPRanks);
+        }
+    }
+};
+
+struct WrappedLlmRequest
+{
+    std::unique_ptr<LlmRequest> mLlmRequest;
+    std::optional<CPMetaData> mCPMetaData;
+
+    using RequestIdType = LlmRequest::RequestIdType;
+
+    WrappedLlmRequest(std::unique_ptr<LlmRequest> llmRequest, std::optional<CPMetaData> cpMetaData)
+        : mLlmRequest(std::move(llmRequest))
+        , mCPMetaData(std::move(cpMetaData))
+    {
+    }
+};
+
 class AsymmetricalCacheTest : public ::testing::TestWithParam<AsymmetricTestParam>
 {
@@ -435,7 +486,7 @@ class AsymmetricalCacheTest : public ::testing::TestWithParam
+        int tokensPerBlock = mCacheState->getModelConfig().mTokensPerBlock;
+        std::optional<CPMetaData> cpMetaData;
+        int seqLen = length;
+        if (mCpSize > 1)
+        {
+            cpMetaData.emplace(length, tokensPerBlock, mCpRank, mCpSize);
+            seqLen = cpMetaData.value().mSeqLenOnThisCPRank;
+        }
+        texec::Request request{VecTokens(seqLen, seqLen), maxNewTokens};
         auto state = std::make_unique<texec::DataTransceiverState>();
         TLLM_CHECK(mContextCommState);
@@ -785,7 +843,9 @@ class AsymmetricalCacheTest : public ::testing::TestWithParam
         state->setCacheState(*mContextCacheState);
         auto stats = texec::ContextPhaseParams({}, mRequestId, state.release(), std::nullopt);
         request.setContextPhaseParams(std::move(stats));
-        return std::make_unique<LlmRequest>(mRequestId++, std::move(request));
+
+        auto llmRequestPtr = std::make_unique<LlmRequest>(mRequestId++, std::move(request));
+        return std::make_unique<WrappedLlmRequest>(std::move(llmRequestPtr), cpMetaData);
     }
 
     auto makeLlmRequestWithDP(SizeType32 length, LlmRequest::RequestIdType requestId, int contextDpRank)
@@ -807,17 +867,27 @@ class AsymmetricalCacheTest : public ::testing::TestWithParam
         state->setCacheState(cacheState);
         auto stats = texec::ContextPhaseParams({}, requestId, state.release(), std::nullopt);
         request.setContextPhaseParams(std::move(stats));
-        return std::make_unique<LlmRequest>(requestId, std::move(request));
+        auto llmRequestPtr = std::make_unique<LlmRequest>(requestId, std::move(request));
+
+        std::optional<CPMetaData> cpMetaData;
+        return std::make_unique<WrappedLlmRequest>(std::move(llmRequestPtr), cpMetaData);
     }
 
-    std::future<void> addRequestAndTransportCacheForContext(std::shared_ptr<LlmRequest> const& llmRequest)
+    std::future<void> addRequestAndTransportCacheForContext(std::shared_ptr<WrappedLlmRequest> const& request)
     {
         auto constexpr beamIdx{0};
         auto constexpr beamWidth{1};
+        auto& llmRequest = request->mLlmRequest;
         mManager->addSequence(llmRequest->mRequestId, llmRequest->getNumTokens(beamIdx), beamWidth, llmRequest);
         auto blockRange = BlockRange::fromAllBlockIds(*mManager, llmRequest->mRequestId);
         int const numPools = mManager->getBlockManager().getNumPools();
+        auto initial = llmRequest->getPromptLen();
+        if (request->mCPMetaData.has_value())
+        {
+            auto const& cpData = request->mCPMetaData.value();
+            initial = cpData.mTotalSeqLenAcrossCPRanks;
+        }
         TLLM_LOG_DEBUG(" addRequestAndTransportCacheForContext mManager numPools: %d", numPools);
         auto const& windowSizes = blockRange.getWindowSizes();
         int blockIdx = 0;
@@ -827,7 +897,7 @@ class AsymmetricalCacheTest : public ::testing::TestWithParam
-                fillBlockData(*it, blockIdx, llmRequest->getPromptLen(), windowSize);
+                fillBlockData(*it, blockIdx, initial, windowSize);
                 blockIdx++;
             }
             TLLM_LOG_DEBUG("windowSize: %d finish fill block data", windowSize);
@@ -844,29 +914,16 @@ class AsymmetricalCacheTest : public ::testing::TestWithParam
-    std::future<void> addRequestAndTransportCacheForGeneration(std::shared_ptr<LlmRequest> const& llmRequest)
+    std::future<void> addRequestAndTransportCacheForGeneration(std::shared_ptr<WrappedLlmRequest> const& request)
     {
         auto constexpr beamIdx{0};
         auto constexpr beamWidth{1};
+        auto& llmRequest = request->mLlmRequest;
         mManager->addSequence(llmRequest->mRequestId, llmRequest->getNumTokens(beamIdx), beamWidth, llmRequest);
-
         return mRequester->receiveAsync(*llmRequest);
     }
 
-    // Called only by generationVerifyKVCache. Currently, generation ranks might over-allocate blocks when CP is
-    // enabled.
-    bool isBlockOverallocated(int blockIdx, int numTotalBlocks)
-    {
-        bool const generationHasCP = mCpSize > 1;
-        if (!generationHasCP)
-        {
-            return false;
-        }
-        int const numValidBlocks = getBlockNumAccountingForCP(mCpRank, mCpSize, numTotalBlocks, /*strict=*/true);
-        return blockIdx >= numValidBlocks;
-    }
-
-    void generationVerifyKVCache(std::shared_ptr<LlmRequest> const& llmRequest)
+    void generationVerifyKVCache(std::shared_ptr<WrappedLlmRequest> const& request)
     {
         auto constexpr beamIdx{0};
         auto constexpr beamWidth{1};
@@ -874,7 +931,10 @@ class AsymmetricalCacheTest : public ::testing::TestWithParam
+        auto& llmRequest = request->mLlmRequest;
         auto blockRange = BlockRange::fromAllBlockIds(*mManager, llmRequest->mRequestId);
+        auto initial = llmRequest->getPromptLen();
+
         auto const& windowSizes = blockRange.getWindowSizes();
         for (auto const& windowSize : windowSizes)
         {
@@ -882,20 +942,21 @@ class AsymmetricalCacheTest : public ::testing::TestWithParam
             int tokensPerBlock = mCacheState->getModelConfig().mTokensPerBlock;
             int startBlockId = std::max(0, static_cast<int>(blockRangeForWindow.size()) - (maxBlockInWindow + 1));
             int blockIdInWindow = 0;
-            int const numTotalBlocks = blockRangeForWindow.size();
+            std::vector<int> globalBlockIdsForWindow(blockRangeForWindow.size());
+            std::iota(globalBlockIdsForWindow.begin(), globalBlockIdsForWindow.end(), 0);
+            if (request->mCPMetaData.has_value())
+            {
+                // Currently, CPMetaData support is limited to a single window size in our test cases.
+                TLLM_CHECK(windowSizes.size() == 1);
+                auto const& cpData = request->mCPMetaData.value();
+                initial = cpData.mTotalSeqLenAcrossCPRanks;
+                globalBlockIdsForWindow = cpData.mGlobalBlockIds;
+            }
             for (auto it = blockRangeForWindow.begin(); it != blockRangeForWindow.end(); ++it)
             {
-                if (isBlockOverallocated(blockIdx, numTotalBlocks))
-                {
-                    TLLM_LOG_INFO(
-                        "[generationVerifyKVCache] Skipping over-allocated block for request id %d (rank %d, blockIdx "
-                        "%d, numTotalBlocks %d)",
-                        llmRequest->mRequestId, mRank, blockIdx, numTotalBlocks);
-                    break;
-                }
                 if (blockIdInWindow >= startBlockId)
                 {
-                    verifyBlockData(*it, blockIdx, llmRequest->getPromptLen(), windowSize);
+                    verifyBlockData(*it, initial, globalBlockIdsForWindow[blockIdx], windowSize);
                 }
                 blockIdx++;
                 blockIdInWindow++;
@@ -931,6 +992,7 @@ class AsymmetricalCacheTest : public ::testing::TestWithParam
         int kvFactor = mCacheState->getAttentionConfig().mKvFactor;
         int tokensPerBlock = mCacheState->getModelConfig().mTokensPerBlock;
+        // We don't account for CP here because contextCP is always 1 currently.
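+        // (The generation side, by contrast, must use the block's global id; see verifyBlockData below, where
+        // startTokenId is derived from globalBlockId because each CP rank holds only a slice of the sequence.)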
         int startTokenId = blockId * tokensPerBlock;
         int sizePerHead = mCacheState->getModelConfig().mSizePerHead;
         auto dataTypeSize = tensorrt_llm::common::getDTypeSize(blockData.getDataType());
@@ -977,7 +1039,8 @@ class AsymmetricalCacheTest : public ::testing::TestWithParam
     }
 
-    void verifyBlockData(runtime::ITensor& blockData, int blockId, size_t initial, int windowSize)
+    void verifyBlockData(runtime::ITensor& blockData, size_t initial, int globalBlockId, int windowSize)
     {
         auto& blockManager = mManager->getBlockManager();
@@ -1008,7 +1071,7 @@ class AsymmetricalCacheTest : public ::testing::TestWithParam
         int kvFactor = mCacheState->getAttentionConfig().mKvFactor;
         int tokensPerBlock = mCacheState->getModelConfig().mTokensPerBlock;
-        int startTokenId = (blockId * mCpSize + mCpRank) * tokensPerBlock;
+        int startTokenId = globalBlockId * tokensPerBlock;
         int sizePerHead = mCacheState->getModelConfig().mSizePerHead;
 
         bufferManager.copy(blockData, *hostTensor);
@@ -1058,7 +1121,6 @@ class AsymmetricalCacheTest : public ::testing::TestWithParam
     std::vector generateExpectedValue(size_t initial, int windowSize, int tokenId, int layerId, int headId,
         int hiddenId, bool key, nvinfer1::DataType dataType)
     {
-        size_t seed = 0;
         std::size_t hashValue = std::hash{}(initial);
         std::hash hasher{};
@@ -1146,6 +1208,24 @@ TEST_P(AsymmetricalCacheTest, TestCase)
     {
         GTEST_SKIP() << "Temporarily skipping cache transceiver tests with NIXL backend for CP.";
     }
+    std::vector<int> lenList = {30, 10, 60, 80};
+    if (genCp > 1)
+    {
+        std::vector<int> updatedLenList;
+        for (auto len : lenList)
+        {
+            if (len > tokensPerBlock * (genCp - 1))
+            {
+                updatedLenList.push_back(len);
+            }
+        }
+        if (updatedLenList.empty())
+        {
+            GTEST_SKIP() << "Skipping test because not even one request has one block per genCP rank. tokensPerBlock="
+                         << tokensPerBlock << ", genCp=" << genCp;
+        }
+        lenList = updatedLenList;
+    }
 
     setUpCommunicator(contextTp, contextPp, contextCp, genTp, genPp, genCp, isMLA, contextDP, generationDP);
@@ -1153,12 +1233,12 @@ TEST_P(AsymmetricalCacheTest, TestCase)
     {
         setUpCacheManager(numLayers, numHeads, sizePerHead, tokensPerBlock, dataType, kvFactor, isMLA, false, isWindow);
         setUpCacheTransceiver();
-        std::vector<std::shared_ptr<LlmRequest>> requests;
+        std::vector<std::shared_ptr<WrappedLlmRequest>> requests;
 
         // the second loop is for cache reuse
         for (int i = 0; i < 2; i++)
         {
-            for (auto len : {30, 10, 60, 80})
+            for (auto len : lenList)
            {
                 requests.emplace_back(makeLlmRequest(len));
             }
@@ -1196,7 +1276,7 @@ TEST_P(AsymmetricalCacheTest, TestCase)
         }
         for (auto&& request : requests)
         {
-            mManager->removeSequence(request->mRequestId, request);
+            mManager->removeSequence(request->mLlmRequest->mRequestId, request->mLlmRequest);
         }
         requests.clear();
         mComm->barrier();
@@ -1252,7 +1332,7 @@ TEST_P(AsymmetricalCacheTestWithDP, TestCase)
         setUpCacheManager(
             numLayers, numHeads, sizePerHead, tokensPerBlock, dataType, kvFactor, isMLA, enableDP, isWindow);
         setUpCacheTransceiver();
-        std::vector<std::shared_ptr<LlmRequest>> requests;
+        std::vector<std::shared_ptr<WrappedLlmRequest>> requests;
         int requestId = 0;
         for (auto len : {60, 30, 60, 10})
         {
@@ -1261,11 +1341,11 @@ TEST_P(AsymmetricalCacheTestWithDP, TestCase)
         }
         std::vector<std::future<void>> contextFutures;
         std::vector<std::future<void>> generationFutures;
-        std::vector<std::shared_ptr<LlmRequest>> generationRequests;
+        std::vector<std::shared_ptr<WrappedLlmRequest>> generationRequests;
 
         if (mIsContext)
         {
-            std::vector<std::shared_ptr<LlmRequest>> contextRequests;
+            std::vector<std::shared_ptr<WrappedLlmRequest>> contextRequests;
             if (contextDP)
             {
                 for (int i = 0; i < requests.size(); i++)
@@ -1403,9 +1483,9 @@ INSTANTIATE_TEST_CASE_P(AsymmetricCaseTest0WithCPForMLA, AsymmetricalCacheTest,
     /*numLayers*/ testing::Values(4),
     /*numHeads*/ testing::Values(1),
     /*sizePerHead*/ testing::Values(4),
-    /*tokensPerBlock*/ testing::Values(16),
+    /*tokensPerBlock*/ testing::Values(8),
     /*dataType*/ testing::Values(nvinfer1::DataType::kFLOAT, nvinfer1::DataType::kINT8),
-    /*kvFactor*/ testing::Values(2),
+    /*kvFactor*/ testing::Values(1),
    /*isMLA*/
testing::Values(true), /*contextDP*/ testing::Values(false), /*generationDP*/ testing::Values(false), @@ -1422,9 +1502,9 @@ INSTANTIATE_TEST_CASE_P(AsymmetricCaseTest1WithCPForMLA, AsymmetricalCacheTest, /*numLayers*/ testing::Values(4), /*numHeads*/ testing::Values(1), /*sizePerHead*/ testing::Values(4), - /*tokensPerBlock*/ testing::Values(16), + /*tokensPerBlock*/ testing::Values(8), /*dataType*/ testing::Values(nvinfer1::DataType::kFLOAT, nvinfer1::DataType::kINT8), - /*kvFactor*/ testing::Values(2), + /*kvFactor*/ testing::Values(1), /*isMLA*/ testing::Values(true), /*contextDP*/ testing::Values(false), /*generationDP*/ testing::Values(false), From 7101d220b1f07e5994d582800cca538c1d7e0a71 Mon Sep 17 00:00:00 2001 From: Balaram Buddharaju <169953907+brb-nv@users.noreply.github.com> Date: Tue, 28 Oct 2025 21:22:16 +0000 Subject: [PATCH 2/4] fix test Signed-off-by: Balaram Buddharaju <169953907+brb-nv@users.noreply.github.com> --- .../multi_gpu/cacheTransceiverTest.cpp | 94 ++++++++++++++++--- 1 file changed, 83 insertions(+), 11 deletions(-) diff --git a/cpp/tests/unit_tests/multi_gpu/cacheTransceiverTest.cpp b/cpp/tests/unit_tests/multi_gpu/cacheTransceiverTest.cpp index a5e0d080202..7b6cc715301 100644 --- a/cpp/tests/unit_tests/multi_gpu/cacheTransceiverTest.cpp +++ b/cpp/tests/unit_tests/multi_gpu/cacheTransceiverTest.cpp @@ -478,6 +478,17 @@ class AsymmetricalCacheTest : public ::testing::TestWithParamgetModelConfig().mTokensPerBlock; int startBlockId = std::max(0, static_cast(blockRangeForWindow.size()) - (maxBlockInWindow + 1)); int blockIdInWindow = 0; - std::vector globalBlockIdsForWindow(blockRangeForWindow.size()); - std::iota(globalBlockIdsForWindow.begin(), globalBlockIdsForWindow.end(), 0); + // This is relevant only when context parallelism is enabled. + std::vector globalBlockIdsForWindow; if (request->mCPMetaData.has_value()) { // Currently, limit support of CPMetadata to a single window size in our testcases. TLLM_CHECK(windowSizes.size() == 1); + globalBlockIdsForWindow = std::vector(blockRangeForWindow.size()); auto const& cpData = request->mCPMetaData.value(); initial = cpData.mTotalSeqLenAcrossCPRanks; globalBlockIdsForWindow = cpData.mGlobalBlockIds; @@ -956,7 +968,7 @@ class AsymmetricalCacheTest : public ::testing::TestWithParam= startBlockId) { - verifyBlockData(*it, initial, globalBlockIdsForWindow[blockIdx], windowSize); + verifyBlockData(*it, initial, globalBlockIdsForWindow.empty() ? blockIdx : globalBlockIdsForWindow[blockIdx], windowSize); } blockIdx++; blockIdInWindow++; @@ -966,6 +978,11 @@ class AsymmetricalCacheTest : public ::testing::TestWithParamgetBlockManager(); auto const onlyWindowSize = windowSize == 0 ? 
blockManager.getPoolWindowSize(0) : windowSize; auto const& bufferManager = blockManager.getBufferManager(onlyWindowSize); @@ -1015,6 +1032,19 @@ class AsymmetricalCacheTest : public ::testing::TestWithParam(hostTensor->data(keyIndex)); *dataPtr = generateValue; + if (TARGET_RANK == -1 || tensorrt_llm::mpi::MpiComm::world().getRank() == TARGET_RANK) + { + TLLM_LOG_INFO(tensorrt_llm::mpi::MpiComm::world().getRank(), + "[RANK %d] [fillBlockData::key] blockId=%d, layer=%d->%d, head=%d->%d, token=%d->%d, hidden=%d, " + "keyIdx=%zu, set_value=%s, dataType=%d", + tensorrt_llm::mpi::MpiComm::world().getRank(), + blockId, layerId, layerId + startLayerId, + headId, headId + startHeadId, + tokenId, tokenId + startTokenId, + hiddenId, keyIndex, + std::to_string(static_cast(*dataPtr)).c_str(), + static_cast(blockData.getDataType())); + } }, generateExpectedValue(initial, windowSize, tokenId + startTokenId, layerId + startLayerId, headId + startHeadId, hiddenId, true, blockData.getDataType())); @@ -1040,10 +1070,16 @@ class AsymmetricalCacheTest : public ::testing::TestWithParamgetBlockManager(); + static const int TARGET_RANK = getEnvMpiDebugRank(); // -1 means all ranks. + if (TARGET_RANK == -1 || tensorrt_llm::mpi::MpiComm::world().getRank() == TARGET_RANK) + { + TLLM_LOG_INFO("verifyBlockData called for rank %d mRankInInstance %d blockId %d", mRank, mRankInInstance, blockId); + } + auto const onlyWindowSize = windowSize == 0 ? blockManager.getPoolWindowSize(0) : windowSize; auto const& bufferManager = blockManager.getBufferManager(onlyWindowSize); @@ -1071,7 +1107,7 @@ class AsymmetricalCacheTest : public ::testing::TestWithParamgetAttentionConfig().mKvFactor; int tokensPerBlock = mCacheState->getModelConfig().mTokensPerBlock; - int startTokenId = globalBlockId * tokensPerBlock; + int startTokenId = blockId * tokensPerBlock; int sizePerHead = mCacheState->getModelConfig().mSizePerHead; bufferManager.copy(blockData, *hostTensor); @@ -1096,6 +1132,24 @@ class AsymmetricalCacheTest : public ::testing::TestWithParam(hostTensor->data(keyIndex)); EXPECT_EQ(*dataPtr, generateValue); + if (TARGET_RANK == -1 || tensorrt_llm::mpi::MpiComm::world().getRank() == TARGET_RANK) + { + std::string result = ""; + if (*dataPtr != generateValue) { + result = "FAILED!"; + } + TLLM_LOG_INFO(tensorrt_llm::mpi::MpiComm::world().getRank(), + "[RANK %d] [verifyBlockData::value] blockId=%d, layer=%d->%d, head=%d->%d, token=%d->%d, hidden=%d, " + "valueIdx=%zu, actual_value=%s, dataType=%d %s", + tensorrt_llm::mpi::MpiComm::world().getRank(), + blockId, layerId, layerId + startLayerId, + headId, headId + startHeadId, + tokenId, tokenId + startTokenId, + hiddenId, valueIndex, + std::to_string(static_cast(*dataPtr)).c_str(), + static_cast(blockData.getDataType()), + result.c_str()); + } }, generateExpectedValue(initial, windowSize, tokenId + startTokenId, layerId + startLayerId, headId + startHeadId, hiddenId, true, blockData.getDataType())); @@ -1121,6 +1175,12 @@ class AsymmetricalCacheTest : public ::testing::TestWithParam generateExpectedValue(size_t initial, int windowSize, int tokenId, int layerId, int headId, int hiddenId, bool key, nvinfer1::DataType dataType) { + static const int TARGET_RANK = getEnvMpiDebugRank(); // -1 means all ranks. 
+ if (TARGET_RANK == -1 || tensorrt_llm::mpi::MpiComm::world().getRank() == TARGET_RANK) + { + TLLM_LOG_INFO("generateExpectedValue called for rank %d, initial=%zu, windowSize=%d, tokenId=%d, layerId=%d, headId=%d, hiddenId=%d, key=%d, dataType=%d", + tensorrt_llm::mpi::MpiComm::world().getRank(), initial, windowSize, tokenId, layerId, headId, hiddenId, key, static_cast(dataType)); + } size_t seed = 0; std::size_t hashValue = std::hash{}(initial); std::hash hasher{}; @@ -1208,7 +1268,7 @@ TEST_P(AsymmetricalCacheTest, TestCase) { GTEST_SKIP() << "Temporarily skipping cache transceiver tests with NIXL backend for CP."; } - std::vector lenList = {30, 10, 60, 80}; + std::vector lenList = {8}; if (genCp > 1) { std::vector updatedLenList; @@ -1236,7 +1296,7 @@ TEST_P(AsymmetricalCacheTest, TestCase) std::vector> requests; // the second loop is for cache reuse - for (int i = 0; i < 2; i++) + for (int i = 0; i < 1; i++) { for (auto len : lenList) { @@ -1413,10 +1473,22 @@ TEST_P(AsymmetricalCacheTestWithDP, TestCase) } INSTANTIATE_TEST_CASE_P(AsymmetricCaseTest0, AsymmetricalCacheTest, - testing::Combine(testing::Values(1, 2), testing::Values(1, 2), testing::Values(1), testing::Values(1, 2), - testing::Values(1, 2), testing::Values(1), testing::Values(4), testing::Values(4), testing::Values(4), - testing::Values(16), testing::Values(nvinfer1::DataType::kFLOAT, nvinfer1::DataType::kINT8), testing::Values(2), - testing::Values(false), testing::Values(false), testing::Values(false), testing::Values(true, false))); + testing::Combine(testing::Values(1), + testing::Values(1), + testing::Values(1), + testing::Values(1), + testing::Values(1), + testing::Values(1), + testing::Values(2), + testing::Values(1), + testing::Values(4), + testing::Values(8), + testing::Values(nvinfer1::DataType::kINT8), + testing::Values(1), + testing::Values(false), + testing::Values(false), + testing::Values(false), + testing::Values(true))); INSTANTIATE_TEST_CASE_P(AsymmetricCaseTestWithWindow, AsymmetricalCacheTest, testing::Combine(testing::Values(1), testing::Values(1), testing::Values(1), testing::Values(1), testing::Values(1), From 6634a21d9662afe6f07fe9475b7b2b67cfe651a9 Mon Sep 17 00:00:00 2001 From: Balaram Buddharaju <169953907+brb-nv@users.noreply.github.com> Date: Tue, 28 Oct 2025 21:31:07 +0000 Subject: [PATCH 3/4] bring back all tests Signed-off-by: Balaram Buddharaju <169953907+brb-nv@users.noreply.github.com> --- .../multi_gpu/cacheTransceiverTest.cpp | 24 +++++-------------- 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/cpp/tests/unit_tests/multi_gpu/cacheTransceiverTest.cpp b/cpp/tests/unit_tests/multi_gpu/cacheTransceiverTest.cpp index 7b6cc715301..1529b821e0c 100644 --- a/cpp/tests/unit_tests/multi_gpu/cacheTransceiverTest.cpp +++ b/cpp/tests/unit_tests/multi_gpu/cacheTransceiverTest.cpp @@ -1268,7 +1268,7 @@ TEST_P(AsymmetricalCacheTest, TestCase) { GTEST_SKIP() << "Temporarily skipping cache transceiver tests with NIXL backend for CP."; } - std::vector lenList = {8}; + std::vector lenList = {30, 10, 60, 80}; if (genCp > 1) { std::vector updatedLenList; @@ -1296,7 +1296,7 @@ TEST_P(AsymmetricalCacheTest, TestCase) std::vector> requests; // the second loop is for cache reuse - for (int i = 0; i < 1; i++) + for (int i = 0; i < 2; i++) { for (auto len : lenList) { @@ -1473,22 +1473,10 @@ TEST_P(AsymmetricalCacheTestWithDP, TestCase) } INSTANTIATE_TEST_CASE_P(AsymmetricCaseTest0, AsymmetricalCacheTest, - testing::Combine(testing::Values(1), - testing::Values(1), - testing::Values(1), - 
testing::Values(1), - testing::Values(1), - testing::Values(1), - testing::Values(2), - testing::Values(1), - testing::Values(4), - testing::Values(8), - testing::Values(nvinfer1::DataType::kINT8), - testing::Values(1), - testing::Values(false), - testing::Values(false), - testing::Values(false), - testing::Values(true))); + testing::Combine(testing::Values(1, 2), testing::Values(1, 2), testing::Values(1), testing::Values(1, 2), + testing::Values(1, 2), testing::Values(1), testing::Values(4), testing::Values(4), testing::Values(4), + testing::Values(16), testing::Values(nvinfer1::DataType::kFLOAT, nvinfer1::DataType::kINT8), testing::Values(2), + testing::Values(false), testing::Values(false), testing::Values(false), testing::Values(true, false))); INSTANTIATE_TEST_CASE_P(AsymmetricCaseTestWithWindow, AsymmetricalCacheTest, testing::Combine(testing::Values(1), testing::Values(1), testing::Values(1), testing::Values(1), testing::Values(1), From 00daf76a73721fc5b4390f9e8bf7059d6f024c31 Mon Sep 17 00:00:00 2001 From: Balaram Buddharaju <169953907+brb-nv@users.noreply.github.com> Date: Tue, 28 Oct 2025 21:34:00 +0000 Subject: [PATCH 4/4] clean up Signed-off-by: Balaram Buddharaju <169953907+brb-nv@users.noreply.github.com> --- .../multi_gpu/cacheTransceiverTest.cpp | 65 +------------------ 1 file changed, 3 insertions(+), 62 deletions(-) diff --git a/cpp/tests/unit_tests/multi_gpu/cacheTransceiverTest.cpp b/cpp/tests/unit_tests/multi_gpu/cacheTransceiverTest.cpp index 1529b821e0c..fd8fb199c6f 100644 --- a/cpp/tests/unit_tests/multi_gpu/cacheTransceiverTest.cpp +++ b/cpp/tests/unit_tests/multi_gpu/cacheTransceiverTest.cpp @@ -478,17 +478,6 @@ class AsymmetricalCacheTest : public ::testing::TestWithParam= startBlockId) { - verifyBlockData(*it, initial, globalBlockIdsForWindow.empty() ? blockIdx : globalBlockIdsForWindow[blockIdx], windowSize); + verifyBlockData(*it, initial, + globalBlockIdsForWindow.empty() ? blockIdx : globalBlockIdsForWindow[blockIdx], windowSize); } blockIdx++; blockIdInWindow++; @@ -978,11 +968,6 @@ class AsymmetricalCacheTest : public ::testing::TestWithParamgetBlockManager(); auto const onlyWindowSize = windowSize == 0 ? blockManager.getPoolWindowSize(0) : windowSize; auto const& bufferManager = blockManager.getBufferManager(onlyWindowSize); @@ -1032,19 +1017,6 @@ class AsymmetricalCacheTest : public ::testing::TestWithParam(hostTensor->data(keyIndex)); *dataPtr = generateValue; - if (TARGET_RANK == -1 || tensorrt_llm::mpi::MpiComm::world().getRank() == TARGET_RANK) - { - TLLM_LOG_INFO(tensorrt_llm::mpi::MpiComm::world().getRank(), - "[RANK %d] [fillBlockData::key] blockId=%d, layer=%d->%d, head=%d->%d, token=%d->%d, hidden=%d, " - "keyIdx=%zu, set_value=%s, dataType=%d", - tensorrt_llm::mpi::MpiComm::world().getRank(), - blockId, layerId, layerId + startLayerId, - headId, headId + startHeadId, - tokenId, tokenId + startTokenId, - hiddenId, keyIndex, - std::to_string(static_cast(*dataPtr)).c_str(), - static_cast(blockData.getDataType())); - } }, generateExpectedValue(initial, windowSize, tokenId + startTokenId, layerId + startLayerId, headId + startHeadId, hiddenId, true, blockData.getDataType())); @@ -1069,17 +1041,10 @@ class AsymmetricalCacheTest : public ::testing::TestWithParamgetBlockManager(); - static const int TARGET_RANK = getEnvMpiDebugRank(); // -1 means all ranks. 
- if (TARGET_RANK == -1 || tensorrt_llm::mpi::MpiComm::world().getRank() == TARGET_RANK) - { - TLLM_LOG_INFO("verifyBlockData called for rank %d mRankInInstance %d blockId %d", mRank, mRankInInstance, blockId); - } - auto const onlyWindowSize = windowSize == 0 ? blockManager.getPoolWindowSize(0) : windowSize; auto const& bufferManager = blockManager.getBufferManager(onlyWindowSize); @@ -1132,24 +1097,6 @@ class AsymmetricalCacheTest : public ::testing::TestWithParam(hostTensor->data(keyIndex)); EXPECT_EQ(*dataPtr, generateValue); - if (TARGET_RANK == -1 || tensorrt_llm::mpi::MpiComm::world().getRank() == TARGET_RANK) - { - std::string result = ""; - if (*dataPtr != generateValue) { - result = "FAILED!"; - } - TLLM_LOG_INFO(tensorrt_llm::mpi::MpiComm::world().getRank(), - "[RANK %d] [verifyBlockData::value] blockId=%d, layer=%d->%d, head=%d->%d, token=%d->%d, hidden=%d, " - "valueIdx=%zu, actual_value=%s, dataType=%d %s", - tensorrt_llm::mpi::MpiComm::world().getRank(), - blockId, layerId, layerId + startLayerId, - headId, headId + startHeadId, - tokenId, tokenId + startTokenId, - hiddenId, valueIndex, - std::to_string(static_cast(*dataPtr)).c_str(), - static_cast(blockData.getDataType()), - result.c_str()); - } }, generateExpectedValue(initial, windowSize, tokenId + startTokenId, layerId + startLayerId, headId + startHeadId, hiddenId, true, blockData.getDataType())); @@ -1175,12 +1122,6 @@ class AsymmetricalCacheTest : public ::testing::TestWithParam generateExpectedValue(size_t initial, int windowSize, int tokenId, int layerId, int headId, int hiddenId, bool key, nvinfer1::DataType dataType) { - static const int TARGET_RANK = getEnvMpiDebugRank(); // -1 means all ranks. - if (TARGET_RANK == -1 || tensorrt_llm::mpi::MpiComm::world().getRank() == TARGET_RANK) - { - TLLM_LOG_INFO("generateExpectedValue called for rank %d, initial=%zu, windowSize=%d, tokenId=%d, layerId=%d, headId=%d, hiddenId=%d, key=%d, dataType=%d", - tensorrt_llm::mpi::MpiComm::world().getRank(), initial, windowSize, tokenId, layerId, headId, hiddenId, key, static_cast(dataType)); - } size_t seed = 0; std::size_t hashValue = std::hash{}(initial); std::hash hasher{};