diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index d3f410d029d18..99e960cf55fa5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -151,7 +151,7 @@ public static long totalNewMessages(OffsetRange[] ranges) {
    * Kafka reset offset strategies.
    */
   enum KafkaResetOffsetStrategies {
-    LATEST, EARLIEST
+    LATEST, EARLIEST, GROUP
   }
 
   /**
@@ -192,6 +192,9 @@ public KafkaOffsetGen(TypedProperties props) {
     if (!found) {
       throw new HoodieDeltaStreamerException(Config.KAFKA_AUTO_OFFSET_RESET + " config set to unknown value " + kafkaAutoResetOffsetsStr);
     }
+    if (autoResetValue.equals(KafkaResetOffsetStrategies.GROUP)) {
+      this.kafkaParams.put(Config.KAFKA_AUTO_OFFSET_RESET, Config.DEFAULT_KAFKA_AUTO_OFFSET_RESET.name().toLowerCase());
+    }
   }
 
   public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieDeltaStreamerMetrics metrics) {
@@ -220,8 +223,11 @@ public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long
         case LATEST:
           fromOffsets = consumer.endOffsets(topicPartitions);
           break;
+        case GROUP:
+          fromOffsets = getGroupOffsets(consumer, topicPartitions);
+          break;
         default:
-          throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' ");
+          throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' or 'group' ");
       }
     }
 
@@ -318,7 +324,6 @@ private Map<String, Object> excludeHoodieConfigs(TypedProperties props) {
   public void commitOffsetToKafka(String checkpointStr) {
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(ConsumerConfig.GROUP_ID_CONFIG));
     Map<TopicPartition, Long> offsetMap = CheckpointUtils.strToOffsets(checkpointStr);
-    Map<String, Object> kafkaParams = excludeHoodieConfigs(props);
     Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>(offsetMap.size());
     try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) {
       offsetMap.forEach((topicPartition, offset) -> offsetAndMetadataMap.put(topicPartition, new OffsetAndMetadata(offset)));
@@ -327,4 +332,19 @@ public void commitOffsetToKafka(String checkpointStr) {
       LOG.warn("Committing offsets to Kafka failed, this does not impact processing of records", e);
     }
   }
+
+  private Map<TopicPartition, Long> getGroupOffsets(KafkaConsumer consumer, Set<TopicPartition> topicPartitions) {
+    Map<TopicPartition, Long> fromOffsets = new HashMap<>();
+    for (TopicPartition topicPartition : topicPartitions) {
+      OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(topicPartition);
+      if (committedOffsetAndMetadata != null) {
+        fromOffsets.put(topicPartition, committedOffsetAndMetadata.offset());
+      } else {
+        LOG.warn("There are no commits associated with this consumer group, starting to consume from latest offset");
+        fromOffsets = consumer.endOffsets(topicPartitions);
+        break;
+      }
+    }
+    return fromOffsets;
+  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
index 508c90aa6f2b2..ccc141b51a491 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
@@ -25,6 +25,7 @@
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.spark.streaming.kafka010.KafkaTestUtils;
@@ -33,6 +34,8 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.UUID;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -66,6 +69,7 @@ private TypedProperties getConsumerConfigs(String autoOffsetReset) {
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
     props.setProperty("key.deserializer", StringDeserializer.class.getName());
     props.setProperty("value.deserializer", StringDeserializer.class.getName());
+    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
     return props;
   }
 
@@ -127,6 +131,30 @@ public void testGetNextOffsetRangesFromMultiplePartitions() {
     assertEquals(249, nextOffsetRanges[1].untilOffset());
   }
 
+  @Test
+  public void testGetNextOffsetRangesFromGroup() {
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group"));
+    String lastCheckpointString = TEST_TOPIC_NAME + ",0:250,1:249";
+    kafkaOffsetGen.commitOffsetToKafka(lastCheckpointString);
+    // don't pass lastCheckpointString as we want to read from group committed offset
+    OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
+    assertEquals(250, nextOffsetRanges[0].fromOffset());
+    assertEquals(400, nextOffsetRanges[0].untilOffset());
+    assertEquals(249, nextOffsetRanges[1].fromOffset());
+    assertEquals(399, nextOffsetRanges[1].untilOffset());
+
+    // committed offsets are not present for the consumer group
+    kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group"));
+    nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
+    assertEquals(500, nextOffsetRanges[0].fromOffset());
+    assertEquals(500, nextOffsetRanges[0].untilOffset());
+    assertEquals(500, nextOffsetRanges[1].fromOffset());
+    assertEquals(500, nextOffsetRanges[1].untilOffset());
+  }
+
   @Test
   public void testCheckTopicExists() {
     TypedProperties props = getConsumerConfigs("latest");