Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .circleci/config.templ.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1299,6 +1299,14 @@ jobs:
pattern: "langchain"
snapshot: true

# CI job for the anthropic integration test suite; runs snapshot tests across 3 parallel executors.
anthropic:
  <<: *machine_executor
  parallelism: 3
  steps:
    - run_test:
        pattern: "anthropic"
        snapshot: true

logbook:
<<: *machine_executor
steps:
Expand Down
48 changes: 48 additions & 0 deletions .riot/requirements/1e0e8e5.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile --no-annotate .riot/requirements/1e0e8e5.in
#
annotated-types==0.7.0
anthropic==0.26.1
anyio==4.4.0
attrs==23.2.0
certifi==2024.2.2
charset-normalizer==3.3.2
coverage[toml]==7.5.3
distro==1.9.0
exceptiongroup==1.2.1
filelock==3.14.0
fsspec==2024.5.0
h11==0.14.0
httpcore==1.0.5
httpx==0.27.0
huggingface-hub==0.23.2
hypothesis==6.45.0
idna==3.7
iniconfig==2.0.0
jiter==0.4.0
mock==5.1.0
multidict==6.0.5
opentracing==2.4.0
packaging==24.0
pluggy==1.5.0
pydantic==2.7.2
pydantic-core==2.18.3
pytest==8.2.1
pytest-asyncio==0.23.7
pytest-cov==5.0.0
pytest-mock==3.14.0
pyyaml==6.0.1
requests==2.32.3
sniffio==1.3.1
sortedcontainers==2.4.0
tokenizers==0.19.1
tomli==2.0.1
tqdm==4.66.4
typing-extensions==4.12.0
urllib3==2.2.1
vcrpy==6.0.1
wrapt==1.16.0
yarl==1.9.4
1 change: 1 addition & 0 deletions ddtrace/_monkey.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
"tornado": False,
"openai": True,
"langchain": True,
"anthropic": True,
"subprocess": True,
"unittest": True,
"coverage": False,
Expand Down
17 changes: 17 additions & 0 deletions ddtrace/contrib/anthropic/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""
The Anthropic integration instruments the Anthropic Python SDK, tracing requests
made to Anthropic chat models (``Messages.create`` / ``Messages.stream``).

Enable it automatically with ``ddtrace.patch_all()``, or explicitly::

    from ddtrace import patch

    patch(anthropic=True)
