Releases: karafka/waterdrop
v2.8.0
This release contains BREAKING changes. Make sure to read and apply upgrade notes.
- [Breaking] Require Ruby `3.1+`.
- [Breaking] Remove ability to abort transactions using `throw(:abort)`. Please use `raise WaterDrop::Errors::AbortTransaction`.
- [Breaking] Disallow (similar to ActiveRecord) exiting transactions with `return`, `break` or `throw`.
- [Breaking] License changed from MIT to LGPL with an additional commercial option. Note: there is no commercial code in this repository. The commercial license is available for companies unable to use LGPL-licensed software for legal reasons.
- [Enhancement] Make variants fiber safe.
- [Enhancement] In transactional mode do not return any `dispatched` messages as none will be dispatched due to rollback.
- [Enhancement] Align the `LoggerListener` async messages to reflect that messages are delegated to the internal queue and not dispatched.
- [Fix] Ensure that the `:dispatched` key for `#produce_many_sync` always contains delivery handles (final) and not delivery reports.
Upgrade Notes
PLEASE MAKE SURE TO READ AND APPLY THEM!
`throw(:abort)` No Longer Allowed To Abort Transactions
Replace:
producer.transaction do
  messages.each do |message|
    # Pipe all events
    producer.produce_async(topic: 'events', payload: message.raw_payload)
  end

  # And abort if more events are no longer needed
  throw(:abort) unless KnowledgeBase.more_events_needed?
end
With:
producer.transaction do
  messages.each do |message|
    # Pipe all events
    producer.produce_async(topic: 'events', payload: message.raw_payload)
  end

  # And abort if more events are no longer needed
  raise(WaterDrop::AbortTransaction) unless KnowledgeBase.more_events_needed?
end
`return`, `break` and `throw` No Longer Allowed Inside Transaction Block
Previously, transactions would abort if you exited early using `return`, `break`, or `throw`. This could create unexpected behavior, where users might not notice the rollback or have different intentions. For example, the following would trigger a rollback:
MAX = 10

def process(messages)
  count = 0

  producer.transaction do
    messages.each do |message|
      count += 1

      producer.produce_async(topic: 'events', payload: message.raw_payload)

      # This would trigger a rollback.
      return if count >= MAX
    end
  end
end
This is a source of errors, hence such exits are no longer allowed. You can implement similar flow control inside of your methods that are wrapped in a WaterDrop transaction:
MAX = 10

def process(messages)
  producer.transaction do
    # An early return from insert_with_limit will not affect the transaction.
    # It will be committed
    insert_with_limit(messages)
  end
end

def insert_with_limit(messages)
  count = 0

  messages.each do |message|
    count += 1

    producer.produce_async(topic: 'events', payload: message.raw_payload)

    # This only returns from insert_with_limit; the transaction is not rolled back.
    return if count >= MAX
  end
end
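To illustrate why such exits are detectable at all: a block-taking method can tell a normal block completion from an early `return`, `break` or `throw` by checking whether the line after `yield` ever ran. The following is a minimal stand-alone sketch of that technique. `ToyTransaction`, its `run` method and both error classes are hypothetical names made up for this example; this is not WaterDrop's actual implementation.

```ruby
# Hypothetical stand-in used only to illustrate early-exit detection.
# It is NOT WaterDrop's implementation.
class ToyTransaction
  # Raised when the block is left via return, break or throw
  EarlyExitError = Class.new(StandardError)
  # Raised by user code to request a rollback explicitly
  AbortTransaction = Class.new(StandardError)

  attr_reader :state

  def initialize
    @state = :idle
  end

  def run
    finished = false
    failed = false

    result = yield

    # Reached only when the block ran to its end
    finished = true
    @state = :committed
    result
  rescue StandardError => e
    failed = true
    @state = :aborted
    # An explicit abort is swallowed, any other error is re-raised
    raise unless e.is_a?(AbortTransaction)
  ensure
    # Neither finished nor failed means the block was exited early
    unless finished || failed
      @state = :aborted
      raise EarlyExitError, 'do not exit the block with return, break or throw'
    end
  end
end

tx = ToyTransaction.new
tx.run { :done }
puts tx.state
```

In this sketch, raising inside `ensure` replaces the pending `return`, `break` or `throw`, which is why the caller sees an error instead of a silent rollback.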
v2.7.4
- [Maintenance] Alias `WaterDrop::Errors::AbortTransaction` with `WaterDrop::AbortTransaction`.
- [Maintenance] Lower the precision reporting to 100 microseconds in the logger listener.
- [Fix] Consumer consuming error: Local: Erroneous state (state) post break flow in transaction.
- [Change] Require 'karafka-core' `>= 2.4.3`
v2.7.3
- [Enhancement] Introduce `reload_on_transaction_fatal_error` to reload the librdkafka after transactional failures.
- [Enhancement] Flush on fatal transactional errors.
- [Enhancement] Add topic scope to `report_metric` (YadhuPrakash).
- [Enhancement] Cache middleware reference saving 1 object allocation on each message dispatch.
- [Enhancement] Provide `#idempotent?` similar to `#transactional?`.
- [Enhancement] Provide alias to `#with` named `#variant`.
- [Fix] Prevent creating `acks` altering variants on idempotent producers.
v2.7.2
v2.7.1
v2.7.0
This release contains BREAKING changes. Make sure to read and apply upgrade notes.
- [Feature] Support custom OAuth providers.
- [Breaking] Drop Ruby `2.7` support.
- [Breaking] Change default timeouts so final delivery `message.timeout.ms` is less than `max_wait_timeout`, so we do not end up with a non-final verdict.
- [Breaking] Update all the time related configuration settings to be in `ms` and not mixed.
- [Breaking] Remove no longer needed `wait_timeout` configuration option.
- [Breaking] Do not validate or morph (via middleware) messages added to the buffer prior to `flush_sync` or `flush_async`.
- [Enhancement] Provide `WaterDrop::Producer#transaction?` that returns true only when producer has an active transaction running.
- [Enhancement] Introduce `instrument_on_wait_queue_full` flag (defaults to `true`) to be able to configure whether non critical (retryable) queue full errors should be instrumented in the error pipeline. Useful when building high-performance pipes with WaterDrop queue retry backoff as a throttler.
- [Enhancement] Protect critical `rdkafka` thread executable code sections.
- [Enhancement] Treat the queue size as a gauge rather than a cumulative stat (isturdy).
- [Fix] Fix a case where purge on non-initialized client would crash.
- [Fix] Middlewares run twice when using buffered produce.
- [Fix] Validations run twice when using buffered produce.
Upgrade Notes
PLEASE MAKE SURE TO READ AND APPLY THEM!
`wait_timeout` Configuration No Longer Needed
The `wait_timeout` WaterDrop configuration option is no longer needed. You can safely remove it.
producer = WaterDrop::Producer.new

producer.setup do |config|
  # Other config...

  # Remove this, no longer needed
  config.wait_timeout = 30
end
Time Settings Format Alignment
All time-related values are now configured in milliseconds instead of some being in seconds and some in milliseconds.
The values that were changed from seconds to milliseconds are:
max_wait_timeout
wait_backoff_on_queue_full
wait_timeout_on_queue_full
wait_backoff_on_transaction_command, default
If you have configured any of those yourself, please replace the seconds representation with milliseconds:
producer = WaterDrop::Producer.new

producer.setup do |config|
  config.deliver = true

  # Replace this:
  config.max_wait_timeout = 30

  # With
  config.max_wait_timeout = 30_000
  # ...
end
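If your settings live in a plain hash (for example, loaded from a YAML file), the conversion can be done mechanically. This is a minimal sketch assuming such a hash exists; `SECONDS_TO_MS_KEYS` and `migrate_time_settings` are hypothetical names introduced here and are not part of WaterDrop. The key names come from the list above.

```ruby
# Settings that switched from seconds to milliseconds
# (names taken from the list in the upgrade notes).
SECONDS_TO_MS_KEYS = %i[
  max_wait_timeout
  wait_backoff_on_queue_full
  wait_timeout_on_queue_full
  wait_backoff_on_transaction_command
].freeze

# Hypothetical helper: returns a new hash in which the affected
# settings are converted from seconds to whole milliseconds.
# All other settings are passed through untouched.
def migrate_time_settings(settings)
  settings.to_h do |key, value|
    next [key, value] unless SECONDS_TO_MS_KEYS.include?(key)

    [key, (value * 1_000).round]
  end
end

old_settings = { deliver: true, max_wait_timeout: 30, wait_backoff_on_queue_full: 0.1 }
new_settings = migrate_time_settings(old_settings)
puts new_settings[:max_wait_timeout]
```

Run it only once per settings file: applying the helper to values that are already in milliseconds would multiply them again.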
Defaults Alignment
In this release, we've updated our default settings to address a crucial issue: previous defaults could lead to inconclusive outcomes in synchronous operations due to wait timeout errors. Users often mistakenly believed that a message dispatch was halted because of these errors when, in fact, the timeout was related to awaiting the final dispatch verdict, not the dispatch action itself.
The new defaults in WaterDrop 2.7.0 eliminate this confusion by ensuring synchronous operation results are always transparent and conclusive. This change aims to provide a straightforward understanding of wait timeout errors, reinforcing that they reflect the wait state, not the dispatch success.
The table below lists what has changed, showing the previous and the new default for each setting in case you want to retain the previous behavior:
Config | Previous Default | New Default |
---|---|---|
root `max_wait_timeout` | 5000 ms (5 seconds) | 60000 ms (60 seconds) |
kafka `message.timeout.ms` | 300000 ms (5 minutes) | 50000 ms (50 seconds) |
kafka `transaction.timeout.ms` | 60000 ms (1 minute) | 55000 ms (55 seconds) |

This alignment ensures that when using sync operations or invoking `#wait`, any exception you get should give you a conclusive and final delivery verdict.
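The relationship behind the new defaults can be expressed as a simple ordering check: the wait timeout has to outlast the delivery timeout. This is a minimal sketch assuming a hypothetical helper, `conclusive_sync_timeouts?`, which is not part of WaterDrop; both arguments are in milliseconds and the sample values are the defaults from the table.

```ruby
# Hypothetical sanity check, not part of WaterDrop.
# Returns true when waiting lasts longer than the Kafka delivery timeout,
# so a sync call does not give up before a final verdict is available.
def conclusive_sync_timeouts?(max_wait_timeout:, message_timeout_ms:)
  message_timeout_ms < max_wait_timeout
end

# New defaults: 50 seconds delivery timeout, 60 seconds wait
new_defaults_ok = conclusive_sync_timeouts?(
  max_wait_timeout: 60_000,
  message_timeout_ms: 50_000
)

# Previous defaults: 5 minutes delivery timeout, 5 seconds wait
previous_defaults_ok = conclusive_sync_timeouts?(
  max_wait_timeout: 5_000,
  message_timeout_ms: 300_000
)

puts new_defaults_ok
puts previous_defaults_ok
```

A check like this may be useful in a boot-time assertion if you override either value yourself.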
Buffering No Longer Early Validates Messages
As of version `2.7.0`, WaterDrop has changed how message buffering works. Previously, messages underwent validation and middleware processing when they were buffered. Now, these steps are deferred until just before dispatching the messages. The buffer functions strictly as a thread-safe storage area without performing any validations or middleware operations until the messages are ready to be sent.
This adjustment was made primarily to ensure that middleware runs and validations are applied when most relevant—shortly before message dispatch. This approach addresses potential issues with buffers that might hold messages for extended periods:
- Temporal Relevance: Validating and processing messages near their dispatch time helps ensure that actions such as partition assignments reflect the current system state. This is crucial in dynamic environments where system states are subject to rapid changes.
- Stale State Management: By delaying validations and middleware to the dispatch phase, the system minimizes the risk of acting on outdated information, which could lead to incorrect processing or partitioning decisions.
# Prior to 2.7.0 this would raise an error
producer.buffer(topic: nil, payload: '')
# => WaterDrop::Errors::MessageInvalidError

# As of 2.7.0 buffer will not, but flush_async will
producer.buffer(topic: nil, payload: '')
# => all good here
producer.flush_async
# => WaterDrop::Errors::MessageInvalidError
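The deferred behavior can be modeled with a small stand-alone class: storing a message does nothing but append it under a lock, and validation happens only when the buffer is drained. This is a rough sketch of the idea under those assumptions. `ToyBuffer` and `InvalidMessage` are hypothetical names, and the single "topic must be present" rule is a stand-in for the real validations; it is not WaterDrop code.

```ruby
# Hypothetical stand-in illustrating deferred validation; not WaterDrop code.
class ToyBuffer
  InvalidMessage = Class.new(StandardError)

  def initialize
    @messages = []
    @mutex = Mutex.new
  end

  # Stores the message as-is: no validation and no middleware at this point
  def buffer(message)
    @mutex.synchronize { @messages << message }
    self
  end

  # Drains the buffer, then validates each message right before "dispatch".
  # Returns the drained messages when all of them are valid.
  def flush
    batch = @mutex.synchronize { @messages.shift(@messages.size) }

    batch.each do |message|
      raise InvalidMessage, 'topic is required' if message[:topic].nil?
    end

    batch
  end
end

buffer = ToyBuffer.new
# Accepted without complaint, even though the topic is missing
buffer.buffer(topic: nil, payload: '')

begin
  buffer.flush
rescue ToyBuffer::InvalidMessage => e
  puts "rejected at flush: #{e.message}"
end
```

The practical consequence is the same as in the notes above: error handling for invalid messages belongs around the flush call, not around the buffering call.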
Middleware Execution Prior to Flush When Buffering
The timing of middleware execution has been adjusted. Middleware, which was previously run when messages were added to the buffer, will now only execute immediately before the messages are flushed from the buffer and dispatched. This change is similar to the validation-related changes.
v2.7.0.rc2
This release contains BREAKING changes. Make sure to read and apply upgrade notes.
- [Feature] Support custom OAuth providers.
- [Breaking] Drop Ruby `2.7` support.
- [Breaking] Change default timeouts so final delivery `message.timeout.ms` is less than `max_wait_timeout`, so we do not end up with a non-final verdict.
- [Breaking] Update all the time related configuration settings to be in `ms` and not mixed.
- [Breaking] Remove no longer needed `wait_timeout` configuration option.
- [Breaking] Do not validate or morph (via middleware) messages added to the buffer prior to `flush_sync` or `flush_async`.
- [Enhancement] Provide `WaterDrop::Producer#transaction?` that returns true only when producer has an active transaction running.
- [Enhancement] Introduce `instrument_on_wait_queue_full` flag (defaults to `true`) to be able to configure whether non critical (retryable) queue full errors should be instrumented in the error pipeline. Useful when building high-performance pipes with WaterDrop queue retry backoff as a throttler.
- [Enhancement] Treat the queue size as a gauge rather than a cumulative stat (isturdy).
- [Fix] Fix a case where purge on non-initialized client would crash.
- [Fix] Middlewares run twice when using buffered produce.
- [Fix] Validations run twice when using buffered produce.
Upgrade Notes
PLEASE MAKE SURE TO READ AND APPLY THEM!
`wait_timeout` Configuration No Longer Needed
The `wait_timeout` WaterDrop configuration option is no longer needed. You can safely remove it.
producer = WaterDrop::Producer.new

producer.setup do |config|
  # Other config...

  # Remove this, no longer needed
  config.wait_timeout = 30
end
Time Settings Format Alignment
All time-related values are now configured in milliseconds instead of some being in seconds and some in milliseconds.
The values that were changed from seconds to milliseconds are:
max_wait_timeout
wait_backoff_on_queue_full
wait_timeout_on_queue_full
wait_backoff_on_transaction_command, default
If you have configured any of those yourself, please replace the seconds representation with milliseconds:
producer = WaterDrop::Producer.new

producer.setup do |config|
  config.deliver = true

  # Replace this:
  config.max_wait_timeout = 30

  # With
  config.max_wait_timeout = 30_000
  # ...
end
Defaults Alignment
In this release, we've updated our default settings to address a crucial issue: previous defaults could lead to inconclusive outcomes in synchronous operations due to wait timeout errors. Users often mistakenly believed that a message dispatch was halted because of these errors when, in fact, the timeout was related to awaiting the final dispatch verdict, not the dispatch action itself.
The new defaults in WaterDrop 2.7.0 eliminate this confusion by ensuring synchronous operation results are always transparent and conclusive. This change aims to provide a straightforward understanding of wait timeout errors, reinforcing that they reflect the wait state, not the dispatch success.
The table below lists what has changed, showing the previous and the new default for each setting in case you want to retain the previous behavior:
Config | Previous Default | New Default |
---|---|---|
root `max_wait_timeout` | 5000 ms (5 seconds) | 60000 ms (60 seconds) |
kafka `message.timeout.ms` | 300000 ms (5 minutes) | 50000 ms (50 seconds) |
kafka `transaction.timeout.ms` | 60000 ms (1 minute) | 55000 ms (55 seconds) |

This alignment ensures that when using sync operations or invoking `#wait`, any exception you get should give you a conclusive and final delivery verdict.
Buffering No Longer Early Validates Messages
As of version `2.7.0`, WaterDrop has changed how message buffering works. Previously, messages underwent validation and middleware processing when they were buffered. Now, these steps are deferred until just before dispatching the messages. The buffer functions strictly as a thread-safe storage area without performing any validations or middleware operations until the messages are ready to be sent.
This adjustment was made primarily to ensure that middleware runs and validations are applied when most relevant—shortly before message dispatch. This approach addresses potential issues with buffers that might hold messages for extended periods:
- Temporal Relevance: Validating and processing messages near their dispatch time helps ensure that actions such as partition assignments reflect the current system state. This is crucial in dynamic environments where system states are subject to rapid changes.
- Stale State Management: By delaying validations and middleware to the dispatch phase, the system minimizes the risk of acting on outdated information, which could lead to incorrect processing or partitioning decisions.
# Prior to 2.7.0 this would raise an error
producer.buffer(topic: nil, payload: '')
# => WaterDrop::Errors::MessageInvalidError

# As of 2.7.0 buffer will not, but flush_async will
producer.buffer(topic: nil, payload: '')
# => all good here
producer.flush_async
# => WaterDrop::Errors::MessageInvalidError
Middleware Execution Prior to Flush When Buffering
The timing of middleware execution has been adjusted. Middleware, which was previously run when messages were added to the buffer, will now only execute immediately before the messages are flushed from the buffer and dispatched. This change is similar to the validation-related changes.
v2.7.0.rc1
This release contains BREAKING changes. Make sure to read and apply upgrade notes.
- [Feature] Support custom OAuth providers.
- [Breaking] Drop Ruby `2.7` support.
- [Breaking] Change default timeouts so final delivery `message.timeout.ms` is less than `max_wait_timeout`, so we do not end up with a non-final verdict.
- [Breaking] Update all the time related configuration settings to be in `ms` and not mixed.
- [Breaking] Remove no longer needed `wait_timeout` configuration option.
- [Enhancement] Provide `WaterDrop::Producer#transaction?` that returns true only when producer has an active transaction running.
- [Enhancement] Introduce `instrument_on_wait_queue_full` flag (defaults to `true`) to be able to configure whether non critical (retryable) queue full errors should be instrumented in the error pipeline. Useful when building high-performance pipes with WaterDrop queue retry backoff as a throttler.
Upgrade Notes
PLEASE MAKE SURE TO READ AND APPLY THEM!
Time Settings Format Alignment
All time-related values are now configured in milliseconds instead of some being in seconds and some in milliseconds.
The values that were changed from seconds to milliseconds are:
max_wait_timeout
wait_timeout
wait_backoff_on_queue_full
wait_timeout_on_queue_full
wait_backoff_on_transaction_command, default
If you have configured any of those yourself, please replace the seconds representation with milliseconds:
producer = WaterDrop::Producer.new

producer.setup do |config|
  config.deliver = true

  # Replace this:
  config.wait_timeout = 30

  # With
  config.wait_timeout = 30_000
  # ...
end
Defaults Alignment
In this release, we've updated our default settings to address a crucial issue: previous defaults could lead to inconclusive outcomes in synchronous operations due to wait timeout errors. Users often mistakenly believed that a message dispatch was halted because of these errors when, in fact, the timeout was related to awaiting the final dispatch verdict, not the dispatch action itself.
The new defaults in WaterDrop 2.7.0 eliminate this confusion by ensuring synchronous operation results are always transparent and conclusive. This change aims to provide a straightforward understanding of wait timeout errors, reinforcing that they reflect the wait state, not the dispatch success.
The table below lists what has changed, showing the previous and the new default for each setting in case you want to retain the previous behavior:
Config | Previous Default | New Default |
---|---|---|
root `max_wait_timeout` | 5000 ms (5 seconds) | 60000 ms (60 seconds) |
kafka `message.timeout.ms` | 300000 ms (5 minutes) | 50000 ms (50 seconds) |
kafka `transaction.timeout.ms` | 60000 ms (1 minute) | 55000 ms (55 seconds) |

This alignment ensures that when using sync operations or invoking `#wait`, any exception you get should give you a conclusive and final delivery verdict.
v2.7.0.beta1
This release contains BREAKING changes. Make sure to read and apply upgrade notes.
- [Feature] Support custom OAuth providers.
- [Breaking] Drop Ruby `2.7` support.
- [Breaking] Change default timeouts so final delivery `message.timeout.ms` is less than `max_wait_timeout`, so we do not end up with a non-final verdict.
- [Breaking] Update all the time related configuration settings to be in `ms` and not mixed.
- [Breaking] Remove no longer needed `wait_timeout` configuration option.
- [Enhancement] Introduce `instrument_on_wait_queue_full` flag (defaults to `true`) to be able to configure whether non critical (retryable) queue full errors should be instrumented in the error pipeline. Useful when building high-performance pipes with WaterDrop queue retry backoff as a throttler.
v2.7.0.alpha3
- [Breaking] Drop Ruby `2.7` support.
- [Breaking] Change default timeouts so final delivery `message.timeout.ms` is less than `max_wait_timeout`, so we do not end up with a non-final verdict.
- [Breaking] Update all the time related configuration settings to be in `ms` and not mixed.
- [Breaking] Remove no longer needed `wait_timeout` configuration option.
- [Enhancement] Introduce `instrument_on_wait_queue_full` flag (defaults to `true`) to be able to configure whether non critical (retryable) queue full errors should be instrumented in the error pipeline. Useful when building high-performance pipes with WaterDrop queue retry backoff as a throttler.