Merged · Changes from 2 commits
28 changes: 23 additions & 5 deletions lib/ruby_llm/streaming.rb
@@ -55,13 +55,13 @@ def create_stream_processor(parser, buffer, &)
   end
 end

-def process_stream_chunk(chunk, parser, _env, &)
+def process_stream_chunk(chunk, parser, env, &)
   RubyLLM.logger.debug "Received chunk: #{chunk}"

   if error_chunk?(chunk)
-    handle_error_chunk(chunk, nil)
+    handle_error_chunk(chunk, env)
   else
-    yield handle_sse(chunk, parser, nil, &)
+    yield handle_sse(chunk, parser, env, &)
   end
 end

@@ -88,7 +88,16 @@ def error_chunk?(chunk)
 def handle_error_chunk(chunk, env)
   error_data = chunk.split("\n")[1].delete_prefix('data: ')
   status, _message = parse_streaming_error(error_data)
-  error_response = env.merge(body: JSON.parse(error_data), status: status)
+  parsed_data = JSON.parse(error_data)
+
+  # Create a response-like object that works for both Faraday v1 and v2
+  error_response = if env
+                     env.merge(body: parsed_data, status: status)
+                   else
+                     # For Faraday v1, create a simple object that responds to .status and .body
+                     Struct.new(:body, :status).new(parsed_data, status)
+                   end
+
   ErrorMiddleware.parse_error(provider: self, response: error_response)
 rescue JSON::ParserError => e
   RubyLLM.logger.debug "Failed to parse error chunk: #{e.message}"
@@ -122,7 +131,16 @@ def handle_data(data)

 def handle_error_event(data, env)
   status, _message = parse_streaming_error(data)
-  error_response = env.merge(body: JSON.parse(data), status: status)
+  parsed_data = JSON.parse(data)
+
+  # Create a response-like object that works for both Faraday v1 and v2
+  error_response = if env
+                     env.merge(body: parsed_data, status: status)
+                   else
+                     # For Faraday v1, create a simple object that responds to .status and .body
+                     Struct.new(:body, :status).new(parsed_data, status)
+                   end
+
   ErrorMiddleware.parse_error(provider: self, response: error_response)
 rescue JSON::ParserError => e
   RubyLLM.logger.debug "Failed to parse error event: #{e.message}"
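For context, here is a minimal standalone sketch (not from the PR) of what this dual-version handling does with an SSE error chunk like the ones recorded in the cassettes below. The 529 status is an illustrative stand-in for whatever parse_streaming_error actually returns:

```ruby
# Standalone sketch of the error-chunk path above. The chunk format
# matches the cassettes below; the 529 status is assumed for
# illustration, since in the real code it comes from parse_streaming_error.
require 'json'

chunk = "event: error\ndata: {\"type\":\"error\",\"error\":{\"message\":\"Overloaded\"}}"

error_data  = chunk.split("\n")[1].delete_prefix('data: ')
parsed_data = JSON.parse(error_data)
status      = 529 # assumed status for illustration

# Per the PR, Faraday v1 does not hand an env to this code path,
# so env is nil there and the Struct stands in as the response object.
env = nil
error_response = if env
                   env.merge(body: parsed_data, status: status)
                 else
                   Struct.new(:body, :status).new(parsed_data, status)
                 end

error_response.status                   # => 529
error_response.body['error']['message'] # => "Overloaded"
```

Either branch yields an object answering #status and #body, which is all ErrorMiddleware.parse_error needs from the response.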
New file (VCR cassette):
@@ -0,0 +1,87 @@
---
http_interactions:
- request:
method: post
uri: https://api.anthropic.com/v1/messages
body:
encoding: UTF-8
string: '{"model":"claude-3-5-haiku-20241022","messages":[{"role":"user","content":[{"type":"text","text":"Count
from 1 to 3"}]}],"temperature":0.7,"stream":true,"max_tokens":8192}'
headers:
User-Agent:
- Faraday v2.13.1
X-Api-Key:
- "<ANTHROPIC_API_KEY>"
Anthropic-Version:
- '2023-06-01'
Content-Type:
- application/json
Accept-Encoding:
- gzip;q=1.0,deflate;q=0.6,identity;q=0.3
Accept:
- "*/*"
response:
status:
code: 200
message: OK
headers:
Date:
- Wed, 11 Jun 2025 12:53:20 GMT
Content-Type:
- text/event-stream; charset=utf-8
Transfer-Encoding:
- chunked
Connection:
- keep-alive
Cache-Control:
- no-cache
Anthropic-Ratelimit-Input-Tokens-Limit:
- '100000'
Anthropic-Ratelimit-Input-Tokens-Remaining:
- '100000'
Anthropic-Ratelimit-Input-Tokens-Reset:
- '2025-06-11T12:53:18Z'
Anthropic-Ratelimit-Output-Tokens-Limit:
- '20000'
Anthropic-Ratelimit-Output-Tokens-Remaining:
- '20000'
Anthropic-Ratelimit-Output-Tokens-Reset:
- '2025-06-11T12:53:18Z'
Anthropic-Ratelimit-Requests-Limit:
- '1000'
Anthropic-Ratelimit-Requests-Remaining:
- '999'
Anthropic-Ratelimit-Requests-Reset:
- '2025-06-11T12:53:19Z'
Anthropic-Ratelimit-Tokens-Limit:
- '120000'
Anthropic-Ratelimit-Tokens-Remaining:
- '120000'
Anthropic-Ratelimit-Tokens-Reset:
- '2025-06-11T12:53:18Z'
Request-Id:
- "<REQUEST_ID>"
Strict-Transport-Security:
- max-age=31536000; includeSubDomains; preload
Anthropic-Organization-Id:
- 0137b15c-16bf-490d-9f90-8cfd7e325ec0
Via:
- 1.1 google
Cf-Cache-Status:
- DYNAMIC
X-Robots-Tag:
- none
Server:
- cloudflare
Cf-Ray:
- "<CF_RAY>"
body:
encoding: UTF-8
string: |+
event: error
data: {"type":"error","error":{"details":null,"type":"overloaded_error","message":"Overloaded"} }


recorded_at: Wed, 11 Jun 2025 12:53:21 GMT
recorded_with: VCR 6.3.1
...
New file (VCR cassette):
@@ -0,0 +1,105 @@
---
http_interactions:
- request:
method: post
uri: https://api.anthropic.com/v1/messages
body:
encoding: UTF-8
string: '{"model":"claude-3-5-haiku-20241022","messages":[{"role":"user","content":[{"type":"text","text":"Count
from 1 to 3"}]}],"temperature":0.7,"stream":true,"max_tokens":8192}'
headers:
User-Agent:
- Faraday v2.13.1
X-Api-Key:
- "<ANTHROPIC_API_KEY>"
Anthropic-Version:
- '2023-06-01'
Content-Type:
- application/json
Accept-Encoding:
- gzip;q=1.0,deflate;q=0.6,identity;q=0.3
Accept:
- "*/*"
response:
status:
code: 200
message: OK
headers:
Date:
- Wed, 11 Jun 2025 12:53:20 GMT
Content-Type:
- text/event-stream; charset=utf-8
Transfer-Encoding:
- chunked
Connection:
- keep-alive
Cache-Control:
- no-cache
Anthropic-Ratelimit-Input-Tokens-Limit:
- '100000'
Anthropic-Ratelimit-Input-Tokens-Remaining:
- '100000'
Anthropic-Ratelimit-Input-Tokens-Reset:
- '2025-06-11T12:53:18Z'
Anthropic-Ratelimit-Output-Tokens-Limit:
- '20000'
Anthropic-Ratelimit-Output-Tokens-Remaining:
- '20000'
Anthropic-Ratelimit-Output-Tokens-Reset:
- '2025-06-11T12:53:18Z'
Anthropic-Ratelimit-Requests-Limit:
- '1000'
Anthropic-Ratelimit-Requests-Remaining:
- '999'
Anthropic-Ratelimit-Requests-Reset:
- '2025-06-11T12:53:19Z'
Anthropic-Ratelimit-Tokens-Limit:
- '120000'
Anthropic-Ratelimit-Tokens-Remaining:
- '120000'
Anthropic-Ratelimit-Tokens-Reset:
- '2025-06-11T12:53:18Z'
Request-Id:
- "<REQUEST_ID>"
Strict-Transport-Security:
- max-age=31536000; includeSubDomains; preload
Anthropic-Organization-Id:
- 0137b15c-16bf-490d-9f90-8cfd7e325ec0
Via:
- 1.1 google
Cf-Cache-Status:
- DYNAMIC
X-Robots-Tag:
- none
Server:
- cloudflare
Cf-Ray:
- "<CF_RAY>"
body:
encoding: UTF-8
string: |+
event: message_start
data: {"type":"message_start","message":{"id":"msg_01C9wXLHGibzr3JZM3HQSiRd","type":"message","role":"assistant","model":"claude-3-5-haiku-20241022","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":15,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":1,"service_tier":"standard"}}}

event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""} }

event: ping
data: {"type": "ping"}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Here"} }

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"'s counting from 1 to"} }

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" 3:\n\n1"} }

event: error
data: {"type":"error","error":{"details":null,"type":"overloaded_error","message":"Overloaded"} }


recorded_at: Wed, 11 Jun 2025 12:53:21 GMT
recorded_with: VCR 6.3.1
...
76 changes: 76 additions & 0 deletions spec/ruby_llm/chat_streaming_spec.rb
Owner commented:
These changes to the spec file are quite different from our current style for spec files. We also assume that cassettes can be removed; in fact, rake vcr:record[anthropic] would remove yours. I'd suggest mocking the messages coming back from the API instead.

It would be great to test with other providers too.

Contributor Author replied:
Ok, I've removed the VCR cassettes and replaced them with stubbed requests.

I've also added coverage for the other providers (other than Bedrock).

I've verified the error format used in the mocks against the Anthropic and OpenAI APIs, so I'm reasonably confident in that mocking. I don't have access to Bedrock right now, and its error handling seems somewhat different as far as I can tell, so I haven't added tests for it yet. That would ideally be done by someone with access to Bedrock.
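For reference, a stub along these lines could look like the following. This is a hypothetical sketch using WebMock, and the helper actually used in the PR may differ:

```ruby
# Hypothetical sketch (not the PR's actual helper) of stubbing a
# streaming error response with WebMock, so the error path can be
# tested without a recorded cassette.
require 'webmock/rspec'

error_sse = <<~SSE
  event: error
  data: {"type":"error","error":{"details":null,"type":"overloaded_error","message":"Overloaded"} }

SSE

stub_request(:post, 'https://api.anthropic.com/v1/messages')
  .to_return(
    status: 200,
    headers: { 'Content-Type' => 'text/event-stream' },
    body: error_sse
  )
```

With such a stub in place, the expect { chat.ask(...) }.to raise_error(RubyLLM::Error, /Overloaded/) pattern from the spec below exercises the same code path without VCR.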

@@ -41,4 +41,80 @@
      end
    end
  end

  describe 'Error handling' do
    let(:chat) { RubyLLM.chat(model: 'claude-3-5-haiku-20241022', provider: :anthropic) }

    describe 'Faraday version 1' do
      before do
        stub_const('Faraday::VERSION', '1.10.0')
      end

      it 'anthropic/claude-3-5-haiku-20241022 supports handling streaming error chunks' do # rubocop:disable RSpec/ExampleLength
        VCR.use_cassette(
          'chat_streaming_responses_anthropic_claude-3-5-haiku-20241022_supports_streaming_error_chunks',
          record: :none
        ) do
          chunks = []

          expect do
            chat.ask('Count from 1 to 3') do |chunk|
              chunks << chunk
            end
          end.to raise_error(RubyLLM::Error, /Overloaded/)
        end
      end

      it 'anthropic/claude-3-5-haiku-20241022 supports handling streaming error events' do # rubocop:disable RSpec/ExampleLength
        VCR.use_cassette(
          'chat_streaming_responses_anthropic_claude-3-5-haiku-20241022_supports_streaming_error_events',
          record: :none
        ) do
          chunks = []

          expect do
            chat.ask('Count from 1 to 3') do |chunk|
              chunks << chunk
            end
          end.to raise_error(RubyLLM::Error, /Overloaded/)
        end
      end
    end

    describe 'Faraday version 2' do
      before do
        stub_const('Faraday::VERSION', '2.0.0')
      end

      it 'anthropic/claude-3-5-haiku-20241022 supports handling streaming error chunks' do # rubocop:disable RSpec/ExampleLength
        VCR.use_cassette(
          'chat_streaming_responses_anthropic_claude-3-5-haiku-20241022_supports_streaming_error_chunks',
          record: :none
        ) do
          chunks = []

          expect do
            chat.ask('Count from 1 to 3') do |chunk|
              chunks << chunk
            end
          end.to raise_error(RubyLLM::Error, /Overloaded/)
        end
      end

      it 'anthropic/claude-3-5-haiku-20241022 supports handling streaming error events' do # rubocop:disable RSpec/ExampleLength
        VCR.use_cassette(
          'chat_streaming_responses_anthropic_claude-3-5-haiku-20241022_supports_streaming_error_events',
          record: :none
        ) do
          chunks = []

          expect do
            chat.ask('Count from 1 to 3') do |chunk|
              chunks << chunk
            end
          end.to raise_error(RubyLLM::Error, /Overloaded/)
        end
      end
    end
  end
end