Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,12 @@ def _end_span(self, span: Span, run_id: UUID) -> None:
token = self.spans[run_id].token
if token:
try:
context_api.detach(token)
except ValueError:
# Use the runtime context directly to avoid logging from context_api.detach()
from opentelemetry.context import _RUNTIME_CONTEXT
_RUNTIME_CONTEXT.detach(token)
except (ValueError, RuntimeError, Exception):
# Context detach can fail in async scenarios when tokens are created in different contexts
# This includes ValueError, RuntimeError, and other context-related exceptions
# This is expected behavior and doesn't affect the correct span hierarchy
Comment on lines +191 to 197
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid importing and using private _RUNTIME_CONTEXT inline; add a safe, module-level fallback and narrow exception scope

Importing a private symbol inside the method on every call is brittle and adds overhead. Also, catching broad Exception can hide unrelated bugs. Prefer a module-level optional import (with fallback to context_api.detach) and only catch the expected detach errors.

Apply this diff within the selected range:

-            try:
-                # Use the runtime context directly to avoid logging from context_api.detach()
-                from opentelemetry.context import _RUNTIME_CONTEXT
-                _RUNTIME_CONTEXT.detach(token)
-            except (ValueError, RuntimeError, Exception):
-                # Context detach can fail in async scenarios when tokens are created in different contexts
-                # This includes ValueError, RuntimeError, and other context-related exceptions
-                # This is expected behavior and doesn't affect the correct span hierarchy
-                pass
+            try:
+                # Prefer the runtime context to avoid error-level logging in context_api.detach()
+                if _OTEL_RUNTIME_CONTEXT is not None:
+                    _OTEL_RUNTIME_CONTEXT.detach(token)
+                else:
+                    context_api.detach(token)
+            except (ValueError, RuntimeError):
+                # Detach can fail when token belongs to a different async context; ignore.
+                pass

And at the top of the file (module scope), add a safe optional import:

# Prefer runtime context when available to avoid logging in context_api.detach()
try:
    from opentelemetry.context import _RUNTIME_CONTEXT as _OTEL_RUNTIME_CONTEXT  # type: ignore[attr-defined]
except Exception:
    _OTEL_RUNTIME_CONTEXT = None  # type: ignore[assignment]

Rationale:

  • _RUNTIME_CONTEXT is a private API; guarding with a fallback limits blast radius if the upstream package changes.
  • Narrowing to (ValueError, RuntimeError) prevents masking unexpected failures while still addressing the async token-mismatch cases.
🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-langchain/opentelemetry/instrumentation/langchain/callback_handler.py
around lines 191-197, replace the inline import and broad Exception catch: add a
module-level optional import of _RUNTIME_CONTEXT (named _OTEL_RUNTIME_CONTEXT)
at the top of the file with a safe fallback to None, then in this block use
_OTEL_RUNTIME_CONTEXT.detach(token) when available and otherwise call
context_api.detach(token); also narrow the except clause to only catch
ValueError and RuntimeError so unrelated exceptions are not swallowed.

pass

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,3 +338,144 @@ async def run_test_agent():
assert http_call_task_span.parent.span_id == workflow_span.context.span_id
assert otel_span_task_span.parent.span_id == workflow_span.context.span_id
assert workflow_span.parent.span_id == root_span.context.span_id


