From 870fb920bcdcb411a294e673f86c861cb1b9fd96 Mon Sep 17 00:00:00 2001 From: yantian Date: Mon, 3 Nov 2025 11:13:41 +0800 Subject: [PATCH 1/3] [core] format table: support write file in _temporary at first --- .../java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java index cfa7da537645..18ff01f55dfb 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java @@ -87,7 +87,7 @@ public Committer closeForCommit() throws IOException { * directory as the target with a unique suffix. */ private Path generateTempPath(Path targetPath) { - String tempFileName = ".tmp." + UUID.randomUUID(); + String tempFileName = "_temporary/.tmp." + UUID.randomUUID(); return new Path(targetPath.getParent(), tempFileName); } From 38621dcef4d01d681270108e0b8f3edb9b8df5b8 Mon Sep 17 00:00:00 2001 From: yantian Date: Mon, 3 Nov 2025 15:44:19 +0800 Subject: [PATCH 2/3] fix --- .../fs/RenamingTwoPhaseOutputStream.java | 5 +++-- .../table/format/FormatTableCommit.java | 19 ++++++++++++++++++- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java index 18ff01f55dfb..5e2e6c90f0c8 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java @@ -29,6 +29,7 @@ */ @Public public class RenamingTwoPhaseOutputStream extends TwoPhaseOutputStream { + public static final String TEMP_DIR_NAME = "_temporary"; private final Path targetPath; private final Path tempPath; @@ -87,12 +88,12 @@ public Committer closeForCommit() throws IOException { * directory as the target with a unique suffix. */ private Path generateTempPath(Path targetPath) { - String tempFileName = "_temporary/.tmp." + UUID.randomUUID(); + String tempFileName = TEMP_DIR_NAME + "/.tmp." + UUID.randomUUID(); return new Path(targetPath.getParent(), tempFileName); } /** Committer implementation that renames temporary file to target path. */ - private static class TempFileCommitter implements Committer { + public static class TempFileCommitter implements Committer { private static final long serialVersionUID = 1L; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java index 54c8cac54157..9125462209c3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java @@ -21,6 +21,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.RenamingTwoPhaseOutputStream; import org.apache.paimon.fs.TwoPhaseOutputStream; import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.stats.Statistics; @@ -80,6 +81,7 @@ public void commit(List commitMessages) { + commitMessage.getClass().getName()); } } + Set partitionPaths = new HashSet<>(); if (overwrite && staticPartitions != null && !staticPartitions.isEmpty()) { Path partitionPath = buildPartitionPath( @@ -87,9 +89,9 @@ public void commit(List commitMessages) { staticPartitions, formatTablePartitionOnlyValueInPath, partitionKeys); + partitionPaths.add(partitionPath); deletePreviousDataFile(partitionPath); } else if (overwrite) { - Set partitionPaths = new HashSet<>(); for (TwoPhaseOutputStream.Committer c : committers) { partitionPaths.add(c.targetFilePath().getParent()); } @@ -100,6 +102,21 @@ public void commit(List commitMessages) { for (TwoPhaseOutputStream.Committer committer : committers) { committer.commit(this.fileIO); } + if (committers.stream() + .filter(c -> c instanceof RenamingTwoPhaseOutputStream.TempFileCommitter) + .findAny() + .isPresent()) { + if (partitionPaths.size() > 1) { + for (Path partitionPath : partitionPaths) { + Path tempPath = + new Path(partitionPath, RenamingTwoPhaseOutputStream.TEMP_DIR_NAME); + fileIO.deleteQuietly(tempPath); + } + } else { + Path tempPath = new Path(location, RenamingTwoPhaseOutputStream.TEMP_DIR_NAME); + fileIO.deleteQuietly(tempPath); + } + } } catch (Exception e) { this.abort(commitMessages); throw new RuntimeException(e); From a082b56ffd18e9cbd4bf6a72653262b76dadd045 Mon Sep 17 00:00:00 2001 From: yantian Date: Mon, 3 Nov 2025 17:41:36 +0800 Subject: [PATCH 3/3] add clean for committer --- .../fs/BaseMultiPartUploadCommitter.java | 3 +++ .../fs/RenamingTwoPhaseOutputStream.java | 10 ++++++++- .../paimon/fs/TwoPhaseOutputStream.java | 2 ++ ...ltiPartUploadTwoPhaseOutputStreamTest.java | 3 +++ .../table/format/FormatTableCommit.java | 21 ++++--------------- 5 files changed, 21 insertions(+), 18 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java b/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java index 1f7527a8557a..d0aa8d9e2247 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java @@ -83,4 +83,7 @@ public void discard(FileIO fileIO) throws IOException { public Path targetFilePath() { return new Path(objectName); } + + @Override + public void clean(FileIO fileIO) throws IOException {} } diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java index 5e2e6c90f0c8..7518f4cf4914 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java @@ -29,7 +29,7 @@ */ @Public public class RenamingTwoPhaseOutputStream extends TwoPhaseOutputStream { - public static final String TEMP_DIR_NAME = "_temporary"; + private static final String TEMP_DIR_NAME = "_temporary"; private final Path targetPath; private final Path tempPath; @@ -133,5 +133,13 @@ public void discard(FileIO fileIO) throws IOException { public Path targetFilePath() { return targetPath; } + + @Override + public void clean(FileIO fileIO) throws IOException { + Path path = tempPath.getParent(); + if (fileIO.exists(path)) { + fileIO.deleteQuietly(path); + } + } } } diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java index 2a6714e14939..7975c4ea767b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java @@ -53,5 +53,7 @@ public interface Committer extends Serializable { void discard(FileIO fileIO) throws IOException; Path targetFilePath(); + + void clean(FileIO fileIO) throws IOException; } } diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java index 1cf28389b360..ac20a6585296 100644 --- a/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java @@ -322,6 +322,9 @@ public void discard(FileIO fileIO) throws IOException { public Path targetFilePath() { return new Path(objectName); } + + @Override + public void clean(FileIO fileIO) throws IOException {} } private static final class TestPart { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java index 9125462209c3..e1fd5a903e25 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java @@ -21,7 +21,6 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; -import org.apache.paimon.fs.RenamingTwoPhaseOutputStream; import org.apache.paimon.fs.TwoPhaseOutputStream; import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.stats.Statistics; @@ -81,7 +80,6 @@ public void commit(List commitMessages) { + commitMessage.getClass().getName()); } } - Set partitionPaths = new HashSet<>(); if (overwrite && staticPartitions != null && !staticPartitions.isEmpty()) { Path partitionPath = buildPartitionPath( @@ -89,9 +87,9 @@ public void commit(List commitMessages) { staticPartitions, formatTablePartitionOnlyValueInPath, partitionKeys); - partitionPaths.add(partitionPath); deletePreviousDataFile(partitionPath); } else if (overwrite) { + Set partitionPaths = new HashSet<>(); for (TwoPhaseOutputStream.Committer c : committers) { partitionPaths.add(c.targetFilePath().getParent()); } @@ -102,21 +100,10 @@ public void commit(List commitMessages) { for (TwoPhaseOutputStream.Committer committer : committers) { committer.commit(this.fileIO); } - if (committers.stream() - .filter(c -> c instanceof RenamingTwoPhaseOutputStream.TempFileCommitter) - .findAny() - .isPresent()) { - if (partitionPaths.size() > 1) { - for (Path partitionPath : partitionPaths) { - Path tempPath = - new Path(partitionPath, RenamingTwoPhaseOutputStream.TEMP_DIR_NAME); - fileIO.deleteQuietly(tempPath); - } - } else { - Path tempPath = new Path(location, RenamingTwoPhaseOutputStream.TEMP_DIR_NAME); - fileIO.deleteQuietly(tempPath); - } + for (TwoPhaseOutputStream.Committer committer : committers) { + committer.clean(this.fileIO); } + } catch (Exception e) { this.abort(commitMessages); throw new RuntimeException(e);