Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive.metastore.glue;

import javax.inject.Qualifier;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@Qualifier
public @interface ForGlueHiveMetastore {}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.amazonaws.services.glue.model.PartitionError;
import com.amazonaws.services.glue.model.PartitionInput;
import com.amazonaws.services.glue.model.PartitionValueList;
import com.amazonaws.services.glue.model.Segment;
import com.amazonaws.services.glue.model.TableInput;
import com.amazonaws.services.glue.model.UpdateDatabaseRequest;
import com.amazonaws.services.glue.model.UpdatePartitionRequest;
Expand Down Expand Up @@ -86,15 +87,20 @@
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.Path;

import javax.annotation.Nullable;
import javax.inject.Inject;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.function.Function;

Expand All @@ -115,6 +121,8 @@
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.security.PrincipalType.USER;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.collect.Comparators.lexicographical;
import static java.util.Comparator.comparing;
import static java.util.Objects.requireNonNull;
import static java.util.function.UnaryOperator.identity;
import static java.util.stream.Collectors.toList;
Expand All @@ -130,26 +138,38 @@ public class GlueHiveMetastore
private static final String WILDCARD_EXPRESSION = "";
private static final int BATCH_GET_PARTITION_MAX_PAGE_SIZE = 1000;
private static final int BATCH_CREATE_PARTITION_MAX_PAGE_SIZE = 100;
private static final Comparator<Partition> PARTITION_COMPARATOR = comparing(Partition::getValues, lexicographical(String.CASE_INSENSITIVE_ORDER));

private final HdfsEnvironment hdfsEnvironment;
private final HdfsContext hdfsContext;
private final AWSGlueAsync glueClient;
private final Optional<String> defaultDir;
private final String catalogId;
private final int partitionSegments;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: What about partitionSegmentCount?

private final Executor executor;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we wanna shutdown the executor in PreDestroy?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to explicitly shut down because:

  1. We are using a cached threadpool with the idle timeout of 60 seconds.
  2. The created threads are all daemon threads.


@Inject
public GlueHiveMetastore(HdfsEnvironment hdfsEnvironment, GlueHiveMetastoreConfig glueConfig)
public GlueHiveMetastore(
HdfsEnvironment hdfsEnvironment,
GlueHiveMetastoreConfig glueConfig,
@ForGlueHiveMetastore Executor executor)
{
this(hdfsEnvironment, glueConfig, createAsyncGlueClient(glueConfig));
this(hdfsEnvironment, glueConfig, createAsyncGlueClient(glueConfig), executor);
}

public GlueHiveMetastore(HdfsEnvironment hdfsEnvironment, GlueHiveMetastoreConfig glueConfig, AWSGlueAsync glueClient)
public GlueHiveMetastore(
HdfsEnvironment hdfsEnvironment,
GlueHiveMetastoreConfig glueConfig,
AWSGlueAsync glueClient,
@ForGlueHiveMetastore Executor executor)
{
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.hdfsContext = new HdfsContext(new ConnectorIdentity(DEFAULT_METASTORE_USER, Optional.empty(), Optional.empty()));
this.glueClient = requireNonNull(glueClient, "glueClient is null");
this.defaultDir = glueConfig.getDefaultWarehouseDir();
this.catalogId = glueConfig.getCatalogId().orElse(null);
this.partitionSegments = glueConfig.getPartitionSegments();
this.executor = requireNonNull(executor, "executor is null");
}

private static AWSGlueAsync createAsyncGlueClient(GlueHiveMetastoreConfig config)
Expand Down Expand Up @@ -649,6 +669,37 @@ public Optional<List<String>> getPartitionNamesByParts(String databaseName, Stri
}

