Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/GettingStarted.md
Original file line number Diff line number Diff line change
Expand Up @@ -1154,6 +1154,10 @@ require 'datadog'

Datadog.configure do |c|
c.tracing.instrument :karafka, **options

# different options for different kafka topics
c.tracing.instrument :karafka, describes: "some-topic", distributed_tracing: false
c.tracing.instrument :karafka, describes: /^topic-regex-.*/, enabled: false
end

```
Expand All @@ -1176,6 +1180,10 @@ require 'datadog'

Datadog.configure do |c|
c.tracing.instrument :waterdrop, **options

# different options for different kafka topics
c.tracing.instrument :waterdrop, describes: "some-topic", distributed_tracing: false
c.tracing.instrument :waterdrop, describes: /^topic-regex-.*/, enabled: false
end

```
Expand Down
11 changes: 8 additions & 3 deletions lib/datadog/tracing/contrib/karafka/framework.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,23 @@ module Karafka
# - instrument parts of the framework when needed
module Framework
def self.setup
karafka_configurations = Datadog.configuration.tracing.fetch_integration(:karafka).configurations

Datadog.configure do |datadog_config|
karafka_config = datadog_config.tracing[:karafka]
activate_waterdrop!(datadog_config, karafka_config)
karafka_configurations.each do |config_name, karafka_config|
activate_waterdrop!(datadog_config, config_name, karafka_config)
end
end
end

# Apply relevant configuration from Karafka to WaterDrop
def self.activate_waterdrop!(datadog_config, karafka_config)
def self.activate_waterdrop!(datadog_config, config_name, karafka_config)
datadog_config.tracing.instrument(
:waterdrop,
enabled: karafka_config[:enabled],
service_name: karafka_config[:service_name],
distributed_tracing: karafka_config[:distributed_tracing],
describes: config_name,
)
end
end
Expand Down
4 changes: 4 additions & 0 deletions lib/datadog/tracing/contrib/karafka/integration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ def new_configuration
def patcher
Patcher
end

def resolver
@resolver ||= Contrib::Configuration::Resolvers::PatternResolver.new
end
end
end
end
Expand Down
72 changes: 35 additions & 37 deletions lib/datadog/tracing/contrib/karafka/patcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,50 +10,42 @@ module Contrib
module Karafka
# Patch to add tracing to Karafka::Messages::Messages
module MessagesPatch
def configuration
Datadog.configuration.tracing[:karafka]
end

def propagation
@propagation ||= Contrib::Karafka::Distributed::Propagation.new
end

# `each` is the most popular access point to Karafka messages,
# but not the only one
# Other access patterns do not have a straightforward tracing avenue
# (e.g. `my_batch_operation messages.payloads`)
# @see https://github.com/karafka/karafka/blob/b06d1f7c17818e1605f80c2bb573454a33376b40/README.md?plain=1#L29-L35
def each(&block)
@messages_array.each do |message|
if configuration[:distributed_tracing]
configuration = datadog_configuration(message.topic)
trace_digest = if configuration[:distributed_tracing]
headers = if message.metadata.respond_to?(:raw_headers)
message.metadata.raw_headers
else
message.metadata.headers
end
trace_digest = Karafka.extract(headers)
Datadog::Tracing.continue_trace!(trace_digest) if trace_digest
Karafka.extract(headers)
end

if Datadog::DataStreams.enabled?
begin
headers = if message.metadata.respond_to?(:raw_headers)
message.metadata.raw_headers
else
message.metadata.headers
Tracing.trace(Ext::SPAN_MESSAGE_CONSUME, continue_from: trace_digest) do |span, trace|
if Datadog::DataStreams.enabled?
begin
headers = if message.metadata.respond_to?(:raw_headers)
message.metadata.raw_headers
else
message.metadata.headers
end

Datadog::DataStreams.set_consume_checkpoint(
type: 'kafka',
source: message.topic,
auto_instrumentation: true
) { |key| headers[key] }
rescue => e
Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}")
end

