Skip to content

Commit f2df89f

Browse files
committed
Add support for max.request.size and batch.size configuration Kafka Event Listener
1 parent cda78c6 commit f2df89f

File tree

4 files changed

+48
-1
lines changed

4 files changed

+48
-1
lines changed

docs/src/main/sphinx/admin/event-listeners-kafka.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,14 @@ Use the following properties for further configuration.
9696
distinction in Kafka, if multiple Trino clusters send events to the same
9797
Kafka system.
9898
-
99+
* - `kafka-event-listener.max-request-size`
100+
- [Size value](prop-type-data-size) that specifies the maximum size
101+
of a request that the Kafka producer will send. Useful to control the size of log events being sent to Kafka.
102+
- `5MB`
103+
* - `kafka-event-listener.batch-size`
104+
- [Size value](prop-type-data-size) that specifies the size to batch before sending records to Kafka.
105+
Helps improve throughput by batching multiple events into fewer requests.
106+
- `16384B`
99107
* - `kafka-event-listener.publish-created-event`
100108
- [Boolean](prop-type-boolean) switch to control publishing of query creation
101109
events.

plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListenerConfig.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import io.airlift.configuration.ConfigDescription;
2020
import io.airlift.configuration.DefunctConfig;
2121
import io.airlift.configuration.validation.FileExists;
22+
import io.airlift.units.DataSize;
23+
import io.airlift.units.DataSize.Unit;
2224
import io.airlift.units.Duration;
2325
import io.airlift.units.MinDuration;
2426
import jakarta.validation.constraints.AssertTrue;
@@ -48,6 +50,8 @@ public class KafkaEventListenerConfig
4850
private Optional<String> splitCompletedTopicName = Optional.empty();
4951
private String brokerEndpoints;
5052
private Optional<String> clientId = Optional.empty();
53+
private DataSize maxRequestSize = DataSize.of(5, Unit.MEGABYTE); // Greater than default value because the size of completed events are quite large
54+
private DataSize batchSize = DataSize.of(16384, Unit.BYTE); // Default value of batch.size
5155
private Set<String> excludedFields = Collections.emptySet();
5256
private Duration requestTimeout = new Duration(10, SECONDS);
5357
private boolean terminateOnInitializationFailure = true;
@@ -91,6 +95,32 @@ public KafkaEventListenerConfig setClientId(String clientId)
9195
return this;
9296
}
9397

98+
public DataSize getMaxRequestSize()
99+
{
100+
return maxRequestSize;
101+
}
102+
103+
@ConfigDescription("The maximum size of a request/message in bytes")
104+
@Config("kafka-event-listener.max-request-size")
105+
public KafkaEventListenerConfig setMaxRequestSize(DataSize maxRequestSize)
106+
{
107+
this.maxRequestSize = maxRequestSize;
108+
return this;
109+
}
110+
111+
public DataSize getBatchSize()
112+
{
113+
return batchSize;
114+
}
115+
116+
@ConfigDescription("Attempt to batch records/messages together up to batch.size bytes before sending")
117+
@Config("kafka-event-listener.batch-size")
118+
public KafkaEventListenerConfig setBatchSize(DataSize batchSize)
119+
{
120+
this.batchSize = batchSize;
121+
return this;
122+
}
123+
94124
public Optional<String> getCompletedTopicName()
95125
{
96126
return completedTopicName;

plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/producer/BaseKafkaProducerFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ protected Map<String, Object> baseConfig(KafkaEventListenerConfig config)
3939
kafkaClientConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
4040
kafkaClientConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
4141
kafkaClientConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
42-
kafkaClientConfig.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5242880");
42+
kafkaClientConfig.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, Long.toString(config.getMaxRequestSize().toBytes()));
43+
kafkaClientConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, Long.toString(config.getBatchSize().toBytes()));
4344
kafkaClientConfig.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(config.getRequestTimeout().toMillis()));
4445
return kafkaClientConfig;
4546
}

plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import java.util.Set;
2828
import java.util.concurrent.TimeUnit;
2929

30+
import io.airlift.units.DataSize;
31+
import io.airlift.units.DataSize.Unit;
3032
import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
3133
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
3234
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
@@ -46,6 +48,8 @@ void testDefaults()
4648
.setCreatedTopicName(null)
4749
.setSplitCompletedTopicName(null)
4850
.setBrokerEndpoints(null)
51+
.setMaxRequestSize(DataSize.of(5, Unit.MEGABYTE))
52+
.setBatchSize(DataSize.of(16384, Unit.BYTE))
4953
.setClientId(null)
5054
.setExcludedFields(Set.of())
5155
.setRequestTimeout(new Duration(10, TimeUnit.SECONDS))
@@ -66,6 +70,8 @@ void testExplicitPropertyMappings()
6670
.put("kafka-event-listener.publish-completed-event", "false")
6771
.put("kafka-event-listener.publish-split-completed-event", "true")
6872
.put("kafka-event-listener.broker-endpoints", "kafka-host-1:9093,kafka-host-2:9093")
73+
.put("kafka-event-listener.max-request-size", "1048576")
74+
.put("kafka-event-listener.batch-size", "81920")
6975
.put("kafka-event-listener.created-event.topic", "query_created")
7076
.put("kafka-event-listener.completed-event.topic", "query_completed")
7177
.put("kafka-event-listener.split-completed-event.topic", "split_completed")
@@ -84,6 +90,8 @@ void testExplicitPropertyMappings()
8490
.setPublishCompletedEvent(false)
8591
.setPublishSplitCompletedEvent(true)
8692
.setBrokerEndpoints("kafka-host-1:9093,kafka-host-2:9093")
93+
.setMaxRequestSize(DataSize.of(1048576, Unit.BYTE))
94+
.setBatchSize(DataSize.of(81920, Unit.BYTE))
8795
.setCreatedTopicName("query_created")
8896
.setCompletedTopicName("query_completed")
8997
.setSplitCompletedTopicName("split_completed")

0 commit comments

Comments
 (0)