Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,11 @@ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
RetryMode retryMode)
{
IcebergTableHandle tableHandle = (IcebergTableHandle) connectorTableHandle;
checkArgument(tableHandle.getTableType() == DATA, "Cannot execute table procedure %s on non-DATA table: %s", procedureName, tableHandle.getTableType());
Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());
if (tableHandle.getSnapshotId().isPresent() && (tableHandle.getSnapshotId().get() != icebergTable.currentSnapshot().snapshotId())) {
throw new TrinoException(NOT_SUPPORTED, "Cannot execute table procedure %s on old snapshot %s".formatted(procedureName, tableHandle.getSnapshotId().get()));
}

IcebergTableProcedureId procedureId;
try {
Expand Down Expand Up @@ -1536,7 +1541,7 @@ private static void verifyTableVersionForUpdate(IcebergTableHandle table)
private static void validateNotModifyingOldSnapshot(IcebergTableHandle table, Table icebergTable)
{
if (table.getSnapshotId().isPresent() && (table.getSnapshotId().get() != icebergTable.currentSnapshot().snapshotId())) {
throw new TrinoException(NOT_SUPPORTED, "Modifying old snapshot is not supported in Iceberg.");
throw new TrinoException(NOT_SUPPORTED, "Modifying old snapshot is not supported in Iceberg");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1067,10 +1067,10 @@ public void testTableComments()
public void testRollbackSnapshot()
{
assertUpdate("CREATE TABLE test_rollback (col0 INTEGER, col1 BIGINT)");
long afterCreateTableId = getLatestSnapshotId("test_rollback");
long afterCreateTableId = getCurrentSnapshotId("test_rollback");

assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (123, CAST(987 AS BIGINT))", 1);
long afterFirstInsertId = getLatestSnapshotId("test_rollback");
long afterFirstInsertId = getCurrentSnapshotId("test_rollback");

assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (456, CAST(654 AS BIGINT))", 1);
assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT)), (456, CAST(654 AS BIGINT))");
Expand All @@ -1082,7 +1082,7 @@ public void testRollbackSnapshot()
assertEquals((long) computeActual("SELECT COUNT(*) FROM test_rollback").getOnlyValue(), 0);

assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (789, CAST(987 AS BIGINT))", 1);
long afterSecondInsertId = getLatestSnapshotId("test_rollback");
long afterSecondInsertId = getCurrentSnapshotId("test_rollback");

// extra insert which should be dropped on rollback
assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (999, CAST(999 AS BIGINT))", 1);
Expand All @@ -1093,11 +1093,6 @@ public void testRollbackSnapshot()
dropTable("test_rollback");
}

private long getLatestSnapshotId(String tableName)
{
return (long) computeScalar(format("SELECT snapshot_id FROM \"%s$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES", tableName));
}

@Override
protected String errorMessageForInsertIntoNotNullColumn(String columnName)
{
Expand Down Expand Up @@ -4488,6 +4483,34 @@ public void testOptimizeCleansUpDeleteFiles()
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testOptimizeSnapshot()
{
Session session = Session.builder(getSession())
.setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "allow_legacy_snapshot_syntax", "true")
.build();
String tableName = "test_optimize_snapshot_" + randomTableSuffix();

assertUpdate("CREATE TABLE " + tableName + " (a) AS VALUES 11", 1);
long snapshotId = getCurrentSnapshotId(tableName);
assertUpdate("INSERT INTO " + tableName + " VALUES 22", 1);
assertThatThrownBy(() -> query(session, "ALTER TABLE \"%s@%d\" EXECUTE OPTIMIZE".formatted(tableName, snapshotId)))
.hasMessage("Cannot execute table procedure OPTIMIZE on old snapshot " + snapshotId);
Comment on lines 4497 to 4498
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: How about assertQueryFails to ensure TrinoException?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertThatThrownBy(() -> query(...)) is a pattern we use very widely.
if we want to deprecate it in favor of assertQueryFails, we should have a discussion about this first.

assertThat(query("SELECT * FROM " + tableName))
.matches("VALUES 11, 22");
Comment on lines 4499 to 4500
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: No change requested. assertQuery looks better for simple query assertion.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree. i think i perhaps did this more for consistency reasons.


assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testOptimizeSystemTable()
{
assertThatThrownBy(() -> query("ALTER TABLE \"nation$files\" EXECUTE OPTIMIZE"))
.hasMessage("This connector does not support table procedures");
assertThatThrownBy(() -> query("ALTER TABLE \"nation$snapshots\" EXECUTE OPTIMIZE"))
.hasMessage("This connector does not support table procedures");
}

private List<String> getActiveFiles(String tableName)
{
return computeActual(format("SELECT file_path FROM \"%s$files\"", tableName)).getOnlyColumn()
Expand Down Expand Up @@ -4813,6 +4836,34 @@ public void testExpireSnapshotsPartitionedTable()
assertThat(updatedSnapshots.size()).isLessThan(initialSnapshots.size());
}

@Test
public void testExpireSnapshotsOnSnapshot()
{
Session session = Session.builder(getSession())
.setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "allow_legacy_snapshot_syntax", "true")
.build();
String tableName = "test_expire_snapshots_on_snapshot_" + randomTableSuffix();

assertUpdate("CREATE TABLE " + tableName + " (a) AS VALUES 11", 1);
long snapshotId = getCurrentSnapshotId(tableName);
assertUpdate("INSERT INTO " + tableName + " VALUES 22", 1);
assertThatThrownBy(() -> query(session, "ALTER TABLE \"%s@%d\" EXECUTE EXPIRE_SNAPSHOTS".formatted(tableName, snapshotId)))
.hasMessage("Cannot execute table procedure EXPIRE_SNAPSHOTS on old snapshot " + snapshotId);
assertThat(query("SELECT * FROM " + tableName))
.matches("VALUES 11, 22");

assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testExpireSnapshotsSystemTable()
{
assertThatThrownBy(() -> query("ALTER TABLE \"nation$files\" EXECUTE EXPIRE_SNAPSHOTS"))
.hasMessage("This connector does not support table procedures");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it is not in the scope of this PR but I find this error message missleading for this particular case.
This connector actually supports table procedures just not on the system tables.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding io.trino.connector.system.SystemTablesMetadata#getTableHandleForExecute with a more meaningful exception message would be a solution.

assertThatThrownBy(() -> query("ALTER TABLE \"nation$snapshots\" EXECUTE EXPIRE_SNAPSHOTS"))
.hasMessage("This connector does not support table procedures");
}

@Test
public void testExplainExpireSnapshotOutput()
{
Expand Down Expand Up @@ -4965,6 +5016,34 @@ public void testRemoveOrphanFilesParameterValidation()
"\\QRetention specified (33.00s) is shorter than the minimum retention configured in the system (7.00d). Minimum retention can be changed with iceberg.remove_orphan_files.min-retention configuration property or iceberg.remove_orphan_files_min_retention session property");
}

@Test
public void testRemoveOrphanFilesOnSnapshot()
{
Session session = Session.builder(getSession())
.setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "allow_legacy_snapshot_syntax", "true")
.build();
String tableName = "test_remove_orphan_files_on_snapshot_" + randomTableSuffix();

assertUpdate("CREATE TABLE " + tableName + " (a) AS VALUES 11", 1);
long snapshotId = getCurrentSnapshotId(tableName);
assertUpdate("INSERT INTO " + tableName + " VALUES 22", 1);
assertThatThrownBy(() -> query(session, "ALTER TABLE \"%s@%d\" EXECUTE REMOVE_ORPHAN_FILES".formatted(tableName, snapshotId)))
.hasMessage("Cannot execute table procedure REMOVE_ORPHAN_FILES on old snapshot " + snapshotId);
assertThat(query("SELECT * FROM " + tableName))
.matches("VALUES 11, 22");

assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testRemoveOrphanFilesSystemTable()
{
assertThatThrownBy(() -> query("ALTER TABLE \"nation$files\" EXECUTE REMOVE_ORPHAN_FILES"))
.hasMessage("This connector does not support table procedures");
assertThatThrownBy(() -> query("ALTER TABLE \"nation$snapshots\" EXECUTE REMOVE_ORPHAN_FILES"))
.hasMessage("This connector does not support table procedures");
}

@Test
public void testIfDeletesReturnsNumberOfRemovedRows()
{
Expand Down Expand Up @@ -5095,13 +5174,11 @@ public void testModifyingOldSnapshotIsNotPossible()
assertUpdate(format("INSERT INTO %s VALUES 4,5,6", tableName), 3);
assertQuery(sessionWithLegacySyntaxSupport, format("SELECT * FROM \"%s@%d\"", tableName, oldSnapshotId), "VALUES 1,2,3");
assertThatThrownBy(() -> query(sessionWithLegacySyntaxSupport, format("INSERT INTO \"%s@%d\" VALUES 7,8,9", tableName, oldSnapshotId)))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: There are a few more ." occurrences in the exception messages within IcebergMetadata

.hasMessage("Modifying old snapshot is not supported in Iceberg.");
.hasMessage("Modifying old snapshot is not supported in Iceberg");
assertThatThrownBy(() -> query(sessionWithLegacySyntaxSupport, format("DELETE FROM \"%s@%d\" WHERE col = 5", tableName, oldSnapshotId)))
.hasMessage("Modifying old snapshot is not supported in Iceberg.");
.hasMessage("Modifying old snapshot is not supported in Iceberg");
assertThatThrownBy(() -> query(sessionWithLegacySyntaxSupport, format("UPDATE \"%s@%d\" SET col = 50 WHERE col = 5", tableName, oldSnapshotId)))
.hasMessage("Modifying old snapshot is not supported in Iceberg.");
assertThatThrownBy(() -> query(sessionWithLegacySyntaxSupport, format("ALTER TABLE \"%s@%d\" EXECUTE OPTIMIZE", tableName, oldSnapshotId)))
.hasMessage("Modifying old snapshot is not supported in Iceberg.");
.hasMessage("Modifying old snapshot is not supported in Iceberg");
// TODO Change to assertThatThrownBy because the syntax `table@versionid` should not be supported for DML operations
assertUpdate(sessionWithLegacySyntaxSupport, format("INSERT INTO \"%s@%d\" VALUES 7,8,9", tableName, getCurrentSnapshotId(tableName)), 3);
assertUpdate(sessionWithLegacySyntaxSupport, format("DELETE FROM \"%s@%d\" WHERE col = 9", tableName, getCurrentSnapshotId(tableName)), 1);
Expand All @@ -5126,11 +5203,11 @@ public void testCreateTableAsSelectFromVersionedTable()
// Enforce having exactly one snapshot of the table at the timestamp corresponding to `afterInsert123EpochMillis`
Thread.sleep(1);
assertUpdate("INSERT INTO " + sourceTableName + " VALUES 1, 2, 3", 3);
long afterInsert123SnapshotId = getLatestSnapshotId(sourceTableName);
long afterInsert123SnapshotId = getCurrentSnapshotId(sourceTableName);
long afterInsert123EpochMillis = getCommittedAtInEpochMilliseconds(sourceTableName, afterInsert123SnapshotId);
Thread.sleep(1);
assertUpdate("INSERT INTO " + sourceTableName + " VALUES 4, 5, 6", 3);
long afterInsert456SnapshotId = getLatestSnapshotId(sourceTableName);
long afterInsert456SnapshotId = getCurrentSnapshotId(sourceTableName);
assertUpdate("INSERT INTO " + sourceTableName + " VALUES 7, 8, 9", 3);

assertUpdate("CREATE TABLE " + snapshotVersionedSinkTableName + " AS SELECT * FROM " + sourceTableName + " FOR VERSION AS OF " + afterInsert456SnapshotId, 6);
Expand Down Expand Up @@ -5228,7 +5305,7 @@ public void testReadFromVersionedTableWithSchemaEvolution()
String tableName = "test_versioned_table_schema_evolution_" + randomTableSuffix();

assertQuerySucceeds("CREATE TABLE " + tableName + "(col1 varchar)");
long v1SnapshotId = getLatestSnapshotId(tableName);
long v1SnapshotId = getCurrentSnapshotId(tableName);
assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId))
.hasOutputTypes(ImmutableList.of(VARCHAR))
.returnsEmptyResult();
Expand All @@ -5239,7 +5316,7 @@ public void testReadFromVersionedTableWithSchemaEvolution()
.returnsEmptyResult();

