diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 57e816619f45f..1e68f820d949e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -655,7 +655,7 @@ protected void reconcileAgainstMarkers(HoodieEngineContext context, invalidDataPaths.removeAll(validDataPaths); if (!invalidDataPaths.isEmpty()) { - LOG.info("Removing duplicate data files created due to spark retries before committing. Paths=" + invalidDataPaths); + LOG.info("Removing duplicate data files created due to task retries before committing. Paths=" + invalidDataPaths); Map>> invalidPathsByPartition = invalidDataPaths.stream() .map(dp -> Pair.of(new Path(basePath, dp).getParent().toString(), new Path(basePath, dp).toString())) .collect(Collectors.groupingBy(Pair::getKey)); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java index cf912f620a568..9fea0a97185cb 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java @@ -167,7 +167,7 @@ public void finalizeWrite() { try { fs.rename(newFilePath, oldFilePath); } catch (IOException e) { - throw new HoodieIOException("Error while renaming the temporary roll file: " + throw new HoodieIOException("Error while renaming the temporary rollover file: " + newFilePath + " to old base file name: " + oldFilePath, e); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java index 1bff89713b7f2..99f111c82f13d 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java @@ -43,7 +43,7 @@ /** * A {@link HoodieMergeHandle} that supports MERGE write incrementally(small data buffers). * - *

For a new data buffer, it initialize and set up the next file path to write, + *

For a new data buffer, it initializes and set up the next file path to write, * and closes the file path when the data buffer write finish. When next data buffer * write starts, it rolls over to another new file. If all the data buffers write finish * for a checkpoint round, it renames the last new file path as the desired file name @@ -143,8 +143,7 @@ protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, break; } - oldFilePath = newFilePath; // override the old file name - rolloverPaths.add(oldFilePath); + rolloverPaths.add(newFilePath); newFileName = newFileNameWithRollover(rollNumber++); newFilePath = makeNewFilePath(partitionPath, newFileName); LOG.warn("Duplicate write for MERGE bucket with path: " + oldFilePath + ", rolls over to new path: " + newFilePath); @@ -162,13 +161,6 @@ protected String newFileNameWithRollover(int rollNumber) { this.fileId, hoodieTable.getBaseFileExtension()); } - @Override - protected void setWriteStatusPath() { - // if there was rollover, should set up the path as the initial new file path. - Path path = rolloverPaths.size() > 0 ? rolloverPaths.get(0) : newFilePath; - writeStatus.getStat().setPath(new Path(config.getBasePath()), path); - } - @Override public List close() { try { @@ -188,27 +180,19 @@ boolean needsUpdateLocation() { public void finalizeWrite() { // The file visibility should be kept by the configured ConsistencyGuard instance. - rolloverPaths.add(newFilePath); - if (rolloverPaths.size() == 1) { + if (rolloverPaths.size() == 0) { // only one flush action, no need to roll over return; } - for (int i = 0; i < rolloverPaths.size() - 1; i++) { - Path path = rolloverPaths.get(i); + for (Path path : rolloverPaths) { try { fs.delete(path, false); + LOG.info("Delete the rollover data file: " + path + " success!"); } catch (IOException e) { - throw new HoodieIOException("Error when clean the temporary roll file: " + path, e); + throw new HoodieIOException("Error when clean the temporary rollover data file: " + path, e); } } - final Path lastPath = rolloverPaths.get(rolloverPaths.size() - 1); - final Path desiredPath = rolloverPaths.get(0); - try { - fs.rename(lastPath, desiredPath); - } catch (IOException e) { - throw new HoodieIOException("Error when rename the temporary roll file: " + lastPath + " to: " + desiredPath, e); - } } @Override