diff --git a/loq.toml b/loq.toml index 4b64839..0296af2 100644 --- a/loq.toml +++ b/loq.toml @@ -10,7 +10,7 @@ max_lines = 750 # Source files that still need exceptions above 750 [[rules]] path = "src/docket/worker.py" -max_lines = 1125 +max_lines = 1120 [[rules]] path = "src/docket/cli.py" @@ -18,7 +18,7 @@ max_lines = 945 [[rules]] path = "src/docket/execution.py" -max_lines = 903 +max_lines = 910 [[rules]] path = "src/docket/docket.py" @@ -27,3 +27,7 @@ max_lines = 866 [[rules]] path = "src/docket/strikelist.py" max_lines = 616 + +[[rules]] +path = "src/docket/_redis.py" +max_lines = 544 diff --git a/pyproject.toml b/pyproject.toml index 1defb83..2f6a33f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,6 @@ dependencies = [ "exceptiongroup>=1.2.0; python_version < '3.11'", "taskgroup>=0.2.2; python_version < '3.11'", "fakeredis[lua]>=2.32.1", - "opentelemetry-api>=1.33.0", "prometheus-client>=0.21.1", "py-key-value-aio[memory,redis]>=0.3.0", "python-json-logger>=2.0.7", @@ -39,7 +38,11 @@ dependencies = [ ] [project.optional-dependencies] +telemetry = [ + "opentelemetry-api>=1.33.0", +] metrics = [ + "pydocket[telemetry]", "opentelemetry-sdk>=1.33.0", ] diff --git a/src/docket/_otel.py b/src/docket/_otel.py new file mode 100644 index 0000000..b2e85bf --- /dev/null +++ b/src/docket/_otel.py @@ -0,0 +1,140 @@ +"""Unified OpenTelemetry interface with fallback to stubs. + +This module provides a single import point for OpenTelemetry functionality. +When OpenTelemetry is installed, it uses the real implementation. +When not installed, it falls back to noop stubs. + +Usage: + from docket._otel import trace, metrics, get_tracer, get_meter, OTEL_AVAILABLE +""" + +from typing import Any + +# Try importing OpenTelemetry, fall back to stubs if not available +try: + from opentelemetry import context as _otel_context + from opentelemetry import metrics as _otel_metrics + from opentelemetry import propagate as _otel_propagate + from opentelemetry import trace as _otel_trace + from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY + from opentelemetry.metrics import set_meter_provider + from opentelemetry.propagators.textmap import Getter, Setter + from opentelemetry.trace import Link, SpanKind, Status, StatusCode + + OTEL_AVAILABLE = True + + # Re-export trace functions + def get_tracer( + instrumenting_module_name: str, + instrumenting_library_version: str | None = None, + tracer_provider: Any = None, + schema_url: str | None = None, + ) -> Any: + """Get a tracer from OpenTelemetry.""" + return _otel_trace.get_tracer( + instrumenting_module_name, + instrumenting_library_version, + tracer_provider, + schema_url, + ) + + def get_current_span(context: Any = None) -> Any: + """Get the current span.""" + return _otel_trace.get_current_span(context) + + # Re-export metrics functions + def get_meter( + name: str, + version: str = "", + meter_provider: Any = None, + ) -> Any: + """Get a meter from OpenTelemetry.""" + return _otel_metrics.get_meter(name, version, meter_provider) + + # Re-export context functions + def context_get_current() -> Any: + """Get the current context.""" + return _otel_context.get_current() + + def context_set_value(key: str, value: Any, context: Any = None) -> Any: + """Set a value in context.""" + return _otel_context.set_value(key, value, context) + + def context_attach(context: Any) -> object: + """Attach a context.""" + return _otel_context.attach(context) + + def context_detach(token: object) -> None: + """Detach a context.""" + _otel_context.detach(token) + + # Re-export propagation functions + def propagate_inject( + carrier: Any, + context: Any = None, + setter: Any = None, + ) -> None: + """Inject trace context into a carrier.""" + _otel_propagate.inject(carrier, context, setter) + + def propagate_extract( + carrier: Any, + context: Any = None, + getter: Any = None, + ) -> Any: + """Extract trace context from a carrier.""" + return _otel_propagate.extract(carrier, context, getter) + + # Context type for type hints + Context = _otel_context.Context + +except ImportError: + from ._otel_stubs import ( + Link, + NoopContext, + NoopGetter as Getter, + NoopSetter as Setter, + SpanKind, + Status, + StatusCode, + _SUPPRESS_INSTRUMENTATION_KEY, + context_attach, + context_detach, + context_get_current, + context_set_value, + get_current_span, + get_meter, + get_tracer, + propagate_extract, + propagate_inject, + set_meter_provider, + ) + + OTEL_AVAILABLE = False + + # Context type for type hints when OTel not available + Context = NoopContext # type: ignore[misc,assignment] + + +__all__ = [ + "OTEL_AVAILABLE", + "Context", + "Getter", + "Link", + "Setter", + "SpanKind", + "Status", + "StatusCode", + "_SUPPRESS_INSTRUMENTATION_KEY", + "context_attach", + "context_detach", + "context_get_current", + "context_set_value", + "get_current_span", + "get_meter", + "get_tracer", + "propagate_extract", + "propagate_inject", + "set_meter_provider", +] + diff --git a/src/docket/_otel_stubs.py b/src/docket/_otel_stubs.py new file mode 100644 index 0000000..8442424 --- /dev/null +++ b/src/docket/_otel_stubs.py @@ -0,0 +1,354 @@ +"""Stub implementations for OpenTelemetry interfaces. + +These noop implementations allow docket to function without OpenTelemetry installed. +All operations are silent and do nothing. +""" + +from contextlib import contextmanager +from enum import Enum +from typing import Any, Generator, Mapping, Sequence + + +class StatusCode(Enum): + """Noop StatusCode enum matching OpenTelemetry's StatusCode.""" + + UNSET = 0 + OK = 1 + ERROR = 2 + + +class Status: + """Noop Status class matching OpenTelemetry's Status.""" + + def __init__( + self, status_code: StatusCode = StatusCode.UNSET, description: str | None = None + ) -> None: + self.status_code = status_code + self.description = description + + +class SpanKind(Enum): + """Noop SpanKind enum matching OpenTelemetry's SpanKind.""" + + INTERNAL = 0 + SERVER = 1 + CLIENT = 2 + PRODUCER = 3 + CONSUMER = 4 + + +class SpanContext: + """Noop SpanContext class.""" + + trace_id: int = 0 + span_id: int = 0 + is_remote: bool = False + trace_flags: int = 0 + trace_state: None = None + + @property + def is_valid(self) -> bool: + return False + + +class Link: + """Noop Link class matching OpenTelemetry's Link.""" + + def __init__( + self, + context: SpanContext, + attributes: Mapping[str, Any] | None = None, + ) -> None: + self.context = context + self.attributes = attributes or {} + + +class NoopSpan: + """Noop Span that does nothing.""" + + def __init__(self, name: str = "", context: SpanContext | None = None) -> None: + self._name = name + self._context = context or SpanContext() + + def get_span_context(self) -> SpanContext: + return self._context + + def set_status(self, status: Status | StatusCode, description: str | None = None) -> None: + pass + + def set_attribute(self, key: str, value: Any) -> None: + pass + + def set_attributes(self, attributes: Mapping[str, Any]) -> None: + pass + + def add_event( + self, + name: str, + attributes: Mapping[str, Any] | None = None, + timestamp: int | None = None, + ) -> None: + pass + + def record_exception( + self, + exception: BaseException, + attributes: Mapping[str, Any] | None = None, + timestamp: int | None = None, + escaped: bool = False, + ) -> None: + pass + + def update_name(self, name: str) -> None: + pass + + def is_recording(self) -> bool: + return False + + def end(self, end_time: int | None = None) -> None: + pass + + def __enter__(self) -> "NoopSpan": + return self + + def __exit__(self, *args: Any) -> None: + pass + + +class NoopTracer: + """Noop Tracer that returns noop spans.""" + + @contextmanager + def start_as_current_span( + self, + name: str, + context: Any = None, + kind: SpanKind = SpanKind.INTERNAL, + attributes: Mapping[str, Any] | None = None, + links: Sequence[Link] | None = None, + start_time: int | None = None, + record_exception: bool = True, + set_status_on_exception: bool = True, + end_on_exit: bool = True, + ) -> Generator[NoopSpan, None, None]: + yield NoopSpan(name) + + def start_span( + self, + name: str, + context: Any = None, + kind: SpanKind = SpanKind.INTERNAL, + attributes: Mapping[str, Any] | None = None, + links: Sequence[Link] | None = None, + start_time: int | None = None, + record_exception: bool = True, + set_status_on_exception: bool = True, + ) -> NoopSpan: + return NoopSpan(name) + + +class NoopCounter: + """Noop Counter that does nothing.""" + + def add( + self, + amount: int | float, + attributes: Mapping[str, Any] | None = None, + ) -> None: + pass + + +class NoopHistogram: + """Noop Histogram that does nothing.""" + + def record( + self, + amount: int | float, + attributes: Mapping[str, Any] | None = None, + ) -> None: + pass + + +class NoopGauge: + """Noop Gauge that does nothing.""" + + def set( + self, + amount: int | float, + attributes: Mapping[str, Any] | None = None, + ) -> None: + pass + + +class NoopUpDownCounter: + """Noop UpDownCounter that does nothing.""" + + def add( + self, + amount: int | float, + attributes: Mapping[str, Any] | None = None, + ) -> None: + pass + + +class NoopMeter: + """Noop Meter that returns noop instruments.""" + + def create_counter( + self, + name: str, + unit: str = "", + description: str = "", + ) -> NoopCounter: + return NoopCounter() + + def create_histogram( + self, + name: str, + unit: str = "", + description: str = "", + ) -> NoopHistogram: + return NoopHistogram() + + def create_gauge( + self, + name: str, + unit: str = "", + description: str = "", + ) -> NoopGauge: + return NoopGauge() + + def create_up_down_counter( + self, + name: str, + unit: str = "", + description: str = "", + ) -> NoopUpDownCounter: + return NoopUpDownCounter() + + +class NoopMeterProvider: + """Noop MeterProvider.""" + + def get_meter( + self, + name: str, + version: str | None = None, + schema_url: str | None = None, + ) -> NoopMeter: + return NoopMeter() + + +class NoopTracerProvider: + """Noop TracerProvider.""" + + def get_tracer( + self, + instrumenting_module_name: str, + instrumenting_library_version: str | None = None, + schema_url: str | None = None, + ) -> NoopTracer: + return NoopTracer() + + +# Global noop providers +_tracer_provider = NoopTracerProvider() +_meter_provider = NoopMeterProvider() + + +def get_tracer( + instrumenting_module_name: str, + instrumenting_library_version: str | None = None, + tracer_provider: NoopTracerProvider | None = None, + schema_url: str | None = None, +) -> NoopTracer: + """Get a noop tracer.""" + provider = tracer_provider or _tracer_provider + return provider.get_tracer(instrumenting_module_name, instrumenting_library_version) + + +def get_meter( + name: str, + version: str = "", + meter_provider: NoopMeterProvider | None = None, +) -> NoopMeter: + """Get a noop meter.""" + provider = meter_provider or _meter_provider + return provider.get_meter(name, version) + + +def get_current_span(context: Any = None) -> NoopSpan: + """Get the current span (always returns noop).""" + return NoopSpan() + + +def set_meter_provider(meter_provider: Any) -> None: + """Noop set_meter_provider.""" + pass + + +# Noop context module +class NoopContext: + """Noop context that stores nothing.""" + + pass + + +_SUPPRESS_INSTRUMENTATION_KEY = "suppress_instrumentation" + + +def context_get_current() -> NoopContext: + """Get current context (always returns new noop context).""" + return NoopContext() + + +def context_set_value(key: str, value: Any, context: NoopContext | None = None) -> NoopContext: + """Set a value in context (returns same noop context).""" + return context or NoopContext() + + +def context_attach(context: NoopContext) -> object: + """Attach context (returns noop token).""" + return object() + + +def context_detach(token: object) -> None: + """Detach context (does nothing).""" + pass + + +# Noop propagation +class NoopGetter: + """Noop Getter for propagation.""" + + def get(self, carrier: Any, key: str) -> list[str] | None: + return None + + def keys(self, carrier: Any) -> list[str]: + return [] + + +class NoopSetter: + """Noop Setter for propagation.""" + + def set(self, carrier: Any, key: str, value: str) -> None: + pass + + +def propagate_inject( + carrier: Any, + context: Any = None, + setter: NoopSetter | None = None, +) -> None: + """Noop inject (does nothing).""" + pass + + +def propagate_extract( + carrier: Any, + context: Any = None, + getter: NoopGetter | None = None, +) -> NoopContext: + """Noop extract (returns noop context).""" + return NoopContext() + diff --git a/src/docket/_telemetry.py b/src/docket/_telemetry.py index 23f34c4..332c406 100644 --- a/src/docket/_telemetry.py +++ b/src/docket/_telemetry.py @@ -11,8 +11,13 @@ from contextlib import contextmanager from typing import Generator -from opentelemetry import context -from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY +from ._otel import ( + _SUPPRESS_INSTRUMENTATION_KEY, + context_attach, + context_detach, + context_get_current, + context_set_value, +) _SUPPRESS_INSTRUMENTATION_KEY_PLAIN = "suppress_instrumentation" @@ -25,11 +30,11 @@ def suppress_instrumentation() -> Generator[None, None, None]: creating spans. This is useful for internal operations (like Redis polling) that would generate excessive noise in traces. """ - ctx = context.get_current() - ctx = context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True, ctx) - ctx = context.set_value(_SUPPRESS_INSTRUMENTATION_KEY_PLAIN, True, ctx) - token = context.attach(ctx) + ctx = context_get_current() + ctx = context_set_value(_SUPPRESS_INSTRUMENTATION_KEY, True, ctx) + ctx = context_set_value(_SUPPRESS_INSTRUMENTATION_KEY_PLAIN, True, ctx) + token = context_attach(ctx) try: yield finally: - context.detach(token) + context_detach(token) diff --git a/src/docket/docket.py b/src/docket/docket.py index f65b0aa..ff4253a 100644 --- a/src/docket/docket.py +++ b/src/docket/docket.py @@ -21,7 +21,6 @@ import redis.exceptions from key_value.aio.protocols.key_value import AsyncKeyValue -from opentelemetry import trace from redis.asyncio import Redis from redis.asyncio.client import PubSub from redis.asyncio.cluster import RedisCluster @@ -31,6 +30,7 @@ from ._docket_snapshot import DocketSnapshotMixin from ._docket_snapshot import RunningExecution as RunningExecution from ._docket_snapshot import WorkerInfo as WorkerInfo +from ._otel import get_tracer from ._redis import RedisConnection from ._result_store import ResultStorage from ._uuid7 import uuid7 @@ -54,7 +54,7 @@ ) logger: logging.Logger = logging.getLogger(__name__) -tracer: trace.Tracer = trace.get_tracer(__name__) +tracer = get_tracer(__name__) class _cancel_task(Protocol): diff --git a/src/docket/execution.py b/src/docket/execution.py index 3f55250..261fc4e 100644 --- a/src/docket/execution.py +++ b/src/docket/execution.py @@ -19,11 +19,17 @@ ) import cloudpickle -import opentelemetry.context -from opentelemetry import propagate, trace -from ._telemetry import suppress_instrumentation from typing_extensions import Self +from ._otel import ( + Context, + Link, + get_current_span, + propagate_extract, + propagate_inject, +) +from ._telemetry import suppress_instrumentation + from ._execution_progress import ExecutionProgress, ProgressEvent, StateEvent from .annotations import Logged from .instrumentation import CACHE_SIZE, message_getter, message_setter @@ -108,7 +114,7 @@ def __init__( key: str, when: datetime, attempt: int, - trace_context: opentelemetry.context.Context | None = None, + trace_context: Context | None = None, redelivered: bool = False, function_name: str | None = None, ) -> None: @@ -173,7 +179,7 @@ def function_name(self) -> str: # Scheduling metadata properties @property - def trace_context(self) -> opentelemetry.context.Context | None: + def trace_context(self) -> Context | None: """OpenTelemetry trace context.""" return self._trace_context @@ -225,7 +231,7 @@ async def from_message( key=message[b"key"].decode(), when=datetime.fromisoformat(message[b"when"].decode()), attempt=int(message[b"attempt"].decode()), - trace_context=propagate.extract(message, getter=message_getter), + trace_context=propagate_extract(message, getter=message_getter), redelivered=redelivered, function_name=function_name, ) @@ -271,10 +277,10 @@ def call_repr(self) -> str: return f"{function_name}({', '.join(arguments)}){{{self.key}}}" - def incoming_span_links(self) -> list[trace.Link]: - initiating_span = trace.get_current_span(self.trace_context) + def incoming_span_links(self) -> list[Link]: + initiating_span = get_current_span(self.trace_context) initiating_context = initiating_span.get_span_context() - return [trace.Link(initiating_context)] if initiating_context.is_valid else [] + return [Link(initiating_context)] if initiating_context.is_valid else [] async def schedule( self, replace: bool = False, reschedule_message: "RedisMessageID | None" = None @@ -302,7 +308,7 @@ async def schedule( Used when a task needs to be rescheduled from an active stream message. """ message: dict[bytes, bytes] = self.as_message() - propagate.inject(message, setter=message_setter) + propagate_inject(message, setter=message_setter) key = self.key when = self.when diff --git a/src/docket/instrumentation.py b/src/docket/instrumentation.py index 105326c..dafbeb0 100644 --- a/src/docket/instrumentation.py +++ b/src/docket/instrumentation.py @@ -1,12 +1,10 @@ from contextlib import contextmanager from threading import Thread -from typing import Generator, cast +from typing import Any, Generator, cast -from opentelemetry import metrics -from opentelemetry.metrics import set_meter_provider -from opentelemetry.propagators.textmap import Getter, Setter +from ._otel import Getter, Setter, get_meter, set_meter_provider -meter: metrics.Meter = metrics.get_meter("docket") +meter = get_meter("docket") TASKS_ADDED = meter.create_counter( "docket_tasks_added", @@ -130,7 +128,7 @@ Message = dict[bytes, bytes] -class MessageGetter(Getter[Message]): +class MessageGetter(Getter): # type: ignore[type-arg] def get(self, carrier: Message, key: str) -> list[str] | None: val = carrier.get(key.encode(), None) if val is None: @@ -141,7 +139,7 @@ def keys(self, carrier: Message) -> list[str]: return [key.decode() for key in carrier.keys()] -class MessageSetter(Setter[Message]): +class MessageSetter(Setter): # type: ignore[type-arg] def set( self, carrier: Message, @@ -166,7 +164,7 @@ def healthcheck_server( from http.server import BaseHTTPRequestHandler, HTTPServer class HealthcheckHandler(BaseHTTPRequestHandler): - def do_GET(self): + def do_GET(self) -> None: self.send_response(200) self.send_header("Content-type", "text/plain") self.end_headers() @@ -192,7 +190,6 @@ def metrics_server( return import sys - from typing import Any try: from opentelemetry.sdk.metrics import MeterProvider diff --git a/src/docket/worker.py b/src/docket/worker.py index 39852aa..2cdfabe 100644 --- a/src/docket/worker.py +++ b/src/docket/worker.py @@ -30,10 +30,8 @@ else: from asyncio import TaskGroup # pragma: no cover -from opentelemetry import trace -from opentelemetry.trace import Status, StatusCode, Tracer - from ._cancellation import CANCEL_MSG_CLEANUP, cancel_task +from ._otel import SpanKind, Status, StatusCode, get_tracer from ._telemetry import suppress_instrumentation from redis.asyncio import Redis from redis.exceptions import ConnectionError, LockError, ResponseError @@ -121,7 +119,7 @@ async def default_fallback_task( logger: logging.Logger = logging.getLogger(__name__) -tracer: Tracer = trace.get_tracer(__name__) +tracer = get_tracer(__name__) class _stream_due_tasks(Protocol): @@ -825,7 +823,7 @@ async def _execute(self, execution: Execution) -> None: with tracer.start_as_current_span( execution.function_name, - kind=trace.SpanKind.CONSUMER, + kind=SpanKind.CONSUMER, attributes={ **self.labels(), **execution.specific_labels(), diff --git a/tests/instrumentation/__init__.py b/tests/instrumentation/__init__.py new file mode 100644 index 0000000..1c1ffc4 --- /dev/null +++ b/tests/instrumentation/__init__.py @@ -0,0 +1,2 @@ +"""OpenTelemetry instrumentation tests.""" + diff --git a/tests/instrumentation/conftest.py b/tests/instrumentation/conftest.py new file mode 100644 index 0000000..d4b3c0b --- /dev/null +++ b/tests/instrumentation/conftest.py @@ -0,0 +1,34 @@ +"""Pytest configuration for instrumentation tests. + +These tests require both OpenTelemetry API and SDK to be installed. +The API provides base types, while the SDK provides implementations +needed for metrics_server and TracerProvider. + +If either is missing, all tests in this directory are skipped. +""" + +from importlib import import_module + +import pytest + +from docket._otel import OTEL_AVAILABLE + + +def _check_otel_sdk_available() -> bool: + """Check if OpenTelemetry SDK is available (needed for metrics_server and TracerProvider).""" + try: + import_module("opentelemetry.sdk.metrics") + import_module("opentelemetry.sdk.trace") + return True + except ImportError: + return False + + +OTEL_SDK_AVAILABLE = _check_otel_sdk_available() + +# Skip all tests in this directory if OpenTelemetry API or SDK is not available +pytestmark = pytest.mark.skipif( + not OTEL_AVAILABLE or not OTEL_SDK_AVAILABLE, + reason="OpenTelemetry SDK not installed (install with: pip install pydocket[metrics])", +) + diff --git a/tests/instrumentation/test_counters.py b/tests/instrumentation/test_counters.py index a9b3728..ac9065e 100644 --- a/tests/instrumentation/test_counters.py +++ b/tests/instrumentation/test_counters.py @@ -9,7 +9,6 @@ if sys.version_info < (3, 11): # pragma: no cover from exceptiongroup import ExceptionGroup -from opentelemetry.metrics import Counter from docket import Docket, Worker from docket.dependencies import Perpetual, Retry @@ -24,7 +23,7 @@ def task_labels(docket: Docket, the_task: AsyncMock) -> dict[str, str]: @pytest.fixture def TASKS_ADDED(monkeypatch: pytest.MonkeyPatch) -> Mock: """Mock for the TASKS_ADDED counter.""" - mock_obj = Mock(spec=Counter.add) + mock_obj = Mock() monkeypatch.setattr("docket.instrumentation.TASKS_ADDED.add", mock_obj) return mock_obj @@ -32,7 +31,7 @@ def TASKS_ADDED(monkeypatch: pytest.MonkeyPatch) -> Mock: @pytest.fixture def TASKS_REPLACED(monkeypatch: pytest.MonkeyPatch) -> Mock: """Mock for the TASKS_REPLACED counter.""" - mock_obj = Mock(spec=Counter.add) + mock_obj = Mock() monkeypatch.setattr("docket.instrumentation.TASKS_REPLACED.add", mock_obj) return mock_obj @@ -40,7 +39,7 @@ def TASKS_REPLACED(monkeypatch: pytest.MonkeyPatch) -> Mock: @pytest.fixture def TASKS_SCHEDULED(monkeypatch: pytest.MonkeyPatch) -> Mock: """Mock for the TASKS_SCHEDULED counter.""" - mock_obj = Mock(spec=Counter.add) + mock_obj = Mock() monkeypatch.setattr("docket.instrumentation.TASKS_SCHEDULED.add", mock_obj) return mock_obj @@ -85,7 +84,7 @@ async def test_replacing_a_task_increments_counter( @pytest.fixture def TASKS_CANCELLED(monkeypatch: pytest.MonkeyPatch) -> Mock: """Mock for the TASKS_CANCELLED counter.""" - mock_obj = Mock(spec=Counter.add) + mock_obj = Mock() monkeypatch.setattr("docket.instrumentation.TASKS_CANCELLED.add", mock_obj) return mock_obj @@ -122,7 +121,7 @@ def worker_labels( @pytest.fixture def TASKS_STARTED(monkeypatch: pytest.MonkeyPatch) -> Mock: """Mock for the TASKS_STARTED counter.""" - mock_obj = Mock(spec=Counter.add) + mock_obj = Mock() monkeypatch.setattr("docket.instrumentation.TASKS_STARTED.add", mock_obj) return mock_obj @@ -130,7 +129,7 @@ def TASKS_STARTED(monkeypatch: pytest.MonkeyPatch) -> Mock: @pytest.fixture def TASKS_COMPLETED(monkeypatch: pytest.MonkeyPatch) -> Mock: """Mock for the TASKS_COMPLETED counter.""" - mock_obj = Mock(spec=Counter.add) + mock_obj = Mock() monkeypatch.setattr("docket.instrumentation.TASKS_COMPLETED.add", mock_obj) return mock_obj @@ -138,7 +137,7 @@ def TASKS_COMPLETED(monkeypatch: pytest.MonkeyPatch) -> Mock: @pytest.fixture def TASKS_SUCCEEDED(monkeypatch: pytest.MonkeyPatch) -> Mock: """Mock for the TASKS_SUCCEEDED counter.""" - mock_obj = Mock(spec=Counter.add) + mock_obj = Mock() monkeypatch.setattr("docket.instrumentation.TASKS_SUCCEEDED.add", mock_obj) return mock_obj @@ -146,7 +145,7 @@ def TASKS_SUCCEEDED(monkeypatch: pytest.MonkeyPatch) -> Mock: @pytest.fixture def TASKS_FAILED(monkeypatch: pytest.MonkeyPatch) -> Mock: """Mock for the TASKS_FAILED counter.""" - mock_obj = Mock(spec=Counter.add) + mock_obj = Mock() monkeypatch.setattr("docket.instrumentation.TASKS_FAILED.add", mock_obj) return mock_obj @@ -154,7 +153,7 @@ def TASKS_FAILED(monkeypatch: pytest.MonkeyPatch) -> Mock: @pytest.fixture def TASKS_RETRIED(monkeypatch: pytest.MonkeyPatch) -> Mock: """Mock for the TASKS_RETRIED counter.""" - mock_obj = Mock(spec=Counter.add) + mock_obj = Mock() monkeypatch.setattr("docket.instrumentation.TASKS_RETRIED.add", mock_obj) return mock_obj @@ -162,7 +161,7 @@ def TASKS_RETRIED(monkeypatch: pytest.MonkeyPatch) -> Mock: @pytest.fixture def TASKS_PERPETUATED(monkeypatch: pytest.MonkeyPatch) -> Mock: """Mock for the TASKS_PERPETUATED counter.""" - mock_obj = Mock(spec=Counter.add) + mock_obj = Mock() monkeypatch.setattr("docket.instrumentation.TASKS_PERPETUATED.add", mock_obj) return mock_obj @@ -170,7 +169,7 @@ def TASKS_PERPETUATED(monkeypatch: pytest.MonkeyPatch) -> Mock: @pytest.fixture def TASKS_REDELIVERED(monkeypatch: pytest.MonkeyPatch) -> Mock: """Mock for the TASKS_REDELIVERED counter.""" - mock_obj = Mock(spec=Counter.add) + mock_obj = Mock() monkeypatch.setattr("docket.instrumentation.TASKS_REDELIVERED.add", mock_obj) return mock_obj diff --git a/tests/instrumentation/test_export.py b/tests/instrumentation/test_export.py index cf17f51..9e640d4 100644 --- a/tests/instrumentation/test_export.py +++ b/tests/instrumentation/test_export.py @@ -8,8 +8,6 @@ from unittest.mock import AsyncMock, Mock import pytest -from opentelemetry.metrics import Histogram, UpDownCounter -from opentelemetry.metrics import _Gauge as Gauge from docket import Docket, Worker from docket.instrumentation import healthcheck_server, metrics_server @@ -30,7 +28,7 @@ def worker_labels( @pytest.fixture def TASK_DURATION(monkeypatch: pytest.MonkeyPatch) -> Mock: """Mock for the TASK_DURATION histogram.""" - mock_obj = Mock(spec=Histogram.record) + mock_obj = Mock() monkeypatch.setattr("docket.instrumentation.TASK_DURATION.record", mock_obj) return mock_obj @@ -56,7 +54,7 @@ async def the_task(): @pytest.fixture def TASK_PUNCTUALITY(monkeypatch: pytest.MonkeyPatch) -> Mock: """Mock for the TASK_PUNCTUALITY histogram.""" - mock_obj = Mock(spec=Histogram.record) + mock_obj = Mock() monkeypatch.setattr("docket.instrumentation.TASK_PUNCTUALITY.record", mock_obj) return mock_obj @@ -84,7 +82,7 @@ async def test_task_punctuality_is_measured( @pytest.fixture def TASKS_RUNNING(monkeypatch: pytest.MonkeyPatch) -> Mock: """Mock for the TASKS_RUNNING up-down counter.""" - mock_obj = Mock(spec=UpDownCounter.add) + mock_obj = Mock() monkeypatch.setattr("docket.instrumentation.TASKS_RUNNING.add", mock_obj) return mock_obj @@ -167,7 +165,7 @@ def read_metrics(port: int) -> tuple[http.client.HTTPResponse, str]: @pytest.fixture def QUEUE_DEPTH(monkeypatch: pytest.MonkeyPatch) -> Mock: """Mock for the QUEUE_DEPTH counter.""" - mock_obj = Mock(spec=Gauge.set) + mock_obj = Mock() monkeypatch.setattr("docket.instrumentation.QUEUE_DEPTH.set", mock_obj) return mock_obj @@ -175,7 +173,7 @@ def QUEUE_DEPTH(monkeypatch: pytest.MonkeyPatch) -> Mock: @pytest.fixture def SCHEDULE_DEPTH(monkeypatch: pytest.MonkeyPatch) -> Mock: """Mock for the SCHEDULE_DEPTH counter.""" - mock_obj = Mock(spec=Gauge.set) + mock_obj = Mock() monkeypatch.setattr("docket.instrumentation.SCHEDULE_DEPTH.set", mock_obj) return mock_obj diff --git a/tests/instrumentation/test_tracing.py b/tests/instrumentation/test_tracing.py index 79af561..8d18e16 100644 --- a/tests/instrumentation/test_tracing.py +++ b/tests/instrumentation/test_tracing.py @@ -14,7 +14,7 @@ @pytest.fixture(scope="module", autouse=True) def tracer_provider() -> TracerProvider: - """Sets up a "real" TracerProvider so that spans are recorded for the tests""" + """Sets up a "real" TracerProvider so that spans are recorded for the tests.""" provider = TracerProvider() trace.set_tracer_provider(provider) return provider diff --git a/uv.lock b/uv.lock index b2f0e32..4be910c 100644 --- a/uv.lock +++ b/uv.lock @@ -1578,7 +1578,6 @@ dependencies = [ { name = "croniter" }, { name = "exceptiongroup", marker = "python_full_version < '3.11'" }, { name = "fakeredis", extra = ["lua"] }, - { name = "opentelemetry-api" }, { name = "prometheus-client" }, { name = "py-key-value-aio", extra = ["memory", "redis"] }, { name = "python-json-logger" }, @@ -1591,8 +1590,12 @@ dependencies = [ [package.optional-dependencies] metrics = [ + { name = "opentelemetry-api" }, { name = "opentelemetry-sdk" }, ] +telemetry = [ + { name = "opentelemetry-api" }, +] [package.dev-dependencies] dev = [ @@ -1642,7 +1645,8 @@ requires-dist = [ { name = "croniter", specifier = ">=6" }, { name = "exceptiongroup", marker = "python_full_version < '3.11'", specifier = ">=1.2.0" }, { name = "fakeredis", extras = ["lua"], specifier = ">=2.32.1" }, - { name = "opentelemetry-api", specifier = ">=1.33.0" }, + { name = "opentelemetry-api", marker = "extra == 'metrics'", specifier = ">=1.33.0" }, + { name = "opentelemetry-api", marker = "extra == 'telemetry'", specifier = ">=1.33.0" }, { name = "opentelemetry-sdk", marker = "extra == 'metrics'", specifier = ">=1.33.0" }, { name = "prometheus-client", specifier = ">=0.21.1" }, { name = "py-key-value-aio", extras = ["memory", "redis"], specifier = ">=0.3.0" }, @@ -1653,7 +1657,7 @@ requires-dist = [ { name = "typer", specifier = ">=0.15.1" }, { name = "typing-extensions", specifier = ">=4.12.0" }, ] -provides-extras = ["metrics"] +provides-extras = ["metrics", "telemetry"] [package.metadata.requires-dev] dev = [