diff --git a/Consuming-Messages.md b/Consuming-Messages.md index 415c67b2..37281ffc 100644 --- a/Consuming-Messages.md +++ b/Consuming-Messages.md @@ -1,141 +1,186 @@ -Consumers should inherit from the **ApplicationConsumer**. You need to define a ```#consume``` method that will execute your business logic code against a batch of messages. +Karafka framework has a long-running server process responsible for fetching and consuming messages. Consumers should inherit from the **ApplicationConsumer**. You need to define a ```#consume``` method that will execute your business logic code against a batch of messages. Karafka fetches and consumes messages in batches by default. -Karafka fetches and consumes messages in batches by default. +## Consuming Messages in Batches -## Consuming Messages +Data fetched from Kafka is accessible using the `#messages` method. The returned object is an enumerable containing received data and additional information that can be useful during the processing. -Karafka framework has a long-running server process responsible for fetching and consuming messages. +1. To start the Karafka server process, use the following CLI command: -To start the Karafka server process, use the following CLI command: + ```shell + bundle exec karafka server + ``` -```shell -bundle exec karafka server -``` +1. To access the message batch, use the `#messages` method: + + ```ruby + + class EventsConsumer < ApplicationConsumer + def consume + # Access the batch via messages method + batch = messages + end + end + ``` -### In Batches +1. Select one of two processing approaches based on your use case: -Data fetched from Kafka is accessible using the `#messages` method. The returned object is an enumerable containing received data and additional information that can be useful during the processing. + - Process each message one by one + - Process all payloads together to leverage batch database operations provided by many ORMs -To access the payload of your messages, you can use the `#payload` method available for each received message: +1. Access message payloads. -```ruby -class EventsConsumer < ApplicationConsumer - def consume - # Print all the payloads one after another - messages.each do |message| - puts message.payload + For individual message iteration, use the `#payload` method available for each received message: + + ```ruby + class EventsConsumer < ApplicationConsumer + def consume + # Print all the payloads one after another + messages.each do |message| + puts message.payload + end + end end - end -end -``` + ``` -You can also access all the payloads together to elevate things like batch DB operations available for some of the ORMs: + For bulk operations, use the `#payloads` method to access all payloads at once: -```ruby -class EventsConsumer < ApplicationConsumer - def consume - # Insert all the events at once with a single query - Event.insert_all messages.payloads - end -end -``` + ```ruby + class EventsConsumer < ApplicationConsumer + def consume + # Insert all the events at once with a single query + Event.insert_all messages.payloads + end + end + ``` -### One At a Time +## Consuming Messages One At a Time -While we encourage you to process data in batches to elevate in-memory computation and many DBs batch APIs, you may want to process messages one at a time. +While batch processing is recommended to leverage in-memory computation and batch database operations provided by many ORMs, you may need to process messages individually for certain use cases. 
-You can achieve this by defining a base consumer with such a capability: +1. To start the Karafka server process, use the following CLI command: -```ruby -class SingleMessageBaseConsumer < Karafka::BaseConsumer - attr_reader :message + ```shell + bundle exec karafka server + ``` - def consume - messages.each do |message| - @message = message - consume_one +1. Define a reusable base consumer that handles the single-message iteration pattern: - mark_as_consumed(message) + ```ruby + class SingleMessageBaseConsumer < Karafka::BaseConsumer + attr_reader :message + + def consume + messages.each do |message| + @message = message + consume_one + + mark_as_consumed(message) + end + end end - end -end -class Consumer < SingleMessageBaseConsumer - def consume_one - puts "I received following message: #{message.payload}" - end -end -``` + class Consumer < SingleMessageBaseConsumer + def consume_one + puts "I received following message: #{message.payload}" + end + end + ``` -### Accessing Topic Details + **Result:** The `#consume_one` method will be called for each message in the batch, allowing you to process messages individually while maintaining the benefits of Karafka's batch fetching. -If, in any case, your logic is dependent on some routing details, you can access them from the consumer using the ```#topic``` method. You could use it, for example, in case you want to perform a different logic within a single consumer based on the topic from which your messages come: +## Accessing Topic Details -```ruby -class UsersConsumer < ApplicationConsumer - def consume - send(:"topic_#{topic.name}") - end +If your logic depends on specific routing details, you can access them from the consumer, using the `#topic` method. - def topic_a - # do something - end +!!! example "Use Case" - def topic_b - # do something else if it's a "b" topic - end -end -``` + You could use it, for example, when you want to perform a different logic within a single consumer based on the topic from which your messages come. -If you're interested in all the details that are stored in the topic, you can extract all of them at once, by using the ```#to_h``` method: +1. To access the topic details, call the ```#topic``` method within the consume method: -```ruby -class UsersConsumer < ApplicationConsumer - def consume - puts topic.to_h #=> { name: 'x', ... } - end -end -``` + ```ruby + class UsersConsumer < ApplicationConsumer + def consume + send(:"topic_#{topic.name}") + end -## Consuming From Earliest or Latest Offset + def topic_a + # do something + end -Karafka, by default, will start consuming messages from the earliest it can reach. You can, however configure it to start consuming from the latest message by setting the `initial_offset` value as a default: + def topic_b + # do something else if it's a "b" topic + end + end + ``` -```ruby -# This will start from the earliest (default) -class KarafkaApp < Karafka::App - setup do |config| - config.initial_offset = 'earliest' - end -end +1. To extract all the details that are stored in the topic at once, use the ```#to_h``` method: -# This will make Karafka start consuming from the latest message on a given topic -class KarafkaApp < Karafka::App - setup do |config| - config.initial_offset = 'latest' - end -end -``` + ```ruby + class UsersConsumer < ApplicationConsumer + def consume + puts topic.to_h #=> { name: 'x', ... 
} + end + end + ``` -or on a per-topic basis: +## Setting Initial Offset Position -```ruby -class KarafkaApp < Karafka::App - routes.draw do - topic :events do - consumer EventsConsumer - # Start from earliest for this specific topic - initial_offset 'earliest' +By default, Karafka starts consuming messages from the earliest available offset. Use this procedure to configure the initial offset position for your consumers. + +To configure the initial offset globally: + +1. Open your Karafka application configuration file. +1. Set the `initial_offset` value in the setup block. + + To start from the earliest offset (default behavior): + + ```ruby + # This will start from the earliest (default) + class KarafkaApp < Karafka::App + setup do |config| + config.initial_offset = 'earliest' + end end + ``` + + To start from the latest offset: - topic :notifications do - consumer NotificationsConsumer - # Start from latest for this specific topic - initial_offset 'latest' + ```ruby + # This will make Karafka start consuming from the latest message on a given topic + class KarafkaApp < Karafka::App + setup do |config| + config.initial_offset = 'latest' + end end - end -end -``` + ``` + +**Result:** All topics will use this offset position as the default. + +To configure the initial offset for specific topics: + +1. Open your Karafka routing configuration. +1. Add the `initial_offset` setting to individual topic definitions: + + ```ruby + class KarafkaApp < Karafka::App + routes.draw do + topic :events do + consumer EventsConsumer + # Start from earliest for this specific topic + initial_offset 'earliest' + end + + topic :notifications do + consumer NotificationsConsumer + # Start from latest for this specific topic + initial_offset 'latest' + end + end + end + ``` + + **Result:** Each topic will use its configured offset position, overriding the global default. !!! note @@ -143,11 +188,13 @@ end ## Detecting Revocation Midway -When working with a distributed system like Kafka, partitions of a topic can be distributed among different consumers in a consumer group for processing. However, there might be cases where a partition needs to be taken away from a consumer and reassigned to another consumer. This is referred to as a partition revocation. +When working with a distributed system like Kafka, topic partitions can be distributed among different consumers in a consumer group for processing. However, there are cases where a partition needs to be removed from one consumer and reassigned to another. This process is known as a partition revocation. -Partition revocation can be voluntary, where the consumer willingly gives up the partition after it is done processing the current batch, or it can be involuntary. An involuntary partition revocation is typically due to a rebalance triggered by consumer group changes or a failure in the consumer, which causes it to become unresponsive. It is important to remember that involuntary revocations can occur during data processing. You may not want to continue processing messages when you know the partition has been taken away. This is where the `#revoked?` method is beneficial. +Partition revocation can be voluntary, where a consumer willingly gives up the partition after processsing the current batch, or it can be involuntary. Involuntary partition revocation usually happens due to events such as a rebalance triggered by changes in the consumer group or a failure of a consumer that makes it unresponsive. 
It is important to remember that involuntary revocations can occur during data processing. If you are aware that a partition has been removed, you may not want to continue processing messages. This is where the `#revoked?` method is beneficial.