def test_context_detachment_error_handling(
    instrument_legacy, span_exporter, tracer_provider, caplog
):
    """
    Check that async context-detach failures do not surface as error logs.

    Background: when a context token is created in one context and detached
    in another, OpenTelemetry's context_api.detach() used to log
    'ERROR:opentelemetry.context:Failed to detach context'.

    The test runs ten LangGraph workflows concurrently. Each workflow opens
    manual spans across await points and then fans out into five parallel
    tasks. Afterwards it checks the workflow results, the exported span
    counts, the absence of 'Failed to detach context' error records in
    caplog, and that every nested span is parented by a
    'concurrent_async_span'.
    """
    import asyncio
    import logging
    from opentelemetry import trace
    from langgraph.graph import END, START, StateGraph

    trace.set_tracer_provider(tracer_provider)
    tracer = trace.get_tracer(__name__)

    with caplog.at_level(logging.ERROR):

        class AsyncTestState(TypedDict):
            counter: int
            result: str

        async def concurrent_span_node(state: AsyncTestState) -> dict:
            """Open an outer and a nested span, awaiting inside each of them."""
            with tracer.start_as_current_span("concurrent_async_span") as outer:
                outer.set_attribute("node.type", "concurrent_async")
                outer.set_attribute("input.counter", state["counter"])
                await asyncio.sleep(0.001)

                with tracer.start_as_current_span("nested_span") as inner:
                    inner.set_attribute("nested.work", True)
                    await asyncio.sleep(0.001)

                result = f"processed_{state['counter']}"
                outer.set_attribute("output.result", result)

            return {"counter": state["counter"] + 1, "result": result}

        async def parallel_processing_node(state: AsyncTestState) -> dict:
            """Run five span-producing coroutines at once and record their output."""

            async def traced_job(task_id: int):
                with tracer.start_as_current_span(f"parallel_task_{task_id}") as job:
                    job.set_attribute("task.id", task_id)
                    await asyncio.sleep(0.001)
                    return f"task_{task_id}_done"

            parallel_results = await asyncio.gather(
                *(traced_job(task_id) for task_id in range(5))
            )
            combined_result = (
                f"{state['result']} + parallel_results: {','.join(parallel_results)}"
            )
            return {"counter": state["counter"], "result": combined_result}

        async def run_concurrent_executions():
            """Compile the two-node graph and invoke it ten times concurrently."""
            builder = StateGraph(AsyncTestState)
            builder.add_node("concurrent", concurrent_span_node)
            builder.add_node("parallel", parallel_processing_node)
            # Linear pipeline: START -> concurrent -> parallel -> END.
            for source, target in (
                (START, "concurrent"),
                ("concurrent", "parallel"),
                ("parallel", END),
            ):
                builder.add_edge(source, target)
            graph = builder.compile()

            invocations = [
                graph.ainvoke({"counter": i, "result": ""}) for i in range(10)
            ]
            # return_exceptions=True so a failing run is reported by the
            # assertions below instead of aborting the whole gather.
            return await asyncio.gather(*invocations, return_exceptions=True)

        results = asyncio.run(run_concurrent_executions())

        assert len(results) == 10
        for i, result in enumerate(results):
            assert not isinstance(result, Exception), f"Execution {i} failed: {result}"
            assert result["counter"] == i + 1
            assert f"processed_{i}" in result["result"]

        spans = span_exporter.get_finished_spans()
        assert len(spans) >= 100, f"Expected at least 100 spans, got {len(spans)}"

        def spans_named(name):
            return [s for s in spans if s.name == name]

        workflow_spans = spans_named("LangGraph.workflow")
        concurrent_spans = spans_named("concurrent_async_span")
        nested_spans = spans_named("nested_span")
        parallel_task_spans = [
            s for s in spans if s.name.startswith("parallel_task_")
        ]

        assert len(workflow_spans) == 10, (
            f"Expected 10 workflow spans, got {len(workflow_spans)}"
        )
        assert len(concurrent_spans) == 10, (
            f"Expected 10 concurrent spans, got {len(concurrent_spans)}"
        )
        assert len(nested_spans) == 10, (
            f"Expected 10 nested spans, got {len(nested_spans)}"
        )
        assert len(parallel_task_spans) == 50, (
            f"Expected 50 parallel task spans, got {len(parallel_task_spans)}"
        )

        # Only ERROR-or-worse records mentioning the detach failure count.
        context_errors = [
            record.message
            for record in caplog.records
            if record.levelno >= logging.ERROR
            and "Failed to detach context" in record.message
        ]
        assert not context_errors, (
            f"Found {len(context_errors)} context detachment errors in logs. "
            f"This indicates the fix is not working properly. Errors: {context_errors}"
        )

        # Index the exported spans once so each parent lookup is a dict hit.
        spans_by_id = {s.context.span_id: s for s in spans}
        for nested_span in nested_spans:
            assert nested_span.parent is not None, "Nested spans should have parents"
            parent_span = spans_by_id.get(nested_span.parent.span_id)
            assert parent_span is not None, "Parent span should exist"
            assert parent_span.name == "concurrent_async_span", (
                "Nested span should be child of concurrent_async_span"
            )