112 changes: 81 additions & 31 deletions Consuming-Messages.md
Karafka framework has a long-running server process responsible for fetching and consuming messages. Consumers should inherit from the **ApplicationConsumer**. You need to define a `#consume` method that will execute your business logic against a batch of messages. Karafka fetches and consumes messages in batches by default.

## Consuming Messages in Batches

Data fetched from Kafka is accessible using the `#messages` method. The returned object is an enumerable containing received data and additional information that can be useful during the processing.

1. To start the Karafka server process, use the following CLI command:

```shell
bundle exec karafka server
```
2. To access the message batch, use the `#messages` method:
```ruby
class EventsConsumer < ApplicationConsumer
def consume
# Access the batch via messages method
batch = messages
end
end
```

3. Select one of two processing approaches based on your use case:

- Process each message one by one
- Process all payloads together to leverage batch database operations provided by many ORMs

4. Access message payloads.

For individual message iteration, use the `#payload` method available for each received message:

```ruby
class EventsConsumer < ApplicationConsumer
  def consume
    # Print the payload of each message, one after another
    messages.each do |message|
      puts message.payload
    end
  end
end
```

For bulk operations, use the `#payloads` method to access all payloads at once:

```ruby
class EventsConsumer < ApplicationConsumer
  def consume
    # Insert all the payloads at once with a single batch DB operation
    Event.insert_all messages.payloads
  end
end
```
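
Beyond payloads, each message also exposes its Kafka metadata, such as the topic, partition, offset, key, and headers. This is part of the "additional information" mentioned above. A minimal sketch of reading the metadata alongside the payload:

```ruby
class EventsConsumer < ApplicationConsumer
  def consume
    messages.each do |message|
      # Each message carries its Kafka metadata alongside the payload
      puts "#{message.topic}/#{message.partition}@#{message.offset} key=#{message.key}"
      puts message.headers.inspect
    end
  end
end
```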

## Consuming Messages One At a Time

While batch processing is recommended to leverage in-memory computation and batch database operations provided by many ORMs, you may need to process messages individually for certain use cases.

1. To start the Karafka server process, use the following CLI command:

```shell
bundle exec karafka server
```
2. Define a reusable base consumer that handles the single-message iteration pattern:

```ruby
class SingleMessageBaseConsumer < Karafka::BaseConsumer
  attr_reader :message

  def consume
    messages.each do |message|
      @message = message
      consume_one
      # Commit the offset after each individual message
      mark_as_consumed(message)
    end
  end
end

class Consumer < SingleMessageBaseConsumer
  def consume_one
    puts "I received the following message: #{message.payload}"
  end
end
```
**Result:** The `#consume_one` method will be called for each message in the batch, allowing you to process messages individually while maintaining the benefits of Karafka's batch fetching.

## Accessing Topic Details

If your logic depends on specific routing details, you can access them from the consumer using the `#topic` method.

!!! example "Use Case"

    You could use it, for example, when you want to perform different logic within a single consumer based on the topic from which your messages come.

1. To access the topic details, call the `#topic` method within the `#consume` method:

```ruby
class UsersConsumer < ApplicationConsumer
  def consume
    # Dispatch to a method matching the topic name
    send(:"topic_#{topic.name}")
  end

  private

  # Logic that should run only for messages from topic a
  def topic_a
    puts 'Running the logic for topic a'
  end

  # Logic that should run only for messages from topic b
  def topic_b
    puts 'Running the logic for topic b'
  end
end
```

2. To extract all the details that are stored in the topic at once, use the `#to_h` method:

```ruby
class UsersConsumer < ApplicationConsumer
  def consume
    # Print all the topic routing details as a hash
    puts topic.to_h
  end
end
```

## Setting Initial Offset Position

By default, Karafka starts consuming messages from the earliest available offset. Use this procedure to configure the initial offset position for your consumers.

**To configure the initial offset globally:**

1. Open your Karafka application configuration file.
2. Set the `initial_offset` value in the setup block.

To start from the earliest offset (default behavior):

```ruby
# This will start from the earliest (default)
class KarafkaApp < Karafka::App
  setup do |config|
    config.initial_offset = 'earliest'
  end
end
```

To start from the latest offset:
```ruby
# This will make Karafka start consuming from the latest message on a given topic
class KarafkaApp < Karafka::App
setup do |config|
config.initial_offset = 'latest'
end
end
```
**Result:** All topics will use this offset position as the default.

**To configure the initial offset for specific topics:**

1. Open your Karafka routing configuration.
2. Add the `initial_offset` setting to individual topic definitions:

```ruby
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    topic :users do
      consumer UsersConsumer
      # This topic will start from the earliest offset (the default)
      initial_offset 'earliest'
    end

    topic :events do
      consumer EventsConsumer
      # And this one will start from the latest available message
      initial_offset 'latest'
    end
  end
end
```
**Result:** Each topic will use its configured offset position, overriding the global default.

!!! note

    This setting applies only to the first execution of a Karafka process. All following executions will pick up from the last offset where the process ended previously.

## Detecting Revocation Midway

When working with a distributed system like Kafka, topic partitions can be distributed among different consumers in a consumer group for processing. However, there are cases where a partition needs to be removed from one consumer and reassigned to another. This process is known as a partition revocation.

Partition revocation can be voluntary, where a consumer willingly gives up the partition after processing the current batch, or it can be involuntary. Involuntary partition revocation usually happens due to events such as a rebalance triggered by changes in the consumer group or a failure of a consumer that makes it unresponsive. It is important to remember that involuntary revocations can occur during data processing. If you are aware that a partition has been removed, you may not want to continue processing messages. This is where the `#revoked?` method is beneficial.

By monitoring the status of the `#revoked?` method, your application can detect that your process no longer owns a partition you are operating on. In such cases, you can choose to stop any ongoing, expensive processing. This can help you save resources and reduce the number of potential reprocessings.

As shown in the following example, you can check for revocation after processing each message:

```ruby
def consume
  messages.each do |message|
    Message.create!(message.payload)
    mark_as_consumed(message)

    # Stop processing the remainder of the batch once the partition is gone
    return if revoked?
  end
end
```

It is worth noting, however, that under normal operating conditions, Karafka will complete all ongoing processing before a rebalance occurs. This includes finishing the processing of all messages already fetched. Karafka has built-in mechanisms to handle voluntary partition revocations and rebalances, ensuring that no messages are lost or unprocessed during such events. Hence, `#revoked?` is especially useful for involuntary revocations.

In most cases, especially if you do not use [Long-Running Jobs](Pro-Long-Running-Jobs), the Karafka default [offset management](Offset-management) strategy should be more than enough. It ensures that, after batch processing and upon rebalances, all offsets are committed before partition reassignment. In a healthy system with stable deployment procedures and without frequent short-lived consumer generations, the number of re-processings should be close to zero.
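
If you need finer control than the defaults provide, the consumer API also exposes explicit offset marking. A minimal sketch, assuming a hypothetical `EventsProcessor` service; `mark_as_consumed` stores the offset to be committed periodically, while the bang variant commits synchronously:

```ruby
def consume
  messages.each do |message|
    EventsProcessor.call(message.payload) # hypothetical processing service

    # Non-blocking: stores the offset locally, to be committed periodically
    mark_as_consumed(message)
  end

  # Blocking alternative: commits the offset to Kafka before returning
  # mark_as_consumed!(messages.last)
end
```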

!!! note

    The `#revoked?` method detects partition revocation immediately. You don't need to mark messages as consumed for it to detect revocation.

!!! note

    With [Long-Running Jobs](Pro-Long-Running-Jobs), the `#revoked?` method still updates automatically, even when messages remain unmarked for extended periods.


## Consumer Persistence

Karafka consumer instances are persistent by default. A single consumer instance will "live" as long as a given process consumes a given topic partition. This allows you to:

- Maintain database connections across batches
- Keep in-memory state and caches
- Buffer messages for batch processing
- Reuse expensive resources

Karafka recreates the consumer instance only when a partition is lost and reassigned.

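For example, because the same consumer instance is reused across batches of a given partition, you can memoize expensive objects instead of rebuilding them per batch. A minimal sketch, where `ExpensiveParser` and `Event` are hypothetical:

```ruby
class EventsConsumer < ApplicationConsumer
  def consume
    messages.each do |message|
      # The same parser object serves every batch of this partition
      Event.insert(parser.parse(message.raw_payload))
    end
  end

  private

  # Built once per consumer instance, not once per batch
  def parser
    @parser ||= ExpensiveParser.new # hypothetical class
  end
end
```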

!!! note

    When buffering messages in memory, use manual offset management. Without it, you'll lose buffered data if the process crashes before flushing.


For example, here's a consumer that buffers messages until it reaches 1,000 of them before flushing:

```ruby
# A consumer that will buffer messages in memory until it reaches 1000 of them. Then it will flush
# them with a single insert and mark the last buffered message as consumed
class EventsConsumer < ApplicationConsumer
  FLUSH_SIZE = 1_000

  def consume
    messages.each { |message| buffer << message }

    return if buffer.size < FLUSH_SIZE

    data = buffer.shift(FLUSH_SIZE)
    Event.insert_all(data.map(&:payload))
    # Manual offset management: commit only what was actually flushed
    mark_as_consumed(data.last)
  end

  private

  def buffer
    @buffer ||= []
  end
end
```
6 changes: 6 additions & 0 deletions Development/Technical-Writing.md
# Best Practices

## Document structure and sources

1. H1 and H2 headers in the TOC (Wiki) are rendered from the Home.md file. Edit them directly in the Home.md file. Renaming the titles in the navigation pane in the VS Code editor does nothing.
2. Test step
1 change: 1 addition & 0 deletions Home.md
It is recommended to do one major upgrade at a time.
- [LLM Documentation Guidelines](Development-LLM-Documentation-Guidelines)
- [Librdkafka Update Release Policy](Development-Librdkafka-Update-Release-Policy)
- [Karafka Integration Tests Catalog](Development-Karafka-Integration-Tests-Catalog)
- [Technical Writing](Development-Technical-Writing)