diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/glue/ForGlueHiveMetastore.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/glue/ForGlueHiveMetastore.java new file mode 100644 index 000000000000..472d2b41bf47 --- /dev/null +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/glue/ForGlueHiveMetastore.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.plugin.hive.metastore.glue; + +import javax.inject.Qualifier; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +/** Qualifier that marks the Executor injected into GlueHiveMetastore for its parallel Glue partition fetches. */ @Retention(RUNTIME) +@Target({FIELD, PARAMETER, METHOD}) +@Qualifier +public @interface ForGlueHiveMetastore {} diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/glue/GlueHiveMetastore.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/glue/GlueHiveMetastore.java index a1364485a644..ca8a016a71a0 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/glue/GlueHiveMetastore.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/glue/GlueHiveMetastore.java @@ -53,6 +53,7 @@ import com.amazonaws.services.glue.model.PartitionError; import 
com.amazonaws.services.glue.model.PartitionInput; import com.amazonaws.services.glue.model.PartitionValueList; +import com.amazonaws.services.glue.model.Segment; import com.amazonaws.services.glue.model.TableInput; import com.amazonaws.services.glue.model.UpdateDatabaseRequest; import com.amazonaws.services.glue.model.UpdatePartitionRequest; @@ -94,19 +95,25 @@ import io.prestosql.spi.type.Type; import org.apache.hadoop.fs.Path; +import javax.annotation.Nullable; import javax.inject.Inject; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; import java.util.function.Function; import static com.google.common.base.Strings.isNullOrEmpty; +import static com.google.common.collect.Comparators.lexicographical; import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR; import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_PARTITION_DROPPED_DURING_QUERY; import static io.prestosql.plugin.hive.metastore.MetastoreUtil.makePartName; @@ -119,6 +126,7 @@ import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED; import static io.prestosql.spi.security.PrincipalType.USER; import static java.lang.String.format; +import static java.util.Comparator.comparing; import static java.util.Objects.requireNonNull; import static java.util.function.UnaryOperator.identity; import static java.util.stream.Collectors.toList; @@ -136,21 +144,30 @@ public class GlueHiveMetastore private static final String WILDCARD_EXPRESSION = ""; private static final int BATCH_GET_PARTITION_MAX_PAGE_SIZE = 1000; private static final int BATCH_CREATE_PARTITION_MAX_PAGE_SIZE = 100; + private static final Comparator PARTITION_COMPARATOR = + 
comparing(Partition::getValues, lexicographical(String.CASE_INSENSITIVE_ORDER)); private final HdfsEnvironment hdfsEnvironment; private final HdfsContext hdfsContext; private final AWSGlueAsync glueClient; private final Optional defaultDir; private final String catalogId; + private final int partitionSegments; + private final Executor executor; @Inject - public GlueHiveMetastore(HdfsEnvironment hdfsEnvironment, GlueHiveMetastoreConfig glueConfig) + public GlueHiveMetastore( + HdfsEnvironment hdfsEnvironment, + GlueHiveMetastoreConfig glueConfig, + @ForGlueHiveMetastore Executor executor) { this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.hdfsContext = new HdfsContext(new ConnectorIdentity(DEFAULT_METASTORE_USER, Optional.empty(), Optional.empty())); this.glueClient = requireNonNull(createAsyncGlueClient(glueConfig), "glueClient is null"); this.defaultDir = glueConfig.getDefaultWarehouseDir(); this.catalogId = glueConfig.getCatalogId().orElse(null); + this.partitionSegments = glueConfig.getPartitionSegments(); + this.executor = requireNonNull(executor, "executor is null"); } private static AWSGlueAsync createAsyncGlueClient(GlueHiveMetastoreConfig config) @@ -668,6 +685,42 @@ public Optional> getPartitionNamesByParts(HiveIdentity identity, St } private List<Partition> getPartitions(String databaseName, String tableName, String expression) + { + // Fetch every partition matching the expression. With more than one configured segment the segments are fetched in parallel on the executor. + if (partitionSegments == 1) { + return getPartitions(databaseName, tableName, expression, null); + } + + // Do parallel partition fetch. 
+ CompletionService<List<Partition>> completionService = new ExecutorCompletionService<>(executor); + List<Future<List<Partition>>> futures = new ArrayList<>(partitionSegments); + for (int i = 0; i < partitionSegments; i++) { + Segment segment = new Segment().withSegmentNumber(i).withTotalSegments(partitionSegments); + futures.add(completionService.submit(() -> getPartitions(databaseName, tableName, expression, segment))); + } + + List<Partition> partitions = new ArrayList<>(); + try { + for (int i = 0; i < partitionSegments; i++) { + Future<List<Partition>> futurePartitions = completionService.take(); + partitions.addAll(futurePartitions.get()); + } + } + catch (ExecutionException | InterruptedException e) { + // The whole listing has failed, so stop segment fetches that are still queued or running. + futures.forEach(future -> future.cancel(true)); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new PrestoException(HIVE_METASTORE_ERROR, "Failed to fetch partitions from Glue Data Catalog", e); + } + + // Segments complete in arbitrary order, so sort to make the result independent of completion order. + partitions.sort(PARTITION_COMPARATOR); + return partitions; + } + + private List<Partition> getPartitions(String databaseName, String tableName, String expression, @Nullable Segment segment) { try { List partitions = new ArrayList<>(); @@ -679,6 +732,7 @@ private List getPartitions(String databaseName, String tableName, Str .withDatabaseName(databaseName) .withTableName(tableName) .withExpression(expression) + .withSegment(segment) .withNextToken(nextToken)); result.getPartitions() .forEach(partition -> partitions.add(GlueToPrestoConverter.convertPartition(partition))); diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/glue/GlueHiveMetastoreConfig.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/glue/GlueHiveMetastoreConfig.java index bfba5a62b0e5..b6868d6243b3 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/glue/GlueHiveMetastoreConfig.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/glue/GlueHiveMetastoreConfig.java @@ -17,6 +17,7 @@ import io.airlift.configuration.ConfigDescription; import io.airlift.configuration.ConfigSecuritySensitive; +import javax.validation.constraints.Max; import 
javax.validation.constraints.Min; import java.util.Optional; @@ -33,6 +34,8 @@ public class GlueHiveMetastoreConfig private Optional awsCredentialsProvider = Optional.empty(); private boolean useInstanceCredentials; private Optional catalogId = Optional.empty(); + private int partitionSegments = 5; /* default number of segments a partition listing is split into */ + private int getPartitionThreads = 20; /* default thread count for parallel partition fetches */ public Optional getGlueRegion() { @@ -163,4 +166,33 @@ public GlueHiveMetastoreConfig setAwsCredentialsProvider(String awsCredentialsPr this.awsCredentialsProvider = Optional.ofNullable(awsCredentialsProvider); return this; } + + /** Returns the configured segment count, constrained by the validation annotations to the range 1 to 10. */ @Min(1) + @Max(10) + public int getPartitionSegments() + { + return partitionSegments; + } + + /** Sets the segment count from hive.metastore.glue.partitions-segments and returns this config for chaining. */ @Config("hive.metastore.glue.partitions-segments") + @ConfigDescription("Number of segments for partitioned Glue tables") + public GlueHiveMetastoreConfig setPartitionSegments(int partitionSegments) + { + this.partitionSegments = partitionSegments; + return this; + } + + /** Returns the configured thread count for partition fetches, constrained to be at least 1. */ @Min(1) + public int getGetPartitionThreads() + { + return getPartitionThreads; + } + + /** Sets the thread count from hive.metastore.glue.get-partition-threads and returns this config for chaining. */ @Config("hive.metastore.glue.get-partition-threads") + @ConfigDescription("Number of threads for parallel partition fetches from Glue") + public GlueHiveMetastoreConfig setGetPartitionThreads(int getPartitionThreads) + { + this.getPartitionThreads = getPartitionThreads; + return this; + } } diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/glue/GlueMetastoreModule.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/glue/GlueMetastoreModule.java index 1c88b5a958a0..b19c6107aba7 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/glue/GlueMetastoreModule.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/glue/GlueMetastoreModule.java @@ -14,10 +14,14 @@ package io.prestosql.plugin.hive.metastore.glue; import com.google.inject.Binder; +import com.google.inject.Provides; import com.google.inject.Scopes; +import com.google.inject.Singleton; import 
com.google.inject.multibindings.Multibinder; +import io.airlift.concurrent.BoundedExecutor; import io.airlift.configuration.AbstractConfigurationAwareModule; import io.prestosql.plugin.hive.ForRecordingHiveMetastore; +import io.prestosql.plugin.hive.HiveCatalogName; import io.prestosql.plugin.hive.HiveConfig; import io.prestosql.plugin.hive.metastore.HiveMetastore; import io.prestosql.plugin.hive.metastore.RecordingHiveMetastore; @@ -26,8 +30,13 @@ import io.prestosql.plugin.hive.metastore.cache.ForCachingHiveMetastore; import io.prestosql.spi.procedure.Procedure; +import java.util.concurrent.Executor; + +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static com.google.inject.multibindings.Multibinder.newSetBinder; +import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.airlift.configuration.ConfigBinder.configBinder; +import static java.util.concurrent.Executors.newCachedThreadPool; import static org.weakref.jmx.guice.ExportBinder.newExporter; public class GlueMetastoreModule @@ -66,4 +75,18 @@ protected void setup(Binder binder) } binder.install(new CachingHiveMetastoreModule()); } + + @Provides + @Singleton + @ForGlueHiveMetastore + public Executor createExecutor(HiveCatalogName catalogName, GlueHiveMetastoreConfig hiveConfig) + { + /* Provides the executor for Glue partition fetches: a direct executor for one thread, otherwise a bounded cached pool. */ + int maxThreads = hiveConfig.getGetPartitionThreads(); + if (maxThreads == 1) { + return directExecutor(); + } + /* Bound the cached daemon-thread pool by the configured thread count. */ + return new BoundedExecutor(newCachedThreadPool(daemonThreadsNamed("hive-glue-%s")), maxThreads); + } } diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHive.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHive.java index b6aa9c918087..2fbe12780e1b 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHive.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHive.java @@ -309,7 +309,7 @@ public abstract class AbstractTestHive .row(3L, "bye", (byte) 46, (short) 346, 345, 
456L, 754.2008f, 98.1, false, ImmutableList.of("ape", "bear"), ImmutableMap.of("three", 3L, "four", 4L), ImmutableList.of("false", 0L, false)) .build(); - private static final List CREATE_TABLE_COLUMNS_PARTITIONED = ImmutableList.builder() + protected static final List CREATE_TABLE_COLUMNS_PARTITIONED = ImmutableList.builder() .addAll(CREATE_TABLE_COLUMNS) .add(new ColumnMetadata("ds", createUnboundedVarcharType())) .build(); @@ -960,6 +960,7 @@ public void testListUnknownSchema() @Test public void testGetPartitions() + throws Exception { try (Transaction transaction = newTransaction()) { ConnectorMetadata metadata = transaction.getMetadata(); diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/metastore/glue/TestGlueHiveMetastoreConfig.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/metastore/glue/TestGlueHiveMetastoreConfig.java index c1c0c295ca7f..cf66c06af7b2 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/metastore/glue/TestGlueHiveMetastoreConfig.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/metastore/glue/TestGlueHiveMetastoreConfig.java @@ -37,7 +37,9 @@ public void testDefaults() .setAwsSecretKey(null) .setAwsCredentialsProvider(null) .setCatalogId(null) - .setUseInstanceCredentials(false)); + .setUseInstanceCredentials(false) + .setPartitionSegments(5) + .setGetPartitionThreads(20)); } @Test @@ -54,6 +56,8 @@ public void testExplicitPropertyMapping() .put("hive.metastore.glue.aws-credentials-provider", "custom") .put("hive.metastore.glue.catalogid", "0123456789") .put("hive.metastore.glue.use-instance-credentials", "true") + .put("hive.metastore.glue.partitions-segments", "10") + .put("hive.metastore.glue.get-partition-threads", "42") .build(); GlueHiveMetastoreConfig expected = new GlueHiveMetastoreConfig() @@ -66,7 +70,9 @@ public void testExplicitPropertyMapping() .setAwsSecretKey("DEF") .setAwsCredentialsProvider("custom") .setCatalogId("0123456789") - .setUseInstanceCredentials(true); + 
.setUseInstanceCredentials(true) + .setPartitionSegments(10) + .setGetPartitionThreads(42); assertFullMapping(properties, expected); } diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/metastore/glue/TestHiveGlueMetastore.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/metastore/glue/TestHiveGlueMetastore.java index ce3e7f4e3db5..d6f842cd696c 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/metastore/glue/TestHiveGlueMetastore.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/metastore/glue/TestHiveGlueMetastore.java @@ -13,23 +13,54 @@ */ package io.prestosql.plugin.hive.metastore.glue; +import com.google.common.collect.ImmutableList; +import io.airlift.concurrent.BoundedExecutor; import io.prestosql.plugin.hive.AbstractTestHiveLocal; +import io.prestosql.plugin.hive.authentication.HiveIdentity; import io.prestosql.plugin.hive.metastore.HiveMetastore; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; import java.io.File; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.prestosql.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; +import static io.prestosql.testing.TestingConnectorSession.SESSION; import static java.util.Locale.ENGLISH; import static java.util.UUID.randomUUID; +import static java.util.concurrent.Executors.newCachedThreadPool; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; public class TestHiveGlueMetastore extends AbstractTestHiveLocal { + private static final HiveIdentity HIVE_CONTEXT = new HiveIdentity(SESSION); + + private ExecutorService executorService; + public TestHiveGlueMetastore() { super("test_glue" + randomUUID().toString().toLowerCase(ENGLISH).replace("-", "")); } + @BeforeClass + public void setUp() + { + 
executorService = newCachedThreadPool(daemonThreadsNamed("hive-glue-%s")); + } + + @AfterClass(alwaysRun = true) + public void tearDown() + { + executorService.shutdownNow(); + } + /** * GlueHiveMetastore currently uses AWS Default Credential Provider Chain, * See https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default @@ -41,7 +72,8 @@ protected HiveMetastore createMetastore(File tempDir) GlueHiveMetastoreConfig glueConfig = new GlueHiveMetastoreConfig(); glueConfig.setDefaultWarehouseDir(tempDir.toURI().toString()); - return new GlueHiveMetastore(HDFS_ENVIRONMENT, glueConfig); + Executor executor = new BoundedExecutor(executorService, 10); + return new GlueHiveMetastore(HDFS_ENVIRONMENT, glueConfig, executor); } @Override @@ -87,4 +119,20 @@ public void testStorePartitionWithStatistics() { testStorePartitionWithStatistics(STATISTICS_PARTITIONED_TABLE_COLUMNS, BASIC_STATISTICS_1, BASIC_STATISTICS_2, BASIC_STATISTICS_1, EMPTY_TABLE_STATISTICS); } + + @Test + @Override + public void testGetPartitions() throws Exception + { + // Replaces the inherited test: list partition names through the Glue metastore client and check them in sorted order. + try { + createDummyPartitionedTable(tablePartitionFormat, CREATE_TABLE_COLUMNS_PARTITIONED); + Optional<List<String>> partitionNames = getMetastoreClient().getPartitionNames(HIVE_CONTEXT, tablePartitionFormat.getSchemaName(), tablePartitionFormat.getTableName()); + assertTrue(partitionNames.isPresent()); + assertEquals(partitionNames.get(), ImmutableList.of("ds=2016-01-01", "ds=2016-01-02")); + } + finally { + dropTable(tablePartitionFormat); + } + } }