Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store metadata when listing columns/comments in Delta Lake #23589

Merged
merged 1 commit into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -921,6 +921,7 @@ private RelationCommentMetadata getRelationCommentMetadata(ConnectorSession sess

TableSnapshot snapshot = getSnapshot(session, tableName, tableLocation, Optional.empty());
MetadataEntry metadata = transactionLogAccess.getMetadataEntry(session, snapshot);
enqueueUpdateInfo(session, table.getDatabaseName(), table.getTableName(), snapshot.getVersion(), metadata.getSchemaString(), Optional.ofNullable(metadata.getDescription()));
return RelationCommentMetadata.forRelation(tableName, Optional.ofNullable(metadata.getDescription()));
}
catch (RuntimeException e) {
Expand Down Expand Up @@ -994,12 +995,11 @@ public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession sessio
List<ColumnMetadata> columnsMetadata = metadataScheduler.getColumnsMetadata(table);
return Stream.of(TableColumnsMetadata.forTable(tableName, columnsMetadata));
}
// Don't store cache in streamTableColumns method for avoiding too many update calls

TableSnapshot snapshot = transactionLogAccess.loadSnapshot(session, tableName, tableLocation, Optional.empty());
MetadataEntry metadata = transactionLogAccess.getMetadataEntry(session, snapshot);
ProtocolEntry protocol = transactionLogAccess.getProtocolEntry(session, snapshot);
List<ColumnMetadata> columnMetadata = getTableColumnMetadata(metadata, protocol);
enqueueUpdateInfo(session, table.getDatabaseName(), table.getTableName(), snapshot.getVersion(), metadata.getSchemaString(), Optional.ofNullable(metadata.getDescription()));
return Stream.of(TableColumnsMetadata.forTable(tableName, columnMetadata));
}
catch (NotADeltaLakeTableException | IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import io.trino.Session;
import io.trino.SystemSessionProperties;
import io.trino.metastore.HiveMetastore;
import io.trino.metastore.Table;
import io.trino.plugin.deltalake.metastore.DeltaLakeTableMetadataScheduler;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
Expand All @@ -36,10 +39,12 @@
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.collect.Maps.filterKeys;
import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_LOCATION;
Expand All @@ -52,6 +57,7 @@
import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.TRANSACTION_LOG_JSON;
import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.TRINO_EXTENDED_STATS_JSON;
import static io.trino.plugin.deltalake.TestingDeltaLakeUtils.copyDirectoryContents;
import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet;
import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.lang.Math.toIntExact;
Expand All @@ -66,6 +72,7 @@ public class TestDeltaLakeFileOperations
{
private static final int MAX_PREFIXES_COUNT = 10;

private HiveMetastore metastore;
// TODO: Consider waiting for scheduled task completion instead of manual triggering
private DeltaLakeTableMetadataScheduler metadataScheduler;

Expand All @@ -85,6 +92,7 @@ protected QueryRunner createQueryRunner()
.addDeltaProperty("delta.metastore.store-table-metadata-threads", "0") // Use the same thread to make the test deterministic
.addDeltaProperty("delta.metastore.store-table-metadata-interval", "30m") // Use a large interval to avoid interference with the test
.build();
metastore = TestingDeltaLakeUtils.getConnectorService(queryRunner, HiveMetastoreFactory.class).createMetastore(Optional.empty());
metadataScheduler = TestingDeltaLakeUtils.getConnectorService(queryRunner, DeltaLakeTableMetadataScheduler.class);
return queryRunner;
}
Expand Down Expand Up @@ -693,6 +701,12 @@ public void testTableChangesFileSystemAccess()

@Test
public void testInformationSchemaColumns()
{
    // Cover both the cold path (cached metadata properties removed from the metastore)
    // and the warm path (properties still cached), in that order
    for (boolean removeCachedProperties : new boolean[] {true, false}) {
        testInformationSchemaColumns(removeCachedProperties);
    }
}

private void testInformationSchemaColumns(boolean removeCachedProperties)
{
for (int tables : Arrays.asList(3, MAX_PREFIXES_COUNT, MAX_PREFIXES_COUNT + 3)) {
String schemaName = "test_i_s_columns_schema" + randomNameSuffix();
Expand All @@ -708,12 +722,35 @@ public void testInformationSchemaColumns()
assertUpdate(session, "INSERT INTO test_select_i_s_columns" + i + " VALUES ('xyz', 12)", 1);

assertUpdate(session, "CREATE TABLE test_other_select_i_s_columns" + i + "(id varchar, age integer)"); // won't match the filter

if (removeCachedProperties) {
removeLastTransactionVersionFromMetastore(schemaName, "test_select_i_s_columns" + i);
removeLastTransactionVersionFromMetastore(schemaName, "test_other_select_i_s_columns" + i);
metadataScheduler.clear();
}
}

// Store table metadata in metastore for making the file access counts deterministic
metadataScheduler.process();

// Bulk retrieval
assertFileSystemAccesses(session, "SELECT * FROM information_schema.columns WHERE table_schema = CURRENT_SCHEMA", removeCachedProperties ?
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream"), tables * 2)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream"), tables * 2)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream"), tables * 2)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream"), tables)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream"), tables)
.build() :
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.exists"), tables)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.exists"), tables)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.exists"), tables)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.exists"), tables)
.build());

