Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.LongTimestampWithTimeZone;

import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;

import static com.google.common.collect.MoreCollectors.onlyElement;
import static com.google.common.collect.Streams.mapWithIndex;
import static io.trino.connector.system.jdbc.FilterUtil.isImpossibleObjectName;
import static io.trino.connector.system.jdbc.FilterUtil.tablePrefix;
import static io.trino.connector.system.jdbc.FilterUtil.tryGetSingleVarcharValue;
Expand All @@ -50,6 +54,8 @@
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_NANOSECOND;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
import static java.lang.Math.toIntExact;
import static java.util.Map.entry;
import static java.util.Objects.requireNonNull;

public class MaterializedViewSystemTable
Expand Down Expand Up @@ -95,7 +101,8 @@ public ConnectorTableMetadata getTableMetadata()
public RecordCursor cursor(
ConnectorTransactionHandle transactionHandle,
ConnectorSession connectorSession,
TupleDomain<Integer> constraint)
TupleDomain<Integer> constraint,
Set<Integer> requiredColumns)
{
Session session = ((FullConnectorSession) connectorSession).getSession();
InMemoryRecordSet.Builder displayTable = InMemoryRecordSet.builder(getTableMetadata());
Expand All @@ -109,6 +116,7 @@ public RecordCursor cursor(
}

Optional<String> tableFilter = tryGetSingleVarcharValue(tableDomain);
boolean needFreshness = requiredColumns.contains(columnIndex("freshness")) || requiredColumns.contains(columnIndex("last_fresh_time"));

listCatalogNames(session, metadata, accessControl, catalogDomain).forEach(catalogName -> {
// TODO A connector may be able to pull information from multiple schemas at once, so pass the schema filter to the connector instead.
Expand All @@ -119,29 +127,31 @@ public RecordCursor cursor(
if (isImpossibleObjectName(schemaName)) {
continue;
}
addMaterializedViewForCatalog(session, displayTable, tablePrefix(catalogName, Optional.of(schemaName), tableFilter));
addMaterializedViewForCatalog(session, displayTable, tablePrefix(catalogName, Optional.of(schemaName), tableFilter), needFreshness);
}
}
else {
addMaterializedViewForCatalog(session, displayTable, tablePrefix(catalogName, Optional.empty(), tableFilter));
addMaterializedViewForCatalog(session, displayTable, tablePrefix(catalogName, Optional.empty(), tableFilter), needFreshness);
}
});

return displayTable.build().cursor();
}