-By monitoring the status of the `#revoked?` method, your application can detect that your process no longer owns a partition you are operating on. In such scenarios, you can choose to stop any ongoing, expensive processing. This can help you save resources and limit the number of potential reprocessings.
+By monitoring the status of the `#revoked?` method, your application can detect that your process no longer owns a partition you are operating on. In such cases, you can choose to stop any ongoing, expensive processing. This can help you save resources and reduce the number of potential reprocessings.
+
+As shown in the following example, you can check for revocation after processing each message:

```ruby
def consume
@@ -161,27 +208,34 @@ end
```

-It is worth, however, keeping in mind that under normal operating conditions, Karafka will complete all ongoing processing before a rebalance occurs. This includes finishing the processing of all messages already fetched. Karafka has built-in mechanisms to handle voluntary partition revocations and rebalances, ensuring that no messages are lost or unprocessed during such events. Hence `#revoked?` is especially useful for involuntary revocations.
+It is worth noting, however, that under normal operating conditions, Karafka will complete all ongoing processing before a rebalance occurs. This includes finishing the processing of all messages already fetched. Karafka has built-in mechanisms to handle voluntary partition revocations and rebalances, ensuring that no messages are lost or unprocessed during such events. Hence, `#revoked?` is especially useful for involuntary revocations.

