diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index df82251dc07f..b45bfe3d5dd2 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -703,6 +703,8 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto IcebergTableHandle table = (IcebergTableHandle) tableHandle; Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); + validateNotModifyingOldSnapshot(table, icebergTable); + verify(transaction == null, "transaction already set"); transaction = icebergTable.newTransaction(); @@ -958,6 +960,8 @@ private BeginTableExecuteResult OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION) { throw new TrinoException(NOT_SUPPORTED, format( @@ -1407,8 +1411,12 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable if (table.getFormatVersion() < 2) { throw new TrinoException(NOT_SUPPORTED, "Iceberg table updates require at least format version 2"); } + + Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); + validateNotModifyingOldSnapshot(table, icebergTable); + verify(transaction == null, "transaction already set"); - transaction = catalog.loadTable(session, table.getSchemaTableName()).newTransaction(); + transaction = icebergTable.newTransaction(); return table.withRetryMode(retryMode); } @@ -1431,8 +1439,12 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable if (table.getFormatVersion() < 2) { throw new TrinoException(NOT_SUPPORTED, "Iceberg table updates require at least format version 2"); } + + Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); + validateNotModifyingOldSnapshot(table, icebergTable); + verify(transaction == null, "transaction already set"); - transaction = catalog.loadTable(session, table.getSchemaTableName()).newTransaction(); + transaction = icebergTable.newTransaction(); return table.withRetryMode(retryMode) .withUpdatedColumns(updatedColumns.stream() .map(IcebergColumnHandle.class::cast) @@ -1467,6 +1479,13 @@ public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, Connect return getColumnHandle(icebergRowIdField, typeManager); } + private void validateNotModifyingOldSnapshot(IcebergTableHandle table, Table icebergTable) + { + if (table.getSnapshotId().isPresent() && table.getSnapshotId().get() != icebergTable.currentSnapshot().snapshotId()) { + throw new TrinoException(NOT_SUPPORTED, "Modifying old snapshot is not supported in Iceberg."); + } + } + private void finishWrite(ConnectorSession session, IcebergTableHandle table, Collection fragments, boolean runUpdateValidations) { Table icebergTable = transaction.table(); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index af6d80ffc79f..61afb5da3f64 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -3673,6 +3673,32 @@ public void testEmptyDelete() assertUpdate("DROP TABLE " + tableName); } + @Test + public void testModifyingOldSnapshotIsNotPossible() + { + String tableName = "test_modifying_old_snapshot_" + randomTableSuffix(); + assertUpdate(format("CREATE TABLE %s (col int)", tableName)); + assertUpdate(format("INSERT INTO %s VALUES 1,2,3", tableName), 3); + long oldSnapshotId = getCurrentSnapshotId(tableName); + assertUpdate(format("INSERT INTO %s VALUES 4,5,6", tableName), 3); + assertQuery(format("SELECT * FROM \"%s@%d\"", tableName, oldSnapshotId), "VALUES 1,2,3"); + assertThatThrownBy(() -> query(format("INSERT INTO \"%s@%d\" VALUES 7,8,9", tableName, oldSnapshotId))) + .hasMessage("Modifying old snapshot is not supported in Iceberg."); + assertThatThrownBy(() -> query(format("DELETE FROM \"%s@%d\" WHERE col = 5", tableName, oldSnapshotId))) + .hasMessage("Modifying old snapshot is not supported in Iceberg."); + assertThatThrownBy(() -> query(format("UPDATE \"%s@%d\" SET col = 50 WHERE col = 5", tableName, oldSnapshotId))) + .hasMessage("Modifying old snapshot is not supported in Iceberg."); + assertThatThrownBy(() -> query(format("ALTER TABLE \"%s@%d\" EXECUTE OPTIMIZE", tableName, oldSnapshotId))) + .hasMessage("Modifying old snapshot is not supported in Iceberg."); + assertUpdate(format("INSERT INTO \"%s@%d\" VALUES 7,8,9", tableName, getCurrentSnapshotId(tableName)), 3); + assertUpdate(format("DELETE FROM \"%s@%d\" WHERE col = 9", tableName, getCurrentSnapshotId(tableName)), 1); + assertUpdate(format("UPDATE \"%s@%d\" set col = 50 WHERE col = 5", tableName, getCurrentSnapshotId(tableName)), 1); + assertQuerySucceeds(format("ALTER TABLE \"%s@%d\" EXECUTE OPTIMIZE", tableName, getCurrentSnapshotId(tableName))); + assertQuery(format("SELECT * FROM %s", tableName), "VALUES 1,2,3,4,50,6,7,8"); + + assertUpdate("DROP TABLE " + tableName); + } + private Session prepareCleanUpSession() { return Session.builder(getSession())