diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/SourceCommitCallback.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/SourceCommitCallback.java
new file mode 100644
index 0000000000000..3d3597320467c
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/SourceCommitCallback.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.callback;
+
+/**
+ * A callback interface that provides the Source an option to perform action on successful Hudi commit.
+ */
+public interface SourceCommitCallback {
+
+  /**
+   * Performs some action on successful Hudi commit like committing offsets to Kafka.
+   *
+   * @param lastCkptStr last checkpoint string.
+   */
+  default void onCommit(String lastCkptStr) {
+  }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index c60449a797688..cdea9120cb7f9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -473,7 +473,7 @@ private Pair, JavaRDD> writeToSink(JavaRDD> fetchNewDataInRowFormat(Option lastCkptS
       throw new IllegalArgumentException("Unknown source type (" + source.getSourceType() + ")");
     }
   }
+
+  public Source getSource() {
+    return source;
+  }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 124487941bd5d..652e442a89d21 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -40,6 +40,9 @@
 import org.apache.spark.streaming.kafka010.LocationStrategies;
 import org.apache.spark.streaming.kafka010.OffsetRange;
 
+import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET;
+import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
+
 /**
  * Reads avro serialized Kafka data, based on the confluent schema-registry.
  */
@@ -95,4 +98,11 @@ private JavaRDD toRDD(OffsetRange[] offsetRanges) {
     return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent())
         .map(obj -> (GenericRecord) obj.value());
   }
+
+  @Override
+  public void onCommit(String lastCkptStr) {
+    if (this.props.getBoolean(ENABLE_KAFKA_COMMIT_OFFSET, DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET)) {
+      offsetGen.commitOffsetToKafka(lastCkptStr);
+    }
+  }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index cedaba48c2e3d..c1e2e3dad97ce 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -35,6 +35,9 @@
 import org.apache.spark.streaming.kafka010.LocationStrategies;
 import org.apache.spark.streaming.kafka010.OffsetRange;
 
+import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET;
+import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
+
 /**
  * Read json kafka data.
  */
@@ -71,4 +74,11 @@ private JavaRDD toRDD(OffsetRange[] offsetRanges) {
     return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent())
         .map(x -> (String) x.value());
   }
+
+  @Override
+  public void onCommit(String lastCkptStr) {
+    if (this.props.getBoolean(ENABLE_KAFKA_COMMIT_OFFSET, DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET)) {
+      offsetGen.commitOffsetToKafka(lastCkptStr);
+    }
+  }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index 4d25d479a774a..6d610d5c8cbdc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -23,6 +23,7 @@
 import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.callback.SourceCommitCallback;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
 import org.apache.spark.api.java.JavaSparkContext;
@@ -34,7 +35,7 @@
  * Represents a source from which we can tail data. Assumes a constructor that takes properties.
  */
 @PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
-public abstract class Source implements Serializable {
+public abstract class Source implements SourceCommitCallback, Serializable {
 
   public enum SourceType {
     JSON, AVRO, ROW
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index be23002f8a6f7..d3f410d029d18 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -26,9 +26,13 @@
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.streaming.kafka010.OffsetRange;
@@ -157,6 +161,8 @@ public static class Config {
 
     private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
     private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
+    public static final String ENABLE_KAFKA_COMMIT_OFFSET = "hoodie.deltastreamer.source.kafka.enable.commit.offset";
+    public static final Boolean DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET = false;
     // "auto.offset.reset" is kafka native config param. Do not change the config param name.
     public static final String KAFKA_AUTO_OFFSET_RESET = "auto.offset.reset";
     private static final KafkaResetOffsetStrategies DEFAULT_KAFKA_AUTO_OFFSET_RESET = KafkaResetOffsetStrategies.LATEST;
@@ -164,24 +170,14 @@ public static class Config {
     public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
   }
 
-  private final HashMap<String, Object> kafkaParams;
+  private final Map<String, Object> kafkaParams;
   private final TypedProperties props;
   protected final String topicName;
   private KafkaResetOffsetStrategies autoResetValue;
 
   public KafkaOffsetGen(TypedProperties props) {
     this.props = props;
-
-    kafkaParams = new HashMap<>();
-    props.keySet().stream().filter(prop -> {
-      // In order to prevent printing unnecessary warn logs, here filter out the hoodie
-      // configuration items before passing to kafkaParams
-      return !prop.toString().startsWith("hoodie.")
-          // We need to pass some properties to kafka client so that KafkaAvroSchemaDeserializer can use it
-          || prop.toString().startsWith("hoodie.deltastreamer.source.kafka.value.deserializer.");
-    }).forEach(prop -> {
-      kafkaParams.put(prop.toString(), props.get(prop.toString()));
-    });
+    kafkaParams = excludeHoodieConfigs(props);
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME));
     topicName = props.getString(Config.KAFKA_TOPIC_NAME);
     String kafkaAutoResetOffsetsStr = props.getString(Config.KAFKA_AUTO_OFFSET_RESET, Config.DEFAULT_KAFKA_AUTO_OFFSET_RESET.name().toLowerCase());
@@ -299,7 +295,36 @@ public String getTopicName() {
     return topicName;
   }
 
-  public HashMap<String, Object> getKafkaParams() {
+  public Map<String, Object> getKafkaParams() {
     return kafkaParams;
   }
+
+  private Map<String, Object> excludeHoodieConfigs(TypedProperties props) {
+    Map<String, Object> kafkaParams = new HashMap<>();
+    props.keySet().stream().filter(prop -> {
+      // In order to prevent printing unnecessary warn logs, here filter out the hoodie
+      // configuration items before passing to kafkaParams
+      return !prop.toString().startsWith("hoodie.");
+    }).forEach(prop -> {
+      kafkaParams.put(prop.toString(), props.get(prop.toString()));
+    });
+    return kafkaParams;
+  }
+
+  /**
+   * Commit offsets to Kafka only after hoodie commit is successful.
+   * @param checkpointStr checkpoint string containing offsets.
+   */
+  public void commitOffsetToKafka(String checkpointStr) {
+    DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(ConsumerConfig.GROUP_ID_CONFIG));
+    Map<TopicPartition, Long> offsetMap = CheckpointUtils.strToOffsets(checkpointStr);
+    Map<String, Object> kafkaParams = excludeHoodieConfigs(props);
+    Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>(offsetMap.size());
+    try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) {
+      offsetMap.forEach((topicPartition, offset) -> offsetAndMetadataMap.put(topicPartition, new OffsetAndMetadata(offset)));
+      consumer.commitSync(offsetAndMetadataMap);
+    } catch (CommitFailedException | TimeoutException e) {
+      LOG.warn("Committing offsets to Kafka failed, this does not impact processing of records", e);
+    }
+  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
index ec8a945c41065..a1a00faa592a6 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
@@ -22,6 +22,7 @@
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
@@ -30,6 +31,9 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
@@ -40,9 +44,15 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
+import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
 
 /**
@@ -58,7 +68,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
 
   @BeforeAll
   public static void initClass() throws Exception {
-    UtilitiesTestBase.initClass();
+    UtilitiesTestBase.initClass(false);
   }
 
   @AfterAll
@@ -85,6 +95,7 @@ private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSo
     props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
     props.setProperty(Config.KAFKA_AUTO_OFFSET_RESET, resetStrategy);
+    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
     props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", maxEventsToReadFromKafkaSource != null ?
         String.valueOf(maxEventsToReadFromKafkaSource) : String.valueOf(Config.maxEventsFromKafkaSource));
@@ -276,4 +287,64 @@ public void testJsonKafkaSourceWithConfigurableUpperCap() {
         kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch4.getCheckpointForNextBatch()), Long.MAX_VALUE);
     assertEquals(Option.empty(), fetch6.getBatch());
   }
+
+  @Test
+  public void testCommitOffsetToKafka() {
+    // topic setup.
+    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    List<TopicPartition> topicPartitions = new ArrayList<>();
+    TopicPartition topicPartition0 = new TopicPartition(TEST_TOPIC_NAME, 0);
+    topicPartitions.add(topicPartition0);
+    TopicPartition topicPartition1 = new TopicPartition(TEST_TOPIC_NAME, 1);
+    topicPartitions.add(topicPartition1);
+
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    TypedProperties props = createPropsForJsonSource(null, "earliest");
+    props.put(ENABLE_KAFKA_COMMIT_OFFSET, "true");
+    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
+
+    // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
+    assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
+    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+
+    InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 599);
+    // commit to kafka after first batch
+    kafkaSource.getSource().onCommit(fetch1.getCheckpointForNextBatch());
+    try (KafkaConsumer consumer = new KafkaConsumer(props)) {
+      consumer.assign(topicPartitions);
+
+      OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition0);
+      assertNotNull(offsetAndMetadata);
+      assertEquals(300, offsetAndMetadata.offset());
+      offsetAndMetadata = consumer.committed(topicPartition1);
+      assertNotNull(offsetAndMetadata);
+      assertEquals(299, offsetAndMetadata.offset());
+      // end offsets will point to 500 for each partition because we consumed less messages from first batch
+      Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
+      assertEquals(500L, endOffsets.get(topicPartition0));
+      assertEquals(500L, endOffsets.get(topicPartition1));
+
+      testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 500)));
+      InputBatch<Dataset<Row>> fetch2 =
+          kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
+
+      // commit to Kafka after second batch is processed completely
+      kafkaSource.getSource().onCommit(fetch2.getCheckpointForNextBatch());
+
+      offsetAndMetadata = consumer.committed(topicPartition0);
+      assertNotNull(offsetAndMetadata);
+      assertEquals(750, offsetAndMetadata.offset());
+      offsetAndMetadata = consumer.committed(topicPartition1);
+      assertNotNull(offsetAndMetadata);
+      assertEquals(750, offsetAndMetadata.offset());
+
+      endOffsets = consumer.endOffsets(topicPartitions);
+      assertEquals(750L, endOffsets.get(topicPartition0));
+      assertEquals(750L, endOffsets.get(topicPartition1));
+    }
+    // check failure case
+    props.remove(ConsumerConfig.GROUP_ID_CONFIG);
+    assertThrows(HoodieNotSupportedException.class, () -> kafkaSource.getSource().onCommit(""));
+  }
 }
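
Usage note (not part of the patch): the test above wires the new behaviour through TypedProperties, and the same property keys apply to a DeltaStreamer Kafka source configuration. The sketch below is illustrative only; it uses the keys that appear in this diff (topic, bootstrap.servers, group id, auto-commit, and the new enable flag), while the class name and the concrete values are placeholders.

import org.apache.hudi.common.config.TypedProperties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

// Illustrative sketch: properties a Kafka source would need so that offsets are
// committed back to Kafka only after a successful Hudi commit.
public class KafkaCommitOffsetPropsExample {

  public static TypedProperties buildProps() {
    TypedProperties props = new TypedProperties();
    // Keys taken from the patch and its test; values here are placeholders.
    props.setProperty("hoodie.deltastreamer.source.kafka.topic", "my_topic");
    props.setProperty("bootstrap.servers", "localhost:9092");
    // commitOffsetToKafka checks that a consumer group id is present.
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "hudi-deltastreamer-example");
    // Let Hudi drive offset commits instead of the consumer's auto-commit.
    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // New flag introduced by this patch; it defaults to false.
    props.setProperty("hoodie.deltastreamer.source.kafka.enable.commit.offset", "true");
    return props;
  }
}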
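
For sources other than the two Kafka sources touched here, the hook can be used the same way: override onCommit(...) and guard the side effect behind a config flag, as AvroKafkaSource and JsonKafkaSource do above. A minimal sketch under that assumption follows; the class name, the example property key, and the logged side effect are hypothetical and only illustrate the SourceCommitCallback contract added by this patch.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.callback.SourceCommitCallback;

// Hypothetical example: reacts to a successful Hudi commit, mirroring the
// guard-then-act pattern used by AvroKafkaSource/JsonKafkaSource.
public class LoggingCommitCallback implements SourceCommitCallback {

  private final TypedProperties props;

  public LoggingCommitCallback(TypedProperties props) {
    this.props = props;
  }

  @Override
  public void onCommit(String lastCkptStr) {
    // Hypothetical flag, modeled on hoodie.deltastreamer.source.kafka.enable.commit.offset.
    if (props.getBoolean("hoodie.deltastreamer.source.example.enable.commit.action", false)) {
      // Replace with the side effect the source needs (ack a queue, advance a cursor, etc.).
      System.out.println("Hudi commit succeeded, checkpoint = " + lastCkptStr);
    }
  }
}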