Provide context based producer variants (#483)
mensfeld authored May 9, 2024
1 parent fd92b9b commit 0acec3a
Showing 17 changed files with 457 additions and 24 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
# WaterDrop changelog

## 2.7.1 (Unreleased)
- **[Feature]** Support context-based configuration with low-level topic settings alterations via producer variants.
- [Enhancement] Prefix random default `SecureRandom.hex(6)` producer ids with `waterdrop-hex` to indicate type of object.

## 2.7.0 (2024-04-26)

This release contains **BREAKING** changes. Make sure to read and apply upgrade notes.
13 changes: 7 additions & 6 deletions Gemfile.lock
@@ -1,8 +1,9 @@
PATH
remote: .
specs:
waterdrop (2.7.0)
waterdrop (2.7.1)
karafka-core (>= 2.4.0, < 3.0.0)
karafka-rdkafka (>= 0.15.1.rc1)
zeitwerk (~> 2.3)

GEM
@@ -19,7 +20,7 @@ GEM
mutex_m
tzinfo (~> 2.0)
base64 (0.2.0)
bigdecimal (3.1.7)
bigdecimal (3.1.8)
byebug (11.1.3)
concurrent-ruby (1.2.3)
connection_pool (2.4.1)
@@ -29,11 +30,11 @@ GEM
factory_bot (6.4.6)
activesupport (>= 5.0.0)
ffi (1.16.3)
i18n (1.14.4)
i18n (1.14.5)
concurrent-ruby (~> 1.0)
karafka-core (2.4.0)
karafka-rdkafka (>= 0.15.0, < 0.16.0)
karafka-rdkafka (0.15.0)
karafka-rdkafka (0.15.1.rc1)
ffi (~> 1.15)
mini_portile2 (~> 2.6)
rake (> 12)
@@ -50,7 +51,7 @@ GEM
rspec-expectations (3.13.0)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.13.0)
rspec-mocks (3.13.0)
rspec-mocks (3.13.1)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.13.0)
rspec-support (3.13.1)
@@ -65,7 +66,7 @@ GEM
zeitwerk (2.6.13)

PLATFORMS
arm64-darwin-22
ruby
x86_64-linux

DEPENDENCIES
8 changes: 4 additions & 4 deletions README.md
@@ -9,13 +9,13 @@ WaterDrop is a standalone gem that sends messages to Kafka easily with an extra
It:

- Is thread-safe
- Supports sync producing
- Supports async producing
- Supports sync and async producing
- Supports transactions
- Supports buffering
- Supports producing messages to multiple clusters
- Supports producing to multiple clusters
- Supports multiple delivery policies
- Works with Kafka `1.0+` and Ruby `2.7+`
- Supports per-topic configuration alterations (variants)
- Works with Kafka `1.0+` and Ruby `3.0+`
- Works with and without Karafka

## Documentation
8 changes: 8 additions & 0 deletions config/locales/errors.yml
@@ -19,6 +19,14 @@ en:
max_attempts_on_transaction_command_format: must be an integer that is equal or bigger than 1
oauth.token_provider_listener_format: 'must be false or respond to #on_oauthbearer_token_refresh'

variant:
missing: must be present
default_format: must be boolean
max_wait_timeout_format: must be an integer that is equal or bigger than 0
kafka_key_must_be_a_symbol: All keys under the kafka settings scope need to be symbols
kafka_key_not_per_topic: This config option cannot be set on a per topic basis
kafka_key_acks_not_changeable: Acks value cannot be redefined for a transactional producer

message:
missing: must be present
partition_format: must be an integer greater or equal to -1
2 changes: 1 addition & 1 deletion lib/waterdrop/config.rb
@@ -30,7 +30,7 @@ class Config
setting(
:id,
default: false,
constructor: ->(id) { id || SecureRandom.hex(6) }
constructor: ->(id) { id || "waterdrop-#{SecureRandom.hex(6)}" }
)
# option [Instance] logger that we want to use
# @note Due to how rdkafka works, this setting is global for all the producers
90 changes: 90 additions & 0 deletions lib/waterdrop/contracts/variant.rb
@@ -0,0 +1,90 @@
# frozen_string_literal: true

module WaterDrop
module Contracts
# Variant validator to ensure basic sanity of the variant alteration data
class Variant < ::Karafka::Core::Contractable::Contract
# Taken from librdkafka config
# Those values can be changed on a per topic basis. We do not support experimental or
# deprecated values. We also do not support settings that would break rdkafka-ruby
#
# @see https://karafka.io/docs/Librdkafka-Configuration/#topic-configuration-properties
TOPIC_CONFIG_KEYS = %i[
acks
compression.codec
compression.level
compression.type
delivery.timeout.ms
message.timeout.ms
partitioner
request.required.acks
request.timeout.ms
].freeze

# Boolean values
BOOLEANS = [true, false].freeze

private_constant :TOPIC_CONFIG_KEYS, :BOOLEANS

configure do |config|
config.error_messages = YAML.safe_load(
File.read(
File.join(WaterDrop.gem_root, 'config', 'locales', 'errors.yml')
)
).fetch('en').fetch('validations').fetch('variant')
end

required(:default) { |val| BOOLEANS.include?(val) }
required(:max_wait_timeout) { |val| val.is_a?(Numeric) && val >= 0 }

# Checks if all keys are symbols
virtual do |config, errors|
next true unless errors.empty?

errors = []

config
.fetch(:topic_config)
.keys
.reject { |key| key.is_a?(Symbol) }
.each { |key| errors << [[:kafka, key], :kafka_key_must_be_a_symbol] }

errors
end

# Checks if we have any keys that are not allowed
virtual do |config, errors|
next true unless errors.empty?

errors = []

config
.fetch(:topic_config)
.keys
.reject { |key| TOPIC_CONFIG_KEYS.include?(key) }
.each { |key| errors << [[:kafka, key], :kafka_key_not_per_topic] }

errors
end

# Ensure, that acks is not changed when in transactional mode
# acks needs to be set to 'all' and should not be changed when working with transactional
# producer as it causes librdkafka to crash
virtual do |config, errors|
next true unless errors.empty?
# Relevant only for the transactional producer
next true unless config.fetch(:transactional)

errors = []

config
.fetch(:topic_config)
.keys
.select { |key| key.to_s.include?('acks') }
.each { |key| errors << [[:kafka, key], :kafka_key_acks_not_changeable] }

errors
end
end
end
end
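The three virtual checks in this contract can be illustrated without the Karafka contract DSL. What follows is a minimal standalone sketch, not the gem's implementation: `validate_topic_config` is a hypothetical helper, and it returns `[key, error_code]` pairs instead of raising `VariantInvalidError`. As in the contract, each check runs only when the previous ones produced no errors.

```ruby
# frozen_string_literal: true

# Keys that may be altered per topic (copied from the contract in this diff)
TOPIC_CONFIG_KEYS = %i[
  acks
  compression.codec
  compression.level
  compression.type
  delivery.timeout.ms
  message.timeout.ms
  partitioner
  request.required.acks
  request.timeout.ms
].freeze

# Hypothetical helper mirroring the three virtual checks.
# Returns an array of [key, error_code] pairs; empty when the config is valid.
def validate_topic_config(topic_config, transactional: false)
  keys = topic_config.keys

  # 1. All keys need to be symbols
  errors = keys.reject { |key| key.is_a?(Symbol) }
               .map { |key| [key, :kafka_key_must_be_a_symbol] }
  return errors unless errors.empty?

  # 2. Only keys from the per-topic list are allowed
  errors = keys.reject { |key| TOPIC_CONFIG_KEYS.include?(key) }
               .map { |key| [key, :kafka_key_not_per_topic] }
  return errors unless errors.empty?

  # 3. Relevant only for the transactional producer: acks cannot be redefined
  return [] unless transactional

  keys.select { |key| key.to_s.include?('acks') }
      .map { |key| [key, :kafka_key_acks_not_changeable] }
end

validate_topic_config({ acks: 1 })                      # valid, no errors
validate_topic_config({ 'acks' => 1 })                  # string key is rejected
validate_topic_config({ 'linger.ms': 5 })               # not a per-topic setting
validate_topic_config({ acks: 1 }, transactional: true) # acks locked in transactions
```

Because the checks short-circuit, a string key is reported only as a non-symbol key and never also as a disallowed one, which matches the `next true unless errors.empty?` guards in the contract.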
3 changes: 3 additions & 0 deletions lib/waterdrop/errors.rb
@@ -9,6 +9,9 @@ module Errors
# Raised when configuration doesn't match with validation contract
ConfigurationInvalidError = Class.new(BaseError)

# Raised when variant alteration is not valid
VariantInvalidError = Class.new(BaseError)

# Raised when we want to use a producer that was not configured
ProducerNotConfiguredError = Class.new(BaseError)

35 changes: 30 additions & 5 deletions lib/waterdrop/producer.rb
@@ -65,6 +65,7 @@ def setup(&block)
@id = @config.id
@monitor = @config.monitor
@contract = Contracts::Message.new(max_payload_size: @config.max_payload_size)
@default_variant = Variant.new(self, default: true)
@status.configured!
end

@@ -181,7 +182,7 @@ def close(force: false)
# The linger.ms time will be ignored for the duration of the call,
# queued messages will be sent to the broker as soon as possible.
begin
@client.flush(@config.max_wait_timeout) unless @client.closed?
@client.flush(current_variant.max_wait_timeout) unless @client.closed?
# We can safely ignore timeouts here because any left outstanding requests
# will anyhow force wait on close if not forced.
# If forced, we will purge the queue and just close
@@ -209,6 +210,16 @@ def close(force: false)
end
end

# Builds the variant alteration and returns it.
#
# @param args [Object] anything `Producer::Variant` initializer accepts
# @return [WaterDrop::Producer::Variant] variant proxy to use with alterations
def with(**args)
ensure_active!

Variant.new(self, **args)
end

# Closes the producer with forced close after timeout, purging any outgoing data
def close!
close(force: true)
@@ -244,10 +255,16 @@ def validate_message!(message)
def wait(handler)
handler.wait(
# rdkafka max_wait_timeout is in seconds and we use ms
max_wait_timeout: @config.max_wait_timeout / 1_000.0
max_wait_timeout: current_variant.max_wait_timeout / 1_000.0
)
end

# @return [WaterDrop::Producer::Variant] the variant config. Either a custom one if built
# using `#with` or a default one.
def current_variant
Thread.current[id] || @default_variant
end

# Runs the client produce method with a given message
#
# @param message [Hash] message we want to send
@@ -263,9 +280,17 @@ def produce(message)
ensure_active!
end

# In case someone defines topic as a symbol, we need to convert it into a string as
# librdkafka does not accept symbols
message = message.merge(topic: message[:topic].to_s) if message[:topic].is_a?(Symbol)
# We duplicate the message hash only when it is needed.
# It is needed when the user is using a custom settings variant or when a symbol is provided
# as the topic name. We should never mutate the user's input message, as it may be a hash
# that the user is using for some other operations
if message[:topic].is_a?(Symbol) || !current_variant.default?
message = message.dup
# In case someone defines topic as a symbol, we need to convert it into a string as
# librdkafka does not accept symbols
message[:topic] = message[:topic].to_s
message[:topic_config] = current_variant.topic_config
end

if transactional?
transaction { client.produce(**message) }
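The copy-on-demand logic in this hunk can be sketched in isolation. This is a minimal sketch under stated assumptions: `prepare_message` is a hypothetical helper, and its `variant_default` and `topic_config` keyword arguments stand in for `current_variant.default?` and `current_variant.topic_config`.

```ruby
# frozen_string_literal: true

# Hypothetical helper: returns the hash that would be handed to the client.
def prepare_message(message, variant_default:, topic_config:)
  # Fast path: nothing to alter, so the caller's hash is passed through untouched
  return message if !message[:topic].is_a?(Symbol) && variant_default

  # Shallow copy, so the caller's hash is never mutated
  prepared = message.dup
  # librdkafka does not accept symbols as topic names
  prepared[:topic] = prepared[:topic].to_s
  prepared[:topic_config] = topic_config
  prepared
end

# Freezing the input shows that the helper never writes to it
original = { topic: :events, payload: 'data' }.freeze
prepared = prepare_message(original, variant_default: false, topic_config: { acks: 1 })
```

The trade-off is one extra hash allocation per message on the altered path, in exchange for leaving the common path (string topic, default variant) allocation-free.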
2 changes: 1 addition & 1 deletion lib/waterdrop/producer/transactions.rb
@@ -137,7 +137,7 @@ def transaction_mark_as_consumed(consumer, message, offset_metadata = nil)
client.send_offsets_to_transaction(
consumer,
tpl,
@config.max_wait_timeout
current_variant.max_wait_timeout
)
end
end
106 changes: 106 additions & 0 deletions lib/waterdrop/producer/variant.rb
@@ -0,0 +1,106 @@
# frozen_string_literal: true

module WaterDrop
class Producer
# Object that acts as a proxy allowing for alteration of certain low-level per-topic
# configuration and some other settings that users may find useful to alter, without having
# to create new producers with their underlying librdkafka instances.
#
# Since each librdkafka instance creates at least one TCP connection per broker, creating
# separate objects just to alter things like `acks` may not be efficient and may lead to
# extensive usage of TCP connections, especially in bigger clusters.
#
# This variant object allows for "wrapping" of the producer with alteration of those settings
# in such a way, that two or more alterations can co-exist and share the same producer,
# effectively sharing the librdkafka client.
#
# Since this is an enhanced `SimpleDelegator` all `WaterDrop::Producer` APIs are preserved and
# a variant alteration can be used as a regular producer. The only important thing is to
# remember to only close it once.
#
# @note Not all settings are alterable. We only allow to alter things that are safe to be
# altered as they have no impact on the producer. If there is a setting you consider
# important and want to make it alterable, please open a GH issue for evaluation.
#
# @note Please be aware, that variant changes also affect buffers. If you overwrite the
# `max_wait_timeout`, since buffers are shared (as they exist on producer level), flushing
# may be impacted.
#
# @note `topic_config` is validated when created for the first time during message production.
# This means, that configuration error may be raised only during dispatch. There is no
# way out of this, since we need `librdkafka` instance to create the references.
class Variant < SimpleDelegator
# Empty hash we use as defaults for topic config.
# When rdkafka-ruby detects empty hash, it will use the librdkafka defaults
EMPTY_HASH = {}.freeze

private_constant :EMPTY_HASH

attr_reader :max_wait_timeout, :topic_config, :producer

# @param producer [WaterDrop::Producer] producer for which we want to have a variant
# @param max_wait_timeout [Integer, nil] alteration to max wait timeout or nil to use
# default
# @param topic_config [Hash] extra topic configuration that can be altered.
# @param default [Boolean] is this a default variant or an altered one
# @see https://karafka.io/docs/Librdkafka-Configuration/#topic-configuration-properties
def initialize(
producer,
max_wait_timeout: producer.config.max_wait_timeout,
topic_config: EMPTY_HASH,
default: false
)
@producer = producer
@max_wait_timeout = max_wait_timeout
@topic_config = topic_config
@default = default
super(producer)

Contracts::Variant.new.validate!(to_h, Errors::VariantInvalidError)
end

# @return [Boolean] is this a default variant for this producer
def default?
@default
end

# We need to wrap any methods from our API that could use a variant alteration with the
# per thread variant injection. Since method_missing can be slow and problematic, it is just
# easier to use our public API components methods to ensure the variant is being injected.
[
Async,
Buffer,
Sync,
Transactions
].each do |scope|
scope.instance_methods(false).each do |method_name|
class_eval <<-RUBY, __FILE__, __LINE__ + 1
def #{method_name}(*args, &block)
Thread.current[@producer.id] = self
@producer.#{method_name}(*args, &block)
ensure
Thread.current[@producer.id] = nil
end
RUBY
end
end

private

# @return [Hash] hash representation for contract validation to ensure basic sanity of the
# settings.
def to_h
{
default: default?,
max_wait_timeout: max_wait_timeout,
topic_config: topic_config,
# We pass this to validation, to make sure no-one alters the `acks` value when operating
# in the transactional mode as it causes librdkafka to crash ruby
# @see https://github.com/confluentinc/librdkafka/issues/4710
transactional: @producer.transactional?
}
end
end
end
end
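The per-thread injection used by `Variant` can be tried out without a Kafka cluster. The following is a minimal sketch, not the gem's code: `FakeProducer` and `FakeVariant` are hypothetical stand-ins, and `produce_sync` here only returns the timeout that would apply to the call instead of dispatching anything.

```ruby
# frozen_string_literal: true

require 'delegate'

# Hypothetical stand-in for WaterDrop::Producer
class FakeProducer
  attr_reader :id

  def initialize(id, max_wait_timeout)
    @id = id
    @default_variant = FakeVariant.new(self, max_wait_timeout: max_wait_timeout, default: true)
  end

  # Variant injected for the current call, or the default one
  def current_variant
    Thread.current[id] || @default_variant
  end

  # Returns the timeout that would be used for this dispatch
  def produce_sync(_message)
    current_variant.max_wait_timeout
  end

  def with(**args)
    FakeVariant.new(self, **args)
  end
end

# Proxy that registers itself as the active variant for the duration of a call
class FakeVariant < SimpleDelegator
  attr_reader :max_wait_timeout

  def initialize(producer, max_wait_timeout:, default: false)
    @producer = producer
    @max_wait_timeout = max_wait_timeout
    @default = default
    super(producer)
  end

  def default?
    @default
  end

  def produce_sync(message)
    Thread.current[@producer.id] = self
    @producer.produce_sync(message)
  ensure
    # Always clear the injection, even when the dispatch raises
    Thread.current[@producer.id] = nil
  end
end

producer = FakeProducer.new('waterdrop-demo', 5_000)
fast = producer.with(max_wait_timeout: 100)

producer.produce_sync({ topic: 'events', payload: 'a' }) # returns 5_000 (default variant)
fast.produce_sync({ topic: 'events', payload: 'b' })     # returns 100 (altered variant)
fast.id                                                  # delegated: 'waterdrop-demo'
```

Both objects share one underlying producer, which is the point of the design: in the real gem that means one librdkafka client and one set of broker connections, no matter how many variants are created.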