private void addMaterializedViewForCatalog(Session session, InMemoryRecordSet.Builder displayTable, QualifiedTablePrefix tablePrefix)
private void addMaterializedViewForCatalog(Session session, InMemoryRecordSet.Builder displayTable, QualifiedTablePrefix tablePrefix, boolean needFreshness)
{
getMaterializedViews(session, metadata, accessControl, tablePrefix).forEach((tableName, definition) -> {
QualifiedObjectName name = new QualifiedObjectName(tablePrefix.getCatalogName(), tableName.getSchemaName(), tableName.getTableName());
MaterializedViewFreshness freshness;
Optional<MaterializedViewFreshness> freshness = Optional.empty();

try {
freshness = metadata.getMaterializedViewFreshness(session, name);
}
catch (MaterializedViewNotFoundException e) {
// Ignore materialized view that was dropped during query execution (race condition)
return;
if (needFreshness) {
try {
freshness = Optional.of(metadata.getMaterializedViewFreshness(session, name));
}
catch (MaterializedViewNotFoundException e) {
// Ignore materialized view that was dropped during query execution (race condition)
return;
}
}

Object[] materializedViewRow = createMaterializedViewRow(name, freshness, definition);
Expand All @@ -151,7 +161,7 @@ private void addMaterializedViewForCatalog(Session session, InMemoryRecordSet.Bu

private static Object[] createMaterializedViewRow(
QualifiedObjectName name,
MaterializedViewFreshness freshness,
Optional<MaterializedViewFreshness> freshness,
ViewInfo definition)
{
return new Object[] {
Expand All @@ -168,9 +178,11 @@ private static Object[] createMaterializedViewRow(
.map(storageTable -> storageTable.getSchemaTableName().getTableName())
.orElse(""),
// freshness
freshness.getFreshness().name(),
freshness.map(MaterializedViewFreshness::getFreshness)
.map(Enum::name)
.orElse(null),
// last_fresh_time
freshness.getLastFreshTime()
freshness.flatMap(MaterializedViewFreshness::getLastFreshTime)
.map(instant -> LongTimestampWithTimeZone.fromEpochSecondsAndFraction(
instant.getEpochSecond(),
(long) instant.getNano() * PICOSECONDS_PER_NANOSECOND,
Expand All @@ -180,4 +192,12 @@ private static Object[] createMaterializedViewRow(
definition.getOriginalSql()
};
}

private static int columnIndex(String columnName)
{
return toIntExact(mapWithIndex(TABLE_DEFINITION.getColumns().stream(), (column, index) -> entry(column.getName(), index))
.filter(entry -> entry.getKey().equals(columnName))
.map(Entry::getValue)
.collect(onlyElement()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.connector.system;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.trino.plugin.base.MappedPageSource;
import io.trino.plugin.base.MappedRecordSet;
import io.trino.spi.TrinoException;
Expand All @@ -38,6 +39,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
Expand Down Expand Up @@ -83,6 +85,7 @@ public ConnectorPageSource createPageSource(
}

ImmutableList.Builder<Integer> userToSystemFieldIndex = ImmutableList.builder();
ImmutableSet.Builder<Integer> requiredColumns = ImmutableSet.builder();
for (ColumnHandle column : columns) {
String columnName = ((SystemColumnHandle) column).getColumnName();

Expand All @@ -92,6 +95,7 @@ public ConnectorPageSource createPageSource(
}

userToSystemFieldIndex.add(index);
requiredColumns.add(index);
}

TupleDomain<ColumnHandle> constraint = systemSplit.getConstraint();
Expand All @@ -105,11 +109,18 @@ public ConnectorPageSource createPageSource(
return new MappedPageSource(systemTable.pageSource(systemTransaction.getConnectorTransactionHandle(), session, newConstraint), userToSystemFieldIndex.build());
}
catch (UnsupportedOperationException e) {
return new RecordPageSource(new MappedRecordSet(toRecordSet(systemTransaction.getConnectorTransactionHandle(), systemTable, session, newConstraint), userToSystemFieldIndex.build()));
return new RecordPageSource(new MappedRecordSet(
toRecordSet(
systemTransaction.getConnectorTransactionHandle(),
systemTable,
session,
newConstraint,
requiredColumns.build()),
userToSystemFieldIndex.build()));
}
}

private static RecordSet toRecordSet(ConnectorTransactionHandle sourceTransaction, SystemTable table, ConnectorSession session, TupleDomain<Integer> constraint)
private static RecordSet toRecordSet(ConnectorTransactionHandle sourceTransaction, SystemTable table, ConnectorSession session, TupleDomain<Integer> constraint, Set<Integer> requiredColumns)
{
return new RecordSet()
{
Expand All @@ -126,7 +137,7 @@ public List<Type> getColumnTypes()
@Override
public RecordCursor cursor()
{
return table.cursor(sourceTransaction, session, constraint);
return table.cursor(sourceTransaction, session, constraint, requiredColumns);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import io.trino.spi.predicate.TupleDomain;

import java.util.Set;

/**
* Exactly one of {@link #cursor} or {@link #pageSource} must be implemented.
*/
Expand All @@ -40,6 +42,11 @@ default RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connec
throw new UnsupportedOperationException();
}

default RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint, Set<Integer> requiredColumns)
{
return cursor(transactionHandle, session, constraint);
}

/**
* Create a page source for the data in this table.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.trino.spi.connector.SystemTable;
import io.trino.spi.predicate.TupleDomain;

import java.util.Set;

import static java.util.Objects.requireNonNull;

public class ClassLoaderSafeSystemTable
Expand Down Expand Up @@ -62,6 +64,14 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
}
}

@Override
public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint, Set<Integer> requiredColumns)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.cursor(transactionHandle, session, constraint, requiredColumns);
}
}

@Override
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class CountingAccessHiveMetastore
public enum Method
{
CREATE_DATABASE,
DROP_DATABASE,
CREATE_TABLE,
GET_ALL_DATABASES,
GET_DATABASE,
Expand Down Expand Up @@ -144,7 +145,8 @@ public void createDatabase(Database database)
@Override
public void dropDatabase(String databaseName, boolean deleteData)
{
throw new UnsupportedOperationException();
methodInvocations.add(Method.DROP_DATABASE);
delegate.dropDatabase(databaseName, deleteData);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,57 @@ public Object[][] metadataQueriesTestTableCountDataProvider()
};
}

@Test
public void testSystemMetadataMaterializedViews()
{
String schemaName = "test_materialized_views_" + randomNameSuffix();
assertUpdate("CREATE SCHEMA " + schemaName);
Session session = Session.builder(getSession())
.setSchema(schemaName)
.build();

assertUpdate(session, "CREATE TABLE test_table1 AS SELECT 1 a", 1);
assertUpdate(session, "CREATE TABLE test_table2 AS SELECT 1 a", 1);

assertUpdate(session, "CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM test_table1 JOIN test_table2 USING (a)");
assertUpdate(session, "REFRESH MATERIALIZED VIEW mv1", 1);

assertUpdate(session, "CREATE MATERIALIZED VIEW mv2 AS SELECT count(*) c FROM test_table1 JOIN test_table2 USING (a)");
assertUpdate(session, "REFRESH MATERIALIZED VIEW mv2", 1);

// Bulk retrieval
assertFileSystemAccesses(session, "SELECT * FROM system.metadata.materialized_views WHERE schema_name = CURRENT_SCHEMA",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 4)
.build());

// Bulk retrieval without selecting freshness
assertFileSystemAccesses(session, "SELECT schema_name, name FROM system.metadata.materialized_views WHERE schema_name = CURRENT_SCHEMA",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 2)
.build());

// Bulk retrieval for two schemas
assertFileSystemAccesses(session, "SELECT * FROM system.metadata.materialized_views WHERE schema_name IN (CURRENT_SCHEMA, 'non_existent')",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 4)
.build());

// Pointed lookup
assertFileSystemAccesses(session, "SELECT * FROM system.metadata.materialized_views WHERE schema_name = CURRENT_SCHEMA AND name = 'mv1'",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 3)
.build());

// Pointed lookup without selecting freshness
assertFileSystemAccesses(session, "SELECT schema_name, name FROM system.metadata.materialized_views WHERE schema_name = CURRENT_SCHEMA AND name = 'mv1'",
ImmutableMultiset.<FileOperation>builder()
.add(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM))
.build());

assertUpdate("DROP SCHEMA " + schemaName + " CASCADE");
}

@Test
public void testShowTables()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,60 @@ public Object[][] metadataQueriesTestTableCountDataProvider()
};
}

@Test
public void testSystemMetadataMaterializedViews()
{
String schemaName = "test_materialized_views_" + randomNameSuffix();
assertUpdate("CREATE SCHEMA " + schemaName);
Session session = Session.builder(getSession())
.setSchema(schemaName)
.build();

assertUpdate(session, "CREATE TABLE test_table1 AS SELECT 1 a", 1);
assertUpdate(session, "CREATE TABLE test_table2 AS SELECT 1 a", 1);

assertUpdate(session, "CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM test_table1 JOIN test_table2 USING (a)");
assertUpdate(session, "REFRESH MATERIALIZED VIEW mv1", 1);

assertUpdate(session, "CREATE MATERIALIZED VIEW mv2 AS SELECT count(*) c FROM test_table1 JOIN test_table2 USING (a)");
assertUpdate(session, "REFRESH MATERIALIZED VIEW mv2", 1);

// Bulk retrieval
assertMetastoreInvocations(session, "SELECT * FROM system.metadata.materialized_views WHERE schema_name = CURRENT_SCHEMA",
ImmutableMultiset.builder()
.add(GET_TABLES_WITH_PARAMETER)
.addCopies(GET_TABLE, 6)
.build());

// Bulk retrieval without selecting freshness
assertMetastoreInvocations(session, "SELECT schema_name, name FROM system.metadata.materialized_views WHERE schema_name = CURRENT_SCHEMA",
ImmutableMultiset.builder()
.add(GET_TABLES_WITH_PARAMETER)
.addCopies(GET_TABLE, 4)
.build());

// Bulk retrieval for two schemas
assertMetastoreInvocations(session, "SELECT * FROM system.metadata.materialized_views WHERE schema_name IN (CURRENT_SCHEMA, 'non_existent')",
ImmutableMultiset.builder()
.addCopies(GET_TABLES_WITH_PARAMETER, 2)
.addCopies(GET_TABLE, 6)
.build());

// Pointed lookup
assertMetastoreInvocations(session, "SELECT * FROM system.metadata.materialized_views WHERE schema_name = CURRENT_SCHEMA AND name = 'mv1'",
ImmutableMultiset.builder()
.addCopies(GET_TABLE, 4)
.build());

// Pointed lookup without selecting freshness
assertMetastoreInvocations(session, "SELECT schema_name, name FROM system.metadata.materialized_views WHERE schema_name = CURRENT_SCHEMA AND name = 'mv1'",
ImmutableMultiset.builder()
.addCopies(GET_TABLE, 2)
.build());

assertUpdate("DROP SCHEMA " + schemaName + " CASCADE");
}

@Test
public void testShowTables()
{
Expand Down