From 98e631d28f262db01fd249f95d505ddc4972d3ca Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Mon, 2 May 2022 19:23:51 +0900 Subject: [PATCH] Allow executing optimize procedure for Iceberg v2 table --- .../trino/plugin/iceberg/IcebergMetadata.java | 7 ++- .../procedure/IcebergOptimizeHandle.java | 10 ++++ .../iceberg/BaseIcebergConnectorTest.java | 52 ++++++++++++++++--- .../TestIcebergSparkCompatibility.java | 22 ++++---- 4 files changed, 72 insertions(+), 19 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 26578943fd38..c2ce359fc14a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -215,7 +215,7 @@ public class IcebergMetadata { private static final Logger log = Logger.get(IcebergMetadata.class); private static final Pattern PATH_PATTERN = Pattern.compile("(.*)/[^/]+"); - private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 1; + private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 2; private static final int CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION = 2; private static final String RETENTION_THRESHOLD = "retention_threshold"; public static final Set UPDATABLE_TABLE_PROPERTIES = ImmutableSet.of(FILE_FORMAT_PROPERTY, FORMAT_VERSION_PROPERTY, PARTITIONING_PROPERTY); @@ -791,6 +791,7 @@ private Optional getTableHandleForOptimize(Connecto tableHandle.getSchemaTableName(), OPTIMIZE, new IcebergOptimizeHandle( + tableHandle.getSnapshotId().orElseThrow(), SchemaParser.toJson(icebergTable.schema()), PartitionSpecParser.toJson(icebergTable.spec()), getColumns(icebergTable.schema(), typeManager), @@ -875,7 +876,6 @@ private BeginTableExecuteResult OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION) { - // Currently, Optimize would fail when position deletes files are present in Iceberg table throw new TrinoException(NOT_SUPPORTED, format( "%s is not supported for Iceberg table format version > %d. Table %s format version is %s.", OPTIMIZE.name(), @@ -959,6 +959,9 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle } RewriteFiles rewriteFiles = transaction.newRewrite(); rewriteFiles.rewriteFiles(scannedFiles, newFiles); + // Table.snapshot method returns null if there is no matching snapshot + Snapshot snapshot = requireNonNull(icebergTable.snapshot(optimizeHandle.getSnapshotId()), "snapshot is null"); + rewriteFiles.validateFromSnapshot(snapshot.snapshotId()); rewriteFiles.commit(); transaction.commitTransaction(); transaction = null; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergOptimizeHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergOptimizeHandle.java index 5aec626a006b..31a94b32730b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergOptimizeHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergOptimizeHandle.java @@ -30,6 +30,7 @@ public class IcebergOptimizeHandle extends IcebergProcedureHandle { + private final long snapshotId; private final String schemaAsJson; private final String partitionSpecAsJson; private final List tableColumns; @@ -40,6 +41,7 @@ public class IcebergOptimizeHandle @JsonCreator public IcebergOptimizeHandle( + long snapshotId, String schemaAsJson, String partitionSpecAsJson, List tableColumns, @@ -48,6 +50,7 @@ public IcebergOptimizeHandle( DataSize maxScannedFileSize, boolean retriesEnabled) { + this.snapshotId = snapshotId; this.schemaAsJson = requireNonNull(schemaAsJson, "schemaAsJson is null"); this.partitionSpecAsJson = requireNonNull(partitionSpecAsJson, "partitionSpecAsJson is null"); this.tableColumns = ImmutableList.copyOf(requireNonNull(tableColumns, "tableColumns is null")); @@ -57,6 +60,12 @@ public IcebergOptimizeHandle( this.retriesEnabled = retriesEnabled; } + @JsonProperty + public long getSnapshotId() + { + return snapshotId; + } + @JsonProperty public String getSchemaAsJson() { @@ -103,6 +112,7 @@ public boolean isRetriesEnabled() public String toString() { return toStringHelper(this) + .add("snapshotId", snapshotId) .add("schemaAsJson", schemaAsJson) .add("partitionSpecAsJson", partitionSpecAsJson) .add("tableColumns", tableColumns) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index fffad0085648..6f790c1fb85e 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -3038,12 +3038,12 @@ public void testProjectionPushdownOnPartitionedTableWithComments() assertUpdate("DROP TABLE IF EXISTS test_projection_pushdown_comments"); } - @Test - public void testOptimize() + @Test(dataProvider = "tableFormatVersion") + public void testOptimize(int formatVersion) throws Exception { String tableName = "test_optimize_" + randomTableSuffix(); - assertUpdate("CREATE TABLE " + tableName + " (key integer, value varchar) WITH (format_version = 1)"); + assertUpdate("CREATE TABLE " + tableName + " (key integer, value varchar) WITH (format_version = " + formatVersion + ")"); // DistributedQueryRunner sets node-scheduler.include-coordinator by default, so include coordinator int workerCount = getQueryRunner().getNodeCount(); @@ -3092,8 +3092,8 @@ public void testOptimize() assertUpdate("DROP TABLE " + tableName); } - @Test - public void testOptimizeForPartitionedTable() + @Test(dataProvider = "tableFormatVersion") + public void testOptimizeForPartitionedTable(int formatVersion) throws IOException { // This test will have its own session to make sure partitioning is indeed forced and is not a result @@ -3105,7 +3105,7 @@ public void testOptimizeForPartitionedTable() .setSystemProperty("preferred_write_partitioning_min_number_of_partitions", "100") .build(); String tableName = "test_repartitiong_during_optimize_" + randomTableSuffix(); - assertUpdate(session, "CREATE TABLE " + tableName + " (key varchar, value integer) WITH (format_version = 1, partitioning = ARRAY['key'])"); + assertUpdate(session, "CREATE TABLE " + tableName + " (key varchar, value integer) WITH (format_version = " + formatVersion + ", partitioning = ARRAY['key'])"); // optimize an empty table assertQuerySucceeds(session, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); @@ -3136,6 +3136,41 @@ public void testOptimizeForPartitionedTable() assertUpdate("DROP TABLE " + tableName); } + @DataProvider + public Object[][] tableFormatVersion() + { + return IntStream.rangeClosed(IcebergConfig.FORMAT_VERSION_SUPPORT_MIN, IcebergConfig.FORMAT_VERSION_SUPPORT_MAX).boxed() + .collect(DataProviders.toDataProvider()); + } + + @Test + public void testOptimizeTableAfterDeleteWithFormatVersion2() + { + String tableName = "test_optimize_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM nation", 25); + + List initialFiles = getActiveFiles(tableName); + + assertUpdate("DELETE FROM " + tableName + " WHERE nationkey = 7", 1); + + // Verify that delete files exists + assertQuery( + "SELECT summary['total-delete-files'] FROM \"" + tableName + "$snapshots\" WHERE snapshot_id = " + getCurrentSnapshotId(tableName), + "VALUES '1'"); + + computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + + List updatedFiles = getActiveFiles(tableName); + assertThat(updatedFiles) + .hasSize(1) + .isNotEqualTo(initialFiles); + + assertThat(query("SELECT * FROM " + tableName)) + .matches("SELECT * FROM nation WHERE nationkey != 7"); + + assertUpdate("DROP TABLE " + tableName); + } + private List getActiveFiles(String tableName) { return computeActual(format("SELECT file_path FROM \"%s$files\"", tableName)).getOnlyColumn() @@ -3531,6 +3566,11 @@ private List getSnapshotIds(String tableName) .collect(toUnmodifiableList()); } + private long getCurrentSnapshotId(String tableName) + { + return (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC LIMIT 1"); + } + private Path getIcebergTableDataPath(String tableName) { return getIcebergTablePath(tableName, "data"); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java index 123aae081f1e..33b61df69b7d 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java @@ -1587,19 +1587,19 @@ public void testMissingMetrics() } @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) - public void testOptimizeFailsOnV2IcebergTable() + public void testOptimizeOnV2IcebergTable() { - String tableName = format("test_optimize_fails_on_v2_iceberg_table_%s", randomTableSuffix()); + String tableName = format("test_optimize_on_v2_iceberg_table_%s", randomTableSuffix()); String sparkTableName = sparkTableName(tableName); String trinoTableName = trinoTableName(tableName); - onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(a INT, b INT) " + "USING ICEBERG PARTITIONED BY (b) " + "TBLPROPERTIES ('format-version'='2', 'write.delete.mode'='merge-on-read')"); onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1, 2), (2, 2), (3, 2), (11, 12), (12, 12), (13, 12)"); + onTrino().executeQuery(format("ALTER TABLE %s EXECUTE OPTIMIZE", trinoTableName)); - assertQueryFailure(() -> onTrino().executeQuery(format("ALTER TABLE %s EXECUTE OPTIMIZE", trinoTableName))) - .hasMessageContaining("is not supported for Iceberg table format version > 1"); + assertThat(onSpark().executeQuery("SELECT * FROM " + sparkTableName)) + .containsOnly(row(1, 2), row(2, 2), row(3, 2), row(11, 12), row(12, 12), row(13, 12)); } private static String escapeSparkString(String value) @@ -1742,15 +1742,15 @@ public void testSparkReadsTrinoTableAfterCleaningUp(StorageFormat storageFormat) onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") - public void testSparkReadsTrinoTableAfterOptimizeAndCleaningUp(StorageFormat storageFormat) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + public void testSparkReadsTrinoTableAfterOptimizeAndCleaningUp(StorageFormat storageFormat, int specVersion) { String baseTableName = "test_spark_reads_trino_partitioned_table_after_expiring_snapshots_after_optimize" + storageFormat; String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); - onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '%s', format_version = 1)", trinoTableName, storageFormat)); + onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '%s', format_version = %s)", trinoTableName, storageFormat, specVersion)); // separate inserts give us snapshot per insert onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1001)", trinoTableName)); onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1002)", trinoTableName)); @@ -1786,15 +1786,15 @@ public void testSparkReadsTrinoTableAfterOptimizeAndCleaningUp(StorageFormat sto onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") - public void testTrinoReadsTrinoTableWithSparkDeletesAfterOptimizeAndCleanUp(StorageFormat storageFormat) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + public void testTrinoReadsTrinoTableWithSparkDeletesAfterOptimizeAndCleanUp(StorageFormat storageFormat, int specVersion) { String baseTableName = "test_spark_reads_trino_partitioned_table_with_deletes_after_expiring_snapshots_after_optimize" + storageFormat; String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); - onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '%s', format_version = 1)", trinoTableName, storageFormat)); + onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '%s', format_version = %s)", trinoTableName, storageFormat, specVersion)); // separate inserts give us snapshot per insert onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1001)", trinoTableName)); onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1002)", trinoTableName));