Skip to content

Commit 0faf894

Browse files
committed
Support different karafka/waterdrop configurations per topic
1 parent c32d6a6 commit 0faf894

File tree

8 files changed

+119
-32
lines changed

8 files changed

+119
-32
lines changed

docs/GettingStarted.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1154,6 +1154,10 @@ require 'datadog'
11541154

11551155
Datadog.configure do |c|
11561156
c.tracing.instrument :karafka, **options
1157+
1158+
# different options for different kafka topics
1159+
c.tracing.instrument :karafka, describes: "some-topic", distributed_tracing: false
1160+
c.tracing.instrument :karafka, describes: /^topic-regex-.*/, enabled: false
11571161
end
11581162

11591163
```
@@ -1176,6 +1180,10 @@ require 'datadog'
11761180

11771181
Datadog.configure do |c|
11781182
c.tracing.instrument :waterdrop, **options
1183+
1184+
# different options for different kafka topics
1185+
c.tracing.instrument :waterdrop, describes: "some-topic", distributed_tracing: false
1186+
c.tracing.instrument :waterdrop, describes: /^topic-regex-.*/, enabled: false
11791187
end
11801188

11811189
```

lib/datadog/tracing/contrib/karafka/framework.rb

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,23 @@ module Karafka
99
# - instrument parts of the framework when needed
1010
module Framework
1111
def self.setup
12+
karafka_configurations = Datadog.configuration.tracing.fetch_integration(:karafka).configurations
13+
1214
Datadog.configure do |datadog_config|
13-
karafka_config = datadog_config.tracing[:karafka]
14-
activate_waterdrop!(datadog_config, karafka_config)
15+
karafka_configurations.each do |config_name, karafka_config|
16+
activate_waterdrop!(datadog_config, config_name, karafka_config)
17+
end
1518
end
1619
end
1720

1821
# Apply relevant configuration from Karafka to WaterDrop
19-
def self.activate_waterdrop!(datadog_config, karafka_config)
22+
def self.activate_waterdrop!(datadog_config, config_name, karafka_config)
2023
datadog_config.tracing.instrument(
2124
:waterdrop,
25+
enabled: karafka_config[:enabled],
2226
service_name: karafka_config[:service_name],
2327
distributed_tracing: karafka_config[:distributed_tracing],
28+
describes: config_name,
2429
)
2530
end
2631
end

lib/datadog/tracing/contrib/karafka/integration.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ def new_configuration
3838
def patcher
3939
Patcher
4040
end
41+
42+
def resolver
43+
@resolver ||= Contrib::Configuration::Resolvers::PatternResolver.new
44+
end
4145
end
4246
end
4347
end

lib/datadog/tracing/contrib/karafka/patcher.rb

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,14 @@ module Contrib
1010
module Karafka
1111
# Patch to add tracing to Karafka::Messages::Messages
1212
module MessagesPatch
13-
def configuration
14-
Datadog.configuration.tracing[:karafka]
15-
end
16-
17-
def propagation
18-
@propagation ||= Contrib::Karafka::Distributed::Propagation.new
19-
end
20-
2113
# `each` is the most popular access point to Karafka messages,
2214
# but not the only one
2315
# Other access patterns do not have a straightforward tracing avenue
2416
# (e.g. `my_batch_operation messages.payloads`)
2517
# @see https://github.com/karafka/karafka/blob/b06d1f7c17818e1605f80c2bb573454a33376b40/README.md?plain=1#L29-L35
2618
def each(&block)
2719
@messages_array.each do |message|
20+
configuration = datadog_configuration(message.topic)
2821
trace_digest = if configuration[:distributed_tracing]
2922
headers = if message.metadata.respond_to?(:raw_headers)
3023
message.metadata.raw_headers
@@ -63,6 +56,12 @@ def each(&block)
6356
end
6457
end
6558
end
59+
60+
private
61+
62+
def datadog_configuration(topic)
63+
Datadog.configuration.tracing[:karafka, topic]
64+
end
6665
end
6766

6867
module AppPatch

lib/datadog/tracing/contrib/waterdrop/integration.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ def new_configuration
3636
def patcher
3737
Patcher
3838
end
39+
40+
def resolver
41+
@resolver ||= Contrib::Configuration::Resolvers::PatternResolver.new
42+
end
3943
end
4044
end
4145
end

lib/datadog/tracing/contrib/waterdrop/middleware.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ def call(message)
1313
trace_op = Datadog::Tracing.active_trace
1414

