Skip to content

Commit a441f93

Browse files
authored
Merge branch '3.17' into backport-14911-to-3.17
2 parents 9a5a3a1 + ac57ec7 commit a441f93

File tree

9 files changed

+550
-5
lines changed

9 files changed

+550
-5
lines changed

ddtrace/contrib/internal/langchain/patch.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -320,10 +320,13 @@ def _on_span_started(span: Span):
320320
integration.record_instance(instance, span)
321321

322322
def _on_span_finished(span: Span, streamed_chunks):
323-
joined_chunks = streamed_chunks[0]
324-
for chunk in streamed_chunks[1:]:
325-
joined_chunks += chunk # base message types support __add__ for concatenation
326323
kwargs["_dd.identifying_params"] = instance._identifying_params
324+
if len(streamed_chunks):
325+
joined_chunks = streamed_chunks[0]
326+
for chunk in streamed_chunks[1:]:
327+
joined_chunks += chunk # base message types support __add__ for concatenation
328+
else:
329+
joined_chunks = []
327330
integration.llmobs_set_tags(span, args=args, kwargs=kwargs, response=joined_chunks, operation="chat")
328331

329332
return shared_stream(

ddtrace/internal/endpoints.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,15 +106,16 @@ def flush(self, max_length: int) -> dict:
106106
"""
107107
Flush the endpoints to a payload, returning the first `max_length` endpoints.
108108
"""
109+
endpoints_snapshot = tuple(self.endpoints)
109110
if max_length >= len(self.endpoints):
110111
res = {
111112
"is_first": self.is_first,
112-
"endpoints": [dataclasses.asdict(ep, dict_factory=_dict_factory) for ep in self.endpoints],
113+
"endpoints": [dataclasses.asdict(ep, dict_factory=_dict_factory) for ep in endpoints_snapshot],
113114
}
114115
self.reset()
115116
return res
116117
else:
117-
batch = [self.endpoints.pop() for _ in range(max_length)]
118+
batch = tuple(self.endpoints.pop() for _ in range(max_length))
118119
res = {
119120
"is_first": self.is_first,
120121
"endpoints": [dataclasses.asdict(ep, dict_factory=_dict_factory) for ep in batch],
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
langchain: Fixes an issue where streamed responses that end before the first chunk is received would result in an ``IndexError``.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
Fixes a potential race condition in the tracer when flushing the collected HTTP endpoints while another thread adds new ones.

tests/contrib/langchain/test_langchain.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,3 +539,17 @@ def circumference_tool(radius: float) -> float:
539539
)
540540

541541
calculator.invoke("2", config={"unserializable": object()})
542+
543+
544+
@pytest.mark.snapshot(ignores=["meta.error.stack", "meta.error.message"])
545+
def test_streamed_chat_model_with_no_output(langchain_openai, openai_url):
546+
from openai import APITimeoutError
547+
548+
chat_model = langchain_openai.ChatOpenAI(base_url=openai_url, timeout=0.0001)
549+
550+
result = chat_model.stream("Hello, my name is")
551+
try:
552+
next(result)
553+
except Exception as e:
554+
if not isinstance(e, APITimeoutError):
555+
assert False, f"Expected APITimeoutError, got {e}"

tests/internal/test_endpoints.py

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
import threading
2+
from time import sleep
3+
4+
import pytest
5+
6+
from ddtrace.internal.endpoints import HttpEndPointsCollection
7+
8+
9+
@pytest.fixture
10+
def collection():
11+
coll = HttpEndPointsCollection()
12+
coll.reset()
13+
yield coll
14+
coll.reset()
15+
16+
17+
def test_flush_uses_tuple_snapshot(collection):
18+
"""Test that flush() operates on a tuple snapshot, not the original set."""
19+
collection.add_endpoint("GET", "/api/users")
20+
collection.add_endpoint("POST", "/api/users")
21+
collection.add_endpoint("DELETE", "/api/users/123")
22+
23+
assert len(collection.endpoints) == 3
24+
result = collection.flush(max_length=10)
25+
assert result["is_first"] is True
26+
assert len(result["endpoints"]) == 3
27+
assert len(collection.endpoints) == 0
28+
29+
30+
def test_flush_snapshot_prevents_modification_during_iteration(collection):
31+
"""Test that modifying self.endpoints during flush iteration doesn't cause RuntimeError."""
32+
collection.add_endpoint("GET", "/api/v1")
33+
collection.add_endpoint("POST", "/api/v2")
34+
collection.add_endpoint("PUT", "/api/v3")
35+
36+
initial_count = len(collection.endpoints)
37+
assert initial_count == 3
38+
result = collection.flush(max_length=10)
39+
40+
assert len(result["endpoints"]) == initial_count
41+
assert result["is_first"] is True
42+
43+
44+
def test_concurrent_add_during_flush_does_not_break_iteration(collection):
45+
"""Test that adding endpoints from another thread during flush doesn't cause RuntimeError."""
46+
for i in range(5):
47+
collection.add_endpoint("GET", f"/api/endpoint{i}")
48+
49+
assert len(collection.endpoints) == 5
50+
51+
flush_completed = threading.Event()
52+
flush_result = {}
53+
exception_caught = []
54+
55+
def flush_thread():
56+
try:
57+
result = collection.flush(max_length=10)
58+
flush_result["data"] = result
59+
flush_completed.set()
60+
except Exception as e:
61+
exception_caught.append(e)
62+
flush_completed.set()
63+
64+
def add_thread():
65+
sleep(0.001)
66+
67+
# Try to modify the set while flush might be iterating
68+
for i in range(5, 10):
69+
collection.add_endpoint("POST", f"/api/new{i}")
70+
sleep(0.001)
71+
72+
t1 = threading.Thread(target=flush_thread)
73+
t2 = threading.Thread(target=add_thread)
74+
75+
t1.start()
76+
t2.start()
77+
78+
t1.join(timeout=2.0)
79+
t2.join(timeout=2.0)
80+
81+
assert flush_completed.is_set(), "Flush did not complete"
82+
assert len(exception_caught) == 0, f"Exception occurred during flush: {exception_caught}"
83+
assert "data" in flush_result, "Flush did not return a result"
84+
85+
result = flush_result["data"]
86+
assert "endpoints" in result
87+
assert "is_first" in result
88+
89+
90+
def test_flush_with_partial_batch(collection):
91+
"""Test that flush creates a tuple snapshot even when using pop() for partial batches."""
92+
for i in range(10):
93+
collection.add_endpoint("GET", f"/api/endpoint{i}")
94+
95+
assert len(collection.endpoints) == 10
96+
97+
result = collection.flush(max_length=5)
98+
99+
assert len(result["endpoints"]) == 5
100+
assert result["is_first"] is True
101+
assert len(collection.endpoints) == 5
102+
103+
result2 = collection.flush(max_length=10)
104+
assert len(result2["endpoints"]) == 5
105+
assert result2["is_first"] is False # Not first anymore
106+
107+
assert len(collection.endpoints) == 0
108+
109+
110+
def test_partial_flush_with_concurrent_modification(collection):
111+
"""Test that partial flush (max_length < size) is safe from race conditions."""
112+
for i in range(10):
113+
collection.add_endpoint("GET", f"/api/endpoint{i}")
114+
115+
assert len(collection.endpoints) == 10
116+
117+
flush_completed = threading.Event()
118+
flush_result = {}
119+
exception_caught = []
120+
121+
def flush_thread():
122+
try:
123+
# Partial flush - this should trigger the else branch at line 118
124+
result = collection.flush(max_length=5)
125+
flush_result["data"] = result
126+
flush_completed.set()
127+
except Exception as e:
128+
exception_caught.append(e)
129+
flush_completed.set()
130+
131+
def add_thread():
132+
sleep(0.001)
133+
# Try to modify the set while flush might be iterating
134+
for i in range(10, 15):
135+
collection.add_endpoint("POST", f"/api/new{i}")
136+
sleep(0.001)
137+
138+
t1 = threading.Thread(target=flush_thread)
139+
t2 = threading.Thread(target=add_thread)
140+
141+
t1.start()
142+
t2.start()
143+
144+
t1.join(timeout=2.0)
145+
t2.join(timeout=2.0)
146+
147+
assert flush_completed.is_set(), "Flush did not complete"
148+
assert len(exception_caught) == 0, f"Exception occurred during flush: {exception_caught}"
149+
assert "data" in flush_result, "Flush did not return a result"
150+
151+
result = flush_result["data"]
152+
assert len(result["endpoints"]) == 5
153+
assert "is_first" in result
154+
155+
156+
def test_http_endpoint_hash_consistency(collection):
157+
"""Test that HttpEndPoint hashing works correctly for set operations."""
158+
collection.add_endpoint("GET", "/api/test")
159+
collection.add_endpoint("GET", "/api/test")
160+
assert len(collection.endpoints) == 1
161+
162+
collection.add_endpoint("POST", "/api/test")
163+
collection.add_endpoint("GET", "/api/other")
164+
assert len(collection.endpoints) == 3
165+
166+
167+
def test_snapshot_is_tuple_type(collection):
168+
"""Verify that the snapshot created in flush is actually a tuple."""
169+
collection.add_endpoint("GET", "/test")
170+
collection.add_endpoint("POST", "/test")
171+
assert isinstance(collection.endpoints, set)
172+
173+
result = collection.flush(max_length=10)
174+
assert len(result["endpoints"]) == 2
175+
176+
for ep in result["endpoints"]:
177+
assert isinstance(ep, dict)
178+
assert "method" in ep
179+
assert "path" in ep
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
interactions:
2+
- request:
3+
body: '{"messages":[{"content":"Hello, my name is","role":"user"}],"model":"gpt-3.5-turbo","n":1,"stream":true,"temperature":0.7}'
4+
headers:
5+
? !!python/object/apply:multidict._multidict.istr
6+
- Accept
7+
: - application/json
8+
? !!python/object/apply:multidict._multidict.istr
9+
- Accept-Encoding
10+
: - gzip, deflate
11+
? !!python/object/apply:multidict._multidict.istr
12+
- Connection
13+
: - keep-alive
14+
Content-Length:
15+
- '122'
16+
? !!python/object/apply:multidict._multidict.istr
17+
- Content-Type
18+
: - application/json
19+
? !!python/object/apply:multidict._multidict.istr
20+
- User-Agent
21+
: - OpenAI/Python 1.109.1
22+
? !!python/object/apply:multidict._multidict.istr
23+
- X-Stainless-Arch
24+
: - arm64
25+
? !!python/object/apply:multidict._multidict.istr
26+
- X-Stainless-Async
27+
: - 'false'
28+
? !!python/object/apply:multidict._multidict.istr
29+
- X-Stainless-Lang
30+
: - python
31+
? !!python/object/apply:multidict._multidict.istr
32+
- X-Stainless-OS
33+
: - MacOS
34+
? !!python/object/apply:multidict._multidict.istr
35+
- X-Stainless-Package-Version
36+
: - 1.109.1
37+
? !!python/object/apply:multidict._multidict.istr
38+
- X-Stainless-Runtime
39+
: - CPython
40+
? !!python/object/apply:multidict._multidict.istr
41+
- X-Stainless-Runtime-Version
42+
: - 3.10.13
43+
? !!python/object/apply:multidict._multidict.istr
44+
- x-stainless-read-timeout
45+
: - '0.0001'
46+
? !!python/object/apply:multidict._multidict.istr
47+
- x-stainless-retry-count
48+
: - '0'
49+
method: POST
50+
uri: https://api.openai.com/v1/chat/completions
51+
response:
52+
body:
53+
string: 'data: {"id":"chatcmpl-CTDgKS9E25YxCQOhLhNmhfOPiznNk","object":"chat.completion.chunk","created":1761080140,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}],"obfuscation":"dDINu"}
54+
55+
56+
data: {"id":"chatcmpl-CTDgKS9E25YxCQOhLhNmhfOPiznNk","object":"chat.completion.chunk","created":1761080140,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"Assistant"},"logprobs":null,"finish_reason":null}],"obfuscation":"1yL6DImc2cky67"}
57+
58+
59+
data: {"id":"chatcmpl-CTDgKS9E25YxCQOhLhNmhfOPiznNk","object":"chat.completion.chunk","created":1761080140,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"."},"logprobs":null,"finish_reason":null}],"obfuscation":"kyn0dy"}
60+
61+
62+
data: {"id":"chatcmpl-CTDgKS9E25YxCQOhLhNmhfOPiznNk","object":"chat.completion.chunk","created":1761080140,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"
63+
How"},"logprobs":null,"finish_reason":null}],"obfuscation":"zF8"}
64+
65+
66+
data: {"id":"chatcmpl-CTDgKS9E25YxCQOhLhNmhfOPiznNk","object":"chat.completion.chunk","created":1761080140,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"
67+
can"},"logprobs":null,"finish_reason":null}],"obfuscation":"ZOA"}
68+
69+
70+
data: {"id":"chatcmpl-CTDgKS9E25YxCQOhLhNmhfOPiznNk","object":"chat.completion.chunk","created":1761080140,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"
71+
I"},"logprobs":null,"finish_reason":null}],"obfuscation":"yPnQN"}
72+
73+
74+
data: {"id":"chatcmpl-CTDgKS9E25YxCQOhLhNmhfOPiznNk","object":"chat.completion.chunk","created":1761080140,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"
75+
assist"},"logprobs":null,"finish_reason":null}],"obfuscation":""}
76+
77+
78+
data: {"id":"chatcmpl-CTDgKS9E25YxCQOhLhNmhfOPiznNk","object":"chat.completion.chunk","created":1761080140,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"
79+
you"},"logprobs":null,"finish_reason":null}],"obfuscation":"nG8"}
80+
81+
82+
data: {"id":"chatcmpl-CTDgKS9E25YxCQOhLhNmhfOPiznNk","object":"chat.completion.chunk","created":1761080140,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"
83+
today"},"logprobs":null,"finish_reason":null}],"obfuscation":"a"}
84+
85+
86+
data: {"id":"chatcmpl-CTDgKS9E25YxCQOhLhNmhfOPiznNk","object":"chat.completion.chunk","created":1761080140,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"?"},"logprobs":null,"finish_reason":null}],"obfuscation":"KxSued"}
87+
88+
89+
data: {"id":"chatcmpl-CTDgKS9E25YxCQOhLhNmhfOPiznNk","object":"chat.completion.chunk","created":1761080140,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}],"obfuscation":"Q"}
90+
91+
92+
data: [DONE]
93+
94+
95+
'
96+
headers:
97+
CF-RAY:
98+
- 9923a83a5cd9081d-IAD
99+
Connection:
100+
- keep-alive
101+
Content-Type:
102+
- text/event-stream; charset=utf-8
103+
Date:
104+
- Tue, 21 Oct 2025 20:55:40 GMT
105+
Server:
106+
- cloudflare
107+
Set-Cookie:
108+
- __cf_bm=o7TNYeatXbBOlFxeMlhl.8fe7kXVRNTm_dL98zIje8M-1761080140-1.0.1.1-FRpu._KCnEk.aGZG5YQ75Od_Ucq8okx9WLNY3JjdbbK3P7mwxS21FIvTtyY6GlllpujKEYLWkFHG6VIIw4zZmwH0yDL04t_gvEzyjWwc1qc;
109+
path=/; expires=Tue, 21-Oct-25 21:25:40 GMT; domain=.api.openai.com; HttpOnly;
110+
Secure; SameSite=None
111+
- _cfuvid=bZiyEgtQ9lNlCOG2DaVgkJLkXs33574MrsoVFl0iIf4-1761080140438-0.0.1.1-604800000;
112+
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
113+
Strict-Transport-Security:
114+
- max-age=31536000; includeSubDomains; preload
115+
Transfer-Encoding:
116+
- chunked
117+
X-Content-Type-Options:
118+
- nosniff
119+
access-control-expose-headers:
120+
- X-Request-ID
121+
alt-svc:
122+
- h3=":443"; ma=86400
123+
cf-cache-status:
124+
- DYNAMIC
125+
openai-organization:
126+
- datadog-staging
127+
openai-processing-ms:
128+
- '151'
129+
openai-project:
130+
- proj_gt6TQZPRbZfoY2J9AQlEJMpd
131+
openai-version:
132+
- '2020-10-01'
133+
x-envoy-upstream-service-time:
134+
- '174'
135+
x-openai-proxy-wasm:
136+
- v0.1
137+
x-ratelimit-limit-requests:
138+
- '10000'
139+
x-ratelimit-limit-tokens:
140+
- '50000000'
141+
x-ratelimit-remaining-requests:
142+
- '9999'
143+
x-ratelimit-remaining-tokens:
144+
- '49999993'
145+
x-ratelimit-reset-requests:
146+
- 6ms
147+
x-ratelimit-reset-tokens:
148+
- 0s
149+
x-request-id:
150+
- req_c9c685c9da2e4684b4613cefd4af98e8
151+
status:
152+
code: 200
153+
message: OK
154+
version: 1

0 commit comments

Comments
 (0)