Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -328,35 +328,7 @@ public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(
// Retrieve the previous round checkpoints, if any
Option<String> resumeCheckpointStr = Option.empty();
if (commitTimelineOpt.isPresent()) {
Option<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant();
if (lastCommit.isPresent()) {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class);
if (cfg.checkpoint != null && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
|| !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
resumeCheckpointStr = Option.of(cfg.checkpoint);
} else if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) {
//if previous checkpoint is an empty string, skip resume use Option.empty()
resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
} else if (HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) {
// if previous commit metadata did not have the checkpoint key, try traversing previous commits until we find one.
Option<String> prevCheckpoint = getPreviousCheckpoint(commitTimelineOpt.get());
if (prevCheckpoint.isPresent()) {
resumeCheckpointStr = prevCheckpoint;
} else {
throw new HoodieDeltaStreamerException(
"Unable to find previous checkpoint. Please double check if this table "
+ "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :"
+ commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata="
+ commitMetadata.toJsonString());
}
}
// KAFKA_CHECKPOINT_TYPE will be honored only for first batch.
if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
props.remove(KafkaOffsetGen.Config.KAFKA_CHECKPOINT_TYPE.key());
}
}
resumeCheckpointStr = getCheckpointToResume(commitTimelineOpt);
} else {
// initialize the table for the first time.
String partitionColumns = HoodieSparkUtils.getPartitionColumns(keyGenerator, props);
Expand Down Expand Up @@ -459,12 +431,55 @@ public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(
return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
}

protected Option<String> getPreviousCheckpoint(HoodieTimeline timeline) throws IOException {
return timeline.getReverseOrderedInstants().map(instant -> {
/**
 * Determine the checkpoint to resume from, reconciling any user-supplied checkpoint
 * config with the checkpoint info recorded by previous commits on the timeline.
 *
 * @param commitTimelineOpt commit timeline of interest; expected to be present when invoked.
 * @return the checkpoint to resume from, if one applies.
 * @throws IOException if commit metadata cannot be read from the timeline.
 */
private Option<String> getCheckpointToResume(Option<HoodieTimeline> commitTimelineOpt) throws IOException {
  Option<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant();
  if (!lastCommit.isPresent()) {
    // Empty timeline: nothing to resume from.
    return Option.empty();
  }
  // Walk backwards through completed commits until one carries checkpoint info.
  Option<HoodieCommitMetadata> commitMetadataOption = getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get());
  if (!commitMetadataOption.isPresent()) {
    // No commit ever recorded checkpoint info (the helper never returns metadata without a
    // checkpoint key set); honor the user-supplied checkpoint when one is configured.
    return cfg.checkpoint != null ? Option.of(cfg.checkpoint) : Option.empty();
  }
  HoodieCommitMetadata commitMetadata = commitMetadataOption.get();
  String checkpointResetValue = commitMetadata.getMetadata(CHECKPOINT_RESET_KEY);
  Option<String> resumeCheckpointStr = Option.empty();
  if (cfg.checkpoint != null
      && (StringUtils.isNullOrEmpty(checkpointResetValue) || !cfg.checkpoint.equals(checkpointResetValue))) {
    // User supplied a checkpoint that has not yet been applied via a reset: take it.
    resumeCheckpointStr = Option.of(cfg.checkpoint);
  } else if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) {
    // Resume from the checkpoint recorded by the latest commit carrying checkpoint info.
    // If the previous checkpoint was an empty string, resume is skipped via Option.empty().
    resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
  } else if (HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
      HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) {
    throw new HoodieDeltaStreamerException(
        "Unable to find previous checkpoint. Please double check if this table "
            + "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :"
            + commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata="
            + commitMetadata.toJsonString());
  }
  // KAFKA_CHECKPOINT_TYPE is honored only for the first batch; drop it once a reset was recorded.
  if (!StringUtils.isNullOrEmpty(checkpointResetValue)) {
    props.remove(KafkaOffsetGen.Config.KAFKA_CHECKPOINT_TYPE.key());
  }
  return resumeCheckpointStr;
}

protected Option<HoodieCommitMetadata> getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws IOException {
Comment thread
nsivabalan marked this conversation as resolved.
return (Option<HoodieCommitMetadata>) timeline.getReverseOrderedInstants().map(instant -> {
try {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
return Option.ofNullable(commitMetadata.getMetadata(CHECKPOINT_KEY));
if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY)) || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
return Option.of(commitMetadata);
} else {
return Option.empty();
}
} catch (IOException e) {
throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,14 +292,23 @@ static void addCommitToTimeline(HoodieTableMetaClient metaCient) throws IOExcept
}

/**
 * Adds a regular (upsert) commit carrying the given extra metadata to the table's timeline.
 *
 * @param metaClient    meta client for the table under test.
 * @param extraMetadata extra metadata entries to record in the commit.
 * @throws IOException if the commit cannot be written to the timeline.
 */
static void addCommitToTimeline(HoodieTableMetaClient metaClient, Map<String, String> extraMetadata) throws IOException {
  // Fixed typo in parameter name (metaCient -> metaClient); delegates to the 4-arg overload.
  addCommitToTimeline(metaClient, WriteOperationType.UPSERT, HoodieTimeline.COMMIT_ACTION, extraMetadata);
}

/**
 * Adds a replace commit (clustering) carrying the given extra metadata to the table's timeline.
 *
 * @param metaClient    meta client for the table under test.
 * @param extraMetadata extra metadata entries to record in the commit.
 * @throws IOException if the commit cannot be written to the timeline.
 */
static void addReplaceCommitToTimeline(HoodieTableMetaClient metaClient, Map<String, String> extraMetadata) throws IOException {
  // Fixed typo in parameter name (metaCient -> metaClient); delegates to the 4-arg overload.
  addCommitToTimeline(metaClient, WriteOperationType.CLUSTER, HoodieTimeline.REPLACE_COMMIT_ACTION, extraMetadata);
}

/**
 * Adds a commit of the given write-operation and action type, carrying the supplied extra
 * metadata, to the table's timeline by walking it through requested -> inflight -> completed.
 *
 * NOTE(review): this span contained unresolved diff residue (old and new lines interleaved);
 * resolved to the new-side version, with parameter-name typos fixed
 * (metaCient -> metaClient, commitActiontype -> commitActionType).
 *
 * @param metaClient         meta client for the table under test.
 * @param writeOperationType write operation type recorded in the commit metadata.
 * @param commitActionType   timeline action type (e.g. commit vs. replacecommit).
 * @param extraMetadata      extra metadata entries to record in the commit.
 * @throws IOException if the commit cannot be written to the timeline.
 */
static void addCommitToTimeline(HoodieTableMetaClient metaClient, WriteOperationType writeOperationType, String commitActionType,
                                Map<String, String> extraMetadata) throws IOException {
  HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
  commitMetadata.setOperationType(writeOperationType);
  extraMetadata.forEach((k, v) -> commitMetadata.getExtraMetadata().put(k, v));
  String commitTime = HoodieActiveTimeline.createNewInstantTime();
  metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, commitActionType, commitTime));
  metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.INFLIGHT, commitActionType, commitTime));
  metaClient.getActiveTimeline().saveAsComplete(
      new HoodieInstant(HoodieInstant.State.INFLIGHT, commitActionType, commitTime),
      Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1758,14 +1758,20 @@ public void testFetchingCheckpointFromPreviousCommits() throws IOException {
extraMetadata.put(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY, "abc");
addCommitToTimeline(metaClient, extraMetadata);
metaClient.reloadActiveTimeline();
assertEquals(testDeltaSync.getPreviousCheckpoint(metaClient.getActiveTimeline().getCommitsTimeline()).get(), "abc");
assertEquals(testDeltaSync.getLatestCommitMetadataWithValidCheckpointInfo(metaClient.getActiveTimeline()
.getCommitsTimeline()).get().getMetadata(CHECKPOINT_KEY), "abc");

addCommitToTimeline(metaClient, Collections.emptyMap());
metaClient.reloadActiveTimeline();
extraMetadata.put(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY, "def");
addCommitToTimeline(metaClient, extraMetadata);
metaClient.reloadActiveTimeline();
assertEquals(testDeltaSync.getPreviousCheckpoint(metaClient.getActiveTimeline().getCommitsTimeline()).get(), "def");
assertEquals(testDeltaSync.getLatestCommitMetadataWithValidCheckpointInfo(metaClient.getActiveTimeline()
.getCommitsTimeline()).get().getMetadata(CHECKPOINT_KEY), "def");

// add a replace commit which does not have CHECKPOINT_KEY. DeltaStreamer should be able to go back and pick the right checkpoint.
addReplaceCommitToTimeline(metaClient, Collections.emptyMap());
metaClient.reloadActiveTimeline();
assertEquals(testDeltaSync.getLatestCommitMetadataWithValidCheckpointInfo(metaClient.getActiveTimeline()
.getCommitsTimeline()).get().getMetadata(CHECKPOINT_KEY), "def");
}

class TestDeltaSync extends DeltaSync {
Expand All @@ -1776,8 +1782,8 @@ public TestDeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
super(cfg, sparkSession, schemaProvider, props, jssc, fs, conf, onInitializingHoodieWriteClient);
}

protected Option<String> getPreviousCheckpoint(HoodieTimeline timeline) throws IOException {
return super.getPreviousCheckpoint(timeline);
protected Option<HoodieCommitMetadata> getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws IOException {
return super.getLatestCommitMetadataWithValidCheckpointInfo(timeline);
}
}

Expand Down