assertUpdate("INSERT INTO " + tableName + " VALUES ('a', 11)", 1);
long v2SnapshotId = getLatestSnapshotId(tableName);
long v2SnapshotId = getCurrentSnapshotId(tableName);
assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId))
.hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER))
.matches("VALUES (VARCHAR 'a', 11)");
Expand All @@ -5256,7 +5333,7 @@ public void testReadFromVersionedTableWithSchemaEvolution()
.matches("VALUES (VARCHAR 'a', 11, CAST(NULL AS bigint))");

assertUpdate("INSERT INTO " + tableName + " VALUES ('b', 22, 32)", 1);
long v3SnapshotId = getLatestSnapshotId(tableName);
long v3SnapshotId = getCurrentSnapshotId(tableName);
assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId))
.hasOutputTypes(ImmutableList.of(VARCHAR))
.returnsEmptyResult();
Expand All @@ -5277,19 +5354,19 @@ public void testReadFromVersionedTableWithPartitionSpecEvolution()
{
String tableName = "test_version_table_with_partition_spec_evolution_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " (day varchar, views bigint) WITH(partitioning = ARRAY['day'])");
long v1SnapshotId = getLatestSnapshotId(tableName);
long v1SnapshotId = getCurrentSnapshotId(tableName);
long v1EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v1SnapshotId);
Thread.sleep(1);

