@@ -408,6 +408,16 @@ private boolean hasData() {
&& this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
}

private void cleanWriteHandles() {
if (freshInstant(currentInstant)) {
// In rare cases, when a checkpoint is aborted and its instant time
// is reused, the merge handle generates a new file name with the
// reused instant time of the last checkpoint; the write handles
// should then be kept and reused to avoid data loss.
this.writeClient.cleanHandles();
}
}

@SuppressWarnings("unchecked, rawtypes")
private boolean flushBucket(DataBucket bucket) {
String instant = instantToWrite(true);
@@ -479,7 +489,7 @@ private void flushRemaining(boolean endInput) {
this.eventGateway.sendEventToCoordinator(event);
this.buckets.clear();
this.tracer.reset();
this.writeClient.cleanHandles();
cleanWriteHandles();
this.writeStatuses.addAll(writeStatus);
// blocks flushing until the coordinator starts a new instant
this.confirming = true;
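For context, a minimal standalone sketch of the guard introduced by cleanWriteHandles(); the Predicate is a stand-in for the checkpoint-metadata aborted check, not the real Hudi API:

import java.util.function.Predicate;

// Illustration only: pooled write handles are released only when the current
// instant is fresh, i.e. was never aborted. If an aborted instant time is
// reused by the next checkpoint, the handles are kept so the merge handle does
// not roll over to a new file name and lose the data buffered under the
// previous attempt.
final class CleanHandlesSketch {
  static void cleanWriteHandles(String currentInstant,
                                Predicate<String> isAborted,
                                Runnable cleanHandles) {
    boolean fresh = !isAborted.test(currentInstant); // mirrors freshInstant(currentInstant)
    if (fresh) {
      cleanHandles.run();
    }
  }

  public static void main(String[] args) {
    cleanWriteHandles("t1", instant -> false, () -> System.out.println("handles cleaned")); // cleans
    cleanWriteHandles("t1", instant -> true,  () -> System.out.println("handles cleaned")); // keeps handles
  }
}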
@@ -365,7 +365,10 @@ private void reset() {
*/
private boolean allEventsReceived() {
return Arrays.stream(eventBuffer)
.allMatch(event -> event != null && event.isReady(this.instant));
// We do not use event.isReady(instant) here because a write task
// may send an event eagerly for an empty data set, and that event
// may carry the timestamp of the last committed instant.
.allMatch(event -> event != null && event.isLastBatch());
}

private void addEventToBuffer(WriteMetadataEvent event) {
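A minimal sketch of the new readiness check, using a stand-in event type rather than the real WriteMetadataEvent:

import java.util.Arrays;

// Illustration only: the buffer is complete when every subtask slot holds an
// event flagged as the last batch; instant times are not compared, because an
// eagerly sent event for an empty data set may still carry the previous
// committed instant's timestamp.
final class EventBufferSketch {
  static final class Event {
    final boolean lastBatch;
    Event(boolean lastBatch) { this.lastBatch = lastBatch; }
    boolean isLastBatch() { return lastBatch; }
  }

  static boolean allEventsReceived(Event[] eventBuffer) {
    return Arrays.stream(eventBuffer)
        .allMatch(event -> event != null && event.isLastBatch());
  }

  public static void main(String[] args) {
    System.out.println(allEventsReceived(new Event[] {new Event(true), null}));            // false
    System.out.println(allEventsReceived(new Event[] {new Event(true), new Event(true)})); // true
  }
}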
@@ -425,12 +428,14 @@ private void handleEndInputEvent(WriteMetadataEvent event) {
addEventToBuffer(event);
if (allEventsReceived()) {
// start to commit the instant.
commitInstant(this.instant);
// The executor thread inherits the classloader of the #handleEventFromOperator
// caller, which is an AppClassLoader.
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
// sync Hive synchronously if it is enabled in batch mode.
syncHive();
boolean committed = commitInstant(this.instant);
if (committed) {
// The executor thread inherits the classloader of the #handleEventFromOperator
// caller, which is an AppClassLoader.
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
// sync Hive synchronously if it is enabled in batch mode.
syncHive();
}
}
}
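A minimal sketch of the reworked end-of-input flow; the suppliers are stand-ins, and the "nothing was committed" case is an assumed reason for commitInstant() returning false:

import java.util.function.BooleanSupplier;

// Illustration only: Hive sync now runs only when commitInstant() reports that
// something was actually committed; if the commit is skipped, the sync is
// skipped too.
final class EndInputSketch {
  static void onAllEventsReceived(BooleanSupplier commitInstant, Runnable syncHive) {
    boolean committed = commitInstant.getAsBoolean();
    if (committed) {
      // The real coordinator also resets the context classloader here before syncing.
      syncHive.run();
    }
  }

  public static void main(String[] args) {
    onAllEventsReceived(() -> true, () -> System.out.println("hive synced"));
    onAllEventsReceived(() -> false, () -> System.out.println("hive synced")); // no output
  }
}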

@@ -474,8 +479,8 @@ private static boolean sendToFinishedTasks(Throwable throwable) {
/**
* Commits the instant.
*/
private void commitInstant(String instant) {
commitInstant(instant, -1);
private boolean commitInstant(String instant) {
return commitInstant(instant, -1);
}

/**
@@ -243,6 +243,13 @@ protected String lastPendingInstant() {
return this.ckpMetadata.lastPendingInstant();
}

/**
* Returns whether the instant is fresh, i.e. not aborted.
*/
protected boolean freshInstant(String instant) {
return !this.ckpMetadata.isAborted(instant);
}

/**
* Prepares the instant time to write with for the next checkpoint.
*
@@ -279,6 +286,6 @@ protected String instantToWrite(boolean hasData) {
* Returns whether the pending instant is invalid to write with.
*/
private boolean invalidInstant(String instant, boolean hasData) {
return instant.equals(this.currentInstant) && hasData && !this.ckpMetadata.isAborted(instant);
return instant.equals(this.currentInstant) && hasData && freshInstant(instant);
}
}
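A minimal standalone sketch of how freshInstant() feeds into invalidInstant(), with plain booleans standing in for the checkpoint metadata:

// Illustration only: the refactored invalidInstant() rejects a pending instant
// only when all three conditions hold -- it equals the instant currently being
// written, the task still has data to flush, and the instant is fresh (never
// aborted). An aborted instant time may therefore be reused for the next write.
final class InstantCheckSketch {
  static boolean invalidInstant(String instant, String currentInstant,
                                boolean hasData, boolean aborted) {
    boolean fresh = !aborted; // mirrors freshInstant(instant)
    return instant.equals(currentInstant) && hasData && fresh;
  }

  public static void main(String[] args) {
    System.out.println(invalidInstant("t1", "t1", true, false)); // true  -> wait for a new instant
    System.out.println(invalidInstant("t1", "t1", true, true));  // false -> the aborted time can be reused
  }
}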
@@ -358,9 +358,9 @@ public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defau
* The whole pipeline looks like the following:
*
* <pre>
*                                /=== | task1 | ===\
* | plan generation | ===> hash                       | commit |
*                                \=== | task2 | ===/
*
* Note: both the compaction plan generation task and the commit task are singletons.
* </pre>
@@ -374,6 +374,8 @@ public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf,
TypeInformation.of(CompactionPlanEvent.class),
new CompactionPlanOperator(conf))
.setParallelism(1) // plan generate must be singleton
// make the distribution strategy deterministic to avoid concurrent modifications
// on the same bucket files
.keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
@@ -393,9 +395,9 @@ public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf,
* The whole pipeline looks like the following:
*
* <pre>
*                                /=== | task1 | ===\
* | plan generation | ===> hash                       | commit |
*                                \=== | task2 | ===/
*
* Note: both the clustering plan generation task and the commit task are singletons.
* </pre>
@@ -410,9 +412,11 @@ public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf,
TypeInformation.of(ClusteringPlanEvent.class),
new ClusteringPlanOperator(conf))
.setParallelism(1) // plan generate must be singleton
.keyBy(plan -> plan.getClusteringGroupInfo().getOperations()
.stream().map(ClusteringOperation::getFileId)
.collect(Collectors.joining()))
.keyBy(plan ->
// make the distribution strategy deterministic to avoid concurrent modifications
// on the same bucket files
plan.getClusteringGroupInfo().getOperations()
.stream().map(ClusteringOperation::getFileId).collect(Collectors.joining()))
.transform("clustering_task",
TypeInformation.of(ClusteringCommitEvent.class),
new ClusteringOperator(conf, rowType))
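A minimal sketch of why the new keyBy() keys make routing deterministic; the helper and file group ids are hypothetical:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustration only: the keys are derived purely from file group ids, so any
// two plans that touch the same file group produce the same key and are
// processed by the same subtask, which prevents concurrent modifications of
// the same bucket files.
final class DeterministicKeySketch {
  static String clusteringKey(List<String> fileIds) {
    return fileIds.stream().collect(Collectors.joining());
  }

  public static void main(String[] args) {
    String keyA = clusteringKey(Arrays.asList("fg-1", "fg-2"));
    String keyB = clusteringKey(Arrays.asList("fg-1", "fg-2"));
    System.out.println(keyA.equals(keyB)); // true: same file groups always map to the same key
  }
}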
@@ -248,21 +248,8 @@ private void testWriteToHoodie(
Pipelines.clean(conf, pipeline);
Pipelines.compact(conf, pipeline);
}
JobClient client = execEnv.executeAsync(jobName);
if (isMor) {
if (client.getJobStatus().get() != JobStatus.FAILED) {
try {
TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish
client.cancel();
} catch (Throwable var1) {
// ignored
}
}
} else {
// wait for the streaming job to finish
client.getJobExecutionResult().get();
}

execute(execEnv, isMor, jobName);
TestData.checkWrittenDataCOW(tempFile, expected);
}

@@ -322,17 +309,14 @@ private void testWriteToHoodieWithCluster(
execEnv.addOperator(pipeline.getTransformation());

Pipelines.cluster(conf, rowType, pipeline);
JobClient client = execEnv.executeAsync(jobName);

// wait for the streaming job to finish
client.getJobExecutionResult().get();
execEnv.execute(jobName);

TestData.checkWrittenDataCOW(tempFile, expected);
}

public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jobName) throws Exception {
JobClient client = execEnv.executeAsync(jobName);
if (isMor) {
JobClient client = execEnv.executeAsync(jobName);
if (client.getJobStatus().get() != JobStatus.FAILED) {
try {
TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish
@@ -343,7 +327,7 @@ public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jo
}
} else {
// wait for the streaming job to finish
client.getJobExecutionResult().get();
execEnv.execute(jobName);
}
}

@@ -451,5 +435,4 @@ public void testHoodiePipelineBuilderSink() throws Exception {
execute(execEnv, true, "Api_Sink_Test");
TestData.checkWrittenDataCOW(tempFile, EXPECTED);
}

}