diff --git a/docs/API.md b/docs/API.md index eba59b30c4a..724c75cc8c2 100644 --- a/docs/API.md +++ b/docs/API.md @@ -408,14 +408,74 @@ app.listen(3000) The Datadog SDK supports many of the configurations supported by the OpenTelemetry SDK. The following environment variables are supported: - `DD_LOGS_OTEL_ENABLED` - Enable OpenTelemetry logs (default: `false`) -- `OTEL_EXPORTER_OTLP_LOGS_ENDPOINT` - OTLP endpoint URL for logs (default: `http://localhost:4318`) -- `OTEL_EXPORTER_OTLP_LOGS_HEADERS` - Optional headers in JSON format for logs (default: `{}`) -- `OTEL_EXPORTER_OTLP_LOGS_PROTOCOL` - OTLP protocol for logs (default: `http/protobuf`) -- `OTEL_EXPORTER_OTLP_LOGS_TIMEOUT` - Request timeout in milliseconds for logs (default: `10000`) -- `OTEL_BSP_SCHEDULE_DELAY` - Batch timeout in milliseconds (default: `5000`) +- `OTEL_EXPORTER_OTLP_LOGS_ENDPOINT` - OTLP endpoint URL for logs. Falls back to `OTEL_EXPORTER_OTLP_ENDPOINT` with `/v1/logs` appended (default: `http://localhost:4318/v1/logs`) +- `OTEL_EXPORTER_OTLP_LOGS_HEADERS` - Optional headers for logs in JSON format. Falls back to `OTEL_EXPORTER_OTLP_HEADERS` (default: `{}`) +- `OTEL_EXPORTER_OTLP_LOGS_PROTOCOL` - OTLP protocol for logs. Options: `http/protobuf`, `http/json`. Falls back to `OTEL_EXPORTER_OTLP_PROTOCOL` (default: `http/protobuf`) +- `OTEL_EXPORTER_OTLP_LOGS_TIMEOUT` - Request timeout in milliseconds for logs. Falls back to `OTEL_EXPORTER_OTLP_TIMEOUT` (default: `10000`) +- `OTEL_BSP_SCHEDULE_DELAY` - Batch export delay in milliseconds (default: `5000`) - `OTEL_BSP_MAX_EXPORT_BATCH_SIZE` - Maximum logs per batch (default: `512`) +- `OTEL_BSP_MAX_QUEUE_SIZE` - Maximum logs to queue before dropping (default: `2048`) -Logs are exported via OTLP over HTTP. The protocol can be configured using `OTEL_EXPORTER_OTLP_LOGS_PROTOCOL` or `OTEL_EXPORTER_OTLP_PROTOCOL` environment variables. Supported protocols are `http/protobuf` (default) and `http/json`. For complete OTLP exporter configuration options, see the [OpenTelemetry OTLP Exporter documentation](https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/). +For complete OTLP exporter configuration options, see the [OpenTelemetry OTLP Exporter documentation](https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/). + +

OpenTelemetry Metrics

