Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""
LangGraph StateGraph example with an LLM node.

Similar to the manual example (../manual/main.py) but uses LangGraph's StateGraph
with a node that calls ChatOpenAI. OpenTelemetry LangChain instrumentation traces
the LLM calls made from within the graph node.
"""

from typing import Annotated


from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict

from opentelemetry import _logs, metrics, trace
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
OTLPLogExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
OTLPMetricExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)
from opentelemetry.instrumentation.langchain import LangChainInstrumentor
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor


# Configure tracing: batch spans and export them over OTLP/gRPC
# (endpoint comes from OTEL_EXPORTER_OTLP_* env vars, typically localhost:4317).
trace.set_tracer_provider(TracerProvider())
span_processor = BatchSpanProcessor(OTLPSpanExporter())
trace.get_tracer_provider().add_span_processor(span_processor)

# Configure logging: batch log records through the same OTLP/gRPC transport.
_logs.set_logger_provider(LoggerProvider())
_logs.get_logger_provider().add_log_record_processor(
    BatchLogRecordProcessor(OTLPLogExporter())
)

# Configure metrics: periodically push readings to the OTLP/gRPC endpoint.
metrics.set_meter_provider(
    MeterProvider(
        metric_readers=[
            PeriodicExportingMetricReader(
                OTLPMetricExporter(),
            ),
        ]
    )
)


class GraphState(TypedDict):
    """State carried through the graph.

    ``messages`` is annotated with the ``add_messages`` reducer, so lists
    returned by nodes are appended to the accumulated history instead of
    replacing it.
    """

    # Conversation history; node return values are merged in by add_messages.
    messages: Annotated[list, add_messages]


def build_graph(llm: ChatOpenAI):
    """Compile a minimal StateGraph: START -> one LLM node -> END."""

    def invoke_model(state: GraphState) -> dict:
        """Run the chat model over the accumulated messages and emit its reply."""
        return {"messages": [llm.invoke(state["messages"])]}

    graph = StateGraph(GraphState)
    graph.add_node("llm", invoke_model)
    graph.add_edge(START, "llm")
    graph.add_edge("llm", END)
    return graph.compile()


def main():
    """Run one question through the graph with LangChain instrumentation on."""
    # Instrument LangChain so LLM calls made inside graph nodes are traced.
    LangChainInstrumentor().instrument()

    # Chat model with a fixed seed and explicit sampling parameters.
    llm = ChatOpenAI(
        model="gpt-3.5-turbo",
        temperature=0.1,
        max_tokens=100,
        top_p=0.9,
        frequency_penalty=0.5,
        presence_penalty=0.5,
        stop_sequences=["\n", "Human:", "AI:"],
        seed=100,
    )

    graph = build_graph(llm)

    conversation = [
        SystemMessage(content="You are a helpful assistant!"),
        HumanMessage(content="What is the capital of France?"),
    ]
    final_state = graph.invoke({"messages": conversation})

    print("LangGraph output (messages):")
    for message in final_state.get("messages", []):
        print(f" {type(message).__name__}: {message.content}")

    # Remove the instrumentation once the run is finished.
    LangChainInstrumentor().uninstrument()


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
langchain==0.3.21
langchain_openai
langgraph
opentelemetry-sdk>=1.39.0
opentelemetry-exporter-otlp-proto-grpc>=1.39.0

# Uncomment after langchain instrumentation is released
# opentelemetry-instrumentation-langchain~=2.0b0.dev
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def _instrument(self, **kwargs: Any):
)

wrap_function_wrapper(
module="langchain_core.callbacks",
target="langchain_core.callbacks",
name="BaseCallbackManager.__init__",
wrapper=_BaseCallbackManagerInitWrapper(otel_callback_handler),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
MessagePart,
OutputMessage,
Text,
WorkflowInvocation,
)


Expand All @@ -45,6 +46,79 @@ def __init__(self, telemetry_handler: TelemetryHandler) -> None:
self._telemetry_handler = telemetry_handler
self._invocation_manager = _InvocationManager()

def on_chain_start(
    self,
    serialized: dict[str, Any],
    inputs: dict[str, Any],
    *,
    run_id: UUID,
    parent_run_id: Optional[UUID] = None,
    tags: Optional[list[str]] = None,
    metadata: Optional[dict[str, Any]] = None,
    **kwargs: Any,
) -> Any:
    """Track a chain run start; root runs open a WorkflowInvocation."""
    meta = metadata or {}
    data = serialized or {}
    candidate = (
        data.get("name")
        or data.get("id")
        or kwargs.get("name")
        or meta.get("langgraph_node")
    )
    name = _safe_str(candidate or "chain")

    if parent_run_id is not None:
        # Non-root run: record parentage only; no workflow span is opened.
        self._invocation_manager.add_invocation_state(run_id, parent_run_id)
        return

    # Root run: start a workflow invocation, honoring an explicit override
    # from metadata when present.
    workflow = WorkflowInvocation(name=meta.get("workflow_name") or name)
    self._telemetry_handler.start(workflow)
    self._invocation_manager.add_invocation_state(run_id, None, workflow)


def on_chain_end(
    self,
    outputs: dict[str, Any],
    *,
    run_id: UUID,
    parent_run_id: Optional[UUID] = None,
    **kwargs: Any,
) -> Any:
    """Stop the workflow invocation associated with a root chain run."""
    invocation = self._invocation_manager.get_invocation(run_id=run_id)
    # Only root runs carry a WorkflowInvocation (and isinstance rejects
    # None as well); anything else has nothing to stop.
    if not isinstance(invocation, WorkflowInvocation):
        return

    self._telemetry_handler.stop(invocation)

    # Drop our bookkeeping once the span has actually ended.
    if invocation.span and not invocation.span.is_recording():
        self._invocation_manager.delete_invocation_state(run_id)


def on_chain_error(
    self,
    error: BaseException,
    *,
    run_id: UUID,
    parent_run_id: Optional[UUID] = None,
    **kwargs: Any,
) -> Any:
    """Record a failure on the workflow invocation of a root chain run."""
    invocation = self._invocation_manager.get_invocation(run_id=run_id)
    # Only root runs carry a WorkflowInvocation (and isinstance rejects
    # None as well); anything else has nothing to fail.
    if not isinstance(invocation, WorkflowInvocation):
        return

    failure = Error(message=str(error), type=type(error))
    invocation = self._telemetry_handler.fail(
        invocation=invocation, error=failure
    )
    # Drop our bookkeeping once the span has actually ended.
    if invocation.span and not invocation.span.is_recording():
        self._invocation_manager.delete_invocation_state(run_id=run_id)

def on_chat_model_start(
self,
serialized: dict[str, Any],
Expand Down Expand Up @@ -152,7 +226,7 @@ def on_chat_model_start(
temperature=temperature,
max_tokens=max_tokens,
)
llm_invocation = self._telemetry_handler.start_llm(
llm_invocation = self._telemetry_handler.start(
invocation=llm_invocation
)
self._invocation_manager.add_invocation_state(
Expand Down Expand Up @@ -246,7 +320,7 @@ def on_llm_end(
if response_id is not None:
llm_invocation.response_id = str(response_id)

llm_invocation = self._telemetry_handler.stop_llm(
llm_invocation = self._telemetry_handler.stop(
invocation=llm_invocation
)
if llm_invocation.span and not llm_invocation.span.is_recording():
Expand All @@ -268,8 +342,14 @@ def on_llm_error(
return

error_otel = Error(message=str(error), type=type(error))
llm_invocation = self._telemetry_handler.fail_llm(
llm_invocation = self._telemetry_handler.fail(
invocation=llm_invocation, error=error_otel
)
if llm_invocation.span and not llm_invocation.span.is_recording():
self._invocation_manager.delete_invocation_state(run_id=run_id)

def _safe_str(value: Any) -> str:
try:
return str(value)
except (TypeError, ValueError):
return "<unrepr>"
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def add_invocation_state(
self,
run_id: UUID,
parent_run_id: Optional[UUID],
invocation: GenAIInvocation,
invocation: GenAIInvocation = None,
):
invocation_state = _InvocationState(invocation=invocation)
self._invocations[run_id] = invocation_state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Re-export the context-scoped attribute helpers at package level so callers
# need not reach into the submodule; noqa: the names are intentionally unused
# here.
from opentelemetry.util.genai.context_attributes import (  # noqa: F401
    get_context_scoped_attributes,
    set_context_scoped_attributes,
)

# Explicit public API of this package.
__all__ = [
    "get_context_scoped_attributes",
    "set_context_scoped_attributes",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Local Context-Scoped Attributes (CSA) layer for GenAI instrumentation.

This module provides a process-local alternative to W3C Baggage for propagating
instrumentation attributes (e.g. ``gen_ai.workflow.name``) to child spans within
the same process. It follows the API shape proposed by OTel spec PR #4931 so
migration to the upstream CSA implementation will be straightforward.

Key properties:
- Attributes are stored under a private OTel context key — they are **never**
serialised into W3C Baggage headers or any outbound propagation format.
- Lower-priority semantics: keys already present in the context are **not**
overwritten when new attributes are merged in.
"""

