From b343e422b5e04fb0097e9b2c84eaa08cfd6e2538 Mon Sep 17 00:00:00 2001 From: Usiel Riedl Date: Wed, 31 Jan 2024 17:55:11 +0800 Subject: [PATCH 1/3] Ensure rotation of files By falling back to wallclock time when there is no record (i.e. when the upstream `DataWriter` calls `write()` while the buffer is empty) we ensure that a rotation of tmp files to committed files is initiated. --- .../confluent/connect/hdfs/TopicPartitionWriter.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java index 3e010d92e..d26cc269a 100644 --- a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java +++ b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java @@ -296,9 +296,9 @@ public boolean recover() { private void updateRotationTimers(SinkRecord currentRecord) { long now = time.milliseconds(); // Wallclock-based partitioners should be independent of the record argument. - lastRotate = isWallclockBased + lastRotate = isWallclockBased || currentRecord == null ? (Long) now - : currentRecord != null ? timestampExtractor.extract(currentRecord) : null; + : timestampExtractor.extract(currentRecord); if (log.isDebugEnabled() && rotateIntervalMs > 0) { log.debug( "Update last rotation timer. Next rotation for {} will be in {}ms", @@ -563,10 +563,10 @@ private void setState(State state) { } private boolean shouldRotateAndMaybeUpdateTimers(SinkRecord currentRecord, long now) { - Long currentTimestamp = null; - if (isWallclockBased) { + Long currentTimestamp; + if (isWallclockBased || currentRecord == null) { currentTimestamp = now; - } else { + } else { currentTimestamp = timestampExtractor.extract(currentRecord); lastRotate = lastRotate == null ? 
currentTimestamp : lastRotate; } From d04bc321dd2e0fb29cf41cc4425e9ed29b84b96a Mon Sep 17 00:00:00 2001 From: Usiel Riedl Date: Wed, 31 Jan 2024 17:57:22 +0800 Subject: [PATCH 2/3] Fix tests Tests depend on Guava 18 (later versions remove deprecated methods that are used by the tests). --- pom.xml | 2 +- .../confluent/connect/hdfs/HdfsSinkConnectorConfigTest.java | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 8e03f5d72..5fba27f7a 100644 --- a/pom.xml +++ b/pom.xml @@ -61,7 +61,7 @@ 2.0 1.1.10.3 6.5.0 - 32.1.3-jre + 18.0 diff --git a/src/test/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfigTest.java b/src/test/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfigTest.java index 5a7666641..ce1ab5439 100644 --- a/src/test/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfigTest.java +++ b/src/test/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfigTest.java @@ -17,6 +17,7 @@ import io.confluent.connect.hdfs.orc.OrcFormat; import io.confluent.connect.hdfs.parquet.ParquetFormat; +import io.confluent.connect.hdfs.partitioner.TopicMappingTimeBasedPartitioner; import io.confluent.connect.hdfs.string.StringFormat; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.kafka.common.config.ConfigException; @@ -286,7 +287,8 @@ public void testRecommendedValues() throws Exception { HourlyPartitioner.class, DailyPartitioner.class, TimeBasedPartitioner.class, - FieldPartitioner.class + FieldPartitioner.class, + TopicMappingTimeBasedPartitioner.class ); List values = HdfsSinkConnectorConfig.getConfig().validate(properties); From c86ff76f559744f1d295a42eeccdcf50d33b4280 Mon Sep 17 00:00:00 2001 From: Usiel Riedl Date: Wed, 31 Jan 2024 17:58:24 +0800 Subject: [PATCH 3/3] Bump version to 10.2.4-tumblr+0.2 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 5fba27f7a..6f7fd2534 100644 --- a/pom.xml +++ b/pom.xml @@ -24,7 +24,7 @@ kafka-connect-hdfs - 
10.2.4-tumblr+0.1 + 10.2.4-tumblr+0.2 jar kafka-connect-hdfs