diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalOutputFile.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalOutputFile.java index be6d2f4fb1ec..2141edae819a 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalOutputFile.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalOutputFile.java @@ -13,6 +13,7 @@ */ package io.trino.filesystem.local; +import com.google.common.io.Closer; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoOutputFile; import io.trino.memory.context.AggregatedMemoryContext; @@ -20,12 +21,17 @@ import java.io.File; import java.io.IOException; import java.io.OutputStream; +import java.nio.channels.FileChannel; +import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.StandardOpenOption; import static io.trino.filesystem.local.LocalUtils.handleException; +import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; +import static java.nio.file.StandardOpenOption.CREATE_NEW; +import static java.nio.file.StandardOpenOption.WRITE; import static java.util.Objects.requireNonNull; +import static java.util.UUID.randomUUID; public class LocalOutputFile implements TrinoOutputFile @@ -50,7 +56,7 @@ public OutputStream create(AggregatedMemoryContext memoryContext) { try { Files.createDirectories(path.getParent()); - OutputStream stream = Files.newOutputStream(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE); + OutputStream stream = Files.newOutputStream(path, CREATE_NEW, WRITE); return new LocalOutputStream(location, stream); } catch (IOException e) { @@ -58,6 +64,44 @@ public OutputStream create(AggregatedMemoryContext memoryContext) } } + @Override + public void createExclusive(byte[] data) + throws IOException + { + Files.createDirectories(path.getParent()); + + // see if we can stop early without acquire locking + if (Files.exists(path)) { + throw new FileAlreadyExistsException(location.toString()); + } + + Path lockPath = path.resolveSibling(path.getFileName() + ".lock"); + Closer closer = Closer.create(); + try { + try (FileChannel _ = FileChannel.open(lockPath, CREATE_NEW, WRITE)) { + closer.register(() -> Files.deleteIfExists(lockPath)); + if (Files.exists(path)) { + throw new FileAlreadyExistsException(location.toString()); + } + + Path tmpFilePath = path.resolveSibling(path.getFileName() + "." + randomUUID() + ".tmp"); + try (OutputStream out = Files.newOutputStream(tmpFilePath, CREATE_NEW, WRITE)) { + closer.register(() -> Files.deleteIfExists(tmpFilePath)); + out.write(data); + } + + // Ensure that the file is only visible when fully written + Files.move(tmpFilePath, path, ATOMIC_MOVE); + } + } + catch (IOException e) { + throw closer.rethrow(handleException(location, e)); + } + finally { + closer.close(); + } + } + @Override public void createOrOverwrite(byte[] data) throws IOException diff --git a/lib/trino-filesystem/src/test/java/io/trino/filesystem/local/TestLocalFileSystem.java b/lib/trino-filesystem/src/test/java/io/trino/filesystem/local/TestLocalFileSystem.java index 036bbcd8a980..9ffdc107d2c0 100644 --- a/lib/trino-filesystem/src/test/java/io/trino/filesystem/local/TestLocalFileSystem.java +++ b/lib/trino-filesystem/src/test/java/io/trino/filesystem/local/TestLocalFileSystem.java @@ -47,12 +47,6 @@ void beforeAll() fileSystem = new LocalFileSystem(tempDirectory); } - @Override - protected boolean supportsCreateExclusive() - { - return false; - } - @AfterEach void afterEach() throws IOException diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/LocalTransactionLogSynchronizer.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/LocalTransactionLogSynchronizer.java index 6056e4d4d946..ee34471e00cd 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/LocalTransactionLogSynchronizer.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/LocalTransactionLogSynchronizer.java @@ -21,7 +21,6 @@ import io.trino.spi.connector.ConnectorSession; import java.io.IOException; -import java.io.OutputStream; import java.io.UncheckedIOException; import java.nio.file.FileAlreadyExistsException; @@ -42,8 +41,8 @@ public LocalTransactionLogSynchronizer(DeltaLakeFileSystemFactory fileSystemFact public void write(ConnectorSession session, VendedCredentialsHandle credentialsHandle, String clusterId, Location newLogEntryPath, byte[] entryContents) { TrinoFileSystem fileSystem = fileSystemFactory.create(session, credentialsHandle); - try (OutputStream outputStream = fileSystem.newOutputFile(newLogEntryPath).create()) { - outputStream.write(entryContents); + try { + fileSystem.newOutputFile(newLogEntryPath).createExclusive(entryContents); } catch (FileAlreadyExistsException e) { throw new TransactionConflictException("Conflict detected while writing Transaction Log entry " + newLogEntryPath, e);