diff --git a/CHANGELOG.md b/CHANGELOG.md index c2e3a85a..e6b452c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ This release contains **BREAKING** changes. Make sure to read and apply upgrade notes. +- **[Feature]** Support custom OAuth providers. - **[Breaking]** Drop Ruby `2.7` support. - **[Breaking]** Change default timeouts so final delivery `message.timeout.ms` is less that `max_wait_time` so we do not end up with not final verdict. - **[Breaking]** Update all the time related configuration settings to be in `ms` and not mixed. diff --git a/Gemfile.lock b/Gemfile.lock index 50046cf7..857cfa21 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,8 +1,8 @@ PATH remote: . specs: - waterdrop (2.7.0.alpha3) - karafka-core (>= 2.4.0.alpha1, < 3.0.0) + waterdrop (2.7.0.beta1) + karafka-core (>= 2.4.0.beta2, < 3.0.0) zeitwerk (~> 2.3) GEM @@ -31,9 +31,9 @@ GEM ffi (1.16.3) i18n (1.14.4) concurrent-ruby (~> 1.0) - karafka-core (2.4.0.alpha1) - karafka-rdkafka (>= 0.15.0.alpha1, < 0.16.0) - karafka-rdkafka (0.15.0.alpha1) + karafka-core (2.4.0.beta2) + karafka-rdkafka (>= 0.15.0.beta3, < 0.16.0) + karafka-rdkafka (0.15.0.beta3) ffi (~> 1.15) mini_portile2 (~> 2.6) rake (> 12) diff --git a/config/locales/errors.yml b/config/locales/errors.yml index 1852651a..14063dcd 100644 --- a/config/locales/errors.yml +++ b/config/locales/errors.yml @@ -4,14 +4,20 @@ en: missing: must be present logger_format: must be present deliver_format: must be boolean + instrument_on_wait_queue_full_format: must be boolean id_format: must be a non-empty string + monitor_format: must be present + client_class_format: must be present max_payload_size_format: must be an integer that is equal or bigger than 1 max_wait_timeout_format: must be an integer that is equal or bigger than 0 kafka_format: must be a hash with symbol based keys kafka_key_must_be_a_symbol: All keys under the kafka settings scope need to be symbols wait_on_queue_full_format: must be boolean - wait_backoff_on_queue_full_format: must be a numeric that is bigger or equal to 0 - wait_timeout_on_queue_full_format: must be a numeric that is bigger or equal to 0 + wait_backoff_on_queue_full_format: must be a numeric that is equal or bigger to 0 + wait_timeout_on_queue_full_format: must be a numeric that is equal or bigger to 0 + wait_backoff_on_transaction_command_format: must be a numeric that is equal or bigger to 0 + max_attempts_on_transaction_command_format: must be an integer that is equal or bigger than 1 + oauth.token_provider_listener_format: 'must be false or respond to #on_oauthbearer_token_refresh' message: missing: must be present diff --git a/lib/waterdrop/clients/rdkafka.rb b/lib/waterdrop/clients/rdkafka.rb index c5a73188..422da213 100644 --- a/lib/waterdrop/clients/rdkafka.rb +++ b/lib/waterdrop/clients/rdkafka.rb @@ -11,20 +11,50 @@ class << self # @param producer [WaterDrop::Producer] producer instance with its config, etc # @note We overwrite this that way, because we do not care def new(producer) - config = producer.config.kafka.to_h + kafka_config = producer.config.kafka.to_h + monitor = producer.config.monitor - client = ::Rdkafka::Config.new(config).producer + client = ::Rdkafka::Config.new(kafka_config).producer(native_kafka_auto_start: false) + + # Register statistics runner for this particular type of callbacks + ::Karafka::Core::Instrumentation.statistics_callbacks.add( + producer.id, + Instrumentation::Callbacks::Statistics.new(producer.id, client.name, monitor) + ) + + # Register error tracking callback + ::Karafka::Core::Instrumentation.error_callbacks.add( + producer.id, + Instrumentation::Callbacks::Error.new(producer.id, client.name, monitor) + ) + + # Register oauth bearer refresh for this particular type of callbacks + ::Karafka::Core::Instrumentation.oauthbearer_token_refresh_callbacks.add( + producer.id, + Instrumentation::Callbacks::OauthbearerTokenRefresh.new(client, monitor) + ) # This callback is not global and is per client, thus we do not have to wrap it with a # callbacks manager to make it work client.delivery_callback = Instrumentation::Callbacks::Delivery.new( producer.id, producer.transactional?, - producer.config.monitor + monitor ) + oauth_listener = producer.config.oauth.token_provider_listener + # We need to subscribe the oauth listener here because we want it to be ready before + # any producer callbacks run. In theory because WaterDrop rdkafka producer is lazy loaded + # we would have enough time to make user subscribe it himself, but then it would not + # coop with auto-configuration coming from Karafka. The way it is done below, if it is + # configured it will be subscribed and if not, user always can subscribe it himself as + # long as it is done prior to first usage + monitor.subscribe(oauth_listener) if oauth_listener + + client.start + # Switch to the transactional mode if user provided the transactional id - client.init_transactions if config.key?(:'transactional.id') + client.init_transactions if kafka_config.key?(:'transactional.id') client end diff --git a/lib/waterdrop/config.rb b/lib/waterdrop/config.rb index b8830a03..5cf498d7 100644 --- a/lib/waterdrop/config.rb +++ b/lib/waterdrop/config.rb @@ -89,6 +89,14 @@ class Config constructor: ->(middleware) { middleware || WaterDrop::Middleware.new } ) + # Namespace for oauth related configuration + setting :oauth do + # option [false, #call] Listener for using oauth bearer. This listener will be able to + # get the client name to decide whether to use a single multi-client token refreshing + # or have separate tokens per instance. + setting :token_provider_listener, default: false + end + # Configuration method # @yield Runs a block of code providing a config singleton instance to it # @yieldparam [WaterDrop::Config] WaterDrop config instance diff --git a/lib/waterdrop/contracts/config.rb b/lib/waterdrop/contracts/config.rb index 7d62d5c2..51aadf34 100644 --- a/lib/waterdrop/contracts/config.rb +++ b/lib/waterdrop/contracts/config.rb @@ -14,13 +14,24 @@ class Config < ::Karafka::Core::Contractable::Contract required(:id) { |val| val.is_a?(String) && !val.empty? } required(:logger) { |val| !val.nil? } + required(:monitor) { |val| !val.nil? } required(:deliver) { |val| [true, false].include?(val) } required(:max_payload_size) { |val| val.is_a?(Integer) && val >= 1 } required(:max_wait_timeout) { |val| val.is_a?(Numeric) && val >= 0 } + required(:client_class) { |val| !val.nil? } required(:kafka) { |val| val.is_a?(Hash) && !val.empty? } required(:wait_on_queue_full) { |val| [true, false].include?(val) } + required(:instrument_on_wait_queue_full) { |val| [true, false].include?(val) } required(:wait_backoff_on_queue_full) { |val| val.is_a?(Numeric) && val >= 0 } required(:wait_timeout_on_queue_full) { |val| val.is_a?(Numeric) && val >= 0 } + required(:wait_backoff_on_transaction_command) { |val| val.is_a?(Numeric) && val >= 0 } + required(:max_attempts_on_transaction_command) { |val| val.is_a?(Integer) && val >= 1 } + + nested(:oauth) do + required(:token_provider_listener) do |val| + val == false || val.respond_to?(:on_oauthbearer_token_refresh) + end + end # rdkafka allows both symbols and strings as keys for config but then casts them to strings # This can be confusing, so we expect all keys to be symbolized diff --git a/lib/waterdrop/instrumentation/callbacks/oauthbearer_token_refresh.rb b/lib/waterdrop/instrumentation/callbacks/oauthbearer_token_refresh.rb new file mode 100644 index 00000000..07dcfb0b --- /dev/null +++ b/lib/waterdrop/instrumentation/callbacks/oauthbearer_token_refresh.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +module WaterDrop + module Instrumentation + module Callbacks + # Callback that is triggered when oauth token needs to be refreshed. + class OauthbearerTokenRefresh + # @param bearer [Rdkafka::Producer] given rdkafka instance. It is needed as + # we need to have a reference to call `#oauthbearer_set_token` or + # `#oauthbearer_set_token_failure` upon the event. + # @param monitor [WaterDrop::Instrumentation::Monitor] monitor we are using + def initialize(bearer, monitor) + @bearer = bearer + @monitor = monitor + end + + # Upon receiving of this event, user is required to invoke either `#oauthbearer_set_token` + # or `#oauthbearer_set_token_failure` on the `event[:bearer]` depending whether token + # obtaining was successful or not. + # + # Please refer to WaterDrop and Karafka documentation or `Rdkafka::Helpers::OAuth` + # documentation directly for exact parameters of those methods. + # + # @param _rd_config [Rdkafka::Config] + # @param bearer_name [String] name of the bearer for which we refresh + def call(_rd_config, bearer_name) + return unless @bearer.name == bearer_name + + @monitor.instrument( + 'oauthbearer.token_refresh', + bearer: @bearer, + caller: self + ) + end + end + end + end +end diff --git a/lib/waterdrop/instrumentation/notifications.rb b/lib/waterdrop/instrumentation/notifications.rb index 07196f5f..c2603a7f 100644 --- a/lib/waterdrop/instrumentation/notifications.rb +++ b/lib/waterdrop/instrumentation/notifications.rb @@ -21,6 +21,8 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications messages.produced_sync messages.buffered + oauthbearer.token_refresh + transaction.started transaction.committed transaction.aborted diff --git a/lib/waterdrop/producer.rb b/lib/waterdrop/producer.rb index ba0ffdef..020c61d7 100644 --- a/lib/waterdrop/producer.rb +++ b/lib/waterdrop/producer.rb @@ -104,18 +104,6 @@ def client @pid = Process.pid @client = Builder.new.call(self, @config) - # Register statistics runner for this particular type of callbacks - ::Karafka::Core::Instrumentation.statistics_callbacks.add( - @id, - Instrumentation::Callbacks::Statistics.new(@id, @client.name, @config.monitor) - ) - - # Register error tracking callback - ::Karafka::Core::Instrumentation.error_callbacks.add( - @id, - Instrumentation::Callbacks::Error.new(@id, @client.name, @config.monitor) - ) - @status.connected! @monitor.instrument('producer.connected', producer_id: id) end @@ -209,6 +197,7 @@ def close(force: false) # Remove callbacks runners that were registered ::Karafka::Core::Instrumentation.statistics_callbacks.delete(@id) ::Karafka::Core::Instrumentation.error_callbacks.delete(@id) + ::Karafka::Core::Instrumentation.oauthbearer_token_refresh_callbacks.delete(@id) @status.closed! end diff --git a/lib/waterdrop/version.rb b/lib/waterdrop/version.rb index 00baf25b..b5dd57c6 100644 --- a/lib/waterdrop/version.rb +++ b/lib/waterdrop/version.rb @@ -3,5 +3,5 @@ # WaterDrop library module WaterDrop # Current WaterDrop version - VERSION = '2.7.0.alpha3' + VERSION = '2.7.0.beta1' end diff --git a/spec/lib/waterdrop/contracts/config_spec.rb b/spec/lib/waterdrop/contracts/config_spec.rb index edc93e66..50fc3e4c 100644 --- a/spec/lib/waterdrop/contracts/config_spec.rb +++ b/spec/lib/waterdrop/contracts/config_spec.rb @@ -8,12 +8,21 @@ { id: SecureRandom.uuid, logger: Logger.new('/dev/null'), + monitor: WaterDrop::Instrumentation::Monitor.new, deliver: false, + client_class: WaterDrop::Clients::Rdkafka, max_payload_size: 1024 * 1024, max_wait_timeout: 1, wait_on_queue_full: true, wait_backoff_on_queue_full: 1, wait_timeout_on_queue_full: 10, + wait_backoff_on_transaction_command: 15, + max_attempts_on_transaction_command_format: 5, + instrument_on_wait_queue_full: true, + max_attempts_on_transaction_command: 1, + oauth: { + token_provider_listener: false + }, kafka: { 'bootstrap.servers': 'localhost:9092,localhots:9092' } @@ -45,6 +54,34 @@ it { expect(contract_errors[:id]).not_to be_empty } end + context 'when monitor is missing' do + before { config.delete(:monitor) } + + it { expect(contract_result).not_to be_success } + it { expect(contract_errors[:monitor]).not_to be_empty } + end + + context 'when monitor is nil' do + before { config[:monitor] = nil } + + it { expect(contract_result).not_to be_success } + it { expect(contract_errors[:monitor]).not_to be_empty } + end + + context 'when client_class is missing' do + before { config.delete(:client_class) } + + it { expect(contract_result).not_to be_success } + it { expect(contract_errors[:client_class]).not_to be_empty } + end + + context 'when client_class is nil' do + before { config[:client_class] = nil } + + it { expect(contract_result).not_to be_success } + it { expect(contract_errors[:client_class]).not_to be_empty } + end + context 'when logger is missing' do before { config.delete(:logger) } @@ -141,6 +178,45 @@ it { expect(contract_result).not_to be_success } end + context 'when max_attempts_on_transaction_command is nil' do + before { config[:max_attempts_on_transaction_command] = nil } + + it { expect(contract_result).not_to be_success } + it { expect(contract_errors[:max_attempts_on_transaction_command]).not_to be_empty } + end + + context 'when max_attempts_on_transaction_command is a negative int' do + before { config[:max_attempts_on_transaction_command] = -1 } + + it { expect(contract_result).not_to be_success } + it { expect(contract_errors[:max_attempts_on_transaction_command]).not_to be_empty } + end + + context 'when max_attempts_on_transaction_command is a negative float' do + before { config[:max_attempts_on_transaction_command] = -0.1 } + + it { expect(contract_result).not_to be_success } + it { expect(contract_errors[:max_attempts_on_transaction_command]).not_to be_empty } + end + + context 'when max_attempts_on_transaction_command is 0' do + before { config[:max_attempts_on_transaction_command] = 0 } + + it { expect(contract_result).not_to be_success } + end + + context 'when max_attempts_on_transaction_command is positive int' do + before { config[:max_attempts_on_transaction_command] = 1 } + + it { expect(contract_result).to be_success } + end + + context 'when max_attempts_on_transaction_command is positive float' do + before { config[:max_attempts_on_transaction_command] = 1.1 } + + it { expect(contract_result).not_to be_success } + end + context 'when max_wait_timeout is missing' do before { config.delete(:max_wait_timeout) } @@ -194,6 +270,13 @@ it { expect(contract_errors[:wait_on_queue_full]).not_to be_empty } end + context 'when instrument_on_wait_queue_full is not a boolean' do + before { config[:instrument_on_wait_queue_full] = 0 } + + it { expect(contract_result).not_to be_success } + it { expect(contract_errors[:instrument_on_wait_queue_full]).not_to be_empty } + end + context 'when wait_backoff_on_queue_full is not a numeric' do before { config[:wait_backoff_on_queue_full] = 'na' } @@ -208,6 +291,20 @@ it { expect(contract_errors[:wait_backoff_on_queue_full]).not_to be_empty } end + context 'when wait_backoff_on_transaction_command is not a numeric' do + before { config[:wait_backoff_on_transaction_command] = 'na' } + + it { expect(contract_result).not_to be_success } + it { expect(contract_errors[:wait_backoff_on_transaction_command]).not_to be_empty } + end + + context 'when wait_backoff_on_transaction_command is less than 0' do + before { config[:wait_backoff_on_transaction_command] = -1 } + + it { expect(contract_result).not_to be_success } + it { expect(contract_errors[:wait_backoff_on_transaction_command]).not_to be_empty } + end + context 'when wait_timeout_on_queue_full is not a numeric' do before { config[:wait_timeout_on_queue_full] = 'na' } @@ -221,4 +318,23 @@ it { expect(contract_result).not_to be_success } it { expect(contract_errors[:wait_timeout_on_queue_full]).not_to be_empty } end + + context 'when oauth token_provider_listener does not respond to on_oauthbearer_token_refresh' do + before { config[:oauth][:token_provider_listener] = true } + + it { expect(contract_result).not_to be_success } + it { expect(contract_errors[:'oauth.token_provider_listener']).not_to be_empty } + end + + context 'when oauth token_provider_listener responds to on_oauthbearer_token_refresh' do + let(:listener) do + Class.new do + def on_oauthbearer_token_refresh(_); end + end + end + + before { config[:oauth][:token_provider_listener] = listener.new } + + it { expect(contract_result).to be_success } + end end diff --git a/spec/lib/waterdrop/instrumentation/callbacks/oauthbearer_token_refresh_spec.rb b/spec/lib/waterdrop/instrumentation/callbacks/oauthbearer_token_refresh_spec.rb new file mode 100644 index 00000000..632e0a84 --- /dev/null +++ b/spec/lib/waterdrop/instrumentation/callbacks/oauthbearer_token_refresh_spec.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +RSpec.describe_current do + subject(:callback) { described_class.new(bearer, monitor) } + + let(:bearer) { instance_double('Rdkafka::Producer', name: 'test_bearer') } + let(:monitor) { WaterDrop::Instrumentation::Monitor.new } + let(:rd_config) { Rdkafka::Config.new } + let(:bearer_name) { 'test_bearer' } + + describe '#call' do + context 'when the bearer name matches' do + before do + allow(monitor).to receive(:instrument) + callback.call(rd_config, bearer_name) + end + + it 'instruments an oauthbearer.token_refresh event' do + expect(monitor).to have_received(:instrument).with( + 'oauthbearer.token_refresh', + bearer: bearer, + caller: callback + ) + end + end + + context 'when the bearer name does not match' do + let(:bearer_name) { 'different_bearer' } + + before do + allow(monitor).to receive(:instrument) + callback.call(rd_config, bearer_name) + end + + it 'does not instrument any event' do + expect(monitor).not_to have_received(:instrument) + end + end + end +end diff --git a/spec/lib/waterdrop/producer_spec.rb b/spec/lib/waterdrop/producer_spec.rb index 400b8f5b..ac62ccbc 100644 --- a/spec/lib/waterdrop/producer_spec.rb +++ b/spec/lib/waterdrop/producer_spec.rb @@ -23,6 +23,25 @@ it { expect(producer.status.configured?).to eq(true) } it { expect(producer.status.active?).to eq(true) } end + + context 'when initializing with oauth listener' do + let(:listener) do + Class.new do + def on_oauthbearer_token_refresh(_); end + end + end + + subject(:producer) do + described_class.new do |config| + config.kafka = { 'bootstrap.servers': 'localhost:9092' } + config.oauth.token_provider_listener = listener.new + end + end + + it { expect { producer }.not_to raise_error } + it { expect(producer.status.configured?).to eq(true) } + it { expect(producer.status.active?).to eq(true) } + end end describe '#setup' do diff --git a/waterdrop.gemspec b/waterdrop.gemspec index c0862f77..05040322 100644 --- a/waterdrop.gemspec +++ b/waterdrop.gemspec @@ -16,7 +16,7 @@ Gem::Specification.new do |spec| spec.description = spec.summary spec.license = 'MIT' - spec.add_dependency 'karafka-core', '>= 2.4.0.alpha1', '< 3.0.0' + spec.add_dependency 'karafka-core', '>= 2.4.0.beta2', '< 3.0.0' spec.add_dependency 'zeitwerk', '~> 2.3' spec.required_ruby_version = '>= 3.0.0'