Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion packages/instrumentation-pg/src/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ import {
} from './internal-types';
import { PgInstrumentationConfig } from './types';
import * as utils from './utils';
import { addSqlCommenterComment } from '@opentelemetry/sql-common';
import {
addSqlCommenterComment,
buildTraceparent,
} from '@opentelemetry/sql-common';
/** @knipignore */
import { PACKAGE_NAME, PACKAGE_VERSION } from './version';
import { SpanNames } from './enums/SpanNames';
Expand Down Expand Up @@ -66,6 +69,10 @@ function extractModuleExports(module: any) {
: module; // CommonJS
}

const INTERNAL_SET_QUERY = Symbol(
'opentelemetry.instrumentation-pg.internal-set-query'
);

export class PgInstrumentation extends InstrumentationBase<PgInstrumentationConfig> {
declare private _operationDuration: Histogram;
declare private _connectionsCount: UpDownCounter;
Expand Down Expand Up @@ -307,6 +314,15 @@ export class PgInstrumentation extends InstrumentationBase<PgInstrumentationConf
return (original: typeof pgTypes.Client.prototype.query) => {
this._diag.debug('Patching pg.Client.prototype.query');
return function query(this: PgClientExtended, ...args: unknown[]) {
// Skip our own internal SET application_name queries
if (
typeof args[0] === 'object' &&
args[0] !== null &&
(args[0] as any)[INTERNAL_SET_QUERY]
) {
return original.apply(this, args as never);
}

if (utils.shouldSkipInstrumentation(plugin.getConfig())) {
return original.apply(this, args as never);
}
Expand Down Expand Up @@ -466,6 +482,31 @@ export class PgInstrumentation extends InstrumentationBase<PgInstrumentationConf
);
}

// Inject trace context via SET application_name if enabled.
// The SET is pushed synchronously into pg's internal FIFO queue
// before the user's query, guaranteeing correct ordering. Note that
// pg processes one query at a time (readyForQuery gate), so the SET
// completes a full round-trip before the user's query is dispatched.
if (instrumentationConfig.enableTraceContextPropagation) {
const traceparent = buildTraceparent(span);
if (traceparent) {
const setQuery = {
text: `SET application_name = '${traceparent}'`,
[INTERNAL_SET_QUERY]: true,
};
try {
const setResult: unknown = original.apply(this, [
setQuery,
] as never);
if (setResult instanceof Promise) {
setResult.catch(() => {});
}
} catch {
// Silently ignore SET failures to avoid breaking user queries
}
}
Comment on lines +491 to +507
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like there to be some diag warnings here on failure so it doesn't fail completely silently. Would make things a lot easier to debug for people if they're not getting correlation.

}

let result: unknown;
try {
result = original.apply(this, args as never);
Expand Down
13 changes: 13 additions & 0 deletions packages/instrumentation-pg/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,17 @@ export interface PgInstrumentationConfig extends InstrumentationConfig {
* @default false
*/
ignoreConnectSpans?: boolean;

/**
* If true, injects the current span's W3C traceparent into the PostgreSQL
* session via `SET application_name` before each query.
*
* NOTE: This adds an extra `SET application_name` round-trip to the
* connection before each user query. The SET must complete before pg's
* internal queue dispatches the user's query, so expect roughly double
* the number of network round-trips when this option is enabled.
*
* @default false
*/
enableTraceContextPropagation?: boolean;
}
174 changes: 174 additions & 0 deletions packages/instrumentation-pg/test/pg.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,180 @@ describe('pg', () => {
});
});

