Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/src/main/sphinx/connector/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -1642,11 +1642,11 @@ FROM example.testdb."customer_orders$snapshots"
ORDER BY committed_at DESC LIMIT 1
```

The procedure `system.rollback_to_snapshot` allows the caller to roll back the
The table procedure `rollback_to_snapshot` allows the caller to roll back the
state of the table to a previous snapshot id:

```
CALL example.system.rollback_to_snapshot('testdb', 'customer_orders', 8954597067493422955)
ALTER TABLE testdb.customer_orders EXECUTE rollback_to_snapshot(8954597067493422955)
```

#### `NOT NULL` column constraint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import io.trino.plugin.iceberg.procedure.IcebergExpireSnapshotsHandle;
import io.trino.plugin.iceberg.procedure.IcebergOptimizeHandle;
import io.trino.plugin.iceberg.procedure.IcebergRemoveOrphanFilesHandle;
import io.trino.plugin.iceberg.procedure.IcebergRollbackToSnapshotHandle;
import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle;
import io.trino.plugin.iceberg.procedure.IcebergTableProcedureId;
import io.trino.plugin.iceberg.procedure.MigrationUtils.RecursiveDirectory;
Expand Down Expand Up @@ -326,6 +327,7 @@
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.REMOVE_ORPHAN_FILES;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ROLLBACK_TO_SNAPSHOT;
import static io.trino.plugin.iceberg.procedure.MigrationUtils.addFiles;
import static io.trino.plugin.iceberg.procedure.MigrationUtils.addFilesFromTable;
import static io.trino.spi.StandardErrorCode.COLUMN_ALREADY_EXISTS;
Expand Down Expand Up @@ -1582,6 +1584,7 @@ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
return switch (procedureId) {
case OPTIMIZE -> getTableHandleForOptimize(tableHandle, icebergTable, executeProperties, retryMode);
case DROP_EXTENDED_STATS -> getTableHandleForDropExtendedStats(session, tableHandle);
case ROLLBACK_TO_SNAPSHOT -> getTableHandleForRollbackToSnapshot(session, tableHandle, executeProperties);
case EXPIRE_SNAPSHOTS -> getTableHandleForExpireSnapshots(session, tableHandle, executeProperties);
case REMOVE_ORPHAN_FILES -> getTableHandleForRemoveOrphanFiles(session, tableHandle, executeProperties);
case ADD_FILES -> getTableHandleForAddFiles(session, accessControl, tableHandle, executeProperties);
Expand Down Expand Up @@ -1752,6 +1755,19 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForAddFilesFromTable
icebergTable.io().properties()));
}

private Optional<ConnectorTableExecuteHandle> getTableHandleForRollbackToSnapshot(ConnectorSession session, IcebergTableHandle tableHandle, Map<String, Object> executeProperties)
{
long snapshotId = (long) executeProperties.get("snapshot_id");
Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());

return Optional.of(new IcebergTableExecuteHandle(
tableHandle.getSchemaTableName(),
ROLLBACK_TO_SNAPSHOT,
new IcebergRollbackToSnapshotHandle(snapshotId),
icebergTable.location(),
icebergTable.io().properties()));
}

private static Object requireProcedureArgument(Map<String, Object> properties, String name)
{
Object value = properties.get(name);
Expand All @@ -1767,6 +1783,7 @@ public Optional<ConnectorTableLayout> getLayoutForTableExecute(ConnectorSession
case OPTIMIZE:
return getLayoutForOptimize(session, executeHandle);
case DROP_EXTENDED_STATS:
case ROLLBACK_TO_SNAPSHOT:
case EXPIRE_SNAPSHOTS:
case REMOVE_ORPHAN_FILES:
case ADD_FILES:
Expand Down Expand Up @@ -1796,6 +1813,7 @@ public BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle
case OPTIMIZE:
return beginOptimize(session, executeHandle, table);
case DROP_EXTENDED_STATS:
case ROLLBACK_TO_SNAPSHOT:
case EXPIRE_SNAPSHOTS:
case REMOVE_ORPHAN_FILES:
case ADD_FILES:
Expand Down Expand Up @@ -1841,6 +1859,7 @@ public void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHa
finishOptimize(session, executeHandle, fragments, splitSourceInfo);
return;
case DROP_EXTENDED_STATS:
case ROLLBACK_TO_SNAPSHOT:
case EXPIRE_SNAPSHOTS:
case REMOVE_ORPHAN_FILES:
case ADD_FILES:
Expand Down Expand Up @@ -1974,6 +1993,9 @@ public void executeTableExecute(ConnectorSession session, ConnectorTableExecuteH
case DROP_EXTENDED_STATS:
executeDropExtendedStats(session, executeHandle);
return;
case ROLLBACK_TO_SNAPSHOT:
executeRollbackToSnapshot(session, executeHandle);
return;
case EXPIRE_SNAPSHOTS:
executeExpireSnapshots(session, executeHandle);
return;
Expand Down Expand Up @@ -2006,6 +2028,15 @@ private void executeDropExtendedStats(ConnectorSession session, IcebergTableExec
transaction = null;
}

private void executeRollbackToSnapshot(ConnectorSession session, IcebergTableExecuteHandle executeHandle)
{
checkArgument(executeHandle.procedureHandle() instanceof IcebergRollbackToSnapshotHandle, "Unexpected procedure handle %s", executeHandle.procedureHandle());
long snapshotId = ((IcebergRollbackToSnapshotHandle) executeHandle.procedureHandle()).snapshotId();

Table icebergTable = catalog.loadTable(session, executeHandle.schemaTableName());
icebergTable.manageSnapshots().setCurrentSnapshot(snapshotId).commit();
}

private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecuteHandle executeHandle)
{
IcebergExpireSnapshotsHandle expireSnapshotsHandle = (IcebergExpireSnapshotsHandle) executeHandle.procedureHandle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.trino.plugin.iceberg.procedure.RegisterTableProcedure;
import io.trino.plugin.iceberg.procedure.RemoveOrphanFilesTableProcedure;
import io.trino.plugin.iceberg.procedure.RollbackToSnapshotProcedure;
import io.trino.plugin.iceberg.procedure.RollbackToSnapshotTableProcedure;
import io.trino.plugin.iceberg.procedure.UnregisterTableProcedure;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
Expand Down Expand Up @@ -131,6 +132,7 @@ public void configure(Binder binder)
Multibinder<TableProcedureMetadata> tableProcedures = newSetBinder(binder, TableProcedureMetadata.class);
tableProcedures.addBinding().toProvider(OptimizeTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(DropExtendedStatsTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(RollbackToSnapshotTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(ExpireSnapshotsTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(RemoveOrphanFilesTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(AddFilesTableProcedure.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
typeManager,
pageSorter);
case DROP_EXTENDED_STATS:
case ROLLBACK_TO_SNAPSHOT:
case EXPIRE_SNAPSHOTS:
case REMOVE_ORPHAN_FILES:
case ADD_FILES:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
property = "@type")
@JsonSubTypes({
@JsonSubTypes.Type(value = IcebergDropExtendedStatsHandle.class, name = "drop_extended_stats"),
@JsonSubTypes.Type(value = IcebergRollbackToSnapshotHandle.class, name = "rollback_to_snapshot"),
@JsonSubTypes.Type(value = IcebergExpireSnapshotsHandle.class, name = "expire_snapshots"),
@JsonSubTypes.Type(value = IcebergOptimizeHandle.class, name = "optimize"),
@JsonSubTypes.Type(value = IcebergRemoveOrphanFilesHandle.class, name = "remove_orphan_files"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.procedure;

public record IcebergRollbackToSnapshotHandle(long snapshotId)
implements IcebergProcedureHandle {}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public enum IcebergTableProcedureId
{
OPTIMIZE,
DROP_EXTENDED_STATS,
ROLLBACK_TO_SNAPSHOT,
EXPIRE_SNAPSHOTS,
REMOVE_ORPHAN_FILES,
ADD_FILES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static java.lang.invoke.MethodHandles.lookup;
import static java.util.Objects.requireNonNull;

@Deprecated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this part of a larger endeavor ?
If there are other follow-ups maybe it is worth creating a epic issue to track what else will be done.

public class RollbackToSnapshotProcedure
implements Provider<Procedure>
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.procedure;

import com.google.common.collect.ImmutableList;
import com.google.inject.Provider;
import io.trino.spi.connector.TableProcedureMetadata;
import io.trino.spi.session.PropertyMetadata;

import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ROLLBACK_TO_SNAPSHOT;
import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly;
import static io.trino.spi.session.PropertyMetadata.longProperty;

public class RollbackToSnapshotTableProcedure
implements Provider<TableProcedureMetadata>
{
@Override
public TableProcedureMetadata get()
{
return new TableProcedureMetadata(
ROLLBACK_TO_SNAPSHOT.name(),
coordinatorOnly(),
ImmutableList.<PropertyMetadata<?>>builder()
.add(longProperty(
"snapshot_id",
"Snapshot ID",
null,
false))
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1676,6 +1676,13 @@ public void testTableComments()

@Test
public void testRollbackSnapshot()
{
testRollbackSnapshot("ALTER TABLE tpch.test_rollback EXECUTE rollback_to_snapshot(%s)");
testRollbackSnapshot("ALTER TABLE tpch.test_rollback EXECUTE rollback_to_snapshot(snapshot_id => %s)");
testRollbackSnapshot("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)");
}

private void testRollbackSnapshot(String rollbackToSnapshotFormat)
{
assertUpdate("CREATE TABLE test_rollback (col0 INTEGER, col1 BIGINT)");
long afterCreateTableId = getCurrentSnapshotId("test_rollback");
Expand All @@ -1685,16 +1692,16 @@ public void testRollbackSnapshot()
assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT))");

// Check that rollback_to_snapshot can be executed also when it does not do any changes
assertUpdate(format("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)", afterFirstInsertId));
assertUpdate(format(rollbackToSnapshotFormat, afterFirstInsertId));
assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT))");

assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (456, CAST(654 AS BIGINT))", 1);
assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT)), (456, CAST(654 AS BIGINT))");

assertUpdate(format("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)", afterFirstInsertId));
assertUpdate(format(rollbackToSnapshotFormat, afterFirstInsertId));
assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT))");

assertUpdate(format("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)", afterCreateTableId));
assertUpdate(format(rollbackToSnapshotFormat, afterCreateTableId));
assertThat((long) computeActual("SELECT COUNT(*) FROM test_rollback").getOnlyValue()).isEqualTo(0);

assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (789, CAST(987 AS BIGINT))", 1);
Expand All @@ -1703,7 +1710,7 @@ public void testRollbackSnapshot()
// extra insert which should be dropped on rollback
assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (999, CAST(999 AS BIGINT))", 1);

assertUpdate(format("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)", afterSecondInsertId));
assertUpdate(format("ALTER TABLE tpch.test_rollback EXECUTE rollback_to_snapshot(%s)", afterSecondInsertId));
assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (789, CAST(987 AS BIGINT))");

assertUpdate("DROP TABLE test_rollback");
Expand Down Expand Up @@ -7657,7 +7664,7 @@ public void testCorruptedTableLocation()
assertQueryFails("TRUNCATE TABLE " + tableName, "Metadata not found in metadata location for table " + schemaTableName);
assertQueryFails("COMMENT ON TABLE " + tableName + " IS NULL", "Metadata not found in metadata location for table " + schemaTableName);
assertQueryFails("COMMENT ON COLUMN " + tableName + ".foo IS NULL", "Metadata not found in metadata location for table " + schemaTableName);
assertQueryFails("CALL iceberg.system.rollback_to_snapshot(CURRENT_SCHEMA, '" + tableName + "', 8954597067493422955)", "Metadata not found in metadata location for table " + schemaTableName);
assertQueryFails("ALTER TABLE " + tableName + " EXECUTE rollback_to_snapshot(8954597067493422955)", "Metadata not found in metadata location for table " + schemaTableName);

// Avoid failing metadata queries
assertQuery("SHOW TABLES LIKE 'test_corrupted_table_location_%' ESCAPE '\\'", "VALUES '" + tableName + "'");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ public void testMaterializedViewOnTableRolledBack()

// Base MV on a snapshot "in the future"
assertUpdate("REFRESH MATERIALIZED VIEW mv_on_rolled_back_the_mv", 1);
assertUpdate(format("CALL system.rollback_to_snapshot(CURRENT_SCHEMA, 'mv_on_rolled_back_base_table', %s)", firstSnapshot));
assertUpdate(format("ALTER TABLE mv_on_rolled_back_base_table EXECUTE rollback_to_snapshot(%s)", firstSnapshot));

// View still can be queried
assertThat(query("TABLE mv_on_rolled_back_the_mv"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ public void testAnalyzeAndRollbackToSnapshot()
(null, null, null, null, 26, null, null)
""");

assertUpdate(format("CALL system.rollback_to_snapshot('%s', '%s', %s)", schema, tableName, createSnapshot));
assertUpdate(format("ALTER TABLE %s.%s EXECUTE rollback_to_snapshot(%s)", schema, tableName, createSnapshot));
// NDV information still present after rollback_to_snapshot
assertQuery(
"SHOW STATS FOR " + tableName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ public void testRollbackToSnapshot()
Thread.sleep(1);
onTrino().executeQuery(format("INSERT INTO %s VALUES 2", tableName));
long snapshotId = getSecondOldestTableSnapshot(tableName);
onTrino().executeQuery(format("call system.rollback_to_snapshot('default', '%s', %d)", tableName, snapshotId));
onTrino().executeQuery(format("ALTER TABLE %s EXECUTE rollback_to_snapshot(%d)", tableName, snapshotId));
assertThat(onTrino().executeQuery(format("SELECT * FROM %s", tableName)))
.containsOnly(row(1));
onTrino().executeQuery(format("DROP TABLE IF EXISTS %s", tableName));
Expand Down
Loading