Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,39 @@
* 2.0.
*/

import agent from 'elastic-apm-node';
import { SpanStatusCode } from '@opentelemetry/api';
import { ApmMiddleware } from './apm_middleware';
import { createRuleExecutionMiddlewareContext } from './test_utils';
import { collectStreamResults, createPipelineStream, createRulePipelineState } from '../test_utils';
import { getDefaultTracer } from '@kbn/default-tracer';

jest.mock('elastic-apm-node', () => ({
startSpan: jest.fn(),
const mockSpan = {
setStatus: jest.fn(),
recordException: jest.fn(),
end: jest.fn(),
isRecording: jest.fn().mockReturnValue(true),
};

const mockStartSpan = jest.fn(() => mockSpan);

jest.mock('@kbn/default-tracer', () => ({
getDefaultTracer: jest.fn(() => ({
startSpan: mockStartSpan,
})),
}));

const agentMock = agent as jest.Mocked<typeof agent>;
const getDefaultTracerMock = getDefaultTracer as jest.MockedFunction<typeof getDefaultTracer>;

describe('ApmMiddleware', () => {
let middleware: ApmMiddleware;
let mockSpan: { addLabels: jest.Mock; outcome: string | undefined; end: jest.Mock };

beforeEach(() => {
middleware = new ApmMiddleware();
mockSpan = { addLabels: jest.fn(), outcome: undefined, end: jest.fn() };
agentMock.startSpan.mockReturnValue(mockSpan as never);
});

afterEach(() => {
jest.clearAllMocks();
mockSpan.isRecording.mockReturnValue(true);
});

it('wraps the stream in an APM span and sets success outcome', async () => {
it('wraps the stream in an OpenTelemetry span and sets success status', async () => {
const state = createRulePipelineState();
const context = createRuleExecutionMiddlewareContext();
const next = jest.fn().mockReturnValue(createPipelineStream([state]));
Expand All @@ -40,13 +47,14 @@ describe('ApmMiddleware', () => {
);

expect(results).toEqual([{ type: 'continue', state }]);
expect(agentMock.startSpan).toHaveBeenCalledWith('rule_executor:test_step', 'rule_executor');
expect(mockSpan.addLabels).toHaveBeenCalledWith({ plugin: 'alerting_v2' });
expect(mockSpan.outcome).toBe('success');
expect(mockStartSpan).toHaveBeenCalledWith('rule_executor:test_step', {
attributes: { plugin: 'alerting_v2' },
});
expect(mockSpan.setStatus).toHaveBeenCalledWith({ code: SpanStatusCode.OK });
expect(mockSpan.end).toHaveBeenCalledTimes(1);
});

it('sets failure outcome and ends span when stream throws', async () => {
it('sets error status and ends span when stream throws', async () => {
const context = createRuleExecutionMiddlewareContext();
const error = new Error('stream error');

Expand All @@ -60,7 +68,11 @@ describe('ApmMiddleware', () => {
collectStreamResults(middleware.execute(context, next, createPipelineStream()))
).rejects.toThrow('stream error');

expect(mockSpan.outcome).toBe('failure');
expect(mockSpan.recordException).toHaveBeenCalledWith(error);
expect(mockSpan.setStatus).toHaveBeenCalledWith({
code: SpanStatusCode.ERROR,
message: 'stream error',
});
expect(mockSpan.end).toHaveBeenCalledTimes(1);
});

Expand All @@ -75,12 +87,12 @@ describe('ApmMiddleware', () => {
);

expect(results).toHaveLength(2);
expect(mockSpan.outcome).toBe('success');
expect(mockSpan.setStatus).toHaveBeenCalledWith({ code: SpanStatusCode.OK });
expect(mockSpan.end).toHaveBeenCalledTimes(1);
});

it('handles null span gracefully when agent is not started', async () => {
agentMock.startSpan.mockReturnValue(null as never);
it('handles undefined tracer gracefully when tracer is not available', async () => {
getDefaultTracerMock.mockReturnValueOnce(undefined as never);

const state = createRulePipelineState();
const context = createRuleExecutionMiddlewareContext();
Expand All @@ -91,7 +103,6 @@ describe('ApmMiddleware', () => {
);

expect(results).toEqual([{ type: 'continue', state }]);
expect(mockSpan.addLabels).not.toHaveBeenCalled();
expect(mockSpan.end).not.toHaveBeenCalled();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
* 2.0.
*/

import agent from 'elastic-apm-node';
import { SpanStatusCode } from '@opentelemetry/api';
import { getDefaultTracer } from '@kbn/default-tracer';
import { injectable } from 'inversify';
import type { RuleExecutionMiddlewareContext, RuleExecutionMiddleware } from './types';
import type { PipelineStateStream } from '../types';
import { APP_ID } from '../../constants';

/**
* Middleware that wraps each step's stream processing in an APM span for tracing.
* Middleware that wraps each step's stream processing in an OpenTelemetry span for tracing.
*
* The span stays open for the entire duration of the step's stream,
* capturing both success and failure outcomes.
Expand All @@ -27,22 +28,28 @@ export class ApmMiddleware implements RuleExecutionMiddleware {
input: PipelineStateStream
): PipelineStateStream {
const stream = next(input);
const tracer = getDefaultTracer();

return (async function* () {
const span = agent.startSpan(`rule_executor:${ctx.step.name}`, 'rule_executor') ?? undefined;
span?.addLabels({ plugin: APP_ID });
const span = tracer?.startSpan(`rule_executor:${ctx.step.name}`, {
attributes: { plugin: APP_ID },
});

try {
for await (const result of stream) {
yield result;
}

if (span) {
span.outcome = 'success';
if (span?.isRecording()) {
span.setStatus({ code: SpanStatusCode.OK });
}
} catch (error) {
if (span) {
span.outcome = 'failure';
if (span?.isRecording()) {
span.recordException(error as Error);
span.setStatus({
code: SpanStatusCode.ERROR,
message: error instanceof Error ? error.message : String(error),
});
}

throw error;
Expand Down
1 change: 1 addition & 0 deletions x-pack/platform/plugins/shared/alerting_v2/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
"@kbn/test",
"@kbn/core-saved-objects-api-server-mocks",
"@kbn/apm-utils",
"@kbn/default-tracer",
"@kbn/core-user-profile-common",
"@kbn/core-user-profile-server-mocks",
"@kbn/core-user-profile-server",
Expand Down
Loading