"""  # noqa: E501
from ...internal.utils.importlib import require_modules


# The integration is only loaded when the anthropic package is importable.
required_modules = ["anthropic"]

with require_modules(required_modules) as missing_modules:
    if not missing_modules:
        from . import patch as _patch

        patch = _patch.patch
        unpatch = _patch.unpatch
        get_version = _patch.get_version

        __all__ = ["patch", "unpatch", "get_version"]
174 changes: 174 additions & 0 deletions ddtrace/contrib/anthropic/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import json
import os
import sys
from typing import Any
from typing import Optional

import anthropic

from ddtrace import config
from ddtrace.contrib.trace_utils import unwrap
from ddtrace.contrib.trace_utils import with_traced_module
from ddtrace.contrib.trace_utils import wrap
from ddtrace.internal.logger import get_logger
from ddtrace.internal.utils import get_argument_value
from ddtrace.llmobs._integrations import AnthropicIntegration
from ddtrace.pin import Pin

from .utils import _get_attr
from .utils import handle_streamed_response


log = get_logger(__name__)


def get_version():
    # type: () -> str
    """Return the installed ``anthropic`` package version, or "" if unavailable."""
    return getattr(anthropic, "__version__", "")


# Register the integration configuration under ``ddtrace.config.anthropic``.
config._add(
    "anthropic",
    {
        # Fraction (0.0-1.0) of spans on which prompt/completion text is tagged.
        "span_prompt_completion_sample_rate": float(os.getenv("DD_ANTHROPIC_SPAN_PROMPT_COMPLETION_SAMPLE_RATE", 1.0)),
        # Character limit used when truncating tagged prompt/completion text.
        "span_char_limit": int(os.getenv("DD_ANTHROPIC_SPAN_CHAR_LIMIT", 128)),
    },
)


def _extract_api_key(instance: Any) -> str:
"""
Extract and format LLM-provider API key from instance.
"""
client = getattr(instance, "_client", "")
if client:
return getattr(client, "api_key", None)
return None


@with_traced_module
def traced_chat_model_generate(anthropic, pin, func, instance, args, kwargs):
    """Trace a call to ``Messages.create`` or ``Messages.stream``.

    Opens an APM span, tags the request messages and parameters (subject to
    the prompt/completion sampling rate and truncation limit), invokes the
    wrapped SDK method, then tags the response.  Streamed responses are
    delegated to ``handle_streamed_response`` and their span is finished
    later, once the stream iterator is exhausted.
    """
    chat_messages = get_argument_value(args, kwargs, 0, "messages")
    integration = anthropic._datadog_integration
    is_stream = False

    # Name the operation "stream" when streaming was requested via kwargs;
    # otherwise use the wrapped method's own name (e.g. "create").
    operation_name = "stream" if "stream" in kwargs and kwargs["stream"] is True else func.__name__

    span = integration.trace(
        pin,
        "%s.%s.%s" % (instance.__module__, instance.__class__.__name__, operation_name),
        submit_to_llmobs=True,
        interface_type="chat_model",
        provider="anthropic",
        model=kwargs.get("model", ""),
        api_key=_extract_api_key(instance),
    )

    chat_completions = None
    try:
        # Tag each request message.  A message's "content" may be a plain
        # string or a list of typed content blocks (text / image).
        for message_idx, message in enumerate(chat_messages):
            if isinstance(message, dict):
                if isinstance(message.get("content", None), str):
                    # Prompt text is only tagged on sampled spans, truncated.
                    if integration.is_pc_sampled_span(span) and message.get("content", "") != "":
                        span.set_tag_str(
                            "anthropic.request.messages.%d.content.0.text" % (message_idx),
                            integration.trunc(message.get("content", "")),
                        )
                    span.set_tag_str(
                        "anthropic.request.messages.%d.content.0.type" % (message_idx),
                        "text",
                    )
                elif isinstance(message.get("content", None), list):
                    for block_idx, block in enumerate(message.get("content", [])):
                        if integration.is_pc_sampled_span(span):
                            if block.get("type", None) == "text" and block.get("text", "") != "":
                                span.set_tag_str(
                                    "anthropic.request.messages.%d.content.%d.text" % (message_idx, block_idx),
                                    integration.trunc(str(block.get("text", ""))),
                                )
                            elif block.get("type", None) == "image":
                                # Never tag raw image payloads; use a placeholder.
                                span.set_tag_str(
                                    "anthropic.request.messages.%d.content.%d.text" % (message_idx, block_idx),
                                    "([IMAGE DETECTED])",
                                )

                        span.set_tag_str(
                            "anthropic.request.messages.%d.content.%d.type" % (message_idx, block_idx),
                            block.get("type", "text"),
                        )
                span.set_tag_str(
                    "anthropic.request.messages.%d.role" % (message_idx),
                    message.get("role", ""),
                )
        # Tag every request parameter except the (potentially large) messages.
        params_to_tag = {k: v for k, v in kwargs.items() if k != "messages"}
        span.set_tag_str("anthropic.request.parameters", json.dumps(params_to_tag))

        chat_completions = func(*args, **kwargs)

        if isinstance(chat_completions, anthropic.Stream) or isinstance(
            chat_completions, anthropic.lib.streaming._messages.MessageStreamManager
        ):
            is_stream = True
            return handle_streamed_response(integration, chat_completions, args, kwargs, span)
        else:
            handle_non_streamed_response(integration, chat_completions, args, kwargs, span)
    except Exception:
        span.set_exc_info(*sys.exc_info())
        if integration.is_pc_sampled_llmobs(span):
            integration.llmobs_set_tags(span=span, resp=None, args=args, kwargs=kwargs, err=bool(span.error))
        span.finish()
        raise
    finally:
        # only want to perform this for non-streamed responses, for streamed responses this is done once the generator
        # iterator is exhausted
        # NOTE(review): on the exception path both the except handler and this
        # finally run (is_stream is False), so llmobs_set_tags and span.finish
        # each execute twice — confirm span.finish is safe to call twice here.
        if not is_stream:
            if integration.is_pc_sampled_llmobs(span):
                integration.llmobs_set_tags(span=span, resp=chat_completions, args=args, kwargs=kwargs)
            span.finish()
    return chat_completions


def handle_non_streamed_response(integration, chat_completions, args, kwargs, span):
    """Tag a fully-materialized (non-streamed) Anthropic message onto *span*.

    Tags each response content block's text (sampled and truncated) and type,
    then the message-level stop reason, role, and token usage.
    """
    # set content-block level tags
    for block_idx, content_block in enumerate(chat_completions.content):
        block_text = getattr(content_block, "text", "")
        if integration.is_pc_sampled_span(span) and block_text != "":
            span.set_tag_str(
                "anthropic.response.completions.content.%d.text" % (block_idx),
                integration.trunc(str(block_text)),
            )
        span.set_tag_str(
            "anthropic.response.completions.content.%d.type" % (block_idx),
            content_block.type,
        )

    # set message level tags
    stop_reason = getattr(chat_completions, "stop_reason", None)
    if stop_reason is not None:
        span.set_tag_str("anthropic.response.completions.finish_reason", chat_completions.stop_reason)
    span.set_tag_str("anthropic.response.completions.role", chat_completions.role)

    integration.record_usage(span, _get_attr(chat_completions, "usage", {}))


def patch():
    """Patch the ``anthropic`` library to trace chat model requests.

    Attaches a :class:`Pin` and a shared ``AnthropicIntegration`` to the
    module, then wraps ``Messages.create`` and ``Messages.stream``.
    Idempotent: subsequent calls are no-ops while the patch is active.
    """
    if getattr(anthropic, "_datadog_patch", False):
        return

    anthropic._datadog_patch = True

    Pin().onto(anthropic)
    integration = AnthropicIntegration(integration_config=config.anthropic)
    anthropic._datadog_integration = integration

    wrap("anthropic", "resources.messages.Messages.create", traced_chat_model_generate(anthropic))
    wrap("anthropic", "resources.messages.Messages.stream", traced_chat_model_generate(anthropic))


def unpatch():
    """Undo :func:`patch`: restore the original ``Messages`` methods.

    No-op when the library is not currently patched.  Also removes the
    integration object attached to the module by ``patch``.
    """
    if not getattr(anthropic, "_datadog_patch", False):
        return

    anthropic._datadog_patch = False

    unwrap(anthropic.resources.messages.Messages, "create")
    unwrap(anthropic.resources.messages.Messages, "stream")

    delattr(anthropic, "_datadog_integration")
Loading