76 changes: 57 additions & 19 deletions Consuming-Messages.md
@@ -1,22 +1,33 @@
Consumers should inherit from the **ApplicationConsumer**. You need to define a ```#consume``` method that will execute your business logic code against a batch of messages.
Karafka framework has a long-running server process responsible for fetching and consuming messages. Consumers should inherit from the **ApplicationConsumer**. You need to define a ```#consume``` method that will execute your business logic code against a batch of messages. Karafka fetches and consumes messages in batches by default.

Karafka fetches and consumes messages in batches by default.
### Consuming Messages in Batches
> **Collaborator (author):** my mistake here, it will be changed to H2

## Consuming Messages

Karafka framework has a long-running server process responsible for fetching and consuming messages.
Data fetched from Kafka is accessible using the `#messages` method. The returned object is an enumerable containing received data and additional information that can be useful during the processing.

To start the Karafka server process, use the following CLI command:
1. To start the Karafka server process, use the following CLI command:

```shell
bundle exec karafka server
```
2. To access the message batch, use the `#messages` method:
```ruby
class EventsConsumer < ApplicationConsumer
  def consume
    # Access the batch via messages method
    batch = messages
  end
end
```

### In Batches
3. Select one of two processing approaches based on your use case:

Data fetched from Kafka is accessible using the `#messages` method. The returned object is an enumerable containing received data and additional information that can be useful during the processing.
- Process each message one by one
- Process all payloads together to leverage batch database operations provided by many ORMs

4. Access message payloads.

To access the payload of your messages, you can use the `#payload` method available for each received message:
For individual message iteration, use the `#payload` method available for each received message:

```ruby
class EventsConsumer < ApplicationConsumer
@@ -28,8 +39,8 @@ class EventsConsumer < ApplicationConsumer
  end
end
```
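
The diff collapses the body of this block. A minimal runnable sketch of the per-message iteration it describes (the `puts` call is illustrative, not from the original) could look like:

```ruby
class EventsConsumer < ApplicationConsumer
  def consume
    # Iterate over the batch and work with each deserialized payload
    messages.each do |message|
      puts message.payload
    end
  end
end
```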

You can also access all the payloads together to elevate things like batch DB operations available for some of the ORMs:
For bulk operations, use the `#payloads` method to access all payloads at once:

```ruby
class EventsConsumer < ApplicationConsumer
@@ -40,11 +51,16 @@ class EventsConsumer < ApplicationConsumer
end
```
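
The diff hides the body here as well. A hedged sketch of a bulk operation built on `#payloads` (the `Event` model and its `insert_all` call are assumptions for illustration, e.g. an ActiveRecord model):

```ruby
class EventsConsumer < ApplicationConsumer
  def consume
    # Hand all deserialized payloads to a single batch DB operation
    # (Event.insert_all is hypothetical and depends on your ORM)
    Event.insert_all messages.payloads
  end
end
```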

### One At a Time
### Consuming Messages One At a Time

While we encourage you to process data in batches to elevate in-memory computation and many DBs batch APIs, you may want to process messages one at a time.
While batch processing is recommended to leverage in-memory computation and batch database operations provided by many ORMs, you may need to process messages individually for certain use cases.

You can achieve this by defining a base consumer with such a capability:
1. To start the Karafka server process, use the following CLI command:

```shell
bundle exec karafka server
```
2. Define a reusable base consumer that handles the single-message iteration pattern:

```ruby
class SingleMessageBaseConsumer < Karafka::BaseConsumer
@@ -66,10 +82,17 @@ class Consumer < SingleMessageBaseConsumer
  end
end
```
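
The collapsed hunk hides most of the base consumer. One possible sketch of the pattern this step describes (the `attr_reader` and the `mark_as_consumed` call are assumptions about the hidden body):

```ruby
class SingleMessageBaseConsumer < Karafka::BaseConsumer
  attr_reader :message

  def consume
    messages.each do |message|
      @message = message
      # Delegate the actual work to the subclass hook
      consume_one
      # Mark each message as consumed once it has been processed
      mark_as_consumed(message)
    end
  end
end

class Consumer < SingleMessageBaseConsumer
  def consume_one
    puts "Received message: #{message.payload}"
  end
end
```
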
**Result:** The `#consume_one` method will be called for each message in the batch, allowing you to process messages individually while maintaining the benefits of Karafka's batch fetching.

### Accessing Topic Details
> **Collaborator (author):** Currently, in Wiki it is H3 displayed under the Consuming Messages topic. Is it a subsection? Or can I move it/treat it as H2 in the TOC? I understood it is a separate procedure from Consuming Messages (in batches or individually) per se.

If, in any case, your logic is dependent on some routing details, you can access them from the consumer using the ```#topic``` method. You could use it, for example, in case you want to perform a different logic within a single consumer based on the topic from which your messages come:
If your logic depends on specific routing details, you can access them from the consumer using the ```#topic``` method.

!!! example "Use Case"

    You could use it, for example, when you want to perform different logic within a single consumer based on the topic from which your messages come.

1. To access the topic details, call the ```#topic``` method within the ```#consume``` method:

```ruby
class UsersConsumer < ApplicationConsumer
@@ -87,7 +110,7 @@ class UsersConsumer < ApplicationConsumer
end
```
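
Since the hunk collapses the method body, here is a hedged sketch of topic-based branching (the topic names and handler methods are illustrative):

```ruby
class UsersConsumer < ApplicationConsumer
  def consume
    # Branch on the topic name; the handler methods are hypothetical
    case topic.name
    when 'users_created' then create_users
    when 'users_deleted' then delete_users
    end
  end
end
```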

If you're interested in all the details that are stored in the topic, you can extract all of them at once, by using the ```#to_h``` method:
2. To extract all the details that are stored in the topic at once, use the ```#to_h``` method:

```ruby
class UsersConsumer < ApplicationConsumer
@@ -97,9 +120,16 @@ class UsersConsumer < ApplicationConsumer
end
```
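
A hedged sketch of inspecting the full topic configuration (the printed hash contents are illustrative and depend on your setup):

```ruby
class UsersConsumer < ApplicationConsumer
  def consume
    # Dumps the whole routing configuration of this topic as a Hash,
    # e.g. { name: 'users', ... } (exact keys depend on your routing)
    puts topic.to_h
  end
end
```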

## Consuming From Earliest or Latest Offset
## Setting Initial Offset Position

By default, Karafka starts consuming messages from the earliest available offset. Use this procedure to configure the initial offset position for your consumers.

Karafka, by default, will start consuming messages from the earliest it can reach. You can, however configure it to start consuming from the latest message by setting the `initial_offset` value as a default:
**To configure the initial offset globally:**

1. Open your Karafka application configuration file.
2. Set the `initial_offset` value in the setup block.

To start from the earliest offset (default behavior):

```ruby
# This will start from the earliest (default)
@@ -108,16 +138,23 @@ class KarafkaApp < Karafka::App
    config.initial_offset = 'earliest'
  end
end
```

To start from the latest offset:
```ruby
# This will make Karafka start consuming from the latest message on a given topic
class KarafkaApp < Karafka::App
  setup do |config|
    config.initial_offset = 'latest'
  end
end
```
**Result:** All topics will use this offset position as the default.

**To configure the initial offset for specific topics:**

or on a per-topic basis:
1. Open your Karafka routing configuration.
2. Add the `initial_offset` setting to individual topic definitions:

```ruby
class KarafkaApp < Karafka::App
@@ -136,6 +173,7 @@ class KarafkaApp < Karafka::App
end
end
```
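
The routing block is collapsed in the diff; a sketch of what a per-topic override might look like (the topic name and consumer are illustrative):

```ruby
class KarafkaApp < Karafka::App
  setup do |config|
    # Global default, used unless a topic overrides it
    config.initial_offset = 'earliest'
  end

  routes.draw do
    topic :users do
      consumer UsersConsumer
      # This topic alone starts from the latest message
      initial_offset 'latest'
    end
  end
end
```
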
**Result:** Each topic will use its configured offset position, overriding the global default.

!!! note

6 changes: 6 additions & 0 deletions Development/Technical-Writing.md
@@ -0,0 +1,6 @@
# Best Practices

## Documents structure and sources

1. H1 and H2 headers in the TOC (Wiki) are rendered from the Home.md file. Edit them directly in the Home.md file. Renaming the titles in the navigation pane of the VS Code editor has no effect.
2. Test step
1 change: 1 addition & 0 deletions Home.md
@@ -247,3 +247,4 @@ It is recommended to do one major upgrade at a time.
- [LLM Documentation Guidelines](Development-LLM-Documentation-Guidelines)
- [Librdkafka Update Release Policy](Development-Librdkafka-Update-Release-Policy)
- [Karafka Integration Tests Catalog](Development-Karafka-Integration-Tests-Catalog)
- [Technical Writing](Development-Technical-Writing)