Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions core/trino-main/src/main/java/io/trino/FeaturesConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public class FeaturesConfig
private boolean allowSetViewAuthorization;

private boolean hideInaccessibleColumns;
private boolean forceSpillingJoin;

public enum DataIntegrityVerification
{
Expand Down Expand Up @@ -477,4 +478,17 @@ public FeaturesConfig setAllowSetViewAuthorization(boolean allowSetViewAuthoriza
this.allowSetViewAuthorization = allowSetViewAuthorization;
return this;
}

public boolean isForceSpillingJoin()
{
return forceSpillingJoin;
}

@Config("force-spilling-join-operator")
@ConfigDescription("Force spilling join operator in favour of the non-spilling one even when there is no spill")
public FeaturesConfig setForceSpillingJoin(boolean forceSpillingJoin)
{
this.forceSpillingJoin = forceSpillingJoin;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public final class SystemSessionProperties
public static final String ADAPTIVE_PARTIAL_AGGREGATION_UNIQUE_ROWS_RATIO_THRESHOLD = "adaptive_partial_aggregation_unique_rows_ratio_threshold";
public static final String JOIN_PARTITIONED_BUILD_MIN_ROW_COUNT = "join_partitioned_build_min_row_count";
public static final String USE_EXACT_PARTITIONING = "use_exact_partitioning";
public static final String FORCE_SPILLING_JOIN = "force_spilling_join";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -823,6 +824,11 @@ public SystemSessionProperties(
USE_EXACT_PARTITIONING,
"When enabled this forces data repartitioning unless the partitioning of upstream stage matches exactly what downstream stage expects",
optimizerConfig.isUseExactPartitioning(),
false),
booleanProperty(
FORCE_SPILLING_JOIN,
"Force the usage of spliing join operator in favor of the non-spilling one, even if spill is not enabled",
featuresConfig.isForceSpillingJoin(),
false));
}

Expand Down Expand Up @@ -1475,4 +1481,9 @@ public static boolean isUseExactPartitioning(Session session)
{
return session.getSystemProperty(USE_EXACT_PARTITIONING, Boolean.class);
}

