CommitUtils.java
@@ -29,11 +29,13 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -46,6 +48,7 @@ public class CommitUtils {

private static final Logger LOG = LoggerFactory.getLogger(CommitUtils.class);
private static final String NULL_SCHEMA_STR = Schema.create(Schema.Type.NULL).toString();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

/**
* Gets the commit action type for given write operation and table type.
@@ -145,19 +148,24 @@ public static Set<Pair<String, String>> getPartitionAndFileIdWithoutSuffix(Map<S
/**
* Processes previous commit metadata in the timeline to determine the checkpoint for a given checkpoint key.
* NOTE: This is very similar in intent to DeltaSync#getLatestCommitMetadataWithValidCheckpointInfo except that
* different deployment models (deltastreamer or spark structured streaming) could have different checkpoint keys.
*
* @param timeline completed commits in active timeline.
* @param checkpointKey the checkpoint key in the extra metadata of the commit.
* @param keyToLookup key of interest for which the checkpoint is looked up.
* @return An optional checkpoint value for the key of interest.
*/
public static Option<HoodieCommitMetadata> getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline, String checkpointKey) {
return (Option<HoodieCommitMetadata>) timeline.filterCompletedInstants().getReverseOrderedInstants().map(instant -> {
public static Option<String> getValidCheckpointForCurrentWriter(HoodieTimeline timeline, String checkpointKey,
String keyToLookup) {
return (Option<String>) timeline.getWriteTimeline().getReverseOrderedInstants().map(instant -> {
try {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
if (StringUtils.nonEmpty(commitMetadata.getMetadata(checkpointKey))) {
return Option.of(commitMetadata);
// process commits only with checkpoint entries
String checkpointValue = commitMetadata.getMetadata(checkpointKey);
if (StringUtils.nonEmpty(checkpointValue)) {
// return if checkpoint for "keyToLookup" exists.
return readCheckpointValue(checkpointValue, keyToLookup);
} else {
return Option.empty();
}
@@ -166,4 +174,27 @@ public static Option<HoodieCommitMetadata> getLatestCommitMetadataWithValidCheck
}
}).filter(Option::isPresent).findFirst().orElse(Option.empty());
}

public static Option<String> readCheckpointValue(String value, String id) {
try {
Map<String, String> checkpointMap = OBJECT_MAPPER.readValue(value, Map.class);
if (!checkpointMap.containsKey(id)) {
return Option.empty();
}
String checkpointVal = checkpointMap.get(id);
return Option.of(checkpointVal);
} catch (IOException e) {
throw new HoodieIOException("Failed to parse checkpoint as map", e);
}
}

public static String getCheckpointValueAsString(String identifier, String batchId) {
try {
Map<String, String> checkpointMap = new HashMap<>();
checkpointMap.put(identifier, batchId);
return OBJECT_MAPPER.writeValueAsString(checkpointMap);
} catch (IOException e) {
throw new HoodieIOException("Failed to serialize checkpoint map", e);
}
}
}
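Side note for readers (not part of the change): the two new helpers above round-trip the checkpoint value stored in commit metadata. A minimal sketch, assuming hudi-common is on the classpath; the identifier "writer-1" and batch id "42" are made-up values.

```scala
import org.apache.hudi.common.util.CommitUtils

object CheckpointRoundTrip {
  def main(args: Array[String]): Unit = {
    // Serialize a single writer's checkpoint into the JSON map format stored in commit metadata.
    val serialized = CommitUtils.getCheckpointValueAsString("writer-1", "42")
    println(serialized) // {"writer-1":"42"}

    // Reading it back with the same identifier yields the batch id.
    val sameWriter = CommitUtils.readCheckpointValue(serialized, "writer-1")
    assert(sameWriter.isPresent && sameWriter.get() == "42")

    // A different identifier has no entry in the map, so the lookup returns Option.empty().
    val otherWriter = CommitUtils.readCheckpointValue(serialized, "writer-2")
    assert(!otherWriter.isPresent)
  }
}
```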
DataSourceUtils.java
@@ -58,6 +58,8 @@
import java.util.List;
import java.util.Map;

import static org.apache.hudi.common.util.CommitUtils.getCheckpointValueAsString;

/**
* Utilities used throughout the data source.
*/
@@ -141,6 +143,12 @@ public static Map<String, String> getExtraMetadata(Map<String, String> propertie
}
});
}
if (properties.containsKey(HoodieSparkSqlWriter.SPARK_STREAMING_BATCH_ID())) {
extraMetadataMap.put(HoodieStreamingSink.SINK_CHECKPOINT_KEY(),
getCheckpointValueAsString(properties.getOrDefault(DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER().key(),
DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER().defaultValue()),
properties.get(HoodieSparkSqlWriter.SPARK_STREAMING_BATCH_ID())));
}
return extraMetadataMap;
}
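A sketch of how this branch gets fed (illustrative, not an excerpt from the PR): HoodieStreamingSink passes the current micro-batch id under SPARK_STREAMING_BATCH_ID together with the writer's checkpoint identifier, and getExtraMetadata serializes the pair into a one-entry JSON map stored under the sink checkpoint key. The identifier "ingestion-writer-1" and batch id "7" are made-up values.

```scala
import org.apache.hudi.common.util.CommitUtils
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter}

object ExtraMetadataSketch {
  def main(args: Array[String]): Unit = {
    // Properties as the streaming sink would pass them down for micro-batch 7.
    val props = Map(
      DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER.key() -> "ingestion-writer-1",
      HoodieSparkSqlWriter.SPARK_STREAMING_BATCH_ID -> "7")

    // The value that would be stored under HoodieStreamingSink.SINK_CHECKPOINT_KEY in the commit metadata.
    val checkpointValue = CommitUtils.getCheckpointValueAsString(
      props(DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER.key()),
      props(HoodieSparkSqlWriter.SPARK_STREAMING_BATCH_ID))
    println(checkpointValue) // {"ingestion-writer-1":"7"}
  }
}
```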

HoodieSparkSqlWriter.scala
@@ -97,6 +97,11 @@ object HoodieSparkSqlWriter {
ConfigProperty.key("hoodie.internal.sql.merge.into.writes")
.defaultValue(false)

/**
* For Spark streaming use cases, holds the batch id of the current micro-batch.
*/
val SPARK_STREAMING_BATCH_ID = "hoodie.internal.spark.streaming.batch.id"

private val log = LoggerFactory.getLogger(getClass)
private var tableExists: Boolean = false
private var asyncCompactionTriggerFnDefined: Boolean = false
HoodieStreamingSink.scala
@@ -16,10 +16,8 @@
*/
package org.apache.hudi

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieSinkCheckpoint.SINK_CHECKPOINT_KEY
import org.apache.hudi.HoodieStreamingSink.SINK_CHECKPOINT_KEY
import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService}
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
@@ -108,42 +106,23 @@ class HoodieStreamingSink(sqlContext: SQLContext,
return
}


// Override to use direct markers. In Structured streaming, timeline server is closed after
// first micro-batch and subsequent micro-batches do not have timeline server running.
// Thus, we can't use timeline-server-based markers.
var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
// we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
updatedOptions = updatedOptions.updated(HoodieSparkSqlWriter.SPARK_STREAMING_BATCH_ID, batchId.toString)

retry(retryCnt, retryIntervalMs)(
Try(
HoodieSparkSqlWriter.write(
sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering),
extraPreCommitFn = Some(new BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata] {

override def accept(metaClient: HoodieTableMetaClient,
newCommitMetadata: HoodieCommitMetadata): Unit = {
getStreamIdentifier(options) match {
case Some(identifier) =>
// Fetch the latestCommit with checkpoint Info again to avoid concurrency issue in multi-write scenario.
val lastCheckpointCommitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
metaClient.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY)
var checkpointString = ""
if (lastCheckpointCommitMetadata.isPresent) {
val lastCheckpoint = lastCheckpointCommitMetadata.get.getMetadata(SINK_CHECKPOINT_KEY)
if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
checkpointString = HoodieSinkCheckpoint.toJson(HoodieSinkCheckpoint.fromJson(lastCheckpoint) + (identifier -> batchId.toString))
} else {
checkpointString = HoodieSinkCheckpoint.toJson(Map(identifier -> batchId.toString))
}
} else {
checkpointString = HoodieSinkCheckpoint.toJson(Map(identifier -> batchId.toString))
}

newCommitMetadata.addMetadata(SINK_CHECKPOINT_KEY, checkpointString)
case None =>
// No op since keeping batch id in memory only.
}
override def accept(metaClient: HoodieTableMetaClient, newCommitMetadata: HoodieCommitMetadata): Unit = {
val identifier = options.getOrElse(STREAMING_CHECKPOINT_IDENTIFIER.key(), STREAMING_CHECKPOINT_IDENTIFIER.defaultValue())
newCommitMetadata.addMetadata(SINK_CHECKPOINT_KEY, CommitUtils.getCheckpointValueAsString(identifier, String.valueOf(batchId)))
}
}))
)
@@ -328,19 +307,12 @@

private def canSkipBatch(incomingBatchId: Long, operationType: String): Boolean = {
if (!DELETE_OPERATION_OPT_VAL.equals(operationType)) {
getStreamIdentifier(options) match {
case Some(identifier) =>
// get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
val commitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
metaClient.get.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY)
if (commitMetadata.isPresent) {
val lastCheckpoint = commitMetadata.get.getMetadata(SINK_CHECKPOINT_KEY)
if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
HoodieSinkCheckpoint.fromJson(lastCheckpoint).get(identifier).foreach(commitBatchId =>
latestCommittedBatchId = commitBatchId.toLong)
}
}
case None =>
val identifier = options.getOrElse(STREAMING_CHECKPOINT_IDENTIFIER.key(), STREAMING_CHECKPOINT_IDENTIFIER.defaultValue())
// get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
val lastCheckpoint = CommitUtils.getValidCheckpointForCurrentWriter(
metaClient.get.getActiveTimeline.getWriteTimeline, SINK_CHECKPOINT_KEY, identifier)
if (lastCheckpoint.isPresent) {
latestCommittedBatchId = lastCheckpoint.get().toLong
}
latestCommittedBatchId >= incomingBatchId
} else {
@@ -350,26 +322,8 @@ class HoodieStreamingSink(sqlContext: SQLContext,
}
}
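The replay guard above comes down to one timeline lookup per micro-batch (delete operations are exempted). A minimal standalone sketch of the same check, with an assumed helper name, not part of the PR:

```scala
import org.apache.hudi.HoodieStreamingSink
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.CommitUtils

object ReplayGuardSketch {
  // True when the incoming micro-batch id was already committed under this writer's identifier,
  // i.e. the batch can be skipped on replay.
  def alreadyCommitted(metaClient: HoodieTableMetaClient,
                       identifier: String,
                       incomingBatchId: Long): Boolean = {
    val lastCheckpoint = CommitUtils.getValidCheckpointForCurrentWriter(
      metaClient.getActiveTimeline.getWriteTimeline, HoodieStreamingSink.SINK_CHECKPOINT_KEY, identifier)
    lastCheckpoint.isPresent && lastCheckpoint.get().toLong >= incomingBatchId
  }
}
```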

/**
* SINK_CHECKPOINT_KEY holds a map of batchId to writer context (composed of applicationId and queryId).
* This is a util object to serialize/deserialize map to/from json.
*/
object HoodieSinkCheckpoint {

lazy val mapper: ObjectMapper = {
val _mapper = JsonUtils.getObjectMapper
_mapper.registerModule(DefaultScalaModule)
_mapper
}
object HoodieStreamingSink {

// This constant serves as the checkpoint key for the streaming sink so that each micro-batch is processed exactly once.
val SINK_CHECKPOINT_KEY = "_hudi_streaming_sink_checkpoint"
Review comment (Member): Does it make sense to remove this object now? It's just holding a constant.

def toJson(checkpoint: Map[String, String]): String = {
mapper.writeValueAsString(checkpoint)
}

def fromJson(json: String): Map[String, String] = {
mapper.readValue(json, classOf[Map[String, String]])
}
}
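To illustrate the multi-writer scenario this checkpoint keying enables (mirroring the test below rather than quoting it): two structured-streaming queries can write to the same Hudi table and track independent batch ids by setting distinct STREAMING_CHECKPOINT_IDENTIFIER values. The helper name, paths, schema, and option map are assumed; concurrent writers would also need Hudi's multi-writer concurrency settings (e.g. a lock provider) in the option map, as the test's imports suggest.

```scala
import org.apache.hudi.DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.StructType

object MultiWriterSketch {
  // `spark`, `schema`, `hudiOpts`, and the paths are assumed to be supplied by the caller.
  def startWriter(spark: SparkSession, schema: StructType, hudiOpts: Map[String, String],
                  sourcePath: String, destPath: String, checkpointDir: String,
                  identifier: String): StreamingQuery = {
    spark.readStream
      .schema(schema)
      .json(sourcePath)
      .writeStream
      .format("org.apache.hudi")
      .options(hudiOpts)
      .outputMode(OutputMode.Append)
      // Each writer records its own batch id under this identifier in the commit metadata.
      .option(STREAMING_CHECKPOINT_IDENTIFIER.key(), identifier)
      .option("checkpointLocation", checkpointDir)
      .start(destPath)
  }
  // Usage: startWriter(spark, schema, hudiOpts, srcA, dest, s"$dest/checkpoint-a", "writer-a")
  //        startWriter(spark, schema, hudiOpts, srcB, dest, s"$dest/checkpoint-b", "writer-b")
}
```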
TestStructuredStreaming.scala
@@ -19,10 +19,10 @@ package org.apache.hudi.functional

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER
import org.apache.hudi.HoodieSinkCheckpoint.SINK_CHECKPOINT_KEY
import org.apache.hudi.client.transaction.lock.InProcessLockProvider
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.model.{FileSlice, HoodieTableType, WriteConcurrencyMode}
import org.apache.hudi.HoodieStreamingSink.SINK_CHECKPOINT_KEY
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
@@ -31,7 +31,7 @@ import org.apache.hudi.common.util.{CollectionUtils, CommitUtils}
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
import org.apache.hudi.exception.TableNotFoundException
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSinkCheckpoint}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.types.StructType
@@ -284,7 +284,7 @@ class TestStructuredStreaming extends HoodieSparkClientTestBase {
.start(destPath)

query1.processAllAvailable()
val metaClient = HoodieTableMetaClient.builder
var metaClient = HoodieTableMetaClient.builder
.setConf(fs.getConf).setBasePath(destPath).setLoadActiveTimelineOnLoad(true).build

assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "0")
@@ -313,16 +313,44 @@

assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier2", "0")
assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "1")


inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)

val query3 = spark.readStream
.schema(schema)
.json(sourcePath)
.writeStream
.format("org.apache.hudi")
.options(commonOpts)
.outputMode(OutputMode.Append)
.option(STREAMING_CHECKPOINT_IDENTIFIER.key(), "streaming_identifier1")
.option("checkpointLocation", s"${basePath}/checkpoint1")
.start(destPath)

query3.processAllAvailable()
metaClient = HoodieTableMetaClient.builder
.setConf(fs.getConf).setBasePath(destPath).setLoadActiveTimelineOnLoad(true).build

assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "2")
assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier2", "0")
}

@Test
def testStructuredStreamingForDefaultIdentifier(): Unit = {
val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
testStructuredStreamingInternal()
}

@Test
def testStructuredStreamingWithBulkInsert(): Unit = {
testStructuredStreamingInternal("bulk_insert")
}

def testStructuredStreamingInternal(operation : String = "upsert"): Unit = {
val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
val schema = inputDF1.schema

inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)

val query1 = spark.readStream
@@ -331,6 +359,7 @@ class TestStructuredStreaming extends HoodieSparkClientTestBase {
.writeStream
.format("org.apache.hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION.key(), operation)
.outputMode(OutputMode.Append)
.option("checkpointLocation", s"$basePath/checkpoint1")
.start(destPath)
@@ -343,17 +372,13 @@
query1.stop()
}

def assertLatestCheckpointInfoMatched(metaClient: HoodieTableMetaClient,
identifier: String,
expectBatchId: String): Unit = {
metaClient.reloadActiveTimeline()
val lastCheckpointCommitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
metaClient.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY)

assertTrue(lastCheckpointCommitMetadata.isPresent)
val checkpointMap = HoodieSinkCheckpoint.fromJson(lastCheckpointCommitMetadata.get().getMetadata(SINK_CHECKPOINT_KEY))

assertEquals(expectBatchId, checkpointMap.get(identifier).orNull)
val lastCheckpoint = CommitUtils.getValidCheckpointForCurrentWriter(
metaClient.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY, identifier)
assertEquals(expectBatchId, lastCheckpoint.get())
}

def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, tableType: HoodieTableType,