diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java index eba9b27014d0..624438ffb2f1 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java @@ -253,7 +253,7 @@ public Optional createTemporaryDirectory(Location targetPath, String t return Optional.of(temporary); } - private Path toFilePath(Location location) + public Path toFilePath(Location location) { validateLocalLocation(location); location.verifyValidFileLocation(); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/LocalTransactionLogSynchronizer.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/LocalTransactionLogSynchronizer.java deleted file mode 100644 index 6056e4d4d946..000000000000 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/LocalTransactionLogSynchronizer.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.deltalake.transactionlog.writer; - -import com.google.inject.Inject; -import io.trino.filesystem.Location; -import io.trino.filesystem.TrinoFileSystem; -import io.trino.plugin.deltalake.DeltaLakeFileSystemFactory; -import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle; -import io.trino.spi.connector.ConnectorSession; - -import java.io.IOException; -import java.io.OutputStream; -import java.io.UncheckedIOException; -import java.nio.file.FileAlreadyExistsException; - -import static java.util.Objects.requireNonNull; - -public class LocalTransactionLogSynchronizer - implements TransactionLogSynchronizer -{ - private final DeltaLakeFileSystemFactory fileSystemFactory; - - @Inject - public LocalTransactionLogSynchronizer(DeltaLakeFileSystemFactory fileSystemFactory) - { - this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); - } - - @Override - public void write(ConnectorSession session, VendedCredentialsHandle credentialsHandle, String clusterId, Location newLogEntryPath, byte[] entryContents) - { - TrinoFileSystem fileSystem = fileSystemFactory.create(session, credentialsHandle); - try (OutputStream outputStream = fileSystem.newOutputFile(newLogEntryPath).create()) { - outputStream.write(entryContents); - } - catch (FileAlreadyExistsException e) { - throw new TransactionConflictException("Conflict detected while writing Transaction Log entry " + newLogEntryPath, e); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - @Override - public boolean isUnsafe() - { - return false; - } -} diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java index 1ae42adcb9cc..51a65b5d58d5 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java @@ -206,7 +206,6 @@ private void testConcurrentInsertsSelectingFromTheSameTable(boolean partitioned) } @Test - @Disabled // TODO https://github.com/trinodb/trino/issues/21725 Fix flaky test public void testConcurrentInsertsSelectingFromTheSameVersionedTable() throws Exception { @@ -263,7 +262,6 @@ private void testConcurrentInsertsSelectingFromTheSameVersionedTable(boolean par } @Test - @Disabled // TODO https://github.com/trinodb/trino/issues/21725 Fix flaky test void testConcurrentInsertsSelectingFromTheSameTemporalVersionedTable() throws Exception { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakePlugin.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakePlugin.java index b1da80b478e1..695a33a06749 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakePlugin.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakePlugin.java @@ -17,7 +17,7 @@ import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.local.LocalFileSystemFactory; import io.trino.plugin.deltalake.metastore.NoOpVendedCredentialsProvider; -import io.trino.plugin.deltalake.transactionlog.writer.LocalTransactionLogSynchronizer; +import io.trino.plugin.deltalake.transactionlog.writer.TestingLocalTransactionLogSynchronizer; import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogSynchronizer; import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig; import io.trino.spi.connector.Connector; @@ -38,7 +38,8 @@ public class TestingDeltaLakePlugin extends DeltaLakePlugin { - private final Path localFileSystemRootPath; + private final LocalFileSystemFactory localFileSystemFactory; + private final TestingLocalTransactionLogSynchronizer localTransactionLogSynchronizer; private final Supplier> metastoreModule; public TestingDeltaLakePlugin(Path localFileSystemRootPath) @@ -48,7 +49,9 @@ public TestingDeltaLakePlugin(Path localFileSystemRootPath) public TestingDeltaLakePlugin(Path localFileSystemRootPath, Supplier> metastoreModule) { - this.localFileSystemRootPath = requireNonNull(localFileSystemRootPath, "localFileSystemRootPath is null"); + localFileSystemRootPath.toFile().mkdirs(); + localFileSystemFactory = new LocalFileSystemFactory(localFileSystemRootPath); + localTransactionLogSynchronizer = new TestingLocalTransactionLogSynchronizer(new DefaultDeltaLakeFileSystemFactory(localFileSystemFactory, new NoOpVendedCredentialsProvider())); this.metastoreModule = requireNonNull(metastoreModule, "metastoreModule is null"); } @@ -66,7 +69,6 @@ public String getName() @Override public Connector create(String catalogName, Map config, ConnectorContext context) { - localFileSystemRootPath.toFile().mkdirs(); return createConnector( catalogName, config, @@ -74,11 +76,10 @@ public Connector create(String catalogName, Map config, Connecto metastoreModule.get(), binder -> { binder.install(new TestingDeltaLakeExtensionsModule()); - LocalFileSystemFactory localFileSystemFactory = new LocalFileSystemFactory(localFileSystemRootPath); newMapBinder(binder, String.class, TrinoFileSystemFactory.class) .addBinding("local").toInstance(localFileSystemFactory); newMapBinder(binder, String.class, TransactionLogSynchronizer.class) - .addBinding("local").toInstance(new LocalTransactionLogSynchronizer(new DefaultDeltaLakeFileSystemFactory(localFileSystemFactory, new NoOpVendedCredentialsProvider()))); + .addBinding("local").toInstance(localTransactionLogSynchronizer); configBinder(binder).bindConfigDefaults( FileHiveMetastoreConfig.class, metastoreConfig -> metastoreConfig.setCatalogDirectory("local:///" + catalogName)); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/writer/TestingLocalTransactionLogSynchronizer.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/writer/TestingLocalTransactionLogSynchronizer.java new file mode 100644 index 000000000000..829b201328db --- /dev/null +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/writer/TestingLocalTransactionLogSynchronizer.java @@ -0,0 +1,86 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.transactionlog.writer; + +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.local.LocalFileSystem; +import io.trino.plugin.deltalake.DeltaLakeFileSystemFactory; +import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle; +import io.trino.spi.connector.ConnectorSession; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; + +import static io.trino.plugin.base.util.Closables.closeAllSuppress; +import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; +import static java.nio.file.StandardOpenOption.CREATE_NEW; +import static java.util.Objects.requireNonNull; +import static java.util.UUID.randomUUID; + +public class TestingLocalTransactionLogSynchronizer + implements TransactionLogSynchronizer +{ + private final DeltaLakeFileSystemFactory fileSystemFactory; + + public TestingLocalTransactionLogSynchronizer(DeltaLakeFileSystemFactory fileSystemFactory) + { + this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); + } + + @Override + public void write(ConnectorSession session, VendedCredentialsHandle credentialsHandle, String clusterId, Location newLogEntryPath, byte[] entryContents) + { + try { + TrinoFileSystem fileSystem = fileSystemFactory.create(session, credentialsHandle); + Path targetPath = ((LocalFileSystem) fileSystem).toFilePath(newLogEntryPath); + Files.createDirectories(targetPath.getParent()); + Path tmpPath = targetPath.resolveSibling(".tmp.%s.%s".formatted(targetPath.getFileName().toString(), randomUUID())); + Files.write(tmpPath, entryContents, CREATE_NEW); + try { + // It's important that file is renamed atomically (hence the ATOMIC_MOVE option), as + // reads do not synchronize with the log writer in any way. + // Files.move with ATOMIC_MOVE is not guaranteed to be exclusive. + // In fact, it can be seen to overwrite the target file if it already exists. + // In-memory synchronization should be sufficient for tests, + // assuming single TestingDeltaLakePlugin instance is used. + synchronized (this) { + if (Files.exists(targetPath)) { + throw new FileAlreadyExistsException(targetPath.toString()); + } + Files.move(tmpPath, targetPath, ATOMIC_MOVE); + } + } + catch (Throwable t) { + closeAllSuppress(t, () -> Files.deleteIfExists(tmpPath)); + throw t; + } + } + catch (FileAlreadyExistsException e) { + throw new TransactionConflictException("Conflict detected while writing Transaction Log entry " + newLogEntryPath, e); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public boolean isUnsafe() + { + return false; + } +}