Skip to content

Commit 9e63c00

Browse files
committed
big change: dropping Sync requirement from Resp in AsyncEngine<Req, Resp, Error>
1 parent 6ed716d commit 9e63c00

File tree

16 files changed

+102
-212
lines changed

16 files changed

+102
-212
lines changed

lib/llm/src/http/client.rs

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::protocols::openai::chat_completions::{
2727
};
2828
use crate::protocols::Annotated;
2929
use dynamo_runtime::engine::{
30-
AsyncEngineContext, AsyncEngineContextProvider, AsyncEngineStream, Data,
30+
AsyncEngineContext, AsyncEngineContextProvider, AsyncEngineStream, Data, DataStream,
3131
};
3232

3333
/// Configuration for HTTP clients
@@ -226,43 +226,29 @@ impl BaseHttpClient {
226226
}
227227

228228
/// Type alias for NV chat response stream
229-
pub type NvChatResponseStream = Pin<
230-
Box<
231-
dyn Stream<Item = Result<Annotated<NvCreateChatCompletionStreamResponse>, OpenAIError>>
232-
+ Send
233-
+ Sync,
234-
>,
235-
>;
229+
pub type NvChatResponseStream =
230+
DataStream<Result<Annotated<NvCreateChatCompletionStreamResponse>, OpenAIError>>;
236231

237232
/// Type alias for generic BYOT response stream
238-
pub type ByotResponseStream = Pin<Box<dyn Stream<Item = Result<Value, OpenAIError>> + Send + Sync>>;
233+
pub type ByotResponseStream = DataStream<Result<Value, OpenAIError>>;
239234

240235
/// Type alias for pure OpenAI chat response stream
241-
pub type OpenAIChatResponseStream = Pin<
242-
Box<
243-
dyn Stream<
244-
Item = Result<async_openai::types::CreateChatCompletionStreamResponse, OpenAIError>,
245-
> + Send
246-
+ Sync,
247-
>,
248-
>;
236+
pub type OpenAIChatResponseStream =
237+
DataStream<Result<async_openai::types::CreateChatCompletionStreamResponse, OpenAIError>>;
249238

250239
/// A wrapped HTTP response stream that combines a stream with its context
251240
/// This provides a unified interface for HTTP client responses
252241
#[derive(Dissolve)]
253242
pub struct HttpResponseStream<T> {
254243
/// The underlying stream of responses
255-
pub stream: Pin<Box<dyn Stream<Item = T> + Send>>,
244+
pub stream: DataStream<T>,
256245
/// The context for this request
257246
pub context: Arc<dyn AsyncEngineContext>,
258247
}
259248

