Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from opentelemetry.instrumentation.langchain.config import Config
from wrapt import wrap_function_wrapper

from opentelemetry import context as context_api

from opentelemetry.trace import get_tracer

from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
Expand All @@ -24,7 +26,10 @@
)

from opentelemetry.metrics import get_meter
from opentelemetry.semconv_ai import Meters
from opentelemetry.semconv_ai import (
Meters,
SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -176,8 +181,8 @@ def _uninstrument(self, **kwargs):


class _BaseCallbackManagerInitWrapper:
def __init__(self, callback_manager: "TraceloopCallbackHandler"):
self._callback_manager = callback_manager
def __init__(self, callback_handler: "TraceloopCallbackHandler"):
self._callback_handler = callback_handler

def __call__(
self,
Expand All @@ -188,10 +193,14 @@ def __call__(
) -> None:
wrapped(*args, **kwargs)
for handler in instance.inheritable_handlers:
if isinstance(handler, type(self._callback_manager)):
if isinstance(handler, type(self._callback_handler)):
break
else:
instance.add_handler(self._callback_manager, True)
# Add a property to the handler which indicates the CallbackManager instance.
# Since the CallbackHandler only propagates context for sync callbacks,
# we need a way to determine the type of CallbackManager being wrapped.
self._callback_handler._callback_manager = instance
instance.add_handler(self._callback_handler, True)


# This class wraps a function call to inject tracing information (trace headers) into
Expand Down Expand Up @@ -227,4 +236,10 @@ def __call__(
# Update kwargs to include the modified headers
kwargs["extra_headers"] = extra_headers

    # In legacy chains like LLMChain, suppressing model instrumentation
    # within create_llm_span doesn't work, so this should help as a fallback
context_api.attach(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider capturing the token returned by context_api.attach and detaching it after the wrapped call to limit the suppression flag's scope. This helps avoid unintended propagation in subsequent calls.

context_api.set_value(SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY, True)
)

return wrapped(*args, **kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

from langchain_core.callbacks import (
BaseCallbackHandler,
CallbackManager,
AsyncCallbackManager,
)
from langchain_core.messages import BaseMessage
from langchain_core.outputs import LLMResult
Expand Down Expand Up @@ -80,7 +82,7 @@ def _set_request_params(span, kwargs, span_holder: SpanHolder):

_set_span_attribute(span, SpanAttributes.LLM_REQUEST_MODEL, model)
# response is not available for LLM requests (as opposed to chat)
_set_span_attribute(span, SpanAttributes.LLM_RESPONSE_MODEL, model)
_set_span_attribute(span, SpanAttributes.LLM_RESPONSE_MODEL, model or "unknown")

if "invocation_params" in kwargs:
params = (
Expand Down Expand Up @@ -370,6 +372,7 @@ def __init__(
self.token_histogram = token_histogram
self.spans: dict[UUID, SpanHolder] = {}
self.run_inline = True
self._callback_manager: CallbackManager | AsyncCallbackManager = None

@staticmethod
def _get_name_from_callback(
Expand Down Expand Up @@ -399,6 +402,9 @@ def _end_span(self, span: Span, run_id: UUID) -> None:
if child_span.end_time is None: # avoid warning on ended spans
child_span.end()
span.end()
token = self.spans[run_id].token
if token:
context_api.detach(token)

def _create_span(
self,
Expand Down Expand Up @@ -437,13 +443,17 @@ def _create_span(
else:
span = self.tracer.start_span(span_name, kind=kind)

token = None
# TODO: make this unconditional once attach/detach works properly with async callbacks.
# Currently, it doesn't work due to this - https://github.com/langchain-ai/langchain/issues/31398
# As a sidenote, OTel Python users also report similar issues -
# https://github.com/open-telemetry/opentelemetry-python/issues/2606
if self._callback_manager and not self._callback_manager.is_async:
token = context_api.attach(set_span_in_context(span))

_set_span_attribute(span, SpanAttributes.TRACELOOP_WORKFLOW_NAME, workflow_name)
_set_span_attribute(span, SpanAttributes.TRACELOOP_ENTITY_PATH, entity_path)

token = context_api.attach(
context_api.set_value(SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY, True)
)

self.spans[run_id] = SpanHolder(
span, token, None, [], workflow_name, entity_name, entity_path
)
Expand Down Expand Up @@ -503,6 +513,16 @@ def _create_llm_span(
_set_span_attribute(span, SpanAttributes.LLM_SYSTEM, "Langchain")
_set_span_attribute(span, SpanAttributes.LLM_REQUEST_TYPE, request_type.value)

# we already have an LLM span by this point,
# so skip any downstream instrumentation from here
token = context_api.attach(
context_api.set_value(SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY, True)
)

self.spans[run_id] = SpanHolder(
span, token, None, [], workflow_name, None, entity_path
)

return span

@dont_throw
Expand Down Expand Up @@ -661,7 +681,7 @@ def on_llm_end(
"model_name"
) or response.llm_output.get("model_id")
if model_name is not None:
_set_span_attribute(span, SpanAttributes.LLM_RESPONSE_MODEL, model_name)
_set_span_attribute(span, SpanAttributes.LLM_RESPONSE_MODEL, model_name or "unknown")

if self.spans[run_id].request_model is None:
_set_span_attribute(span, SpanAttributes.LLM_REQUEST_MODEL, model_name)
Expand Down
Loading