From f038915ab00f2bf37fd1882a65d9333dec565d8f Mon Sep 17 00:00:00 2001 From: John Hofman Date: Tue, 10 Jan 2017 14:03:00 +0100 Subject: [PATCH] Correct logic for detecting stale records --- .../java/io/confluent/connect/hdfs/TopicPartitionWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java index a4f6f7a90..ed8a1734f 100644 --- a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java +++ b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java @@ -463,7 +463,7 @@ private void writeRecord(SinkRecord record) throws IOException { long expectedOffset = offset + recordCounter; if (offset == -1) { offset = record.kafkaOffset(); - } else if (record.kafkaOffset() != expectedOffset) { + } else if (record.kafkaOffset() < expectedOffset) { // Currently it's possible to see stale data with the wrong offset after a rebalance when you // rewind, which we do since we manage our own offsets. See KAFKA-2894. if (!sawInvalidOffset) {