260249
impl<T> HttpResponseStream<T> {
261250
/// Create a new HttpResponseStream
262-
pub fn new(
263-
stream: Pin<Box<dyn Stream<Item = T> + Send>>,
264-
context: Arc<dyn AsyncEngineContext>,
265-
) -> Self {
251+
pub fn new(stream: DataStream<T>, context: Arc<dyn AsyncEngineContext>) -> Self {
266252
Self { stream, context }
267253
}
268254
}
@@ -299,7 +285,7 @@ impl<T: Data> HttpResponseStream<T> {
299285

300286
/// A wrapper that implements AsyncEngineStream for streams that are Send (Sync is not required)
301287
struct AsyncEngineStreamWrapper<T> {
302-
stream: Pin<Box<dyn Stream<Item = T> + Send>>,
288+
stream: DataStream<T>,
303289
context: Arc<dyn AsyncEngineContext>,
304290
}
305291

@@ -317,9 +303,9 @@ impl<T: Data> AsyncEngineContextProvider for AsyncEngineStreamWrapper<T> {
317303
}
318304
}
319305

320-
// This is unsafe because we're claiming the stream is Sync when it might not be
321-
// But this is needed for the AsyncEngineStream trait
322-
unsafe impl<T> Sync for AsyncEngineStreamWrapper<T> {}
306+
// // This is unsafe because we're claiming the stream is Sync when it might not be
307+
// // But this is needed for the AsyncEngineStream trait
308+
// unsafe impl<T> Sync for AsyncEngineStreamWrapper<T> {}
323309

324310
impl<T: Data> AsyncEngineStream<T> for AsyncEngineStreamWrapper<T> {}
325311

lib/llm/src/protocols.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919
//! both publicly via the HTTP API and internally between Dynamo components.
2020
//!
2121
22-
use std::pin::Pin;
23-
24-
use futures::{Stream, StreamExt};
22+
use futures::StreamExt;
2523
use serde::{Deserialize, Serialize};
2624

2725
pub mod codec;
@@ -30,7 +28,7 @@ pub mod openai;
3028

3129
/// The token ID type
3230
pub type TokenIdType = u32;
33-
pub type DataStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Sync>>;
31+
pub use dynamo_runtime::engine::DataStream;
3432

3533
// TODO: This is an awkward dependency that we need to address
3634
// Originally, all the Annotated/SSE Codec bits were in the LLM protocol module; however, [Annotated]

lib/llm/src/protocols/openai/chat_completions/aggregator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::protocols::{
2424
};
2525

2626
/// A type alias for a pinned, dynamically-dispatched stream that is `Send`.
27-
type DataStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Sync>>;
27+
type DataStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
2828

2929
/// Aggregates a stream of [`NvCreateChatCompletionStreamResponse`]s into a single
3030
/// [`NvCreateChatCompletionResponse`]. This struct accumulates incremental responses

lib/llm/src/protocols/openai/embeddings/aggregator.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@ use crate::protocols::{
2222
codec::{Message, SseCodecError},
2323
convert_sse_stream, Annotated,
2424
};
25-
26-
/// A type alias for a pinned, dynamically-dispatched stream that is `Send` and `Sync`.
27-
type DataStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Sync>>;
25+
use dynamo_runtime::engine::DataStream;
2826

2927
/// Aggregates a stream of [`NvCreateEmbeddingResponse`]s into a single
3028
/// [`NvCreateEmbeddingResponse`]. For embeddings, this is typically simpler

lib/llm/tests/data/replays/deepseek-r1-distill-llama-8b/chat-completions.stream.logprobs.1

Lines changed: 0 additions & 67 deletions
This file was deleted.

lib/llm/tests/logprob_analysis_integration.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -210,20 +210,20 @@ fn create_multi_choice_stream() -> Arc<RecordedStream<NvCreateChatCompletionStre
210210
Arc::new(stream)
211211
}
212212

213-
fn create_stream_from_recorded_sse_stream(
214-
file: &str,
215-
) -> Arc<RecordedStream<NvCreateChatCompletionStreamResponse>> {
216-
let data = std::fs::read_to_string(file).unwrap();
217-
let sse_stream = create_message_stream(&data);
218-
let response_stream =
219-
convert_sse_stream::<NvCreateChatCompletionStreamResponse>(Box::pin(sse_stream));
220-
221-
let context = Arc::new(MockContext::new());
222-
let response_stream = record_stream_with_context(response_stream, context, RecordingMode::Sink);
223-
let filtered_stream = response_stream.filter_map(|annotated| async move { annotated.data });
224-
let (recorded_stream, recording_rx) =
225-
record_stream_with_context(Box::pin(filtered_stream), ctx, RecordingMode::Sink);
226-
}
213+
// fn create_stream_from_recorded_sse_stream(
214+
// file: &str,
215+
// ) -> Arc<RecordedStream<NvCreateChatCompletionStreamResponse>> {
216+
// let data = std::fs::read_to_string(file).unwrap();
217+
// let sse_stream = create_message_stream(&data);
218+
// let response_stream =
219+
// convert_sse_stream::<NvCreateChatCompletionStreamResponse>(Box::pin(sse_stream));
220+
221+
// let context = Arc::new(MockContext::new());
222+
// let response_stream = record_stream_with_context(response_stream, context, RecordingMode::Sink);
223+
// let filtered_stream = response_stream.filter_map(|annotated| async move { annotated.data });
224+
// let (recorded_stream, recording_rx) =
225+
// record_stream_with_context(Box::pin(filtered_stream), ctx, RecordingMode::Sink);
226+
// }
227227

