diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
index a3ac8762f1a2f..05ec523bd2c19 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
@@ -29,11 +29,13 @@
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -46,6 +48,7 @@ public class CommitUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(CommitUtils.class);
   private static final String NULL_SCHEMA_STR = Schema.create(Schema.Type.NULL).toString();
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
   /**
    * Gets the commit action type for given write operation and table type.
@@ -145,19 +148,24 @@ public static Set<Pair<String, String>> getPartitionAndFileIdWithoutSuffix(Map
-  public static Option<HoodieCommitMetadata> getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline, String checkpointKey) {
-    return (Option<HoodieCommitMetadata>) timeline.filterCompletedInstants().getReverseOrderedInstants().map(instant -> {
+  public static Option<String> getValidCheckpointForCurrentWriter(HoodieTimeline timeline, String checkpointKey,
+                                                                  String keyToLookup) {
+    return (Option<String>) timeline.getWriteTimeline().getReverseOrderedInstants().map(instant -> {
       try {
         HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
             .fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
-        if (StringUtils.nonEmpty(commitMetadata.getMetadata(checkpointKey))) {
-          return Option.of(commitMetadata);
+        // process commits only with checkpoint entries
+        String checkpointValue = commitMetadata.getMetadata(checkpointKey);
+        if (StringUtils.nonEmpty(checkpointValue)) {
+          // return if checkpoint for "keyToLookup" exists.
+          return readCheckpointValue(checkpointValue, keyToLookup);
         } else {
           return Option.empty();
         }
@@ -166,4 +174,27 @@ public static Option<HoodieCommitMetadata> getLatestCommitMetadataWithValidCheck
       }
     }).filter(Option::isPresent).findFirst().orElse(Option.empty());
   }
+
+  public static Option<String> readCheckpointValue(String value, String id) {
+    try {
+      Map<String, String> checkpointMap = OBJECT_MAPPER.readValue(value, Map.class);
+      if (!checkpointMap.containsKey(id)) {
+        return Option.empty();
+      }
+      String checkpointVal = checkpointMap.get(id);
+      return Option.of(checkpointVal);
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to parse checkpoint as map", e);
+    }
+  }
+
+  public static String getCheckpointValueAsString(String identifier, String batchId) {
+    try {
+      Map<String, String> checkpointMap = new HashMap<>();
+      checkpointMap.put(identifier, batchId);
+      return OBJECT_MAPPER.writeValueAsString(checkpointMap);
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to serialize checkpoint map", e);
+    }
+  }
 }
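Note: the new CommitUtils helpers serialize the sink checkpoint as a single-entry JSON map keyed by the writer's checkpoint identifier. A minimal round-trip sketch follows; it is not part of this patch, the identifier and batch id values are illustrative, and hudi-common is assumed to be on the classpath.

    import org.apache.hudi.common.util.CommitUtils

    object CheckpointRoundTripSketch extends App {
      // Serialize: produces a JSON map such as {"streaming_identifier1":"42"}.
      val checkpointJson = CommitUtils.getCheckpointValueAsString("streaming_identifier1", "42")

      // Deserialize: look up the batch id committed under a specific writer identifier.
      val batchId = CommitUtils.readCheckpointValue(checkpointJson, "streaming_identifier1")
      assert(batchId.isPresent && batchId.get() == "42")

      // A lookup for an unknown identifier returns Option.empty, so writers with
      // different identifiers do not overwrite each other's progress.
      assert(!CommitUtils.readCheckpointValue(checkpointJson, "streaming_identifier2").isPresent)
    }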
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 10a99e36681aa..9ce0ff2d44bbf 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -58,6 +58,8 @@
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hudi.common.util.CommitUtils.getCheckpointValueAsString;
+
 /**
  * Utilities used throughout the data source.
  */
@@ -141,6 +143,12 @@ public static Map<String, String> getExtraMetadata(Map<String, String> propertie
         }
       });
     }
+    if (properties.containsKey(HoodieSparkSqlWriter.SPARK_STREAMING_BATCH_ID())) {
+      extraMetadataMap.put(HoodieStreamingSink.SINK_CHECKPOINT_KEY(),
+          getCheckpointValueAsString(properties.getOrDefault(DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER().key(),
+              DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER().defaultValue()),
+              properties.get(HoodieSparkSqlWriter.SPARK_STREAMING_BATCH_ID())));
+    }
     return extraMetadataMap;
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 897fd3e81e1ce..acd0b2c225060 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -97,6 +97,11 @@ object HoodieSparkSqlWriter {
     ConfigProperty.key("hoodie.internal.sql.merge.into.writes")
       .defaultValue(false)
 
+  /**
+   * For Spark streaming use-cases, holds the batch Id.
+   */
+  val SPARK_STREAMING_BATCH_ID = "hoodie.internal.spark.streaming.batch.id"
+
   private val log = LoggerFactory.getLogger(getClass)
   private var tableExists: Boolean = false
   private var asyncCompactionTriggerFnDefined: Boolean = false
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index c7538754a8c23..ac0dbffe542ca 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -16,10 +16,8 @@
  */
 package org.apache.hudi
 
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.HoodieSinkCheckpoint.SINK_CHECKPOINT_KEY
+import org.apache.hudi.HoodieStreamingSink.SINK_CHECKPOINT_KEY
 import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService}
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
@@ -108,42 +106,23 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       return
     }
 
+
     // Override to use direct markers. In Structured streaming, timeline server is closed after
     // first micro-batch and subsequent micro-batches do not have timeline server running.
    // Thus, we can't use timeline-server-based markers.
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
     updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    updatedOptions = updatedOptions.updated(HoodieSparkSqlWriter.SPARK_STREAMING_BATCH_ID, batchId.toString)
 
     retry(retryCnt, retryIntervalMs)(
       Try(
        HoodieSparkSqlWriter.write(
          sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering),
          extraPreCommitFn = Some(new BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata] {
-
-            override def accept(metaClient: HoodieTableMetaClient,
-                                newCommitMetadata: HoodieCommitMetadata): Unit = {
-              getStreamIdentifier(options) match {
-                case Some(identifier) =>
-                  // Fetch the latestCommit with checkpoint Info again to avoid concurrency issue in multi-write scenario.
-                  val lastCheckpointCommitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
-                    metaClient.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY)
-                  var checkpointString = ""
-                  if (lastCheckpointCommitMetadata.isPresent) {
-                    val lastCheckpoint = lastCheckpointCommitMetadata.get.getMetadata(SINK_CHECKPOINT_KEY)
-                    if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
-                      checkpointString = HoodieSinkCheckpoint.toJson(HoodieSinkCheckpoint.fromJson(lastCheckpoint) + (identifier -> batchId.toString))
-                    } else {
-                      checkpointString = HoodieSinkCheckpoint.toJson(Map(identifier -> batchId.toString))
-                    }
-                  } else {
-                    checkpointString = HoodieSinkCheckpoint.toJson(Map(identifier -> batchId.toString))
-                  }
-
-                  newCommitMetadata.addMetadata(SINK_CHECKPOINT_KEY, checkpointString)
-                case None =>
-                  // No op since keeping batch id in memory only.
-              }
+            override def accept(metaClient: HoodieTableMetaClient, newCommitMetadata: HoodieCommitMetadata): Unit = {
+              val identifier = options.getOrElse(STREAMING_CHECKPOINT_IDENTIFIER.key(), STREAMING_CHECKPOINT_IDENTIFIER.defaultValue())
+              newCommitMetadata.addMetadata(SINK_CHECKPOINT_KEY, CommitUtils.getCheckpointValueAsString(identifier, String.valueOf(batchId)))
             }
           }))
       )
@@ -328,19 +307,12 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
   private def canSkipBatch(incomingBatchId: Long, operationType: String): Boolean = {
     if (!DELETE_OPERATION_OPT_VAL.equals(operationType)) {
-      getStreamIdentifier(options) match {
-        case Some(identifier) =>
-          // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
-          val commitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
-            metaClient.get.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY)
-          if (commitMetadata.isPresent) {
-            val lastCheckpoint = commitMetadata.get.getMetadata(SINK_CHECKPOINT_KEY)
-            if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
-              HoodieSinkCheckpoint.fromJson(lastCheckpoint).get(identifier).foreach(commitBatchId =>
-                latestCommittedBatchId = commitBatchId.toLong)
-            }
-          }
-        case None =>
+      val identifier = options.getOrElse(STREAMING_CHECKPOINT_IDENTIFIER.key(), STREAMING_CHECKPOINT_IDENTIFIER.defaultValue())
+      // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
+      val lastCheckpoint = CommitUtils.getValidCheckpointForCurrentWriter(
+        metaClient.get.getActiveTimeline.getWriteTimeline, SINK_CHECKPOINT_KEY, identifier)
+      if (lastCheckpoint.isPresent) {
+        latestCommittedBatchId = lastCheckpoint.get().toLong
       }
       latestCommittedBatchId >= incomingBatchId
     } else {
@@ -350,26 +322,8 @@ class HoodieStreamingSink(sqlContext: SQLContext,
   }
 }
 
-/**
- * SINK_CHECKPOINT_KEY holds a map of batchId to writer context (composed of applicationId and queryId).
- * This is a util object to serialize/deserialize map to/from json.
- */
-object HoodieSinkCheckpoint {
-
-  lazy val mapper: ObjectMapper = {
-    val _mapper = JsonUtils.getObjectMapper
-    _mapper.registerModule(DefaultScalaModule)
-    _mapper
-  }
+object HoodieStreamingSink {
 
   // This constant serves as the checkpoint key for streaming sink so that each microbatch is processed exactly-once.
   val SINK_CHECKPOINT_KEY = "_hudi_streaming_sink_checkpoint"
-
-  def toJson(checkpoint: Map[String, String]): String = {
-    mapper.writeValueAsString(checkpoint)
-  }
-
-  def fromJson(json: String): Map[String, String] = {
-    mapper.readValue(json, classOf[Map[String, String]])
-  }
 }
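Note: with the batch id now threaded through the internal SPARK_STREAMING_BATCH_ID option and recorded under SINK_CHECKPOINT_KEY per writer identifier, two streaming queries can write to the same table while tracking their micro-batch progress independently. A hedged sketch of the writer-side wiring, mirroring the test that follows; paths, identifier names, and the hudiOpts map are illustrative, not part of this patch.

    import org.apache.hudi.DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
    import org.apache.spark.sql.types.StructType

    object MultiWriterStreamingSketch {
      // Starts one streaming writer into a shared Hudi table; each writer carries its own
      // checkpoint identifier, so its last committed batch id is tracked separately.
      def startWriter(spark: SparkSession, schema: StructType, sourcePath: String, destPath: String,
                      checkpointDir: String, identifier: String, hudiOpts: Map[String, String]): StreamingQuery = {
        spark.readStream
          .schema(schema)
          .json(sourcePath)
          .writeStream
          .format("org.apache.hudi")
          .options(hudiOpts) // table name, record key, precombine field, etc.
          .option(STREAMING_CHECKPOINT_IDENTIFIER.key(), identifier)
          .option("checkpointLocation", checkpointDir)
          .outputMode(OutputMode.Append)
          .start(destPath)
      }
    }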
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index ea3b464973540..a1e89760bd57e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -19,10 +19,10 @@ package org.apache.hudi.functional
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER
-import org.apache.hudi.HoodieSinkCheckpoint.SINK_CHECKPOINT_KEY
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider
 import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.model.{FileSlice, HoodieTableType, WriteConcurrencyMode}
+import org.apache.hudi.HoodieStreamingSink.SINK_CHECKPOINT_KEY
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
@@ -31,7 +31,7 @@ import org.apache.hudi.common.util.{CollectionUtils, CommitUtils}
 import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.TableNotFoundException
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSinkCheckpoint}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
 import org.apache.spark.sql.types.StructType
@@ -284,7 +284,7 @@ class TestStructuredStreaming extends HoodieSparkClientTestBase {
       .start(destPath)
 
     query1.processAllAvailable()
-    val metaClient = HoodieTableMetaClient.builder
+    var metaClient = HoodieTableMetaClient.builder
       .setConf(fs.getConf).setBasePath(destPath).setLoadActiveTimelineOnLoad(true).build
 
     assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "0")
@@ -313,16 +313,44 @@ class TestStructuredStreaming extends HoodieSparkClientTestBase {
     assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier2", "0")
     assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "1")
+
+
+    inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
+
+    val query3 = spark.readStream
+      .schema(schema)
+      .json(sourcePath)
+      .writeStream
+      .format("org.apache.hudi")
+      .options(commonOpts)
+      .outputMode(OutputMode.Append)
+      .option(STREAMING_CHECKPOINT_IDENTIFIER.key(), "streaming_identifier1")
+      .option("checkpointLocation", s"${basePath}/checkpoint1")
+      .start(destPath)
+
+    query3.processAllAvailable()
+    metaClient = HoodieTableMetaClient.builder
+      .setConf(fs.getConf).setBasePath(destPath).setLoadActiveTimelineOnLoad(true).build
+
+    assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "2")
+    assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier2", "0")
   }
 
   @Test
   def testStructuredStreamingForDefaultIdentifier(): Unit = {
-    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
+    testStructuredStreamingInternal()
+  }
+  @Test
+  def testStructuredStreamingWithBulkInsert(): Unit = {
+    testStructuredStreamingInternal("bulk_insert")
+  }
+
+  def testStructuredStreamingInternal(operation: String = "upsert"): Unit = {
+    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
     val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     val schema = inputDF1.schema
-
     inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
 
     val query1 = spark.readStream
@@ -331,6 +359,7 @@ class TestStructuredStreaming extends HoodieSparkClientTestBase {
       .writeStream
       .format("org.apache.hudi")
       .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key(), operation)
      .outputMode(OutputMode.Append)
      .option("checkpointLocation", s"$basePath/checkpoint1")
      .start(destPath)
@@ -343,17 +372,13 @@ class TestStructuredStreaming extends HoodieSparkClientTestBase {
     query1.stop()
   }
 
-  def assertLatestCheckpointInfoMatched(metaClient: HoodieTableMetaClient,
+  def assertLatestCheckpointInfoMatched(metaClient: HoodieTableMetaClient,
                                         identifier: String, expectBatchId: String): Unit = {
     metaClient.reloadActiveTimeline()
-    val lastCheckpointCommitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
-      metaClient.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY)
-
-    assertTrue(lastCheckpointCommitMetadata.isPresent)
-    val checkpointMap = HoodieSinkCheckpoint.fromJson(lastCheckpointCommitMetadata.get().getMetadata(SINK_CHECKPOINT_KEY))
-
-    assertEquals(expectBatchId, checkpointMap.get(identifier).orNull)
+    val lastCheckpoint = CommitUtils.getValidCheckpointForCurrentWriter(
+      metaClient.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY, identifier)
+    assertEquals(expectBatchId, lastCheckpoint.get())
   }
 
   def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, tableType: HoodieTableType,
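Note: a reader-side sketch of how the last committed micro-batch for a given writer identifier can be looked up straight from the table's commit timeline, mirroring assertLatestCheckpointInfoMatched above. The object name is illustrative, and constructing a fresh Hadoop Configuration is an assumption; a real caller would reuse its own configuration.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hudi.HoodieStreamingSink.SINK_CHECKPOINT_KEY
    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.hudi.common.util.CommitUtils

    object LastCommittedBatchSketch {
      // Returns the last batch id committed under `identifier`, if any commit carries
      // the _hudi_streaming_sink_checkpoint entry for it.
      def lastCommittedBatchId(basePath: String, identifier: String): Option[Long] = {
        val metaClient = HoodieTableMetaClient.builder
          .setConf(new Configuration())
          .setBasePath(basePath)
          .setLoadActiveTimelineOnLoad(true)
          .build
        val checkpoint = CommitUtils.getValidCheckpointForCurrentWriter(
          metaClient.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY, identifier)
        if (checkpoint.isPresent) Some(checkpoint.get().toLong) else None
      }
    }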