Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 32 additions & 8 deletions cpp/include/tensorrt_llm/batch_manager/kvCacheManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "tensorrt_llm/batch_manager/llmRequest.h" // TODO forward declare
#include "tensorrt_llm/common/optionalRef.h"
#include "tensorrt_llm/executor/executor.h"
#include "tensorrt_llm/executor/transferAgent.h"
#include "tensorrt_llm/kernels/kvCacheIndex.h"
#include "tensorrt_llm/runtime/bufferManager.h"
#include "tensorrt_llm/runtime/common.h"
Expand All @@ -42,6 +43,8 @@
#include <unordered_map>
#include <vector>

namespace kvc = tensorrt_llm::executor::kv_cache;

namespace tensorrt_llm::batch_manager::eviction_policy
{
class BaseEvictionPolicy;
Expand Down Expand Up @@ -445,6 +448,16 @@ class GenerationRequest
return mKvCacheRetentionConfig.getDecodeDurationMs();
}

[[nodiscard]] executor::KvCacheTransferMode getTransferMode() const
{
return mKvCacheRetentionConfig.getTransferMode();
}

[[nodiscard]] std::string const& getDirectory() const
{
return mKvCacheRetentionConfig.getDirectory();
}

// @brief Check whether the sequence uses cyclic KV cache.
// @return `true` if we have begun overwriting the beginning of the sequence's KV cache.
// @details If `true`, we cannot store the sequence's KV cache for reuse.
Expand Down Expand Up @@ -538,7 +551,8 @@ class WindowBlockManager
SizeType32 blocksInSecondaryPool, SizeType32 maxNumSequences, std::shared_ptr<runtime::CudaStream> stream,
bool onboardBlocks, CacheType cacheType, std::optional<executor::RetentionPriority> secondaryOffloadMinPriority,
std::shared_ptr<KVCacheEventManager> eventManager, bool enablePartialReuse, bool copyOnPartialReuse,
std::shared_ptr<kv_connector::KvCacheConnectorManager> kvCacheConnectorManager);
std::shared_ptr<kv_connector::KvCacheConnectorManager> kvCacheConnectorManager,
std::shared_ptr<kvc::BaseLoopbackAgent> loopbackAgent = nullptr);

~WindowBlockManager();

Expand Down Expand Up @@ -702,11 +716,13 @@ class WindowBlockManager

//! \brief Bring offloaded block from secondary to primary memory.
//! \details Does nothing if block is already in primary memory.
void onboardBlock(BlockPtr const& offloadBlock);
void onboardBlock(BlockPtr const& offloadBlock,
executor::KvCacheTransferMode mode = executor::KvCacheTransferMode::DRAM, std::string const& directory = "");

//! \brief Bring block from primary to secondary memory.
//! \details Does nothing if block is already in secondary memory.
void offloadBlock(BlockPtr const& block);
void offloadBlock(BlockPtr const& block, executor::KvCacheTransferMode mode = executor::KvCacheTransferMode::DRAM,
std::string const& directory = "");

//! \brief Find first new block that must be allocated for context phase and return it's concatenated token vectors.
//! \details Only full blocks are considered.
Expand Down Expand Up @@ -760,7 +776,8 @@ class WindowBlockManager
//! \param sequence Sequence to which blocks are assigned.
//! \return Number of matched tokens from loaded blocks.
SizeType32 loadOrAllocateBlocks(std::vector<BlockKey> const& blockKeys, SizeType32 numContextBlocks,
GenerationRequest& sequence, std::vector<executor::RetentionPriorityAndDuration> const& perBlockRetentions);
GenerationRequest& sequence, std::vector<executor::RetentionPriorityAndDuration> const& perBlockRetentions,
executor::KvCacheTransferMode mode = executor::KvCacheTransferMode::DRAM, std::string const& directory = "");

//! \brief Free block and all it's descendants. This makes block a claimed leaf block.
void freeChildren(BlockPtr const& block, executor::RetentionPriority priority,
Expand All @@ -769,7 +786,8 @@ class WindowBlockManager
//! \brief Find block least likely to be reused, free it if necessary and return.
[[nodiscard]] BlockPtr getFreeBlock(
executor::RetentionPriority = executor::KvCacheRetentionConfig::kDefaultRetentionPriority,
std::optional<std::chrono::milliseconds> durationMs = std::nullopt);
std::optional<std::chrono::milliseconds> durationMs = std::nullopt,
executor::KvCacheTransferMode mode = executor::KvCacheTransferMode::DRAM, std::string const& directory = "");

