diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
index 4f3da6a15d7b..0bbe9742b07c 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
@@ -2224,7 +2224,10 @@ private long commitInsertOperation(
         long currentVersion = getMandatoryCurrentVersion(fileSystem, handle.location(), readVersion.get());
 
         List<DeltaLakeTableHandle> sameAsTargetSourceTableHandles = getSameAsTargetSourceTableHandles(sourceTableHandles, handle.tableName());
-        checkForConcurrentTransactionConflicts(session, fileSystem, sameAsTargetSourceTableHandles, isolationLevel, currentVersion, readVersion, handle.location(), attemptCount);
+        List<TupleDomain<DeltaLakeColumnHandle>> enforcedSourcePartitionConstraints = sameAsTargetSourceTableHandles.stream()
+                .map(DeltaLakeTableHandle::getEnforcedPartitionConstraint)
+                .collect(toImmutableList());
+        checkForConcurrentTransactionConflicts(session, fileSystem, enforcedSourcePartitionConstraints, isolationLevel, currentVersion, readVersion, handle.location(), attemptCount);
         long commitVersion = currentVersion + 1;
         writeTransactionLogForInsertOperation(session, handle, sameAsTargetSourceTableHandles.isEmpty(), isolationLevel, dataFileInfos, commitVersion, currentVersion);
         return commitVersion;
@@ -2246,7 +2249,7 @@ private List<DeltaLakeTableHandle> getSameAsTargetSourceTableHandles(
     private void checkForConcurrentTransactionConflicts(
             ConnectorSession session,
             TrinoFileSystem fileSystem,
-            List<DeltaLakeTableHandle> sameAsTargetSourceTableHandles,
+            List<TupleDomain<DeltaLakeColumnHandle>> enforcedSourcePartitionConstraints,
             IsolationLevel isolationLevel,
             long currentVersion,
             AtomicReference<Long> readVersion,
@@ -2272,10 +2275,7 @@ private void checkForConcurrentTransactionConflicts(
 
         switch (isolationLevel) {
             case WRITESERIALIZABLE -> {
-                if (!sameAsTargetSourceTableHandles.isEmpty()) {
-                    List<TupleDomain<DeltaLakeColumnHandle>> enforcedSourcePartitionConstraints = sameAsTargetSourceTableHandles.stream()
-                            .map(DeltaLakeTableHandle::getEnforcedPartitionConstraint)
-                            .collect(toImmutableList());
+                if (!enforcedSourcePartitionConstraints.isEmpty()) {
                     TupleDomain<DeltaLakeColumnHandle> enforcedSourcePartitionConstraintsUnion = TupleDomain.columnWiseUnion(enforcedSourcePartitionConstraints);
 
                     checkIfCommittedAddedFilesConflictWithCurrentOperation(enforcedSourcePartitionConstraintsUnion, commitSummary);
@@ -2554,9 +2554,12 @@ private long commitMergeOperation(
         long createdTime = Instant.now().toEpochMilli();
 
         List<DeltaLakeTableHandle> sameAsTargetSourceTableHandles = getSameAsTargetSourceTableHandles(sourceTableHandles, handle.getSchemaTableName());
+        List<TupleDomain<DeltaLakeColumnHandle>> enforcedSourcePartitionConstraints = sameAsTargetSourceTableHandles.stream()
+                .map(DeltaLakeTableHandle::getEnforcedPartitionConstraint)
+                .collect(toImmutableList());
         TrinoFileSystem fileSystem = fileSystemFactory.create(session);
         long currentVersion = getMandatoryCurrentVersion(fileSystem, tableLocation, readVersion.get());
-        checkForConcurrentTransactionConflicts(session, fileSystem, sameAsTargetSourceTableHandles, isolationLevel, currentVersion, readVersion, handle.getLocation(), attemptCount);
+        checkForConcurrentTransactionConflicts(session, fileSystem, enforcedSourcePartitionConstraints, isolationLevel, currentVersion, readVersion, handle.getLocation(), attemptCount);
         long commitVersion = currentVersion + 1;
 
         transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, isolationLevel, commitVersion, createdTime, MERGE_OPERATION, handle.getReadVersion(), sameAsTargetSourceTableHandles.isEmpty()));
@@ -2674,7 +2677,8 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(DeltaLak
                         tableHandle.getMetadataEntry().getOriginalPartitionColumns(),
                         maxScannedFileSize,
                         Optional.empty(),
-                        retryMode != NO_RETRIES),
+                        retryMode != NO_RETRIES,
+                        tableHandle.getEnforcedPartitionConstraint()),
                 tableHandle.getLocation()));
     }
 
@@ -2728,7 +2732,10 @@ private BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandl
         checkSupportedWriterVersion(table);
 
         return new BeginTableExecuteResult<>(
-                executeHandle.withProcedureHandle(optimizeHandle.withCurrentVersion(table.getReadVersion())),
+                executeHandle.withProcedureHandle(
+                        optimizeHandle
+                                .withCurrentVersion(table.getReadVersion())
+                                .withEnforcedPartitionConstraint(table.getEnforcedPartitionConstraint())),
                 table.forOptimize(true, optimizeHandle.getMaxScannedFileSize()));
     }
 
@@ -2747,11 +2754,10 @@ public void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHa
     private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandle executeHandle, Collection<Slice> fragments, List<Object> splitSourceInfo)
     {
         DeltaTableOptimizeHandle optimizeHandle = (DeltaTableOptimizeHandle) executeHandle.procedureHandle();
-        long readVersion = optimizeHandle.getCurrentVersion().orElseThrow(() -> new IllegalArgumentException("currentVersion not set"));
         String tableLocation = executeHandle.tableLocation();
 
         // paths to be deleted
-        Set<DeltaLakeScannedDataFile> scannnedDataFiles = splitSourceInfo.stream()
+        Set<DeltaLakeScannedDataFile> scannedDataFiles = splitSourceInfo.stream()
                 .map(DeltaLakeScannedDataFile.class::cast)
                 .collect(toImmutableSet());
 
@@ -2767,35 +2773,10 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl
 
         boolean writeCommitted = false;
         try {
-            TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, tableLocation);
-
-            long createdTime = Instant.now().toEpochMilli();
-            long commitVersion = readVersion + 1;
-            transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, IsolationLevel.WRITESERIALIZABLE, commitVersion, createdTime, OPTIMIZE_OPERATION, readVersion, false));
-            // TODO: Delta writes another field "operationMetrics" that I haven't
-            // seen before. It contains delete/update metrics. Investigate/include it.
-
-            long writeTimestamp = Instant.now().toEpochMilli();
-
-            for (DeltaLakeScannedDataFile scannedFile : scannnedDataFiles) {
-                String relativePath = relativePath(tableLocation, scannedFile.path());
-                Map<String, Optional<String>> canonicalPartitionValues = scannedFile.partitionKeys();
-                transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(
-                        toUriFormat(relativePath),
-                        createPartitionValuesMap(canonicalPartitionValues),
-                        writeTimestamp,
-                        false,
-                        Optional.empty()));
-            }
-
-            // Note: during writes we want to preserve original case of partition columns
-            List<DeltaLakeColumnHandle> partitionColumns = getPartitionColumns(
-                    optimizeHandle.getMetadataEntry().getOriginalPartitionColumns(),
-                    optimizeHandle.getTableColumns(),
-                    getColumnMappingMode(optimizeHandle.getMetadataEntry(), optimizeHandle.getProtocolEntry()));
-            appendAddFileEntries(transactionLogWriter, dataFileInfos, partitionColumns, getExactColumnNames(optimizeHandle.getMetadataEntry()), false);
-
-            transactionLogWriter.flush();
+            IsolationLevel isolationLevel = getIsolationLevel(optimizeHandle.getMetadataEntry());
+            AtomicReference<Long> readVersion = new AtomicReference<>(optimizeHandle.getCurrentVersion().orElseThrow(() -> new IllegalArgumentException("currentVersion not set")));
+            long commitVersion = Failsafe.with(TRANSACTION_CONFLICT_RETRY_POLICY)
+                    .get(context -> commitOptimizeOperation(session, optimizeHandle, isolationLevel, tableLocation, scannedDataFiles, dataFileInfos, readVersion, context.getAttemptCount()));
             writeCommitted = true;
             enqueueUpdateInfo(
                     session,
@@ -2822,6 +2803,59 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl
         }
     }
 
