
Add partitioning push down #23432

Open · dain wants to merge 12 commits into master from apply-partitioning

Conversation

@dain dain (Member) commented Sep 16, 2024

Description

Add partitioning push down to the table scan, which a connector can use to activate optional partitioning or choose between multiple partitioning strategies. This replaces the existing `Metadata` method `makeCompatiblePartitioning`, used exclusively by Hive, with a more generic `applyPartitioning` method.
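
As a sketch, the new hook could look roughly like the following; this is inferred from the `metadata.applyPartitioning(...)` call site quoted later in this conversation, and the exact signature in the PR may differ:

    import java.util.List;
    import java.util.Optional;

    import io.trino.spi.connector.ColumnHandle;
    import io.trino.spi.connector.ConnectorPartitioningHandle;
    import io.trino.spi.connector.ConnectorSession;
    import io.trino.spi.connector.ConnectorTableHandle;

    // Hypothetical sketch of the SPI hook; shape inferred from the call site
    // in AddExchanges, not copied from the PR.
    public interface ApplyPartitioningSketch
    {
        // Returns a table handle with the requested partitioning activated,
        // or Optional.empty() if the connector cannot satisfy the request.
        Optional<ConnectorTableHandle> applyPartitioning(
                ConnectorSession session,
                ConnectorTableHandle tableHandle,
                Optional<ConnectorPartitioningHandle> partitioningHandle,
                List<ColumnHandle> columns);
    }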

Hive has been updated to the new system and now only applies bucketed execution when it is actually used in the coordinator. This can improve performance when parallelism is limited by the bucketing and the bucketing isn't necessary for the query. Additionally, mismatched bucket execution (supporting joins between tables whose bucket counts differ by a power of two) in Hive is now enabled by default. I believe this was disabled before because we did not have a system to automatically disable bucket execution when the bucket count is small compared to the number of nodes.

Iceberg has also been updated to support bucketed execution. This applies the same optimizations available to Hive, allowing the engine to eliminate unnecessary redistribution of tables. Additionally, since Iceberg supports multiple independent partitioning functions, a table can effectively have multiple distributions, which makes the optimization even more effective. Iceberg bucketed execution can be controlled with the `iceberg.bucket-execution` configuration property and the `bucket_execution_enabled` session property.
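
For example, to disable it for a catalog (assuming the catalog is named `iceberg`):

    # etc/catalog/iceberg.properties
    iceberg.bucket-execution=false

or per query with `SET SESSION iceberg.bucket_execution_enabled = false`.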

Finally, for bucketed tables without a fixed node assignment, the connector can request a stable node distribution across queries. This is implemented in Hive and Iceberg, and improves the cache hit rate for file system caching. The implementation is a simple Rendezvous Hashing (Highest Random Weight) algorithm.
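
As a minimal sketch of the idea (not the PR's actual implementation), rendezvous hashing picks, for each bucket, the node whose hash of the (bucket, node) pair is highest, so the assignment is stable across queries and only a small fraction of buckets move when the node set changes:

    import java.util.List;

    // Minimal rendezvous (highest-random-weight) hashing sketch; not the
    // PR's actual implementation. Each bucket goes to the node with the
    // highest mixed hash of the (bucket, node) pair.
    final class RendezvousHashing
    {
        private RendezvousHashing() {}

        static String nodeForBucket(int bucket, List<String> nodeIds)
        {
            String best = null;
            long bestWeight = Long.MIN_VALUE;
            for (String nodeId : nodeIds) {
                long weight = mix(bucket * 31L + nodeId.hashCode());
                if (weight > bestWeight) {
                    bestWeight = weight;
                    best = nodeId;
                }
            }
            return best;
        }

        // 64-bit finalizer-style mixer (murmur-like constants)
        private static long mix(long h)
        {
            h ^= h >>> 33;
            h *= 0xff51afd7ed558ccdL;
            h ^= h >>> 33;
            return h;
        }
    }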

Follow-up Work

  • Iceberg support for mismatched buckets

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(X) Release notes are required, with the following suggested text:

# SPI
* Add partitioning push down, which a connector can use to activate optional partitioning or choose between multiple partitioning strategies. ({issue}`23432`)

# Iceberg
* Add bucketed execution, which can improve performance when running a join or aggregation on a bucketed table. This can be disabled with the `iceberg.bucket-execution` configuration property or the `bucket_execution_enabled` session property. ({issue}`23432`)

# Hive
* Bucket execution is now only enabled when actually useful in the query. ({issue}`23432`)
* Enable the mismatched bucket execution optimization by default. This can be disabled with the `hive.optimize-mismatched-bucket-count` configuration property or the `optimize_mismatched_bucket_count` session property. ({issue}`23432`)

@@ -99,7 +99,8 @@ public Optional<ConnectorBucketNodeMap> getBucketNodeMapping(ConnectorTransactio
public ToIntFunction<ConnectorSplit> getSplitBucketFunction(
ConnectorTransactionHandle transactionHandle,
ConnectorSession session,
ConnectorPartitioningHandle partitioningHandle)
ConnectorPartitioningHandle partitioningHandle,
int bucketCount)
{
return value -> ((HiveSplit) value).getReadBucketNumber()
Member

Shouldn't this validate that bucketCount matches the table bucketing?

Member Author

No, the ConnectorPartitioningHandle represents a logical partitioning, and is not specific to any table. This is a common misconception. Additionally, Hive supports mismatched bucket execution if the counts differ by a power of two.
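
For illustration (my sketch, not code from this PR), the power-of-two compatibility works because Hive buckets are assigned as hash % bucketCount, and (hash % largeCount) % smallCount == hash % smallCount whenever smallCount divides largeCount:

    // Illustrative only: maps a bucket of the larger table onto the
    // compatible bucket of the smaller table.
    final class MismatchedBuckets
    {
        private MismatchedBuckets() {}

        static int mapToSmallerBucketCount(int largeBucket, int largeCount, int smallCount)
        {
            int ratio = largeCount / smallCount;
            if (largeCount % smallCount != 0 || Integer.bitCount(ratio) != 1) {
                throw new IllegalArgumentException("bucket counts must differ by a power of two");
            }
            return largeBucket % smallCount;
        }
    }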

Member

Maybe we can add some of the above to the code comment in ConnectorPartitioningHandle?

@@ -162,35 +164,40 @@ private NodePartitionMap getNodePartitioningMap(
requireNonNull(partitioningHandle, "partitioningHandle is null");

if (partitioningHandle.getConnectorHandle() instanceof SystemPartitioningHandle) {
return systemNodePartitionMap(session, partitioningHandle, systemPartitioningCache, partitionCount);
return new NodePartitionMap(systemBucketToNode(session, partitioningHandle, systemPartitioningCache, partitionCount), _ -> {
Member

nit: getNodePartitioningMap is used in both the scan and insert paths, but currently NodePartitioningManager is making assumptions around partitionCount when it's absent. cc @marton-bod

Member Author

I don't follow. This is no different than the behavior of the code we have today. Also all existing tests pass.

@@ -81,18 +75,7 @@ public Optional<ConnectorBucketNodeMap> getBucketNodeMapping(ConnectorTransactio
if (!handle.isUsePartitionedBucketing()) {
return Optional.of(createBucketNodeMap(handle.getBucketCount()));
}

// Allocate a fixed number of buckets. Trino will assign consecutive buckets
Member

@raunaqmorarka could you run insert benchmarks on this change? I think it should be fine as https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/sql/planner/NodePartitioningManager.java#L264 provides enough buckets for global and local parallelism.

Member Author

The new code just moves this decision from the Hive connector to the core engine.

@dain dain force-pushed the apply-partitioning branch 8 times, most recently from 0ca7a63 to 5957f6f Compare October 2, 2024 23:05
@dain dain changed the title WIP Add partitioning push down Add partitioning push down Oct 6, 2024
@dain dain requested a review from electrum October 6, 2024 01:22
@@ -677,9 +684,54 @@ public PlanWithProperties visitFilter(FilterNode node, PreferredProperties prefe
@Override
public PlanWithProperties visitTableScan(TableScanNode node, PreferredProperties preferredProperties)
{
// attempt tp push preferred partitioning into the table scan
Member

nit: tp -> to

Member

+1

return metadata.applyPartitioning(session, node.getTable(), partitioningProperties.getPartitioning().map(Partitioning::getHandle), partitionColumns)
.map(node::withTableHandle)
// Activate the partitioning if it passes the rules defined in DetermineTableScanNodePartitioning
.map(newNode -> DetermineTableScanNodePartitioning.setUseConnectorNodePartitioning(
Member

Why doesn't it return node in the case where setUseConnectorNodePartitioning doesn't set withUseConnectorNodePartitioning(true)?

Member Author

I think that would be more complex, and this way we also capture the information that we could have used the partitioning but the optimizer decided not to. I don't think this shows up in the output today, but it could be added.


@Config("iceberg.bucket-execution")
@ConfigDescription("Enable bucket-aware execution: use physical bucketing information to optimize queries")
public IcebergConfig setBucketExecutionEnabled(boolean bucketExecutionEnabled)
Member

note that it won't work anyway because DetermineTableScanNodePartitioning uses:

        int numberOfBuckets = bucketNodeMap.map(ConnectorBucketNodeMap::getBucketCount)
                .orElseGet(() -> nodePartitioningManager.getNodeCount(context.getSession(), partitioning.partitioningHandle()));

It's essential for the connector to be able to provide the bucket count to the engine.

If it's just scanning one partition, then it doesn't make sense to funnel that through a single node (unless the partition is super small itself).

Member Author

The Iceberg and Hive code both only handle the pushdown if hash bucketing is used. This is not used for value-based partitioning.

Member

I think you're missing the point. CBO (DetermineTableScanNodePartitioning) decides whether to use connector-provided bucketing or not. DetermineTableScanNodePartitioning won't use connector bucketing if it doesn't know how many "buckets" there are.

Member Author

Yes, but in AddExchanges we perform the pushdown, and if it works, we then directly call DetermineTableScanNodePartitioning so it can decide whether we should actually use the bucketing from the connector. So both systems must decide that the partitioning should be used for execution.

Member

@dain DetermineTableScanNodePartitioning will never choose to use Iceberg partitioning in current code, because it doesn't know partition count.

Member Author

I assure you it does. When you have a hash bucket function, it uses that number. You can pull the branch and run it yourself.

Member

When you have a hash bucket function, it uses that number

What is the number? Is it artificial, or does it actually represent how many partitions there are? What happens if only a single partition is scanned?

The code in io.trino.sql.planner.iterative.rule.DetermineTableScanNodePartitioning#apply is:

        int numberOfBuckets = bucketNodeMap.map(ConnectorBucketNodeMap::getBucketCount)
                .orElseGet(() -> nodePartitioningManager.getNodeCount(context.getSession(), partitioning.getPartitioningHandle()));
        int numberOfTasks = max(taskCountEstimator.estimateSourceDistributedTaskCount(context.getSession()), 1);

        return Result.ofPlanNode(node
                .withUseConnectorNodePartitioning((double) numberOfBuckets / numberOfTasks >= getTableScanNodePartitioningMinBucketToTaskRatio(context.getSession())));

If the number of buckets is some arbitrarily big number but only a single partition is scanned, then such a query won't utilize the cluster properly (it will essentially utilize a single node).
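
To make the check concrete, a worked example with assumed numbers (the default min bucket-to-task ratio is not quoted in this thread, so 0.5 below is an assumption):

    // Worked example of the quoted check, with assumed values.
    int numberOfBuckets = 32;   // e.g. ConnectorBucketNodeMap.getBucketCount()
    int numberOfTasks = 16;     // estimated source-distributed task count
    double minBucketToTaskRatio = 0.5;  // assumed default

    // 32 / 16 = 2.0 >= 0.5, so connector partitioning would be used here;
    // with only 4 buckets, 4 / 16 = 0.25 < 0.5 and the engine would ignore it.
    boolean useConnectorNodePartitioning =
            (double) numberOfBuckets / numberOfTasks >= minBucketToTaskRatio;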

github-actions bot commented Nov 8, 2024

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label Nov 8, 2024
@mosabua mosabua (Member) commented Nov 8, 2024

Adding stale-ignore and performance labels. Also cc @martint

@mosabua mosabua added the performance and stale-ignore labels and removed the stale label Nov 8, 2024
Check boolean nullsAndAnyReplicated field before more complex fields
Add partitioning push down to table scan, which a connector can use to
activate optional partitioning, or choose between multiple partitioning
strategies.

This replaces the existing Metadata makeCompatiblePartitioning
method used exclusively by Hive
Add support for pushing plan partitioning into Iceberg when Iceberg
tables use hash bucket partitioning. This enables co-located joins, which
can be significantly more efficient. Additionally, since Iceberg
supports multiple independent partitioning functions, a table can
effectively have multiple distributions, which makes the optimization
more effective.

This feature can be controlled with the iceberg.bucket-execution
configuration property and the bucket_execution_enabled session
property.
This improves cache hit rate for file system caching
return bucketExecutionEnabled;
}

@Config("iceberg.bucket-execution")
Contributor

I'd like to see a test in this commit highlighting what the 🥩 of the PR is actually about.

@raunaqmorarka raunaqmorarka (Member) left a comment

Can we do the last 2 commits as a separate PR? I think the rest can land already, and it would potentially allow more people to review the Iceberg bits.

Comment on lines +27 to +28
* the bucket will be assigned to a node chosen by the system. The ConnectorPartitionHandle is defined
* declared in ConnectorTablePartitioning property of ConnectorTableProperties.
Member

`defined declared`: one of them is redundant

@@ -86,7 +86,7 @@ private FaultTolerantPartitioningScheme create(PartitioningHandle partitioningHa
if (connectorBucketNodeMap.isEmpty()) {
return createSystemSchema(partitionCount.orElse(maxPartitionCount));
}
Member

`buckedCount` typo in commit message

Comment on lines +32 to +35
default Optional<ConnectorBucketNodeMap> getBucketNodeMapping(
ConnectorTransactionHandle transactionHandle,
ConnectorSession session,
ConnectorPartitioningHandle partitioningHandle)
Member

Formatting change in a separate commit?

* <p>
* If the ConnectorPartitioningHandle of two tables are equal, the tables are guaranteed
* to have the same partitioning scheme across nodes, and the engine may use a colocated join.
*/
public interface ConnectorPartitioningHandle
{
default boolean isSingleNode()
Member

Can we also add comments for isSingleNode and isCoordinatorOnly?

List<ColumnHandle> partitionColumns;
if (partitioningProperties.getPartitioning().isPresent()) {
Partitioning partitioning = partitioningProperties.getPartitioning().get();
// constant partitioning values cannot be pushed into the table scan
Member

When we have a combination of constants and partitioning columns, are we not able to take advantage of partitioning?

import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

/**
* @param active if not set partitioning is disabled for reading
Member

Comment seems a bit unclear; maybe: "If not set, partitioning is disabled for reading"

* @param tableBucketCount Number of buckets in the table, as specified in table metadata
* @param readBucketCount Number of buckets the table will appear to have when the Hive connector presents the table to the engine for read.
* @param forWrite if true, the partitioning is being used for writing and cannot be changed
Member

if -> If

@@ -904,7 +904,7 @@ public boolean isBucketExecutionEnabled()
}

@Config("hive.bucket-execution")
@ConfigDescription("Enable bucket-aware execution: only use a single worker per bucket")
@ConfigDescription("Enable bucket-aware execution: use physical bucketing information to optimize queries")
Member

Should not be in this commit

@@ -161,7 +161,7 @@ public HiveSessionProperties(
sessionProperties = ImmutableList.of(
booleanProperty(
BUCKET_EXECUTION_ENABLED,
"Enable bucket-aware execution: only use a single worker per bucket",
"Enable bucket-aware execution: use physical bucketing information to optimize queries",
Member

Should not be in this commit

Labels: cla-signed, hive, iceberg, performance, stale-ignore