Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions python/packages/ag-ui/tests/ag_ui/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -1501,3 +1501,42 @@ def test_reasoning_encrypted_value_updated_on_later_delta(self):
assert len(flow.reasoning_messages) == 1
assert flow.reasoning_messages[0]["content"] == "part1 part2"
assert flow.reasoning_messages[0]["encryptedValue"] == "encrypted-payload"

def test_reasoning_done_after_deltas_does_not_duplicate(self):
    """Reasoning deltas sharing one id accumulate into a single message, in order.

    Suppressing a done-style event that follows deltas is the upstream
    client's responsibility. No done-style content is emitted in this test;
    it only pins the baseline the accumulator must produce for the delta
    stream itself: one stored message whose content is exactly the
    concatenation of the delta texts.
    """
    flow = FlowState()
    msg_id = "reason_dedup"

    delta1 = Content.from_text_reasoning(id=msg_id, text="Hello ")
    delta2 = Content.from_text_reasoning(id=msg_id, text="world")

    _emit_text_reasoning(delta1, flow)
    _emit_text_reasoning(delta2, flow)

    # Exactly one message, holding the deltas' text and nothing more.
    assert len(flow.reasoning_messages) == 1
    assert flow.reasoning_messages[0]["content"] == "Hello world"
    assert flow.reasoning_messages[0]["id"] == msg_id

def test_reasoning_deltas_emit_one_content_event_each(self):
    """Two reasoning deltas produce two content events carrying the delta text in order."""
    flow = FlowState()
    msg_id = "reason_evt"

    # Build both deltas up front, then feed them through the emitter in order.
    deltas = [Content.from_text_reasoning(id=msg_id, text=piece) for piece in ("Think ", "hard")]
    emitted = [event for delta in deltas for event in _emit_text_reasoning(delta, flow)]

    # Only content events matter here; any other emitted events are ignored.
    content_deltas = [event.delta for event in emitted if isinstance(event, ReasoningMessageContentEvent)]

    assert content_deltas == ["Think ", "hard"]
