Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ protected void reconcileAgainstMarkers(HoodieEngineContext context,
invalidDataPaths.removeAll(validDataPaths);

if (!invalidDataPaths.isEmpty()) {
LOG.info("Removing duplicate data files created due to spark retries before committing. Paths=" + invalidDataPaths);
LOG.info("Removing duplicate data files created due to task retries before committing. Paths=" + invalidDataPaths);
Map<String, List<Pair<String, String>>> invalidPathsByPartition = invalidDataPaths.stream()
.map(dp -> Pair.of(new Path(basePath, dp).getParent().toString(), new Path(basePath, dp).toString()))
.collect(Collectors.groupingBy(Pair::getKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void finalizeWrite() {
try {
fs.rename(newFilePath, oldFilePath);
} catch (IOException e) {
throw new HoodieIOException("Error while renaming the temporary roll file: "
throw new HoodieIOException("Error while renaming the temporary rollover file: "
+ newFilePath + " to old base file name: " + oldFilePath, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
/**
* A {@link HoodieMergeHandle} that supports MERGE write incrementally(small data buffers).
*
* <p>For a new data buffer, it initialize and set up the next file path to write,
<p>For a new data buffer, it initializes and sets up the next file path to write,
* and closes the file path when the data buffer write finish. When next data buffer
* write starts, it rolls over to another new file. If all the data buffers write finish
* for a checkpoint round, it renames the last new file path as the desired file name
Expand Down Expand Up @@ -143,8 +143,7 @@ protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName,
break;
}

oldFilePath = newFilePath; // override the old file name
rolloverPaths.add(oldFilePath);
rolloverPaths.add(newFilePath);
newFileName = newFileNameWithRollover(rollNumber++);
newFilePath = makeNewFilePath(partitionPath, newFileName);
LOG.warn("Duplicate write for MERGE bucket with path: " + oldFilePath + ", rolls over to new path: " + newFilePath);
Expand All @@ -162,13 +161,6 @@ protected String newFileNameWithRollover(int rollNumber) {
this.fileId, hoodieTable.getBaseFileExtension());
}

/**
 * Records the data file path on the {@link WriteStatus} stat.
 *
 * <p>If one or more rollovers occurred (i.e. {@code rolloverPaths} is non-empty),
 * the first rollover path — the initial new file path — is recorded rather than
 * the latest {@code newFilePath}; otherwise the current {@code newFilePath} is used.
 * NOTE(review): presumably the first path is the name that survives finalization —
 * confirm against {@code finalizeWrite()}.
 */
@Override
protected void setWriteStatusPath() {
// if there was rollover, should set up the path as the initial new file path.
Path path = rolloverPaths.size() > 0 ? rolloverPaths.get(0) : newFilePath;
// Store the path relative to the table base path.
writeStatus.getStat().setPath(new Path(config.getBasePath()), path);
}

@Override
public List<WriteStatus> close() {
try {
Expand All @@ -188,27 +180,19 @@ boolean needsUpdateLocation() {

public void finalizeWrite() {
// The file visibility should be kept by the configured ConsistencyGuard instance.
rolloverPaths.add(newFilePath);
if (rolloverPaths.size() == 1) {
if (rolloverPaths.size() == 0) {
// only one flush action, no need to roll over
return;
}

for (int i = 0; i < rolloverPaths.size() - 1; i++) {
Path path = rolloverPaths.get(i);
for (Path path : rolloverPaths) {
try {
fs.delete(path, false);
LOG.info("Delete the rollover data file: " + path + " success!");
} catch (IOException e) {
throw new HoodieIOException("Error when clean the temporary roll file: " + path, e);
throw new HoodieIOException("Error when clean the temporary rollover data file: " + path, e);
}
}
final Path lastPath = rolloverPaths.get(rolloverPaths.size() - 1);
final Path desiredPath = rolloverPaths.get(0);
try {
fs.rename(lastPath, desiredPath);
} catch (IOException e) {
throw new HoodieIOException("Error when rename the temporary roll file: " + lastPath + " to: " + desiredPath, e);
}
}

@Override
Expand Down