From 94b5fecdde9bd366447a14ecdf5294f8f96dbdf1 Mon Sep 17 00:00:00 2001 From: Xiaolin Lin Date: Sat, 28 Feb 2026 22:10:48 -0500 Subject: [PATCH] Pass through Anthropic stream errors Signed-off-by: Xiaolin Lin --- internal/translator/anthropic_helper.go | 18 +++++++++- .../translator/openai_awsanthropic_test.go | 36 +++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/internal/translator/anthropic_helper.go b/internal/translator/anthropic_helper.go index cb7a4178a5..74201d1a92 100644 --- a/internal/translator/anthropic_helper.go +++ b/internal/translator/anthropic_helper.go @@ -9,6 +9,7 @@ import ( "bytes" "cmp" "encoding/base64" + "errors" "fmt" "io" "strings" @@ -723,6 +724,13 @@ func newAnthropicStreamParser(requestModel string) *anthropicStreamParser { func (p *anthropicStreamParser) writeChunk(eventBlock []byte, buf *[]byte) error { chunk, err := p.parseAndHandleEvent(eventBlock) if err != nil { + var streamErr anthropicStreamErrorEvent + if errors.As(err, &streamErr) { + *buf = append(*buf, sseDataPrefix...) + *buf = append(*buf, streamErr.payload...) + *buf = append(*buf, '\n', '\n') + return nil + } return err } if chunk != nil { @@ -832,6 +840,14 @@ func (p *anthropicStreamParser) Process(body io.Reader, endOfStream bool, span t return } +type anthropicStreamErrorEvent struct { + payload []byte +} + +func (e anthropicStreamErrorEvent) Error() string { + return "anthropic stream error event" +} + func (p *anthropicStreamParser) parseAndHandleEvent(eventBlock []byte) (*openai.ChatCompletionResponseChunk, error) { var eventType []byte var eventData []byte @@ -1041,7 +1057,7 @@ func (p *anthropicStreamParser) handleAnthropicStreamEvent(eventType []byte, dat if err := json.Unmarshal(data, &errEvent); err != nil { return nil, fmt.Errorf("unparsable error event: %s", string(data)) } - return nil, fmt.Errorf("anthropic stream error: %s - %s", errEvent.Error.Type, errEvent.Error.Message) + return nil, anthropicStreamErrorEvent{payload: data} case "ping": // Per documentation, ping events can be ignored. diff --git a/internal/translator/openai_awsanthropic_test.go b/internal/translator/openai_awsanthropic_test.go index 4c983bc03c..abebc86353 100644 --- a/internal/translator/openai_awsanthropic_test.go +++ b/internal/translator/openai_awsanthropic_test.go @@ -675,6 +675,19 @@ func TestAWSAnthropicStreamParser_ErrorHandling(t *testing.T) { return err } + runStreamBodyTest := func(t *testing.T, sseStream string, endOfStream bool) ([]byte, error) { + eventStreamData, err := wrapAnthropicSSEInEventStream(sseStream) + require.NoError(t, err) + + openAIReq := &openai.ChatCompletionRequest{Stream: true, Model: "test-model", MaxTokens: new(int64)} + translator := NewChatCompletionOpenAIToAWSAnthropicTranslator("", "").(*openAIToAWSAnthropicTranslatorV1ChatCompletion) + _, _, err = translator.RequestBody(nil, openAIReq, false) + require.NoError(t, err) + + _, body, _, _, err := translator.ResponseBody(map[string]string{}, bytes.NewReader(eventStreamData), endOfStream, nil) + return body, err + } + tests := []struct { name string sseStream string @@ -706,6 +719,29 @@ func TestAWSAnthropicStreamParser_ErrorHandling(t *testing.T) { }) } + t.Run("forwards anthropic error event and continues stream", func(t *testing.T) { + sseStream := `event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"before error"}} + +event: error +data: {"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}} + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"after error"}} + +event: message_stop +data: {"type":"message_stop"} +` + body, err := runStreamBodyTest(t, sseStream, true) + require.NoError(t, err) + + bodyStr := string(body) + require.Contains(t, bodyStr, `"content":"before error"`) + require.Contains(t, bodyStr, `"error":{"type":"overloaded_error","message":"Overloaded"}`) + require.Contains(t, bodyStr, `"content":"after error"`) + require.Contains(t, bodyStr, string(sseDoneMessage)) + }) + t.Run("body read error", func(t *testing.T) { parser := newAnthropicStreamParser("test-model") _, _, _, _, err := parser.Process(&mockErrorReader{}, false, nil)