packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/__init__.py: 216 additions & 1 deletion
@@ -1 +1,216 @@
"""OpenTelemetry OpenAI instrumentation"""
import logging
from typing import Collection
from wrapt import wrap_function_wrapper
import openai

from opentelemetry import context as context_api
from opentelemetry.trace import get_tracer, SpanKind
from opentelemetry.trace.status import Status, StatusCode

from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
    _SUPPRESS_INSTRUMENTATION_KEY,
    unwrap,
)

from traceloop.semconv import SpanAttributes, LLMRequestTypeValues

logger = logging.getLogger(__name__)

_instruments = ("openai ~= 0.28.0",)
__version__ = "0.1.0"

WRAPPED_METHODS = [
    {
        "object": "ChatCompletion",
        "method": "create",
        "span_name": "openai.chat",
    },
    {
        "object": "Completion",
        "method": "create",
        "span_name": "openai.completion",
    },
]


def _set_span_attribute(span, name, value):
    if value is not None and value != "":
        span.set_attribute(name, value)


def _set_api_attributes(span):
    _set_span_attribute(span, SpanAttributes.OPENAI_API_BASE, openai.api_base)
    _set_span_attribute(span, SpanAttributes.OPENAI_API_TYPE, openai.api_type)
    _set_span_attribute(span, SpanAttributes.OPENAI_API_VERSION, openai.api_version)


def _set_span_prompts(span, messages):
    if messages is None:
        return

    for i, msg in enumerate(messages):
        prefix = f"{SpanAttributes.LLM_PROMPTS}.{i}"
        _set_span_attribute(span, f"{prefix}.role", msg.get("role"))
        _set_span_attribute(span, f"{prefix}.content", msg.get("content"))


def _set_input_attributes(span, llm_request_type, kwargs):
    _set_span_attribute(span, SpanAttributes.LLM_REQUEST_MODEL, kwargs.get("model"))
    _set_span_attribute(
        span, SpanAttributes.LLM_REQUEST_MAX_TOKENS, kwargs.get("max_tokens")
    )
    _set_span_attribute(span, SpanAttributes.LLM_TEMPERATURE, kwargs.get("temperature"))
    _set_span_attribute(span, SpanAttributes.LLM_TOP_P, kwargs.get("top_p"))
    _set_span_attribute(
        span, SpanAttributes.LLM_FREQUENCY_PENALTY, kwargs.get("frequency_penalty")
    )
    _set_span_attribute(
        span, SpanAttributes.LLM_PRESENCE_PENALTY, kwargs.get("presence_penalty")
    )

    if llm_request_type == LLMRequestTypeValues.CHAT:
        _set_span_prompts(span, kwargs.get("messages"))
    elif llm_request_type == LLMRequestTypeValues.COMPLETION:
        _set_span_attribute(
            span, f"{SpanAttributes.LLM_PROMPTS}.0.user", kwargs.get("prompt")
        )


def _set_span_completions(span, llm_request_type, choices):
    if choices is None:
        return

    for choice in choices:
        index = choice.get("index")
        prefix = f"{SpanAttributes.LLM_COMPLETIONS}.{index}"
        _set_span_attribute(
            span, f"{prefix}.finish_reason", choice.get("finish_reason")
        )

        if llm_request_type == LLMRequestTypeValues.CHAT:
            message = choice.get("message")
            if message is not None:
                _set_span_attribute(span, f"{prefix}.role", message.get("role"))
                _set_span_attribute(span, f"{prefix}.content", message.get("content"))
        elif llm_request_type == LLMRequestTypeValues.COMPLETION:
            _set_span_attribute(span, f"{prefix}.content", choice.get("text"))


def _set_response_attributes(span, llm_request_type, response):
    _set_span_attribute(span, SpanAttributes.LLM_RESPONSE_MODEL, response.get("model"))
    _set_span_completions(span, llm_request_type, response.get("choices"))

    usage = response.get("usage")
    if usage is not None:
        _set_span_attribute(
            span, SpanAttributes.LLM_USAGE_TOTAL_TOKENS, usage.get("total_tokens")
        )
        _set_span_attribute(
            span,
            SpanAttributes.LLM_USAGE_COMPLETION_TOKENS,
            usage.get("completion_tokens"),
        )
        _set_span_attribute(
            span, SpanAttributes.LLM_USAGE_PROMPT_TOKENS, usage.get("prompt_tokens")
        )


def _with_tracer_wrapper(func):
    """Helper for providing tracer for wrapper functions."""

    def _with_tracer(tracer, to_wrap):
        def wrapper(wrapped, instance, args, kwargs):
            # prevent double wrapping
            if hasattr(wrapped, "__wrapped__"):
                return wrapped(*args, **kwargs)

            return func(tracer, to_wrap, wrapped, instance, args, kwargs)

        return wrapper

    return _with_tracer


def _llm_request_type_by_object(object_name):
    if object_name == "Completion":
        return LLMRequestTypeValues.COMPLETION
    elif object_name == "ChatCompletion":
        return LLMRequestTypeValues.CHAT
    else:
        return LLMRequestTypeValues.UNKNOWN


@_with_tracer_wrapper
def _wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
    """Instruments and calls every function defined in WRAPPED_METHODS."""
    if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
        return wrapped(*args, **kwargs)

    name = to_wrap.get("span_name")
    llm_request_type = _llm_request_type_by_object(to_wrap.get("object"))
    with tracer.start_as_current_span(
        name,
        kind=SpanKind.CLIENT,
        attributes={
            SpanAttributes.LLM_VENDOR: "OpenAI",
            SpanAttributes.LLM_REQUEST_TYPE: llm_request_type.value,
        },
    ) as span:
        if span.is_recording():
            _set_api_attributes(span)
        try:
            if span.is_recording():
                _set_input_attributes(span, llm_request_type, kwargs)
        except Exception as ex:  # pylint: disable=broad-except
            logger.warning(
                "Failed to set input attributes for openai span, error: %s", str(ex)
            )

        response = wrapped(*args, **kwargs)

        if response:
            try:
                if span.is_recording():
                    _set_response_attributes(span, llm_request_type, response)
            except Exception as ex:  # pylint: disable=broad-except
                logger.warning(
                    "Failed to set response attributes for openai span, error: %s",
                    str(ex),
                )
            if span.is_recording():
                span.set_status(Status(StatusCode.OK))

        return response


class OpenAIInstrumentor(BaseInstrumentor):
    """An instrumentor for OpenAI's client library."""

    def instrumentation_dependencies(self) -> Collection[str]:
        return _instruments

    def _instrument(self, **kwargs):
        tracer_provider = kwargs.get("tracer_provider")
        tracer = get_tracer(__name__, __version__, tracer_provider)
        for wrapped_method in WRAPPED_METHODS:
            wrap_object = wrapped_method.get("object")
            wrap_method = wrapped_method.get("method")
            wrap_function_wrapper(
                "openai", f"{wrap_object}.{wrap_method}", _wrap(tracer, wrapped_method)
            )

    def _uninstrument(self, **kwargs):
        for wrapped_method in WRAPPED_METHODS:
            wrap_object = wrapped_method.get("object")
            unwrap(f"openai.{wrap_object}", wrapped_method.get("method"))
packages/opentelemetry-instrumentation-openai/poetry.lock: 33 additions & 1 deletion (generated lockfile; diff not rendered)

packages/opentelemetry-instrumentation-openai/pyproject.toml
@@ -27,6 +27,7 @@ include = "opentelemetry/instrumentation/openai"
python = ">=3.8.1,<3.12"
openai = "^0.28.0"
opentelemetry-api = "^1.19.0"
opentelemetry-instrumentation = "^0.40b0"

[tool.poetry.dependencies.opentelemetry-semantic-conventions-llm]
path = "../opentelemetry-semantic-conventions-llm"
packages/opentelemetry-instrumentation-pinecone/poetry.lock: 33 additions & 1 deletion (generated lockfile; diff not rendered)

packages/opentelemetry-instrumentation-pinecone/pyproject.toml
@@ -26,6 +26,7 @@ include = "opentelemetry/instrumentation/pinecone"
[tool.poetry.dependencies]
python = ">=3.8.1,<3.12"
opentelemetry-api = "^1.19.0"
opentelemetry-instrumentation = "^0.40b0"

[tool.poetry.dependencies.opentelemetry-semantic-conventions-llm]
path = "../opentelemetry-semantic-conventions-llm"
packages/traceloop-sdk/poetry.lock: 2 additions & 0 deletions (generated lockfile; diff not rendered)

Two new empty files added.

packages/traceloop-sdk/traceloop/sdk/tracing/context_manager.py: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
from contextlib import asynccontextmanager, contextmanager

from traceloop.sdk.tracing.tracer import Tracing


@contextmanager
def get_tracer(flush_on_exit: bool = False):
    try:
        yield Tracing.get_tracer()
    finally:
        if flush_on_exit:
            Tracing.flush()


@asynccontextmanager
async def get_async_tracer(flush_on_exit: bool = False):
    try:
        yield Tracing.get_tracer()
    finally:
        if flush_on_exit:
            Tracing.flush()
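
A short usage sketch for these helpers (hypothetical caller, not part of this PR): it assumes Tracing has been initialized elsewhere and that Tracing.get_tracer() returns an OpenTelemetry tracer, as the module implies; the span name and attribute are placeholders.

# Hypothetical caller (not part of this PR). Assumes Tracing was initialized
# elsewhere and that Tracing.get_tracer() returns an OpenTelemetry tracer.
from traceloop.sdk.tracing.context_manager import get_tracer


def do_work():
    with get_tracer(flush_on_exit=True) as tracer:
        with tracer.start_as_current_span("do_work") as span:  # placeholder name
            span.set_attribute("work.items", 3)
    # flush_on_exit=True makes the context manager call Tracing.flush() on
    # exit, so buffered spans are exported before the function returns.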