describe('enableTraceContextPropagation', () => {
function expectedSetApplicationName(span: { spanContext(): any }) {
const sc = span.spanContext();
const traceparent = `00-${sc.traceId}-${sc.spanId}-0${Number(sc.traceFlags || 0).toString(16)}`;
return `SET application_name = '${traceparent}'`;
}

it('should not send SET application_name when enableTraceContextPropagation is not specified', async () => {
const span = tracer.startSpan('test span');
await context.with(trace.setSpan(context.active(), span), async () => {
const query = 'SELECT NOW()';
const res = await client.query(query);
assert.ok(res);

const executedQueries = getExecutedQueries();
assert.equal(executedQueries.length, 1);
assert.equal(executedQueries[0].text, query);
});
});

it('should send SET application_name before the user query when enableTraceContextPropagation=true', async () => {
instrumentation.setConfig({
enableTraceContextPropagation: true,
});

const span = tracer.startSpan('test span');
await context.with(trace.setSpan(context.active(), span), async () => {
const query = 'SELECT NOW()';
const res = await client.query(query);
assert.ok(res);

const executedQueries = getExecutedQueries();
assert.equal(
executedQueries.length,
2,
'Expected 2 queries: SET application_name + user query'
);

const [querySpan] = memoryExporter.getFinishedSpans();

assert.equal(
executedQueries[0].text,
expectedSetApplicationName(querySpan),
'SET application_name should contain the exact traceparent of the query span'
);

assert.equal(executedQueries[1].text, query);
});
});

it('should set application_name for prepared statements', async () => {
instrumentation.setConfig({
enableTraceContextPropagation: true,
});

const span = tracer.startSpan('test span');
await context.with(trace.setSpan(context.active(), span), async () => {
const res = await client.query({
name: 'prepared-trace-ctx',
text: 'SELECT $1::text',
values: ['hello'],
});
assert.ok(res);

const executedQueries = getExecutedQueries();
assert.equal(
executedQueries.length,
2,
'Expected 2 queries: SET application_name + prepared statement'
);
const [querySpan] = memoryExporter.getFinishedSpans();
assert.equal(
executedQueries[0].text,
expectedSetApplicationName(querySpan),
'SET application_name should contain exact traceparent before prepared statement'
);
assert.equal(executedQueries[1].text, 'SELECT $1::text');
});
});

it('should work together with addSqlCommenterCommentToQueries', async () => {
instrumentation.setConfig({
enableTraceContextPropagation: true,
addSqlCommenterCommentToQueries: true,
});

const span = tracer.startSpan('test span');
await context.with(trace.setSpan(context.active(), span), async () => {
const query = 'SELECT NOW()';
const res = await client.query(query);
assert.ok(res);

const executedQueries = getExecutedQueries();
assert.equal(
executedQueries.length,
2,
'Expected 2 queries: SET application_name + commented query'
);

const [querySpan] = memoryExporter.getFinishedSpans();
assert.equal(
executedQueries[0].text,
expectedSetApplicationName(querySpan),
'First query should be SET application_name with exact traceparent'
);

// Second should have SQL Commenter comment appended
assert.ok(
executedQueries[1].text?.includes('/*'),
'User query should have SQL Commenter comment'
);
assert.ok(
executedQueries[1].text?.startsWith('SELECT NOW()'),
'User query should start with original text'
);
});
});

it('should set correct application_name for concurrent queries on the same connection', async () => {
instrumentation.setConfig({
enableTraceContextPropagation: true,
});

// Fire 3 queries concurrently without awaiting each individually.
// All 6 items (3x SET + 3x query) are pushed synchronously into
// pg's queryQueue, and we verify they are correctly paired.
const results = await Promise.all([
context.with(
trace.setSpan(context.active(), tracer.startSpan('span-A')),
() => client.query('SELECT 1 AS a')
),
context.with(
trace.setSpan(context.active(), tracer.startSpan('span-B')),
() => client.query('SELECT 2 AS b')
),
context.with(
trace.setSpan(context.active(), tracer.startSpan('span-C')),
() => client.query('SELECT 3 AS c')
),
]);

assert.ok(results[0]);
assert.ok(results[1]);
assert.ok(results[2]);

const executedQueries = getExecutedQueries();
assert.equal(
executedQueries.length,
6,
'Expected 6 queries: 3x SET + 3x user query'
);

const finishedSpans = memoryExporter.getFinishedSpans();
assert.equal(finishedSpans.length, 3, 'Expected 3 query spans');

// Verify correct pairing: SET-A, Q-A, SET-B, Q-B, SET-C, Q-C
// Each SET should match the exact traceparent of its corresponding query span.
for (let i = 0; i < 3; i++) {
const setQuery = executedQueries[i * 2];
const userQuery = executedQueries[i * 2 + 1];

assert.equal(
setQuery.text,
expectedSetApplicationName(finishedSpans[i]),
`Query ${i * 2} should be SET application_name with exact traceparent for span ${i}`
);
assert.ok(
!userQuery.text?.startsWith('SET application_name'),
`Query ${i * 2 + 1} should be the user query, not SET`
);
}
});
});

describe('exception event recording', () => {
const assertExceptionEvents = (pgSpan: any) => {
assert.strictEqual(
Expand Down
20 changes: 20 additions & 0 deletions packages/sql-common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,26 @@ import {
} from '@opentelemetry/api';
import { W3CTraceContextPropagator } from '@opentelemetry/core';

const propagator = new W3CTraceContextPropagator();
const TRACE_PARENT_HEADER = 'traceparent';

/**
* Builds a W3C traceparent string from a span.
* Returns undefined if the span context is invalid or tracing is suppressed.
*
* Format: "00-{traceId}-{spanId}-{flags}"
* See: https://www.w3.org/TR/trace-context/#traceparent-header
*/
export function buildTraceparent(span: Span): string | undefined {
const carrier: Record<string, string> = {};
propagator.inject(
trace.setSpan(ROOT_CONTEXT, span),
carrier,
defaultTextMapSetter
);
return carrier[TRACE_PARENT_HEADER];
}

// NOTE: This function currently is returning false-positives
// in cases where comment characters appear in string literals
// ("SELECT '-- not a comment';" would return true, although has no comment)
Expand Down
Loading