Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ public class HiveMetadata
public static final String PRESTO_QUERY_ID_NAME = "presto_query_id";
public static final String BUCKETING_VERSION = "bucketing_version";
public static final String TABLE_COMMENT = "comment";
public static final String STORAGE_TABLE = "storage_table";
private static final String TRANSACTIONAL = "transactional";

private static final String ORC_BLOOM_FILTER_COLUMNS_KEY = "orc.bloom.filter.columns";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.execution;

import com.google.common.util.concurrent.ListenableFuture;
import io.prestosql.Session;
import io.prestosql.connector.CatalogName;
import io.prestosql.execution.warnings.WarningCollector;
import io.prestosql.metadata.Metadata;
import io.prestosql.metadata.QualifiedObjectName;
import io.prestosql.security.AccessControl;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ConnectorMaterializedViewDefinition;
import io.prestosql.sql.analyzer.Analysis;
import io.prestosql.sql.analyzer.Analyzer;
import io.prestosql.sql.analyzer.FeaturesConfig;
import io.prestosql.sql.parser.SqlParser;
import io.prestosql.sql.tree.CreateMaterializedView;
import io.prestosql.sql.tree.Expression;
import io.prestosql.sql.tree.NodeRef;
import io.prestosql.sql.tree.Parameter;
import io.prestosql.sql.tree.Statement;
import io.prestosql.transaction.TransactionManager;

import javax.inject.Inject;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static io.prestosql.metadata.MetadataUtil.createQualifiedObjectName;
import static io.prestosql.spi.StandardErrorCode.NOT_FOUND;
import static io.prestosql.sql.NodeUtils.mapFromProperties;
import static io.prestosql.sql.ParameterUtils.parameterExtractor;
import static io.prestosql.sql.SqlFormatterUtil.getFormattedSql;
import static java.util.Objects.requireNonNull;

