@@ -57,7 +57,7 @@ public class KafkaProperties {
5757
5858 /**
5959 * Comma-delimited list of host:port pairs to use for establishing the initial
60- * connection to the Kafka cluster.
60+ * connection to the Kafka cluster. Applies to all components unless overridden.
6161 */
6262 private List <String > bootstrapServers = new ArrayList <>(
6363 Collections .singletonList ("localhost:9092" ));
@@ -79,6 +79,8 @@ public class KafkaProperties {
7979
8080 private final Admin admin = new Admin ();
8181
82+ private final Streams streams = new Streams ();
83+
8284 private final Listener listener = new Listener ();
8385
8486 private final Ssl ssl = new Ssl ();
@@ -123,6 +125,10 @@ public Admin getAdmin() {
123125 return this .admin ;
124126 }
125127
128+ public Streams getStreams () {
129+ return this .streams ;
130+ }
131+
126132 public Ssl getSsl () {
127133 return this .ssl ;
128134 }
@@ -193,6 +199,19 @@ public Map<String, Object> buildAdminProperties() {
193199 return properties ;
194200 }
195201
202+ /**
203+ * Create an initial map of streams properties from the state of this instance.
204+ * <p>
205+ * This allows you to add additional properties, if necessary.
206+ * @return the streams properties initialized with the customizations defined on this
207+ * instance
208+ */
209+ public Map <String , Object > buildStreamsProperties () {
210+ Map <String , Object > properties = buildCommonProperties ();
211+ properties .putAll (this .streams .buildProperties ());
212+ return properties ;
213+ }
214+
196215 public static class Consumer {
197216
198217 private final Ssl ssl = new Ssl ();
@@ -211,7 +230,7 @@ public static class Consumer {
211230
212231 /**
213232 * Comma-delimited list of host:port pairs to use for establishing the initial
214- * connection to the Kafka cluster.
233+ * connection to the Kafka cluster. Overrides the global property, for consumers.
215234 */
216235 private List <String > bootstrapServers ;
217236
@@ -421,7 +440,7 @@ public static class Producer {
421440
422441 /**
423442 * Comma-delimited list of host:port pairs to use for establishing the initial
424- * connection to the Kafka cluster.
443+ * connection to the Kafka cluster. Overrides the global property, for producers.
425444 */
426445 private List <String > bootstrapServers ;
427446
@@ -631,6 +650,136 @@ public Map<String, Object> buildProperties() {
631650
632651 }
633652
653+ /**
654+ * High (and some medium) priority Streams properties and a general properties bucket.
655+ */
656+ public static class Streams {
657+
658+ private final Ssl ssl = new Ssl ();
659+
660+ /**
661+ * Kafka streams application.id property; default spring.application.name.
662+ */
663+ private String applicationId ;
664+
665+ /**
666+ * Whether or not to auto-start the streams factory bean.
667+ */
668+ private boolean autoStartup ;
669+
670+ /**
671+ * Comma-delimited list of host:port pairs to use for establishing the initial
672+ * connection to the Kafka cluster. Overrides the global property, for streams.
673+ */
674+ private List <String > bootstrapServers ;
675+
676+ /**
677+ * Maximum number of memory bytes to be used for buffering across all threads.
678+ */
679+ private Integer cacheMaxBytesBuffering ;
680+
681+ /**
682+ * ID to pass to the server when making requests. Used for server-side logging.
683+ */
684+ private String clientId ;
685+
686+ /**
687+ * The replication factor for change log topics and repartition topics created by
688+ * the stream processing application.
689+ */
690+ private Integer replicationFactor ;
691+
692+ /**
693+ * Directory location for the state store.
694+ */
695+ private String stateDir ;
696+
697+ /**
698+ * Additional Kafka properties used to configure the streams.
699+ */
700+ private final Map <String , String > properties = new HashMap <>();
701+
702+ public Ssl getSsl () {
703+ return this .ssl ;
704+ }
705+
706+ public String getApplicationId () {
707+ return this .applicationId ;
708+ }
709+
710+ public void setApplicationId (String applicationId ) {
711+ this .applicationId = applicationId ;
712+ }
713+
714+ public boolean isAutoStartup () {
715+ return this .autoStartup ;
716+ }
717+
718+ public void setAutoStartup (boolean autoStartup ) {
719+ this .autoStartup = autoStartup ;
720+ }
721+
722+ public List <String > getBootstrapServers () {
723+ return this .bootstrapServers ;
724+ }
725+
726+ public void setBootstrapServers (List <String > bootstrapServers ) {
727+ this .bootstrapServers = bootstrapServers ;
728+ }
729+
730+ public Integer getCacheMaxBytesBuffering () {
731+ return this .cacheMaxBytesBuffering ;
732+ }
733+
734+ public void setCacheMaxBytesBuffering (Integer cacheMaxBytesBuffering ) {
735+ this .cacheMaxBytesBuffering = cacheMaxBytesBuffering ;
736+ }
737+
738+ public String getClientId () {
739+ return this .clientId ;
740+ }
741+
742+ public void setClientId (String clientId ) {
743+ this .clientId = clientId ;
744+ }
745+
746+ public Integer getReplicationFactor () {
747+ return this .replicationFactor ;
748+ }
749+
750+ public void setReplicationFactor (Integer replicationFactor ) {
751+ this .replicationFactor = replicationFactor ;
752+ }
753+
754+ public String getStateDir () {
755+ return this .stateDir ;
756+ }
757+
758+ public void setStateDir (String stateDir ) {
759+ this .stateDir = stateDir ;
760+ }
761+
762+ public Map <String , String > getProperties () {
763+ return this .properties ;
764+ }
765+
766+ public Map <String , Object > buildProperties () {
767+ Properties properties = new Properties ();
768+ PropertyMapper map = PropertyMapper .get ().alwaysApplyingWhenNonNull ();
769+ map .from (this ::getApplicationId ).to (properties .in ("application.id" ));
770+ map .from (this ::getBootstrapServers )
771+ .to (properties .in (CommonClientConfigs .BOOTSTRAP_SERVERS_CONFIG ));
772+ map .from (this ::getCacheMaxBytesBuffering )
773+ .to (properties .in ("cache.max.bytes.buffering" ));
774+ map .from (this ::getClientId )
775+ .to (properties .in (CommonClientConfigs .CLIENT_ID_CONFIG ));
776+ map .from (this ::getReplicationFactor ).to (properties .in ("replication.factor" ));
777+ map .from (this ::getStateDir ).to (properties .in ("state.dir" ));
778+ return properties .with (this .ssl , this .properties );
779+ }
780+
781+ }
782+
634783 public static class Template {
635784
636785 /**
@@ -1011,6 +1160,7 @@ public void setOptions(Map<String, String> options) {
10111160
10121161 }
10131162
1163+ @ SuppressWarnings ("serial" )
10141164 private static class Properties extends HashMap <String , Object > {
10151165
10161166 public <V > java .util .function .Consumer <V > in (String key ) {
0 commit comments