assertUpdate("INSERT INTO " + tableName + " (day, views) VALUES ('2022-06-01', 1)", 1);
long v2SnapshotId = getLatestSnapshotId(tableName);
long v2SnapshotId = getCurrentSnapshotId(tableName);
long v2EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v2SnapshotId);
Thread.sleep(1);

assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN hour varchar");
assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['day', 'hour']");
assertUpdate("INSERT INTO " + tableName + " (day, hour, views) VALUES ('2022-06-02', '10', 2), ('2022-06-02', '10', 3), ('2022-06-02', '11', 10)", 3);
long v3SnapshotId = getLatestSnapshotId(tableName);
long v3SnapshotId = getCurrentSnapshotId(tableName);
long v3EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v3SnapshotId);

assertThat(query("SELECT sum(views), day FROM " + tableName + " GROUP BY day"))
Expand Down Expand Up @@ -5320,15 +5397,15 @@ public void testReadFromVersionedTableWithExpiredHistory()
String tableName = "test_version_table_with_expired_snapshots_" + randomTableSuffix();
Session sessionWithShortRetentionUnlocked = prepareCleanUpSession();
assertUpdate("CREATE TABLE " + tableName + " (key varchar, value integer)");
long v1SnapshotId = getLatestSnapshotId(tableName);
long v1SnapshotId = getCurrentSnapshotId(tableName);
long v1EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v1SnapshotId);
Thread.sleep(1);
assertUpdate("INSERT INTO " + tableName + " VALUES ('one', 1)", 1);
long v2SnapshotId = getLatestSnapshotId(tableName);
long v2SnapshotId = getCurrentSnapshotId(tableName);
long v2EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v2SnapshotId);
Thread.sleep(1);
assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2)", 1);
long v3SnapshotId = getLatestSnapshotId(tableName);
long v3SnapshotId = getCurrentSnapshotId(tableName);
long v3EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v3SnapshotId);
assertThat(query("SELECT sum(value), listagg(key, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName))
.matches("VALUES (BIGINT '3', VARCHAR 'one two')");
Expand Down