Datadog::DataStreams.set_consume_checkpoint(
type: 'kafka',
source: message.topic,
auto_instrumentation: true
) { |key| headers[key] }
rescue => e
Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}")
end
end

Tracing.trace(Ext::SPAN_MESSAGE_CONSUME) do |span|
span.set_tag(Ext::TAG_OFFSET, message.metadata.offset)
span.set_tag(Contrib::Ext::Messaging::TAG_DESTINATION, message.topic)
span.set_tag(Contrib::Ext::Messaging::TAG_SYSTEM, Ext::TAG_SYSTEM)
Expand All @@ -64,24 +56,20 @@ def each(&block)
end
end
end
end

module AppPatch
ONLY_ONCE_PER_APP = Hash.new { |h, key| h[key] = Core::Utils::OnlyOnce.new }
private

def initialized!
ONLY_ONCE_PER_APP[self].run do
# Activate tracing on components related to Karafka (e.g. WaterDrop)
Contrib::Karafka::Framework.setup
end
super
def datadog_configuration(topic)
Datadog.configuration.tracing[:karafka, topic]
end
end

# Patcher enables patching of 'karafka' module.
module Patcher
include Contrib::Patcher

ACTIVATE_FRAMEWORK_ONLY_ONCE = Core::Utils::OnlyOnce.new

module_function

def target_version
Expand All @@ -91,10 +79,20 @@ def target_version
def patch
require_relative 'monitor'
require_relative 'framework'
require_relative '../waterdrop'

::Karafka::Instrumentation::Monitor.prepend(Monitor)
::Karafka::Messages::Messages.prepend(MessagesPatch)
::Karafka::App.singleton_class.prepend(AppPatch)

if Contrib::WaterDrop::Integration.compatible?
::Karafka.monitor.subscribe('app.initialized') do |event|
ACTIVATE_FRAMEWORK_ONLY_ONCE.run do
Contrib::Karafka::Framework.setup
end

Contrib::WaterDrop::Patcher.add_middleware(::Karafka.producer)
end
end
end
end
end
Expand Down
4 changes: 4 additions & 0 deletions lib/datadog/tracing/contrib/waterdrop/integration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ def new_configuration
def patcher
Patcher
end

def resolver
@resolver ||= Contrib::Configuration::Resolvers::PatternResolver.new
end
end
end
end
Expand Down
6 changes: 3 additions & 3 deletions lib/datadog/tracing/contrib/waterdrop/middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def call(message)
trace_op = Datadog::Tracing.active_trace

if trace_op && Datadog::Tracing::Distributed::PropagationPolicy.enabled?(
global_config: configuration,
global_config: datadog_configuration(message[:topic]),
trace: trace_op
)
WaterDrop.inject(trace_op.to_digest, message[:headers] ||= {})
Expand All @@ -35,8 +35,8 @@ def call(message)

private

def configuration
Datadog.configuration.tracing[:waterdrop]
def datadog_configuration(topic)
Datadog.configuration.tracing[:waterdrop, topic]
end
end
end
Expand Down
9 changes: 6 additions & 3 deletions lib/datadog/tracing/contrib/waterdrop/patcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ def patch
::WaterDrop::Producer.prepend(Producer)
::WaterDrop.instrumentation.subscribe('producer.configured') do |event|
producer = event[:producer]

included_middlewares = producer.middleware.instance_variable_get(:@steps)
producer.middleware.append(Middleware) unless included_middlewares.include?(Middleware)
add_middleware(producer)

if Datadog.configuration.data_streams.enabled
producer.monitor.subscribe('message.acknowledged') do |ack_event|
Expand All @@ -39,6 +37,11 @@ def patch
end
end
end

def add_middleware(producer)
included_middlewares = producer.middleware.instance_variable_get(:@steps)
producer.middleware.append(Middleware) unless included_middlewares.include?(Middleware)
end
end
end
end
Expand Down
Loading