Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 75 additions & 1 deletion docs/guide/messaging/transports/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,81 @@ using var host = await Host.CreateDefaultBuilder()
The various `Configure*****()` methods provide quick access to the full API of the Confluent Kafka library for security
and fine tuning the Kafka topic behavior.

## Publishing by Partition Key
## Listener Consumer Settings <Badge type="tip" text="5.16" />

When building a Kafka listener, Wolverine configures the underlying Confluent Kafka `ConsumerConfig` differently
depending on whether the listener endpoint is **durable** (backed by the transactional inbox) and how the listener
processes messages. Understanding these settings is important for getting the delivery guarantees you need.

### How Endpoint Mode Affects Consumer Configuration

When an endpoint uses `EndpointMode.Durable` (i.e., you've called `.UseDurableInbox()` or applied durable inbox
globally), Wolverine overrides two key consumer settings before building the listener:

| Consumer Setting | Durable (`UseDurableInbox`) | Non-Durable (`BufferedInMemory` / `Inline`) |
|---|---|---|
| `EnableAutoCommit` | `false` | `true` (Kafka default) |
| `EnableAutoOffsetStore` | `false` | `true` (Kafka default) |

In **durable mode**, Wolverine disables Kafka's automatic offset management so that offsets are only committed
after a message has been successfully processed and persisted to the transactional inbox. This prevents message loss
if the application shuts down unexpectedly -- unprocessed messages will be re-delivered when the consumer rejoins
the group.

In **non-durable mode** (`BufferedInMemory` or `ProcessInline`), Kafka's default auto-commit behavior is left
in place. The Kafka client library periodically commits offsets automatically, which provides higher throughput
at the cost of potential message loss during an ungraceful shutdown.

### Offset Commit Behavior in the Listener

Regardless of endpoint mode, the `KafkaListener` calls `_consumer.Commit()` in these situations:

- **On successful processing** -- `CompleteAsync()` explicitly commits the consumer offset after a message
finishes processing. In durable mode this is the *only* path that advances the offset.
- **On poison pill messages** -- If an incoming Kafka message cannot be deserialized into a Wolverine envelope
at all (a true poison pill), the listener commits the offset to skip past the bad message and avoid blocking
the consumer.
- **On dead letter queue routing** -- When a message exhausts all retries and is moved to the native dead letter
queue topic, the offset is committed after the DLQ produce succeeds.

### Recommended Configuration by Use Case

**At-least-once delivery** (recommended for most use cases):

```csharp
opts.ListenToKafkaTopic("orders")
.UseDurableInbox();
```

This ensures messages are persisted to the inbox before the offset is committed. If your process crashes, the
message will be re-delivered by Kafka and de-duplicated by Wolverine's inbox.

**Higher throughput, at-most-once delivery**:

```csharp
opts.ListenToKafkaTopic("telemetry")
.BufferedInMemory();
```

With auto-commit enabled, offsets may be committed before processing completes. This is suitable for
high-volume, loss-tolerant workloads like telemetry or logging.

**Inline processing with manual consumer tuning**:

```csharp
opts.ListenToKafkaTopic("events")
.ProcessInline()
.ConfigureConsumer(config =>
{
config.EnableAutoCommit = false;
config.AutoOffsetReset = AutoOffsetReset.Earliest;
});
```

You can always override any consumer setting per-topic using `ConfigureConsumer()`. Note that this
**completely replaces** the parent-level consumer configuration -- it is not combinatorial.

## Publishing by Partition Key

To publish messages with Kafka using a designated [partition key](https://developer.confluent.io/courses/apache-kafka/partitions/), use the
`DeliveryOptions` to designate a partition like so:
Expand Down
Loading