Implement Parallel Partition Pruning for Glue Hive Metastore #1465
electrum merged 1 commit into trinodb:master from anoopj:master
Conversation
presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/glue/GlueHiveMetastore.java
nit: per code style, if there are too many params to put on one line, we put every one on a separate line:
public GlueHiveMetastore(
HdfsEnvironment hdfsEnvironment,
GlueHiveMetastoreConfig glueConfig,
@ForGlueHiveMetastore Executor executor)
requireNonNull(executor, "executor is null");
we prefer immutable collections, so: toImmutableList()
however, in this case you don't need the list at all:
.forEach(segment -> completionService.submit(() -> ...));
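A hypothetical, self-contained sketch of the suggested pattern (names like fetchSegment and SegmentFetchSketch are illustrative, not the actual GlueHiveMetastore code): submit one task per segment directly from the stream, with no intermediate list of futures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.stream.IntStream;

public class SegmentFetchSketch
{
    // Stand-in for one paginated GetPartitions call over a single segment
    private static List<String> fetchSegment(int segment)
    {
        return List.of("partition-from-segment-" + segment);
    }

    public static List<String> fetchAllSegments(int totalSegments, Executor executor)
            throws InterruptedException, ExecutionException
    {
        CompletionService<List<String>> completionService = new ExecutorCompletionService<>(executor);
        // Submit directly from the stream; no list of futures is needed
        IntStream.range(0, totalSegments)
                .forEach(segment -> completionService.submit(() -> fetchSegment(segment)));
        // Collect results as they complete
        List<String> partitions = new ArrayList<>();
        for (int i = 0; i < totalSegments; i++) {
            partitions.addAll(completionService.take().get());
        }
        return partitions;
    }
}
```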
we don't use abbreviations like "num"
maybe getPartitionThreads ?
- no need for "Glue" here
- add "total" to emphasize this is total cap, not per invocation:
maxGetPartitionTotalThreads ?
Also, what is the rationale for 50 as the default?
What's the default request limit for Glue GetPartition call?
What's the typical duration of a call?
Also, should this be off by default?
Will change variable names. Typically the call is expected to take hundreds of millis. 5 concurrent segments is conservative enough to be on by default.
This needs to be renamed appropriately, see comment at field name.
When hiveConfig.getMaxGlueGetPartitionThreads() == 1 we could return directExecutor() here.
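A minimal sketch of that suggestion, assuming a hypothetical createGetPartitionsExecutor factory; Runnable::run is the dependency-free equivalent of Guava's directExecutor().

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class ExecutorChoiceSketch
{
    // With a single configured thread there is nothing to parallelize,
    // so run tasks directly on the calling thread instead of paying for a pool.
    public static Executor createGetPartitionsExecutor(int maxThreads)
    {
        if (maxThreads == 1) {
            return Runnable::run;
        }
        return Executors.newFixedThreadPool(maxThreads);
    }
}
```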
presto-hive/src/test/java/io/prestosql/plugin/hive/metastore/glue/TestHiveGlueMetastore.java
Sort partitions before returning. We want planning to be deterministic (as much as possible).
When sorting, please do this outside of try block, so that try-catch encompasses as little as possible.
OK. Will do the sorting based on partition values.
Adding this method may change number of test methods effectively run in TestHiveGlueMetastore. Please extract to a separate commit.
This change will cause all of the tests that depend on existing tables to fail (they are currently skipped by the method in the super class). What is the reason for this change?
I found that the getPartitions tests are getting skipped without this change. What is the best way to test them?
It's complicated. Travis invokes presto-hive-hadoop2/bin/run_hive_tests.sh, which:
- starts up HDFS and a Hive metastore in Docker
- runs the presto-hive/src/test/sql/create-test.sql Hive script to create test tables
- runs the AbstractTestHive tests against this environment
The purpose is to create tables in various ways using Hive, then make sure that Presto can read them. That's where the "existing tables" part comes from -- they already exist when the tests run.
It's fine to leave this out, since there are plenty of existing tests that exercise the metastore partition calls. I verified this by running TestHiveInMemoryMetastore with logging on the various getPartition* metastore calls.
@anoopj technically, this looks decent. Are we improving Glue throughput this way?
This change can improve query planning time on heavily partitioned tables because we can do the scan of the partitions in parallel. For large tables with millions of partitions, a query can be stuck in the planning phase for dozens of minutes. My tests show that the query planning time can be improved by up to an order of magnitude.
Please note that Glue GetPartitions is a paginated API that returns only a limited number of partitions per call. So even if the Glue service did parallel reads across segments, it is not likely to help clients, because they would be making the same number of calls to the service anyway.
I think 5 is a conservative number and should be a safe default. For what it's worth, this is the default followed by Spark and Hive on EMR (search for segments on the doc page). We can add some documentation in Presto about the new setting and advise Presto users to either adjust this setting if they run into throttling or contact AWS to raise the throttling limits.
good point.
nit
Suggested change:
- public @interface ForGlueHiveMetastore
+ public @interface ForGlueHiveMetastore {}
it's hierarchical: next page (token) within a segment, so move withSegment before withNextToken
Unrelated, so best to separate commit or drop the change.
@findepi Updated the PR incorporating the review comments.
I have some doubts about intuitiveness of the configuration.
This may be confusing to administrators. Also, we don't have an option to configure a "1 thread throttle". At least in theory, this might be an issue in an organization with multiple clusters hitting Glue API limits. I think we could use
@electrum what's your thinking?
Currently, when hiveConfig.getMaxGlueGetPartitionThreads() == 1, we are using a direct executor. Why don't we just use a thread pool of size 1 so that the behavior is consistent?
Let's rename the config to
electrum
left a comment
A few minor comments. Overall code looks good.
This name seems wrong, since it's not for caching. It could just be createExecutor, since the scope is the Glue metastore module.
Sorry for missing that one.
I think this would be easier to read as a traditional for loop
presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/glue/GlueHiveMetastore.java
We shouldn't need to convert to string here, as List<String> is naturally comparable with the same semantics
Also this should probably be partitions.sort() rather than Collections.sort(partitions)
Will change to partitions.sort(). Not using the string would require me to write a custom comparator that compares List since List is not Comparable. Something like:
partitions.sort((p1, p2) -> {
List<String> values1 = p1.getValues();
List<String> values2 = p2.getValues();
if (values1.size() != values2.size()) {
return values1.size() - values2.size();
}
for (int i = 0; i < values1.size(); i++) {
int c = values1.get(i).compareTo(values2.get(i));
if (c != 0) {
return c;
}
}
return 0;
});
Does that sound reasonable?
partitions.sort(com.google.common.collect.Ordering.natural().lexicographical())
(there should be a way to do this without guava... but i didn't find it.)
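For the record, a Guava-free version is possible with a hand-rolled comparator. This hypothetical sketch matches the semantics of Ordering.natural().lexicographical(): elements are compared first, and list length is only a tiebreaker when one list is a prefix of the other (which differs from comparing sizes up front):

```java
import java.util.Comparator;
import java.util.List;

public class LexicographicalSketch
{
    // JDK-only equivalent of Ordering.natural().lexicographical() for List<String>:
    // compare element by element; a shorter list that is a prefix of a longer
    // one sorts first.
    public static final Comparator<List<String>> BY_VALUES = (left, right) -> {
        int limit = Math.min(left.size(), right.size());
        for (int i = 0; i < limit; i++) {
            int result = left.get(i).compareTo(right.get(i));
            if (result != 0) {
                return result;
            }
        }
        return Integer.compare(left.size(), right.size());
    };
}
```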
This catch block is not needed -- any exception thrown by the test method will fail the test.
Sure. Had to add throws to the parent class method too.
Yep, throws Exception is very common for test methods (and only for test methods)
use ordinary if
if (hiveConfig.getTotalGetPartitionThreads() == 1) {
return directExecutor();
}
@electrum how to say that 1 is a special value here?
The difference is subtle and complicated to explain. I think the description here is fine, though we could document it in the main Hive documentation.
presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/glue/GlueHiveMetastoreConfig.java
I don't think "total" should be here.
@electrum, hive.metastore.glue.partitions-segments ?
Please let me know if this needs a change.
Agreed, let's change this as @findepi suggested. The "total segments" makes sense in terms of the parameter in the API call (since it is saying X of Y), but "partition segments" seems better for a configuration name.
@anoopj a new thought.
@findepi My recommendation is to keep it simple and use parallel calls, because a typical query usually spans way more than 5 partitions. Also, the default throttling limit of Glue is much higher and should allow several concurrent queries (and can be raised by contacting AWS).
I agree with keeping the code simple, but I also agree with @findepi that we could be a bit smarter about parallel reads. One idea is to have
total = min(((partitionCount - 1) / minPartitionsPerSegment) + 1, totalSegments);
This would limit parallelism for small numbers of partitions while adding minimal complexity.
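Sketching that formula in isolation (the names partitionCount, minPartitionsPerSegment, and effectiveSegments are illustrative, not from the PR): it is a capped ceiling division, so small tables get one segment and only large tables fan out to the configured maximum.

```java
public class SegmentCountSketch
{
    // Cap the number of Glue segments so that small partition counts do not
    // fan out into needless parallel calls. Ceiling division is written as
    // ((n - 1) / d) + 1 for positive n.
    public static int effectiveSegments(int partitionCount, int minPartitionsPerSegment, int totalSegments)
    {
        return Math.min(((partitionCount - 1) / minPartitionsPerSegment) + 1, totalSegments);
    }
}
```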
Maybe
Sorry, I missed the last update.
If this is an assumption, then we have divergent assumptions here.
I am aware. This is why I proposed to go parallel only after the first call. Maybe we try to implement that and see how much we sacrifice in code simplicity?
I'm more worried about making an unnecessary API call to switch to parallel partition pruning. In my experience with partitioned tables, the tables were typically partitioned using a set of keys plus a time-series field, and many queries hit several partitions, with some queries spending more time in query planning than in execution -- hence this PR. If there are concerns, we could make this serial by default.
electrum
left a comment
If no one has strong objections, I think this is good to merge with the current behavior. It's a win in the case of many partitions, and for the few partitions case, it likely doesn't add enough overhead to matter. Otherwise, I think we'd need to collect real stats to compare the different cases in various scenarios.
@anoopj can you rebase and address all of the existing comments so that we can get this merged?
@dain and I were looking at this and noticed a big inefficiency in the interaction between Presto and the Glue API design. Presto accesses partition information in two phases: it lists partition names during planning, then fetches the full partition metadata during execution.
The problem is that the Glue API only allows fetching full partition metadata -- there is no efficient way to fetch just the partition names. So for the Glue metastore implementation, we fetch the full partition metadata during planning, throw away everything but the name, then fetch the metadata again during execution. If the Glue API had a way to fetch just partition names, we might not need this segmented fetching, since listing names would (hopefully) be significantly faster.
That is a good observation and we are aware that this is suboptimal. We are currently working on adding a flag to the Glue
I don't think it would obviate the parallel/segmented calls, though, since there could be heavily partitioned tables and queries that need to read a lot of partitions. Maybe we could adjust the defaults to be even lower than 5 based on some tests. I'm a bit overbooked this week and will update the PR with feedback next week, hopefully.
Updated the PR incorporating feedback. I've also tested this on a Presto cluster. On a table with about 2000 partitions,
On heavily partitioned tables, this can result in an order of magnitude improvement.
electrum
left a comment
There was a problem hiding this comment.
A few minor comments, otherwise looks good. There is one minor fix needed for the Travis build.
Nit: use a method reference and static import
private static final Comparator<Partition> PARTITION_COMPARATOR =
        comparing(Partition::getValues, lexicographical(CASE_INSENSITIVE_ORDER));
Nit: put throws clause on next line
Create a shared cached thread pool for the test class using @BeforeClass / @AfterClass so that we shut it down, otherwise we can run out of JVM threads when running many tests. See TestThrottledAsyncQueue for an example.
These need to be updated to match the new names. This is the cause of the Travis CI failure.
Fixed. Sorry for missing that.
This change parallelizes the partition fetch for the Glue metastore by splitting the partitions into non-overlapping segments [1]. This can speed up query planning by up to an order of magnitude.

[1] https://docs.aws.amazon.com/glue/latest/webapi/API_Segment.html
Updated the PR with feedback.
Merged, thanks!
Document the new configurations introduced as part of #1465 and a few other configs introduced over time.

Co-authored-by: Piotr Findeisen <piotr.findeisen@gmail.com>