Skip to content

Commit

Permalink
ensure acks cannot be altered when idempotent (#493)
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld authored May 29, 2024
1 parent 6a2dcb5 commit 8afe123
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 2.7.3 (Unreleased)
- [Enhancement] Provide `#idempotent?` similar to `#transactional?`.
- [Enhancement] Provide alias to `#with` named `#variant`.
- [Fix] Prevent from creating `acks` altering variants on idempotent producers.

## 2.7.2 (2024-05-09)
- [Fix] Fix missing requirement of `delegate` for non-Rails use-cases. Always require delegate for variants usage (samsm)
Expand Down
2 changes: 1 addition & 1 deletion config/locales/errors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ en:
max_wait_timeout_format: must be an integer that is equal or bigger than 0
kafka_key_must_be_a_symbol: All keys under the kafka settings scope need to be symbols
kafka_key_not_per_topic: This config option cannot be set on a per topic basis
kafka_key_acks_not_changeable: Acks value cannot be redefined for a transactional producer
kafka_key_acks_not_changeable: Acks value cannot be redefined for a transactional or idempotent producer

message:
missing: must be present
Expand Down
17 changes: 17 additions & 0 deletions lib/waterdrop/contracts/variant.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,23 @@ class Variant < ::Karafka::Core::Contractable::Contract

errors
end

# Prevent from creating variants altering acks when idempotent
virtual do |config, errors|
next true unless errors.empty?
# Relevant only for the transactional producer
next true unless config.fetch(:idempotent)

errors = []

config
.fetch(:topic_config)
.keys
.select { |key| key.to_s.include?('acks') }
.each { |key| errors << [[:kafka, key], :kafka_key_acks_not_changeable] }

errors
end
end
end
end
4 changes: 3 additions & 1 deletion lib/waterdrop/producer/variant.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ def to_h
# We pass this to validation, to make sure no-one alters the `acks` value when operating
# in the transactional mode as it causes librdkafka to crash ruby
# @see https://github.com/confluentinc/librdkafka/issues/4710
transactional: @producer.transactional?
transactional: @producer.transactional?,
# We pass this for a similar reason as above
idempotent: @producer.idempotent?
}
end
end
Expand Down
19 changes: 19 additions & 0 deletions spec/lib/waterdrop/contracts/variant_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
default: true,
max_wait_timeout: 10,
transactional: false,
idempotent: false,
topic_config: {
'request.required.acks': -1,
acks: 'all',
Expand Down Expand Up @@ -84,4 +85,22 @@

it { expect(contract_result).not_to be_success }
end

context 'when producer is idempotent and we try to redefine acks' do
before do
variant[:idempotent] = true
variant[:topic_config][:acks] = 1
end

it { expect(contract_result).not_to be_success }
end

context 'when producer is idempotent and we try to redefine request.required.acks' do
before do
variant[:idempotent] = true
variant[:topic_config][:'request.required.acks'] = 1
end

it { expect(contract_result).not_to be_success }
end
end
34 changes: 34 additions & 0 deletions spec/lib/waterdrop/producer/variant_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,38 @@
end
end
end

context 'when having an idempotent producer with alterations' do
subject(:producer) { build(:idempotent_producer) }

let(:variant) { producer.with(topic_config: { 'message.timeout.ms': 10_000 }) }

it 'expect to use the settings' do
expect do
variant.produce_sync(topic: SecureRandom.uuid, payload: '')
end.not_to raise_error
end

context 'when trying to overwrite acks on a idempotent producer' do
let(:variant) { producer.with(topic_config: { acks: 1 }) }

it 'expect not to allow it' do
expect do
variant.produce_sync(topic: SecureRandom.uuid, payload: '')
end.to raise_error(config_error)
end
end
end

context 'when trying to lower the acks on an idempotent producer' do
subject(:variant) { producer.variant(topic_config: { acks: 1 }) }

let(:producer) { build(:idempotent_producer) }

it 'expect not to allow it' do
expect do
variant.produce_sync(topic: SecureRandom.uuid, payload: '')
end.to raise_error(config_error)
end
end
end

0 comments on commit 8afe123

Please sign in to comment.