from __future__ import annotations

from typing import Any

from opentelemetry import context as otel_context
from opentelemetry.context import Context

_GENAI_CONTEXT_ATTRS_KEY = otel_context.create_key(
"opentelemetry.util.genai.context_scoped_attrs"
)


def set_context_scoped_attributes(
    attrs: dict[str, Any],
    context: Context | None = None,
) -> Context:
    """Merge *attrs* into the context's CSA mapping and return the new Context.

    Args:
        attrs: Attributes to add. Keys already present in the context are
            **not** overwritten (lower-priority semantics per the CSA spec).
        context: Base context to merge into; defaults to the current context.

    Returns:
        A new :class:`~opentelemetry.context.Context` with the merged
        attributes. The caller is responsible for attaching it if needed.
    """
    base = otel_context.get_current() if context is None else context
    current: dict[str, Any] = (
        otel_context.get_value(_GENAI_CONTEXT_ATTRS_KEY, context=base) or {}
    )
    # Start from the new attrs, then overlay what's already stored so that
    # previously-set keys win.
    combined = dict(attrs)
    combined.update(current)
    return otel_context.set_value(_GENAI_CONTEXT_ATTRS_KEY, combined, base)


def get_context_scoped_attributes(
    context: Context | None = None,
) -> dict[str, Any]:
    """Return the CSA mapping stored in *context*, or an empty dict.

    Args:
        context: Context to read from; defaults to the current context.

    Returns:
        The dict previously stored via
        :func:`set_context_scoped_attributes`, or ``{}`` when none exists.
    """
    target = otel_context.get_current() if context is None else context
    stored = otel_context.get_value(_GENAI_CONTEXT_ATTRS_KEY, context=target)
    return stored or {}
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,13 @@
The maximum number of concurrent uploads to queue. New uploads will be dropped if the queue is
full. Defaults to 20.
"""

# Environment variable name constant; the bare string below follows the
# module's existing pattern and is presumably rendered by the docs build —
# NOTE(review): confirm which truthy spellings the reader actually accepts.
OTEL_PYTHON_GENAI_CAPTURE_BAGGAGE = "OTEL_PYTHON_GENAI_CAPTURE_BAGGAGE"
"""
.. envvar:: OTEL_PYTHON_GENAI_CAPTURE_BAGGAGE

Opt-in flag that enables writing ``gen_ai.workflow.name`` to W3C Baggage in
addition to the process-local context-scoped attributes. Set to ``true`` or
``1`` to enable cross-process propagation of the workflow name via Baggage
headers. Defaults to ``false``.
"""
Loading