From d895cafeb06c5e5d1c1ad3b195852ccee64abcd2 Mon Sep 17 00:00:00 2001 From: Ryan Olson Date: Mon, 14 Jul 2025 18:53:37 +0000 Subject: [PATCH 01/12] adding http clients and recorded response stream --- Cargo.toml | 2 +- lib/llm/src/http.rs | 1 + lib/llm/src/lib.rs | 1 + lib/llm/src/perf.rs | 677 ++++++++++++++++++++++++++++++++++ lib/llm/tests/http-service.rs | 405 +++++++++++++++++++- lib/runtime/src/engine.rs | 2 +- 6 files changed, 1081 insertions(+), 7 deletions(-) create mode 100644 lib/llm/src/perf.rs diff --git a/Cargo.toml b/Cargo.toml index a41fe37005..bd5aa206ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,7 @@ dynamo-tokens = { path = "lib/tokens", version = "0.3.1" } # External dependencies anyhow = { version = "1" } async-nats = { version = "0.40", features = ["service"] } -async-openai = { version = "0.29.0" } +async-openai = { version = "0.29.0", features = ["rustls", "byot"] } async-stream = { version = "0.3" } async-trait = { version = "0.1" } async_zmq = { version = "0.4.0" } diff --git a/lib/llm/src/http.rs b/lib/llm/src/http.rs index f429524497..98acf8675e 100644 --- a/lib/llm/src/http.rs +++ b/lib/llm/src/http.rs @@ -13,4 +13,5 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod client; pub mod service; diff --git a/lib/llm/src/lib.rs b/lib/llm/src/lib.rs index 5027810f29..e5e2e50fb6 100644 --- a/lib/llm/src/lib.rs +++ b/lib/llm/src/lib.rs @@ -25,6 +25,7 @@ pub mod local_model; pub mod mocker; pub mod model_card; pub mod model_type; +pub mod perf; pub mod preprocessor; pub mod protocols; pub mod recorder; diff --git a/lib/llm/src/perf.rs b/lib/llm/src/perf.rs new file mode 100644 index 0000000000..168c06ec9a --- /dev/null +++ b/lib/llm/src/perf.rs @@ -0,0 +1,677 @@ +//! Performance recording and analysis for streaming LLM responses +//! +//! This module provides mechanisms to record streaming responses with minimal overhead +//! during collection, then analyze the recorded data for performance insights. 
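A minimal usage sketch of the recording API defined below, assuming an async test body and the MockContext from this file's tests (any AsyncEngineContext implementation would do):

    use futures::{stream, StreamExt};

    let ctx = Arc::new(MockContext::new());
    let source = stream::iter(vec!["a", "b", "c"]);

    // Scan mode: every item is re-emitted downstream while a timestamp is recorded for it.
    let (recorded_stream, rx) =
        record_stream_with_context(Box::pin(source), ctx, RecordingMode::Scan);

    let passed_through: Vec<_> = recorded_stream.collect().await;
    let recorded = rx.await.unwrap(); // resolves only after the stream has ended
    assert_eq!(recorded.response_count(), passed_through.len());
    println!("avg gap: {:?}", recorded.average_inter_response_time());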
+ +use futures::Stream; +use serde::{Deserialize, Serialize}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; +use tokio::sync::oneshot; + +// Import the runtime types we need +use dynamo_runtime::engine::{ + AsyncEngineContext, AsyncEngineContextProvider, AsyncEngineStream, Data, ResponseStream, +}; +use std::sync::Arc; + +/// A response wrapper that adds timing information with minimal overhead +#[derive(Debug, Clone)] +pub struct TimestampedResponse { + /// The actual response data + pub response: T, + /// High-resolution timestamp when this response was recorded + pub timestamp: Instant, + /// Sequence number in the stream (0-based) + pub sequence_number: usize, +} + +impl TimestampedResponse { + /// Create a new timestamped response + pub fn new(response: T, sequence_number: usize) -> Self { + Self { + response, + timestamp: Instant::now(), + sequence_number, + } + } + + /// Get the response data + pub fn data(&self) -> &T { + &self.response + } + + /// Get the elapsed time since stream start + pub fn elapsed_since(&self, start_time: Instant) -> Duration { + self.timestamp.duration_since(start_time) + } +} + +/// Trait for requests that can provide hints about expected response count +/// This enables capacity pre-allocation for better performance +pub trait CapacityHint { + /// Estimate the number of responses this request might generate + /// Returns None if estimation is not possible + fn estimated_response_count(&self) -> Option; +} + +/// Recording mode determines how the recorder behaves with the stream +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RecordingMode { + /// Pass responses through while recording (scan mode) + /// Stream continues to flow to downstream consumers + Scan, + /// Consume responses as terminus (sink mode) + /// Stream ends at the recorder + Sink, +} + +/// Container for recorded streaming responses with analysis capabilities +#[derive(Debug, Clone)] +pub struct RecordedStream { + /// All recorded responses with timestamps + pub responses: Vec>, + /// Total duration from first to last response + pub total_duration: Duration, + /// When recording started + pub start_time: Instant, + /// When recording ended + pub end_time: Instant, +} + +impl RecordedStream { + /// Create a new recorded stream from collected responses + pub fn new( + responses: Vec>, + start_time: Instant, + end_time: Instant, + ) -> Self { + let total_duration = end_time.duration_since(start_time); + Self { + responses, + total_duration, + start_time, + end_time, + } + } + + /// Get the number of responses recorded + pub fn response_count(&self) -> usize { + self.responses.len() + } + + /// Get the total duration of the stream + pub fn total_duration(&self) -> Duration { + self.total_duration + } + + /// Calculate the average time between responses + pub fn average_inter_response_time(&self) -> Option { + if self.responses.len() < 2 { + return None; + } + + let total_time = self.total_duration; + let intervals = self.responses.len() - 1; + Some(total_time / intervals as u32) + } + + /// Get inter-response timings (time between consecutive responses) + pub fn inter_response_times(&self) -> Vec { + self.responses + .windows(2) + .map(|pair| pair[1].timestamp.duration_since(pair[0].timestamp)) + .collect() + } + + /// Get response timings relative to stream start + pub fn response_timings(&self) -> Vec { + self.responses + .iter() + .map(|r| r.elapsed_since(self.start_time)) + .collect() + } +} + +/// Recording stream that wraps an 
AsyncEngineStream and records responses +/// Following the pattern of ResponseStream for AsyncEngine compatibility +pub struct RecordingStream { + /// The wrapped stream + stream: Pin + Send + Sync>>, + /// Context from the original stream + ctx: Arc, + /// Recording mode + mode: RecordingMode, + /// Recorded responses + responses: Vec>, + /// When recording started + start_time: Instant, + /// Channel to send recorded data when stream completes + recorded_tx: Option>>, +} + +impl Unpin for RecordingStream {} + +impl RecordingStream { + /// Create a new recording stream from a raw stream and context + pub fn from_stream_and_context( + stream: Pin + Send + Sync>>, + ctx: Arc, + mode: RecordingMode, + capacity: Option, + recorded_tx: oneshot::Sender>, + ) -> Self { + let mut responses = Vec::new(); + if let Some(cap) = capacity { + responses.reserve(cap); + } + + Self { + stream, + ctx, + mode, + responses, + start_time: Instant::now(), + recorded_tx: Some(recorded_tx), + } + } + + /// Create a new recording stream from an AsyncEngineStream (private constructor) + fn from_async_engine_stream( + stream: Pin>>, + mode: RecordingMode, + capacity: Option, + recorded_tx: oneshot::Sender>, + ) -> Self { + let ctx = stream.context(); + Self::from_stream_and_context(stream, ctx, mode, capacity, recorded_tx) + } + + /// Convert to Pin>> + pub fn into_async_engine_stream(self) -> Pin>> { + Box::pin(self) + } +} + +impl From>>> for RecordingStream { + fn from(stream: Pin>>) -> Self { + let (tx, _rx) = oneshot::channel(); // Note: receiver is dropped, for convenience + Self::from_async_engine_stream(stream, RecordingMode::Scan, None, tx) + } +} + +impl From> for RecordingStream { + fn from(response_stream: ResponseStream) -> Self { + let (tx, _rx) = oneshot::channel(); // Note: receiver is dropped, for convenience + Self::from_async_engine_stream(Box::pin(response_stream), RecordingMode::Scan, None, tx) + } +} + +impl From>>> for RecordingStream { + fn from(response_stream: Pin>>) -> Self { + let (tx, _rx) = oneshot::channel(); // Note: receiver is dropped, for convenience + Self::from_async_engine_stream(response_stream, RecordingMode::Scan, None, tx) + } +} + +impl Stream for RecordingStream { + type Item = R; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.as_mut().get_mut(); + + match Pin::new(&mut this.stream).poll_next(cx) { + Poll::Ready(Some(item)) => { + // Always capture timestamp first (cheap operation) + let timestamp = Instant::now(); + let sequence_number = this.responses.len(); + + match this.mode { + RecordingMode::Scan => { + // Clone for recording, pass original through + let timestamped = TimestampedResponse { + response: item.clone(), + timestamp, + sequence_number, + }; + this.responses.push(timestamped); + Poll::Ready(Some(item)) // Pass through original + } + RecordingMode::Sink => { + // Move item directly into recording (no clone needed) + let timestamped = TimestampedResponse { + response: item, // Move, don't clone + timestamp, + sequence_number, + }; + this.responses.push(timestamped); + + // Continue consuming but don't emit + self.poll_next(cx) + } + } + } + Poll::Ready(None) => { + // Stream ended - send recorded data + if let Some(tx) = this.recorded_tx.take() { + let recorded = RecordedStream::new( + std::mem::take(&mut this.responses), + this.start_time, + Instant::now(), + ); + let _ = tx.send(recorded); // Ignore if receiver dropped + } + + Poll::Ready(None) + } + Poll::Pending => Poll::Pending, + } + } +} + +impl 
AsyncEngineStream for RecordingStream {} + +impl AsyncEngineContextProvider for RecordingStream { + fn context(&self) -> Arc { + self.ctx.clone() + } +} + +impl std::fmt::Debug for RecordingStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RecordingStream") + .field("mode", &self.mode) + .field("responses_count", &self.responses.len()) + .field("ctx", &self.ctx) + .finish() + } +} + +/// Create a recording stream that wraps an AsyncEngineStream +/// Returns a pinned stream and a receiver for the recorded data +pub fn record_stream( + stream: Pin>>, + mode: RecordingMode, +) -> ( + Pin>>, + oneshot::Receiver>, +) { + let (tx, rx) = oneshot::channel(); + let recording_stream = RecordingStream::from_async_engine_stream(stream, mode, None, tx); + let boxed_stream = Box::pin(recording_stream); + (boxed_stream, rx) +} + +/// Create a recording stream from a raw stream and context +/// Returns a pinned stream and a receiver for the recorded data +pub fn record_stream_with_context( + stream: Pin + Send + Sync>>, + ctx: Arc, + mode: RecordingMode, +) -> ( + Pin>>, + oneshot::Receiver>, +) { + let (tx, rx) = oneshot::channel(); + let recording_stream = RecordingStream::from_stream_and_context(stream, ctx, mode, None, tx); + let boxed_stream = Box::pin(recording_stream); + (boxed_stream, rx) +} + +/// Create a recording stream with capacity hint +pub fn record_stream_with_capacity( + stream: Pin>>, + mode: RecordingMode, + capacity: usize, +) -> ( + Pin>>, + oneshot::Receiver>, +) { + let (tx, rx) = oneshot::channel(); + let recording_stream = + RecordingStream::from_async_engine_stream(stream, mode, Some(capacity), tx); + let boxed_stream = Box::pin(recording_stream); + (boxed_stream, rx) +} + +/// Create a recording stream with capacity hint from request +pub fn record_stream_with_request_hint( + stream: Pin>>, + mode: RecordingMode, + request: &Req, +) -> ( + Pin>>, + oneshot::Receiver>, +) { + let capacity = request.estimated_response_count(); + match capacity { + Some(cap) => record_stream_with_capacity(stream, mode, cap), + None => record_stream(stream, mode), + } +} + +/// Create a recording stream from a raw stream and context with capacity hint +pub fn record_stream_with_context_and_capacity( + stream: Pin + Send + Sync>>, + ctx: Arc, + mode: RecordingMode, + capacity: usize, +) -> ( + Pin>>, + oneshot::Receiver>, +) { + let (tx, rx) = oneshot::channel(); + let recording_stream = + RecordingStream::from_stream_and_context(stream, ctx, mode, Some(capacity), tx); + let boxed_stream = Box::pin(recording_stream); + (boxed_stream, rx) +} + +/// Create a recording stream from ResponseStream (convenience wrapper) +pub fn record_response_stream( + response_stream: Pin>>, + mode: RecordingMode, +) -> ( + Pin>>, + oneshot::Receiver>, +) { + record_stream(response_stream, mode) +} + +/// Serializable performance metrics extracted from recorded streams +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StreamPerformanceMetrics { + /// Total number of responses + pub response_count: usize, + /// Total stream duration in milliseconds + pub total_duration_ms: u64, + /// Average time between responses in milliseconds + pub avg_inter_response_time_ms: Option, + /// Minimum time between responses in milliseconds + pub min_inter_response_time_ms: Option, + /// Maximum time between responses in milliseconds + pub max_inter_response_time_ms: Option, + /// Time to first response in milliseconds + pub time_to_first_response_ms: Option, + /// Responses per 
second + pub responses_per_second: Option, +} + +impl From<&RecordedStream> for StreamPerformanceMetrics { + fn from(recorded: &RecordedStream) -> Self { + let inter_times = recorded.inter_response_times(); + + let avg_inter_response_time_ms = recorded + .average_inter_response_time() + .map(|d| d.as_millis() as u64); + + let min_inter_response_time_ms = inter_times.iter().min().map(|d| d.as_millis() as u64); + + let max_inter_response_time_ms = inter_times.iter().max().map(|d| d.as_millis() as u64); + + let time_to_first_response_ms = recorded + .responses + .first() + .map(|r| r.elapsed_since(recorded.start_time).as_millis() as u64); + + let responses_per_second = if recorded.total_duration.as_secs_f64() > 0.0 { + Some(recorded.response_count() as f64 / recorded.total_duration.as_secs_f64()) + } else { + None + }; + + Self { + response_count: recorded.response_count(), + total_duration_ms: recorded.total_duration.as_millis() as u64, + avg_inter_response_time_ms, + min_inter_response_time_ms, + max_inter_response_time_ms, + time_to_first_response_ms, + responses_per_second, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use dynamo_runtime::engine::ResponseStream; + use futures::stream; + use std::time::Duration; + + #[test] + fn test_timestamped_response_creation() { + let response = "test response"; + let timestamped = TimestampedResponse::new(response, 0); + + assert_eq!(timestamped.response, response); + assert_eq!(timestamped.sequence_number, 0); + assert_eq!(timestamped.data(), &response); + } + + #[test] + fn test_recorded_stream_analysis() { + let start_time = Instant::now(); + + // Create mock responses with known timing + let responses = vec![ + TimestampedResponse { + response: "response1", + timestamp: start_time, + sequence_number: 0, + }, + TimestampedResponse { + response: "response2", + timestamp: start_time + Duration::from_millis(100), + sequence_number: 1, + }, + TimestampedResponse { + response: "response3", + timestamp: start_time + Duration::from_millis(250), + sequence_number: 2, + }, + ]; + + let end_time = start_time + Duration::from_millis(250); + let recorded = RecordedStream::new(responses, start_time, end_time); + + assert_eq!(recorded.response_count(), 3); + assert_eq!(recorded.total_duration(), Duration::from_millis(250)); + + let inter_times = recorded.inter_response_times(); + assert_eq!(inter_times.len(), 2); + assert_eq!(inter_times[0], Duration::from_millis(100)); + assert_eq!(inter_times[1], Duration::from_millis(150)); + + // Test average calculation + let avg = recorded.average_inter_response_time().unwrap(); + assert_eq!(avg, Duration::from_millis(125)); // (250ms / 2 intervals) + } + + #[test] + fn test_performance_metrics_conversion() { + let start_time = Instant::now(); + let responses = vec![ + TimestampedResponse { + response: "test", + timestamp: start_time + Duration::from_millis(50), + sequence_number: 0, + }, + TimestampedResponse { + response: "test", + timestamp: start_time + Duration::from_millis(150), + sequence_number: 1, + }, + ]; + + let end_time = start_time + Duration::from_millis(150); + let recorded = RecordedStream::new(responses, start_time, end_time); + let metrics = StreamPerformanceMetrics::from(&recorded); + + assert_eq!(metrics.response_count, 2); + assert_eq!(metrics.total_duration_ms, 150); + assert_eq!(metrics.time_to_first_response_ms, Some(50)); + assert_eq!(metrics.min_inter_response_time_ms, Some(100)); + assert_eq!(metrics.max_inter_response_time_ms, Some(100)); + } + + #[tokio::test] + async fn 
test_recording_stream_scan_mode() { + use futures::StreamExt; + + // Create a simple test stream + let test_data = vec!["token1", "token2", "token3"]; + let base_stream = stream::iter(test_data.clone()); + + // Create a mock context for the stream + let ctx = Arc::new(MockContext::new()); + + // Record the stream in scan mode using the simplified API + let (recorded_stream, recording_rx) = + record_stream_with_context(Box::pin(base_stream), ctx, RecordingMode::Scan); + + // Consume the stream normally (pass-through mode) + let collected_responses: Vec<_> = recorded_stream.collect().await; + + // Verify the responses passed through unchanged + assert_eq!(collected_responses, test_data); + + // Get the recorded data + let recorded = recording_rx.await.unwrap(); + assert_eq!(recorded.response_count(), 3); + assert_eq!(recorded.responses[0].response, "token1"); + assert_eq!(recorded.responses[1].response, "token2"); + assert_eq!(recorded.responses[2].response, "token3"); + + // Verify timing was recorded + assert!(recorded.total_duration() > Duration::from_nanos(0)); + } + + #[tokio::test] + async fn test_recording_stream_sink_mode() { + use futures::StreamExt; + + // Create a simple test stream + let test_data = vec!["token1", "token2", "token3"]; + let base_stream = stream::iter(test_data.clone()); + + // Create a mock context for the stream + let ctx = Arc::new(MockContext::new()); + + // Record the stream in sink mode using the simplified API + let (recorded_stream, recording_rx) = + record_stream_with_context(Box::pin(base_stream), ctx, RecordingMode::Sink); + + // In sink mode, the stream should complete without emitting items + let collected_responses: Vec<_> = recorded_stream.collect().await; + assert_eq!(collected_responses, Vec::<&str>::new()); + + // Get the recorded data - should contain all original items + let recorded = recording_rx.await.unwrap(); + assert_eq!(recorded.response_count(), 3); + assert_eq!(recorded.responses[0].response, "token1"); + assert_eq!(recorded.responses[1].response, "token2"); + assert_eq!(recorded.responses[2].response, "token3"); + + // Verify timing was recorded + assert!(recorded.total_duration() > Duration::from_nanos(0)); + } + + #[tokio::test] + async fn test_recording_stream_from_response_stream() { + use futures::StreamExt; + + // Create a simple test stream + let test_data = vec!["token1", "token2", "token3"]; + let base_stream = stream::iter(test_data.clone()); + + // Create a ResponseStream (the traditional way) + let ctx = Arc::new(MockContext::new()); + let response_stream = ResponseStream::new(Box::pin(base_stream), ctx); + + // Use the convenience API for ResponseStream + let (recorded_stream, recording_rx) = + record_response_stream(response_stream, RecordingMode::Scan); + + // Consume the stream normally (pass-through mode) + let collected_responses: Vec<_> = recorded_stream.collect().await; + + // Verify the responses passed through unchanged + assert_eq!(collected_responses, test_data); + + // Get the recorded data + let recorded = recording_rx.await.unwrap(); + assert_eq!(recorded.response_count(), 3); + assert_eq!(recorded.responses[0].response, "token1"); + assert_eq!(recorded.responses[1].response, "token2"); + assert_eq!(recorded.responses[2].response, "token3"); + + // Verify timing was recorded + assert!(recorded.total_duration() > Duration::from_nanos(0)); + } + + #[test] + fn test_recording_stream_from_implementations() { + // Test the From implementations work (without async execution) + let test_data = vec!["token1", 
"token2"]; + let base_stream = stream::iter(test_data); + let ctx = Arc::new(MockContext::new()); + let response_stream = ResponseStream::new(Box::pin(base_stream), ctx); + + // Test From>>> + let _recording_stream: RecordingStream<&str> = response_stream.into(); + + // The From implementation should work without panicking + // (We can't easily test the actual execution without async setup) + } + + // Mock context for testing + #[derive(Debug)] + struct MockContext { + id: String, + } + + impl MockContext { + fn new() -> Self { + Self { + id: "test-context".to_string(), + } + } + } + + #[async_trait::async_trait] + impl AsyncEngineContext for MockContext { + fn id(&self) -> &str { + &self.id + } + + fn stop(&self) { + // No-op for testing + } + + fn stop_generating(&self) { + // No-op for testing + } + + fn kill(&self) { + // No-op for testing + } + + fn is_stopped(&self) -> bool { + false + } + + fn is_killed(&self) -> bool { + false + } + + async fn stopped(&self) { + // No-op for testing + } + + async fn killed(&self) { + // No-op for testing + } + } +} diff --git a/lib/llm/tests/http-service.rs b/lib/llm/tests/http-service.rs index 1a9b850277..076711a4fb 100644 --- a/lib/llm/tests/http-service.rs +++ b/lib/llm/tests/http-service.rs @@ -14,12 +14,18 @@ // limitations under the License. use anyhow::Error; +use async_openai::config::OpenAIConfig; use async_stream::stream; -use dynamo_llm::http::service::{ - error::HttpError, - metrics::{Endpoint, RequestType, Status}, - service_v2::HttpService, - Metrics, +use dynamo_llm::http::{ + client::{ + GenericBYOTClient, HttpClientConfig, HttpRequestContext, NvCustomClient, PureOpenAIClient, + }, + service::{ + error::HttpError, + metrics::{Endpoint, RequestType, Status}, + service_v2::HttpService, + Metrics, + }, }; use dynamo_llm::protocols::{ openai::{ @@ -29,13 +35,16 @@ use dynamo_llm::protocols::{ Annotated, }; use dynamo_runtime::{ + engine::AsyncEngineContext, pipeline::{ async_trait, AsyncEngine, AsyncEngineContextProvider, ManyOut, ResponseStream, SingleIn, }, CancellationToken, }; +use futures::StreamExt; use prometheus::{proto::MetricType, Registry}; use reqwest::StatusCode; +use rstest::*; use std::sync::Arc; struct CounterEngine {} @@ -470,3 +479,389 @@ async fn test_http_service() { cancel_token.cancel(); task.await.unwrap().unwrap(); } + +// === HTTP Client Tests === + +#[fixture] +fn service_with_engines( + #[default(8990)] port: u16, +) -> (HttpService, Arc, Arc) { + let service = HttpService::builder().port(port).build().unwrap(); + let manager = service.model_manager(); + + let counter = Arc::new(CounterEngine {}); + let failure = Arc::new(AlwaysFailEngine {}); + + manager + .add_chat_completions_model("foo", counter.clone()) + .unwrap(); + manager + .add_chat_completions_model("bar", failure.clone()) + .unwrap(); + manager + .add_completions_model("bar", failure.clone()) + .unwrap(); + + (service, counter, failure) +} + +#[fixture] +fn pure_openai_client(#[default(8990)] port: u16) -> PureOpenAIClient { + let config = HttpClientConfig { + openai_config: OpenAIConfig::new().with_api_base(&format!("http://localhost:{}/v1", port)), + verbose: false, + }; + PureOpenAIClient::new(config) +} + +#[fixture] +fn nv_custom_client(#[default(8991)] port: u16) -> NvCustomClient { + let config = HttpClientConfig { + openai_config: OpenAIConfig::new().with_api_base(&format!("http://localhost:{}/v1", port)), + verbose: false, + }; + NvCustomClient::new(config) +} + +#[fixture] +fn generic_byot_client(#[default(8992)] port: u16) -> 
GenericBYOTClient { + let config = HttpClientConfig { + openai_config: OpenAIConfig::new().with_api_base(&format!("http://localhost:{}/v1", port)), + verbose: false, + }; + GenericBYOTClient::new(config) +} + +#[rstest] +#[tokio::test] +async fn test_pure_openai_client( + #[with(8990)] service_with_engines: (HttpService, Arc, Arc), + #[with(8990)] pure_openai_client: PureOpenAIClient, +) { + let (service, _counter, _failure) = service_with_engines; + let token = CancellationToken::new(); + let cancel_token = token.clone(); + + // Start the service + let task = tokio::spawn(async move { service.run(token).await }); + + // Give the service time to start + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Test successful streaming request + let request = async_openai::types::CreateChatCompletionRequestArgs::default() + .model("foo") + .messages(vec![ + async_openai::types::ChatCompletionRequestMessage::User( + async_openai::types::ChatCompletionRequestUserMessage { + content: async_openai::types::ChatCompletionRequestUserMessageContent::Text( + "Hi".to_string(), + ), + name: None, + }, + ), + ]) + .stream(true) + .max_tokens(50u32) + .build() + .unwrap(); + + let result = pure_openai_client.chat_stream(request).await; + assert!(result.is_ok(), "PureOpenAI client should succeed"); + + let (mut stream, _context) = result.unwrap().dissolve(); + let mut count = 0; + while let Some(response) = stream.next().await { + count += 1; + assert!(response.is_ok(), "Response should be ok"); + if count >= 3 { + break; // Don't consume entire stream + } + } + assert!(count > 0, "Should receive at least one response"); + + // Test error case with invalid model + let request = async_openai::types::CreateChatCompletionRequestArgs::default() + .model("bar") // This model will fail + .messages(vec![ + async_openai::types::ChatCompletionRequestMessage::User( + async_openai::types::ChatCompletionRequestUserMessage { + content: async_openai::types::ChatCompletionRequestUserMessageContent::Text( + "Hi".to_string(), + ), + name: None, + }, + ), + ]) + .stream(true) + .max_tokens(50u32) + .build() + .unwrap(); + + let result = pure_openai_client.chat_stream(request).await; + assert!( + result.is_ok(), + "Client should return stream even for failing model" + ); + + let (mut stream, _context) = result.unwrap().dissolve(); + if let Some(response) = stream.next().await { + assert!( + response.is_err(), + "Response should be error for failing model" + ); + } + + // Test context management + let ctx = HttpRequestContext::new(); + let request = async_openai::types::CreateChatCompletionRequestArgs::default() + .model("foo") + .messages(vec![ + async_openai::types::ChatCompletionRequestMessage::User( + async_openai::types::ChatCompletionRequestUserMessage { + content: async_openai::types::ChatCompletionRequestUserMessageContent::Text( + "Hi".to_string(), + ), + name: None, + }, + ), + ]) + .stream(true) + .max_tokens(50u32) + .build() + .unwrap(); + + let result = pure_openai_client + .chat_stream_with_context(request, ctx.clone()) + .await; + assert!(result.is_ok(), "Context-based request should succeed"); + + let (_stream, context) = result.unwrap().dissolve(); + assert_eq!(context.id(), ctx.id(), "Context ID should match"); + + cancel_token.cancel(); + task.await.unwrap().unwrap(); +} + +#[rstest] +#[tokio::test] +async fn test_nv_custom_client( + #[with(8991)] service_with_engines: (HttpService, Arc, Arc), + #[with(8991)] nv_custom_client: NvCustomClient, +) { + let (service, _counter, _failure) = 
service_with_engines; + let token = CancellationToken::new(); + let cancel_token = token.clone(); + + // Start the service + let task = tokio::spawn(async move { service.run(token).await }); + + // Give the service time to start + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Test successful streaming request + let inner_request = async_openai::types::CreateChatCompletionRequestArgs::default() + .model("foo") + .messages(vec![ + async_openai::types::ChatCompletionRequestMessage::User( + async_openai::types::ChatCompletionRequestUserMessage { + content: async_openai::types::ChatCompletionRequestUserMessageContent::Text( + "Hi".to_string(), + ), + name: None, + }, + ), + ]) + .stream(true) + .max_tokens(50u32) + .build() + .unwrap(); + + let request = NvCreateChatCompletionRequest { + inner: inner_request, + nvext: None, + }; + + let result = nv_custom_client.chat_stream(request).await; + assert!(result.is_ok(), "NvCustom client should succeed"); + + let (mut stream, _context) = result.unwrap().dissolve(); + let mut count = 0; + while let Some(response) = stream.next().await { + count += 1; + assert!(response.is_ok(), "Response should be ok"); + if count >= 3 { + break; // Don't consume entire stream + } + } + assert!(count > 0, "Should receive at least one response"); + + // Test error case with invalid model + let inner_request = async_openai::types::CreateChatCompletionRequestArgs::default() + .model("bar") // This model will fail + .messages(vec![ + async_openai::types::ChatCompletionRequestMessage::User( + async_openai::types::ChatCompletionRequestUserMessage { + content: async_openai::types::ChatCompletionRequestUserMessageContent::Text( + "Hi".to_string(), + ), + name: None, + }, + ), + ]) + .stream(true) + .max_tokens(50u32) + .build() + .unwrap(); + + let request = NvCreateChatCompletionRequest { + inner: inner_request, + nvext: None, + }; + + let result = nv_custom_client.chat_stream(request).await; + assert!( + result.is_ok(), + "Client should return stream even for failing model" + ); + + let (mut stream, _context) = result.unwrap().dissolve(); + if let Some(response) = stream.next().await { + assert!( + response.is_err(), + "Response should be error for failing model" + ); + } + + // Test context management + let ctx = HttpRequestContext::new(); + let inner_request = async_openai::types::CreateChatCompletionRequestArgs::default() + .model("foo") + .messages(vec![ + async_openai::types::ChatCompletionRequestMessage::User( + async_openai::types::ChatCompletionRequestUserMessage { + content: async_openai::types::ChatCompletionRequestUserMessageContent::Text( + "Hi".to_string(), + ), + name: None, + }, + ), + ]) + .stream(true) + .max_tokens(50u32) + .build() + .unwrap(); + + let request = NvCreateChatCompletionRequest { + inner: inner_request, + nvext: None, + }; + + let result = nv_custom_client + .chat_stream_with_context(request, ctx.clone()) + .await; + assert!(result.is_ok(), "Context-based request should succeed"); + + let (_stream, context) = result.unwrap().dissolve(); + assert_eq!(context.id(), ctx.id(), "Context ID should match"); + + cancel_token.cancel(); + task.await.unwrap().unwrap(); +} + +#[rstest] +#[tokio::test] +async fn test_generic_byot_client( + #[with(8992)] service_with_engines: (HttpService, Arc, Arc), + #[with(8992)] generic_byot_client: GenericBYOTClient, +) { + let (service, _counter, _failure) = service_with_engines; + let token = CancellationToken::new(); + let cancel_token = token.clone(); + + // Start the service + let 
task = tokio::spawn(async move { service.run(token).await }); + + // Give the service time to start + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Test successful streaming request + let request = serde_json::json!({ + "model": "foo", + "messages": [ + { + "role": "user", + "content": "Hi" + } + ], + "stream": true, + "max_tokens": 50 + }); + + let result = generic_byot_client.chat_stream(request).await; + assert!(result.is_ok(), "GenericBYOT client should succeed"); + + let (mut stream, _context) = result.unwrap().dissolve(); + let mut count = 0; + while let Some(response) = stream.next().await { + println!("Response: {:?}", response); + count += 1; + assert!(response.is_ok(), "Response should be ok"); + if count >= 3 { + break; // Don't consume entire stream + } + } + assert!(count > 0, "Should receive at least one response"); + + // Test error case with invalid model + let request = serde_json::json!({ + "model": "bar", // This model will fail + "messages": [ + { + "role": "user", + "content": "Hi" + } + ], + "stream": true, + "max_tokens": 50 + }); + + let result = generic_byot_client.chat_stream(request).await; + assert!( + result.is_ok(), + "Client should return stream even for failing model" + ); + + let (mut stream, _context) = result.unwrap().dissolve(); + if let Some(response) = stream.next().await { + assert!( + response.is_err(), + "Response should be error for failing model" + ); + } + + // Test context management + let ctx = HttpRequestContext::new(); + let request = serde_json::json!({ + "model": "foo", + "messages": [ + { + "role": "user", + "content": "Hi" + } + ], + "stream": true, + "max_tokens": 50 + }); + + let result = generic_byot_client + .chat_stream_with_context(request, ctx.clone()) + .await; + assert!(result.is_ok(), "Context-based request should succeed"); + + let (_stream, context) = result.unwrap().dissolve(); + assert_eq!(context.id(), ctx.id(), "Context ID should match"); + + cancel_token.cancel(); + task.await.unwrap().unwrap(); +} diff --git a/lib/runtime/src/engine.rs b/lib/runtime/src/engine.rs index e3096532ec..df0da9a809 100644 --- a/lib/runtime/src/engine.rs +++ b/lib/runtime/src/engine.rs @@ -165,7 +165,7 @@ pub trait AsyncEngineContext: Send + Sync + Debug { /// /// This trait is implemented by both unary and streaming engine results, allowing /// uniform access to context information regardless of the operation type. 
-pub trait AsyncEngineContextProvider: Send + Sync + Debug { +pub trait AsyncEngineContextProvider: Send + Debug { fn context(&self) -> Arc; } From 9c34eb7048c982b5ed58fcdcecaf6538a0b18505 Mon Sep 17 00:00:00 2001 From: Ryan Olson Date: Mon, 14 Jul 2025 18:55:46 +0000 Subject: [PATCH 02/12] adding byot feature --- lib/llm/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/llm/Cargo.toml b/lib/llm/Cargo.toml index 68085fbfb4..a0d5e71a2f 100644 --- a/lib/llm/Cargo.toml +++ b/lib/llm/Cargo.toml @@ -43,7 +43,7 @@ dynamo-runtime = { workspace = true } # workspace anyhow = { workspace = true } async-openai = { workspace = true } -async-stream = { workspace = true } +async-stream = { workspace = true, features = ["rustls", "byot"] } async-trait = { workspace = true } async-nats = { workspace = true } async_zmq = { workspace = true } From 81cbd9cba6288d38780e724bb0ff689256cc08c6 Mon Sep 17 00:00:00 2001 From: Ryan Olson Date: Mon, 14 Jul 2025 18:57:31 +0000 Subject: [PATCH 03/12] adding missing files --- lib/llm/src/http/client.rs | 712 +++++++++++++++++++++++++++++++++++++ lib/llm/src/perf.rs | 3 + 2 files changed, 715 insertions(+) create mode 100644 lib/llm/src/http/client.rs diff --git a/lib/llm/src/http/client.rs b/lib/llm/src/http/client.rs new file mode 100644 index 0000000000..6da24b9a5a --- /dev/null +++ b/lib/llm/src/http/client.rs @@ -0,0 +1,712 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! HTTP clients for streaming LLM responses with performance recording +//! +//! This module provides HTTP clients that leverage async-openai with BYOT (Bring Your Own Types) +//! feature to work with OpenAI-compatible APIs. The clients support recording streaming responses +//! for performance analysis. 
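A minimal sketch of driving one of these clients, condensed from the integration tests in lib/llm/tests/http-service.rs; the base URL and model name are placeholders, and the enclosing async fn is assumed to return anyhow::Result<()>:

    use futures::StreamExt;

    let client = GenericBYOTClient::new(HttpClientConfig {
        openai_config: OpenAIConfig::new().with_api_base("http://localhost:8080/v1"),
        verbose: false,
    });

    let request = serde_json::json!({
        "model": "foo",
        "messages": [{ "role": "user", "content": "Hi" }],
        "stream": true // the streaming clients reject requests without this flag
    });

    let (mut stream, _ctx) = client.chat_stream(request).await?.dissolve();
    while let Some(chunk) = stream.next().await {
        println!("{}", chunk?); // each chunk arrives as a raw serde_json::Value
    }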
+ +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Instant; + +use async_openai::{config::OpenAIConfig, error::OpenAIError, Client}; +use async_trait::async_trait; +use derive_getters::Dissolve; +use futures::Stream; +use serde_json::Value; +use tokio_util::sync::CancellationToken; +use tracing; +use uuid::Uuid; + +// Import our existing recording infrastructure +use crate::protocols::openai::chat_completions::{ + NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse, +}; +use crate::protocols::Annotated; +use dynamo_runtime::engine::{ + AsyncEngineContext, AsyncEngineContextProvider, AsyncEngineStream, Data, +}; + +/// Configuration for HTTP clients +#[derive(Clone, Default)] +pub struct HttpClientConfig { + /// OpenAI API configuration + pub openai_config: OpenAIConfig, + /// Whether to enable detailed logging + pub verbose: bool, +} + +/// Error types for HTTP clients +#[derive(Debug, thiserror::Error)] +pub enum HttpClientError { + #[error("OpenAI API error: {0}")] + OpenAI(#[from] OpenAIError), + #[error("Request timeout")] + Timeout, + #[error("Request cancelled")] + Cancelled, + #[error("Invalid request: {0}")] + InvalidRequest(String), +} + +/// Context for HTTP client requests that supports cancellation +/// This bridges AsyncEngineContext and reqwest cancellation +#[derive(Clone)] +pub struct HttpRequestContext { + /// Unique request identifier + id: String, + /// Tokio cancellation token for reqwest integration + cancel_token: CancellationToken, + /// When this context was created + created_at: Instant, + /// Whether the request has been stopped + stopped: Arc, +} + +impl HttpRequestContext { + /// Create a new HTTP request context + pub fn new() -> Self { + Self { + id: Uuid::new_v4().to_string(), + cancel_token: CancellationToken::new(), + created_at: Instant::now(), + stopped: Arc::new(std::sync::atomic::AtomicBool::new(false)), + } + } + + /// Create a new context with a specific ID + pub fn with_id(id: String) -> Self { + Self { + id, + cancel_token: CancellationToken::new(), + created_at: Instant::now(), + stopped: Arc::new(std::sync::atomic::AtomicBool::new(false)), + } + } + + /// Create a child context from this parent context + /// The child will be cancelled when the parent is cancelled + pub fn child(&self) -> Self { + Self { + id: Uuid::new_v4().to_string(), + cancel_token: self.cancel_token.child_token(), + created_at: Instant::now(), + stopped: Arc::new(std::sync::atomic::AtomicBool::new(false)), + } + } + + /// Create a child context with a specific ID + pub fn child_with_id(&self, id: String) -> Self { + Self { + id, + cancel_token: self.cancel_token.child_token(), + created_at: Instant::now(), + stopped: Arc::new(std::sync::atomic::AtomicBool::new(false)), + } + } + + /// Get the cancellation token for use with reqwest + pub fn cancellation_token(&self) -> CancellationToken { + self.cancel_token.clone() + } + + /// Get the elapsed time since context creation + pub fn elapsed(&self) -> std::time::Duration { + self.created_at.elapsed() + } +} + +impl Default for HttpRequestContext { + fn default() -> Self { + Self::new() + } +} + +impl std::fmt::Debug for HttpRequestContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("HttpRequestContext") + .field("id", &self.id) + .field("created_at", &self.created_at) + .field("is_stopped", &self.is_stopped()) + .field("is_killed", &self.is_killed()) + .field("is_cancelled", &self.cancel_token.is_cancelled()) + .finish() + } +} 
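A short sketch of the hierarchical cancellation this context provides, mirroring the unit tests at the bottom of this file: stopping a parent cancels every child token handed to in-flight requests.

    let parent = HttpRequestContext::new();
    let child = parent.child();            // holds a child_token of the parent's token
    assert_ne!(parent.id(), child.id());

    parent.stop();                         // marks the parent stopped and cancels its token
    assert!(parent.is_stopped());
    assert!(child.cancellation_token().is_cancelled()); // cancellation propagated to the child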
+ +#[async_trait] +impl AsyncEngineContext for HttpRequestContext { + fn id(&self) -> &str { + &self.id + } + + fn stop(&self) { + self.stopped + .store(true, std::sync::atomic::Ordering::Release); + self.cancel_token.cancel(); + } + + fn stop_generating(&self) { + // For HTTP clients, stop_generating is the same as stop + self.stop(); + } + + fn kill(&self) { + self.stopped + .store(true, std::sync::atomic::Ordering::Release); + self.cancel_token.cancel(); + } + + fn is_stopped(&self) -> bool { + self.stopped.load(std::sync::atomic::Ordering::Acquire) + } + + fn is_killed(&self) -> bool { + self.stopped.load(std::sync::atomic::Ordering::Acquire) + } + + async fn stopped(&self) { + self.cancel_token.cancelled().await; + } + + async fn killed(&self) { + // For HTTP clients, killed is the same as stopped + self.cancel_token.cancelled().await; + } +} + +/// Base HTTP client with common functionality +pub struct BaseHttpClient { + /// async-openai client + client: Client, + /// Client configuration + config: HttpClientConfig, + /// Root context for this client + root_context: HttpRequestContext, +} + +impl BaseHttpClient { + /// Create a new base HTTP client + pub fn new(config: HttpClientConfig) -> Self { + let client = Client::with_config(config.openai_config.clone()); + Self { + client, + config, + root_context: HttpRequestContext::new(), + } + } + + /// Get a reference to the underlying async-openai client + pub fn client(&self) -> &Client { + &self.client + } + + /// Create a new request context as a child of the root context + pub fn create_context(&self) -> HttpRequestContext { + self.root_context.child() + } + + /// Create a new request context with a specific ID as a child of the root context + pub fn create_context_with_id(&self, id: String) -> HttpRequestContext { + self.root_context.child_with_id(id) + } + + /// Get the root context for this client + pub fn root_context(&self) -> &HttpRequestContext { + &self.root_context + } + + /// Check if verbose logging is enabled + pub fn is_verbose(&self) -> bool { + self.config.verbose + } +} + +/// Type alias for NV chat response stream +pub type NvChatResponseStream = Pin< + Box< + dyn Stream, OpenAIError>> + + Send + + Sync, + >, +>; + +/// Type alias for generic BYOT response stream +pub type ByotResponseStream = Pin> + Send + Sync>>; + +/// Type alias for pure OpenAI chat response stream +pub type OpenAIChatResponseStream = Pin< + Box< + dyn Stream< + Item = Result, + > + Send + + Sync, + >, +>; + +/// A wrapped HTTP response stream that combines a stream with its context +/// This provides a unified interface for HTTP client responses +#[derive(Dissolve)] +pub struct HttpResponseStream { + /// The underlying stream of responses + pub stream: Pin + Send>>, + /// The context for this request + pub context: Arc, +} + +impl HttpResponseStream { + /// Create a new HttpResponseStream + pub fn new( + stream: Pin + Send>>, + context: Arc, + ) -> Self { + Self { stream, context } + } +} + +impl Stream for HttpResponseStream { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.stream).poll_next(cx) + } +} + +impl AsyncEngineContextProvider for HttpResponseStream { + fn context(&self) -> Arc { + self.context.clone() + } +} + +impl HttpResponseStream { + /// Convert this HttpResponseStream to a Pin>> + /// This requires the stream to be Send + Sync, which may not be true for all streams + pub fn into_async_engine_stream(self) -> Pin>> + where + T: 'static, + { + // This will only work 
if the underlying stream is actually Send + Sync + // For now, we create a wrapper that assumes this + Box::pin(AsyncEngineStreamWrapper { + stream: self.stream, + context: self.context, + }) + } +} + +/// A wrapper that implements AsyncEngineStream for streams that are Send + Sync +struct AsyncEngineStreamWrapper { + stream: Pin + Send>>, + context: Arc, +} + +impl Stream for AsyncEngineStreamWrapper { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.stream).poll_next(cx) + } +} + +impl AsyncEngineContextProvider for AsyncEngineStreamWrapper { + fn context(&self) -> Arc { + self.context.clone() + } +} + +// This is unsafe because we're claiming the stream is Sync when it might not be +// But this is needed for the AsyncEngineStream trait +unsafe impl Sync for AsyncEngineStreamWrapper {} + +impl AsyncEngineStream for AsyncEngineStreamWrapper {} + +impl std::fmt::Debug for AsyncEngineStreamWrapper { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AsyncEngineStreamWrapper") + .field("context", &self.context) + .finish() + } +} + +// This is unsafe because we're claiming the stream is Sync when it might not be +// But this is needed for the AsyncEngineStream trait compatibility +unsafe impl Sync for HttpResponseStream {} + +impl std::fmt::Debug for HttpResponseStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("HttpResponseStream") + .field("context", &self.context) + .finish() + } +} + +/// Type alias for HttpResponseStream with NV chat completion responses +pub type NvHttpResponseStream = + HttpResponseStream, OpenAIError>>; + +/// Type alias for HttpResponseStream with BYOT responses +pub type ByotHttpResponseStream = HttpResponseStream>; + +/// Type alias for HttpResponseStream with pure OpenAI responses +pub type OpenAIHttpResponseStream = HttpResponseStream< + Result, +>; + +/// Pure OpenAI client using standard async-openai types +pub struct PureOpenAIClient { + base: BaseHttpClient, +} + +impl PureOpenAIClient { + /// Create a new pure OpenAI client + pub fn new(config: HttpClientConfig) -> Self { + Self { + base: BaseHttpClient::new(config), + } + } + + /// Create streaming chat completions using standard OpenAI types + /// Uses a client-managed context + pub async fn chat_stream( + &self, + request: async_openai::types::CreateChatCompletionRequest, + ) -> Result { + let ctx = self.base.create_context(); + self.chat_stream_with_context(request, ctx).await + } + + /// Create streaming chat completions with a custom context + pub async fn chat_stream_with_context( + &self, + request: async_openai::types::CreateChatCompletionRequest, + context: HttpRequestContext, + ) -> Result { + let ctx_arc: Arc = Arc::new(context.clone()); + + if !request.stream.unwrap_or(false) { + return Err(HttpClientError::InvalidRequest( + "chat_stream requires the request to have 'stream': true".to_string(), + )); + } + + if self.base.is_verbose() { + tracing::info!( + "Starting pure OpenAI chat stream for request {}", + context.id() + ); + } + + // Create the stream with cancellation support + let stream = self + .base + .client() + .chat() + .create_stream(request) + .await + .map_err(HttpClientError::OpenAI)?; + + // TODO: In Phase 3, we'll add cancellation integration with reqwest + // For now, return the stream as-is + Ok(HttpResponseStream::new(stream, ctx_arc)) + } +} + +/// NV Custom client using NvCreateChatCompletionRequest with Annotated responses 
+pub struct NvCustomClient { + base: BaseHttpClient, +} + +impl NvCustomClient { + /// Create a new NV custom client + pub fn new(config: HttpClientConfig) -> Self { + Self { + base: BaseHttpClient::new(config), + } + } + + /// Create streaming chat completions using NV custom types + /// Uses a client-managed context + pub async fn chat_stream( + &self, + request: NvCreateChatCompletionRequest, + ) -> Result { + let ctx = self.base.create_context(); + self.chat_stream_with_context(request, ctx).await + } + + /// Create streaming chat completions with a custom context + pub async fn chat_stream_with_context( + &self, + request: NvCreateChatCompletionRequest, + context: HttpRequestContext, + ) -> Result { + let ctx_arc: Arc = Arc::new(context.clone()); + + if !request.inner.stream.unwrap_or(false) { + return Err(HttpClientError::InvalidRequest( + "chat_stream requires the request to have 'stream': true".to_string(), + )); + } + + if self.base.is_verbose() { + tracing::info!( + "Starting NV custom chat stream for request {}", + context.id() + ); + } + + // Use BYOT feature to send NvCreateChatCompletionRequest + // The stream type is explicitly specified to deserialize directly into Annotated + let stream = self + .base + .client() + .chat() + .create_stream_byot(request) + .await + .map_err(HttpClientError::OpenAI)?; + + Ok(HttpResponseStream::new(stream, ctx_arc)) + } +} + +/// Generic BYOT client using serde_json::Value for maximum flexibility +pub struct GenericBYOTClient { + base: BaseHttpClient, +} + +impl GenericBYOTClient { + /// Create a new generic BYOT client + pub fn new(config: HttpClientConfig) -> Self { + Self { + base: BaseHttpClient::new(config), + } + } + + /// Create streaming chat completions using arbitrary JSON values + /// Uses a client-managed context + pub async fn chat_stream( + &self, + request: Value, + ) -> Result { + let ctx = self.base.create_context(); + self.chat_stream_with_context(request, ctx).await + } + + /// Create streaming chat completions with a custom context + pub async fn chat_stream_with_context( + &self, + request: Value, + context: HttpRequestContext, + ) -> Result { + let ctx_arc: Arc = Arc::new(context.clone()); + + if !request + .get("stream") + .unwrap_or(&Value::Bool(false)) + .as_bool() + .unwrap_or(false) + { + return Err(HttpClientError::InvalidRequest( + "chat_stream requires the request to have 'stream': true".to_string(), + )); + } + + if self.base.is_verbose() { + tracing::info!( + "Starting generic BYOT chat stream for request {}", + context.id() + ); + } + + // Validate that the request has stream: true + if let Some(stream_val) = request.get("stream") { + if !stream_val.as_bool().unwrap_or(false) { + return Err(HttpClientError::InvalidRequest( + "Request must have 'stream': true for streaming".to_string(), + )); + } + } else { + return Err(HttpClientError::InvalidRequest( + "Request must include 'stream' field".to_string(), + )); + } + + // Use BYOT feature with raw JSON + // The stream type is explicitly specified to deserialize directly into serde_json::Value + let stream = self + .base + .client() + .chat() + .create_stream_byot(request) + .await + .map_err(HttpClientError::OpenAI)?; + + Ok(HttpResponseStream::new(stream, ctx_arc)) + } +} + +// TODO: Implement recording integration in Phase 3: +// - Recording wrapper functions +// - Capacity hints from request parameters +// - Integration with existing recording infrastructure + +#[cfg(test)] +mod tests { + use super::*; + use tokio::time::{sleep, Duration}; + + #[tokio::test] 
+ async fn test_http_request_context_creation() { + let ctx = HttpRequestContext::new(); + assert!(!ctx.id().is_empty()); + assert!(!ctx.is_stopped()); + assert!(!ctx.is_killed()); + } + + #[tokio::test] + async fn test_http_request_context_child() { + let parent = HttpRequestContext::new(); + let child = parent.child(); + + // Child should have different ID + assert_ne!(parent.id(), child.id()); + + // Child should not be stopped initially + assert!(!child.is_stopped()); + + // When parent is stopped, child should be cancelled via token + parent.stop(); + assert!(parent.is_stopped()); + assert!(child.cancellation_token().is_cancelled()); + } + + #[tokio::test] + async fn test_http_request_context_child_with_id() { + let parent = HttpRequestContext::new(); + let child_id = "test-child"; + let child = parent.child_with_id(child_id.to_string()); + + assert_eq!(child.id(), child_id); + assert!(!child.is_stopped()); + + // Test hierarchical cancellation + parent.stop(); + assert!(child.cancellation_token().is_cancelled()); + } + + #[tokio::test] + async fn test_http_request_context_cancellation() { + let ctx = HttpRequestContext::new(); + let cancel_token = ctx.cancellation_token(); + + // Test stop functionality + assert!(!ctx.is_stopped()); + ctx.stop(); + assert!(ctx.is_stopped()); + assert!(cancel_token.is_cancelled()); + } + + #[tokio::test] + async fn test_http_request_context_kill() { + let ctx = HttpRequestContext::new(); + + // Test kill functionality + assert!(!ctx.is_killed()); + ctx.kill(); + assert!(ctx.is_killed()); + assert!(ctx.is_stopped()); + } + + #[tokio::test] + async fn test_http_request_context_async_cancellation() { + let ctx = HttpRequestContext::new(); + + // Test async cancellation + let ctx_clone = ctx.clone(); + let task = tokio::spawn(async move { + ctx_clone.stopped().await; + }); + + // Give a moment for the task to start waiting + sleep(Duration::from_millis(10)).await; + + // Cancel the context + ctx.stop(); + + // The task should complete + task.await.unwrap(); + } + + #[test] + fn test_base_http_client_creation() { + let config = HttpClientConfig::default(); + let client = BaseHttpClient::new(config); + assert!(!client.is_verbose()); + + // Test that client has a root context + assert!(!client.root_context().id().is_empty()); + } + + #[test] + fn test_base_http_client_context_creation() { + let config = HttpClientConfig::default(); + let client = BaseHttpClient::new(config); + + // Test creating child contexts + let ctx1 = client.create_context(); + let ctx2 = client.create_context(); + + // Should have different IDs + assert_ne!(ctx1.id(), ctx2.id()); + + // Should be children of root context + client.root_context().stop(); + assert!(ctx1.cancellation_token().is_cancelled()); + assert!(ctx2.cancellation_token().is_cancelled()); + } + + #[test] + fn test_base_http_client_context_with_id() { + let config = HttpClientConfig::default(); + let client = BaseHttpClient::new(config); + + let custom_id = "custom-request-id"; + let ctx = client.create_context_with_id(custom_id.to_string()); + + assert_eq!(ctx.id(), custom_id); + + // Should still be child of root + client.root_context().stop(); + assert!(ctx.cancellation_token().is_cancelled()); + } + + #[test] + fn test_http_client_config_defaults() { + let config = HttpClientConfig::default(); + assert!(!config.verbose); + } + + #[test] + fn test_pure_openai_client_creation() { + let config = HttpClientConfig::default(); + let _client = PureOpenAIClient::new(config); + // If we get here, creation succeeded + } + + 
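For reference alongside these constructor tests, a hedged sketch of an end-to-end NvCustomClient call (the full streaming tests live in lib/llm/tests/http-service.rs); the base URL is a placeholder and the enclosing async fn is assumed to return anyhow::Result<()>:

    use futures::StreamExt;

    let client = NvCustomClient::new(HttpClientConfig {
        openai_config: OpenAIConfig::new().with_api_base("http://localhost:8080/v1"),
        verbose: false,
    });

    let inner = async_openai::types::CreateChatCompletionRequestArgs::default()
        .model("foo")
        .messages(vec![async_openai::types::ChatCompletionRequestMessage::User(
            async_openai::types::ChatCompletionRequestUserMessage {
                content: async_openai::types::ChatCompletionRequestUserMessageContent::Text(
                    "Hi".to_string(),
                ),
                name: None,
            },
        )])
        .stream(true)
        .build()?;

    let request = NvCreateChatCompletionRequest { inner, nvext: None };
    let (mut stream, _ctx) = client.chat_stream(request).await?.dissolve();
    while let Some(chunk) = stream.next().await {
        let _annotated = chunk?; // Annotated<NvCreateChatCompletionStreamResponse>
    }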
#[test] + fn test_nv_custom_client_creation() { + let config = HttpClientConfig::default(); + let _client = NvCustomClient::new(config); + // If we get here, creation succeeded + } + + #[test] + fn test_generic_byot_client_creation() { + let config = HttpClientConfig::default(); + let _client = GenericBYOTClient::new(config); + // If we get here, creation succeeded + } +} diff --git a/lib/llm/src/perf.rs b/lib/llm/src/perf.rs index 168c06ec9a..0ab6ca69dc 100644 --- a/lib/llm/src/perf.rs +++ b/lib/llm/src/perf.rs @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + //! Performance recording and analysis for streaming LLM responses //! //! This module provides mechanisms to record streaming responses with minimal overhead From 2d3ac34736d341179567e35ebbe2c848ff777fc8 Mon Sep 17 00:00:00 2001 From: Ryan Olson Date: Mon, 14 Jul 2025 21:24:19 +0000 Subject: [PATCH 04/12] async-openai features added at top-level; removing features from the wrong package --- lib/llm/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/llm/Cargo.toml b/lib/llm/Cargo.toml index a0d5e71a2f..68085fbfb4 100644 --- a/lib/llm/Cargo.toml +++ b/lib/llm/Cargo.toml @@ -43,7 +43,7 @@ dynamo-runtime = { workspace = true } # workspace anyhow = { workspace = true } async-openai = { workspace = true } -async-stream = { workspace = true, features = ["rustls", "byot"] } +async-stream = { workspace = true } async-trait = { workspace = true } async-nats = { workspace = true } async_zmq = { workspace = true } From 8318aac466123dfd61fa5a6be636309723395653 Mon Sep 17 00:00:00 2001 From: Ryan Olson Date: Mon, 14 Jul 2025 21:41:40 +0000 Subject: [PATCH 05/12] removing from impl --- lib/llm/src/perf.rs | 36 ------------------------------------ 1 file changed, 36 deletions(-) diff --git a/lib/llm/src/perf.rs b/lib/llm/src/perf.rs index 0ab6ca69dc..4aa2a29695 100644 --- a/lib/llm/src/perf.rs +++ b/lib/llm/src/perf.rs @@ -197,27 +197,6 @@ impl RecordingStream { } } -impl From>>> for RecordingStream { - fn from(stream: Pin>>) -> Self { - let (tx, _rx) = oneshot::channel(); // Note: receiver is dropped, for convenience - Self::from_async_engine_stream(stream, RecordingMode::Scan, None, tx) - } -} - -impl From> for RecordingStream { - fn from(response_stream: ResponseStream) -> Self { - let (tx, _rx) = oneshot::channel(); // Note: receiver is dropped, for convenience - Self::from_async_engine_stream(Box::pin(response_stream), RecordingMode::Scan, None, tx) - } -} - -impl From>>> for RecordingStream { - fn from(response_stream: Pin>>) -> Self { - let (tx, _rx) = oneshot::channel(); // Note: receiver is dropped, for convenience - Self::from_async_engine_stream(response_stream, RecordingMode::Scan, None, tx) - } -} - impl Stream for RecordingStream { type Item = R; @@ -614,21 +593,6 @@ mod tests { assert!(recorded.total_duration() > Duration::from_nanos(0)); } - #[test] - fn test_recording_stream_from_implementations() { - // Test the From implementations work (without async execution) - let test_data = vec!["token1", "token2"]; - let base_stream = stream::iter(test_data); - let ctx = Arc::new(MockContext::new()); - let response_stream = ResponseStream::new(Box::pin(base_stream), ctx); - - // Test From>>> - let _recording_stream: RecordingStream<&str> = response_stream.into(); - - // The From implementation should work without panicking - // (We can't easily test the actual execution without async setup) - } - 
// Mock context for testing #[derive(Debug)] struct MockContext { From 16b1c6697211004abdbe81c05bcb5a2fa6a7f5b1 Mon Sep 17 00:00:00 2001 From: Ryan Olson Date: Mon, 14 Jul 2025 21:57:05 +0000 Subject: [PATCH 06/12] updates --- lib/llm/src/perf.rs | 163 +++++++++++++++++++++----------------------- 1 file changed, 77 insertions(+), 86 deletions(-) diff --git a/lib/llm/src/perf.rs b/lib/llm/src/perf.rs index 4aa2a29695..ac41d9627a 100644 --- a/lib/llm/src/perf.rs +++ b/lib/llm/src/perf.rs @@ -15,10 +15,17 @@ use tokio::sync::oneshot; // Import the runtime types we need use dynamo_runtime::engine::{ - AsyncEngineContext, AsyncEngineContextProvider, AsyncEngineStream, Data, ResponseStream, + AsyncEngineContext, AsyncEngineContextProvider, AsyncEngineStream, Data, DataStream, + EngineStream, ResponseStream, }; use std::sync::Arc; +/// Type alias for a receiver of recorded stream data +pub type RecordedStreamReceiver = oneshot::Receiver>; + +/// Type alias for the return type of recording functions +pub type RecordingResult = (EngineStream, RecordedStreamReceiver); + /// A response wrapper that adds timing information with minimal overhead #[derive(Debug, Clone)] pub struct TimestampedResponse { @@ -141,7 +148,7 @@ impl RecordedStream { /// Following the pattern of ResponseStream for AsyncEngine compatibility pub struct RecordingStream { /// The wrapped stream - stream: Pin + Send + Sync>>, + stream: DataStream, /// Context from the original stream ctx: Arc, /// Recording mode @@ -159,7 +166,7 @@ impl Unpin for RecordingStream {} impl RecordingStream { /// Create a new recording stream from a raw stream and context pub fn from_stream_and_context( - stream: Pin + Send + Sync>>, + stream: DataStream, ctx: Arc, mode: RecordingMode, capacity: Option, @@ -182,7 +189,7 @@ impl RecordingStream { /// Create a new recording stream from an AsyncEngineStream (private constructor) fn from_async_engine_stream( - stream: Pin>>, + stream: EngineStream, mode: RecordingMode, capacity: Option, recorded_tx: oneshot::Sender>, @@ -192,7 +199,7 @@ impl RecordingStream { } /// Convert to Pin>> - pub fn into_async_engine_stream(self) -> Pin>> { + pub fn into_async_engine_stream(self) -> EngineStream { Box::pin(self) } } @@ -230,7 +237,9 @@ impl Stream for RecordingStream { this.responses.push(timestamped); // Continue consuming but don't emit - self.poll_next(cx) + // self.poll_next(cx) + cx.waker().wake_by_ref(); + Poll::Pending } } } @@ -273,12 +282,9 @@ impl std::fmt::Debug for RecordingStream { /// Create a recording stream that wraps an AsyncEngineStream /// Returns a pinned stream and a receiver for the recorded data pub fn record_stream( - stream: Pin>>, + stream: EngineStream, mode: RecordingMode, -) -> ( - Pin>>, - oneshot::Receiver>, -) { +) -> RecordingResult { let (tx, rx) = oneshot::channel(); let recording_stream = RecordingStream::from_async_engine_stream(stream, mode, None, tx); let boxed_stream = Box::pin(recording_stream); @@ -288,13 +294,10 @@ pub fn record_stream( /// Create a recording stream from a raw stream and context /// Returns a pinned stream and a receiver for the recorded data pub fn record_stream_with_context( - stream: Pin + Send + Sync>>, + stream: DataStream, ctx: Arc, mode: RecordingMode, -) -> ( - Pin>>, - oneshot::Receiver>, -) { +) -> RecordingResult { let (tx, rx) = oneshot::channel(); let recording_stream = RecordingStream::from_stream_and_context(stream, ctx, mode, None, tx); let boxed_stream = Box::pin(recording_stream); @@ -303,13 +306,10 @@ pub fn record_stream_with_context( 
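A sketch of consuming a Sink-mode recorder under the rescheduling change above (the recorder wakes itself and returns Pending after swallowing each item): the stream emits nothing, but it still has to be driven to completion before the oneshot receiver resolves. This mirrors test_recording_stream_sink_mode and assumes an async test body.

    use futures::StreamExt;

    let ctx = Arc::new(MockContext::new());
    let source = futures::stream::iter(vec!["t1", "t2", "t3"]);

    let (sink, rx) = record_stream_with_context(Box::pin(source), ctx, RecordingMode::Sink);
    let emitted: Vec<&str> = sink.collect().await; // always empty: items are swallowed
    assert!(emitted.is_empty());

    let recorded = rx.await.unwrap();
    assert_eq!(recorded.response_count(), 3);      // but every item was timestamped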
/// Create a recording stream with capacity hint pub fn record_stream_with_capacity( - stream: Pin>>, + stream: EngineStream, mode: RecordingMode, capacity: usize, -) -> ( - Pin>>, - oneshot::Receiver>, -) { +) -> RecordingResult { let (tx, rx) = oneshot::channel(); let recording_stream = RecordingStream::from_async_engine_stream(stream, mode, Some(capacity), tx); @@ -319,13 +319,10 @@ pub fn record_stream_with_capacity( /// Create a recording stream with capacity hint from request pub fn record_stream_with_request_hint( - stream: Pin>>, + stream: EngineStream, mode: RecordingMode, request: &Req, -) -> ( - Pin>>, - oneshot::Receiver>, -) { +) -> RecordingResult { let capacity = request.estimated_response_count(); match capacity { Some(cap) => record_stream_with_capacity(stream, mode, cap), @@ -335,14 +332,11 @@ pub fn record_stream_with_request_hint( /// Create a recording stream from a raw stream and context with capacity hint pub fn record_stream_with_context_and_capacity( - stream: Pin + Send + Sync>>, + stream: DataStream, ctx: Arc, mode: RecordingMode, capacity: usize, -) -> ( - Pin>>, - oneshot::Receiver>, -) { +) -> RecordingResult { let (tx, rx) = oneshot::channel(); let recording_stream = RecordingStream::from_stream_and_context(stream, ctx, mode, Some(capacity), tx); @@ -354,73 +348,70 @@ pub fn record_stream_with_context_and_capacity( pub fn record_response_stream( response_stream: Pin>>, mode: RecordingMode, -) -> ( - Pin>>, - oneshot::Receiver>, -) { +) -> RecordingResult { record_stream(response_stream, mode) } -/// Serializable performance metrics extracted from recorded streams -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct StreamPerformanceMetrics { - /// Total number of responses - pub response_count: usize, - /// Total stream duration in milliseconds - pub total_duration_ms: u64, - /// Average time between responses in milliseconds - pub avg_inter_response_time_ms: Option, - /// Minimum time between responses in milliseconds - pub min_inter_response_time_ms: Option, - /// Maximum time between responses in milliseconds - pub max_inter_response_time_ms: Option, - /// Time to first response in milliseconds - pub time_to_first_response_ms: Option, - /// Responses per second - pub responses_per_second: Option, -} +#[cfg(test)] +mod tests { + use super::*; + use dynamo_runtime::engine::ResponseStream; + use futures::stream; + use std::time::Duration; -impl From<&RecordedStream> for StreamPerformanceMetrics { - fn from(recorded: &RecordedStream) -> Self { - let inter_times = recorded.inter_response_times(); + /// Serializable performance metrics extracted from recorded streams + #[derive(Debug, Clone, Serialize, Deserialize)] + pub struct StreamPerformanceMetrics { + /// Total number of responses + pub response_count: usize, + /// Total stream duration in milliseconds + pub total_duration_ms: u64, + /// Average time between responses in milliseconds + pub avg_inter_response_time_ms: Option, + /// Minimum time between responses in milliseconds + pub min_inter_response_time_ms: Option, + /// Maximum time between responses in milliseconds + pub max_inter_response_time_ms: Option, + /// Time to first response in milliseconds + pub time_to_first_response_ms: Option, + /// Responses per second + pub responses_per_second: Option, + } - let avg_inter_response_time_ms = recorded - .average_inter_response_time() - .map(|d| d.as_millis() as u64); + impl From<&RecordedStream> for StreamPerformanceMetrics { + fn from(recorded: &RecordedStream) -> Self { + let inter_times = 
recorded.inter_response_times(); - let min_inter_response_time_ms = inter_times.iter().min().map(|d| d.as_millis() as u64); + let avg_inter_response_time_ms = recorded + .average_inter_response_time() + .map(|d| d.as_millis() as u64); - let max_inter_response_time_ms = inter_times.iter().max().map(|d| d.as_millis() as u64); + let min_inter_response_time_ms = inter_times.iter().min().map(|d| d.as_millis() as u64); - let time_to_first_response_ms = recorded - .responses - .first() - .map(|r| r.elapsed_since(recorded.start_time).as_millis() as u64); + let max_inter_response_time_ms = inter_times.iter().max().map(|d| d.as_millis() as u64); - let responses_per_second = if recorded.total_duration.as_secs_f64() > 0.0 { - Some(recorded.response_count() as f64 / recorded.total_duration.as_secs_f64()) - } else { - None - }; + let time_to_first_response_ms = recorded + .responses + .first() + .map(|r| r.elapsed_since(recorded.start_time).as_millis() as u64); - Self { - response_count: recorded.response_count(), - total_duration_ms: recorded.total_duration.as_millis() as u64, - avg_inter_response_time_ms, - min_inter_response_time_ms, - max_inter_response_time_ms, - time_to_first_response_ms, - responses_per_second, + let responses_per_second = if recorded.total_duration.as_secs_f64() > 0.0 { + Some(recorded.response_count() as f64 / recorded.total_duration.as_secs_f64()) + } else { + None + }; + + Self { + response_count: recorded.response_count(), + total_duration_ms: recorded.total_duration.as_millis() as u64, + avg_inter_response_time_ms, + min_inter_response_time_ms, + max_inter_response_time_ms, + time_to_first_response_ms, + responses_per_second, + } } } -} - -#[cfg(test)] -mod tests { - use super::*; - use dynamo_runtime::engine::ResponseStream; - use futures::stream; - use std::time::Duration; #[test] fn test_timestamped_response_creation() { From 0c5bcba14b4136925aa6c1c88d36d3f8d4b76206 Mon Sep 17 00:00:00 2001 From: Ryan Olson Date: Mon, 14 Jul 2025 21:57:34 +0000 Subject: [PATCH 07/12] refactor --- lib/llm/src/perf.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/llm/src/perf.rs b/lib/llm/src/perf.rs index ac41d9627a..86d35740ef 100644 --- a/lib/llm/src/perf.rs +++ b/lib/llm/src/perf.rs @@ -7,7 +7,6 @@ //! during collection, then analyze the recorded data for performance insights. 
use futures::Stream; -use serde::{Deserialize, Serialize}; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; @@ -357,6 +356,7 @@ mod tests { use super::*; use dynamo_runtime::engine::ResponseStream; use futures::stream; + use serde::{Deserialize, Serialize}; use std::time::Duration; /// Serializable performance metrics extracted from recorded streams From 92573f5578e966c69ee4c07bc75a0ebb30401523 Mon Sep 17 00:00:00 2001 From: Ryan Olson Date: Mon, 14 Jul 2025 21:58:26 +0000 Subject: [PATCH 08/12] eliminate warnings --- lib/llm/tests/http-service.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/llm/tests/http-service.rs b/lib/llm/tests/http-service.rs index 076711a4fb..ff84aa02b7 100644 --- a/lib/llm/tests/http-service.rs +++ b/lib/llm/tests/http-service.rs @@ -508,7 +508,7 @@ fn service_with_engines( #[fixture] fn pure_openai_client(#[default(8990)] port: u16) -> PureOpenAIClient { let config = HttpClientConfig { - openai_config: OpenAIConfig::new().with_api_base(&format!("http://localhost:{}/v1", port)), + openai_config: OpenAIConfig::new().with_api_base(format!("http://localhost:{}/v1", port)), verbose: false, }; PureOpenAIClient::new(config) @@ -517,7 +517,7 @@ fn pure_openai_client(#[default(8990)] port: u16) -> PureOpenAIClient { #[fixture] fn nv_custom_client(#[default(8991)] port: u16) -> NvCustomClient { let config = HttpClientConfig { - openai_config: OpenAIConfig::new().with_api_base(&format!("http://localhost:{}/v1", port)), + openai_config: OpenAIConfig::new().with_api_base(format!("http://localhost:{}/v1", port)), verbose: false, }; NvCustomClient::new(config) @@ -526,7 +526,7 @@ fn nv_custom_client(#[default(8991)] port: u16) -> NvCustomClient { #[fixture] fn generic_byot_client(#[default(8992)] port: u16) -> GenericBYOTClient { let config = HttpClientConfig { - openai_config: OpenAIConfig::new().with_api_base(&format!("http://localhost:{}/v1", port)), + openai_config: OpenAIConfig::new().with_api_base(format!("http://localhost:{}/v1", port)), verbose: false, }; GenericBYOTClient::new(config) From 5cda4eddcc4b3d9d69444e78eb8417d342977264 Mon Sep 17 00:00:00 2001 From: Ryan Olson Date: Mon, 14 Jul 2025 22:14:22 +0000 Subject: [PATCH 09/12] remove unsafe --- lib/llm/src/http/client.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/llm/src/http/client.rs b/lib/llm/src/http/client.rs index 6da24b9a5a..530508cea3 100644 --- a/lib/llm/src/http/client.rs +++ b/lib/llm/src/http/client.rs @@ -331,10 +331,6 @@ impl std::fmt::Debug for AsyncEngineStreamWrapper { } } -// This is unsafe because we're claiming the stream is Sync when it might not be -// But this is needed for the AsyncEngineStream trait compatibility -unsafe impl Sync for HttpResponseStream {} - impl std::fmt::Debug for HttpResponseStream { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("HttpResponseStream") From be5c93e875c97808104c22594aa9d3e38cb9df22 Mon Sep 17 00:00:00 2001 From: Ryan Olson Date: Mon, 14 Jul 2025 22:16:16 +0000 Subject: [PATCH 10/12] remove duplicate validation check for generic --- lib/llm/src/http/client.rs | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/lib/llm/src/http/client.rs b/lib/llm/src/http/client.rs index 530508cea3..be6d0a8018 100644 --- a/lib/llm/src/http/client.rs +++ b/lib/llm/src/http/client.rs @@ -499,17 +499,6 @@ impl GenericBYOTClient { ) -> Result { let ctx_arc: Arc = Arc::new(context.clone()); - if !request - .get("stream") - 
.unwrap_or(&Value::Bool(false)) - .as_bool() - .unwrap_or(false) - { - return Err(HttpClientError::InvalidRequest( - "chat_stream requires the request to have 'stream': true".to_string(), - )); - } - if self.base.is_verbose() { tracing::info!( "Starting generic BYOT chat stream for request {}", From 2c7b70d4ece4ba7cbe08494823467a5381904aaa Mon Sep 17 00:00:00 2001 From: Ryan Olson Date: Mon, 14 Jul 2025 22:22:04 +0000 Subject: [PATCH 11/12] await health ep rather than a fixed time out --- lib/llm/tests/http-service.rs | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/lib/llm/tests/http-service.rs b/lib/llm/tests/http-service.rs index ff84aa02b7..c321af30ad 100644 --- a/lib/llm/tests/http-service.rs +++ b/lib/llm/tests/http-service.rs @@ -482,6 +482,21 @@ async fn test_http_service() { // === HTTP Client Tests === +/// Wait for the HTTP service to be ready by checking its health endpoint +async fn wait_for_service_ready(port: u16) { + let start = tokio::time::Instant::now(); + let timeout = tokio::time::Duration::from_secs(5); + loop { + match reqwest::get(&format!("http://localhost:{}/health", port)).await { + Ok(_) => break, + Err(_) if start.elapsed() < timeout => { + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + } + Err(e) => panic!("Service failed to start within timeout: {}", e), + } + } +} + #[fixture] fn service_with_engines( #[default(8990)] port: u16, @@ -545,8 +560,8 @@ async fn test_pure_openai_client( // Start the service let task = tokio::spawn(async move { service.run(token).await }); - // Give the service time to start - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + // Wait for service to be ready + wait_for_service_ready(8990).await; // Test successful streaming request let request = async_openai::types::CreateChatCompletionRequestArgs::default() @@ -656,8 +671,8 @@ async fn test_nv_custom_client( // Start the service let task = tokio::spawn(async move { service.run(token).await }); - // Give the service time to start - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + // Wait for service to be ready + wait_for_service_ready(8991).await; // Test successful streaming request let inner_request = async_openai::types::CreateChatCompletionRequestArgs::default() @@ -782,8 +797,8 @@ async fn test_generic_byot_client( // Start the service let task = tokio::spawn(async move { service.run(token).await }); - // Give the service time to start - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + // Wait for service to be ready + wait_for_service_ready(8992).await; // Test successful streaming request let request = serde_json::json!({ From 813a085eacc46485e98e20923d3f73f8b597f5dc Mon Sep 17 00:00:00 2001 From: Ryan Olson Date: Tue, 15 Jul 2025 02:09:05 +0000 Subject: [PATCH 12/12] reducing scope on recorded stream --- lib/llm/src/perf.rs | 119 +++++++------------------------------------- 1 file changed, 19 insertions(+), 100 deletions(-) diff --git a/lib/llm/src/perf.rs b/lib/llm/src/perf.rs index 86d35740ef..c00fa89d79 100644 --- a/lib/llm/src/perf.rs +++ b/lib/llm/src/perf.rs @@ -76,17 +76,18 @@ pub enum RecordingMode { Sink, } -/// Container for recorded streaming responses with analysis capabilities +/// Container for recorded streaming responses. +/// This forms the core object on which analysis is performed. 
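+///
+/// The usage sketch below is illustrative only: `record_stream` and
+/// `RecordingMode` are the helpers defined later in this module, and
+/// `engine_stream` stands in for whatever `EngineStream` is being observed.
+///
+/// ```ignore
+/// let (stream, rx) = record_stream(engine_stream, RecordingMode::Scan);
+/// // drive `stream` to completion, then collect the recording
+/// let recorded = rx.await.expect("recording was dropped");
+/// println!("{} responses in {:?}", recorded.response_count(), recorded.total_duration());
+/// ```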
 #[derive(Debug, Clone)]
 pub struct RecordedStream<T> {
     /// All recorded responses with timestamps
-    pub responses: Vec<TimestampedResponse<T>>,
-    /// Total duration from first to last response
-    pub total_duration: Duration,
+    responses: Vec<TimestampedResponse<T>>,
+
     /// When recording started
-    pub start_time: Instant,
+    start_time: Instant,
+
     /// When recording ended
-    pub end_time: Instant,
+    end_time: Instant,
 }
 
 impl<T> RecordedStream<T> {
@@ -96,10 +97,8 @@ impl RecordedStream {
         start_time: Instant,
         end_time: Instant,
     ) -> Self {
-        let total_duration = end_time.duration_since(start_time);
         Self {
             responses,
-            total_duration,
             start_time,
             end_time,
         }
@@ -112,34 +111,22 @@ impl RecordedStream {
 
     /// Get the total duration of the stream
     pub fn total_duration(&self) -> Duration {
-        self.total_duration
+        self.end_time.duration_since(self.start_time)
     }
 
-    /// Calculate the average time between responses
-    pub fn average_inter_response_time(&self) -> Option<Duration> {
-        if self.responses.len() < 2 {
-            return None;
-        }
-
-        let total_time = self.total_duration;
-        let intervals = self.responses.len() - 1;
-        Some(total_time / intervals as u32)
+    /// Get the responses recorded
+    pub fn responses(&self) -> &[TimestampedResponse<T>] {
+        &self.responses
     }
 
-    /// Get inter-response timings (time between consecutive responses)
-    pub fn inter_response_times(&self) -> Vec<Duration> {
-        self.responses
-            .windows(2)
-            .map(|pair| pair[1].timestamp.duration_since(pair[0].timestamp))
-            .collect()
+    /// Get the start time of the stream
+    pub fn start_time(&self) -> &Instant {
+        &self.start_time
     }
 
-    /// Get response timings relative to stream start
-    pub fn response_timings(&self) -> Vec<Duration> {
-        self.responses
-            .iter()
-            .map(|r| r.elapsed_since(self.start_time))
-            .collect()
+    /// Get the end time of the stream
+    pub fn end_time(&self) -> &Instant {
+        &self.end_time
     }
 }
 
@@ -356,63 +343,8 @@ mod tests {
     use super::*;
     use dynamo_runtime::engine::ResponseStream;
     use futures::stream;
-    use serde::{Deserialize, Serialize};
     use std::time::Duration;
 
-    /// Serializable performance metrics extracted from recorded streams
-    #[derive(Debug, Clone, Serialize, Deserialize)]
-    pub struct StreamPerformanceMetrics {
-        /// Total number of responses
-        pub response_count: usize,
-        /// Total stream duration in milliseconds
-        pub total_duration_ms: u64,
-        /// Average time between responses in milliseconds
-        pub avg_inter_response_time_ms: Option<u64>,
-        /// Minimum time between responses in milliseconds
-        pub min_inter_response_time_ms: Option<u64>,
-        /// Maximum time between responses in milliseconds
-        pub max_inter_response_time_ms: Option<u64>,
-        /// Time to first response in milliseconds
-        pub time_to_first_response_ms: Option<u64>,
-        /// Responses per second
-        pub responses_per_second: Option<f64>,
-    }
-
-    impl<T> From<&RecordedStream<T>> for StreamPerformanceMetrics {
-        fn from(recorded: &RecordedStream<T>) -> Self {
-            let inter_times = recorded.inter_response_times();
-
-            let avg_inter_response_time_ms = recorded
-                .average_inter_response_time()
-                .map(|d| d.as_millis() as u64);
-
-            let min_inter_response_time_ms = inter_times.iter().min().map(|d| d.as_millis() as u64);
-
-            let max_inter_response_time_ms = inter_times.iter().max().map(|d| d.as_millis() as u64);
-
-            let time_to_first_response_ms = recorded
-                .responses
-                .first()
-                .map(|r| r.elapsed_since(recorded.start_time).as_millis() as u64);
-
-            let responses_per_second = if recorded.total_duration.as_secs_f64() > 0.0 {
-                Some(recorded.response_count() as f64 / recorded.total_duration.as_secs_f64())
-            } else {
-                None
-            };
-
-            Self {
-                response_count: recorded.response_count(),
-                total_duration_ms: recorded.total_duration.as_millis() as u64,
-                avg_inter_response_time_ms,
-                min_inter_response_time_ms,
-                max_inter_response_time_ms,
-                time_to_first_response_ms,
-                responses_per_second,
-            }
-        }
-    }
-
     #[test]
     fn test_timestamped_response_creation() {
         let response = "test response";
         let timestamped = TimestampedResponse::new(response, 5);
@@ -451,15 +383,6 @@ mod tests {
         assert_eq!(recorded.response_count(), 3);
         assert_eq!(recorded.total_duration(), Duration::from_millis(250));
-
-        let inter_times = recorded.inter_response_times();
-        assert_eq!(inter_times.len(), 2);
-        assert_eq!(inter_times[0], Duration::from_millis(100));
-        assert_eq!(inter_times[1], Duration::from_millis(150));
-
-        // Test average calculation
-        let avg = recorded.average_inter_response_time().unwrap();
-        assert_eq!(avg, Duration::from_millis(125)); // (250ms / 2 intervals)
     }
 
     #[test]
@@ -480,13 +403,9 @@ mod tests {
         let end_time = start_time + Duration::from_millis(150);
         let recorded = RecordedStream::new(responses, start_time, end_time);
 
-        let metrics = StreamPerformanceMetrics::from(&recorded);
-        assert_eq!(metrics.response_count, 2);
-        assert_eq!(metrics.total_duration_ms, 150);
-        assert_eq!(metrics.time_to_first_response_ms, Some(50));
-        assert_eq!(metrics.min_inter_response_time_ms, Some(100));
-        assert_eq!(metrics.max_inter_response_time_ms, Some(100));
+        assert_eq!(recorded.response_count(), 2);
+        assert_eq!(recorded.total_duration(), Duration::from_millis(150));
     }
 
     #[tokio::test]