//! \brief Free block from previous block and claim it from free blocks list.
void claimLeafBlock(BlockPtr const& block, std::optional<executor::RetentionPriority> priority = std::nullopt,
Expand Down Expand Up @@ -817,6 +835,8 @@ class WindowBlockManager
std::shared_ptr<BaseEvictionPolicy> mEvictionPolicy;
// Event manager
std::shared_ptr<KVCacheEventManager> mEventManager;
// Pointer to parent loopback agent
std::shared_ptr<kvc::BaseLoopbackAgent> mLoopbackAgent;
// Transfer manager
std::shared_ptr<KVCacheTransferManager> mTransferManager;

Expand Down Expand Up @@ -864,7 +884,8 @@ class BlockManager
std::optional<executor::RetentionPriority> secondaryOffloadMinPriority = std::nullopt,
std::shared_ptr<KVCacheEventManager> eventManager = nullptr, bool enablePartialReuse = true,
bool copyOnPartialReuse = true,
std::shared_ptr<kv_connector::KvCacheConnectorManager> kvCacheConnectorManager = nullptr);
std::shared_ptr<kv_connector::KvCacheConnectorManager> kvCacheConnectorManager = nullptr,
std::optional<kvc::BaseAgentConfig> agentConfig = std::nullopt);

BlockManager(BlockManager const&) = delete;
BlockManager& operator=(BlockManager const&) = delete;
Expand Down Expand Up @@ -913,11 +934,13 @@ class BlockManager

//! \brief Bring block from primary to secondary memory for window size.
//! \details Does nothing if block is already in primary memory.
void onboardBlock(BlockPtr const& offloadBlock, SizeType32 windowSize);
void onboardBlock(BlockPtr const& offloadBlock, SizeType32 windowSize,
executor::KvCacheTransferMode mode = executor::KvCacheTransferMode::DRAM, std::string const& directory = "");

//! \brief Bring block from primary to secondary memory for window size.
//! \details Does nothing if block is already in secondary memory.
void offloadBlock(BlockPtr const& block, SizeType32 windowSize);
void offloadBlock(BlockPtr const& block, SizeType32 windowSize,
executor::KvCacheTransferMode mode = executor::KvCacheTransferMode::DRAM, std::string const& directory = "");

void storeBlocks(std::vector<BlockKey> const& blockKeys, std::vector<KVCacheBlock::IdType> const& blockIds,
SizeType32 windowSize)
Expand Down Expand Up @@ -1156,6 +1179,7 @@ class BlockManager
SizeType32 mNumLayers;
SizeType32 mTokensPerBlock;
std::shared_ptr<KVCacheEventManager> mEventManager;
std::shared_ptr<kvc::BaseLoopbackAgent> mLoopbackAgent;
CudaStreamPtr mStream;
CacheType mCacheType;

Expand Down
13 changes: 9 additions & 4 deletions cpp/include/tensorrt_llm/batch_manager/kvCacheTransferManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "tensorrt_llm/runtime/cudaEvent.h"

namespace tr = tensorrt_llm::runtime;
namespace kvc = tensorrt_llm::executor::kv_cache;

#pragma once

Expand All @@ -32,17 +33,18 @@ namespace tensorrt_llm::batch_manager::kv_cache_manager
class KVCacheTransferManager
{
public:
explicit KVCacheTransferManager(tr::BufferManager const& bufferManager);
explicit KVCacheTransferManager(
tr::BufferManager const& bufferManager, std::shared_ptr<kvc::BaseLoopbackAgent> loopbackAgent = nullptr);

//! \brief Onboard a block to gpu memory.
void onboard(BlockPtr const& offloadBlock, BlockPtr const& block, std::vector<KVCacheBlockPool> const& pools,
int numTokensToCopy = 0, executor::KvCacheTransferMode mode = executor::KvCacheTransferMode::DRAM,
std::optional<std::string> directory = std::nullopt);
std::string const& directory = "");

//! \brief Offload a block to cpu memory.
void offload(BlockPtr const& block, BlockPtr const& offloadBlock, std::vector<KVCacheBlockPool> const& pools,
int numTokensToCopy = 0, executor::KvCacheTransferMode mode = executor::KvCacheTransferMode::DRAM,
std::optional<std::string> directory = std::nullopt);
std::string const& directory = "");

//! \brief Synchronize the offload/onboard streams with the bufferManager stream.
void syncTransfers();
Expand All @@ -67,14 +69,17 @@ class KVCacheTransferManager
*/
void copyBlock(BlockPtr const& src, BlockPtr const& dst, std::vector<KVCacheBlockPool> const& pools, bool isOffload,
int numTokensToCopy = 0, executor::KvCacheTransferMode mode = executor::KvCacheTransferMode::DRAM,
std::optional<std::string> directory = std::nullopt);
std::string const& directory = "");

runtime::BufferManager mBufferManager;
runtime::BufferManager mOnboardManager;
runtime::BufferManager mOffloadManager;

// Track the block ids offloaded in this iteration.
std::unordered_map<int32_t, tr::CudaEvent> mPendingOffloads;
// Reference to parent loopback agent
std::shared_ptr<kvc::BaseLoopbackAgent> mLoopbackAgent;
int mDeviceId;
};

} // namespace tensorrt_llm::batch_manager::kv_cache_manager
7 changes: 3 additions & 4 deletions cpp/include/tensorrt_llm/executor/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -582,14 +582,13 @@ class KvCacheRetentionConfig
explicit KvCacheRetentionConfig(std::vector<TokenRangeRetentionConfig> const& tokenRangeRetentionPriorities,
RetentionPriority decodeRetentionPriority = kDefaultRetentionPriority,
std::optional<std::chrono::milliseconds> decodeDurationMs = std::nullopt,
KvCacheTransferMode transferMode = KvCacheTransferMode::DRAM,
std::optional<std::string> directory = std::nullopt);
KvCacheTransferMode transferMode = KvCacheTransferMode::DRAM, std::string const& directory = "");

