Allow forcing streaming exchange for Mark Distinct #14216
wenleix merged 2 commits into prestodb:master
21d0ef0 to 7425200
Hmm... ideally we want to add the third option into
But I can totally see this would make operation / rollout a total mess. So it makes sense to have this new
I don't think this is an exchange materialization strategy because we may have several different permutations of whether we want to stream or materialize. Enumerating the permutations would be difficult under one config option. A strategy would be something more like "CBO_BASED_MATERIALIZATION" that makes smarter decisions.
7425200 to 76c3ea2
@aweisberg: By "permutation" I assume you mean there are several orthogonal dimensions to the decision of whether to stream or materialize? (e.g. based on the operator? :) ) In my opinion it's still an exchange materialization strategy; however, we decompose the strategy space into several orthogonal options, such as decision on
wenleix left a comment
"Use static imports in TestAddExchangePlans". LGTM.
...-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java
Not related to this change, but a related topic. We should change the default for
wenleix left a comment
"Add stream mark distinct with LBM session property". One question about the condition.
For the commit message subject, "LBM" is a Facebook-internal term. What about:
Allow stream exchange for mark distinct when materialized exchange is set
When MarkDistinctOperator executes queries with many COUNT(DISTINCT)
it requires a separate exchange for every distinct COUNT(DISTINCT), which is
prohibitively expensive for materialized exchange.
nit: What about useStreamExchangeForMarkDistinct?
You are right; technically, that is what this does. I'll change to that.
ditto, query.use-stream-exchange-for-mark-distinct
nit: useStreamExchangeForMarkDistinct as the variable name? :)
presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java
35c8f41 to 58bf6d0
nit: It should be fine to inline it after the static import is applied
@rongrong Regarding changing the default of use_mark_distinct: where peak_task_memory is an issue, use_mark_distinct can help. For T1 this is more interesting because we have more distributed memory than task memory.
@wenleix regarding the commit message. I was trying to keep the first line under 50 characters. That's 73.
@wenleix Also, the description seems too specific WRT COUNT(DISTINCT)? You can distinct the inputs to any aggregation column.
412ad85 to c7071fe
c7071fe to 48aa362
@wenleix Finished landing changes. I went with a slightly different commit message and
wenleix left a comment
LGTM. Will merge it once Travis is green.
(A side note -- do we know if this helps? 😃 )
When MarkDistinctOperator executes queries with distincted aggregation columns, it requires an exchange per column, which is prohibitively expensive for materialized exchange. Add "query.use-stream-exchange-for-mark-distinct" to force streaming for mark distinct even if materialized exchange is enabled.
48aa362 to 1e124c4
Merged #14216, thanks for the contribution!
Mark distinct is enabled by default and introduces a stage per distinct column in a query in order to redistribute the tracking of distinct values across the entire cluster. With materialized exchange, the cost of these additional exchanges is higher. Enabling streaming exchange when there is sufficient memory available to perform the distincting will reduce the cost of these additional stages while still reducing the memory used by the other, non-Mark Distinct stages.
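To make the intended behavior concrete, here is a minimal Java sketch of the decision described above. It is not the code in AddExchanges.java: the class, enum, and method names are hypothetical, and the two-valued strategy enum (NONE, ALL) is an assumption made for illustration. Only the flag's meaning (force streaming for mark distinct even when materialized exchange is enabled) comes from the commit message.

```java
// Hypothetical sketch; these names are illustrative and are not Presto's actual classes.
public class MarkDistinctExchangeSketch {
    // Assumed for illustration: materialization is either off or applied to all exchanges.
    enum ExchangeMaterializationStrategy { NONE, ALL }

    enum ExchangeKind { STREAMING, MATERIALIZED }

    /**
     * Picks the exchange kind for the repartitioning inserted below a mark distinct node.
     * The flag mirrors "query.use-stream-exchange-for-mark-distinct" from the commit message.
     */
    static ExchangeKind exchangeForMarkDistinct(
            ExchangeMaterializationStrategy strategy,
            boolean useStreamExchangeForMarkDistinct) {
        // Materialize only when materialization is enabled and the override flag is off.
        if (strategy == ExchangeMaterializationStrategy.ALL && !useStreamExchangeForMarkDistinct) {
            return ExchangeKind.MATERIALIZED;
        }
        // Otherwise stream: either materialization is disabled, or the flag forces streaming.
        return ExchangeKind.STREAMING;
    }

    public static void main(String[] args) {
        // Walk through every combination of strategy and flag.
        for (ExchangeMaterializationStrategy strategy : ExchangeMaterializationStrategy.values()) {
            for (boolean flag : new boolean[] {false, true}) {
                System.out.println(strategy + ", flag=" + flag + " -> "
                        + exchangeForMarkDistinct(strategy, flag));
            }
        }
    }
}
```

In this sketch the flag only matters when materialization is enabled, so the other (non-mark-distinct) exchanges in the plan are unaffected, which matches the goal stated in the description.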