Skip to content

Commit

Permalink
Store metadata when listing columns/comments in Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Sep 27, 2024
1 parent 60a6529 commit 3fd0d60
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,7 @@ private RelationCommentMetadata getRelationCommentMetadata(ConnectorSession sess

TableSnapshot snapshot = getSnapshot(session, tableName, tableLocation, Optional.empty());
MetadataEntry metadata = transactionLogAccess.getMetadataEntry(session, snapshot);
enqueueUpdateInfo(session, table.getDatabaseName(), table.getTableName(), snapshot.getVersion(), metadata.getSchemaString(), Optional.ofNullable(metadata.getDescription()));
return RelationCommentMetadata.forRelation(tableName, Optional.ofNullable(metadata.getDescription()));
}
catch (RuntimeException e) {
Expand Down Expand Up @@ -990,12 +991,11 @@ public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession sessio
List<ColumnMetadata> columnsMetadata = metadataScheduler.getColumnsMetadata(table);
return Stream.of(TableColumnsMetadata.forTable(tableName, columnsMetadata));
}
// Don't store cache in streamTableColumns method for avoiding too many update calls

TableSnapshot snapshot = transactionLogAccess.loadSnapshot(session, tableName, tableLocation, Optional.empty());
MetadataEntry metadata = transactionLogAccess.getMetadataEntry(session, snapshot);
ProtocolEntry protocol = transactionLogAccess.getProtocolEntry(session, snapshot);
List<ColumnMetadata> columnMetadata = getTableColumnMetadata(metadata, protocol);
enqueueUpdateInfo(session, table.getDatabaseName(), table.getTableName(), snapshot.getVersion(), metadata.getSchemaString(), Optional.ofNullable(metadata.getDescription()));
return Stream.of(TableColumnsMetadata.forTable(tableName, columnMetadata));
}
catch (NotADeltaLakeTableException | IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.deltalake.metastore;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultiset;
import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
Expand All @@ -34,6 +35,7 @@

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

Expand All @@ -49,6 +51,7 @@
import static io.trino.plugin.hive.metastore.MetastoreMethod.REPLACE_TABLE;
import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet;
import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.util.Map.entry;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
Expand All @@ -57,6 +60,8 @@
public class TestDeltaLakeMetastoreAccessOperations
extends AbstractTestQueryFramework
{
private static final int MAX_PREFIXES_COUNT = 10;

private HiveMetastore metastore;
private DeltaLakeTableMetadataScheduler metadataScheduler;

Expand Down Expand Up @@ -278,6 +283,143 @@ public void testDropTable()
.build());
}

@Test
public void testInformationSchemaColumns()
{
for (int tables : ImmutableList.of(3, MAX_PREFIXES_COUNT, MAX_PREFIXES_COUNT + 3)) {
String schemaName = "test_i_s_columns_schema" + randomNameSuffix();
assertUpdate("CREATE SCHEMA " + schemaName);
Session session = Session.builder(getSession())
.setSchema(schemaName)
.build();

for (int i = 0; i < tables; i++) {
assertUpdate(session, "CREATE TABLE test_select_i_s_columns" + i + "(id varchar, age integer)");
assertUpdate(session, "INSERT INTO test_select_i_s_columns" + i + " VALUES ('abc', 11)", 1);
assertUpdate(session, "INSERT INTO test_select_i_s_columns" + i + " VALUES ('xyz', 12)", 1);
removeStoredParameters(schemaName, "test_select_i_s_columns" + i);

assertUpdate(session, "CREATE TABLE test_other_select_i_s_columns" + i + "(id varchar, age integer)"); // won't match the filter
removeStoredParameters(schemaName, "test_other_select_i_s_columns" + i);
}
metadataScheduler.clear();

// Bulk retrieval
assertMetastoreInvocations(session, "SELECT * FROM information_schema.columns WHERE table_schema = CURRENT_SCHEMA",
ImmutableMultiset.<MetastoreMethod>builder()
.add(GET_TABLES)
.addCopies(GET_TABLE, tables * 2)
.build(),
ImmutableMultiset.<MetastoreMethod>builder()
.addCopies(GET_TABLE, tables * 2)
.addCopies(REPLACE_TABLE, tables * 2)
.build());

assertMetastoreInvocations(session, "SELECT * FROM information_schema.columns WHERE table_schema = CURRENT_SCHEMA AND table_name LIKE 'test_select_i_s_columns%'",
ImmutableMultiset.<MetastoreMethod>builder()
.add(GET_TABLES)
.addCopies(GET_TABLE, tables * 2)
.build());

// Pointed lookup
assertMetastoreInvocations(session, "SELECT * FROM information_schema.columns WHERE table_schema = CURRENT_SCHEMA AND table_name = 'test_select_i_s_columns0'",
ImmutableMultiset.<MetastoreMethod>builder()
.add(GET_TABLE)
.build());

// Pointed lookup with LIKE predicate (as if unintentional)
assertMetastoreInvocations(session, "SELECT * FROM information_schema.columns WHERE table_schema = CURRENT_SCHEMA AND table_name LIKE 'test_select_i_s_columns0'",
ImmutableMultiset.<MetastoreMethod>builder()
.add(GET_TABLES)
.addCopies(GET_TABLE, tables * 2)
.build());

// Pointed lookup via DESCRIBE (which does some additional things before delegating to information_schema.columns)
assertMetastoreInvocations(session, "DESCRIBE test_select_i_s_columns0",
ImmutableMultiset.<MetastoreMethod>builder()
.add(GET_ALL_DATABASES)
.add(GET_TABLE)
.build());

for (int i = 0; i < tables; i++) {
assertUpdate(session, "DROP TABLE test_select_i_s_columns" + i);
assertUpdate(session, "DROP TABLE test_other_select_i_s_columns" + i);
}
}
}