-In most cases, especially if you do not use [Long-Running Jobs](Pro-Long-Running-Jobs), the Karafka default [offset management](Offset-management) strategy should be more than enough. It ensures that after batch processing as well as upon rebalances, before partition reassignment, all the offsets are committed. In a healthy system with stable deployment procedures and without frequent short-lived consumer generations, the number of re-processings should be close to zero.
+In most cases, especially if you do not use [Long-Running Jobs](Pro-Long-Running-Jobs), the Karafka default [offset management](Offset-management) strategy should be more than enough. It ensures that, after batch processing and upon rebalances, all offsets are committed before partition reassignment. In a healthy system with stable deployment procedures and without frequent short-lived consumer generations, the number of re-processings should be close to zero.

!!! note
-
-    You do **not** need to mark the message as consumed for the `#revoked?` method result to change.
+
+    The `#revoked?` method detects partition revocation immediately. You don't need to mark messages as consumed for it to detect revocation.

!!! note

-    When using the [Long-Running Jobs](Pro-Long-Running-Jobs) feature, `#revoked?` result also changes independently from marking messages.
+    With [Long-Running Jobs](Pro-Long-Running-Jobs), `#revoked?` result also changes independently from marking messages.

## Consumer Persistence

-Karafka consumer instances are persistent by default. 
This means that a single consumer instance will "live" as long as a given process instance consumes a given topic partition. This means you can elevate in-memory processing and buffering to achieve better performance. +Karafka consumer instances are persistent by default. A single consumer instance will "live" as long as a given process consumes a given topic partition. This allows you to: + +- Maintain database connections across batches +- Keep in-memory state and caches +- Buffer messages for batch processing +- Reuse expensive resources -Karafka consumer instance for a given topic partition will be re-created in case a given partition is lost and re-assigned. +Karafka recreates the consumer instance only when a partition is lost and reassigned. !!! note - If you decide to utilize such techniques, you may be better with manual offset management. + When buffering messages in memory, use manual offset management. Without it, you'll lose buffered data, if the process crashes before flushing. + +The following example contains a consumer that buffers messages until it reaches 1,000 of them before flushing: ```ruby # A consumer that will buffer messages in memory until it reaches 1000 of them. Then it will flush @@ -217,10 +271,12 @@ end ## Shutdown and Partition Revocation Handlers -Karafka consumer, aside from the `#consume` method, allows you to define two additional methods that you can use to free any resources that you may be using upon certain events. Those are: +Karafka consumer, aside from the `#consume` method, allows you to define two additional methods to free resources used during specific events: + +- `#revoked` - it will be executed when there is a rebalance resulting in the given partition being revoked from the current process. +- `#shutdown` - it will be executed when the Karafka process is being shutdown. -- `#revoked` - will be executed when there is a rebalance resulting in the given partition being revoked from the current process. -- `#shutdown` - will be executed when the Karafka process is being shutdown. +The following code demonstrates all three lifecycle methods: ```ruby class LogsConsumer < ApplicationConsumer @@ -244,13 +300,17 @@ class LogsConsumer < ApplicationConsumer end ``` -Please note that when using `#shutdown` with the filtering API or [Delayed Topics](Pro-Delayed-Topics), there are scenarios where `#shutdown` and `#revoked` may be invoked without prior `#consume` running and the `#messages` batch may be empty. +!!! note "Shutdown Edge Case Alert" + + When you use `#shutdown` with the filtering API or [Delayed Topics](Pro-Delayed-Topics), there are scenarios where `#shutdown` and `#revoked` may be invoked without prior `#consume` running and the `#messages` batch may be empty. ## Initial State Setup -Karafka consumers provide a special `#initialized` method called automatically after the consumer instance is fully prepared and initialized. +Karafka consumers provide a dedicated `#initialized` method called automatically after the consumer instance is fully prepared and initialized. -This method can be used to set up any additional state, resources, or connections your consumer may need during its lifecycle. Karafka's consumer instance is not entirely bootstrapped during the `#initialize` method. This means crucial details, like routing information, topic details, and more, may not yet be available. Using `#initialize` to set up dependencies might result in incomplete or incorrect configurations. 
On the other hand, `#initialized` is executed once the consumer is fully ready and contains all the details it might need. By default, `#initialized` does nothing. Still, you can override it to include custom setup logic for your consumer:
+Use this method to set up any additional state, resources, or connections your consumer may need during its lifecycle. Karafka's consumer instance is not entirely bootstrapped during the `#initialize` method. This means crucial details, like routing information, topic details, and more, may not yet be available. Using `#initialize` to set up dependencies might result in incomplete or incorrect configurations. On the other hand, `#initialized` is executed once the consumer is fully ready and contains all the details it might need. By default, `#initialized` does nothing. Still, you can override it to include custom setup logic for your consumer.
+
+The following example shows how to override the `#initialized` method in a Karafka consumer to set up resources after the consumer is fully ready:

