diff --git a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java index 73b9cbe9580e..b1d4bef90fbb 100644 --- a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java +++ b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java @@ -1584,7 +1584,8 @@ private Optional resolveTableFunction(TableFunctionInvoca name.getSchemaFunctionName().getFunctionName())); return Optional.of(new TableFunctionMetadata(catalogHandle, resolved.get())); } - } return Optional.empty(); + } + return Optional.empty(); } private Map analyzeArguments(Node node, List argumentSpecifications, List arguments) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java index b1b31be42b97..845967fd2638 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java @@ -43,6 +43,7 @@ import java.util.Collection; import java.util.List; import java.util.Optional; +import java.util.concurrent.TimeUnit; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.io.MoreFiles.deleteRecursively; @@ -99,7 +100,7 @@ public void testPageSinkStats() } } Page page = pageBuilder.build(); - pageSink.appendPage(page); + pageSink.appendPage(page).get(10, TimeUnit.SECONDS); JsonCodec dataFileInfoCodec = new JsonCodecFactory().jsonCodec(DataFileInfo.class); Collection fragments = getFutureValue(pageSink.finish()); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 2c9fbae6128d..5be3a59cd13a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -2078,8 +2078,7 @@ public Optional finishRefreshMaterializedView( String dependencies = sourceTableHandles.stream() .map(handle -> (IcebergTableHandle) handle) - .filter(handle -> handle.getSnapshotId().isPresent()) - .map(handle -> handle.getSchemaTableName() + "=" + handle.getSnapshotId().get()) + .map(handle -> handle.getSchemaTableName() + "=" + handle.getSnapshotId().map(Object.class::cast).orElse("")) .distinct() .collect(joining(",")); @@ -2168,12 +2167,24 @@ public boolean isTableCurrent(ConnectorSession session, ConnectorTableHandle tab @Override public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName materializedViewName) { - Map> refreshStateMap = getMaterializedViewToken(session, materializedViewName); - if (refreshStateMap.isEmpty()) { + Optional materializedViewDefinition = getMaterializedView(session, materializedViewName); + if (materializedViewDefinition.isEmpty()) { + // View not found, might have been concurrently deleted return new MaterializedViewFreshness(false); } - for (Map.Entry> entry : refreshStateMap.entrySet()) { + SchemaTableName storageTableName = materializedViewDefinition.get().getStorageTable() + .map(CatalogSchemaTableName::getSchemaTableName) + .orElseThrow(() -> new IllegalStateException("Storage table missing in definition of materialized view " + materializedViewName)); + + Table icebergTable = catalog.loadTable(session, storageTableName); + String dependsOnTables = icebergTable.currentSnapshot().summary().getOrDefault(DEPENDS_ON_TABLES, ""); + if (dependsOnTables.isEmpty()) { + // Information missing + return new MaterializedViewFreshness(false); + } + Map tableToSnapshotIdMap = Splitter.on(',').withKeyValueSeparator('=').split(dependsOnTables); + for (Map.Entry entry : tableToSnapshotIdMap.entrySet()) { List strings = Splitter.on(".").splitToList(entry.getKey()); if (strings.size() == 3) { strings = strings.subList(1, 3); @@ -2189,7 +2200,14 @@ else if (strings.size() != 2) { if (tableHandle == null) { throw new MaterializedViewNotFoundException(materializedViewName); } - if (!isTableCurrent(session, tableHandle, entry.getValue())) { + Optional tableToken; + if (entry.getValue().isEmpty()) { + tableToken = Optional.empty(); + } + else { + tableToken = Optional.of(new TableToken(Long.parseLong(entry.getValue()))); + } + if (!isTableCurrent(session, tableHandle, tableToken)) { return new MaterializedViewFreshness(false); } } @@ -2214,29 +2232,6 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl catalog.updateColumnComment(session, ((IcebergTableHandle) tableHandle).getSchemaTableName(), ((IcebergColumnHandle) column).getColumnIdentity(), comment); } - private Map> getMaterializedViewToken(ConnectorSession session, SchemaTableName name) - { - Map> viewToken = new HashMap<>(); - Optional materializedViewDefinition = getMaterializedView(session, name); - if (materializedViewDefinition.isEmpty()) { - return viewToken; - } - - SchemaTableName storageTableName = materializedViewDefinition.get().getStorageTable() - .map(CatalogSchemaTableName::getSchemaTableName) - .orElseThrow(() -> new IllegalStateException("Storage table missing in definition of materialized view " + name)); - - Table icebergTable = catalog.loadTable(session, storageTableName); - String dependsOnTables = icebergTable.currentSnapshot().summary().getOrDefault(DEPENDS_ON_TABLES, ""); - if (!dependsOnTables.isEmpty()) { - Map tableToSnapshotIdMap = Splitter.on(',').withKeyValueSeparator('=').split(dependsOnTables); - for (Map.Entry entry : tableToSnapshotIdMap.entrySet()) { - viewToken.put(entry.getKey(), Optional.of(new TableToken(Long.parseLong(entry.getValue())))); - } - } - return viewToken; - } - @Override public Optional redirectTable(ConnectorSession session, SchemaTableName tableName) {