diff --git a/.buildkite/scripts/steps/security/third_party_packages.txt b/.buildkite/scripts/steps/security/third_party_packages.txt index c959186c7fe18..52d43bdbe250f 100644 --- a/.buildkite/scripts/steps/security/third_party_packages.txt +++ b/.buildkite/scripts/steps/security/third_party_packages.txt @@ -50,3 +50,4 @@ patch-package @opentelemetry/resources @opentelemetry/exporter-logs-otlp-grpc @opentelemetry/exporter-logs-otlp-proto +@opentelemetry/otlp-transformer diff --git a/package.json b/package.json index 4ccb87abda0f7..97b11d3124ec3 100644 --- a/package.json +++ b/package.json @@ -1313,6 +1313,7 @@ "@opentelemetry/instrumentation-http": "0.215.0", "@opentelemetry/instrumentation-undici": "0.25.0", "@opentelemetry/otlp-exporter-base": "0.215.0", + "@opentelemetry/otlp-transformer": "0.215.0", "@opentelemetry/resources": "2.7.0", "@opentelemetry/sdk-logs": "0.215.0", "@opentelemetry/semantic-conventions": "1.40.0", diff --git a/renovate.json b/renovate.json index 6531a37c383fb..d8e07e83b7fef 100644 --- a/renovate.json +++ b/renovate.json @@ -2336,6 +2336,7 @@ "@opentelemetry/instrumentation-http", "@opentelemetry/instrumentation-undici", "@opentelemetry/otlp-exporter-base", + "@opentelemetry/otlp-transformer", "@opentelemetry/resources", "@opentelemetry/sdk-logs", "@opentelemetry/semantic-conventions" diff --git a/src/platform/packages/shared/kbn-tracing/index.ts b/src/platform/packages/shared/kbn-tracing/index.ts index 9ae2f76b39ad9..339c71182d4c6 100644 --- a/src/platform/packages/shared/kbn-tracing/index.ts +++ b/src/platform/packages/shared/kbn-tracing/index.ts @@ -10,3 +10,4 @@ export { LateBindingSpanProcessor } from './src/late_binding_span_processor'; export { initTracing } from './src/init_tracing'; export { OTLPSpanProcessor } from './src/otlp_span_processor'; +export { ElasticsearchOtlpExporter } from './src/elasticsearch_otlp_exporter'; diff --git a/src/platform/packages/shared/kbn-tracing/moon.yml b/src/platform/packages/shared/kbn-tracing/moon.yml index 2926705ea7570..dcb0728401f84 100644 --- a/src/platform/packages/shared/kbn-tracing/moon.yml +++ b/src/platform/packages/shared/kbn-tracing/moon.yml @@ -21,6 +21,7 @@ dependsOn: - '@kbn/std' - '@kbn/tracing-config' - '@kbn/cleanup-before-exit' + - '@kbn/core-elasticsearch-server' tags: - shared-server - package diff --git a/src/platform/packages/shared/kbn-tracing/src/elasticsearch_otlp_exporter.test.ts b/src/platform/packages/shared/kbn-tracing/src/elasticsearch_otlp_exporter.test.ts new file mode 100644 index 0000000000000..b168351cd360b --- /dev/null +++ b/src/platform/packages/shared/kbn-tracing/src/elasticsearch_otlp_exporter.test.ts @@ -0,0 +1,186 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +import { core } from '@elastic/opentelemetry-node/sdk'; +import type { tracing } from '@elastic/opentelemetry-node/sdk'; +import { ProtobufTraceSerializer } from '@opentelemetry/otlp-transformer'; +import { ElasticsearchOtlpExporter } from './elasticsearch_otlp_exporter'; +import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; + +jest.mock('@opentelemetry/otlp-transformer', () => ({ + ProtobufTraceSerializer: { + serializeRequest: jest.fn(), + }, +})); + +const mockedSerializeRequest = ProtobufTraceSerializer.serializeRequest as jest.MockedFunction< + typeof ProtobufTraceSerializer.serializeRequest +>; + +describe('ElasticsearchOtlpExporter', () => { + const spans = [] as tracing.ReadableSpan[]; + + beforeEach(() => { + jest.clearAllMocks(); + }); + + it('calls transport.request with correct path, method, and headers on success', (done) => { + const serialized = new Uint8Array([1, 2, 3]); + mockedSerializeRequest.mockReturnValue(serialized); + + const request = jest.fn().mockResolvedValue({}); + const client = { + transport: { request }, + } as unknown as ElasticsearchClient; + + const exporter = new ElasticsearchOtlpExporter(client); + + exporter.export(spans, (result) => { + try { + expect(result.code).toBe(core.ExportResultCode.SUCCESS); + expect(request).toHaveBeenCalledWith( + { + method: 'POST', + path: '/_otlp/v1/traces', + body: Buffer.from(serialized), + }, + { + headers: { 'Content-Type': 'application/x-protobuf' }, + maxRetries: 3, + } + ); + done(); + } catch (err) { + done(err); + } + }); + }); + + it('returns FAILED when serialization fails', (done) => { + mockedSerializeRequest.mockImplementation(() => undefined); + + const request = jest.fn(); + const client = { + transport: { request }, + } as unknown as ElasticsearchClient; + + const exporter = new ElasticsearchOtlpExporter(client); + + exporter.export(spans, (result) => { + try { + expect(result.code).toBe(core.ExportResultCode.FAILED); + expect(result.error).toEqual(new Error('Serialization failed')); + expect(request).not.toHaveBeenCalled(); + done(); + } catch (err) { + done(err); + } + }); + }); + + it('returns FAILED when transport.request rejects', (done) => { + const serialized = new Uint8Array([9]); + mockedSerializeRequest.mockReturnValue(serialized); + + const transportError = new Error('connection reset'); + const request = jest.fn().mockRejectedValue(transportError); + const client = { + transport: { request }, + } as unknown as ElasticsearchClient; + + const exporter = new ElasticsearchOtlpExporter(client); + + exporter.export(spans, (result) => { + try { + expect(result.code).toBe(core.ExportResultCode.FAILED); + expect(result.error).toBe(transportError); + done(); + } catch (err) { + done(err); + } + }); + }); + + it('shutdown waits for in-flight exports to complete before returning', async () => { + const serialized = new Uint8Array([1, 2, 3]); + mockedSerializeRequest.mockReturnValue(serialized); + + const request = jest.fn().mockResolvedValue({}); + const client = { + transport: { request }, + } as unknown as ElasticsearchClient; + + const exporter = new ElasticsearchOtlpExporter(client); + const results: core.ExportResult[] = []; + exporter.export(spans, (result) => results.push(result)); + + await exporter.shutdown(); + + expect(results).toHaveLength(1); + expect(results[0].code).toBe(core.ExportResultCode.SUCCESS); + }); + + it('export after shutdown returns FAILED immediately', (done) => { + const client = { + transport: { request: jest.fn() }, + } as unknown as ElasticsearchClient; + const exporter = new ElasticsearchOtlpExporter(client); + + exporter.shutdown().then(() => { + exporter.export(spans, (result) => { + try { + expect(result.code).toBe(core.ExportResultCode.FAILED); + expect(result.error?.message).toBe('Exporter has been shut down'); + expect(client.transport.request).not.toHaveBeenCalled(); + done(); + } catch (err) { + done(err); + } + }); + }); + }); + + it('forceFlush waits for in-flight exports', async () => { + const serialized = new Uint8Array([4, 5]); + mockedSerializeRequest.mockReturnValue(serialized); + + let resolveRequest: () => void; + const request = jest.fn().mockImplementation( + () => + new Promise((resolve) => { + resolveRequest = resolve; + }) + ); + const client = { + transport: { request }, + } as unknown as ElasticsearchClient; + + const exporter = new ElasticsearchOtlpExporter(client); + const results: core.ExportResult[] = []; + exporter.export(spans, (result) => results.push(result)); + + const flushPromise = exporter.forceFlush(); + + expect(results).toHaveLength(0); + resolveRequest!(); + await flushPromise; + + expect(results).toHaveLength(1); + expect(results[0].code).toBe(core.ExportResultCode.SUCCESS); + }); + + it('forceFlush resolves immediately when no exports are pending', async () => { + const client = { + transport: { request: jest.fn() }, + } as unknown as ElasticsearchClient; + const exporter = new ElasticsearchOtlpExporter(client); + + await expect(exporter.forceFlush()).resolves.toBeUndefined(); + }); +}); diff --git a/src/platform/packages/shared/kbn-tracing/src/elasticsearch_otlp_exporter.ts b/src/platform/packages/shared/kbn-tracing/src/elasticsearch_otlp_exporter.ts new file mode 100644 index 0000000000000..e45dc962a6f40 --- /dev/null +++ b/src/platform/packages/shared/kbn-tracing/src/elasticsearch_otlp_exporter.ts @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +import type { tracing } from '@elastic/opentelemetry-node/sdk'; +import { core } from '@elastic/opentelemetry-node/sdk'; +import { ProtobufTraceSerializer } from '@opentelemetry/otlp-transformer'; +import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; + +const ES_OTLP_TRACES_PATH = '/_otlp/v1/traces'; +const CONTENT_TYPE_PROTOBUF = 'application/x-protobuf'; + +/** + * A {@link tracing.SpanExporter} that ships OTLP-protobuf encoded spans + * to Elasticsearch's native `/_otlp/v1/traces` endpoint via the + * ES client transport. This reuses the same connection, auth, and TLS + * settings that Kibana already has for talking to Elasticsearch. + */ +export class ElasticsearchOtlpExporter implements tracing.SpanExporter { + private readonly sendingPromises = new Set>(); + private isShutdown = false; + + constructor(private readonly client: ElasticsearchClient) {} + + export(spans: tracing.ReadableSpan[], resultCallback: (result: core.ExportResult) => void): void { + if (this.isShutdown) { + resultCallback({ + code: core.ExportResultCode.FAILED, + error: new Error('Exporter has been shut down'), + }); + return; + } + + const serialized = ProtobufTraceSerializer.serializeRequest(spans); + if (!serialized) { + resultCallback({ + code: core.ExportResultCode.FAILED, + error: new Error('Serialization failed'), + }); + return; + } + + const exportPromise = this.client.transport + .request( + { + method: 'POST', + path: ES_OTLP_TRACES_PATH, + body: Buffer.from(serialized), + }, + { + headers: { 'Content-Type': CONTENT_TYPE_PROTOBUF }, + maxRetries: 3, + } + ) + .then(() => resultCallback({ code: core.ExportResultCode.SUCCESS })) + .catch((error) => resultCallback({ code: core.ExportResultCode.FAILED, error })) + .finally(() => this.sendingPromises.delete(exportPromise)); + + this.sendingPromises.add(exportPromise); + } + + async forceFlush(): Promise { + await Promise.all(this.sendingPromises); + } + + async shutdown(): Promise { + this.isShutdown = true; + await this.forceFlush(); + } +} diff --git a/src/platform/packages/shared/kbn-tracing/src/inference_preserving_sampler.test.ts b/src/platform/packages/shared/kbn-tracing/src/inference_preserving_sampler.test.ts new file mode 100644 index 0000000000000..cf620e1252ae9 --- /dev/null +++ b/src/platform/packages/shared/kbn-tracing/src/inference_preserving_sampler.test.ts @@ -0,0 +1,124 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +import type { Attributes, Context, Link } from '@opentelemetry/api'; +import { context, propagation, SpanKind } from '@opentelemetry/api'; +import { tracing } from '@elastic/opentelemetry-node/sdk'; +import { BAGGAGE_TRACKING_BEACON_KEY, BAGGAGE_TRACKING_BEACON_VALUE } from '@kbn/inference-tracing'; +import { InferencePreservingSampler } from './inference_preserving_sampler'; + +const baseSamplingResult: tracing.SamplingResult = { + decision: tracing.SamplingDecision.NOT_RECORD, +}; + +const traceId = 'TEST_TRACE_ID'; +const spanName = 'test-span'; +const attributes: Attributes = {}; +const links: Link[] = []; + +function createInferenceContext(): Context { + const baggage = propagation.createBaggage({ + [BAGGAGE_TRACKING_BEACON_KEY]: { value: BAGGAGE_TRACKING_BEACON_VALUE }, + }); + return propagation.setBaggage(context.active(), baggage); +} + +describe('InferencePreservingSampler', () => { + const ctx: Context = context.active(); + + beforeEach(() => { + jest.restoreAllMocks(); + }); + + it('passes through when delegate returns RECORD', () => { + const delegate: tracing.Sampler = { + shouldSample: jest.fn().mockReturnValue({ + decision: tracing.SamplingDecision.RECORD, + attributes: {}, + }), + }; + const sampler = new InferencePreservingSampler(delegate); + + const result = sampler.shouldSample( + ctx, + traceId, + spanName, + SpanKind.INTERNAL, + attributes, + links + ); + + expect(result.decision).toBe(tracing.SamplingDecision.RECORD); + expect(delegate.shouldSample).toHaveBeenCalledTimes(1); + }); + + it('passes through when delegate returns RECORD_AND_SAMPLED', () => { + const delegate: tracing.Sampler = { + shouldSample: jest.fn().mockReturnValue({ + decision: tracing.SamplingDecision.RECORD_AND_SAMPLED, + attributes: {}, + }), + }; + const sampler = new InferencePreservingSampler(delegate); + + const result = sampler.shouldSample( + ctx, + traceId, + spanName, + SpanKind.INTERNAL, + attributes, + links + ); + + expect(result.decision).toBe(tracing.SamplingDecision.RECORD_AND_SAMPLED); + }); + + it('passes through when delegate returns NOT_RECORD but no inference baggage', () => { + const delegate: tracing.Sampler = { + shouldSample: jest.fn().mockReturnValue(baseSamplingResult), + }; + const sampler = new InferencePreservingSampler(delegate); + jest.spyOn(propagation, 'getBaggage').mockReturnValue(undefined); + + const result = sampler.shouldSample( + ctx, + traceId, + spanName, + SpanKind.INTERNAL, + attributes, + links + ); + + expect(result).toEqual(baseSamplingResult); + expect(result.decision).toBe(tracing.SamplingDecision.NOT_RECORD); + }); + + it('upgrades NOT_RECORD to RECORD when inference baggage is present', () => { + const delegate: tracing.Sampler = { + shouldSample: jest.fn().mockReturnValue({ + ...baseSamplingResult, + attributes: { key: 'value' }, + }), + }; + const sampler = new InferencePreservingSampler(delegate); + const inferenceCtx = createInferenceContext(); + + const result = sampler.shouldSample( + inferenceCtx, + traceId, + spanName, + SpanKind.INTERNAL, + attributes, + links + ); + + expect(result.decision).toBe(tracing.SamplingDecision.RECORD); + expect(result.attributes).toEqual({ key: 'value' }); + }); +}); diff --git a/src/platform/packages/shared/kbn-tracing/src/inference_preserving_sampler.ts b/src/platform/packages/shared/kbn-tracing/src/inference_preserving_sampler.ts new file mode 100644 index 0000000000000..768f9992ef8df --- /dev/null +++ b/src/platform/packages/shared/kbn-tracing/src/inference_preserving_sampler.ts @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +import type { Context, SpanKind, Link, Attributes } from '@opentelemetry/api'; +import { propagation } from '@opentelemetry/api'; +import { tracing } from '@elastic/opentelemetry-node/sdk'; + +import { BAGGAGE_TRACKING_BEACON_KEY, BAGGAGE_TRACKING_BEACON_VALUE } from '@kbn/inference-tracing'; + +/** + * Sampler wrapper that ensures inference spans are always recorded, even when + * the global sample rate drops them. + * + * For non-inference spans it is a transparent pass-through. + * + * For inference spans (identified by the `kibana.inference.tracing` baggage): + * - If the delegate already samples them, the decision is returned as-is. + * - If the delegate drops them (NOT_RECORD), the decision is upgraded to + * RECORD (without the SAMPLED flag). Domain-specific processors (e.g. + * AgentBuilderSpanProcessor) can then force the SAMPLED flag on a copy + * for their own export pipeline. + */ +export class InferencePreservingSampler implements tracing.Sampler { + constructor(private readonly delegate: tracing.Sampler) {} + + shouldSample( + ctx: Context, + traceId: string, + spanName: string, + spanKind: SpanKind, + attributes: Attributes, + links: Link[] + ): tracing.SamplingResult { + const result = this.delegate.shouldSample(ctx, traceId, spanName, spanKind, attributes, links); + + if (result.decision !== tracing.SamplingDecision.NOT_RECORD) { + return result; + } + + const baggage = propagation.getBaggage(ctx); + const inInferenceContext = + baggage?.getEntry(BAGGAGE_TRACKING_BEACON_KEY)?.value === BAGGAGE_TRACKING_BEACON_VALUE; + + if (!inInferenceContext) { + return result; + } + + return { + ...result, + decision: tracing.SamplingDecision.RECORD, + }; + } + + toString(): string { + return `InferencePreservingSampler{${this.delegate}}`; + } +} diff --git a/src/platform/packages/shared/kbn-tracing/src/init_tracing.ts b/src/platform/packages/shared/kbn-tracing/src/init_tracing.ts index 0e817ab2b605e..457e878bbe086 100644 --- a/src/platform/packages/shared/kbn-tracing/src/init_tracing.ts +++ b/src/platform/packages/shared/kbn-tracing/src/init_tracing.ts @@ -20,6 +20,7 @@ import { AsyncLocalStorageContextManager } from '@opentelemetry/context-async-ho import { castArray } from 'lodash'; import { cleanupBeforeExit } from '@kbn/cleanup-before-exit'; import { EvalSpanProcessor } from './eval_span_processor'; +import { InferencePreservingSampler } from './inference_preserving_sampler'; import { OTLPSpanProcessor } from './otlp_span_processor'; import { LateBindingSpanProcessor } from '..'; @@ -54,12 +55,12 @@ export function initTracing({ const traceIdSampler = new tracing.TraceIdRatioBasedSampler(tracingConfig.sample_rate); + const baseSampler = new tracing.ParentBasedSampler({ + root: traceIdSampler, + }); + const nodeTracerProvider = new node.NodeTracerProvider({ - // by default, base sampling on parent context, - // or for root spans, based on the configured sample rate - sampler: new tracing.ParentBasedSampler({ - root: traceIdSampler, - }), + sampler: new InferencePreservingSampler(baseSampler), spanProcessors: allSpanProcessors, resource, }); diff --git a/src/platform/packages/shared/kbn-tracing/tsconfig.json b/src/platform/packages/shared/kbn-tracing/tsconfig.json index 7c349d03466a2..8d9280b1d99b0 100644 --- a/src/platform/packages/shared/kbn-tracing/tsconfig.json +++ b/src/platform/packages/shared/kbn-tracing/tsconfig.json @@ -18,5 +18,6 @@ "@kbn/std", "@kbn/tracing-config", "@kbn/cleanup-before-exit", + "@kbn/core-elasticsearch-server", ] } diff --git a/x-pack/platform/packages/shared/kbn-inference-tracing/index.ts b/x-pack/platform/packages/shared/kbn-inference-tracing/index.ts index 33bf5c8d50726..a1d28b074c56e 100644 --- a/x-pack/platform/packages/shared/kbn-inference-tracing/index.ts +++ b/x-pack/platform/packages/shared/kbn-inference-tracing/index.ts @@ -9,7 +9,12 @@ export { withExecuteToolSpan } from './src/with_execute_tool_span'; export { withActiveInferenceSpan } from './src/with_active_inference_span'; export { withInferenceContext } from './src/with_inference_context'; export { GenAISemanticConventions, ElasticGenAIAttributes } from './src/types'; -export { EVAL_RUN_ID_BAGGAGE_KEY } from './src/baggage'; +export { + BAGGAGE_TRACKING_BEACON_KEY, + BAGGAGE_TRACKING_BEACON_VALUE, + EVAL_RUN_ID_BAGGAGE_KEY, +} from './src/baggage'; +export { isInferenceSpan } from './src/is_inference_span'; export { LangfuseSpanProcessor } from './src/langfuse/langfuse_span_processor'; export { PhoenixSpanProcessor } from './src/phoenix/phoenix_span_processor'; diff --git a/x-pack/platform/packages/shared/kbn-inference-tracing/src/base_inference_span_processor.ts b/x-pack/platform/packages/shared/kbn-inference-tracing/src/base_inference_span_processor.ts index 79d71be84330b..7b9e97aa46802 100644 --- a/x-pack/platform/packages/shared/kbn-inference-tracing/src/base_inference_span_processor.ts +++ b/x-pack/platform/packages/shared/kbn-inference-tracing/src/base_inference_span_processor.ts @@ -8,8 +8,8 @@ import type { api } from '@elastic/opentelemetry-node/sdk'; import { tracing } from '@elastic/opentelemetry-node/sdk'; import type { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-proto'; -import { isInInferenceContext } from './is_in_inference_context'; import { IS_ROOT_INFERENCE_SPAN_ATTRIBUTE_NAME } from './root_inference_span'; +import { isInferenceSpan } from './is_inference_span'; export abstract class BaseInferenceSpanProcessor implements tracing.SpanProcessor { private delegate: tracing.SpanProcessor; @@ -23,11 +23,7 @@ export abstract class BaseInferenceSpanProcessor implements tracing.SpanProcesso abstract processInferenceSpan(span: tracing.ReadableSpan): tracing.ReadableSpan; onStart(span: tracing.Span, parentContext: api.Context): void { - const shouldTrack = - (isInInferenceContext(parentContext) || span.instrumentationScope.name === 'inference') && - span.instrumentationScope.name !== '@elastic/transport'; - - if (shouldTrack) { + if (isInferenceSpan(span, parentContext)) { span.setAttribute('_should_track', true); this.delegate.onStart(span, parentContext); } diff --git a/x-pack/platform/packages/shared/kbn-inference-tracing/src/is_inference_span.test.ts b/x-pack/platform/packages/shared/kbn-inference-tracing/src/is_inference_span.test.ts new file mode 100644 index 0000000000000..52d88383ed0f1 --- /dev/null +++ b/x-pack/platform/packages/shared/kbn-inference-tracing/src/is_inference_span.test.ts @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { context, propagation } from '@opentelemetry/api'; +import { AsyncLocalStorageContextManager } from '@opentelemetry/context-async-hooks'; +import type { tracing } from '@elastic/opentelemetry-node/sdk'; +import { BAGGAGE_TRACKING_BEACON_KEY, BAGGAGE_TRACKING_BEACON_VALUE } from './baggage'; +import { isInferenceSpan } from './is_inference_span'; + +function createSpan(scopeName: string): tracing.Span { + const span = { + instrumentationScope: { name: scopeName }, + }; + return span as tracing.Span; +} + +describe('isInferenceSpan', () => { + let contextManager: AsyncLocalStorageContextManager; + + beforeEach(() => { + contextManager = new AsyncLocalStorageContextManager(); + context.setGlobalContextManager(contextManager); + contextManager.enable(); + }); + + afterEach(() => { + contextManager.disable(); + }); + + it('returns true when in inference context (baggage)', () => { + const baggage = propagation.createBaggage({ + [BAGGAGE_TRACKING_BEACON_KEY]: { value: BAGGAGE_TRACKING_BEACON_VALUE }, + }); + const parentContext = propagation.setBaggage(context.active(), baggage); + const span = createSpan('some.scope'); + + expect(isInferenceSpan(span, parentContext)).toBe(true); + }); + + it('returns true when instrumentationScope.name is inference', () => { + const span = createSpan('inference'); + + expect(isInferenceSpan(span, context.active())).toBe(true); + }); + + it('returns false when instrumentationScope.name is @elastic/transport', () => { + const baggage = propagation.createBaggage({ + [BAGGAGE_TRACKING_BEACON_KEY]: { value: BAGGAGE_TRACKING_BEACON_VALUE }, + }); + const parentContext = propagation.setBaggage(context.active(), baggage); + const span = createSpan('@elastic/transport'); + + expect(isInferenceSpan(span, parentContext)).toBe(false); + }); + + it('returns false for non-inference spans outside inference context', () => { + const span = createSpan('http'); + + expect(isInferenceSpan(span, context.active())).toBe(false); + }); +}); diff --git a/x-pack/platform/packages/shared/kbn-inference-tracing/src/is_inference_span.ts b/x-pack/platform/packages/shared/kbn-inference-tracing/src/is_inference_span.ts new file mode 100644 index 0000000000000..7a28b515bfe8a --- /dev/null +++ b/x-pack/platform/packages/shared/kbn-inference-tracing/src/is_inference_span.ts @@ -0,0 +1,17 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { api } from '@elastic/opentelemetry-node/sdk'; +import type { tracing } from '@elastic/opentelemetry-node/sdk'; +import { isInInferenceContext } from './is_in_inference_context'; + +export function isInferenceSpan(span: tracing.Span, parentContext: api.Context): boolean { + return ( + (isInInferenceContext(parentContext) || span.instrumentationScope.name === 'inference') && + span.instrumentationScope.name !== '@elastic/transport' + ); +} diff --git a/x-pack/platform/plugins/shared/agent_builder/moon.yml b/x-pack/platform/plugins/shared/agent_builder/moon.yml index f4f954fb1772d..033b9e01443a4 100644 --- a/x-pack/platform/plugins/shared/agent_builder/moon.yml +++ b/x-pack/platform/plugins/shared/agent_builder/moon.yml @@ -51,6 +51,7 @@ dependsOn: - '@kbn/core-ui-settings-server' - '@kbn/utility-types' - '@kbn/inference-tracing' + - '@kbn/tracing' - '@kbn/std' - '@kbn/esql-language' - '@kbn/esql-utils' diff --git a/x-pack/platform/plugins/shared/agent_builder/server/config.ts b/x-pack/platform/plugins/shared/agent_builder/server/config.ts index 74bcecef8bb3d..fefd6eb53277e 100644 --- a/x-pack/platform/plugins/shared/agent_builder/server/config.ts +++ b/x-pack/platform/plugins/shared/agent_builder/server/config.ts @@ -8,6 +8,13 @@ import type { PluginConfigDescriptor } from '@kbn/core/server'; import { schema, type TypeOf } from '@kbn/config-schema'; +const scheduledDelay = schema.conditional( + schema.contextRef('dev'), + true, + schema.number({ defaultValue: 1000, min: 50 }), + schema.number({ defaultValue: 5000, min: 50 }) +); + export const configSchema = schema.object({ enabled: schema.boolean({ defaultValue: true }), githubBaseUrl: schema.string({ defaultValue: 'https://github.com' }), @@ -15,6 +22,17 @@ export const configSchema = schema.object({ numSnippets: schema.number({ defaultValue: 2, min: 1, max: 10 }), numWords: schema.number({ defaultValue: 750, min: 1, max: 5000 }), }), + tracing: schema.object({ + send_to_self: schema.boolean({ defaultValue: true }), + exporters: schema.arrayOf( + schema.object({ + url: schema.uri({ scheme: ['http', 'https'] }), + headers: schema.maybe(schema.recordOf(schema.string(), schema.string())), + }), + { defaultValue: [] } + ), + scheduledDelay, + }), }); export type AgentBuilderConfig = TypeOf; diff --git a/x-pack/platform/plugins/shared/agent_builder/server/plugin.ts b/x-pack/platform/plugins/shared/agent_builder/server/plugin.ts index 71e10b7e65379..31bffd60b9934 100644 --- a/x-pack/platform/plugins/shared/agent_builder/server/plugin.ts +++ b/x-pack/platform/plugins/shared/agent_builder/server/plugin.ts @@ -10,6 +10,7 @@ import type { Logger } from '@kbn/logging'; import type { UsageCounter } from '@kbn/usage-collection-plugin/server'; import type { HomeServerPluginSetup } from '@kbn/home-plugin/server'; import type { AgentBuilderConfig } from './config'; +import { registerTracingExporter } from './tracing/register_tracing'; import { ServiceManager } from './services'; import type { AgentBuilderPluginSetup, @@ -53,6 +54,7 @@ export class AgentBuilderPlugin private trackingService?: TrackingService; private analyticsService?: AnalyticsService; private home: HomeServerPluginSetup | null = null; + private teardownTracing?: () => Promise; private startDeps?: AgentBuilderStartDependencies; constructor(context: PluginInitializerContext) { this.logger = context.logger.get(); @@ -207,6 +209,13 @@ export class AgentBuilderPlugin start(coreStart: CoreStart, startDeps: AgentBuilderStartDependencies): AgentBuilderPluginStart { this.startDeps = startDeps; + void registerTracingExporter({ + core: coreStart, + tracingConfig: this.config.tracing, + logger: this.logger.get('tracing'), + }).then((teardownTracing) => { + this.teardownTracing = teardownTracing; + }); const { inference, spaces, actions, taskManager, searchInferenceEndpoints } = startDeps; const { elasticsearch, security, uiSettings, savedObjects, dataStreams, featureFlags } = coreStart; @@ -285,6 +294,9 @@ export class AgentBuilderPlugin }; } + async stop() { + await this.teardownTracing?.(); + } /** * Remove orphaned SML crawler task instances from older scheduled-task id prefixes. * Safe on every start — uses a single `bulkRemove` for the known legacy instance ids. @@ -303,6 +315,4 @@ export class AgentBuilderPlugin logger.warn(`Failed to remove legacy SML crawler tasks: ${(error as Error).message}`); } } - - stop() {} } diff --git a/x-pack/platform/plugins/shared/agent_builder/server/services/plugins/plugin_service.test.ts b/x-pack/platform/plugins/shared/agent_builder/server/services/plugins/plugin_service.test.ts index 1ae1299ca8262..86f20fbf9e82e 100644 --- a/x-pack/platform/plugins/shared/agent_builder/server/services/plugins/plugin_service.test.ts +++ b/x-pack/platform/plugins/shared/agent_builder/server/services/plugins/plugin_service.test.ts @@ -160,6 +160,7 @@ describe('PluginsService', () => { enabled: true, githubBaseUrl: 'https://github.com', topSnippets: { numSnippets: 2, numWords: 750 }, + tracing: { send_to_self: true, exporters: [], scheduledDelay: 1000 }, }, analyticsService: mockAnalyticsService as unknown as AnalyticsService, trackingService: mockTrackingService as unknown as TrackingService, @@ -556,6 +557,7 @@ describe('PluginsService', () => { enabled: true, githubBaseUrl: 'https://github.com', topSnippets: { numSnippets: 2, numWords: 750 }, + tracing: { send_to_self: true, exporters: [], scheduledDelay: 1000 }, }, analyticsService: mockAnalyticsService as unknown as AnalyticsService, }); diff --git a/x-pack/platform/plugins/shared/agent_builder/server/services/tools/tools_services.test.ts b/x-pack/platform/plugins/shared/agent_builder/server/services/tools/tools_services.test.ts index dfc452c1ed21d..b0446c612fe0d 100644 --- a/x-pack/platform/plugins/shared/agent_builder/server/services/tools/tools_services.test.ts +++ b/x-pack/platform/plugins/shared/agent_builder/server/services/tools/tools_services.test.ts @@ -39,6 +39,7 @@ describe('ToolsService', () => { enabled: true, githubBaseUrl: 'https://github.com', topSnippets: { numSnippets: 2, numWords: 750 }, + tracing: { send_to_self: true, exporters: [], scheduledDelay: 1000 }, }, }); @@ -54,6 +55,7 @@ describe('ToolsService', () => { enabled: true, githubBaseUrl: 'https://github.com', topSnippets: { numSnippets: 2, numWords: 750 }, + tracing: { send_to_self: true, exporters: [], scheduledDelay: 1000 }, }, }); diff --git a/x-pack/platform/plugins/shared/agent_builder/server/tracing/agent_builder_span_processor.test.ts b/x-pack/platform/plugins/shared/agent_builder/server/tracing/agent_builder_span_processor.test.ts new file mode 100644 index 0000000000000..37b6053c7523a --- /dev/null +++ b/x-pack/platform/plugins/shared/agent_builder/server/tracing/agent_builder_span_processor.test.ts @@ -0,0 +1,233 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Attributes } from '@opentelemetry/api'; +import { context, propagation, TraceFlags } from '@opentelemetry/api'; +import { AsyncLocalStorageContextManager } from '@opentelemetry/context-async-hooks'; +import type { tracing } from '@elastic/opentelemetry-node/sdk'; +import { tracing as elasticTracing } from '@elastic/opentelemetry-node/sdk'; +import { BAGGAGE_TRACKING_BEACON_KEY, BAGGAGE_TRACKING_BEACON_VALUE } from '@kbn/inference-tracing'; +import { AgentBuilderSpanProcessor } from './agent_builder_span_processor'; + +const SHOULD_TRACK_ATTR = '_agent_builder_should_track'; + +const emptyResource = { + attributes: {}, + merge: jest.fn(), + getRawAttributes: jest.fn().mockReturnValue([]), +}; + +describe('AgentBuilderSpanProcessor', () => { + let contextManager: AsyncLocalStorageContextManager; + const mockBatch: tracing.SpanProcessor = { + onStart: jest.fn(), + onEnd: jest.fn(), + forceFlush: jest.fn, []>().mockResolvedValue(undefined), + shutdown: jest.fn, []>().mockResolvedValue(undefined), + }; + + beforeEach(() => { + contextManager = new AsyncLocalStorageContextManager(); + context.setGlobalContextManager(contextManager); + contextManager.enable(); + + jest + .spyOn(elasticTracing, 'BatchSpanProcessor') + .mockReturnValue(mockBatch as elasticTracing.BatchSpanProcessor); + + (mockBatch.onStart as jest.Mock).mockClear(); + (mockBatch.onEnd as jest.Mock).mockClear(); + (mockBatch.forceFlush as jest.Mock).mockClear(); + (mockBatch.shutdown as jest.Mock).mockClear(); + (mockBatch.forceFlush as jest.Mock, []>).mockResolvedValue(undefined); + (mockBatch.shutdown as jest.Mock, []>).mockResolvedValue(undefined); + }); + + afterEach(() => { + jest.restoreAllMocks(); + contextManager.disable(); + }); + + function inferenceParentContext(): ReturnType { + const baggage = propagation.createBaggage({ + [BAGGAGE_TRACKING_BEACON_KEY]: { value: BAGGAGE_TRACKING_BEACON_VALUE }, + }); + return propagation.setBaggage(context.active(), baggage); + } + + function createMockSpan(scopeName: string): tracing.Span { + const spanCtx = { + traceId: 't'.repeat(32), + spanId: 's'.repeat(16), + traceFlags: TraceFlags.NONE, + }; + const span: tracing.Span & tracing.ReadableSpan = { + name: 'test', + kind: 0, + startTime: [0, 0], + endTime: [0, 0], + status: { code: 0 }, + resource: emptyResource, + instrumentationScope: { name: scopeName }, + duration: [0, 0], + ended: true, + events: [], + links: [], + parentSpanContext: undefined, + droppedAttributesCount: 0, + droppedEventsCount: 0, + droppedLinksCount: 0, + attributes: {}, + spanContext: jest.fn().mockReturnValue(spanCtx), + setAttribute: jest.fn(), + setAttributes: jest.fn(), + addEvent: jest.fn(), + addLink: jest.fn(), + addLinks: jest.fn(), + setStatus: jest.fn(), + updateName: jest.fn(), + end: jest.fn(), + isRecording: jest.fn().mockReturnValue(true), + recordException: jest.fn(), + }; + return span; + } + + function createMockReadableSpan(attrs: Attributes): tracing.ReadableSpan { + const readable: tracing.ReadableSpan = { + name: 'test-span', + kind: 0, + startTime: [0, 0], + endTime: [0, 0], + status: { code: 0 }, + resource: emptyResource, + instrumentationScope: { name: 'test' }, + duration: [0, 0], + ended: true, + events: [], + links: [], + parentSpanContext: undefined, + droppedAttributesCount: 0, + droppedEventsCount: 0, + droppedLinksCount: 0, + attributes: attrs, + spanContext: () => ({ + traceId: 't'.repeat(32), + spanId: 's'.repeat(16), + traceFlags: TraceFlags.NONE, + }), + }; + return readable; + } + + function createExporter(): tracing.SpanExporter { + return { + export: jest.fn(), + shutdown: jest.fn, []>().mockResolvedValue(undefined), + forceFlush: jest.fn, []>().mockResolvedValue(undefined), + }; + } + + it('onStart marks inference spans with attribute when enabled', async () => { + const processor = new AgentBuilderSpanProcessor({ + exporter: createExporter(), + scheduledDelayMillis: 1, + isEnabled: () => true, + }); + + const span = createMockSpan('inference'); + const parentContext = inferenceParentContext(); + await processor.onStart(span, parentContext); + + expect(span.setAttribute).toHaveBeenCalledWith(SHOULD_TRACK_ATTR, true); + expect(mockBatch.onStart).toHaveBeenCalledWith(span, parentContext); + }); + + it('onStart skips non-inference spans', async () => { + const processor = new AgentBuilderSpanProcessor({ + exporter: createExporter(), + scheduledDelayMillis: 1, + }); + + const span = createMockSpan('http'); + await processor.onStart(span, context.active()); + + expect(span.setAttribute).not.toHaveBeenCalled(); + expect(mockBatch.onStart).not.toHaveBeenCalled(); + }); + + it('onStart skips when isEnabled returns false', async () => { + const processor = new AgentBuilderSpanProcessor({ + exporter: createExporter(), + scheduledDelayMillis: 1, + isEnabled: () => false, + }); + + const span = createMockSpan('inference'); + await processor.onStart(span, inferenceParentContext()); + + expect(span.setAttribute).not.toHaveBeenCalled(); + expect(mockBatch.onStart).not.toHaveBeenCalled(); + }); + + it('onEnd skips spans without the tracking attribute', () => { + const processor = new AgentBuilderSpanProcessor({ + exporter: createExporter(), + scheduledDelayMillis: 1, + }); + + const readable = createMockReadableSpan({}); + processor.onEnd(readable); + + expect(mockBatch.onEnd).not.toHaveBeenCalled(); + }); + + it('onEnd creates a copy with SAMPLED flag and data_stream.dataset', () => { + const processor = new AgentBuilderSpanProcessor({ + exporter: createExporter(), + scheduledDelayMillis: 1, + }); + + const readable = createMockReadableSpan({ + [SHOULD_TRACK_ATTR]: true, + existing: 'keep-me', + }); + + processor.onEnd(readable); + + expect(mockBatch.onEnd).toHaveBeenCalledTimes(1); + const exported = (mockBatch.onEnd as jest.Mock).mock.calls[0][0] as tracing.ReadableSpan; + expect(exported.attributes).toEqual({ + existing: 'keep-me', + 'data_stream.dataset': 'agent_builder', + }); + expect(exported.spanContext().traceFlags).toBe(TraceFlags.SAMPLED); + expect(SHOULD_TRACK_ATTR in exported.attributes).toBe(false); + }); + + it('forceFlush delegates to batch processor', async () => { + const processor = new AgentBuilderSpanProcessor({ + exporter: createExporter(), + scheduledDelayMillis: 1, + }); + + await processor.forceFlush(); + + expect(mockBatch.forceFlush).toHaveBeenCalledTimes(1); + }); + + it('shutdown delegates to batch processor', async () => { + const processor = new AgentBuilderSpanProcessor({ + exporter: createExporter(), + scheduledDelayMillis: 1, + }); + + await processor.shutdown(); + + expect(mockBatch.shutdown).toHaveBeenCalledTimes(1); + }); +}); diff --git a/x-pack/platform/plugins/shared/agent_builder/server/tracing/agent_builder_span_processor.ts b/x-pack/platform/plugins/shared/agent_builder/server/tracing/agent_builder_span_processor.ts new file mode 100644 index 0000000000000..f342e2a607e05 --- /dev/null +++ b/x-pack/platform/plugins/shared/agent_builder/server/tracing/agent_builder_span_processor.ts @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { api } from '@elastic/opentelemetry-node/sdk'; +import { tracing } from '@elastic/opentelemetry-node/sdk'; +import { TraceFlags } from '@opentelemetry/api'; +import { isInferenceSpan } from '@kbn/inference-tracing'; + +const SHOULD_TRACK_ATTR = '_agent_builder_should_track'; + +interface AgentBuilderSpanProcessorOpts { + exporter: tracing.SpanExporter; + scheduledDelayMillis: number; + isEnabled?: () => boolean; +} + +/** + * Span processor that exports Agent Builder inference spans. + * + * This processor forces the SAMPLED trace flag on the span copy before + * passing it to BatchSpanProcessor.onEnd. This is necessary because + * InferencePreservingSampler upgrades dropped inference spans to RECORD + * (not RECORD_AND_SAMPLED), so BatchSpanProcessor would otherwise skip + * them. By setting the flag on a copy, we ensure export without affecting + * the original span or other processors. + */ +export class AgentBuilderSpanProcessor implements tracing.SpanProcessor { + private readonly batchProcessor: tracing.SpanProcessor; + private readonly isEnabled: () => boolean; + + constructor(opts: AgentBuilderSpanProcessorOpts) { + this.batchProcessor = new tracing.BatchSpanProcessor(opts.exporter, { + scheduledDelayMillis: opts.scheduledDelayMillis, + }); + this.isEnabled = opts.isEnabled ?? (() => true); + } + + async onStart(span: tracing.Span, parentContext: api.Context): Promise { + if (!this.isEnabled()) { + return; + } + if (isInferenceSpan(span, parentContext)) { + span.setAttribute(SHOULD_TRACK_ATTR, true); + this.batchProcessor.onStart(span, parentContext); + } + } + + onEnd(span: tracing.ReadableSpan): void { + if (!span.attributes[SHOULD_TRACK_ATTR]) { + return; + } + + const { [SHOULD_TRACK_ATTR]: _, ...cleanAttributes } = span.attributes; + const originalSpanContext = span.spanContext(); + + const exportSpan: tracing.ReadableSpan = { + ...span, + spanContext: () => ({ + ...originalSpanContext, + traceFlags: TraceFlags.SAMPLED, // force 100% sampling + }), + attributes: { + ...cleanAttributes, + 'data_stream.dataset': 'agent_builder', + }, + }; + + this.batchProcessor.onEnd(exportSpan); + } + + forceFlush(): Promise { + return this.batchProcessor.forceFlush(); + } + + shutdown(): Promise { + return this.batchProcessor.shutdown(); + } +} diff --git a/x-pack/platform/plugins/shared/agent_builder/server/tracing/index.ts b/x-pack/platform/plugins/shared/agent_builder/server/tracing/index.ts index 4a90b285401c1..4d456ed15e166 100644 --- a/x-pack/platform/plugins/shared/agent_builder/server/tracing/index.ts +++ b/x-pack/platform/plugins/shared/agent_builder/server/tracing/index.ts @@ -5,6 +5,7 @@ * 2.0. */ +export { AgentBuilderSpanProcessor } from './agent_builder_span_processor'; export { withAgentSpan } from './with_agent_span'; export { withConverseSpan } from './with_converse_span'; export { getCurrentTraceId } from './get_current_trace_id'; diff --git a/x-pack/platform/plugins/shared/agent_builder/server/tracing/register_tracing.test.ts b/x-pack/platform/plugins/shared/agent_builder/server/tracing/register_tracing.test.ts new file mode 100644 index 0000000000000..e5beaf91b27fb --- /dev/null +++ b/x-pack/platform/plugins/shared/agent_builder/server/tracing/register_tracing.test.ts @@ -0,0 +1,236 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-proto'; +import { ElasticsearchOtlpExporter } from '@kbn/tracing'; +import { LateBindingSpanProcessor } from '@kbn/tracing'; +import { coreMock } from '@kbn/core/server/mocks'; +import { loggerMock } from '@kbn/logging-mocks'; +import type { AgentBuilderConfig } from '../config'; +import { registerTracingExporter } from './register_tracing'; +import { AgentBuilderSpanProcessor } from './agent_builder_span_processor'; + +jest.mock('@kbn/core/server', () => { + const actual = jest.requireActual('@kbn/core/server'); + return { + ...actual, + SavedObjectsClient: jest.fn(() => ({})), + }; +}); + +jest.mock('lru-cache', () => ({ + LRUCache: jest.fn().mockImplementation((options: { fetchMethod: () => Promise }) => { + let stored: boolean | undefined; + + const refresh = () => + options.fetchMethod().then((v) => { + stored = v; + return v; + }); + + return { + fetch: jest.fn(() => refresh()), + get: jest.fn(() => stored), + }; + }), +})); + +jest.mock('@kbn/tracing', () => ({ + LateBindingSpanProcessor: { + register: jest.fn(() => jest.fn().mockResolvedValue(undefined)), + }, + ElasticsearchOtlpExporter: jest.fn(), +})); + +jest.mock('@opentelemetry/exporter-trace-otlp-proto', () => ({ + OTLPTraceExporter: jest.fn(), +})); + +jest.mock('./agent_builder_span_processor', () => ({ + AgentBuilderSpanProcessor: jest.fn(), +})); + +type TracingConfig = AgentBuilderConfig['tracing']; + +const MockedOtlpExporter = OTLPTraceExporter as jest.MockedClass; +const MockedEsOtlpExporter = ElasticsearchOtlpExporter as jest.MockedClass< + typeof ElasticsearchOtlpExporter +>; +const MockedAgentBuilderProcessor = AgentBuilderSpanProcessor as jest.MockedClass< + typeof AgentBuilderSpanProcessor +>; + +const flushPromises = () => new Promise((resolve) => setImmediate(resolve)); + +describe('registerTracingExporter', () => { + const logger = loggerMock.create(); + + afterEach(async () => { + await flushPromises(); + }); + + function createCore() { + const core = coreMock.createStart(); + const scopedUiSettings = jest.mocked(core.uiSettings.asScopedToClient(jest.fn() as never)); + scopedUiSettings.get.mockResolvedValue(true); + return core; + } + + beforeEach(() => { + jest.clearAllMocks(); + }); + + it('returns undefined when no exporters are configured', async () => { + const coreStart = createCore(); + const tracingConfig: TracingConfig = { + send_to_self: false, + exporters: [], + scheduledDelay: 1000, + }; + + const result = await registerTracingExporter({ + core: coreStart, + tracingConfig, + logger, + }); + + expect(result).toBeUndefined(); + expect(LateBindingSpanProcessor.register).not.toHaveBeenCalled(); + }); + + it('creates OTLPTraceExporter when exporters with url are configured', async () => { + const coreStart = createCore(); + const tracingConfig: TracingConfig = { + send_to_self: false, + exporters: [ + { + url: 'http://otel-collector:4318/v1/traces', + headers: { Authorization: 'Bearer token' }, + }, + ], + scheduledDelay: 750, + }; + + await registerTracingExporter({ + core: coreStart, + tracingConfig, + logger, + }); + + expect(MockedOtlpExporter).toHaveBeenCalledWith({ + url: 'http://otel-collector:4318/v1/traces', + headers: { Authorization: 'Bearer token' }, + }); + expect(MockedEsOtlpExporter).not.toHaveBeenCalled(); + }); + + it('creates ElasticsearchOtlpExporter when send_to_self is true', async () => { + const coreStart = createCore(); + const tracingConfig: TracingConfig = { + send_to_self: true, + exporters: [], + scheduledDelay: 500, + }; + + await registerTracingExporter({ + core: coreStart, + tracingConfig, + logger, + }); + + expect(MockedEsOtlpExporter).toHaveBeenCalledWith( + coreStart.elasticsearch.client.asInternalUser + ); + expect(MockedOtlpExporter).not.toHaveBeenCalled(); + }); + + it('registers processor via LateBindingSpanProcessor', async () => { + const coreStart = createCore(); + const tracingConfig: TracingConfig = { + send_to_self: true, + exporters: [], + scheduledDelay: 250, + }; + + await registerTracingExporter({ + core: coreStart, + tracingConfig, + logger, + }); + + expect(LateBindingSpanProcessor.register).toHaveBeenCalledTimes(1); + expect(MockedAgentBuilderProcessor).toHaveBeenCalledTimes(1); + const [registeredProcessor] = jest.mocked(LateBindingSpanProcessor.register).mock.calls[0]; + expect(registeredProcessor).toBe(MockedAgentBuilderProcessor.mock.instances[0]); + }); + + it('createCachedIsEnabled returns true after registerTracingExporter resolves', async () => { + const coreStart = createCore(); + const tracingConfig: TracingConfig = { + send_to_self: true, + exporters: [], + scheduledDelay: 100, + }; + + await registerTracingExporter({ + core: coreStart, + tracingConfig, + logger, + }); + + const ctorOpts = MockedAgentBuilderProcessor.mock.calls[0][0]; + const { isEnabled } = ctorOpts; + expect(isEnabled!()).toBe(true); + }); + + it('refreshes the cache value when setting changes', async () => { + const coreStart = createCore(); + const scopedUiSettings = jest.mocked(coreStart.uiSettings.asScopedToClient(jest.fn() as never)); + scopedUiSettings.get.mockResolvedValue(true); + + const tracingConfig: TracingConfig = { + send_to_self: true, + exporters: [], + scheduledDelay: 100, + }; + + await registerTracingExporter({ core: coreStart, tracingConfig, logger }); + + const { isEnabled } = MockedAgentBuilderProcessor.mock.calls[0][0]; + expect(isEnabled!()).toBe(true); + + scopedUiSettings.get.mockResolvedValue(false); + isEnabled!(); + await flushPromises(); + + expect(isEnabled!()).toBe(false); + }); + + it('logs error when fetch rejects', async () => { + const coreStart = createCore(); + const scopedUiSettings = jest.mocked(coreStart.uiSettings.asScopedToClient(jest.fn() as never)); + scopedUiSettings.get.mockResolvedValue(true); + + const tracingConfig: TracingConfig = { + send_to_self: true, + exporters: [], + scheduledDelay: 100, + }; + + await registerTracingExporter({ core: coreStart, tracingConfig, logger }); + + scopedUiSettings.get.mockRejectedValue(new Error('SO unavailable')); + + const { isEnabled } = MockedAgentBuilderProcessor.mock.calls[0][0]; + isEnabled!(); + await flushPromises(); + + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining('Failed to refresh tracing settings') + ); + }); +}); diff --git a/x-pack/platform/plugins/shared/agent_builder/server/tracing/register_tracing.ts b/x-pack/platform/plugins/shared/agent_builder/server/tracing/register_tracing.ts new file mode 100644 index 0000000000000..a3796bef3f7f6 --- /dev/null +++ b/x-pack/platform/plugins/shared/agent_builder/server/tracing/register_tracing.ts @@ -0,0 +1,107 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { CoreStart } from '@kbn/core/server'; +import type { Logger } from '@kbn/logging'; +import type { tracing } from '@elastic/opentelemetry-node/sdk'; +import { SavedObjectsClient } from '@kbn/core/server'; +import { LateBindingSpanProcessor, ElasticsearchOtlpExporter } from '@kbn/tracing'; +import { AGENT_BUILDER_EXPERIMENTAL_FEATURES_SETTING_ID } from '@kbn/management-settings-ids'; +import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-proto'; +import { LRUCache } from 'lru-cache'; +import type { AgentBuilderConfig } from '../config'; +import { AgentBuilderSpanProcessor } from './agent_builder_span_processor'; + +const SETTING_CACHE_TTL_MS = 30_000; + +/** + * Returns a synchronous `isEnabled()` function backed by an LRU cache with + * stale-while-revalidate semantics. + * We need the cache to prevent calling the async uiSettings read on the hot path. + * + * The span processor hot-path requires also required synchronous check, but the underlying uiSettings read is async. + * The cache with `allowStale: true` ensures `isEnabled()` always returns instantly + * (stale or fresh) while a background fetch refreshes the value every {@link SETTING_CACHE_TTL_MS} ms. + */ +const createCachedIsEnabled = async (core: CoreStart, logger: Logger): Promise<() => boolean> => { + const cache = new LRUCache({ + max: 1, + ttl: SETTING_CACHE_TTL_MS, + allowStale: true, + noDeleteOnStaleGet: true, + noDeleteOnFetchRejection: true, + fetchMethod: async () => { + const internalRepo = core.savedObjects.createInternalRepository(); + const internalClient = new SavedObjectsClient(internalRepo); + return core.uiSettings + .asScopedToClient(internalClient) + .get(AGENT_BUILDER_EXPERIMENTAL_FEATURES_SETTING_ID); + }, + }); + + // Eagerly populate the cache so the first synchronous isEnabled() call has a value + await cache.fetch('enabled').catch((error) => { + logger.error(`Failed to fetch tracing settings: ${error.message}`); + }); + + return () => { + // Stale-while-revalidate: trigger a background refresh when the entry is past TTL. + void cache.fetch('enabled').catch((error) => { + logger.error(`Failed to refresh tracing settings: ${error.message}`); + }); + return cache.get('enabled') ?? false; + }; +}; + +const buildExporters = ( + core: CoreStart, + tracingConfig: AgentBuilderConfig['tracing'] +): tracing.SpanExporter[] => { + return [ + ...(tracingConfig.send_to_self + ? [new ElasticsearchOtlpExporter(core.elasticsearch.client.asInternalUser)] + : []), + ...tracingConfig.exporters.map( + ({ url, headers }) => + new OTLPTraceExporter({ + url, + ...(headers ? { headers } : {}), + }) + ), + ]; +}; + +export const registerTracingExporter = async ({ + core, + tracingConfig, + logger, +}: { + core: CoreStart; + tracingConfig: AgentBuilderConfig['tracing']; + logger: Logger; +}): Promise<(() => Promise) | undefined> => { + const exporters = buildExporters(core, tracingConfig); + + if (exporters.length === 0) { + return undefined; + } + + const isEnabled = await createCachedIsEnabled(core, logger); + + const tearDowns = exporters.map((exporter) => { + const processor = new AgentBuilderSpanProcessor({ + exporter, + scheduledDelayMillis: tracingConfig.scheduledDelay, + isEnabled, + }); + return LateBindingSpanProcessor.register(processor); + }); + + return async () => { + await Promise.all(tearDowns.map((teardown) => teardown())); + }; +}; diff --git a/x-pack/platform/plugins/shared/agent_builder/tsconfig.json b/x-pack/platform/plugins/shared/agent_builder/tsconfig.json index 5be6215651e99..86dba8e3a7bee 100644 --- a/x-pack/platform/plugins/shared/agent_builder/tsconfig.json +++ b/x-pack/platform/plugins/shared/agent_builder/tsconfig.json @@ -40,6 +40,7 @@ "@kbn/core-ui-settings-server", "@kbn/utility-types", "@kbn/inference-tracing", + "@kbn/tracing", "@kbn/std", "@kbn/esql-language", "@kbn/esql-utils",