diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 45d1b200f7abc..6bb2ee15ed77a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -18,10 +18,6 @@ package org.apache.hudi.client; -import com.codahale.metrics.Timer; -import org.apache.avro.Schema; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hudi.async.AsyncArchiveService; import org.apache.hudi.async.AsyncCleanerService; import org.apache.hudi.avro.HoodieAvroUtils; @@ -98,6 +94,11 @@ import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade; import org.apache.hudi.table.upgrade.UpgradeDowngrade; + +import com.codahale.metrics.Timer; +import org.apache.avro.Schema; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -110,6 +111,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.BiConsumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -208,16 +210,24 @@ public boolean commit(String instantTime, O writeStatuses, Option> extraMetadata, + String commitActionType, Map> partitionToReplacedFileIds) { + return commit(instantTime, writeStatuses, extraMetadata, commitActionType, partitionToReplacedFileIds, + Option.empty()); + } + public abstract boolean commit(String instantTime, O writeStatuses, Option> extraMetadata, - String commitActionType, Map> partitionToReplacedFileIds); + String commitActionType, Map> partitionToReplacedFileIds, + Option> extraPreCommitFunc); public boolean commitStats(String instantTime, List stats, Option> extraMetadata, String commitActionType) { - return commitStats(instantTime, stats, extraMetadata, commitActionType, Collections.emptyMap()); + return commitStats(instantTime, stats, extraMetadata, commitActionType, Collections.emptyMap(), Option.empty()); } public boolean commitStats(String instantTime, List stats, Option> extraMetadata, - String commitActionType, Map> partitionToReplaceFileIds) { + String commitActionType, Map> partitionToReplaceFileIds, + Option> extraPreCommitFunc) { // Skip the empty commit if not allowed if (!config.allowEmptyCommit() && stats.isEmpty()) { return true; @@ -233,6 +243,9 @@ public boolean commitStats(String instantTime, List stats, Opti lastCompletedTxnAndMetadata.isPresent() ? 
Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty()); try { preCommit(inflightInstant, metadata); + if (extraPreCommitFunc.isPresent()) { + extraPreCommitFunc.get().accept(table.getMetaClient(), metadata); + } commit(table, commitActionType, instantTime, metadata, stats); // already within lock, and so no lock requried for archival postCommit(table, metadata, instantTime, extraMetadata, false); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 98f58db66cdee..24b7b6f83afb7 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -73,6 +73,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; import java.util.stream.Collectors; /** @@ -114,7 +115,9 @@ protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) { } @Override - public boolean commit(String instantTime, List writeStatuses, Option> extraMetadata, String commitActionType, Map> partitionToReplacedFileIds) { + public boolean commit(String instantTime, List writeStatuses, Option> extraMetadata, + String commitActionType, Map> partitionToReplacedFileIds, + Option> extraPreCommitFunc) { List writeStats = writeStatuses.parallelStream().map(WriteStatus::getStat).collect(Collectors.toList()); // for eager flush, multiple write stat may share one file path. List merged = writeStats.stream() @@ -122,7 +125,7 @@ public boolean commit(String instantTime, List writeStatuses, Optio .values().stream() .map(duplicates -> duplicates.stream().reduce(WriteStatMerger::merge).get()) .collect(Collectors.toList()); - return commitStats(instantTime, merged, extraMetadata, commitActionType, partitionToReplacedFileIds); + return commitStats(instantTime, merged, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc); } @Override diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java index b6951bc6b7874..1f205d0e82397 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java @@ -46,6 +46,7 @@ import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; import java.util.stream.Collectors; public class HoodieJavaWriteClient extends @@ -82,9 +83,11 @@ public boolean commit(String instantTime, List writeStatuses, Option> extraMetadata, String commitActionType, - Map> partitionToReplacedFileIds) { + Map> partitionToReplacedFileIds, + Option> extraPreCommitFunc) { List writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()); - return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds); + return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, + extraPreCommitFunc); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 
a73c92c383f38..888fa79fa41fb 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -45,8 +45,8 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.exception.HoodieClusteringException; -import org.apache.hudi.exception.HoodieWriteConflictException; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieWriteConflictException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.SparkHoodieIndexFactory; import org.apache.hudi.metadata.HoodieTableMetadataWriter; @@ -71,6 +71,7 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; import java.util.stream.Collectors; @SuppressWarnings("checkstyle:LineLength") @@ -120,10 +121,11 @@ protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) { */ @Override public boolean commit(String instantTime, JavaRDD writeStatuses, Option> extraMetadata, - String commitActionType, Map> partitionToReplacedFileIds) { + String commitActionType, Map> partitionToReplacedFileIds, + Option> extraPreCommitFunc) { context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " + config.getTableName()); List writeStats = writeStatuses.map(WriteStatus::getStat).collect(); - return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds); + return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc); } @Override diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index c3825e3426cb8..8e5bb9aad04b6 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -434,6 +434,16 @@ object DataSourceWriteOptions { + " within a streaming microbatch. Turning this on, could hide the write status errors while the spark checkpoint moves ahead." + "So, would recommend users to use this with caution.") + val STREAMING_CHECKPOINT_IDENTIFIER: ConfigProperty[String] = ConfigProperty + .key("hoodie.datasource.write.streaming.checkpoint.identifier") + .noDefaultValue() + .sinceVersion("0.13.0") + .withDocumentation("A stream identifier used by Hudi to fetch the right checkpoint (the `batch id`, to be more specific) " + + "for this writer. Please keep the identifier unique across writers " + + "in a multi-writer scenario. If the value is not set, the checkpoint info is only kept in memory. " +
+ "This can cause the last batch to be reprocessed: if the Spark checkpoint write fails and the job restarts, the `batch id` is lost, " + + "so Spark will retry and rewrite the data.") + val META_SYNC_CLIENT_TOOL_CLASS_NAME: ConfigProperty[String] = ConfigProperty .key("hoodie.meta.sync.client.tool.class") .defaultValue(classOf[HiveSyncTool].getName) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index f0ede2b2b82bf..481d0df194457 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -62,6 +62,7 @@ import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.{SPARK_VERSION, SparkContext} +import java.util.function.BiConsumer import scala.collection.JavaConversions._ import scala.collection.JavaConverters.setAsJavaSetConverter import scala.collection.mutable @@ -81,7 +82,8 @@ object HoodieSparkSqlWriter { hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty, hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty, asyncCompactionTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty, - asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty) + asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty, + extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]] = Option.empty) : (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String], SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = { @@ -183,7 +185,7 @@ object HoodieSparkSqlWriter { .setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS)) .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))) .initTable(sparkContext.hadoopConfiguration, path) - } + } tableConfig = tableMetaClient.getTableConfig val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType) @@ -203,14 +205,14 @@ object HoodieSparkSqlWriter { val sourceSchema = convertStructTypeToAvroSchema(df.schema, avroRecordName, avroRecordNamespace) val internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient).orElse { - // In case we need to reconcile the schema and schema evolution is enabled, - // we will force-apply schema evolution to the writer's schema - if (shouldReconcileSchema && hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) { - val allowOperationMetaDataField = parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean - Some(AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(latestTableSchemaOpt.getOrElse(sourceSchema), allowOperationMetaDataField))) - } else { - None - } + // In case we need to reconcile the schema and schema evolution is enabled, + // we will force-apply schema evolution to the writer's schema + if (shouldReconcileSchema && hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) { + val allowOperationMetaDataField =
parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean + Some(AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(latestTableSchemaOpt.getOrElse(sourceSchema), allowOperationMetaDataField))) + } else { + None + } } // NOTE: Target writer's schema is deduced based on @@ -378,7 +380,7 @@ object HoodieSparkSqlWriter { val (writeSuccessful, compactionInstant, clusteringInstant) = commitAndPerformPostOperations(sqlContext.sparkSession, df.schema, writeResult, parameters, writeClient, tableConfig, jsc, - TableInstantInfo(basePath, instantTime, commitActionType, operation)) + TableInstantInfo(basePath, instantTime, commitActionType, operation), extraPreCommitFn) (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig) } @@ -567,7 +569,7 @@ object HoodieSparkSqlWriter { def getLatestTableInternalSchema(config: HoodieConfig, tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = { if (!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) { - Option.empty[InternalSchema] + Option.empty[InternalSchema] } else { try { val tableSchemaResolver = new TableSchemaResolver(tableMetaClient) @@ -885,7 +887,8 @@ object HoodieSparkSqlWriter { client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]], tableConfig: HoodieTableConfig, jsc: JavaSparkContext, - tableInstantInfo: TableInstantInfo + tableInstantInfo: TableInstantInfo, + extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]] ): (Boolean, common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = { if (writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count() == 0) { log.info("Proceeding to commit the write.") @@ -895,7 +898,8 @@ object HoodieSparkSqlWriter { client.commit(tableInstantInfo.instantTime, writeResult.getWriteStatuses, common.util.Option.of(new java.util.HashMap[String, String](mapAsJavaMap(metaMap))), tableInstantInfo.commitActionType, - writeResult.getPartitionToReplaceFileIds) + writeResult.getPartitionToReplaceFileIds, + common.util.Option.ofNullable(extraPreCommitFn.orNull)) if (commitSuccess) { log.info("Commit " + tableInstantInfo.instantTime + " successful!") @@ -981,7 +985,7 @@ object HoodieSparkSqlWriter { } private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String], - tableConfig: HoodieTableConfig, mode: SaveMode): (Map[String, String], HoodieConfig) = { + tableConfig: HoodieTableConfig, mode: SaveMode): (Map[String, String], HoodieConfig) = { val translatedOptions = DataSourceWriteOptions.translateSqlOptions(optParams) val mergedParams = mutable.Map.empty ++ HoodieWriterUtils.parametersWithWriteDefaults(translatedOptions) if (!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala index 23b79a5ed4181..ae50e3c56c1a5 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala @@ -16,20 +16,20 @@ */ package org.apache.hudi -import com.fasterxml.jackson.annotation.JsonInclude.Include -import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.databind.ObjectMapper import 
com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.HoodieSinkCheckpoint.SINK_CHECKPOINT_KEY import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService} import org.apache.hudi.client.SparkRDDWriteClient import org.apache.hudi.client.common.HoodieSparkEngineContext -import org.apache.hudi.common.model.HoodieRecordPayload +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecordPayload} import org.apache.hudi.common.table.marker.MarkerType import org.apache.hudi.common.table.timeline.HoodieInstant.State import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} -import org.apache.hudi.common.util.ValidationUtils.checkArgument -import org.apache.hudi.common.util.{ClusteringUtils, CommitUtils, CompactionUtils, StringUtils} +import org.apache.hudi.common.util.ValidationUtils.{checkArgument, checkState} +import org.apache.hudi.common.util.{ClusteringUtils, CommitUtils, CompactionUtils, JsonUtils, StringUtils} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.{HoodieCorruptedDataException, HoodieException, TableNotFoundException} import org.apache.log4j.LogManager @@ -39,7 +39,7 @@ import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} import java.lang -import java.util.function.Function +import java.util.function.{BiConsumer, Function} import scala.collection.JavaConversions._ import scala.util.{Failure, Success, Try} @@ -75,8 +75,6 @@ class HoodieStreamingSink(sqlContext: SQLContext, STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong private val ignoreFailedBatch = options.getOrDefault(STREAMING_IGNORE_FAILED_BATCH.key, STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean - // This constant serves as the checkpoint key for streaming sink so that each microbatch is processed exactly-once. - private val SINK_CHECKPOINT_KEY = "_hudi_streaming_sink_checkpoint" private var isAsyncCompactorServiceShutdownAbnormally = false private var isAsyncClusteringServiceShutdownAbnormally = false @@ -115,18 +113,38 @@ class HoodieStreamingSink(sqlContext: SQLContext, var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name()) // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM. updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true") - // Add batchId as checkpoint to the extra metadata. To enable same checkpoint metadata structure for multi-writers, - // SINK_CHECKPOINT_KEY holds a map of batchId to writer context (composed of applicationId and queryId), e.g. - // "_hudi_streaming_sink_checkpoint" : "{\"$batchId\":\"${sqlContext.sparkContext.applicationId}-$queryId\"}" - // NOTE: In case of multi-writers, this map should be mutable and sorted by key to facilitate merging of batchIds. - // HUDI-4432 tracks the implementation of checkpoint management for multi-writer. 
- val checkpointMap = Map(batchId.toString -> s"${sqlContext.sparkContext.applicationId}-$queryId") - updatedOptions = updatedOptions.updated(SINK_CHECKPOINT_KEY, HoodieSinkCheckpoint.toJson(checkpointMap)) retry(retryCnt, retryIntervalMs)( Try( HoodieSparkSqlWriter.write( - sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering)) + sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering), + extraPreCommitFn = Some(new BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata] { + + override def accept(metaClient: HoodieTableMetaClient, + newCommitMetadata: HoodieCommitMetadata): Unit = { + options.get(STREAMING_CHECKPOINT_IDENTIFIER.key()) match { + case Some(identifier) => + // Fetch the latestCommit with checkpoint Info again to avoid concurrency issue in multi-write scenario. + val lastCheckpointCommitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo( + metaClient.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY) + var checkpointString = "" + if (lastCheckpointCommitMetadata.isPresent) { + val lastCheckpoint = lastCheckpointCommitMetadata.get.getMetadata(SINK_CHECKPOINT_KEY) + if (!StringUtils.isNullOrEmpty(lastCheckpoint)) { + checkpointString = HoodieSinkCheckpoint.toJson(HoodieSinkCheckpoint.fromJson(lastCheckpoint) + (identifier -> batchId.toString)) + } else { + checkpointString = HoodieSinkCheckpoint.toJson(Map(identifier -> batchId.toString)) + } + } else { + checkpointString = HoodieSinkCheckpoint.toJson(Map(identifier -> batchId.toString)) + } + + newCommitMetadata.addMetadata(SINK_CHECKPOINT_KEY, checkpointString) + case None => + // No op since keeping batch id in memory only. 
+ } + } + })) ) match { case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) => @@ -298,14 +316,19 @@ class HoodieStreamingSink(sqlContext: SQLContext, private def canSkipBatch(incomingBatchId: Long, operationType: String): Boolean = { if (!DELETE_OPERATION_OPT_VAL.equals(operationType)) { - // get the latest checkpoint from the commit metadata to check if the microbatch has already been prcessed or not - val commitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo( - metaClient.get.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY) - if (commitMetadata.isPresent) { - val lastCheckpoint = commitMetadata.get.getMetadata(SINK_CHECKPOINT_KEY) - if (!StringUtils.isNullOrEmpty(lastCheckpoint)) { - latestCommittedBatchId = HoodieSinkCheckpoint.fromJson(lastCheckpoint).keys.head.toLong - } + options.get(STREAMING_CHECKPOINT_IDENTIFIER.key()) match { + case Some(identifier) => + // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not + val commitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo( + metaClient.get.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY) + if (commitMetadata.isPresent) { + val lastCheckpoint = commitMetadata.get.getMetadata(SINK_CHECKPOINT_KEY) + if (!StringUtils.isNullOrEmpty(lastCheckpoint)) { + HoodieSinkCheckpoint.fromJson(lastCheckpoint).get(identifier).foreach(commitBatchId => + latestCommittedBatchId = commitBatchId.toLong) + } + } + case None => } latestCommittedBatchId >= incomingBatchId } else { @@ -322,13 +345,14 @@ class HoodieStreamingSink(sqlContext: SQLContext, object HoodieSinkCheckpoint { lazy val mapper: ObjectMapper = { - val _mapper = new ObjectMapper - _mapper.setSerializationInclusion(Include.NON_ABSENT) - _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + val _mapper = JsonUtils.getObjectMapper _mapper.registerModule(DefaultScalaModule) _mapper } + // This constant serves as the checkpoint key for streaming sink so that each microbatch is processed exactly-once.
+ val SINK_CHECKPOINT_KEY = "_hudi_streaming_sink_checkpoint" + def toJson(checkpoint: Map[String, String]): String = { mapper.writeValueAsString(checkpoint) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala index 1382bafb7621a..d4651f9ab7880 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala @@ -18,22 +18,24 @@ package org.apache.hudi.functional import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hudi.DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER +import org.apache.hudi.HoodieSinkCheckpoint.SINK_CHECKPOINT_KEY import org.apache.hudi.common.model.{FileSlice, HoodieTableType} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.timeline.HoodieTimeline import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestTable} -import org.apache.hudi.common.util.CollectionUtils +import org.apache.hudi.common.util.{CollectionUtils, CommitUtils} import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieStorageConfig, HoodieWriteConfig} import org.apache.hudi.exception.TableNotFoundException import org.apache.hudi.testutils.HoodieClientTestBase -import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSinkCheckpoint} import org.apache.log4j.LogManager import org.apache.spark.sql._ import org.apache.spark.sql.streaming.{OutputMode, Trigger} import org.apache.spark.sql.types.StructType import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} -import org.junit.jupiter.api.{AfterEach, BeforeEach} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{EnumSource, ValueSource} @@ -257,6 +259,72 @@ class TestStructuredStreaming extends HoodieClientTestBase { structuredStreamingTestRunner(HoodieTableType.MERGE_ON_READ, true, isAsyncCompaction) } + @Test + def testStructuredStreamingWithCheckpoint(): Unit = { + val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest") + + val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + val schema = inputDF1.schema + + inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath) + + val query1 = spark.readStream + .schema(schema) + .json(sourcePath) + .writeStream + .format("org.apache.hudi") + .options(commonOpts) + .outputMode(OutputMode.Append) + .option(STREAMING_CHECKPOINT_IDENTIFIER.key(), "streaming_identifier1") + .option("checkpointLocation", s"${basePath}/checkpoint1") + .start(destPath) + + query1.processAllAvailable() + val metaClient = HoodieTableMetaClient.builder + .setConf(fs.getConf).setBasePath(destPath).setLoadActiveTimelineOnLoad(true).build + + assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "0") + + // Add another identifier checkpoint info to the commit. 
+ val records2 = recordsToStrings(dataGen.generateInsertsForPartition("001", 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList + val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + + inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath) + + val query2 = spark.readStream + .schema(schema) + .json(sourcePath) + .writeStream + .format("org.apache.hudi") + .options(commonOpts) + .outputMode(OutputMode.Append) + .option(STREAMING_CHECKPOINT_IDENTIFIER.key(), "streaming_identifier2") + .option("checkpointLocation", s"${basePath}/checkpoint2") + .start(destPath) + query2.processAllAvailable() + query1.processAllAvailable() + + query1.stop() + query2.stop() + + assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier2", "0") + assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "1") + } + + def assertLatestCheckpointInfoMatched(metaClient: HoodieTableMetaClient, + identifier: String, + expectBatchId: String): Unit = { + metaClient.reloadActiveTimeline() + val lastCheckpointCommitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo( + metaClient.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY) + + assertTrue(lastCheckpointCommitMetadata.isPresent) + val checkpointMap = HoodieSinkCheckpoint.fromJson(lastCheckpointCommitMetadata.get().getMetadata(SINK_CHECKPOINT_KEY)) + + assertEquals(checkpointMap.get(identifier).orNull, expectBatchId) + } + def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, tableType: HoodieTableType, isInlineClustering: Boolean, isAsyncClustering: Boolean, partitionOfRecords: String, checkClusteringResult: String => Unit): Unit = {
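
For reference, below is a minimal usage sketch (not part of this patch) of how a Spark Structured Streaming writer might set the new hoodie.datasource.write.streaming.checkpoint.identifier option so that its latest committed batch id is tracked under its own entry in the commit metadata. The table name, paths, record key, partition path, and precombine fields are placeholders, and the built-in rate source is used only to keep the example self-contained:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object HudiStreamingIdentifierSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hudi-streaming-identifier-sketch").getOrCreate()

    // Placeholder streaming source; any streaming DataFrame works here.
    val input = spark.readStream.format("rate").load()

    input.writeStream
      .format("hudi")
      .option("hoodie.table.name", "streaming_tbl")                       // placeholder table name
      .option("hoodie.datasource.write.recordkey.field", "value")         // placeholder record key
      .option("hoodie.datasource.write.partitionpath.field", "timestamp") // placeholder partition path
      .option("hoodie.datasource.write.precombine.field", "timestamp")    // placeholder precombine field
      // New option from this patch: keep this value unique per concurrent writer so each
      // writer's latest committed batch id is stored under its own key in SINK_CHECKPOINT_KEY.
      .option("hoodie.datasource.write.streaming.checkpoint.identifier", "writer-1")
      .option("checkpointLocation", "/tmp/hudi/checkpoints/writer-1")     // placeholder checkpoint dir
      .outputMode(OutputMode.Append())
      .start("/tmp/hudi/streaming_tbl")                                   // placeholder base path
      .awaitTermination()
  }
}

If the identifier is not set, the sink falls back to tracking the latest committed batch id only in memory, so a restarted query whose Spark checkpoint write failed may reprocess the last microbatch, which is the risk the option documentation above calls out.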