1515
if trace_op && Datadog::Tracing::Distributed::PropagationPolicy.enabled?(
16-
global_config: configuration,
16+
global_config: datadog_configuration(message[:topic]),
1717
trace: trace_op
1818
)
1919
WaterDrop.inject(trace_op.to_digest, message[:headers] ||= {})
@@ -35,8 +35,8 @@ def call(message)
3535

3636
private
3737

38-
def configuration
39-
Datadog.configuration.tracing[:waterdrop]
38+
def datadog_configuration(topic)
39+
Datadog.configuration.tracing[:waterdrop, topic]
4040
end
4141
end
4242
end

spec/datadog/tracing/contrib/karafka/patcher_spec.rb

Lines changed: 73 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
before do
1818
Datadog.configure do |c|
1919
c.tracing.instrument :karafka, configuration_options
20+
c.tracing.instrument :karafka, describes: /special_/, distributed_tracing: false
2021
end
2122
end
2223

@@ -31,16 +32,12 @@
3132
let(:span_name) { Datadog::Tracing::Contrib::Karafka::Ext::SPAN_MESSAGE_CONSUME }
3233

3334
it 'is expected to send a span' do
34-
metadata = ::Karafka::Messages::Metadata.new
35-
metadata['offset'] = 412
35+
metadata = ::Karafka::Messages::Metadata.new(offset: 412, timestamp: Time.now, topic: 'topic_a')
3636
raw_payload = rand.to_s
3737

3838
message = ::Karafka::Messages::Message.new(raw_payload, metadata)
39-
allow(message).to receive(:timestamp).and_return(Time.now)
40-
allow(message).to receive(:topic).and_return('topic_a')
41-
42-
topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0))
4339

40+
topic = ::Karafka::Routing::Topic.new(message.topic, double(id: 0))
4441
messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now)
4542

4643
expect(messages).to all(be_a(::Karafka::Messages::Message))
@@ -55,20 +52,21 @@
5552
end
5653

5754
context 'when the message has tracing headers' do
55+
let(:topic_name) { 'topic_a' }
5856
let(:message) do
5957
headers = {}
6058
Datadog::Tracing.trace('producer') do |span, trace|
6159
Datadog::Tracing::Contrib::Karafka.inject(trace.to_digest, headers)
6260
end
63-
metadata = ::Karafka::Messages::Metadata.new
64-
metadata['offset'] = 412
65-
metadata[headers_accessor] = headers
61+
metadata = ::Karafka::Messages::Metadata.new(
62+
:offset => 412,
63+
headers_accessor => headers,
64+
:topic => topic_name,
65+
:timestamp => Time.now
66+
)
6667
raw_payload = rand.to_s
6768

68-
message = ::Karafka::Messages::Message.new(raw_payload, metadata)
69-
allow(message).to receive(:timestamp).and_return(Time.now)
70-
allow(message).to receive(:topic).and_return('topic_a')
71-
message
69+
::Karafka::Messages::Message.new(raw_payload, metadata)
7270
end
7371
let(:headers_accessor) do
7472
::Karafka::Messages::Metadata.members.include?(:raw_headers) ? 'raw_headers' : 'headers'
@@ -85,7 +83,7 @@
8583
consumer_span = Datadog::Tracing.active_span
8684
consumer_trace = Datadog::Tracing.active_trace
8785

88-
topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0))
86+
topic = ::Karafka::Routing::Topic.new(topic_name, double(id: 0))
8987
messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now)
9088
# NOTE: The following will iterate through the messages and create a new span representing
9189
# the individual message processing (and `span` will refer to that particular span)
@@ -108,6 +106,34 @@
108106
end
109107
end
110108

109+
context 'when distributed tracing is disabled for the topic in particular' do
110+
let(:topic_name) { 'special_topic' }
111+
112+
it 'does not continue the span that produced the message' do
113+
consumer_span = nil
114+
consumer_trace = nil
115+
116+
Datadog::Tracing.trace('consumer') do
117+
consumer_span = Datadog::Tracing.active_span
118+
consumer_trace = Datadog::Tracing.active_trace
119+
120+
topic = ::Karafka::Routing::Topic.new(topic_name, double(id: 0))
121+
messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now)
122+
expect(messages).to all(be_a(::Karafka::Messages::Message))
123+
124+
# assert that the current trace re-set to the original trace after iterating the messages
125+
expect(Datadog::Tracing.active_trace).to eq(consumer_trace)
126+
expect(Datadog::Tracing.active_span).to eq(consumer_span)
127+
end
128+
129+
expect(spans).to have(3).items
130+
131+
# assert that the message span is not continuation of the producer span
132+
expect(span.parent_id).to eq(consumer_span.id)
133+
expect(span.trace_id).to eq(consumer_trace.id)
134+
end
135+
end
136+
111137
context 'when distributed tracing is not enabled' do
112138
let(:configuration_options) { {distributed_tracing: false} }
113139

