Skip to content

Commit d07cb95

Browse files
authored
Fix default config (#16)
1 parent bd8b1e6 commit d07cb95

File tree

1 file changed

+7
-3
lines changed
  • kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework

1 file changed

+7
-3
lines changed

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG;
55
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
66
import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG;
7+
import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
78
import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
89
import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
910
import static org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG;
@@ -12,6 +13,7 @@
1213
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG;
1314
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG;
1415
import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
16+
import static org.apache.kafka.streams.StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG;
1517
import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION;
1618
import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
1719
import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
@@ -35,6 +37,7 @@
3537
import org.apache.kafka.streams.kstream.KStream;
3638
import org.hypertrace.core.kafkastreams.framework.listeners.LoggingStateListener;
3739
import org.hypertrace.core.kafkastreams.framework.listeners.LoggingStateRestoreListener;
40+
import org.hypertrace.core.kafkastreams.framework.rocksdb.RocksDBStateStoreConfigSetter;
3841
import org.hypertrace.core.kafkastreams.framework.timestampextractors.UseWallclockTimeOnInvalidTimestamp;
3942
import org.hypertrace.core.kafkastreams.framework.topics.creator.KafkaTopicCreator;
4043
import org.hypertrace.core.kafkastreams.framework.util.ExceptionUtils;
@@ -141,20 +144,21 @@ public Map<String, Object> getBaseStreamsConfig() {
141144
.put(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, UseWallclockTimeOnInvalidTimestamp.class);
142145
baseStreamsConfig.put(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
143146
LogAndContinueExceptionHandler.class);
147+
baseStreamsConfig.put(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBStateStoreConfigSetter.class);
144148

145149
// Default serde configurations
146150
baseStreamsConfig.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
147151
baseStreamsConfig.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
148152

149153
// Default producer configurations
150-
baseStreamsConfig.put(producerPrefix(LINGER_MS_CONFIG), "5000");
151-
baseStreamsConfig.put(producerPrefix(MAX_REQUEST_SIZE_CONFIG), "1048576");
154+
baseStreamsConfig.put(producerPrefix(LINGER_MS_CONFIG), "2000");
155+
baseStreamsConfig.put(producerPrefix(BATCH_SIZE_CONFIG), "2097152");
152156
baseStreamsConfig.put(producerPrefix(COMPRESSION_TYPE_CONFIG), CompressionType.GZIP.name);
157+
baseStreamsConfig.put(producerPrefix(MAX_REQUEST_SIZE_CONFIG), "10485760");
153158

154159
// Default consumer configurations
155160
baseStreamsConfig.put(consumerPrefix(MAX_POLL_RECORDS_CONFIG), "1000");
156161
baseStreamsConfig.put(consumerPrefix(AUTO_OFFSET_RESET_CONFIG), "latest");
157-
baseStreamsConfig.put(consumerPrefix(AUTO_COMMIT_INTERVAL_MS_CONFIG), "5000");
158162

159163
return baseStreamsConfig;
160164
}

0 commit comments

Comments
 (0)