Skip to content

Commit

Permalink
do not close never used client (#203)
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld authored Sep 5, 2021
1 parent c50826a commit ae1a569
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 2.0.3 (Unreleased)
- Remove rdkafka patch in favour of spec topic pre-creation
- Do not close client that was never used upon closing producer

## 2.0.2 (2021-08-13)
- Add support for `partition_key`
Expand Down
4 changes: 3 additions & 1 deletion lib/water_drop/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ def close

# We should not close the client in several threads the same time
# It is safe to run it several times but not exactly the same moment
client.close
# We also mark it as closed only if it was connected, if not, it would trigger a new
# connection that anyhow would be immediately closed
client.close if @client

@status.closed!
end
Expand Down
46 changes: 44 additions & 2 deletions spec/lib/water_drop/producer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,24 +92,66 @@
end

describe '#close' do
subject(:producer) { build(:producer).tap(&:client) }
before { allow(producer).to receive(:client).and_call_original }

context 'when producer is already closed' do
subject(:producer) { build(:producer).tap(&:client) }

before { producer.close }

it { expect { producer.close }.not_to raise_error }
it { expect(producer.tap(&:close).status.closed?).to eq(true) }
end

context 'when producer was not yet closed' do
subject(:producer) { build(:producer).tap(&:client) }

it { expect { producer.close }.not_to raise_error }
it { expect(producer.tap(&:close).status.closed?).to eq(true) }
end

context 'when there were messages in the buffer' do
before { producer.buffer(build(:valid_message)) }
subject(:producer) { build(:producer).tap(&:client) }

before do
producer.buffer(build(:valid_message))
allow(producer.client).to receive(:close).and_call_original
end

it { expect { producer.close }.to change { producer.messages.size }.from(1).to(0) }

it 'expect to close client since was open' do
producer.close
expect(producer.client).to have_received(:close)
end
end

context 'when producer was configured but not connected' do
subject(:producer) { build(:producer) }

it { expect(producer.status.configured?).to eq(true) }
it { expect { producer.close }.not_to raise_error }
it { expect(producer.tap(&:close).status.closed?).to eq(true) }

it 'expect not to close client since was not open' do
producer.close
expect(producer).not_to have_received(:client)
end
end

context 'when producer was configured and connected' do
subject(:producer) { build(:producer).tap(&:client) }

before { allow(producer.client).to receive(:close).and_call_original }

it { expect(producer.status.connected?).to eq(true) }
it { expect { producer.close }.not_to raise_error }
it { expect(producer.tap(&:close).status.closed?).to eq(true) }

it 'expect to close client since was open' do
producer.close
expect(producer.client).to have_received(:close)
end
end
end

Expand Down

0 comments on commit ae1a569

Please sign in to comment.