private List<Partition> getPartitions(String databaseName, String tableName, String expression)
{
if (partitionSegments == 1) {
return getPartitions(databaseName, tableName, expression, null);
}

// Do parallel partition fetch.
CompletionService<List<Partition>> completionService = new ExecutorCompletionService<>(executor);
for (int i = 0; i < partitionSegments; i++) {
Segment segment = new Segment().withSegmentNumber(i).withTotalSegments(partitionSegments);
completionService.submit(() -> getPartitions(databaseName, tableName, expression, segment));
}

List<Partition> partitions = new ArrayList<>();
try {
for (int i = 0; i < partitionSegments; i++) {
Future<List<Partition>> futurePartitions = completionService.take();
partitions.addAll(futurePartitions.get());
}
}
catch (ExecutionException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new PrestoException(HIVE_METASTORE_ERROR, "Failed to fetch partitions from Glue Data Catalog", e);
}

partitions.sort(PARTITION_COMPARATOR);
return partitions;
}

private List<Partition> getPartitions(String databaseName, String tableName, String expression, @Nullable Segment segment)
{
try {
List<Partition> partitions = new ArrayList<>();
Expand All @@ -660,6 +711,7 @@ private List<Partition> getPartitions(String databaseName, String tableName, Str
.withDatabaseName(databaseName)
.withTableName(tableName)
.withExpression(expression)
.withSegment(segment)
.withNextToken(nextToken));
result.getPartitions()
.forEach(partition -> partitions.add(GlueToPrestoConverter.convertPartition(partition)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;

import java.util.Optional;
Expand All @@ -27,6 +28,8 @@ public class GlueHiveMetastoreConfig
private int maxGlueConnections = 5;
private Optional<String> defaultWarehouseDir = Optional.empty();
private Optional<String> catalogId = Optional.empty();
private int partitionSegments = 5;
private int getPartitionThreads = 20;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You must have a really powerful coordinator lol...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

20 threads is fairly conservative.


public Optional<String> getGlueRegion()
{
Expand Down Expand Up @@ -93,4 +96,33 @@ public GlueHiveMetastoreConfig setCatalogId(String catalogId)
this.catalogId = Optional.ofNullable(catalogId);
return this;
}

@Min(1)
@Max(10)
public int getPartitionSegments()
{
return partitionSegments;
}

@Config("hive.metastore.glue.partitions-segments")
@ConfigDescription("Number of segments for partitioned Glue tables")
public GlueHiveMetastoreConfig setPartitionSegments(int partitionSegments)
{
this.partitionSegments = partitionSegments;
return this;
}

@Min(1)
public int getGetPartitionThreads()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: getGetPartitionThreads. Two issue:

  1. It should be an noun for get/set method (e.g. partitionFetchingThreadCount)
  2. We should add glue into the method name to reflect it only affects Glue related metastore :)

What about getGluePartitionFetchingThreadCount? ditto for the set method.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hear you, but I feel that it would be inconsistent with the rest of the code base. Example. Also we should not need to add the "Glue" prefix since the class is specific to Glue.

Copy link
Contributor

@wenleix wenleix Dec 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@anoopj : But isn't this configuration itself tied to Glue?

We should decide what the convention is, and if necessary, fix other use cases. What do you think whether we want "Glue" in method name ? @highker , @arhimondr , @rongrong

{
return getPartitionThreads;
}

@Config("hive.metastore.glue.get-partition-threads")
@ConfigDescription("Number of threads for parallel partition fetches from Glue")
public GlueHiveMetastoreConfig setGetPartitionThreads(int getPartitionThreads)
{
this.getPartitionThreads = getPartitionThreads;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,21 @@
*/
package com.facebook.presto.hive.metastore.glue;

import com.facebook.airlift.concurrent.BoundedExecutor;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;

import java.util.concurrent.Executor;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.weakref.jmx.ObjectNames.generatedNameOf;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

Expand All @@ -41,4 +49,17 @@ public void configure(Binder binder)
newExporter(binder).export(ExtendedHiveMetastore.class)
.as(generatedNameOf(GlueHiveMetastore.class, connectorId));
}

@Provides
@Singleton
@ForGlueHiveMetastore
public Executor createExecutor(GlueHiveMetastoreConfig hiveConfig)
{
if (hiveConfig.getGetPartitionThreads() == 1) {
return directExecutor();
}
return new BoundedExecutor(
newCachedThreadPool(daemonThreadsNamed("hive-glue-%s")),
hiveConfig.getGetPartitionThreads());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ public void testDefaults()
.setPinGlueClientToCurrentRegion(false)
.setMaxGlueConnections(5)
.setDefaultWarehouseDir(null)
.setCatalogId(null));
.setCatalogId(null)
.setPartitionSegments(5)
.setGetPartitionThreads(20));
}

@Test
Expand All @@ -44,14 +46,18 @@ public void testExplicitPropertyMapping()
.put("hive.metastore.glue.max-connections", "10")
.put("hive.metastore.glue.default-warehouse-dir", "/location")
.put("hive.metastore.glue.catalogid", "0123456789")
.put("hive.metastore.glue.partitions-segments", "10")
.put("hive.metastore.glue.get-partition-threads", "42")
.build();

GlueHiveMetastoreConfig expected = new GlueHiveMetastoreConfig()
.setGlueRegion("us-east-1")
.setPinGlueClientToCurrentRegion(true)
.setMaxGlueConnections(10)
.setDefaultWarehouseDir("/location")
.setCatalogId("0123456789");
.setCatalogId("0123456789")
.setPartitionSegments(10)
.setGetPartitionThreads(42);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ public abstract class AbstractTestHiveClient
.row(3L, "bye", (byte) 46, (short) 346, 345, 456L, 754.2008f, 98.1, false, ImmutableList.of("ape", "bear"), ImmutableMap.of("three", 3L, "four", 4L), ImmutableList.of("false", 0L, false))
.build();

private static final List<ColumnMetadata> CREATE_TABLE_COLUMNS_PARTITIONED = ImmutableList.<ColumnMetadata>builder()
protected static final List<ColumnMetadata> CREATE_TABLE_COLUMNS_PARTITIONED = ImmutableList.<ColumnMetadata>builder()
.addAll(CREATE_TABLE_COLUMNS)
.add(new ColumnMetadata("ds", createUnboundedVarcharType()))
.build();
Expand Down Expand Up @@ -1200,6 +1200,7 @@ public void testListUnknownSchema()

@Test
public void testGetPartitions()
throws Exception
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,46 @@
import com.facebook.presto.hive.MetastoreClientConfig;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.io.File;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static java.util.Locale.ENGLISH;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

public class TestHiveClientGlueMetastore
extends AbstractTestHiveClientLocal
{
private ExecutorService executorService;

public TestHiveClientGlueMetastore()
{
super("test_glue" + randomUUID().toString().toLowerCase(ENGLISH).replace("-", ""));
}

@BeforeClass
public void setUp()
{
executorService = newCachedThreadPool(daemonThreadsNamed("hive-glue-%s"));
}

@AfterClass(alwaysRun = true)
public void tearDown()
{
executorService.shutdownNow();
}

/**
* GlueHiveMetastore currently uses AWS Default Credential Provider Chain,
* See https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default
Expand All @@ -52,7 +77,7 @@ protected ExtendedHiveMetastore createMetastore(File tempDir)
GlueHiveMetastoreConfig glueConfig = new GlueHiveMetastoreConfig();
glueConfig.setDefaultWarehouseDir(tempDir.toURI().toString());

return new GlueHiveMetastore(hdfsEnvironment, glueConfig);
return new GlueHiveMetastore(hdfsEnvironment, glueConfig, executor);
}

@Override
Expand Down Expand Up @@ -98,4 +123,18 @@ public void testStorePartitionWithStatistics()
{
testStorePartitionWithStatistics(STATISTICS_PARTITIONED_TABLE_COLUMNS, BASIC_STATISTICS_1, BASIC_STATISTICS_2, BASIC_STATISTICS_1, EMPTY_TABLE_STATISTICS);
}

@Test
public void testGetPartitions() throws Exception
{
try {
createDummyPartitionedTable(tablePartitionFormat, CREATE_TABLE_COLUMNS_PARTITIONED);
Optional<List<String>> partitionNames = getMetastoreClient().getPartitionNames(tablePartitionFormat.getSchemaName(), tablePartitionFormat.getTableName());
assertTrue(partitionNames.isPresent());
assertEquals(partitionNames.get(), ImmutableList.of("ds=2016-01-01", "ds=2016-01-02"));
}
finally {
dropTable(tablePartitionFormat);
}
}
}