```ruby
class EventsConsumer < ApplicationConsumer
@@ -275,13 +335,13 @@ end
end

-Using `#initialized` allows access to the full context of the consumer, as it is called when the consumer has been fully set up. This provides several benefits, such as establishing database connections, setting up loggers, or initializing API clients that require topic-specific information. By deferring resource setup to `#initialized`, you avoid potential issues arising when certain resources or states are unavailable during the construction phase.
+Using `#initialized` provides access to the whole consumer context, as it is called after the consumer has been fully set up. This offers several benefits, such as establishing database connections, setting up loggers, or initializing API clients that require topic-specific information. By deferring resource setup to `#initialized`, you avoid potential issues that can arise when specific resources or states are unavailable during the construction phase.

-## `enable.partition.eof` Early Yield
+## Early Message Yielding (`enable.partition.eof`)

-In typical Karafka consumption scenarios, when a consumer reaches the end of a partition, it might still wait for new messages to arrive. This behavior is governed by settings such as `max_wait_time` or `max_messages`, which dictate how long a consumer should wait for new data before timing out or moving on. While this can benefit continuous data streams, it may introduce unnecessary latency in scenarios where real-time data processing and responsiveness are critical.
+In typical Karafka consumption scenarios, when a consumer reaches the end of a partition, it might still wait for new messages to arrive. This behavior is governed by settings such as `max_wait_time` or `max_messages`, which dictate how long a consumer should wait for new data before the polling operation completes or returns. While this can benefit continuous data streams, it may introduce unnecessary latency in scenarios where real-time data processing and responsiveness are critical.

-The `enable.partition.eof` configuration option changes how Karafka responds when the end of a partition is reached during message consumption. By default, when Karafka encounters the end of a partition, it waits for more messages until either `max_wait_time` or `max_messages` limits are reached. However, if `enable.partition.eof` is set for a subscription group to `true`, Karafka will immediately delegate already accumulated messages (if any) for processing, even if neither `max_wait_time` nor `max_messages` has been reached.
+The `enable.partition.eof` configuration option changes how Karafka responds when the end of a partition is reached during message consumption. By default, when Karafka reaches the end of a partition, it waits for additional messages until either `max_wait_time` or `max_messages` is reached. However, if `enable.partition.eof` is set for a subscription group to `true`, Karafka will immediately delegate already accumulated messages (if any) for processing, even if neither `max_wait_time` nor `max_messages` has been reached.

### Benefits of Early Yield

@@ -299,7 +359,7 @@ The `enable.partition.eof` configuration option changes how Karafka responds whe

### Configuring `enable.partition.eof`

