Skip to content

Commit c19c9de

Browse files
tshmilnvidiaglevnv
authored andcommitted
copyBlock with NixlLoopbackAgent
Implement copyBlock function with nixlLoopbackAgent class. Signed-off-by: Tomer Shmilovich <[email protected]>
1 parent a7ae526 commit c19c9de

File tree

2 files changed

+30
-96
lines changed

2 files changed

+30
-96
lines changed

cpp/include/tensorrt_llm/batch_manager/kvCacheTransferManager.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ namespace tensorrt_llm::batch_manager::kv_cache_manager
3232
class KVCacheTransferManager
3333
{
3434
public:
35-
explicit KVCacheTransferManager(tr::BufferManager const& bufferManager);
35+
explicit KVCacheTransferManager(tr::BufferManager const& bufferManager, BaseLoopbackAgent* loopbackAgent = nullptr);
3636

3737
//! \brief Onboard a block to gpu memory.
3838
void onboard(BlockPtr const& offloadBlock, BlockPtr const& block, std::vector<KVCacheBlockPool> const& pools,
@@ -75,6 +75,9 @@ class KVCacheTransferManager
7575

7676
// Track the block ids offloaded in this iteration.
7777
std::unordered_map<int32_t, tr::CudaEvent> mPendingOffloads;
78+
// Reference to parent loopback agent
79+
BaseLoopbackAgent* mLoopbackAgent;
80+
int mDeviceId;
7881
};
7982

8083
} // namespace tensorrt_llm::batch_manager::kv_cache_manager

cpp/tensorrt_llm/batch_manager/kvCacheTransferManager.cpp

Lines changed: 26 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@
4343
namespace tr = tensorrt_llm::runtime;
4444
namespace tk = tensorrt_llm::kernels;
4545

