Fix global partitioning interceptor not intercepting Kafka messages #2362

Merged
jeremydmiller merged 1 commit into main from kafka-global-partitioning on Mar 27, 2026

@jeremydmiller

Summary

  • Root cause: GlobalPartitionedInterceptor checked envelope.Message != null to decide whether to intercept, but Kafka envelopes arrive with only raw envelope.Data bytes — Message is always null at the receiver level because deserialization happens later in the handler pipeline. This meant the interceptor never fired for Kafka messages.
  • Impact: In multi-node setups with global partitioning + Kafka, messages bypassed the companion local queues and were processed directly on whichever node received them. This caused concurrent access to the same Marten event stream across nodes, resulting in EventStreamUnexpectedMaxEventIdException.
  • Fix: The interceptor now falls back to checking envelope.MessageType (the string type name from Kafka headers) when envelope.Message is null, deserializes via Pipeline.TryDeserializeEnvelope() before re-publishing, and GlobalPartitionedMessageTopology maintains a HashSet<string> of message type names for fast string-based matching.
  • Test: Added a multi-node reproducing test (global_partitioned_aggregate_concurrency) that runs 2 Wolverine hosts with a separate publisher, confirming 0 concurrent access violations (there were 17 before the fix).
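
The fallback described in the Fix bullet can be modeled in a few lines. This is a language-neutral sketch in Python, not Wolverine's C# code: `Envelope`, `should_intercept`, and the sample message types are hypothetical stand-ins, and the real interceptor additionally deserializes the payload before re-publishing.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Envelope:
    """Hypothetical stand-in for a transport envelope."""
    data: bytes                         # raw payload bytes
    message_type: Optional[str] = None  # type name carried in transport headers
    message: Optional[object] = None    # deserialized body; None until later


def should_intercept(envelope, partitioned_types, partitioned_type_names):
    # Pre-fix behaviour: only this branch existed, so an envelope that had
    # not been deserialized yet was never intercepted.
    if envelope.message is not None:
        return type(envelope.message) in partitioned_types
    # Fallback: match on the type-name string from the headers.
    return envelope.message_type in partitioned_type_names


class AppendEvent:
    """Hypothetical message type under global partitioning."""


types = {AppendEvent}
names = {"AppendEvent"}

kafka_style = Envelope(data=b"{}", message_type="AppendEvent")
in_memory = Envelope(data=b"", message=AppendEvent())
unrelated = Envelope(data=b"{}", message_type="SomethingElse")

print(should_intercept(kafka_style, types, names))  # True
print(should_intercept(in_memory, types, names))    # True
print(should_intercept(unrelated, types, names))    # False
```

With only the first branch, `kafka_style` would fall through and be handled on whichever node received it, which is the bypass described under Impact.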

Test plan

  • New multi-node concurrency test passes (0 concurrent access violations)
  • All 6 existing global partitioning Kafka tests pass (no regressions)
  • All 98 other Kafka tests pass (1 pre-existing flaky batch test excluded)
  • All 1196 CoreTests pass

🤖 Generated with Claude Code

The interceptor checked envelope.Message != null to decide whether to
intercept, but for Kafka (and other transports that defer deserialization),
envelope.Message is always null at the receiver level - only raw
envelope.Data bytes and envelope.MessageType metadata are set. This meant
the interceptor never fired for Kafka messages, so messages bypassed
global partitioning and were processed directly on whichever node
received them, causing EventStreamUnexpectedMaxEventIdException from
concurrent access to the same Marten event stream.

The fix:
- GlobalPartitionedInterceptor now falls back to checking
  envelope.MessageType (string from headers) when envelope.Message is
  null, and deserializes via Pipeline.TryDeserializeEnvelope() before
  re-publishing
- GlobalPartitionedMessageTopology tracks message type name strings for
  fast string-based matching, populated both eagerly (Message<T>()) and
  at startup (for MessagesImplementing<T>() and other scope-based
  subscriptions)
- Added multi-node reproducing test that runs 2 Wolverine hosts with a
  separate publisher, confirming 0 concurrent access violations
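
As a rough illustration of the two population paths for the type-name set (eager for explicit types, deferred to startup for scope-based rules), here is a small Python model. `PartitionedTypeRegistry` and every method name on it are hypothetical, and using the bare class name as the lookup key is an assumption; this is not the GlobalPartitionedMessageTopology implementation.

```python
class PartitionedTypeRegistry:
    """Hypothetical model of a topology's set of message type names."""

    def __init__(self):
        self._names = set()
        self._deferred_rules = []  # scope-based rules, resolved at startup

    def message(self, message_type):
        # Eager path: the concrete type is known at configuration time.
        self._names.add(message_type.__name__)

    def messages_implementing(self, base_type):
        # Deferred path: the matching concrete types are not known yet.
        self._deferred_rules.append(lambda t: issubclass(t, base_type))

    def resolve_at_startup(self, known_types):
        # Expand every deferred rule against the discovered message types.
        for t in known_types:
            if any(rule(t) for rule in self._deferred_rules):
                self._names.add(t.__name__)

    def matches(self, type_name):
        # String lookup against the set; no deserialized message needed.
        return type_name in self._names


class OrderEvent: ...
class OrderPlaced(OrderEvent): ...
class Ping: ...
class Audit: ...


registry = PartitionedTypeRegistry()
registry.message(Ping)
registry.messages_implementing(OrderEvent)

matched_before_startup = registry.matches("OrderPlaced")
registry.resolve_at_startup([OrderPlaced, Ping, Audit])
matched_after_startup = registry.matches("OrderPlaced")
```

In this model a scope-based rule contributes nothing to string matching until the startup pass has run, which is why both population paths are needed.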

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>