public class CreateMaterializedViewTask
implements DataDefinitionTask<CreateMaterializedView>
{
private final SqlParser sqlParser;

@Inject
public CreateMaterializedViewTask(SqlParser sqlParser, FeaturesConfig featuresConfig)
{
this.sqlParser = requireNonNull(sqlParser, "sqlParser is null");
requireNonNull(featuresConfig, "featuresConfig is null");
}

@Override
public String getName()
{
return "CREATE MATERIALIZED VIEW";
}

@Override
public String explain(CreateMaterializedView statement, List<Expression> parameters)
{
return "CREATE MATERIALIZED VIEW " + statement.getName();
}

@Override
public ListenableFuture<?> execute(
CreateMaterializedView statement,
TransactionManager transactionManager,
Metadata metadata,
AccessControl accessControl,
QueryStateMachine stateMachine,
List<Expression> parameters)
{
Session session = stateMachine.getSession();
QualifiedObjectName name = createQualifiedObjectName(session, statement, statement.getName());
Map<NodeRef<Parameter>, Expression> parameterLookup = parameterExtractor(statement, parameters);

String sql = getFormattedSql(statement.getQuery(), sqlParser);

Analysis analysis = analyzeStatement(statement, session, metadata, accessControl, parameters, parameterLookup, stateMachine.getWarningCollector());

List<ConnectorMaterializedViewDefinition.Column> columns = analysis.getOutputDescriptor(statement.getQuery())
.getVisibleFields().stream()
.map(field -> new ConnectorMaterializedViewDefinition.Column(field.getName().get(), field.getType().getTypeId()))
.collect(toImmutableList());

Optional<String> owner = Optional.of(session.getUser());

CatalogName catalogName = metadata.getCatalogHandle(session, name.getCatalogName())
.orElseThrow(() -> new PrestoException(NOT_FOUND, "Catalog does not exist: " + name.getCatalogName()));

Map<String, Expression> sqlProperties = mapFromProperties(statement.getProperties());
Map<String, Object> properties = metadata.getTablePropertyManager().getProperties(
catalogName,
name.getCatalogName(),
sqlProperties,
session,
metadata,
accessControl,
parameterLookup);

ConnectorMaterializedViewDefinition definition = new ConnectorMaterializedViewDefinition(
sql,
null,
session.getCatalog(),
session.getSchema(),
columns,
statement.getComment(),
owner,
properties);

metadata.createMaterializedView(session, name, definition, statement.isReplace(), statement.isNotExists());

return immediateFuture(null);
}

private Analysis analyzeStatement(
Statement statement,
Session session,
Metadata metadata,
AccessControl accessControl,
List<Expression> parameters,
Map<NodeRef<Parameter>, Expression> parameterLookup,
WarningCollector warningCollector)
{
Analyzer analyzer = new Analyzer(session, metadata, sqlParser, accessControl, Optional.empty(), parameters, parameterLookup, warningCollector);
return analyzer.analyze(statement);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.execution;

import com.google.common.util.concurrent.ListenableFuture;
import io.prestosql.Session;
import io.prestosql.metadata.Metadata;
import io.prestosql.metadata.QualifiedObjectName;
import io.prestosql.security.AccessControl;
import io.prestosql.spi.connector.ConnectorMaterializedViewDefinition;
import io.prestosql.sql.tree.DropMaterializedView;
import io.prestosql.sql.tree.Expression;
import io.prestosql.transaction.TransactionManager;

import java.util.List;
import java.util.Optional;

import static com.google.common.util.concurrent.Futures.immediateFuture;
import static io.prestosql.metadata.MetadataUtil.createQualifiedObjectName;
import static io.prestosql.spi.StandardErrorCode.TABLE_NOT_FOUND;
import static io.prestosql.sql.analyzer.SemanticExceptions.semanticException;

public class DropMaterializedViewTask
implements DataDefinitionTask<DropMaterializedView>
{
@Override
public String getName()
{
return "DROP MATERIALIZED VIEW";
}

@Override
public ListenableFuture<?> execute(
DropMaterializedView statement,
TransactionManager transactionManager,
Metadata metadata,
AccessControl accessControl,
QueryStateMachine stateMachine,
List<Expression> parameters)
{
Session session = stateMachine.getSession();
QualifiedObjectName name = createQualifiedObjectName(session, statement, statement.getName());

Optional<ConnectorMaterializedViewDefinition> view = metadata.getMaterializedView(session, name);
if (!view.isPresent()) {
if (!statement.isExists()) {
throw semanticException(TABLE_NOT_FOUND, statement, "View '%s' does not exist", name);
}
return immediateFuture(null);
}

accessControl.checkCanDropView(session.toSecurityContext(), name);

metadata.dropMaterializedView(session, name);

return immediateFuture(null);
}
}
38 changes: 38 additions & 0 deletions presto-main/src/main/java/io/prestosql/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.connector.ConnectorCapabilities;
import io.prestosql.spi.connector.ConnectorMaterializedViewDefinition;
import io.prestosql.spi.connector.ConnectorOutputMetadata;
import io.prestosql.spi.connector.ConnectorTableMetadata;
import io.prestosql.spi.connector.ConnectorViewDefinition;
import io.prestosql.spi.connector.Constraint;
import io.prestosql.spi.connector.ConstraintApplicationResult;
import io.prestosql.spi.connector.LimitApplicationResult;
import io.prestosql.spi.connector.MaterializedViewFreshness;
import io.prestosql.spi.connector.ProjectionApplicationResult;
import io.prestosql.spi.connector.SampleType;
import io.prestosql.spi.connector.SortItem;
Expand Down Expand Up @@ -264,6 +266,21 @@ public interface Metadata
*/
Optional<ConnectorOutputMetadata> finishInsert(Session session, InsertTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics);

/**
* Begin refresh materialized view query
*/
InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle);

/**
* Finish refresh materialized view query
*/
Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
Session session,
InsertTableHandle tableHandle,
Collection<Slice> fragments,
Collection<ComputedStatistics> computedStatistics,
List<TableHandle> sourceTableHandles);

/**
* Get the row ID column handle used with UpdatablePageSource.
*/
Expand Down Expand Up @@ -533,4 +550,25 @@ default ResolvedFunction getCoercion(Type fromType, Type toType)
ColumnPropertyManager getColumnPropertyManager();

AnalyzePropertyManager getAnalyzePropertyManager();

/**
* Creates the specified materialized view with the specified view definition.
*/
void createMaterializedView(Session session, QualifiedObjectName viewName, ConnectorMaterializedViewDefinition definition, boolean replace, boolean ignoreExisting);

/**
* Drops the specified materialized view.
*/
void dropMaterializedView(Session session, QualifiedObjectName viewName);

/**
* Returns the materialized view definition for the specified view name.
*/
Optional<ConnectorMaterializedViewDefinition> getMaterializedView(Session session, QualifiedObjectName viewName);

/**
* Method to get difference between the states of table at two different points in time/or as of given token-ids.
* The method is used by the engine to determine if a materialized view is current with respect to the tables it depends on.
*/
MaterializedViewFreshness getMaterializedViewFreshness(Session session, TableHandle tableHandle);
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.connector.ConnectorCapabilities;
import io.prestosql.spi.connector.ConnectorInsertTableHandle;
import io.prestosql.spi.connector.ConnectorMaterializedViewDefinition;
import io.prestosql.spi.connector.ConnectorMetadata;
import io.prestosql.spi.connector.ConnectorOutputMetadata;
import io.prestosql.spi.connector.ConnectorOutputTableHandle;
Expand All @@ -70,6 +71,7 @@
import io.prestosql.spi.connector.Constraint;
import io.prestosql.spi.connector.ConstraintApplicationResult;
import io.prestosql.spi.connector.LimitApplicationResult;
import io.prestosql.spi.connector.MaterializedViewFreshness;
import io.prestosql.spi.connector.ProjectionApplicationResult;
import io.prestosql.spi.connector.SampleType;
import io.prestosql.spi.connector.SchemaTableName;
Expand Down Expand Up @@ -844,6 +846,34 @@ public Optional<ConnectorOutputMetadata> finishInsert(Session session, InsertTab
return metadata.finishInsert(session.toConnectorSession(catalogName), tableHandle.getConnectorHandle(), fragments, computedStatistics);
}

