From f06d91ce67038febddd0ff3df0f276e86ab4af36 Mon Sep 17 00:00:00 2001 From: Anoop Johnson Date: Wed, 20 Nov 2019 11:47:45 -0800 Subject: [PATCH] Implement Parallel Partition Pruning for Glue Hive Metastore This change parallelizes the partition fetch for the Glue metastore by splitting the partitions into non-overlapping segments[2]. This can speed up query planning by upto an order of magnitude. [1] https://docs.aws.amazon.com/glue/latest/webapi/API_Segment.html --- .../metastore/glue/ForGlueHiveMetastore.java | 29 ++++++++++ .../metastore/glue/GlueHiveMetastore.java | 58 ++++++++++++++++++- .../glue/GlueHiveMetastoreConfig.java | 32 ++++++++++ .../metastore/glue/GlueMetastoreModule.java | 21 +++++++ .../glue/TestGlueHiveMetastoreConfig.java | 10 +++- .../presto/hive/AbstractTestHiveClient.java | 3 +- .../glue/TestHiveClientGlueMetastore.java | 41 ++++++++++++- 7 files changed, 187 insertions(+), 7 deletions(-) create mode 100644 presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/ForGlueHiveMetastore.java diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/ForGlueHiveMetastore.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/ForGlueHiveMetastore.java new file mode 100644 index 0000000000000..a0c0635db36a9 --- /dev/null +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/ForGlueHiveMetastore.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive.metastore.glue; + +import javax.inject.Qualifier; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +@Retention(RUNTIME) +@Target({FIELD, PARAMETER, METHOD}) +@Qualifier +public @interface ForGlueHiveMetastore {} diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueHiveMetastore.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueHiveMetastore.java index c031678124a70..029a90757ef57 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueHiveMetastore.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueHiveMetastore.java @@ -47,6 +47,7 @@ import com.amazonaws.services.glue.model.PartitionError; import com.amazonaws.services.glue.model.PartitionInput; import com.amazonaws.services.glue.model.PartitionValueList; +import com.amazonaws.services.glue.model.Segment; import com.amazonaws.services.glue.model.TableInput; import com.amazonaws.services.glue.model.UpdateDatabaseRequest; import com.amazonaws.services.glue.model.UpdatePartitionRequest; @@ -86,15 +87,20 @@ import com.google.common.collect.Lists; import org.apache.hadoop.fs.Path; +import javax.annotation.Nullable; import javax.inject.Inject; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; import java.util.function.Function; @@ -115,6 +121,8 @@ import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.spi.security.PrincipalType.USER; import static com.google.common.base.Strings.isNullOrEmpty; +import static com.google.common.collect.Comparators.lexicographical; +import static java.util.Comparator.comparing; import static java.util.Objects.requireNonNull; import static java.util.function.UnaryOperator.identity; import static java.util.stream.Collectors.toList; @@ -130,26 +138,38 @@ public class GlueHiveMetastore private static final String WILDCARD_EXPRESSION = ""; private static final int BATCH_GET_PARTITION_MAX_PAGE_SIZE = 1000; private static final int BATCH_CREATE_PARTITION_MAX_PAGE_SIZE = 100; + private static final Comparator PARTITION_COMPARATOR = comparing(Partition::getValues, lexicographical(String.CASE_INSENSITIVE_ORDER)); private final HdfsEnvironment hdfsEnvironment; private final HdfsContext hdfsContext; private final AWSGlueAsync glueClient; private final Optional defaultDir; private final String catalogId; + private final int partitionSegments; + private final Executor executor; @Inject - public GlueHiveMetastore(HdfsEnvironment hdfsEnvironment, GlueHiveMetastoreConfig glueConfig) + public GlueHiveMetastore( + HdfsEnvironment hdfsEnvironment, + GlueHiveMetastoreConfig glueConfig, + @ForGlueHiveMetastore Executor executor) { - this(hdfsEnvironment, glueConfig, createAsyncGlueClient(glueConfig)); + this(hdfsEnvironment, glueConfig, createAsyncGlueClient(glueConfig), executor); } - public GlueHiveMetastore(HdfsEnvironment hdfsEnvironment, GlueHiveMetastoreConfig glueConfig, AWSGlueAsync glueClient) + public GlueHiveMetastore( + HdfsEnvironment hdfsEnvironment, + GlueHiveMetastoreConfig glueConfig, + AWSGlueAsync glueClient, + @ForGlueHiveMetastore Executor executor) { this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.hdfsContext = new HdfsContext(new ConnectorIdentity(DEFAULT_METASTORE_USER, Optional.empty(), Optional.empty())); this.glueClient = requireNonNull(glueClient, "glueClient is null"); this.defaultDir = glueConfig.getDefaultWarehouseDir(); this.catalogId = glueConfig.getCatalogId().orElse(null); + this.partitionSegments = glueConfig.getPartitionSegments(); + this.executor = requireNonNull(executor, "executor is null"); } private static AWSGlueAsync createAsyncGlueClient(GlueHiveMetastoreConfig config) @@ -649,6 +669,37 @@ public Optional> getPartitionNamesByParts(String databaseName, Stri } private List getPartitions(String databaseName, String tableName, String expression) + { + if (partitionSegments == 1) { + return getPartitions(databaseName, tableName, expression, null); + } + + // Do parallel partition fetch. + CompletionService> completionService = new ExecutorCompletionService<>(executor); + for (int i = 0; i < partitionSegments; i++) { + Segment segment = new Segment().withSegmentNumber(i).withTotalSegments(partitionSegments); + completionService.submit(() -> getPartitions(databaseName, tableName, expression, segment)); + } + + List partitions = new ArrayList<>(); + try { + for (int i = 0; i < partitionSegments; i++) { + Future> futurePartitions = completionService.take(); + partitions.addAll(futurePartitions.get()); + } + } + catch (ExecutionException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new PrestoException(HIVE_METASTORE_ERROR, "Failed to fetch partitions from Glue Data Catalog", e); + } + + partitions.sort(PARTITION_COMPARATOR); + return partitions; + } + + private List getPartitions(String databaseName, String tableName, String expression, @Nullable Segment segment) { try { List partitions = new ArrayList<>(); @@ -660,6 +711,7 @@ private List getPartitions(String databaseName, String tableName, Str .withDatabaseName(databaseName) .withTableName(tableName) .withExpression(expression) + .withSegment(segment) .withNextToken(nextToken)); result.getPartitions() .forEach(partition -> partitions.add(GlueToPrestoConverter.convertPartition(partition))); diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueHiveMetastoreConfig.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueHiveMetastoreConfig.java index f36bf9d903b73..979a29c734466 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueHiveMetastoreConfig.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueHiveMetastoreConfig.java @@ -16,6 +16,7 @@ import com.facebook.airlift.configuration.Config; import com.facebook.airlift.configuration.ConfigDescription; +import javax.validation.constraints.Max; import javax.validation.constraints.Min; import java.util.Optional; @@ -27,6 +28,8 @@ public class GlueHiveMetastoreConfig private int maxGlueConnections = 5; private Optional defaultWarehouseDir = Optional.empty(); private Optional catalogId = Optional.empty(); + private int partitionSegments = 5; + private int getPartitionThreads = 20; public Optional getGlueRegion() { @@ -93,4 +96,33 @@ public GlueHiveMetastoreConfig setCatalogId(String catalogId) this.catalogId = Optional.ofNullable(catalogId); return this; } + + @Min(1) + @Max(10) + public int getPartitionSegments() + { + return partitionSegments; + } + + @Config("hive.metastore.glue.partitions-segments") + @ConfigDescription("Number of segments for partitioned Glue tables") + public GlueHiveMetastoreConfig setPartitionSegments(int partitionSegments) + { + this.partitionSegments = partitionSegments; + return this; + } + + @Min(1) + public int getGetPartitionThreads() + { + return getPartitionThreads; + } + + @Config("hive.metastore.glue.get-partition-threads") + @ConfigDescription("Number of threads for parallel partition fetches from Glue") + public GlueHiveMetastoreConfig setGetPartitionThreads(int getPartitionThreads) + { + this.getPartitionThreads = getPartitionThreads; + return this; + } } diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueMetastoreModule.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueMetastoreModule.java index df87e9db8d63c..d7f6a6da30851 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueMetastoreModule.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueMetastoreModule.java @@ -13,13 +13,21 @@ */ package com.facebook.presto.hive.metastore.glue; +import com.facebook.airlift.concurrent.BoundedExecutor; import com.facebook.presto.hive.metastore.ExtendedHiveMetastore; import com.google.inject.Binder; import com.google.inject.Module; +import com.google.inject.Provides; import com.google.inject.Scopes; +import com.google.inject.Singleton; +import java.util.concurrent.Executor; + +import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; import static com.facebook.airlift.configuration.ConfigBinder.configBinder; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.Executors.newCachedThreadPool; import static org.weakref.jmx.ObjectNames.generatedNameOf; import static org.weakref.jmx.guice.ExportBinder.newExporter; @@ -41,4 +49,17 @@ public void configure(Binder binder) newExporter(binder).export(ExtendedHiveMetastore.class) .as(generatedNameOf(GlueHiveMetastore.class, connectorId)); } + + @Provides + @Singleton + @ForGlueHiveMetastore + public Executor createExecutor(GlueHiveMetastoreConfig hiveConfig) + { + if (hiveConfig.getGetPartitionThreads() == 1) { + return directExecutor(); + } + return new BoundedExecutor( + newCachedThreadPool(daemonThreadsNamed("hive-glue-%s")), + hiveConfig.getGetPartitionThreads()); + } } diff --git a/presto-hive-metastore/src/test/java/com/facebook/presto/hive/metastore/glue/TestGlueHiveMetastoreConfig.java b/presto-hive-metastore/src/test/java/com/facebook/presto/hive/metastore/glue/TestGlueHiveMetastoreConfig.java index e3afd99c1c754..eb1caeac9aa4a 100644 --- a/presto-hive-metastore/src/test/java/com/facebook/presto/hive/metastore/glue/TestGlueHiveMetastoreConfig.java +++ b/presto-hive-metastore/src/test/java/com/facebook/presto/hive/metastore/glue/TestGlueHiveMetastoreConfig.java @@ -32,7 +32,9 @@ public void testDefaults() .setPinGlueClientToCurrentRegion(false) .setMaxGlueConnections(5) .setDefaultWarehouseDir(null) - .setCatalogId(null)); + .setCatalogId(null) + .setPartitionSegments(5) + .setGetPartitionThreads(20)); } @Test @@ -44,6 +46,8 @@ public void testExplicitPropertyMapping() .put("hive.metastore.glue.max-connections", "10") .put("hive.metastore.glue.default-warehouse-dir", "/location") .put("hive.metastore.glue.catalogid", "0123456789") + .put("hive.metastore.glue.partitions-segments", "10") + .put("hive.metastore.glue.get-partition-threads", "42") .build(); GlueHiveMetastoreConfig expected = new GlueHiveMetastoreConfig() @@ -51,7 +55,9 @@ public void testExplicitPropertyMapping() .setPinGlueClientToCurrentRegion(true) .setMaxGlueConnections(10) .setDefaultWarehouseDir("/location") - .setCatalogId("0123456789"); + .setCatalogId("0123456789") + .setPartitionSegments(10) + .setGetPartitionThreads(42); assertFullMapping(properties, expected); } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java index 39df9eab954c7..ca3e1c85c8f7d 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java @@ -345,7 +345,7 @@ public abstract class AbstractTestHiveClient .row(3L, "bye", (byte) 46, (short) 346, 345, 456L, 754.2008f, 98.1, false, ImmutableList.of("ape", "bear"), ImmutableMap.of("three", 3L, "four", 4L), ImmutableList.of("false", 0L, false)) .build(); - private static final List CREATE_TABLE_COLUMNS_PARTITIONED = ImmutableList.builder() + protected static final List CREATE_TABLE_COLUMNS_PARTITIONED = ImmutableList.builder() .addAll(CREATE_TABLE_COLUMNS) .add(new ColumnMetadata("ds", createUnboundedVarcharType())) .build(); @@ -1200,6 +1200,7 @@ public void testListUnknownSchema() @Test public void testGetPartitions() + throws Exception { try (Transaction transaction = newTransaction()) { ConnectorMetadata metadata = transaction.getMetadata(); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/metastore/glue/TestHiveClientGlueMetastore.java b/presto-hive/src/test/java/com/facebook/presto/hive/metastore/glue/TestHiveClientGlueMetastore.java index a4461d9a34fdf..4bf99a018f1a9 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/metastore/glue/TestHiveClientGlueMetastore.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/metastore/glue/TestHiveClientGlueMetastore.java @@ -22,21 +22,46 @@ import com.facebook.presto.hive.MetastoreClientConfig; import com.facebook.presto.hive.authentication.NoHdfsAuthentication; import com.facebook.presto.hive.metastore.ExtendedHiveMetastore; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; import java.io.File; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; import static java.util.Locale.ENGLISH; import static java.util.UUID.randomUUID; +import static java.util.concurrent.Executors.newCachedThreadPool; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; public class TestHiveClientGlueMetastore extends AbstractTestHiveClientLocal { + private ExecutorService executorService; + public TestHiveClientGlueMetastore() { super("test_glue" + randomUUID().toString().toLowerCase(ENGLISH).replace("-", "")); } + @BeforeClass + public void setUp() + { + executorService = newCachedThreadPool(daemonThreadsNamed("hive-glue-%s")); + } + + @AfterClass(alwaysRun = true) + public void tearDown() + { + executorService.shutdownNow(); + } + /** * GlueHiveMetastore currently uses AWS Default Credential Provider Chain, * See https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default @@ -52,7 +77,7 @@ protected ExtendedHiveMetastore createMetastore(File tempDir) GlueHiveMetastoreConfig glueConfig = new GlueHiveMetastoreConfig(); glueConfig.setDefaultWarehouseDir(tempDir.toURI().toString()); - return new GlueHiveMetastore(hdfsEnvironment, glueConfig); + return new GlueHiveMetastore(hdfsEnvironment, glueConfig, executor); } @Override @@ -98,4 +123,18 @@ public void testStorePartitionWithStatistics() { testStorePartitionWithStatistics(STATISTICS_PARTITIONED_TABLE_COLUMNS, BASIC_STATISTICS_1, BASIC_STATISTICS_2, BASIC_STATISTICS_1, EMPTY_TABLE_STATISTICS); } + + @Test + public void testGetPartitions() throws Exception + { + try { + createDummyPartitionedTable(tablePartitionFormat, CREATE_TABLE_COLUMNS_PARTITIONED); + Optional> partitionNames = getMetastoreClient().getPartitionNames(tablePartitionFormat.getSchemaName(), tablePartitionFormat.getTableName()); + assertTrue(partitionNames.isPresent()); + assertEquals(partitionNames.get(), ImmutableList.of("ds=2016-01-01", "ds=2016-01-02")); + } + finally { + dropTable(tablePartitionFormat); + } + } }