diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java b/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java index 1f7527a8557a..d0aa8d9e2247 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java @@ -83,4 +83,7 @@ public void discard(FileIO fileIO) throws IOException { public Path targetFilePath() { return new Path(objectName); } + + @Override + public void clean(FileIO fileIO) throws IOException {} } diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java index cfa7da537645..7518f4cf4914 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java @@ -29,6 +29,7 @@ */ @Public public class RenamingTwoPhaseOutputStream extends TwoPhaseOutputStream { + private static final String TEMP_DIR_NAME = "_temporary"; private final Path targetPath; private final Path tempPath; @@ -87,12 +88,12 @@ public Committer closeForCommit() throws IOException { * directory as the target with a unique suffix. */ private Path generateTempPath(Path targetPath) { - String tempFileName = ".tmp." + UUID.randomUUID(); + String tempFileName = TEMP_DIR_NAME + "/.tmp." + UUID.randomUUID(); return new Path(targetPath.getParent(), tempFileName); } /** Committer implementation that renames temporary file to target path. */ - private static class TempFileCommitter implements Committer { + public static class TempFileCommitter implements Committer { private static final long serialVersionUID = 1L; @@ -132,5 +133,13 @@ public void discard(FileIO fileIO) throws IOException { public Path targetFilePath() { return targetPath; } + + @Override + public void clean(FileIO fileIO) throws IOException { + Path path = tempPath.getParent(); + if (fileIO.exists(path)) { + fileIO.deleteQuietly(path); + } + } } } diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java index 2a6714e14939..7975c4ea767b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java @@ -53,5 +53,7 @@ public interface Committer extends Serializable { void discard(FileIO fileIO) throws IOException; Path targetFilePath(); + + void clean(FileIO fileIO) throws IOException; } } diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java index 1cf28389b360..ac20a6585296 100644 --- a/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java @@ -322,6 +322,9 @@ public void discard(FileIO fileIO) throws IOException { public Path targetFilePath() { return new Path(objectName); } + + @Override + public void clean(FileIO fileIO) throws IOException {} } private static final class TestPart { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java index 54c8cac54157..e1fd5a903e25 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java @@ -100,6 +100,10 @@ public void commit(List commitMessages) { for (TwoPhaseOutputStream.Committer committer : committers) { committer.commit(this.fileIO); } + for (TwoPhaseOutputStream.Committer committer : committers) { + committer.clean(this.fileIO); + } + } catch (Exception e) { this.abort(commitMessages); throw new RuntimeException(e);