From e8b858a54cfb082ba645a4baee0987c39918e6ce Mon Sep 17 00:00:00 2001
From: Yuya Ebihara
Date: Thu, 26 Sep 2024 15:48:03 +0900
Subject: [PATCH] Store metadata when listing columns/comments in Delta Lake

---
 .../plugin/deltalake/DeltaLakeMetadata.java |  4 +-
 .../TestDeltaLakeFileOperations.java        | 75 +++++++++++++++++++
 2 files changed, 77 insertions(+), 2 deletions(-)

diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
index b69c766682d4a..3af6f281c910d 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
@@ -921,6 +921,7 @@ private RelationCommentMetadata getRelationCommentMetadata(ConnectorSession sess
             TableSnapshot snapshot = getSnapshot(session, tableName, tableLocation, Optional.empty());
             MetadataEntry metadata = transactionLogAccess.getMetadataEntry(session, snapshot);
+            enqueueUpdateInfo(session, table.getDatabaseName(), table.getTableName(), snapshot.getVersion(), metadata.getSchemaString(), Optional.ofNullable(metadata.getDescription()));
             return RelationCommentMetadata.forRelation(tableName, Optional.ofNullable(metadata.getDescription()));
         }
         catch (RuntimeException e) {
@@ -994,12 +995,11 @@ public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession sessio
                             List<ColumnMetadata> columnsMetadata = metadataScheduler.getColumnsMetadata(table);
                             return Stream.of(TableColumnsMetadata.forTable(tableName, columnsMetadata));
                         }
-                        // Don't store cache in streamTableColumns method for avoiding too many update calls
-
                         TableSnapshot snapshot = transactionLogAccess.loadSnapshot(session, tableName, tableLocation, Optional.empty());
                         MetadataEntry metadata = transactionLogAccess.getMetadataEntry(session, snapshot);
                         ProtocolEntry protocol = transactionLogAccess.getProtocolEntry(session, snapshot);
                         List<ColumnMetadata> columnMetadata = getTableColumnMetadata(metadata, protocol);
+                        enqueueUpdateInfo(session, table.getDatabaseName(), table.getTableName(), snapshot.getVersion(), metadata.getSchemaString(), Optional.ofNullable(metadata.getDescription()));
                         return Stream.of(TableColumnsMetadata.forTable(tableName, columnMetadata));
                     }
                     catch (NotADeltaLakeTableException | IOException e) {
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java
index 80d94dc588106..7460919d0bfae 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java
@@ -20,7 +20,10 @@
 import io.opentelemetry.sdk.trace.data.SpanData;
 import io.trino.Session;
 import io.trino.SystemSessionProperties;
+import io.trino.metastore.HiveMetastore;
+import io.trino.metastore.Table;
 import io.trino.plugin.deltalake.metastore.DeltaLakeTableMetadataScheduler;
+import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
 import io.trino.testing.AbstractTestQueryFramework;
 import io.trino.testing.DistributedQueryRunner;
 import io.trino.testing.QueryRunner;
@@ -36,10 +39,12 @@
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static com.google.common.base.MoreObjects.firstNonNull;
+import static com.google.common.collect.Maps.filterKeys;
 import static com.google.common.io.MoreFiles.deleteRecursively;
 import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
 import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_LOCATION;
@@ -52,6 +57,7 @@
 import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.TRANSACTION_LOG_JSON;
 import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.TRINO_EXTENDED_STATS_JSON;
 import static io.trino.plugin.deltalake.TestingDeltaLakeUtils.copyDirectoryContents;
+import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet;
 import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
 import static io.trino.testing.TestingNames.randomNameSuffix;
 import static java.lang.Math.toIntExact;
@@ -66,6 +72,7 @@ public class TestDeltaLakeFileOperations
 {
     private static final int MAX_PREFIXES_COUNT = 10;
 
+    private HiveMetastore metastore;
     // TODO: Consider waiting for scheduled task completion instead of manual triggering
     private DeltaLakeTableMetadataScheduler metadataScheduler;
 
@@ -85,6 +92,7 @@ protected QueryRunner createQueryRunner()
                 .addDeltaProperty("delta.metastore.store-table-metadata-threads", "0") // Use the same thread to make the test deterministic
                 .addDeltaProperty("delta.metastore.store-table-metadata-interval", "30m") // Use a large interval to avoid interference with the test
                 .build();
+        metastore = TestingDeltaLakeUtils.getConnectorService(queryRunner, HiveMetastoreFactory.class).createMetastore(Optional.empty());
         metadataScheduler = TestingDeltaLakeUtils.getConnectorService(queryRunner, DeltaLakeTableMetadataScheduler.class);
         return queryRunner;
     }
@@ -693,6 +701,12 @@ public void testTableChangesFileSystemAccess()
 
     @Test
     public void testInformationSchemaColumns()
+    {
+        testInformationSchemaColumns(true);
+        testInformationSchemaColumns(false);
+    }
+
+    private void testInformationSchemaColumns(boolean removeCachedProperties)
     {
         for (int tables : Arrays.asList(3, MAX_PREFIXES_COUNT, MAX_PREFIXES_COUNT + 3)) {
             String schemaName = "test_i_s_columns_schema" + randomNameSuffix();
@@ -708,12 +722,35 @@ public void testInformationSchemaColumns()
                 assertUpdate(session, "INSERT INTO test_select_i_s_columns" + i + " VALUES ('xyz', 12)", 1);
                 assertUpdate(session, "CREATE TABLE test_other_select_i_s_columns" + i + "(id varchar, age integer)"); // won't match the filter
+
+                if (removeCachedProperties) {
+                    removeLastTransactionVersionFromMetastore(schemaName, "test_select_i_s_columns" + i);
+                    removeLastTransactionVersionFromMetastore(schemaName, "test_other_select_i_s_columns" + i);
+                    metadataScheduler.clear();
+                }
             }
             // Store table metadata in metastore for making the file access counts deterministic
             metadataScheduler.process();
             // Bulk retrieval
+            assertFileSystemAccesses(session, "SELECT * FROM information_schema.columns WHERE table_schema = CURRENT_SCHEMA", removeCachedProperties ?
+                    ImmutableMultiset.<FileOperation>builder()
+                            .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream"), tables * 2)
+                            .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream"), tables * 2)
+                            .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream"), tables * 2)
+                            .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream"), tables)
+                            .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream"), tables)
+                            .build() :
+                    ImmutableMultiset.<FileOperation>builder()
+                            .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.exists"), tables)
+                            .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.exists"), tables)
+                            .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.exists"), tables)
+                            .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.exists"), tables)
+                            .build());
+
+            // Repeat bulk retrieval. The above query should store column definitions in metastore and the below query should not open any files.
+            metadataScheduler.process();
             assertFileSystemAccesses(session, "SELECT * FROM information_schema.columns WHERE table_schema = CURRENT_SCHEMA",
                     ImmutableMultiset.<FileOperation>builder()
                             .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.exists"), tables)
                             .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.exists"), tables)
                             .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.exists"), tables)
                             .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.exists"), tables)
                             .build());
@@ -768,6 +805,12 @@
 
     @Test
     public void testSystemMetadataTableComments()
+    {
+        testSystemMetadataTableComments(true);
+        testSystemMetadataTableComments(false);
+    }
+
+    private void testSystemMetadataTableComments(boolean removeCachedProperties)
     {
         for (int tables : Arrays.asList(3, MAX_PREFIXES_COUNT, MAX_PREFIXES_COUNT + 3)) {
             String schemaName = "test_s_m_table_comments" + randomNameSuffix();
@@ -783,12 +826,35 @@ public void testSystemMetadataTableComments()
                 assertUpdate(session, "INSERT INTO test_select_s_m_t_comments" + i + " VALUES ('xyz', 12)", 1);
                 assertUpdate(session, "CREATE TABLE test_other_select_s_m_t_comments" + i + "(id varchar, age integer)"); // won't match the filter
+
+                if (removeCachedProperties) {
+                    removeLastTransactionVersionFromMetastore(schemaName, "test_select_s_m_t_comments" + i);
+                    removeLastTransactionVersionFromMetastore(schemaName, "test_other_select_s_m_t_comments" + i);
+                    metadataScheduler.clear();
+                }
             }
             // Store table metadata in metastore for making the file access counts deterministic
             metadataScheduler.process();
             // Bulk retrieval
+            assertFileSystemAccesses(session, "SELECT * FROM system.metadata.table_comments WHERE schema_name = CURRENT_SCHEMA", removeCachedProperties ?
+                    ImmutableMultiset.<FileOperation>builder()
+                            .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream"), tables * 2)
+                            .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream"), tables * 2)
+                            .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream"), tables * 2)
+                            .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream"), tables)
+                            .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream"), tables)
+                            .build() :
+                    ImmutableMultiset.<FileOperation>builder()
+                            .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.exists"), tables)
+                            .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.exists"), tables)
+                            .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.exists"), tables)
+                            .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.exists"), tables)
+                            .build());
+
+            // Repeat bulk retrieval. The above query should store table comments in metastore and the below query should not open any files.
+            metadataScheduler.process();
             assertFileSystemAccesses(session, "SELECT * FROM system.metadata.table_comments WHERE schema_name = CURRENT_SCHEMA",
                     ImmutableMultiset.<FileOperation>builder()
                             .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.exists"), tables)
                             .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.exists"), tables)
                             .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.exists"), tables)
                             .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.exists"), tables)
                             .build());
@@ -1067,4 +1133,13 @@ private URL getResourceLocation(String resourcePath)
     {
         return getClass().getClassLoader().getResource(resourcePath);
     }
+
+    private void removeLastTransactionVersionFromMetastore(String schemaName, String tableName)
+    {
+        Table table = metastore.getTable(schemaName, tableName).orElseThrow();
+        Table newMetastoreTable = Table.builder(table)
+                .setParameters(filterKeys(table.getParameters(), key -> !key.equals("trino_last_transaction_version")))
+                .build();
+        metastore.replaceTable(table.getDatabaseName(), table.getTableName(), newMetastoreTable, buildInitialPrivilegeSet(table.getOwner().orElseThrow()));
+    }
 }
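Note on the test mechanics (editorial sketch, not part of the patch): the removeCachedProperties pass strips the trino_last_transaction_version parameter from each table's metastore entry, which forces the listing queries back onto the Delta transaction log (the InputFile.newStream accesses asserted above); the listing then re-enqueues the metadata via enqueueUpdateInfo, and the next metadataScheduler.process() is expected to store it again so the repeated query opens no files. A hypothetical helper in the same test class could make that state visible; the helper name is illustrative only:

    // Editorial sketch only, not part of the patch: reports whether the Delta table's
    // metadata is currently cached in the metastore, i.e. whether the
    // "trino_last_transaction_version" parameter that removeLastTransactionVersionFromMetastore
    // strips is present on the Table entry.
    private boolean isTableMetadataCached(String schemaName, String tableName)
    {
        Table table = metastore.getTable(schemaName, tableName).orElseThrow();
        return table.getParameters().containsKey("trino_last_transaction_version");
    }

With removeCachedProperties == true one would expect this to return false right after the parameter is stripped, and true again once the bulk listing query has run and metadataScheduler.process() has flushed the queued update.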