Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 54 additions & 4 deletions python/sglang/jit_kernel/csrc/ngram_corpus/ngram.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ void Ngram::insertWorker() {
}
}

Result Ngram::batchMatch(const std::vector<std::vector<int32_t>>& tokens) const {
Result Ngram::batchMatch(const std::vector<std::vector<int32_t>>& tokens) {
std::unique_lock<std::mutex> lock(mutex_);

using BuildFn = Result (Trie::*)(const int32_t*, size_t, int32_t, size_t, const Param&) const;
using BuildFn = Result (Trie::*)(const int32_t*, size_t, int32_t, size_t, const Param&, MatchState&, size_t) const;
BuildFn build_fn;
if (param_.match_type == "BFS") {
build_fn = &Trie::buildRecency;
Expand All @@ -91,13 +91,63 @@ Result Ngram::batchMatch(const std::vector<std::vector<int32_t>>& tokens) const
}

Result merged;
for (const auto& suffix : tokens) {
for (size_t i = 0; i < tokens.size(); ++i) {
const auto& suffix = tokens[i];
if (suffix.empty()) {
throw std::runtime_error("batchMatch received an empty token tail");
}
MatchState temp_state;
auto draft_token_num = param_.get_draft_token_num(tokens.size());
auto res = (trie_.get()->*build_fn)(
suffix.data(), suffix.size(), suffix.back(), draft_token_num, param_, temp_state, suffix.size());
merged.token.insert(merged.token.end(), res.token.begin(), res.token.end());
merged.mask.insert(merged.mask.end(), res.mask.begin(), res.mask.end());
}
return merged;
}

Result Ngram::batchMatch(
const std::vector<int64_t>& state_ids,
const std::vector<std::vector<int32_t>>& tokens,
const std::vector<size_t>& total_lens) {
if (state_ids.size() != tokens.size() || state_ids.size() != total_lens.size()) {
throw std::runtime_error("batchMatch expects state_ids, tokens, and total_lens to match in size");
}

std::unique_lock<std::mutex> lock(mutex_);

using BuildFn = Result (Trie::*)(const int32_t*, size_t, int32_t, size_t, const Param&, MatchState&, size_t) const;
BuildFn build_fn;
if (param_.match_type == "BFS") {
build_fn = &Trie::buildRecency;
} else if (param_.match_type == "PROB") {
build_fn = &Trie::buildFrequency;
} else {
throw std::runtime_error("Unknown match_type: '" + param_.match_type + "'. Must be 'BFS' or 'PROB'.");
}

Result merged;
for (size_t i = 0; i < state_ids.size(); ++i) {
const auto& suffix = tokens[i];
if (suffix.empty()) {
throw std::runtime_error("batchMatch received an empty token tail");
}

auto& state = match_state_[state_ids[i]];
auto draft_token_num = param_.get_draft_token_num(tokens.size());
auto res = (trie_.get()->*build_fn)(suffix.data(), suffix.size(), suffix.back(), draft_token_num, param_);
auto res = (trie_.get()->*build_fn)(
suffix.data(), suffix.size(), suffix.back(), draft_token_num, param_, state, total_lens[i]);
merged.token.insert(merged.token.end(), res.token.begin(), res.token.end());
merged.mask.insert(merged.mask.end(), res.mask.begin(), res.mask.end());
}
return merged;
}

void Ngram::eraseMatchState(const std::vector<int64_t>& state_ids) {
std::unique_lock<std::mutex> lock(mutex_);
for (const auto& sid : state_ids) {
match_state_.erase(sid);
}
}

} // namespace ngram
13 changes: 12 additions & 1 deletion python/sglang/jit_kernel/csrc/ngram_corpus/ngram.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

namespace ngram {
Expand All @@ -28,6 +30,7 @@ class Ngram {
size_t pending_count_ = 0;
utils::Queue<std::vector<int32_t>> insert_queue_;
std::thread insert_worker_;
std::unordered_map<int64_t, MatchState> match_state_;

public:
Ngram(size_t capacity, const Param& param);
Expand All @@ -37,13 +40,21 @@ class Ngram {

void asyncInsert(std::vector<std::vector<int32_t>>&& tokens);

Result batchMatch(const std::vector<std::vector<int32_t>>& tokens) const;
Result batchMatch(const std::vector<std::vector<int32_t>>& tokens);

Result batchMatch(
const std::vector<int64_t>& state_ids,
const std::vector<std::vector<int32_t>>& tokens,
const std::vector<size_t>& total_lens);

void eraseMatchState(const std::vector<int64_t>& state_ids);

void reset() {
std::unique_lock<std::mutex> lock(mutex_);
if (trie_) {
trie_->reset();
}
match_state_.clear();
}

const Param& param() const {
Expand Down
56 changes: 47 additions & 9 deletions python/sglang/jit_kernel/csrc/ngram_corpus/ngram_corpus_ffi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,52 @@ struct NgramCorpusObj : public tvm::ffi::Object {
}

auto result = ngram_->batchMatch(tokens);
write_result_(result, out_tokens, out_mask);
}

void batch_match_stateful(
const tvm::ffi::TensorView state_ids_tv,
const tvm::ffi::TensorView tokens_flat,
const tvm::ffi::TensorView offsets,
const tvm::ffi::TensorView total_lens_tv,
const tvm::ffi::TensorView out_tokens,
const tvm::ffi::TensorView out_mask) {
auto* sid = static_cast<const int64_t*>(state_ids_tv.data_ptr());
auto* data = static_cast<const int32_t*>(tokens_flat.data_ptr());
auto* offs = static_cast<const int64_t*>(offsets.data_ptr());
auto* tlens = static_cast<const int64_t*>(total_lens_tv.data_ptr());
int64_t batch_size = offsets.size(0) - 1;

std::vector<int64_t> state_ids(sid, sid + batch_size);
std::vector<std::vector<int32_t>> tokens(batch_size);
std::vector<size_t> total_lens(batch_size);
for (int64_t i = 0; i < batch_size; ++i) {
tokens[i].assign(data + offs[i], data + offs[i + 1]);
total_lens[i] = static_cast<size_t>(tlens[i]);
}

auto result = ngram_->batchMatch(state_ids, tokens, total_lens);
write_result_(result, out_tokens, out_mask);
}

void erase_match_state(const tvm::ffi::TensorView state_ids_tv) {
auto* sid = static_cast<const int64_t*>(state_ids_tv.data_ptr());
int64_t n = state_ids_tv.size(0);
std::vector<int64_t> state_ids(sid, sid + n);
ngram_->eraseMatchState(state_ids);
}

void synchronize() {
ngram_->synchronize();
}

void reset() {
ngram_->reset();
}

private:
void write_result_(
const ngram::Result& result, const tvm::ffi::TensorView& out_tokens, const tvm::ffi::TensorView& out_mask) {
auto* out_tok = static_cast<int32_t*>(out_tokens.data_ptr());
auto* out_msk = static_cast<uint8_t*>(out_mask.data_ptr());
if (result.token.size() > static_cast<size_t>(out_tokens.size(0))) {
Expand All @@ -79,15 +124,6 @@ struct NgramCorpusObj : public tvm::ffi::Object {
std::memcpy(out_msk, result.mask.data(), result.mask.size() * sizeof(uint8_t));
}

void synchronize() {
ngram_->synchronize();
}

void reset() {
ngram_->reset();
}

private:
std::unique_ptr<ngram::Ngram> ngram_;
};

Expand All @@ -97,6 +133,8 @@ void register_ngram_corpus() {
.def(refl::init<int64_t, int64_t, int64_t, int64_t, int64_t, int64_t>(), "__init__")
.def("async_insert", &NgramCorpusObj::async_insert)
.def("batch_match", &NgramCorpusObj::batch_match)
.def("batch_match_stateful", &NgramCorpusObj::batch_match_stateful)
.def("erase_match_state", &NgramCorpusObj::erase_match_state)
.def("synchronize", &NgramCorpusObj::synchronize)
.def("reset", &NgramCorpusObj::reset);
}
Expand Down
122 changes: 110 additions & 12 deletions python/sglang/jit_kernel/csrc/ngram_corpus/trie.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,14 @@ void Trie::squeeze(size_t count) {
last->parent->lru.erase(last->parent_lru_pos);
last->parent->sorted_children.erase(last);
last->parent->child.erase(last->token);
retireNode(last);

node_pool_[free_node_count_++] = last;
}
}

void Trie::reset() {
++trie_epoch_;
global_lru_.clear();
path_.clear();
node_pool_.clear();
Expand All @@ -100,11 +102,31 @@ void Trie::reset() {
root_ = getNode();
}

std::vector<std::pair<TrieNode*, int32_t>> Trie::match(const int32_t* context, size_t len) const {
std::vector<std::pair<TrieNode*, int32_t>> result;
const TrieNode* Trie::resolve(const MatchState& state, const NodeRef& ref) const {
if (ref.ptr == nullptr || state.trie_epoch != trie_epoch_ || ref.ptr->version != ref.version) {
return nullptr;
}
return ref.ptr;
}

bool Trie::validateMatchState_(const MatchState& state) const {
if (state.trie_epoch != trie_epoch_) {
return false;
}
for (const auto& ref : state.anchors) {
if (ref.ptr && !resolve(state, ref)) {
return false;
}
}
return true;
}

void Trie::rebuildMatchState_(const int32_t* context, size_t len, MatchState& state, size_t total_len) const {
const auto max_match_depth = std::min(len, param_.max_trie_depth);
result.reserve(max_match_depth);
for (size_t match_depth = max_match_depth; match_depth > 0; --match_depth) {
state.trie_epoch = trie_epoch_;
state.processed_total_len = total_len;
state.anchors.assign(max_match_depth, {});
for (size_t match_depth = 1; match_depth <= max_match_depth; ++match_depth) {
auto start = context + len - match_depth;
auto end = start + match_depth;
auto cursor = root_;
Expand All @@ -117,17 +139,88 @@ std::vector<std::pair<TrieNode*, int32_t>> Trie::match(const int32_t* context, s
++start;
cursor = iter->second;
}
if (cursor != nullptr && !cursor->child.empty()) {
result.emplace_back(cursor, static_cast<int32_t>(match_depth));
if (cursor != nullptr) {
state.anchors[match_depth - 1] = capture(cursor);
}
}
}

bool Trie::advanceMatchState_(MatchState& state, const int32_t* tokens, size_t len, size_t total_len) const {
if (!validateMatchState_(state)) {
return false;
}

for (size_t i = 0; i < len; ++i) {
const auto next_depth = std::min(state.anchors.size() + 1, param_.max_trie_depth);
std::vector<NodeRef> next(next_depth);

const auto root_ref = rootRef();
const auto root = resolve(state, root_ref);
if (root == nullptr) {
return false;
}
if (auto iter = root->child.find(tokens[i]); iter != root->child.end()) {
next[0] = capture(iter->second);
}

for (size_t depth = 1; depth < next_depth; ++depth) {
const auto& prev_ref = state.anchors[depth - 1];
if (prev_ref.ptr == nullptr) {
continue;
}
const auto prev_node = resolve(state, prev_ref);
if (prev_node == nullptr) {
return false;
}
if (auto iter = prev_node->child.find(tokens[i]); iter != prev_node->child.end()) {
next[depth] = capture(iter->second);
}
}

state.anchors.swap(next);
}

state.processed_total_len = total_len;
return true;
}

std::vector<std::pair<const TrieNode*, int32_t>> Trie::getExpandableAnchors_(const MatchState& state) const {
std::vector<std::pair<const TrieNode*, int32_t>> result;
result.reserve(state.anchors.size());
for (size_t depth = state.anchors.size(); depth > 0; --depth) {
const auto node = resolve(state, state.anchors[depth - 1]);
if (node != nullptr && !node->child.empty()) {
result.emplace_back(node, static_cast<int32_t>(depth));
}
}
return result;
}

Result Trie::buildRecency(
const int32_t* context, size_t len, int32_t last_token, size_t draft_token_num, const Param& param) const {
auto anchors = match(context, len);
std::vector<std::pair<const TrieNode*, int32_t>>
Trie::match(const int32_t* context, size_t len, MatchState& state, size_t total_len) const {
const bool has_forward_progress = total_len >= state.processed_total_len;
const auto appended_len = has_forward_progress ? total_len - state.processed_total_len : 0;
const auto expected_prev_depth = std::min(state.processed_total_len, param_.max_trie_depth);
const bool can_advance = state.trie_epoch == trie_epoch_ && has_forward_progress && appended_len <= len &&
state.anchors.size() == expected_prev_depth;

if (can_advance && advanceMatchState_(state, context + len - appended_len, appended_len, total_len)) {
return getExpandableAnchors_(state);
}

rebuildMatchState_(context, len, state, total_len);
return getExpandableAnchors_(state);
}

Result Trie::buildRecency(
const int32_t* context,
size_t len,
int32_t last_token,
size_t draft_token_num,
const Param& param,
MatchState& state,
size_t total_len) const {
auto anchors = match(context, len, state, total_len);
const auto max_match_depth = std::max<int32_t>(1, static_cast<int32_t>(param.max_trie_depth - 1));
double bfs_breadth_scale = double(param.max_bfs_breadth - param.min_bfs_breadth) / max_match_depth;

Expand Down Expand Up @@ -166,9 +259,14 @@ Result Trie::buildRecency(
}

Result Trie::buildFrequency(
const int32_t* context, size_t len, int32_t last_token, size_t draft_token_num, const Param& param) const {
auto anchors = match(context, len);

const int32_t* context,
size_t len,
int32_t last_token,
size_t draft_token_num,
const Param& param,
MatchState& state,
size_t total_len) const {
auto anchors = match(context, len, state, total_len);
struct CompareByLastDouble {
bool operator()(
const std::tuple<double, const TrieNode*, double>& a,
Expand Down
Loading
Loading