
Add connector specific partitioning support for remote exchanges#12373

Merged
arhimondr merged 7 commits into prestodb:master from arhimondr:connector-partitioning
Feb 28, 2019

Conversation

@arhimondr (Member):

No description provided.

@wenleix (Contributor):

"Tiny ExchangeNode refactorings" and "Rename query.initial-hash-partitions property" look good.

@wenleix (Contributor):

"Add max_tasks_per_stage session property"

Minor comment. Maybe explain the motivation in the commit message? (to control the number of tasks in a stage)

Contributor:

Curious: where is the name fixed distribution stage from? I guess it's from FixedSourcePartitionedScheduler? :)

Member Author:

I agree that the description is kind of confusing, but I couldn't come up with anything better.

Contributor:

Shall we name it stage.max-tasks-per-stage, given how the other properties are named?

Contributor:

You might need to change other places that use getHashPartitionCount(), especially the usages in AddExchanges and RewriteSpatialPartitioningAggregation.

Maybe refactor this min operation into a static helper method in SystemPartitioningHandle?
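A minimal sketch of the suggested helper; the class and method names below are illustrative, not the actual Presto API:

```java
// Hypothetical helper sketching the suggested refactoring; not actual Presto code.
public final class PartitionCounts
{
    private PartitionCounts() {}

    // Cap the system partition count by the per-stage task limit.
    public static int effectivePartitionCount(int hashPartitionCount, int maxTasksPerStage)
    {
        return Math.min(hashPartitionCount, maxTasksPerStage);
    }
}
```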

@wenleix (Contributor), Feb 23, 2019:

In fact, do we want to apply min here? Today, we cannot execute when we have more system partitions than the number of tasks.

However, in the future, we might want to materialize system partitioning results (in contrast to Hive partitioning). At that point it would be legitimate to have 1000 partitions while the max tasks per stage is only 300.

Update: I think it's OK for now to have this behavior for SystemPartitioning.

Member Author:

> Especially the usage in AddExchanges

This is used when creating connector partitioning. The number of hash partitions for connector partitioning shouldn't depend on the number of nodes (we want to have more buckets than we have nodes).

> RewriteSpatialPartitioningAggregation

This is needed for partitioned spatial join. We can work on that later.

> Maybe refactor this min operation as a static helper method in SystemPartitioningHandle

It is used only in one place. Once there are more places, we can refactor it.

Contributor:

You are right, this is about getting the NodePartitionMap, so we should enforce max-tasks-per-stage. In AddExchanges and the other plan optimizers, we should only use hash_partition_count, since it's a logical partitioning. I am assuming that today we will fail when hash_partition_count > max-tasks-per-stage.

Member Author:

It won't fail, as we don't have to specify the number of partitions for the system partitioning upfront. System partitioning is still a special case for now.

@wenleix (Contributor):

"Use static imports in AddExchanges" and "Remove initial capacity hint when selecting nodes"

Look good. I think in the previous use case, using the initial capacity hint is not a bad idea ;)

Alternatively, you can make NodeSelector.selectRandomNodes short cut to allNodes() when limit > totalNodeCount.
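A rough sketch of that short cut, using stand-in types rather than the real NodeSelector interface:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Stand-in sketch of the suggested short cut; not the actual NodeSelector API.
final class RandomNodeSelection
{
    static <T> List<T> selectRandomNodes(List<T> allNodes, int limit)
    {
        // When the limit covers every node, return them all and skip the shuffle.
        if (limit >= allNodes.size()) {
            return allNodes;
        }
        List<T> shuffled = new ArrayList<>(allNodes);
        Collections.shuffle(shuffled);
        return shuffled.subList(0, limit);
    }
}
```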

@shixuan-fan (Contributor):

First two commits look good.

"Add max_tasks_per_stage session property"

@shixuan-fan (Contributor):

Code looks good. Would you mind adding motivation to the last commit message?

@arhimondr force-pushed the connector-partitioning branch from 79a0e2d to ed91fd4 on February 22, 2019 18:57
@wenleix (Contributor):

"Support connector partitioning provider"

Generally looks good to me. Maybe the commit message should be "Allow use connector-specific partitioning for remote exchange"?

And as @shixuan-fan suggests, we should explain the motivation in commit message :) .

There are two TODOs for me as reviewer:

  1. I will do a more careful pass over AddExchanges. The refactors look reasonable, but might be worth a deeper look given the complexity of AddExchanges.

  2. I will contemplate/experiment with how this works together with compatible partitioning. I suggest @shixuan-fan and @arhimondr also think about this ;) . For example:

Assume we are performing A JOIN B. A is bucketed into 512 buckets, and B is not bucketed. The session property tells us to use HivePartitioning with 1024 partitions for remote exchange.

Do we think the partitioning over A and B (after exchange) are considered as compatible partitioning? If so, with today's code we will try to re-adjust B's partitioning to be 512 partitions -- will it work since B's partitioning is derived from Remote Exchange, rather than Table Scan. -- We don't have to solve all the problems in this PR, and we should start thinking about these interesting synergies :)

A more fundamental problem is that today, compatible partitioning is adapted in the connector's read path and is totally opaque to the engine. We might make the engine aware of it at some point :)

@wenleix (Contributor) commented Feb 23, 2019:

Also, Travis is failing :). I assume these are trivial fixes, such as some tests that have to be disabled :)

@arhimondr (Member, Author):

> Do we think the partitioning over A and B (after exchange) are considered as compatible partitioning? If so, with today's code we will try to re-adjust B's partitioning to be 512 partitions -- will it work since B's partitioning is derived from Remote Exchange, rather than Table Scan. -- We don't have to solve all the problems in this PR, and we should start thinking about these interesting synergies :)

That's a very good question. So yeah, right now if one table is already bucketed, the code will try to bucket the other table with the same number of buckets. So the hash_partitions property won't override the bucket number for now. That is something we should consider doing (maybe making it optional). If you don't mind, I would like to keep this PR simple for now, and contemplate the approach for overriding the number of buckets later.

@arhimondr force-pushed the connector-partitioning branch from ed91fd4 to 55a4228 on February 25, 2019 15:09
@wenleix (Contributor) commented Feb 25, 2019:

@arhimondr

> the code will try to bucket the other table with the same number of buckets.

Ah, I see. At least there are no compatible buckets coming in to complicate the issue :).

> That is something we should consider doing (maybe making it optional).

Yeah... the problem is we don't want to have 100 configs on this... tough question :)

> i would like to keep this PR simple for now

Agree. We should keep this PR as simple as possible. Just keep in mind there might be interesting behaviors in some cases...

Shall we add an experimental- prefix to this new session property? :)

@wenleix (Contributor) commented Feb 26, 2019:

I looked into the code a bit. A join that consists of all remote sources will use FixedCountScheduler:

    // remote source requires nodePartitionMap
    NodePartitionMap nodePartitionMap = partitioningCache.apply(plan.getFragment().getPartitioning());
    if (groupedExecutionForStage) {
        checkState(connectorPartitionHandles.size() == nodePartitionMap.getBucketToPartition().length);
    }
    stageNodeList = nodePartitionMap.getPartitionToNode();
    bucketNodeMap = nodePartitionMap.asBucketNodeMap();
    bucketToPartition = Optional.of(nodePartitionMap.getBucketToPartition());

The nodePartitionMap computation logic is in NodePartitioningManager.getNodePartitioningMap:

    public NodePartitionMap getNodePartitioningMap(Session session, PartitioningHandle partitioningHandle)

So, for connector-provided partitioning, #nodes = all-nodes-count, #partitions = hash-partition-count:

    bucketToNode = createArbitraryBucketToNode(
            nodeScheduler.createNodeSelector(connectorId).allNodes(),
            connectorBucketNodeMap.getBucketCount());

Otherwise it goes into SystemPartitioningHandle.getNodePartitionMap, and #nodes = #partitions = min(hash-partition-count, max-tasks-per-stage):

    if (partitioningHandle.getConnectorHandle() instanceof SystemPartitioningHandle) {
        return ((SystemPartitioningHandle) partitioningHandle.getConnectorHandle()).getNodePartitionMap(session, nodeScheduler);
    }

This is probably OK for now, but just keep in mind that at some point we might want more consistent behavior between SystemPartitioning and ConnectorPartitioning.

@wenleix (Contributor):

Some additional minor comments on "Support connector partitioning provider". Otherwise looks good.

@wenleix (Contributor) commented Feb 26, 2019:

This is part of the effort on Support Materialized Exchange (#12387)

@arhimondr (Member, Author):

@wenleix

> Alternatively, you can make NodeSelector.selectRandomNodes short cut to allNodes() when limit > totalNodeCount

Unfortunately, nodes are provided as an Iterator, and Iterator doesn't have a size method. We can try to change the interfaces, but honestly I don't think that expanding the list would be an issue. Most of the time we will expand it to 1000 elements max.

@arhimondr (Member, Author):

@wenleix

> Alternatively, you can make NodeSelector.selectRandomNodes short cut to allNodes() when limit > totalNodeCount

I added a size() method to the ResettableRandomizedIterator, so the initial capacity hint can take the total available number of nodes into account.

@arhimondr (Member, Author):

@wenleix

> Shall we add experimental- prefix to this new session property ? :)

I'm not sure. We've got a lot of properties that are "experimental" and have been there for years, so I'm a little reluctant to add that prefix.

Approaching from the other side: whether or not we prefix it with experimental, if we decide to get rid of the property, we will have to migrate all the clients anyway.

@arhimondr (Member, Author) commented Feb 26, 2019:

@wenleix

> This is probably OK for now, but just keep in mind at some point we might want to have more consistent behavior between SystemPartitioning and ConnectorPartitioning

It doesn't make sense to have more partitions than we have nodes for system partitioning. We are not going to use system partitioning for materialization or bucket-by-bucket execution for now. And if all the partitions have to be executed at once, there's no point in having more of them than there are nodes in the cluster.

@arhimondr (Member, Author):

@wenleix @shixuan-fan Comments addressed. Please have another pass.

@wenleix (Contributor) commented Feb 27, 2019:

@arhimondr

> It doesn't make sense to have more partitions than we have nodes for the system partitioning. We are not going to use the system partitioning for materialization and bucket-by-bucket for now. And if all the partitions have to be executed all at once - there's no point of having more of them than there are nodes in the cluster.

Correct for now. But I expect that at some point we will want to support materializing system partitioning as well, and it seems to me that it would make sense to unify system partitioning and connector partitioning.

I do agree we don't need to worry about it now. It's just something to keep in mind :)

@wenleix assigned wenleix and unassigned arhimondr on Feb 27, 2019
@nezihyigitbasi (Contributor):

Add max_tasks_per_stage session property

  • The commit details message has the config name incorrect.

Contributor:

for non-source stages or for intermediate stages?

Contributor:

@nezihyigitbasi: Thanks for the comment! In our case it's more about whether the stage/fragment's distribution is SOURCE_DISTRIBUTION:

    public static final PartitioningHandle SOURCE_DISTRIBUTION = createSystemPartitioning(SystemPartitioning.SOURCE, SystemPartitionFunction.UNKNOWN);

For example, a collocated join might be a leaf stage, but the distribution of that stage/fragment would follow the table partitioning.

Maybe we can say "Maximum number of tasks for a stage, unless its partitioning handle is SOURCE_DISTRIBUTION" to avoid confusion?

Member Author:

I'm not sure an average end user would know what a partitioning handle is.

Member Author:

"Maximum number of tasks for a non-source-distributed stage" - I think it should be pretty intuitive.

The user should understand that it is not possible (at least not in all cases) to limit the number of source-distributed tasks, as source-distributed tasks might be bound to the data location. So it somewhat makes sense. However, I agree that it is still quite confusing.

Contributor:

ditto

@nezihyigitbasi (Contributor):

Allow use connector-specific partitioning for remote exchange

  • Commit message title can be Add connector specific partitioning support for remote exchanges
  • If connector specified partitioning is used for exchanges, remote exchanges can be replaced with a bucketed table write followed by a bucketed table read.

Contributor:

. at the end.

Contributor:

Why not plural (getPartitioningHandleForExchanges)?
Or getExchangePartitioningHandle?

Member Author:

Because we are getting partitioning handles per exchange, as the handle contains the list of types for partitioning columns. So technically there will be one partitioning handle per exchange.

Contributor:

Looks like we need a better description here, as this repeats the property name. Maybe something like "Name of the catalog providing custom partitioning", or something along those lines.

Contributor:

. at the end.

Contributor:

partitioningProviderCatalogName

Member Author:

Maybe just partitioningProviderCatalog to be consistent with the property?

Contributor:

We have multiple partitionedExchange methods with complex lists of arguments; it would be nice to unify some of them to simplify the API surface here.

Contributor:

@nezihyigitbasi: Does the next commit (renaming some of them specifically to systemPartitionedExchange) help to clarify the API?

@wenleix (Contributor):

"Improve selected nodes initial capacity hint"

Minor comments... I really feel we can just change it to List<Node> :). ResettableRandomizedIterator will copy it into a List anyway.

Contributor:

Wow. I really feel we just want to use a List<Node> (and do ImmutableList.copyOf if the original input is a Set).

Member Author:

Apparently we were trying to optimize randomization. So instead of shuffling all nodes every time, only the number of nodes needed can be shuffled.
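That optimization is essentially a lazy Fisher-Yates shuffle: copy the input once, then call the random number generator only for elements actually consumed. A self-contained sketch (illustrative names, not the real ResettableRandomizedIterator):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

// Minimal sketch of the lazy-shuffle idea: copy the input once, then only
// draw from the RNG for elements that are actually consumed.
final class LazyShuffleIterator<T> implements Iterator<T>
{
    private final List<T> list = new ArrayList<>();
    private final Random random = new Random();
    private int position;

    LazyShuffleIterator(Iterable<T> elements)
    {
        elements.forEach(list::add);
    }

    public int size()
    {
        return list.size();
    }

    @Override
    public boolean hasNext()
    {
        return position < list.size();
    }

    @Override
    public T next()
    {
        // Swap a random remaining element into the current position:
        // one RNG call per element consumed, not per element stored.
        int index = position + random.nextInt(list.size() - position);
        T value = list.get(index);
        list.set(index, list.get(position));
        list.set(position, value);
        position++;
        return value;
    }
}
```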

@wenleix (Contributor), Feb 27, 2019:

By using ResettableRandomizedIterator, it already defeats the purpose, since it will do a copy into an ArrayList in the constructor :)

Maybe we should just use your previous solution and allow list resizing. Sorry about the back-and-forth :(.

Member Author:

> By using ResettableRandomizedIterator, it already defeat the purpose since it will do a

We do a copy, but we don't call the random number generator unless needed. Not sure how big a problem that is, but since it was optimized that way, let's keep it as is.

Contributor:

It's kind of weird for an iterator to have a size. Plus, this really looks like a ListIterator :)

Member Author:

Yeah, not nice. But the class is pretty private. I would simply keep it. The usages are pretty readable (e.g., candidates.size()).

Contributor:

Should this be an IllegalStateException? This method seems only applicable to DML, and if the catalog is missing, I think it should be caught earlier than here (not entirely sure about this).

Member Author:

We should keep the exception as a user error. IllegalStateException will be categorized as an internal error.

@wenleix (Contributor):

"Allow use connector-specific partitioning for remote exchange"

Looks good. I agree with @nezihyigitbasi's comment about the commit message, with some additions noting that this bucketed execution is future work. Feel free to revise it.

Add connector specific partitioning support for remote exchanges

This opens the opportunity to perform exchanges through connector tables (by replacing remote exchanges with a bucketed table write followed by a bucketed table read), which is required by the following use cases:
* Support recoverable exchange
* Support arbitrarily large JOIN/AGGREGATE by first bucketing the input data and using grouped execution.


@wenleix (Contributor):

"Rename static factory methods in ExchangeNode"

Looks good.

@wenleix (Contributor) commented Feb 27, 2019:

Make sure to also address @nezihyigitbasi's and @shixuan-fan's comments :)

@wenleix assigned arhimondr and unassigned wenleix on Feb 27, 2019
@wenleix changed the title from "Support arbitrary connector partitioning" to "Add connector specific partitioning support for remote exchanges" on Feb 27, 2019
@arhimondr force-pushed the connector-partitioning branch from 9003715 to 397a776 on February 27, 2019 20:47
@arhimondr (Member, Author):

@nezihyigitbasi, @shixuan-fan: comments addressed. Could you please have another look?

@shixuan-fan (Contributor):

Looks good.

@nezihyigitbasi (Contributor):

Add max_tasks_per_stage session property

The commit details don't have the correct config name (task.max-tasks-per-stage is not correct).

@nezihyigitbasi (Contributor):

Allow use connector-specific partitioning for remote exchange

Please see my previous comment for the commit message.

@nezihyigitbasi (Contributor):

LGTM too, modulo minor comments about the commit messages.

@arhimondr (Member, Author):

> Please see my previous comment for the commit message.

Sorry, I don't know why I was sure that I had changed it.

@arhimondr force-pushed the connector-partitioning branch from 397a776 to 8ba0d6a on February 27, 2019 22:34
Commit messages:
* Rename the query.initial-hash-partitions property to query.hash-partition-count to match the session property name
* And corresponding stage.max-tasks-per-stage configuration property
* If connector-specified partitioning is used for exchanges, remote exchanges can be replaced with a bucketed table write followed by a bucketed table read
* Rename partitionedExchange to systemPartitionedExchange to emphasize that the system partitioning is created within.
@arhimondr force-pushed the connector-partitioning branch from 535e878 to 42b7a6c on February 28, 2019 18:16
@arhimondr merged commit e170666 into prestodb:master on Feb 28, 2019
@arhimondr deleted the connector-partitioning branch on February 28, 2019 18:17

5 participants