diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java index 08b775f60ee95..59ea8f1d063f9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java @@ -26,11 +26,13 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.avro.Schema; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -138,4 +140,29 @@ public static HashMap getFileIdWithoutSuffixAndRelativePaths(Map } return fileIdToPath; } + + /** + * Process previous commits metadata in the timeline to determine the checkpoint given a checkpoint key. + * NOTE: This is very similar in intent to DeltaSync#getLatestCommitMetadataWithValidCheckpointInfo except that + * different deployment models (deltastreamer or spark structured streaming) could have different checkpoint keys. + * + * @param timeline completed commits in active timeline. + * @param checkpointKey the checkpoint key in the extra metadata of the commit. + * @return An optional commit metadata with latest checkpoint. 
+ */ + public static Option getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline, String checkpointKey) { + return (Option) timeline.getReverseOrderedInstants().map(instant -> { + try { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata + .fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class); + if (StringUtils.nonEmpty(commitMetadata.getMetadata(checkpointKey))) { + return Option.of(commitMetadata); + } else { + return Option.empty(); + } + } catch (IOException e) { + throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e); + } + }).filter(Option::isPresent).findFirst().orElse(Option.empty()); + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala index 5f7cee1df0123..23b79a5ed4181 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala @@ -16,6 +16,10 @@ */ package org.apache.hudi +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService} import org.apache.hudi.client.SparkRDDWriteClient import org.apache.hudi.client.common.HoodieSparkEngineContext @@ -24,12 +28,13 @@ import org.apache.hudi.common.table.marker.MarkerType import org.apache.hudi.common.table.timeline.HoodieInstant.State import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} import org.apache.hudi.common.table.{HoodieTableConfig, 
HoodieTableMetaClient} -import org.apache.hudi.common.util.{ClusteringUtils, CompactionUtils} +import org.apache.hudi.common.util.ValidationUtils.checkArgument +import org.apache.hudi.common.util.{ClusteringUtils, CommitUtils, CompactionUtils, StringUtils} import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.exception.HoodieCorruptedDataException +import org.apache.hudi.exception.{HoodieCorruptedDataException, HoodieException, TableNotFoundException} import org.apache.log4j.LogManager import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.sql.execution.streaming.Sink +import org.apache.spark.sql.execution.streaming.{Sink, StreamExecution} import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} @@ -44,16 +49,34 @@ class HoodieStreamingSink(sqlContext: SQLContext, outputMode: OutputMode) extends Sink with Serializable { - @volatile private var latestBatchId = -1L + @volatile private var latestCommittedBatchId = -1L private val log = LogManager.getLogger(classOf[HoodieStreamingSink]) - private val retryCnt = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_CNT.key, - DataSourceWriteOptions.STREAMING_RETRY_CNT.defaultValue).toInt - private val retryIntervalMs = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key, - DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong - private val ignoreFailedBatch = options.getOrDefault(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key, - DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean + private val tablePath = options.get("path") + if (tablePath.isEmpty || tablePath.get == null) { + throw new HoodieException(s"'path' must be specified.") + } + private var metaClient: Option[HoodieTableMetaClient] = { + try { + Some(HoodieTableMetaClient.builder() + .setConf(sqlContext.sparkContext.hadoopConfiguration) + .setBasePath(tablePath.get) + .build()) + } catch 
{ + case _: TableNotFoundException => + log.warn("Ignore TableNotFoundException as it is first microbatch.") + Option.empty + } + } + private val retryCnt = options.getOrDefault(STREAMING_RETRY_CNT.key, + STREAMING_RETRY_CNT.defaultValue).toInt + private val retryIntervalMs = options.getOrDefault(STREAMING_RETRY_INTERVAL_MS.key, + STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong + private val ignoreFailedBatch = options.getOrDefault(STREAMING_IGNORE_FAILED_BATCH.key, + STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean + // This constant serves as the checkpoint key for streaming sink so that each microbatch is processed exactly-once. + private val SINK_CHECKPOINT_KEY = "_hudi_streaming_sink_checkpoint" private var isAsyncCompactorServiceShutdownAbnormally = false private var isAsyncClusteringServiceShutdownAbnormally = false @@ -65,10 +88,10 @@ class HoodieStreamingSink(sqlContext: SQLContext, SaveMode.Overwrite } - private var asyncCompactorService : AsyncCompactService = _ + private var asyncCompactorService: AsyncCompactService = _ private var asyncClusteringService: AsyncClusteringService = _ - private var writeClient : Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty - private var hoodieTableConfig : Option[HoodieTableConfig] = Option.empty + private var writeClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty + private var hoodieTableConfig: Option[HoodieTableConfig] = Option.empty override def addBatch(batchId: Long, data: DataFrame): Unit = this.synchronized { if (isAsyncCompactorServiceShutdownAbnormally) { @@ -78,26 +101,50 @@ class HoodieStreamingSink(sqlContext: SQLContext, log.error("Async clustering service shutdown unexpectedly") throw new IllegalStateException("Async clustering service shutdown unexpectedly") } + + val queryId = sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY) + checkArgument(queryId != null, "queryId is null") + if (metaClient.isDefined && 
canSkipBatch(batchId, options.getOrDefault(OPERATION.key, UPSERT_OPERATION_OPT_VAL))) { + log.warn(s"Skipping already completed batch $batchId in query $queryId") + return + } + // Override to use direct markers. In Structured streaming, timeline server is closed after // first micro-batch and subsequent micro-batches do not have timeline server running. // Thus, we can't use timeline-server-based markers. var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name()) // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM. updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true") + // Add batchId as checkpoint to the extra metadata. To enable same checkpoint metadata structure for multi-writers, + // SINK_CHECKPOINT_KEY holds a map of batchId to writer context (composed of applicationId and queryId), e.g. + // "_hudi_streaming_sink_checkpoint" : "{\"$batchId\":\"${sqlContext.sparkContext.applicationId}-$queryId\"}" + // NOTE: In case of multi-writers, this map should be mutable and sorted by key to facilitate merging of batchIds. + // HUDI-4432 tracks the implementation of checkpoint management for multi-writer. 
+ val checkpointMap = Map(batchId.toString -> s"${sqlContext.sparkContext.applicationId}-$queryId") + updatedOptions = updatedOptions.updated(SINK_CHECKPOINT_KEY, HoodieSinkCheckpoint.toJson(checkpointMap)) retry(retryCnt, retryIntervalMs)( Try( HoodieSparkSqlWriter.write( sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering)) - ) match { + ) + match { case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) => log.info(s"Micro batch id=$batchId succeeded" + (commitOps.isPresent match { case true => s" for commit=${commitOps.get()}" case _ => s" with no new commits" })) + log.info(s"Current value of latestCommittedBatchId: $latestCommittedBatchId. Setting latestCommittedBatchId to batchId $batchId.") + latestCommittedBatchId = batchId writeClient = Some(client) hoodieTableConfig = Some(tableConfig) + if (client != null) { + metaClient = Some(HoodieTableMetaClient.builder() + .setConf(sqlContext.sparkContext.hadoopConfiguration) + .setBasePath(client.getConfig.getBasePath) + .build()) + } if (compactionInstantOps.isPresent) { asyncCompactorService.enqueuePendingAsyncServiceInstant( new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionInstantOps.get())) @@ -112,7 +159,7 @@ class HoodieStreamingSink(sqlContext: SQLContext, // clean up persist rdds in the write process data.sparkSession.sparkContext.getPersistentRDDs .foreach { - case (id, rdd) => + case (_, rdd) => try { rdd.unpersist() } catch { @@ -122,28 +169,29 @@ class HoodieStreamingSink(sqlContext: SQLContext, log.error(s"Micro batch id=$batchId threw following exception: ", e) if (ignoreFailedBatch) { log.warn(s"Ignore the exception and move on streaming as per " + - s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key} configuration") + s"${STREAMING_IGNORE_FAILED_BATCH.key} configuration") Success((true, None, None)) } else { if (retryCnt > 1) log.info(s"Retrying 
the failed micro batch id=$batchId ...") Failure(e) } - case Success((false, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) => + case Success((false, commitOps, _, _, _, _)) => log.error(s"Micro batch id=$batchId ended up with errors" + (commitOps.isPresent match { - case true => s" for commit=${commitOps.get()}" - case _ => s"" - })) + case true => s" for commit=${commitOps.get()}" + case _ => s"" + })) if (ignoreFailedBatch) { log.info(s"Ignore the errors and move on streaming as per " + - s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key} configuration") + s"${STREAMING_IGNORE_FAILED_BATCH.key} configuration") Success((true, None, None)) } else { - if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...") + if (retryCnt > 1) log.warn(s"Retrying the failed micro batch id=$batchId ...") Failure(new HoodieCorruptedDataException(s"Micro batch id=$batchId ended up with errors")) } } - ) match { + ) + match { case Failure(e) => if (!ignoreFailedBatch) { log.error(s"Micro batch id=$batchId threw following expections," + @@ -198,14 +246,14 @@ class HoodieStreamingSink(sqlContext: SQLContext, // First time, scan .hoodie folder and get all pending compactions val metaClient = HoodieTableMetaClient.builder().setConf(sqlContext.sparkContext.hadoopConfiguration) .setBasePath(client.getConfig.getBasePath).build() - val pendingInstants :java.util.List[HoodieInstant] = + val pendingInstants: java.util.List[HoodieInstant] = CompactionUtils.getPendingCompactionInstantTimes(metaClient) - pendingInstants.foreach((h : HoodieInstant) => asyncCompactorService.enqueuePendingAsyncServiceInstant(h)) + pendingInstants.foreach((h: HoodieInstant) => asyncCompactorService.enqueuePendingAsyncServiceInstant(h)) } } protected def triggerAsyncClustering(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]): Unit = { - if (null == asyncClusteringService) { + if (null == asyncClusteringService) { log.info("Triggering async 
clustering!") asyncClusteringService = new SparkStreamingAsyncClusteringService(new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)), client) @@ -226,12 +274,12 @@ class HoodieStreamingSink(sqlContext: SQLContext, // First time, scan .hoodie folder and get all pending clustering instants val metaClient = HoodieTableMetaClient.builder().setConf(sqlContext.sparkContext.hadoopConfiguration) .setBasePath(client.getConfig.getBasePath).build() - val pendingInstants :java.util.List[HoodieInstant] = ClusteringUtils.getPendingClusteringInstantTimes(metaClient) - pendingInstants.foreach((h : HoodieInstant) => asyncClusteringService.enqueuePendingAsyncServiceInstant(h)) + val pendingInstants: java.util.List[HoodieInstant] = ClusteringUtils.getPendingClusteringInstantTimes(metaClient) + pendingInstants.foreach((h: HoodieInstant) => asyncClusteringService.enqueuePendingAsyncServiceInstant(h)) } } - private def reset(force: Boolean) : Unit = this.synchronized { + private def reset(force: Boolean): Unit = this.synchronized { if (asyncCompactorService != null) { asyncCompactorService.shutdown(force) asyncCompactorService = null @@ -247,4 +295,45 @@ class HoodieStreamingSink(sqlContext: SQLContext, writeClient = Option.empty } } + + private def canSkipBatch(incomingBatchId: Long, operationType: String): Boolean = { + if (!DELETE_OPERATION_OPT_VAL.equals(operationType)) { + // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not + val commitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo( + metaClient.get.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY) + if (commitMetadata.isPresent) { + val lastCheckpoint = commitMetadata.get.getMetadata(SINK_CHECKPOINT_KEY) + if (!StringUtils.isNullOrEmpty(lastCheckpoint)) { + latestCommittedBatchId = HoodieSinkCheckpoint.fromJson(lastCheckpoint).keys.head.toLong + } + } + latestCommittedBatchId >= incomingBatchId + } else { + // In 
case of DELETE_OPERATION_OPT_VAL the incoming batch id is sentinel value (-1) + false + } + } +} + +/** + * SINK_CHECKPOINT_KEY holds a map of batchId to writer context (composed of applicationId and queryId). + * This is a util object to serialize/deserialize map to/from json. + */ +object HoodieSinkCheckpoint { + + lazy val mapper: ObjectMapper = { + val _mapper = new ObjectMapper + _mapper.setSerializationInclusion(Include.NON_ABSENT) + _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + _mapper.registerModule(DefaultScalaModule) + _mapper + } + + def toJson(checkpoint: Map[String, String]): String = { + mapper.writeValueAsString(checkpoint) + } + + def fromJson(json: String): Map[String, String] = { + mapper.readValue(json, classOf[Map[String, String]]) + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java index d16685243e5b5..e716d34bd5efd 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java @@ -303,7 +303,9 @@ public int addInputAndValidateIngestion(SparkSession spark, FileSystem fs, Strin } if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) { - numExpCommits += 1; + if (inputDF2 != null) { + numExpCommits += 1; + } // Wait for compaction to also finish and track latest timestamp as commit timestamp waitTillNCommits(fs, numExpCommits, 180, 3); commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala index 8981d1447b12f..1382bafb7621a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala +++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala @@ -223,7 +223,7 @@ class TestStructuredStreaming extends HoodieClientTestBase { success = true } } catch { - case te: TableNotFoundException => + case _: TableNotFoundException => log.info("Got table not found exception. Retrying") } finally { if (!success) { @@ -312,7 +312,7 @@ class TestStructuredStreaming extends HoodieClientTestBase { @throws[InterruptedException] private def waitTillHasCompletedReplaceInstant(tablePath: String, - timeoutSecs: Int, sleepSecsAfterEachRun: Int) = { + timeoutSecs: Int, sleepSecsAfterEachRun: Int) = { val beginTime = System.currentTimeMillis var currTime = beginTime val timeoutMsecs = timeoutSecs * 1000 diff --git a/style/scalastyle.xml b/style/scalastyle.xml index 18962b1644486..e06109388b24f 100644 --- a/style/scalastyle.xml +++ b/style/scalastyle.xml @@ -72,7 +72,7 @@ - +