-The `enable.partition.eof` is one of the `kafka` scoped options and can be set for all subscription groups or on a per-subscription group basis, depending on your use case.
+The `enable.partition.eof` is one of the `kafka`-scoped options. Depending on your use case, you can configure `enable.partition.eof` either globally for all subscription groups in the setup block, or on a per-subscription-group basis in your routing configuration, as shown in the following example:

```ruby
class KarafkaApp < Karafka::App
@@ -327,12 +387,14 @@ class KarafkaApp < Karafka::App
end
```

-This configuration ensures that as soon as the end of a partition is reached, any accumulated messages are immediately processed, enhancing the system's responsiveness and efficiency.
+This configuration ensures that as soon as the end of a partition is reached, any accumulated messages are immediately processed, enhancing system responsiveness and efficiency.

-## Inline API Based Consumption
+## Consuming with the Iterator API

Karafka Pro provides the [Iterator API](Pro-Iterator-API) that allows you to subscribe to topics and to perform lookups from Rake tasks, custom scripts, Rails console, or any other Ruby processes.

+The following example demonstrates searching for messages with a specific header value:
+
```ruby
# Note: you still need to configure your settings using `karafka.rb`
@@ -355,19 +417,20 @@ end
puts "There were #{user_5_events.count} messages"

-You can read more about it [here](Pro-Iterator-API).
+For more details on this feature, see [Iterator API](Pro-Iterator-API).

-## Avoiding Unintentional Overwriting of the Consumer Instance Variables
+## Avoiding Accidental Overwriting of Consumer Instance Variables

-When working with Karafka consumers, it is crucial to be aware of and avoid unintentionally overwriting certain instance variables used by the consumer instances. Overwriting these variables can lead to critical processing errors and result in issues such as `worker.process.error` visible in the web UI. Below are the primary instance variables used in consumers that you need to be cautious about:
+When working with Karafka consumers, it is essential to be mindful of certain instance variables used by the consumer instances. Unintentionally overwriting these variables can lead to critical processing errors and result in issues, such as `worker.process.error`, which can be seen in the Karafka Web UI. The following are the primary instance variables that you should be careful about:

-- `@id`: Represents the ID of the current consumer.
-- `@messages`: Stores the messages for the topic to which a given consumer is subscribed. -- `@client`: Refers to the Kafka connection client. -- `@coordinator`: Handles coordination of message processing. -- `@producer`: Holds the instance of the producer. +- `@id`: Consumer instance identifier +- `@messages`: Messages batch for the subscribed topic +- `@client`: Kafka connection client +- `@coordinator`: Message processing coordinator +- `@producer`: Producer instance +- `@used`: Internal flag tracking whether the consumer has actively processed messages -Accidentally overwriting any of these instance variables can disrupt the normal functioning of the consumer, leading to: +Accidental overwriting of any instance variables can disrupt the normal functioning of the consumer, leading to: - Inability to correctly process or retrieve messages. - Loss of connection to the Kafka server. @@ -412,7 +475,7 @@ EOF signaling can happen in two ways: !!! tip "Full Coverage of EOF" - To ensure full coverage of EOF scenarios, both the `#eofed` method and the `#eofed?` method should be used. This ensures that EOF is handled whether it occurs with or without new messages. + To ensure full coverage of EOF scenarios, use both the `#eofed` method and the `#eofed?` method. This ensures that EOF is handled whether it occurs with or without new messages. #### `#eofed` Method @@ -442,7 +505,7 @@ end #### Handling EOF in `#consume` Method -If EOF is signaled together with messages, the `#eofed` method will not be triggered. In such cases, Karafka provides a `#eofed?` method that can be used to detect that EOF has been signaled alongside the messages. +If EOF is signaled together with messages, the `#eofed` method will not be triggered. In such cases, Karafka offers the `#eofed?` method, which allows you to detect when EOF has been signaled along with the messages. The `#eofed?` method allows you to detect EOF within the `#consume` method: @@ -467,29 +530,29 @@ end Knowing when a partition has reached EOF can be helpful in several scenarios: -- **Batch Processing Completion**: When processing data in batches, knowing when you have processed all available data can be beneficial. This allows you to finalize batch operations, such as committing transactions or aggregating results. - -- **Data Synchronization**: In cases where you need to synchronize data between different systems, knowing the EOF can signal that all current data has been consumed, and it's safe to start a new synchronization cycle. +- **Batch Processing Completion**: When processing data in batches, knowing when you have processed all available data allows you to finalize batch operations, such as committing transactions or aggregating results. -- **Resource Cleanup**: After reaching the end of a partition, you may want to release or reallocate resources that are no longer needed, optimizing your application's performance. +- **Data Synchronization**: When synchronizing data between different systems, you can use EOF as a signal that all current data has been consumed and it is safe to start a new synchronization cycle. + +- **Resource Cleanup**: After reaching the end of a partition, you may want to release or reallocate resources that are no longer needed, optimizing your application performance. -- **Logging and Monitoring**: Logging EOF events can be useful for monitoring data consumption and detecting when there are no more messages to process, which can help debugging and performance tuning. 
+- **Logging and Monitoring**: EOF events help track data consumption and detect when no more messages are available to process, aiding in debugging and performance tuning.