228228
fn create_stream_with_multiple_close_tokens(
229229
) -> Arc<RecordedStream<NvCreateChatCompletionStreamResponse>> {

lib/runtime/src/engine.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ impl<T: Send + Sync + 'static> Data for T {}
9292

9393
/// [`DataStream`] is a type alias for a stream of [`Data`] items. This can be adapted to a [`ResponseStream`]
9494
/// by associating it with an [`AsyncEngineContext`].
95-
pub type DataUnary<T> = Pin<Box<dyn Future<Output = T> + Send + Sync>>;
96-
pub type DataStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Sync>>;
95+
pub type DataUnary<T> = Pin<Box<dyn Future<Output = T> + Send>>;
96+
pub type DataStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
9797

9898
pub type Engine<Req, Resp, E> = Arc<dyn AsyncEngine<Req, Resp, E>>;
9999
pub type EngineUnary<Resp> = Pin<Box<dyn AsyncEngineUnary<Resp>>>;
@@ -174,7 +174,7 @@ pub trait AsyncEngineContextProvider: Send + Debug {
174174
/// This trait combines `Future` semantics with context provider capabilities,
175175
/// representing a single async operation that produces one result.
176176
pub trait AsyncEngineUnary<Resp: Data>:
177-
Future<Output = Resp> + AsyncEngineContextProvider + Send + Sync
177+
Future<Output = Resp> + AsyncEngineContextProvider + Send
178178
{
179179
}
180180

@@ -183,7 +183,7 @@ pub trait AsyncEngineUnary<Resp: Data>:
183183
/// This trait combines `Stream` semantics with context provider capabilities,
184184
/// representing a continuous async operation that produces multiple results over time.
185185
pub trait AsyncEngineStream<Resp: Data>:
186-
Stream<Item = Resp> + AsyncEngineContextProvider + Send + Sync
186+
Stream<Item = Resp> + AsyncEngineContextProvider + Send
187187
{
188188
}
189189

@@ -204,7 +204,7 @@ pub trait AsyncEngineStream<Resp: Data>:
204204
/// Implementations should ensure proper error handling and resource management.
205205
/// The `generate` method should be cancellable via the response's context provider.
206206
#[async_trait]
207-
pub trait AsyncEngine<Req: Data, Resp: Data + AsyncEngineContextProvider, E: Data>:
207+
pub trait AsyncEngine<Req: Send + Sync + 'static, Resp: AsyncEngineContextProvider, E: Data>:
208208
Send + Sync
209209
{
210210
/// Generate a stream of completion responses.

lib/runtime/src/pipeline.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ pub type ServerStreamingEngine<T, U> = ServiceEngine<SingleIn<T>, ManyOut<U>>;
6969
/// are considered independent of each other; however, they could be constrained to be related.
7070
pub type BidirectionalStreamingEngine<T, U> = ServiceEngine<ManyIn<T>, ManyOut<U>>;
7171

72-
pub trait AsyncTransportEngine<T: PipelineIO, U: PipelineIO>:
72+
pub trait AsyncTransportEngine<T: Data + PipelineIO, U: Data + PipelineIO>:
7373
AsyncEngine<T, U, Error> + Send + Sync + 'static
7474
{
7575
}
@@ -97,7 +97,7 @@ mod sealed {
9797
}
9898
}
9999

100-
pub trait PipelineIO: Data + sealed::Connectable + AsyncEngineContextProvider {
100+
pub trait PipelineIO: sealed::Connectable + AsyncEngineContextProvider + 'static {
101101
fn id(&self) -> String;
102102
}
103103

lib/runtime/src/pipeline/network.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ pub struct Ingress<Req: PipelineIO, Resp: PipelineIO> {
280280
segment: OnceLock<Arc<SegmentSource<Req, Resp>>>,
281281
}
282282

283-
impl<Req: PipelineIO, Resp: PipelineIO> Ingress<Req, Resp> {
283+
impl<Req: PipelineIO + Sync, Resp: PipelineIO> Ingress<Req, Resp> {
284284
pub fn new() -> Arc<Self> {
285285
Arc::new(Self {
286286
segment: OnceLock::new(),

lib/runtime/src/pipeline/nodes.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -221,8 +221,8 @@ where
221221
impl<UpIn, UpOut, DownIn, DownOut> AsyncEngine<UpIn, UpOut, Error>
222222
for PipelineOperator<UpIn, UpOut, DownIn, DownOut>
223223
where
224-
UpIn: PipelineIO,
225-
DownIn: PipelineIO,
224+
UpIn: PipelineIO + Sync,
225+
DownIn: PipelineIO + Sync,
226226
DownOut: PipelineIO,
227227
UpOut: PipelineIO,
228228
{
@@ -235,8 +235,8 @@ where
235235
impl<UpIn, UpOut, DownIn, DownOut> Sink<UpIn>
236236
for PipelineOperatorForwardEdge<UpIn, UpOut, DownIn, DownOut>
237237
where
238-
UpIn: PipelineIO,
239-
DownIn: PipelineIO,
238+
UpIn: PipelineIO + Sync,
239+
DownIn: PipelineIO + Sync,
240240
DownOut: PipelineIO,
241241
UpOut: PipelineIO,
242242
{

0 commit comments

Comments
 (0)