Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public Optional<Location> createTemporaryDirectory(Location targetPath, String t
return Optional.of(temporary);
}

private Path toFilePath(Location location)
public Path toFilePath(Location location)
{
validateLocalLocation(location);
location.verifyValidFileLocation();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ private void testConcurrentInsertsSelectingFromTheSameTable(boolean partitioned)
}

@Test
@Disabled // TODO https://github.com/trinodb/trino/issues/21725 Fix flaky test
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test failed on master https://github.com/trinodb/trino/actions/runs/22487358862/job/65140354090

Error:  io.trino.plugin.deltalake.TestDeltaLakeLocalConcurrentWritesTest.testConcurrentInsertsSelectingFromTheSameVersionedTable -- Time elapsed: 2.239 s <<< ERROR!
io.trino.testing.QueryFailedException: Failed to write Delta Lake transaction log entry
	at io.trino.testing.AbstractTestingTrinoClient.execute(AbstractTestingTrinoClient.java:138)
	at io.trino.testing.DistributedQueryRunner.executeInternal(DistributedQueryRunner.java:591)
	at io.trino.testing.DistributedQueryRunner.execute(DistributedQueryRunner.java:574)
	at io.trino.testing.QueryRunner.execute(QueryRunner.java:82)
	at io.trino.plugin.deltalake.TestDeltaLakeLocalConcurrentWritesTest.lambda$testConcurrentInsertsSelectingFromTheSameVersionedTable$1(TestDeltaLakeLocalConcurrentWritesTest.java:235)
	Suppressed: java.lang.Exception: SQL: INSERT INTO test_concurrent_inserts_select_from_same_versioned_table_uegc9xjare SELECT 2, 'c' AS part FROM test_concurrent_inserts_select_from_same_versioned_table_uegc9xjare FOR VERSION AS OF 0
		at io.trino.testing.DistributedQueryRunner.executeInternal(DistributedQueryRunner.java:598)
		... 3 more
Caused by: io.trino.spi.TrinoException: Failed to write Delta Lake transaction log entry
	at io.trino.plugin.deltalake.DeltaLakeMetadata.finishInsert(DeltaLakeMetadata.java:2354)
	at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.finishInsert(ClassLoaderSafeConnectorMetadata.java:628)
	at io.trino.tracing.TracingConnectorMetadata.finishInsert(TracingConnectorMetadata.java:708)
	at io.trino.metadata.MetadataManager.finishInsert(MetadataManager.java:1232)
	at io.trino.tracing.TracingMetadata.finishInsert(TracingMetadata.java:742)
	at io.trino.sql.planner.LocalExecutionPlanner.lambda$createTableFinisher$0(LocalExecutionPlanner.java:4180)
	at io.trino.operator.TableFinishOperator.getOutput(TableFinishOperator.java:315)
	at io.trino.operator.Driver.processInternal(Driver.java:403)
	at io.trino.operator.Driver.lambda$process$0(Driver.java:306)
	at io.trino.operator.Driver.tryWithLock(Driver.java:709)
	at io.trino.operator.Driver.process(Driver.java:298)
	at io.trino.operator.Driver.processForDuration(Driver.java:269)
	at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:852)
	at io.trino.execution.executor.dedicated.SplitProcessor.run(SplitProcessor.java:77)
	at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.lambda$run$0(TaskEntry.java:205)
	at io.trino.$gen.Trino_testversion____20260227_131036_2.run(Unknown Source)
	at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.run(TaskEntry.java:206)
	at io.trino.execution.executor.scheduler.FairScheduler.runTask(FairScheduler.java:177)
	at io.trino.execution.executor.scheduler.FairScheduler.lambda$submit$0(FairScheduler.java:164)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:545)
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:128)
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:74)
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:80)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1090)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:614)
	at java.base/java.lang.Thread.run(Thread.java:1474)
Caused by: io.trino.spi.TrinoException: Error reading statistics from cache
	at io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess.readExtendedStatistics(CachingExtendedStatisticsAccess.java:68)
	at io.trino.plugin.deltalake.DeltaLakeMetadata.updateTableStatistics(DeltaLakeMetadata.java:4077)
	at io.trino.plugin.deltalake.DeltaLakeMetadata.finishInsert(DeltaLakeMetadata.java:2335)
	... 25 more
Caused by: java.lang.RuntimeException: Failed to decode JSON
	at io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.decodeAndRethrowIfNotFound(MetaDirStatisticsAccess.java:142)
	at io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.readExtendedStatistics(MetaDirStatisticsAccess.java:83)
	at io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.readExtendedStatistics(MetaDirStatisticsAccess.java:68)
	at io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess.lambda$readExtendedStatistics$0(CachingExtendedStatisticsAccess.java:64)
	at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4859)
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3556)
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2307)
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2180)
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2071)
	at com.google.common.cache.LocalCache.get(LocalCache.java:3985)
	at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4854)
	at io.trino.cache.EvictableCache.get(EvictableCache.java:120)
	at io.trino.cache.CacheUtils.uncheckedCacheGet(CacheUtils.java:39)
	at io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess.readExtendedStatistics(CachingExtendedStatisticsAccess.java:64)
	... 27 more
Caused by: java.lang.IllegalArgumentException: Invalid JSON bytes for [simple type, class io.trino.plugin.deltalake.statistics.ExtendedStatistics]
	at io.airlift.json.JsonCodec.fromJson(JsonCodec.java:242)
	at io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.decodeAndRethrowIfNotFound(MetaDirStatisticsAccess.java:138)
	... 40 more
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
 at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1]
	at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:72)
	at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1814)
	at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:359)
	at com.fasterxml.jackson.databind.ObjectReader._bind(ObjectReader.java:2104)
	at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1252)
	at io.airlift.json.JsonCodec.fromJson(JsonCodec.java:237)
	... 41 more

