Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from opentelemetry.instrumentation.bedrock.span_utils import (
converse_usage_record,
set_converse_input_prompt_span_attributes,
_set_converse_finish_reasons,
_set_finish_reasons_unconditionally,
set_converse_model_span_attributes,
set_converse_response_span_attributes,
set_converse_streaming_response_span_attributes,
Expand All @@ -48,6 +50,13 @@
unwrap,
)
from opentelemetry.metrics import Counter, Histogram, Meter, get_meter
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAIAttributes,
)
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import (
GenAiOperationNameValues,
GenAiSystemValues,
)
from opentelemetry.semconv_ai import (
SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY,
Meters,
Expand Down Expand Up @@ -104,8 +113,9 @@ def __init__(
{"package": "botocore.session", "object": "Session", "method": "create_client"},
]

_BEDROCK_INVOKE_SPAN_NAME = "bedrock.completion"
_BEDROCK_CONVERSE_SPAN_NAME = "bedrock.converse"
def _span_name(operation_name, model):
"""Build span name per OTel semconv: '{operation_name} {model}'."""
return f"{operation_name} {model}" if model else operation_name


def is_metrics_enabled() -> bool:
Expand Down Expand Up @@ -200,8 +210,15 @@ def with_instrumentation(*args, **kwargs):
if context_api.get_value(SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY):
return fn(*args, **kwargs)

(provider, _model_vendor, _model) = _get_vendor_model(kwargs.get("modelId"))
operation_name = _derive_operation_name(kwargs)
span_attributes = {
GenAIAttributes.GEN_AI_PROVIDER_NAME: provider,
GenAIAttributes.GEN_AI_OPERATION_NAME: operation_name,
GenAIAttributes.GEN_AI_REQUEST_MODEL: _model,
}
with tracer.start_as_current_span(
_BEDROCK_INVOKE_SPAN_NAME, kind=SpanKind.CLIENT
_span_name(operation_name, _model), kind=SpanKind.CLIENT, attributes=span_attributes
) as span:
response = fn(*args, **kwargs)
_handle_call(span, kwargs, response, metric_params, event_logger)
Expand All @@ -218,7 +235,18 @@ def with_instrumentation(*args, **kwargs):
if context_api.get_value(SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY):
return fn(*args, **kwargs)

span = tracer.start_span(_BEDROCK_INVOKE_SPAN_NAME, kind=SpanKind.CLIENT)
(provider, _model_vendor, _model) = _get_vendor_model(kwargs.get("modelId"))
operation_name = _derive_operation_name(kwargs)
span_attributes = {
GenAIAttributes.GEN_AI_PROVIDER_NAME: provider,
GenAIAttributes.GEN_AI_OPERATION_NAME: operation_name,
GenAIAttributes.GEN_AI_REQUEST_MODEL: _model,
}
span = tracer.start_span(
_span_name(operation_name, _model),
kind=SpanKind.CLIENT,
attributes=span_attributes,
)

response = fn(*args, **kwargs)
_handle_stream_call(span, kwargs, response, metric_params, event_logger)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Expand All @@ -237,8 +265,16 @@ def with_instrumentation(*args, **kwargs):
if context_api.get_value(SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY):
return fn(*args, **kwargs)

(provider, _model_vendor, _model) = _get_vendor_model(kwargs.get("modelId"))
span_attributes = {
GenAIAttributes.GEN_AI_PROVIDER_NAME: provider,
GenAIAttributes.GEN_AI_OPERATION_NAME: GenAiOperationNameValues.CHAT.value,
GenAIAttributes.GEN_AI_REQUEST_MODEL: _model,
}
with tracer.start_as_current_span(
_BEDROCK_CONVERSE_SPAN_NAME, kind=SpanKind.CLIENT
_span_name(GenAiOperationNameValues.CHAT.value, _model),
kind=SpanKind.CLIENT,
attributes=span_attributes,
) as span:
response = fn(*args, **kwargs)
_handle_converse(span, kwargs, response, metric_params, event_logger)
Expand All @@ -254,7 +290,17 @@ def with_instrumentation(*args, **kwargs):
if context_api.get_value(SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY):
return fn(*args, **kwargs)

span = tracer.start_span(_BEDROCK_CONVERSE_SPAN_NAME, kind=SpanKind.CLIENT)
(provider, _model_vendor, _model) = _get_vendor_model(kwargs.get("modelId"))
span_attributes = {
GenAIAttributes.GEN_AI_PROVIDER_NAME: provider,
GenAIAttributes.GEN_AI_OPERATION_NAME: GenAiOperationNameValues.CHAT.value,
GenAIAttributes.GEN_AI_REQUEST_MODEL: _model,
}
span = tracer.start_span(
_span_name(GenAiOperationNameValues.CHAT.value, _model),
kind=SpanKind.CLIENT,
attributes=span_attributes,
)
response = fn(*args, **kwargs)
if span.is_recording():
_handle_converse_stream(span, kwargs, response, metric_params, event_logger)
Expand Down Expand Up @@ -299,6 +345,7 @@ def stream_done(response_body):
if should_emit_events() and event_logger:
emit_message_events(event_logger, kwargs)
emit_streaming_response_event(response_body, event_logger)
_set_finish_reasons_unconditionally(model_vendor, span, response_body)
else:
set_model_message_span_attributes(model_vendor, span, request_body)
set_model_choice_span_attributes(model_vendor, span, response_body)
Expand Down Expand Up @@ -345,6 +392,7 @@ def _handle_call(span: Span, kwargs, response, metric_params, event_logger):
if should_emit_events() and event_logger:
emit_message_events(event_logger, kwargs)
emit_choice_events(event_logger, response)
_set_finish_reasons_unconditionally(model_vendor, span, response_body)
else:
set_model_message_span_attributes(model_vendor, span, request_body)
set_model_choice_span_attributes(model_vendor, span, response_body)
Expand All @@ -362,6 +410,7 @@ def _handle_converse(span, kwargs, response, metric_params, event_logger):
if should_emit_events() and event_logger:
emit_input_events_converse(kwargs, event_logger)
emit_response_event_converse(response, event_logger)
_set_converse_finish_reasons(span, response.get("stopReason"))
else:
set_converse_input_prompt_span_attributes(kwargs, span)
set_converse_response_span_attributes(response, span)
Expand All @@ -385,11 +434,28 @@ def _handle_converse_stream(span, kwargs, response, metric_params, event_logger)
def handler(func):
def wrap(*args, **kwargs):
response_msg = kwargs.pop("response_msg")
tool_blocks = kwargs.pop("tool_blocks")
reasoning_blocks = kwargs.pop("reasoning_blocks")
span = kwargs.pop("span")
event = func(*args, **kwargs)
nonlocal role
if "contentBlockDelta" in event and "text" in event["contentBlockDelta"].get("delta", {}):
response_msg.append(event["contentBlockDelta"]["delta"]["text"])
if "contentBlockDelta" in event:
delta = event["contentBlockDelta"].get("delta", {})
if "text" in delta:
response_msg.append(delta["text"])
if "toolUse" in delta:
# Merge delta input into the last tool block (created by contentBlockStart)
if tool_blocks:
tool_blocks[-1].setdefault("input", "")
tool_blocks[-1]["input"] += delta["toolUse"].get("input", "")
else:
tool_blocks.append(delta["toolUse"])
if "reasoningContent" in delta:
reasoning_blocks.append(delta["reasoningContent"].get("text", ""))
elif "contentBlockStart" in event:
start = event["contentBlockStart"].get("start", {})
if "toolUse" in start:
tool_blocks.append(start["toolUse"])
Comment thread
coderabbitai[bot] marked this conversation as resolved.
elif "messageStart" in event:
role = event["messageStart"]["role"]
elif "metadata" in event:
Expand All @@ -398,29 +464,36 @@ def wrap(*args, **kwargs):
converse_usage_record(span, event["metadata"], metric_params)
span.end()
elif "messageStop" in event:
stop_reason = event.get("messageStop", {}).get("stopReason")
if should_emit_events() and event_logger:
emit_streaming_converse_response_event(
event_logger,
response_msg,
role,
event.get("messageStop", {}).get("stopReason", "unknown"),
stop_reason,
)
_set_converse_finish_reasons(span, stop_reason)
else:
set_converse_streaming_response_span_attributes(
response_msg, role, span
response_msg,
role,
span,
finish_reason=stop_reason,
tool_blocks=tool_blocks,
reasoning_blocks=reasoning_blocks,
)

return event

return partial(wrap, response_msg=[], span=span)
return partial(wrap, response_msg=[], tool_blocks=[], reasoning_blocks=[], span=span)

stream._parse_event = handler(stream._parse_event)


def _get_vendor_model(modelId):
# Docs:
# https://docs.aws.amazon.com/bedrock/latest/userguide/inference-profiles-support.html#inference-profiles-support-system
provider = "AWS"
provider = GenAiSystemValues.AWS_BEDROCK.value
model_vendor = "imported_model"
model = modelId

Expand Down Expand Up @@ -449,19 +522,32 @@ def _cross_region_check(value):
return model_vendor, model


def _derive_operation_name(kwargs):
    """Derive the GenAI operation name for an invoke_model call before span creation.

    Inspects the JSON request body: a dict payload containing a ``messages``
    key is treated as a chat request; anything else — including a missing,
    unparsable, or non-dict body — falls back to text completion.
    """
    raw_body = kwargs.get("body")
    if not raw_body:
        return GenAiOperationNameValues.TEXT_COMPLETION.value
    try:
        parsed = json.loads(raw_body)
    except (json.JSONDecodeError, TypeError):
        return GenAiOperationNameValues.TEXT_COMPLETION.value
    if isinstance(parsed, dict) and "messages" in parsed:
        return GenAiOperationNameValues.CHAT.value
    return GenAiOperationNameValues.TEXT_COMPLETION.value


class GuardrailMeters:
    """Metric names for Bedrock guardrail instrumentation."""

    GEN_AI_BEDROCK_GUARDRAIL_ACTIVATION = "gen_ai.bedrock.guardrail.activation"
    GEN_AI_BEDROCK_GUARDRAIL_LATENCY = "gen_ai.bedrock.guardrail.latency"
    GEN_AI_BEDROCK_GUARDRAIL_COVERAGE = "gen_ai.bedrock.guardrail.coverage"
    GEN_AI_BEDROCK_GUARDRAIL_SENSITIVE = "gen_ai.bedrock.guardrail.sensitive_info"
    GEN_AI_BEDROCK_GUARDRAIL_TOPICS = "gen_ai.bedrock.guardrail.topics"
    GEN_AI_BEDROCK_GUARDRAIL_CONTENT = "gen_ai.bedrock.guardrail.content"
    GEN_AI_BEDROCK_GUARDRAIL_WORDS = "gen_ai.bedrock.guardrail.words"

    # Backward-compatible aliases: the pre-semconv LLM_BEDROCK_* names are
    # part of this package's public surface; removing them would raise
    # AttributeError in downstream code that imports them.
    LLM_BEDROCK_GUARDRAIL_ACTIVATION = GEN_AI_BEDROCK_GUARDRAIL_ACTIVATION
    LLM_BEDROCK_GUARDRAIL_LATENCY = GEN_AI_BEDROCK_GUARDRAIL_LATENCY
    LLM_BEDROCK_GUARDRAIL_COVERAGE = GEN_AI_BEDROCK_GUARDRAIL_COVERAGE
    LLM_BEDROCK_GUARDRAIL_SENSITIVE = GEN_AI_BEDROCK_GUARDRAIL_SENSITIVE
    LLM_BEDROCK_GUARDRAIL_TOPICS = GEN_AI_BEDROCK_GUARDRAIL_TOPICS
    LLM_BEDROCK_GUARDRAIL_CONTENT = GEN_AI_BEDROCK_GUARDRAIL_CONTENT
    LLM_BEDROCK_GUARDRAIL_WORDS = GEN_AI_BEDROCK_GUARDRAIL_WORDS


class PromptCaching:
    """Metric name for prompt-caching token counts."""

    # will be moved under the AI SemConv. Not namespaced since also OpenAI supports this.
    GEN_AI_PROMPT_CACHING = "gen_ai.prompt.caching"

    # Backward-compatible alias for the pre-semconv name; keeps downstream
    # imports of the legacy constant working.
    LLM_BEDROCK_PROMPT_CACHING = GEN_AI_PROMPT_CACHING

Comment on lines 538 to 551
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Keep backward-compatible aliases for the renamed metric constants.

GuardrailMeters and PromptCaching live in the package root, and the old LLM_BEDROCK_* names disappear completely here. That turns an otherwise additive semconv migration into an AttributeError for downstream code that imports those constants.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py`
around lines 456 - 469, The renamed metric constants removed the old
LLM_BEDROCK_* identifiers causing AttributeError for downstream imports; restore
backward-compatible aliases by adding the old names as assignments pointing to
the new constants (e.g., define LLM_BEDROCK_GUARDRAIL_ACTIVATION =
GuardrailMeters.GEN_AI_BEDROCK_GUARDRAIL_ACTIVATION,
LLM_BEDROCK_GUARDRAIL_LATENCY =
GuardrailMeters.GEN_AI_BEDROCK_GUARDRAIL_LATENCY, etc., and
LLM_BEDROCK_PROMPT_CACHING = PromptCaching.GEN_AI_PROMPT_CACHING) so both
GuardrailMeters and PromptCaching keep the new names while the legacy
LLM_BEDROCK_* symbols remain available.


def _create_metrics(meter: Meter):
Expand All @@ -484,58 +570,57 @@ def _create_metrics(meter: Meter):
)

exception_counter = meter.create_counter(
# TODO: will fix this in future as a consolidation for semantic convention
name="llm.bedrock.completions.exceptions",
name="gen_ai.bedrock.completions.exceptions",
unit="time",
description="Number of exceptions occurred during chat completions",
)

# Guardrail metrics
guardrail_activation = meter.create_counter(
name=GuardrailMeters.LLM_BEDROCK_GUARDRAIL_ACTIVATION,
name=GuardrailMeters.GEN_AI_BEDROCK_GUARDRAIL_ACTIVATION,
unit="",
description="Number of guardrail activation",
)

guardrail_latency_histogram = meter.create_histogram(
name=GuardrailMeters.LLM_BEDROCK_GUARDRAIL_LATENCY,
name=GuardrailMeters.GEN_AI_BEDROCK_GUARDRAIL_LATENCY,
unit="ms",
description="GenAI guardrail latency",
)

guardrail_coverage = meter.create_counter(
name=GuardrailMeters.LLM_BEDROCK_GUARDRAIL_COVERAGE,
name=GuardrailMeters.GEN_AI_BEDROCK_GUARDRAIL_COVERAGE,
unit="char",
description="GenAI guardrail coverage",
)

guardrail_sensitive_info = meter.create_counter(
name=GuardrailMeters.LLM_BEDROCK_GUARDRAIL_SENSITIVE,
name=GuardrailMeters.GEN_AI_BEDROCK_GUARDRAIL_SENSITIVE,
unit="",
description="GenAI guardrail sensitive information protection",
)

guardrail_topic = meter.create_counter(
name=GuardrailMeters.LLM_BEDROCK_GUARDRAIL_TOPICS,
name=GuardrailMeters.GEN_AI_BEDROCK_GUARDRAIL_TOPICS,
unit="",
description="GenAI guardrail topics protection",
)

guardrail_content = meter.create_counter(
name=GuardrailMeters.LLM_BEDROCK_GUARDRAIL_CONTENT,
name=GuardrailMeters.GEN_AI_BEDROCK_GUARDRAIL_CONTENT,
unit="",
description="GenAI guardrail content filter protection",
)

guardrail_words = meter.create_counter(
name=GuardrailMeters.LLM_BEDROCK_GUARDRAIL_WORDS,
name=GuardrailMeters.GEN_AI_BEDROCK_GUARDRAIL_WORDS,
unit="",
description="GenAI guardrail words filter protection",
)

# Prompt Caching
prompt_caching = meter.create_counter(
name=PromptCaching.LLM_BEDROCK_PROMPT_CACHING,
name=PromptCaching.GEN_AI_PROMPT_CACHING,
unit="",
description="Number of cached tokens",
)
Expand Down
Loading
Loading