Skip to content

Commit

Permalink
Protect rdkafka thread from instrumentation crashes (#481)
Browse files Browse the repository at this point in the history
* protect rdkafka thread

* missing specs
  • Loading branch information
mensfeld authored Apr 23, 2024
1 parent a1b7708 commit 076094a
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ This release contains **BREAKING** changes. Make sure to read and apply upgrade
- **[Breaking]** Do **not** validate or morph (via middleware) messages added to the buffer prior to `flush_sync` or `flush_async`.
- [Enhancement] Provide `WaterDrop::Producer#transaction?` that returns only when producer has an active transaction running.
- [Enhancement] Introduce `instrument_on_wait_queue_full` flag (defaults to `true`) to be able to configure whether non critical (retryable) queue full errors should be instrumented in the error pipeline. Useful when building high-performance pipes with WaterDrop queue retry backoff as a throttler.
- [Enhancement] Protect critical `rdkafka` thread executable code sections.
- [Enhancement] Treat the queue size as a gauge rather than a cumulative stat (isturdy).
- [Fix] Fix a case where purge on non-initialized client would crash.
- [Fix] Middlewares run twice when using buffered produce.
Expand Down
11 changes: 11 additions & 0 deletions lib/waterdrop/instrumentation/callbacks/delivery.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ def call(delivery_report)
else
instrument_error(delivery_report)
end
# This runs from the rdkafka thread, thus we want to safe-guard it and prevent absolute
# crashes even if the instrumentation code fails. If it would bubble-up, it could crash
# the rdkafka background thread
rescue StandardError => e
@monitor.instrument(
'error.occurred',
caller: self,
error: e,
producer_id: @producer_id,
type: 'callbacks.delivery.error'
)
end

private
Expand Down
11 changes: 11 additions & 0 deletions lib/waterdrop/instrumentation/callbacks/error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ def call(client_name, error)
producer_id: @producer_id,
type: 'librdkafka.error'
)
# This runs from the rdkafka thread, thus we want to safe-guard it and prevent absolute
# crashes even if the instrumentation code fails. If it would bubble-up, it could crash
# the rdkafka background thread
rescue StandardError => e
@monitor.instrument(
'error.occurred',
caller: self,
error: e,
producer_id: @producer_id,
type: 'callbacks.error.error'
)
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@ def call(_rd_config, bearer_name)
bearer: @bearer,
caller: self
)
# This runs from the rdkafka thread, thus we want to safe-guard it and prevent absolute
# crashes even if the instrumentation code fails. If it would bubble-up, it could crash
# the rdkafka background thread
rescue StandardError => e
@monitor.instrument(
'error.occurred',
caller: self,
error: e,
producer_id: @producer_id,
type: 'callbacks.oauthbearer_token_refresh.error'
)
end
end
end
Expand Down
11 changes: 11 additions & 0 deletions lib/waterdrop/instrumentation/callbacks/statistics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ def call(statistics)
producer_id: @producer_id,
statistics: @statistics_decorator.call(statistics)
)
# This runs from the rdkafka thread, thus we want to safe-guard it and prevent absolute
# crashes even if the instrumentation code fails. If it would bubble-up, it could crash
# the rdkafka background thread
rescue StandardError => e
@monitor.instrument(
'error.occurred',
caller: self,
error: e,
producer_id: @producer_id,
type: 'callbacks.statistics.error'
)
end
end
end
Expand Down
22 changes: 22 additions & 0 deletions spec/lib/waterdrop/instrumentation/callbacks/delivery_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,28 @@
it { expect(event[:offset]).to eq(delivery_report.offset) }
it { expect(event[:partition]).to eq(delivery_report.partition) }
it { expect(event[:topic]).to eq(delivery_report.topic_name) }

context 'when delivery handler code contains an error' do
let(:tracked_errors) { [] }

before do
monitor.subscribe('message.acknowledged') do
raise
end

local_errors = tracked_errors

monitor.subscribe('error.occurred') do |event|
local_errors << event
end
end

it 'expect to contain in, notify and continue as we do not want to crash rdkafka' do
expect { callback.call(delivery_report) }.not_to raise_error
expect(tracked_errors.size).to eq(1)
expect(tracked_errors.first[:type]).to eq('callbacks.delivery.error')
end
end
end

describe '#when we do an end-to-end delivery report check' do
Expand Down
24 changes: 24 additions & 0 deletions spec/lib/waterdrop/instrumentation/callbacks/error_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,28 @@
it { expect(event[:error]).to eq(error) }
it { expect(event[:type]).to eq('librdkafka.error') }
end

context 'when librdkafka error handling handler contains error' do
let(:tracked_errors) { [] }

before do
monitor.subscribe('error.occurred') do |event|
next unless event[:type] == 'librdkafka.error'

raise
end

local_errors = tracked_errors

monitor.subscribe('error.occurred') do |event|
local_errors << event
end
end

it 'expect to contain in, notify and continue as we do not want to crash rdkafka' do
expect { callback.call(client_name, error) }.not_to raise_error
expect(tracked_errors.size).to eq(1)
expect(tracked_errors.first[:type]).to eq('callbacks.error.error')
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,28 @@
expect(monitor).not_to have_received(:instrument)
end
end

context 'when oauth bearer handler contains error' do
let(:statistics) { { 'name' => client_name } }
let(:tracked_errors) { [] }

before do
monitor.subscribe('oauthbearer.token_refresh') do
raise
end

local_errors = tracked_errors

monitor.subscribe('error.occurred') do |event|
local_errors << event
end
end

it 'expect to contain in, notify and continue as we do not want to crash rdkafka' do
expect { callback.call(rd_config, bearer_name) }.not_to raise_error
expect(tracked_errors.size).to eq(1)
expect(tracked_errors.first[:type]).to eq('callbacks.oauthbearer_token_refresh.error')
end
end
end
end
23 changes: 23 additions & 0 deletions spec/lib/waterdrop/instrumentation/callbacks/statistics_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,29 @@
end
end

context 'when emitted statistics handler code contains an error' do
let(:statistics) { { 'name' => client_name } }
let(:tracked_errors) { [] }

before do
monitor.subscribe('statistics.emitted') do
raise
end

local_errors = tracked_errors

monitor.subscribe('error.occurred') do |event|
local_errors << event
end
end

it 'expect to contain in, notify and continue as we do not want to crash rdkafka' do
expect { callback.call(statistics) }.not_to raise_error
expect(tracked_errors.size).to eq(1)
expect(tracked_errors.first[:type]).to eq('callbacks.statistics.error')
end
end

context 'when emitted statistics refer to expected producer' do
let(:statistics) { { 'name' => client_name } }

Expand Down

0 comments on commit 076094a

Please sign in to comment.