+    private long commitOptimizeOperation(
+            ConnectorSession session,
+            DeltaTableOptimizeHandle optimizeHandle,
+            IsolationLevel isolationLevel,
+            String tableLocation,
+            Set<DeltaLakeScannedDataFile> scannedDataFiles,
+            List<DataFileInfo> dataFileInfos,
+            AtomicReference<Long> readVersion,
+            int attemptCount)
+            throws IOException
+    {
+        TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, tableLocation);
+
+        long createdTime = Instant.now().toEpochMilli();
+        TrinoFileSystem fileSystem = fileSystemFactory.create(session);
+        long currentVersion = getMandatoryCurrentVersion(fileSystem, tableLocation, readVersion.get());
+        checkForConcurrentTransactionConflicts(session, fileSystem, ImmutableList.of(optimizeHandle.getEnforcedPartitionConstraint()), isolationLevel, currentVersion, readVersion, tableLocation, attemptCount);
+        long commitVersion = currentVersion + 1;
+        transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(
+                session,
+                isolationLevel,
+                commitVersion,
+                createdTime,
+                OPTIMIZE_OPERATION,
+                optimizeHandle.getCurrentVersion().orElseThrow(() -> new IllegalArgumentException("currentVersion not set")),
+                false));
+        // TODO: Delta writes another field "operationMetrics" that I haven't
+        // seen before. It contains delete/update metrics. Investigate/include it.
+
+        long writeTimestamp = Instant.now().toEpochMilli();
+
+        for (DeltaLakeScannedDataFile scannedFile : scannedDataFiles) {
+            String relativePath = relativePath(tableLocation, scannedFile.path());
+            Map<String, Optional<String>> canonicalPartitionValues = scannedFile.partitionKeys();
+            transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(
+                    toUriFormat(relativePath),
+                    createPartitionValuesMap(canonicalPartitionValues),
+                    writeTimestamp,
+                    false,
+                    Optional.empty()));
+        }
+
+        // Note: during writes we want to preserve original case of partition columns
+        List<DeltaLakeColumnHandle> partitionColumns = getPartitionColumns(
+                optimizeHandle.getMetadataEntry().getOriginalPartitionColumns(),
+                optimizeHandle.getTableColumns(),
+                getColumnMappingMode(optimizeHandle.getMetadataEntry(), optimizeHandle.getProtocolEntry()));
+        appendAddFileEntries(transactionLogWriter, dataFileInfos, partitionColumns, getExactColumnNames(optimizeHandle.getMetadataEntry()), false);
+
+        transactionLogWriter.flush();
+        return commitVersion;
+    }
+
     private void checkWriteAllowed(ConnectorSession session, DeltaLakeTableHandle table)
     {
         if (!allowWrite(session, table)) {
@@ -4158,7 +4192,7 @@ private CommitDeleteOperationResult commitDeleteOperation(
         long writeTimestamp = Instant.now().toEpochMilli();
         TrinoFileSystem fileSystem = fileSystemFactory.create(session);
         long currentVersion = getMandatoryCurrentVersion(fileSystem, tableLocation, readVersion.get());
-        checkForConcurrentTransactionConflicts(session, fileSystem, ImmutableList.of(tableHandle), isolationLevel, currentVersion, readVersion, tableHandle.getLocation(), attemptCount);
+        checkForConcurrentTransactionConflicts(session, fileSystem, ImmutableList.of(tableHandle.getEnforcedPartitionConstraint()), isolationLevel, currentVersion, readVersion, tableHandle.getLocation(), attemptCount);
         long commitVersion = currentVersion + 1;
         transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, isolationLevel, commitVersion, writeTimestamp, operation, tableHandle.getReadVersion(), false));
 
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/DeltaTableOptimizeHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/DeltaTableOptimizeHandle.java
index 150d2661750a..d95cf1b8cf83 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/DeltaTableOptimizeHandle.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/DeltaTableOptimizeHandle.java
@@ -20,6 +20,7 @@
 import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
 import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
 import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
+import io.trino.spi.predicate.TupleDomain;
 
 import java.util.List;
 import java.util.Optional;
@@ -37,6 +38,7 @@ public class DeltaTableOptimizeHandle
     private final DataSize maxScannedFileSize;
     private final Optional<Long> currentVersion;
     private final boolean retriesEnabled;
+    private final TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint;
 
     @JsonCreator
     public DeltaTableOptimizeHandle(
@@ -46,7 +48,8 @@ public DeltaTableOptimizeHandle(
             List<DeltaLakeColumnHandle> originalPartitionColumns,
             DataSize maxScannedFileSize,
             Optional<Long> currentVersion,
-            boolean retriesEnabled)
+            boolean retriesEnabled,
+            TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint)
     {
         this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null");
         this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null");
@@ -55,6 +58,7 @@ public DeltaTableOptimizeHandle(
         this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
         this.currentVersion = requireNonNull(currentVersion, "currentVersion is null");
         this.retriesEnabled = retriesEnabled;
+        this.enforcedPartitionConstraint = requireNonNull(enforcedPartitionConstraint, "enforcedPartitionConstraint is null");
     }
 
     public DeltaTableOptimizeHandle withCurrentVersion(long currentVersion)
@@ -67,7 +71,21 @@ public DeltaTableOptimizeHandle withCurrentVersion(long currentVersion)
                 originalPartitionColumns,
                 maxScannedFileSize,
                 Optional.of(currentVersion),
-                retriesEnabled);
+                retriesEnabled,
+                enforcedPartitionConstraint);
+    }
+
+    public DeltaTableOptimizeHandle withEnforcedPartitionConstraint(TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint)
+    {
+        return new DeltaTableOptimizeHandle(
+                metadataEntry,
+                protocolEntry,
+                tableColumns,
+                originalPartitionColumns,
+                maxScannedFileSize,
+                currentVersion,
+                retriesEnabled,
+                requireNonNull(enforcedPartitionConstraint, "enforcedPartitionConstraint is null"));
     }
 
     @JsonProperty
