From 479815e72822d4e52df9a870eb4f9c269a1ba215 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 3 Feb 2026 09:13:23 +0100 Subject: [PATCH 1/3] Remove unused Guice annotation --- .../transactionlog/writer/LocalTransactionLogSynchronizer.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/LocalTransactionLogSynchronizer.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/LocalTransactionLogSynchronizer.java index 6056e4d4d946..723df44ac922 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/LocalTransactionLogSynchronizer.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/LocalTransactionLogSynchronizer.java @@ -13,7 +13,6 @@ */ package io.trino.plugin.deltalake.transactionlog.writer; -import com.google.inject.Inject; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.plugin.deltalake.DeltaLakeFileSystemFactory; @@ -32,7 +31,6 @@ public class LocalTransactionLogSynchronizer { private final DeltaLakeFileSystemFactory fileSystemFactory; - @Inject public LocalTransactionLogSynchronizer(DeltaLakeFileSystemFactory fileSystemFactory) { this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); From 7b75321d00aced1ac99e6a9f5408175e7e0aea64 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 3 Feb 2026 09:14:37 +0100 Subject: [PATCH 2/3] Move LocalTransactionLogSynchronizer to tests The `LocalTransactionLogSynchronizer` is test-only facility. Move it to tests folder. --- .../transactionlog/writer/LocalTransactionLogSynchronizer.java | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename plugin/trino-delta-lake/src/{main => test}/java/io/trino/plugin/deltalake/transactionlog/writer/LocalTransactionLogSynchronizer.java (100%) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/LocalTransactionLogSynchronizer.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/writer/LocalTransactionLogSynchronizer.java similarity index 100% rename from plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/LocalTransactionLogSynchronizer.java rename to plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/writer/LocalTransactionLogSynchronizer.java From 5db5e07f9e369df0688e327d35772e5b219f2646 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 3 Feb 2026 11:44:14 +0100 Subject: [PATCH 3/3] Synchronize Delta log writing in tests too `LocalTransactionLogSynchronizer` used in Delta tests was not creating transaction log files atomically, leading to test flakiness. For tests, synchronization with a Java lock should be sufficient. The class is also renamed to avoid confusion. --- .../filesystem/local/LocalFileSystem.java | 2 +- ...estDeltaLakeLocalConcurrentWritesTest.java | 2 - .../deltalake/TestingDeltaLakePlugin.java | 13 +-- .../LocalTransactionLogSynchronizer.java | 59 ------------- ...estingLocalTransactionLogSynchronizer.java | 86 +++++++++++++++++++ 5 files changed, 94 insertions(+), 68 deletions(-) delete mode 100644 plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/writer/LocalTransactionLogSynchronizer.java create mode 100644 plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/writer/TestingLocalTransactionLogSynchronizer.java diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java index eba9b27014d0..624438ffb2f1 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java @@ -253,7 +253,7 @@ public Optional createTemporaryDirectory(Location targetPath, String t return Optional.of(temporary); } - private Path toFilePath(Location location) + public Path toFilePath(Location location) { validateLocalLocation(location); location.verifyValidFileLocation(); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java index 1ae42adcb9cc..51a65b5d58d5 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java @@ -206,7 +206,6 @@ private void testConcurrentInsertsSelectingFromTheSameTable(boolean partitioned) } @Test - @Disabled // TODO https://github.com/trinodb/trino/issues/21725 Fix flaky test public void testConcurrentInsertsSelectingFromTheSameVersionedTable() throws Exception { @@ -263,7 +262,6 @@ private void testConcurrentInsertsSelectingFromTheSameVersionedTable(boolean par } @Test - @Disabled // TODO https://github.com/trinodb/trino/issues/21725 Fix flaky test void testConcurrentInsertsSelectingFromTheSameTemporalVersionedTable() throws Exception { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakePlugin.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakePlugin.java index b1da80b478e1..695a33a06749 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakePlugin.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakePlugin.java @@ -17,7 +17,7 @@ import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.local.LocalFileSystemFactory; import io.trino.plugin.deltalake.metastore.NoOpVendedCredentialsProvider; -import io.trino.plugin.deltalake.transactionlog.writer.LocalTransactionLogSynchronizer; +import io.trino.plugin.deltalake.transactionlog.writer.TestingLocalTransactionLogSynchronizer; import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogSynchronizer; import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig; import io.trino.spi.connector.Connector; @@ -38,7 +38,8 @@ public class TestingDeltaLakePlugin extends DeltaLakePlugin { - private final Path localFileSystemRootPath; + private final LocalFileSystemFactory localFileSystemFactory; + private final TestingLocalTransactionLogSynchronizer localTransactionLogSynchronizer; private final Supplier> metastoreModule; public TestingDeltaLakePlugin(Path localFileSystemRootPath) @@ -48,7 +49,9 @@ public TestingDeltaLakePlugin(Path localFileSystemRootPath) public TestingDeltaLakePlugin(Path localFileSystemRootPath, Supplier> metastoreModule) { - this.localFileSystemRootPath = requireNonNull(localFileSystemRootPath, "localFileSystemRootPath is null"); + localFileSystemRootPath.toFile().mkdirs(); + localFileSystemFactory = new LocalFileSystemFactory(localFileSystemRootPath); + localTransactionLogSynchronizer = new TestingLocalTransactionLogSynchronizer(new DefaultDeltaLakeFileSystemFactory(localFileSystemFactory, new NoOpVendedCredentialsProvider())); this.metastoreModule = requireNonNull(metastoreModule, "metastoreModule is null"); } @@ -66,7 +69,6 @@ public String getName() @Override public Connector create(String catalogName, Map config, ConnectorContext context) { - localFileSystemRootPath.toFile().mkdirs(); return createConnector( catalogName, config, @@ -74,11 +76,10 @@ public Connector create(String catalogName, Map config, Connecto metastoreModule.get(), binder -> { binder.install(new TestingDeltaLakeExtensionsModule()); - LocalFileSystemFactory localFileSystemFactory = new LocalFileSystemFactory(localFileSystemRootPath); newMapBinder(binder, String.class, TrinoFileSystemFactory.class) .addBinding("local").toInstance(localFileSystemFactory); newMapBinder(binder, String.class, TransactionLogSynchronizer.class) - .addBinding("local").toInstance(new LocalTransactionLogSynchronizer(new DefaultDeltaLakeFileSystemFactory(localFileSystemFactory, new NoOpVendedCredentialsProvider()))); + .addBinding("local").toInstance(localTransactionLogSynchronizer); configBinder(binder).bindConfigDefaults( FileHiveMetastoreConfig.class, metastoreConfig -> metastoreConfig.setCatalogDirectory("local:///" + catalogName)); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/writer/LocalTransactionLogSynchronizer.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/writer/LocalTransactionLogSynchronizer.java deleted file mode 100644 index 723df44ac922..000000000000 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/writer/LocalTransactionLogSynchronizer.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.deltalake.transactionlog.writer; - -import io.trino.filesystem.Location; -import io.trino.filesystem.TrinoFileSystem; -import io.trino.plugin.deltalake.DeltaLakeFileSystemFactory; -import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle; -import io.trino.spi.connector.ConnectorSession; - -import java.io.IOException; -import java.io.OutputStream; -import java.io.UncheckedIOException; -import java.nio.file.FileAlreadyExistsException; - -import static java.util.Objects.requireNonNull; - -public class LocalTransactionLogSynchronizer - implements TransactionLogSynchronizer -{ - private final DeltaLakeFileSystemFactory fileSystemFactory; - - public LocalTransactionLogSynchronizer(DeltaLakeFileSystemFactory fileSystemFactory) - { - this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); - } - - @Override - public void write(ConnectorSession session, VendedCredentialsHandle credentialsHandle, String clusterId, Location newLogEntryPath, byte[] entryContents) - { - TrinoFileSystem fileSystem = fileSystemFactory.create(session, credentialsHandle); - try (OutputStream outputStream = fileSystem.newOutputFile(newLogEntryPath).create()) { - outputStream.write(entryContents); - } - catch (FileAlreadyExistsException e) { - throw new TransactionConflictException("Conflict detected while writing Transaction Log entry " + newLogEntryPath, e); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - @Override - public boolean isUnsafe() - { - return false; - } -} diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/writer/TestingLocalTransactionLogSynchronizer.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/writer/TestingLocalTransactionLogSynchronizer.java new file mode 100644 index 000000000000..829b201328db --- /dev/null +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/writer/TestingLocalTransactionLogSynchronizer.java @@ -0,0 +1,86 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.transactionlog.writer; + +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.local.LocalFileSystem; +import io.trino.plugin.deltalake.DeltaLakeFileSystemFactory; +import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle; +import io.trino.spi.connector.ConnectorSession; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; + +import static io.trino.plugin.base.util.Closables.closeAllSuppress; +import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; +import static java.nio.file.StandardOpenOption.CREATE_NEW; +import static java.util.Objects.requireNonNull; +import static java.util.UUID.randomUUID; + +public class TestingLocalTransactionLogSynchronizer + implements TransactionLogSynchronizer +{ + private final DeltaLakeFileSystemFactory fileSystemFactory; + + public TestingLocalTransactionLogSynchronizer(DeltaLakeFileSystemFactory fileSystemFactory) + { + this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); + } + + @Override + public void write(ConnectorSession session, VendedCredentialsHandle credentialsHandle, String clusterId, Location newLogEntryPath, byte[] entryContents) + { + try { + TrinoFileSystem fileSystem = fileSystemFactory.create(session, credentialsHandle); + Path targetPath = ((LocalFileSystem) fileSystem).toFilePath(newLogEntryPath); + Files.createDirectories(targetPath.getParent()); + Path tmpPath = targetPath.resolveSibling(".tmp.%s.%s".formatted(targetPath.getFileName().toString(), randomUUID())); + Files.write(tmpPath, entryContents, CREATE_NEW); + try { + // It's important that file is renamed atomically (hence the ATOMIC_MOVE option), as + // reads do not synchronize with the log writer in any way. + // Files.move with ATOMIC_MOVE is not guaranteed to be exclusive. + // In fact, it can be seen to overwrite the target file if it already exists. + // In-memory synchronization should be sufficient for tests, + // assuming single TestingDeltaLakePlugin instance is used. + synchronized (this) { + if (Files.exists(targetPath)) { + throw new FileAlreadyExistsException(targetPath.toString()); + } + Files.move(tmpPath, targetPath, ATOMIC_MOVE); + } + } + catch (Throwable t) { + closeAllSuppress(t, () -> Files.deleteIfExists(tmpPath)); + throw t; + } + } + catch (FileAlreadyExistsException e) { + throw new TransactionConflictException("Conflict detected while writing Transaction Log entry " + newLogEntryPath, e); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public boolean isUnsafe() + { + return false; + } +}