Skip to content

Commit 915a51c

Browse files
committed
fix: handle empty input gracefully
1 parent 0bf764e commit 915a51c

File tree

3 files changed

+36
-0
lines changed

3 files changed

+36
-0
lines changed

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

+4
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ private Sender createSender() {
9090

9191
@Override
9292
public void put(Collection<SinkRecord> collection) {
93+
if (collection.isEmpty()) {
94+
log.debug("Received empty collection, ignoring");
95+
return;
96+
}
9397
if (log.isDebugEnabled()) {
9498
SinkRecord record = collection.iterator().next();
9599
log.debug("Received {} records. First record kafka coordinates:({}-{}-{}). ",

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

+31
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,37 @@ public void testRetrying_recoversFromInfrastructureIssues() throws Exception {
259259
"select firstname,lastname,age from " + topicName + " where age = 49");
260260
}
261261

262+
@Test
263+
public void testEmptyCollection_wontFailTheConnector() {
264+
connect.kafka().createTopic(topicName, 1);
265+
Map<String, String> props = baseConnectorProps(topicName);
266+
// filter out all message
267+
props.put("transforms", "drop");
268+
props.put("transforms.drop.type", "org.apache.kafka.connect.transforms.Filter");
269+
270+
271+
connect.configureConnector(CONNECTOR_NAME, props);
272+
assertConnectorTaskRunningEventually();
273+
Schema schema = SchemaBuilder.struct().name("com.example.Person")
274+
.field("firstname", Schema.STRING_SCHEMA)
275+
.field("lastname", Schema.STRING_SCHEMA)
276+
.field("age", Schema.INT8_SCHEMA)
277+
.build();
278+
279+
Struct struct = new Struct(schema)
280+
.put("firstname", "John")
281+
.put("lastname", "Doe")
282+
.put("age", (byte) 42);
283+
284+
connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct)));
285+
286+
int durationMs = 10_000;
287+
long deadline = System.currentTimeMillis() + durationMs;
288+
while (System.currentTimeMillis() < deadline) {
289+
assertConnectorTaskState(CONNECTOR_NAME, AbstractStatus.State.RUNNING);
290+
}
291+
}
292+
262293
@Test
263294
public void testSymbol_withAllOtherILPTypes() {
264295
connect.kafka().createTopic(topicName, 1);

connector/src/test/resources/log4j.properties

+1
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,4 @@ log4j.logger.org.apache.zookeeper=WARN
2626
log4j.logger.kafka=WARN
2727
log4j.logger.org.reflections=ERROR
2828
log4j.logger.state.change.logger=WARN
29+
log4j.logger.io.questdb.kafka=DEBUG

0 commit comments

Comments
 (0)