Skip to content

Commit 5319c2c

Browse files
committed
Add rollback_to_snapshot table procedure in Iceberg
1 parent 29ffc6c commit 5319c2c

File tree

13 files changed

+113
-10
lines changed

13 files changed

+113
-10
lines changed

docs/src/main/sphinx/connector/iceberg.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1642,11 +1642,11 @@ FROM example.testdb."customer_orders$snapshots"
16421642
ORDER BY committed_at DESC LIMIT 1
16431643
```
16441644

1645-
The procedure `system.rollback_to_snapshot` allows the caller to roll back the
1645+
The table procedure `rollback_to_snapshot` allows the caller to roll back the
16461646
state of the table to a previous snapshot id:
16471647

16481648
```
1649-
CALL example.system.rollback_to_snapshot('testdb', 'customer_orders', 8954597067493422955)
1649+
ALTER TABLE testdb.customer_orders EXECUTE rollback_to_snapshot(8954597067493422955)
16501650
```
16511651

16521652
#### `NOT NULL` column constraint

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import io.trino.plugin.iceberg.procedure.IcebergExpireSnapshotsHandle;
5454
import io.trino.plugin.iceberg.procedure.IcebergOptimizeHandle;
5555
import io.trino.plugin.iceberg.procedure.IcebergRemoveOrphanFilesHandle;
56+
import io.trino.plugin.iceberg.procedure.IcebergRollbackToSnapshotHandle;
5657
import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle;
5758
import io.trino.plugin.iceberg.procedure.IcebergTableProcedureId;
5859
import io.trino.plugin.iceberg.procedure.MigrationUtils.RecursiveDirectory;
@@ -328,6 +329,7 @@
328329
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS;
329330
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE;
330331
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.REMOVE_ORPHAN_FILES;
332+
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ROLLBACK_TO_SNAPSHOT;
331333
import static io.trino.plugin.iceberg.procedure.MigrationUtils.addFiles;
332334
import static io.trino.plugin.iceberg.procedure.MigrationUtils.addFilesFromTable;
333335
import static io.trino.spi.StandardErrorCode.COLUMN_ALREADY_EXISTS;
@@ -1586,6 +1588,7 @@ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
15861588
return switch (procedureId) {
15871589
case OPTIMIZE -> getTableHandleForOptimize(tableHandle, icebergTable, executeProperties, retryMode);
15881590
case DROP_EXTENDED_STATS -> getTableHandleForDropExtendedStats(session, tableHandle);
1591+
case ROLLBACK_TO_SNAPSHOT -> getTableHandleForRollbackToSnapshot(session, tableHandle, executeProperties);
15891592
case EXPIRE_SNAPSHOTS -> getTableHandleForExpireSnapshots(session, tableHandle, executeProperties);
15901593
case REMOVE_ORPHAN_FILES -> getTableHandleForRemoveOrphanFiles(session, tableHandle, executeProperties);
15911594
case ADD_FILES -> getTableHandleForAddFiles(session, accessControl, tableHandle, executeProperties);
@@ -1756,6 +1759,19 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForAddFilesFromTable
17561759
icebergTable.io().properties()));
17571760
}
17581761

1762+
private Optional<ConnectorTableExecuteHandle> getTableHandleForRollbackToSnapshot(ConnectorSession session, IcebergTableHandle tableHandle, Map<String, Object> executeProperties)
1763+
{
1764+
long snapshotId = (long) executeProperties.get("snapshot_id");
1765+
Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());
1766+
1767+
return Optional.of(new IcebergTableExecuteHandle(
1768+
tableHandle.getSchemaTableName(),
1769+
ROLLBACK_TO_SNAPSHOT,
1770+
new IcebergRollbackToSnapshotHandle(snapshotId),
1771+
icebergTable.location(),
1772+
icebergTable.io().properties()));
1773+
}
1774+
17591775
private static Object requireProcedureArgument(Map<String, Object> properties, String name)
17601776
{
17611777
Object value = properties.get(name);
@@ -1771,6 +1787,7 @@ public Optional<ConnectorTableLayout> getLayoutForTableExecute(ConnectorSession
17711787
case OPTIMIZE:
17721788
return getLayoutForOptimize(session, executeHandle);
17731789
case DROP_EXTENDED_STATS:
1790+
case ROLLBACK_TO_SNAPSHOT:
17741791
case EXPIRE_SNAPSHOTS:
17751792
case REMOVE_ORPHAN_FILES:
17761793
case ADD_FILES:
@@ -1800,6 +1817,7 @@ public BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle
18001817
case OPTIMIZE:
18011818
return beginOptimize(session, executeHandle, table);
18021819
case DROP_EXTENDED_STATS:
1820+
case ROLLBACK_TO_SNAPSHOT:
18031821
case EXPIRE_SNAPSHOTS:
18041822
case REMOVE_ORPHAN_FILES:
18051823
case ADD_FILES:
@@ -1845,6 +1863,7 @@ public void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHa
18451863
finishOptimize(session, executeHandle, fragments, splitSourceInfo);
18461864
return;
18471865
case DROP_EXTENDED_STATS:
1866+
case ROLLBACK_TO_SNAPSHOT:
18481867
case EXPIRE_SNAPSHOTS:
18491868
case REMOVE_ORPHAN_FILES:
18501869
case ADD_FILES:
@@ -1978,6 +1997,9 @@ public void executeTableExecute(ConnectorSession session, ConnectorTableExecuteH
19781997
case DROP_EXTENDED_STATS:
19791998
executeDropExtendedStats(session, executeHandle);
19801999
return;
2000+
case ROLLBACK_TO_SNAPSHOT:
2001+
executeRollbackToSnapshot(session, executeHandle);
2002+
return;
19812003
case EXPIRE_SNAPSHOTS:
19822004
executeExpireSnapshots(session, executeHandle);
19832005
return;
@@ -2010,6 +2032,15 @@ private void executeDropExtendedStats(ConnectorSession session, IcebergTableExec
20102032
transaction = null;
20112033
}
20122034

2035+
private void executeRollbackToSnapshot(ConnectorSession session, IcebergTableExecuteHandle executeHandle)
2036+
{
2037+
checkArgument(executeHandle.procedureHandle() instanceof IcebergRollbackToSnapshotHandle, "Unexpected procedure handle %s", executeHandle.procedureHandle());
2038+
long snapshotId = ((IcebergRollbackToSnapshotHandle) executeHandle.procedureHandle()).snapshotId();
2039+
2040+
Table icebergTable = catalog.loadTable(session, executeHandle.schemaTableName());
2041+
icebergTable.manageSnapshots().setCurrentSnapshot(snapshotId).commit();
2042+
}
2043+
20132044
private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecuteHandle executeHandle)
20142045
{
20152046
IcebergExpireSnapshotsHandle expireSnapshotsHandle = (IcebergExpireSnapshotsHandle) executeHandle.procedureHandle();

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import io.trino.plugin.iceberg.procedure.RegisterTableProcedure;
5252
import io.trino.plugin.iceberg.procedure.RemoveOrphanFilesTableProcedure;
5353
import io.trino.plugin.iceberg.procedure.RollbackToSnapshotProcedure;
54+
import io.trino.plugin.iceberg.procedure.RollbackToSnapshotTableProcedure;
5455
import io.trino.plugin.iceberg.procedure.UnregisterTableProcedure;
5556
import io.trino.spi.catalog.CatalogName;
5657
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
@@ -131,6 +132,7 @@ public void configure(Binder binder)
131132
Multibinder<TableProcedureMetadata> tableProcedures = newSetBinder(binder, TableProcedureMetadata.class);
132133
tableProcedures.addBinding().toProvider(OptimizeTableProcedure.class).in(Scopes.SINGLETON);
133134
tableProcedures.addBinding().toProvider(DropExtendedStatsTableProcedure.class).in(Scopes.SINGLETON);
135+
tableProcedures.addBinding().toProvider(RollbackToSnapshotTableProcedure.class).in(Scopes.SINGLETON);
134136
tableProcedures.addBinding().toProvider(ExpireSnapshotsTableProcedure.class).in(Scopes.SINGLETON);
135137
tableProcedures.addBinding().toProvider(RemoveOrphanFilesTableProcedure.class).in(Scopes.SINGLETON);
136138
tableProcedures.addBinding().toProvider(AddFilesTableProcedure.class).in(Scopes.SINGLETON);

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
147147
typeManager,
148148
pageSorter);
149149
case DROP_EXTENDED_STATS:
150+
case ROLLBACK_TO_SNAPSHOT:
150151
case EXPIRE_SNAPSHOTS:
151152
case REMOVE_ORPHAN_FILES:
152153
case ADD_FILES:

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
property = "@type")
2222
@JsonSubTypes({
2323
@JsonSubTypes.Type(value = IcebergDropExtendedStatsHandle.class, name = "drop_extended_stats"),
24+
@JsonSubTypes.Type(value = IcebergRollbackToSnapshotHandle.class, name = "rollback_to_snapshot"),
2425
@JsonSubTypes.Type(value = IcebergExpireSnapshotsHandle.class, name = "expire_snapshots"),
2526
@JsonSubTypes.Type(value = IcebergOptimizeHandle.class, name = "optimize"),
2627
@JsonSubTypes.Type(value = IcebergRemoveOrphanFilesHandle.class, name = "remove_orphan_files"),
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.iceberg.procedure;
15+
16+
public record IcebergRollbackToSnapshotHandle(long snapshotId)
17+
implements IcebergProcedureHandle {}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public enum IcebergTableProcedureId
1717
{
1818
OPTIMIZE,
1919
DROP_EXTENDED_STATS,
20+
ROLLBACK_TO_SNAPSHOT,
2021
EXPIRE_SNAPSHOTS,
2122
REMOVE_ORPHAN_FILES,
2223
ADD_FILES,

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/RollbackToSnapshotProcedure.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import static java.lang.invoke.MethodHandles.lookup;
3232
import static java.util.Objects.requireNonNull;
3333

34+
@Deprecated
3435
public class RollbackToSnapshotProcedure
3536
implements Provider<Procedure>
3637
{
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.iceberg.procedure;
15+
16+
import com.google.common.collect.ImmutableList;
17+
import com.google.inject.Provider;
18+
import io.trino.spi.connector.TableProcedureMetadata;
19+
import io.trino.spi.session.PropertyMetadata;
20+
21+
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ROLLBACK_TO_SNAPSHOT;
22+
import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly;
23+
import static io.trino.spi.session.PropertyMetadata.longProperty;
24+
25+
public class RollbackToSnapshotTableProcedure
26+
implements Provider<TableProcedureMetadata>
27+
{
28+
@Override
29+
public TableProcedureMetadata get()
30+
{
31+
return new TableProcedureMetadata(
32+
ROLLBACK_TO_SNAPSHOT.name(),
33+
coordinatorOnly(),
34+
ImmutableList.<PropertyMetadata<?>>builder()
35+
.add(longProperty(
36+
"snapshot_id",
37+
"Snapshot ID",
38+
null,
39+
false))
40+
.build());
41+
}
42+
}

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1670,6 +1670,13 @@ public void testTableComments()
16701670

16711671
@Test
16721672
public void testRollbackSnapshot()
1673+
{
1674+
testRollbackSnapshot("ALTER TABLE tpch.test_rollback EXECUTE rollback_to_snapshot(%s)");
1675+
testRollbackSnapshot("ALTER TABLE tpch.test_rollback EXECUTE rollback_to_snapshot(snapshot_id => %s)");
1676+
testRollbackSnapshot("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)");
1677+
}
1678+
1679+
private void testRollbackSnapshot(String rollbackToSnapshotFormat)
16731680
{
16741681
assertUpdate("CREATE TABLE test_rollback (col0 INTEGER, col1 BIGINT)");
16751682
long afterCreateTableId = getCurrentSnapshotId("test_rollback");
@@ -1679,16 +1686,16 @@ public void testRollbackSnapshot()
16791686
assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT))");
16801687

16811688
// Check that rollback_to_snapshot can be executed also when it does not do any changes
1682-
assertUpdate(format("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)", afterFirstInsertId));
1689+
assertUpdate(format(rollbackToSnapshotFormat, afterFirstInsertId));
16831690
assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT))");
16841691

16851692
assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (456, CAST(654 AS BIGINT))", 1);
16861693
assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT)), (456, CAST(654 AS BIGINT))");
16871694

1688-
assertUpdate(format("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)", afterFirstInsertId));
1695+
assertUpdate(format(rollbackToSnapshotFormat, afterFirstInsertId));
16891696
assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT))");
16901697

1691-
assertUpdate(format("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)", afterCreateTableId));
1698+
assertUpdate(format(rollbackToSnapshotFormat, afterCreateTableId));
16921699
assertThat((long) computeActual("SELECT COUNT(*) FROM test_rollback").getOnlyValue()).isEqualTo(0);
16931700

16941701
assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (789, CAST(987 AS BIGINT))", 1);
@@ -1697,7 +1704,7 @@ public void testRollbackSnapshot()
16971704
// extra insert which should be dropped on rollback
16981705
assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (999, CAST(999 AS BIGINT))", 1);
16991706

1700-
assertUpdate(format("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)", afterSecondInsertId));
1707+
assertUpdate(format("ALTER TABLE tpch.test_rollback EXECUTE rollback_to_snapshot(%s)", afterSecondInsertId));
17011708
assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (789, CAST(987 AS BIGINT))");
17021709

17031710
assertUpdate("DROP TABLE test_rollback");
@@ -7652,7 +7659,7 @@ public void testCorruptedTableLocation()
76527659
assertQueryFails("TRUNCATE TABLE " + tableName, "Metadata not found in metadata location for table " + schemaTableName);
76537660
assertQueryFails("COMMENT ON TABLE " + tableName + " IS NULL", "Metadata not found in metadata location for table " + schemaTableName);
76547661
assertQueryFails("COMMENT ON COLUMN " + tableName + ".foo IS NULL", "Metadata not found in metadata location for table " + schemaTableName);
7655-
assertQueryFails("CALL iceberg.system.rollback_to_snapshot(CURRENT_SCHEMA, '" + tableName + "', 8954597067493422955)", "Metadata not found in metadata location for table " + schemaTableName);
7662+
assertQueryFails("ALTER TABLE " + tableName + " EXECUTE rollback_to_snapshot(8954597067493422955)", "Metadata not found in metadata location for table " + schemaTableName);
76567663

76577664
// Avoid failing metadata queries
76587665
assertQuery("SHOW TABLES LIKE 'test_corrupted_table_location_%' ESCAPE '\\'", "VALUES '" + tableName + "'");

0 commit comments

Comments
 (0)