@Test
public void testSystemMetadataTableComments()
{
for (int tables : ImmutableList.of(3, MAX_PREFIXES_COUNT, MAX_PREFIXES_COUNT + 3)) {
String schemaName = "test_s_m_table_comments" + randomNameSuffix();
assertUpdate("CREATE SCHEMA " + schemaName);
Session session = Session.builder(getSession())
.setSchema(schemaName)
.build();

for (int i = 0; i < tables; i++) {
assertUpdate(session, "CREATE TABLE test_select_s_m_t_comments" + i + "(id varchar, age integer)");
assertUpdate(session, "INSERT INTO test_select_s_m_t_comments" + i + " VALUES ('abc', 11)", 1);
assertUpdate(session, "INSERT INTO test_select_s_m_t_comments" + i + " VALUES ('xyz', 12)", 1);
removeStoredParameters(schemaName, "test_select_s_m_t_comments" + i);

assertUpdate(session, "CREATE TABLE test_other_select_s_m_t_comments" + i + "(id varchar, age integer)"); // won't match the filter
removeStoredParameters(schemaName, "test_other_select_s_m_t_comments" + i);
}
metadataScheduler.clear();

// Bulk retrieval
assertMetastoreInvocations(session, "SELECT * FROM system.metadata.table_comments WHERE schema_name = CURRENT_SCHEMA",
ImmutableMultiset.<MetastoreMethod>builder()
.add(GET_TABLES)
.addCopies(GET_TABLE, tables * 2)
.build(),
ImmutableMultiset.<MetastoreMethod>builder()
.addCopies(GET_TABLE, tables * 2)
.addCopies(REPLACE_TABLE, tables * 2)
.build());

assertMetastoreInvocations(session, "SELECT * FROM system.metadata.table_comments WHERE schema_name = CURRENT_SCHEMA AND table_name LIKE 'test_select_s_m_t_comments%'",
ImmutableMultiset.<MetastoreMethod>builder()
.add(GET_TABLES)
.addCopies(GET_TABLE, tables * 2)
.build());

// Bulk retrieval for two schemas
assertMetastoreInvocations(session, "SELECT * FROM system.metadata.table_comments WHERE schema_name IN (CURRENT_SCHEMA, 'non_existent') AND table_name LIKE 'test_select_s_m_t_comments%'",
ImmutableMultiset.<MetastoreMethod>builder()
.addCopies(GET_TABLES, 2)
.addCopies(GET_TABLE, tables * 2)
.build());

// Pointed lookup
assertMetastoreInvocations(session, "SELECT * FROM system.metadata.table_comments WHERE schema_name = CURRENT_SCHEMA AND table_name = 'test_select_s_m_t_comments0'",
ImmutableMultiset.<MetastoreMethod>builder()
.add(GET_TABLE)
.build());

// Pointed lookup with LIKE predicate (as if unintentional)
assertMetastoreInvocations(session, "SELECT * FROM system.metadata.table_comments WHERE schema_name = CURRENT_SCHEMA AND table_name LIKE 'test_select_s_m_t_comments0'",
ImmutableMultiset.<MetastoreMethod>builder()
.add(GET_TABLES)
.addCopies(GET_TABLE, tables * 2)
.build());

for (int i = 0; i < tables; i++) {
assertUpdate(session, "DROP TABLE test_select_s_m_t_comments" + i);
assertUpdate(session, "DROP TABLE test_other_select_s_m_t_comments" + i);
}
}
}
f
private void removeStoredParameters(String schemaName, String tableName)
{
Table table = metastore.getTable(schemaName, tableName).orElseThrow();
Map<String, String> parameters = Maps.filterKeys(table.getParameters(), key -> !key.equals("trino_last_transaction_version") && !key.equals("trino_metadata_schema_string"));
metastore.replaceTable(table.getDatabaseName(), table.getTableName(), table.withParameters(parameters), buildInitialPrivilegeSet(table.getOwner().orElseThrow()));
}

@Test
public void testShowTables()
{
Expand Down

0 comments on commit 3fd0d60

Please sign in to comment.