1313 */
1414package io .trino .sql .planner ;
1515
16+ import com .google .common .collect .ImmutableList ;
1617import com .google .common .collect .ImmutableMap ;
18+ import io .airlift .units .DataSize ;
1719import io .trino .spi .predicate .Domain ;
1820import io .trino .spi .predicate .TupleDomain ;
1921import io .trino .spi .type .Type ;
2022import io .trino .sql .planner .plan .DynamicFilterId ;
2123import io .trino .sql .planner .plan .JoinNode ;
2224import io .trino .sql .planner .plan .PlanNode ;
2325
24- import javax .annotation .Nullable ;
2526import javax .annotation .concurrent .GuardedBy ;
2627
27- import java .util .ArrayList ;
2828import java .util .HashMap ;
2929import java .util .List ;
3030import java .util .Map ;
31+ import java .util .Queue ;
3132import java .util .Set ;
33+ import java .util .concurrent .ConcurrentLinkedQueue ;
3234import java .util .function .Consumer ;
3335
3436import static com .google .common .base .MoreObjects .toStringHelper ;
3537import static com .google .common .base .Preconditions .checkArgument ;
3638import static com .google .common .base .Preconditions .checkState ;
3739import static com .google .common .base .Verify .verify ;
3840import static com .google .common .collect .ImmutableMap .toImmutableMap ;
41+ import static io .trino .spi .predicate .TupleDomain .columnWiseUnion ;
3942import static java .util .Objects .requireNonNull ;
4043import static java .util .function .Function .identity ;
4144
4245public class LocalDynamicFilterConsumer
4346 implements DynamicFilterSourceConsumer
4447{
// Mapping from dynamic filter ID to its build channel indices.
private final Map<DynamicFilterId, Integer> buildChannels;
// Mapping from dynamic filter ID to its build channel type.
private final Map<DynamicFilterId, Type> filterBuildTypes;
// Callbacks that receive the final per-filter domains exactly once, when collection completes.
private final List<Consumer<Map<DynamicFilterId, Domain>>> collectors;
// Retained-size threshold (in bytes) above which the running summary is simplified and,
// if still too large, abandoned in favour of TupleDomain.all().
private final long domainSizeLimitInBytes;

// Number of build-side partitions to be collected, must be provided by setPartitionCount
// (stays null until setPartitionCount assigns it).
@GuardedBy("this")
private Integer expectedPartitionCount;
// Number of addPartition calls that have been accounted for under the lock.
@GuardedBy("this")
private int collectedPartitionCount;
// Set to true (under the lock) once the result has been decided. It is additionally volatile
// because addPartition reads it without the lock as a fast-path check before doing any work.
@GuardedBy("this")
private volatile boolean collected;

// Partially merged domains. A concurrent queue is used because unionSummaryDomains is invoked
// both inside and outside the lock.
private final Queue<TupleDomain<DynamicFilterId>> summaryDomains = new ConcurrentLinkedQueue<>();
64+
65+ public LocalDynamicFilterConsumer (Map <DynamicFilterId , Integer > buildChannels , Map <DynamicFilterId , Type > filterBuildTypes , List <Consumer <Map <DynamicFilterId , Domain >>> collectors , DataSize domainSizeLimit )
7066 {
7167 this .buildChannels = requireNonNull (buildChannels , "buildChannels is null" );
7268 this .filterBuildTypes = requireNonNull (filterBuildTypes , "filterBuildTypes is null" );
7369 verify (buildChannels .keySet ().equals (filterBuildTypes .keySet ()), "filterBuildTypes and buildChannels must have same keys" );
74-
7570 requireNonNull (collectors , "collectors is null" );
7671 checkArgument (!collectors .isEmpty (), "collectors is empty" );
77- this .collectors = collectors ;
78- this .partitions = new ArrayList <> ();
72+ this .collectors = ImmutableList . copyOf ( collectors ) ;
73+ this .domainSizeLimitInBytes = requireNonNull ( domainSizeLimit , "domainSizeLimit is null" ). toBytes ();
7974 }
8075
8176 @ Override
82- public void addPartition (TupleDomain <DynamicFilterId > tupleDomain )
77+ public void addPartition (TupleDomain <DynamicFilterId > domain )
8378 {
79+ if (collected ) {
80+ return ;
81+ }
82+
83+ summaryDomains .add (domain );
84+ // Operators collecting dynamic filters tend to finish all at the same time
85+ // when filters are collected right before the HashBuilderOperator.
86+ // To avoid multiple task executor threads being blocked on waiting
87+ // for each other when collecting the filters run the heavy union operation
88+ // outside the lock.
89+ unionSummaryDomains ();
90+
8491 TupleDomain <DynamicFilterId > result ;
8592 synchronized (this ) {
93+ verify (expectedPartitionCount == null || collectedPartitionCount < expectedPartitionCount );
94+
8695 if (collected ) {
96+ summaryDomains .clear ();
8797 return ;
8898 }
89- requireNonNull (partitions , "partitions is null" );
90- // Called concurrently by each DynamicFilterSourceOperator instance (when collection is over).
91- verify (expectedPartitionCount == PARTITION_COUNT_INITIAL_VALUE || partitions .size () < expectedPartitionCount );
92- // NOTE: may result in a bit more relaxed constraint if there are multiple columns and multiple rows.
93- // See the comment at TupleDomain::columnWiseUnion() for more details.
94- partitions .add (tupleDomain );
95- if (tupleDomain .isAll ()) {
96- result = tupleDomain ;
99+ collectedPartitionCount ++;
100+
101+ boolean allPartitionsCollected = expectedPartitionCount != null && collectedPartitionCount == expectedPartitionCount ;
102+ if (allPartitionsCollected ) {
103+ // run final compaction as previous concurrent compactions may have left more than a single domain
104+ unionSummaryDomains ();
97105 }
98- else if (partitions .size () == expectedPartitionCount ) {
99- // No more partitions are left to be processed.
100- if (partitions .isEmpty ()) {
101- result = TupleDomain .none ();
106+
107+ boolean sizeLimitExceeded = false ;
108+ TupleDomain <DynamicFilterId > summary = summaryDomains .poll ();
109+ // summary can be null as another concurrent summary compaction may be running
110+ if (summary != null ) {
111+ if (summary .getRetainedSizeInBytes (DynamicFilterId ::getRetainedSizeInBytes ) > domainSizeLimitInBytes ) {
112+ summary = summary .simplify (1 );
102113 }
103- else {
104- result = TupleDomain . columnWiseUnion ( partitions ) ;
114+ if ( summary . getRetainedSizeInBytes ( DynamicFilterId :: getRetainedSizeInBytes ) > domainSizeLimitInBytes ) {
115+ sizeLimitExceeded = true ;
105116 }
117+ summaryDomains .add (summary );
106118 }
107- else {
119+
120+ if (!allPartitionsCollected && !sizeLimitExceeded && !domain .isAll ()) {
108121 return ;
109122 }
123+
124+ if (sizeLimitExceeded || domain .isAll ()) {
125+ summaryDomains .clear ();
126+ result = TupleDomain .all ();
127+ }
128+ else {
129+ verify (expectedPartitionCount != null && collectedPartitionCount == expectedPartitionCount );
130+ verify (summaryDomains .size () == 1 );
131+ result = summaryDomains .poll ();
132+ verify (result != null );
133+ }
110134 collected = true ;
111- partitions = null ;
112135 }
113136
114- notifyConsumers ( result );
137+ collectors . forEach ( collector -> collector . accept ( convertTupleDomain ( result )) );
115138 }
116139
117140 @ Override
@@ -122,33 +145,43 @@ public void setPartitionCount(int partitionCount)
122145 if (collected ) {
123146 return ;
124147 }
125- checkState (expectedPartitionCount == PARTITION_COUNT_INITIAL_VALUE , "setPartitionCount should be called only once" );
126- requireNonNull (partitions , "partitions is null" );
148+ checkState (expectedPartitionCount == null , "setPartitionCount should be called only once" );
127149 expectedPartitionCount = partitionCount ;
128- if (partitions .size () == expectedPartitionCount ) {
129- // No more partitions are left to be processed.
130- if (partitions .isEmpty ()) {
131- result = TupleDomain .none ();
132- }
133- else {
134- result = TupleDomain .columnWiseUnion (partitions );
135- }
136- collected = true ;
137- partitions = null ;
150+ if (collectedPartitionCount < expectedPartitionCount ) {
151+ return ;
152+ }
153+ if (partitionCount == 0 ) {
154+ result = TupleDomain .all ();
138155 }
139156 else {
140- return ;
157+ // run final compaction as previous concurrent compactions may have left more than a single domain
158+ unionSummaryDomains ();
159+ verify (summaryDomains .size () == 1 );
160+ result = summaryDomains .poll ();
161+ verify (result != null );
141162 }
163+ collected = true ;
142164 }
143165
144- notifyConsumers ( result );
166+ collectors . forEach ( collector -> collector . accept ( convertTupleDomain ( result )) );
145167 }
146168
147- private void notifyConsumers ( TupleDomain < DynamicFilterId > result )
169+ private void unionSummaryDomains ( )
148170 {
149- requireNonNull (result , "result is null" );
150- Map <DynamicFilterId , Domain > dynamicFilterDomains = convertTupleDomain (result );
151- collectors .forEach (consumer -> consumer .accept (dynamicFilterDomains ));
171+ while (true ) {
172+ // This method is called every time a new domain is added to the summaryDomains queue.
173+ // In a normal situation (when there's no race) there should be no more than 2 domains in the queue.
174+ TupleDomain <DynamicFilterId > first = summaryDomains .poll ();
175+ if (first == null ) {
176+ return ;
177+ }
178+ TupleDomain <DynamicFilterId > second = summaryDomains .poll ();
179+ if (second == null ) {
180+ summaryDomains .add (first );
181+ return ;
182+ }
183+ summaryDomains .add (columnWiseUnion (first , second ));
184+ }
152185 }
153186
154187 private Map <DynamicFilterId , Domain > convertTupleDomain (TupleDomain <DynamicFilterId > result )
@@ -169,7 +202,8 @@ public static LocalDynamicFilterConsumer create(
169202 JoinNode planNode ,
170203 List <Type > buildSourceTypes ,
171204 Set <DynamicFilterId > collectedFilters ,
172- List <Consumer <Map <DynamicFilterId , Domain >>> collectors )
205+ List <Consumer <Map <DynamicFilterId , Domain >>> collectors ,
206+ DataSize domainSizeLimit )
173207 {
174208 checkArgument (!planNode .getDynamicFilters ().isEmpty (), "Join node dynamicFilters is empty." );
175209 checkArgument (!collectedFilters .isEmpty (), "Collected dynamic filters set is empty" );
@@ -193,7 +227,7 @@ public static LocalDynamicFilterConsumer create(
193227 .collect (toImmutableMap (
194228 Map .Entry ::getKey ,
195229 entry -> buildSourceTypes .get (entry .getValue ())));
196- return new LocalDynamicFilterConsumer (buildChannels , filterBuildTypes , collectors );
230+ return new LocalDynamicFilterConsumer (buildChannels , filterBuildTypes , collectors , domainSizeLimit );
197231 }
198232
199233 public Map <DynamicFilterId , Integer > getBuildChannels ()
@@ -206,9 +240,12 @@ public synchronized String toString()
206240 {
207241 return toStringHelper (this )
208242 .add ("buildChannels" , buildChannels )
243+ .add ("filterBuildTypes" , filterBuildTypes )
244+ .add ("domainSizeLimitInBytes" , domainSizeLimitInBytes )
209245 .add ("expectedPartitionCount" , expectedPartitionCount )
246+ .add ("collectedPartitionCount" , collectedPartitionCount )
210247 .add ("collected" , collected )
211- .add ("partitions " , partitions )
248+ .add ("summaryDomains " , summaryDomains )
212249 .toString ();
213250 }
214251}
0 commit comments