Make Kafka AutoProvision actually create missing topics (GH-2537)#2540
Merged
jeremydmiller merged 1 commit intomainfrom Apr 20, 2026
Merged
Conversation
AutoProvision() is documented as "Create newly used Kafka topics on endpoint activation if the topic is missing", but on its own it had no effect: nothing in the Kafka transport ever read the flag at startup. The bug was invisible unless users also called AddResourceSetupOnStartup (which runs SetupAsync for every broker unconditionally), and users reported hitting "Subscribed topic not available" from the KafkaTopicGroupListener consumer on fresh brokers. Root cause: BrokerTransport.InitializeAsync iterates endpoints and calls endpoint.InitializeAsync(logger), but neither KafkaTopic nor KafkaTopicGroup overrode that hook. Both classes already had a correct SetupAsync that creates topics (and swallows "already exists"), so the fix is just to wire Parent.AutoProvision into InitializeAsync — the same pattern RabbitMqQueue.InitializeAsync uses. KafkaTopicGroup needs its own override (not just inheritance) because it shadows SetupAsync with a multi-topic version — the base KafkaTopic.SetupAsync would otherwise create a single topic with the group's sanitized composite name. Reproducer test covers the user's reported path (ListenToKafkaTopics group listener) and is careful not to let confluent-local's default auto.create.topics.enable=true mask the bug: it uses full-cluster broker metadata (not per-topic queries, which trigger auto-creation) and never produces to the topic. Full Kafka test suite: 137 / 137 pass on net9.0. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This was referenced Apr 21, 2026
Closed
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
AutoProvision()on the Kafka transport is documented as "Create newly used Kafka topics on endpoint activation if the topic is missing", but on its own it had no effect — nothing in the Kafka transport ever read the flag during startup. The only workaround today is to additionally callAddResourceSetupOnStartup(), which runs for every broker and defeats per-transport granularity.KafkaTopicandKafkaTopicGroupnow each overrideInitializeAsyncand call their existingSetupAsyncwhenParent.AutoProvisionis true — the same wiringRabbitMqQueue.InitializeAsyncalready uses.Root cause
BrokerTransport.InitializeAsynciterates endpoints and callsendpoint.InitializeAsync(logger), but neither Kafka endpoint class overrode that hook, so the default no-op ran and the flag was never consulted. Both classes already had correctSetupAsyncmethods (create topic, swallow "already exists") — they just weren't being invoked on the AutoProvision path.KafkaTopicGroupshadowsSetupAsyncwith a multi-topic version vianew, so it needs its own override — otherwise the baseKafkaTopic.SetupAsyncwould create a single topic named after the sanitized composite group name instead of each topic in the group.Reproducer
Test:
src/Transports/Kafka/Wolverine.Kafka.Tests/Bugs/Bug_2537_autoprovision_creates_missing_topics.csTargets the user's reported path (
ListenToKafkaTopics(…), group listener). Two deliberate design choices to avoid Confluent-local's defaults masking the bug:confluent-local:7.7.1hasauto.create.topics.enable=true, so any producer publish would silently create the topic. The test never produces — it only asserts on broker metadata.admin.GetMetadata(topicName, timeout)also triggers broker auto-create on a per-topic query. The test usesadmin.GetMetadata(timeout)(full-cluster snapshot) instead.Without the fix, the test fails with the exact error the user reported:
Test plan
dotnet test src/Transports/Kafka/Wolverine.Kafka.Tests --framework net9.0→ 137 / 137 pass (2 pre-existing skips), 8m 36s locally againstconfluentinc/confluent-local:7.7.1🤖 Generated with Claude Code