feat(connector): Add support for AWS Glue Table and Column Statistics #26297

imjalpreet wants to merge 1 commit into prestodb:master
Conversation
Reviewer's Guide

This PR adds comprehensive AWS Glue table and column statistics support by introducing a pluggable GlueColumnStatisticsProvider abstraction (with enabled and disabled implementations), wiring in new executors and configuration flags, extending GlueHiveMetastore to delegate statistics operations (read and write), improving batch partition fetch and update logic, extending the updatePartitionStatistics API across implementations, and providing converter utilities and the necessary test updates.

Sequence diagram for updating partition statistics with column statistics in GlueHiveMetastore:

```mermaid
sequenceDiagram
    participant "Caller"
    participant "GlueHiveMetastore"
    participant "DefaultGlueColumnStatisticsProvider"
    participant "AWSGlueAsync"
    "Caller"->>"GlueHiveMetastore": updatePartitionStatistics(...)
    "GlueHiveMetastore"->>"DefaultGlueColumnStatisticsProvider": getPartitionColumnStatistics(partitions)
    "DefaultGlueColumnStatisticsProvider"->>"AWSGlueAsync": GetColumnStatisticsForPartition
    "AWSGlueAsync"-->>"DefaultGlueColumnStatisticsProvider": Partition column stats
    "GlueHiveMetastore"->>"AWSGlueAsync": batchUpdatePartitionAsync
    "GlueHiveMetastore"->>"DefaultGlueColumnStatisticsProvider": updatePartitionStatistics(updates)
    "DefaultGlueColumnStatisticsProvider"->>"AWSGlueAsync": UpdateColumnStatisticsForPartition
    "DefaultGlueColumnStatisticsProvider"->>"AWSGlueAsync": DeleteColumnStatisticsForPartition (if needed)
    "AWSGlueAsync"-->>"DefaultGlueColumnStatisticsProvider": Update/Delete result
    "GlueHiveMetastore"-->>"Caller": Done
```
ER diagram for GlueHiveMetastoreConfig statistics-related properties:

```mermaid
erDiagram
    GLUE_HIVE_METASTORE_CONFIG {
        bool columnStatisticsEnabled
        int readStatisticsThreads
        int writeStatisticsThreads
    }
    GLUE_HIVE_METASTORE_CONFIG ||--o| GLUE_HIVE_METASTORE : "configures"
```
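These three knobs would typically surface as catalog configuration properties. The property names below are illustrative assumptions based on the existing Glue property naming convention; the exact names are defined by the `@Config` annotations in GlueHiveMetastoreConfig:

```properties
# Illustrative property names -- verify against GlueHiveMetastoreConfig
hive.metastore.glue.column-statistics-enabled=true
hive.metastore.glue.read-statistics-threads=5
hive.metastore.glue.write-statistics-threads=5
```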
Class diagram for new and updated Glue column statistics support:

```mermaid
classDiagram
    class GlueHiveMetastore {
        - GlueColumnStatisticsProvider columnStatisticsProvider
        - boolean enableColumnStatistics
        + getSupportedColumnStatistics(type)
        + getTableStatistics(...)
        + getPartitionStatistics(...)
        + updateTableStatistics(...)
        + updatePartitionStatistics(...)
    }
    class GlueColumnStatisticsProvider {
        <<interface>>
        + getSupportedColumnStatistics(type)
        + getTableColumnStatistics(table)
        + getPartitionColumnStatistics(partitions)
        + updateTableColumnStatistics(table, columnStatistics)
        + updatePartitionStatistics(partitionStatisticsUpdates)
    }
    class DefaultGlueColumnStatisticsProvider {
        + getSupportedColumnStatistics(type)
        + getTableColumnStatistics(table)
        + getPartitionColumnStatistics(partitions)
        + updateTableColumnStatistics(table, columnStatistics)
        + updatePartitionStatistics(partitionStatisticsUpdates)
    }
    class DisabledGlueColumnStatisticsProvider {
        + getSupportedColumnStatistics(type)
        + getTableColumnStatistics(table)
        + getPartitionColumnStatistics(partitions)
        + updateTableColumnStatistics(table, columnStatistics)
        + updatePartitionStatistics(partitionStatisticsUpdates)
    }
    GlueHiveMetastore --> GlueColumnStatisticsProvider
    GlueColumnStatisticsProvider <|.. DefaultGlueColumnStatisticsProvider
    GlueColumnStatisticsProvider <|.. DisabledGlueColumnStatisticsProvider
    class GlueHiveMetastoreConfig {
        + boolean columnStatisticsEnabled
        + int readStatisticsThreads
        + int writeStatisticsThreads
        + setColumnStatisticsEnabled(...)
        + setReadStatisticsThreads(...)
        + setWriteStatisticsThreads(...)
    }
    class GlueMetastoreModule {
        + createStatisticsReadExecutor(...)
        + createStatisticsWriteExecutor(...)
    }
    class ForGlueColumnStatisticsRead {
        <<annotation>>
    }
    class ForGlueColumnStatisticsWrite {
        <<annotation>>
    }
    GlueMetastoreModule --> ForGlueColumnStatisticsRead
    GlueMetastoreModule --> ForGlueColumnStatisticsWrite
    DefaultGlueColumnStatisticsProvider --> GlueMetastoreStats
    class GlueMetastoreStats {
        + getGetColumnStatisticsForTable()
        + getGetColumnStatisticsForPartition()
        + getUpdateColumnStatisticsForTable()
        + getDeleteColumnStatisticsForTable()
        + getUpdateColumnStatisticsForPartition()
        + getDeleteColumnStatisticsForPartition()
    }
```
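The pluggable provider abstraction in the class diagram can be sketched roughly as follows. This is a simplified illustration, not the actual Presto code: the real interface operates on Presto's Table, Partition, and HiveColumnStatistics types, which are replaced here with plain stand-ins.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Simplified sketch of the provider abstraction (real signatures use
// Presto's Table/Partition/HiveColumnStatistics types).
interface GlueColumnStatisticsProvider {
    Set<String> getSupportedColumnStatistics(String type);

    Map<String, Long> getTableColumnStatistics(String table);

    void updateTableColumnStatistics(String table, Map<String, Long> columnStatistics);
}

// When column statistics are disabled, reads return nothing and writes of
// non-empty statistics are rejected, so callers fail fast instead of
// silently dropping stats.
class DisabledGlueColumnStatisticsProvider implements GlueColumnStatisticsProvider {
    @Override
    public Set<String> getSupportedColumnStatistics(String type) {
        return Collections.emptySet();
    }

    @Override
    public Map<String, Long> getTableColumnStatistics(String table) {
        return Collections.emptyMap();
    }

    @Override
    public void updateTableColumnStatistics(String table, Map<String, Long> columnStatistics) {
        if (!columnStatistics.isEmpty()) {
            throw new UnsupportedOperationException("Glue metastore column statistics are disabled");
        }
    }
}
```

GlueHiveMetastore then depends only on the interface, and the module binds the default or disabled implementation based on the `columnStatisticsEnabled` flag.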
Co-authored-by: Deepak Majeti <majeti.deepak@gmail.com>
Co-authored-by: George Wang <fgwang7w@gmail.com>

Force-pushed from b04463a to c44ca2d.
@imjalpreet Thank you for the PR! I'll review it tomorrow. @agrawalreetika Will you be able to review it first?
```java
public static List<ColumnStatistics> toGlueColumnStatistics(
        Partition partition,
        Map<String, HiveColumnStatistics> trinoColumnStats,
```
Remove "trino". Same to other places and files too.
Is this file ported over from Trino? If yes, please add co-authored-by section in the PR and commit message.
Thank you, I missed this. Yes, this class and a subset of these changes are part of a Trino PR. Additionally, we have modified the implementation to have a more optimized version. I will also add the PR details.
Hey there - I've reviewed your changes - here's some feedback:
- In batchGetPartition the while‐loop can spin indefinitely if Glue keeps returning unprocessed keys but no partitions; consider adding a max retry or bail-out condition to avoid infinite loops.
- The hard-coded batch sizes (BATCH_GET_PARTITION_MAX_PAGE_SIZE, BATCH_UPDATE_PARTITION_MAX_PAGE_SIZE) may not match Glue’s actual limits or customer workloads—consider making them configurable or aligning them precisely with AWS Glue API docs.
- Verify that the new default implementation of updatePartitionStatistics(single-partition) correctly delegates to the multi-partition overload so existing code paths and third-party metastore implementations remain fully compatible.
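The first point above can be addressed with a bounded retry budget around the unprocessed-keys loop. The sketch below is illustrative only: the Glue client call is stubbed with a function, whereas the real code drives `AWSGlueAsync.batchGetPartition` with `BatchGetPartitionRequest` objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Sketch of a batchGetPartition loop that bails out if Glue keeps
// returning unprocessed keys without yielding any partitions.
final class BatchGetPartitions {
    static final int MAX_RETRIES_WITHOUT_PROGRESS = 10;

    record BatchResult(List<String> partitions, List<String> unprocessedKeys) {}

    static List<String> batchGetPartition(List<String> keys, Function<List<String>, BatchResult> fetcher) {
        List<String> partitions = new ArrayList<>();
        List<String> pending = new ArrayList<>(keys);
        int retriesWithoutProgress = 0;
        while (!pending.isEmpty()) {
            BatchResult result = fetcher.apply(pending);
            partitions.addAll(result.partitions());
            if (result.partitions().isEmpty()) {
                // no progress this round; charge it against the retry budget
                if (++retriesWithoutProgress >= MAX_RETRIES_WITHOUT_PROGRESS) {
                    throw new IllegalStateException("Glue keeps returning unprocessed partition keys without progress");
                }
            }
            else {
                retriesWithoutProgress = 0;
            }
            pending = new ArrayList<>(result.unprocessedKeys());
        }
        return partitions;
    }
}
```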
## Individual Comments
### Comment 1
<location> `presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueHiveMetastore.java:437-438` </location>
<code_context>
- if (!updatedStatistics.getColumnStatistics().isEmpty()) {
- throw new PrestoException(NOT_SUPPORTED, "Glue metastore does not support column level statistics");
- }
+ Iterables.partition(updates.entrySet(), BATCH_CREATE_PARTITION_MAX_PAGE_SIZE).forEach(partitionUpdates ->
+ updatePartitionStatisticsBatch(metastoreContext, databaseName, tableName, partitionUpdates.stream().collect(toImmutableMap(Entry::getKey, Entry::getValue))));
+ }
</code_context>
<issue_to_address>
**suggestion:** Partition statistics update batching uses BATCH_CREATE_PARTITION_MAX_PAGE_SIZE for both create and update.
Use BATCH_UPDATE_PARTITION_MAX_PAGE_SIZE for update batching to match the update API's limits.
```suggestion
Iterables.partition(updates.entrySet(), BATCH_UPDATE_PARTITION_MAX_PAGE_SIZE).forEach(partitionUpdates ->
updatePartitionStatisticsBatch(metastoreContext, databaseName, tableName, partitionUpdates.stream().collect(toImmutableMap(Entry::getKey, Entry::getValue))));
```
</issue_to_address>
### Comment 2
<location> `presto-hive/src/test/java/com/facebook/presto/hive/metastore/glue/TestHiveClientGlueMetastore.java:178-181` </location>
<code_context>
HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig), ImmutableSet.of(), hiveClientConfig);
HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication());
- GlueHiveMetastoreConfig glueConfig = new GlueHiveMetastoreConfig();
+ GlueHiveMetastoreConfig glueConfig = new GlueHiveMetastoreConfig().setColumnStatisticsEnabled(true);
glueConfig.setDefaultWarehouseDir(tempDir.toURI().toString());
- return new GlueHiveMetastore(hdfsEnvironment, glueConfig, executor);
+ return new GlueHiveMetastore(hdfsEnvironment, glueConfig, executor, executor, executor);
}
</code_context>
<issue_to_address>
**suggestion (testing):** Test setup enables column statistics, but lacks direct tests for Glue column statistics behavior.
Please add or update tests to directly verify Glue column statistics functionality, including reading, writing, error handling, and propagation through the metastore.
Suggested implementation:
```java
@Test
public void testGlueColumnStatisticsWriteAndRead() throws Exception {
// Setup: create table and column statistics
String databaseName = "test_db";
String tableName = "test_table";
String columnName = "test_column";
HiveColumnStatistics stats = HiveColumnStatistics.builder()
.setNullsCount(1L)
.setDistinctValuesCount(2L)
.setMaxValue(Optional.of("z"))
.setMinValue(Optional.of("a"))
.build();
// Create database and table
glueMetastore.createDatabase(new HiveDatabase(databaseName, "owner", "location", ImmutableMap.of()));
glueMetastore.createTable(new HiveTable(databaseName, tableName, "owner", "location", ImmutableList.of(), ImmutableMap.of()));
// Write column statistics
glueMetastore.updateTableColumnStatistics(databaseName, tableName, columnName, stats);
// Read column statistics
Optional<HiveColumnStatistics> readStats = glueMetastore.getTableColumnStatistics(databaseName, tableName, ImmutableList.of(columnName)).get(columnName);
assertTrue(readStats.isPresent(), "Column statistics should be present");
assertEquals(readStats.get().getNullsCount(), stats.getNullsCount(), "Nulls count should match");
assertEquals(readStats.get().getDistinctValuesCount(), stats.getDistinctValuesCount(), "Distinct values count should match");
assertEquals(readStats.get().getMaxValue(), stats.getMaxValue(), "Max value should match");
assertEquals(readStats.get().getMinValue(), stats.getMinValue(), "Min value should match");
}
@Test
public void testGlueColumnStatisticsErrorHandling() {
String databaseName = "nonexistent_db";
String tableName = "nonexistent_table";
String columnName = "nonexistent_column";
// Attempt to read statistics for non-existent table/column
try {
glueMetastore.getTableColumnStatistics(databaseName, tableName, ImmutableList.of(columnName));
fail("Expected exception for non-existent table/column");
} catch (Exception e) {
// Expected: verify error propagation
assertTrue(e.getMessage().contains("not found") || e.getMessage().contains("does not exist"));
}
}
@Test
public void testGlueColumnStatisticsPropagation() throws Exception {
String databaseName = "propagation_db";
String tableName = "propagation_table";
String columnName = "propagation_column";
HiveColumnStatistics stats = HiveColumnStatistics.builder()
.setNullsCount(5L)
.setDistinctValuesCount(10L)
.build();
glueMetastore.createDatabase(new HiveDatabase(databaseName, "owner", "location", ImmutableMap.of()));
glueMetastore.createTable(new HiveTable(databaseName, tableName, "owner", "location", ImmutableList.of(), ImmutableMap.of()));
glueMetastore.updateTableColumnStatistics(databaseName, tableName, columnName, stats);
// Simulate propagation: update stats and verify new value
HiveColumnStatistics updatedStats = HiveColumnStatistics.builder()
.setNullsCount(7L)
.setDistinctValuesCount(12L)
.build();
glueMetastore.updateTableColumnStatistics(databaseName, tableName, columnName, updatedStats);
Optional<HiveColumnStatistics> readStats = glueMetastore.getTableColumnStatistics(databaseName, tableName, ImmutableList.of(columnName)).get(columnName);
assertTrue(readStats.isPresent(), "Column statistics should be present after update");
assertEquals(readStats.get().getNullsCount(), updatedStats.getNullsCount(), "Updated nulls count should match");
assertEquals(readStats.get().getDistinctValuesCount(), updatedStats.getDistinctValuesCount(), "Updated distinct values count should match");
}
```
- Ensure that the `glueMetastore` instance is properly initialized and available in the test class.
- If the test database/table creation or statistics update methods differ in your codebase, adjust the method calls accordingly.
- You may need to import relevant classes such as `HiveColumnStatistics`, `ImmutableList`, and assertion methods.
- If you use a different assertion library, replace `assertTrue`, `assertEquals`, and `fail` with your project's equivalents.
</issue_to_address>
### Comment 3
<location> `presto-hive/src/test/java/com/facebook/presto/hive/metastore/glue/TestHiveClientGlueMetastore.java:201-204` </location>
<code_context>
-
- @Override
- public void testUpdatePartitionColumnStatistics()
+ public void testUpdateTableColumnStatisticsEmptyOptionalFields() throws Exception
{
- // column statistics are not supported by Glue
+ // this test expects consistency between written and read stats but this is not provided by glue at the moment
+ // when writing empty min/max statistics glue will return 0 to the readers
+ // in order to avoid incorrect data we skip writes for statistics with min/max = null
}
</code_context>
<issue_to_address>
**suggestion (testing):** Edge case for empty min/max statistics is acknowledged but not tested.
Consider adding a test that verifies Glue's handling of empty min/max statistics, ensuring the system responds correctly and helping to catch future regressions if Glue's behavior changes.
Suggested implementation:
```java
@Test
public void testGlueReturnsZeroForEmptyMinMaxStatistics() throws Exception
{
// Setup: create a table and column statistics with null min/max
String databaseName = "test_db";
String tableName = "test_table";
String columnName = "test_column";
// Create table and column if necessary (assume helper methods exist)
createTestTable(databaseName, tableName, columnName);
// Write column statistics with null min/max
HiveColumnStatistics statsWithNullMinMax = HiveColumnStatistics.builder()
.setMin(null)
.setMax(null)
.setNullsCount(0)
.setDistinctValuesCount(0)
.build();
glueMetastore.updateTableColumnStatistics(databaseName, tableName, columnName, statsWithNullMinMax);
// Read back statistics
Optional<HiveColumnStatistics> readStats = glueMetastore.getTableColumnStatistics(databaseName, tableName, columnName);
// Assert that Glue returns 0 for min/max
assertTrue(readStats.isPresent(), "Statistics should be present");
assertEquals(readStats.get().getMin(), 0, "Glue should return 0 for min when written as null");
assertEquals(readStats.get().getMax(), 0, "Glue should return 0 for max when written as null");
}
```
- You may need to implement or adjust helper methods like `createTestTable` and ensure `glueMetastore` is properly initialized for the test.
- Adjust the builder and assertion logic to match your actual `HiveColumnStatistics` API and types.
- If your statistics type is not integer, update the expected value and type accordingly.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
@imjalpreet could you please add Trino cherry-pick commits as well, whichever is relevant?
This PR needs some implementation changes to work with AWS SDK v2, as we are working on the upgrade: #26670. I will re-raise this feature as a separate PR with the updated implementation once the upgrade PR is merged.
Description
Add support for AWS Glue Table and Column Statistics
Motivation and Context
Based on trinodb/trino@f1bcfa7
Impact
Users will be able to utilize statistics and enable CBO when using AWS Glue as a metastore.