diff --git a/CHANGELOG.md b/CHANGELOG.md index 507cc3b7c2e..4fefda964a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,8 @@ For notes on migrating to 2.x / 0.200.x see [the upgrade guide](doc/upgrade-to-2 ### :rocket: Features -* feat(sdk-logs): implement log creation metrics [#6433](https://github.com/open-telemetry/opentelemetry-js/pull/6433) @anuraaga +* feat(sdk-logs): implement log creation metrics [#6433](https://github.com/open-telemetry/opentelemetry-js/pull/6433) @anuraaga +* feat(sdk-logs): implement log processor metrics [#6554](https://github.com/open-telemetry/opentelemetry-js/pull/6554) @anuraaga ### :bug: Bug Fixes diff --git a/experimental/packages/opentelemetry-sdk-node/src/sdk.ts b/experimental/packages/opentelemetry-sdk-node/src/sdk.ts index d9e865515ed..22cb03a38d7 100644 --- a/experimental/packages/opentelemetry-sdk-node/src/sdk.ts +++ b/experimental/packages/opentelemetry-sdk-node/src/sdk.ts @@ -249,8 +249,6 @@ export class NodeSDK { diag.warn( "The 'logRecordProcessor' option is deprecated. Please use 'logRecordProcessors' instead." ); - } else { - this.configureLoggerProviderFromEnv(); } if (configuration.metricReaders) { @@ -355,6 +353,12 @@ export class NodeSDK { trace.setGlobalTracerProvider(this._tracerProvider); } + if (!this._loggerProviderConfig) { + this.configureLoggerProviderFromEnv( + sdkMetricsEnabled ? this._meterProvider : undefined + ); + } + if (this._loggerProviderConfig) { const loggerProvider = new LoggerProvider({ ...getLoggerProviderConfigFromEnv(), @@ -388,7 +392,9 @@ export class NodeSDK { ); } - private configureLoggerProviderFromEnv(): void { + private configureLoggerProviderFromEnv( + meterProvider: MeterProvider | undefined + ): void { const enabledExporters = Array.from( new Set(getStringListFromEnv('OTEL_LOGS_EXPORTER') ?? []) ); @@ -444,9 +450,9 @@ export class NodeSDK { this._loggerProviderConfig = { logRecordProcessors: exporters.map(exporter => { if (exporter instanceof ConsoleLogRecordExporter) { - return new SimpleLogRecordProcessor(exporter); + return new SimpleLogRecordProcessor(exporter, { meterProvider }); } else { - return getBatchLogRecordProcessorFromEnv(exporter); + return getBatchLogRecordProcessorFromEnv(exporter, meterProvider); } }), }; diff --git a/experimental/packages/opentelemetry-sdk-node/src/utils.ts b/experimental/packages/opentelemetry-sdk-node/src/utils.ts index e819a1f0bee..e48fe915999 100644 --- a/experimental/packages/opentelemetry-sdk-node/src/utils.ts +++ b/experimental/packages/opentelemetry-sdk-node/src/utils.ts @@ -56,6 +56,7 @@ import type { import type { AggregationOption, IMetricReader, + MeterProvider, PushMetricExporter, ViewOptions, } from '@opentelemetry/sdk-metrics'; @@ -578,12 +579,13 @@ export function getBatchLogRecordProcessorConfigFromEnv(): BufferConfig { } export function getBatchLogRecordProcessorFromEnv( - exporter: LogRecordExporter + exporter: LogRecordExporter, + meterProvider: MeterProvider | undefined ): BatchLogRecordProcessor { - return new BatchLogRecordProcessor( - exporter, - getBatchLogRecordProcessorConfigFromEnv() - ); + return new BatchLogRecordProcessor(exporter, { + ...getBatchLogRecordProcessorConfigFromEnv(), + meterProvider, + }); } export function getLogRecordExporter( diff --git a/experimental/packages/opentelemetry-sdk-node/test/sdk.test.ts b/experimental/packages/opentelemetry-sdk-node/test/sdk.test.ts index 212a2a5ef78..1e328281342 100644 --- a/experimental/packages/opentelemetry-sdk-node/test/sdk.test.ts +++ b/experimental/packages/opentelemetry-sdk-node/test/sdk.test.ts @@ -411,27 +411,14 @@ describe('Node SDK', () => { it('should configure components for SDK metrics if enabled', async () => { process.env.OTEL_NODE_EXPERIMENTAL_SDK_METRICS = 'true'; - const exporter = new ConsoleMetricExporter(); - const metricReader = new PeriodicExportingMetricReader({ - exporter: exporter, - exportIntervalMillis: 100, - exportTimeoutMillis: 100, - }); + process.env.OTEL_TRACES_EXPORTER = 'console'; + process.env.OTEL_LOGS_EXPORTER = 'console'; + process.env.OTEL_METRICS_EXPORTER = 'console'; - const sdk = new NodeSDK({ - metricReader: metricReader, - traceExporter: new ConsoleSpanExporter(), - logRecordProcessors: [ - new SimpleLogRecordProcessor(new InMemoryLogRecordExporter()), - ], - autoDetectResources: false, - }); + const sdk = new NodeSDK(); sdk.start(); - assertDefaultContextManagerRegistered(); - assertDefaultPropagatorRegistered(); - assert.strictEqual(setGlobalTracerProviderSpy.callCount, 1); const tracerProvider = setGlobalTracerProviderSpy.lastCall.args[0]; assert.ok(tracerProvider instanceof NodeTracerProvider); @@ -444,13 +431,21 @@ describe('Node SDK', () => { (loggerProvider as any)['_sharedState'].loggerMetrics.createdLogs, NOOP_COUNTER_METRIC ); + assert.notDeepEqual( + (loggerProvider as any)['_sharedState'].registeredLogRecordProcessors[0] + ._metrics.processedLogs, + NOOP_COUNTER_METRIC + ); assert.ok(metrics.getMeterProvider() instanceof MeterProvider); await sdk.shutdown(); }); - it('should not configure components for SDK metrics if disabled', async () => { + it('should configure initialized components for SDK metrics if enabled', async () => { + process.env.OTEL_NODE_EXPERIMENTAL_SDK_METRICS = 'true'; + process.env.OTEL_LOGS_EXPORTER = 'console'; + const exporter = new ConsoleMetricExporter(); const metricReader = new PeriodicExportingMetricReader({ exporter: exporter, @@ -461,16 +456,42 @@ describe('Node SDK', () => { const sdk = new NodeSDK({ metricReader: metricReader, traceExporter: new ConsoleSpanExporter(), - logRecordProcessors: [ - new SimpleLogRecordProcessor(new InMemoryLogRecordExporter()), - ], autoDetectResources: false, }); sdk.start(); - assertDefaultContextManagerRegistered(); - assertDefaultPropagatorRegistered(); + assert.strictEqual(setGlobalTracerProviderSpy.callCount, 1); + const tracerProvider = setGlobalTracerProviderSpy.lastCall.args[0]; + assert.ok(tracerProvider instanceof NodeTracerProvider); + assert.ok( + (tracerProvider as any)._config.meterProvider instanceof MeterProvider + ); + + const loggerProvider = setGlobalLoggerProviderSpy.lastCall.args[0]; + assert.notDeepEqual( + (loggerProvider as any)['_sharedState'].loggerMetrics.createdLogs, + NOOP_COUNTER_METRIC + ); + assert.notDeepEqual( + (loggerProvider as any)['_sharedState'].registeredLogRecordProcessors[0] + ._metrics.processedLogs, + NOOP_COUNTER_METRIC + ); + + assert.ok(metrics.getMeterProvider() instanceof MeterProvider); + + await sdk.shutdown(); + }); + + it('should not configure components for SDK metrics if disabled', async () => { + process.env.OTEL_TRACES_EXPORTER = 'console'; + process.env.OTEL_LOGS_EXPORTER = 'console'; + process.env.OTEL_METRICS_EXPORTER = 'console'; + + const sdk = new NodeSDK(); + + sdk.start(); assert.strictEqual(setGlobalTracerProviderSpy.callCount, 1); const tracerProvider = setGlobalTracerProviderSpy.lastCall.args[0]; @@ -482,6 +503,11 @@ describe('Node SDK', () => { (loggerProvider as any)['_sharedState'].loggerMetrics.createdLogs, NOOP_COUNTER_METRIC ); + assert.deepEqual( + (loggerProvider as any)['_sharedState'].registeredLogRecordProcessors[0] + ._metrics.processedLogs, + NOOP_COUNTER_METRIC + ); assert.ok(metrics.getMeterProvider() instanceof MeterProvider); diff --git a/experimental/packages/sdk-logs/src/export/BatchLogRecordProcessorBase.ts b/experimental/packages/sdk-logs/src/export/BatchLogRecordProcessorBase.ts index 3b3f95e4014..a1e918098b8 100644 --- a/experimental/packages/sdk-logs/src/export/BatchLogRecordProcessorBase.ts +++ b/experimental/packages/sdk-logs/src/export/BatchLogRecordProcessorBase.ts @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { context, diag } from '@opentelemetry/api'; +import { context, createNoopMeter, diag } from '@opentelemetry/api'; import { ExportResultCode, globalErrorHandler, @@ -15,6 +15,9 @@ import type { BufferConfig } from '../types'; import type { SdkLogRecord } from './SdkLogRecord'; import type { LogRecordExporter } from './LogRecordExporter'; import type { LogRecordProcessor } from '../LogRecordProcessor'; +import type { LogRecordProcessorConfig } from './LogRecordProcessorConfig'; +import { LogRecordProcessorMetrics } from './LogRecordProcessorMetrics'; +import { OTEL_COMPONENT_TYPE_VALUE_BATCHING_LOG_PROCESSOR } from '../semconv'; /** * Waits for all pending async resources in the log records to be resolved. @@ -42,12 +45,14 @@ async function waitForResources(logRecords: SdkLogRecord[]): Promise { class ExportOperation { private readonly _exportCompleted: Promise; private readonly _exportScheduledPromise: Promise; + private readonly _metrics: LogRecordProcessorMetrics; private _exportScheduledResolve!: () => void; constructor( exporter: LogRecordExporter, logRecords: SdkLogRecord[], - exportTimeoutMillis: number + exportTimeoutMillis: number, + metrics: LogRecordProcessorMetrics ) { this._exportScheduledPromise = new Promise(resolve => { this._exportScheduledResolve = resolve; @@ -57,6 +62,7 @@ class ExportOperation { logRecords, exportTimeoutMillis ); + this._metrics = metrics; } /** Get the promise that resolves when the export completes */ @@ -106,6 +112,7 @@ class ExportOperation { // Call exporter.export() and immediately resolve exportScheduled exporter.export(logRecords, result => { + this._metrics.finishLogs(logRecords.length, result.error); clearTimeout(timer); if (result.code === ExportResultCode.SUCCESS) { resolve(); @@ -131,6 +138,7 @@ export abstract class BatchLogRecordProcessorBase private readonly _scheduledDelayMillis: number; private readonly _exportTimeoutMillis: number; private readonly _exporter: LogRecordExporter; + private readonly _metrics: LogRecordProcessorMetrics; private _currentExport: ExportOperation | null = null; private _finishedLogRecords: SdkLogRecord[] = []; @@ -138,7 +146,10 @@ export abstract class BatchLogRecordProcessorBase private _shutdownOnce: BindOnceFuture; private _flushing: boolean = false; - constructor(exporter: LogRecordExporter, config?: T) { + constructor( + exporter: LogRecordExporter, + config?: T & LogRecordProcessorConfig + ) { this._exporter = exporter; this._maxExportBatchSize = config?.maxExportBatchSize ?? 512; this._maxQueueSize = config?.maxQueueSize ?? 2048; @@ -153,6 +164,19 @@ export abstract class BatchLogRecordProcessorBase ); this._maxExportBatchSize = this._maxQueueSize; } + + const meter = config?.meterProvider + ? config.meterProvider.getMeter('@opentelemetry/sdk-logs') + : createNoopMeter(); + + this._metrics = new LogRecordProcessorMetrics( + OTEL_COMPONENT_TYPE_VALUE_BATCHING_LOG_PROCESSOR, + meter, + { + capacity: this._maxQueueSize, + getQueueSize: () => this._finishedLogRecords.length, + } + ); } public onEmit(logRecord: SdkLogRecord): void { @@ -172,6 +196,7 @@ export abstract class BatchLogRecordProcessorBase /** Add a LogRecord in the buffer. */ private _addToBuffer(logRecord: SdkLogRecord) { if (this._finishedLogRecords.length >= this._maxQueueSize) { + this._metrics.dropLogs(1); return; } this._finishedLogRecords.push(logRecord); @@ -185,6 +210,7 @@ export abstract class BatchLogRecordProcessorBase private async _shutdown(): Promise { this.onShutdown(); await this._flushAll(); + this._metrics.shutdown(); await this._exporter.shutdown(); } @@ -230,7 +256,8 @@ export abstract class BatchLogRecordProcessorBase const exportOp = new ExportOperation( this._exporter, batch, - this._exportTimeoutMillis + this._exportTimeoutMillis, + this._metrics ); this._currentExport = exportOp; @@ -279,7 +306,8 @@ export abstract class BatchLogRecordProcessorBase const exportOp = new ExportOperation( this._exporter, logRecords, - this._exportTimeoutMillis + this._exportTimeoutMillis, + this._metrics ); this._currentExport = exportOp; diff --git a/experimental/packages/sdk-logs/src/export/LogRecordProcessorConfig.ts b/experimental/packages/sdk-logs/src/export/LogRecordProcessorConfig.ts new file mode 100644 index 00000000000..e2557a1524b --- /dev/null +++ b/experimental/packages/sdk-logs/src/export/LogRecordProcessorConfig.ts @@ -0,0 +1,13 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { MeterProvider } from '@opentelemetry/api'; + +/** + * Common options for SDK log record processors. + */ +export interface LogRecordProcessorConfig { + meterProvider?: MeterProvider; +} diff --git a/experimental/packages/sdk-logs/src/export/LogRecordProcessorMetrics.ts b/experimental/packages/sdk-logs/src/export/LogRecordProcessorMetrics.ts new file mode 100644 index 00000000000..e4a2c22e827 --- /dev/null +++ b/experimental/packages/sdk-logs/src/export/LogRecordProcessorMetrics.ts @@ -0,0 +1,108 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { + Attributes, + Counter, + Meter, + ObservableCallback, + ObservableUpDownCounter, +} from '@opentelemetry/api'; +import { + ATTR_ERROR_TYPE, + ATTR_OTEL_COMPONENT_NAME, + ATTR_OTEL_COMPONENT_TYPE, + METRIC_OTEL_SDK_PROCESSOR_LOG_PROCESSED, + METRIC_OTEL_SDK_PROCESSOR_LOG_QUEUE_CAPACITY, + METRIC_OTEL_SDK_PROCESSOR_LOG_QUEUE_SIZE, +} from '../semconv'; + +const componentCounter = new Map(); + +interface QueueConfig { + capacity: number; + getQueueSize: () => number; +} + +export class LogRecordProcessorMetrics { + private readonly processedLogs: Counter; + private readonly queueSize: ObservableUpDownCounter | undefined; + private readonly queueSizeCallback: ObservableCallback | undefined; + + private readonly standardAttrs: Attributes; + private readonly droppedAttrs: Attributes; + + constructor(componentType: string, meter: Meter, queueConfig?: QueueConfig) { + const counter = componentCounter.get(componentType) ?? 0; + componentCounter.set(componentType, counter + 1); + + this.standardAttrs = { + [ATTR_OTEL_COMPONENT_TYPE]: componentType, + [ATTR_OTEL_COMPONENT_NAME]: `${componentType}/${counter}`, + }; + + this.droppedAttrs = { + ...this.standardAttrs, + [ATTR_ERROR_TYPE]: 'queue_full', + }; + + this.processedLogs = meter.createCounter( + METRIC_OTEL_SDK_PROCESSOR_LOG_PROCESSED, + { + unit: '{log_record}', + description: + 'The number of log records for which the processing has finished, either successful or failed.', + } + ); + + if (queueConfig) { + const { capacity, getQueueSize } = queueConfig; + const queueCapacity = meter.createUpDownCounter( + METRIC_OTEL_SDK_PROCESSOR_LOG_QUEUE_CAPACITY, + { + unit: '{log_record}', + description: + 'The maximum number of log records the queue of a given instance of an SDK log processor can hold.', + } + ); + queueCapacity.add(capacity, this.standardAttrs); + + this.queueSize = meter.createObservableUpDownCounter( + METRIC_OTEL_SDK_PROCESSOR_LOG_QUEUE_SIZE, + { + unit: '{log_record}', + description: + 'The number of log records in the queue of a given instance of an SDK log processor.', + } + ); + this.queueSizeCallback = result => + result.observe(getQueueSize(), this.standardAttrs); + this.queueSize.addCallback(this.queueSizeCallback); + } + } + + dropLogs(count: number) { + this.processedLogs.add(count, this.droppedAttrs); + } + + finishLogs(count: number, error: Error | undefined) { + if (!error) { + this.processedLogs.add(count, this.standardAttrs); + return; + } + + const attrs = { + ...this.standardAttrs, + [ATTR_ERROR_TYPE]: error.name, + }; + this.processedLogs.add(count, attrs); + } + + shutdown() { + if (this.queueSize && this.queueSizeCallback) { + this.queueSize.removeCallback(this.queueSizeCallback); + } + } +} diff --git a/experimental/packages/sdk-logs/src/export/SimpleLogRecordProcessor.ts b/experimental/packages/sdk-logs/src/export/SimpleLogRecordProcessor.ts index 68e6282c3bb..0474f8d39ef 100644 --- a/experimental/packages/sdk-logs/src/export/SimpleLogRecordProcessor.ts +++ b/experimental/packages/sdk-logs/src/export/SimpleLogRecordProcessor.ts @@ -3,6 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ +import { createNoopMeter } from '@opentelemetry/api'; import type { ExportResult } from '@opentelemetry/core'; import { BindOnceFuture, @@ -13,6 +14,9 @@ import { import type { LogRecordExporter } from './LogRecordExporter'; import type { LogRecordProcessor } from '../LogRecordProcessor'; import type { SdkLogRecord } from './SdkLogRecord'; +import type { LogRecordProcessorConfig } from './LogRecordProcessorConfig'; +import { OTEL_COMPONENT_TYPE_VALUE_SIMPLE_LOG_PROCESSOR } from '../semconv'; +import { LogRecordProcessorMetrics } from './LogRecordProcessorMetrics'; /** * An implementation of the {@link LogRecordProcessor} interface that exports @@ -25,13 +29,22 @@ import type { SdkLogRecord } from './SdkLogRecord'; */ export class SimpleLogRecordProcessor implements LogRecordProcessor { private readonly _exporter: LogRecordExporter; + private readonly _metrics: LogRecordProcessorMetrics; private _shutdownOnce: BindOnceFuture; private _unresolvedExports: Set>; - constructor(exporter: LogRecordExporter) { + constructor(exporter: LogRecordExporter, config?: LogRecordProcessorConfig) { this._exporter = exporter; this._shutdownOnce = new BindOnceFuture(this._shutdown, this); this._unresolvedExports = new Set>(); + + const meter = config?.meterProvider + ? config.meterProvider.getMeter('@opentelemetry/sdk-logs') + : createNoopMeter(); + this._metrics = new LogRecordProcessorMetrics( + OTEL_COMPONENT_TYPE_VALUE_SIMPLE_LOG_PROCESSOR, + meter + ); } public onEmit(logRecord: SdkLogRecord): void { @@ -43,6 +56,7 @@ export class SimpleLogRecordProcessor implements LogRecordProcessor { internal ._export(this._exporter, [logRecord]) .then((result: ExportResult) => { + this._metrics.finishLogs(1, result.error); if (result.code !== ExportResultCode.SUCCESS) { globalErrorHandler( result.error ?? @@ -85,6 +99,7 @@ export class SimpleLogRecordProcessor implements LogRecordProcessor { } private _shutdown(): Promise { + this._metrics.shutdown(); return this._exporter.shutdown(); } } diff --git a/experimental/packages/sdk-logs/src/semconv.ts b/experimental/packages/sdk-logs/src/semconv.ts index 80a4462e6d3..0bfdd00bd53 100644 --- a/experimental/packages/sdk-logs/src/semconv.ts +++ b/experimental/packages/sdk-logs/src/semconv.ts @@ -15,3 +15,122 @@ * @experimental This metric is experimental and is subject to breaking changes in minor releases of `@opentelemetry/semantic-conventions`. */ export const METRIC_OTEL_SDK_LOG_CREATED = 'otel.sdk.log.created' as const; + +/** + * The number of log records for which the processing has finished, either successful or failed. + * + * @note For successful processing, `error.type` **MUST NOT** be set. For failed processing, `error.type` **MUST** contain the failure cause. + * For the SDK Simple and Batching Log Record Processor a log record is considered to be processed already when it has been submitted to the exporter, + * not when the corresponding export call has finished. + * + * @experimental This metric is experimental and is subject to breaking changes in minor releases of `@opentelemetry/semantic-conventions`. + */ +export const METRIC_OTEL_SDK_PROCESSOR_LOG_PROCESSED = + 'otel.sdk.processor.log.processed' as const; + +/** + * The maximum number of log records the queue of a given instance of an SDK Log Record processor can hold. + * + * @note Only applies to Log Record processors which use a queue, e.g. the SDK Batching Log Record Processor. + * + * @experimental This metric is experimental and is subject to breaking changes in minor releases of `@opentelemetry/semantic-conventions`. + */ +export const METRIC_OTEL_SDK_PROCESSOR_LOG_QUEUE_CAPACITY = + 'otel.sdk.processor.log.queue.capacity' as const; + +/** + * The number of log records in the queue of a given instance of an SDK log processor. + * + * @note Only applies to log record processors which use a queue, e.g. the SDK Batching Log Record Processor. + * + * @experimental This metric is experimental and is subject to breaking changes in minor releases of `@opentelemetry/semantic-conventions`. + */ +export const METRIC_OTEL_SDK_PROCESSOR_LOG_QUEUE_SIZE = + 'otel.sdk.processor.log.queue.size' as const; + +/** + * A name uniquely identifying the instance of the OpenTelemetry component within its containing SDK instance. + * + * @example otlp_grpc_span_exporter/0 + * @example custom-name + * + * @note Implementations **SHOULD** ensure a low cardinality for this attribute, even across application or SDK restarts. + * E.g. implementations **MUST NOT** use UUIDs as values for this attribute. + * + * Implementations **MAY** achieve these goals by following a `/` pattern, e.g. `batching_span_processor/0`. + * Hereby `otel.component.type` refers to the corresponding attribute value of the component. + * + * The value of `instance-counter` **MAY** be automatically assigned by the component and uniqueness within the enclosing SDK instance **MUST** be guaranteed. + * For example, `` **MAY** be implemented by using a monotonically increasing counter (starting with `0`), which is incremented every time an + * instance of the given component type is started. + * + * With this implementation, for example the first Batching Span Processor would have `batching_span_processor/0` + * as `otel.component.name`, the second one `batching_span_processor/1` and so on. + * These values will therefore be reused in the case of an application restart. + * + * @experimental This attribute is experimental and is subject to breaking changes in minor releases of `@opentelemetry/semantic-conventions`. + */ +export const ATTR_OTEL_COMPONENT_NAME = 'otel.component.name' as const; + +/** + * A name identifying the type of the OpenTelemetry component. + * + * @example batching_span_processor + * @example com.example.MySpanExporter + * + * @note If none of the standardized values apply, implementations **SHOULD** use the language-defined name of the type. + * E.g. for Java the fully qualified classname **SHOULD** be used in this case. + * + * @experimental This attribute is experimental and is subject to breaking changes in minor releases of `@opentelemetry/semantic-conventions`. + */ +export const ATTR_OTEL_COMPONENT_TYPE = 'otel.component.type' as const; + +/** + * Enum value "batching_log_processor" for attribute {@link ATTR_OTEL_COMPONENT_TYPE}. + * + * The builtin SDK batching log record processor + * + * @experimental This enum value is experimental and is subject to breaking changes in minor releases of `@opentelemetry/semantic-conventions`. + */ +export const OTEL_COMPONENT_TYPE_VALUE_BATCHING_LOG_PROCESSOR = + 'batching_log_processor' as const; + +/** + * Enum value "simple_log_processor" for attribute {@link ATTR_OTEL_COMPONENT_TYPE}. + * + * The builtin SDK simple log record processor + * + * @experimental This enum value is experimental and is subject to breaking changes in minor releases of `@opentelemetry/semantic-conventions`. + */ +export const OTEL_COMPONENT_TYPE_VALUE_SIMPLE_LOG_PROCESSOR = + 'simple_log_processor' as const; + +/** + * Describes a class of error the operation ended with. + * + * @example timeout + * @example java.net.UnknownHostException + * @example server_certificate_invalid + * @example 500 + * + * @note The `error.type` **SHOULD** be predictable, and **SHOULD** have low cardinality. + * + * When `error.type` is set to a type (e.g., an exception type), its + * canonical class name identifying the type within the artifact **SHOULD** be used. + * + * Instrumentations **SHOULD** document the list of errors they report. + * + * The cardinality of `error.type` within one instrumentation library **SHOULD** be low. + * Telemetry consumers that aggregate data from multiple instrumentation libraries and applications + * should be prepared for `error.type` to have high cardinality at query time when no + * additional filters are applied. + * + * If the operation has completed successfully, instrumentations **SHOULD NOT** set `error.type`. + * + * If a specific domain defines its own set of error identifiers (such as HTTP or RPC status codes), + * it's **RECOMMENDED** to: + * + * - Use a domain-specific attribute + * - Set `error.type` to capture all errors, regardless of whether they are defined within the domain-specific set or not. + */ +export const ATTR_ERROR_TYPE = 'error.type' as const; diff --git a/experimental/packages/sdk-logs/test/common/export/BatchLogRecordProcessor.test.ts b/experimental/packages/sdk-logs/test/common/export/BatchLogRecordProcessor.test.ts index 13896482355..8881a20e3ba 100644 --- a/experimental/packages/sdk-logs/test/common/export/BatchLogRecordProcessor.test.ts +++ b/experimental/packages/sdk-logs/test/common/export/BatchLogRecordProcessor.test.ts @@ -11,6 +11,7 @@ import { loggingErrorHandler, setGlobalErrorHandler, } from '@opentelemetry/core'; +import { MeterProvider } from '@opentelemetry/sdk-metrics'; import type { BufferConfig, @@ -27,6 +28,7 @@ import { resourceFromAttributes, } from '@opentelemetry/resources'; import { LogRecordImpl } from '../../../src/LogRecordImpl'; +import { TestMetricReader, withResolvers } from '../utils'; class BatchLogRecordProcessor extends BatchLogRecordProcessorBase { onShutdown() {} @@ -624,4 +626,209 @@ describe('BatchLogRecordProcessorBase', () => { assert.equal(logRecords.length, 10); }); }); + + describe('Metrics', () => { + it('should record metrics', async () => { + const metricReader = new TestMetricReader(); + const meterProvider = new MeterProvider({ + readers: [metricReader], + }); + const processor = new BatchLogRecordProcessor(exporter, { + maxQueueSize: 1, + maxExportBatchSize: 1, + scheduledDelayMillis: 1_000_000_000, // Manually flush + meterProvider, + }); + + const exportStub = sinon.stub(exporter, 'export'); + + const { resolve: resolveExport1, promise: export1Promise } = + withResolvers(); + const { resolve: resolveExport2, promise: export2Promise } = + withResolvers(); + + // Signal for when export has started + const { resolve: resolveFirstExport, promise: firstExportPromise } = + withResolvers(); + + exportStub + .onFirstCall() + .callsFake((_logs, resultCallback: (result: ExportResult) => void) => { + resolveFirstExport(); + export1Promise.then(result => resultCallback(result)); + }) + .onSecondCall() + .callsFake((_logs, resultCallback: (result: ExportResult) => void) => { + export2Promise.then(result => resultCallback(result)); + }); + + const log1 = createLogRecord(); + // Immediately processed + processor.onEmit(log1); + + // Wait for log to be sent to exporter. + await firstExportPromise; + + // Queue empty, export in progress, this log is queued. + const log2 = createLogRecord(); + processor.onEmit(log2); + + // Queue full, this log is dropped. + const log3 = createLogRecord(); + processor.onEmit(log3); + + let { resourceMetrics } = await metricReader.collect(); + let scopeMetrics = resourceMetrics.scopeMetrics.find( + sm => sm.scope.name === '@opentelemetry/sdk-logs' + ); + assert.ok(scopeMetrics); + let processedLogsMetric = scopeMetrics.metrics.find( + m => m.descriptor.name === 'otel.sdk.processor.log.processed' + ); + assert.ok(processedLogsMetric); + assert.strictEqual(processedLogsMetric.dataPoints[0].value, 1); + assert.strictEqual( + processedLogsMetric.dataPoints[0].attributes['otel.component.type'], + 'batching_log_processor' + ); + assert.ok( + processedLogsMetric.dataPoints[0].attributes['otel.component.name'] + ?.toString() + .startsWith('batching_log_processor/') + ); + assert.strictEqual( + processedLogsMetric.dataPoints[0].attributes['error.type'], + 'queue_full' + ); + let logCapacityMetric = scopeMetrics.metrics.find( + m => m.descriptor.name === 'otel.sdk.processor.log.queue.capacity' + ); + assert.ok(logCapacityMetric); + assert.strictEqual(logCapacityMetric.dataPoints[0].value, 1); + assert.strictEqual( + logCapacityMetric.dataPoints[0].attributes['otel.component.type'], + 'batching_log_processor' + ); + assert.ok( + logCapacityMetric.dataPoints[0].attributes['otel.component.name'] + ?.toString() + .startsWith('batching_log_processor/') + ); + assert.strictEqual(logCapacityMetric.dataPoints[0].value, 1); + let logQueueSizeMetric = scopeMetrics.metrics.find( + m => m.descriptor.name === 'otel.sdk.processor.log.queue.size' + ); + assert.ok(logQueueSizeMetric); + assert.strictEqual(logQueueSizeMetric.dataPoints[0].value, 1); + assert.strictEqual( + logQueueSizeMetric.dataPoints[0].attributes['otel.component.type'], + 'batching_log_processor' + ); + assert.ok( + logQueueSizeMetric.dataPoints[0].attributes['otel.component.name'] + ?.toString() + .startsWith('batching_log_processor/') + ); + sinon.assert.calledOnce(exportStub); + + resolveExport1({ code: ExportResultCode.SUCCESS }); + const error = new Error('Export failed'); + error.name = 'BackendError'; + resolveExport2({ + code: ExportResultCode.FAILED, + error, + }); + + await processor.forceFlush(); + sinon.assert.calledTwice(exportStub); + + ({ resourceMetrics } = await metricReader.collect()); + scopeMetrics = resourceMetrics.scopeMetrics.find( + sm => sm.scope.name === '@opentelemetry/sdk-logs' + ); + assert.ok(scopeMetrics); + + processedLogsMetric = scopeMetrics.metrics.find( + m => m.descriptor.name === 'otel.sdk.processor.log.processed' + ); + assert.ok(processedLogsMetric); + const processedLogsDataPoints = processedLogsMetric.dataPoints as Array<{ + value: number; + attributes: Record; + }>; + const queueFullPoint = processedLogsDataPoints.find( + dataPoint => dataPoint.attributes['error.type'] === 'queue_full' + ); + assert.ok(queueFullPoint); + assert.strictEqual(queueFullPoint.value, 1); + assert.strictEqual( + queueFullPoint.attributes['otel.component.type'], + 'batching_log_processor' + ); + assert.ok( + queueFullPoint.attributes['otel.component.name'] + ?.toString() + .startsWith('batching_log_processor/') + ); + const successPoint = processedLogsDataPoints.find( + dataPoint => dataPoint.attributes['error.type'] === undefined + ); + assert.ok(successPoint); + assert.strictEqual(successPoint.value, 1); + assert.strictEqual( + successPoint.attributes['otel.component.type'], + 'batching_log_processor' + ); + assert.ok( + successPoint.attributes['otel.component.name'] + ?.toString() + .startsWith('batching_log_processor/') + ); + const failedPoint = processedLogsDataPoints.find( + dataPoint => dataPoint.attributes['error.type'] === 'BackendError' + ); + assert.ok(failedPoint); + assert.strictEqual(failedPoint.value, 1); + assert.strictEqual( + failedPoint.attributes['otel.component.type'], + 'batching_log_processor' + ); + assert.ok( + failedPoint.attributes['otel.component.name'] + ?.toString() + .startsWith('batching_log_processor/') + ); + + logCapacityMetric = scopeMetrics.metrics.find( + m => m.descriptor.name === 'otel.sdk.processor.log.queue.capacity' + ); + assert.ok(logCapacityMetric); + assert.strictEqual(logCapacityMetric.dataPoints[0].value, 1); + assert.strictEqual( + logCapacityMetric.dataPoints[0].attributes['otel.component.type'], + 'batching_log_processor' + ); + assert.ok( + logCapacityMetric.dataPoints[0].attributes['otel.component.name'] + ?.toString() + .startsWith('batching_log_processor/') + ); + assert.strictEqual(logCapacityMetric.dataPoints[0].value, 1); + + logQueueSizeMetric = scopeMetrics.metrics.find( + m => m.descriptor.name === 'otel.sdk.processor.log.queue.size' + ); + assert.ok(logQueueSizeMetric); + assert.strictEqual(logQueueSizeMetric.dataPoints[0].value, 0); + assert.strictEqual( + logQueueSizeMetric.dataPoints[0].attributes['otel.component.type'], + 'batching_log_processor' + ); + assert.ok( + logQueueSizeMetric.dataPoints[0].attributes['otel.component.name'] + ?.toString() + .startsWith('batching_log_processor/') + ); + }); + }); }); diff --git a/experimental/packages/sdk-logs/test/common/export/SimpleLogRecordProcessor.test.ts b/experimental/packages/sdk-logs/test/common/export/SimpleLogRecordProcessor.test.ts index b181a5c2d6c..5c925eb0a27 100644 --- a/experimental/packages/sdk-logs/test/common/export/SimpleLogRecordProcessor.test.ts +++ b/experimental/packages/sdk-logs/test/common/export/SimpleLogRecordProcessor.test.ts @@ -5,6 +5,7 @@ import * as assert from 'assert'; import * as sinon from 'sinon'; +import type { ExportResult } from '@opentelemetry/core'; import { ExportResultCode, loggingErrorHandler, @@ -15,6 +16,7 @@ import { defaultResource, resourceFromAttributes, } from '@opentelemetry/resources'; +import { MeterProvider } from '@opentelemetry/sdk-metrics'; import type { LogRecordExporter } from './../../../src'; import { @@ -24,6 +26,7 @@ import { import { LoggerProviderSharedState } from '../../../src/internal/LoggerProviderSharedState'; import { TestExporterWithDelay } from './TestExporterWithDelay'; import { LogRecordImpl } from '../../../src/LogRecordImpl'; +import { TestMetricReader } from '../utils'; const setup = (exporter: LogRecordExporter, resource?: Resource) => { const sharedState = new LoggerProviderSharedState( @@ -153,4 +156,78 @@ describe('SimpleLogRecordProcessor', () => { assert.strictEqual(exportedLogRecords.length, 1); }); }); + + describe('Metrics', () => { + it('should record metrics', async () => { + const metricReader = new TestMetricReader(); + const meterProvider = new MeterProvider({ + readers: [metricReader], + }); + const exporter = new InMemoryLogRecordExporter(); + const { logRecord } = setup(exporter); + const processor = new SimpleLogRecordProcessor(exporter, { + meterProvider, + }); + + const exportStub = sinon.stub(exporter, 'export'); + exportStub + .onFirstCall() + .callsFake((_logs, resultCallback: (result: ExportResult) => void) => { + resultCallback({ code: ExportResultCode.SUCCESS }); + }) + .onSecondCall() + .callsFake((_logs, resultCallback: (result: ExportResult) => void) => { + const error = new Error('Export failed'); + error.name = 'SystemError'; + resultCallback({ code: ExportResultCode.FAILED, error }); + }); + + processor.onEmit(logRecord); + processor.onEmit(logRecord); + + await processor.forceFlush(); + + const { resourceMetrics } = await metricReader.collect(); + const scopeMetrics = resourceMetrics.scopeMetrics.find( + sm => sm.scope.name === '@opentelemetry/sdk-logs' + ); + assert.ok(scopeMetrics); + const processedLogsMetric = scopeMetrics.metrics.find( + m => m.descriptor.name === 'otel.sdk.processor.log.processed' + ); + assert.ok(processedLogsMetric); + const processedLogsDataPoints = processedLogsMetric.dataPoints as Array<{ + value: number; + attributes: Record; + }>; + const successPoint = processedLogsDataPoints.find( + dataPoint => dataPoint.attributes['error.type'] === undefined + ); + assert.ok(successPoint); + assert.strictEqual(successPoint.value, 1); + assert.strictEqual( + successPoint.attributes['otel.component.type'], + 'simple_log_processor' + ); + assert.ok( + successPoint.attributes['otel.component.name'] + ?.toString() + .startsWith('simple_log_processor/') + ); + const failedPoint = processedLogsDataPoints.find( + dataPoint => dataPoint.attributes['error.type'] === 'SystemError' + ); + assert.ok(failedPoint); + assert.strictEqual(failedPoint.value, 1); + assert.strictEqual( + failedPoint.attributes['otel.component.type'], + 'simple_log_processor' + ); + assert.ok( + failedPoint.attributes['otel.component.name'] + ?.toString() + .startsWith('simple_log_processor/') + ); + }); + }); }); diff --git a/experimental/packages/sdk-logs/test/common/utils.ts b/experimental/packages/sdk-logs/test/common/utils.ts index 1ed7f28db8a..7c03e363193 100644 --- a/experimental/packages/sdk-logs/test/common/utils.ts +++ b/experimental/packages/sdk-logs/test/common/utils.ts @@ -30,3 +30,25 @@ export class TestMetricReader extends MetricReader { return Promise.resolve(); } } + +interface Resolvers { + promise: Promise; + resolve: (value: T) => void; + reject: (reason: any) => void; +} + +// Use Promise.withResolvers when we can +export function withResolvers(): Resolvers { + let resolve: (value: T) => void; + let reject: (reason: any) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + + return { + promise, + resolve: resolve!, + reject: reject!, + }; +}