From 3311ecc90c25b11bbff47012a42ddd72169a0fc1 Mon Sep 17 00:00:00 2001 From: Gal Kleinman Date: Mon, 4 Sep 2023 16:29:45 +0300 Subject: [PATCH] feat: sdk code + openai instrumentation --- .../instrumentation/openai/__init__.py | 217 +++++++++++++++++- .../poetry.lock | 34 ++- .../pyproject.toml | 1 + .../poetry.lock | 34 ++- .../pyproject.toml | 1 + packages/traceloop-sdk/poetry.lock | 2 + .../traceloop/sdk/semconv/__init__.py | 0 .../traceloop/sdk/tracing/__init__.py | 0 .../traceloop/sdk/tracing/context_manager.py | 22 ++ .../tracing/no_log_span_batch_processor.py | 39 ++++ .../traceloop/sdk/tracing/tracer.py | 153 ++++++++++++ 11 files changed, 500 insertions(+), 3 deletions(-) create mode 100644 packages/traceloop-sdk/traceloop/sdk/semconv/__init__.py create mode 100644 packages/traceloop-sdk/traceloop/sdk/tracing/__init__.py create mode 100644 packages/traceloop-sdk/traceloop/sdk/tracing/context_manager.py create mode 100644 packages/traceloop-sdk/traceloop/sdk/tracing/no_log_span_batch_processor.py create mode 100644 packages/traceloop-sdk/traceloop/sdk/tracing/tracer.py diff --git a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/__init__.py b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/__init__.py index 65cc04087a..284d449294 100644 --- a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/__init__.py +++ b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/__init__.py @@ -1 +1,216 @@ -"""OpenTelemetry OpenAI instrumentation""" +import logging +from typing import Collection +from wrapt import wrap_function_wrapper +import openai + +from opentelemetry import context as context_api +from opentelemetry.trace import get_tracer, SpanKind +from opentelemetry.trace.status import Status, StatusCode + +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.instrumentation.utils import ( + _SUPPRESS_INSTRUMENTATION_KEY, + unwrap, +) + +from traceloop.semconv import SpanAttributes, LLMRequestTypeValues + +logger = logging.getLogger(__name__) + +_instruments = ("openai ~= 0.27.8",) +__version__ = "0.1.0" + +WRAPPED_METHODS = [ + { + "object": "ChatCompletion", + "method": "create", + "span_name": "openai.chat", + }, + { + "object": "Completion", + "method": "create", + "span_name": "openai.completion", + }, +] + + +def _set_span_attribute(span, name, value): + if value is not None: + if value != "": + span.set_attribute(name, value) + return + + +def _set_api_attributes(span): + _set_span_attribute(span, SpanAttributes.OPENAI_API_BASE, openai.api_base) + _set_span_attribute(span, SpanAttributes.OPENAI_API_TYPE, openai.api_type) + _set_span_attribute(span, SpanAttributes.OPENAI_API_VERSION, openai.api_version) + + return + + +def _set_span_prompts(span, messages): + if messages is None: + return + + for i, msg in enumerate(messages): + prefix = f"{SpanAttributes.LLM_PROMPTS}.{i}" + _set_span_attribute(span, f"{prefix}.role", msg.get("role")) + _set_span_attribute(span, f"{prefix}.content", msg.get("content")) + + +def _set_input_attributes(span, llm_request_type, kwargs): + _set_span_attribute(span, SpanAttributes.LLM_REQUEST_MODEL, kwargs.get("model")) + _set_span_attribute( + span, SpanAttributes.LLM_REQUEST_MAX_TOKENS, kwargs.get("max_tokens") + ) + _set_span_attribute(span, SpanAttributes.LLM_TEMPERATURE, kwargs.get("temperature")) + _set_span_attribute(span, SpanAttributes.LLM_TOP_P, kwargs.get("top_p")) + _set_span_attribute( + span, SpanAttributes.LLM_FREQUENCY_PENALTY, kwargs.get("frequency_penalty") + ) + _set_span_attribute( + span, SpanAttributes.LLM_PRESENCE_PENALTY, kwargs.get("presence_penalty") + ) + + if llm_request_type == LLMRequestTypeValues.CHAT: + _set_span_prompts(span, kwargs.get("messages")) + elif llm_request_type == LLMRequestTypeValues.COMPLETION: + _set_span_attribute( + span, f"{SpanAttributes.LLM_PROMPTS}.0.user", kwargs.get("prompt") + ) + + return + + +def _set_span_completions(span, llm_request_type, choices): + if choices is None: + return + + for choice in choices: + index = choice.get("index") + prefix = f"{SpanAttributes.LLM_COMPLETIONS}.{index}" + _set_span_attribute( + span, f"{prefix}.finish_reason", choice.get("finish_reason") + ) + + if llm_request_type == LLMRequestTypeValues.CHAT: + message = choice.get("message") + if message is not None: + _set_span_attribute(span, f"{prefix}.role", message.get("role")) + _set_span_attribute(span, f"{prefix}.content", message.get("content")) + elif llm_request_type == LLMRequestTypeValues.COMPLETION: + _set_span_attribute(span, f"{prefix}.content", choice.get("text")) + + +def _set_response_attributes(span, llm_request_type, response): + _set_span_attribute(span, SpanAttributes.LLM_RESPONSE_MODEL, response.get("model")) + _set_span_completions(span, llm_request_type, response.get("choices")) + + usage = response.get("usage") + if usage is not None: + _set_span_attribute( + span, SpanAttributes.LLM_USAGE_TOTAL_TOKENS, usage.get("total_tokens") + ) + _set_span_attribute( + span, + SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, + usage.get("completion_tokens"), + ) + _set_span_attribute( + span, SpanAttributes.LLM_USAGE_PROMPT_TOKENS, usage.get("prompt_tokens") + ) + + return + + +def _with_tracer_wrapper(func): + """Helper for providing tracer for wrapper functions.""" + + def _with_tracer(tracer, to_wrap): + def wrapper(wrapped, instance, args, kwargs): + # prevent double wrapping + if hasattr(wrapped, "__wrapped__"): + return wrapped(*args, **kwargs) + + return func(tracer, to_wrap, wrapped, instance, args, kwargs) + + return wrapper + + return _with_tracer + + +def _llm_request_type_by_object(object_name): + if object_name == "Completion": + return LLMRequestTypeValues.COMPLETION + elif object_name == "ChatCompletion": + return LLMRequestTypeValues.CHAT + else: + return LLMRequestTypeValues.UNKNOWN + + +@_with_tracer_wrapper +def _wrap(tracer, to_wrap, wrapped, instance, args, kwargs): + """Instruments and calls every function defined in TO_WRAP.""" + if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return wrapped(*args, **kwargs) + + name = to_wrap.get("span_name") + llm_request_type = _llm_request_type_by_object(to_wrap.get("object")) + with tracer.start_as_current_span( + name, + kind=SpanKind.CLIENT, + attributes={ + SpanAttributes.LLM_VENDOR: "OpenAI", + SpanAttributes.LLM_REQUEST_TYPE: llm_request_type.value, + }, + ) as span: + if span.is_recording(): + _set_api_attributes(span) + try: + if span.is_recording(): + _set_input_attributes(span, llm_request_type, kwargs) + + except Exception as ex: # pylint: disable=broad-except + logger.warning( + "Failed to set input attributes for openai span, error: %s", str(ex) + ) + + response = wrapped(*args, **kwargs) + + if response: + try: + if span.is_recording(): + _set_response_attributes(span, llm_request_type, response) + + except Exception as ex: # pylint: disable=broad-except + logger.warning( + "Failed to set response attributes for openai span, error: %s", + str(ex), + ) + if span.is_recording(): + span.set_status(Status(StatusCode.OK)) + + return response + + +class OpenAIInstrumentor(BaseInstrumentor): + """An instrumentor for OpenAI's client library.""" + + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments + + def _instrument(self, **kwargs): + tracer_provider = kwargs.get("tracer_provider") + tracer = get_tracer(__name__, __version__, tracer_provider) + for wrapped_method in WRAPPED_METHODS: + wrap_object = wrapped_method.get("object") + wrap_method = wrapped_method.get("method") + wrap_function_wrapper( + "openai", f"{wrap_object}.{wrap_method}", _wrap(tracer, wrapped_method) + ) + + def _uninstrument(self, **kwargs): + for wrapped_method in WRAPPED_METHODS: + wrap_object = wrapped_method.get("object") + unwrap(f"openai.{wrap_object}", wrapped_method.get("method")) diff --git a/packages/opentelemetry-instrumentation-openai/poetry.lock b/packages/opentelemetry-instrumentation-openai/poetry.lock index 03f65228ce..a88a5bb0b8 100644 --- a/packages/opentelemetry-instrumentation-openai/poetry.lock +++ b/packages/opentelemetry-instrumentation-openai/poetry.lock @@ -704,6 +704,22 @@ files = [ deprecated = ">=1.2.6" importlib-metadata = ">=6.0,<7.0" +[[package]] +name = "opentelemetry-instrumentation" +version = "0.40b0" +description = "Instrumentation Tools & Auto Instrumentation for OpenTelemetry Python" +optional = false +python-versions = ">=3.7" +files = [ + {file = "opentelemetry_instrumentation-0.40b0-py3-none-any.whl", hash = "sha256:789d3726e698aa9526dd247b461b9172f99a4345571546c4aecf40279679fc8e"}, + {file = "opentelemetry_instrumentation-0.40b0.tar.gz", hash = "sha256:08bebe6a752514ed61e901e9fee5ccf06ae7533074442e707d75bb65f3e0aa17"}, +] + +[package.dependencies] +opentelemetry-api = ">=1.4,<2.0" +setuptools = ">=16.0" +wrapt = ">=1.0.0,<2.0.0" + [[package]] name = "opentelemetry-semantic-conventions-llm" version = "0.0.1" @@ -882,6 +898,22 @@ urllib3 = ">=1.21.1,<3" socks = ["PySocks (>=1.5.6,!=1.5.7)"] use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"] +[[package]] +name = "setuptools" +version = "68.1.2" +description = "Easily download, build, install, upgrade, and uninstall Python packages" +optional = false +python-versions = ">=3.8" +files = [ + {file = "setuptools-68.1.2-py3-none-any.whl", hash = "sha256:3d8083eed2d13afc9426f227b24fd1659489ec107c0e86cec2ffdde5c92e790b"}, + {file = "setuptools-68.1.2.tar.gz", hash = "sha256:3d4dfa6d95f1b101d695a6160a7626e15583af71a5f52176efa5d39a054d475d"}, +] + +[package.extras] +docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "rst.linker (>=1.9)", "sphinx (>=3.5,<=7.1.2)", "sphinx-favicon", "sphinx-hoverxref (<2)", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (==0.8.3)", "sphinx-reredirects", "sphinxcontrib-towncrier"] +testing = ["build[virtualenv]", "filelock (>=3.4.0)", "flake8-2020", "ini2toml[lite] (>=0.9)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pip (>=19.1)", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-ruff", "pytest-timeout", "pytest-xdist", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"] +testing-integration = ["build[virtualenv]", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"] + [[package]] name = "termcolor" version = "2.3.0" @@ -1133,4 +1165,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<3.12" -content-hash = "3875699d5a270a4dda5b2f44b1c9949339a0e73d017100a98ec4c53b8cffb9b2" +content-hash = "4aaed02769b94faaad0c02186c3750488aa14de13eee5ce60e5400124ac01669" diff --git a/packages/opentelemetry-instrumentation-openai/pyproject.toml b/packages/opentelemetry-instrumentation-openai/pyproject.toml index 8b0201709e..590cbbf6f3 100644 --- a/packages/opentelemetry-instrumentation-openai/pyproject.toml +++ b/packages/opentelemetry-instrumentation-openai/pyproject.toml @@ -27,6 +27,7 @@ include = "opentelemetry/instrumentation/openai" python = ">=3.8.1,<3.12" openai = "^0.28.0" opentelemetry-api = "^1.19.0" +opentelemetry-instrumentation = "^0.40b0" [tool.poetry.dependencies.opentelemetry-semantic-conventions-llm] path = "../opentelemetry-semantic-conventions-llm" diff --git a/packages/opentelemetry-instrumentation-pinecone/poetry.lock b/packages/opentelemetry-instrumentation-pinecone/poetry.lock index 7ce113f656..7fe6674ed1 100644 --- a/packages/opentelemetry-instrumentation-pinecone/poetry.lock +++ b/packages/opentelemetry-instrumentation-pinecone/poetry.lock @@ -272,6 +272,22 @@ files = [ deprecated = ">=1.2.6" importlib-metadata = ">=6.0,<7.0" +[[package]] +name = "opentelemetry-instrumentation" +version = "0.40b0" +description = "Instrumentation Tools & Auto Instrumentation for OpenTelemetry Python" +optional = false +python-versions = ">=3.7" +files = [ + {file = "opentelemetry_instrumentation-0.40b0-py3-none-any.whl", hash = "sha256:789d3726e698aa9526dd247b461b9172f99a4345571546c4aecf40279679fc8e"}, + {file = "opentelemetry_instrumentation-0.40b0.tar.gz", hash = "sha256:08bebe6a752514ed61e901e9fee5ccf06ae7533074442e707d75bb65f3e0aa17"}, +] + +[package.dependencies] +opentelemetry-api = ">=1.4,<2.0" +setuptools = ">=16.0" +wrapt = ">=1.0.0,<2.0.0" + [[package]] name = "opentelemetry-semantic-conventions-llm" version = "0.0.1" @@ -429,6 +445,22 @@ termcolor = ">=2.1.0" [package.extras] dev = ["black", "flake8", "pre-commit"] +[[package]] +name = "setuptools" +version = "68.1.2" +description = "Easily download, build, install, upgrade, and uninstall Python packages" +optional = false +python-versions = ">=3.8" +files = [ + {file = "setuptools-68.1.2-py3-none-any.whl", hash = "sha256:3d8083eed2d13afc9426f227b24fd1659489ec107c0e86cec2ffdde5c92e790b"}, + {file = "setuptools-68.1.2.tar.gz", hash = "sha256:3d4dfa6d95f1b101d695a6160a7626e15583af71a5f52176efa5d39a054d475d"}, +] + +[package.extras] +docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "rst.linker (>=1.9)", "sphinx (>=3.5,<=7.1.2)", "sphinx-favicon", "sphinx-hoverxref (<2)", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (==0.8.3)", "sphinx-reredirects", "sphinxcontrib-towncrier"] +testing = ["build[virtualenv]", "filelock (>=3.4.0)", "flake8-2020", "ini2toml[lite] (>=0.9)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pip (>=19.1)", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-ruff", "pytest-timeout", "pytest-xdist", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"] +testing-integration = ["build[virtualenv]", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"] + [[package]] name = "termcolor" version = "2.3.0" @@ -556,4 +588,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<3.12" -content-hash = "64fd02242e97e0e64221f4e31d737af0d4208a9aea89565285926311ea4e59e2" +content-hash = "1b979090595c1e70fe597d44796f2eb332000dedc733f7a4b90b474e754a4f91" diff --git a/packages/opentelemetry-instrumentation-pinecone/pyproject.toml b/packages/opentelemetry-instrumentation-pinecone/pyproject.toml index 9450751f8c..0659a5e52a 100644 --- a/packages/opentelemetry-instrumentation-pinecone/pyproject.toml +++ b/packages/opentelemetry-instrumentation-pinecone/pyproject.toml @@ -26,6 +26,7 @@ include = "opentelemetry/instrumentation/pinecone" [tool.poetry.dependencies] python = ">=3.8.1,<3.12" opentelemetry-api = "^1.19.0" +opentelemetry-instrumentation = "^0.40b0" [tool.poetry.dependencies.opentelemetry-semantic-conventions-llm] path = "../opentelemetry-semantic-conventions-llm" diff --git a/packages/traceloop-sdk/poetry.lock b/packages/traceloop-sdk/poetry.lock index 2c729f3955..b1be6d5e39 100644 --- a/packages/traceloop-sdk/poetry.lock +++ b/packages/traceloop-sdk/poetry.lock @@ -818,6 +818,7 @@ develop = true [package.dependencies] openai = "^0.28.0" opentelemetry-api = "^1.19.0" +opentelemetry-instrumentation = "^0.40b0" opentelemetry-semantic-conventions-llm = {path = "../opentelemetry-semantic-conventions-llm", develop = true} [package.source] @@ -835,6 +836,7 @@ develop = true [package.dependencies] opentelemetry-api = "^1.19.0" +opentelemetry-instrumentation = "^0.40b0" opentelemetry-semantic-conventions-llm = {path = "../opentelemetry-semantic-conventions-llm", develop = true} [package.source] diff --git a/packages/traceloop-sdk/traceloop/sdk/semconv/__init__.py b/packages/traceloop-sdk/traceloop/sdk/semconv/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/packages/traceloop-sdk/traceloop/sdk/tracing/__init__.py b/packages/traceloop-sdk/traceloop/sdk/tracing/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/packages/traceloop-sdk/traceloop/sdk/tracing/context_manager.py b/packages/traceloop-sdk/traceloop/sdk/tracing/context_manager.py new file mode 100644 index 0000000000..6a3ea8f50d --- /dev/null +++ b/packages/traceloop-sdk/traceloop/sdk/tracing/context_manager.py @@ -0,0 +1,22 @@ +from contextlib import contextmanager, asynccontextmanager +from typing import Optional + +from sdk.tracing.tracer import Tracing + + +@contextmanager +def get_tracer(flush_on_exit: bool = False): + try: + yield Tracing.get_tracer() + finally: + if flush_on_exit: + Tracing.flush() + + +@asynccontextmanager +async def get_async_tracer(flush_on_exit: bool = False): + try: + yield Tracing.get_tracer() + finally: + if flush_on_exit: + Tracing.flush() diff --git a/packages/traceloop-sdk/traceloop/sdk/tracing/no_log_span_batch_processor.py b/packages/traceloop-sdk/traceloop/sdk/tracing/no_log_span_batch_processor.py new file mode 100644 index 0000000000..38312683d7 --- /dev/null +++ b/packages/traceloop-sdk/traceloop/sdk/tracing/no_log_span_batch_processor.py @@ -0,0 +1,39 @@ +import os + +from opentelemetry.context import ( + attach, + set_value, + _SUPPRESS_INSTRUMENTATION_KEY, + detach, +) +from opentelemetry.sdk.trace.export import BatchSpanProcessor +import logging + +logger = logging.getLogger(__name__) + + +class NoLogSpanBatchProcessor(BatchSpanProcessor): + def _export_batch(self) -> int: + """Exports at most max_export_batch_size spans and returns the number of + exported spans. + """ + idx = 0 + # currently only a single thread acts as consumer, so queue.pop() will + # not raise an exception + while idx < self.max_export_batch_size and self.queue: + self.spans_list[idx] = self.queue.pop() + idx += 1 + token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) + try: + # Ignore type b/c the Optional[None]+slicing is too "clever" + # for mypy + self.span_exporter.export(self.spans_list[:idx]) # type: ignore + except Exception: # pylint: disable=broad-except + if os.getenv("TRACELOOP_LOGGING_ENABLED", "false").lower() == "true": + logger.exception("Exception while exporting Span batch.") + detach(token) + + # clean up list + for index in range(idx): + self.spans_list[index] = None + return idx diff --git a/packages/traceloop-sdk/traceloop/sdk/tracing/tracer.py b/packages/traceloop-sdk/traceloop/sdk/tracing/tracer.py new file mode 100644 index 0000000000..4afa83a4ae --- /dev/null +++ b/packages/traceloop-sdk/traceloop/sdk/tracing/tracer.py @@ -0,0 +1,153 @@ +import contextvars +import logging +import os +from typing import Optional + +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.trace import TracerProvider, SpanProcessor, Tracer +import importlib.util + +from opentelemetry.sdk.trace.export import SpanExporter +from opentelemetry.trace import get_tracer_provider, ProxyTracerProvider +from traceloop.sdk.semconv import SpanAttributes +from traceloop.sdk.tracing import NoLogSpanBatchProcessor + +TRACER_NAME = "traceloop.tracer" +TRACELOOP_API_ENDPOINT = "https://api.traceloop.dev/v1/traces" +EXCLUDED_URLS = "api.openai.com,openai.azure.com" + +ctx_correlation_id: contextvars.ContextVar[str] = contextvars.ContextVar("correlation_id") +ctx_workflow_name: contextvars.ContextVar[str] = contextvars.ContextVar("workflow_name") + +def init_openai_instrumentor(): + if importlib.util.find_spec("openai") is not None: + from traceloop.instrumentation.openai import OpenAIInstrumentor + + instrumentor = OpenAIInstrumentor() + if not instrumentor.is_instrumented_by_opentelemetry: + instrumentor.instrument() + + +def init_requests_instrumentor(): + if importlib.util.find_spec("requests") is not None: + from opentelemetry.instrumentation.requests import RequestsInstrumentor + + instrumentor = RequestsInstrumentor() + if not instrumentor.is_instrumented_by_opentelemetry: + instrumentor.instrument(excluded_urls=EXCLUDED_URLS) + + +def init_urllib3_instrumentor(): + if importlib.util.find_spec("urllib3") is not None: + from opentelemetry.instrumentation.urllib3 import URLLib3Instrumentor + + instrumentor = URLLib3Instrumentor() + if not instrumentor.is_instrumented_by_opentelemetry: + instrumentor.instrument(excluded_urls=EXCLUDED_URLS) + + +def init_pymysql_instrumentor(): + if importlib.util.find_spec("pymysql") is not None: + from opentelemetry.instrumentation.pymysql import PyMySQLInstrumentor + + instrumentor = PyMySQLInstrumentor() + if not instrumentor.is_instrumented_by_opentelemetry: + instrumentor.instrument() + + +def init_instrumentations(): + init_openai_instrumentor() + init_requests_instrumentor() + init_urllib3_instrumentor() + init_pymysql_instrumentor() + + +def span_processor_on_start(span, parent_context): + workflow_name = ctx_workflow_name.get() + if workflow_name is not None: + span.set_attribute(SpanAttributes.TRACELOOP_WORKFLOW_NAME, workflow_name) + + correlation_id = ctx_correlation_id.get() + if correlation_id is not None: + span.set_attribute(SpanAttributes.TRACELOOP_CORRELATION_ID, correlation_id) + + +class Tracing: + __initialized: bool = False + __spans_exporter: SpanExporter = None + __spans_processor: SpanProcessor = None + __otel_tracer: Tracer = None + __otel_tracer_provider: TracerProvider = None + + @staticmethod + def init(app_name: Optional[str] = None): + if Tracing.__initialized: + return + + if app_name is not None: + print(f"Traceloop tracing initialized with app name: {app_name}") + os.environ["OTEL_SERVICE_NAME"] = app_name + + Tracing.init_spans_exporter() + Tracing.init_tracer_provider() + Tracing.init_spans_processor() + Tracing.__otel_tracer_provider.add_span_processor(Tracing.__spans_processor) + + Tracing.__otel_tracer_provider. + + init_instrumentations() + + Tracing.__initialized = True + + @staticmethod + def get_tracer(): + if not Tracing.__initialized: + raise Exception("Traceloop tracing is not initialized") + + return Tracing.__otel_tracer_provider.get_tracer(TRACER_NAME) + + @staticmethod + def init_spans_exporter(): + api_key = os.getenv("TRACELOOP_API_KEY") + api_endpoint = os.getenv("TRACELOOP_API_ENDPOINT") or TRACELOOP_API_ENDPOINT + + Tracing.__spans_exporter = OTLPSpanExporter( + endpoint=api_endpoint, + headers={ + "Authorization": f"Bearer {api_key}", + } + ) + + @staticmethod + def init_tracer_provider(): + tracer_provider: TracerProvider + default_provider: TracerProvider = get_tracer_provider() + + if isinstance(default_provider, ProxyTracerProvider): + tracer_provider = TracerProvider() + trace.set_tracer_provider(tracer_provider) + elif not hasattr(default_provider, "add_span_processor"): + logging.error("Cannot add span processor to the default provider since it doesn't support it") + return + else: + tracer_provider = default_provider + + Tracing.__otel_tracer_provider = tracer_provider + + @staticmethod + def init_spans_processor(): + Tracing.__spans_processor = NoLogSpanBatchProcessor(Tracing.__spans_exporter) + Tracing.__spans_processor.on_start = span_processor_on_start + + @staticmethod + def flush(): + Tracing.__spans_processor.force_flush() + + @staticmethod + def set_correlation_id(correlation_id: str): + ctx_correlation_id.set(correlation_id) + + @staticmethod + def set_workflow_name(workflow_name: str): + ctx_workflow_name.set(workflow_name)