46+
using namespace tensorrt_llm::executor::kv_cache;
47+
namespace kvc = tensorrt_llm::executor::kv_cache;
48+
4649
namespace tensorrt_llm::batch_manager::kv_cache_manager
4750
{
4851

@@ -86,11 +89,14 @@ static bool fileToGpuPosix(tr::ITensor::SharedPtr const& dstPtr, std::string con
8689
return true;
8790
}
8891

89-
KVCacheTransferManager::KVCacheTransferManager(tr::BufferManager const& bufferManager)
92+
KVCacheTransferManager::KVCacheTransferManager(tr::BufferManager const& bufferManager, BaseLoopbackAgent* loopbackAgent)
9093
: mBufferManager{bufferManager}
9194
, mOnboardManager(std::make_shared<tr::CudaStream>())
9295
, mOffloadManager(std::make_shared<tr::CudaStream>())
96+
, mLoopbackAgent{loopbackAgent}
9397
{
98+
TLLM_CUDA_CHECK(cudaGetDevice(&mDeviceId));
99+
TLLM_CHECK(mDeviceId != -1);
94100
}
95101

96102
tr::ITensor::SharedPtr KVCacheTransferManager::computeBlockPointer(
@@ -159,115 +165,40 @@ void KVCacheTransferManager::copyBlock(BlockPtr const& src, BlockPtr const& dst,
159165
return;
160166
}
161167

168+
std::vector<FileDesc> fileBlobs;
169+
std::vector<MemoryDesc> memoryBlobs;
170+
162171
for (size_t poolIdx = 0; poolIdx < pools.size(); ++poolIdx)
163172
{
164-
auto srcPtr = computeBlockPointer(src, pools, poolIdx);
165-
auto dstPtr = computeBlockPointer(dst, pools, poolIdx);
173+
auto ptr = isOffload ? computeBlockPointer(src, pools, poolIdx) : computeBlockPointer(dst, pools, poolIdx);
174+
auto block_id = src->getBlockId();
166175

167176
TLLM_CHECK_WITH_INFO(
168177
directory.has_value(), "Expected a directory path for KVCache offload, but none was provided.");
169178

170-
int size = std::snprintf(
171-
nullptr, 0, "%s/block_%d_pool_%zu.bin", directory.value().c_str(), src->getBlockId(), poolIdx);
179+
int size = std::snprintf(nullptr, 0, "%s/block_%d_pool_%zu.bin", directory.value().c_str(), block_id, poolIdx);
172180

173181
std::string filename(size + 1, '\0');
174-
std::snprintf(filename.data(), filename.size(), "%s/block_%d_pool_%zu.bin", directory.value().c_str(),
175-
src->getBlockId(), poolIdx);
176-
177-
if (mode == executor::KvCacheTransferMode::POSIX_DEBUG_FALLBACK)
178-
{
179-
TLLM_LOG_INFO("Forcing POSIX fallback for file: %s", filename.c_str());
180-
if (isOffload)
181-
{
182-
gpuToFilePosix(srcPtr, filename);
183-
}
184-
else
185-
{
186-
fileToGpuPosix(dstPtr, filename);
187-
}
188-
continue;
189-
}
182+
std::snprintf(
183+
filename.data(), filename.size(), "%s/block_%d_pool_%zu.bin", directory.value().c_str(), block_id, poolIdx);
190184

191185
int openFlags = isOffload ? (O_CREAT | O_WRONLY) : O_RDONLY;
192-
int fd = ::open(filename.c_str(), openFlags, 0664);
193-
if (fd < 0)
194-
{
195-
TLLM_LOG_ERROR(
196-
"Failed to open '%s' for %s; fallback POSIX", filename.c_str(), (isOffload ? "writing" : "reading"));
197186

198-
if (isOffload)
199-
{
200-
gpuToFilePosix(srcPtr, filename);
201-
}
202-
else
203-
{
204-
fileToGpuPosix(dstPtr, filename);
205-
}
206-
continue;
207-
}
187+
fileBlobs.emplace_back(filename, openFlags, 0664, ptr->getSizeInBytes());
188+
memoryBlobs.emplace_back(ptr->data(), ptr->getSizeInBytes(), mDeviceId);
189+
}
208190

209-
#ifdef ENABLE_CUFILE
210-
CUfileDescr_t cufileDesc = {};
211-
cufileDesc.type = CU_FILE_HANDLE_TYPE_OPAQUE_FD;
212-
cufileDesc.handle.fd = fd;
191+
FileDescs fileDescs(fileBlobs);
192+
MemoryDescs memoryDescs(kvc::MemoryType::kVRAM, memoryBlobs);
213193

214-
CUfileHandle_t cufileHandle;
215-
CUfileError_t status = cuFileHandleRegister(&cufileHandle, &cufileDesc);
216-
if (status.err != CU_FILE_SUCCESS)
217-
{
218-
// Fallback to POSIX
219-
TLLM_LOG_WARN(
220-
"cuFileHandleRegister failed (err=%d). Falling back to POSIX for '%s'", status.err, filename.c_str());
221-
::close(fd);
222-
if (isOffload)
223-
gpuToFilePosix(srcPtr, filename);
224-
else
225-
fileToGpuPosix(dstPtr, filename);
226-
continue;
227-
}
194+
mLoopbackAgent->registerMemory(memoryDescs);
195+
mLoopbackAgent->registerFiles(fileDescs);
228196

229-
ssize_t numBytes = static_cast<ssize_t>(srcPtr->getSizeInBytes());
230-
if (isOffload)
231-
{
232-
ssize_t written = cuFileWrite(cufileHandle, srcPtr->data(), numBytes, 0, 0);
233-
if (written < 0)
234-
{
235-
TLLM_LOG_ERROR("cuFileWrite error=%zd. Fallback to POSIX", written);
236-
cuFileHandleDeregister(cufileHandle);
237-
::close(fd);
238-
gpuToFilePosix(srcPtr, filename);
239-
continue;
240-
}
241-
}
242-
else
243-
{
244-
ssize_t readCount = cuFileRead(cufileHandle, dstPtr->data(), numBytes, 0, 0);
245-
if (readCount < 0)
246-
{
247-
TLLM_LOG_ERROR("cuFileRead error=%zd. Fallback to POSIX", readCount);
248-
cuFileHandleDeregister(cufileHandle);
249-
::close(fd);
250-
fileToGpuPosix(dstPtr, filename);
251-
continue;
252-
}
253-
}
197+
std::unique_ptr<TransferStatus> status = mLoopbackAgent->submitLoopbackRequests(memoryDescs, fileDescs, isOffload);
198+
status->wait();
254199

255-
cuFileHandleDeregister(cufileHandle);
256-
::close(fd);
257-
#else
258-
// If GDS isn't enabled, fallback to POSIX automatically
259-
TLLM_LOG_DEBUG("ENABLE_CUFILE=OFF, so fallback to POSIX for %s", filename.c_str());
260-
::close(fd); // close the file opened for GDS
261-
if (isOffload)
262-
{
263-
gpuToFilePosix(srcPtr, filename);
264-
}
265-
else
266-
{
267-
fileToGpuPosix(dstPtr, filename);
268-
}
269-
#endif
270-
}
200+
mLoopbackAgent->deregisterMemory(memoryDescs);
201+
mLoopbackAgent->deregisterFiles(fileDescs);
271202
}
272203

273204
void KVCacheTransferManager::onboard(BlockPtr const& offloadBlock, BlockPtr const& block,

0 commit comments

Comments
 (0)