Synchronize Delta log writing in tests too#28092
Conversation
The `LocalTransactionLogSynchronizer` is test-only facility. Move it to tests folder.
This comment was marked as outdated.
This comment was marked as outdated.
4b92b62 to
027e444
Compare
fc78595 to
259081e
Compare
chenjian2664
left a comment
There was a problem hiding this comment.
I think the first and second commits and be merged into one.
The third commit looks good to me. However, we should mention (either in a comment or in the commit message) that we are not testing concurrency across multiple clusters — only multi-threaded scenarios within a single cluster
These are unrelated changes, so i will keep separate, ok? |
`LocalTransactionLogSynchronizer` used in Delta tests was not creating transaction log files atomically, leading to test flakiness. For tests, synchronization with a Java lock should be sufficient. The class is also renamed to avoid confusion.
259081e to
5db5e07
Compare
thanks!
expanded the comment, see #28092 (comment) |
| } | ||
|
|
||
| @Test | ||
| @Disabled // TODO https://github.com/trinodb/trino/issues/21725 Fix flaky test |
There was a problem hiding this comment.
This test failed on master https://github.com/trinodb/trino/actions/runs/22487358862/job/65140354090
Error: io.trino.plugin.deltalake.TestDeltaLakeLocalConcurrentWritesTest.testConcurrentInsertsSelectingFromTheSameVersionedTable -- Time elapsed: 2.239 s <<< ERROR!
io.trino.testing.QueryFailedException: Failed to write Delta Lake transaction log entry
at io.trino.testing.AbstractTestingTrinoClient.execute(AbstractTestingTrinoClient.java:138)
at io.trino.testing.DistributedQueryRunner.executeInternal(DistributedQueryRunner.java:591)
at io.trino.testing.DistributedQueryRunner.execute(DistributedQueryRunner.java:574)
at io.trino.testing.QueryRunner.execute(QueryRunner.java:82)
at io.trino.plugin.deltalake.TestDeltaLakeLocalConcurrentWritesTest.lambda$testConcurrentInsertsSelectingFromTheSameVersionedTable$1(TestDeltaLakeLocalConcurrentWritesTest.java:235)
Suppressed: java.lang.Exception: SQL: INSERT INTO test_concurrent_inserts_select_from_same_versioned_table_uegc9xjare SELECT 2, 'c' AS part FROM test_concurrent_inserts_select_from_same_versioned_table_uegc9xjare FOR VERSION AS OF 0
at io.trino.testing.DistributedQueryRunner.executeInternal(DistributedQueryRunner.java:598)
... 3 more
Caused by: io.trino.spi.TrinoException: Failed to write Delta Lake transaction log entry
at io.trino.plugin.deltalake.DeltaLakeMetadata.finishInsert(DeltaLakeMetadata.java:2354)
at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.finishInsert(ClassLoaderSafeConnectorMetadata.java:628)
at io.trino.tracing.TracingConnectorMetadata.finishInsert(TracingConnectorMetadata.java:708)
at io.trino.metadata.MetadataManager.finishInsert(MetadataManager.java:1232)
at io.trino.tracing.TracingMetadata.finishInsert(TracingMetadata.java:742)
at io.trino.sql.planner.LocalExecutionPlanner.lambda$createTableFinisher$0(LocalExecutionPlanner.java:4180)
at io.trino.operator.TableFinishOperator.getOutput(TableFinishOperator.java:315)
at io.trino.operator.Driver.processInternal(Driver.java:403)
at io.trino.operator.Driver.lambda$process$0(Driver.java:306)
at io.trino.operator.Driver.tryWithLock(Driver.java:709)
at io.trino.operator.Driver.process(Driver.java:298)
at io.trino.operator.Driver.processForDuration(Driver.java:269)
at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:852)
at io.trino.execution.executor.dedicated.SplitProcessor.run(SplitProcessor.java:77)
at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.lambda$run$0(TaskEntry.java:205)
at io.trino.$gen.Trino_testversion____20260227_131036_2.run(Unknown Source)
at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.run(TaskEntry.java:206)
at io.trino.execution.executor.scheduler.FairScheduler.runTask(FairScheduler.java:177)
at io.trino.execution.executor.scheduler.FairScheduler.lambda$submit$0(FairScheduler.java:164)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:545)
at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:128)
at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:74)
at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:80)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1090)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:614)
at java.base/java.lang.Thread.run(Thread.java:1474)
Caused by: io.trino.spi.TrinoException: Error reading statistics from cache
at io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess.readExtendedStatistics(CachingExtendedStatisticsAccess.java:68)
at io.trino.plugin.deltalake.DeltaLakeMetadata.updateTableStatistics(DeltaLakeMetadata.java:4077)
at io.trino.plugin.deltalake.DeltaLakeMetadata.finishInsert(DeltaLakeMetadata.java:2335)
... 25 more
Caused by: java.lang.RuntimeException: Failed to decode JSON
at io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.decodeAndRethrowIfNotFound(MetaDirStatisticsAccess.java:142)
at io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.readExtendedStatistics(MetaDirStatisticsAccess.java:83)
at io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.readExtendedStatistics(MetaDirStatisticsAccess.java:68)
at io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess.lambda$readExtendedStatistics$0(CachingExtendedStatisticsAccess.java:64)
at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4859)
at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3556)
at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2307)
at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2180)
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2071)
at com.google.common.cache.LocalCache.get(LocalCache.java:3985)
at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4854)
at io.trino.cache.EvictableCache.get(EvictableCache.java:120)
at io.trino.cache.CacheUtils.uncheckedCacheGet(CacheUtils.java:39)
at io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess.readExtendedStatistics(CachingExtendedStatisticsAccess.java:64)
... 27 more
Caused by: java.lang.IllegalArgumentException: Invalid JSON bytes for [simple type, class io.trino.plugin.deltalake.statistics.ExtendedStatistics]
at io.airlift.json.JsonCodec.fromJson(JsonCodec.java:242)
at io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.decodeAndRethrowIfNotFound(MetaDirStatisticsAccess.java:138)
... 40 more
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1]
at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:72)
at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1814)
at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:359)
at com.fasterxml.jackson.databind.ObjectReader._bind(ObjectReader.java:2104)
at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1252)
at io.airlift.json.JsonCodec.fromJson(JsonCodec.java:237)
... 41 more
LocalTransactionLogSynchronizerused in Delta tests was not creatingtransaction log files atomically, leading to test flakiness. For tests,
synchronization with a Java lock should be sufficient. The class is also
renamed to avoid confusion.
Alternative to
This
testConcurrentInsertsSelectingFromTheSameVersionedTableandtestConcurrentInsertsSelectingFromTheSameTemporalVersionedTableinTestDeltaLakeLocalConcurrentWritesTest#21725