Skip to content

Commit

Permalink
#218 - callbacks manager concurrency fix (#219)
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Dec 1, 2021
1 parent ac63ab1 commit cec47d3
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 10 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# WaterDrop changelog

## 2.0.6 (2021-12-01)
- #218 - Fixes a case, where dispatch of callbacks the same moment a new producer was created could cause a concurrency issue in the manager.
- Fix some unstable specs.

## 2.0.5 (2021-11-28)

### Bug fixes
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
waterdrop (2.0.5)
waterdrop (2.0.6)
concurrent-ruby (>= 1.1)
dry-configurable (~> 0.13)
dry-monitor (~> 0.5)
Expand Down
6 changes: 5 additions & 1 deletion lib/water_drop/instrumentation/callbacks_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ def initialize
# Invokes all the callbacks registered one after another
#
# @param args [Object] any args that should go to the callbacks
# @note We do not use `#each_value` here on purpose. With it being used, we cannot dispatch
# callbacks and add new at the same time. Since we don't know when and in what thread
# things are going to be added to the manager, we need to extract values into an array and
# run it. That way we can add new things the same time.
def call(*args)
@callbacks.each_value { |a| a.call(*args) }
@callbacks.values.each { |callback| callback.call(*args) }
end

# Adds a callback to the manager
Expand Down
2 changes: 1 addition & 1 deletion lib/water_drop/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
# WaterDrop library
module WaterDrop
# Current WaterDrop version
VERSION = '2.0.5'
VERSION = '2.0.6'
end
35 changes: 31 additions & 4 deletions spec/lib/water_drop/instrumentation/callbacks_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
subject(:manager) { described_class.new }

let(:id) { SecureRandom.uuid }
let(:changed) { [] }

describe '#call' do
context 'when there are no callbacks added' do
Expand All @@ -28,18 +29,44 @@
end

describe '#add' do
let(:changed) { [] }

it 'expect after adding to be used' do
manager.add(id, -> { changed << true })
manager.call
expect(changed).to eq([true])
end

context 'when we are adding a callback but at the same time, we call callbacks' do
let(:added_id) { SecureRandom.uuid }
let(:callable) do
lambda do
changed << true
sleep(10)
end
end

before do
# This will simulate a long running callback when manager is called, so when we add new one
# The previous one is still running in a thread
manager.add(id, callable)
Thread.new { manager.call }
# This makes sure, that we wait until the thread kicks in
sleep(0.001) while changed.empty?
end

it { expect { manager.add(added_id, callable) }.not_to raise_error }

it 'expect to register the new callback' do
manager.delete(id)
manager.add(added_id, -> { changed << true })

manager.call

expect(changed).to eq([true, true])
end
end
end

describe '#delete' do
let(:changed) { [] }

before { manager.add(id, -> { changed << true }) }

it 'expect after removal not to be used' do
Expand Down
8 changes: 5 additions & 3 deletions spec/lib/water_drop/producer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,15 @@

producer.produce_sync(message)

sleep(0.001) while events.size < 2
sleep(0.001) while events.size < 3
end

it { expect(events.last.id).to eq('statistics.emitted') }
it { expect(events.last[:producer_id]).to eq(producer.id) }
it { expect(events.last[:statistics]['msg_cnt']).to eq(1) }
it { expect(events.last[:statistics]['msg_cnt_d']).to eq(0) }
it { expect(events.last[:statistics]['ts']).to be > 0 }
# This is in microseconds. We needed a stable value for comparison, and the distance in
# between statistics events should always be within 1ms
it { expect(events.last[:statistics]['ts_d']).to be_between(90_000, 200_000) }
end

context 'when we have more producers' do
Expand Down

0 comments on commit cec47d3

Please sign in to comment.