diff --git a/packages/instrumentation-pg/src/instrumentation.ts b/packages/instrumentation-pg/src/instrumentation.ts index 79e34a5d80..522da1340c 100644 --- a/packages/instrumentation-pg/src/instrumentation.ts +++ b/packages/instrumentation-pg/src/instrumentation.ts @@ -35,7 +35,10 @@ import { } from './internal-types'; import { PgInstrumentationConfig } from './types'; import * as utils from './utils'; -import { addSqlCommenterComment } from '@opentelemetry/sql-common'; +import { + addSqlCommenterComment, + buildTraceparent, +} from '@opentelemetry/sql-common'; /** @knipignore */ import { PACKAGE_NAME, PACKAGE_VERSION } from './version'; import { SpanNames } from './enums/SpanNames'; @@ -66,6 +69,10 @@ function extractModuleExports(module: any) { : module; // CommonJS } +const INTERNAL_SET_QUERY = Symbol( + 'opentelemetry.instrumentation-pg.internal-set-query' +); + export class PgInstrumentation extends InstrumentationBase { declare private _operationDuration: Histogram; declare private _connectionsCount: UpDownCounter; @@ -307,6 +314,15 @@ export class PgInstrumentation extends InstrumentationBase { this._diag.debug('Patching pg.Client.prototype.query'); return function query(this: PgClientExtended, ...args: unknown[]) { + // Skip our own internal SET application_name queries + if ( + typeof args[0] === 'object' && + args[0] !== null && + (args[0] as any)[INTERNAL_SET_QUERY] + ) { + return original.apply(this, args as never); + } + if (utils.shouldSkipInstrumentation(plugin.getConfig())) { return original.apply(this, args as never); } @@ -466,6 +482,31 @@ export class PgInstrumentation extends InstrumentationBase {}); + } + } catch { + // Silently ignore SET failures to avoid breaking user queries + } + } + } + let result: unknown; try { result = original.apply(this, args as never); diff --git a/packages/instrumentation-pg/src/types.ts b/packages/instrumentation-pg/src/types.ts index ff77d1bdd8..9258c02690 100644 --- a/packages/instrumentation-pg/src/types.ts +++ b/packages/instrumentation-pg/src/types.ts @@ -76,4 +76,17 @@ export interface PgInstrumentationConfig extends InstrumentationConfig { * @default false */ ignoreConnectSpans?: boolean; + + /** + * If true, injects the current span's W3C traceparent into the PostgreSQL + * session via `SET application_name` before each query. + * + * NOTE: This adds an extra `SET application_name` round-trip to the + * connection before each user query. The SET must complete before pg's + * internal queue dispatches the user's query, so expect roughly double + * the number of network round-trips when this option is enabled. + * + * @default false + */ + enableTraceContextPropagation?: boolean; } diff --git a/packages/instrumentation-pg/test/pg.test.ts b/packages/instrumentation-pg/test/pg.test.ts index b17ea4feed..1af7a866d8 100644 --- a/packages/instrumentation-pg/test/pg.test.ts +++ b/packages/instrumentation-pg/test/pg.test.ts @@ -1125,6 +1125,180 @@ describe('pg', () => { }); }); + describe('enableTraceContextPropagation', () => { + function expectedSetApplicationName(span: { spanContext(): any }) { + const sc = span.spanContext(); + const traceparent = `00-${sc.traceId}-${sc.spanId}-0${Number(sc.traceFlags || 0).toString(16)}`; + return `SET application_name = '${traceparent}'`; + } + + it('should not send SET application_name when enableTraceContextPropagation is not specified', async () => { + const span = tracer.startSpan('test span'); + await context.with(trace.setSpan(context.active(), span), async () => { + const query = 'SELECT NOW()'; + const res = await client.query(query); + assert.ok(res); + + const executedQueries = getExecutedQueries(); + assert.equal(executedQueries.length, 1); + assert.equal(executedQueries[0].text, query); + }); + }); + + it('should send SET application_name before the user query when enableTraceContextPropagation=true', async () => { + instrumentation.setConfig({ + enableTraceContextPropagation: true, + }); + + const span = tracer.startSpan('test span'); + await context.with(trace.setSpan(context.active(), span), async () => { + const query = 'SELECT NOW()'; + const res = await client.query(query); + assert.ok(res); + + const executedQueries = getExecutedQueries(); + assert.equal( + executedQueries.length, + 2, + 'Expected 2 queries: SET application_name + user query' + ); + + const [querySpan] = memoryExporter.getFinishedSpans(); + + assert.equal( + executedQueries[0].text, + expectedSetApplicationName(querySpan), + 'SET application_name should contain the exact traceparent of the query span' + ); + + assert.equal(executedQueries[1].text, query); + }); + }); + + it('should set application_name for prepared statements', async () => { + instrumentation.setConfig({ + enableTraceContextPropagation: true, + }); + + const span = tracer.startSpan('test span'); + await context.with(trace.setSpan(context.active(), span), async () => { + const res = await client.query({ + name: 'prepared-trace-ctx', + text: 'SELECT $1::text', + values: ['hello'], + }); + assert.ok(res); + + const executedQueries = getExecutedQueries(); + assert.equal( + executedQueries.length, + 2, + 'Expected 2 queries: SET application_name + prepared statement' + ); + const [querySpan] = memoryExporter.getFinishedSpans(); + assert.equal( + executedQueries[0].text, + expectedSetApplicationName(querySpan), + 'SET application_name should contain exact traceparent before prepared statement' + ); + assert.equal(executedQueries[1].text, 'SELECT $1::text'); + }); + }); + + it('should work together with addSqlCommenterCommentToQueries', async () => { + instrumentation.setConfig({ + enableTraceContextPropagation: true, + addSqlCommenterCommentToQueries: true, + }); + + const span = tracer.startSpan('test span'); + await context.with(trace.setSpan(context.active(), span), async () => { + const query = 'SELECT NOW()'; + const res = await client.query(query); + assert.ok(res); + + const executedQueries = getExecutedQueries(); + assert.equal( + executedQueries.length, + 2, + 'Expected 2 queries: SET application_name + commented query' + ); + + const [querySpan] = memoryExporter.getFinishedSpans(); + assert.equal( + executedQueries[0].text, + expectedSetApplicationName(querySpan), + 'First query should be SET application_name with exact traceparent' + ); + + // Second should have SQL Commenter comment appended + assert.ok( + executedQueries[1].text?.includes('/*'), + 'User query should have SQL Commenter comment' + ); + assert.ok( + executedQueries[1].text?.startsWith('SELECT NOW()'), + 'User query should start with original text' + ); + }); + }); + + it('should set correct application_name for concurrent queries on the same connection', async () => { + instrumentation.setConfig({ + enableTraceContextPropagation: true, + }); + + // Fire 3 queries concurrently without awaiting each individually. + // All 6 items (3x SET + 3x query) are pushed synchronously into + // pg's queryQueue, and we verify they are correctly paired. + const results = await Promise.all([ + context.with( + trace.setSpan(context.active(), tracer.startSpan('span-A')), + () => client.query('SELECT 1 AS a') + ), + context.with( + trace.setSpan(context.active(), tracer.startSpan('span-B')), + () => client.query('SELECT 2 AS b') + ), + context.with( + trace.setSpan(context.active(), tracer.startSpan('span-C')), + () => client.query('SELECT 3 AS c') + ), + ]); + + assert.ok(results[0]); + assert.ok(results[1]); + assert.ok(results[2]); + + const executedQueries = getExecutedQueries(); + assert.equal( + executedQueries.length, + 6, + 'Expected 6 queries: 3x SET + 3x user query' + ); + + const finishedSpans = memoryExporter.getFinishedSpans(); + assert.equal(finishedSpans.length, 3, 'Expected 3 query spans'); + + // Verify correct pairing: SET-A, Q-A, SET-B, Q-B, SET-C, Q-C + // Each SET should match the exact traceparent of its corresponding query span. + for (let i = 0; i < 3; i++) { + const setQuery = executedQueries[i * 2]; + const userQuery = executedQueries[i * 2 + 1]; + + assert.equal( + setQuery.text, + expectedSetApplicationName(finishedSpans[i]), + `Query ${i * 2} should be SET application_name with exact traceparent for span ${i}` + ); + assert.ok( + !userQuery.text?.startsWith('SET application_name'), + `Query ${i * 2 + 1} should be the user query, not SET` + ); + } + }); + }); + describe('exception event recording', () => { const assertExceptionEvents = (pgSpan: any) => { assert.strictEqual( diff --git a/packages/sql-common/src/index.ts b/packages/sql-common/src/index.ts index 20dbd6bb4f..9cb248a8fb 100644 --- a/packages/sql-common/src/index.ts +++ b/packages/sql-common/src/index.ts @@ -11,6 +11,26 @@ import { } from '@opentelemetry/api'; import { W3CTraceContextPropagator } from '@opentelemetry/core'; +const propagator = new W3CTraceContextPropagator(); +const TRACE_PARENT_HEADER = 'traceparent'; + +/** + * Builds a W3C traceparent string from a span. + * Returns undefined if the span context is invalid or tracing is suppressed. + * + * Format: "00-{traceId}-{spanId}-{flags}" + * See: https://www.w3.org/TR/trace-context/#traceparent-header + */ +export function buildTraceparent(span: Span): string | undefined { + const carrier: Record = {}; + propagator.inject( + trace.setSpan(ROOT_CONTEXT, span), + carrier, + defaultTextMapSetter + ); + return carrier[TRACE_PARENT_HEADER]; +} + // NOTE: This function currently is returning false-positives // in cases where comment characters appear in string literals // ("SELECT '-- not a comment';" would return true, although has no comment)