Skip to content

Commit

Permalink
Store metadata when listing columns/comments in Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Oct 3, 2024
1 parent 990416a commit e8b858a
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -921,6 +921,7 @@ private RelationCommentMetadata getRelationCommentMetadata(ConnectorSession sess

TableSnapshot snapshot = getSnapshot(session, tableName, tableLocation, Optional.empty());
MetadataEntry metadata = transactionLogAccess.getMetadataEntry(session, snapshot);
enqueueUpdateInfo(session, table.getDatabaseName(), table.getTableName(), snapshot.getVersion(), metadata.getSchemaString(), Optional.ofNullable(metadata.getDescription()));
return RelationCommentMetadata.forRelation(tableName, Optional.ofNullable(metadata.getDescription()));
}
catch (RuntimeException e) {
Expand Down Expand Up @@ -994,12 +995,11 @@ public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession sessio
List<ColumnMetadata> columnsMetadata = metadataScheduler.getColumnsMetadata(table);
return Stream.of(TableColumnsMetadata.forTable(tableName, columnsMetadata));
}
// Don't update the cache in streamTableColumns to avoid issuing too many update calls

TableSnapshot snapshot = transactionLogAccess.loadSnapshot(session, tableName, tableLocation, Optional.empty());
MetadataEntry metadata = transactionLogAccess.getMetadataEntry(session, snapshot);
ProtocolEntry protocol = transactionLogAccess.getProtocolEntry(session, snapshot);
List<ColumnMetadata> columnMetadata = getTableColumnMetadata(metadata, protocol);
enqueueUpdateInfo(session, table.getDatabaseName(), table.getTableName(), snapshot.getVersion(), metadata.getSchemaString(), Optional.ofNullable(metadata.getDescription()));
return Stream.of(TableColumnsMetadata.forTable(tableName, columnMetadata));
}
catch (NotADeltaLakeTableException | IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import io.trino.Session;
import io.trino.SystemSessionProperties;
import io.trino.metastore.HiveMetastore;
import io.trino.metastore.Table;
import io.trino.plugin.deltalake.metastore.DeltaLakeTableMetadataScheduler;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
Expand All @@ -36,10 +39,12 @@
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.collect.Maps.filterKeys;
import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_LOCATION;
Expand All @@ -52,6 +57,7 @@
import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.TRANSACTION_LOG_JSON;
import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.TRINO_EXTENDED_STATS_JSON;
import static io.trino.plugin.deltalake.TestingDeltaLakeUtils.copyDirectoryContents;
import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet;
import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.lang.Math.toIntExact;
Expand All @@ -66,6 +72,7 @@ public class TestDeltaLakeFileOperations
{
private static final int MAX_PREFIXES_COUNT = 10;

private HiveMetastore metastore;
// TODO: Consider waiting for scheduled task completion instead of manual triggering
private DeltaLakeTableMetadataScheduler metadataScheduler;

Expand All @@ -85,6 +92,7 @@ protected QueryRunner createQueryRunner()
.addDeltaProperty("delta.metastore.store-table-metadata-threads", "0") // Use the same thread to make the test deterministic
.addDeltaProperty("delta.metastore.store-table-metadata-interval", "30m") // Use a large interval to avoid interference with the test
.build();
metastore = TestingDeltaLakeUtils.getConnectorService(queryRunner, HiveMetastoreFactory.class).createMetastore(Optional.empty());
metadataScheduler = TestingDeltaLakeUtils.getConnectorService(queryRunner, DeltaLakeTableMetadataScheduler.class);
return queryRunner;
}
Expand Down Expand Up @@ -693,6 +701,12 @@ public void testTableChangesFileSystemAccess()

@Test
public void testInformationSchemaColumns()
{
    // Cover both the cold path (cached table properties removed from the metastore)
    // and the warm path (cached properties left in place)
    for (boolean removeCachedProperties : new boolean[] {true, false}) {
        testInformationSchemaColumns(removeCachedProperties);
    }
}

private void testInformationSchemaColumns(boolean removeCachedProperties)
{
for (int tables : Arrays.asList(3, MAX_PREFIXES_COUNT, MAX_PREFIXES_COUNT + 3)) {
String schemaName = "test_i_s_columns_schema" + randomNameSuffix();
Expand All @@ -708,12 +722,35 @@ public void testInformationSchemaColumns()
assertUpdate(session, "INSERT INTO test_select_i_s_columns" + i + " VALUES ('xyz', 12)", 1);

assertUpdate(session, "CREATE TABLE test_other_select_i_s_columns" + i + "(id varchar, age integer)"); // won't match the filter

if (removeCachedProperties) {
removeLastTransactionVersionFromMetastore(schemaName, "test_select_i_s_columns" + i);
removeLastTransactionVersionFromMetastore(schemaName, "test_other_select_i_s_columns" + i);
metadataScheduler.clear();
}
}

// Store table metadata in the metastore to make the file access counts deterministic
metadataScheduler.process();

// Bulk retrieval
assertFileSystemAccesses(session, "SELECT * FROM information_schema.columns WHERE table_schema = CURRENT_SCHEMA", removeCachedProperties ?
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream"), tables * 2)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream"), tables * 2)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream"), tables * 2)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream"), tables)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream"), tables)
.build() :
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.exists"), tables)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.exists"), tables)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.exists"), tables)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.exists"), tables)
.build());

