Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.hive.metastore.glue;

import javax.inject.Qualifier;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@Qualifier
public @interface ForGlueHiveMetastore {}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import com.amazonaws.services.glue.model.PartitionError;
import com.amazonaws.services.glue.model.PartitionInput;
import com.amazonaws.services.glue.model.PartitionValueList;
import com.amazonaws.services.glue.model.Segment;
import com.amazonaws.services.glue.model.TableInput;
import com.amazonaws.services.glue.model.UpdateDatabaseRequest;
import com.amazonaws.services.glue.model.UpdatePartitionRequest;
Expand Down Expand Up @@ -94,19 +95,25 @@
import io.prestosql.spi.type.Type;
import org.apache.hadoop.fs.Path;

import javax.annotation.Nullable;
import javax.inject.Inject;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.function.Function;

import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.collect.Comparators.lexicographical;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_PARTITION_DROPPED_DURING_QUERY;
import static io.prestosql.plugin.hive.metastore.MetastoreUtil.makePartName;
Expand All @@ -119,6 +126,7 @@
import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.prestosql.spi.security.PrincipalType.USER;
import static java.lang.String.format;
import static java.util.Comparator.comparing;
import static java.util.Objects.requireNonNull;
import static java.util.function.UnaryOperator.identity;
import static java.util.stream.Collectors.toList;
Expand All @@ -136,21 +144,30 @@ public class GlueHiveMetastore
private static final String WILDCARD_EXPRESSION = "";
private static final int BATCH_GET_PARTITION_MAX_PAGE_SIZE = 1000;
private static final int BATCH_CREATE_PARTITION_MAX_PAGE_SIZE = 100;
private static final Comparator<Partition> PARTITION_COMPARATOR =
comparing(Partition::getValues, lexicographical(String.CASE_INSENSITIVE_ORDER));

private final HdfsEnvironment hdfsEnvironment;
private final HdfsContext hdfsContext;
private final AWSGlueAsync glueClient;
private final Optional<String> defaultDir;
private final String catalogId;
private final int partitionSegments;
private final Executor executor;

@Inject
public GlueHiveMetastore(HdfsEnvironment hdfsEnvironment, GlueHiveMetastoreConfig glueConfig)
public GlueHiveMetastore(
HdfsEnvironment hdfsEnvironment,
GlueHiveMetastoreConfig glueConfig,
@ForGlueHiveMetastore Executor executor)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: per code style, if there are too many params to put on one line, we put every on a separate line:

public GlueHiveMetastore(
    HdfsEnvironment hdfsEnvironment, 
    GlueHiveMetastoreConfig glueConfig,
    @ForGlueHiveMetastore Executor executor)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still needs updating

{
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.hdfsContext = new HdfsContext(new ConnectorIdentity(DEFAULT_METASTORE_USER, Optional.empty(), Optional.empty()));
this.glueClient = requireNonNull(createAsyncGlueClient(glueConfig), "glueClient is null");
this.defaultDir = glueConfig.getDefaultWarehouseDir();
this.catalogId = glueConfig.getCatalogId().orElse(null);
this.partitionSegments = glueConfig.getPartitionSegments();
this.executor = requireNonNull(executor, "executor is null");
}

private static AWSGlueAsync createAsyncGlueClient(GlueHiveMetastoreConfig config)
Expand Down Expand Up @@ -668,6 +685,37 @@ public Optional<List<String>> getPartitionNamesByParts(HiveIdentity identity, St
}

private List<Partition> getPartitions(String databaseName, String tableName, String expression)
{
if (partitionSegments == 1) {
return getPartitions(databaseName, tableName, expression, null);
}

// Do parallel partition fetch.
CompletionService<List<Partition>> completionService = new ExecutorCompletionService<>(executor);
for (int i = 0; i < partitionSegments; i++) {
Segment segment = new Segment().withSegmentNumber(i).withTotalSegments(partitionSegments);
completionService.submit(() -> getPartitions(databaseName, tableName, expression, segment));
}

List<Partition> partitions = new ArrayList<>();
try {
for (int i = 0; i < partitionSegments; i++) {
Future<List<Partition>> futurePartitions = completionService.take();
partitions.addAll(futurePartitions.get());
}
}
catch (ExecutionException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new PrestoException(HIVE_METASTORE_ERROR, "Failed to fetch partitions from Glue Data Catalog", e);
}

partitions.sort(PARTITION_COMPARATOR);
return partitions;
}

