Skip to content

Commit

Permalink
Merge pull request confluentinc#3 from Tumblr/usielriedl/kafka-connec…
Browse files Browse the repository at this point in the history
…t-69-ensure-rotation

Ensure file rotation
  • Loading branch information
Usiel authored and GitHub Enterprise committed Feb 6, 2024
2 parents 2b26ae1 + c86ff76 commit 842cbf9
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 8 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
</parent>

<artifactId>kafka-connect-hdfs</artifactId>
<version>10.2.4-tumblr+0.1</version>
<version>10.2.4-tumblr+0.2</version>
<packaging>jar</packaging>
<name>kafka-connect-hdfs</name>
<organization>
Expand Down Expand Up @@ -61,7 +61,7 @@
<snakeyaml.version>2.0</snakeyaml.version>
<snappy.java.version>1.1.10.3</snappy.java.version>
<woodstox-core.version>6.5.0</woodstox-core.version>
<guava.version>32.1.3-jre</guava.version>
<guava.version>18.0</guava.version>
</properties>

<repositories>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,9 @@ public boolean recover() {
private void updateRotationTimers(SinkRecord currentRecord) {
long now = time.milliseconds();
// Wallclock-based partitioners should be independent of the record argument.
lastRotate = isWallclockBased
lastRotate = isWallclockBased || currentRecord == null
? (Long) now
: currentRecord != null ? timestampExtractor.extract(currentRecord) : null;
: timestampExtractor.extract(currentRecord);
if (log.isDebugEnabled() && rotateIntervalMs > 0) {
log.debug(
"Update last rotation timer. Next rotation for {} will be in {}ms",
Expand Down Expand Up @@ -563,10 +563,10 @@ private void setState(State state) {
}

private boolean shouldRotateAndMaybeUpdateTimers(SinkRecord currentRecord, long now) {
Long currentTimestamp = null;
if (isWallclockBased) {
Long currentTimestamp;
if (isWallclockBased || currentRecord == null) {
currentTimestamp = now;
} else if (currentRecord != null) {
} else {
currentTimestamp = timestampExtractor.extract(currentRecord);
lastRotate = lastRotate == null ? currentTimestamp : lastRotate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.confluent.connect.hdfs.orc.OrcFormat;
import io.confluent.connect.hdfs.parquet.ParquetFormat;
import io.confluent.connect.hdfs.partitioner.TopicMappingTimeBasedPartitioner;
import io.confluent.connect.hdfs.string.StringFormat;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.kafka.common.config.ConfigException;
Expand Down Expand Up @@ -286,7 +287,8 @@ public void testRecommendedValues() throws Exception {
HourlyPartitioner.class,
DailyPartitioner.class,
TimeBasedPartitioner.class,
FieldPartitioner.class
FieldPartitioner.class,
TopicMappingTimeBasedPartitioner.class
);

List<ConfigValue> values = HdfsSinkConnectorConfig.getConfig().validate(properties);
Expand Down

0 comments on commit 842cbf9

Please sign in to comment.