-
Notifications
You must be signed in to change notification settings - Fork 41.6k
Add auto-configuation for kafka-streams #14021
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @garyrussell - We should probably add dependency management for Kafka streams as part of this change.
I've added a few comments, let me know what you think.
| public static class KafkaStreamsConfiguration { | ||
|
|
||
| @Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) | ||
| public Properties defaultKafkaStreamsConfig(KafkaProperties properties) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not keen to expose a @Bean of type Properties, perhaps we could have a dedicated object for this or does the underlying configuration actually require this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kafka deprecated the use of an immutable StreamsConfig in favor of a Properties object to configure the StreamsBuilder. I can add a wrapper class in spring-kafka if you prefer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes please, Properties is way to generic to be exposed as a bean. Ideally this should be done in spring-kafka itself as I imagine this would be larger in scope.
| } | ||
|
|
||
| @Bean | ||
| public Object kafkaStreamsFactoryBeanConfigurer( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Object ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used a NullBean (return null) to avoid creating an explicit ...Configurer class just for this one property; I can do that if you prefer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure I understand what that means but I'd rather have something that backs off with a property type if we should do something.
| import org.apache.kafka.common.config.SslConfigs; | ||
| import org.apache.kafka.common.serialization.StringDeserializer; | ||
| import org.apache.kafka.common.serialization.StringSerializer; | ||
| import org.apache.kafka.streams.StreamsConfig; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That requires Kafka streams to be on the classpath and, as I understand, this is an optional dependency on top of Kafka
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More over according some previous comment this class is deprecated there.
| /** | ||
| * Kafka streams application.id property; default spring.application.name. | ||
| */ | ||
| private String applicationId = "${spring.application.name}"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is that supposed to be SpEL? I'd prefer if we perform that detection elsewhere (in the caller with access to the Environment).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used a property placeholder which gets resolved as expected in the test case; I can move it to the caller.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't do property placeholder in properties. Yes please.
| private String applicationId = "${spring.application.name}"; | ||
|
|
||
| /** | ||
| * Whether or not to auto-start the streams factory bean; default false. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't add default values to descriptions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since autoStartup is usually true by default; I thought it was important to add it here. The reason it's false is I don't want to start the factory bean for an existing Boot app that uses streams but doesn't use the auto configured infrastructure (start connects to Kafka).
I did mention it in the docs, however, so I can remove it here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The point I was trying to make is that the metadata will already state the default is false (as inferred by the default value of the field). So your IDE will tell you it's false and having that in text is a duplicate that could potentially get out of sync.
Does that make sense?
| * Comma-delimited list of host:port pairs to use for establishing the initial | ||
| * connection to the Kafka cluster. | ||
| */ | ||
| private List<String> bootstrapServers; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am confused. Is that streams specific? Yet this property doesn't indicate that's the case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is similar to producer/consumer - we can add a global property and/or override at the component level - I will enhance these docs in all 3 places.
| private String clientId; | ||
|
|
||
| /** | ||
| * The replication factor for change log topics and repartition topics created by |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Key descriptions do not start with "The", "A", etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK; I simply copied the Kafka descriptions 😄
|
|
||
| Spring for Apache Kafka provides a factory bean to create a `StreamsBuilder` object and | ||
| manage the lifecycle of its streams; the factory bean is created when | ||
| `@EnableKafkaStreams` is present on a `@Configuration` class. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can see the auto-configuration enables that so something is not consistent. Perhaps the auto-configuration should check for the presence of @EnableKafkaStreams if that makes sense? (We don't do that for Kafka so perhaps that's the wrong idea).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My mistake; I originally did not add @EnableKafkaStreams in the auto config because of the problem with auto-starting the factory bean.
I later added the autoStartup auto-config to prevent the FB from starting unless the user explicitly sets it, but failed to revisit the docs.
Which do you prefer - requiring a user-specified @EnableKafkaStreams or this autoStartup dance? I think I would prefer that the user adds the annotation, but that seems inconsistent with other practices.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know really. I'd like this to be consistent. On the other hand, I don't know if adding kafka-streams is enough to trigger that behaviour. Since adding spring-kafka is enough for the kafka use case, I'd be tempted to say that's the same here (you need spring-kafka and kafka-streams anyway).
Any potential clash with Spring Cloud Stream and the kafka streams story there?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll stick with the presence of the jar for consistency (and fix the configurer).
I will reach out to the SCSt team to make them aware of this.
|
@snicoll Thanks for reviewing;
Is something else needed? I will address your other comments shortly; thanks. |
Wow I totally missed that. It means we can remove our hard-coded version in initializr, sweet! Thanks for the feedback, let me know if you need help with any of this. |
See spring-projects/spring-boot#14021 (comment) Avoid using a bean of type `Properties`.
|
I pushed a PR for spring-kafka to avoid the Thanks. |
See spring-projects/spring-boot#14021 (comment) Avoid using a bean of type `Properties`. * Polishing - PR Comments
If `kafka-streams` is on the classpath, auto configure Kafka Streams `Properties`, set `@EnableKafkaStreams` and disable the factory bean's `autoStartup` property.
`toString()` rendered the list of servers within [...]. Remove the brackets since streams expects a comma-delimited list.
775eb8f to
30d330d
Compare
|
@snicoll I addressed your concerns. Please note that I am on PTO this week; if this needs more polishing, please reach out to @artembilan ; I will make him a collaborator on my fork so he can push to this PR. Thanks, Gary |
|
@garyrussell @artembilan I've changed my mind looking at the current proposal, please see Given that Now if you enable auto-startup, you have an exception deep down that tells you application.id is not set. The link to Spring Boot is not super obvious. I am not a huge fan of overriding the default value of I've modified the initial proposal so that it requires Concretely, nothing happens until What do you think about this proposal? Does that make sense? Thanks! |
|
@snicoll LGTM - that was my original approach (user opt-in with Since I thought that was inconsistent with other auto-config (adding I definitely prefer this solution, though. Thanks! |
|
Sweet, thanks for the feedback @garyrussell! |
* pr/14021: Polish "Add Kafka Streams auto-configuration" Add Kafka Streams auto-configuration
See spring-projects/spring-boot#14021 (comment) Avoid using a bean of type `Properties`. * Polishing - PR Comments
If
kafka-streamsis on the classpath, auto configure Kafka StreamsProperties, set@EnableKafkaStreamsand disable the factorybean's
autoStartupproperty.