private List<Partition> getPartitions(String databaseName, String tableName, String expression, @Nullable Segment segment)
{
try {
List<Partition> partitions = new ArrayList<>();
Expand All @@ -679,6 +727,7 @@ private List<Partition> getPartitions(String databaseName, String tableName, Str
.withDatabaseName(databaseName)
.withTableName(tableName)
.withExpression(expression)
.withSegment(segment)
.withNextToken(nextToken));
result.getPartitions()
.forEach(partition -> partitions.add(GlueToPrestoConverter.convertPartition(partition)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.ConfigSecuritySensitive;

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;

import java.util.Optional;
Expand All @@ -33,6 +34,8 @@ public class GlueHiveMetastoreConfig
private Optional<String> awsCredentialsProvider = Optional.empty();
private boolean useInstanceCredentials;
private Optional<String> catalogId = Optional.empty();
private int partitionSegments = 5;
private int getPartitionThreads = 20;

public Optional<String> getGlueRegion()
{
Expand Down Expand Up @@ -163,4 +166,33 @@ public GlueHiveMetastoreConfig setAwsCredentialsProvider(String awsCredentialsPr
this.awsCredentialsProvider = Optional.ofNullable(awsCredentialsProvider);
return this;
}

@Min(1)
@Max(10)
public int getPartitionSegments()
{
return partitionSegments;
}

@Config("hive.metastore.glue.partitions-segments")
@ConfigDescription("Number of segments for partitioned Glue tables")
public GlueHiveMetastoreConfig setPartitionSegments(int partitionSegments)
{
this.partitionSegments = partitionSegments;
return this;
}

@Min(1)
public int getGetPartitionThreads()
{
return getPartitionThreads;
}

@Config("hive.metastore.glue.get-partition-threads")
@ConfigDescription("Number of threads for parallel partition fetches from Glue")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@electrum how to say that 1 is a special value here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The difference is subtle and complicated to explain. I think the description here is fine, though we could document it in the main Hive documentation.

public GlueHiveMetastoreConfig setGetPartitionThreads(int getPartitionThreads)
{
this.getPartitionThreads = getPartitionThreads;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@
package io.prestosql.plugin.hive.metastore.glue;

import com.google.inject.Binder;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.multibindings.Multibinder;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.prestosql.plugin.hive.ForRecordingHiveMetastore;
import io.prestosql.plugin.hive.HiveCatalogName;
import io.prestosql.plugin.hive.HiveConfig;
import io.prestosql.plugin.hive.metastore.HiveMetastore;
import io.prestosql.plugin.hive.metastore.RecordingHiveMetastore;
Expand All @@ -26,8 +30,13 @@
import io.prestosql.plugin.hive.metastore.cache.ForCachingHiveMetastore;
import io.prestosql.spi.procedure.Procedure;

import java.util.concurrent.Executor;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

public class GlueMetastoreModule
Expand Down Expand Up @@ -66,4 +75,17 @@ protected void setup(Binder binder)
}
binder.install(new CachingHiveMetastoreModule());
}

@Provides
@Singleton
@ForGlueHiveMetastore
public Executor createExecutor(HiveCatalogName catalogName, GlueHiveMetastoreConfig hiveConfig)
{
if (hiveConfig.getGetPartitionThreads() == 1) {
return directExecutor();
}
return new BoundedExecutor(
newCachedThreadPool(daemonThreadsNamed("hive-glue-%s")),
hiveConfig.getGetPartitionThreads());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ public abstract class AbstractTestHive
.row(3L, "bye", (byte) 46, (short) 346, 345, 456L, 754.2008f, 98.1, false, ImmutableList.of("ape", "bear"), ImmutableMap.of("three", 3L, "four", 4L), ImmutableList.of("false", 0L, false))
.build();

private static final List<ColumnMetadata> CREATE_TABLE_COLUMNS_PARTITIONED = ImmutableList.<ColumnMetadata>builder()
protected static final List<ColumnMetadata> CREATE_TABLE_COLUMNS_PARTITIONED = ImmutableList.<ColumnMetadata>builder()
.addAll(CREATE_TABLE_COLUMNS)
.add(new ColumnMetadata("ds", createUnboundedVarcharType()))
.build();
Expand Down Expand Up @@ -960,6 +960,7 @@ public void testListUnknownSchema()

@Test
public void testGetPartitions()
throws Exception
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ public void testDefaults()
.setAwsSecretKey(null)
.setAwsCredentialsProvider(null)
.setCatalogId(null)
.setUseInstanceCredentials(false));
.setUseInstanceCredentials(false)
.setPartitionSegments(5)
.setGetPartitionThreads(20));
}

@Test
Expand All @@ -54,6 +56,8 @@ public void testExplicitPropertyMapping()
.put("hive.metastore.glue.aws-credentials-provider", "custom")
.put("hive.metastore.glue.catalogid", "0123456789")
.put("hive.metastore.glue.use-instance-credentials", "true")
.put("hive.metastore.glue.partitions-segments", "10")
.put("hive.metastore.glue.get-partition-threads", "42")
.build();

GlueHiveMetastoreConfig expected = new GlueHiveMetastoreConfig()
Expand All @@ -66,7 +70,9 @@ public void testExplicitPropertyMapping()
.setAwsSecretKey("DEF")
.setAwsCredentialsProvider("custom")
.setCatalogId("0123456789")
.setUseInstanceCredentials(true);
.setUseInstanceCredentials(true)
.setPartitionSegments(10)
.setGetPartitionThreads(42);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,54 @@
*/
package io.prestosql.plugin.hive.metastore.glue;

import com.google.common.collect.ImmutableList;
import io.airlift.concurrent.BoundedExecutor;
import io.prestosql.plugin.hive.AbstractTestHiveLocal;
import io.prestosql.plugin.hive.authentication.HiveIdentity;
import io.prestosql.plugin.hive.metastore.HiveMetastore;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.io.File;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.prestosql.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.prestosql.testing.TestingConnectorSession.SESSION;
import static java.util.Locale.ENGLISH;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

public class TestHiveGlueMetastore
extends AbstractTestHiveLocal
{
private static final HiveIdentity HIVE_CONTEXT = new HiveIdentity(SESSION);

private ExecutorService executorService;

public TestHiveGlueMetastore()
{
super("test_glue" + randomUUID().toString().toLowerCase(ENGLISH).replace("-", ""));
}

@BeforeClass
public void setUp()
{
executorService = newCachedThreadPool(daemonThreadsNamed("hive-glue-%s"));
}

@AfterClass(alwaysRun = true)
public void tearDown()
{
executorService.shutdownNow();
}

/**
* GlueHiveMetastore currently uses AWS Default Credential Provider Chain,
* See https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default
Expand All @@ -41,7 +72,8 @@ protected HiveMetastore createMetastore(File tempDir)
GlueHiveMetastoreConfig glueConfig = new GlueHiveMetastoreConfig();
glueConfig.setDefaultWarehouseDir(tempDir.toURI().toString());

return new GlueHiveMetastore(HDFS_ENVIRONMENT, glueConfig);
Executor executor = new BoundedExecutor(executorService, 10);
return new GlueHiveMetastore(HDFS_ENVIRONMENT, glueConfig, executor);
}

@Override
Expand Down Expand Up @@ -87,4 +119,18 @@ public void testStorePartitionWithStatistics()
{
testStorePartitionWithStatistics(STATISTICS_PARTITIONED_TABLE_COLUMNS, BASIC_STATISTICS_1, BASIC_STATISTICS_2, BASIC_STATISTICS_1, EMPTY_TABLE_STATISTICS);
}

@Test
public void testGetPartitions() throws Exception
{
try {
createDummyPartitionedTable(tablePartitionFormat, CREATE_TABLE_COLUMNS_PARTITIONED);
Optional<List<String>> partitionNames = getMetastoreClient().getPartitionNames(HIVE_CONTEXT, tablePartitionFormat.getSchemaName(), tablePartitionFormat.getTableName());
assertTrue(partitionNames.isPresent());
assertEquals(partitionNames.get(), ImmutableList.of("ds=2016-01-01", "ds=2016-01-02"));
}
finally {
dropTable(tablePartitionFormat);
}
}
}