Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@
import org.apache.iceberg.UpdatePartitionSpec;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
Expand Down Expand Up @@ -193,7 +192,6 @@
import static com.facebook.presto.iceberg.IcebergUtil.getSnapshotIdTimeOperator;
import static com.facebook.presto.iceberg.IcebergUtil.getSortFields;
import static com.facebook.presto.iceberg.IcebergUtil.getTableComment;
import static com.facebook.presto.iceberg.IcebergUtil.getViewComment;
import static com.facebook.presto.iceberg.IcebergUtil.resolveSnapshotIdByName;
import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
import static com.facebook.presto.iceberg.IcebergUtil.tryGetLocation;
Expand Down Expand Up @@ -320,7 +318,7 @@ protected final Table getIcebergTable(ConnectorSession session, SchemaTableName

protected abstract Table getRawIcebergTable(ConnectorSession session, SchemaTableName schemaTableName);

protected abstract View getIcebergView(ConnectorSession session, SchemaTableName schemaTableName);
protected abstract Optional<IcebergViewMetadata> getViewMetadata(ConnectorSession session, SchemaTableName viewName);

protected abstract void createIcebergView(
ConnectorSession session,
Expand Down Expand Up @@ -540,13 +538,9 @@ protected ConnectorTableMetadata getTableOrViewMetadata(ConnectorSession session
// Considering that the Iceberg library does not provide an efficient way to determine whether
// it's a view or a table without loading it, we first try to load it as a table directly, and then
// try to load it as a view when getting an `NoSuchTableException`. This will be more efficient.
try {
View icebergView = getIcebergView(session, schemaTableName);
return new ConnectorTableMetadata(table, getColumnMetadata(session, icebergView), createViewMetadataProperties(icebergView), getViewComment(icebergView));
}
catch (NoSuchViewException noSuchViewException) {
throw new TableNotFoundException(schemaTableName);
}
return getViewMetadata(session, schemaTableName)
.map(IcebergViewMetadata::getTableMetadata)
.orElseThrow(() -> new TableNotFoundException(schemaTableName));
}
}

Expand Down Expand Up @@ -1613,112 +1607,98 @@ public void createMaterializedView(
@Override
public List<SchemaTableName> listMaterializedViews(ConnectorSession session, String schemaName)
{
ImmutableList.Builder<SchemaTableName> materializedViews = ImmutableList.builder();

List<SchemaTableName> views = listViews(session, Optional.of(schemaName));

for (SchemaTableName viewName : views) {
View icebergView = getIcebergView(session, viewName);
Map<String, String> properties = icebergView.properties();
if (properties.containsKey(PRESTO_MATERIALIZED_VIEW_FORMAT_VERSION)) {
materializedViews.add(viewName);
}
}

return materializedViews.build();
return views.stream()
.filter(viewName -> getViewMetadata(session, viewName)
.map(IcebergViewMetadata::isMaterializedView)
.orElse(false))
.collect(toImmutableList());
}

@Override
public Optional<MaterializedViewDefinition> getMaterializedView(ConnectorSession session, SchemaTableName viewName)
{
try {
View icebergView = getIcebergView(session, viewName);
Optional<IcebergViewMetadata> viewMetadata = getViewMetadata(session, viewName);
if (!viewMetadata.isPresent() || !viewMetadata.get().isMaterializedView()) {
return Optional.empty();
}

Map<String, String> viewProperties = icebergView.properties();
String originalSql = viewProperties.get(PRESTO_MATERIALIZED_VIEW_ORIGINAL_SQL);
Map<String, String> viewProperties = viewMetadata.get().getProperties();
String originalSql = viewProperties.get(PRESTO_MATERIALIZED_VIEW_ORIGINAL_SQL);

if (originalSql == null) {
return Optional.empty();
}
if (originalSql == null) {
return Optional.empty();
}

// Validate format version
String formatVersion = getRequiredMaterializedViewProperty(viewProperties, PRESTO_MATERIALIZED_VIEW_FORMAT_VERSION);
int version;
try {
version = Integer.parseInt(formatVersion);
}
catch (NumberFormatException e) {
throw new PrestoException(ICEBERG_INVALID_MATERIALIZED_VIEW,
format("Invalid materialized view format version: %s", formatVersion));
}
// Validate format version
String formatVersion = getRequiredMaterializedViewProperty(viewProperties, PRESTO_MATERIALIZED_VIEW_FORMAT_VERSION);
int version;
try {
version = Integer.parseInt(formatVersion);
}
catch (NumberFormatException e) {
throw new PrestoException(ICEBERG_INVALID_MATERIALIZED_VIEW,
format("Invalid materialized view format version: %s", formatVersion));
}

if (version != CURRENT_MATERIALIZED_VIEW_FORMAT_VERSION) {
throw new PrestoException(ICEBERG_INVALID_MATERIALIZED_VIEW,
format("Materialized view format version %d is not supported by this version of Presto (current version: %d). Please upgrade Presto.",
version, CURRENT_MATERIALIZED_VIEW_FORMAT_VERSION));
}
if (version != CURRENT_MATERIALIZED_VIEW_FORMAT_VERSION) {
throw new PrestoException(ICEBERG_INVALID_MATERIALIZED_VIEW,
format("Materialized view format version %d is not supported by this version of Presto (current version: %d). Please upgrade Presto.",
version, CURRENT_MATERIALIZED_VIEW_FORMAT_VERSION));
}

String baseTablesStr = getRequiredMaterializedViewProperty(viewProperties, PRESTO_MATERIALIZED_VIEW_BASE_TABLES);
List<SchemaTableName> baseTables;
if (baseTablesStr.isEmpty()) {
baseTables = ImmutableList.of();
}
else {
baseTables = deserializeSchemaTableNames(baseTablesStr);
}
String baseTablesStr = getRequiredMaterializedViewProperty(viewProperties, PRESTO_MATERIALIZED_VIEW_BASE_TABLES);
List<SchemaTableName> baseTables;
if (baseTablesStr.isEmpty()) {
baseTables = ImmutableList.of();
}
else {
baseTables = deserializeSchemaTableNames(baseTablesStr);
}

String columnMappingsJson = getRequiredMaterializedViewProperty(viewProperties, PRESTO_MATERIALIZED_VIEW_COLUMN_MAPPINGS);
List<ColumnMapping> columnMappings = deserializeColumnMappings(columnMappingsJson);
String columnMappingsJson = getRequiredMaterializedViewProperty(viewProperties, PRESTO_MATERIALIZED_VIEW_COLUMN_MAPPINGS);
List<ColumnMapping> columnMappings = deserializeColumnMappings(columnMappingsJson);

String storageSchema = getRequiredMaterializedViewProperty(viewProperties, PRESTO_MATERIALIZED_VIEW_STORAGE_SCHEMA);
String storageTableName = getRequiredMaterializedViewProperty(viewProperties, PRESTO_MATERIALIZED_VIEW_STORAGE_TABLE_NAME);
String storageSchema = getRequiredMaterializedViewProperty(viewProperties, PRESTO_MATERIALIZED_VIEW_STORAGE_SCHEMA);
String storageTableName = getRequiredMaterializedViewProperty(viewProperties, PRESTO_MATERIALIZED_VIEW_STORAGE_TABLE_NAME);

String owner = getRequiredMaterializedViewProperty(viewProperties, PRESTO_MATERIALIZED_VIEW_OWNER);
ViewSecurity securityMode;
try {
securityMode = ViewSecurity.valueOf(getRequiredMaterializedViewProperty(viewProperties, PRESTO_MATERIALIZED_VIEW_SECURITY_MODE));
}
catch (IllegalArgumentException | NullPointerException e) {
throw new PrestoException(ICEBERG_INVALID_MATERIALIZED_VIEW, "Invalid or missing materialized view security mode");
}
String owner = getRequiredMaterializedViewProperty(viewProperties, PRESTO_MATERIALIZED_VIEW_OWNER);
ViewSecurity securityMode;
try {
securityMode = ViewSecurity.valueOf(getRequiredMaterializedViewProperty(viewProperties, PRESTO_MATERIALIZED_VIEW_SECURITY_MODE));
}
catch (IllegalArgumentException | NullPointerException e) {
throw new PrestoException(ICEBERG_INVALID_MATERIALIZED_VIEW, "Invalid or missing materialized view security mode");
}

// Parse staleness config - staleness window defaults to 0s if behavior is set
Optional<MaterializedViewStaleReadBehavior> staleReadBehavior = getOptionalEnumProperty(
viewProperties, PRESTO_MATERIALIZED_VIEW_STALE_READ_BEHAVIOR, MaterializedViewStaleReadBehavior.class);
Optional<Duration> stalenessWindow = getOptionalDurationProperty(viewProperties, PRESTO_MATERIALIZED_VIEW_STALENESS_WINDOW);
// Parse staleness config - staleness window defaults to 0s if behavior is set
Optional<MaterializedViewStaleReadBehavior> staleReadBehavior = getOptionalEnumProperty(
viewProperties, PRESTO_MATERIALIZED_VIEW_STALE_READ_BEHAVIOR, MaterializedViewStaleReadBehavior.class);
Optional<Duration> stalenessWindow = getOptionalDurationProperty(viewProperties, PRESTO_MATERIALIZED_VIEW_STALENESS_WINDOW);

Optional<MaterializedViewStalenessConfig> stalenessConfig = Optional.empty();
if (staleReadBehavior.isPresent()) {
stalenessConfig = Optional.of(new MaterializedViewStalenessConfig(
staleReadBehavior.get(),
stalenessWindow.orElse(new Duration(0, TimeUnit.SECONDS))));
}
Optional<MaterializedViewStalenessConfig> stalenessConfig = Optional.empty();
if (staleReadBehavior.isPresent()) {
stalenessConfig = Optional.of(new MaterializedViewStalenessConfig(
staleReadBehavior.get(),
stalenessWindow.orElse(new Duration(0, TimeUnit.SECONDS))));
}

Optional<MaterializedViewRefreshType> refreshType = getOptionalEnumProperty(
viewProperties, PRESTO_MATERIALIZED_VIEW_REFRESH_TYPE, MaterializedViewRefreshType.class);
Optional<MaterializedViewRefreshType> refreshType = getOptionalEnumProperty(
viewProperties, PRESTO_MATERIALIZED_VIEW_REFRESH_TYPE, MaterializedViewRefreshType.class);

return Optional.of(new MaterializedViewDefinition(
originalSql,
storageSchema,
storageTableName,
baseTables,
Optional.of(owner),
Optional.of(securityMode),
columnMappings,
ImmutableList.of(),
Optional.empty(),
stalenessConfig,
refreshType));
}
catch (NoSuchViewException e) {
return Optional.empty();
}
catch (PrestoException e) {
if (e.getErrorCode() == NOT_SUPPORTED.toErrorCode()) {
return Optional.empty();
}
throw e;
}
return Optional.of(new MaterializedViewDefinition(
originalSql,
storageSchema,
storageTableName,
baseTables,
Optional.of(owner),
Optional.of(securityMode),
columnMappings,
ImmutableList.of(),
Optional.empty(),
stalenessConfig,
refreshType));
}

@Override
Expand Down Expand Up @@ -1749,8 +1729,12 @@ public MaterializedViewStatus getMaterializedViewStatus(
return new MaterializedViewStatus(NOT_MATERIALIZED, ImmutableMap.of());
}

View icebergView = getIcebergView(session, materializedViewName);
Map<String, String> props = icebergView.properties();
Optional<IcebergViewMetadata> viewMetadata = getViewMetadata(session, materializedViewName);
if (!viewMetadata.isPresent()) {
throw new PrestoException(ICEBERG_INVALID_MATERIALIZED_VIEW,
format("Materialized view metadata not found for %s", materializedViewName));
}
Comment thread
hantangwangd marked this conversation as resolved.
Map<String, String> props = viewMetadata.get().getProperties();
String lastRefreshSnapshotStr = props.get(PRESTO_MATERIALIZED_VIEW_LAST_REFRESH_SNAPSHOT_ID);
if (lastRefreshSnapshotStr == null) {
return new MaterializedViewStatus(NOT_MATERIALIZED, ImmutableMap.of());
Expand Down Expand Up @@ -1967,12 +1951,6 @@ private static Optional<Duration> getOptionalDurationProperty(Map<String, String

private boolean viewExists(ConnectorSession session, ConnectorTableMetadata viewMetadata)
{
try {
getIcebergView(session, viewMetadata.getTable());
return true;
}
catch (NoSuchViewException e) {
return false;
}
return getViewMetadata(session, viewMetadata.getTable()).isPresent();
}
}
Loading
Loading