--- a/opentelemetry/instrumentation/anthropic/__init__.py
+++ b/opentelemetry/instrumentation/anthropic/__init__.py
@@ -81,6 +81,32 @@
"method": "stream",
"span_name": "anthropic.chat",
},
# Beta API methods (regular Anthropic SDK)
{
"package": "anthropic.resources.beta.messages.messages",
"object": "Messages",
"method": "create",
"span_name": "anthropic.chat",
},
{
"package": "anthropic.resources.beta.messages.messages",
"object": "Messages",
"method": "stream",
"span_name": "anthropic.chat",
},
# Beta API methods (Bedrock SDK)
{
"package": "anthropic.lib.bedrock._beta_messages",
"object": "Messages",
"method": "create",
"span_name": "anthropic.chat",
},
{
"package": "anthropic.lib.bedrock._beta_messages",
"object": "Messages",
"method": "stream",
"span_name": "anthropic.chat",
},
]

WRAPPED_AMETHODS = [
@@ -96,6 +122,32 @@
"method": "create",
"span_name": "anthropic.chat",
},
# Beta API async methods (regular Anthropic SDK)
{
"package": "anthropic.resources.beta.messages.messages",
"object": "AsyncMessages",
"method": "create",
"span_name": "anthropic.chat",
},
{
"package": "anthropic.resources.beta.messages.messages",
"object": "AsyncMessages",
"method": "stream",
"span_name": "anthropic.chat",
},
# Beta API async methods (Bedrock SDK)
{
"package": "anthropic.lib.bedrock._beta_messages",
"object": "AsyncMessages",
"method": "create",
"span_name": "anthropic.chat",
},
{
"package": "anthropic.lib.bedrock._beta_messages",
"object": "AsyncMessages",
"method": "stream",
"span_name": "anthropic.chat",
},
]
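
For reference, a minimal sketch of the beta-surface call these new descriptors intercept (assuming the anthropic SDK's beta namespace; the model id is illustrative):

import anthropic

client = anthropic.Anthropic()

# Resolves to anthropic.resources.beta.messages.messages.Messages.create,
# which the new WRAPPED_METHODS entries above now wrap.
message = client.beta.messages.create(
    model="claude-3-5-sonnet-20241022",  # illustrative model id
    max_tokens=128,
    messages=[{"role": "user", "content": "Hello"}],
)

# The AsyncMessages entries cover the same calls on an AsyncAnthropic client;
# the anthropic.lib.bedrock._beta_messages entries cover AnthropicBedrock clients.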


@@ -130,8 +182,8 @@ async def _aset_token_usage(
token_histogram: Histogram = None,
choice_counter: Counter = None,
):
-if not isinstance(response, dict):
-    response = response.__dict__
+from opentelemetry.instrumentation.anthropic.utils import _aextract_response_data
+response = await _aextract_response_data(response)

if usage := response.get("usage"):
prompt_tokens = usage.input_tokens
@@ -223,8 +275,8 @@ def _set_token_usage(
token_histogram: Histogram = None,
choice_counter: Counter = None,
):
-if not isinstance(response, dict):
-    response = response.__dict__
+from opentelemetry.instrumentation.anthropic.utils import _extract_response_data
+response = _extract_response_data(response)

if usage := response.get("usage"):
prompt_tokens = usage.input_tokens
@@ -384,6 +436,17 @@ async def _ahandle_input(span: Span, event_logger: Optional[EventLogger], kwargs
await aset_input_attributes(span, kwargs)


@dont_throw
async def _ahandle_response(span: Span, event_logger: Optional[EventLogger], response):
if should_emit_events():
emit_response_events(event_logger, response)
else:
if not span.is_recording():
return
from opentelemetry.instrumentation.anthropic.span_utils import aset_response_attributes
await aset_response_attributes(span, response)


@dont_throw
def _handle_response(span: Span, event_logger: Optional[EventLogger], response):
if should_emit_events():
@@ -606,7 +669,8 @@ async def _awrap(
kwargs,
)
elif response:
-metric_attributes = shared_metrics_attributes(response)
+from opentelemetry.instrumentation.anthropic.utils import ashared_metrics_attributes
+metric_attributes = await ashared_metrics_attributes(response)

if duration_histogram:
duration = time.time() - start_time
@@ -615,7 +679,7 @@
attributes=metric_attributes,
)

-_handle_response(span, event_logger, response)
+await _ahandle_response(span, event_logger, response)

if span.is_recording():
await _aset_token_usage(
@@ -710,7 +774,9 @@ def _instrument(self, **kwargs):
wrapped_method,
),
)
-except ModuleNotFoundError:
+logger.debug(f"Successfully wrapped {wrap_package}.{wrap_object}.{wrap_method}")
+except Exception as e:
+logger.debug(f"Failed to wrap {wrap_package}.{wrap_object}.{wrap_method}: {e}")
pass # that's ok, we don't want to fail if some methods do not exist

for wrapped_method in WRAPPED_AMETHODS:
@@ -731,7 +797,7 @@ def _instrument(self, **kwargs):
wrapped_method,
),
)
-except ModuleNotFoundError:
+except Exception:
pass # that's ok, we don't want to fail if some methods do not exist

def _uninstrument(self, **kwargs):
--- a/opentelemetry/instrumentation/anthropic/span_utils.py
+++ b/opentelemetry/instrumentation/anthropic/span_utils.py
@@ -8,6 +8,7 @@
dont_throw,
model_as_dict,
should_send_prompts,
_extract_response_data,
)
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import (
GEN_AI_RESPONSE_ID,
@@ -165,11 +166,81 @@ async def aset_input_attributes(span, kwargs):
)


async def _aset_span_completions(span, response):
if not should_send_prompts():
return
from opentelemetry.instrumentation.anthropic import set_span_attribute
from opentelemetry.instrumentation.anthropic.utils import _aextract_response_data

response = await _aextract_response_data(response)
index = 0
prefix = f"{SpanAttributes.LLM_COMPLETIONS}.{index}"
set_span_attribute(span, f"{prefix}.finish_reason", response.get("stop_reason"))
if response.get("role"):
set_span_attribute(span, f"{prefix}.role", response.get("role"))

if response.get("completion"):
set_span_attribute(span, f"{prefix}.content", response.get("completion"))
elif response.get("content"):
tool_call_index = 0
text = ""
for content in response.get("content"):
content_block_type = content.type
# usually, Anthropic responds with just one text block,
# but the API allows for multiple text blocks, so concatenate them
if content_block_type == "text" and hasattr(content, "text"):
text += content.text
elif content_block_type == "thinking":
content = dict(content)
# override the role to thinking
set_span_attribute(
span,
f"{prefix}.role",
"thinking",
)
set_span_attribute(
span,
f"{prefix}.content",
content.get("thinking"),
)
# increment the index for subsequent content blocks
index += 1
prefix = f"{SpanAttributes.LLM_COMPLETIONS}.{index}"
# set the role to the original role on the next completions
set_span_attribute(
span,
f"{prefix}.role",
response.get("role"),
)
elif content_block_type == "tool_use":
content = dict(content)
set_span_attribute(
span,
f"{prefix}.tool_calls.{tool_call_index}.id",
content.get("id"),
)
set_span_attribute(
span,
f"{prefix}.tool_calls.{tool_call_index}.name",
content.get("name"),
)
tool_arguments = content.get("input")
if tool_arguments is not None:
set_span_attribute(
span,
f"{prefix}.tool_calls.{tool_call_index}.arguments",
json.dumps(tool_arguments),
)
tool_call_index += 1
set_span_attribute(span, f"{prefix}.content", text)


def _set_span_completions(span, response):
if not should_send_prompts():
return
from opentelemetry.instrumentation.anthropic import set_span_attribute

response = _extract_response_data(response)
index = 0
prefix = f"{SpanAttributes.LLM_COMPLETIONS}.{index}"
set_span_attribute(span, f"{prefix}.finish_reason", response.get("stop_reason"))
@@ -232,12 +303,36 @@ def _set_span_completions(span, response):
set_span_attribute(span, f"{prefix}.content", text)


@dont_throw
async def aset_response_attributes(span, response):
from opentelemetry.instrumentation.anthropic import set_span_attribute
from opentelemetry.instrumentation.anthropic.utils import _aextract_response_data

response = await _aextract_response_data(response)
set_span_attribute(span, SpanAttributes.LLM_RESPONSE_MODEL, response.get("model"))
set_span_attribute(span, GEN_AI_RESPONSE_ID, response.get("id"))

if response.get("usage"):
prompt_tokens = response.get("usage").input_tokens
completion_tokens = response.get("usage").output_tokens
set_span_attribute(span, SpanAttributes.LLM_USAGE_PROMPT_TOKENS, prompt_tokens)
set_span_attribute(
span, SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, completion_tokens
)
set_span_attribute(
span,
SpanAttributes.LLM_USAGE_TOTAL_TOKENS,
prompt_tokens + completion_tokens,
)

await _aset_span_completions(span, response)


@dont_throw
def set_response_attributes(span, response):
from opentelemetry.instrumentation.anthropic import set_span_attribute

-if not isinstance(response, dict):
-    response = response.__dict__
+response = _extract_response_data(response)
set_span_attribute(span, SpanAttributes.LLM_RESPONSE_MODEL, response.get("model"))
set_span_attribute(span, GEN_AI_RESPONSE_ID, response.get("id"))

--- a/opentelemetry/instrumentation/anthropic/utils.py
+++ b/opentelemetry/instrumentation/anthropic/utils.py
@@ -61,10 +61,95 @@ def _handle_exception(e, func, logger):
return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper


async def _aextract_response_data(response):
"""Async version of _extract_response_data that can await coroutines."""
import inspect

# If we get a coroutine, await it
if inspect.iscoroutine(response):
try:
response = await response
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.debug(f"Failed to await coroutine response: {e}")
return {}

if isinstance(response, dict):
return response

# Handle with_raw_response wrapped responses
if hasattr(response, 'parse') and callable(response.parse):
try:
# For with_raw_response, parse() gives us the actual response object
parsed_response = response.parse()
if not isinstance(parsed_response, dict):
parsed_response = parsed_response.__dict__
return parsed_response
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.debug(f"Failed to parse response: {e}, response type: {type(response)}")

# Fallback to __dict__ for regular response objects
if hasattr(response, '__dict__'):
response_dict = response.__dict__
return response_dict

return {}
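
A usage sketch for the async variant (the wrapper function _example here is hypothetical; response may be a coroutine, a raw-response wrapper, or a plain Message object):

async def _example(response):
    # awaits coroutines, unwraps with_raw_response wrappers, falls back to __dict__
    data = await _aextract_response_data(response)
    return data.get("model")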


def _extract_response_data(response):
"""Extract the actual response data from both regular and with_raw_response wrapped responses."""
import inspect

# If we get a coroutine, we cannot process it in sync context
if inspect.iscoroutine(response):
import logging
logger = logging.getLogger(__name__)
logger.warning(f"_extract_response_data received coroutine {response} - response processing skipped")
return {}

if isinstance(response, dict):
return response

# Handle with_raw_response wrapped responses
if hasattr(response, 'parse') and callable(response.parse):
try:
# For with_raw_response, parse() gives us the actual response object
parsed_response = response.parse()
if not isinstance(parsed_response, dict):
parsed_response = parsed_response.__dict__
return parsed_response
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.debug(f"Failed to parse response: {e}, response type: {type(response)}")

# Fallback to __dict__ for regular response objects
if hasattr(response, '__dict__'):
response_dict = response.__dict__
return response_dict

return {}
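
The parse() branch above targets the SDK's raw-response wrapper; a short sketch of the call shape it normalizes (assuming the anthropic SDK's with_raw_response surface; the model id is illustrative):

import anthropic

client = anthropic.Anthropic()
raw = client.messages.with_raw_response.create(
    model="claude-3-5-sonnet-20241022",  # illustrative model id
    max_tokens=128,
    messages=[{"role": "user", "content": "ping"}],
)
# raw carries HTTP details; parse() yields the Message, whose __dict__
# is what _extract_response_data(raw) ultimately returns.
data = _extract_response_data(raw)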


@dont_throw
async def ashared_metrics_attributes(response):
response = await _aextract_response_data(response)

common_attributes = Config.get_common_metrics_attributes()

return {
**common_attributes,
GEN_AI_SYSTEM: GEN_AI_SYSTEM_ANTHROPIC,
SpanAttributes.LLM_RESPONSE_MODEL: response.get("model"),
}


@dont_throw
def shared_metrics_attributes(response):
-if not isinstance(response, dict):
-    response = response.__dict__
+response = _extract_response_data(response)

common_attributes = Config.get_common_metrics_attributes()

Expand Down