Skip to content
Open
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
0443c4f
adding byot feature
ryanolson Jul 14, 2025
abeb568
async-openai features added at top-level; removing features from the …
ryanolson Jul 14, 2025
32933eb
updates
ryanolson Jul 14, 2025
cbaec4a
refactor
ryanolson Jul 14, 2025
cdf2dfd
reducing scope on recorded stream
ryanolson Jul 15, 2025
0b56e7c
fix: resolve merge conflicts in perf module and http service tests
ryanolson Jul 15, 2025
fa0e539
full end-to-end tests with captures vllm sse streams
ryanolson Jul 15, 2025
6ed716d
adding capture notes
ryanolson Jul 15, 2025
9e63c00
big change: dropping Sync requirement from Resp in AsyncEngine<Req, R…
ryanolson Jul 15, 2025
2c50d30
rebased off main - ugly mismatch from ryan/http-client
ryanolson Jul 16, 2025
485bbfe
removed comments
ryanolson Jul 16, 2025
56093db
Merge branch 'main' into ryan/logprob-v2
ryanolson Jul 16, 2025
f5b96db
missing test data file
ryanolson Jul 16, 2025
3723e92
Merge branch 'ryan/logprob-v2' of github.com:ai-dynamo/dynamo into ry…
ryanolson Jul 16, 2025
ab43e34
Merge branch 'main' into ryan/logprob-v2
ryanolson Jul 16, 2025
af65f3f
update filename
ryanolson Jul 16, 2025
3a9b2b3
Merge branch 'main' into ryan/logprob-v2
ryanolson Jul 16, 2025
a52d174
updates to lobprobs api and data structures
ryanolson Jul 17, 2025
a1b94bd
Merge branch 'ryan/logprob-v2' of github.com:ai-dynamo/dynamo into ry…
ryanolson Jul 17, 2025
623fd0e
perform token counting analysis
ryanolson Jul 17, 2025
189e79b
updates
ryanolson Jul 17, 2025
19a6abd
merge main
ryanolson Jul 17, 2025
3278239
update deny.toml
ryanolson Jul 17, 2025
de4d30a
Merge branch 'main' into ryan/token-counting-analysis
ryanolson Jul 17, 2025
3f2aa57
update deny.toml
ryanolson Jul 17, 2025
2647113
Merge branch 'ryan/token-counting-analysis' of github.com:ai-dynamo/d…
ryanolson Jul 17, 2025
c0d6971
adding filename/path to the read file error
ryanolson Jul 17, 2025
3b516ee
Merge branch 'main' into ryan/token-counting-analysis
ryanolson Jul 19, 2025
bc72bbb
adding libbz2
ryanolson Jul 19, 2025
1ac6ad0
libbz2 -> bzip2
ryanolson Jul 19, 2025
8e487c9
bzip2 -> libbz2-1.0
ryanolson Jul 19, 2025
537b52b
updates to reduce box pinning streams
ryanolson Jul 19, 2025
c77a319
remove ci change
ryanolson Jul 19, 2025
d065bdf
trying libbz2-dev
ryanolson Jul 19, 2025
a7be2e7
pivot: bzip2 -> gzip
ryanolson Jul 19, 2025
2218bbb
adding git-lfs
ryanolson Jul 19, 2025
13514ca
adding checkotu with lfs
ryanolson Jul 19, 2025
c1cb46d
try to improve ci caching
ryanolson Jul 19, 2025
54709e5
Merge branch 'main' into ryan/token-counting-analysis
ryanolson Jul 19, 2025
1753c2e
update ci for caching
ryanolson Jul 19, 2025
413f3b6
updating ci again
ryanolson Jul 19, 2025
16c0a56
Merge branch 'ryan/token-counting-analysis' of github.com:ai-dynamo/d…
ryanolson Jul 19, 2025
d8c844e
still seeing slow ci; merging slow cargo bits together
ryanolson Jul 19, 2025
8dbdd09
more ci
ryanolson Jul 19, 2025
c17fc24
descoping
ryanolson Jul 19, 2025
86f1e29
clean up updated ci file
ryanolson Jul 19, 2025
d613a4b
adding a ci profile
ryanolson Jul 19, 2025
f7b56e2
invalidate cache
ryanolson Jul 19, 2025
3f33571
adding ci profiles everywhere
ryanolson Jul 19, 2025
f176216
missing ci profile
ryanolson Jul 19, 2025
36629f0
inherits is mandatory
ryanolson Jul 19, 2025
0fa3f12
update gguf tokenizer to address upstream changes
ryanolson Jul 19, 2025
57a94f4
Merge branch 'main' into ryan/token-counting-analysis
ryanolson Jul 21, 2025
0a29d53
not sure why but tests in the vllm container are failing
ryanolson Jul 21, 2025
94f7ebe
Merge branch 'ryan/token-counting-analysis' of github.com:ai-dynamo/d…
ryanolson Jul 21, 2025
cad03b3
refactor: remove deepseek tokenizer and replace with mock tokenizer
ryanolson Sep 18, 2025
6d593eb
feat: merge main branch and resolve conflicts
ryanolson Sep 18, 2025
e4ae6f7
feat: merge main branch into token-counting-analysis
ryanolson Sep 26, 2025
17316a0
chore: remove old utils.rs file after module restructure
ryanolson Sep 26, 2025
7a5b1a6
style: format code with Rust 1.90 rustfmt
ryanolson Sep 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*.[Pp][Nn][Gg] binary
*.[Zz][Ii][Pp] binary
*.[Tt][Gg][Zz] binary

