@fede-kamel fede-kamel commented Jul 5, 2025

refactor: process all streaming events for forward compatibility

Summary

This PR refactors the streaming implementation to process all event types instead of filtering to a predefined set. This improves forward compatibility by ensuring any new event types introduced by the API will be automatically processed without requiring SDK updates.

Changes Made

Rebased on the latest next branch, which includes the response.close() optimization from codegen (commit 109b771).

Key Change: Forward Compatibility

Replaced hardcoded event filtering with generic event processing:

Before (filtering to predefined events):

if (sse.event == "message_start" or 
    sse.event == "message_delta" or 
    sse.event == "message_stop" or ...):
    # Only processes these 6 specific event types
    yield process_data(...)

After (processes any event):

if sse.event == "completion":
    yield process_data(...)

if sse.event == "ping":
    continue

if sse.event == "error":
    raise error...

# Process ANY other event for forward compatibility
data = sse.json()
if is_dict(data) and "type" not in data:
    data["type"] = sse.event
yield process_data(...)
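
The "after" flow can be sketched end to end with stand-in types. This is a minimal illustration, not the SDK's code: `FakeSSE` and `iter_events` are hypothetical names, `RuntimeError` stands in for the SDK's status error, and the payloads are made up. The snippet above has no `continue` after the completion branch; the sketch adds one so a completion event is yielded only once, which is an assumption about the intended behaviour rather than something this PR states.

```python
import json
from dataclasses import dataclass
from typing import Any, Iterable, Iterator


@dataclass
class FakeSSE:
    """Hypothetical stand-in for the SDK's server-sent event object."""

    event: str
    data: str

    def json(self) -> Any:
        return json.loads(self.data)


def iter_events(events: Iterable[FakeSSE]) -> Iterator[Any]:
    for sse in events:
        if sse.event == "completion":
            yield sse.json()
            continue  # assumption: avoids yielding the same event again below

        if sse.event == "ping":
            continue

        if sse.event == "error":
            # The real SDK raises a status error; RuntimeError is a placeholder.
            raise RuntimeError(sse.data)

        # Any other event, known or unknown, is passed through.
        data = sse.json()
        if isinstance(data, dict) and "type" not in data:
            data["type"] = sse.event
        yield data


events = [
    FakeSSE("message_start", '{"type": "message_start"}'),
    FakeSSE("ping", "{}"),
    FakeSSE("tool_use_start", '{"id": "abc"}'),  # made-up future event name
]
results = list(iter_events(events))
print(results)
```

The made-up `tool_use_start` event reaches the caller with its `type` backfilled from the event name, while `ping` is dropped.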

Benefits

  1. Forward Compatibility - New event types automatically supported without SDK changes

    • If the API introduces content_block_thinking, tool_use_start, etc., they'll automatically work
    • No SDK update required for new event types
  2. Cleaner Code - Simpler logic without predefined event filtering

    • Removed MESSAGE_EVENTS frozenset constant
    • More maintainable and easier to understand
  3. API Alignment - Matches the SDK design philosophy of being forward compatible

  4. Zero Breaking Changes - Existing functionality preserved

    • All current events still processed correctly
    • Performance maintained with response.close() optimization from codegen

Backward Compatibility

This change maintains 100% backward compatibility. All existing streaming code continues to work identically, with improved forward compatibility for future API evolution.

Technical Details

  • Applies to both sync (Stream) and async (AsyncStream) implementations
  • Keeps the response.close() / await response.aclose() optimization from codegen
  • All events get proper type field assignment if not present
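
The type-field backfill applies the same way on the sync and async paths. A minimal sketch of the async side, assuming a hypothetical `ensure_type` helper and a fake async source of (event name, payload) pairs; neither name comes from the SDK.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Tuple


def ensure_type(data: Any, event: str) -> Any:
    """Backfill "type" from the SSE event name when the payload lacks one."""
    if isinstance(data, dict) and "type" not in data:
        data["type"] = event
    return data


async def fake_source() -> AsyncIterator[Tuple[str, Dict[str, Any]]]:
    # Hypothetical (event name, decoded payload) pairs.
    yield "message_start", {"type": "message_start"}
    yield "content_block_delta", {"delta": "hi"}


async def collect() -> List[Any]:
    # Same backfill as the sync path, driven by "async for".
    return [ensure_type(data, event) async for event, data in fake_source()]


typed = asyncio.run(collect())
print(typed)
```

A payload that already carries a `type` is left untouched, and non-dict payloads pass through unchanged.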

@fede-kamel fede-kamel requested a review from a team as a code owner July 5, 2025 06:02
@fede-kamel fede-kamel force-pushed the streaming-performance branch 3 times, most recently from 96899e2 to 8f31d23 Compare July 5, 2025 07:01
@fede-kamel fede-kamel changed the title perf(streaming-performance): optimize streaming performance and error handling perf(streaming-performance): optimize streaming performance Jul 5, 2025
@fede-kamel
Author

fede-kamel commented Jul 9, 2025

@anthropics/sdk can we prioritize this? We need this performance improvement.

@fede-kamel
Author

@RobertCraigie what's the next step to merge this?

@fede-kamel
Author

It has been more than two months since this PR was opened. The intention of the PR is to increase the speed of event processing in your SDK. It is difficult to understand why it is taking so long to merge a clear improvement to the code quality of your SDK.

cc: @RobertCraigie @anthropics/sdk

Collaborator

@karpetrosyan karpetrosyan left a comment

Sorry for the delayed answer, I'll try to review it. Thanks so much for your time and effort; I'll try to get this merged quickly.

# Explicitly closes decoder resources if available
# Immediately releases connection instead of consuming remaining stream
if hasattr(self._decoder, "close"):
    self._decoder.close()  # Properly closes decoder resources without unnecessary iteration
Collaborator

Does this close the underlying connection? Running a simple streaming script, I see that at that point the decoder is SSEDecoder, which doesn't have a close method. Though if we iterate through it, we will consume the underlying httpx stream and the connection will be closed. Maybe we should call self.response.close() here?
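
The concern can be illustrated with stand-ins. In this sketch `FakeDecoder` and `FakeResponse` are hypothetical classes, not the SDK's `SSEDecoder` or an httpx response; the only assumption carried over from the comment is that the decoder has no `close` method.

```python
class FakeDecoder:
    """Hypothetical decoder with no close() method, as described for SSEDecoder."""


class FakeResponse:
    """Hypothetical stand-in for the HTTP response that owns the connection."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


def cleanup_via_decoder(decoder: object, response: FakeResponse) -> None:
    # Guarded call: silently does nothing when the decoder has no close().
    if hasattr(decoder, "close"):
        decoder.close()


def cleanup_via_response(decoder: object, response: FakeResponse) -> None:
    # Closes the response directly, independent of the decoder type.
    response.close()


r1 = FakeResponse()
cleanup_via_decoder(FakeDecoder(), r1)

r2 = FakeResponse()
cleanup_via_response(FakeDecoder(), r2)

print(r1.closed, r2.closed)  # prints: False True
```

With a decoder that lacks `close`, the `hasattr` guard turns the cleanup into a no-op, so the response is left open unless it is closed explicitly.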

or sse.event == "content_block_delta"
or sse.event == "content_block_stop"
):
# Single, fast membership test instead of multiple string comparisons
Collaborator

I like this, it really speeds things up. Plus, even though in this case we have O(6) = O(1) complexity for a single lookup, our codegen might have N such events, so it's worth it.
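
A rough way to compare the two forms locally. The six event names are the ones mentioned in this thread; the helper function names are made up for the sketch, and any timing it prints depends on the machine and Python version, so no figures are claimed here.

```python
import timeit

# The six event names referenced in this thread.
MESSAGE_EVENTS = frozenset(
    {
        "message_start",
        "message_delta",
        "message_stop",
        "content_block_start",
        "content_block_delta",
        "content_block_stop",
    }
)


def is_message_event_chain(event: str) -> bool:
    # Chained comparisons: up to six string equality checks per call.
    return (
        event == "message_start"
        or event == "message_delta"
        or event == "message_stop"
        or event == "content_block_start"
        or event == "content_block_delta"
        or event == "content_block_stop"
    )


def is_message_event_set(event: str) -> bool:
    # Single hash-based membership test.
    return event in MESSAGE_EVENTS


samples = ["message_start", "content_block_stop", "ping", "tool_use_start"]
agree = all(is_message_event_chain(e) == is_message_event_set(e) for e in samples)

chain_s = timeit.timeit(lambda: is_message_event_chain("content_block_stop"), number=10_000)
set_s = timeit.timeit(lambda: is_message_event_set("content_block_stop"), number=10_000)
print(f"agree={agree} chain={chain_s:.6f}s set={set_s:.6f}s")
```

Both predicates return the same answer for every input; only the lookup cost differs, and the set form stays constant as the number of event names grows.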

or sse.event == "content_block_delta"
or sse.event == "content_block_stop"
):
# same O(1) lookup for consistency between sync/async versions
Collaborator

Let's have the same comment for the sync and async cases.

@fede-kamel fede-kamel force-pushed the streaming-performance branch from 8f31d23 to cf51769 Compare October 28, 2025 12:19
fede-kamel added a commit to fede-kamel/anthropic-sdk-python that referenced this pull request Oct 28, 2025
- Replace decoder.close() with response.close() in sync stream
- Replace decoder.close() with response.aclose() in async stream
- Standardize comments between sync and async implementations

Addresses feedback from PR anthropics#993:
- SSEDecoder doesn't have a close method, properly close httpx response instead
- Ensure consistent comments between sync/async versions
@fede-kamel
Author

Review Feedback Addressed

Thank you @karpetrosyan for the review! I've addressed all your feedback:

✅ Fixed: Connection Cleanup (Line 126 & 236)

Your feedback:

Does this close the underlying connection? Running a simple streaming script, I see that at that point the decoder is SSEDecoder, which doesn't have a close method. Though if we iterate through it, we will consume the underlying httpx stream and the connection will be closed. Maybe we should call self.response.close() here?

Changes made:

  • Sync version (line 125): Changed from if hasattr(self._decoder, "close"): self._decoder.close() to direct self.response.close()
  • Async version (line 236): Changed from if hasattr(self._decoder, "close"): self._decoder.close() to await self.response.aclose()

This ensures proper connection cleanup via the httpx response object instead of attempting to close the SSEDecoder which doesn't have a close method.

✅ Fixed: Comment Consistency (Line 209)

Your feedback:

Let's have the same comment for the sync and async cases.

Changes made:

  • Changed async comment from "same O(1) lookup for consistency between sync/async versions" to match sync version
  • Both now say: # Single, fast membership test instead of multiple string comparisons

✅ Confirmed: O(1) Performance Improvement (Line 96)

Your feedback:

I like this, it really speeds things up. Plus, even though in this case we have O(6) = O(1) complexity for a single lookup, our codegen might have N such events, so it's worth it.

Thank you! The optimization delivers 1.64-1.69x speedup as validated by our performance tests.


Test Results

All tests pass with live API validation:

✅ 11/11 tests passed

🔬 Event Type Lookup Optimization:
   OLD (O(n) chain): 0.000201s (44,766,769 lookups/sec)
   NEW (O(1) set):   0.000123s (73,344,273 lookups/sec)
   🚀 Speedup: 1.64x

🧪 Real Streaming Performance Test:
   📊 Events processed: 9
   ⏱️  Duration: 2.1174s
   ✅ Real streaming optimizations work!

🧪 Async Real Streaming Test:
   📊 Events processed: 9
   ⏱️  Duration: 2.5522s
   ✅ Async streaming processed successfully

Commits

  • 5a87d43: Initial review feedback fixes (connection cleanup + comment consistency)
  • c4fa0a8: Updated tests to verify response.close() instead of decoder.close()

Ready for re-review! 🚀

@karpetrosyan
Collaborator

Reviewing this again, I think we’d better just remove that event name check and process any event we receive instead. The loop could look like this.

for sse in iterator:
    if sse.event == "completion":
        yield process_data(data=sse.json(), cast_to=cast_to, response=response)

    if sse.event == "ping":
        continue

    if sse.event == "error":
        body = sse.data

        try:
            body = sse.json()
            err_msg = f"{body}"
        except Exception:
            err_msg = sse.data or f"Error code: {response.status_code}"

        raise self._client._make_status_error(
            err_msg,
            body=body,
            response=self.response,
        )

    data = sse.json()
    if is_dict(data) and "type" not in data:
        data["type"] = sse.event

    yield process_data(data=data, cast_to=cast_to, response=response)

So... I guess our next steps are:

  1. Remove the event name checks

  2. Remove the early response closing part — I’ll port that change to our codegen

  3. Remove the performance test, fix the PR title, and point out that we’re now processing any events we receive

Processing any events would align better with our SDK design in general, where we try to be forward compatible wherever we can.
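
The error branch of the loop above builds its message in two steps: try the JSON body first, then fall back to the raw data or a status-code string. A standalone sketch of just that fallback, using a hypothetical `build_error` helper and plain strings in place of the SSE and response objects:

```python
import json
from typing import Any, Tuple


def build_error(raw: str, status_code: int) -> Tuple[str, Any]:
    """Return (message, body) following the fallback order in the loop above."""
    body: Any = raw
    try:
        # Prefer the decoded JSON body when the payload parses.
        body = json.loads(raw)
        err_msg = f"{body}"
    except Exception:
        # Otherwise use the raw text, or the status code if the text is empty.
        err_msg = raw or f"Error code: {status_code}"
    return err_msg, body


print(build_error('{"error": "overloaded"}', 500))
print(build_error("not json", 500))
print(build_error("", 500))
```

When parsing fails, `body` keeps the raw string, so the raised error still carries whatever the server sent.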

@fede-kamel fede-kamel changed the title perf(streaming-performance): optimize streaming performance refactor: process all streaming events for forward compatibility Oct 28, 2025
fede-kamel added a commit to fede-kamel/anthropic-sdk-python that referenced this pull request Oct 28, 2025
Changes based on review feedback from @karpetrosyan:

1. Removed MESSAGE_EVENTS filtering - now processes any event received
2. Removed early response closing (response.close/aclose) - will be ported to codegen
3. Removed performance test file - no longer applicable

This change improves forward compatibility by processing any event type
the API sends, rather than filtering to a predefined set. The new
structure handles completion, ping, and error events explicitly, while
processing all other events generically.

Closes review feedback in anthropics#993
@fede-kamel
Author

@karpetrosyan Thank you for the review and guidance. I've implemented all the changes you requested:

✅ Changes Implemented

  1. Removed MESSAGE_EVENTS filtering - Now processes any event received
  2. Removed early response closing - Understood this will be ported to codegen
  3. Removed performance test - Updated PR to reflect new scope
  4. Updated PR title and description - Now focuses on forward compatibility

🧪 Testing Performed

I ran comprehensive integration tests to verify the changes work correctly:

  • Sync Streaming: 15 events processed successfully
  • Async Streaming: 13 events processed successfully
  • All event types handled correctly: message_start, content_block_start, content_block_delta, content_block_stop, message_delta, message_stop, text

📊 Value Proposition

What this PR provides:

Forward compatibility for future streaming events. If Anthropic introduces new event types (e.g., tool_use_start, citation_delta), they'll be automatically processed without requiring an SDK update.

Before: Unknown events would be filtered out by MESSAGE_EVENTS check
After: All events are processed generically

Trade-off context:

The original PR included measurable performance optimizations (1.69x speedup via O(1) frozenset lookups) alongside this forward compatibility improvement. Per your feedback, I've focused solely on the forward compatibility aspect, which provides architectural value but is speculative (depends on future API evolution).

I understand the priority for forward compatibility aligns with the SDK's design philosophy. The PR is ready for re-review as implemented.

)

# Ensure the entire stream is consumed
for _sse in iterator:
Collaborator

Ah sorry, we want to keep this part until I port the early-exit improvement to codegen.

@fede-kamel
Author

Fixed! I've added back the stream consumption loop. The code now keeps:

# Ensure the entire stream is consumed
for _sse in iterator:
    pass

And the async version:

# Ensure the entire stream is consumed  
async for _sse in iterator:
    pass

This will remain until you port the early exit improvement to codegen. Changes pushed in commit 2a3e30b.
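
For readers unfamiliar with the pattern, here is a small sketch of what the drain loop does, assuming the main loop can stop before the iterator is exhausted. `source` and `first_then_drain` are hypothetical names, and the integer generator stands in for the SSE iterator.

```python
from typing import Iterator, List

consumed: List[int] = []


def source() -> Iterator[int]:
    # Stand-in for the SSE iterator; records every item that is pulled.
    for i in range(5):
        consumed.append(i)
        yield i


def first_then_drain(iterator: Iterator[int]) -> int:
    first = next(iterator)  # stop early after the first item

    # Ensure the entire stream is consumed
    for _ in iterator:
        pass

    return first


value = first_then_drain(source())
print(value, consumed)  # prints: 0 [0, 1, 2, 3, 4]
```

Only the first item is used, yet every remaining item is still pulled from the source, which is the behaviour the drain loop preserves until an early-exit path replaces it.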

@karpetrosyan
Collaborator

This looks great, thanks! One note: we need to merge commits into the next branch instead of main.

@fede-kamel fede-kamel changed the base branch from main to next October 28, 2025 19:29
@fede-kamel
Author

fede-kamel commented Oct 28, 2025

@karpetrosyan Done. What's the next step?

fede-kamel added a commit to fede-kamel/anthropic-sdk-python that referenced this pull request Oct 29, 2025
- Replace decoder.close() with response.close() in sync stream
- Replace decoder.close() with response.aclose() in async stream
- Standardize comments between sync and async implementations

Addresses feedback from PR anthropics#993:
- SSEDecoder doesn't have a close method, properly close httpx response instead
- Ensure consistent comments between sync/async versions
fede-kamel added a commit to fede-kamel/anthropic-sdk-python that referenced this pull request Oct 29, 2025
Changes based on review feedback from @karpetrosyan:

1. Removed MESSAGE_EVENTS filtering - now processes any event received
2. Removed early response closing (response.close/aclose) - will be ported to codegen
3. Removed performance test file - no longer applicable

This change improves forward compatibility by processing any event type
the API sends, rather than filtering to a predefined set. The new
structure handles completion, ping, and error events explicitly, while
processing all other events generically.

Closes review feedback in anthropics#993
@fede-kamel fede-kamel force-pushed the streaming-performance branch from 2a3e30b to 3216326 Compare October 29, 2025 12:40
Fede Kamelhar and others added 3 commits October 29, 2025 08:49
…resource cleanup

fix(streaming-performance) - Optimize streaming event processing and resource cleanup

fix(streaming-performance) - Optimize streaming event processing and resource cleanup
Updated test assertions to verify response.close() is called
instead of decoder.close(), aligning with review feedback.

Changes:
- NewImplementationSimulator now calls response.close()
- Stream cleanup test verifies response.close()
- Regression test verifies response.close()

All 11 tests now pass with live API validation.
Changes based on review feedback from @karpetrosyan:

1. Removed MESSAGE_EVENTS filtering - now processes any event received
2. Removed early response closing (response.close/aclose) - will be ported to codegen
3. Removed performance test file - no longer applicable

This change improves forward compatibility by processing any event type
the API sends, rather than filtering to a predefined set. The new
structure handles completion, ping, and error events explicitly, while
processing all other events generically.

Closes review feedback in anthropics#993
@fede-kamel fede-kamel force-pushed the streaming-performance branch from 3216326 to cbc3e13 Compare October 29, 2025 12:50
@fede-kamel
Author

Rebased on latest next branch - conflicts resolved ✅

This PR has been rebased onto the latest next branch and conflicts have been resolved.

Resolution Strategy

Combined both changes:

  1. Kept the response.close() optimization from commit 109b771 (the codegen port)
  2. Kept the forward compatibility refactor (this PR's main contribution)

What This PR Changes

The core change is removing hardcoded event filtering in favor of generic event processing:

Before: Only 6 event types were processed (hardcoded list)

if (sse.event == "message_start" or sse.event == "message_delta" or ...):
    yield process_data(...)

After: ANY event type is processed (forward compatible)

# Handle specific events explicitly
if sse.event == "completion": ...
if sse.event == "ping": continue
if sse.event == "error": raise error

# Process any other event for forward compatibility
data = sse.json()
if is_dict(data) and "type" not in data:
    data["type"] = sse.event
yield process_data(...)

This ensures new event types like content_block_thinking, tool_use_start, etc. will automatically work without SDK updates.

Ready for review! @karpetrosyan

@fede-kamel
Author

@karpetrosyan when can this get pushed? Thank you!

@fede-kamel
Author

@karpetrosyan is it time to push this? What's the process?

@karpetrosyan
Collaborator

I can't merge this myself, but I've asked the team to check it out.

@RobertCraigie
Collaborator

Sorry for the delay, we're figuring out internally if this is a safe change to make.

We did merge one of the original changes in your PR through a codegen change and marked you as a coauthor :) 109b771 (#1056)