42 changes: 30 additions & 12 deletions python/packages/openai/agent_framework_openai/_chat_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ def _inner_get_response(

if stream:
function_call_ids: dict[int, tuple[str, str]] = {}
seen_reasoning_delta_item_ids: set[str] = set()
validated_options: dict[str, Any] | None = None

async def _stream() -> AsyncIterable[ChatResponseUpdate]:
Expand All @@ -530,6 +531,7 @@ async def _stream() -> AsyncIterable[ChatResponseUpdate]:
chunk,
options=validated_options,
function_call_ids=function_call_ids,
seen_reasoning_delta_item_ids=seen_reasoning_delta_item_ids,
)
except Exception as ex:
self._handle_request_error(ex)
Expand Down Expand Up @@ -1930,6 +1932,7 @@ def _parse_chunk_from_openai(
event: OpenAIResponseStreamEvent,
options: dict[str, Any],
function_call_ids: dict[int, tuple[str, str]],
seen_reasoning_delta_item_ids: set[str] | None = None,
) -> ChatResponseUpdate:
"""Parse an OpenAI Responses API streaming event into a ChatResponseUpdate."""
metadata: dict[str, Any] = {}
Expand Down Expand Up @@ -2008,6 +2011,8 @@ def _parse_chunk_from_openai(
contents.append(Content.from_text(text=event.delta, raw_representation=event))
metadata.update(self._get_metadata_from_response(event))
case "response.reasoning_text.delta":
if seen_reasoning_delta_item_ids is not None:
seen_reasoning_delta_item_ids.add(event.item_id)
contents.append(
Content.from_text_reasoning(
id=event.item_id,
Expand All @@ -2017,15 +2022,21 @@ def _parse_chunk_from_openai(
)
metadata.update(self._get_metadata_from_response(event))
case "response.reasoning_text.done":
contents.append(
Content.from_text_reasoning(
id=event.item_id,
text=event.text,
raw_representation=event,
# Done event carries the full accumulated text. Emit it only as a
# fallback when no delta was already received for this item_id, to
# avoid duplicating content in downstream accumulators (e.g. ag-ui).
if seen_reasoning_delta_item_ids is None or event.item_id not in seen_reasoning_delta_item_ids:
contents.append(
Content.from_text_reasoning(
id=event.item_id,
text=event.text,
raw_representation=event,
)
)
)
metadata.update(self._get_metadata_from_response(event))
case "response.reasoning_summary_text.delta":
if seen_reasoning_delta_item_ids is not None:
seen_reasoning_delta_item_ids.add(event.item_id)
contents.append(
Content.from_text_reasoning(
id=event.item_id,
Expand All @@ -2035,13 +2046,17 @@ def _parse_chunk_from_openai(
)
metadata.update(self._get_metadata_from_response(event))
case "response.reasoning_summary_text.done":
contents.append(
Content.from_text_reasoning(
id=event.item_id,
text=event.text,
raw_representation=event,
# Done event carries the full accumulated text. Emit it only as a
# fallback when no delta was already received for this item_id, to
# avoid duplicating content in downstream accumulators (e.g. ag-ui).
if seen_reasoning_delta_item_ids is None or event.item_id not in seen_reasoning_delta_item_ids:
contents.append(
Content.from_text_reasoning(
id=event.item_id,
text=event.text,
raw_representation=event,
)
)
)
metadata.update(self._get_metadata_from_response(event))
case "response.code_interpreter_call_code.delta":
call_id = getattr(event, "call_id", None) or getattr(event, "id", None) or event.item_id
Expand All @@ -2065,6 +2080,9 @@ def _parse_chunk_from_openai(
)
)
metadata.update(self._get_metadata_from_response(event))
# NOTE: Unlike reasoning done events, code_interpreter done events always
# emit content because downstream consumers do not accumulate
# code_interpreter deltas the same way.
case "response.code_interpreter_call_code.done":
call_id = getattr(event, "call_id", None) or getattr(event, "id", None) or event.item_id
ci_additional_properties = {
Expand Down
124 changes: 116 additions & 8 deletions python/packages/openai/tests/openai/test_openai_chat_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2808,11 +2808,12 @@ def test_streaming_reasoning_text_delta_event() -> None:
mock_metadata.assert_called_once_with(event)


def test_streaming_reasoning_text_done_event() -> None:
"""Test reasoning text done event creates TextReasoningContent with complete text."""
def test_streaming_reasoning_text_done_event_skipped_after_deltas() -> None:
"""Test reasoning text done event does not emit content when deltas were already received."""
client = OpenAIChatClient(model="test-model", api_key="test-key")
chat_options = ChatOptions()
function_call_ids: dict[int, tuple[str, str]] = {}
seen_reasoning_delta_item_ids: set[str] = {"reasoning_456"}

event = ResponseReasoningTextDoneEvent(
type="response.reasoning_text.done",
Expand All @@ -2824,12 +2825,40 @@ def test_streaming_reasoning_text_done_event() -> None:
)

with patch.object(client, "_get_metadata_from_response", return_value={"test": "data"}) as mock_metadata:
response = client._parse_chunk_from_openai(event, chat_options, function_call_ids) # type: ignore
response = client._parse_chunk_from_openai(
event, chat_options, function_call_ids, seen_reasoning_delta_item_ids
) # type: ignore

assert len(response.contents) == 0
mock_metadata.assert_called_once_with(event)
assert response.additional_properties == {"test": "data"}


def test_streaming_reasoning_text_done_event_fallback_without_deltas() -> None:
    """A reasoning text done event is surfaced as content when its item_id had no prior deltas."""
    client = OpenAIChatClient(model="test-model", api_key="test-key")
    chat_options = ChatOptions()
    function_call_ids: dict[int, tuple[str, str]] = {}
    # Empty set: no delta has been recorded for any item id.
    seen_reasoning_delta_item_ids: set[str] = set()

    done_event = ResponseReasoningTextDoneEvent(
        type="response.reasoning_text.done",
        content_index=0,
        item_id="reasoning_456",
        output_index=0,
        sequence_number=2,
        text="complete reasoning",
    )

    with patch.object(client, "_get_metadata_from_response", return_value={"test": "data"}) as mock_metadata:
        update = client._parse_chunk_from_openai(  # type: ignore
            done_event,
            chat_options,
            function_call_ids,
            seen_reasoning_delta_item_ids,
        )

    # The full text carried by the done event becomes the single reasoning content.
    assert len(update.contents) == 1
    content = update.contents[0]
    assert content.type == "text_reasoning"
    assert content.id == "reasoning_456"
    assert content.text == "complete reasoning"
    assert content.raw_representation == done_event

    # Metadata is extracted from the event and exposed on the update.
    mock_metadata.assert_called_once_with(done_event)
    assert update.additional_properties == {"test": "data"}

Expand Down Expand Up @@ -2859,11 +2888,12 @@ def test_streaming_reasoning_summary_text_delta_event() -> None:
mock_metadata.assert_called_once_with(event)


def test_streaming_reasoning_summary_text_done_event() -> None:
"""Test reasoning summary text done event creates TextReasoningContent with complete text."""
def test_streaming_reasoning_summary_text_done_event_skipped_after_deltas() -> None:
"""Test reasoning summary text done event does not emit content when deltas were already received."""
client = OpenAIChatClient(model="test-model", api_key="test-key")
chat_options = ChatOptions()
function_call_ids: dict[int, tuple[str, str]] = {}
seen_reasoning_delta_item_ids: set[str] = {"summary_012"}

event = ResponseReasoningSummaryTextDoneEvent(
type="response.reasoning_summary_text.done",
Expand All @@ -2875,16 +2905,94 @@ def test_streaming_reasoning_summary_text_done_event() -> None:
)

with patch.object(client, "_get_metadata_from_response", return_value={"custom": "meta"}) as mock_metadata:
response = client._parse_chunk_from_openai(event, chat_options, function_call_ids) # type: ignore
response = client._parse_chunk_from_openai(
event, chat_options, function_call_ids, seen_reasoning_delta_item_ids
) # type: ignore

assert len(response.contents) == 0
mock_metadata.assert_called_once_with(event)
assert response.additional_properties == {"custom": "meta"}


def test_streaming_reasoning_summary_text_done_event_fallback_without_deltas() -> None:
    """A reasoning summary done event is surfaced as content when its item_id had no prior deltas."""
    client = OpenAIChatClient(model="test-model", api_key="test-key")
    chat_options = ChatOptions()
    function_call_ids: dict[int, tuple[str, str]] = {}
    # Empty set: no delta has been recorded for any item id.
    seen_reasoning_delta_item_ids: set[str] = set()

    done_event = ResponseReasoningSummaryTextDoneEvent(
        type="response.reasoning_summary_text.done",
        item_id="summary_012",
        output_index=0,
        sequence_number=4,
        summary_index=0,
        text="complete summary",
    )

    with patch.object(client, "_get_metadata_from_response", return_value={"custom": "meta"}) as mock_metadata:
        update = client._parse_chunk_from_openai(  # type: ignore
            done_event,
            chat_options,
            function_call_ids,
            seen_reasoning_delta_item_ids,
        )

    # The full summary carried by the done event becomes the single reasoning content.
    assert len(update.contents) == 1
    content = update.contents[0]
    assert content.type == "text_reasoning"
    assert content.id == "summary_012"
    assert content.text == "complete summary"
    assert content.raw_representation == done_event

    # Metadata is extracted from the event and exposed on the update.
    mock_metadata.assert_called_once_with(done_event)
    assert update.additional_properties == {"custom": "meta"}


def test_streaming_reasoning_deltas_then_done_no_duplication() -> None:
    """Streaming two deltas and then the done event for one item yields only the two delta contents."""
    client = OpenAIChatClient(model="test-model", api_key="test-key")
    chat_options = ChatOptions()
    function_call_ids: dict[int, tuple[str, str]] = {}
    # Shared across all parse calls so the done event can see the earlier deltas.
    seen_reasoning_delta_item_ids: set[str] = set()
    item_id = "reasoning_seq"

    # Two deltas (sequence numbers 1 and 2) followed by the done event repeating the full text.
    stream = [
        ResponseReasoningTextDeltaEvent(
            type="response.reasoning_text.delta",
            content_index=0,
            item_id=item_id,
            output_index=0,
            sequence_number=sequence_number,
            delta=piece,
        )
        for sequence_number, piece in enumerate(("Hello ", "world"), start=1)
    ]
    stream.append(
        ResponseReasoningTextDoneEvent(
            type="response.reasoning_text.done",
            content_index=0,
            item_id=item_id,
            output_index=0,
            sequence_number=3,
            text="Hello world",
        )
    )

    texts = []
    with patch.object(client, "_get_metadata_from_response", return_value={}):
        for event in stream:
            update = client._parse_chunk_from_openai(  # type: ignore
                event,
                chat_options,
                function_call_ids,
                seen_reasoning_delta_item_ids,
            )
            texts.extend(content.text for content in update.contents)

    # Only the deltas contribute content; the done event adds nothing.
    assert texts == ["Hello ", "world"]


def test_streaming_reasoning_events_preserve_metadata() -> None:
"""Test that reasoning events preserve metadata like regular text events."""
client = OpenAIChatClient(model="test-model", api_key="test-key")
Expand Down
Loading