# Exclude test data files from linguist language detection
lib/llm/tests/data/** linguist-vendored
lib/llm/tests/snapshots/** linguist-vendored
lib/llm/tests/data/replays/deepseek-r1-distill-llama-8b/tokenizer-deepseek-r1-distill-llama-8b.json.bz2 filter=lfs diff=lfs merge=lfs -text
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ async-stream = { version = "0.3" }
async-trait = { version = "0.1" }
async_zmq = { version = "0.4.0" }
blake3 = { version = "1" }
bzip2 = { version = "0.6" } # Purpose: Pure Rust bzip2 compression/decompression
bytes = { version = "1" }
chrono = { version = "0.4", default-features = false, features = ["alloc", "std", "clock", "now", "serde"] }
derive_builder = { version = "0.20" }
Expand Down
5 changes: 4 additions & 1 deletion deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ allow = [
"BSL-1.0",
"MPL-2.0",
"CDLA-Permissive-2.0",
"Zlib"
"Zlib",
"bzip2-1.0.6"
]

# TODO exceptions
Expand All @@ -43,6 +44,7 @@ allow = [
# MIT: https://github.com/guidance-ai/llguidance/toktrie_hf_tokenizers
# "toktrie_hf_tokenizers",


[[licenses.clarify]]

name = "ring"
Expand All @@ -51,6 +53,7 @@ license-files = [
{ path = "LICENSE", hash = 0xbd0eed23 }
]


[bans]
deny = [
# Ensure we don't depend on openssl
Expand Down
16 changes: 16 additions & 0 deletions lib/bindings/python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion lib/llm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ async-stream = { workspace = true }
async-trait = { workspace = true }
async-nats = { workspace = true }
async_zmq = { workspace = true }
bzip2 = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
derive_builder = {workspace = true }
Expand All @@ -77,7 +78,7 @@ uuid = { workspace = true }
xxhash-rust = { workspace = true }

akin = "0.4.0"
blake3 = "1"
blake3 = { workspace = true }
bytemuck = "1.22"
candle-core = { version = "0.8.0" }
derive-getters = "0.5"
Expand Down
1 change: 1 addition & 0 deletions lib/llm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub mod request_template;
pub mod tokenizers;
pub mod tokens;
pub mod types;
pub mod utils;

#[cfg(feature = "block-manager")]
pub mod block_manager;
Expand Down
65 changes: 65 additions & 0 deletions lib/llm/src/perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@
//! during collection, then analyze the recorded data for performance insights.

pub mod logprobs;
pub mod tokens;

use anyhow::Context as ErrorContext;
use dynamo_runtime::protocols::annotated::Annotated;
use futures::Stream;
use serde::de::DeserializeOwned;
use serde::Serialize;

use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
Expand All @@ -21,6 +27,9 @@ use dynamo_runtime::engine::{
};
use std::sync::Arc;

pub use crate::protocols::codec::create_message_stream as parse_sse_stream;
pub use crate::protocols::convert_sse_stream as convert_sse_to_annotated_stream;

/// Type alias for a receiver of recorded stream data
pub type RecordedStreamReceiver<R> = oneshot::Receiver<RecordedStream<R>>;

Expand Down Expand Up @@ -340,6 +349,62 @@ pub fn record_response_stream<R: Data + Clone>(
record_stream(response_stream, mode)
}

/// Record a data stream by consuming it entirely and returning the recorded data.
///
/// This is a convenience function that operates only in "sink" mode - it consumes
/// the entire stream and returns all the recorded responses. Unlike the other recording
/// functions, this doesn't return a pass-through stream.
///
/// # Arguments
/// * `data_stream` - The data stream to consume and record
///
/// # Returns
/// A `RecordedStream` containing all the responses with timing information
///
/// # Example
/// ```rust,ignore
/// use dynamo_llm::perf::record_data_stream;
///
/// let stream = create_my_data_stream();
/// let recorded = record_data_stream(stream).await;
/// println!("Recorded {} responses", recorded.response_count());
/// ```
pub async fn record_data_stream<R: Data + Clone>(
mut data_stream: DataStream<R>,
) -> RecordedStream<R> {
use futures::StreamExt;

let start_time = Instant::now();
let mut responses = Vec::new();
let mut sequence_number = 0;

// Consume the entire stream
while let Some(item) = data_stream.next().await {
responses.push(TimestampedResponse::new(item, sequence_number));
sequence_number += 1;
}

let end_time = Instant::now();

RecordedStream::new(responses, start_time, end_time)
}

/// Read an annotated stream from a file and collect all items
pub fn read_annotated_stream_from_file<T: Serialize + DeserializeOwned>(
path: &str,
) -> Result<DataStream<Annotated<T>>, anyhow::Error> {
// Read the entire file as a string
let data = std::fs::read_to_string(path)
.map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
.with_context(|| format!("Failed to read file: {}", path))?;

// Create SSE stream from the string data
let sse_stream = parse_sse_stream(&data);

// Convert SSE messages to annotated stream
Ok(convert_sse_to_annotated_stream::<T>(Box::pin(sse_stream)))
}

#[cfg(test)]
pub mod tests {
use super::*;
Expand Down
21 changes: 7 additions & 14 deletions lib/llm/src/perf/logprobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,9 +568,10 @@ mod tests {
type TestTokenAlternative = (&'static str, f32);
type TestTokenData = (&'static str, f32, Vec<TestTokenAlternative>);
type TestTokenDataVec = Vec<TestTokenData>;
use crate::perf::{record_stream_with_context, RecordingMode, TimestampedResponse};
use crate::protocols::codec::create_message_stream;
use crate::protocols::convert_sse_stream;
use crate::perf::{
read_annotated_stream_from_file, record_stream_with_context, RecordingMode,
TimestampedResponse,
};
use approx::assert_abs_diff_eq;
use async_openai::types::{
ChatChoiceLogprobs, ChatChoiceStream, ChatCompletionStreamResponseDelta,
Expand Down Expand Up @@ -1449,21 +1450,13 @@ mod tests {

#[tokio::test]
async fn test_real_sse_stream_analysis() {
// Read the real SSE data with logprobs
let data = std::fs::read_to_string(
let stream = read_annotated_stream_from_file::<NvCreateChatCompletionStreamResponse>(
"tests/data/replays/deepseek-r1-distill-llama-8b/chat-completions.stream.1",
)
.expect("Failed to read test data file");

// Create stream from SSE data
let sse_stream = create_message_stream(&data);

// Convert SSE messages to our stream response format using the existing converter
let response_stream =
convert_sse_stream::<NvCreateChatCompletionStreamResponse>(Box::pin(sse_stream));
.unwrap();

// Filter out errors and extract successful responses
let filtered_stream = response_stream.filter_map(|annotated| async move { annotated.data });
let filtered_stream = stream.filter_map(|annotated| async move { annotated.data });

// Create a mock context for recording
let ctx = Arc::new(MockContext::new());
Expand Down
Loading
Loading