diff --git a/dspy/streaming/streaming_listener.py b/dspy/streaming/streaming_listener.py
index 5c1cc144df..4080f26cdd 100644
--- a/dspy/streaming/streaming_listener.py
+++ b/dspy/streaming/streaming_listener.py
@@ -1,3 +1,4 @@
+import inspect
 import re
 from collections import defaultdict
 from queue import Queue
@@ -135,7 +136,12 @@ def receive(self, chunk: ModelResponseStream):
             return
 
         # Handle custom streamable types
-        if self._output_type and issubclass(self._output_type, Type) and self._output_type.is_streamable():
+        if (
+            self._output_type
+            and inspect.isclass(self._output_type)
+            and issubclass(self._output_type, Type)
+            and self._output_type.is_streamable()
+        ):
             if parsed_chunk := self._output_type.parse_stream_chunk(chunk):
                 return StreamResponse(
                     self.predict_name,
diff --git a/tests/streaming/test_streaming.py b/tests/streaming/test_streaming.py
index c4efaff7f4..08946a761b 100644
--- a/tests/streaming/test_streaming.py
+++ b/tests/streaming/test_streaming.py
@@ -1205,6 +1205,52 @@ async def chat_stream(*args, **kwargs):
         assert "success" in full_content
 
 
+@pytest.mark.anyio
+async def test_chat_adapter_with_generic_type_annotation():
+    class TestSignature(dspy.Signature):
+        question: str = dspy.InputField()
+        response: list[str] | int = dspy.OutputField()
+
+    class MyProgram(dspy.Module):
+        def __init__(self):
+            self.predict = dspy.Predict(TestSignature)
+
+        def forward(self, question, **kwargs):
+            return self.predict(question=question, **kwargs)
+
+    async def chat_stream(*args, **kwargs):
+        yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="[[ ##"))])
+        yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content=" response"))])
+        yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content=" ## ]]\n\n"))])
+        yield ModelResponseStream(
+            model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="1"))]
+        )
+        yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="\n\n[[ ##"))])
+        yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content=" completed"))])
+        yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content=" ## ]]"))])
+
+    program = dspy.streamify(
+        MyProgram(),
+        stream_listeners=[
+            dspy.streaming.StreamListener(signature_field_name="response"),
+        ],
+    )
+
+    with mock.patch("litellm.acompletion", side_effect=chat_stream):
+        with dspy.context(lm=dspy.LM("openai/gpt-4o-mini", cache=False), adapter=dspy.ChatAdapter()):
+            output = program(question="Say hello")
+            chunks = []
+            async for value in output:
+                if isinstance(value, StreamResponse):
+                    chunks.append(value)
+
+    assert len(chunks) > 0
+    assert chunks[0].signature_field_name == "response"
+
+    full_content = "".join(chunk.chunk for chunk in chunks)
+    assert "1" in full_content
+
+
 @pytest.mark.anyio
 async def test_chat_adapter_nested_pydantic_streaming():
     """Test ChatAdapter streaming with nested pydantic model."""
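
Note (reviewer sketch, not part of the patch): the `inspect.isclass` guard matters because a generic output annotation such as `list[str] | int` resolves `_output_type` to a `types.UnionType` instance rather than a class, and passing a non-class as the first argument to `issubclass` raises `TypeError`. A minimal standalone reproduction of the failure mode, assuming Python 3.10+ for the `X | Y` union syntax:

```python
import inspect

# A generic annotation like the one in TestSignature above: this is a
# types.UnionType instance, not a class.
generic = list[str] | int

# The new guard short-circuits here, so issubclass is never reached.
print(inspect.isclass(generic))  # False

try:
    # What the pre-patch code effectively did with a generic output type.
    issubclass(generic, object)
except TypeError as e:
    print(e)  # issubclass() arg 1 must be a class
```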