@Override
public InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle)
{
CatalogName catalogName = tableHandle.getCatalogName();
CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, catalogName);
ConnectorMetadata metadata = catalogMetadata.getMetadata();
ConnectorTransactionHandle transactionHandle = catalogMetadata.getTransactionHandleFor(catalogName);
ConnectorInsertTableHandle handle = metadata.beginRefreshMaterializedView(session.toConnectorSession(catalogName), tableHandle.getConnectorHandle());
return new InsertTableHandle(tableHandle.getCatalogName(), transactionHandle, handle);
}

@Override
public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
Session session,
InsertTableHandle tableHandle,
Collection<Slice> fragments,
Collection<ComputedStatistics> computedStatistics,
List<TableHandle> sourceTableHandles)
{
CatalogName catalogName = tableHandle.getCatalogName();
ConnectorMetadata metadata = getMetadata(session, catalogName);
List<ConnectorTableHandle> sourceConnectorHandles = new ArrayList<>();
for (TableHandle handle : sourceTableHandles) {
sourceConnectorHandles.add(handle.getConnectorHandle());
}
return metadata.finishRefreshMaterializedView(session.toConnectorSession(catalogName), tableHandle.getConnectorHandle(), fragments, computedStatistics, sourceConnectorHandles);
}

@Override
public ColumnHandle getUpdateRowIdColumnHandle(Session session, TableHandle tableHandle)
{
Expand Down Expand Up @@ -1077,6 +1107,55 @@ public void dropView(Session session, QualifiedObjectName viewName)
metadata.dropView(session.toConnectorSession(catalogName), viewName.asSchemaTableName());
}

@Override
public void createMaterializedView(Session session, QualifiedObjectName viewName, ConnectorMaterializedViewDefinition definition, boolean replace, boolean ignoreExisting)
{
CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, viewName.getCatalogName());
CatalogName catalogName = catalogMetadata.getCatalogName();
ConnectorMetadata metadata = catalogMetadata.getMetadata();

metadata.createMaterializedView(session.toConnectorSession(catalogName), viewName.asSchemaTableName(), definition, replace, ignoreExisting);
}

@Override
public void dropMaterializedView(Session session, QualifiedObjectName viewName)
{
CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, viewName.getCatalogName());
CatalogName catalogName = catalogMetadata.getCatalogName();
ConnectorMetadata metadata = catalogMetadata.getMetadata();

metadata.dropMaterializedView(session.toConnectorSession(catalogName), viewName.asSchemaTableName());
}

@Override
public Optional<ConnectorMaterializedViewDefinition> getMaterializedView(Session session, QualifiedObjectName viewName)
{
if (viewName.getCatalogName().isEmpty() || viewName.getSchemaName().isEmpty() || viewName.getObjectName().isEmpty()) {
// View cannot exist
return Optional.empty();
}

Optional<CatalogMetadata> catalog = getOptionalCatalogMetadata(session, viewName.getCatalogName());
if (catalog.isPresent()) {
CatalogMetadata catalogMetadata = catalog.get();
CatalogName catalogName = catalogMetadata.getConnectorId(session, viewName);
ConnectorMetadata metadata = catalogMetadata.getMetadataFor(catalogName);

ConnectorSession connectorSession = session.toConnectorSession(catalogName);
return metadata.getMaterializedView(connectorSession, viewName.asSchemaTableName());
}
return Optional.empty();
}

@Override
public MaterializedViewFreshness getMaterializedViewFreshness(Session session, TableHandle tableHandle)
{
CatalogName catalogName = tableHandle.getCatalogName();
CatalogMetadata catalogMetadata = getCatalogMetadata(session, catalogName);
ConnectorMetadata metadata = catalogMetadata.getMetadataFor(catalogName);
return metadata.getMaterializedViewFreshness(session.toConnectorSession(catalogName), tableHandle.getConnectorHandle());
}

@Override
public Optional<ResolvedIndex> resolveIndex(Session session, TableHandle tableHandle, Set<ColumnHandle> indexableColumns, Set<ColumnHandle> outputColumns, TupleDomain<ColumnHandle> tupleDomain)
{
Expand Down
Loading