public void testConcurrentInsertsSelectingFromTheSameVersionedTable()
throws Exception
{
Expand Down Expand Up @@ -263,7 +262,6 @@ private void testConcurrentInsertsSelectingFromTheSameVersionedTable(boolean par
}

@Test
@Disabled // TODO https://github.com/trinodb/trino/issues/21725 Fix flaky test
void testConcurrentInsertsSelectingFromTheSameTemporalVersionedTable()
throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.local.LocalFileSystemFactory;
import io.trino.plugin.deltalake.metastore.NoOpVendedCredentialsProvider;
import io.trino.plugin.deltalake.transactionlog.writer.LocalTransactionLogSynchronizer;
import io.trino.plugin.deltalake.transactionlog.writer.TestingLocalTransactionLogSynchronizer;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogSynchronizer;
import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig;
import io.trino.spi.connector.Connector;
Expand All @@ -38,7 +38,8 @@
public class TestingDeltaLakePlugin
extends DeltaLakePlugin
{
private final Path localFileSystemRootPath;
private final LocalFileSystemFactory localFileSystemFactory;
private final TestingLocalTransactionLogSynchronizer localTransactionLogSynchronizer;
private final Supplier<Optional<Module>> metastoreModule;

public TestingDeltaLakePlugin(Path localFileSystemRootPath)
Expand All @@ -48,7 +49,9 @@ public TestingDeltaLakePlugin(Path localFileSystemRootPath)

public TestingDeltaLakePlugin(Path localFileSystemRootPath, Supplier<Optional<Module>> metastoreModule)
{
this.localFileSystemRootPath = requireNonNull(localFileSystemRootPath, "localFileSystemRootPath is null");
localFileSystemRootPath.toFile().mkdirs();
localFileSystemFactory = new LocalFileSystemFactory(localFileSystemRootPath);
localTransactionLogSynchronizer = new TestingLocalTransactionLogSynchronizer(new DefaultDeltaLakeFileSystemFactory(localFileSystemFactory, new NoOpVendedCredentialsProvider()));
this.metastoreModule = requireNonNull(metastoreModule, "metastoreModule is null");
}

Expand All @@ -66,19 +69,17 @@ public String getName()
@Override
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
localFileSystemRootPath.toFile().mkdirs();
return createConnector(
catalogName,
config,
context,
metastoreModule.get(),
binder -> {
binder.install(new TestingDeltaLakeExtensionsModule());
LocalFileSystemFactory localFileSystemFactory = new LocalFileSystemFactory(localFileSystemRootPath);
newMapBinder(binder, String.class, TrinoFileSystemFactory.class)
.addBinding("local").toInstance(localFileSystemFactory);
newMapBinder(binder, String.class, TransactionLogSynchronizer.class)
.addBinding("local").toInstance(new LocalTransactionLogSynchronizer(new DefaultDeltaLakeFileSystemFactory(localFileSystemFactory, new NoOpVendedCredentialsProvider())));
.addBinding("local").toInstance(localTransactionLogSynchronizer);
configBinder(binder).bindConfigDefaults(
FileHiveMetastoreConfig.class,
metastoreConfig -> metastoreConfig.setCatalogDirectory("local:///" + catalogName));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake.transactionlog.writer;

import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.local.LocalFileSystem;
import io.trino.plugin.deltalake.DeltaLakeFileSystemFactory;
import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle;
import io.trino.spi.connector.ConnectorSession;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

import static io.trino.plugin.base.util.Closables.closeAllSuppress;
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardOpenOption.CREATE_NEW;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;

public class TestingLocalTransactionLogSynchronizer
implements TransactionLogSynchronizer
{
private final DeltaLakeFileSystemFactory fileSystemFactory;

public TestingLocalTransactionLogSynchronizer(DeltaLakeFileSystemFactory fileSystemFactory)
Comment thread
findepi marked this conversation as resolved.
{
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
}

@Override
public void write(ConnectorSession session, VendedCredentialsHandle credentialsHandle, String clusterId, Location newLogEntryPath, byte[] entryContents)
{
try {
TrinoFileSystem fileSystem = fileSystemFactory.create(session, credentialsHandle);
Path targetPath = ((LocalFileSystem) fileSystem).toFilePath(newLogEntryPath);
Files.createDirectories(targetPath.getParent());
Path tmpPath = targetPath.resolveSibling(".tmp.%s.%s".formatted(targetPath.getFileName().toString(), randomUUID()));
Comment thread
findepi marked this conversation as resolved.
Files.write(tmpPath, entryContents, CREATE_NEW);
try {
// It's important that file is renamed atomically (hence the ATOMIC_MOVE option), as
// reads do not synchronize with the log writer in any way.
// Files.move with ATOMIC_MOVE is not guaranteed to be exclusive.
Comment thread
findepi marked this conversation as resolved.
// In fact, it can be seen to overwrite the target file if it already exists.
// In-memory synchronization should be sufficient for tests,
// assuming single TestingDeltaLakePlugin instance is used.
synchronized (this) {
if (Files.exists(targetPath)) {
throw new FileAlreadyExistsException(targetPath.toString());
}
Files.move(tmpPath, targetPath, ATOMIC_MOVE);
}
}
catch (Throwable t) {
closeAllSuppress(t, () -> Files.deleteIfExists(tmpPath));
throw t;
}
}
catch (FileAlreadyExistsException e) {
throw new TransactionConflictException("Conflict detected while writing Transaction Log entry " + newLogEntryPath, e);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public boolean isUnsafe()
{
return false;
}
}