@@ -119,7 +145,8 @@
119145
consumer_span = Datadog::Tracing.active_span
120146
consumer_trace = Datadog::Tracing.active_trace
121147

122-
topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0))
148+
topic = ::Karafka::Routing::Topic.new(topic_name, double(id: 0))
149+
123150
messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now)
124151
# NOTE: The following will iterate through the messages and create a new span representing
125152
# the individual message processing (and `span` will refer to that particular span)
@@ -148,12 +175,11 @@
148175
let(:span_name) { Datadog::Tracing::Contrib::Karafka::Ext::SPAN_WORKER_PROCESS }
149176

150177
it 'is expected to send a span' do
151-
metadata = ::Karafka::Messages::Metadata.new
152-
metadata['offset'] = 412
178+
metadata = ::Karafka::Messages::Metadata.new(offset: 412, topic: 'topic_a')
153179
raw_payload = rand.to_s
154180

155181
message = ::Karafka::Messages::Message.new(raw_payload, metadata)
156-
job = double(executor: double(topic: double(name: 'topic_a', consumer: 'ABC'), partition: 0), messages: [message])
182+
job = double(executor: double(topic: double(name: message.topic, consumer: 'ABC'), partition: 0), messages: [message])
157183

158184
Karafka.monitor.instrument('worker.processed', {job: job}) do
159185
# Noop
@@ -169,4 +195,33 @@
169195
expect(span.resource).to eq 'ABC#consume'
170196
end
171197
end
198+
199+
describe 'framework auto-instrumentation' do
200+
around do |example|
201+
# Reset before and after each example; don't allow global state to linger.
202+
Datadog.registry[:waterdrop].reset_configuration!
203+
example.run
204+
Datadog.registry[:waterdrop].reset_configuration!
205+
206+
# reset Karafka internal state as well
207+
Karafka::App.config.internal.status.reset!
208+
Karafka.refresh!
209+
end
210+
211+
it 'automatically enables waterdrop instrumentation' do
212+
Karafka::App.setup do |c|
213+
c.kafka = {"bootstrap.servers": '127.0.0.1:9092'}
214+
end
215+
216+
expect(Datadog.configuration.tracing[:karafka][:enabled]).to be true
217+
expect(Datadog.configuration.tracing[:karafka][:distributed_tracing]).to be true
218+
expect(Datadog.configuration.tracing[:karafka, 'special_topic'][:enabled]).to be true
219+
expect(Datadog.configuration.tracing[:karafka, 'special_topic'][:distributed_tracing]).to be false
220+
221+
expect(Datadog.configuration.tracing[:waterdrop][:enabled]).to be true
222+
expect(Datadog.configuration.tracing[:waterdrop][:distributed_tracing]).to be true
223+
expect(Datadog.configuration.tracing[:waterdrop, 'special_topic'][:enabled]).to be true
224+
expect(Datadog.configuration.tracing[:waterdrop, 'special_topic'][:distributed_tracing]).to be false
225+
end
226+
end
172227
end

spec/datadog/tracing/contrib/waterdrop/middleware_spec.rb

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
before do
99
Datadog.configure do |c|
1010
c.tracing.instrument :waterdrop, tracing_options
11+
c.tracing.instrument :waterdrop, describes: /special_/, distributed_tracing: false
1112
end
1213
end
1314

@@ -57,6 +58,17 @@
5758
end
5859
end
5960

61+
context 'when distributed tracing is disabled for the topic in particular' do
62+
it 'does not propagate trace context in message headers' do
63+
message_1 = {topic: 'special_topic', payload: 'foo'}
64+
Datadog::Tracing.trace('test.span') do
65+
middleware.call(message_1)
66+
end
67+
68+
expect(message_1[:headers]).to be_nil
69+
end
70+
end
71+
6072
context 'when DataStreams is enabled' do
6173
before do
6274
allow(Datadog::DataStreams).to receive(:enabled?).and_return(true)

0 commit comments

Comments
 (0)