diff --git a/packages/opentelemetry-instrumentation-langchain/opentelemetry/instrumentation/langchain/callback_handler.py b/packages/opentelemetry-instrumentation-langchain/opentelemetry/instrumentation/langchain/callback_handler.py
index 4ecf0a6150..b4e374feba 100644
--- a/packages/opentelemetry-instrumentation-langchain/opentelemetry/instrumentation/langchain/callback_handler.py
+++ b/packages/opentelemetry-instrumentation-langchain/opentelemetry/instrumentation/langchain/callback_handler.py
@@ -188,9 +188,12 @@ def _end_span(self, span: Span, run_id: UUID) -> None:
         token = self.spans[run_id].token
         if token:
             try:
-                context_api.detach(token)
-            except ValueError:
+                # Detach via the private _RUNTIME_CONTEXT: context_api.detach() logs an error on failure
+                from opentelemetry.context import _RUNTIME_CONTEXT
+                _RUNTIME_CONTEXT.detach(token)
+            except Exception:
                 # Context detach can fail in async scenarios when tokens are created in different contexts
+                # Deliberately broad: the failure may surface as ValueError, RuntimeError, or another type
                 # This is expected behavior and doesn't affect the correct span hierarchy
                 pass
 
diff --git a/packages/opentelemetry-instrumentation-langchain/tests/test_langgraph.py b/packages/opentelemetry-instrumentation-langchain/tests/test_langgraph.py
index 51d766cf5a..891b9d09cb 100644
--- a/packages/opentelemetry-instrumentation-langchain/tests/test_langgraph.py
+++ b/packages/opentelemetry-instrumentation-langchain/tests/test_langgraph.py
@@ -338,3 +338,144 @@ async def run_test_agent():
     assert http_call_task_span.parent.span_id == workflow_span.context.span_id
     assert otel_span_task_span.parent.span_id == workflow_span.context.span_id
     assert workflow_span.parent.span_id == root_span.context.span_id
+
+
+def test_context_detachment_error_handling(
+    instrument_legacy, span_exporter, tracer_provider, caplog
+):
+    """
+    Test that context detachment errors are handled properly without logging.
+
+    This test specifically validates the fix for the issue where OpenTelemetry
+    context detachment failures in async scenarios would cause error logging:
+    'ERROR:opentelemetry.context:Failed to detach context'
+
+    The test creates conditions that trigger context tokens to be created in
+    one context and detached in another, which previously caused ValueError
+    exceptions to be logged by OpenTelemetry's context_api.detach().
+    """
+    import asyncio
+    import logging
+    from opentelemetry import trace
+    from langgraph.graph import END, START, StateGraph
+
+    trace.set_tracer_provider(tracer_provider)
+    tracer = trace.get_tracer(__name__)
+
+    with caplog.at_level(logging.ERROR):
+
+        class AsyncTestState(TypedDict):
+            counter: int
+            result: str
+
+        async def concurrent_span_node(state: AsyncTestState) -> dict:
+            """Node that creates spans in async context, triggering potential context issues."""
+            with tracer.start_as_current_span("concurrent_async_span") as span:
+                span.set_attribute("node.type", "concurrent_async")
+                span.set_attribute("input.counter", state["counter"])
+
+                await asyncio.sleep(0.001)
+
+                with tracer.start_as_current_span("nested_span") as nested_span:
+                    nested_span.set_attribute("nested.work", True)
+                    await asyncio.sleep(0.001)
+
+                result = f"processed_{state['counter']}"
+                span.set_attribute("output.result", result)
+
+                return {"counter": state["counter"] + 1, "result": result}
+
+        async def parallel_processing_node(state: AsyncTestState) -> dict:
+            """Node that processes multiple tasks in parallel, stressing context management."""
+
+            async def parallel_task(task_id: int):
+                with tracer.start_as_current_span(f"parallel_task_{task_id}") as span:
+                    span.set_attribute("task.id", task_id)
+                    await asyncio.sleep(0.001)
+                    return f"task_{task_id}_done"
+
+            tasks = [parallel_task(i) for i in range(5)]
+            parallel_results = await asyncio.gather(*tasks)
+            combined_result = (
+                f"{state['result']} + parallel_results: {','.join(parallel_results)}"
+            )
+            return {"counter": state["counter"], "result": combined_result}
+
+        def build_context_stress_graph():
+            """Build a graph designed to stress context management."""
+            builder = StateGraph(AsyncTestState)
+            builder.add_node("concurrent", concurrent_span_node)
+            builder.add_node("parallel", parallel_processing_node)
+
+            builder.add_edge(START, "concurrent")
+            builder.add_edge("concurrent", "parallel")
+            builder.add_edge("parallel", END)
+
+            return builder.compile()
+
+        async def run_concurrent_executions():
+            """Run multiple concurrent graph executions to trigger context issues."""
+            graph = build_context_stress_graph()
+
+            tasks = []
+            for i in range(10):
+                task = graph.ainvoke({"counter": i, "result": ""})
+                tasks.append(task)
+
+            results = await asyncio.gather(*tasks, return_exceptions=True)
+            return results
+
+        results = asyncio.run(run_concurrent_executions())
+
+        assert len(results) == 10
+        for i, result in enumerate(results):
+            assert not isinstance(result, Exception), f"Execution {i} failed: {result}"
+            assert result["counter"] == i + 1
+            assert f"processed_{i}" in result["result"]
+
+        spans = span_exporter.get_finished_spans()
+
+        assert len(spans) >= 100, f"Expected at least 100 spans, got {len(spans)}"
+
+        workflow_spans = [s for s in spans if s.name == "LangGraph.workflow"]
+        concurrent_spans = [s for s in spans if s.name == "concurrent_async_span"]
+        nested_spans = [s for s in spans if s.name == "nested_span"]
+        parallel_task_spans = [s for s in spans if s.name.startswith("parallel_task_")]
+
+        assert (
+            len(workflow_spans) == 10
+        ), f"Expected 10 workflow spans, got {len(workflow_spans)}"
+        assert (
+            len(concurrent_spans) == 10
+        ), f"Expected 10 concurrent spans, got {len(concurrent_spans)}"
+        assert (
+            len(nested_spans) == 10
+        ), f"Expected 10 nested spans, got {len(nested_spans)}"
+        assert (
+            len(parallel_task_spans) == 50
+        ), f"Expected 50 parallel task spans, got {len(parallel_task_spans)}"
+
+        error_logs = [
+            record.message
+            for record in caplog.records
+            if record.levelno >= logging.ERROR
+        ]
+        context_errors = [
+            msg for msg in error_logs if "Failed to detach context" in msg
+        ]
+
+        assert len(context_errors) == 0, (
+            f"Found {len(context_errors)} context detachment errors in logs. "
+            f"This indicates the fix is not working properly. Errors: {context_errors}"
+        )
+
+        for nested_span in nested_spans:
+            assert nested_span.parent is not None, "Nested spans should have parents"
+            parent_span = next(
+                (s for s in spans if s.context.span_id == nested_span.parent.span_id),
+                None,
+            )
+            assert parent_span is not None, "Parent span should exist"
+            assert (
+                parent_span.name == "concurrent_async_span"
+            ), "Nested span should be child of concurrent_async_span"