+ +dd-trace-js includes experimental support for OpenTelemetry metrics, designed as a drop-in replacement for the OpenTelemetry Metrics SDK. This lightweight implementation is fully compliant with the OpenTelemetry Metrics API and integrates with the existing OTLP export infrastructure. Enable it by setting `DD_METRICS_OTEL_ENABLED=true` and use the [OpenTelemetry Metrics API](https://open-telemetry.github.io/opentelemetry-js/modules/_opentelemetry_api.html) to record metric data: + +```javascript +require('dd-trace').init() +const { metrics } = require('@opentelemetry/api') + +const meter = metrics.getMeter('my-service', '1.0.0') + +// Counter - monotonically increasing values +const requestCounter = meter.createCounter('http.requests', { + description: 'Total HTTP requests', + unit: 'requests' +}) +requestCounter.add(1, { method: 'GET', status: 200 }) + +// Histogram - distribution of values +const durationHistogram = meter.createHistogram('http.duration', { + description: 'HTTP request duration', + unit: 'ms' +}) +durationHistogram.record(145, { route: '/api/users' }) + +// UpDownCounter - can increase and decrease +const connectionCounter = meter.createUpDownCounter('active.connections', { + description: 'Active connections', + unit: 'connections' +}) +connectionCounter.add(1) // New connection +connectionCounter.add(-1) // Connection closed + +// ObservableGauge - asynchronous observations +const cpuGauge = meter.createObservableGauge('system.cpu.usage', { + description: 'CPU usage percentage', + unit: 'percent' +}) +cpuGauge.addCallback((result) => { + const cpuUsage = process.cpuUsage() + result.observe(cpuUsage.system / 1000000, { core: '0' }) +}) +``` + +#### Supported Configuration + +The Datadog SDK supports many of the configurations supported by the OpenTelemetry SDK. The following environment variables are supported: + +- `DD_METRICS_OTEL_ENABLED` - Enable OpenTelemetry metrics (default: `false`) +- `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT` - OTLP endpoint URL for metrics. Falls back to `OTEL_EXPORTER_OTLP_ENDPOINT` with `/v1/metrics` appended (default: `http://localhost:4318/v1/metrics`) +- `OTEL_EXPORTER_OTLP_METRICS_HEADERS` - Optional headers for metrics in JSON format. Falls back to `OTEL_EXPORTER_OTLP_HEADERS` (default: `{}`) +- `OTEL_EXPORTER_OTLP_METRICS_PROTOCOL` - OTLP protocol for metrics. Options: `http/protobuf`, `http/json`. Falls back to `OTEL_EXPORTER_OTLP_PROTOCOL` (default: `http/protobuf`) +- `OTEL_EXPORTER_OTLP_METRICS_TIMEOUT` - Request timeout in milliseconds for metrics. Falls back to `OTEL_EXPORTER_OTLP_TIMEOUT` (default: `10000`) +- `OTEL_METRIC_EXPORT_INTERVAL` - Metric export interval in milliseconds (default: `10000`) +- `OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE` - Aggregation temporality preference. Options: `CUMULATIVE`, `DELTA`, `LOWMEMORY` (default: `DELTA`). See [OpenTelemetry spec](https://opentelemetry.io/docs/specs/otel/metrics/sdk_exporters/otlp/#additional-environment-variable-configuration) for details +- `OTEL_BSP_MAX_QUEUE_SIZE` - Maximum metrics to queue before dropping (default: `2048`) +- `OTEL_METRIC_EXPORT_TIMEOUT` - [NOT YET SUPPORTED] Time to export metrics including retries + +For complete OTLP exporter configuration options, see the [OpenTelemetry OTLP Exporter documentation](https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/).

Advanced Configuration

diff --git a/packages/dd-trace/src/config.js b/packages/dd-trace/src/config.js index fd02d3766ad..bd06c07373b 100644 --- a/packages/dd-trace/src/config.js +++ b/packages/dd-trace/src/config.js @@ -476,6 +476,7 @@ class Config { DD_INSTRUMENTATION_CONFIG_ID, DD_LOGS_INJECTION, DD_LOGS_OTEL_ENABLED, + DD_METRICS_OTEL_ENABLED, DD_LANGCHAIN_SPAN_CHAR_LIMIT, DD_LANGCHAIN_SPAN_PROMPT_COMPLETION_SAMPLE_RATE, DD_LLMOBS_AGENTLESS_ENABLED, @@ -570,12 +571,20 @@ class Config { OTEL_EXPORTER_OTLP_LOGS_HEADERS, OTEL_EXPORTER_OTLP_LOGS_PROTOCOL, OTEL_EXPORTER_OTLP_LOGS_TIMEOUT, + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, + OTEL_EXPORTER_OTLP_METRICS_HEADERS, + OTEL_EXPORTER_OTLP_METRICS_PROTOCOL, + OTEL_EXPORTER_OTLP_METRICS_TIMEOUT, + OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, + OTEL_METRIC_EXPORT_TIMEOUT, OTEL_EXPORTER_OTLP_PROTOCOL, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TIMEOUT, OTEL_BSP_SCHEDULE_DELAY, - OTEL_BSP_MAX_EXPORT_BATCH_SIZE + OTEL_BSP_MAX_EXPORT_BATCH_SIZE, + OTEL_BSP_MAX_QUEUE_SIZE, + OTEL_METRIC_EXPORT_INTERVAL } = source const tags = {} @@ -602,10 +611,40 @@ class Config { this.#setString(target, 'otelLogsHeaders', OTEL_EXPORTER_OTLP_LOGS_HEADERS || target.otelHeaders) this.#setString(target, 'otelProtocol', OTEL_EXPORTER_OTLP_PROTOCOL) this.#setString(target, 'otelLogsProtocol', OTEL_EXPORTER_OTLP_LOGS_PROTOCOL || target.otelProtocol) - target.otelTimeout = maybeInt(OTEL_EXPORTER_OTLP_TIMEOUT) - target.otelLogsTimeout = maybeInt(OTEL_EXPORTER_OTLP_LOGS_TIMEOUT) || target.otelTimeout - target.otelLogsBatchTimeout = maybeInt(OTEL_BSP_SCHEDULE_DELAY) - target.otelLogsMaxExportBatchSize = maybeInt(OTEL_BSP_MAX_EXPORT_BATCH_SIZE) + const otelTimeout = nonNegInt(OTEL_EXPORTER_OTLP_TIMEOUT, 'OTEL_EXPORTER_OTLP_TIMEOUT') + if (otelTimeout !== undefined) { + target.otelTimeout = otelTimeout + } + const otelLogsTimeout = nonNegInt(OTEL_EXPORTER_OTLP_LOGS_TIMEOUT, 'OTEL_EXPORTER_OTLP_LOGS_TIMEOUT') + target.otelLogsTimeout = otelLogsTimeout === undefined ? target.otelTimeout : otelLogsTimeout + const otelBatchTimeout = nonNegInt(OTEL_BSP_SCHEDULE_DELAY, 'OTEL_BSP_SCHEDULE_DELAY', false) + if (otelBatchTimeout !== undefined) { + target.otelBatchTimeout = otelBatchTimeout + } + target.otelMaxExportBatchSize = nonNegInt(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, 'OTEL_BSP_MAX_EXPORT_BATCH_SIZE', false) + target.otelMaxQueueSize = nonNegInt(OTEL_BSP_MAX_QUEUE_SIZE, 'OTEL_BSP_MAX_QUEUE_SIZE', false) + + const otelMetricsExporter = !OTEL_METRICS_EXPORTER || OTEL_METRICS_EXPORTER.toLowerCase() !== 'none' + this.#setBoolean(target, 'otelMetricsEnabled', DD_METRICS_OTEL_ENABLED && otelMetricsExporter) + // Set OpenTelemetry metrics configuration with specific _METRICS_ vars + // taking precedence over generic _EXPORTERS_ vars + if (OTEL_EXPORTER_OTLP_ENDPOINT || OTEL_EXPORTER_OTLP_METRICS_ENDPOINT) { + this.#setString(target, 'otelMetricsUrl', OTEL_EXPORTER_OTLP_METRICS_ENDPOINT || target.otelUrl) + } + this.#setString(target, 'otelMetricsHeaders', OTEL_EXPORTER_OTLP_METRICS_HEADERS || target.otelHeaders) + this.#setString(target, 'otelMetricsProtocol', OTEL_EXPORTER_OTLP_METRICS_PROTOCOL || target.otelProtocol) + const otelMetricsTimeout = nonNegInt(OTEL_EXPORTER_OTLP_METRICS_TIMEOUT, 'OTEL_EXPORTER_OTLP_METRICS_TIMEOUT') + target.otelMetricsTimeout = otelMetricsTimeout === undefined ? target.otelTimeout : otelMetricsTimeout + target.otelMetricsExportTimeout = nonNegInt(OTEL_METRIC_EXPORT_TIMEOUT, 'OTEL_METRIC_EXPORT_TIMEOUT') + target.otelMetricsExportInterval = nonNegInt(OTEL_METRIC_EXPORT_INTERVAL, 'OTEL_METRIC_EXPORT_INTERVAL', false) + + // Parse temporality preference (default to DELTA for Datadog) + if (OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE) { + const temporalityPref = OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE.toUpperCase() + if (['DELTA', 'CUMULATIVE', 'LOWMEMORY'].includes(temporalityPref)) { + this.#setString(target, 'otelMetricsTemporalityPreference', temporalityPref) + } + } this.#setBoolean( target, 'apmTracingEnabled', @@ -1200,9 +1239,10 @@ class Config { calc['dogstatsd.hostname'] = this.#getHostname() - // Compute OTLP logs URL to send payloads to the active Datadog Agent + // Compute OTLP logs and metrics URLs to send payloads to the active Datadog Agent const agentHostname = this.#getHostname() calc.otelLogsUrl = `http://${agentHostname}:${DEFAULT_OTLP_PORT}` + calc.otelMetricsUrl = `http://${agentHostname}:${DEFAULT_OTLP_PORT}/v1/metrics` calc.otelUrl = `http://${agentHostname}:${DEFAULT_OTLP_PORT}` this.#setBoolean(calc, 'isGitUploadEnabled', @@ -1491,6 +1531,16 @@ function maybeFloat (number) { return Number.isNaN(parsed) ? undefined : parsed } +function nonNegInt (value, envVarName, allowZero = true) { + if (value === undefined) return + const parsed = Number.parseInt(value) + if (Number.isNaN(parsed) || parsed < 0 || (parsed === 0 && !allowZero)) { + log.warn(`Invalid value ${parsed} for ${envVarName}. Using default value.`) + return + } + return parsed +} + function getAgentUrl (url, options) { if (url) return new URL(url) diff --git a/packages/dd-trace/src/config_defaults.js b/packages/dd-trace/src/config_defaults.js index a7492209c46..201f93830b5 100644 --- a/packages/dd-trace/src/config_defaults.js +++ b/packages/dd-trace/src/config_defaults.js @@ -138,8 +138,17 @@ module.exports = { otelLogsProtocol: 'http/protobuf', otelLogsTimeout: 10_000, otelTimeout: 10_000, - otelLogsBatchTimeout: 5000, - otelLogsMaxExportBatchSize: 512, + otelBatchTimeout: 5000, + otelMaxExportBatchSize: 512, + otelMaxQueueSize: 2048, + otelMetricsEnabled: false, + otelMetricsUrl: undefined, // Will be computed using agent host + otelMetricsHeaders: '', + otelMetricsProtocol: 'http/protobuf', + otelMetricsTimeout: 10_000, + otelMetricsExportTimeout: 7500, + otelMetricsExportInterval: 10_000, + otelMetricsTemporalityPreference: 'DELTA', // DELTA, CUMULATIVE, or LOWMEMORY lookup: undefined, inferredProxyServicesEnabled: false, memcachedCommandEnabled: false, diff --git a/packages/dd-trace/src/opentelemetry/logs/index.js b/packages/dd-trace/src/opentelemetry/logs/index.js index 76a91e05a7b..6b4054da781 100644 --- a/packages/dd-trace/src/opentelemetry/logs/index.js +++ b/packages/dd-trace/src/opentelemetry/logs/index.js @@ -70,8 +70,8 @@ function initializeOpenTelemetryLogs (config) { // Create batch processor for exporting logs to Datadog Agent const processor = new BatchLogRecordProcessor( exporter, - config.otelLogsBatchTimeout, - config.otelLogsMaxExportBatchSize + config.otelBatchTimeout, + config.otelMaxExportBatchSize ) // Create logger provider with processor for Datadog Agent export diff --git a/packages/dd-trace/src/opentelemetry/logs/logger.js b/packages/dd-trace/src/opentelemetry/logs/logger.js index 57eb344fee0..7f210206bd5 100644 --- a/packages/dd-trace/src/opentelemetry/logs/logger.js +++ b/packages/dd-trace/src/opentelemetry/logs/logger.js @@ -2,7 +2,8 @@ const { sanitizeAttributes } = require('@opentelemetry/core') const { context } = require('@opentelemetry/api') -const packageVersion = require('../../../../../package.json').version +const { VERSION: packageVersion } = require('../../../../../version') + /** * @typedef {import('@opentelemetry/api-logs').LogRecord} LogRecord * @typedef {import('@opentelemetry/api').SpanContext} SpanContext diff --git a/packages/dd-trace/src/opentelemetry/logs/otlp_http_log_exporter.js b/packages/dd-trace/src/opentelemetry/logs/otlp_http_log_exporter.js index 29a8ed45901..3d157e85b0b 100644 --- a/packages/dd-trace/src/opentelemetry/logs/otlp_http_log_exporter.js +++ b/packages/dd-trace/src/opentelemetry/logs/otlp_http_log_exporter.js @@ -11,7 +11,7 @@ const OtlpTransformer = require('./otlp_transformer') /** * OtlpHttpLogExporter exports log records via OTLP over HTTP. * - * This implementation follows the OTLP HTTP specification: + * This implementation follows the OTLP HTTP v1.7.0 specification: * https://opentelemetry.io/docs/specs/otlp/#otlphttp * * @class OtlpHttpLogExporter @@ -37,6 +37,8 @@ class OtlpHttpLogExporter extends OtlpHttpExporterBase { * * @param {LogRecord[]} logRecords - Array of enriched log records to export * @param {Function} resultCallback - Callback function for export result + * + * @returns {void} */ export (logRecords, resultCallback) { if (logRecords.length === 0) { @@ -45,8 +47,8 @@ class OtlpHttpLogExporter extends OtlpHttpExporterBase { } const payload = this.transformer.transformLogRecords(logRecords) - this._sendPayload(payload, resultCallback) - this._recordTelemetry('otel.log_records', logRecords.length) + this.sendPayload(payload, resultCallback) + this.recordTelemetry('otel.log_records', logRecords.length) } } diff --git a/packages/dd-trace/src/opentelemetry/logs/otlp_transformer.js b/packages/dd-trace/src/opentelemetry/logs/otlp_transformer.js index 4e2fc97a4fc..3120604103f 100644 --- a/packages/dd-trace/src/opentelemetry/logs/otlp_transformer.js +++ b/packages/dd-trace/src/opentelemetry/logs/otlp_transformer.js @@ -40,7 +40,7 @@ const SEVERITY_MAP = { /** * OtlpTransformer transforms log records to OTLP format. * - * This implementation follows the OTLP Logs Data Model specification: + * This implementation follows the OTLP Logs v1.7.0 Data Model specification: * https://opentelemetry.io/docs/specs/otlp/#log-data-model * * @class OtlpTransformer @@ -80,12 +80,12 @@ class OtlpTransformer extends OtlpTransformerBase { const logsData = { resourceLogs: [{ - resource: this._transformResource(), + resource: this.transformResource(), scopeLogs: this.#transformScope(logRecords), }] } - return this._serializeToProtobuf(protoLogsService, logsData) + return this.serializeToProtobuf(protoLogsService, logsData) } /** @@ -97,11 +97,11 @@ class OtlpTransformer extends OtlpTransformerBase { #transformToJson (logRecords) { const logsData = { resourceLogs: [{ - resource: this._transformResource(), + resource: this.transformResource(), scopeLogs: this.#transformScope(logRecords) }] } - return this._serializeToJson(logsData) + return this.serializeToJson(logsData) } /** @@ -111,7 +111,7 @@ class OtlpTransformer extends OtlpTransformerBase { * @private */ #transformScope (logRecords) { - const groupedRecords = this._groupByInstrumentationScope(logRecords) + const groupedRecords = this.groupByInstrumentationScope(logRecords) const scopeLogs = [] for (const records of groupedRecords.values()) { @@ -159,7 +159,7 @@ class OtlpTransformer extends OtlpTransformerBase { } if (logRecord.attributes) { - result.attributes = this._transformAttributes(logRecord.attributes) + result.attributes = this.transformAttributes(logRecord.attributes) } if (spanContext?.traceFlags !== undefined) { @@ -240,7 +240,7 @@ class OtlpTransformer extends OtlpTransformerBase { kvlistValue: { values: Object.entries(body).map(([key, value]) => ({ key, - value: this._transformAnyValue(value) + value: this.transformAnyValue(value) })) } } diff --git a/packages/dd-trace/src/opentelemetry/metrics/constants.js b/packages/dd-trace/src/opentelemetry/metrics/constants.js new file mode 100644 index 00000000000..098dc1dfdc4 --- /dev/null +++ b/packages/dd-trace/src/opentelemetry/metrics/constants.js @@ -0,0 +1,33 @@ +'use strict' + +// Metric type constants +const METRIC_TYPES = { + HISTOGRAM: 'histogram', + COUNTER: 'counter', + UPDOWNCOUNTER: 'updowncounter', + OBSERVABLECOUNTER: 'observable-counter', + OBSERVABLEUPDOWNCOUNTER: 'observable-updowncounter', + GAUGE: 'gauge' +} + +// Temporality constants +const TEMPORALITY = { + DELTA: 'DELTA', + CUMULATIVE: 'CUMULATIVE', + GAUGE: 'GAUGE', + LOWMEMORY: 'LOWMEMORY' +} + +// Default histogram bucket boundaries (in milliseconds for latency metrics) +const DEFAULT_HISTOGRAM_BUCKETS = [0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10_000] + +// Maximum number of measurements to queue before dropping +// This is an arbitrary limit to prevent memory exhaustion +const DEFAULT_MAX_MEASUREMENT_QUEUE_SIZE = 524_288 + +module.exports = { + METRIC_TYPES, + TEMPORALITY, + DEFAULT_HISTOGRAM_BUCKETS, + DEFAULT_MAX_MEASUREMENT_QUEUE_SIZE +} diff --git a/packages/dd-trace/src/opentelemetry/metrics/index.js b/packages/dd-trace/src/opentelemetry/metrics/index.js new file mode 100644 index 00000000000..9d5bffd7a93 --- /dev/null +++ b/packages/dd-trace/src/opentelemetry/metrics/index.js @@ -0,0 +1,81 @@ +'use strict' + +const os = require('os') + +/** + * @typedef {import('../../config')} Config + */ + +/** + * @fileoverview OpenTelemetry Metrics Implementation for dd-trace-js + * + * This package provides a custom OpenTelemetry Metrics implementation that integrates + * with the Datadog tracing library. It includes all necessary components for + * creating instruments, recording measurements, and exporting metrics via OTLP. + * + * Key Components: + * - MeterProvider: Main entry point for creating meters + * - Meter: Provides methods to create metric instruments + * - Instruments: Gauge, Counter, UpDownCounter, ObservableGauge, ObservableCounter, ObservableUpDownCounter, Histogram + * - PeriodicMetricReader: Collects and exports instruments (metrics) at regular intervals + * - OtlpHttpMetricExporter: Exports instruments (metrics) via OTLP over HTTP + * - OtlpTransformer: Transforms instruments (metrics) to OTLP format + * + * This is a custom implementation to avoid pulling in the full OpenTelemetry SDK, + * based on OTLP Protocol v1.7.0. It supports both protobuf and JSON serialization + * formats and integrates with Datadog's configuration system. + * + * @package + */ + +const MeterProvider = require('./meter_provider') +const PeriodicMetricReader = require('./periodic_metric_reader') +const OtlpHttpMetricExporter = require('./otlp_http_metric_exporter') + +/** + * Initializes OpenTelemetry Metrics support + * @param {Config} config - Tracer configuration instance + */ +function initializeOpenTelemetryMetrics (config) { + const resourceAttributes = { + 'service.name': config.service, + 'service.version': config.version, + 'deployment.environment': config.env + } + + if (config.tags) { + const filteredTags = { ...config.tags } + delete filteredTags.service + delete filteredTags.version + delete filteredTags.env + Object.assign(resourceAttributes, filteredTags) + } + + if (config.reportHostname) { + resourceAttributes['host.name'] = os.hostname() + } + + const exporter = new OtlpHttpMetricExporter( + config.otelMetricsUrl, + config.otelMetricsHeaders, + config.otelMetricsTimeout, + config.otelMetricsProtocol, + resourceAttributes + ) + + const reader = new PeriodicMetricReader( + exporter, + config.otelMetricsExportInterval, + config.otelMetricsTemporalityPreference, + config.otelMaxQueueSize + ) + + const meterProvider = new MeterProvider({ reader }) + + meterProvider.register() +} + +module.exports = { + MeterProvider, + initializeOpenTelemetryMetrics +} diff --git a/packages/dd-trace/src/opentelemetry/metrics/instruments.js b/packages/dd-trace/src/opentelemetry/metrics/instruments.js new file mode 100644 index 00000000000..e789a74f7d3 --- /dev/null +++ b/packages/dd-trace/src/opentelemetry/metrics/instruments.js @@ -0,0 +1,221 @@ +'use strict' + +const { sanitizeAttributes } = require('@opentelemetry/core') +const { METRIC_TYPES } = require('./constants') + +/** + * @typedef {import('@opentelemetry/api').Attributes} Attributes + * @typedef {import('@opentelemetry/core').InstrumentationScope} InstrumentationScope + */ + +/** + * @typedef {Object} Measurement + * @property {string} name - Instrument name + * @property {string} description - Instrument description + * @property {string} unit - Measurement unit + * @property {InstrumentationScope} instrumentationScope - Instrumentation scope + * @property {string} type - Metric type from METRIC_TYPES + * @property {number} value - Measured value + * @property {Attributes} attributes - Sanitized metric attributes + * @property {number} timestamp - Timestamp in nanoseconds + */ + +/** + * Base class for all metric instruments. + * + * @private + */ +class Instrument { + /** + * Creates a new instrument instance. + * + * @param {string} name - Instrument name (e.g., 'http.request.duration') + * @param {Object} options - Instrument configuration options + * @param {string} [options.description] - Human-readable description of the instrument + * @param {string} [options.unit] - Unit of measurement (e.g., 'ms', 'bytes', '1') + * @param {InstrumentationScope} instrumentationScope - Instrumentation scope for this instrument + * @param {Object} reader - Metric reader for recording measurements + */ + constructor (name, options, instrumentationScope, reader) { + this.name = name + this.description = options.description ?? '' + this.unit = options.unit ?? '' + this.valueType = options.valueType // currently ignored, TODO: add support for ValueType + this.advice = options.advice // currently ignored, TODO: add support for MetricAdvice + this.instrumentationScope = instrumentationScope + this.reader = reader + } + + /** + * Creates a measurement object for recording metric values. + * @param {string} type - Metric type from METRIC_TYPES + * @param {number} value - Numeric value to record + * @param {Attributes} attributes - Key-value pairs for metric dimensions + * @returns {Measurement} Measurement object with metadata and timestamp + */ + createMeasurement (type, value, attributes) { + return { + name: this.name, + description: this.description, + unit: this.unit, + instrumentationScope: this.instrumentationScope, + type, + value, + attributes: sanitizeAttributes(attributes), + timestamp: Number(process.hrtime.bigint()) + } + } +} + +/** + * Implementation of the OpenTelemetry Counter interface: + * https://open-telemetry.github.io/opentelemetry-js/interfaces/_opentelemetry_api._opentelemetry_api.Counter.html + * @class Counter + */ +class Counter extends Instrument { + add (value, attributes = {}) { + if (value < 0) return + this.reader?.record(this.createMeasurement(METRIC_TYPES.COUNTER, value, attributes)) + } +} + +/** + * Implementation of the OpenTelemetry UpDownCounter interface: + * https://open-telemetry.github.io/opentelemetry-js/interfaces/_opentelemetry_api._opentelemetry_api.UpDownCounter.html + * @class UpDownCounter + */ +class UpDownCounter extends Instrument { + add (value, attributes = {}) { + this.reader?.record(this.createMeasurement(METRIC_TYPES.UPDOWNCOUNTER, value, attributes)) + } +} + +/** + * Implementation of the OpenTelemetry Histogram interface: + * https://open-telemetry.github.io/opentelemetry-js/interfaces/_opentelemetry_api._opentelemetry_api.Histogram.html + * @class Histogram + */ +class Histogram extends Instrument { + record (value, attributes = {}) { + if (value < 0) return + this.reader?.record(this.createMeasurement(METRIC_TYPES.HISTOGRAM, value, attributes)) + } +} + +/** + * Implementation of the OpenTelemetry Gauge interface: + * https://open-telemetry.github.io/opentelemetry-js/interfaces/_opentelemetry_api._opentelemetry_api.Gauge.html + * @class Gauge + */ +class Gauge extends Instrument { + record (value, attributes = {}) { + this.reader?.record(this.createMeasurement(METRIC_TYPES.GAUGE, value, attributes)) + } +} + +/** + * Base class for observable (asynchronous) instruments. + * Implementation of the OpenTelemetry Observable interface: + * https://open-telemetry.github.io/opentelemetry-js/interfaces/_opentelemetry_api._opentelemetry_api.Observable.html + * @private + */ +class ObservableInstrument extends Instrument { + #callbacks = [] + #type + + constructor (name, options, instrumentationScope, reader, type) { + super(name, options, instrumentationScope, reader) + this.#type = type + } + + /** + * Adds a callback to invoke during metric collection. + * + * @param {Function} callback - Receives an ObservableResult to record observations + */ + addCallback (callback) { + if (typeof callback !== 'function') return + this.#callbacks.push(callback) + this.reader?.registerObservableInstrument(this) + } + + /** + * Removes a callback. + * + * @param {Function} callback - The callback to remove + */ + removeCallback (callback) { + const index = this.#callbacks.indexOf(callback) + if (index !== -1) { + this.#callbacks.splice(index, 1) + } + } + + /** + * Collects observations from all callbacks. Errors are silently ignored. + * + * @returns {Array} Array of measurements + */ + collect () { + const observations = [] + const observableResult = { + observe: (value, attributes = {}) => { + observations.push(this.createMeasurement(this.#type, value, attributes)) + } + } + + for (const callback of this.#callbacks) { + try { + callback(observableResult) + } catch { + // Ignore callback errors per OpenTelemetry spec to prevent disruption + // Errors are swallowed as callbacks should not break metric collection + } + } + + return observations + } +} + +/** + * Implementation of the OpenTelemetry ObservableGauge interface: + * https://open-telemetry.github.io/opentelemetry-js/types/_opentelemetry_api._opentelemetry_api.ObservableGauge.html + * @class ObservableGauge + */ +class ObservableGauge extends ObservableInstrument { + constructor (name, options, instrumentationScope, reader) { + super(name, options, instrumentationScope, reader, METRIC_TYPES.GAUGE) + } +} + +/** + * Implementation of the OpenTelemetry ObservableCounter interface: + * https://open-telemetry.github.io/opentelemetry-js/types/_opentelemetry_api._opentelemetry_api.ObservableCounter.html + * @class ObservableCounter + */ +class ObservableCounter extends ObservableInstrument { + constructor (name, options, instrumentationScope, reader) { + super(name, options, instrumentationScope, reader, METRIC_TYPES.OBSERVABLECOUNTER) + } +} + +/** + * Implementation of the OpenTelemetry ObservableUpDownCounter interface: + * https://open-telemetry.github.io/opentelemetry-js/types/_opentelemetry_api._opentelemetry_api.ObservableUpDownCounter.html + * @class ObservableUpDownCounter + */ +class ObservableUpDownCounter extends ObservableInstrument { + constructor (name, options, instrumentationScope, reader) { + super(name, options, instrumentationScope, reader, METRIC_TYPES.OBSERVABLEUPDOWNCOUNTER) + } +} + +module.exports = { + Counter, + UpDownCounter, + Histogram, + Gauge, + ObservableGauge, + ObservableCounter, + ObservableUpDownCounter +} diff --git a/packages/dd-trace/src/opentelemetry/metrics/meter.js b/packages/dd-trace/src/opentelemetry/metrics/meter.js new file mode 100644 index 00000000000..a06d4ecb829 --- /dev/null +++ b/packages/dd-trace/src/opentelemetry/metrics/meter.js @@ -0,0 +1,171 @@ +'use strict' + +const { VERSION: packageVersion } = require('../../../../../version') +const { + Counter, UpDownCounter, Histogram, Gauge, ObservableGauge, ObservableCounter, ObservableUpDownCounter +} = require('./instruments') +const log = require('../../log') +const { METRIC_TYPES } = require('./constants') + +/** + * @typedef {import('@opentelemetry/api').MetricOptions} MetricOptions + * @typedef {import('@opentelemetry/core').InstrumentationScope} InstrumentationScope + */ + +/** + * Meter provides methods to create metric instruments. + * + * This implementation follows the OpenTelemetry JavaScript API Meter: + * https://open-telemetry.github.io/opentelemetry-js/interfaces/_opentelemetry_api._opentelemetry_api.Meter.html + * + * @class Meter + */ +class Meter { + #instrumentationScope + #instruments = new Map() + /** + * Creates a new Meter instance. + * + * @param {MeterProvider} meterProvider - Parent meter provider + * @param {InstrumentationScope} instrumentationScope - Instrumentation scope information + * @param {string} [instrumentationScope.name] - Meter name (defaults to 'dd-trace-js') + * @param {string} [instrumentationScope.version] - Meter version (defaults to tracer version) + * @param {string} [instrumentationScope.schemaUrl] - Schema URL + * @param {Object} [instrumentationScope.attributes] - Attributes for the instrumentation scope + */ + constructor ( + meterProvider, + { name = 'dd-trace-js', version = packageVersion, schemaUrl = '', attributes = {} } = {} + ) { + this.meterProvider = meterProvider + this.#instrumentationScope = { + name, + version, + schemaUrl, + attributes, + } + } + + /** + * Gets an existing instrument or creates a new one if it doesn't exist. + * Instruments are cached by type and normalized (lowercase) name. + * + * @private + * @param {string} name - Instrument name (will be normalized to lowercase) + * @param {string} type - Instrument type (e.g., 'counter', 'histogram', 'gauge') + * @param {Function} InstrumentClass - Constructor for the instrument type + * @param {MetricOptions} [options] - Instrument options (description, unit, etc.) + * @returns {Instrument} The instrument instance (new or cached) + */ + #getOrCreateInstrument (name, type, InstrumentClass, options) { + const normalizedName = name.toLowerCase() + const key = `${type}:${normalizedName}` + let instrument = this.#instruments.get(key) + if (!instrument) { + instrument = new InstrumentClass( + normalizedName, options, this.#instrumentationScope, this.meterProvider.reader + ) + this.#instruments.set(key, instrument) + } + return instrument + } + + /** + * Creates a Counter instrument. + * + * @param {string} name - Instrument name (case-insensitive) + * @param {MetricOptions} [options] - Instrument options + * @returns {Counter} Counter instrument + */ + createCounter (name, options = {}) { + return this.#getOrCreateInstrument(name, METRIC_TYPES.COUNTER, Counter, options) + } + + /** + * Creates an UpDownCounter instrument. + * + * @param {string} name - Instrument name + * @param {MetricOptions} [options] - Instrument options + * @returns {UpDownCounter} UpDownCounter instrument + */ + createUpDownCounter (name, options = {}) { + return this.#getOrCreateInstrument(name, METRIC_TYPES.UPDOWNCOUNTER, UpDownCounter, options) + } + + /** + * Creates a Histogram instrument. + * + * @param {string} name - Instrument name (case-insensitive) + * @param {MetricOptions} [options] - Instrument options + * @returns {Histogram} Histogram instrument + */ + createHistogram (name, options = {}) { + return this.#getOrCreateInstrument(name, METRIC_TYPES.HISTOGRAM, Histogram, options) + } + + /** + * Creates a Gauge instrument. + * + * @param {string} name - Instrument name (case-insensitive) + * @param {MetricOptions} [options] - Instrument options + * @returns {Gauge} Gauge instrument + */ + createGauge (name, options = {}) { + return this.#getOrCreateInstrument(name, METRIC_TYPES.GAUGE, Gauge, options) + } + + /** + * Creates an ObservableGauge instrument. + * + * @param {string} name - Instrument name (case-insensitive) + * @param {MetricOptions} [options] - Instrument options + * @returns {ObservableGauge} ObservableGauge instrument + */ + createObservableGauge (name, options = {}) { + return this.#getOrCreateInstrument(name, METRIC_TYPES.OBSERVABLEGAUGE, ObservableGauge, options) + } + + /** + * Creates an ObservableCounter instrument. + * + * @param {string} name - Instrument name (case-insensitive) + * @param {MetricOptions} [options] - Instrument options + * @returns {ObservableCounter} ObservableCounter instrument + */ + createObservableCounter (name, options = {}) { + return this.#getOrCreateInstrument(name, METRIC_TYPES.OBSERVABLECOUNTER, ObservableCounter, options) + } + + /** + * Creates an ObservableUpDownCounter instrument. + * + * @param {string} name - Instrument name (case-insensitive) + * @param {MetricOptions} [options] - Instrument options + * @returns {ObservableUpDownCounter} ObservableUpDownCounter instrument + */ + createObservableUpDownCounter (name, options = {}) { + return this.#getOrCreateInstrument(name, METRIC_TYPES.OBSERVABLEUPDOWNCOUNTER, ObservableUpDownCounter, options) + } + + /** + * Adds a batch observable callback (not implemented). + * + * @param {Function} callback - Batch observable callback + * @param {Array} observables - Array of observable instruments + */ + addBatchObservableCallback (callback, observables) { + log.warn('addBatchObservableCallback is not implemented') + } + + /** + * Removes a batch observable callback (not implemented). + * + * @param {Function} callback - Batch observable callback + * @param {Array} observables - Array of observable instruments + */ + removeBatchObservableCallback (callback, observables) { + log.warn('removeBatchObservableCallback is not implemented') + } +} + +module.exports = Meter diff --git a/packages/dd-trace/src/opentelemetry/metrics/meter_provider.js b/packages/dd-trace/src/opentelemetry/metrics/meter_provider.js new file mode 100644 index 00000000000..cf037bffcb1 --- /dev/null +++ b/packages/dd-trace/src/opentelemetry/metrics/meter_provider.js @@ -0,0 +1,113 @@ +'use strict' + +const { metrics } = require('@opentelemetry/api') +const Meter = require('./meter') +const log = require('../../log') +const { context } = require('@opentelemetry/api') +const ContextManager = require('../context_manager') + +/** + * @typedef {import('@opentelemetry/api').Meter} Meter + * @typedef {import('@opentelemetry/api').MeterOptions} MeterOptions + * @typedef {import('./periodic_metric_reader')} PeriodicMetricReader + */ + +/** + * MeterProvider is the main entry point for creating meters with a single reader for Datadog Agent export. + * + * This implementation follows the OpenTelemetry JavaScript API MeterProvider interface: + * https://open-telemetry.github.io/opentelemetry-js/interfaces/_opentelemetry_api._opentelemetry_api.MeterProvider.html + * + * @class MeterProvider + * @implements {import('@opentelemetry/api').MeterProvider} + */ +class MeterProvider { + #meters = new Map() + #contextManager = new ContextManager() + /** + * Creates a new MeterProvider instance with a single reader for Datadog Agent export. + * + * @param {MeterOptions} [options] - MeterProvider options + * @param {PeriodicMetricReader} [options.reader] - Single MetricReader instance for + * exporting metrics to Datadog Agent + */ + constructor (options = {}) { + this.reader = options.reader + this.isShutdown = false + } + + /** + * Gets or creates a meter instance. + * + * @param {string} name - Meter name (case-insensitive) + * @param {string} [version] - Meter version + * @param {MeterOptions} [options] - Additional options + * @returns {Meter} Meter instance + */ + getMeter (name, version = '', { schemaUrl = '' } = {}) { + if (this.isShutdown) { + return this.#createNoOpMeter() + } + const normalizedName = name.toLowerCase() + const key = `${normalizedName}@${version}@${schemaUrl}` + let meter = this.#meters.get(key) + if (!meter) { + meter = new Meter(this, { name: normalizedName, version, schemaUrl }) + this.#meters.set(key, meter) + } + return meter + } + + /** + * Registers this meter provider as the global provider. + */ + register () { + if (this.isShutdown) { + log.warn('Cannot register after shutdown') + return + } + // Set context manager (may be needed for future trace/metrics correlation) + context.setGlobalContextManager(this.#contextManager) + metrics.setGlobalMeterProvider(this) + } + + /** + * Forces a flush of all pending metrics. + * @returns {void} + */ + forceFlush () { + if (!this.isShutdown && this.reader) this.reader.forceFlush() + } + + /** + * Shuts down the meter provider and all associated readers. + * @returns {void} + */ + shutdown () { + if (!this.isShutdown) { + this.isShutdown = true + if (this.reader) { + this.reader.shutdown() + } + } + } + + /** + * Creates a no-op meter for use when the provider is shutdown. + * @returns {Meter} A no-op meter instance + * @private + */ + #createNoOpMeter () { + return { + createCounter: () => ({ add: () => {} }), + createUpDownCounter: () => ({ add: () => {} }), + createHistogram: () => ({ record: () => {} }), + createGauge: () => ({ record: () => {} }), + createObservableGauge: () => ({ addCallback: () => {} }), + createObservableCounter: () => ({ addCallback: () => {} }), + createObservableUpDownCounter: () => ({ addCallback: () => {} }) + } + } +} + +module.exports = MeterProvider diff --git a/packages/dd-trace/src/opentelemetry/metrics/otlp_http_metric_exporter.js b/packages/dd-trace/src/opentelemetry/metrics/otlp_http_metric_exporter.js new file mode 100644 index 00000000000..90a732573c1 --- /dev/null +++ b/packages/dd-trace/src/opentelemetry/metrics/otlp_http_metric_exporter.js @@ -0,0 +1,65 @@ +'use strict' + +const OtlpHttpExporterBase = require('../otlp/otlp_http_exporter_base') +const OtlpTransformer = require('./otlp_transformer') + +/** + * @typedef {import('@opentelemetry/resources').Resource} Resource + * @typedef {import('./periodic_metric_reader').AggregatedMetric} AggregatedMetric + */ + +/** + * OtlpHttpMetricExporter exports metrics via OTLP over HTTP. + * + * @class OtlpHttpMetricExporter + */ +class OtlpHttpMetricExporter extends OtlpHttpExporterBase { + /** + * Creates a new OtlpHttpMetricExporter instance. + * + * @param {string} url - OTLP endpoint URL + * @param {string} headers - Additional HTTP headers as comma-separated key=value string + * @param {number} timeout - Request timeout in milliseconds + * @param {string} protocol - OTLP protocol (http/protobuf or http/json) + * @param {Resource} resource - Resource attributes + */ + constructor (url, headers, timeout, protocol, resource) { + super(url, headers, timeout, protocol, '/v1/metrics', 'metrics') + this.transformer = new OtlpTransformer(resource, protocol) + } + + /** + * Exports metrics via OTLP over HTTP. + * + * @param {Map} metrics - Map of metric data to export + * @param {Function} resultCallback - Callback function for export result + * + * @returns {void} + */ + export (metrics, resultCallback) { + if (metrics.size === 0) { + resultCallback({ code: 0 }) + return + } + + let dataPointCount = 0 + for (const metric of metrics.values()) { + if (metric.dataPointMap) { + dataPointCount += metric.dataPointMap.size + } + } + + const additionalTags = [`points:${dataPointCount}`] + this.recordTelemetry('otel.metrics_export_attempts', 1, additionalTags) + + const payload = this.transformer.transformMetrics(metrics.values()) + this.sendPayload(payload, (result) => { + if (result.code === 0) { + this.recordTelemetry('otel.metrics_export_successes', 1, additionalTags) + } + resultCallback(result) + }) + } +} + +module.exports = OtlpHttpMetricExporter diff --git a/packages/dd-trace/src/opentelemetry/metrics/otlp_transformer.js b/packages/dd-trace/src/opentelemetry/metrics/otlp_transformer.js new file mode 100644 index 00000000000..6043eb370ae --- /dev/null +++ b/packages/dd-trace/src/opentelemetry/metrics/otlp_transformer.js @@ -0,0 +1,251 @@ +'use strict' + +const OtlpTransformerBase = require('../otlp/otlp_transformer_base') +const { getProtobufTypes } = require('../otlp/protobuf_loader') +const { METRIC_TYPES, TEMPORALITY } = require('./constants') + +const { protoAggregationTemporality } = getProtobufTypes() +const AGGREGATION_TEMPORALITY_DELTA = protoAggregationTemporality.values.AGGREGATION_TEMPORALITY_DELTA +const AGGREGATION_TEMPORALITY_CUMULATIVE = protoAggregationTemporality.values.AGGREGATION_TEMPORALITY_CUMULATIVE + +/** + * @typedef {import('./periodic_metric_reader').AggregatedMetric} AggregatedMetric + * @typedef {import('./periodic_metric_reader').NumberDataPoint} NumberDataPoint + * @typedef {import('./periodic_metric_reader').HistogramDataPoint} HistogramDataPoint + */ + +/** + * OtlpTransformer transforms metrics to OTLP format. + * + * This implementation follows the OTLP Metrics v1.7.0 Data Model specification: + * https://opentelemetry.io/docs/specs/otlp/#metrics-data-model + * + * @class OtlpTransformer + * @extends OtlpTransformerBase + */ +class OtlpTransformer extends OtlpTransformerBase { + /** + * Creates a new OtlpTransformer instance. + * + * @param {import('@opentelemetry/api').Attributes} resourceAttributes - Resource attributes + * @param {string} protocol - OTLP protocol (http/protobuf or http/json) + */ + constructor (resourceAttributes, protocol) { + super(resourceAttributes, protocol, 'metrics') + } + + /** + * Transforms metrics to OTLP format based on the configured protocol. + * @param {Iterable} metrics - Iterable of metric data to transform + * @returns {Buffer} Transformed metrics in the appropriate format + */ + transformMetrics (metrics) { + if (this.protocol === 'http/json') { + return this.#transformToJson(metrics) + } + return this.#transformToProtobuf(metrics) + } + + /** + * Transforms metrics to protobuf format. + * @param {Iterable} metrics - Iterable of metrics to transform + * @returns {Buffer} Protobuf-encoded metrics + * @private + */ + #transformToProtobuf (metrics) { + const { protoMetricsService } = getProtobufTypes() + + const metricsData = { + resourceMetrics: [{ + resource: this.transformResource(), + scopeMetrics: this.#transformScope(metrics), + }] + } + + return this.serializeToProtobuf(protoMetricsService, metricsData) + } + + /** + * Transforms metrics to JSON format. + * @param {Array} metrics - Array of metrics to transform + * @returns {Buffer} JSON-encoded metrics + * @private + */ + #transformToJson (metrics) { + const metricsData = { + resourceMetrics: [{ + resource: this.transformResource(), + scopeMetrics: this.#transformScope(metrics, true) + }] + } + return this.serializeToJson(metricsData) + } + + /** + * Creates scope metrics grouped by instrumentation scope. + * @param {Iterable} metrics - Iterable of metrics to transform + * @param {boolean} isJson - Whether to format for JSON output + * @returns {Array} Array of scope metric objects + * @private + */ + #transformScope (metrics, isJson = false) { + const groupedMetrics = this.groupByInstrumentationScope(metrics) + const scopeMetrics = [] + + for (const metricsInScope of groupedMetrics.values()) { + const firstMetric = metricsInScope[0] + const instrumentationScope = firstMetric.instrumentationScope || {} + const { name = '', version = '', schemaUrl = '', attributes = {} } = instrumentationScope + + const scope = { + name, + version, + droppedAttributesCount: 0 + } + + if (attributes) { + const transformed = isJson ? this.attributesToJson(attributes) : this.transformAttributes(attributes) + if (transformed.length) { + scope.attributes = transformed + } + } + + scopeMetrics.push({ + scope, + schemaUrl, + metrics: metricsInScope.map(metric => this.#transformMetric(metric, isJson)) + }) + } + + return scopeMetrics + } + + /** + * Transforms a single metric to protobuf or JSON format. + * @private + * @param {AggregatedMetric} metric - The metric to transform + * @param {boolean} isJson - Whether to output JSON format (vs protobuf) + * @returns {Object} - The metric transformed to OTLP protobuf or JSON format + */ + #transformMetric (metric, isJson = false) { + const result = { + name: metric.name, + description: metric.description || '', + unit: metric.unit || '' + } + + const isCumulative = metric.temporality === TEMPORALITY.CUMULATIVE + let temporality + if (isJson) { + temporality = isCumulative ? 'AGGREGATION_TEMPORALITY_CUMULATIVE' : 'AGGREGATION_TEMPORALITY_DELTA' + } else { + temporality = isCumulative ? AGGREGATION_TEMPORALITY_CUMULATIVE : AGGREGATION_TEMPORALITY_DELTA + } + + switch (metric.type) { + case METRIC_TYPES.HISTOGRAM: + result.histogram = { + dataPoints: Array.from(metric.dataPointMap.values(), dp => this.#transformHistogramDataPoint(dp, isJson)), + aggregationTemporality: temporality + } + break + + case METRIC_TYPES.COUNTER: + case METRIC_TYPES.OBSERVABLECOUNTER: + case METRIC_TYPES.UPDOWNCOUNTER: + case METRIC_TYPES.OBSERVABLEUPDOWNCOUNTER: + result.sum = { + dataPoints: Array.from(metric.dataPointMap.values(), dp => this.#transformNumberDataPoint(dp, isJson)), + aggregationTemporality: temporality, + isMonotonic: metric.type === METRIC_TYPES.COUNTER || metric.type === METRIC_TYPES.OBSERVABLECOUNTER + } + break + + case METRIC_TYPES.GAUGE: + result.gauge = { + dataPoints: Array.from(metric.dataPointMap.values(), dp => this.#transformNumberDataPoint(dp, isJson)) + } + break + } + + return result + } + + /** + * Transforms a histogram data point. + * @private + * @param {HistogramDataPoint} dp - The histogram data point to transform + * @param {boolean} isJson - Whether to output JSON format (vs protobuf) + * @returns {Object} The histogram data point transformed to OTLP protobuf format + */ + #transformHistogramDataPoint (dp, isJson) { + const attributes = isJson + ? this.attributesToJson(dp.attributes) + : this.transformAttributes(dp.attributes) + + const dataPoint = { + attributes, + startTimeUnixNano: dp.startTimeUnixNano, + timeUnixNano: dp.timeUnixNano, + count: dp.count, + sum: dp.sum, + bucketCounts: dp.bucketCounts || [], + explicitBounds: dp.explicitBounds || [], + min: dp.min, + max: dp.max + } + + if (isJson) { + dataPoint.startTimeUnixNano = String(dataPoint.startTimeUnixNano) + dataPoint.timeUnixNano = String(dataPoint.timeUnixNano) + dataPoint.count = dataPoint.count || 0 + } + + return dataPoint + } + + /** + * Transforms a number data point to protobuf or JSON format. + * @private + * @param {NumberDataPoint} dataPoint - The number data point to transform + * @param {boolean} isJson - Whether to output JSON format (vs protobuf) + * @returns {Object} The number data point transformed to OTLP protobuf format + */ + #transformNumberDataPoint (dataPoint, isJson) { + const attributes = isJson + ? this.attributesToJson(dataPoint.attributes) + : this.transformAttributes(dataPoint.attributes) + const timeUnixNano = isJson + ? String(dataPoint.timeUnixNano) + : dataPoint.timeUnixNano + + const result = { + attributes, + timeUnixNano + } + + if (dataPoint.startTimeUnixNano) { + result.startTimeUnixNano = isJson ? String(dataPoint.startTimeUnixNano) : dataPoint.startTimeUnixNano + } + + this.#assignNumberValue(result, dataPoint.value) + return result + } + + /** + * Assigns the appropriate value field (asInt or asDouble) based on the value type. + * @private + * @param {NumberDataPoint} dataPoint - The number data point to assign a value to + * @param {number} value - The value to assign + * @returns {void} + */ + #assignNumberValue (dataPoint, value) { + if (Number.isInteger(value)) { + dataPoint.asInt = value + } else { + dataPoint.asDouble = value + } + } +} + +module.exports = OtlpTransformer diff --git a/packages/dd-trace/src/opentelemetry/metrics/periodic_metric_reader.js b/packages/dd-trace/src/opentelemetry/metrics/periodic_metric_reader.js new file mode 100644 index 00000000000..e4687545456 --- /dev/null +++ b/packages/dd-trace/src/opentelemetry/metrics/periodic_metric_reader.js @@ -0,0 +1,512 @@ +'use strict' + +const { + METRIC_TYPES, TEMPORALITY, DEFAULT_HISTOGRAM_BUCKETS, DEFAULT_MAX_MEASUREMENT_QUEUE_SIZE +} = require('./constants') +const log = require('../../log') +const { stableStringify } = require('../otlp/otlp_transformer_base') + +/** + * @typedef {import('@opentelemetry/api').Attributes} Attributes + * @typedef {import('@opentelemetry/core').InstrumentationScope} InstrumentationScope + * @typedef {import('./instruments').Measurement} Measurement + * + * @typedef {Object} NumberDataPoint + * @property {Attributes} attributes - Metric attributes + * @property {string} attrKey - Stable stringified key for attributes + * @property {number} timeUnixNano - Timestamp in nanoseconds + * @property {number} startTimeUnixNano - Start timestamp for cumulative metrics + * @property {number} value - Metric value + * + * @typedef {Object} HistogramDataPoint + * @property {Attributes} attributes - Metric attributes + * @property {string} attrKey - Stable stringified key for attributes + * @property {number} timeUnixNano - Timestamp in nanoseconds + * @property {number} startTimeUnixNano - Start timestamp + * @property {number} count - Number of observations + * @property {number} sum - Sum of all observations + * @property {number} min - Minimum value observed + * @property {number} max - Maximum value observed + * @property {number[]} bucketCounts - Count per histogram bucket + * @property {number[]} explicitBounds - Histogram bucket boundaries + * + * @typedef {Object} AggregatedMetric + * @property {string} name - Metric name + * @property {string} description - Metric description + * @property {string} unit - Metric unit + * @property {string} type - Metric type from METRIC_TYPES + * @property {InstrumentationScope} instrumentationScope - Instrumentation scope + * @property {string} temporality - Temporality from TEMPORALITY constants + * @property {Map} dataPointMap - Map of attribute keys to data points + * + */ + +/** + * PeriodicMetricReader collects and exports metrics at a regular interval. + * + * This implementation follows the OpenTelemetry JavaScript SDK MetricReader pattern: + * https://open-telemetry.github.io/opentelemetry-js/classes/_opentelemetry_sdk-metrics.PeriodicExportingMetricReader.html + * + * @class PeriodicMetricReader + */ +class PeriodicMetricReader { + #measurements = [] + #observableInstruments = new Set() + #cumulativeState = new Map() + #lastExportedState = new Map() + #droppedCount = 0 + #timer = null + #exportInterval + #aggregator + + /** + * Creates a new PeriodicMetricReader instance. + * + * @param {OtlpHttpMetricExporter} exporter - Metric exporter for sending to Datadog Agent + * @param {number} exportInterval - Export interval in milliseconds + * @param {string} temporalityPreference - Temporality preference: DELTA, CUMULATIVE, or LOWMEMORY + * @param {number} maxQueueSize - Maximum number of measurements to queue before dropping + */ + constructor (exporter, exportInterval, temporalityPreference, maxBatchedQueueSize) { + this.exporter = exporter + this.#exportInterval = exportInterval + this.#aggregator = new MetricAggregator(temporalityPreference, maxBatchedQueueSize) + this.#startTimer() + } + + /** + * Records a measurement from a synchronous instrument. + * + * @param {Measurement} measurement - The measurement data + */ + record (measurement) { + if (this.#measurements.length >= DEFAULT_MAX_MEASUREMENT_QUEUE_SIZE) { + this.#droppedCount++ + return + } + this.#measurements.push(measurement) + } + + /** + * Registers an observable instrument for periodic collection. + * + * @param {ObservableGauge} instrument - The observable instrument to register + */ + registerObservableInstrument (instrument) { + this.#observableInstruments.add(instrument) + } + + /** + * Forces an immediate collection and export of all metrics. + * @returns {void} + */ + forceFlush () { + this.#collectAndExport() + } + + /** + * Shuts down the reader and stops periodic collection. + * @returns {void} + */ + shutdown () { + this.#clearTimer() + this.forceFlush() + } + + /** + * Starts the periodic export timer. + * @private + */ + #startTimer () { + if (this.#timer) return + + this.#timer = setInterval(() => { + this.#collectAndExport() + }, this.#exportInterval).unref() + } + + /** + * Clears the periodic export timer. + * @private + */ + #clearTimer () { + if (this.#timer) { + clearInterval(this.#timer) + this.#timer = null + } + } + + /** + * Collects measurements and exports metrics. + * @private + * @param {Function} [callback] - Called after export completes + */ + #collectAndExport (callback = () => {}) { + // Atomically drain measurements for export. New measurements can be recorded + // during export without interfering with this batch. + const allMeasurements = this.#measurements.splice(0) + + for (const instrument of this.#observableInstruments) { + const observableMeasurements = instrument.collect() + + if (allMeasurements.length >= DEFAULT_MAX_MEASUREMENT_QUEUE_SIZE) { + this.#droppedCount += observableMeasurements.length + continue + } + + const remainingCapacity = DEFAULT_MAX_MEASUREMENT_QUEUE_SIZE - allMeasurements.length + + if (observableMeasurements.length <= remainingCapacity) { + allMeasurements.push(...observableMeasurements) + } else { + allMeasurements.push(...observableMeasurements.slice(0, remainingCapacity)) + this.#droppedCount += observableMeasurements.length - remainingCapacity + } + } + + if (this.#droppedCount > 0) { + log.warn( + `Metric queue exceeded limit (max: ${DEFAULT_MAX_MEASUREMENT_QUEUE_SIZE}). ` + + `Dropping ${this.#droppedCount} measurements. ` + ) + this.#droppedCount = 0 + } + + if (allMeasurements.length === 0) { + callback() + return + } + + const metrics = this.#aggregator.aggregate( + allMeasurements, + this.#cumulativeState, + this.#lastExportedState + ) + + this.exporter.export(metrics, callback) + } +} + +/** + * MetricAggregator aggregates individual measurements into metric data points. + * @private + */ +class MetricAggregator { + #startTime = Number(process.hrtime.bigint()) + #temporalityPreference + #maxBatchedQueueSize + + constructor (temporalityPreference, maxBatchedQueueSize) { + this.#temporalityPreference = temporalityPreference + this.#maxBatchedQueueSize = maxBatchedQueueSize + } + + /** + * Gets the temporality for a given metric type. + * @private + * @param {string} type - Metric type from METRIC_TYPES + * @returns {string} Temporality from TEMPORALITY + */ + #getTemporality (type) { + // UpDownCounter and Observable UpDownCounter always use CUMULATIVE + if (type === METRIC_TYPES.UPDOWNCOUNTER || type === METRIC_TYPES.OBSERVABLEUPDOWNCOUNTER) { + return TEMPORALITY.CUMULATIVE + } + + // Gauge always uses last-value aggregation + if (type === METRIC_TYPES.GAUGE) { + return TEMPORALITY.GAUGE + } + + switch (this.#temporalityPreference) { + case TEMPORALITY.CUMULATIVE: + return TEMPORALITY.CUMULATIVE + case TEMPORALITY.LOWMEMORY: + // LOWMEMORY: only synchronous Counter and Histogram use DELTA, Observable Counter uses CUMULATIVE + return (type === METRIC_TYPES.COUNTER || type === METRIC_TYPES.HISTOGRAM) + ? TEMPORALITY.DELTA + : TEMPORALITY.CUMULATIVE + default: + return TEMPORALITY.DELTA + } + } + + /** + * Aggregates measurements into metrics. + * @private + * @param {Measurement[]} measurements - The measurements to aggregate + * @param {Map} cumulativeState - The cumulative state of the metrics + * @param {Map} lastExportedState - The last exported state of the metrics + * @returns {Iterable} The aggregated metrics + */ + aggregate (measurements, cumulativeState, lastExportedState) { + const metricsMap = new Map() + + for (const measurement of measurements) { + const { + name, + description, + unit, + type, + instrumentationScope, + value, + attributes, + timestamp + } = measurement + + const scopeKey = this.#getScopeKey(instrumentationScope) + const metricKey = `${scopeKey}:${name}:${type}` + const attrKey = stableStringify(attributes) + const stateKey = this.#getStateKey(scopeKey, name, type, attrKey) + + let metric = metricsMap.get(metricKey) + if (!metric) { + if (metricsMap.size >= this.#maxBatchedQueueSize) { + log.warn( + `Metric queue exceeded limit (max: ${this.#maxBatchedQueueSize}). ` + + `Dropping metric: ${metricKey}, value: ${value}. ` + + 'Consider increasing OTEL_BSP_MAX_QUEUE_SIZE or decreasing OTEL_METRIC_EXPORT_INTERVAL.' + ) + continue + } + metric = { + name, + description, + unit, + type, + instrumentationScope, + temporality: this.#getTemporality(type), + dataPointMap: new Map() + } + metricsMap.set(metricKey, metric) + } + + if (type === METRIC_TYPES.COUNTER || type === METRIC_TYPES.UPDOWNCOUNTER) { + this.#aggregateSum(metric, value, attributes, attrKey, timestamp, stateKey, cumulativeState) + } else if (type === METRIC_TYPES.HISTOGRAM) { + this.#aggregateHistogram(metric, value, attributes, attrKey, timestamp, stateKey, cumulativeState) + } else { + this.#aggregateLastValue(metric, value, attributes, attrKey, timestamp) + } + } + + this.#applyDeltaTemporality(metricsMap, lastExportedState) + return metricsMap + } + + /** + * Gets unique identifier for a given instrumentation scope. + * @private + * @param {InstrumentationScope} instrumentationScope - The instrumentation scope + * @returns {string} - The scope identifier + */ + #getScopeKey (instrumentationScope) { + return `${instrumentationScope.name}@${instrumentationScope.version}@${instrumentationScope.schemaUrl}` + } + + /** + * Gets unique identifier for a given metric. + * @private + * @param {string} scopeKey - The scope identifier + * @param {string} name - The metric name + * @param {string} type - The metric type from METRIC_TYPES + * @param {string} attrKey - The attribute key + * @returns {string} - The metric identifier + */ + #getStateKey (scopeKey, name, type, attrKey) { + return `${scopeKey}:${name}:${type}:${attrKey}` + } + + /** + * Checks if a given metric type is a delta type. + * @private + * @param {string} type - The metric type from METRIC_TYPES + * @returns {boolean} - True if the metric type is a delta type + */ + #isDeltaType (type) { + return type === METRIC_TYPES.COUNTER || + type === METRIC_TYPES.OBSERVABLECOUNTER || + type === METRIC_TYPES.HISTOGRAM + } + + /** + * Applies delta temporality to the metrics. + * @private + * @param {Iterable} metrics - The metrics to apply delta temporality to + * @param {Map} lastExportedState - The last exported state of the metrics + * @returns {void} + */ + #applyDeltaTemporality (metrics, lastExportedState) { + for (const metric of metrics) { + if (metric.temporality === TEMPORALITY.DELTA && this.#isDeltaType(metric.type)) { + const scopeKey = this.#getScopeKey(metric.instrumentationScope) + + for (const dataPoint of metric.dataPointMap.values()) { + const stateKey = this.#getStateKey(scopeKey, metric.name, metric.type, dataPoint.attrKey) + + if (metric.type === METRIC_TYPES.COUNTER || metric.type === METRIC_TYPES.OBSERVABLECOUNTER) { + const lastValue = lastExportedState.get(stateKey) || 0 + const currentValue = dataPoint.value + dataPoint.value = currentValue - lastValue + lastExportedState.set(stateKey, currentValue) + } else if (metric.type === METRIC_TYPES.HISTOGRAM) { + const lastState = lastExportedState.get(stateKey) || { + count: 0, + sum: 0, + bucketCounts: new Array(dataPoint.bucketCounts.length).fill(0) + } + const currentState = { + count: dataPoint.count, + sum: dataPoint.sum, + min: dataPoint.min, + max: dataPoint.max, + bucketCounts: [...dataPoint.bucketCounts] + } + dataPoint.count = currentState.count - lastState.count + dataPoint.sum = currentState.sum - lastState.sum + dataPoint.bucketCounts = currentState.bucketCounts.map( + (count, idx) => count - (lastState.bucketCounts[idx] || 0) + ) + lastExportedState.set(stateKey, currentState) + } + } + } + } + } + + /** + * Finds or creates a data point for a given metric. + * @private + * @param {AggregatedMetric} metric - The metric to find or create a data point for + * @param {Attributes} attributes - The attributes of the metric + * @param {string} attrKey - The attribute key + * @param {Function} createInitialDataPoint - Function to create an initial data point + * @returns {NumberDataPoint|HistogramDataPoint} - The data point + */ + #findOrCreateDataPoint (metric, attributes, attrKey, createInitialDataPoint) { + let dataPoint = metric.dataPointMap.get(attrKey) + + if (!dataPoint) { + dataPoint = { attributes, attrKey, ...createInitialDataPoint() } + metric.dataPointMap.set(attrKey, dataPoint) + } + + return dataPoint + } + + /** + * Records the sum of all values for a given metric. + * Creates a new data point if it doesn't exist. + * @private + * @param {AggregatedMetric} metric - The metric to aggregate a sum for + * @param {number} value - The value to aggregate + * @param {Attributes} attributes - The attributes of the metric + * @param {string} attrKey - The attribute key + * @param {number} timestamp - The timestamp of the measurement + * @param {string} stateKey - The state key + * @param {Map} cumulativeState - The cumulative state of the metrics + */ + #aggregateSum (metric, value, attributes, attrKey, timestamp, stateKey, cumulativeState) { + if (!cumulativeState.has(stateKey)) { + cumulativeState.set(stateKey, { + value: 0, + startTime: metric.temporality === TEMPORALITY.CUMULATIVE ? this.#startTime : timestamp + }) + } + + const state = cumulativeState.get(stateKey) + state.value += value + + const dataPoint = this.#findOrCreateDataPoint(metric, attributes, attrKey, () => ({ + startTimeUnixNano: state.startTime, + timeUnixNano: timestamp, + value: 0 + })) + + dataPoint.value = state.value + dataPoint.timeUnixNano = timestamp + } + + /** + * Overwrites the last recorded value for a given metric or + * creates a new data point if it doesn't exist. + * @private + * @param {AggregatedMetric} metric - The metric to aggregate a last value for + * @param {number} value - The value to aggregate + * @param {Attributes} attributes - The attributes of the metric + * @param {string} attrKey - The attribute key + * @param {number} timestamp - The timestamp of the measurement + */ + #aggregateLastValue (metric, value, attributes, attrKey, timestamp) { + const dataPoint = this.#findOrCreateDataPoint(metric, attributes, attrKey, () => ({ + timeUnixNano: timestamp, + value: 0 + })) + + dataPoint.value = value + dataPoint.timeUnixNano = timestamp + } + + /** + * Aggregates histogram values by distributing them into buckets. + * Tracks count, sum, min, max, and per-bucket counts and creates + * a new data point if it doesn't exist. + * @private + * @param {AggregatedMetric} metric - The metric to aggregate a histogram for + * @param {number} value - The value to aggregate + * @param {Attributes} attributes - The attributes of the metric + * @param {string} attrKey - The attribute key + * @param {number} timestamp - The timestamp of the measurement + * @param {string} stateKey - The state key + * @param {Map} cumulativeState - The cumulative state of the metrics + * @returns {void} + */ + #aggregateHistogram (metric, value, attributes, attrKey, timestamp, stateKey, cumulativeState) { + if (!cumulativeState.has(stateKey)) { + cumulativeState.set(stateKey, { + count: 0, + sum: 0, + min: Infinity, + max: -Infinity, + bucketCounts: new Array(DEFAULT_HISTOGRAM_BUCKETS.length + 1).fill(0), + startTime: metric.temporality === TEMPORALITY.CUMULATIVE ? this.#startTime : timestamp + }) + } + + const state = cumulativeState.get(stateKey) + + let bucketIndex = DEFAULT_HISTOGRAM_BUCKETS.length + for (let i = 0; i < DEFAULT_HISTOGRAM_BUCKETS.length; i++) { + if (value <= DEFAULT_HISTOGRAM_BUCKETS[i]) { + bucketIndex = i + break + } + } + + state.bucketCounts[bucketIndex]++ + state.count++ + state.sum += value + state.min = Math.min(state.min, value) + state.max = Math.max(state.max, value) + + const dataPoint = this.#findOrCreateDataPoint(metric, attributes, attrKey, () => ({ + startTimeUnixNano: state.startTime, + timeUnixNano: timestamp, + count: 0, + sum: 0, + min: Infinity, + max: -Infinity, + bucketCounts: new Array(DEFAULT_HISTOGRAM_BUCKETS.length + 1).fill(0), + explicitBounds: DEFAULT_HISTOGRAM_BUCKETS + })) + + dataPoint.count = state.count + dataPoint.sum = state.sum + dataPoint.min = state.min + dataPoint.max = state.max + dataPoint.bucketCounts = [...state.bucketCounts] + dataPoint.timeUnixNano = timestamp + } +} + +module.exports = PeriodicMetricReader diff --git a/packages/dd-trace/src/opentelemetry/otlp/otlp_http_exporter_base.js b/packages/dd-trace/src/opentelemetry/otlp/otlp_http_exporter_base.js index 605a61ed081..b07c513386b 100644 --- a/packages/dd-trace/src/opentelemetry/otlp/otlp_http_exporter_base.js +++ b/packages/dd-trace/src/opentelemetry/otlp/otlp_http_exporter_base.js @@ -10,14 +10,12 @@ const tracerMetrics = telemetryMetrics.manager.namespace('tracers') /** * Base class for OTLP HTTP exporters. * - * This implementation follows the OTLP HTTP specification: + * This implementation follows the OTLP HTTP v1.7.0 specification: * https://opentelemetry.io/docs/specs/otlp/#otlphttp * * @class OtlpHttpExporterBase */ class OtlpHttpExporterBase { - #telemetryTags - /** * Creates a new OtlpHttpExporterBase instance. * @@ -49,31 +47,25 @@ class OtlpHttpExporterBase { ...this.#parseAdditionalHeaders(headers) } } - this.#telemetryTags = [ + this.telemetryTags = [ 'protocol:http', `encoding:${isJson ? 'json' : 'protobuf'}` ] } - /** - * Gets the telemetry tags for this exporter. - * @returns {Array} Telemetry tags - * @protected - */ - _getTelemetryTags () { - return this.#telemetryTags - } - /** * Records telemetry metrics for exported data. * @param {string} metricName - Name of the metric to record * @param {number} count - Count to increment - * @param {Array} [tags] - Optional custom tags (defaults to this exporter's tags) + * @param {Array} [additionalTags] - Optional custom tags (defaults to this exporter's tags) * @protected */ - _recordTelemetry (metricName, count, tags) { - const telemetryTags = tags || this.#telemetryTags - tracerMetrics.count(metricName, telemetryTags).inc(count) + recordTelemetry (metricName, count, additionalTags) { + if (additionalTags?.length > 0) { + tracerMetrics.count(metricName, [...this.telemetryTags, ...additionalTags || []]).inc(count) + } else { + tracerMetrics.count(metricName, this.telemetryTags).inc(count) + } } /** @@ -82,7 +74,7 @@ class OtlpHttpExporterBase { * @param {Function} resultCallback - Callback for the result * @protected */ - _sendPayload (payload, resultCallback) { + sendPayload (payload, resultCallback) { const options = { ...this.options, headers: { diff --git a/packages/dd-trace/src/opentelemetry/otlp/otlp_transformer_base.js b/packages/dd-trace/src/opentelemetry/otlp/otlp_transformer_base.js index 8b2c28229d5..dd4a185dc6a 100644 --- a/packages/dd-trace/src/opentelemetry/otlp/otlp_transformer_base.js +++ b/packages/dd-trace/src/opentelemetry/otlp/otlp_transformer_base.js @@ -25,7 +25,7 @@ class OtlpTransformerBase { * @param {string} signalType - Signal type for warning messages (e.g., 'logs', 'metrics') */ constructor (resourceAttributes, protocol, signalType) { - this.#resourceAttributes = this._transformAttributes(resourceAttributes) + this.#resourceAttributes = this.transformAttributes(resourceAttributes) if (protocol === 'grpc') { log.warn(`OTLP gRPC protocol is not supported for ${signalType}. ` + 'Defaulting to http/protobuf. gRPC protobuf support may be added in a future release.') @@ -40,12 +40,12 @@ class OtlpTransformerBase { * @returns {Map} Map of instrumentation scope key to items * @protected */ - _groupByInstrumentationScope (items) { + groupByInstrumentationScope (items) { const grouped = new Map() for (const item of items) { const instrumentationScope = item.instrumentationScope || { name: '', version: '', schemaUrl: '', attributes: {} } - const attrsKey = JSON.stringify(instrumentationScope.attributes || {}) + const attrsKey = stableStringify(instrumentationScope.attributes || {}) const key = `${instrumentationScope.name}@${instrumentationScope.version}@` + `${instrumentationScope.schemaUrl}@${attrsKey}` @@ -64,7 +64,7 @@ class OtlpTransformerBase { * @returns {Object} OTLP resource object * @protected */ - _transformResource () { + transformResource () { return { attributes: this.#resourceAttributes, droppedAttributesCount: 0 @@ -77,12 +77,10 @@ class OtlpTransformerBase { * @returns {Object[]} Array of OTLP KeyValue objects * @protected */ - _transformAttributes (attributes) { - if (!attributes) return [] - + transformAttributes (attributes) { return Object.entries(attributes).map(([key, value]) => ({ key, - value: this._transformAnyValue(value) + value: this.transformAnyValue(value) })) } @@ -92,7 +90,7 @@ class OtlpTransformerBase { * @returns {Object[]} Array of OTLP KeyValue objects with string values * @protected */ - _attributesToJson (attributes) { + attributesToJson (attributes) { if (!attributes) return [] return Object.entries(attributes).map(([key, value]) => ({ @@ -103,11 +101,13 @@ class OtlpTransformerBase { /** * Transforms any value to OTLP AnyValue format. + * Supports: strings, numbers (int/double), booleans, arrays. + * Objects are filtered out by sanitizeAttributes before reaching this method. * @param {any} value - Value to transform * @returns {Object} OTLP AnyValue object * @protected */ - _transformAnyValue (value) { + transformAnyValue (value) { if (typeof value === 'string') { return { stringValue: value } } else if (typeof value === 'number') { @@ -120,19 +120,11 @@ class OtlpTransformerBase { } else if (Array.isArray(value)) { return { arrayValue: { - values: value.map(v => this._transformAnyValue(v)) - } - } - } else if (value && typeof value === 'object') { - return { - kvlistValue: { - values: Object.entries(value).map(([k, v]) => ({ - key: k, - value: this._transformAnyValue(v) - })) + values: value.map(v => this.transformAnyValue(v)) } } } + // Fallback for any unexpected types return { stringValue: String(value) } } @@ -143,7 +135,7 @@ class OtlpTransformerBase { * @returns {Buffer} Protobuf-encoded data * @protected */ - _serializeToProtobuf (protoType, data) { + serializeToProtobuf (protoType, data) { const message = protoType.create(data) const buffer = protoType.encode(message).finish() return buffer @@ -155,9 +147,31 @@ class OtlpTransformerBase { * @returns {Buffer} JSON-encoded data * @protected */ - _serializeToJson (data) { + serializeToJson (data) { return Buffer.from(JSON.stringify(data)) } } +/** + * Stable stringification of OpenTelemetry Attributes. + * Ensures consistent serialization regardless of key order by sorting keys. + * Supports string keys with primitive values or arrays of primitives. + * + * @param {Attributes} attributes - Attributes object to stringify + * @returns {string} Stable string representation + */ +function stableStringify (attributes) { + if (attributes == null || typeof attributes !== 'object') { + return JSON.stringify(attributes) + } + // Attributes are sorted by key to ensure consistent serialization regardless of key order. + // Keys are always strings and values are always strings, numbers, booleans, + // or arrays of strings, numbers, or booleans. + return Object.keys(attributes) + .sort() + .map(key => `${key}:${JSON.stringify(attributes[key])}`) + .join(',') +} + module.exports = OtlpTransformerBase +module.exports.stableStringify = stableStringify diff --git a/packages/dd-trace/src/proxy.js b/packages/dd-trace/src/proxy.js index 3c569c47ce1..038505dc3e1 100644 --- a/packages/dd-trace/src/proxy.js +++ b/packages/dd-trace/src/proxy.js @@ -219,6 +219,11 @@ class Tracer extends NoopProxy { initializeOpenTelemetryLogs(config) } + if (config.otelMetricsEnabled) { + const { initializeOpenTelemetryMetrics } = require('./opentelemetry/metrics') + initializeOpenTelemetryMetrics(config) + } + if (config.isTestDynamicInstrumentationEnabled) { const getDynamicInstrumentationClient = require('./ci-visibility/dynamic-instrumentation') // We instantiate the client but do not start the Worker here. The worker is started lazily diff --git a/packages/dd-trace/src/supported-configurations.json b/packages/dd-trace/src/supported-configurations.json index f91ebd73507..1633949c47f 100644 --- a/packages/dd-trace/src/supported-configurations.json +++ b/packages/dd-trace/src/supported-configurations.json @@ -124,6 +124,7 @@ "DD_LOG_LEVEL": ["A"], "DD_LOGS_INJECTION": ["A"], "DD_LOGS_OTEL_ENABLED": ["A"], + "DD_METRICS_OTEL_ENABLED": ["A"], "DD_MINI_AGENT_PATH": ["A"], "DD_OPENAI_LOGS_ENABLED": ["A"], "DD_OPENAI_SPAN_CHAR_LIMIT": ["A"], @@ -458,6 +459,13 @@ "OTEL_BSP_SCHEDULE_DELAY": ["A"], "OTEL_BSP_MAX_EXPORT_BATCH_SIZE": ["A"], "OTEL_BSP_MAX_QUEUE_SIZE": ["A"], + "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT": ["A"], + "OTEL_EXPORTER_OTLP_METRICS_HEADERS": ["A"], + "OTEL_EXPORTER_OTLP_METRICS_PROTOCOL": ["A"], + "OTEL_EXPORTER_OTLP_METRICS_TIMEOUT": ["A"], + "OTEL_METRIC_EXPORT_INTERVAL": ["A"], + "OTEL_METRIC_EXPORT_TIMEOUT": ["A"], + "OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE": ["A"], "OTEL_METRICS_EXPORTER": ["A"], "OTEL_PROPAGATORS": ["A"], "OTEL_RESOURCE_ATTRIBUTES": ["A"], diff --git a/packages/dd-trace/src/telemetry/telemetry.js b/packages/dd-trace/src/telemetry/telemetry.js index c19ff2d9708..55e6f1d6ae6 100644 --- a/packages/dd-trace/src/telemetry/telemetry.js +++ b/packages/dd-trace/src/telemetry/telemetry.js @@ -339,8 +339,17 @@ const nameMapping = { otelLogsProtocol: 'OTEL_EXPORTER_OTLP_LOGS_PROTOCOL', otelLogsTimeout: 'OTEL_EXPORTER_OTLP_LOGS_TIMEOUT', otelLogsUrl: 'OTEL_EXPORTER_OTLP_LOGS_ENDPOINT', - otelLogsBatchTimeout: 'OTEL_BSP_SCHEDULE_DELAY', - otelLogsMaxExportBatchSize: 'OTEL_BSP_MAX_EXPORT_BATCH_SIZE', + otelBatchTimeout: 'OTEL_BSP_SCHEDULE_DELAY', + otelMaxExportBatchSize: 'OTEL_BSP_MAX_EXPORT_BATCH_SIZE', + otelMaxQueueSize: 'OTEL_BSP_MAX_QUEUE_SIZE', + otelMetricsEnabled: 'DD_METRICS_OTEL_ENABLED', + otelMetricsHeaders: 'OTEL_EXPORTER_OTLP_METRICS_HEADERS', + otelMetricsProtocol: 'OTEL_EXPORTER_OTLP_METRICS_PROTOCOL', + otelMetricsTimeout: 'OTEL_EXPORTER_OTLP_METRICS_TIMEOUT', + otelMetricsExportTimeout: 'OTEL_METRIC_EXPORT_TIMEOUT', + otelMetricsUrl: 'OTEL_EXPORTER_OTLP_METRICS_ENDPOINT', + otelMetricsExportInterval: 'OTEL_METRIC_EXPORT_INTERVAL', + otelMetricsTemporalityPreference: 'OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', } const namesNeedFormatting = new Set(['DD_TAGS', 'peerServiceMapping', 'serviceMapping']) diff --git a/packages/dd-trace/test/opentelemetry/metrics.spec.js b/packages/dd-trace/test/opentelemetry/metrics.spec.js new file mode 100644 index 00000000000..a6005a76829 --- /dev/null +++ b/packages/dd-trace/test/opentelemetry/metrics.spec.js @@ -0,0 +1,1041 @@ +'use strict' + +process.setMaxListeners(50) + +require('../setup/core') +const assert = require('assert') +const http = require('http') +const { describe, it, beforeEach, afterEach } = require('tap').mocha +const sinon = require('sinon') +const { metrics } = require('@opentelemetry/api') +const { protoMetricsService } = require('../../src/opentelemetry/otlp/protobuf_loader').getProtobufTypes() + +describe('OpenTelemetry Meter Provider', () => { + let originalEnv + let httpStub + + function setupTracer (envOverrides, setDefaultEnv = true) { + if (setDefaultEnv) { + process.env.DD_METRICS_OTEL_ENABLED = 'true' + process.env.DD_SERVICE = 'test-service' + process.env.DD_VERSION = '1.0.0' + process.env.DD_ENV = 'test' + process.env.OTEL_METRIC_EXPORT_INTERVAL = '100' + process.env.OTEL_EXPORTER_OTLP_METRICS_TIMEOUT = '5000' + } + Object.assign(process.env, envOverrides) + + const tracer = require('../../') + tracer._initialized = false + tracer.init() + return { tracer, meterProvider: metrics.getMeterProvider() } + } + + function mockOtlpExport (validator) { + let capturedPayload, capturedHeaders + let validatorCalled = false + + if (httpStub) { + httpStub.restore() + httpStub = null + } + + httpStub = sinon.stub(http, 'request').callsFake((options, callback) => { + const baseMockReq = { write: () => {}, end: () => {}, on: () => {}, once: () => {}, setTimeout: () => {} } + const baseMockRes = { statusCode: 200, on: () => {}, setTimeout: () => {} } + + if (options.path && options.path.includes('/v1/metrics')) { + capturedHeaders = options.headers + const responseHandlers = {} + const mockRes = { + ...baseMockRes, + on: (event, handler) => { responseHandlers[event] = handler; return mockRes } + } + + const mockReq = { + ...baseMockReq, + write: (data) => { capturedPayload = data }, + end: () => { + const contentType = capturedHeaders['Content-Type'] + const isJson = contentType && contentType.includes('application/json') + + const decoded = isJson + ? JSON.parse(capturedPayload.toString()) + : protoMetricsService.toObject(protoMetricsService.decode(capturedPayload), { + longs: Number, + defaults: false + }) + + validator(decoded, capturedHeaders) + validatorCalled = true + if (responseHandlers.end) responseHandlers.end() + } + } + callback(mockRes) + return mockReq + } + callback(baseMockRes) + return baseMockReq + }) + + return () => { + if (!validatorCalled) throw new Error('OTLP export validator was never called') + } + } + + beforeEach(() => { + originalEnv = { ...process.env } + }) + + afterEach(() => { + process.env = originalEnv + + const provider = metrics.getMeterProvider() + if (provider && provider.shutdown) { + provider.shutdown() + } + metrics.disable() + + if (httpStub) { + httpStub.restore() + httpStub = null + } + sinon.restore() + }) + + describe('Basic Functionality', () => { + it('exports counter metrics', (done) => { + const validator = mockOtlpExport((decoded) => { + const metrics = decoded.resourceMetrics[0].scopeMetrics[0].metrics + assert.strictEqual(metrics.length, 1) + assert.strictEqual(metrics[0].name, 'requests') + assert.strictEqual(metrics[0].sum.isMonotonic, true) + assert.strictEqual(metrics[0].sum.dataPoints[0].asDouble, 10.3) + }) + + setupTracer() + const meter = metrics.getMeter('app') + const counter = meter.createCounter('requests') + counter.add(5.1) + counter.add(5.2) + + setTimeout(() => { validator(); done() }, 150) + }) + + it('exports histogram metrics', (done) => { + const validator = mockOtlpExport((decoded) => { + const histogram = decoded.resourceMetrics[0].scopeMetrics[0].metrics[0] + assert.strictEqual(histogram.name, 'duration') + assert.strictEqual(histogram.histogram.dataPoints[0].count, 1) + assert.strictEqual(histogram.histogram.dataPoints[0].sum, 100) + }) + + setupTracer() + const meter = metrics.getMeter('app') + meter.createHistogram('duration').record(100) + + setTimeout(() => { validator(); done() }, 150) + }) + + it('ignores negative values and callback errors', (done) => { + let validated = false + const validator = mockOtlpExport((decoded) => { + const allMetrics = decoded.resourceMetrics[0].scopeMetrics[0].metrics + const histogram = allMetrics.find(m => m.name === 'size') + const counter = allMetrics.find(m => m.name === 'requests') + const gauge = allMetrics.find(m => m.name === 'memory') + + // Only validate the export that has all three metrics (first export) + if (histogram && counter && gauge && !validated) { + assert.strictEqual(histogram.histogram.dataPoints[0].count, 2) + assert.strictEqual(histogram.histogram.dataPoints[0].sum, 300) + assert.strictEqual(counter.sum.dataPoints[0].asInt, 15) + assert.strictEqual(gauge.gauge.dataPoints[0].asInt, 100) + validated = true + } + }) + + setupTracer() + const meter = metrics.getMeter('app') + + const hist = meter.createHistogram('size') + hist.record(100) + hist.record(-50) + hist.record(200) + hist.record(-100) + + const counter = meter.createCounter('requests') + counter.add(10) + counter.add(-5) + counter.add(5) + + const gauge = meter.createObservableGauge('memory') + gauge.addCallback(() => { throw new Error('Callback error') }) + gauge.addCallback((result) => result.observe(100)) + + setTimeout(() => { + assert(validated, 'Should have validated an export with all metrics') + validator() + done() + }, 150) + }) + + it('exports gauge metrics (last value wins)', (done) => { + const validator = mockOtlpExport((decoded) => { + const gauge = decoded.resourceMetrics[0].scopeMetrics[0].metrics[0] + assert.strictEqual(gauge.name, 'temperature') + assert.strictEqual(gauge.gauge.dataPoints[0].asInt, 75) + }) + + setupTracer() + const meter = metrics.getMeter('app') + const temp = meter.createGauge('temperature') + temp.record(72) + temp.record(75) + + setTimeout(() => { validator(); done() }, 150) + }) + + it('exports updowncounter metrics', (done) => { + const validator = mockOtlpExport((decoded) => { + const updown = decoded.resourceMetrics[0].scopeMetrics[0].metrics[0] + assert.strictEqual(updown.name, 'queue') + assert.strictEqual(updown.sum.isMonotonic, false) + assert.strictEqual(updown.sum.dataPoints[0].asInt, 7) + }) + + setupTracer() + const meter = metrics.getMeter('app') + const queue = meter.createUpDownCounter('queue') + queue.add(10) + queue.add(-3) + + setTimeout(() => { validator(); done() }, 150) + }) + + it('exports observable gauge metrics', (done) => { + const validator = mockOtlpExport((decoded) => { + const gauge = decoded.resourceMetrics[0].scopeMetrics[0].metrics[0] + assert.strictEqual(gauge.name, 'memory') + const dp = gauge.gauge.dataPoints[0] + const value = dp.asDouble !== undefined ? dp.asDouble : dp.asInt + assert(value > 0) + assert.strictEqual(dp.attributes.find(a => a.key === 'type').value.stringValue, 'heap') + }) + + setupTracer() + const meter = metrics.getMeter('app') + const mem = meter.createObservableGauge('memory') + mem.addCallback((result) => result.observe(process.memoryUsage().heapUsed, { type: 'heap' })) + + setTimeout(() => { validator(); done() }, 150) + }) + + it('exports observable counter metrics', (done) => { + const validator = mockOtlpExport((decoded) => { + const counter = decoded.resourceMetrics[0].scopeMetrics[0].metrics[0] + assert.strictEqual(counter.name, 'connections') + assert.strictEqual(counter.sum.isMonotonic, true) + assert.strictEqual(counter.sum.dataPoints[0].asInt, 42) + }) + + setupTracer({ OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: 'CUMULATIVE' }) + const meter = metrics.getMeter('app') + const conn = meter.createObservableCounter('connections') + conn.addCallback((result) => result.observe(42)) + + setTimeout(() => { validator(); done() }, 150) + }) + + it('exports observable updowncounter metrics', (done) => { + const validator = mockOtlpExport((decoded) => { + const updown = decoded.resourceMetrics[0].scopeMetrics[0].metrics[0] + assert.strictEqual(updown.name, 'tasks') + assert.strictEqual(updown.sum.isMonotonic, false) + assert.strictEqual(updown.sum.dataPoints[0].asInt, 15) + }) + + setupTracer({ OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: 'CUMULATIVE' }) + const meter = metrics.getMeter('app') + const tasks = meter.createObservableUpDownCounter('tasks') + tasks.addCallback((result) => result.observe(15)) + + setTimeout(() => { validator(); done() }, 150) + }) + }) + + describe('Configuration', () => { + it('uses protobuf with numeric timestamps by default', (done) => { + const validator = mockOtlpExport((decoded, headers) => { + assert.strictEqual(headers['Content-Type'], 'application/x-protobuf') + const dataPoint = decoded.resourceMetrics[0].scopeMetrics[0].metrics[0].sum.dataPoints[0] + assert.strictEqual(dataPoint.asInt, 5) + assert(dataPoint.timeUnixNano > 0) + }) + + setupTracer() + const meter = metrics.getMeter('app') + meter.createCounter('test').add(5) + + setTimeout(() => { validator(); done() }, 150) + }) + + it('uses JSON with string timestamps when configured', (done) => { + const validator = mockOtlpExport((decoded, headers) => { + assert.strictEqual(headers['Content-Type'], 'application/json') + const metrics = decoded.resourceMetrics[0].scopeMetrics[0].metrics + const counter = metrics.find(m => m.name === 'counter') + assert.strictEqual(counter.sum.dataPoints[0].asInt, 5) + const histogram = metrics.find(m => m.name === 'histogram') + assert.strictEqual(histogram.histogram.dataPoints[0].count, 2) + assert.strictEqual(histogram.histogram.dataPoints[0].sum, 30) + const gauge = metrics.find(m => m.name === 'gauge') + assert.strictEqual(gauge.gauge.dataPoints[0].asInt, 100) + }) + + setupTracer({ OTEL_EXPORTER_OTLP_METRICS_PROTOCOL: 'http/json' }) + const meter = metrics.getMeter('app') + meter.createCounter('counter').add(5) + meter.createHistogram('histogram').record(10) + meter.createHistogram('histogram').record(20) + meter.createGauge('gauge').record(100) + + setTimeout(() => { validator(); done() }, 150) + }) + + it('includes custom resource attributes and hostname when enabled', (done) => { + const validator = mockOtlpExport((decoded) => { + const attrs = {} + decoded.resourceMetrics[0].resource.attributes.forEach(attr => { + attrs[attr.key] = attr.value.stringValue || attr.value.intValue + }) + assert.strictEqual(attrs['service.name'], 'custom') + assert.strictEqual(attrs['service.version'], '2.0.0') + assert(attrs['host.name'], 'should include host.name') + }) + + setupTracer({ DD_SERVICE: 'custom', DD_VERSION: '2.0.0', DD_TRACE_REPORT_HOSTNAME: 'true' }) + const meter = metrics.getMeter('app') + meter.createCounter('test').add(1) + + setTimeout(() => { validator(); done() }, 150) + }) + + it('supports multiple attributes and data points', (done) => { + const validator = mockOtlpExport((decoded) => { + const counter = decoded.resourceMetrics[0].scopeMetrics[0].metrics[0] + assert.strictEqual(counter.name, 'api') + assert.strictEqual(counter.sum.dataPoints.length, 2) + const getDp = (method) => counter.sum.dataPoints.find(dp => + dp.attributes.some(a => a.key === 'method' && a.value.stringValue === method) + ) + assert.strictEqual(getDp('GET').asInt, 10) + assert.strictEqual(getDp('POST').asInt, 5) + }) + + setupTracer() + const meter = metrics.getMeter('app') + const api = meter.createCounter('api') + api.add(10, { method: 'GET' }) + api.add(5, { method: 'POST' }) + + setTimeout(() => { validator(); done() }, 150) + }) + + it('aggregates data points with the same attributes', (done) => { + const validator = mockOtlpExport((decoded) => { + const counter = decoded.resourceMetrics[0].scopeMetrics[0].metrics[0] + assert.strictEqual(counter.name, 'api') + assert.strictEqual(counter.sum.dataPoints.length, 2) + const getDp = (method, status) => counter.sum.dataPoints.find(dp => + dp.attributes.some(a => a.key === 'method' && a.value.stringValue === method) && + dp.attributes.some(a => a.key === 'status' && a.value.intValue === status) + ) + assert.strictEqual(getDp('GET', 200).asInt, 15) + assert.strictEqual(getDp('POST', 200).asInt, 150) + }) + + setupTracer() + const meter = metrics.getMeter('app') + const api = meter.createCounter('api') + api.add(10, { method: 'GET', status: 200 }) + api.add(5, { status: 200, method: 'GET' }) + api.add(100, { method: 'POST', status: 200 }) + api.add(50, { status: 200, method: 'POST' }) + + setTimeout(() => { validator(); done() }, 150) + }) + + it('encodes different attribute types and drops objects', (done) => { + let validated = false + const validator = mockOtlpExport((decoded) => { + if (validated) return + const metrics = decoded.resourceMetrics[0].scopeMetrics[0].metrics + const getAttr = (dp, key) => dp.attributes.find(a => a.key === key)?.value + const counter = metrics.find(m => m.name === 'test') + if (!counter) return + + const dp = counter.sum.dataPoints[0] + assert.strictEqual(getAttr(dp, 'str').stringValue, 'val') + assert.strictEqual(getAttr(dp, 'int').intValue, 42) + assert.strictEqual(getAttr(dp, 'double').doubleValue, 3.14) + assert.strictEqual(getAttr(dp, 'bool').boolValue, true) + assert.deepStrictEqual(getAttr(dp, 'arr').arrayValue.values.map(v => v.intValue || v.doubleValue), [1, 2, 3]) + // Verify object attributes are dropped per OpenTelemetry spec + assert.strictEqual(getAttr(dp, 'obj'), undefined) + validated = true + }) + + setupTracer() + const meter = metrics.getMeter('app') + meter.createCounter('test').add(5, { + str: 'val', + int: 42, + double: 3.14, + bool: true, + arr: [1, 2, 3], + obj: { nested: 'dropped' } + }) + + setTimeout(() => { + assert(validated, 'Should have validated attributes') + validator() + done() + }, 150) + }) + }) + + describe('Temporality', () => { + it('supports CUMULATIVE for counters', (done) => { + const validator = mockOtlpExport((decoded) => { + const counter = decoded.resourceMetrics[0].scopeMetrics[0].metrics[0] + assert.strictEqual(counter.name, 'test') + assert.strictEqual(counter.sum.aggregationTemporality, 2) + assert.strictEqual(counter.sum.dataPoints[0].asInt, 8) + }) + + setupTracer({ OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: 'CUMULATIVE' }) + const meter = metrics.getMeter('app') + const counter = meter.createCounter('test') + counter.add(5) + counter.add(3) + + setTimeout(() => { validator(); done() }, 150) + }) + + it('supports DELTA for counters', (done) => { + const validator = mockOtlpExport((decoded) => { + const counter = decoded.resourceMetrics[0].scopeMetrics[0].metrics[0] + assert.strictEqual(counter.name, 'test') + assert.strictEqual(counter.sum.aggregationTemporality, 1) + assert.strictEqual(counter.sum.dataPoints[0].asInt, 5) + }) + + setupTracer({ OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: 'DELTA' }) + const meter = metrics.getMeter('app') + meter.createCounter('test').add(5) + + setTimeout(() => { validator(); done() }, 150) + }) + + it('LOWMEMORY uses DELTA for sync counters', (done) => { + const validator = mockOtlpExport((decoded) => { + const counter = decoded.resourceMetrics[0].scopeMetrics[0].metrics[0] + assert.strictEqual(counter.sum.aggregationTemporality, 1) + assert.strictEqual(counter.sum.dataPoints[0].asInt, 5) + }) + + setupTracer({ OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: 'LOWMEMORY' }) + const meter = metrics.getMeter('app') + meter.createCounter('sync').add(5) + + setTimeout(() => { validator(); done() }, 150) + }) + + it('LOWMEMORY uses CUMULATIVE for observable counters', (done) => { + const validator = mockOtlpExport((decoded) => { + const counter = decoded.resourceMetrics[0].scopeMetrics[0].metrics[0] + assert.strictEqual(counter.sum.aggregationTemporality, 2) + assert.strictEqual(counter.sum.dataPoints[0].asInt, 10) + }) + + setupTracer({ OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: 'LOWMEMORY' }) + const meter = metrics.getMeter('app') + const obs = meter.createObservableCounter('obs') + obs.addCallback((result) => result.observe(10)) + + setTimeout(() => { validator(); done() }, 150) + }) + + it('updowncounter always uses CUMULATIVE', (done) => { + const validator = mockOtlpExport((decoded) => { + const updown = decoded.resourceMetrics[0].scopeMetrics[0].metrics[0] + assert.strictEqual(updown.sum.aggregationTemporality, 2) + assert.strictEqual(updown.sum.dataPoints[0].asInt, 5) + }) + + setupTracer({ OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: 'DELTA' }) + const meter = metrics.getMeter('app') + meter.createUpDownCounter('updown').add(5) + + setTimeout(() => { validator(); done() }, 150) + }) + + it('observable updowncounter always uses CUMULATIVE', (done) => { + const validator = mockOtlpExport((decoded) => { + const updown = decoded.resourceMetrics[0].scopeMetrics[0].metrics[0] + assert.strictEqual(updown.name, 'obs.updown') + assert.strictEqual(updown.sum.aggregationTemporality, 2) + assert.strictEqual(updown.sum.dataPoints[0].asInt, 10) + }) + + setupTracer({ OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: 'DELTA' }) + const meter = metrics.getMeter('app') + const obs = meter.createObservableUpDownCounter('obs.updown') + obs.addCallback((result) => result.observe(10)) + + setTimeout(() => { validator(); done() }, 150) + }) + + it('histograms support DELTA temporality', (done) => { + const validator = mockOtlpExport((decoded) => { + const histogram = decoded.resourceMetrics[0].scopeMetrics[0].metrics[0] + assert.strictEqual(histogram.histogram.aggregationTemporality, 1) + assert.strictEqual(histogram.histogram.dataPoints[0].count, 2) + assert.strictEqual(histogram.histogram.dataPoints[0].sum, 30) + }) + + setupTracer({ OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: 'DELTA' }) + const meter = metrics.getMeter('app') + meter.createHistogram('latency').record(10) + meter.createHistogram('latency').record(20) + + setTimeout(() => { validator(); done() }, 150) + }) + + it('histograms support CUMULATIVE temporality', (done) => { + const validator = mockOtlpExport((decoded) => { + const histogram = decoded.resourceMetrics[0].scopeMetrics[0].metrics[0] + assert.strictEqual(histogram.histogram.aggregationTemporality, 2) + assert.strictEqual(histogram.histogram.dataPoints[0].count, 3) + assert.strictEqual(histogram.histogram.dataPoints[0].sum, 60) + }) + + setupTracer({ OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: 'CUMULATIVE' }) + const meter = metrics.getMeter('app') + meter.createHistogram('latency').record(10) + meter.createHistogram('latency').record(20) + meter.createHistogram('latency').record(30) + + setTimeout(() => { validator(); done() }, 150) + }) + }) + + describe('Case Insensitivity', () => { + it('meter names are case-insensitive', () => { + setupTracer() + const meter1 = metrics.getMeter('MyApp') + const meter2 = metrics.getMeter('myapp') + assert.strictEqual(meter1, meter2) + }) + + it('metric names are case-insensitive', (done) => { + const validator = mockOtlpExport((decoded) => { + const counter = decoded.resourceMetrics[0].scopeMetrics[0].metrics[0] + assert.strictEqual(counter.name, 'mymetric') + assert.strictEqual(counter.sum.dataPoints[0].asInt, 6) + }) + + setupTracer() + const meter = metrics.getMeter('app') + const c1 = meter.createCounter('MyMetric') + const c2 = meter.createCounter('mymetric') + c1.add(1) + c2.add(2) + meter.createCounter('MYMETRIC').add(3) + + setTimeout(() => { validator(); done() }, 150) + }) + + it('different instrument types with same name are distinct', (done) => { + const validator = mockOtlpExport((decoded) => { + const metrics = decoded.resourceMetrics[0].scopeMetrics[0].metrics + assert.strictEqual(metrics.length, 2) + const counter = metrics.find(m => m.sum) + const histogram = metrics.find(m => m.histogram) + assert(counter, 'Should have counter') + assert(histogram, 'Should have histogram') + assert.strictEqual(counter.name, 'test') + assert.strictEqual(histogram.name, 'test') + assert.strictEqual(counter.sum.dataPoints.length, 1, 'Counter should have 1 data point') + assert.strictEqual(histogram.histogram.dataPoints.length, 1, 'Histogram should have 1 data point') + assert.strictEqual(counter.sum.dataPoints[0].asInt, 5) + assert.strictEqual(histogram.histogram.dataPoints[0].sum, 100) + }) + + setupTracer() + const meter = metrics.getMeter('app') + meter.createCounter('Test').add(5) + meter.createHistogram('TEST').record(100) + + setTimeout(() => { validator(); done() }, 150) + }) + }) + + describe('Lifecycle', () => { + it('returns no-op meter after shutdown', async () => { + setupTracer() + const provider = metrics.getMeterProvider() + await provider.shutdown() + + const meter = metrics.getMeter('test') + meter.createCounter('test').add(1) + meter.createUpDownCounter('test').add(1) + meter.createHistogram('test').record(1) + meter.createGauge('test').record(1) + meter.createObservableGauge('test').addCallback(() => {}) + meter.createObservableCounter('test').addCallback(() => {}) + meter.createObservableUpDownCounter('test').addCallback(() => {}) + }) + + it('handles shutdown gracefully', async () => { + setupTracer() + const provider = metrics.getMeterProvider() + await provider.shutdown() + await provider.shutdown() // Second shutdown should be safe + }) + + it('handles forceFlush', async () => { + const validator = mockOtlpExport((decoded) => { + assert(decoded.resourceMetrics) + }) + + setupTracer() + const meter = metrics.getMeter('app') + meter.createCounter('test').add(1) + + const provider = metrics.getMeterProvider() + await provider.forceFlush() + validator() + }) + + it('removes callbacks from observable instruments', (done) => { + const validator = mockOtlpExport((decoded) => { + const gauge = decoded.resourceMetrics[0].scopeMetrics[0].metrics[0] + assert.strictEqual(gauge.gauge.dataPoints[0].asInt, 200) + }) + + setupTracer() + const meter = metrics.getMeter('app') + const gauge = meter.createObservableGauge('temperature') + + const cb1 = (result) => result.observe(100) + const cb2 = (result) => result.observe(200) + gauge.addCallback(cb1) + gauge.addCallback(cb2) + gauge.removeCallback(cb1) + + setTimeout(() => { validator(); done() }, 150) + }) + }) + + describe('Unimplemented Features', () => { + it('logs warning for meter batch callbacks', () => { + const log = require('../../src/log') + const warnSpy = sinon.spy(log, 'warn') + + setupTracer() + const meter = metrics.getMeter('app') + meter.addBatchObservableCallback(() => {}, []) + meter.removeBatchObservableCallback(() => {}, []) + + assert.strictEqual(warnSpy.callCount, 2) + assert.strictEqual(warnSpy.firstCall.args[0], 'addBatchObservableCallback is not implemented') + assert.strictEqual(warnSpy.secondCall.args[0], 'removeBatchObservableCallback is not implemented') + + warnSpy.restore() + }) + }) + + describe('Protocol Configuration', () => { + it('uses default protobuf protocol', () => { + const { meterProvider } = setupTracer({ + OTEL_EXPORTER_OTLP_METRICS_PROTOCOL: undefined, + OTEL_EXPORTER_OTLP_PROTOCOL: undefined + }) + assert(meterProvider.reader) + assert.strictEqual(meterProvider.reader.exporter.transformer.protocol, 'http/protobuf') + }) + + it('configures protocol from environment variable', () => { + const { meterProvider } = setupTracer({ OTEL_EXPORTER_OTLP_PROTOCOL: 'http/json' }) + assert.strictEqual(meterProvider.reader.exporter.transformer.protocol, 'http/json') + }) + + it('prioritizes metrics-specific protocol over generic protocol', () => { + const { meterProvider } = setupTracer({ + OTEL_EXPORTER_OTLP_METRICS_PROTOCOL: 'http/json', + OTEL_EXPORTER_OTLP_PROTOCOL: 'http/protobuf' + }) + assert.strictEqual(meterProvider.reader.exporter.transformer.protocol, 'http/json') + }) + + it('logs warning and falls back to protobuf when gRPC protocol is set', () => { + const log = require('../../src/log') + const warnSpy = sinon.spy(log, 'warn') + const { meterProvider } = setupTracer({ OTEL_EXPORTER_OTLP_METRICS_PROTOCOL: 'grpc' }) + assert.strictEqual(meterProvider.reader.exporter.transformer.protocol, 'http/protobuf') + const expectedMsg = 'OTLP gRPC protocol is not supported for metrics. ' + + 'Defaulting to http/protobuf. gRPC protobuf support may be added in a future release.' + assert(warnSpy.calledWith(expectedMsg)) + warnSpy.restore() + }) + }) + + describe('Endpoint Configuration', () => { + it('configures OTLP endpoint from environment variable', () => { + const { meterProvider } = setupTracer({ + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: 'http://custom:4321/v1/metrics' + }) + assert.strictEqual(meterProvider.reader.exporter.options.path, '/v1/metrics') + assert.strictEqual(meterProvider.reader.exporter.options.hostname, 'custom') + assert.strictEqual(meterProvider.reader.exporter.options.port, '4321') + }) + + it('prioritizes metrics-specific endpoint over generic endpoint', () => { + const { meterProvider } = setupTracer({ + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: 'http://custom:4318/v1/metrics', + OTEL_EXPORTER_OTLP_ENDPOINT: 'http://generic:4318/v1/metrics' + }) + assert.strictEqual(meterProvider.reader.exporter.options.path, '/v1/metrics') + assert.strictEqual(meterProvider.reader.exporter.options.hostname, 'custom') + assert.strictEqual(meterProvider.reader.exporter.options.port, '4318') + }) + + it('appends /v1/metrics to endpoint if not provided', () => { + process.env.OTEL_EXPORTER_OTLP_ENDPOINT = 'http://custom:4318' + const { meterProvider } = setupTracer() + assert.strictEqual(meterProvider.reader.exporter.options.path, '/v1/metrics') + }) + }) + + describe('Headers Configuration', () => { + it('configures OTLP headers from environment variable', () => { + const { meterProvider } = setupTracer({ OTEL_EXPORTER_OTLP_HEADERS: 'api-key=secret,env=prod' }) + const exporter = meterProvider.reader.exporter + assert.strictEqual(exporter.options.headers['api-key'], 'secret') + assert.strictEqual(exporter.options.headers.env, 'prod') + }) + + it('prioritizes metrics-specific headers over generic OTLP headers', () => { + const { meterProvider } = setupTracer({ + OTEL_EXPORTER_OTLP_HEADERS: 'generic=value,shared=generic', + OTEL_EXPORTER_OTLP_METRICS_HEADERS: 'metrics-specific=value,shared=metrics' + }) + const exporter = meterProvider.reader.exporter + assert.strictEqual(exporter.options.headers['metrics-specific'], 'value') + assert.strictEqual(exporter.options.headers.shared, 'metrics') + assert.strictEqual(exporter.options.headers.generic, undefined) + }) + }) + + describe('Timeout Configuration', () => { + it('uses default timeout when not set', () => { + const { meterProvider } = setupTracer({ OTEL_EXPORTER_OTLP_METRICS_TIMEOUT: undefined }) + assert.strictEqual(meterProvider.reader.exporter.options.timeout, 10000) + }) + + it('configures OTLP timeout from environment variable', () => { + const { meterProvider } = setupTracer({ OTEL_EXPORTER_OTLP_METRICS_TIMEOUT: '1000' }) + assert.strictEqual(meterProvider.reader.exporter.options.timeout, 1000) + }) + + it('prioritizes metrics-specific timeout over generic timeout', () => { + const { meterProvider } = setupTracer( + { OTEL_EXPORTER_OTLP_METRICS_TIMEOUT: '1000', OTEL_EXPORTER_OTLP_TIMEOUT: '2000' } + ) + assert.strictEqual(meterProvider.reader.exporter.options.timeout, 1000) + }) + + it('falls back to generic timeout when metrics-specific not set', () => { + const { meterProvider } = setupTracer({ OTEL_EXPORTER_OTLP_TIMEOUT: '5000' }) + assert.strictEqual(meterProvider.reader.exporter.options.timeout, 5000) + }) + }) + + describe('NonNegInt Configuration Validation', () => { + let log, warnSpy + + beforeEach(() => { + log = require('../../src/log') + warnSpy = sinon.spy(log, 'warn') + }) + + afterEach(() => { + warnSpy.restore() + }) + + it('rejects zero for metrics configs with allowZero=false', () => { + setupTracer({ + OTEL_BSP_SCHEDULE_DELAY: '0', + OTEL_METRIC_EXPORT_INTERVAL: '0', + OTEL_BSP_MAX_QUEUE_SIZE: '0', + OTEL_EXPORTER_OTLP_TIMEOUT: '0', + OTEL_EXPORTER_OTLP_METRICS_TIMEOUT: '0', + OTEL_METRIC_EXPORT_TIMEOUT: '0', + OTEL_BSP_MAX_EXPORT_BATCH_SIZE: '0', + }, false) + assert(warnSpy.calledWith(sinon.match(/Invalid value 0 for OTEL_BSP_SCHEDULE_DELAY/))) + assert(warnSpy.calledWith(sinon.match(/Invalid value 0 for OTEL_METRIC_EXPORT_INTERVAL/))) + assert(warnSpy.calledWith(sinon.match(/Invalid value 0 for OTEL_BSP_MAX_EXPORT_BATCH_SIZE/))) + assert(warnSpy.calledWith(sinon.match(/Invalid value 0 for OTEL_BSP_MAX_QUEUE_SIZE/))) + assert(!warnSpy.calledWith(sinon.match(/Invalid value 0 for OTEL_EXPORTER_OTLP_TIMEOUT/))) + assert(!warnSpy.calledWith(sinon.match(/Invalid value 0 for OTEL_METRIC_EXPORT_TIMEOUT/))) + assert(!warnSpy.calledWith(sinon.match(/Invalid value 0 for OTEL_EXPORTER_OTLP_METRICS_TIMEOUT/))) + }) + + it('rejects negative values for all configs', () => { + setupTracer({ + OTEL_EXPORTER_OTLP_TIMEOUT: '-1', + OTEL_EXPORTER_OTLP_LOGS_TIMEOUT: '-1', + OTEL_EXPORTER_OTLP_METRICS_TIMEOUT: '-1', + OTEL_METRIC_EXPORT_TIMEOUT: '-1', + OTEL_METRIC_EXPORT_INTERVAL: '-1', + OTEL_BSP_SCHEDULE_DELAY: '-1', + OTEL_BSP_MAX_EXPORT_BATCH_SIZE: '-1', + OTEL_BSP_MAX_QUEUE_SIZE: '-1' + }, false) + assert(warnSpy.calledWith(sinon.match(/Invalid value -1 for OTEL_EXPORTER_OTLP_TIMEOUT/))) + assert(warnSpy.calledWith(sinon.match(/Invalid value -1 for OTEL_EXPORTER_OTLP_LOGS_TIMEOUT/))) + assert(warnSpy.calledWith(sinon.match(/Invalid value -1 for OTEL_EXPORTER_OTLP_METRICS_TIMEOUT/))) + assert(warnSpy.calledWith(sinon.match(/Invalid value -1 for OTEL_METRIC_EXPORT_TIMEOUT/))) + assert(warnSpy.calledWith(sinon.match(/Invalid value -1 for OTEL_METRIC_EXPORT_INTERVAL/))) + assert(warnSpy.calledWith(sinon.match(/Invalid value -1 for OTEL_BSP_SCHEDULE_DELAY/))) + assert(warnSpy.calledWith(sinon.match(/Invalid value -1 for OTEL_BSP_MAX_EXPORT_BATCH_SIZE/))) + assert(warnSpy.calledWith(sinon.match(/Invalid value -1 for OTEL_BSP_MAX_QUEUE_SIZE/))) + }) + + it('rejects values that are not numbers for all configs', () => { + setupTracer({ + OTEL_EXPORTER_OTLP_TIMEOUT: 'not a number', + OTEL_EXPORTER_OTLP_LOGS_TIMEOUT: 'invalid', + OTEL_EXPORTER_OTLP_METRICS_TIMEOUT: 'hi sir', + OTEL_METRIC_EXPORT_TIMEOUT: '@weeeeee', + OTEL_METRIC_EXPORT_INTERVAL: 'python!', + OTEL_BSP_SCHEDULE_DELAY: 'NaN', + OTEL_BSP_MAX_EXPORT_BATCH_SIZE: 'abc', + OTEL_BSP_MAX_QUEUE_SIZE: 'xyz' + }, false) + assert(warnSpy.calledWith(sinon.match(/Invalid value NaN for OTEL_EXPORTER_OTLP_TIMEOUT/))) + assert(warnSpy.calledWith(sinon.match(/Invalid value NaN for OTEL_EXPORTER_OTLP_LOGS_TIMEOUT/))) + assert(warnSpy.calledWith(sinon.match(/Invalid value NaN for OTEL_EXPORTER_OTLP_METRICS_TIMEOUT/))) + assert(warnSpy.calledWith(sinon.match(/Invalid value NaN for OTEL_METRIC_EXPORT_TIMEOUT/))) + assert(warnSpy.calledWith(sinon.match(/Invalid value NaN for OTEL_METRIC_EXPORT_INTERVAL/))) + assert(warnSpy.calledWith(sinon.match(/Invalid value NaN for OTEL_BSP_SCHEDULE_DELAY/))) + assert(warnSpy.calledWith(sinon.match(/Invalid value NaN for OTEL_BSP_MAX_EXPORT_BATCH_SIZE/))) + assert(warnSpy.calledWith(sinon.match(/Invalid value NaN for OTEL_BSP_MAX_QUEUE_SIZE/))) + }) + }) + + describe('Initialization', () => { + it('does not initialize when OTEL metrics are disabled', () => { + const { meterProvider } = setupTracer({ DD_METRICS_OTEL_ENABLED: undefined }) + const { MeterProvider } = require('../../src/opentelemetry/metrics') + + // Should return no-op provider when disabled, not our custom MeterProvider + assert.strictEqual(meterProvider instanceof MeterProvider, false) + }) + + it('handles shutdown correctly', () => { + const log = require('../../src/log') + const warnSpy = sinon.spy(log, 'warn') + + setupTracer() + const provider = metrics.getMeterProvider() + provider.shutdown() + + const meter = provider.getMeter('test') + meter.createCounter('test').add(1) + meter.createHistogram('test').record(100) + meter.createUpDownCounter('test').add(5) + meter.createGauge('test').record(1) + const obsGauge = meter.createObservableGauge('test') + obsGauge.addCallback(() => {}) + obsGauge.addCallback('not a function') + + provider.register() + assert.strictEqual(warnSpy.callCount, 1) + assert.strictEqual(warnSpy.firstCall.args[0], 'Cannot register after shutdown') + + warnSpy.restore() + }) + }) + + describe('Queue Size Limits', function () { + function countMetrics (metrics) { + return metrics.resourceMetrics[0].scopeMetrics[0].metrics.length + } + + it('overflows with 4 synchronous metrics when max is 3', (done) => { + const log = require('../../src/log') + const warnSpy = sinon.spy(log, 'warn') + const validator = mockOtlpExport((metrics) => { + assert.strictEqual(countMetrics(metrics), 3) + assert(warnSpy.getCalls().some(call => + call.args[0].includes('Metric queue exceeded limit (max: 3)') + )) + }) + + setupTracer( + { DD_METRICS_OTEL_ENABLED: 'true', OTEL_METRIC_EXPORT_INTERVAL: '100', OTEL_BSP_MAX_QUEUE_SIZE: '3' } + , false + ) + const meter = metrics.getMeterProvider().getMeter('test') + meter.createCounter('counter.1').add(1) + meter.createCounter('counter.2').add(2) + meter.createCounter('counter.3').add(3) + meter.createCounter('counter.4').add(4) + + setTimeout(() => { validator(); warnSpy.restore(); done() }, 200) + }) + + it('overflows with 4 observable metrics when max is 3', (done) => { + const log = require('../../src/log') + const warnSpy = sinon.spy(log, 'warn') + let callCount = 0 + const validator = mockOtlpExport((metrics) => { + if (++callCount === 1) { + assert.strictEqual(countMetrics(metrics), 3) + assert(warnSpy.getCalls().some(call => call.args[0].includes('Metric queue exceeded limit'))) + } + }) + + setupTracer( + { DD_METRICS_OTEL_ENABLED: 'true', OTEL_METRIC_EXPORT_INTERVAL: '100', OTEL_BSP_MAX_QUEUE_SIZE: '3' }, + false + ) + const meter = metrics.getMeterProvider().getMeter('test') + meter.createObservableGauge('gauge.1').addCallback((result) => result.observe(10)) + meter.createObservableGauge('gauge.2').addCallback((result) => result.observe(20)) + meter.createObservableGauge('gauge.3').addCallback((result) => result.observe(30)) + meter.createObservableGauge('gauge.4').addCallback((result) => result.observe(40)) + + setTimeout(() => { validator(); warnSpy.restore(); done() }, 200) + }) + + it('overflows with 2 synchronous + 2 observable metrics when max is 3', (done) => { + const log = require('../../src/log') + const warnSpy = sinon.spy(log, 'warn') + let firstExport = true + const validator = mockOtlpExport((metrics) => { + if (!firstExport) return + firstExport = false + assert.strictEqual(countMetrics(metrics), 3) + assert(warnSpy.getCalls().some(call => call.args[0].includes('Metric queue exceeded limit'))) + }) + + setupTracer( + { DD_METRICS_OTEL_ENABLED: 'true', OTEL_METRIC_EXPORT_INTERVAL: '100', OTEL_BSP_MAX_QUEUE_SIZE: '3' }, + false + ) + const meter = metrics.getMeterProvider().getMeter('test') + meter.createCounter('counter.1').add(1) + meter.createCounter('counter.2').add(2) + meter.createObservableGauge('gauge.1').addCallback((result) => result.observe(10)) + meter.createObservableGauge('gauge.2').addCallback((result) => result.observe(20)) + + setTimeout(() => { validator(); warnSpy.restore(); done() }, 200) + }) + + it('drops measurements when DEFAULT_MAX_MEASUREMENT_QUEUE_SIZE is exceeded', (done) => { + const log = require('../../src/log') + const warnSpy = sinon.spy(log, 'warn') + let firstExport = true + // Validates that up to 524288 measurements can be queued in a metrics interval. + // Note: This test uses significant amount of memory and could be flaky on machines + // with limited memory. + const validator = mockOtlpExport((metrics) => { + if (!firstExport) return + firstExport = false + + const exportedMetrics = metrics.resourceMetrics[0].scopeMetrics[0].metrics + assert(!exportedMetrics.find(m => m.name === 'counter.overflow')) + assert(!exportedMetrics.find(m => m.name === 'gauge.overflow')) + const counter1Metric = exportedMetrics.find(m => m.name === 'counter.sync') + assert(counter1Metric, 'counter.sync should be exported') + assert.strictEqual(counter1Metric.sum.dataPoints.length, 524288) + assert(warnSpy.getCalls().some(call => + call.args[0].includes('Metric queue exceeded limit') && + call.args[0].includes('max: 524288') && + call.args[0].includes('Dropping 2 measurements') + )) + }) + + setupTracer({ DD_METRICS_OTEL_ENABLED: 'true', OTEL_METRIC_EXPORT_INTERVAL: '30000' }, false) + const meter = metrics.getMeterProvider().getMeter('test') + const counter = meter.createCounter('counter.sync') + + for (let i = 0; i < 524288; i++) counter.add(1, { id: `${i}` }) + + meter.createCounter('counter.overflow').add(1) + meter.createObservableGauge('gauge.overflow').addCallback((result) => result.observe(1)) + + metrics.getMeterProvider().forceFlush() + setTimeout(() => { validator(); warnSpy.restore(); done() }, 100) + }) + }) + + describe('HTTP Export Behavior', () => { + it('handles timeout and error without network', (done) => { + const results = { timeout: false, error: false } + let requestCount = 0 + + if (httpStub) { + httpStub.restore() + httpStub = null + } + + httpStub = sinon.stub(http, 'request').callsFake((options, callback) => { + requestCount++ + assert(options.headers['Content-Length'] > 0) + + const handlers = {} + const mockReq = { + write: sinon.stub(), + end: sinon.stub(), + on: (event, handler) => { + handlers[event] = handler + return mockReq + }, + destroy: sinon.stub(), + setTimeout: sinon.stub() + } + + if (requestCount === 1) { + setTimeout(() => { handlers.timeout && handlers.timeout(); results.timeout = true }, 10) + } else { + setTimeout(() => { + handlers.error && handlers.error(new Error('Refused')) + results.error = true + }, 10) + } + + return mockReq + }) + + setupTracer() + const meter = metrics.getMeter('app') + meter.createCounter('test1').add(1) + + setTimeout(() => { + meter.createCounter('test2').add(2) + }, 120) + + setTimeout(() => { + assert(results.timeout) + assert(results.error) + done() + }, 300) + }) + }) +})