Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.DiscretePredicates;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.MaterializedViewNotFoundException;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.RowChangeParadigm;
Expand Down Expand Up @@ -2504,25 +2503,6 @@ public void renameMaterializedView(ConnectorSession session, SchemaTableName sou
catalog.renameMaterializedView(session, source, target);
}

public Optional<TableToken> getTableToken(ConnectorSession session, ConnectorTableHandle tableHandle)
{
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
return Optional.ofNullable(icebergTable.currentSnapshot())
.map(snapshot -> new TableToken(snapshot.snapshotId()));
}

public boolean isTableCurrent(ConnectorSession session, ConnectorTableHandle tableHandle, Optional<TableToken> tableToken)
{
Optional<TableToken> currentToken = getTableToken(session, tableHandle);

if (tableToken.isEmpty() || currentToken.isEmpty()) {
return false;
}

return tableToken.get().getSnapshotId() == currentToken.get().getSnapshotId();
}

@Override
public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName materializedViewName)
{
Expand Down Expand Up @@ -2569,22 +2549,33 @@ else if (strings.size() != 2) {
IcebergTableHandle tableHandle = getTableHandle(session, schemaTableName, Optional.empty(), Optional.empty());

if (tableHandle == null) {
throw new MaterializedViewNotFoundException(materializedViewName);
// Base table is gone
return new MaterializedViewFreshness(STALE);
}
Optional<TableToken> tableToken;
Optional<Long> snapshotAtRefresh;
if (value.isEmpty()) {
tableToken = Optional.empty();
snapshotAtRefresh = Optional.empty();
}
else {
tableToken = Optional.of(new TableToken(Long.parseLong(value)));
snapshotAtRefresh = Optional.of(Long.parseLong(value));
}
if (!isTableCurrent(session, tableHandle, tableToken)) {
if (!isSnapshotCurrent(session, tableHandle, snapshotAtRefresh)) {
return new MaterializedViewFreshness(STALE);
}
}
return new MaterializedViewFreshness(hasUnknownTables ? UNKNOWN : FRESH);
}

private boolean isSnapshotCurrent(ConnectorSession session, IcebergTableHandle table, Optional<Long> snapshotId)
{
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
Snapshot currentSnapshot = icebergTable.currentSnapshot();
if (snapshotId.isEmpty() || currentSnapshot == null) {
return false;
}
return snapshotId.get() == currentSnapshot.snapshotId();
}

@Override
public boolean supportsReportingWrittenBytes(ConnectorSession session, ConnectorTableHandle connectorTableHandle)
{
Expand Down Expand Up @@ -2614,20 +2605,4 @@ private void beginTransaction(Table icebergTable)
verify(transaction == null, "transaction already set");
transaction = icebergTable.newTransaction();
}

private static class TableToken
{
// Current Snapshot ID of the table
private final long snapshotId;

public TableToken(long snapshotId)
{
this.snapshotId = snapshotId;
}

public long getSnapshotId()
{
return snapshotId;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ public void testMaterializedView()
.hasMessage("createMaterializedView is not supported for Iceberg JDBC catalogs");
}

@Override
public void testMaterializedViewBaseTableGone(boolean initialized)
{
assertThatThrownBy(() -> super.testMaterializedViewBaseTableGone(initialized))
.hasMessage("createMaterializedView is not supported for Iceberg JDBC catalogs");
}

@Test(dataProvider = "testColumnNameDataProvider")
@Override
public void testMaterializedViewColumnName(String columnName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,28 @@ public void testFederatedMaterializedView()
});
}

@Test(dataProviderClass = DataProviders.class, dataProvider = "trueFalse")
public void testMaterializedViewBaseTableGone(boolean initialized)
{
skipTestUnless(hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW));

String catalog = getSession().getCatalog().orElseThrow();
String schema = getSession().getSchema().orElseThrow();
String viewName = "mv_base_table_missing_" + randomNameSuffix();
String baseTable = "mv_base_table_missing_the_table_" + randomNameSuffix();

assertUpdate("CREATE TABLE " + baseTable + " AS SELECT 1 a", 1);
assertUpdate("CREATE MATERIALIZED VIEW " + viewName + " AS SELECT * FROM " + baseTable);
if (initialized) {
assertUpdate("REFRESH MATERIALIZED VIEW " + viewName, 1);
}
assertUpdate("DROP TABLE " + baseTable);
assertQueryFails(
"TABLE " + viewName,
"line 1:1: Failed analyzing stored view '%1$s\\.%2$s\\.%3$s': line 3:3: Table '%1$s\\.%2$s\\.%4$s' does not exist".formatted(catalog, schema, viewName, baseTable));
assertUpdate("DROP MATERIALIZED VIEW " + viewName);
}

@Test
public void testCompatibleTypeChangeForView()
{
Expand Down