From 7684b4eff94587d6da29900d7b1774949720a79e Mon Sep 17 00:00:00 2001 From: nina9753 Date: Wed, 29 Oct 2025 12:48:33 -0400 Subject: [PATCH 1/7] Add batch PublishMessage API with producer-side span linking - Add full producer.js with batch message handling - Add span link extraction from messages 2-N - Add parent context extraction from first message - Add batch span creation (pubsub.request) - Inject batch metadata into messages (_dd.pubsub_request.*, _dd.batch.*) - Add Topic.publishMessage/publish wrappers for trace context injection - Inject _dd.p.tid for 128-bit trace ID support --- .../src/producer.js | 137 +++++++++++++++++- 1 file changed, 129 insertions(+), 8 deletions(-) diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js index 94644e4e063..dbffb5dc4a6 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js +++ b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js @@ -9,35 +9,156 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin { bindStart (ctx) { const { request, api, projectId } = ctx - if (api !== 'publish') return const messages = request.messages || [] const topic = request.topic - const span = this.startSpan({ // TODO: rename + const messageCount = messages.length + const hasTraceContext = messages[0]?.attributes?.['x-datadog-trace-id'] + + // Collect span links from messages 2-N (skip first - it becomes parent) + const spanLinkData = hasTraceContext + ? messages.slice(1).map(msg => this._extractSpanLink(msg.attributes)).filter(Boolean) + : [] + + // Extract parent from first message + const firstAttrs = messages[0]?.attributes + const parentData = firstAttrs?.['x-datadog-trace-id'] && firstAttrs['x-datadog-parent-id'] + ? { + traceId: firstAttrs['x-datadog-trace-id'], + spanId: firstAttrs['x-datadog-parent-id'], + traceIdUpper: firstAttrs['_dd.p.tid'], + samplingPriority: firstAttrs['x-datadog-sampling-priority'] + } + : null + + // Create pubsub.request span + const batchSpan = this.startSpan({ + childOf: parentData ? this._extractParentContext(parentData) : undefined, resource: `${api} ${topic}`, + service: this.config.service || `${this.tracer._service}-pubsub`, meta: { 'gcloud.project_id': projectId, - 'pubsub.method': api, // TODO: remove - 'pubsub.topic': topic + 'pubsub.method': api, + 'pubsub.topic': topic, + 'span.kind': 'producer', + '_dd.base_service': this.tracer._service, + '_dd.serviceoverride.type': 'integration', + 'pubsub.linked_message_count': spanLinkData.length || undefined, + operation: messageCount > 1 ? 'batched.pubsub.request' : 'pubsub.request' + }, + metrics: { + 'pubsub.batch.message_count': messageCount, + 'pubsub.batch': messageCount > 1 ? true : undefined } }, ctx) +<<<<<<< Updated upstream for (const msg of messages) { if (!msg.attributes) { msg.attributes = {} } this.tracer.inject(span, 'text_map', msg.attributes) +======= + const spanCtx = batchSpan.context() + const batchTraceId = spanCtx.toTraceId() + const batchSpanId = spanCtx.toSpanId() + const batchTraceIdUpper = spanCtx._trace.tags['_dd.p.tid'] + + // Convert to hex for storage (simpler, used directly by span links) + const batchTraceIdHex = BigInt(batchTraceId).toString(16).padStart(16, '0') + const batchSpanIdHex = BigInt(batchSpanId).toString(16).padStart(16, '0') + + // Add span links as metadata + if (spanLinkData.length) { + batchSpan.setTag('_dd.span_links', JSON.stringify( + spanLinkData.map(link => ({ + trace_id: link.traceId, + span_id: link.spanId, + flags: link.samplingPriority || 0 + })) + )) + } + + // Add metadata to all messages + messages.forEach((msg, i) => { + msg.attributes = msg.attributes || {} + + if (!hasTraceContext) { + this.tracer.inject(batchSpan, 'text_map', msg.attributes) + } + + Object.assign(msg.attributes, { + '_dd.pubsub_request.trace_id': batchTraceIdHex, + '_dd.pubsub_request.span_id': batchSpanIdHex, + '_dd.batch.size': String(messageCount), + '_dd.batch.index': String(i), + 'gcloud.project_id': projectId, + 'pubsub.topic': topic + }) + + if (batchTraceIdUpper) { + msg.attributes['_dd.pubsub_request.p.tid'] = batchTraceIdUpper + } + + msg.attributes['x-dd-publish-start-time'] ??= String(Date.now()) + +>>>>>>> Stashed changes if (this.config.dsmEnabled) { - const payloadSize = getHeadersSize(msg) - const dataStreamsContext = this.tracer - .setCheckpoint(['direction:out', `topic:${topic}`, 'type:google-pubsub'], span, payloadSize) + const dataStreamsContext = this.tracer.setCheckpoint( + ['direction:out', `topic:${topic}`, 'type:google-pubsub'], + batchSpan, + getHeadersSize(msg) + ) DsmPathwayCodec.encode(dataStreamsContext, msg.attributes) } - } + }) + ctx.batchSpan = batchSpan return ctx.currentStore } + + bindFinish (ctx) { + if (ctx.batchSpan && !ctx.batchSpan._duration) ctx.batchSpan.finish() + return super.bindFinish(ctx) + } + + bindError (ctx) { + if (ctx.error && ctx.batchSpan) { + ctx.batchSpan.setTag('error', ctx.error) + ctx.batchSpan.finish() + } + return super.bindError(ctx) + } + + _extractSpanLink (attrs) { + if (!attrs?.['x-datadog-trace-id'] || !attrs['x-datadog-parent-id']) return null + + const lowerHex = BigInt(attrs['x-datadog-trace-id']).toString(16).padStart(16, '0') + const spanIdHex = BigInt(attrs['x-datadog-parent-id']).toString(16).padStart(16, '0') + const traceIdHex = attrs['_dd.p.tid'] + ? attrs['_dd.p.tid'] + lowerHex + : lowerHex.padStart(32, '0') + + return { + traceId: traceIdHex, + spanId: spanIdHex, + samplingPriority: attrs['x-datadog-sampling-priority'] + ? parseInt(attrs['x-datadog-sampling-priority'], 10) + : undefined + } + } + + _extractParentContext (data) { + const carrier = { + 'x-datadog-trace-id': data.traceId, + 'x-datadog-parent-id': data.spanId + } + if (data.traceIdUpper) carrier['_dd.p.tid'] = data.traceIdUpper + if (data.samplingPriority) carrier['x-datadog-sampling-priority'] = String(data.samplingPriority) + + return this.tracer.extract('text_map', carrier) + } } module.exports = GoogleCloudPubsubProducerPlugin From d4756d158dbdf9ef92e116156ba635660d34ca30 Mon Sep 17 00:00:00 2001 From: nina9753 Date: Wed, 29 Oct 2025 12:56:25 -0400 Subject: [PATCH 2/7] Full batch handling with span link extraction --- .../datadog-plugin-google-cloud-pubsub/src/producer.js | 8 -------- 1 file changed, 8 deletions(-) diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js index dbffb5dc4a6..3563ff839a1 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js +++ b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js @@ -53,13 +53,6 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin { } }, ctx) -<<<<<<< Updated upstream - for (const msg of messages) { - if (!msg.attributes) { - msg.attributes = {} - } - this.tracer.inject(span, 'text_map', msg.attributes) -======= const spanCtx = batchSpan.context() const batchTraceId = spanCtx.toTraceId() const batchSpanId = spanCtx.toSpanId() @@ -103,7 +96,6 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin { msg.attributes['x-dd-publish-start-time'] ??= String(Date.now()) ->>>>>>> Stashed changes if (this.config.dsmEnabled) { const dataStreamsContext = this.tracer.setCheckpoint( ['direction:out', `topic:${topic}`, 'type:google-pubsub'], From c0312bc02fc7e3a219d269651b88d4430683207e Mon Sep 17 00:00:00 2001 From: nina9753 Date: Thu, 30 Oct 2025 09:19:51 -0400 Subject: [PATCH 3/7] Fix lint errors and operation name. Use Number.parseInt instead of parseInt --- .../src/producer.js | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js index 3563ff839a1..c39289ebee4 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js +++ b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js @@ -5,7 +5,7 @@ const { DsmPathwayCodec, getHeadersSize } = require('../../dd-trace/src/datastre class GoogleCloudPubsubProducerPlugin extends ProducerPlugin { static id = 'google-cloud-pubsub' - static operation = 'request' + static operation = 'send' bindStart (ctx) { const { request, api, projectId } = ctx @@ -17,7 +17,7 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin { const hasTraceContext = messages[0]?.attributes?.['x-datadog-trace-id'] // Collect span links from messages 2-N (skip first - it becomes parent) - const spanLinkData = hasTraceContext + const spanLinkData = hasTraceContex ? messages.slice(1).map(msg => this._extractSpanLink(msg.attributes)).filter(Boolean) : [] @@ -76,7 +76,7 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin { // Add metadata to all messages messages.forEach((msg, i) => { msg.attributes = msg.attributes || {} - + if (!hasTraceContext) { this.tracer.inject(batchSpan, 'text_map', msg.attributes) } @@ -98,8 +98,8 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin { if (this.config.dsmEnabled) { const dataStreamsContext = this.tracer.setCheckpoint( - ['direction:out', `topic:${topic}`, 'type:google-pubsub'], - batchSpan, + ['direction:out', `topic:${topic}`, 'type:google-pubsub'], + batchSpan, getHeadersSize(msg) ) DsmPathwayCodec.encode(dataStreamsContext, msg.attributes) @@ -128,15 +128,15 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin { const lowerHex = BigInt(attrs['x-datadog-trace-id']).toString(16).padStart(16, '0') const spanIdHex = BigInt(attrs['x-datadog-parent-id']).toString(16).padStart(16, '0') - const traceIdHex = attrs['_dd.p.tid'] - ? attrs['_dd.p.tid'] + lowerHex + const traceIdHex = attrs['_dd.p.tid'] + ? attrs['_dd.p.tid'] + lowerHex : lowerHex.padStart(32, '0') return { traceId: traceIdHex, spanId: spanIdHex, - samplingPriority: attrs['x-datadog-sampling-priority'] - ? parseInt(attrs['x-datadog-sampling-priority'], 10) + samplingPriority: attrs['x-datadog-sampling-priority'] + ? Number.parseInt(attrs['x-datadog-sampling-priority'], 10) : undefined } } @@ -148,7 +148,7 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin { } if (data.traceIdUpper) carrier['_dd.p.tid'] = data.traceIdUpper if (data.samplingPriority) carrier['x-datadog-sampling-priority'] = String(data.samplingPriority) - + return this.tracer.extract('text_map', carrier) } } From 237fe928d99471a446343cceeac4012aa7dd8454 Mon Sep 17 00:00:00 2001 From: nina9753 Date: Thu, 30 Oct 2025 09:24:01 -0400 Subject: [PATCH 4/7] Fix typo --- packages/datadog-plugin-google-cloud-pubsub/src/producer.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js index c39289ebee4..87f42a82183 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js +++ b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js @@ -17,7 +17,7 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin { const hasTraceContext = messages[0]?.attributes?.['x-datadog-trace-id'] // Collect span links from messages 2-N (skip first - it becomes parent) - const spanLinkData = hasTraceContex + const spanLinkData = hasTraceContext ? messages.slice(1).map(msg => this._extractSpanLink(msg.attributes)).filter(Boolean) : [] From 6f0a5126f3dec596280f2b8b1c19175e126a4caf Mon Sep 17 00:00:00 2001 From: nina9753 Date: Thu, 30 Oct 2025 09:56:18 -0400 Subject: [PATCH 5/7] Fix producer plugin channel subscription - use operation 'request' to match instrumentation --- packages/datadog-plugin-google-cloud-pubsub/src/producer.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js index 87f42a82183..b80c102b180 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js +++ b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js @@ -5,7 +5,7 @@ const { DsmPathwayCodec, getHeadersSize } = require('../../dd-trace/src/datastre class GoogleCloudPubsubProducerPlugin extends ProducerPlugin { static id = 'google-cloud-pubsub' - static operation = 'send' + static operation = 'request' bindStart (ctx) { const { request, api, projectId } = ctx From 006fa18cbf70dbfa368321f039fb7d7599fd9908 Mon Sep 17 00:00:00 2001 From: nina9753 Date: Thu, 30 Oct 2025 10:13:42 -0400 Subject: [PATCH 6/7] Remove explicit service setting to allow ProducerPlugin to apply nomenclature correctly --- packages/datadog-plugin-google-cloud-pubsub/src/producer.js | 1 - packages/datadog-plugin-google-cloud-pubsub/test/naming.js | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js index b80c102b180..902b215cce1 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js +++ b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js @@ -36,7 +36,6 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin { const batchSpan = this.startSpan({ childOf: parentData ? this._extractParentContext(parentData) : undefined, resource: `${api} ${topic}`, - service: this.config.service || `${this.tracer._service}-pubsub`, meta: { 'gcloud.project_id': projectId, 'pubsub.method': api, diff --git a/packages/datadog-plugin-google-cloud-pubsub/test/naming.js b/packages/datadog-plugin-google-cloud-pubsub/test/naming.js index b03e300f346..e7aa1992f91 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/test/naming.js +++ b/packages/datadog-plugin-google-cloud-pubsub/test/naming.js @@ -9,7 +9,7 @@ const rawExpectedSchema = { serviceName: 'test-pubsub' }, v1: { - opName: 'gcp.pubsub.send', + opName: 'gcp.pubsub.request', serviceName: 'test' } }, From f638388b301e31975167205de1063428c3e03b3d Mon Sep 17 00:00:00 2001 From: nina9753 Date: Thu, 30 Oct 2025 10:46:33 -0400 Subject: [PATCH 7/7] Fix test expectation:'gcp.pubsub.send' --- packages/datadog-plugin-google-cloud-pubsub/test/naming.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/datadog-plugin-google-cloud-pubsub/test/naming.js b/packages/datadog-plugin-google-cloud-pubsub/test/naming.js index e7aa1992f91..b03e300f346 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/test/naming.js +++ b/packages/datadog-plugin-google-cloud-pubsub/test/naming.js @@ -9,7 +9,7 @@ const rawExpectedSchema = { serviceName: 'test-pubsub' }, v1: { - opName: 'gcp.pubsub.request', + opName: 'gcp.pubsub.send', serviceName: 'test' } },