diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 82cf42bfa376..4ea642feaca1 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -871,6 +871,11 @@ public Optional getTableHandleForExecute( RetryMode retryMode) { IcebergTableHandle tableHandle = (IcebergTableHandle) connectorTableHandle; + checkArgument(tableHandle.getTableType() == DATA, "Cannot execute table procedure %s on non-DATA table: %s", procedureName, tableHandle.getTableType()); + Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName()); + if (tableHandle.getSnapshotId().isPresent() && (tableHandle.getSnapshotId().get() != icebergTable.currentSnapshot().snapshotId())) { + throw new TrinoException(NOT_SUPPORTED, "Cannot execute table procedure %s on old snapshot %s".formatted(procedureName, tableHandle.getSnapshotId().get())); + } IcebergTableProcedureId procedureId; try { @@ -1536,7 +1541,7 @@ private static void verifyTableVersionForUpdate(IcebergTableHandle table) private static void validateNotModifyingOldSnapshot(IcebergTableHandle table, Table icebergTable) { if (table.getSnapshotId().isPresent() && (table.getSnapshotId().get() != icebergTable.currentSnapshot().snapshotId())) { - throw new TrinoException(NOT_SUPPORTED, "Modifying old snapshot is not supported in Iceberg."); + throw new TrinoException(NOT_SUPPORTED, "Modifying old snapshot is not supported in Iceberg"); } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 07152257c13f..f42d8f917ee9 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -1067,10 +1067,10 @@ public void testTableComments() public void testRollbackSnapshot() { assertUpdate("CREATE TABLE test_rollback (col0 INTEGER, col1 BIGINT)"); - long afterCreateTableId = getLatestSnapshotId("test_rollback"); + long afterCreateTableId = getCurrentSnapshotId("test_rollback"); assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (123, CAST(987 AS BIGINT))", 1); - long afterFirstInsertId = getLatestSnapshotId("test_rollback"); + long afterFirstInsertId = getCurrentSnapshotId("test_rollback"); assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (456, CAST(654 AS BIGINT))", 1); assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT)), (456, CAST(654 AS BIGINT))"); @@ -1082,7 +1082,7 @@ public void testRollbackSnapshot() assertEquals((long) computeActual("SELECT COUNT(*) FROM test_rollback").getOnlyValue(), 0); assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (789, CAST(987 AS BIGINT))", 1); - long afterSecondInsertId = getLatestSnapshotId("test_rollback"); + long afterSecondInsertId = getCurrentSnapshotId("test_rollback"); // extra insert which should be dropped on rollback assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (999, CAST(999 AS BIGINT))", 1); @@ -1093,11 +1093,6 @@ public void testRollbackSnapshot() dropTable("test_rollback"); } - private long getLatestSnapshotId(String tableName) - { - return (long) computeScalar(format("SELECT snapshot_id FROM \"%s$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES", tableName)); - } - @Override protected String errorMessageForInsertIntoNotNullColumn(String columnName) { @@ -4488,6 +4483,34 @@ public void testOptimizeCleansUpDeleteFiles() assertUpdate("DROP TABLE " + tableName); } + @Test + public void testOptimizeSnapshot() + { + Session session = Session.builder(getSession()) + .setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "allow_legacy_snapshot_syntax", "true") + .build(); + String tableName = "test_optimize_snapshot_" + randomTableSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " (a) AS VALUES 11", 1); + long snapshotId = getCurrentSnapshotId(tableName); + assertUpdate("INSERT INTO " + tableName + " VALUES 22", 1); + assertThatThrownBy(() -> query(session, "ALTER TABLE \"%s@%d\" EXECUTE OPTIMIZE".formatted(tableName, snapshotId))) + .hasMessage("Cannot execute table procedure OPTIMIZE on old snapshot " + snapshotId); + assertThat(query("SELECT * FROM " + tableName)) + .matches("VALUES 11, 22"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testOptimizeSystemTable() + { + assertThatThrownBy(() -> query("ALTER TABLE \"nation$files\" EXECUTE OPTIMIZE")) + .hasMessage("This connector does not support table procedures"); + assertThatThrownBy(() -> query("ALTER TABLE \"nation$snapshots\" EXECUTE OPTIMIZE")) + .hasMessage("This connector does not support table procedures"); + } + private List getActiveFiles(String tableName) { return computeActual(format("SELECT file_path FROM \"%s$files\"", tableName)).getOnlyColumn() @@ -4813,6 +4836,34 @@ public void testExpireSnapshotsPartitionedTable() assertThat(updatedSnapshots.size()).isLessThan(initialSnapshots.size()); } + @Test + public void testExpireSnapshotsOnSnapshot() + { + Session session = Session.builder(getSession()) + .setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "allow_legacy_snapshot_syntax", "true") + .build(); + String tableName = "test_expire_snapshots_on_snapshot_" + randomTableSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " (a) AS VALUES 11", 1); + long snapshotId = getCurrentSnapshotId(tableName); + assertUpdate("INSERT INTO " + tableName + " VALUES 22", 1); + assertThatThrownBy(() -> query(session, "ALTER TABLE \"%s@%d\" EXECUTE EXPIRE_SNAPSHOTS".formatted(tableName, snapshotId))) + .hasMessage("Cannot execute table procedure EXPIRE_SNAPSHOTS on old snapshot " + snapshotId); + assertThat(query("SELECT * FROM " + tableName)) + .matches("VALUES 11, 22"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testExpireSnapshotsSystemTable() + { + assertThatThrownBy(() -> query("ALTER TABLE \"nation$files\" EXECUTE EXPIRE_SNAPSHOTS")) + .hasMessage("This connector does not support table procedures"); + assertThatThrownBy(() -> query("ALTER TABLE \"nation$snapshots\" EXECUTE EXPIRE_SNAPSHOTS")) + .hasMessage("This connector does not support table procedures"); + } + @Test public void testExplainExpireSnapshotOutput() { @@ -4965,6 +5016,34 @@ public void testRemoveOrphanFilesParameterValidation() "\\QRetention specified (33.00s) is shorter than the minimum retention configured in the system (7.00d). Minimum retention can be changed with iceberg.remove_orphan_files.min-retention configuration property or iceberg.remove_orphan_files_min_retention session property"); } + @Test + public void testRemoveOrphanFilesOnSnapshot() + { + Session session = Session.builder(getSession()) + .setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "allow_legacy_snapshot_syntax", "true") + .build(); + String tableName = "test_remove_orphan_files_on_snapshot_" + randomTableSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " (a) AS VALUES 11", 1); + long snapshotId = getCurrentSnapshotId(tableName); + assertUpdate("INSERT INTO " + tableName + " VALUES 22", 1); + assertThatThrownBy(() -> query(session, "ALTER TABLE \"%s@%d\" EXECUTE REMOVE_ORPHAN_FILES".formatted(tableName, snapshotId))) + .hasMessage("Cannot execute table procedure REMOVE_ORPHAN_FILES on old snapshot " + snapshotId); + assertThat(query("SELECT * FROM " + tableName)) + .matches("VALUES 11, 22"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testRemoveOrphanFilesSystemTable() + { + assertThatThrownBy(() -> query("ALTER TABLE \"nation$files\" EXECUTE REMOVE_ORPHAN_FILES")) + .hasMessage("This connector does not support table procedures"); + assertThatThrownBy(() -> query("ALTER TABLE \"nation$snapshots\" EXECUTE REMOVE_ORPHAN_FILES")) + .hasMessage("This connector does not support table procedures"); + } + @Test public void testIfDeletesReturnsNumberOfRemovedRows() { @@ -5095,13 +5174,11 @@ public void testModifyingOldSnapshotIsNotPossible() assertUpdate(format("INSERT INTO %s VALUES 4,5,6", tableName), 3); assertQuery(sessionWithLegacySyntaxSupport, format("SELECT * FROM \"%s@%d\"", tableName, oldSnapshotId), "VALUES 1,2,3"); assertThatThrownBy(() -> query(sessionWithLegacySyntaxSupport, format("INSERT INTO \"%s@%d\" VALUES 7,8,9", tableName, oldSnapshotId))) - .hasMessage("Modifying old snapshot is not supported in Iceberg."); + .hasMessage("Modifying old snapshot is not supported in Iceberg"); assertThatThrownBy(() -> query(sessionWithLegacySyntaxSupport, format("DELETE FROM \"%s@%d\" WHERE col = 5", tableName, oldSnapshotId))) - .hasMessage("Modifying old snapshot is not supported in Iceberg."); + .hasMessage("Modifying old snapshot is not supported in Iceberg"); assertThatThrownBy(() -> query(sessionWithLegacySyntaxSupport, format("UPDATE \"%s@%d\" SET col = 50 WHERE col = 5", tableName, oldSnapshotId))) - .hasMessage("Modifying old snapshot is not supported in Iceberg."); - assertThatThrownBy(() -> query(sessionWithLegacySyntaxSupport, format("ALTER TABLE \"%s@%d\" EXECUTE OPTIMIZE", tableName, oldSnapshotId))) - .hasMessage("Modifying old snapshot is not supported in Iceberg."); + .hasMessage("Modifying old snapshot is not supported in Iceberg"); // TODO Change to assertThatThrownBy because the syntax `table@versionid` should not be supported for DML operations assertUpdate(sessionWithLegacySyntaxSupport, format("INSERT INTO \"%s@%d\" VALUES 7,8,9", tableName, getCurrentSnapshotId(tableName)), 3); assertUpdate(sessionWithLegacySyntaxSupport, format("DELETE FROM \"%s@%d\" WHERE col = 9", tableName, getCurrentSnapshotId(tableName)), 1); @@ -5126,11 +5203,11 @@ public void testCreateTableAsSelectFromVersionedTable() // Enforce having exactly one snapshot of the table at the timestamp corresponding to `afterInsert123EpochMillis` Thread.sleep(1); assertUpdate("INSERT INTO " + sourceTableName + " VALUES 1, 2, 3", 3); - long afterInsert123SnapshotId = getLatestSnapshotId(sourceTableName); + long afterInsert123SnapshotId = getCurrentSnapshotId(sourceTableName); long afterInsert123EpochMillis = getCommittedAtInEpochMilliseconds(sourceTableName, afterInsert123SnapshotId); Thread.sleep(1); assertUpdate("INSERT INTO " + sourceTableName + " VALUES 4, 5, 6", 3); - long afterInsert456SnapshotId = getLatestSnapshotId(sourceTableName); + long afterInsert456SnapshotId = getCurrentSnapshotId(sourceTableName); assertUpdate("INSERT INTO " + sourceTableName + " VALUES 7, 8, 9", 3); assertUpdate("CREATE TABLE " + snapshotVersionedSinkTableName + " AS SELECT * FROM " + sourceTableName + " FOR VERSION AS OF " + afterInsert456SnapshotId, 6); @@ -5228,7 +5305,7 @@ public void testReadFromVersionedTableWithSchemaEvolution() String tableName = "test_versioned_table_schema_evolution_" + randomTableSuffix(); assertQuerySucceeds("CREATE TABLE " + tableName + "(col1 varchar)"); - long v1SnapshotId = getLatestSnapshotId(tableName); + long v1SnapshotId = getCurrentSnapshotId(tableName); assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId)) .hasOutputTypes(ImmutableList.of(VARCHAR)) .returnsEmptyResult(); @@ -5239,7 +5316,7 @@ public void testReadFromVersionedTableWithSchemaEvolution() .returnsEmptyResult(); assertUpdate("INSERT INTO " + tableName + " VALUES ('a', 11)", 1); - long v2SnapshotId = getLatestSnapshotId(tableName); + long v2SnapshotId = getCurrentSnapshotId(tableName); assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId)) .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER)) .matches("VALUES (VARCHAR 'a', 11)"); @@ -5256,7 +5333,7 @@ public void testReadFromVersionedTableWithSchemaEvolution() .matches("VALUES (VARCHAR 'a', 11, CAST(NULL AS bigint))"); assertUpdate("INSERT INTO " + tableName + " VALUES ('b', 22, 32)", 1); - long v3SnapshotId = getLatestSnapshotId(tableName); + long v3SnapshotId = getCurrentSnapshotId(tableName); assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId)) .hasOutputTypes(ImmutableList.of(VARCHAR)) .returnsEmptyResult(); @@ -5277,19 +5354,19 @@ public void testReadFromVersionedTableWithPartitionSpecEvolution() { String tableName = "test_version_table_with_partition_spec_evolution_" + randomTableSuffix(); assertUpdate("CREATE TABLE " + tableName + " (day varchar, views bigint) WITH(partitioning = ARRAY['day'])"); - long v1SnapshotId = getLatestSnapshotId(tableName); + long v1SnapshotId = getCurrentSnapshotId(tableName); long v1EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v1SnapshotId); Thread.sleep(1); assertUpdate("INSERT INTO " + tableName + " (day, views) VALUES ('2022-06-01', 1)", 1); - long v2SnapshotId = getLatestSnapshotId(tableName); + long v2SnapshotId = getCurrentSnapshotId(tableName); long v2EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v2SnapshotId); Thread.sleep(1); assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN hour varchar"); assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['day', 'hour']"); assertUpdate("INSERT INTO " + tableName + " (day, hour, views) VALUES ('2022-06-02', '10', 2), ('2022-06-02', '10', 3), ('2022-06-02', '11', 10)", 3); - long v3SnapshotId = getLatestSnapshotId(tableName); + long v3SnapshotId = getCurrentSnapshotId(tableName); long v3EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v3SnapshotId); assertThat(query("SELECT sum(views), day FROM " + tableName + " GROUP BY day")) @@ -5320,15 +5397,15 @@ public void testReadFromVersionedTableWithExpiredHistory() String tableName = "test_version_table_with_expired_snapshots_" + randomTableSuffix(); Session sessionWithShortRetentionUnlocked = prepareCleanUpSession(); assertUpdate("CREATE TABLE " + tableName + " (key varchar, value integer)"); - long v1SnapshotId = getLatestSnapshotId(tableName); + long v1SnapshotId = getCurrentSnapshotId(tableName); long v1EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v1SnapshotId); Thread.sleep(1); assertUpdate("INSERT INTO " + tableName + " VALUES ('one', 1)", 1); - long v2SnapshotId = getLatestSnapshotId(tableName); + long v2SnapshotId = getCurrentSnapshotId(tableName); long v2EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v2SnapshotId); Thread.sleep(1); assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2)", 1); - long v3SnapshotId = getLatestSnapshotId(tableName); + long v3SnapshotId = getCurrentSnapshotId(tableName); long v3EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v3SnapshotId); assertThat(query("SELECT sum(value), listagg(key, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName)) .matches("VALUES (BIGINT '3', VARCHAR 'one two')");