Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,13 @@ protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName,
break;
}

rolloverPaths.add(newFilePath);
// Override the old file name,
// In rare cases, when a checkpoint was aborted and the instant time
// is reused, the merge handle generates a new file name
// with the reused instant time of last checkpoint, which is duplicate,
// use the same name file as new base file in case data loss.
oldFilePath = newFilePath;
rolloverPaths.add(oldFilePath);
newFileName = newFileNameWithRollover(rollNumber++);
newFilePath = makeNewFilePath(partitionPath, newFileName);
LOG.warn("Duplicate write for MERGE bucket with path: " + oldFilePath + ", rolls over to new path: " + newFilePath);
Expand All @@ -161,6 +167,12 @@ protected String newFileNameWithRollover(int rollNumber) {
this.fileId, hoodieTable.getBaseFileExtension());
}

@Override
protected void setWriteStatusPath() {
// if there was rollover, should set up the path as the initial new file path.
writeStatus.getStat().setPath(new Path(config.getBasePath()), getWritePath());
}

@Override
public List<WriteStatus> close() {
try {
Expand Down Expand Up @@ -193,6 +205,12 @@ public void finalizeWrite() {
throw new HoodieIOException("Error when clean the temporary rollover data file: " + path, e);
}
}
final Path desiredPath = rolloverPaths.get(0);
try {
fs.rename(newFilePath, desiredPath);
} catch (IOException e) {
throw new HoodieIOException("Error when rename the temporary roll file: " + newFilePath + " to: " + desiredPath, e);
}
}

@Override
Expand All @@ -216,6 +234,6 @@ public void closeGracefully() {

@Override
public Path getWritePath() {
return newFilePath;
return rolloverPaths.size() > 0 ? rolloverPaths.get(0) : newFilePath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -408,16 +408,6 @@ private boolean hasData() {
&& this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
}

private void cleanWriteHandles() {
if (freshInstant(currentInstant)) {
// In rare cases, when a checkpoint was aborted and the instant time
// is reused, the merge handle generates a new file name
// with the reused instant time of last checkpoint, the write handles
// should be kept and reused in case data loss.
this.writeClient.cleanHandles();
}
}

@SuppressWarnings("unchecked, rawtypes")
private boolean flushBucket(DataBucket bucket) {
String instant = instantToWrite(true);
Expand Down Expand Up @@ -489,7 +479,7 @@ private void flushRemaining(boolean endInput) {
this.eventGateway.sendEventToCoordinator(event);
this.buckets.clear();
this.tracer.reset();
cleanWriteHandles();
this.writeClient.cleanHandles();
this.writeStatuses.addAll(writeStatus);
// blocks flushing until the coordinator starts a new instant
this.confirming = true;
Expand Down