From e04b702fe65ac7f5e0dfaa6581ab3a4b2f83700f Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Wed, 19 Jul 2023 00:46:39 +0200 Subject: [PATCH 1/3] Use same file naming pattern across Delta Lake sinks --- .../io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java | 2 +- .../io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java index 7679ddc46167..a97015522427 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java @@ -357,7 +357,7 @@ private int[] getWriterIndexes(Page page) partitionName = Optional.of(partName); } - String fileName = session.getQueryId() + "-" + randomUUID(); + String fileName = session.getQueryId() + "_" + randomUUID(); filePath = filePath.appendPath(fileName); FileWriter fileWriter = createParquetFileWriter(filePath); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java index d5e0248aae19..371230b64328 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java @@ -282,7 +282,7 @@ private record FileOperation(FileType fileType, String fileId, OperationType ope { public static FileOperation create(String path, OperationType operationType) { - Pattern dataFilePattern = Pattern.compile(".*/(?key=[^/]*/)(?\\d{8}_\\d{6}_\\d{5}_\\w{5})-(?[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"); + Pattern dataFilePattern = 
Pattern.compile(".*/(?key=[^/]*/)(?\\d{8}_\\d{6}_\\d{5}_\\w{5})_(?[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"); String fileName = path.replaceFirst(".*/", ""); if (path.matches(".*/_delta_log/_last_checkpoint")) { return new FileOperation(LAST_CHECKPOINT, fileName, operationType); From 8d7c4ebc337acad920d0fef93139a61a7e1b3314 Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Wed, 9 Aug 2023 12:44:03 +0200 Subject: [PATCH 2/3] Rename class to `TestDeltaLakeDeleteCompatibility` --- ...abricksDelete.java => TestDeltaLakeDeleteCompatibility.java} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/{TestDeltaLakeDatabricksDelete.java => TestDeltaLakeDeleteCompatibility.java} (99%) diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksDelete.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java similarity index 99% rename from testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksDelete.java rename to testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java index a886a4e7106f..18ab46227eea 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksDelete.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java @@ -33,7 +33,7 @@ import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static org.assertj.core.api.Assertions.assertThat; -public class TestDeltaLakeDatabricksDelete +public class TestDeltaLakeDeleteCompatibility extends BaseTestDeltaLakeS3Storage { @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}) From 29ba6d3a1702deb96f26e36fcfebd6158c91dbc9 Mon Sep 17 00:00:00 2001 From: 
Marius Grama Date: Wed, 19 Jul 2023 00:44:09 +0200 Subject: [PATCH 3/3] Push down DELETE for enforceable filters Perform DELETE only on the metadata of the Delta Lake table when the delete filter does not touch any data columns. Sample queries affected by this enhancement are: ``` DELETE FROM table DELETE FROM table WHERE true DELETE FROM partitioned_table WHERE part1=value1 AND part2=value2 DELETE FROM partitioned_table WHERE part2=value2 ``` Given that this operation is performed only on the metadata layer, the number of deleted records will not be returned when the transaction log of the table contains `add` file entries whose statistics do not include the number of records. --- .../plugin/deltalake/DeltaLakeMetadata.java | 71 +++++++++++++- .../deltalake/TestDeltaLakeConnectorTest.java | 93 +++++++++++++++++++ .../plugin/deltalake/TestDeltaLakeDelete.java | 11 ++- .../TestDeltaLakeFileOperations.java | 91 ++++++++++++++++++ .../deltalake/TestDeltaLakeSystemTables.java | 2 +- .../TestDeltaLakeDeleteCompatibility.java | 37 ++++++++ 6 files changed, 300 insertions(+), 5 deletions(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 5cd616595ef3..1c1fc2c5d368 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -197,6 +197,7 @@ import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isProjectionPushdownEnabled; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isQueryPartitionFilterRequired; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isTableStatisticsEnabled; +import static io.trino.plugin.deltalake.DeltaLakeSplitManager.partitionMatchesPredicate; import static 
io.trino.plugin.deltalake.DeltaLakeTableProperties.CHANGE_DATA_FEED_ENABLED_PROPERTY; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHECKPOINT_INTERVAL_PROPERTY; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.COLUMN_MAPPING_MODE_PROPERTY; @@ -315,7 +316,7 @@ public class DeltaLakeMetadata public static final String INSERT_OPERATION = "WRITE"; public static final String MERGE_OPERATION = "MERGE"; public static final String UPDATE_OPERATION = "UPDATE"; // used by old Trino versions and Spark - public static final String DELETE_OPERATION = "DELETE"; // used by old Trino versions and Spark + public static final String DELETE_OPERATION = "DELETE"; // used by Trino for whole table/partition deletes as well as by Spark public static final String OPTIMIZE_OPERATION = "OPTIMIZE"; public static final String SET_TBLPROPERTIES_OPERATION = "SET TBLPROPERTIES"; public static final String CHANGE_COLUMN_OPERATION = "CHANGE COLUMN"; @@ -3258,6 +3259,74 @@ public WriterScalingOptions getInsertWriterScalingOptions(ConnectorSession sessi return WriterScalingOptions.ENABLED; } + @Override + public Optional applyDelete(ConnectorSession session, ConnectorTableHandle handle) + { + DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) handle; + if (tableHandle.getMetadataEntry().isChangeDataFeedEnabled().orElse(false)) { + // For tables with CDF enabled the DELETE operation can't be performed only on metadata files + return Optional.empty(); + } + + return Optional.of(tableHandle); + } + + @Override + public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle handle) + { + DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) handle; + if (isAppendOnly(tableHandle.getMetadataEntry())) { + throw new TrinoException(NOT_SUPPORTED, "Cannot modify rows from a table with '" + APPEND_ONLY_CONFIGURATION_KEY + "' set to true"); + } + + String tableLocation = tableHandle.location(); + List activeFiles = 
getAddFileEntriesMatchingEnforcedPartitionConstraint(session, tableHandle); + + try { + TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, tableLocation); + + long writeTimestamp = Instant.now().toEpochMilli(); + TrinoFileSystem fileSystem = fileSystemFactory.create(session); + long currentVersion = getMandatoryCurrentVersion(fileSystem, tableLocation); + if (currentVersion != tableHandle.getReadVersion()) { + throw new TransactionConflictException(format("Conflicting concurrent writes found. Expected transaction log version: %s, actual version: %s", tableHandle.getReadVersion(), currentVersion)); + } + long commitVersion = currentVersion + 1; + transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, commitVersion, writeTimestamp, DELETE_OPERATION, tableHandle.getReadVersion())); + + long deletedRecords = 0L; + boolean allDeletedFilesStatsPresent = true; + for (AddFileEntry addFileEntry : activeFiles) { + transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), writeTimestamp, true)); + + Optional fileRecords = addFileEntry.getStats().flatMap(DeltaLakeFileStatistics::getNumRecords); + allDeletedFilesStatsPresent &= fileRecords.isPresent(); + deletedRecords += fileRecords.orElse(0L); + } + + transactionLogWriter.flush(); + writeCheckpointIfNeeded(session, tableHandle.getSchemaTableName(), tableHandle.location(), tableHandle.getMetadataEntry().getCheckpointInterval(), commitVersion); + return allDeletedFilesStatsPresent ? 
OptionalLong.of(deletedRecords) : OptionalLong.empty(); + } + catch (Exception e) { + throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Failed to write Delta Lake transaction log entry", e); + } + } + + private List getAddFileEntriesMatchingEnforcedPartitionConstraint(ConnectorSession session, DeltaLakeTableHandle tableHandle) + { + TableSnapshot tableSnapshot = getSnapshot(tableHandle.getSchemaTableName(), tableHandle.getLocation(), session); + List validDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, session); + TupleDomain enforcedPartitionConstraint = tableHandle.getEnforcedPartitionConstraint(); + if (enforcedPartitionConstraint.isAll()) { + return validDataFiles; + } + Map enforcedDomains = enforcedPartitionConstraint.getDomains().orElseThrow(); + return validDataFiles.stream() + .filter(addAction -> partitionMatchesPredicate(addAction.getCanonicalPartitionValues(), enforcedDomains)) + .collect(toImmutableList()); + } + private static Map toDeltaLakeColumnStatistics(Collection computedStatistics) { return computedStatistics.stream() diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java index c9149387620e..12276e30f0c4 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java @@ -21,6 +21,9 @@ import io.trino.execution.QueryInfo; import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode; import io.trino.plugin.tpch.TpchPlugin; +import io.trino.sql.planner.plan.TableDeleteNode; +import io.trino.sql.planner.plan.TableFinishNode; +import io.trino.sql.planner.plan.TableWriterNode; import io.trino.testing.BaseConnectorTest; import io.trino.testing.DataProviders; import io.trino.testing.DistributedQueryRunner; @@ -60,6 
+63,7 @@ import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY; import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME; import static io.trino.spi.type.VarcharType.VARCHAR; +import static io.trino.sql.planner.optimizations.PlanNodeSearcher.searchFrom; import static io.trino.testing.DataProviders.toDataProvider; import static io.trino.testing.MaterializedResult.resultBuilder; import static io.trino.testing.QueryAssertions.copyTpchTables; @@ -77,6 +81,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; public class TestDeltaLakeConnectorTest extends BaseConnectorTest @@ -1820,6 +1825,94 @@ public void testTableWithTrailingSlashLocation(boolean partitioned) assertUpdate("DROP TABLE " + tableName); } + @Test(dataProvider = "deleteFiltersForTable") + public void testDeleteWithFilter(String createTableSql, String deleteFilter, boolean pushDownDelete) + { + String table = "delete_with_filter_" + randomNameSuffix(); + assertUpdate(format(createTableSql, table, bucketName, table)); + + assertUpdate(format("INSERT INTO %s (customer, purchases, address) VALUES ('Aaron', 5, 'Antioch'), ('Bill', 7, 'Antioch'), ('Mary', 10, 'Adelphi'), ('Aaron', 3, 'Dallas')", table), 4); + + assertUpdate( + getSession(), + format("DELETE FROM %s WHERE %s", table, deleteFilter), + 2, + plan -> { + if (pushDownDelete) { + boolean tableDelete = searchFrom(plan.getRoot()).where(node -> node instanceof TableDeleteNode).matches(); + assertTrue(tableDelete, "A TableDeleteNode should be present"); + } + else { + TableFinishNode finishNode = searchFrom(plan.getRoot()) + .where(TableFinishNode.class::isInstance) + .findOnlyElement(); + assertTrue(finishNode.getTarget() instanceof TableWriterNode.MergeTarget, "Delete operation should be performed through MERGE mechanism"); + } + }); + 
assertQuery("SELECT customer, purchases, address FROM " + table, "VALUES ('Mary', 10, 'Adelphi'), ('Aaron', 3, 'Dallas')"); + assertUpdate("DROP TABLE " + table); + } + + @DataProvider + public Object[][] deleteFiltersForTable() + { + return new Object[][]{ + { + "CREATE TABLE %s (customer VARCHAR, purchases INT, address VARCHAR) WITH (location = 's3://%s/%s')", + "address = 'Antioch'", + false + }, + { + // delete filter applied on function over partitioned field + "CREATE TABLE %s (customer VARCHAR, address VARCHAR, purchases INT) WITH (location = 's3://%s/%s', partitioned_by = ARRAY['address'])", + "starts_with(address, 'Antioch')", + false + }, + { + // delete filter applied on partitioned field + "CREATE TABLE %s (customer VARCHAR, address VARCHAR, purchases INT) WITH (location = 's3://%s/%s', partitioned_by = ARRAY['address'])", + "address = 'Antioch'", + true + }, + { + // delete filter applied on partitioned field and on synthesized field + "CREATE TABLE %s (customer VARCHAR, address VARCHAR, purchases INT) WITH (location = 's3://%s/%s', partitioned_by = ARRAY['address'])", + "address = 'Antioch' AND \"$file_size\" > 0", + false + }, + { + // delete filter applied on function over partitioned field + "CREATE TABLE %s (customer VARCHAR, address VARCHAR, purchases INT) WITH (location = 's3://%s/%s', partitioned_by = ARRAY['address'])", + "starts_with(address, 'Antioch')", + false + }, + { + // delete filter applied on non-partitioned field + "CREATE TABLE %s (customer VARCHAR, address VARCHAR, purchases INT) WITH (location = 's3://%s/%s', partitioned_by = ARRAY['customer'])", + "address = 'Antioch'", + false + }, + { + // delete filter fully applied on composed partition + "CREATE TABLE %s (purchases INT, customer VARCHAR, address VARCHAR) WITH (location = 's3://%s/%s', partitioned_by = ARRAY['address', 'customer'])", + "address = 'Antioch' AND (customer = 'Aaron' OR customer = 'Bill')", + true + }, + { + // delete filter applied only partly on first 
partitioned field + "CREATE TABLE %s (purchases INT, address VARCHAR, customer VARCHAR) WITH (location = 's3://%s/%s', partitioned_by = ARRAY['address', 'customer'])", + "address = 'Antioch'", + true + }, + { + // delete filter applied only partly on second partitioned field + "CREATE TABLE %s (purchases INT, address VARCHAR, customer VARCHAR) WITH (location = 's3://%s/%s', partitioned_by = ARRAY['customer', 'address'])", + "address = 'Antioch'", + true + }, + }; + } + @Override protected void verifyAddNotNullColumnToNonEmptyTableFailurePermissible(Throwable e) { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDelete.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDelete.java index 7aaa3cd89661..bf2d2c60c667 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDelete.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDelete.java @@ -181,9 +181,14 @@ public void testDeleteAllDatabricks() public void testDeleteAllOssDeltaLake() { String tableName = "test_delete_all_deltalake" + randomNameSuffix(); - Set originalFiles = testDeleteAllAndReturnInitialDataLakeFilesSet( - tableName, - "io/trino/plugin/deltalake/testing/resources/ossdeltalake"); + hiveMinioDataLake.copyResources("io/trino/plugin/deltalake/testing/resources/ossdeltalake/customer", tableName); + Set originalFiles = ImmutableSet.copyOf(hiveMinioDataLake.listFiles(tableName)); + getQueryRunner().execute(format("CALL system.register_table('%s', '%s', '%s')", SCHEMA, tableName, getLocationForTable(tableName))); + assertQuery("SELECT * FROM " + tableName, "SELECT * FROM customer"); + // There are `add` files in the transaction log without stats, reason why the DELETE statement on the whole table + // performed on the basis of metadata does not return the number of deleted records + assertUpdate("DELETE FROM " + tableName); + assertQuery("SELECT count(*) FROM " + 
tableName, "VALUES 0"); Set expected = ImmutableSet.builder() .addAll(originalFiles) .add(tableName + "/_delta_log/00000000000000000001.json") diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java index 371230b64328..4595bb75f485 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java @@ -34,7 +34,10 @@ import java.util.regex.Pattern; import static com.google.inject.util.Modules.EMPTY_MODULE; +import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.INPUT_FILE_EXISTS; +import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.INPUT_FILE_GET_LENGTH; import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.INPUT_FILE_NEW_STREAM; +import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.OUTPUT_FILE_CREATE; import static io.trino.plugin.base.util.Closables.closeAllSuppress; import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.CDF_DATA; import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.DATA; @@ -155,6 +158,94 @@ public void testReadWholePartition() assertUpdate("DROP TABLE test_read_part_key"); } + @Test + public void testDeleteWholePartition() + { + assertUpdate("DROP TABLE IF EXISTS test_delete_part_key"); + assertUpdate("CREATE TABLE test_delete_part_key(key varchar, data varchar) WITH (partitioned_by=ARRAY['key'])"); + + // Create multiple files per partition + assertUpdate("INSERT INTO test_delete_part_key(key, data) VALUES ('p1', '1-abc'), ('p1', '1-def'), ('p2', '2-abc'), ('p2', '2-def')", 4); + assertUpdate("INSERT INTO test_delete_part_key(key, data) VALUES ('p1', '1-baz'), ('p2', '2-baz')", 2); + + // Delete partition column only + 
assertUpdate("CALL system.flush_metadata_cache(schema_name => CURRENT_SCHEMA, table_name => 'test_delete_part_key')"); + assertFileSystemAccesses( + "DELETE FROM test_delete_part_key WHERE key = 'p1'", + ImmutableMultiset.builder() + .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 5) // TODO (https://github.com/trinodb/trino/issues/16782) should be checked once per query + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_EXISTS), 1) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_EXISTS), 1) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_EXISTS), 1) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 4) // TODO (https://github.com/trinodb/trino/issues/16780) why is last transaction log accessed more times than others? 
+ .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 1) + .build()); + + assertUpdate("DROP TABLE test_delete_part_key"); + } + + @Test + public void testDeleteWholeTable() + { + assertUpdate("DROP TABLE IF EXISTS test_delete_whole_table"); + assertUpdate("CREATE TABLE test_delete_whole_table(key varchar, data varchar)"); + + // Create multiple files per partition + assertUpdate("INSERT INTO test_delete_whole_table(key, data) VALUES ('p1', '1-abc'), ('p1', '1-def'), ('p2', '2-abc'), ('p2', '2-def')", 4); + assertUpdate("INSERT INTO test_delete_whole_table(key, data) VALUES ('p1', '1-baz'), ('p2', '2-baz')", 2); + + assertUpdate("CALL system.flush_metadata_cache(schema_name => CURRENT_SCHEMA, table_name => 'test_delete_whole_table')"); + assertFileSystemAccesses( + "DELETE FROM test_delete_whole_table WHERE true", + ImmutableMultiset.builder() + .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 5) // TODO (https://github.com/trinodb/trino/issues/16782) should be checked once per query + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_EXISTS), 1) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_EXISTS), 1) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_EXISTS), 1) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 4) // TODO (https://github.com/trinodb/trino/issues/16780) why is last transaction log accessed more times than others? 
+ .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 1) + .build()); + + assertUpdate("DROP TABLE test_delete_whole_table"); + } + + @Test + public void testDeleteWithNonPartitionFilter() + { + assertUpdate("CREATE TABLE test_delete_with_non_partition_filter (page_url VARCHAR, key VARCHAR, views INTEGER) WITH (partitioned_by=ARRAY['key'])"); + assertUpdate("INSERT INTO test_delete_with_non_partition_filter VALUES('url1', 'domain1', 1)", 1); + assertUpdate("INSERT INTO test_delete_with_non_partition_filter VALUES('url2', 'domain2', 2)", 1); + assertUpdate("INSERT INTO test_delete_with_non_partition_filter VALUES('url3', 'domain3', 3)", 1); + + assertUpdate("CALL system.flush_metadata_cache(schema_name => CURRENT_SCHEMA, table_name => 'test_delete_with_non_partition_filter')"); + assertFileSystemAccesses( + "DELETE FROM test_delete_with_non_partition_filter WHERE page_url ='url1'", + ImmutableMultiset.builder() + .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 7) // TODO (https://github.com/trinodb/trino/issues/16782) should be checked once per query + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_EXISTS), 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_EXISTS), 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_EXISTS), 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", 
INPUT_FILE_EXISTS), 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 5) // TODO (https://github.com/trinodb/trino/issues/16780) why is last transaction log accessed more times than others? + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(DATA, "key=domain1/", INPUT_FILE_NEW_STREAM), 2) + .addCopies(new FileOperation(DATA, "key=domain1/", INPUT_FILE_GET_LENGTH), 2) + .addCopies(new FileOperation(DATA, "key=domain1/", OUTPUT_FILE_CREATE), 1) + .build()); + + assertUpdate("DROP TABLE test_delete_with_non_partition_filter"); + } + @Test public void testHistorySystemTable() { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java index c34f3fc928e3..914ffa0d8d58 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java @@ -78,7 +78,7 @@ public void testHistoryTable() .matches(""" VALUES (BIGINT '5', VARCHAR 'OPTIMIZE', BIGINT '4', VARCHAR 'WriteSerializable', true), - (BIGINT '4', VARCHAR 'MERGE', BIGINT '3', VARCHAR 'WriteSerializable', true), + (BIGINT '4', VARCHAR 'DELETE', BIGINT '3', VARCHAR 'WriteSerializable', true), (BIGINT '3', VARCHAR 'MERGE', BIGINT '2', VARCHAR 'WriteSerializable', true), (BIGINT '2', VARCHAR 'WRITE', BIGINT '1', VARCHAR 'WriteSerializable', true), (BIGINT '1', VARCHAR 'WRITE', BIGINT '0', VARCHAR 'WriteSerializable', true), diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java index 18ab46227eea..7aeb1b017c0d 
100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java @@ -13,6 +13,7 @@ */ package io.trino.tests.product.deltalake; +import io.trino.testing.DataProviders; import io.trino.testng.services.Flaky; import org.testng.annotations.Test; @@ -36,6 +37,39 @@ public class TestDeltaLakeDeleteCompatibility extends BaseTestDeltaLakeS3Storage { + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}, dataProviderClass = DataProviders.class, dataProvider = "trueFalse") + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDeleteOnEnforcedConstraintsReturnsRowsCount(boolean partitioned) + { + String tableName = "test_delete_push_down_" + randomNameSuffix(); + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + "(v INT, p INT)" + + "USING delta " + + (partitioned ? "PARTITIONED BY (p)" : "") + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'"); + + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 10), (2, 10), (11, 20), (21, 30), (22, 30)"); + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (3, 10), (12, 20)"); + if (partitioned) { + assertThat(onTrino().executeQuery("DELETE FROM default." + tableName + " WHERE p = 10")) + .containsOnly(row(3)); + assertThat(onTrino().executeQuery("DELETE FROM default." + tableName)) + .containsOnly(row(4)); + } + else { + assertThat(onTrino().executeQuery("DELETE FROM default." + tableName)) + .containsOnly(row(7)); + } + + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).hasNoRows(); + } + finally { + dropDeltaTableWithRetry("default." 
+ tableName); + } + } + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}) @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testDeleteOnAppendOnlyTableFails() @@ -53,6 +87,9 @@ public void testDeleteOnAppendOnlyTableFails() .hasMessageContaining("This table is configured to only allow appends"); assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM default." + tableName + " WHERE a = 1")) .hasMessageContaining("Cannot modify rows from a table with 'delta.appendOnly' set to true"); + // Whole table deletes should be disallowed as well + assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM default." + tableName)) + .hasMessageContaining("Cannot modify rows from a table with 'delta.appendOnly' set to true"); assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)) .containsOnly(row(1, 11), row(2, 12));