// Repeat bulk retrieval. The above query should store column definitions in metastore and the below query should not open any files.
metadataScheduler.process();
assertFileSystemAccesses(session, "SELECT * FROM information_schema.columns WHERE table_schema = CURRENT_SCHEMA",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.exists"), tables)
Expand Down Expand Up @@ -768,6 +805,12 @@ public void testInformationSchemaColumns()

@Test
public void testSystemMetadataTableComments()
{
    // Exercise the query first without cached table properties, then with them present
    for (boolean removeCachedProperties : new boolean[] {true, false}) {
        testSystemMetadataTableComments(removeCachedProperties);
    }
}

private void testSystemMetadataTableComments(boolean removeCachedProperties)
{
for (int tables : Arrays.asList(3, MAX_PREFIXES_COUNT, MAX_PREFIXES_COUNT + 3)) {
String schemaName = "test_s_m_table_comments" + randomNameSuffix();
Expand All @@ -783,12 +826,35 @@ public void testSystemMetadataTableComments()
assertUpdate(session, "INSERT INTO test_select_s_m_t_comments" + i + " VALUES ('xyz', 12)", 1);

assertUpdate(session, "CREATE TABLE test_other_select_s_m_t_comments" + i + "(id varchar, age integer)"); // won't match the filter

if (removeCachedProperties) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It took me a bit to understand the purpose of removeCachedProperties in the testSystemMetadataTableComments() method.

I now understand the purpose, though — we want to verify how the system.metadata.table_comments query behaves when the table comments are filled in one by one, as well as when they are added in bulk.

removeLastTransactionVersionFromMetastore(schemaName, "test_select_s_m_t_comments" + i);
removeLastTransactionVersionFromMetastore(schemaName, "test_other_select_s_m_t_comments" + i);
metadataScheduler.clear();
}
}

// Store table metadata in metastore for making the file access counts deterministic
metadataScheduler.process();

// Bulk retrieval
assertFileSystemAccesses(session, "SELECT * FROM system.metadata.table_comments WHERE schema_name = CURRENT_SCHEMA", removeCachedProperties ?
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream"), tables * 2)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream"), tables * 2)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream"), tables * 2)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream"), tables)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream"), tables)
.build() :
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.exists"), tables)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.exists"), tables)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.exists"), tables)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.exists"), tables)
.build());

// Repeat bulk retrieval. The above query should store table comments in metastore and the below query should not open any files.
metadataScheduler.process();
assertFileSystemAccesses(session, "SELECT * FROM system.metadata.table_comments WHERE schema_name = CURRENT_SCHEMA",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.exists"), tables)
Expand Down Expand Up @@ -1067,4 +1133,13 @@ private URL getResourceLocation(String resourcePath)
{
return getClass().getClassLoader().getResource(resourcePath);
}

// Strips the cached 'trino_last_transaction_version' parameter from the metastore
// table entry so subsequent metadata reads must consult the transaction log again
private void removeLastTransactionVersionFromMetastore(String schemaName, String tableName)
{
    Table existingTable = metastore.getTable(schemaName, tableName).orElseThrow();
    var prunedParameters = filterKeys(existingTable.getParameters(), key -> !key.equals("trino_last_transaction_version"));
    Table updatedTable = Table.builder(existingTable)
            .setParameters(prunedParameters)
            .build();
    metastore.replaceTable(
            existingTable.getDatabaseName(),
            existingTable.getTableName(),
            updatedTable,
            buildInitialPrivilegeSet(existingTable.getOwner().orElseThrow()));
}
}
Loading