-- **Triggering Downstream Processes**: EOF can be a signal to trigger downstream processes that depend on the completion of data consumption, ensuring that subsequent operations only start once all relevant data has been processed.
+- **Triggering Downstream Processes**: EOF signals when to start processes that require all messages to be consumed first.

## Wrapping the Execution Flow

-Karafka's design includes the ability to "wrap" the execution flow, which allows to execute custom logic before and after the message processing cycle. This functionality is particularly valuable for scenarios where additional setup, teardown, or contextual operations (such as selecting a transactional producer from WaterDrop's [connection pool](WaterDrop-Connection-Pool)) are needed.
+The `#wrap` method allows you to execute custom logic before and after message processing. Use this when you need additional setup, teardown, or contextual operations (such as selecting a transactional producer from the WaterDrop [connection pool](WaterDrop-Connection-Pool)).

-The `#wrap` method surrounds the entire operational flow of the consumer, not just the user's business logic. This includes:
+The `#wrap` method encompasses the consumer's entire execution flow, not just user-defined business logic. This includes:

-1. **User-Defined Logic**: The custom message processing logic implemented in all the actions such as `#consume`, `#revoked`, etc. methods.
+1. **User-Defined Logic**: The custom message processing logic implemented in actions such as `#consume`, `#revoked`, and other consumer methods.
2. **Framework-Level Operations**: Core functionalities such as offset management, message acknowledgment, and internal state synchronization.
3. **Error Handling and Recovery**: Ensures proper transactional rollbacks or retries in case of failures.

-To implement `#wrap`, override it in your consumer class. The method should ensure that `yield` is **always** invoked, regardless of any failures or conditions. This is critical because skipping `yield` can disrupt Karafka's ability to execute its internal processes, leading to inconsistencies or data loss.
+To implement `#wrap`, override it in your consumer class. The method must ensure that `yield` is **always** invoked, regardless of any failures or conditions. This is critical because skipping `yield` can disrupt Karafka's ability to execute its internal processes, leading to inconsistencies or data loss.

-Here's an example of a `#wrap` implementation:
+The following example shows a `#wrap` implementation:

```ruby
class CustomConsumer < ApplicationConsumer
diff --git a/Development/Technical-Writing.md b/Development/Technical-Writing.md
new file mode 100644
index 00000000..2ecbb67b
--- /dev/null
+++ b/Development/Technical-Writing.md
@@ -0,0 +1,5 @@
+# Best Practices
+
+## Document structure and sources
+
+1. H1 and H2 headers in the TOC (Wiki) are rendered from the Home.md file. Edit them directly in the Home.md file. Renaming the titles in the navigation pane in the VS Code editor has no effect.
diff --git a/Home.md b/Home.md
index bc85332e..eec1b74e 100644
--- a/Home.md
+++ b/Home.md
@@ -249,3 +249,4 @@ It is recommended to do one major upgrade at a time.
- [LLM Documentation Guidelines](Development-LLM-Documentation-Guidelines)
- [Librdkafka Update Release Policy](Development-Librdkafka-Update-Release-Policy)
- [Karafka Integration Tests Catalog](Development-Karafka-Integration-Tests-Catalog)
+- [Technical Writing](Development-Technical-Writing)
diff --git a/Producing-Messages.md b/Producing-Messages.md
index 048b2787..c1f1caaf 100644
--- a/Producing-Messages.md
+++ b/Producing-Messages.md
@@ -1,6 +1,8 @@
-It's quite common when using Kafka to treat applications as parts of a bigger pipeline (similarly to Bash pipeline) and forward processing results to other applications. Karafka provides a way of dealing with that by allowing you to use the [WaterDrop](https://github.com/karafka/waterdrop) messages producer from any place within your application.
+It is quite common in Kafka to treat applications as components of a larger pipeline, similar to a Bash pipeline, where processing results are forwarded to other applications. Karafka facilitates this by enabling you to use the [WaterDrop](https://github.com/karafka/waterdrop) message producer from anywhere in your application.

-You can access the pre-initialized WaterDrop producer instance using the `Karafka.producer` method from any place within your codebase.
+You can access the pre-initialized WaterDrop producer instance using the `Karafka.producer` method from any location within your codebase.
+
+The following example demonstrates how to asynchronously produce a message to Kafka:

```ruby
Karafka.producer.produce_async(
@@ -11,7 +13,7 @@
)
```

WaterDrop is thread-safe and operates well at scale.

-If you're looking to produce messages within Karafka consumers, you have several convenient alias methods at your disposal, including `#producer`, `#produce_sync`, `#produce_async`, `#produce_many_sync`, and `#produce_many_async`. Here's how you might use them:
+If you need to produce messages within Karafka consumers, you have several convenient alias methods at your disposal, including `#producer`, `#produce_sync`, `#produce_async`, `#produce_many_sync`, and `#produce_many_async`, as shown in the following example:

```ruby
class VisitsConsumer < ApplicationConsumer
@@ -32,27 +34,35 @@ end
```

-Please follow the [WaterDrop documentation](WaterDrop-Usage) for more details on how to use it.
+For more details on how to use the WaterDrop producer and its various message production methods, see the [WaterDrop documentation](WaterDrop-Usage).

## Messages Piping

-If you are looking for seamless message piping in Kafka-based systems, we recommend checking out the [message piping](Pro-Piping) feature in Karafka Pro. Exclusive to Karafka Pro, this feature offers synchronous and asynchronous forwarding capabilities with enhanced traceability, which is perfect for streamlining data workflows.
+If you are looking for seamless message piping in Kafka-based systems, see [Piping](Pro-Piping) to get familiar with the message piping feature exclusive to Karafka Pro. This feature offers synchronous and asynchronous forwarding capabilities with enhanced traceability, which is perfect for streamlining data workflows.

## Producer Shutdown

-When using the Karafka producer in processes like Puma, Sidekiq, or rake tasks, it is always recommended to call the `#close` method on the producer before shutting it down.
+Before shutting down the Karafka producer in processes such as Puma, Sidekiq, or rake tasks, make sure to call the `#close` method on the producer. + +This is because the `#close` method ensures that any pending messages in the producer buffer are flushed to the Kafka broker before shutting down the producer. + +!!! info + + If you do not call `#close`, there is a risk that some messages may not be sent to the Kafka broker, resulting in lost or incomplete data. In addition, calling `#close` also releases any resources held by the producer, such as network connections, file handles, and memory buffers. Failing to release these resources can lead to memory leaks, socket exhaustion, or other system-level issues that can impact the stability and performance of your application. -This is because the `#close` method ensures that any pending messages in the producer's buffer are flushed to the Kafka broker before shutting down the producer. If you do not call `#close`, there is a risk that some messages may not be sent to the Kafka broker, resulting in lost or incomplete data. +Overall, calling `#close` on the Karafka producer is a best practice that helps ensure reliable and efficient message delivery to Kafka while promoting your application stability and scalability. -In addition, calling `#close` also releases any resources held by the producer, such as network connections, file handles, and memory buffers. Failing to release these resources can lead to memory leaks, socket exhaustion, or other system-level issues that can impact the stability and performance of your application. +In the following sections, you can find an example of how to `#close` the producer used in various Ruby processes. -Overall, calling `#close` on the Karafka producer is a best practice that helps ensure reliable and efficient message delivery to Kafka while promoting your application's stability and scalability. +!!! warning -Below you can find an example of how to `#close` the producer used in various Ruby processes. Please note, that you should **not** close the producer manually if you are using the [Embedding API](Embedding) in the same process. + Note, that you should **not** close the producer manually if you are using the [Embedding API](Embedding) in the same process. ### Closing Producer Used in Karafka -When you shut down the Karafka consumer, the `Karafka.producer` automatically closes. There's no need to close it yourself. If you're using multiple producers or a more advanced setup, you can use the `app.stopped` event during shutdown to handle them. +When you shut down the Karafka consumer, the `Karafka.producer` automatically closes. There is no need to close it yourself. If you are using multiple producers or a more advanced setup, you can use the `app.stopped` event during shutdown to handle them. + +The following examples show how to properly close Karafka producers in various Ruby environments to ensure all messages are delivered and resources are released. ### Closing Producer Used in Puma (Single Mode) @@ -162,9 +172,11 @@ end ### Closing Producer in any Ruby Process -While integrating Karafka producers into your Ruby applications, it's essential to ensure that resources are managed correctly, especially when terminating processes. We generally recommend utilizing hooks specific to the environment or framework within which the producer operates. These hooks ensure graceful shutdowns and resource cleanup tailored to the application's lifecycle. 
+While integrating Karafka producers into your Ruby applications, it is essential to ensure that resources are managed correctly, especially when terminating processes. We generally recommend utilizing hooks specific to the environment or framework within which the producer operates. These hooks ensure proper shutdowns and resource cleanup tailored to the application lifecycle. + +However, there might be scenarios where such specific hooks are not available or suitable. In these cases, employ Ruby's `at_exit` hook as a universal fallback to close the producer before the Ruby process exits. -However, there might be scenarios where such specific hooks are not available or suitable. In these cases, Ruby's `at_exit` hook can be employed as a universal fallback to close the producer before the Ruby process exits. Here's a basic example of using at_exit with a Karafka producer: +The following basic example demonstrates using at_exit with a Karafka producer: ```ruby at_exit do @@ -174,11 +186,13 @@ end ## Producing to Multiple Clusters -Karafka, by default, provides a producer that sends messages to a specified Kafka cluster. If you don't configure it otherwise, this producer will always produce messages to the default cluster that you've configured Karafka to work with. If you only specify one Kafka cluster in your configuration, all produced messages will be sent to this cluster. This is the out-of-the-box behavior and works well for many setups with a single cluster. +Karafka, by default, provides a producer that sends messages to a specified Kafka cluster. If you do not configure it otherwise, this producer will always produce messages to the default cluster that you have configured Karafka to work with. If you specify one Kafka cluster in your configuration, all produced messages will be sent to this cluster. This is the out-of-the-box behavior and works well for many setups with a single cluster. + +However, if you have a more complex setup where you need to produce messages to different Kafka clusters based on certain logic or conditions, you need a more customized setup. In such cases, configure a producer for each cluster you want to produce to. This allows for distinct producer configurations for each cluster, making it possible to produce to any of them as needed. -However, if you have a more complex setup where you'd like to produce messages to different Kafka clusters based on certain logic or conditions, you need a more customized setup. In such cases, you must configure a producer for each cluster to which you want to produce. This means you'll have separate producer configurations tailored to each cluster, allowing you to produce to any of them as required. +If you need to determine which cluster to produce to based on the consumer logic or the message being consumed, you can override the `#producer` method in your consumer. By doing so, you can define a cluster-aware producer instance that aligns with your application's logic. -In scenarios where you want to decide which cluster to produce to based on the consumer logic or the consumed message, you can override the `#producer` method in your consumer. By overriding this method, you can specify a dedicated cluster-aware producer instance depending on your application's logic. 
+The following example demonstrates how to create and use multiple producers for different Kafka clusters, allowing you to dynamically route messages: ```ruby # Define your producers for each of the clusters @@ -210,6 +224,6 @@ class MyConsumer < ApplicationConsumer end ``` -The Web UI relies on per-producer listeners to monitor asynchronous errors. If you're crafting your consumers and utilizing the Web UI, please ensure you configure this integration appropriately. +The Web UI relies on per-producer listeners to monitor asynchronous errors. If you craft your consumers and utilize the Web UI, make sure that you configure this integration appropriately. -By leveraging this flexibility in Karafka, you can effectively manage and direct the flow of messages in multi-cluster Kafka environments, ensuring that data gets to the right place based on your application's unique requirements. +By leveraging this flexibility in Karafka, you can effectively manage and direct message flow in multi-cluster Kafka environments, ensuring data reaches the right place based on your application's unique requirements. diff --git a/Upgrades/Karafka/2.0.md b/Upgrades/Karafka/2.0.md index 4b4c4b2f..1ff6944f 100644 --- a/Upgrades/Karafka/2.0.md +++ b/Upgrades/Karafka/2.0.md @@ -396,7 +396,7 @@ class Consumer < SingleMessageBaseConsumer end ``` -[Here](Consuming-Messages#one-at-a-time) you can find more details about this. +[Here](Consuming-Messages#consuming-messages-one-at-a-time) you can find more details about this. ## Dependency changes