Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,8 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());

validateNotModifyingOldSnapshot(table, icebergTable);

verify(transaction == null, "transaction already set");
transaction = icebergTable.newTransaction();

Expand Down Expand Up @@ -958,6 +960,8 @@ private BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandl
IcebergOptimizeHandle optimizeHandle = (IcebergOptimizeHandle) executeHandle.getProcedureHandle();
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());

validateNotModifyingOldSnapshot(table, icebergTable);

int tableFormatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();
if (tableFormatVersion > OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION) {
throw new TrinoException(NOT_SUPPORTED, format(
Expand Down Expand Up @@ -1407,8 +1411,12 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
if (table.getFormatVersion() < 2) {
throw new TrinoException(NOT_SUPPORTED, "Iceberg table updates require at least format version 2");
}

Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
validateNotModifyingOldSnapshot(table, icebergTable);

verify(transaction == null, "transaction already set");
transaction = catalog.loadTable(session, table.getSchemaTableName()).newTransaction();
transaction = icebergTable.newTransaction();
return table.withRetryMode(retryMode);
}

Expand All @@ -1431,8 +1439,12 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable
if (table.getFormatVersion() < 2) {
throw new TrinoException(NOT_SUPPORTED, "Iceberg table updates require at least format version 2");
}

Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
validateNotModifyingOldSnapshot(table, icebergTable);

verify(transaction == null, "transaction already set");
transaction = catalog.loadTable(session, table.getSchemaTableName()).newTransaction();
transaction = icebergTable.newTransaction();
return table.withRetryMode(retryMode)
.withUpdatedColumns(updatedColumns.stream()
.map(IcebergColumnHandle.class::cast)
Expand Down Expand Up @@ -1467,6 +1479,13 @@ public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, Connect
return getColumnHandle(icebergRowIdField, typeManager);
}

private void validateNotModifyingOldSnapshot(IcebergTableHandle table, Table icebergTable)
{
if (table.getSnapshotId().isPresent() && table.getSnapshotId().get() != icebergTable.currentSnapshot().snapshotId()) {
throw new TrinoException(NOT_SUPPORTED, "Modifying old snapshot is not supported in Iceberg.");
}
}

private void finishWrite(ConnectorSession session, IcebergTableHandle table, Collection<Slice> fragments, boolean runUpdateValidations)
{
Table icebergTable = transaction.table();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3673,6 +3673,32 @@ public void testEmptyDelete()
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testModifyingOldSnapshotIsNotPossible()
{
String tableName = "test_modifying_old_snapshot_" + randomTableSuffix();
assertUpdate(format("CREATE TABLE %s (col int)", tableName));
assertUpdate(format("INSERT INTO %s VALUES 1,2,3", tableName), 3);
long oldSnapshotId = getCurrentSnapshotId(tableName);
assertUpdate(format("INSERT INTO %s VALUES 4,5,6", tableName), 3);
Comment thread
ebyhr marked this conversation as resolved.
Outdated
assertQuery(format("SELECT * FROM \"%s@%d\"", tableName, oldSnapshotId), "VALUES 1,2,3");
assertThatThrownBy(() -> query(format("INSERT INTO \"%s@%d\" VALUES 7,8,9", tableName, oldSnapshotId)))
.hasMessage("Modifying old snapshot is not supported in Iceberg.");
assertThatThrownBy(() -> query(format("DELETE FROM \"%s@%d\" WHERE col = 5", tableName, oldSnapshotId)))
.hasMessage("Modifying old snapshot is not supported in Iceberg.");
assertThatThrownBy(() -> query(format("UPDATE \"%s@%d\" SET col = 50 WHERE col = 5", tableName, oldSnapshotId)))
.hasMessage("Modifying old snapshot is not supported in Iceberg.");
assertThatThrownBy(() -> query(format("ALTER TABLE \"%s@%d\" EXECUTE OPTIMIZE", tableName, oldSnapshotId)))
.hasMessage("Modifying old snapshot is not supported in Iceberg.");
assertUpdate(format("INSERT INTO \"%s@%d\" VALUES 7,8,9", tableName, getCurrentSnapshotId(tableName)), 3);
assertUpdate(format("DELETE FROM \"%s@%d\" WHERE col = 9", tableName, getCurrentSnapshotId(tableName)), 1);
assertUpdate(format("UPDATE \"%s@%d\" set col = 50 WHERE col = 5", tableName, getCurrentSnapshotId(tableName)), 1);
assertQuerySucceeds(format("ALTER TABLE \"%s@%d\" EXECUTE OPTIMIZE", tableName, getCurrentSnapshotId(tableName)));
assertQuery(format("SELECT * FROM %s", tableName), "VALUES 1,2,3,4,50,6,7,8");

assertUpdate("DROP TABLE " + tableName);
}
Comment thread
ebyhr marked this conversation as resolved.
Outdated

private Session prepareCleanUpSession()
{
return Session.builder(getSession())
Expand Down