feat(connector): Upgrade AWS Glue to AWS SDK v2 and Migrate to MetricPublisher #26670
yingsu00 merged 1 commit into prestodb:master
Conversation
Reviewer's Guide

Upgrades the Glue metastore integration from AWS SDK v1 to v2, refactors the Glue/HMS converters and tests to the v2 model/builder style, introduces a generic AWS SDK v2 metrics publisher and wires it into GlueMetastoreStats, updates configuration around Glue/STS, and replaces v1 dependencies in the Hive/Hive-metastore modules with the corresponding AWS SDK v2 artifacts.

Sequence diagram for Glue getDatabase call using AWS SDK v2 async client and sync helper

sequenceDiagram
actor QueryEngine
participant HiveClient as HiveConnector
participant Metastore as GlueHiveMetastore
participant Stats as GlueCatalogApiStats
participant GlueClient as GlueAsyncClient
participant AwsMetrics as AwsSdkClientRequestMetricsPublisher
QueryEngine->>HiveClient: execute query
HiveClient->>Metastore: getDatabase(metastoreContext, databaseName)
Metastore->>Metastore: awsSyncRequest(glueClient.getDatabase, GetDatabaseRequest, stats.getGetDatabase())
activate Metastore
Metastore->>Stats: record(() -> ...)
activate Stats
Stats-->>Metastore: wraps supplier
deactivate Stats
Metastore->>GlueClient: getDatabase(GetDatabaseRequest)
activate GlueClient
GlueClient-->>Metastore: CompletableFuture~GetDatabaseResponse~
deactivate GlueClient
Metastore->>Metastore: join() on CompletableFuture
note over Metastore,GlueClient: AWS SDK v2 internally publishes metrics
GlueClient-)AwsMetrics: publish(MetricCollection)
activate AwsMetrics
AwsMetrics->>AwsMetrics: update AwsSdkClientStats counters and timers
deactivate AwsMetrics
Metastore-->>Metastore: GetDatabaseResponse
Metastore-->>HiveClient: Optional~Database~
deactivate Metastore
HiveClient-->>QueryEngine: database metadata
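The `awsSyncRequest` step in the diagram — submit an async call, record it against the per-API stats, block on the future, and unwrap `CompletionException` — can be sketched without any AWS dependency. `ApiStats` below is a simplified stand-in for `GlueCatalogApiStats`, not the real class:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import java.util.function.Supplier;

public class AwsSyncRequestSketch
{
    // Simplified stand-in for GlueCatalogApiStats: counts calls and failures
    static class ApiStats
    {
        long totalCalls;
        long totalFailures;

        <T> T record(Supplier<T> action)
        {
            totalCalls++;
            try {
                return action.get();
            }
            catch (RuntimeException e) {
                totalFailures++;
                throw e;
            }
        }
    }

    // Mirrors the shape of GlueHiveMetastore.awsSyncRequest: the join() happens
    // inside record(), so the recorded time covers the full wall-clock wait
    static <R, T> T awsSyncRequest(Function<R, CompletableFuture<T>> submission, R request, ApiStats stats)
    {
        try {
            return stats.record(() -> submission.apply(request).join());
        }
        catch (CompletionException e) {
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            throw new RuntimeException(cause);
        }
    }

    public static void main(String[] args)
    {
        ApiStats stats = new ApiStats();
        // Fake async client call that completes immediately
        String result = awsSyncRequest(
                name -> CompletableFuture.completedFuture("db:" + name),
                "sales",
                stats);
        System.out.println(result + " calls=" + stats.totalCalls); // prints "db:sales calls=1"
    }
}
```

The same shape generalizes to every Glue call in the class diagram below; only the request type and the stats instance change.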
Class diagram for updated GlueHiveMetastore and AWS SDK v2 integration

classDiagram
class GlueHiveMetastore {
-GlueMetastoreStats stats
-HdfsEnvironment hdfsEnvironment
-HdfsContext hdfsContext
-GlueAsyncClient glueClient
-Optional defaultDir
-String catalogId
-int partitionSegments
-Executor executor
+GlueHiveMetastore(HdfsEnvironment hdfsEnvironment, GlueHiveMetastoreConfig glueConfig, Executor executor)
-static GlueAsyncClient createAsyncGlueClient(GlueHiveMetastoreConfig config, MetricPublisher metricPublisher)
-static <R,T> T awsSyncRequest(Function~R,CompletableFuture~T~~ submission, R request, GlueCatalogApiStats stats)
-static <T> void awsSyncPaginatedRequest(SdkPublisher~T~ paginator, Consumer~T~ resultConsumer, GlueCatalogApiStats stats)
+Optional~Database~ getDatabase(MetastoreContext metastoreContext, String databaseName)
+List~String~ getAllDatabases(MetastoreContext metastoreContext)
+Optional~Table~ getTable(MetastoreContext metastoreContext, String databaseName, String tableName)
+Optional~List~String~~ getAllTables(MetastoreContext metastoreContext, String databaseName)
+Optional~List~String~~ getAllViews(MetastoreContext metastoreContext, String databaseName)
+void createDatabase(MetastoreContext metastoreContext, Database database)
+void dropDatabase(MetastoreContext metastoreContext, String databaseName)
+void renameDatabase(MetastoreContext metastoreContext, String databaseName, String newDatabaseName)
+MetastoreOperationResult createTable(MetastoreContext metastoreContext, Table table, PrincipalPrivileges privileges)
+void dropTable(MetastoreContext metastoreContext, String databaseName, String tableName)
+MetastoreOperationResult replaceTable(MetastoreContext metastoreContext, String databaseName, String tableName, Table newTable)
+MetastoreOperationResult persistTable(MetastoreContext metastoreContext, String databaseName, String tableName, Table newTable, PartitionStatistics updatedStatistics)
+MetastoreOperationResult addColumn(...)
+MetastoreOperationResult renameColumn(...)
+MetastoreOperationResult dropColumn(...)
+Optional~Partition~ getPartition(MetastoreContext metastoreContext, String databaseName, String tableName, List~String~ partitionValues)
+List~Partition~ getPartitions(String databaseName, String tableName, String expression, Segment segment)
+MetastoreOperationResult addPartitions(...)
+MetastoreOperationResult alterPartition(...)
}
class GlueMetastoreStats {
-GlueCatalogApiStats getDatabases
-GlueCatalogApiStats getDatabase
-GlueCatalogApiStats getTables
-GlueCatalogApiStats getTable
-GlueCatalogApiStats createDatabase
-GlueCatalogApiStats deleteDatabase
-GlueCatalogApiStats updateDatabase
-GlueCatalogApiStats createTable
-GlueCatalogApiStats deleteTable
-GlueCatalogApiStats updateTable
-GlueCatalogApiStats getPartitions
-GlueCatalogApiStats getPartition
-AwsSdkClientStats awsSdkClientStats
+GlueCatalogApiStats getGetDatabases()
+GlueCatalogApiStats getGetDatabase()
+GlueCatalogApiStats getGetTables()
+GlueCatalogApiStats getGetTable()
+GlueCatalogApiStats getCreateDatabase()
+GlueCatalogApiStats getDeleteDatabase()
+GlueCatalogApiStats getUpdateDatabase()
+GlueCatalogApiStats getCreateTable()
+GlueCatalogApiStats getDeleteTable()
+GlueCatalogApiStats getUpdateTable()
+GlueCatalogApiStats getGetPartitions()
+GlueCatalogApiStats getGetPartition()
+AwsSdkClientStats getAwsSdkClientStats()
+MetricPublisher newRequestMetricPublisher()
}
class GlueCatalogApiStats {
-TimeStat time
-CounterStat totalFailures
+void record(Runnable action)
+<T> T record(Supplier~T~ action)
+void recordAsync(long executionTimeNanos, boolean failed)
}
class AwsSdkClientStats {
-CounterStat awsRequestCount
-CounterStat awsRetryCount
-CounterStat awsThrottleExceptions
-TimeStat awsServiceCallDuration
-TimeStat awsApiCallDuration
-TimeStat awsBackoffDelayDuration
+CounterStat getAwsRequestCount()
+CounterStat getAwsRetryCount()
+CounterStat getAwsThrottleExceptions()
+TimeStat getAwsServiceCallDuration()
+TimeStat getAwsApiCallDuration()
+TimeStat getAwsBackoffDelayDuration()
+AwsSdkClientRequestMetricsPublisher newRequestMetricsPublisher()
}
class AwsSdkClientRequestMetricsPublisher {
-AwsSdkClientStats stats
+AwsSdkClientRequestMetricsPublisher(AwsSdkClientStats stats)
+void publish(MetricCollection metricCollection)
+void close()
}
class GlueHiveMetastoreConfig {
-Optional~String~ glueRegion
-Optional~String~ glueEndpointUrl
-Optional~String~ glueStsRegion
-Optional~String~ glueStsEndpointUrl
-int maxGlueErrorRetries
-int maxGlueConnections
-Optional~String~ defaultWarehouseDir
-Optional~String~ catalogId
+Optional~String~ getGlueRegion()
+GlueHiveMetastoreConfig setGlueRegion(String region)
+Optional~String~ getGlueEndpointUrl()
+GlueHiveMetastoreConfig setGlueEndpointUrl(String url)
+Optional~String~ getGlueStsRegion()
+GlueHiveMetastoreConfig setGlueStsRegion(String region)
+Optional~String~ getGlueStsEndpointUrl()
+GlueHiveMetastoreConfig setGlueStsEndpointUrl(String url)
+int getMaxGlueErrorRetries()
+int getMaxGlueConnections()
}
class GlueAsyncClient {
+CompletableFuture~GetDatabaseResponse~ getDatabase(GetDatabaseRequest request)
+CompletableFuture~GetTableResponse~ getTable(GetTableRequest request)
+GetDatabasesIterable getDatabasesPaginator(GetDatabasesRequest request)
+GetTablesIterable getTablesPaginator(GetTablesRequest request)
+GetPartitionsIterable getPartitionsPaginator(GetPartitionsRequest request)
+CompletableFuture~BatchGetPartitionResponse~ batchGetPartition(BatchGetPartitionRequest request)
+CompletableFuture~BatchCreatePartitionResponse~ batchCreatePartition(BatchCreatePartitionRequest request)
}
GlueHiveMetastore --> GlueMetastoreStats : composes
GlueHiveMetastore --> GlueHiveMetastoreConfig : uses
GlueHiveMetastore --> GlueAsyncClient : uses
GlueHiveMetastore ..> GlueCatalogApiStats : uses
GlueMetastoreStats --> GlueCatalogApiStats : aggregates
GlueMetastoreStats --> AwsSdkClientStats : aggregates
AwsSdkClientStats --> AwsSdkClientRequestMetricsPublisher : creates
GlueHiveMetastore ..> MetricPublisher : passes to client
GlueHiveMetastore ..> AwsSdkClientRequestMetricsPublisher : metrics via publisher
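The publisher's job in the diagram above — fold a metric collection into counters and timers — can be illustrated with an SDK-free sketch. The flattened map below is a hypothetical stand-in for the SDK's `MetricCollection`, and the metric names mirror (but are not) the real `CoreMetric` keys:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MetricsPublisherSketch
{
    // Hypothetical flattened view of a MetricCollection: metric name -> recorded
    // duration samples. Returns the number of samples folded into the timers.
    static long publishDurations(Map<String, List<Duration>> metrics, Map<String, Long> timersMillis)
    {
        long recorded = 0;
        for (String name : List.of("ServiceCallDuration", "ApiCallDuration", "BackoffDelayDuration")) {
            for (Duration d : metrics.getOrDefault(name, List.of())) {
                // In the real publisher each name feeds a dedicated TimeStat;
                // here a plain map of millisecond totals stands in
                timersMillis.merge(name, d.toMillis(), Long::sum);
                recorded++;
            }
        }
        return recorded;
    }

    public static void main(String[] args)
    {
        Map<String, List<Duration>> metrics = Map.of(
                "ApiCallDuration", List.of(Duration.ofMillis(120)),
                "ServiceCallDuration", List.of(Duration.ofMillis(100)));
        Map<String, Long> timers = new HashMap<>();
        System.out.println(publishDurations(metrics, timers) + " " + timers.get("ApiCallDuration")); // prints "2 120"
    }
}
```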
Class diagram for updated Glue input/output converters using AWS SDK v2 models

classDiagram
class GlueInputConverter {
<<utility>>
+static DatabaseInput convertDatabase(Database database)
+static TableInput convertTable(Table table)
+static TableInput toTableInput(software.amazon.awssdk.services.glue.model.Table table)
+static PartitionInput convertPartition(PartitionWithStatistics partitionWithStatistics)
+static PartitionInput convertPartition(Partition partition)
-static StorageDescriptor convertStorage(Storage storage, List~Column~ columns)
+static software.amazon.awssdk.services.glue.model.Column convertColumn(Column prestoColumn)
}
class GlueToPrestoConverter {
<<utility>>
+static Database convertDatabase(software.amazon.awssdk.services.glue.model.Database glueDb)
+static Table convertTable(software.amazon.awssdk.services.glue.model.Table glueTable, String dbName)
}
class GluePartitionConverter {
<<static nested>>
-Function~List~software.amazon.awssdk.services.glue.model.Column~~, List~Column~~ columnsConverter
-Function~Map~String,String~~, Map~String,String~~ parametersConverter
-StorageConverter storageConverter
-String databaseName
-String tableName
+GluePartitionConverter(String databaseName, String tableName)
+Partition apply(software.amazon.awssdk.services.glue.model.Partition gluePartition)
}
class StorageConverter {
<<static nested>>
-Function~List~String~~, List~String~~ bucketColumns
-Function~List~software.amazon.awssdk.services.glue.model.Order~~, List~SortingColumn~~ sortColumns
-UnaryOperator~Optional~HiveBucketProperty~~ bucketProperty
-Function~Map~String,String~~, Map~String,String~~ serdeParametersConverter
-Function~Map~String,String~~, Map~String,String~~ partitionParametersConverter
-StorageFormatConverter storageFormatConverter
+void setConvertedStorage(StorageDescriptor sd, Storage.Builder storageBuilder)
-Optional~HiveBucketProperty~ createBucketProperty(StorageDescriptor sd)
-static List~SortingColumn~ createSortingColumns(List~software.amazon.awssdk.services.glue.model.Order~ sortColumns)
}
class StorageFormatConverter {
<<static nested>>
-Function~String,String~ serializationLib
-Function~String,String~ inputFormat
-Function~String,String~ outputFormat
+StorageFormat createStorageFormat(SerDeInfo serdeInfo, StorageDescriptor storageDescriptor)
}
class DatabaseInput {
+builder()
}
class TableInput {
+builder()
+TableInput.Builder toBuilder()
}
class PartitionInput {
+builder()
+PartitionInput.Builder toBuilder()
}
class StorageDescriptor {
+builder()
+StorageDescriptor.Builder toBuilder()
}
class SerDeInfo {
+builder()
}
class Column {
}
class Table {
}
class Database {
}
class Partition {
}
GlueInputConverter ..> DatabaseInput : builds
GlueInputConverter ..> TableInput : builds
GlueInputConverter ..> PartitionInput : builds
GlueInputConverter ..> StorageDescriptor : builds
GlueInputConverter ..> SerDeInfo : builds
GlueInputConverter ..> software.amazon.awssdk.services.glue.model.Column : builds
GlueInputConverter ..> Table : reads
GlueInputConverter ..> Database : reads
GlueInputConverter ..> Partition : reads
GlueToPrestoConverter ..> GluePartitionConverter : uses
GlueToPrestoConverter ..> StorageConverter : uses
GlueToPrestoConverter ..> StorageFormatConverter : uses
GlueToPrestoConverter ..> software.amazon.awssdk.services.glue.model.Table : reads
GlueToPrestoConverter ..> software.amazon.awssdk.services.glue.model.Database : reads
GluePartitionConverter ..> Partition : builds
GluePartitionConverter ..> StorageConverter : uses
StorageConverter ..> StorageFormatConverter : uses
StorageConverter ..> StorageDescriptor : reads
StorageFormatConverter ..> StorageDescriptor : reads
StorageFormatConverter ..> SerDeInfo : reads
Class diagram for GlueCatalogApiStats async metrics and GlueStatsAsyncHandler

classDiagram
class GlueCatalogApiStats {
-TimeStat time
-CounterStat totalFailures
+void record(Runnable action)
+<T> T record(Supplier~T~ action)
+void recordAsync(long executionTimeNanos, boolean failed)
}
class GlueStatsAsyncHandler {
+GlueStatsAsyncHandler(GlueCatalogApiStats stats)
-GlueCatalogApiStats stats
-Stopwatch stopwatch
+void onError(Throwable e)
+void onSuccess(GlueResponse response)
}
class Stopwatch {
+static Stopwatch createStarted()
+long elapsed(TimeUnit unit)
}
class GlueResponse {
}
GlueStatsAsyncHandler --> GlueCatalogApiStats : updates
GlueStatsAsyncHandler ..> Stopwatch : measures
GlueStatsAsyncHandler ..> GlueResponse : consumes
GlueHiveMetastore ..> GlueStatsAsyncHandler : uses for async batch APIs
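The handler pattern in the diagram — start a stopwatch at construction and report elapsed time plus success/failure when the async call completes — can be sketched in plain Java. `ApiStats` here is a hypothetical stand-in for `GlueCatalogApiStats.recordAsync(long, boolean)`:

```java
public class GlueStatsAsyncHandlerSketch
{
    // Hypothetical stand-in for GlueCatalogApiStats
    static class ApiStats
    {
        long lastNanos = -1;
        long failures;

        void recordAsync(long executionTimeNanos, boolean failed)
        {
            lastNanos = executionTimeNanos;
            if (failed) {
                failures++;
            }
        }
    }

    // Mirrors the shape of GlueStatsAsyncHandler: the clock starts when the
    // handler is created, i.e. when the async request is submitted
    static class StatsAsyncHandler
    {
        private final ApiStats stats;
        private final long startNanos = System.nanoTime();

        StatsAsyncHandler(ApiStats stats)
        {
            this.stats = stats;
        }

        void onSuccess(Object response)
        {
            stats.recordAsync(System.nanoTime() - startNanos, false);
        }

        void onError(Throwable t)
        {
            stats.recordAsync(System.nanoTime() - startNanos, true);
        }
    }

    public static void main(String[] args)
    {
        ApiStats stats = new ApiStats();
        StatsAsyncHandler handler = new StatsAsyncHandler(stats);
        handler.onSuccess(new Object());
        System.out.println(stats.lastNanos >= 0 && stats.failures == 0); // prints "true"
    }
}
```

In the PR itself this handler is driven from a `whenComplete`-style callback on the `CompletableFuture` returned by the async Glue client.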
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueHiveMetastore.java:1208-1233` </location>
<code_context>
+ }
+ }
+
+ private static <T> void awsSyncPaginatedRequest(
+ SdkPublisher<T> paginator,
+ Consumer<T> resultConsumer,
+ GlueCatalogApiStats stats)
+ {
+ requireNonNull(paginator, "paginator is null");
+ requireNonNull(resultConsumer, "resultConsumer is null");
+
+ try {
+ CompletableFuture<Void> paginationFuture;
+ if (stats != null) {
+ paginationFuture = stats.record(() -> paginator.subscribe(resultConsumer));
+ }
+ else {
+ paginationFuture = paginator.subscribe(resultConsumer);
+ }
+
+ paginationFuture.join();
+ }
+ catch (CompletionException e) {
+ if (e.getCause() instanceof GlueException) {
+ throw (GlueException) e.getCause();
+ }
+ throw new PrestoException(HIVE_METASTORE_ERROR, e.getCause());
+ }
+ }
</code_context>
<issue_to_address>
**suggestion:** Consider using a single join point for pagination to keep exception handling consistent.
Currently, `stats.record` only wraps `paginator.subscribe(resultConsumer)`, so the recorded time excludes the `join()` duration. If you want stats to cover the full paginated request, including completion, move `join()` inside the `record` lambda (or call `subscribe(...).join()` inside it) so the timing matches the actual wall clock time of the request.
```suggestion
private static <T> void awsSyncPaginatedRequest(
SdkPublisher<T> paginator,
Consumer<T> resultConsumer,
GlueCatalogApiStats stats)
{
requireNonNull(paginator, "paginator is null");
requireNonNull(resultConsumer, "resultConsumer is null");
// Single join point so exception handling is consistent, and stats (when present)
// cover the full wall-clock time of the paginated request including completion.
Runnable paginationTask = () -> paginator.subscribe(resultConsumer).join();
try {
if (stats != null) {
// Record the duration of the entire paginated request, including join().
stats.record(() -> {
paginationTask.run();
return null;
});
}
else {
paginationTask.run();
}
}
catch (CompletionException e) {
if (e.getCause() instanceof GlueException) {
throw (GlueException) e.getCause();
}
throw new PrestoException(HIVE_METASTORE_ERROR, e.getCause());
}
}
```
</issue_to_address>
### Comment 2
<location> `presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueHiveMetastore.java:950-958` </location>
<code_context>
- .withPartitionsToGet(partitionValuesBatch), stats.getBatchGetPartitions().metricsAsyncHandler()));
+ List<PartitionValueList> partitionValuesBatch = mappedCopy(partitionNamesBatch, partitionName -> PartitionValueList.builder().values(toPartitionValues(partitionName)).build());
+
+ GlueStatsAsyncHandler asyncHandler = new GlueStatsAsyncHandler(stats.getBatchGetPartitions());
+
+ batchGetPartitionFutures.add(glueClient.batchGetPartition(BatchGetPartitionRequest.builder()
+ .catalogId(catalogId)
+ .databaseName(databaseName)
+ .tableName(tableName)
+ .partitionsToGet(partitionValuesBatch)
+ .build())
+ .whenCompleteAsync((response, exception) -> {
+ if (response != null) {
+ asyncHandler.onSuccess(response);
</code_context>
<issue_to_address>
**suggestion (performance):** Reconsider using whenCompleteAsync without an explicit executor for metrics-only callbacks.
These callbacks currently use `whenCompleteAsync` without an executor, so they run on `ForkJoinPool.commonPool`. Since they only record metrics and don’t block, `whenComplete` should be enough and avoids extra scheduling and common-pool contention:
```java
.whenComplete((response, exception) -> { ... })
```
If true async decoupling is still desired, pass a dedicated executor instead of relying on the common pool.
Suggested implementation:
```java
.build())
.whenComplete((response, exception) -> {
if (response != null) {
asyncHandler.onSuccess(response);
}
else if (exception != null) {
asyncHandler.onError(exception);
}
}));
```
If at some later point you decide you do want decoupled, explicit async execution for these metrics callbacks, introduce a dedicated `Executor` (e.g., a shared metrics executor in this class or injected from elsewhere) and change this call to `whenCompleteAsync(..., metricsExecutor)` instead of relying on the `ForkJoinPool.commonPool`.
</issue_to_address>
### Comment 3
<location> `presto-hive-common/src/main/java/com/facebook/presto/hive/aws/metrics/AwsSdkClientStats.java:108-117` </location>
<code_context>
+ public void publish(MetricCollection metricCollection)
</code_context>
<issue_to_address>
**suggestion:** Clarify how requestCount and retryCount are aggregated from RETRY_COUNT metrics.
This logic sums `RETRY_COUNT` and adds 1 per entry to derive `requestCount`, which implicitly assumes `RETRY_COUNT` is the per-attempt retry index:
```java
long requestCount = metricCollection.metricValues(RETRY_COUNT)
.stream()
.map(i -> i + 1)
.reduce(Integer::sum).orElse(0);
```
To make this more obvious and less tied to current SDK semantics, consider counting each `ApiCallAttempt` as one request (e.g., via the size of the attempt collection) and deriving retries separately from `RETRY_COUNT`. This would be easier to reason about and more robust to future changes in metric definitions.
</issue_to_address>
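The aggregation the comment above proposes can be shown with a minimal, SDK-free sketch; the list of per-call `RETRY_COUNT` values is a stand-in for what the real `MetricCollection` would provide:

```java
import java.util.List;

public class RetryAggregationSketch
{
    // Given the per-call RETRY_COUNT values from a metric collection,
    // derive total requests (attempts) and retries separately.
    static long[] aggregate(List<Integer> retryCounts)
    {
        long retries = retryCounts.stream().mapToLong(Integer::longValue).sum();
        // Each RETRY_COUNT entry corresponds to one API call: the initial
        // attempt plus its retries all count as requests
        long requests = retryCounts.size() + retries;
        return new long[] {requests, retries};
    }

    public static void main(String[] args)
    {
        long[] agg = aggregate(List.of(2, 0, 1));
        System.out.println(agg[0] + " requests, " + agg[1] + " retries"); // prints "6 requests, 3 retries"
    }
}
```

This yields the same totals as the `map(i -> i + 1)` form quoted in the comment, but makes the "one attempt plus retries per call" assumption explicit.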
### Comment 4
<location> `presto-hive/src/test/java/com/facebook/presto/hive/metastore/glue/TestGlueInputConverter.java:55` </location>
<code_context>
+ assertEquals(dbInput.parameters(), testDb.getParameters());
}
@Test
</code_context>
<issue_to_address>
**suggestion (testing):** Add coverage for `convertPartition(PartitionWithStatistics)` and statistics parameter merging
The current tests cover `convertDatabase`, `convertTable`, and `convertPartition(Partition)`, but not the new `convertPartition(PartitionWithStatistics)` behavior that merges basic statistics into parameters via `updateStatisticsParameters`.
Please add a focused test (e.g., `testConvertPartitionWithStatistics`) that:
- builds a `PartitionWithStatistics` with non-empty basic statistics and existing parameters,
- calls `convertPartition(PartitionWithStatistics)`, and
- asserts that the resulting `PartitionInput.parameters()` contains both the original parameters and the statistics-derived entries, and that non-empty column statistics still cause the expected `PrestoException`.
This will verify the new builder-based implementation and protect against regressions.
Suggested implementation:
```java
assertEquals(dbInput.name(), testDb.getDatabaseName());
assertEquals(dbInput.description(), testDb.getComment().get());
assertEquals(dbInput.locationUri(), testDb.getLocation().get());
assertEquals(dbInput.parameters(), testDb.getParameters());
}
@Test
public void testConvertPartitionWithStatistics()
{
// existing partition with parameters
Partition partition = new Partition(
"test_db",
"test_table",
ImmutableList.of("part_value"),
"s3://bucket/path",
"inputFormat",
"outputFormat",
"serdeLib",
ImmutableMap.of("existing_param", "existing_value"),
Optional.empty(),
Optional.empty());
// non-empty basic statistics
HiveBasicStatistics basicStatistics = new HiveBasicStatistics(
OptionalLong.of(5L), // fileCount
OptionalLong.of(100L), // rowCount
OptionalLong.of(1024L), // inMemoryDataSizeInBytes
OptionalLong.of(2048L)); // onDiskDataSizeInBytes
// no column statistics
PartitionWithStatistics partitionWithStatistics = PartitionWithStatistics.newPartition(
partition,
PartitionStatistics.builder()
.setBasicStatistics(basicStatistics)
.setColumnStatistics(ImmutableMap.of())
.build());
PartitionInput partitionInput = GlueInputConverter.convertPartition(partitionWithStatistics);
Map<String, String> parameters = partitionInput.parameters();
// original parameters are preserved
assertEquals(parameters.get("existing_param"), "existing_value");
// basic statistics are merged into parameters (at least one extra entry added)
assertTrue(parameters.size() > 1, "expected statistics-derived parameters to be merged");
// verify that non-empty column statistics still cause a PrestoException
ColumnStatistics nonEmptyColumnStatistics = new ColumnStatistics(
Optional.of(new BooleanStatistics(Optional.of(1L), Optional.of(0L))),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
PartitionWithStatistics partitionWithColumnStats = PartitionWithStatistics.newPartition(
partition,
PartitionStatistics.builder()
.setBasicStatistics(basicStatistics)
.setColumnStatistics(ImmutableMap.of("col", nonEmptyColumnStatistics))
.build());
assertThrows(PrestoException.class, () -> GlueInputConverter.convertPartition(partitionWithColumnStats));
}
@Test
```
This new test relies on several types and utilities that may already be imported elsewhere in `TestGlueInputConverter`. If any of the following are missing from the import section of the file, they need to be added:
- `import com.facebook.presto.hive.metastore.ColumnStatistics;`
- `import com.facebook.presto.hive.metastore.HiveBasicStatistics;`
- `import com.facebook.presto.hive.metastore.Partition;`
- `import com.facebook.presto.hive.metastore.PartitionStatistics;`
- `import com.facebook.presto.hive.metastore.PartitionWithStatistics;`
- `import com.facebook.presto.hive.statistics.BooleanStatistics;`
- `import com.facebook.presto.spi.PrestoException;`
- `import com.google.common.collect.ImmutableList;`
- `import com.google.common.collect.ImmutableMap;`
- `import java.util.Map;`
- `import java.util.Optional;`
- `import java.util.OptionalLong;`
- `import static org.junit.Assert.assertThrows;`
- `import static org.testng.Assert.assertTrue;` (or the appropriate assertTrue/throws utilities consistent with the rest of the test file)
If the project prefers TestNG’s `expectedExceptions` instead of `assertThrows`, you should adjust the `assertThrows` usage to a separate `@Test(expectedExceptions = PrestoException.class)` method or adapt it to the existing testing style in this file.
</issue_to_address>
### Comment 5
<location> `presto-hive-metastore/src/test/java/com/facebook/presto/hive/metastore/glue/TestGlueHiveMetastoreConfig.java:30-34` </location>
<code_context>
assertRecordedDefaults(recordDefaults(GlueHiveMetastoreConfig.class)
.setGlueRegion(null)
.setGlueEndpointUrl(null)
- .setPinGlueClientToCurrentRegion(false)
+ .setGlueStsRegion(null)
+ .setGlueStsEndpointUrl(null)
.setMaxGlueConnections(50)
.setMaxGlueErrorRetries(10)
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding tests for the new AwsSdkClientStats MetricPublisher wiring
This config test only covers the new STS fields and removal of `pin-client-to-current-region`, but doesn’t exercise the new metrics path:
* `AwsSdkClientStats` and `AwsSdkClientRequestMetricsPublisher` in `presto-hive-common`
* `newRequestMetricPublisher()` usage in `GlueMetastoreStats` and Glue client creation
Please add a focused test (e.g., `TestAwsSdkClientStats`) that builds a synthetic `MetricCollection`, passes it to the publisher, and asserts that:
* `RETRY_COUNT` is split into `awsRequestCount` and `awsRetryCount`
* throttling errors (`ERROR_TYPE == THROTTLING`) are counted
* `SERVICE_CALL_DURATION`, `API_CALL_DURATION`, and `BACKOFF_DELAY_DURATION` feed the correct `TimeStat`s
This will verify the v2 MetricPublisher wiring and preserve the semantics of the old v1 metrics collector.
Suggested implementation:
```java
assertRecordedDefaults(recordDefaults(GlueHiveMetastoreConfig.class)
.setGlueRegion(null)
.setGlueEndpointUrl(null)
.setGlueStsRegion(null)
.setGlueStsEndpointUrl(null)
.setMaxGlueConnections(50)
.setMaxGlueErrorRetries(10)
        .setDefaultWarehouseDir(null));

Map<String, String> properties = new ImmutableMap.Builder<String, String>()
.put("hive.metastore.glue.region", "us-east-1")
.put("hive.metastore.glue.endpoint-url", "http://foo.bar")
.put("hive.metastore.glue.sts.region", "us-east-1")
.put("hive.metastore.glue.sts.endpoint-url", "http://foo.bar")
.put("hive.metastore.glue.max-connections", "10")
.put("hive.metastore.glue.max-error-retries", "20")
.put("hive.metastore.glue.default-warehouse-dir", "/location")
```
To fully implement your suggestion about testing the new `AwsSdkClientStats` MetricPublisher wiring, you should add a new test class in the `presto-hive-common` module (or wherever `AwsSdkClientStats` lives), for example:
`presto-hive-common/src/test/java/com/facebook/presto/hive/metrics/TestAwsSdkClientStats.java`.
Key points for that test:
1. **Fixture setup**
* Instantiate `AwsSdkClientStats` (or directly `AwsSdkClientRequestMetricsPublisher`) similarly to production wiring (same constructor parameters / dependencies).
* Create a synthetic `MetricCollection` using the AWS SDK v2 metrics types, e.g.:
* `MetricCollection.builder().namespace("test").metric(SdkMetric, value)...build()`
* Use the same `SdkMetric` constants that the production code expects:
* `CoreMetric.RETRY_COUNT`
* `CoreMetric.ERROR_TYPE`
* `CoreMetric.SERVICE_CALL_DURATION`
* `CoreMetric.API_CALL_DURATION`
* `CoreMetric.BACKOFF_DELAY_DURATION`
* And `CoreMetric.THROTTLING_EXCEPTION` / `CoreMetric.THROTTLING` (or the appropriate enum value used by your code for throttling errors).
2. **Test: RETRY_COUNT split**
* Build a `MetricCollection` with:
* `RETRY_COUNT = 3`
* `API_CALL_ATTEMPT_COUNT = 4` (or equivalent; depends on how your implementation derives the total request count).
* Pass it to `publisher.publish(metricCollection)`.
* Assert that:
* The `AwsSdkClientStats` counters (or their wrapped `CounterStat` / `Counter` objects) report:
* `awsRequestCount = 4`
* `awsRetryCount = 3`
3. **Test: throttling errors counted**
* Build a `MetricCollection` containing an error metric with `ERROR_TYPE == THROTTLING`.
* Publish it via the `MetricPublisher`.
* Assert that the throttling counter in `AwsSdkClientStats` (e.g., `awsThrottlingErrors` or similar) increments by 1.
4. **Test: durations wired to `TimeStat`s**
* Create a `MetricCollection` that includes:
* `SERVICE_CALL_DURATION` with a known value (e.g., a `Duration` of 100 ms).
* `API_CALL_DURATION` with a different known value.
* `BACKOFF_DELAY_DURATION` with another known value.
* Publish the metrics.
* Assert that the corresponding `TimeStat` instances inside `AwsSdkClientStats` have recorded those values (e.g., via `getAllTime()`, `getTotalCount()`, `getTotalTime()`, or equivalent methods used in existing TimeStat tests in the project).
5. **Check existing patterns**
* Follow the conventions in existing metric test classes in the repository (e.g., `TestHiveMetastoreStats`, `TestGlueMetastoreStats`, or any existing tests for `TimeStat`/`CounterStat`) for:
* How to construct and verify `TimeStat` and `CounterStat`.
* Thread-safety / synchronization expectations.
* Naming of test methods (e.g., `testRetryAndRequestCounts`, `testThrottlingErrors`, `testDurations`).
These additional tests will validate that the v2 `MetricPublisher` preserves the semantics of the previous v1 metrics collector, in particular:
* Splitting retry and request counts correctly.
* Counting throttling errors.
* Routing AWS duration metrics into the correct `TimeStat`s used by the Presto metrics system.
</issue_to_address>
aaneja left a comment: Some questions, but looks good for the most part.
@aaneja, I replied to your comments. Please have a look when you get a chance. Thanks!
@imjalpreet imported this issue as lakehouse/presto #26670
Description
This PR upgrades the Glue client to AWS SDK v2 and updates the GlueHiveMetastore used by the Presto Lakehouse connectors accordingly.
Key Changes
Motivation and Context
#26668 and #25529
Impact
`hive.metastore.glue.pin-client-to-current-region` is deprecated, as AWS SDK v2 no longer supports the `Regions.getCurrentRegion` API. The current region will be inferred automatically if Presto is running on an EC2 machine.

Test Plan
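For reference, a minimal catalog properties sketch wiring the new Glue/STS options; the property names follow the config test in this PR, while the region and endpoint values are purely illustrative:

```properties
hive.metastore.glue.region=us-east-1
hive.metastore.glue.endpoint-url=https://glue.us-east-1.amazonaws.com
# New STS options replacing the removed pin-client-to-current-region flag
hive.metastore.glue.sts.region=us-east-1
hive.metastore.glue.sts.endpoint-url=https://sts.us-east-1.amazonaws.com
hive.metastore.glue.max-connections=50
hive.metastore.glue.max-error-retries=10
```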
Sample JMX metrics output for the new metrics published via the MetricPublisher
Ran the TestHiveClientGlueMetastore suite with AWS Glue, and below are the test results
TestHiveClientGlueMetastore.html
Contributor checklist
Release Notes