From f4e890d7d4e00d596573d93aa9068140ec270fa2 Mon Sep 17 00:00:00 2001 From: Rafael Gibim <9031589+Drowze@users.noreply.github.com> Date: Mon, 8 Dec 2025 14:43:35 -0300 Subject: [PATCH 1/4] Fix Karafka framework patch By the time that `Karafka::App#initialized!` is called, `Karafka.producer` was already initialized. On top of that, there's really nowhere we can hook in the Karafka initialization where we can be sure that `Karafka.producer` wasn't yet initiaized - that's because the Karafka initialization is NOT necessarily tied to the WaterDrop initialization. For instance, the following is a possible scenario: ``` $producer = WaterDrop::Producer.new { ... } Datadog.configure do |c| c.tracing.instrument :karafka end # note that the producer was initialized before the Karafka app (and in # this case, even before datadog was configured) Karafka::App.setup do |c| c.producer = $producer end ``` So instead of trying to hook somewhere before `Karafka.producer` is initialized, let's simply listen to a Karafka after-initialization event and append our middleware to the producer when that event is triggered. --- .../tracing/contrib/karafka/patcher.rb | 24 +++--- .../tracing/contrib/waterdrop/patcher.rb | 9 ++- .../tracing/contrib/karafka/patcher_spec.rb | 77 +++++++++++++++++++ 3 files changed, 94 insertions(+), 16 deletions(-) diff --git a/lib/datadog/tracing/contrib/karafka/patcher.rb b/lib/datadog/tracing/contrib/karafka/patcher.rb index 1423e7f9b8c..d0ccdcd9bf1 100644 --- a/lib/datadog/tracing/contrib/karafka/patcher.rb +++ b/lib/datadog/tracing/contrib/karafka/patcher.rb @@ -66,22 +66,12 @@ def each(&block) end end - module AppPatch - ONLY_ONCE_PER_APP = Hash.new { |h, key| h[key] = Core::Utils::OnlyOnce.new } - - def initialized! - ONLY_ONCE_PER_APP[self].run do - # Activate tracing on components related to Karafka (e.g. WaterDrop) - Contrib::Karafka::Framework.setup - end - super - end - end - # Patcher enables patching of 'karafka' module. module Patcher include Contrib::Patcher + ACTIVATE_FRAMEWORK_ONLY_ONCE = Core::Utils::OnlyOnce.new + module_function def target_version @@ -91,10 +81,18 @@ def target_version def patch require_relative 'monitor' require_relative 'framework' + require_relative '../waterdrop/middleware' ::Karafka::Instrumentation::Monitor.prepend(Monitor) ::Karafka::Messages::Messages.prepend(MessagesPatch) - ::Karafka::App.singleton_class.prepend(AppPatch) + + ::Karafka.monitor.subscribe('app.initialized') do |event| + ACTIVATE_FRAMEWORK_ONLY_ONCE.run do + Contrib::Karafka::Framework.setup + end + + Contrib::WaterDrop::Patcher.add_middleware(::Karafka.producer) + end end end end diff --git a/lib/datadog/tracing/contrib/waterdrop/patcher.rb b/lib/datadog/tracing/contrib/waterdrop/patcher.rb index aa1b04d370a..293d2937cf3 100644 --- a/lib/datadog/tracing/contrib/waterdrop/patcher.rb +++ b/lib/datadog/tracing/contrib/waterdrop/patcher.rb @@ -25,9 +25,7 @@ def patch ::WaterDrop::Producer.prepend(Producer) ::WaterDrop.instrumentation.subscribe('producer.configured') do |event| producer = event[:producer] - - included_middlewares = producer.middleware.instance_variable_get(:@steps) - producer.middleware.append(Middleware) unless included_middlewares.include?(Middleware) + add_middleware(producer) if Datadog.configuration.data_streams.enabled producer.monitor.subscribe('message.acknowledged') do |ack_event| @@ -39,6 +37,11 @@ def patch end end end + + def add_middleware(producer) + included_middlewares = producer.middleware.instance_variable_get(:@steps) + producer.middleware.append(Middleware) unless included_middlewares.include?(Middleware) + end end end end diff --git a/spec/datadog/tracing/contrib/karafka/patcher_spec.rb b/spec/datadog/tracing/contrib/karafka/patcher_spec.rb index ae989e511b7..739edc4a357 100644 --- a/spec/datadog/tracing/contrib/karafka/patcher_spec.rb +++ b/spec/datadog/tracing/contrib/karafka/patcher_spec.rb @@ -80,4 +80,81 @@ expect(span.resource).to eq 'ABC#consume' end end + + describe 'framework auto-instrumentation' do + around do |example| + # Reset before and after each example; don't allow global state to linger. + Datadog.registry[:waterdrop].reset_configuration! + example.run + Datadog.registry[:waterdrop].reset_configuration! + + # reset Karafka internal state as well + Karafka::App.config.internal.status.reset! + Karafka::App.config.producer = nil + Karafka.refresh! + end + + let(:producer_middlewares) { Karafka.producer.middleware.instance_variable_get(:@steps) } + + it 'automatically enables waterdrop instrumentation' do + Karafka::App.setup do |c| + c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} + end + + expect(Datadog.configuration.tracing[:karafka][:enabled]).to be true + expect(Datadog.configuration.tracing[:karafka][:distributed_tracing]).to be true + + expect(Datadog.configuration.tracing[:waterdrop][:enabled]).to be true + expect(Datadog.configuration.tracing[:waterdrop][:distributed_tracing]).to be true + end + + context 'when user does not supply a custom producer' do + it 'sets up Karafka.producer with the datadog waterdrop middleware' do + Karafka::App.setup do |c| + c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} + end + + expect(producer_middlewares).to eq([ + Datadog::Tracing::Contrib::WaterDrop::Middleware + ]) + end + end + + context 'when the user does supply a custom producer with custom middlewares' do + let(:custom_middleware) { ->(message) { messsage } } + + it 'appends the datadog middleware at the end of the Karafka.producer middleware stack' do + Karafka::App.setup do |c| + c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} + c.producer = WaterDrop::Producer.new do |producer_config| + producer_config.kafka = {"bootstrap.servers": '127.0.0.1:9092'} + producer_config.middleware.append(custom_middleware) + end + end + + expect(producer_middlewares).to eq([ + custom_middleware, + Datadog::Tracing::Contrib::WaterDrop::Middleware + ]) + end + end + + context 'when the waterdrop integration is manually configured' do + before do + Datadog.configure do |c| + c.tracing.instrument :waterdrop, configuration_options + end + end + + it 'appends the datadog middleware to Karafka.producer only once' do + Karafka::App.setup do |c| + c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} + end + + expect(producer_middlewares).to eq([ + Datadog::Tracing::Contrib::WaterDrop::Middleware + ]) + end + end + end end From 96e4e6c87367778aa0ad1bfc7e8a24cdce5c2221 Mon Sep 17 00:00:00 2001 From: Rafael Gibim <9031589+Drowze@users.noreply.github.com> Date: Mon, 8 Dec 2025 15:28:31 -0300 Subject: [PATCH 2/4] Do not attempt to auto-activate WaterDrop unless it's compatible --- .../tracing/contrib/karafka/patcher.rb | 14 +++++---- .../tracing/contrib/karafka/patcher_spec.rb | 31 ++++++++++++++++--- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/lib/datadog/tracing/contrib/karafka/patcher.rb b/lib/datadog/tracing/contrib/karafka/patcher.rb index d0ccdcd9bf1..62ce55b7509 100644 --- a/lib/datadog/tracing/contrib/karafka/patcher.rb +++ b/lib/datadog/tracing/contrib/karafka/patcher.rb @@ -81,17 +81,19 @@ def target_version def patch require_relative 'monitor' require_relative 'framework' - require_relative '../waterdrop/middleware' + require_relative '../waterdrop' ::Karafka::Instrumentation::Monitor.prepend(Monitor) ::Karafka::Messages::Messages.prepend(MessagesPatch) - ::Karafka.monitor.subscribe('app.initialized') do |event| - ACTIVATE_FRAMEWORK_ONLY_ONCE.run do - Contrib::Karafka::Framework.setup - end + if Contrib::WaterDrop::Integration.compatible? + ::Karafka.monitor.subscribe('app.initialized') do |event| + ACTIVATE_FRAMEWORK_ONLY_ONCE.run do + Contrib::Karafka::Framework.setup + end - Contrib::WaterDrop::Patcher.add_middleware(::Karafka.producer) + Contrib::WaterDrop::Patcher.add_middleware(::Karafka.producer) + end end end end diff --git a/spec/datadog/tracing/contrib/karafka/patcher_spec.rb b/spec/datadog/tracing/contrib/karafka/patcher_spec.rb index 739edc4a357..aa8b5b0473a 100644 --- a/spec/datadog/tracing/contrib/karafka/patcher_spec.rb +++ b/spec/datadog/tracing/contrib/karafka/patcher_spec.rb @@ -96,7 +96,13 @@ let(:producer_middlewares) { Karafka.producer.middleware.instance_variable_get(:@steps) } - it 'automatically enables waterdrop instrumentation' do + def waterdrop_compatible? + Datadog::Tracing::Contrib::WaterDrop::Integration.compatible? + end + + it 'automatically enables WaterDrop instrumentation' do + skip 'WaterDrop is not activated unless it is on a supported version' unless waterdrop_compatible? + Karafka::App.setup do |c| c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} end @@ -110,6 +116,8 @@ context 'when user does not supply a custom producer' do it 'sets up Karafka.producer with the datadog waterdrop middleware' do + skip 'WaterDrop is not activated unless it is on a supported version' unless waterdrop_compatible? + Karafka::App.setup do |c| c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} end @@ -124,6 +132,8 @@ let(:custom_middleware) { ->(message) { messsage } } it 'appends the datadog middleware at the end of the Karafka.producer middleware stack' do + skip 'WaterDrop is not activated unless it is on a supported version' unless waterdrop_compatible? + Karafka::App.setup do |c| c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} c.producer = WaterDrop::Producer.new do |producer_config| @@ -140,13 +150,12 @@ end context 'when the waterdrop integration is manually configured' do - before do + it 'appends the datadog middleware to Karafka.producer only once' do + skip 'WaterDrop is not activated unless it is on a supported version' unless waterdrop_compatible? + Datadog.configure do |c| c.tracing.instrument :waterdrop, configuration_options end - end - - it 'appends the datadog middleware to Karafka.producer only once' do Karafka::App.setup do |c| c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} end @@ -156,5 +165,17 @@ ]) end end + + context 'when the waterdrop integration is not on a compatbile version' do + it 'does not attempt to activate waterdrop or append any producer middleware' do + skip 'WaterDrop is not activated unless it is on a supported version' if waterdrop_compatible? + + Karafka::App.setup do |c| + c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} + end + + expect(producer_middlewares).to be_empty + end + end end end From feef9febfcaeced047f74dc4981ae5550da7e2b5 Mon Sep 17 00:00:00 2001 From: Rafael Gibim <9031589+Drowze@users.noreply.github.com> Date: Mon, 5 Jan 2026 10:08:59 -0300 Subject: [PATCH 3/4] Fix steep errors --- sig/datadog/tracing/contrib/waterdrop/patcher.rbs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sig/datadog/tracing/contrib/waterdrop/patcher.rbs b/sig/datadog/tracing/contrib/waterdrop/patcher.rbs index 951bb4e3d96..bd6bf89b751 100644 --- a/sig/datadog/tracing/contrib/waterdrop/patcher.rbs +++ b/sig/datadog/tracing/contrib/waterdrop/patcher.rbs @@ -9,6 +9,9 @@ module Datadog def self?.target_version: () -> ::Gem::Version? def self?.patch: () -> void + + # Adds the tracing middleware to a WaterDrop producer + def add_middleware: (untyped producer) -> void end end end From b1fbdf5e4ae40ad19884ccab3ab5af28cc1ae4e5 Mon Sep 17 00:00:00 2001 From: Rafael Gibim <9031589+Drowze@users.noreply.github.com> Date: Tue, 6 Jan 2026 17:32:43 -0300 Subject: [PATCH 4/4] Review: move `skip` to a `before` hook --- .../tracing/contrib/karafka/patcher_spec.rb | 100 +++++++++--------- 1 file changed, 50 insertions(+), 50 deletions(-) diff --git a/spec/datadog/tracing/contrib/karafka/patcher_spec.rb b/spec/datadog/tracing/contrib/karafka/patcher_spec.rb index 417e599f4c8..7db64653ef6 100644 --- a/spec/datadog/tracing/contrib/karafka/patcher_spec.rb +++ b/spec/datadog/tracing/contrib/karafka/patcher_spec.rb @@ -189,76 +189,76 @@ def waterdrop_compatible? Datadog::Tracing::Contrib::WaterDrop::Integration.compatible? end - it 'automatically enables WaterDrop instrumentation' do - skip 'WaterDrop is not activated unless it is on a supported version' unless waterdrop_compatible? - - Karafka::App.setup do |c| - c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} - end - - expect(Datadog.configuration.tracing[:karafka][:enabled]).to be true - expect(Datadog.configuration.tracing[:karafka][:distributed_tracing]).to be true - - expect(Datadog.configuration.tracing[:waterdrop][:enabled]).to be true - expect(Datadog.configuration.tracing[:waterdrop][:distributed_tracing]).to be true - end - - context 'when user does not supply a custom producer' do - it 'sets up Karafka.producer with the datadog waterdrop middleware' do + context 'when WaterDrop integration is on a compatible version' do + before do skip 'WaterDrop is not activated unless it is on a supported version' unless waterdrop_compatible? + end + it 'automatically enables WaterDrop instrumentation' do Karafka::App.setup do |c| c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} end - expect(producer_middlewares).to eq([ - Datadog::Tracing::Contrib::WaterDrop::Middleware - ]) + expect(Datadog.configuration.tracing[:karafka][:enabled]).to be true + expect(Datadog.configuration.tracing[:karafka][:distributed_tracing]).to be true + + expect(Datadog.configuration.tracing[:waterdrop][:enabled]).to be true + expect(Datadog.configuration.tracing[:waterdrop][:distributed_tracing]).to be true end - end - context 'when the user does supply a custom producer with custom middlewares' do - let(:custom_middleware) { ->(message) { messsage } } + context 'when user does not supply a custom producer' do + it 'sets up Karafka.producer with the datadog waterdrop middleware' do + Karafka::App.setup do |c| + c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} + end - it 'appends the datadog middleware at the end of the Karafka.producer middleware stack' do - skip 'WaterDrop is not activated unless it is on a supported version' unless waterdrop_compatible? + expect(producer_middlewares).to eq([ + Datadog::Tracing::Contrib::WaterDrop::Middleware + ]) + end + end - Karafka::App.setup do |c| - c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} - c.producer = WaterDrop::Producer.new do |producer_config| - producer_config.kafka = {"bootstrap.servers": '127.0.0.1:9092'} - producer_config.middleware.append(custom_middleware) + context 'when the user does supply a custom producer with custom middlewares' do + let(:custom_middleware) { ->(message) { messsage } } + + it 'appends the datadog middleware at the end of the Karafka.producer middleware stack' do + Karafka::App.setup do |c| + c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} + c.producer = WaterDrop::Producer.new do |producer_config| + producer_config.kafka = {"bootstrap.servers": '127.0.0.1:9092'} + producer_config.middleware.append(custom_middleware) + end end - end - expect(producer_middlewares).to eq([ - custom_middleware, - Datadog::Tracing::Contrib::WaterDrop::Middleware - ]) + expect(producer_middlewares).to eq([ + custom_middleware, + Datadog::Tracing::Contrib::WaterDrop::Middleware + ]) + end end - end - context 'when the waterdrop integration is manually configured' do - it 'appends the datadog middleware to Karafka.producer only once' do - skip 'WaterDrop is not activated unless it is on a supported version' unless waterdrop_compatible? + context 'when the waterdrop integration is manually configured' do + it 'appends the datadog middleware to Karafka.producer only once' do + Datadog.configure do |c| + c.tracing.instrument :waterdrop, configuration_options + end + Karafka::App.setup do |c| + c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} + end - Datadog.configure do |c| - c.tracing.instrument :waterdrop, configuration_options + expect(producer_middlewares).to eq([ + Datadog::Tracing::Contrib::WaterDrop::Middleware + ]) end - Karafka::App.setup do |c| - c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} - end - - expect(producer_middlewares).to eq([ - Datadog::Tracing::Contrib::WaterDrop::Middleware - ]) end end - context 'when the waterdrop integration is not on a compatbile version' do - it 'does not attempt to activate waterdrop or append any producer middleware' do - skip 'WaterDrop is not activated unless it is on a supported version' if waterdrop_compatible? + context 'when the waterdrop integration is not on a compatible version' do + before do + skip 'WaterDrop is on a supported version' if waterdrop_compatible? + end + it 'does not attempt to activate waterdrop or append any producer middleware' do Karafka::App.setup do |c| c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} end