plugin/trino-hive/src/main/java/io/trino/plugin/hive/ViewReaderUtil.java
@@ -119,6 +119,7 @@ private static CoralTableRedirectionResolver coralTableRedirectionResolver(
});
}

public static final String ICEBERG_MATERIALIZED_VIEW_COMMENT = "Presto Materialized View";
public static final String PRESTO_VIEW_FLAG = "presto_view";
static final String VIEW_PREFIX = "/* Presto View: ";
static final String VIEW_SUFFIX = " */";
@@ -145,6 +146,11 @@ public static boolean isHiveOrPrestoView(String tableType)
return tableType.equals(TableType.VIRTUAL_VIEW.name());
}

public static boolean isTrinoMaterializedView(String tableType, Map<String, String> tableParameters)
{
return isHiveOrPrestoView(tableType) && isPrestoView(tableParameters) && ICEBERG_MATERIALIZED_VIEW_COMMENT.equalsIgnoreCase(tableParameters.get(TABLE_COMMENT)); // null-safe: the comment parameter may be absent
}

public static boolean canDecodeView(Table table)
{
// we can decode a Hive or Presto view
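For orientation, a minimal sketch of how the new predicate classifies a metastore table; the parameter map is illustrative, and assumes TABLE_COMMENT resolves to the standard "comment" parameter key:

// illustrative metastore parameters, as a Trino-created Iceberg materialized view would carry them
Map<String, String> parameters = ImmutableMap.of(
        "presto_view", "true",                  // PRESTO_VIEW_FLAG
        "comment", "Presto Materialized View"); // TABLE_COMMENT matching ICEBERG_MATERIALIZED_VIEW_COMMENT
boolean view = ViewReaderUtil.isTrinoMaterializedView("VIRTUAL_VIEW", parameters);    // true
boolean plain = ViewReaderUtil.isTrinoMaterializedView("EXTERNAL_TABLE", parameters); // false: not a virtual view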
2 changes: 2 additions & 0 deletions plugin/trino-iceberg/pom.xml
@@ -416,6 +416,7 @@
<exclude>**/TestTrinoGlueCatalogTest.java</exclude>
<exclude>**/TestSharedGlueMetastore.java</exclude>
<exclude>**/TestIcebergGlueCatalogAccessOperations.java</exclude>
<exclude>**/TestIcebergGlueCatalogMaterializedViewTest.java</exclude>
</excludes>
</configuration>
</plugin>
@@ -455,6 +456,7 @@
<include>**/TestTrinoGlueCatalogTest.java</include>
<include>**/TestSharedGlueMetastore.java</include>
<include>**/TestIcebergGlueCatalogAccessOperations.java</include>
<include>**/TestIcebergGlueCatalogMaterializedViewTest.java</include>
</includes>
</configuration>
</plugin>
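(The new Glue materialized view test follows the existing pattern here: excluded from the default surefire execution and included in the separate execution, presumably the one that runs with AWS Glue credentials available.)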
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java
@@ -14,15 +14,25 @@
package io.trino.plugin.iceberg.catalog;

import com.google.common.collect.ImmutableMap;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.HiveMetadata;
import io.trino.plugin.hive.HiveViewNotSupportedException;
import io.trino.plugin.hive.ViewReaderUtil;
import io.trino.plugin.iceberg.ColumnIdentity;
import io.trino.plugin.iceberg.IcebergMaterializedViewDefinition;
import io.trino.plugin.iceberg.IcebergUtil;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.CatalogSchemaTableName;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorMaterializedViewDefinition;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorViewDefinition;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.TypeManager;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.PartitionSpec;
@@ -33,20 +43,33 @@
import org.apache.iceberg.Transaction;

import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.hive.HiveMetadata.STORAGE_TABLE;
import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
import static io.trino.plugin.hive.ViewReaderUtil.ICEBERG_MATERIALIZED_VIEW_COMMENT;
import static io.trino.plugin.hive.ViewReaderUtil.PRESTO_VIEW_FLAG;
import static io.trino.plugin.hive.ViewReaderUtil.isHiveOrPrestoView;
import static io.trino.plugin.hive.ViewReaderUtil.isPrestoView;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.decodeMaterializedViewData;
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static io.trino.plugin.iceberg.PartitionFields.toPartitionFields;
import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static org.apache.iceberg.TableMetadata.newTableMetadata;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.Transactions.createTableTransaction;

public abstract class AbstractTrinoCatalog
@@ -60,15 +83,21 @@ public abstract class AbstractTrinoCatalog
protected static final String PRESTO_QUERY_ID_NAME = HiveMetadata.PRESTO_QUERY_ID_NAME;
protected static final String PRESTO_VIEW_EXPANDED_TEXT_MARKER = HiveMetadata.PRESTO_VIEW_EXPANDED_TEXT_MARKER;

private final CatalogName catalogName;
private final TypeManager typeManager;
protected final IcebergTableOperationsProvider tableOperationsProvider;
private final String trinoVersion;
private final boolean useUniqueTableLocation;

protected AbstractTrinoCatalog(
CatalogName catalogName,
TypeManager typeManager,
IcebergTableOperationsProvider tableOperationsProvider,
String trinoVersion,
boolean useUniqueTableLocation)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null");
this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null");
this.useUniqueTableLocation = useUniqueTableLocation;
@@ -113,6 +142,25 @@ public Map<SchemaTableName, ConnectorViewDefinition> getViews(ConnectorSession s
return views.buildOrThrow();
}

@Override
public Optional<ConnectorMaterializedViewDefinition> getMaterializedView(ConnectorSession session, SchemaTableName schemaViewName)
{
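// Reading a materialized view can race with a concurrent drop: the view or its storage table may
// disappear mid-read. Retry such loads with backoff, and give up only on failures that are
// clearly unrelated to a concurrent removal.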
try {
return Failsafe.with(new RetryPolicy<>()
.withMaxAttempts(10)
.withBackoff(1, 5_000, ChronoUnit.MILLIS, 4)
.withMaxDuration(Duration.ofSeconds(30))
.abortOn(failure -> !(failure instanceof MaterializedViewMayBeBeingRemovedException)))
.get(() -> doGetMaterializedView(session, schemaViewName));
}
catch (MaterializedViewMayBeBeingRemovedException e) {
throwIfUnchecked(e.getCause());
throw new RuntimeException(e.getCause());
}
}

protected abstract Optional<ConnectorMaterializedViewDefinition> doGetMaterializedView(ConnectorSession session, SchemaTableName schemaViewName);

protected Transaction newCreateTableTransaction(
ConnectorSession session,
SchemaTableName schemaTableName,
@@ -206,4 +254,71 @@ protected Map<String, String> createViewProperties(ConnectorSession session)
.put(TABLE_COMMENT, PRESTO_VIEW_COMMENT)
.buildOrThrow();
}

protected SchemaTableName createMaterializedViewStorageTable(ConnectorSession session, SchemaTableName viewName, ConnectorMaterializedViewDefinition definition)
{
// Generate a unique storage table name and create the storage table. The properties in the materialized
// view definition are the table properties of its storage table.
String storageTableName = "st_" + randomUUID().toString().replace("-", "");
Map<String, Object> storageTableProperties = new HashMap<>(definition.getProperties());
storageTableProperties.putIfAbsent(FILE_FORMAT_PROPERTY, DEFAULT_FILE_FORMAT_DEFAULT);

SchemaTableName storageTable = new SchemaTableName(viewName.getSchemaName(), storageTableName);
List<ColumnMetadata> columns = definition.getColumns().stream()
.map(column -> new ColumnMetadata(column.getName(), typeManager.getType(column.getType())))
.collect(toImmutableList());

ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(storageTable, columns, storageTableProperties, Optional.empty());
Transaction transaction = IcebergUtil.newCreateTableTransaction(this, tableMetadata, session);
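// commit an empty append so the new storage table starts out with an initial, empty snapshot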
transaction.newAppend().commit();
transaction.commitTransaction();
return storageTable;
}

protected ConnectorMaterializedViewDefinition getMaterializedViewDefinition(
SchemaTableName viewName,
Table icebergTable,
Optional<String> owner,
String viewOriginalText,
String storageTableName)
{
ImmutableMap.Builder<String, Object> properties = ImmutableMap.builder();
properties.put(FILE_FORMAT_PROPERTY, IcebergUtil.getFileFormat(icebergTable));
if (!icebergTable.spec().fields().isEmpty()) {
properties.put(PARTITIONING_PROPERTY, toPartitionFields(icebergTable.spec()));
}

IcebergMaterializedViewDefinition definition = decodeMaterializedViewData(viewOriginalText);
return new ConnectorMaterializedViewDefinition(
definition.getOriginalSql(),
Optional.of(new CatalogSchemaTableName(catalogName.toString(), new SchemaTableName(viewName.getSchemaName(), storageTableName))),
definition.getCatalog(),
definition.getSchema(),
definition.getColumns().stream()
.map(column -> new ConnectorMaterializedViewDefinition.Column(column.getName(), column.getType()))
.collect(toImmutableList()),
definition.getComment(),
owner,
properties.buildOrThrow());
}

protected Map<String, String> createMaterializedViewProperties(ConnectorSession session, String storageTableName)
{
return ImmutableMap.<String, String>builder()
.put(PRESTO_QUERY_ID_NAME, session.getQueryId())
.put(STORAGE_TABLE, storageTableName)
.put(PRESTO_VIEW_FLAG, "true")
.put(TRINO_CREATED_BY, TRINO_CREATED_BY_VALUE)
.put(TABLE_COMMENT, ICEBERG_MATERIALIZED_VIEW_COMMENT)
.buildOrThrow();
}

protected static class MaterializedViewMayBeBeingRemovedException
extends RuntimeException
{
public MaterializedViewMayBeBeingRemovedException(Throwable cause)
{
super(requireNonNull(cause, "cause is null"));
}
}
}
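To make the retry contract concrete, here is a minimal sketch of how a subclass's doGetMaterializedView might signal the retryable condition; the metastore lookup and the loadDefinition helper are illustrative assumptions, not part of this change:

@Override
protected Optional<ConnectorMaterializedViewDefinition> doGetMaterializedView(ConnectorSession session, SchemaTableName schemaViewName)
{
    try {
        // resolve the view and its storage table from the metastore (implementation-specific)
        return loadDefinition(session, schemaViewName); // hypothetical helper
    }
    catch (TableNotFoundException e) {
        // the storage table vanished between reads, possibly due to a concurrent DROP MATERIALIZED VIEW;
        // wrapping the failure makes getMaterializedView() retry it
        throw new MaterializedViewMayBeBeingRemovedException(e);
    }
}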
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java
@@ -108,14 +108,14 @@ Transaction newCreateTableTransaction(

void createMaterializedView(
ConnectorSession session,
SchemaTableName schemaViewName,
SchemaTableName viewName,
ConnectorMaterializedViewDefinition definition,
boolean replace,
boolean ignoreExisting);

void dropMaterializedView(ConnectorSession session, SchemaTableName schemaViewName);
void dropMaterializedView(ConnectorSession session, SchemaTableName viewName);

Optional<ConnectorMaterializedViewDefinition> getMaterializedView(ConnectorSession session, SchemaTableName schemaViewName);
Optional<ConnectorMaterializedViewDefinition> getMaterializedView(ConnectorSession session, SchemaTableName viewName);

void renameMaterializedView(ConnectorSession session, SchemaTableName source, SchemaTableName target);

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergUtil.java
@@ -21,6 +21,7 @@
import java.util.Optional;

import static io.trino.plugin.hive.HiveMetadata.PRESTO_VIEW_EXPANDED_TEXT_MARKER;
import static io.trino.plugin.hive.ViewReaderUtil.ICEBERG_MATERIALIZED_VIEW_COMMENT;
import static org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE;
import static org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW;

@@ -48,4 +49,15 @@ public static TableInput getViewTableInput(String viewName, String viewOriginalT
.withOwner(owner)
.withParameters(parameters);
}

public static TableInput getMaterializedViewTableInput(String viewName, String viewOriginalText, String owner, Map<String, String> parameters)
{
return new TableInput()
.withName(viewName)
.withTableType(VIRTUAL_VIEW.name())
.withViewOriginalText(viewOriginalText)
.withViewExpandedText(ICEBERG_MATERIALIZED_VIEW_COMMENT)
.withOwner(owner)
.withParameters(parameters);
}
}
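A rough sketch of how a Glue-backed catalog could combine this helper with createMaterializedViewProperties when persisting a view; the encodeMaterializedViewData call and the Glue client wiring are assumptions for illustration:

// hypothetical call site inside a Glue catalog's createMaterializedView
TableInput viewInput = getMaterializedViewTableInput(
        viewName.getTableName(),
        encodeMaterializedViewData(fromConnectorMaterializedViewDefinition(definition)),
        session.getUser(),
        createMaterializedViewProperties(session, storageTable.getTableName()));
glueClient.createTable(new CreateTableRequest()
        .withDatabaseName(viewName.getSchemaName())
        .withTableInput(viewInput));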