@@ -114,4 +132,10 @@ public boolean isRetriesEnabled()
     {
         return retriesEnabled;
     }
+
+    @JsonProperty
+    public TupleDomain<DeltaLakeColumnHandle> getEnforcedPartitionConstraint()
+    {
+        return enforcedPartitionConstraint;
+    }
 }
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java
index 963c6a6a731c..5a7c602d658a 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java
@@ -29,6 +29,7 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
@@ -38,6 +39,7 @@
 import java.util.stream.LongStream;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static io.trino.plugin.deltalake.TestingDeltaLakeUtils.getConnectorService;
 import static io.trino.testing.QueryAssertions.getTrinoExceptionCause;
 import static io.trino.testing.TestingNames.randomNameSuffix;
@@ -1551,6 +1553,236 @@ public void testConcurrentDeleteAndDeletePushdownAndNonBlindInsertsReconciliatio
         }
     }
 
+    @Test
+    public void testConcurrentOptimizeReconciliation()
+            throws Exception
+    {
+        int threads = 3;
+        CyclicBarrier barrier = new CyclicBarrier(threads);
+        ExecutorService executor = newFixedThreadPool(threads);
+        String tableName = "test_concurrent_optimize_table_" + randomNameSuffix();
+
+        assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30)", 3);
+        // Add more files on each partition
+        assertUpdate("INSERT INTO " + tableName + " VALUES (2, 10), (12, 20), (22, 30)", 3);
+        Set<String> beforeOptimizeActiveFiles = getActiveFiles(tableName);
+        try {
+            // The OPTIMIZE operations operate on non-overlapping partitions
+            executor.invokeAll(ImmutableList.<Callable<Void>>builder()
+                            .add(() -> {
+                                barrier.await(10, SECONDS);
+                                getQueryRunner().execute("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE part = 10");
+                                return null;
+                            })
+                            .add(() -> {
+                                barrier.await(10, SECONDS);
+                                getQueryRunner().execute("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE part = 20");
+                                return null;
+                            })
+                            .add(() -> {
+                                barrier.await(10, SECONDS);
+                                getQueryRunner().execute("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE part = 30");
+                                return null;
+                            })
+                            .build())
+                    .forEach(MoreFutures::getDone);
+
+            // Verify OPTIMIZE happened, but table data didn't change
+            assertThat(beforeOptimizeActiveFiles).isNotEqualTo(getActiveFiles(tableName));
+            assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (1, 10), (2, 10), (11, 20), (12, 20), (21, 30), (22, 30)");
+            assertQuery(
+                    "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
+                    """
+                    VALUES
+                        ('CREATE TABLE AS SELECT', 'WriteSerializable', true),
+                        ('WRITE', 'WriteSerializable', true),
+                        ('OPTIMIZE', 'WriteSerializable', false),
+                        ('OPTIMIZE', 'WriteSerializable', false),
+                        ('OPTIMIZE', 'WriteSerializable', false)
+                    """);
+        }
+        finally {
+            assertUpdate("DROP TABLE " + tableName);
+            executor.shutdownNow();
+            assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
+        }
+    }
+
+    @Test
+    public void testConcurrentSerializableOptimizeReconciliationFailure()
+            throws Exception
+    {
+        int threads = 5;
+        CyclicBarrier barrier = new CyclicBarrier(threads);
+        ExecutorService executor = newFixedThreadPool(threads);
+        String tableName = "test_concurrent_serializable_optimize_reconciliation" + randomNameSuffix();
+
+        // TODO create the table through Trino when `isolation_level` table property can be set
+        registerTableFromResources(tableName, "deltalake/serializable_partitioned_table", getQueryRunner());
+
+        assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (0, 10), (33, 40)");
+
+        try {
+            List<Future<Boolean>> futures = IntStream.range(0, threads)
+                    .mapToObj(threadNumber -> executor.submit(() -> {
+                        barrier.await(10, SECONDS);
+                        try {
+                            // Optimizing concurrently is not permitted on the same partition on Serializable or WriteSerializable isolation level
+                            getQueryRunner().execute("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE part = 10");
+                            return true;
+                        }
+                        catch (Exception e) {
+                            RuntimeException trinoException = getTrinoExceptionCause(e);
+                            try {
+                                assertThat(trinoException).hasMessage("Failed to write Delta Lake transaction log entry");
+                            }
+                            catch (Throwable verifyFailure) {
+                                if (verifyFailure != e) {
+                                    verifyFailure.addSuppressed(e);
+                                }
+                                throw verifyFailure;
+                            }
+                            return false;
+                        }
+                    }))
+                    .collect(toImmutableList());
+
+            long successfulOptimizeOperationsCount = futures.stream()
+                    .map(MoreFutures::getFutureValue)
+                    .filter(success -> success)
+                    .count();
+            assertThat(successfulOptimizeOperationsCount).isGreaterThanOrEqualTo(1);
+            assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (0, 10), (33, 40)");
+        }
+        finally {
+            assertUpdate("DROP TABLE " + tableName);
+            executor.shutdownNow();
+            assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
+        }
+    }
+
+    @Test
+    public void testConcurrentOptimizeAndBlindInsertsReconciliation()
+            throws Exception
+    {
+        int threads = 3;
+        CyclicBarrier barrier = new CyclicBarrier(threads);
+        ExecutorService executor = newFixedThreadPool(threads);
+        String tableName = "test_concurrent_optimize_and_inserts_table_" + randomNameSuffix();
+
+        assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (21, 30)", 2);
+        assertUpdate("INSERT INTO " + tableName + " VALUES (22, 30)", 1);
+        Set<String> beforeOptimizeActiveFilesOnPartition30 = computeActual("SELECT DISTINCT \"$path\" FROM " + tableName + " WHERE part = 30").getOnlyColumnAsSet().stream()
+                .map(String.class::cast)
+                .collect(toImmutableSet());
+
+        try {
+            executor.invokeAll(ImmutableList.<Callable<Void>>builder()
+                            .add(() -> {
+                                barrier.await(10, SECONDS);
+                                getQueryRunner().execute("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE part > 20");
+                                return null;
+                            })
+                            .add(() -> {
+                                barrier.await(10, SECONDS);
+                                getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (8, 10)");
+                                return null;
+                            })
+                            .add(() -> {
+                                barrier.await(10, SECONDS);
+                                getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (11, 20)");
+                                return null;
+                            })
+                            .build())
+                    .forEach(MoreFutures::getDone);
+
+            // Verify OPTIMIZE happened, but table data didn't change
+            Set<String> afterOptimizeActiveFilesOnPartition30 = computeActual("SELECT DISTINCT \"$path\" FROM " + tableName + " WHERE part = 30").getOnlyColumnAsSet().stream()
+                    .map(String.class::cast)
+                    .collect(toImmutableSet());
+            assertThat(beforeOptimizeActiveFilesOnPartition30).isNotEqualTo(afterOptimizeActiveFilesOnPartition30);
+
+            assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (1, 10), (8, 10), (11, 20), (21, 30), (22, 30)");
+            assertQuery(
+                    "SELECT operation, isolation_level FROM \"" + tableName + "$history\"",
+                    """
+                    VALUES
+                        ('CREATE TABLE AS SELECT', 'WriteSerializable'),
+                        ('OPTIMIZE', 'WriteSerializable'),
+                        ('WRITE', 'WriteSerializable'),
+                        ('WRITE', 'WriteSerializable'),
+                        ('WRITE', 'WriteSerializable')
+                    """);
+        }
+        finally {
+            assertUpdate("DROP TABLE " + tableName);
+            executor.shutdownNow();
+            assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
+        }
+    }
+
+    @Test
+    public void testConcurrentOptimizeAndNonBlindInsertsReconciliation()
+            throws Exception
+    {
+        int threads = 3;
+        CyclicBarrier barrier = new CyclicBarrier(threads);
+        ExecutorService executor = newFixedThreadPool(threads);
+        String tableName = "test_concurrent_optimize_and_inserts_table_" + randomNameSuffix();
+
+        assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30)", 3);
+        // Add more files in the partition 10
+        assertUpdate("INSERT INTO " + tableName + " VALUES (2, 10)", 1);
+        Set<String> beforeOptimizeActiveFilesOnPartition10 = computeActual("SELECT DISTINCT \"$path\" FROM " + tableName + " WHERE part = 10").getOnlyColumnAsSet().stream()
+                .map(String.class::cast)
+                .collect(toImmutableSet());
+
+        try {
+            // The OPTIMIZE, DELETE and non-blind INSERT operations operate on non-overlapping partitions
+            executor.invokeAll(ImmutableList.<Callable<Void>>builder()
+                            .add(() -> {
+                                barrier.await(10, SECONDS);
+                                getQueryRunner().execute("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE part = 10");
+                                return null;
+                            })
+                            .add(() -> {
+                                barrier.await(10, SECONDS);
+                                getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 1, part FROM " + tableName + " WHERE part = 20");
+                                return null;
+                            })
+                            .add(() -> {
+                                barrier.await(10, SECONDS);
+                                // Use a non-partition filter as well to ensure the DELETE operation is not being pushed down
+                                getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 30 AND a BETWEEN 20 AND 30");
+                                return null;
+                            })
+                            .build())
+                    .forEach(MoreFutures::getDone);
+
+            // Verify OPTIMIZE happened, but table data didn't change
+            Set<String> afterOptimizeActiveFilesOnPartition10 = computeActual("SELECT DISTINCT \"$path\" FROM " + tableName + " WHERE part = 10").getOnlyColumnAsSet().stream()
+                    .map(String.class::cast)
+                    .collect(toImmutableSet());
+            assertThat(beforeOptimizeActiveFilesOnPartition10).isNotEqualTo(afterOptimizeActiveFilesOnPartition10);
+            assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (1, 10), (2, 10), (11, 20), (12, 20)");
+            assertQuery(
+                    "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
+                    """
+                    VALUES
+                        ('CREATE TABLE AS SELECT', 'WriteSerializable', true),
+                        ('WRITE', 'WriteSerializable', true),
+                        ('OPTIMIZE', 'WriteSerializable', false),
+                        ('MERGE', 'WriteSerializable', false),
+                        ('WRITE', 'WriteSerializable', false)
+                    """);
+        }
+        finally {
+            assertUpdate("DROP TABLE " + tableName);
+            executor.shutdownNow();
+            assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
+        }
+    }
+
     protected void registerTableFromResources(String table, String resourcePath, QueryRunner queryRunner)
             throws IOException
     {
@@ -1579,4 +1811,11 @@ protected void registerTableFromResources(String table, String resourcePath, Que
 
         queryRunner.execute(format("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')", table, tableLocation));
     }
+
+    private Set<String> getActiveFiles(String tableName)
+    {
+        return computeActual("SELECT DISTINCT \"$path\" FROM " + tableName).getOnlyColumnAsSet().stream()
+                .map(String.class::cast)
+                .collect(toImmutableSet());
+    }
 }