// Repeat bulk retrieval. The above query should store column definitions in metastore and the below query should not open any files.
metadataScheduler.process();
assertFileSystemAccesses(session, "SELECT * FROM information_schema.columns WHERE table_schema = CURRENT_SCHEMA",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.exists"), tables)
Expand Down Expand Up @@ -768,6 +805,12 @@ public void testInformationSchemaColumns()

@Test
public void testSystemMetadataTableComments()
{
    // Cover both the cold path (cached table properties removed from the metastore)
    // and the warm path (cached properties left in place)
    for (boolean removeCachedProperties : new boolean[] {true, false}) {
        testSystemMetadataTableComments(removeCachedProperties);
    }
}

private void testSystemMetadataTableComments(boolean removeCachedProperties)
{
for (int tables : Arrays.asList(3, MAX_PREFIXES_COUNT, MAX_PREFIXES_COUNT + 3)) {
String schemaName = "test_s_m_table_comments" + randomNameSuffix();
Expand All @@ -783,12 +826,35 @@ public void testSystemMetadataTableComments()
assertUpdate(session, "INSERT INTO test_select_s_m_t_comments" + i + " VALUES ('xyz', 12)", 1);

assertUpdate(session, "CREATE TABLE test_other_select_s_m_t_comments" + i + "(id varchar, age integer)"); // won't match the filter

if (removeCachedProperties) {
removeLastTransactionVersionFromMetastore(schemaName, "test_select_s_m_t_comments" + i);
removeLastTransactionVersionFromMetastore(schemaName, "test_other_select_s_m_t_comments" + i);
metadataScheduler.clear();
}
}

// Store table metadata in the metastore to make the file access counts deterministic
metadataScheduler.process();

// Bulk retrieval
assertFileSystemAccesses(session, "SELECT * FROM system.metadata.table_comments WHERE schema_name = CURRENT_SCHEMA", removeCachedProperties ?
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream"), tables * 2)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream"), tables * 2)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream"), tables * 2)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream"), tables)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream"), tables)
.build() :
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.exists"), tables)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.exists"), tables)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.exists"), tables)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.exists"), tables)
.build());

// Repeat bulk retrieval. The above query should store table comments in metastore and the below query should not open any files.
metadataScheduler.process();
assertFileSystemAccesses(session, "SELECT * FROM system.metadata.table_comments WHERE schema_name = CURRENT_SCHEMA",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.exists"), tables)
Expand Down Expand Up @@ -1067,4 +1133,13 @@ private URL getResourceLocation(String resourcePath)
{
return getClass().getClassLoader().getResource(resourcePath);
}

/**
 * Strips the cached {@code trino_last_transaction_version} property from the
 * metastore entry of the given table, forcing Trino to re-read the Delta
 * transaction log on the next metadata access.
 */
private void removeLastTransactionVersionFromMetastore(String schemaName, String tableName)
{
    String cachedVersionKey = "trino_last_transaction_version";
    Table existingTable = metastore.getTable(schemaName, tableName).orElseThrow();
    // Rebuild the table definition without the cached transaction-version parameter
    Table strippedTable = Table.builder(existingTable)
            .setParameters(filterKeys(existingTable.getParameters(), key -> !key.equals(cachedVersionKey)))
            .build();
    metastore.replaceTable(
            existingTable.getDatabaseName(),
            existingTable.getTableName(),
            strippedTable,
            buildInitialPrivilegeSet(existingTable.getOwner().orElseThrow()));
}
}

0 comments on commit e8b858a

Please sign in to comment.