public static boolean isForceSpillingOperator(Session session)
{
return session.getSystemProperty(FORCE_SPILLING_JOIN, Boolean.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.operator;

import io.trino.operator.join.JoinBridgeManager;
import io.trino.operator.join.LookupSourceFactory;
import io.trino.spi.type.Type;
import io.trino.spiller.PartitioningSpillerFactory;
import io.trino.sql.planner.plan.PlanNodeId;
Expand All @@ -29,10 +28,11 @@ public interface OperatorFactories
OperatorFactory innerJoin(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactory,
JoinBridgeManager<?> lookupSourceFactory,
boolean outputSingleMatch,
boolean waitForBuild,
boolean hasFilter,
boolean spillingEnabled,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can skip OperatorFactories for "new" join. Won't serve any purpose

List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Expand All @@ -44,9 +44,10 @@ OperatorFactory innerJoin(
OperatorFactory probeOuterJoin(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactory,
JoinBridgeManager<?> lookupSourceFactory,
boolean outputSingleMatch,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Expand All @@ -58,9 +59,10 @@ OperatorFactory probeOuterJoin(
OperatorFactory lookupOuterJoin(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactory,
JoinBridgeManager<?> lookupSourceFactory,
boolean waitForBuild,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Expand All @@ -72,8 +74,9 @@ OperatorFactory lookupOuterJoin(
OperatorFactory fullOuterJoin(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactory,
JoinBridgeManager<?> lookupSourceFactory,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ public class TrinoOperatorFactories
public OperatorFactory innerJoin(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactory,
JoinBridgeManager<?> lookupSourceFactory,
boolean outputSingleMatch,
boolean waitForBuild,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Expand All @@ -64,6 +65,7 @@ public OperatorFactory innerJoin(
INNER,
outputSingleMatch,
waitForBuild,
spillingEnabled,
totalOperatorsCount,
partitioningSpillerFactory,
blockTypeOperators);
Expand All @@ -73,9 +75,10 @@ public OperatorFactory innerJoin(
public OperatorFactory probeOuterJoin(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactory,
JoinBridgeManager<?> lookupSourceFactory,
boolean outputSingleMatch,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Expand All @@ -95,6 +98,7 @@ public OperatorFactory probeOuterJoin(
PROBE_OUTER,
outputSingleMatch,
false,
spillingEnabled,
totalOperatorsCount,
partitioningSpillerFactory,
blockTypeOperators);
Expand All @@ -104,9 +108,10 @@ public OperatorFactory probeOuterJoin(
public OperatorFactory lookupOuterJoin(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactory,
JoinBridgeManager<?> lookupSourceFactory,
boolean waitForBuild,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Expand All @@ -126,6 +131,7 @@ public OperatorFactory lookupOuterJoin(
LOOKUP_OUTER,
false,
waitForBuild,
spillingEnabled,
totalOperatorsCount,
partitioningSpillerFactory,
blockTypeOperators);
Expand All @@ -135,8 +141,9 @@ public OperatorFactory lookupOuterJoin(
public OperatorFactory fullOuterJoin(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactory,
JoinBridgeManager<?> lookupSourceFactory,
boolean hasFilter,
boolean spillingEnabled,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Expand All @@ -156,6 +163,7 @@ public OperatorFactory fullOuterJoin(
FULL_OUTER,
false,
false,
spillingEnabled,
totalOperatorsCount,
partitioningSpillerFactory,
blockTypeOperators);
Expand All @@ -171,14 +179,15 @@ private static List<Integer> rangeList(int endExclusive)
private OperatorFactory createJoinOperatorFactory(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactoryManager,
JoinBridgeManager<?> lookupSourceFactoryManager,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
List<Integer> probeOutputChannels,
JoinType joinType,
boolean outputSingleMatch,
boolean waitForBuild,
boolean spillingEnabled,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
BlockTypeOperators blockTypeOperators)
Expand All @@ -187,21 +196,39 @@ private OperatorFactory createJoinOperatorFactory(
.map(probeTypes::get)
.collect(toImmutableList());

return new LookupJoinOperatorFactory(
operatorId,
planNodeId,
lookupSourceFactoryManager,
probeTypes,
probeOutputChannelTypes,
lookupSourceFactoryManager.getBuildOutputTypes(),
joinType,
outputSingleMatch,
waitForBuild,
new JoinProbeFactory(probeOutputChannels.stream().mapToInt(i -> i).toArray(), probeJoinChannel, probeHashChannel),
blockTypeOperators,
totalOperatorsCount,
probeJoinChannel,
probeHashChannel,
partitioningSpillerFactory);
if (spillingEnabled) {
return new LookupJoinOperatorFactory(
operatorId,
planNodeId,
(JoinBridgeManager<? extends LookupSourceFactory>) lookupSourceFactoryManager,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should create a new type hierarchy or new operator factory methods rather than explicitly cast here

probeTypes,
probeOutputChannelTypes,
lookupSourceFactoryManager.getBuildOutputTypes(),
joinType,
outputSingleMatch,
waitForBuild,
new JoinProbeFactory(probeOutputChannels.stream().mapToInt(i -> i).toArray(), probeJoinChannel, probeHashChannel),
blockTypeOperators,
totalOperatorsCount,
probeJoinChannel,
probeHashChannel,
partitioningSpillerFactory);
}
else {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else is redundant

return new io.trino.operator.join.unspilled.LookupJoinOperatorFactory(
operatorId,
planNodeId,
(JoinBridgeManager<? extends io.trino.operator.join.unspilled.PartitionedLookupSourceFactory>) lookupSourceFactoryManager,
probeTypes,
probeOutputChannelTypes,
lookupSourceFactoryManager.getBuildOutputTypes(),
joinType,
outputSingleMatch,
waitForBuild,
new JoinProbeFactory(probeOutputChannels.stream().mapToInt(i -> i).toArray(), probeJoinChannel, probeHashChannel),
blockTypeOperators,
probeJoinChannel,
probeHashChannel);
}
}
}
Loading