[[nodiscard]] std::vector<TokenRangeRetentionConfig> getTokenRangeRetentionConfigs() const;
[[nodiscard]] RetentionPriority getDecodeRetentionPriority() const;
[[nodiscard]] std::optional<std::chrono::milliseconds> getDecodeDurationMs() const;
[[nodiscard]] KvCacheTransferMode getTransferMode() const;
[[nodiscard]] std::optional<std::string> getDirectory() const;
[[nodiscard]] std::string const& getDirectory() const;

/// @brief Convert the token range data into an entry per kv block. Returns a tuple of vectors corresponding to the
/// priorities and durations for each block.
Expand All @@ -616,7 +615,7 @@ class KvCacheRetentionConfig
/// @brief The transfer mode for the block.
KvCacheTransferMode mTransferMode;
/// @brief Name of the directory if transfer mode is GDS or POSIX_DEBUG_FALLBACK.
std::optional<std::string> mDirectory;
std::string mDirectory;
};

/// @brief A class that holds information about the request
Expand Down
100 changes: 100 additions & 0 deletions cpp/include/tensorrt_llm/executor/transferAgent.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
#pragma once

#include "tensorrt_llm/common/assert.h"
#include <fcntl.h>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <unordered_map>
#include <vector>

Expand Down Expand Up @@ -109,6 +113,80 @@ class MemoryDescs
std::vector<MemoryDesc> mDescs;
};

class FileDesc
{
public:
FileDesc(std::string const& filename, int flags, mode_t mode, size_t len)
: mLen{len}
{
int fd = ::open(filename.c_str(), flags, mode);
TLLM_CHECK_WITH_INFO(fd >= 0, "Failed to open '%s' (GDS)", filename.c_str());
this->fd = fd;
}

FileDesc(FileDesc&& other) noexcept
: fd(other.fd)
, mLen(other.mLen)
{
other.fd = -1;
other.mLen = 0;
}

FileDesc& operator=(FileDesc&& other) noexcept
{
if (this != &other)
{
if (fd != -1)
::close(fd);
fd = other.fd;
mLen = other.mLen;
other.fd = -1;
other.mLen = 0;
}
return *this;
}

~FileDesc()
{
if (fd != -1)
::close(fd);
}

[[nodiscard]] uint64_t getFd() const noexcept
{
return fd;
}

[[nodiscard]] size_t getLen() const noexcept
{
return mLen;
}

FileDesc(FileDesc const&) = delete;
FileDesc& operator=(FileDesc const&) = delete;

private:
int fd;
size_t mLen;
};

class FileDescs
{
public:
FileDescs(std::vector<FileDesc>&& descs)
: mDescs(std::move(descs))
{
}

[[nodiscard]] std::vector<FileDesc> const& getDescs() const noexcept
{
return mDescs;
}

private:
std::vector<FileDesc> mDescs;
};

using TransferDescs = MemoryDescs;
using RegisterDescs = MemoryDescs;
using SyncMessage = std::string;
Expand Down Expand Up @@ -195,6 +273,7 @@ struct BaseAgentConfig
{
std::string mName;
bool useProgThread;
bool multiThread;
};

class BaseTransferAgent
Expand All @@ -221,6 +300,13 @@ class BaseTransferAgent
virtual bool checkRemoteDescs(std::string const& name, MemoryDescs const& memoryDescs) = 0;
};

class BaseLoopbackAgent
{
public:
virtual ~BaseLoopbackAgent() = default;
virtual void executeLoopbackRequest(MemoryDescs const& memoryDescs, FileDescs const& fileDescs, bool isOffload) = 0;
};

class DynLibLoader final
{
public:
Expand Down Expand Up @@ -264,4 +350,18 @@ template <typename... Args>
TLLM_THROW("Unknown backend name.");
}

template <typename... Args>
[[nodiscard]] std::shared_ptr<BaseLoopbackAgent> makeLoopbackAgent(std::string const& backend, Args&&... args)
{
if (backend == "nixl")
{
auto& loader = DynLibLoader::getInstance();
using CreateNixlFuncType = std::shared_ptr<BaseLoopbackAgent> (*)(BaseAgentConfig const*);
auto* func = loader.getFunctionPointer<CreateNixlFuncType>(
"libtensorrt_llm_nixl_wrapper.so", "createNixlLoopbackAgent");
return func(std::forward<Args>(args)...);
}
TLLM_THROW("Unknown backend name.");
}

} // namespace tensorrt_llm::executor::kv_cache
Loading