diff --git a/core/trino-main/pom.xml b/core/trino-main/pom.xml index 34b4d3e09011..147822c45cba 100644 --- a/core/trino-main/pom.xml +++ b/core/trino-main/pom.xml @@ -40,6 +40,12 @@ jackson-databind + + com.github.ishugaliy + allgood-consistent-hash + 1.0.0 + + com.google.errorprone error_prone_annotations diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/ConsistentHashingHostAddressProvider.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/ConsistentHashingAddressProvider.java similarity index 60% rename from lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/ConsistentHashingHostAddressProvider.java rename to core/trino-main/src/main/java/io/trino/execution/scheduler/ConsistentHashingAddressProvider.java index 0e67ac201edb..28d48d579117 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/ConsistentHashingHostAddressProvider.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/ConsistentHashingAddressProvider.java @@ -11,82 +11,89 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.filesystem.cache; +package io.trino.execution.scheduler; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; import com.google.inject.Inject; import io.airlift.log.Logger; +import io.trino.node.InternalNode; +import io.trino.node.InternalNodeManager; +import io.trino.node.NodeState; import io.trino.spi.HostAddress; -import io.trino.spi.Node; -import io.trino.spi.NodeManager; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import org.ishugaliy.allgood.consistent.hash.ConsistentHash; import org.ishugaliy.allgood.consistent.hash.HashRing; import org.ishugaliy.allgood.consistent.hash.hasher.DefaultHasher; +import org.ishugaliy.allgood.consistent.hash.node.Node; import java.util.Comparator; import java.util.List; import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; -import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.airlift.units.Duration.nanosSince; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; +import static java.util.concurrent.TimeUnit.SECONDS; -public class ConsistentHashingHostAddressProvider - implements CachingHostAddressProvider +/** + * Maps a split's cache key to a stable list of worker host addresses using a consistent-hash ring. + * Worker membership is refreshed lazily on access (bounded by a short time window), so no + * background executor is needed. 
+ */ +public class ConsistentHashingAddressProvider { - private static final Logger log = Logger.get(ConsistentHashingHostAddressProvider.class); + private static final Logger log = Logger.get(ConsistentHashingAddressProvider.class); + private static final long WORKER_NODES_CACHE_TIMEOUT_SECS = 5; - private final NodeManager nodeManager; - private final ScheduledExecutorService hashRingUpdater = newSingleThreadScheduledExecutor(daemonThreadsNamed("hash-ring-refresher-%s")); - private final int replicationFactor; + private final InternalNodeManager nodeManager; + private final int preferredHostsCount; private final Comparator<HostAddress> hostAddressComparator = Comparator.comparing(HostAddress::getHostText).thenComparing(HostAddress::getPort); private final ConsistentHash<TrinoNode> consistentHashRing = HashRing.<TrinoNode>newBuilder() .hasher(DefaultHasher.METRO_HASH) .build(); + private volatile long lastRefreshTime; @Inject - public ConsistentHashingHostAddressProvider(NodeManager nodeManager, ConsistentHashingHostAddressProviderConfig configuration) + public ConsistentHashingAddressProvider(InternalNodeManager nodeManager, ConsistentHashingAddressProviderConfig config) { this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); - this.replicationFactor = configuration.getPreferredHostsCount(); + this.preferredHostsCount = config.getPreferredHostsCount(); + refreshHashRing(); } - @Override - public List<HostAddress> getHosts(String splitKey, List<HostAddress> defaultAddresses) + public List<HostAddress> getHosts(String cacheKey) { - return consistentHashRing.locate(splitKey, replicationFactor) + refreshHashRingIfNeeded(); + return consistentHashRing.locate(cacheKey, preferredHostsCount) .stream() .map(TrinoNode::getHostAndPort) .sorted(hostAddressComparator) .collect(toImmutableList()); } - @PostConstruct - public void startRefreshingHashRing() - { - hashRingUpdater.scheduleWithFixedDelay(this::refreshHashRing, 5, 5, TimeUnit.SECONDS); - refreshHashRing(); - } - - @PreDestroy - public void destroy() + private void refreshHashRingIfNeeded() { - hashRingUpdater.shutdownNow(); + if (nanosSince(lastRefreshTime).getValue(SECONDS) > WORKER_NODES_CACHE_TIMEOUT_SECS) { + // Double-checked locking to reduce contention on the hash ring's write path + synchronized (this) { + if (nanosSince(lastRefreshTime).getValue(SECONDS) > WORKER_NODES_CACHE_TIMEOUT_SECS) { + refreshHashRing(); + } + } + } } @VisibleForTesting synchronized void refreshHashRing() { try { - Set<TrinoNode> trinoNodes = nodeManager.getWorkerNodes().stream().map(TrinoNode::of).collect(toImmutableSet()); + Set<TrinoNode> trinoNodes = nodeManager.getNodes(NodeState.ACTIVE).stream() + .filter(node -> !node.isCoordinator()) + .map(TrinoNode::of) + .collect(toImmutableSet()); + lastRefreshTime = System.nanoTime(); Set<TrinoNode> hashRingNodes = consistentHashRing.getNodes(); Set<TrinoNode> removedNodes = Sets.difference(hashRingNodes, trinoNodes); Set<TrinoNode> newNodes = Sets.difference(trinoNodes, hashRingNodes); @@ -104,9 +111,9 @@ synchronized void refreshHashRing() } private record TrinoNode(String nodeIdentifier, HostAddress hostAndPort) - implements org.ishugaliy.allgood.consistent.hash.node.Node + implements Node { - public static TrinoNode of(Node node) + public static TrinoNode of(InternalNode node) { return new TrinoNode(node.getNodeIdentifier(), node.getHostAndPort()); }
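For context, a minimal self-contained sketch of the ring behavior the provider above relies on. The worker names and the key are hypothetical, and the builder/locate calls follow the allgood-consistent-hash 1.0.0 API added to the pom as far as I know it; treat this as an illustration, not part of the change:

import org.ishugaliy.allgood.consistent.hash.ConsistentHash;
import org.ishugaliy.allgood.consistent.hash.HashRing;
import org.ishugaliy.allgood.consistent.hash.hasher.DefaultHasher;
import org.ishugaliy.allgood.consistent.hash.node.SimpleNode;

import java.util.Set;

public class HashRingSketch
{
    public static void main(String[] args)
    {
        // Same hasher as ConsistentHashingAddressProvider above
        ConsistentHash<SimpleNode> ring = HashRing.<SimpleNode>newBuilder()
                .hasher(DefaultHasher.METRO_HASH)
                .build();
        ring.add(SimpleNode.of("worker-1"));
        ring.add(SimpleNode.of("worker-2"));
        ring.add(SimpleNode.of("worker-3"));

        // locate(key, count) returns up to `count` distinct owners for a key;
        // with preferredHostsCount = 2 a split gets a primary and a backup worker
        Set<SimpleNode> owners = ring.locate("s3://bucket/file.parquet:0:1024", 2);
        System.out.println(owners);

        // Removing a node only remaps the keys that node owned; keys on the
        // surviving nodes keep their placement, which keeps worker caches warm
        ring.remove(SimpleNode.of("worker-2"));
        System.out.println(ring.locate("s3://bucket/file.parquet:0:1024", 2));
    }
}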
diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/ConsistentHashingHostAddressProviderConfig.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/ConsistentHashingAddressProviderConfig.java similarity index 65% rename from lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/ConsistentHashingHostAddressProviderConfig.java rename to core/trino-main/src/main/java/io/trino/execution/scheduler/ConsistentHashingAddressProviderConfig.java index c1efe3579682..50737cabe4c9 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/ConsistentHashingHostAddressProviderConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/ConsistentHashingAddressProviderConfig.java @@ -11,25 +11,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.filesystem.cache; +package io.trino.execution.scheduler; import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; +import jakarta.validation.constraints.Min; -public class ConsistentHashingHostAddressProviderConfig +public class ConsistentHashingAddressProviderConfig { private int preferredHostsCount = 2; - @Config("fs.cache.preferred-hosts-count") - @ConfigDescription("The number of preferred nodes for caching a file. Defaults to 2.") - public ConsistentHashingHostAddressProviderConfig setPreferredHostsCount(int preferredHostsCount) + @Min(1) + public int getPreferredHostsCount() { - this.preferredHostsCount = preferredHostsCount; - return this; + return preferredHostsCount; } - public int getPreferredHostsCount() + @Config("node-scheduler.cache-preferred-hosts-count") + @ConfigDescription("Number of preferred worker hosts the scheduler derives from a split's cache key") + public ConsistentHashingAddressProviderConfig setPreferredHostsCount(int preferredHostsCount) { - return this.preferredHostsCount; + this.preferredHostsCount = preferredHostsCount; + return this; } }
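With the rename, the replication count is now tuned on the coordinator rather than per catalog. A hypothetical etc/config.properties entry (3 is an illustrative value; the default above is 2, with a @Min(1) floor):

node-scheduler.cache-preferred-hosts-count=3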
diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/TopologyAwareNodeSelector.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/TopologyAwareNodeSelector.java index 09a4532b2a5a..cdae44282e34 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/TopologyAwareNodeSelector.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/TopologyAwareNodeSelector.java @@ -63,6 +63,7 @@ public class TopologyAwareNodeSelector private final int maxUnacknowledgedSplitsPerTask; private final List<CounterStat> topologicalSplitCounters; private final NetworkTopology networkTopology; + private final ConsistentHashingAddressProvider consistentHashingAddressProvider; public TopologyAwareNodeSelector( InternalNode currentNode, @@ -74,7 +75,8 @@ public TopologyAwareNodeSelector( long maxPendingSplitsWeightPerTask, int maxUnacknowledgedSplitsPerTask, List<CounterStat> topologicalSplitCounters, - NetworkTopology networkTopology) + NetworkTopology networkTopology, + ConsistentHashingAddressProvider consistentHashingAddressProvider) { this.currentNode = requireNonNull(currentNode, "currentNode is null"); this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null"); @@ -87,6 +89,7 @@ public TopologyAwareNodeSelector( checkArgument(maxUnacknowledgedSplitsPerTask > 0, "maxUnacknowledgedSplitsPerTask must be > 0, found: %s", maxUnacknowledgedSplitsPerTask); this.topologicalSplitCounters = requireNonNull(topologicalSplitCounters, "topologicalSplitCounters is null"); this.networkTopology = requireNonNull(networkTopology, "networkTopology is null"); + this.consistentHashingAddressProvider = requireNonNull(consistentHashingAddressProvider, "consistentHashingAddressProvider is null"); } @Override @@ -149,7 +152,10 @@ else if (!splitWaitingForAnyNode) { int depth = topologicalSplitCounters.size() - 1; int chosenDepth = 0; Set<NetworkLocation> locations = new HashSet<>(); - for (HostAddress host : split.getAddresses()) { + List<HostAddress> preferredAddresses = split.getConnectorSplit().getAffinityKey() + .map(consistentHashingAddressProvider::getHosts) + .orElseGet(split::getAddresses); + for (HostAddress host : preferredAddresses) { locations.add(networkTopology.locate(host)); } if (locations.isEmpty()) { diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/TopologyAwareNodeSelectorFactory.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/TopologyAwareNodeSelectorFactory.java index 908b883b4d32..65952d2f0060 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/TopologyAwareNodeSelectorFactory.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/TopologyAwareNodeSelectorFactory.java @@ -59,6 +59,7 @@ public class TopologyAwareNodeSelectorFactory private final NetworkTopology networkTopology; private final InternalNode currentNode; private final InternalNodeManager nodeManager; + private final ConsistentHashingAddressProvider consistentHashingAddressProvider; private final int minCandidates; private final boolean includeCoordinator; private final long maxSplitsWeightPerNode; @@ -75,7 +76,8 @@ public TopologyAwareNodeSelectorFactory( InternalNodeManager nodeManager, NodeSchedulerConfig schedulerConfig, NodeTaskMap nodeTaskMap, - TopologyAwareNodeSelectorConfig topologyConfig) + TopologyAwareNodeSelectorConfig topologyConfig, + ConsistentHashingAddressProvider consistentHashingAddressProvider) { requireNonNull(networkTopology, "networkTopology is null"); requireNonNull(currentNode, "currentNode is null"); @@ -85,6 +87,7 @@ public TopologyAwareNodeSelectorFactory( this.networkTopology = networkTopology; this.currentNode = currentNode; this.nodeManager = nodeManager; + this.consistentHashingAddressProvider = requireNonNull(consistentHashingAddressProvider, "consistentHashingAddressProvider is null"); this.minCandidates = schedulerConfig.getMinCandidates(); this.includeCoordinator = schedulerConfig.isIncludeCoordinator(); this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null"); @@ -136,7 +139,8 @@ public NodeSelector createNodeSelector(Session session) minPendingSplitsWeightPerTask, getMaxUnacknowledgedSplitsPerTask(session), placementCounters, - networkTopology); + networkTopology, + consistentHashingAddressProvider); } private NodeMap createNodeMap() diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelector.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelector.java index 661032a220b6..3d75be06b336 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelector.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelector.java @@ -27,6 +27,7 @@ import io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy; import io.trino.metadata.Split; import io.trino.node.InternalNode; +import io.trino.spi.HostAddress; import io.trino.spi.TrinoException; import jakarta.annotation.Nullable; @@ -68,6 +69,7 @@ public class UniformNodeSelector private final SplitsBalancingPolicy splitsBalancingPolicy; private final boolean optimizedLocalScheduling; private final QueueSizeAdjuster queueSizeAdjuster; + private final ConsistentHashingAddressProvider consistentHashingAddressProvider; public UniformNodeSelector( InternalNode currentNode, @@ -80,7 +82,8 @@ public
UniformNodeSelector( long maxAdjustedPendingSplitsWeightPerTask, int maxUnacknowledgedSplitsPerTask, SplitsBalancingPolicy splitsBalancingPolicy, - boolean optimizedLocalScheduling) + boolean optimizedLocalScheduling, + ConsistentHashingAddressProvider consistentHashingAddressProvider) { this(currentNode, nodeTaskMap, @@ -92,7 +95,8 @@ public UniformNodeSelector( maxUnacknowledgedSplitsPerTask, splitsBalancingPolicy, optimizedLocalScheduling, - new QueueSizeAdjuster(minPendingSplitsWeightPerTask, maxAdjustedPendingSplitsWeightPerTask)); + new QueueSizeAdjuster(minPendingSplitsWeightPerTask, maxAdjustedPendingSplitsWeightPerTask), + consistentHashingAddressProvider); } @VisibleForTesting @@ -107,7 +111,8 @@ public UniformNodeSelector( int maxUnacknowledgedSplitsPerTask, SplitsBalancingPolicy splitsBalancingPolicy, boolean optimizedLocalScheduling, - QueueSizeAdjuster queueSizeAdjuster) + QueueSizeAdjuster queueSizeAdjuster, + ConsistentHashingAddressProvider consistentHashingAddressProvider) { this.currentNode = requireNonNull(currentNode, "currentNode is null"); this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null"); @@ -121,6 +126,7 @@ public UniformNodeSelector( this.splitsBalancingPolicy = requireNonNull(splitsBalancingPolicy, "splitsBalancingPolicy is null"); this.optimizedLocalScheduling = optimizedLocalScheduling; this.queueSizeAdjuster = queueSizeAdjuster; + this.consistentHashingAddressProvider = requireNonNull(consistentHashingAddressProvider, "consistentHashingAddressProvider is null"); } @Override @@ -173,8 +179,11 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks) + List<HostAddress> preferredAddresses = split.getConnectorSplit().getAffinityKey() + .map(consistentHashingAddressProvider::getHosts) + .orElseGet(split::getAddresses); + if (optimizedLocalScheduling && !preferredAddresses.isEmpty()) { + candidateNodes = selectExactNodes(nodeMap, preferredAddresses, includeCoordinator); if (candidateNodes.isEmpty()) { // choose any other node if preferred node is not available candidateNodes = selectNodes(minCandidates, randomCandidates);
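The same affinity-key fallback recurs in TopologyAwareNodeSelector above and in ArbitraryDistributionSplitAssigner below, so here is a toy model of just that resolution step. FakeSplit and the ring lookup function are hypothetical stand-ins for Split/ConnectorSplit and ConsistentHashingAddressProvider, not engine types:

import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public class AffinityResolutionSketch
{
    // Stand-in for a Split whose ConnectorSplit may carry an affinity key
    record FakeSplit(Optional<String> affinityKey, List<String> addresses) {}

    static List<String> resolve(FakeSplit split, Function<String, List<String>> ringLookup)
    {
        return split.affinityKey()
                .map(ringLookup)              // cache-aware placement from the hash ring
                .orElseGet(split::addresses); // otherwise honor connector-reported hosts
    }

    public static void main(String[] args)
    {
        Function<String, List<String>> ring = key -> List.of("worker-1:8080", "worker-2:8080");
        // Split with an affinity key: placed by the ring, its own addresses ignored
        System.out.println(resolve(new FakeSplit(Optional.of("file.parquet:0:100"), List.of()), ring));
        // Split without a key: falls back to the addresses it reported
        System.out.println(resolve(new FakeSplit(Optional.empty(), List.of("worker-3:8080")), ring));
    }
}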
diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelectorFactory.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelectorFactory.java index 838d0383e68c..63f1c38a5bc2 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelectorFactory.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelectorFactory.java @@ -56,6 +56,7 @@ public class UniformNodeSelectorFactory private final InternalNode currentNode; private final InternalNodeManager nodeManager; + private final ConsistentHashingAddressProvider consistentHashingAddressProvider; private final int minCandidates; private final boolean includeCoordinator; private final long maxSplitsWeightPerNode; @@ -71,9 +72,10 @@ public UniformNodeSelectorFactory( InternalNode currentNode, InternalNodeManager nodeManager, NodeSchedulerConfig config, - NodeTaskMap nodeTaskMap) + NodeTaskMap nodeTaskMap, + ConsistentHashingAddressProvider consistentHashingAddressProvider) { - this(currentNode, nodeManager, config, nodeTaskMap, new Duration(5, SECONDS)); + this(currentNode, nodeManager, config, nodeTaskMap, consistentHashingAddressProvider, new Duration(5, SECONDS)); } @VisibleForTesting @@ -82,10 +84,12 @@ public UniformNodeSelectorFactory( InternalNodeManager nodeManager, NodeSchedulerConfig config, NodeTaskMap nodeTaskMap, + ConsistentHashingAddressProvider consistentHashingAddressProvider, Duration nodeMapMemoizationDuration) { this.currentNode = requireNonNull(currentNode, "currentNode is null"); this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); + this.consistentHashingAddressProvider = requireNonNull(consistentHashingAddressProvider, "consistentHashingAddressProvider is null"); this.minCandidates = config.getMinCandidates(); this.includeCoordinator = config.isIncludeCoordinator(); this.splitsBalancingPolicy = config.getSplitsBalancingPolicy(); @@ -128,7 +132,8 @@ public NodeSelector createNodeSelector(Session session) maxAdjustedPendingSplitsWeightPerTask, getMaxUnacknowledgedSplitsPerTask(session), splitsBalancingPolicy, - optimizedLocalScheduling); + optimizedLocalScheduling, + consistentHashingAddressProvider); } private NodeMap createNodeMap() diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ArbitraryDistributionSplitAssigner.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ArbitraryDistributionSplitAssigner.java index 0477363a167e..750cb30e840b 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ArbitraryDistributionSplitAssigner.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ArbitraryDistributionSplitAssigner.java @@ -20,6 +20,7 @@ import com.google.common.collect.ListMultimap; import io.trino.connector.CatalogHandle; import io.trino.exchange.SpoolingExchangeInput; +import io.trino.execution.scheduler.ConsistentHashingAddressProvider; import io.trino.metadata.Split; import io.trino.spi.HostAddress; import io.trino.spi.SplitWeight; @@ -59,6 +60,7 @@ class ArbitraryDistributionSplitAssigner private final long maxTargetPartitionSizeInBytes; private final long standardSplitSizeInBytes; private final int maxTaskSplitCount; + private final ConsistentHashingAddressProvider consistentHashingAddressProvider; private int nextPartitionId; private int adaptiveCounter; @@ -81,7 +83,8 @@ class ArbitraryDistributionSplitAssigner long minTargetPartitionSizeInBytes, long maxTargetPartitionSizeInBytes, long standardSplitSizeInBytes, - int maxTaskSplitCount) + int maxTaskSplitCount, + ConsistentHashingAddressProvider consistentHashingAddressProvider) { this.catalogRequirement = requireNonNull(catalogRequirement, "catalogRequirement is null"); this.partitionedSources = ImmutableSet.copyOf(requireNonNull(partitionedSources, "partitionedSources is null")); @@ -96,6 +99,7 @@ class ArbitraryDistributionSplitAssigner this.maxTargetPartitionSizeInBytes = maxTargetPartitionSizeInBytes; this.standardSplitSizeInBytes = standardSplitSizeInBytes; this.maxTaskSplitCount = maxTaskSplitCount; + this.consistentHashingAddressProvider = requireNonNull(consistentHashingAddressProvider, "consistentHashingAddressProvider is null"); this.targetPartitionSizeInBytes = minTargetPartitionSizeInBytes; this.roundedTargetPartitionSizeInBytes = minTargetPartitionSizeInBytes; @@ -365,11 +369,14 @@ private long rank(HostAddress address) private NodeRequirements getNodeRequirements(Split split) { - if (split.getAddresses().isEmpty()) { + List<HostAddress> preferredAddresses = split.getConnectorSplit().getAffinityKey() + .map(consistentHashingAddressProvider::getHosts) + .orElseGet(split::getAddresses); + if (preferredAddresses.isEmpty()) { checkArgument(split.isRemotelyAccessible(), "split is not remotely accessible but the list of hosts is empty: %s", split); return new NodeRequirements(catalogRequirement, Optional.empty(), true); } -
HostAddress selectedAddress = split.getAddresses().stream() + HostAddress selectedAddress = preferredAddresses.stream() .min(Comparator.comparingLong(this::rank)) .orElseThrow(); return new NodeRequirements(catalogRequirement, Optional.of(selectedAddress), split.isRemotelyAccessible()); diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenTaskSourceFactory.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenTaskSourceFactory.java index 0d255b840004..4883eef98a9e 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenTaskSourceFactory.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenTaskSourceFactory.java @@ -21,6 +21,7 @@ import io.trino.execution.ForQueryExecution; import io.trino.execution.QueryManagerConfig; import io.trino.execution.TableExecuteContextManager; +import io.trino.execution.scheduler.ConsistentHashingAddressProvider; import io.trino.execution.scheduler.OutputDataSizeEstimate; import io.trino.node.InternalNode; import io.trino.node.InternalNodeManager; @@ -78,6 +79,7 @@ public class EventDrivenTaskSourceFactory private final InternalNode currentNode; private final InternalNodeManager nodeManager; private final TableExecuteContextManager tableExecuteContextManager; + private final ConsistentHashingAddressProvider consistentHashingAddressProvider; private final int splitBatchSize; @Inject @@ -87,6 +89,7 @@ public EventDrivenTaskSourceFactory( InternalNode currentNode, InternalNodeManager nodeManager, TableExecuteContextManager tableExecuteContextManager, + ConsistentHashingAddressProvider consistentHashingAddressProvider, QueryManagerConfig queryManagerConfig) { this( @@ -95,6 +98,7 @@ public EventDrivenTaskSourceFactory( currentNode, nodeManager, tableExecuteContextManager, + consistentHashingAddressProvider, requireNonNull(queryManagerConfig, "queryManagerConfig is null").getScheduleSplitBatchSize()); } @@ -104,6 +108,7 @@ public EventDrivenTaskSourceFactory( InternalNode currentNode, InternalNodeManager nodeManager, TableExecuteContextManager tableExecuteContextManager, + ConsistentHashingAddressProvider consistentHashingAddressProvider, int splitBatchSize) { this.splitSourceFactory = requireNonNull(splitSourceFactory, "splitSourceFactory is null"); @@ -111,6 +116,7 @@ public EventDrivenTaskSourceFactory( this.currentNode = requireNonNull(currentNode, "currentNode is null"); this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); this.tableExecuteContextManager = requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null"); + this.consistentHashingAddressProvider = requireNonNull(consistentHashingAddressProvider, "consistentHashingAddressProvider is null"); this.splitBatchSize = splitBatchSize; } @@ -215,7 +221,8 @@ private SplitAssigner createSplitAssigner( arbitraryDistributionComputeTaskTargetSizeInBytesMin, arbitraryDistributionComputeTaskTargetSizeInBytesMax, standardSplitSizeInBytes, - maxArbitraryDistributionTaskSplitCount); + maxArbitraryDistributionTaskSplitCount, + consistentHashingAddressProvider); } if (partitioning.equals(SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION)) { @@ -228,7 +235,8 @@ private SplitAssigner createSplitAssigner( arbitraryDistributionWriteTaskTargetSizeInBytesMin, arbitraryDistributionWriteTaskTargetSizeInBytesMax, standardSplitSizeInBytes, - maxArbitraryDistributionTaskSplitCount); + maxArbitraryDistributionTaskSplitCount, + 
consistentHashingAddressProvider); } if (partitioning.equals(FIXED_HASH_DISTRIBUTION)) { return HashDistributionSplitAssigner.create( diff --git a/core/trino-main/src/main/java/io/trino/metadata/Split.java b/core/trino-main/src/main/java/io/trino/metadata/Split.java index 240071a4a76b..2a46d23e61aa 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/Split.java +++ b/core/trino-main/src/main/java/io/trino/metadata/Split.java @@ -23,6 +23,7 @@ import java.util.List; import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.base.Preconditions.checkArgument; import static io.airlift.slice.SizeOf.instanceSize; import static java.util.Objects.requireNonNull; @@ -40,6 +41,8 @@ public Split( { this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null"); this.connectorSplit = requireNonNull(connectorSplit, "connectorSplit is null"); + checkArgument(connectorSplit.getAffinityKey().isEmpty() || connectorSplit.isRemotelyAccessible(), + "Split with an affinity key must be remotely accessible: %s", connectorSplit); } @JsonProperty diff --git a/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java b/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java index 0ac029a8953b..1262f0764511 100644 --- a/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java +++ b/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java @@ -70,6 +70,8 @@ import io.trino.execution.resourcegroups.LegacyResourceGroupConfigurationManager; import io.trino.execution.resourcegroups.ResourceGroupInfoProvider; import io.trino.execution.resourcegroups.ResourceGroupManager; +import io.trino.execution.scheduler.ConsistentHashingAddressProvider; +import io.trino.execution.scheduler.ConsistentHashingAddressProviderConfig; import io.trino.execution.scheduler.NodeScheduler; import io.trino.execution.scheduler.NodeSchedulerConfig; import io.trino.execution.scheduler.SplitSchedulerStats; @@ -272,6 +274,10 @@ protected void setup(Binder binder) binder.bind(NodeTaskMap.class).in(Scopes.SINGLETON); newExporter(binder).export(NodeScheduler.class).withGeneratedName(); + // consistent hashing address provider used by the scheduler to place splits that carry a cache key + configBinder(binder).bindConfig(ConsistentHashingAddressProviderConfig.class); + binder.bind(ConsistentHashingAddressProvider.class).in(Scopes.SINGLETON); + // network topology switch (buildConfigObject(NodeSchedulerConfig.class).getNodeSchedulerPolicy()) { case UNIFORM -> install(new UniformNodeSelectorModule()); diff --git a/core/trino-main/src/main/java/io/trino/testing/PlanTester.java b/core/trino-main/src/main/java/io/trino/testing/PlanTester.java index ea6c90e15fec..5579fb5c7256 100644 --- a/core/trino-main/src/main/java/io/trino/testing/PlanTester.java +++ b/core/trino-main/src/main/java/io/trino/testing/PlanTester.java @@ -78,6 +78,8 @@ import io.trino.execution.TaskManagerConfig; import io.trino.execution.querystats.PlanOptimizersStatsCollector; import io.trino.execution.resourcegroups.NoOpResourceGroupManager; +import io.trino.execution.scheduler.ConsistentHashingAddressProvider; +import io.trino.execution.scheduler.ConsistentHashingAddressProviderConfig; import io.trino.execution.scheduler.NodeScheduler; import io.trino.execution.scheduler.NodeSchedulerConfig; import io.trino.execution.scheduler.UniformNodeSelectorFactory; @@ -433,7 +435,8 @@ TypeSignature.class, new TypeSignatureDeserializer(), this.pageSourceManager = new 
PageSourceManager(createPageSourceProviderFactory(catalogManager)); this.pageSinkManager = new PageSinkManager(createPageSinkProvider(catalogManager)); this.indexManager = new IndexManager(createIndexProvider(catalogManager)); - NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(CURRENT_NODE, nodeManager, nodeSchedulerConfig, new NodeTaskMap(finalizerService))); + ConsistentHashingAddressProvider consistentHashingAddressProvider = new ConsistentHashingAddressProvider(nodeManager, new ConsistentHashingAddressProviderConfig()); + NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(CURRENT_NODE, nodeManager, nodeSchedulerConfig, new NodeTaskMap(finalizerService), consistentHashingAddressProvider)); this.sessionPropertyManager = createSessionPropertyManager(catalogManager, taskManagerConfig, optimizerConfig); this.nodePartitioningManager = new NodePartitioningManager(nodeScheduler, createNodePartitioningProvider(catalogManager)); this.partitionFunctionProvider = new PartitionFunctionProvider(hashCompiler, createNodePartitioningProvider(catalogManager)); diff --git a/core/trino-main/src/test/java/io/trino/execution/BenchmarkNodeScheduler.java b/core/trino-main/src/test/java/io/trino/execution/BenchmarkNodeScheduler.java index 9534c4e72b55..1e00422e9dba 100644 --- a/core/trino-main/src/test/java/io/trino/execution/BenchmarkNodeScheduler.java +++ b/core/trino-main/src/test/java/io/trino/execution/BenchmarkNodeScheduler.java @@ -19,6 +19,8 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Multimap; import io.trino.Session; +import io.trino.execution.scheduler.ConsistentHashingAddressProvider; +import io.trino.execution.scheduler.ConsistentHashingAddressProviderConfig; import io.trino.execution.scheduler.FlatNetworkTopology; import io.trino.execution.scheduler.NetworkLocation; import io.trino.execution.scheduler.NetworkTopology; @@ -194,13 +196,14 @@ private NodeSelectorFactory getNodeSelectorFactory(NodeTaskMap nodeTaskMap) { InternalNodeManager nodeManager = TestingInternalNodeManager.createDefault(); NodeSchedulerConfig nodeSchedulerConfig = getNodeSchedulerConfig(); + ConsistentHashingAddressProvider consistentHashingAddressProvider = new ConsistentHashingAddressProvider(nodeManager, new ConsistentHashingAddressProviderConfig()); switch (policy) { case "uniform": - return new UniformNodeSelectorFactory(CURRENT_NODE, nodeManager, nodeSchedulerConfig, nodeTaskMap); + return new UniformNodeSelectorFactory(CURRENT_NODE, nodeManager, nodeSchedulerConfig, nodeTaskMap, consistentHashingAddressProvider); case "topology": - return new TopologyAwareNodeSelectorFactory(new FlatNetworkTopology(), CURRENT_NODE, nodeManager, nodeSchedulerConfig, nodeTaskMap, new TopologyAwareNodeSelectorConfig()); + return new TopologyAwareNodeSelectorFactory(new FlatNetworkTopology(), CURRENT_NODE, nodeManager, nodeSchedulerConfig, nodeTaskMap, new TopologyAwareNodeSelectorConfig(), consistentHashingAddressProvider); case "benchmark": - return new TopologyAwareNodeSelectorFactory(new BenchmarkNetworkTopology(), CURRENT_NODE, nodeManager, nodeSchedulerConfig, nodeTaskMap, getBenchmarkNetworkTopologyConfig()); + return new TopologyAwareNodeSelectorFactory(new BenchmarkNetworkTopology(), CURRENT_NODE, nodeManager, nodeSchedulerConfig, nodeTaskMap, getBenchmarkNetworkTopologyConfig(), consistentHashingAddressProvider); default: throw new IllegalStateException(); } diff --git 
a/core/trino-main/src/test/java/io/trino/execution/TestNodeScheduler.java b/core/trino-main/src/test/java/io/trino/execution/TestNodeScheduler.java index 9606a1447c11..47809be75e90 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestNodeScheduler.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestNodeScheduler.java @@ -22,6 +22,8 @@ import com.google.common.collect.Sets; import io.trino.Session; import io.trino.SystemSessionProperties; +import io.trino.execution.scheduler.ConsistentHashingAddressProvider; +import io.trino.execution.scheduler.ConsistentHashingAddressProviderConfig; import io.trino.execution.scheduler.NetworkLocation; import io.trino.execution.scheduler.NetworkTopology; import io.trino.execution.scheduler.NodeScheduler; @@ -115,7 +117,7 @@ public void setUp() .setMaxAdjustedPendingSplitsWeightPerTask(100) .setIncludeCoordinator(false); - nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(CURRENT_NODE, nodeManager, nodeSchedulerConfig, nodeTaskMap)); + nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(CURRENT_NODE, nodeManager, nodeSchedulerConfig, nodeTaskMap, new ConsistentHashingAddressProvider(nodeManager, new ConsistentHashingAddressProviderConfig()))); // contents of taskMap indicate the node-task map for the current stage taskMap = new HashMap<>(); nodeSelector = nodeScheduler.createNodeSelector(session); @@ -190,7 +192,8 @@ public void testTopologyAwareScheduling() nodeManager, nodeSchedulerConfig, nodeTaskMap, - getNetworkTopologyConfig()); + getNetworkTopologyConfig(), + new ConsistentHashingAddressProvider(nodeManager, new ConsistentHashingAddressProviderConfig())); NodeScheduler nodeScheduler = new NodeScheduler(nodeSelectorFactory); NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session); @@ -613,7 +616,8 @@ public void testTopologyAwareFailover() nodeManager, nodeSchedulerConfig, nodeTaskMap, - getNetworkTopologyConfig()); + getNetworkTopologyConfig(), + new ConsistentHashingAddressProvider(nodeManager, new ConsistentHashingAddressProviderConfig())); NodeScheduler nodeScheduler = new NodeScheduler(nodeSelectorFactory); NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session); diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestMultiSourcePartitionedScheduler.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestMultiSourcePartitionedScheduler.java index 9b600ea04f2c..1b9592490291 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestMultiSourcePartitionedScheduler.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestMultiSourcePartitionedScheduler.java @@ -602,7 +602,7 @@ private SplitPlacementPolicy createSplitPlacementPolicies(Session session, Stage .setMaxSplitsPerNode(100) .setMinPendingSplitsPerTask(0) .setSplitsBalancingPolicy(STAGE); - NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(CURRENT_NODE, nodeManager, nodeSchedulerConfig, nodeTaskMap, new Duration(0, SECONDS))); + NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(CURRENT_NODE, nodeManager, nodeSchedulerConfig, nodeTaskMap, new ConsistentHashingAddressProvider(nodeManager, new ConsistentHashingAddressProviderConfig()), new Duration(0, SECONDS))); return new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(session), stage::getAllTasks); } diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestScaledWriterScheduler.java 
b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestScaledWriterScheduler.java index c4fc9862aa10..5c8b53a0d182 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestScaledWriterScheduler.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestScaledWriterScheduler.java @@ -32,6 +32,7 @@ import io.trino.execution.buffer.OutputBufferStatus; import io.trino.metadata.Split; import io.trino.node.InternalNode; +import io.trino.node.InternalNodeManager; import io.trino.node.TestingInternalNodeManager; import io.trino.spi.NodeVersion; import io.trino.spi.QueryId; @@ -207,16 +208,23 @@ private ScaledWriterScheduler buildScaledWriterScheduler(AtomicReference { NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService); TestingInternalNodeManager nodeManager = TestingInternalNodeManager.createDefault(); - NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(CURRENT_NODE, nodeManager, new NodeSchedulerConfig().setIncludeCoordinator(false), nodeTaskMap)); + NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(CURRENT_NODE, nodeManager, new NodeSchedulerConfig().setIncludeCoordinator(false), nodeTaskMap, new ConsistentHashingAddressProvider(nodeManager, new ConsistentHashingAddressProviderConfig()))); PlanFragment plan = createFragment(); StageExecution stage = createStageExecution(plan, nodeTaskMap); @@ -502,7 +502,7 @@ public void testNewTaskScheduledWhenChildStageBufferIsUnderutilized() new InternalNode("other1", URI.create("http://127.0.0.1:11"), NodeVersion.UNKNOWN, false), new InternalNode("other2", URI.create("http://127.0.0.1:12"), NodeVersion.UNKNOWN, false), new InternalNode("other3", URI.create("http://127.0.0.1:13"), NodeVersion.UNKNOWN, false)); - NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(CURRENT_NODE, nodeManager, new NodeSchedulerConfig().setIncludeCoordinator(false), nodeTaskMap, new Duration(0, SECONDS))); + NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(CURRENT_NODE, nodeManager, new NodeSchedulerConfig().setIncludeCoordinator(false), nodeTaskMap, new ConsistentHashingAddressProvider(nodeManager, new ConsistentHashingAddressProviderConfig()), new Duration(0, SECONDS))); PlanFragment plan = createFragment(); StageExecution stage = createStageExecution(plan, nodeTaskMap); @@ -545,7 +545,7 @@ public void testNoNewTaskScheduledWhenChildStageBufferIsOverutilized() new InternalNode("other1", URI.create("http://127.0.0.1:11"), NodeVersion.UNKNOWN, false), new InternalNode("other2", URI.create("http://127.0.0.1:12"), NodeVersion.UNKNOWN, false), new InternalNode("other3", URI.create("http://127.0.0.1:13"), NodeVersion.UNKNOWN, false)); - NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(CURRENT_NODE, nodeManager, new NodeSchedulerConfig().setIncludeCoordinator(false), nodeTaskMap, new Duration(0, SECONDS))); + NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(CURRENT_NODE, nodeManager, new NodeSchedulerConfig().setIncludeCoordinator(false), nodeTaskMap, new ConsistentHashingAddressProvider(nodeManager, new ConsistentHashingAddressProviderConfig()), new Duration(0, SECONDS))); PlanFragment plan = createFragment(); StageExecution stage = createStageExecution(plan, nodeTaskMap); @@ -585,7 +585,7 @@ public void testDynamicFiltersUnblockedOnBlockedBuildSource() PlanFragment plan = createFragment(); NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService); 
StageExecution stage = createStageExecution(plan, nodeTaskMap); - NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(CURRENT_NODE, nodeManager, new NodeSchedulerConfig().setIncludeCoordinator(false), nodeTaskMap)); + NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(CURRENT_NODE, nodeManager, new NodeSchedulerConfig().setIncludeCoordinator(false), nodeTaskMap, new ConsistentHashingAddressProvider(nodeManager, new ConsistentHashingAddressProviderConfig()))); DynamicFilterService dynamicFilterService = new DynamicFilterService(metadata, functionManager, typeOperators, new DynamicFilterConfig()); dynamicFilterService.registerQuery( QUERY_ID, @@ -660,7 +660,7 @@ private StageScheduler getSourcePartitionedScheduler( .setMaxSplitsPerNode(20) .setMinPendingSplitsPerTask(0) .setSplitsBalancingPolicy(splitsBalancingPolicy); - NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(CURRENT_NODE, nodeManager, nodeSchedulerConfig, nodeTaskMap, new Duration(0, SECONDS))); + NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(CURRENT_NODE, nodeManager, nodeSchedulerConfig, nodeTaskMap, new ConsistentHashingAddressProvider(nodeManager, new ConsistentHashingAddressProviderConfig()), new Duration(0, SECONDS))); SplitPlacementPolicy placementPolicy = new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(session), stage::getAllTasks); return newSourcePartitionedSchedulerAsStageScheduler( diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestUniformNodeSelector.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestUniformNodeSelector.java index 42f7485ae7e8..e5f6a35c6bfc 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestUniformNodeSelector.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestUniformNodeSelector.java @@ -99,7 +99,7 @@ public void setUp() .setIncludeCoordinator(false); // contents of taskMap indicate the node-task map for the current stage - nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(CURRENT_NODE, nodeManager, nodeSchedulerConfig, nodeTaskMap)); + nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(CURRENT_NODE, nodeManager, nodeSchedulerConfig, nodeTaskMap, new ConsistentHashingAddressProvider(nodeManager, new ConsistentHashingAddressProviderConfig()))); taskMap = new HashMap<>(); nodeSelector = nodeScheduler.createNodeSelector(session); remoteTaskExecutor = newCachedThreadPool(daemonThreadsNamed("remoteTaskExecutor-%s")); @@ -139,7 +139,8 @@ public void testQueueSizeAdjustmentScaleDown() 500, NodeSchedulerConfig.SplitsBalancingPolicy.STAGE, false, - queueSizeAdjuster); + queueSizeAdjuster, + new ConsistentHashingAddressProvider(nodeManager, new ConsistentHashingAddressProviderConfig())); for (int i = 0; i < 20; i++) { splits.add(new Split(TEST_CATALOG_HANDLE, TestingSplit.createRemoteSplit())); @@ -311,7 +312,8 @@ public void testFailover() 2000, NodeSchedulerConfig.SplitsBalancingPolicy.STAGE, true, - new UniformNodeSelector.QueueSizeAdjuster(1000, 10000, new TestingTicker())); + new UniformNodeSelector.QueueSizeAdjuster(1000, 10000, new TestingTicker()), + new ConsistentHashingAddressProvider(nodeManager, new ConsistentHashingAddressProviderConfig())); Split rigidSplit = new Split(TEST_CATALOG_HANDLE, new TestingSplit(false, ImmutableList.of(node1.getHostAndPort()))); splits.add(rigidSplit); diff --git 
a/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestArbitraryDistributionSplitAssigner.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestArbitraryDistributionSplitAssigner.java index 63b99a764cb9..5b2ba29d0466 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestArbitraryDistributionSplitAssigner.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestArbitraryDistributionSplitAssigner.java @@ -22,7 +22,10 @@ import com.google.common.collect.Multimaps; import com.google.common.collect.SetMultimap; import com.google.common.collect.Sets; +import io.trino.execution.scheduler.ConsistentHashingAddressProvider; +import io.trino.execution.scheduler.ConsistentHashingAddressProviderConfig; import io.trino.metadata.Split; +import io.trino.node.TestingInternalNodeManager; import io.trino.spi.HostAddress; import io.trino.sql.planner.plan.PlanNodeId; import org.junit.jupiter.api.Test; @@ -56,6 +59,8 @@ public class TestArbitraryDistributionSplitAssigner private static final long STANDARD_SPLIT_SIZE_IN_BYTES = 1; + private static final ConsistentHashingAddressProvider CONSISTENT_HASHING_ADDRESS_PROVIDER = new ConsistentHashingAddressProvider(TestingInternalNodeManager.createDefault(), new ConsistentHashingAddressProviderConfig()); + private static final PlanNodeId PARTITIONED_1 = new PlanNodeId("partitioned-1"); private static final PlanNodeId PARTITIONED_2 = new PlanNodeId("partitioned-2"); private static final PlanNodeId REPLICATED_1 = new PlanNodeId("replicated-1"); @@ -474,7 +479,8 @@ public void testAdaptiveTaskSizing() 1, 4, STANDARD_SPLIT_SIZE_IN_BYTES, - 5); + 5, + CONSISTENT_HASHING_ADDRESS_PROVIDER); SplitAssignerTester tester = new SplitAssignerTester(); for (SplitBatch batch : batches) { PlanNodeId planNodeId = batch.getPlanNodeId(); @@ -539,7 +545,8 @@ public void testAdaptiveTaskSizingRounding() 100, 400, 100, - 5); + 5, + CONSISTENT_HASHING_ADDRESS_PROVIDER); SplitAssignerTester tester = new SplitAssignerTester(); for (SplitBatch batch : batches) { PlanNodeId planNodeId = batch.getPlanNodeId(); @@ -856,7 +863,8 @@ private static ArbitraryDistributionSplitAssigner createSplitAssigner( targetPartitionSizeInBytes, targetPartitionSizeInBytes, STANDARD_SPLIT_SIZE_IN_BYTES, - maxTaskSplitCount); + maxTaskSplitCount, + CONSISTENT_HASHING_ADDRESS_PROVIDER); } private static class SplitBatch diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorSplit.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorSplit.java index 8119183c723e..2b3634832daa 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorSplit.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorSplit.java @@ -17,6 +17,7 @@ import io.trino.spi.SplitWeight; import java.util.List; +import java.util.Optional; public interface ConnectorSplit { @@ -40,6 +41,17 @@ default List<HostAddress> getAddresses() { return List.of(); } + /** + * Returns an optional affinity key so splits reading related content are routed to the + * same worker(s) across queries. When empty, scheduling falls back to {@link #getAddresses()}. *
<p>
+ * Only remotely accessible splits may supply an affinity key (see {@link #isRemotelyAccessible()}). + */ + default Optional<String> getAffinityKey() + { + return Optional.empty(); + } + default SplitWeight getSplitWeight() { return SplitWeight.standard();
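A hypothetical connector split opting into the new contract could look like the sketch below. It assumes ConnectorSplit's remaining methods keep their defaults, and it is not a split type added by this change:

import io.trino.spi.HostAddress;
import io.trino.spi.connector.ConnectorSplit;

import java.util.List;
import java.util.Optional;

public record ExampleFileSplit(String path, long start, long length, Optional<String> affinityKey)
        implements ConnectorSplit
{
    @Override
    public boolean isRemotelyAccessible()
    {
        // Affinity keys are only legal on remotely accessible splits,
        // enforced by the new checkArgument in Split above
        return true;
    }

    @Override
    public List<HostAddress> getAddresses()
    {
        return List.of(); // no hard locality; the hash ring decides placement
    }

    @Override
    public Optional<String> getAffinityKey()
    {
        return affinityKey; // e.g. "path:offset:length" computed at split creation
    }
}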
diff --git a/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemCacheModule.java b/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemCacheModule.java index 4d063e05e5d9..15e979705bcc 100644 --- a/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemCacheModule.java +++ b/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemCacheModule.java @@ -18,9 +18,8 @@ import com.google.inject.Binder; import com.google.inject.Provider; import io.airlift.configuration.AbstractConfigurationAwareModule; -import io.trino.filesystem.cache.CachingHostAddressProvider; -import io.trino.filesystem.cache.ConsistentHashingHostAddressProvider; -import io.trino.filesystem.cache.ConsistentHashingHostAddressProviderConfig; +import io.trino.filesystem.cache.CacheSplitAffinityProvider; +import io.trino.filesystem.cache.SplitAffinityProvider; import io.trino.filesystem.cache.TrinoFileSystemCache; import io.trino.spi.catalog.CatalogName; @@ -45,14 +44,13 @@ public AlluxioFileSystemCacheModule(boolean isCoordinator) protected void setup(Binder binder) { configBinder(binder).bindConfig(AlluxioFileSystemCacheConfig.class); - configBinder(binder).bindConfig(ConsistentHashingHostAddressProviderConfig.class); binder.bind(AlluxioCacheStats.class).in(SINGLETON); Provider<CatalogName> catalogName = binder.getProvider(CatalogName.class); newExporter(binder).export(AlluxioCacheStats.class) .as(generator -> generator.generatedNameOf(AlluxioCacheStats.class, catalogName.get().toString())); if (isCoordinator) { - newOptionalBinder(binder, CachingHostAddressProvider.class).setBinding().to(ConsistentHashingHostAddressProvider.class).in(SINGLETON); + newOptionalBinder(binder, SplitAffinityProvider.class).setBinding().to(CacheSplitAffinityProvider.class).in(SINGLETON); } binder.bind(TrinoFileSystemCache.class).to(AlluxioFileSystemCache.class).in(SINGLETON); diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java index 8183e434f61c..353658cec3aa 100644 --- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java +++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java @@ -30,9 +30,9 @@ import io.trino.filesystem.azure.AzureFileSystemModule; import io.trino.filesystem.cache.CacheFileSystemFactory; import io.trino.filesystem.cache.CacheKeyProvider; -import io.trino.filesystem.cache.CachingHostAddressProvider; import io.trino.filesystem.cache.DefaultCacheKeyProvider; -import io.trino.filesystem.cache.DefaultCachingHostAddressProvider; +import io.trino.filesystem.cache.NoopSplitAffinityProvider; +import io.trino.filesystem.cache.SplitAffinityProvider; import io.trino.filesystem.cache.TrinoFileSystemCache; import io.trino.filesystem.gcs.GcsFileSystemFactory; import io.trino.filesystem.gcs.GcsFileSystemModule; @@ -121,8 +121,8 @@ protected void setup(Binder binder) factories.addBinding("file").to(LocalFileSystemFactory.class); } - newOptionalBinder(binder, CachingHostAddressProvider.class).setDefault().to(DefaultCachingHostAddressProvider.class).in(Scopes.SINGLETON); newOptionalBinder(binder, CacheKeyProvider.class).setDefault().to(DefaultCacheKeyProvider.class).in(Scopes.SINGLETON); + newOptionalBinder(binder, SplitAffinityProvider.class).setDefault().to(NoopSplitAffinityProvider.class).in(Scopes.SINGLETON); newOptionalBinder(binder, TrinoFileSystemCache.class); newOptionalBinder(binder, MemoryFileSystemCache.class); diff --git a/lib/trino-filesystem/pom.xml b/lib/trino-filesystem/pom.xml index ac7dcb5f4bc4..b522b21e5950 100644 --- a/lib/trino-filesystem/pom.xml +++ b/lib/trino-filesystem/pom.xml @@ -18,12 +18,6 @@ - - com.github.ishugaliy - allgood-consistent-hash - 1.0.0 - - com.google.guava guava @@ -90,11 +84,6 @@ trino-spi - - jakarta.annotation - jakarta.annotation-api - - jakarta.validation jakarta.validation-api diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/DefaultCachingHostAddressProvider.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheSplitAffinityProvider.java similarity index 69% rename from lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/DefaultCachingHostAddressProvider.java rename to lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheSplitAffinityProvider.java index 9fdbe388eabe..1ca4dba0015f 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/DefaultCachingHostAddressProvider.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheSplitAffinityProvider.java @@ -13,16 +13,14 @@ */ package io.trino.filesystem.cache; -import io.trino.spi.HostAddress; +import java.util.Optional; -import java.util.List; - -public class DefaultCachingHostAddressProvider - implements CachingHostAddressProvider +public class CacheSplitAffinityProvider + implements SplitAffinityProvider { @Override - public List<HostAddress> getHosts(String splitKey, List<HostAddress> defaultAddresses) + public Optional<String> getKey(String path, long offset, long length) { - return defaultAddresses; + return Optional.of(path + ":" + offset + ":" + length); } }
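A quick sanity check of the key shape; the printed values follow directly from the concatenation above, while the path and sizes are made up:

import io.trino.filesystem.cache.CacheSplitAffinityProvider;

import java.util.Optional;

public class CacheKeySketch
{
    public static void main(String[] args)
    {
        CacheSplitAffinityProvider provider = new CacheSplitAffinityProvider();
        Optional<String> first = provider.getKey("s3://bucket/data.parquet", 0, 67108864);
        Optional<String> second = provider.getKey("s3://bucket/data.parquet", 67108864, 67108864);
        System.out.println(first);  // Optional[s3://bucket/data.parquet:0:67108864]
        System.out.println(second); // Optional[s3://bucket/data.parquet:67108864:67108864]
        // Distinct keys let splits of one file land on different workers,
        // which avoids hotspotting a single node on large files
    }
}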
diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CachingHostAddressProvider.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CachingHostAddressProvider.java deleted file mode 100644 index dbc28b1d7674..000000000000 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CachingHostAddressProvider.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.filesystem.cache; - -import io.trino.spi.HostAddress; - -import java.util.List; - -public interface CachingHostAddressProvider -{ - /** - * Returns a list of hosts which are preferred to cache the split with the given path. - */ - List<HostAddress> getHosts(String splitKey, List<HostAddress> defaultAddresses); - - // Include offset and length in key to allow different splits belonging to the same file to be cached on different nodes - // This can help with avoiding hotspots when multiple splits are created from a file - static String getSplitKey(String splitPath, long offset, long length) - { - return splitPath + ":" + offset + ":" + length; - } -} diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/NoopSplitAffinityProvider.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/NoopSplitAffinityProvider.java new file mode 100644 index 000000000000..9e41b3328740 --- /dev/null +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/NoopSplitAffinityProvider.java @@ -0,0 +1,26 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.cache; + +import java.util.Optional; + +public class NoopSplitAffinityProvider + implements SplitAffinityProvider +{ + @Override + public Optional<String> getKey(String path, long offset, long length) + { + return Optional.empty(); + } +} diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/SplitAffinityProvider.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/SplitAffinityProvider.java new file mode 100644 index 000000000000..e18de49533e6 --- /dev/null +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/SplitAffinityProvider.java @@ -0,0 +1,27 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.cache; + +import java.util.Optional; + +/** + * Produces a stable affinity key for a split, used by the engine to consistently route + * splits reading the same content to the same worker(s). A non-empty key opts the split + * into cache-aware scheduling via the engine's consistent-hash ring; an empty key leaves + * scheduling to the split's own addresses.
+ */ +public interface SplitAffinityProvider +{ + Optional<String> getKey(String path, long offset, long length); +}
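As a design note, a provider is free to pick a coarser key. A hypothetical variant that keys on the path alone pins every split of a file to the same worker, maximizing cache reuse at the cost of exactly the hotspot risk the removed CachingHostAddressProvider comment above warns about:

package io.trino.filesystem.cache;

import java.util.Optional;

public class WholeFileAffinityProvider
        implements SplitAffinityProvider
{
    @Override
    public Optional<String> getKey(String path, long offset, long length)
    {
        // Ignore offset/length: all splits of a file share one key and one placement
        return Optional.of(path);
    }
}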
-        assertMinimalRedistribution(addOne, addTwo);
-    }
-
-    private static void assertFairDistribution(CachingHostAddressProvider cachingHostAddressProvider, Set<Node> nodeNames)
-    {
-        int n = 1000;
-        Map<String, Integer> counts = new HashMap<>();
-        for (int i = 0; i < n; i++) {
-            counts.merge(cachingHostAddressProvider.getHosts(String.valueOf(i), ImmutableList.of()).get(0).getHostText(), 1, Math::addExact);
-        }
-        assertThat(nodeNames.stream().map(m -> m.getHostAndPort().getHostText()).collect(Collectors.toSet())).isEqualTo(counts.keySet());
-        counts.values().forEach(c -> assertThat(abs(c - n / nodeNames.size()) < 0.1 * n).isTrue());
-    }
-
-    private void assertMinimalRedistribution(Map<String, Set<Integer>> oldDistribution, Map<String, Set<Integer>> newDistribution)
-    {
-        oldDistribution.entrySet().stream().filter(e -> newDistribution.containsKey(e.getKey())).forEach(entry -> {
-            int sameKeySize = Sets.intersection(newDistribution.get(entry.getKey()), entry.getValue()).size();
-            int oldKeySize = entry.getValue().size();
-            assertThat(abs(sameKeySize - oldKeySize) < oldKeySize / oldDistribution.size()).isTrue();
-        });
-    }
-
-    private Map<String, Set<Integer>> getDistribution(ConsistentHashingHostAddressProvider provider)
-    {
-        int n = 1000;
-        Map<String, Set<Integer>> distribution = new HashMap<>();
-        for (int i = 0; i < n; i++) {
-            String host = provider.getHosts(String.valueOf(i), ImmutableList.of()).get(0).getHostText();
-            distribution.computeIfAbsent(host, (k) -> new HashSet<>()).add(i);
-        }
-        return distribution;
-    }
-
-    private static Node node(String nodeName)
-    {
-        return new InternalNode(nodeName, URI.create("http://" + nodeName + "/"), NodeVersion.UNKNOWN, false);
-    }
-}
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java
index 9c4cce955d6e..0749d5bfdec3 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java
@@ -168,7 +168,7 @@ public ConnectorPageSource createPageSource(
                 .filter(column -> (column.columnType() == REGULAR) || column.baseColumnName().equals(ROW_ID_COLUMN_NAME))
                 .collect(toImmutableList());
-        Map<String, Optional<String>> partitionKeys = split.getPartitionKeys();
+        Map<String, Optional<String>> partitionKeys = split.partitionKeys();
         ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry(), table.getProtocolEntry());
         Optional<List<String>> partitionValues = Optional.empty();
         if (deltaLakeColumns.stream().anyMatch(column -> column.baseColumnName().equals(ROW_ID_COLUMN_NAME))) {
@@ -195,7 +195,7 @@ public ConnectorPageSource createPageSource(
        // is available now, without having to access parquet file footer for row-group stats.
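        // Combining the table constraint, this split's min/max statistics, and the current dynamic
        // filter lets splits whose predicate simplifies to NONE be skipped before any file I/O.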
TupleDomain filteredSplitPredicate = TupleDomain.intersect(ImmutableList.of( table.getNonPartitionConstraint(), - split.getStatisticsPredicate(), + split.statisticsPredicate(), dynamicFilter.getCurrentPredicate().transformKeys(DeltaLakeColumnHandle.class::cast))); if (filteredSplitPredicate.isNone()) { return new EmptyPageSource(); @@ -203,34 +203,34 @@ public ConnectorPageSource createPageSource( Map partitionColumnDomains = filteredSplitPredicate.getDomains().orElseThrow().entrySet().stream() .filter(entry -> entry.getKey().columnType() == DeltaLakeColumnType.PARTITION_KEY) .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); - if (!partitionMatchesPredicate(split.getPartitionKeys(), partitionColumnDomains)) { + if (!partitionMatchesPredicate(split.partitionKeys(), partitionColumnDomains)) { return new EmptyPageSource(); } // Skip reading the file if none of the actual file columns are being read if (filteredSplitPredicate.isAll() && - split.getStart() == 0 && split.getLength() == split.getFileSize() && - split.getFileRowCount().isPresent() && - split.getDeletionVector().isEmpty() && + split.start() == 0 && split.length() == split.fileSize() && + split.fileRowCount().isPresent() && + split.deletionVector().isEmpty() && (regularColumns.isEmpty() || onlyRowIdColumn(regularColumns))) { return projectColumns( deltaLakeColumns, ImmutableSet.of(), partitionKeys, partitionValues, - generatePages(split.getFileRowCount().get(), onlyRowIdColumn(regularColumns)), - split.getPath(), - split.getFileSize(), - split.getFileModifiedTime()); + generatePages(split.fileRowCount().get(), onlyRowIdColumn(regularColumns)), + split.path(), + split.fileSize(), + split.fileModifiedTime()); } - Location location = Location.of(split.getPath()); + Location location = Location.of(split.path()); TrinoFileSystem fileSystem = fileSystemFactory.create(session, table); - TrinoInputFile inputFile = fileSystem.newInputFile(location, split.getFileSize()); + TrinoInputFile inputFile = fileSystem.newInputFile(location, split.fileSize()); ParquetReaderOptions options = ParquetReaderOptions.builder(parquetReaderOptions) .withMaxReadBlockSize(getParquetMaxReadBlockSize(session)) .withMaxReadBlockRowCount(getParquetMaxReadBlockRowCount(session)) .withSmallFileThreshold(getParquetSmallFileThreshold(session)) - .withUseColumnIndex(!table.isMerge() && split.getDeletionVector().isEmpty() && isParquetUseColumnIndex(session)) + .withUseColumnIndex(!table.isMerge() && split.deletionVector().isEmpty() && isParquetUseColumnIndex(session)) .withIgnoreStatistics(isParquetIgnoreStatistics(session)) .withVectorizedDecodingEnabled(isParquetVectorizedDecodingEnabled(session)) .build(); @@ -248,7 +248,7 @@ public ConnectorPageSource createPageSource( hiveColumnHandlesBuilder::add, () -> missingColumnNamesBuilder.add(column.baseColumnName())); } - if (split.getDeletionVector().isPresent() && !regularColumns.contains(rowPositionColumnHandle())) { + if (split.deletionVector().isPresent() && !regularColumns.contains(rowPositionColumnHandle())) { hiveColumnHandlesBuilder.add(PARQUET_ROW_INDEX_COLUMN); } List hiveColumnHandles = hiveColumnHandlesBuilder.build(); @@ -258,8 +258,8 @@ public ConnectorPageSource createPageSource( ConnectorPageSource delegate = ParquetPageSourceFactory.createPageSource( inputFile, - split.getStart(), - split.getLength(), + split.start(), + split.length(), hiveColumnHandles, ImmutableList.of(parquetPredicate), true, @@ -269,15 +269,15 @@ public ConnectorPageSource createPageSource( Optional.empty(), 
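        // the file size argument below originates from the Delta transaction log's add-file entry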
Optional.empty(), domainCompactionThreshold, - OptionalLong.of(split.getFileSize())); + OptionalLong.of(split.fileSize())); - if (split.getDeletionVector().isPresent()) { + if (split.deletionVector().isPresent()) { var pageFilterSupplier = Suppliers.memoize(() -> { List requiredColumns = ImmutableList.builderWithExpectedSize(regularColumns.size() + 1) .addAll(regularColumns) .add(rowPositionColumnHandle()) .build(); - PositionDeleteFilter deleteFilter = readDeletes(fileSystem, Location.of(table.location()), split.getDeletionVector().get()); + PositionDeleteFilter deleteFilter = readDeletes(fileSystem, Location.of(table.location()), split.deletionVector().get()); return deleteFilter.createPredicate(requiredColumns); }); @@ -292,9 +292,9 @@ public ConnectorPageSource createPageSource( partitionKeys, partitionValues, delegate, - split.getPath(), - split.getFileSize(), - split.getFileModifiedTime()); + split.path(), + split.fileSize(), + split.fileModifiedTime()); } public static ConnectorPageSource projectColumns( diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java index 2cb0d556d230..24b242b89e4e 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java @@ -13,20 +13,14 @@ */ package io.trino.plugin.deltalake; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableList; import io.airlift.slice.SizeOf; import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; -import io.trino.spi.HostAddress; import io.trino.spi.SplitWeight; import io.trino.spi.connector.ConnectorSplit; import io.trino.spi.predicate.TupleDomain; -import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import static com.google.common.base.MoreObjects.toStringHelper; @@ -36,148 +30,45 @@ import static io.airlift.slice.SizeOf.sizeOf; import static java.util.Objects.requireNonNull; -public class DeltaLakeSplit +public record DeltaLakeSplit( + @JsonProperty("path") String path, + @JsonProperty("start") long start, + @JsonProperty("length") long length, + @JsonProperty("fileSize") long fileSize, + @JsonProperty("rowCount") Optional fileRowCount, + @JsonProperty("fileModifiedTime") long fileModifiedTime, + @JsonProperty("deletionVector") Optional deletionVector, + @JsonProperty("affinityKey") Optional affinityKey, + @JsonProperty("splitWeight") SplitWeight splitWeight, + @JsonProperty("statisticsPredicate") TupleDomain statisticsPredicate, + @JsonProperty("partitionKeys") Map> partitionKeys) implements ConnectorSplit { private static final int INSTANCE_SIZE = instanceSize(DeltaLakeSplit.class); - private final String path; - private final long start; - private final long length; - private final long fileSize; - private final Optional fileRowCount; - private final long fileModifiedTime; - private final Optional deletionVector; - private final List addresses; - private final SplitWeight splitWeight; - private final TupleDomain statisticsPredicate; - private final Map> partitionKeys; - - @JsonCreator - public DeltaLakeSplit( - @JsonProperty("path") String path, - @JsonProperty("start") long start, - @JsonProperty("length") long length, - @JsonProperty("fileSize") long fileSize, - 
@JsonProperty("rowCount") Optional fileRowCount, - @JsonProperty("fileModifiedTime") long fileModifiedTime, - @JsonProperty("deletionVector") Optional deletionVector, - @JsonProperty("splitWeight") SplitWeight splitWeight, - @JsonProperty("statisticsPredicate") TupleDomain statisticsPredicate, - @JsonProperty("partitionKeys") Map> partitionKeys) + public DeltaLakeSplit { - this( - path, - start, - length, - fileSize, - fileRowCount, - fileModifiedTime, - deletionVector, - ImmutableList.of(), - splitWeight, - statisticsPredicate, - partitionKeys); + requireNonNull(path, "path is null"); + requireNonNull(fileRowCount, "rowCount is null"); + requireNonNull(deletionVector, "deletionVector is null"); + requireNonNull(affinityKey, "affinityKey is null"); + requireNonNull(splitWeight, "splitWeight is null"); + requireNonNull(statisticsPredicate, "statisticsPredicate is null"); + requireNonNull(partitionKeys, "partitionKeys is null"); } - public DeltaLakeSplit( - String path, - long start, - long length, - long fileSize, - Optional fileRowCount, - long fileModifiedTime, - Optional deletionVector, - List addresses, - SplitWeight splitWeight, - TupleDomain statisticsPredicate, - Map> partitionKeys) - { - this.path = requireNonNull(path, "path is null"); - this.start = start; - this.length = length; - this.fileSize = fileSize; - this.fileRowCount = requireNonNull(fileRowCount, "rowCount is null"); - this.fileModifiedTime = fileModifiedTime; - this.deletionVector = requireNonNull(deletionVector, "deletionVector is null"); - this.addresses = requireNonNull(addresses, "addresses is null"); - this.splitWeight = requireNonNull(splitWeight, "splitWeight is null"); - this.statisticsPredicate = requireNonNull(statisticsPredicate, "statisticsPredicate is null"); - this.partitionKeys = requireNonNull(partitionKeys, "partitionKeys is null"); - } - - // do not serialize addresses as they are not needed on workers - @JsonIgnore @Override - public List getAddresses() + public Optional getAffinityKey() { - return addresses; + return affinityKey; } - @JsonProperty @Override public SplitWeight getSplitWeight() { return splitWeight; } - @JsonProperty - public String getPath() - { - return path; - } - - @JsonProperty - public long getStart() - { - return start; - } - - @JsonProperty - public long getLength() - { - return length; - } - - @JsonProperty - public long getFileSize() - { - return fileSize; - } - - @JsonProperty - public Optional getFileRowCount() - { - return fileRowCount; - } - - @JsonProperty - public long getFileModifiedTime() - { - return fileModifiedTime; - } - - @JsonProperty - public Optional getDeletionVector() - { - return deletionVector; - } - - /** - * A TupleDomain representing the min/max statistics from the file this split was generated from. This does not contain any partitioning information. 
- */ - @JsonProperty - public TupleDomain getStatisticsPredicate() - { - return statisticsPredicate; - } - - @JsonProperty - public Map> getPartitionKeys() - { - return partitionKeys; - } - @Override public long getRetainedSizeInBytes() { @@ -185,6 +76,7 @@ public long getRetainedSizeInBytes() + estimatedSizeOf(path) + sizeOf(fileRowCount, value -> LONG_INSTANCE_SIZE) + sizeOf(deletionVector, DeletionVectorEntry::sizeInBytes) + + sizeOf(affinityKey, SizeOf::estimatedSizeOf) + splitWeight.getRetainedSizeInBytes() + statisticsPredicate.getRetainedSizeInBytes(DeltaLakeColumnHandle::retainedSizeInBytes) + estimatedSizeOf(partitionKeys, SizeOf::estimatedSizeOf, value -> sizeOf(value, SizeOf::estimatedSizeOf)); @@ -201,37 +93,8 @@ public String toString() .add("rowCount", fileRowCount) .add("fileModifiedTime", fileModifiedTime) .add("deletionVector", deletionVector) - .add("addresses", addresses) .add("statisticsPredicate", statisticsPredicate) .add("partitionKeys", partitionKeys) .toString(); } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - DeltaLakeSplit that = (DeltaLakeSplit) o; - return start == that.start && - length == that.length && - fileSize == that.fileSize && - fileModifiedTime == that.fileModifiedTime && - path.equals(that.path) && - fileRowCount.equals(that.fileRowCount) && - deletionVector.equals(that.deletionVector) && - Objects.equals(addresses, that.addresses) && - Objects.equals(statisticsPredicate, that.statisticsPredicate) && - Objects.equals(partitionKeys, that.partitionKeys); - } - - @Override - public int hashCode() - { - return Objects.hash(path, start, length, fileSize, fileRowCount, fileModifiedTime, deletionVector, addresses, statisticsPredicate, partitionKeys); - } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java index 3e212c4f31d1..3c0136e34b67 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java @@ -17,7 +17,7 @@ import com.google.inject.Inject; import io.airlift.units.DataSize; import io.trino.filesystem.Location; -import io.trino.filesystem.cache.CachingHostAddressProvider; +import io.trino.filesystem.cache.SplitAffinityProvider; import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource; import io.trino.plugin.deltalake.functions.tablechanges.TableChangesSplitSource; import io.trino.plugin.deltalake.functions.tablechanges.TableChangesTableFunctionHandle; @@ -56,7 +56,6 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; -import static io.trino.filesystem.cache.CachingHostAddressProvider.getSplitKey; import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.AnalyzeMode.FULL_REFRESH; import static io.trino.plugin.deltalake.DeltaLakeMetadata.createStatisticsPredicate; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getDynamicFilteringWaitTimeout; @@ -86,7 +85,7 @@ public class DeltaLakeSplitManager private final double minimumAssignedSplitWeight; private final DeltaLakeFileSystemFactory fileSystemFactory; private final DeltaLakeTransactionManager 
deltaLakeTransactionManager; - private final CachingHostAddressProvider cachingHostAddressProvider; + private final SplitAffinityProvider splitAffinityProvider; @Inject public DeltaLakeSplitManager( @@ -96,7 +95,7 @@ public DeltaLakeSplitManager( DeltaLakeConfig config, DeltaLakeFileSystemFactory fileSystemFactory, DeltaLakeTransactionManager deltaLakeTransactionManager, - CachingHostAddressProvider cachingHostAddressProvider) + SplitAffinityProvider splitAffinityProvider) { this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null"); @@ -106,7 +105,7 @@ public DeltaLakeSplitManager( this.minimumAssignedSplitWeight = config.getMinimumAssignedSplitWeight(); this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.deltaLakeTransactionManager = requireNonNull(deltaLakeTransactionManager, "deltaLakeTransactionManager is null"); - this.cachingHostAddressProvider = requireNonNull(cachingHostAddressProvider, "cacheHostAddressProvider is null"); + this.splitAffinityProvider = requireNonNull(splitAffinityProvider, "splitAffinityProvider is null"); } @Override @@ -309,7 +308,7 @@ private List splitsForFile( addFileEntry.getStats().flatMap(DeltaLakeFileStatistics::getNumRecords), addFileEntry.getModificationTime(), addFileEntry.getDeletionVector(), - cachingHostAddressProvider.getHosts(splitPath, ImmutableList.of()), + splitAffinityProvider.getKey(splitPath, 0, fileSize), SplitWeight.standard(), statisticsPredicate, partitionKeys)); @@ -321,6 +320,7 @@ private List splitsForFile( long maxSplitSize = getMaxSplitSize(session).toBytes(); long splitSize = Math.min(maxSplitSize, fileSize - currentOffset); + Optional affinityKey = splitAffinityProvider.getKey(splitPath, currentOffset, splitSize); splits.add(new DeltaLakeSplit( splitPath, currentOffset, @@ -329,7 +329,7 @@ private List splitsForFile( Optional.empty(), addFileEntry.getModificationTime(), addFileEntry.getDeletionVector(), - cachingHostAddressProvider.getHosts(getSplitKey(splitPath, currentOffset, splitSize), ImmutableList.of()), + affinityKey, SplitWeight.fromProportion(clamp((double) splitSize / maxSplitSize, minimumAssignedSplitWeight, 1.0)), statisticsPredicate, partitionKeys)); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitSource.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitSource.java index 075c1665b1a9..690a0dd0149e 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitSource.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitSource.java @@ -142,11 +142,11 @@ public CompletableFuture getNextBatch(int maxSize) .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); List filteredSplits = splits.stream() .map(DeltaLakeSplit.class::cast) - .filter(split -> partitionMatchesPredicate(split.getPartitionKeys(), partitionColumnDomains) && - split.getStatisticsPredicate().overlaps(dynamicFilterPredicate)) + .filter(split -> partitionMatchesPredicate(split.partitionKeys(), partitionColumnDomains) && + split.statisticsPredicate().overlaps(dynamicFilterPredicate)) .collect(toImmutableList()); if (recordScannedFiles) { - filteredSplits.forEach(split -> scannedFilePaths.add(new DeltaLakeScannedDataFile(((DeltaLakeSplit) split).getPath(), ((DeltaLakeSplit) split).getPartitionKeys()))); + filteredSplits.forEach(split -> scannedFilePaths.add(new 
DeltaLakeScannedDataFile(((DeltaLakeSplit) split).path(), ((DeltaLakeSplit) split).partitionKeys()))); } return new ConnectorSplitBatch(filteredSplits, noMoreSplits); }, diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java index e96e7afc6087..1ec70c88ac1c 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java @@ -23,8 +23,8 @@ import io.airlift.bootstrap.Bootstrap; import io.airlift.json.JsonModule; import io.trino.filesystem.TrinoFileSystemFactory; -import io.trino.filesystem.cache.CachingHostAddressProvider; -import io.trino.filesystem.cache.DefaultCachingHostAddressProvider; +import io.trino.filesystem.cache.NoopSplitAffinityProvider; +import io.trino.filesystem.cache.SplitAffinityProvider; import io.trino.filesystem.hdfs.HdfsFileSystemFactory; import io.trino.hdfs.HdfsEnvironment; import io.trino.hdfs.TrinoHdfsFileSystemStats; @@ -196,7 +196,7 @@ public void setUp() binder.bind(HdfsEnvironment.class).toInstance(HDFS_ENVIRONMENT); binder.bind(TrinoHdfsFileSystemStats.class).toInstance(HDFS_FILE_SYSTEM_STATS); binder.bind(TrinoFileSystemFactory.class).to(HdfsFileSystemFactory.class).in(Scopes.SINGLETON); - binder.bind(CachingHostAddressProvider.class).to(DefaultCachingHostAddressProvider.class).in(Scopes.SINGLETON); + binder.bind(SplitAffinityProvider.class).toInstance(new NoopSplitAffinityProvider()); }, new AbstractModule() { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeNodeLocalDynamicSplitPruning.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeNodeLocalDynamicSplitPruning.java index ffa215cbb70d..56499384c915 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeNodeLocalDynamicSplitPruning.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeNodeLocalDynamicSplitPruning.java @@ -118,6 +118,7 @@ public void testDynamicSplitPruningOnUnpartitionedTable() Optional.empty(), 0, Optional.empty(), + Optional.empty(), SplitWeight.standard(), TupleDomain.all(), ImmutableMap.of()); @@ -215,6 +216,7 @@ public void testDynamicSplitPruningWithExplicitPartitionFilter() Optional.empty(), 0, Optional.empty(), + Optional.empty(), SplitWeight.standard(), TupleDomain.all(), ImmutableMap.of(dateColumnName, Optional.of("2023-01-10"))); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java index 12cb207d0168..f8bbe30439d4 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java @@ -18,7 +18,7 @@ import io.airlift.json.JsonCodecFactory; import io.airlift.units.DataSize; import io.trino.filesystem.Location; -import io.trino.filesystem.cache.DefaultCachingHostAddressProvider; +import io.trino.filesystem.cache.NoopSplitAffinityProvider; import io.trino.filesystem.memory.MemoryFileSystemFactory; import io.trino.metastore.HiveMetastoreFactory; import io.trino.plugin.base.metrics.FileFormatDataSourceStats; @@ -126,11 +126,12 @@ private void testAbsolutePathSplits(String 
absoluteRawEncodedFilePath, String ab DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig); List splits = getSplits(splitManager, deltaLakeConfig); + long maxSplitSize = deltaLakeConfig.getMaxSplitSize().toBytes(); List expected = ImmutableList.of( - makeSplit(absoluteDecodedParsedFilePath, 0, 5_000, fileSize, minimumAssignedSplitWeight), - makeSplit(absoluteDecodedParsedFilePath, 5_000, 5_000, fileSize, minimumAssignedSplitWeight), - makeSplit(absoluteDecodedParsedFilePath, 10_000, 5_000, fileSize, minimumAssignedSplitWeight), - makeSplit(absoluteDecodedParsedFilePath, 15_000, 5_000, fileSize, minimumAssignedSplitWeight)); + makeSplit(absoluteDecodedParsedFilePath, 0, 5_000, fileSize, maxSplitSize, minimumAssignedSplitWeight), + makeSplit(absoluteDecodedParsedFilePath, 5_000, 5_000, fileSize, maxSplitSize, minimumAssignedSplitWeight), + makeSplit(absoluteDecodedParsedFilePath, 10_000, 5_000, fileSize, maxSplitSize, minimumAssignedSplitWeight), + makeSplit(absoluteDecodedParsedFilePath, 15_000, 5_000, fileSize, maxSplitSize, minimumAssignedSplitWeight)); assertThat(splits).isEqualTo(expected); } @@ -148,10 +149,11 @@ public void testSplitSizes() DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig); List splits = getSplits(splitManager, deltaLakeConfig); + long maxSplitSize = deltaLakeConfig.getMaxSplitSize().toBytes(); List expected = ImmutableList.of( - makeSplit(FULL_PATH, 0, 20_000, fileSize, minimumAssignedSplitWeight), - makeSplit(FULL_PATH, 20_000, 20_000, fileSize, minimumAssignedSplitWeight), - makeSplit(FULL_PATH, 40_000, 10_000, fileSize, minimumAssignedSplitWeight)); + makeSplit(FULL_PATH, 0, 20_000, fileSize, maxSplitSize, minimumAssignedSplitWeight), + makeSplit(FULL_PATH, 20_000, 20_000, fileSize, maxSplitSize, minimumAssignedSplitWeight), + makeSplit(FULL_PATH, 40_000, 10_000, fileSize, maxSplitSize, minimumAssignedSplitWeight)); assertThat(splits).isEqualTo(expected); } @@ -170,10 +172,11 @@ public void testSplitsFromMultipleFiles() DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig); List splits = getSplits(splitManager, deltaLakeConfig); + long maxSplitSize = deltaLakeConfig.getMaxSplitSize().toBytes(); List expected = ImmutableList.of( - makeSplit(FULL_PATH, 0, 1_000, firstFileSize, minimumAssignedSplitWeight), - makeSplit(FULL_PATH, 0, 10_000, secondFileSize, minimumAssignedSplitWeight), - makeSplit(FULL_PATH, 10_000, 10_000, secondFileSize, minimumAssignedSplitWeight)); + makeSplit(FULL_PATH, 0, 1_000, firstFileSize, maxSplitSize, minimumAssignedSplitWeight), + makeSplit(FULL_PATH, 0, 10_000, secondFileSize, maxSplitSize, minimumAssignedSplitWeight), + makeSplit(FULL_PATH, 10_000, 10_000, secondFileSize, maxSplitSize, minimumAssignedSplitWeight)); assertThat(splits).isEqualTo(expected); } @@ -246,7 +249,7 @@ public Stream getActiveFiles( deltaLakeConfig, new DefaultDeltaLakeFileSystemFactory(HDFS_FILE_SYSTEM_FACTORY, new NoOpVendedCredentialsProvider()), deltaLakeTransactionManager, - new DefaultCachingHostAddressProvider()); + new NoopSplitAffinityProvider()); } private AddFileEntry addFileEntryOfSize(String path, long fileSize) @@ -254,10 +257,10 @@ private AddFileEntry addFileEntryOfSize(String path, long fileSize) return new AddFileEntry(path, ImmutableMap.of(), fileSize, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of(), Optional.empty()); } - private DeltaLakeSplit makeSplit(String path, long start, long splitSize, long fileSize, double 
minimumAssignedSplitWeight) + private DeltaLakeSplit makeSplit(String path, long start, long splitSize, long fileSize, long maxSplitSize, double minimumAssignedSplitWeight) { - SplitWeight splitWeight = SplitWeight.fromProportion(clamp((double) fileSize / splitSize, minimumAssignedSplitWeight, 1.0)); - return new DeltaLakeSplit(path, start, splitSize, fileSize, Optional.empty(), 0, Optional.empty(), splitWeight, TupleDomain.all(), ImmutableMap.of()); + SplitWeight splitWeight = SplitWeight.fromProportion(clamp((double) splitSize / maxSplitSize, minimumAssignedSplitWeight, 1.0)); + return new DeltaLakeSplit(path, start, splitSize, fileSize, Optional.empty(), 0, Optional.empty(), Optional.empty(), splitWeight, TupleDomain.all(), ImmutableMap.of()); } private List getSplits(DeltaLakeSplitManager splitManager, DeltaLakeConfig deltaLakeConfig) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplit.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplit.java index 125ac10f579d..596fd6074bb0 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplit.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplit.java @@ -18,6 +18,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.airlift.slice.SizeOf; import io.trino.metastore.HiveTypeName; import io.trino.plugin.hive.util.HiveBucketing.BucketingVersion; import io.trino.spi.HostAddress; @@ -51,6 +52,7 @@ public class HiveSplit private final Schema schema; private final List partitionKeys; private final List addresses; + private final Optional affinityKey; private final String partitionName; private final OptionalInt readBucketNumber; private final OptionalInt tableBucketNumber; @@ -90,6 +92,7 @@ public HiveSplit( schema, partitionKeys, ImmutableList.of(), + Optional.empty(), readBucketNumber, tableBucketNumber, forceLocalScheduling, @@ -110,6 +113,7 @@ public HiveSplit( Schema schema, List partitionKeys, List addresses, + Optional affinityKey, OptionalInt readBucketNumber, OptionalInt tableBucketNumber, boolean forceLocalScheduling, @@ -127,6 +131,7 @@ public HiveSplit( requireNonNull(schema, "schema is null"); requireNonNull(partitionKeys, "partitionKeys is null"); requireNonNull(addresses, "addresses is null"); + requireNonNull(affinityKey, "affinityKey is null"); requireNonNull(readBucketNumber, "readBucketNumber is null"); requireNonNull(tableBucketNumber, "tableBucketNumber is null"); requireNonNull(hiveColumnCoercions, "hiveColumnCoercions is null"); @@ -143,6 +148,7 @@ public HiveSplit( this.schema = schema; this.partitionKeys = ImmutableList.copyOf(partitionKeys); this.addresses = ImmutableList.copyOf(addresses); + this.affinityKey = affinityKey; this.readBucketNumber = readBucketNumber; this.tableBucketNumber = tableBucketNumber; this.forceLocalScheduling = forceLocalScheduling; @@ -209,6 +215,14 @@ public List getAddresses() return addresses; } + // do not serialize affinity key as it is only used by the scheduler on the coordinator + @JsonIgnore + @Override + public Optional getAffinityKey() + { + return affinityKey; + } + @JsonProperty public OptionalInt getReadBucketNumber() { @@ -272,6 +286,7 @@ public long getRetainedSizeInBytes() + schema.getRetainedSizeInBytes() + estimatedSizeOf(partitionKeys, HivePartitionKey::estimatedSizeInBytes) + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes) + + sizeOf(affinityKey, SizeOf::estimatedSizeOf) + 
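+            // the affinity key is not serialized, but it is retained in memory with the split, so include it in the estimate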
estimatedSizeOf(partitionName) + sizeOf(readBucketNumber) + sizeOf(tableBucketNumber) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java index 94a739307466..3fc04a2f5ea1 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java @@ -24,7 +24,7 @@ import io.airlift.stats.CounterStat; import io.airlift.units.DataSize; import io.trino.filesystem.TrinoFileSystemFactory; -import io.trino.filesystem.cache.CachingHostAddressProvider; +import io.trino.filesystem.cache.SplitAffinityProvider; import io.trino.metastore.Column; import io.trino.metastore.HiveBucketProperty; import io.trino.metastore.HivePartition; @@ -122,7 +122,7 @@ public class HiveSplitManager private final boolean recursiveDfsWalkerEnabled; private final CounterStat highMemorySplitSourceCounter; private final TypeManager typeManager; - private final CachingHostAddressProvider cachingHostAddressProvider; + private final SplitAffinityProvider splitAffinityProvider; private final int maxPartitionsPerScan; @Inject @@ -134,7 +134,7 @@ public HiveSplitManager( @ForHiveSplitManager ExecutorService executorService, VersionEmbedder versionEmbedder, TypeManager typeManager, - CachingHostAddressProvider cachingHostAddressProvider) + SplitAffinityProvider splitAffinityProvider) { this( transactionManager, @@ -151,7 +151,7 @@ public HiveSplitManager( hiveConfig.getMaxSplitsPerSecond(), hiveConfig.getRecursiveDirWalkerEnabled(), typeManager, - cachingHostAddressProvider, + splitAffinityProvider, hiveConfig.getMaxPartitionsPerScan()); } @@ -170,7 +170,7 @@ public HiveSplitManager( @Nullable Integer maxSplitsPerSecond, boolean recursiveDfsWalkerEnabled, TypeManager typeManager, - CachingHostAddressProvider cachingHostAddressProvider, + SplitAffinityProvider splitAffinityProvider, int maxPartitionsPerScan) { this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); @@ -188,7 +188,7 @@ public HiveSplitManager( this.maxSplitsPerSecond = requireNonNullElse(maxSplitsPerSecond, Integer.MAX_VALUE); this.recursiveDfsWalkerEnabled = recursiveDfsWalkerEnabled; this.typeManager = requireNonNull(typeManager, "typeManager is null"); - this.cachingHostAddressProvider = requireNonNull(cachingHostAddressProvider, "cachingHostAddressProvider is null"); + this.splitAffinityProvider = requireNonNull(splitAffinityProvider, "splitAffinityProvider is null"); this.maxPartitionsPerScan = maxPartitionsPerScan; } @@ -291,7 +291,7 @@ public ConnectorSplitSource getSplits( hiveSplitLoader, executor, highMemorySplitSourceCounter, - cachingHostAddressProvider, + splitAffinityProvider, hiveTable.isRecordScannedFiles()); hiveSplitLoader.start(splitSource); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java index 6332eb98ca31..9f1cf8477e8c 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java @@ -19,7 +19,7 @@ import io.airlift.log.Logger; import io.airlift.stats.CounterStat; import io.airlift.units.DataSize; -import io.trino.filesystem.cache.CachingHostAddressProvider; +import io.trino.filesystem.cache.SplitAffinityProvider; import io.trino.plugin.hive.InternalHiveSplit.InternalHiveBlock; import 
io.trino.plugin.hive.util.AsyncQueue; import io.trino.plugin.hive.util.AsyncQueue.BorrowResult; @@ -86,7 +86,7 @@ class HiveSplitSource private final CounterStat highMemorySplitSourceCounter; private final AtomicBoolean loggedHighMemoryWarning = new AtomicBoolean(); private final HiveSplitWeightProvider splitWeightProvider; - private final CachingHostAddressProvider cachingHostAddressProvider; + private final SplitAffinityProvider splitAffinityProvider; private final boolean recordScannedFiles; private final ImmutableList.Builder scannedFilePaths = ImmutableList.builder(); @@ -101,7 +101,7 @@ private HiveSplitSource( HiveSplitLoader splitLoader, AtomicReference stateReference, CounterStat highMemorySplitSourceCounter, - CachingHostAddressProvider cachingHostAddressProvider, + SplitAffinityProvider splitAffinityProvider, boolean recordScannedFiles) { requireNonNull(session, "session is null"); @@ -119,7 +119,7 @@ private HiveSplitSource( this.deterministicSplits = maxInitialSplits == 0 || maxInitialSplitSize.equals(maxSplitSize); this.remainingInitialSplits = new AtomicInteger(maxInitialSplits); this.splitWeightProvider = isSizeBasedSplitWeightsEnabled(session) ? new SizeBasedSplitWeightProvider(getMinimumAssignedSplitWeight(session), maxSplitSize) : HiveSplitWeightProvider.uniformStandardWeightProvider(); - this.cachingHostAddressProvider = requireNonNull(cachingHostAddressProvider, "cachingHostAddressProvider is null"); + this.splitAffinityProvider = requireNonNull(splitAffinityProvider, "splitAffinityProvider is null"); this.recordScannedFiles = recordScannedFiles; } @@ -134,7 +134,7 @@ public static HiveSplitSource allAtOnce( HiveSplitLoader splitLoader, Executor executor, CounterStat highMemorySplitSourceCounter, - CachingHostAddressProvider cachingHostAddressProvider, + SplitAffinityProvider splitAffinityProvider, boolean recordScannedFiles) { AtomicReference stateReference = new AtomicReference<>(State.initial()); @@ -175,7 +175,7 @@ public boolean isFinished() splitLoader, stateReference, highMemorySplitSourceCounter, - cachingHostAddressProvider, + splitAffinityProvider, recordScannedFiles); } @@ -304,6 +304,15 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) { splitBytes = internalSplit.getEnd() - internalSplit.getStart(); } + // Force-local splits are pinned to specific addresses by the scheduler; affinity keys are only + // meaningful for remotely accessible splits. When it is not guaranteed that the splits from a file + // will have the same size across different queries, use a file-wide key so all splits of the same + // file land on the same worker and reuse the cached content. + Optional affinityKey = internalSplit.isForceLocalScheduling() + ? Optional.empty() + : deterministicSplits + ? 
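+                // deterministic boundaries: key on (path, start, length) so each byte range hashes independently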
splitAffinityProvider.getKey(internalSplit.getPath(), internalSplit.getStart(), splitBytes) + : splitAffinityProvider.getKey(internalSplit.getPath(), 0, internalSplit.getEstimatedFileSize()); resultBuilder.add(new HiveSplit( internalSplit.getPartitionName(), internalSplit.getPath(), @@ -313,7 +322,8 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) { internalSplit.getFileModifiedTime(), internalSplit.getSchema(), internalSplit.getPartitionKeys(), - cachingHostAddressProvider.getHosts(getSplitKey(internalSplit.getPath(), internalSplit.getStart(), splitBytes), block.addresses()), + block.addresses(), + affinityKey, internalSplit.getReadBucketNumber(), internalSplit.getTableBucketNumber(), internalSplit.isForceLocalScheduling(), @@ -402,16 +412,6 @@ public void close() } } - private String getSplitKey(String path, long start, long length) - { - if (deterministicSplits) { - return CachingHostAddressProvider.getSplitKey(path, start, length); - } - // When it is not guaranteed that the splits from a file will have the same size across different queries, - // do not include start and length in the key to avoid cache misses. - return path; - } - private static boolean setIf(AtomicReference atomicReference, T newValue, Predicate predicate) { while (true) { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java index 5645e82f8231..96a5ed8b78a9 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java @@ -29,7 +29,7 @@ import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.TrinoOutputFile; -import io.trino.filesystem.cache.DefaultCachingHostAddressProvider; +import io.trino.filesystem.cache.NoopSplitAffinityProvider; import io.trino.filesystem.memory.MemoryFileSystemFactory; import io.trino.metastore.Column; import io.trino.metastore.HiveBucketProperty; @@ -1340,7 +1340,7 @@ private HiveSplitSource hiveSplitSource(HiveSplitLoader hiveSplitLoader) hiveSplitLoader, executor, new CounterStat(), - new DefaultCachingHostAddressProvider(), + new NoopSplitAffinityProvider(), false); } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java index d25c3beceedc..7abcd906082d 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java @@ -404,6 +404,7 @@ private static ConnectorPageSource createPageSource(TrinoFileSystemFactory fileS new Schema(config.getHiveStorageFormat().getSerde(), false, splitProperties), ImmutableList.of(), ImmutableList.of(), + Optional.empty(), OptionalInt.empty(), OptionalInt.empty(), false, diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplit.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplit.java index bb520a8d91d2..5f7fd6bfb811 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplit.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplit.java @@ -74,6 +74,7 @@ public void testJsonRoundTrip() new Schema("abc", true, schema), partitionKeys, addresses, + Optional.of("path:42:87"), OptionalInt.empty(), OptionalInt.empty(), true, diff --git 
a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplitSource.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplitSource.java index 60654093018d..1fc4a1b2b0e5 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplitSource.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplitSource.java @@ -18,7 +18,7 @@ import com.google.common.util.concurrent.SettableFuture; import io.airlift.stats.CounterStat; import io.airlift.units.DataSize; -import io.trino.filesystem.cache.DefaultCachingHostAddressProvider; +import io.trino.filesystem.cache.NoopSplitAffinityProvider; import io.trino.spi.connector.ConnectorSplit; import io.trino.spi.connector.ConnectorSplitSource; import org.junit.jupiter.api.Test; @@ -58,7 +58,7 @@ public void testOutstandingSplitCount() new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), new CounterStat(), - new DefaultCachingHostAddressProvider(), + new NoopSplitAffinityProvider(), false); // add 10 splits @@ -94,7 +94,7 @@ public void testDynamicPartitionPruning() new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), new CounterStat(), - new DefaultCachingHostAddressProvider(), + new NoopSplitAffinityProvider(), false); // add two splits, one of the splits is dynamically pruned @@ -122,7 +122,7 @@ public void testEvenlySizedSplitRemainder() new TestingHiveSplitLoader(), Executors.newSingleThreadExecutor(), new CounterStat(), - new DefaultCachingHostAddressProvider(), + new NoopSplitAffinityProvider(), false); // One byte larger than the initial split max size @@ -151,7 +151,7 @@ public void testFail() new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), new CounterStat(), - new DefaultCachingHostAddressProvider(), + new NoopSplitAffinityProvider(), false); // add some splits @@ -203,7 +203,7 @@ public void testReaderWaitsForSplits() new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), new CounterStat(), - new DefaultCachingHostAddressProvider(), + new NoopSplitAffinityProvider(), false); SettableFuture splits = SettableFuture.create(); @@ -259,7 +259,7 @@ public void testOutstandingSplitSize() new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), new CounterStat(), - new DefaultCachingHostAddressProvider(), + new NoopSplitAffinityProvider(), false); int testSplitSizeInBytes = new TestSplit(0).getEstimatedSizeInBytes(); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestNodeLocalDynamicSplitPruning.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestNodeLocalDynamicSplitPruning.java index 8bdbfcd331a4..a42c6122977f 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestNodeLocalDynamicSplitPruning.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestNodeLocalDynamicSplitPruning.java @@ -125,6 +125,7 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle new Schema(hiveConfig.getHiveStorageFormat().getSerde(), false, ImmutableMap.of()), ImmutableList.of(new HivePartitionKey(PARTITION_COLUMN.getName(), "42")), ImmutableList.of(), + Optional.empty(), OptionalInt.of(1), OptionalInt.of(1), false, diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java index 5899ef8936b8..dd2b8fe01166 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java @@ -14,6 +14,7 @@ package 
io.trino.plugin.hudi; import com.google.common.collect.ImmutableList; +import io.airlift.slice.SizeOf; import io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hive.HivePartitionKey; import io.trino.spi.SplitWeight; @@ -21,11 +22,13 @@ import io.trino.spi.predicate.TupleDomain; import java.util.List; +import java.util.Optional; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static io.airlift.slice.SizeOf.estimatedSizeOf; import static io.airlift.slice.SizeOf.instanceSize; +import static io.airlift.slice.SizeOf.sizeOf; import static java.lang.Math.toIntExact; import static java.util.Objects.requireNonNull; @@ -37,7 +40,8 @@ public record HudiSplit( long fileModifiedTime, TupleDomain predicate, List partitionKeys, - SplitWeight splitWeight) + SplitWeight splitWeight, + Optional affinityKey) implements ConnectorSplit { private static final int INSTANCE_SIZE = toIntExact(instanceSize(HudiSplit.class)); @@ -52,6 +56,13 @@ public record HudiSplit( requireNonNull(predicate, "predicate is null"); partitionKeys = ImmutableList.copyOf(partitionKeys); requireNonNull(splitWeight, "splitWeight is null"); + requireNonNull(affinityKey, "affinityKey is null"); + } + + @Override + public Optional getAffinityKey() + { + return affinityKey; } @Override @@ -61,7 +72,8 @@ public long getRetainedSizeInBytes() + estimatedSizeOf(location) + splitWeight.getRetainedSizeInBytes() + predicate.getRetainedSizeInBytes(HiveColumnHandle::getRetainedSizeInBytes) - + estimatedSizeOf(partitionKeys, HivePartitionKey::estimatedSizeInBytes); + + estimatedSizeOf(partitionKeys, HivePartitionKey::estimatedSizeInBytes) + + sizeOf(affinityKey, SizeOf::estimatedSizeOf); } @Override diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java index ae2902ae58e4..957b721aa249 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.filesystem.cache.SplitAffinityProvider; import io.trino.metastore.HiveMetastore; import io.trino.metastore.Table; import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource; @@ -54,6 +55,7 @@ public class HudiSplitManager private final TrinoFileSystemFactory fileSystemFactory; private final ExecutorService executor; private final ScheduledExecutorService splitLoaderExecutorService; + private final SplitAffinityProvider splitAffinityProvider; @Inject public HudiSplitManager( @@ -61,13 +63,15 @@ public HudiSplitManager( HudiTransactionManager transactionManager, @ForHudiSplitManager ExecutorService executor, TrinoFileSystemFactory fileSystemFactory, - @ForHudiSplitSource ScheduledExecutorService splitLoaderExecutorService) + @ForHudiSplitSource ScheduledExecutorService splitLoaderExecutorService, + SplitAffinityProvider splitAffinityProvider) { this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); this.executor = requireNonNull(executor, "executor is null"); this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.splitLoaderExecutorService = 
requireNonNull(splitLoaderExecutorService, "splitLoaderExecutorService is null"); + this.splitAffinityProvider = requireNonNull(splitAffinityProvider, "splitAffinityProvider is null"); } @Override @@ -99,6 +103,7 @@ public ConnectorSplitSource getSplits( splitLoaderExecutorService, getMaxSplitsPerSecond(session), getMaxOutstandingSplits(session), + splitAffinityProvider, partitions); return new ClassLoaderSafeConnectorSplitSource(splitSource, HudiSplitManager.class.getClassLoader()); } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java index ee2efa976db8..3f3783b9a4d7 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java @@ -17,6 +17,7 @@ import io.airlift.concurrent.BoundedExecutor; import io.airlift.units.DataSize; import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.filesystem.cache.SplitAffinityProvider; import io.trino.metastore.HiveMetastore; import io.trino.metastore.Table; import io.trino.plugin.hive.HiveColumnHandle; @@ -71,6 +72,7 @@ public HudiSplitSource( ScheduledExecutorService splitLoaderExecutorService, int maxSplitsPerSecond, int maxOutstandingSplits, + SplitAffinityProvider splitAffinityProvider, List partitions) { HoodieTableMetaClient metaClient = buildTableMetaClient(fileSystemFactory.create(session), tableHandle.getBasePath()); @@ -94,6 +96,7 @@ public HudiSplitSource( queue, new BoundedExecutor(executor, getSplitGeneratorParallelism(session)), createSplitWeightProvider(session), + splitAffinityProvider, partitions, throwable -> { trinoException.compareAndSet(null, new TrinoException(HUDI_CANNOT_OPEN_SPLIT, diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java index 81447f1f7a78..b5c404c3c316 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java @@ -15,6 +15,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import io.trino.filesystem.cache.SplitAffinityProvider; import io.trino.plugin.hive.util.AsyncQueue; import io.trino.plugin.hudi.HudiTableHandle; import io.trino.plugin.hudi.partition.HudiPartitionInfoLoader; @@ -55,6 +56,7 @@ public HudiBackgroundSplitLoader( AsyncQueue asyncQueue, Executor splitGeneratorExecutor, HudiSplitWeightProvider hudiSplitWeightProvider, + SplitAffinityProvider splitAffinityProvider, List partitions, Consumer errorListener) { @@ -62,7 +64,7 @@ public HudiBackgroundSplitLoader( this.asyncQueue = requireNonNull(asyncQueue, "asyncQueue is null"); this.splitGeneratorExecutor = requireNonNull(splitGeneratorExecutor, "splitGeneratorExecutorService is null"); this.splitGeneratorNumThreads = getSplitGeneratorParallelism(session); - this.hudiSplitFactory = new HudiSplitFactory(tableHandle, hudiSplitWeightProvider); + this.hudiSplitFactory = new HudiSplitFactory(tableHandle, hudiSplitWeightProvider, splitAffinityProvider); this.partitions = requireNonNull(partitions, "partitions is null"); this.errorListener = requireNonNull(errorListener, "errorListener is null"); } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java 
b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java index ccae0b5a38f8..f5fce02c058b 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java @@ -14,6 +14,7 @@ package io.trino.plugin.hudi.split; import com.google.common.collect.ImmutableList; +import io.trino.filesystem.cache.SplitAffinityProvider; import io.trino.plugin.hive.HivePartitionKey; import io.trino.plugin.hudi.HudiFileStatus; import io.trino.plugin.hudi.HudiSplit; @@ -32,13 +33,16 @@ public class HudiSplitFactory private final HudiTableHandle hudiTableHandle; private final HudiSplitWeightProvider hudiSplitWeightProvider; + private final SplitAffinityProvider splitAffinityProvider; public HudiSplitFactory( HudiTableHandle hudiTableHandle, - HudiSplitWeightProvider hudiSplitWeightProvider) + HudiSplitWeightProvider hudiSplitWeightProvider, + SplitAffinityProvider splitAffinityProvider) { this.hudiTableHandle = requireNonNull(hudiTableHandle, "hudiTableHandle is null"); this.hudiSplitWeightProvider = requireNonNull(hudiSplitWeightProvider, "hudiSplitWeightProvider is null"); + this.splitAffinityProvider = requireNonNull(splitAffinityProvider, "splitAffinityProvider is null"); } public List createSplits(List partitionKeys, HudiFileStatus fileStatus) @@ -48,17 +52,19 @@ public List createSplits(List partitionKeys, HudiFi } long fileSize = fileStatus.length(); + String location = fileStatus.location().toString(); if (fileSize == 0) { return ImmutableList.of(new HudiSplit( - fileStatus.location().toString(), + location, 0, fileSize, fileSize, fileStatus.modificationTime(), hudiTableHandle.getRegularPredicates(), partitionKeys, - hudiSplitWeightProvider.calculateSplitWeight(fileSize))); + hudiSplitWeightProvider.calculateSplitWeight(fileSize), + splitAffinityProvider.getKey(location, 0, fileSize))); } ImmutableList.Builder splits = ImmutableList.builder(); @@ -66,27 +72,31 @@ public List createSplits(List partitionKeys, HudiFi long bytesRemaining = fileSize; while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { + long start = fileSize - bytesRemaining; splits.add(new HudiSplit( - fileStatus.location().toString(), - fileSize - bytesRemaining, + location, + start, splitSize, fileSize, fileStatus.modificationTime(), hudiTableHandle.getRegularPredicates(), partitionKeys, - hudiSplitWeightProvider.calculateSplitWeight(splitSize))); + hudiSplitWeightProvider.calculateSplitWeight(splitSize), + splitAffinityProvider.getKey(location, start, splitSize))); bytesRemaining -= splitSize; } if (bytesRemaining > 0) { + long start = fileSize - bytesRemaining; splits.add(new HudiSplit( - fileStatus.location().toString(), - fileSize - bytesRemaining, + location, + start, bytesRemaining, fileSize, fileStatus.modificationTime(), hudiTableHandle.getRegularPredicates(), partitionKeys, - hudiSplitWeightProvider.calculateSplitWeight(bytesRemaining))); + hudiSplitWeightProvider.calculateSplitWeight(bytesRemaining), + splitAffinityProvider.getKey(location, start, bytesRemaining))); } return splits.build(); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergBucketFunction.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergBucketFunction.java index 6b1235d6a127..69833c6f035e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergBucketFunction.java +++ 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergBucketFunction.java @@ -87,7 +87,7 @@ public int getBucket(Page page, int position) @Override public int applyAsInt(ConnectorSplit split) { - List partitionValues = getPartitionValues(((IcebergSplit) split).getPartitionValues()); + List partitionValues = getPartitionValues(((IcebergSplit) split).partitionValues()); if (singleBucketFunction) { long bucket = (long) requireNonNullElse(partitionValues.getFirst(), 0L); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index a82f5b0dfe12..531f0b1f4658 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -273,7 +273,7 @@ public ConnectorPageSource createPageSource( .collect(toImmutableList()); IcebergTableHandle tableHandle = (IcebergTableHandle) connectorTable; Schema schema = SchemaParser.fromJson(tableHandle.getTableSchemaJson()); - String partitionSpecJson = tableHandle.getPartitionSpecJsons().get(split.getSpecId()); + String partitionSpecJson = tableHandle.getPartitionSpecJsons().get(split.specId()); PartitionSpec partitionSpec = PartitionSpecParser.fromJson(schema, partitionSpecJson); org.apache.iceberg.types.Type[] partitionColumnTypes = partitionSpec.fields().stream() .map(field -> field.transform().getResultType(schema.findType(field.sourceId()))) @@ -284,20 +284,20 @@ public ConnectorPageSource createPageSource( icebergColumns, schema, partitionSpec, - PartitionData.fromBlocks(split.getPartitionValues(), partitionColumnTypes, typeManager), - split.getDeletes(), + PartitionData.fromBlocks(split.partitionValues(), partitionColumnTypes, typeManager), + split.deletes(), dynamicFilter, tableHandle.getUnenforcedPredicate(), - split.getFileStatisticsDomain(), - split.getPath(), - split.getStart(), - split.getLength(), - split.getFileSize(), - split.getFileRecordCount(), - split.getFileFormat(), + split.fileStatisticsDomain(), + split.path(), + split.start(), + split.length(), + split.fileSize(), + split.fileRecordCount(), + split.fileFormat(), getFileIoProperties(connectorTableCredentials), - split.getDataSequenceNumber(), - split.getFileFirstRowId(), + split.dataSequenceNumber(), + split.fileFirstRowId(), tableHandle.getNameMappingJson().map(NameMappingParser::fromJson)); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java index 6bed586ed031..bf94f02fd37e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java @@ -13,19 +13,18 @@ */ package io.trino.plugin.iceberg; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.collect.ImmutableList; +import io.airlift.slice.SizeOf; import io.trino.plugin.iceberg.delete.DeleteFile; -import io.trino.spi.HostAddress; import io.trino.spi.SplitWeight; import io.trino.spi.block.Block; import io.trino.spi.connector.ConnectorSplit; import io.trino.spi.predicate.TupleDomain; import java.util.List; +import java.util.Optional; import 
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java
index 6bed586ed031..bf94f02fd37e 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java
@@ -13,19 +13,18 @@
  */
 package io.trino.plugin.iceberg;

-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.collect.ImmutableList;
+import io.airlift.slice.SizeOf;
 import io.trino.plugin.iceberg.delete.DeleteFile;
-import io.trino.spi.HostAddress;
 import io.trino.spi.SplitWeight;
 import io.trino.spi.block.Block;
 import io.trino.spi.connector.ConnectorSplit;
 import io.trino.spi.predicate.TupleDomain;

 import java.util.List;
+import java.util.Optional;
 import java.util.OptionalLong;

 import static com.google.common.base.MoreObjects.toStringHelper;
@@ -33,179 +32,52 @@
 import static io.airlift.slice.SizeOf.SIZE_OF_LONG;
 import static io.airlift.slice.SizeOf.estimatedSizeOf;
 import static io.airlift.slice.SizeOf.instanceSize;
+import static io.airlift.slice.SizeOf.sizeOf;
 import static java.util.Objects.requireNonNull;

-public class IcebergSplit
+public record IcebergSplit(
+        @JsonProperty("path") String path,
+        @JsonProperty("start") long start,
+        @JsonProperty("length") long length,
+        @JsonProperty("fileSize") long fileSize,
+        @JsonProperty("fileRecordCount") long fileRecordCount,
+        @JsonProperty("fileFormat") IcebergFileFormat fileFormat,
+        @JsonProperty("specId") int specId,
+        @JsonProperty("partitionValues") List<Block> partitionValues,
+        @JsonProperty("deletes") List<DeleteFile> deletes,
+        @JsonProperty("splitWeight") SplitWeight splitWeight,
+        @JsonProperty("fileStatisticsDomain") TupleDomain<IcebergColumnHandle> fileStatisticsDomain,
+        @JsonProperty("affinityKey") Optional<String> affinityKey,
+        @JsonProperty("dataSequenceNumber") long dataSequenceNumber,
+        @JsonProperty("fileFirstRowId") OptionalLong fileFirstRowId)
         implements ConnectorSplit
 {
     private static final int INSTANCE_SIZE = instanceSize(IcebergSplit.class);

-    private final String path;
-    private final long start;
-    private final long length;
-    private final long fileSize;
-    private final long fileRecordCount;
-    private final IcebergFileFormat fileFormat;
-    private final int specId;
-    private final List<Block> partitionValues;
-    private final List<DeleteFile> deletes;
-    private final SplitWeight splitWeight;
-    private final TupleDomain<IcebergColumnHandle> fileStatisticsDomain;
-    private final long dataSequenceNumber;
-    private final OptionalLong fileFirstRowId;
-    private final List<HostAddress> addresses;
-
-    @JsonCreator
-    public IcebergSplit(
-            @JsonProperty("path") String path,
-            @JsonProperty("start") long start,
-            @JsonProperty("length") long length,
-            @JsonProperty("fileSize") long fileSize,
-            @JsonProperty("fileRecordCount") long fileRecordCount,
-            @JsonProperty("fileFormat") IcebergFileFormat fileFormat,
-            @JsonProperty("specId") int specId,
-            @JsonProperty("partitionValues") List<Block> partitionValues,
-            @JsonProperty("deletes") List<DeleteFile> deletes,
-            @JsonProperty("splitWeight") SplitWeight splitWeight,
-            @JsonProperty("fileStatisticsDomain") TupleDomain<IcebergColumnHandle> fileStatisticsDomain,
-            @JsonProperty("dataSequenceNumber") long dataSequenceNumber,
-            @JsonProperty("fileFirstRowId") OptionalLong fileFirstRowId)
+    public IcebergSplit
     {
-        this(
-                path,
-                start,
-                length,
-                fileSize,
-                fileRecordCount,
-                fileFormat,
-                specId,
-                partitionValues,
-                deletes,
-                splitWeight,
-                fileStatisticsDomain,
-                ImmutableList.of(),
-                dataSequenceNumber,
-                fileFirstRowId);
+        requireNonNull(path, "path is null");
+        requireNonNull(fileFormat, "fileFormat is null");
+        partitionValues = ImmutableList.copyOf(partitionValues);
+        deletes = ImmutableList.copyOf(requireNonNull(deletes, "deletes is null"));
+        requireNonNull(splitWeight, "splitWeight is null");
+        requireNonNull(fileStatisticsDomain, "fileStatisticsDomain is null");
+        requireNonNull(affinityKey, "affinityKey is null");
+        requireNonNull(fileFirstRowId, "fileFirstRowId is null");
     }
null"); - this.start = start; - this.length = length; - this.fileSize = fileSize; - this.fileRecordCount = fileRecordCount; - this.fileFormat = requireNonNull(fileFormat, "fileFormat is null"); - this.specId = specId; - this.partitionValues = ImmutableList.copyOf(partitionValues); - this.deletes = ImmutableList.copyOf(requireNonNull(deletes, "deletes is null")); - this.splitWeight = requireNonNull(splitWeight, "splitWeight is null"); - this.fileStatisticsDomain = requireNonNull(fileStatisticsDomain, "fileStatisticsDomain is null"); - this.addresses = requireNonNull(addresses, "addresses is null"); - this.dataSequenceNumber = dataSequenceNumber; - this.fileFirstRowId = requireNonNull(fileFirstRowId, "fileFirstRowId is null"); - } - - @JsonIgnore @Override - public List getAddresses() - { - return addresses; - } - - @JsonProperty - public String getPath() - { - return path; - } - - @JsonProperty - public long getStart() - { - return start; - } - - @JsonProperty - public long getLength() - { - return length; - } - - @JsonProperty - public long getFileSize() - { - return fileSize; - } - - @JsonProperty - public long getFileRecordCount() - { - return fileRecordCount; - } - - @JsonProperty - public IcebergFileFormat getFileFormat() - { - return fileFormat; - } - - @JsonProperty - public int getSpecId() - { - return specId; - } - - @JsonProperty - public List getPartitionValues() - { - return partitionValues; - } - - @JsonProperty - public List getDeletes() + public Optional getAffinityKey() { - return deletes; + return affinityKey; } - @JsonProperty @Override public SplitWeight getSplitWeight() { return splitWeight; } - @JsonProperty - public TupleDomain getFileStatisticsDomain() - { - return fileStatisticsDomain; - } - - @JsonProperty - public long getDataSequenceNumber() - { - return dataSequenceNumber; - } - - @JsonProperty - public OptionalLong getFileFirstRowId() - { - return fileFirstRowId; - } - @Override public long getRetainedSizeInBytes() { @@ -218,7 +90,7 @@ public long getRetainedSizeInBytes() + splitWeight.getRetainedSizeInBytes() + fileStatisticsDomain.getRetainedSizeInBytes(IcebergColumnHandle::getRetainedSizeInBytes) + SIZE_OF_LONG // dataSequenceNumber - + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes) + + sizeOf(affinityKey, SizeOf::estimatedSizeOf) + (fileFirstRowId.isPresent() ? 
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
index b57a63670203..407101eaffbf 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
@@ -17,7 +17,7 @@
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.inject.Inject;
 import io.airlift.units.Duration;
-import io.trino.filesystem.cache.CachingHostAddressProvider;
+import io.trino.filesystem.cache.SplitAffinityProvider;
 import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource;
 import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionHandle;
 import io.trino.plugin.iceberg.functions.tablechanges.TableChangesSplitSource;
@@ -64,7 +64,7 @@ public class IcebergSplitManager
     private final IcebergFileSystemFactory fileSystemFactory;
     private final ListeningExecutorService splitSourceExecutor;
     private final ExecutorService icebergPlanningExecutor;
-    private final CachingHostAddressProvider cachingHostAddressProvider;
+    private final SplitAffinityProvider splitAffinityProvider;

     @Inject
     public IcebergSplitManager(
@@ -73,14 +73,14 @@ public IcebergSplitManager(
             IcebergFileSystemFactory fileSystemFactory,
             @ForIcebergSplitSource ListeningExecutorService splitSourceExecutor,
             @ForIcebergSplitManager ExecutorService icebergPlanningExecutor,
-            CachingHostAddressProvider cachingHostAddressProvider)
+            SplitAffinityProvider splitAffinityProvider)
     {
         this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
         this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
         this.splitSourceExecutor = requireNonNull(splitSourceExecutor, "splitSourceExecutor is null");
         this.icebergPlanningExecutor = requireNonNull(icebergPlanningExecutor, "icebergPlanningExecutor is null");
-        this.cachingHostAddressProvider = requireNonNull(cachingHostAddressProvider, "cachingHostAddressProvider is null");
+        this.splitAffinityProvider = requireNonNull(splitAffinityProvider, "splitAffinityProvider is null");
     }

     @Override
@@ -120,7 +120,7 @@ public ConnectorSplitSource getSplits(
                 typeManager,
                 table.isRecordScannedFiles(),
                 getMinimumAssignedSplitWeight(session),
-                cachingHostAddressProvider,
+                splitAffinityProvider,
                 metricsReporter,
                 splitSourceExecutor);
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
index 9d81375a524e..08fa3a7c2ece 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
@@ -29,7 +29,7 @@
 import io.airlift.units.DataSize;
 import io.airlift.units.Duration;
 import io.trino.cache.NonEvictableCache;
-import io.trino.filesystem.cache.CachingHostAddressProvider;
+import io.trino.filesystem.cache.SplitAffinityProvider;
 import io.trino.plugin.base.metrics.DurationTiming;
 import io.trino.plugin.base.metrics.IntList;
 import io.trino.plugin.base.metrics.LongCount;
@@ -100,7 +100,6 @@
 import static io.airlift.slice.Slices.utf8Slice;
 import static io.trino.cache.CacheUtils.uncheckedCacheGet;
 import static io.trino.cache.SafeCaches.buildNonEvictableCache;
-import static io.trino.filesystem.cache.CachingHostAddressProvider.getSplitKey;
 import static io.trino.plugin.iceberg.ExpressionConverter.isConvertibleToIcebergExpression;
 import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression;
 import static io.trino.plugin.iceberg.IcebergExceptions.translateMetadataException;
@@ -185,7 +184,7 @@ public class IcebergSplitSource
     private Map<StructLikeWrapperWithFieldIdToIndex, Set<DataFileWithDeleteFiles>> scannedFilesByPartition = new HashMap<>();
     @GuardedBy("this")
     private long outputRowsLowerBound;
-    private final CachingHostAddressProvider cachingHostAddressProvider;
+    private final SplitAffinityProvider splitAffinityProvider;
     private final InMemoryMetricsReporter metricsReporter;

     private volatile boolean finished;
@@ -202,7 +201,7 @@ public IcebergSplitSource(
             TypeManager typeManager,
             boolean recordScannedFiles,
             double minimumAssignedSplitWeight,
-            CachingHostAddressProvider cachingHostAddressProvider,
+            SplitAffinityProvider splitAffinityProvider,
             InMemoryMetricsReporter metricsReporter,
             ListeningExecutorService executor)
     {
@@ -241,7 +240,7 @@ public IcebergSplitSource(
                 .map(IcebergColumnHandle::getId)
                 .collect(toImmutableSet());
         this.fileModifiedTimeDomain = getFileModifiedTimeDomain(tableHandle.getEnforcedPredicate());
-        this.cachingHostAddressProvider = requireNonNull(cachingHostAddressProvider, "cachingHostAddressProvider is null");
+        this.splitAffinityProvider = requireNonNull(splitAffinityProvider, "splitAffinityProvider is null");
         this.metricsReporter = requireNonNull(metricsReporter, "metricsReporter is null");
         this.executor = requireNonNull(executor, "executor is null");
     }
@@ -729,6 +728,7 @@ private IcebergSplit toIcebergSplit(FileScanTaskWithDomain taskWithDomain)
     {
         FileScanTask task = taskWithDomain.fileScanTask();
+        Optional<String> affinityKey = splitAffinityProvider.getKey(task.file().location(), task.start(), task.length());
         return new IcebergSplit(
                 task.file().location(),
                 task.start(),
@@ -744,7 +744,7 @@ private IcebergSplit toIcebergSplit(FileScanTaskWithDomain taskWithDomain)
                         .collect(toImmutableList()),
                 SplitWeight.fromProportion(clamp(getSplitWeight(task), minimumAssignedSplitWeight, 1.0)),
                 taskWithDomain.fileStatisticsDomain(),
-                cachingHostAddressProvider.getHosts(getSplitKey(task.file().location(), task.start(), task.length()), ImmutableList.of()),
+                affinityKey,
                 task.file().dataSequenceNumber(),
                 task.file().firstRowId() == null ? OptionalLong.empty() : OptionalLong.of(task.file().firstRowId()));
     }
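Note: with this hunk the connector no longer resolves hosts at split-enumeration time; it only attaches a stable key and leaves host resolution to the engine. The consuming side is not part of this diff, but conceptually it reduces to something like the following sketch (hypothetical `AffinityResolver`; the real scheduler wiring may differ):

    import com.google.common.collect.ImmutableList;
    import io.trino.spi.HostAddress;

    import java.util.List;
    import java.util.Optional;
    import java.util.function.Function;

    // Resolve a split's affinity key to preferred hosts; no key means no preference.
    final class AffinityResolver
    {
        private final Function<String, List<HostAddress>> hashRingLookup;

        AffinityResolver(Function<String, List<HostAddress>> hashRingLookup)
        {
            this.hashRingLookup = hashRingLookup;
        }

        List<HostAddress> preferredHosts(Optional<String> affinityKey)
        {
            return affinityKey.map(hashRingLookup).orElse(ImmutableList.of());
        }
    }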
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java
index 2b2cbbd07132..023498d261f9 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java
@@ -151,6 +151,7 @@ public void testDynamicSplitPruningOnUnpartitionedTable()
                 ImmutableList.of(),
                 SplitWeight.standard(),
                 TupleDomain.all(),
+                Optional.empty(),
                 0,
                 OptionalLong.empty());

@@ -213,6 +214,7 @@ public void testDynamicSplitPruningOnUnpartitionedTable()
                 ImmutableList.of(),
                 SplitWeight.standard(),
                 TupleDomain.withColumnDomains(ImmutableMap.of(keyColumnHandle, Domain.singleValue(INTEGER, (long) keyColumnValue))),
+                Optional.empty(),
                 0,
                 OptionalLong.empty());

@@ -323,6 +325,7 @@ public void testDynamicSplitPruningWithExplicitPartitionFilter()
                 ImmutableList.of(),
                 SplitWeight.standard(),
                 TupleDomain.all(),
+                Optional.empty(),
                 0,
                 OptionalLong.empty());

@@ -476,6 +479,7 @@ public void testDynamicSplitPruningWithExplicitPartitionFilterPartitionEvolution
                 ImmutableList.of(),
                 SplitWeight.standard(),
                 TupleDomain.all(),
+                Optional.empty(),
                 0,
                 OptionalLong.empty());
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java
index 2271d22eb42a..354fc55f2dee 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java
@@ -18,7 +18,7 @@
 import com.google.common.collect.ImmutableSet;
 import io.airlift.units.Duration;
 import io.trino.filesystem.TrinoFileSystemFactory;
-import io.trino.filesystem.cache.DefaultCachingHostAddressProvider;
+import io.trino.filesystem.cache.NoopSplitAffinityProvider;
 import io.trino.metastore.HiveMetastore;
 import io.trino.metastore.cache.CachingHiveMetastore;
 import io.trino.plugin.hive.TrinoViewHiveMetastore;
@@ -192,7 +192,7 @@ public TupleDomain<ColumnHandle> getCurrentPredicate()
                 TESTING_TYPE_MANAGER,
                 false,
                 new IcebergConfig().getMinimumAssignedSplitWeight(),
-                new DefaultCachingHostAddressProvider(),
+                new NoopSplitAffinityProvider(),
                 new InMemoryMetricsReporter(),
                 newDirectExecutorService())) {
             ImmutableList.Builder<IcebergSplit> splits = ImmutableList.builder();
@@ -223,14 +223,14 @@ public void testFileStatisticsDomain()
         IcebergTableHandle tableHandle = createTableHandle(schemaTableName, nationTable, TupleDomain.all());
         IcebergSplit split = generateSplit(nationTable, tableHandle, DynamicFilter.EMPTY);
-        assertThat(split.getFileStatisticsDomain()).isEqualTo(TupleDomain.all());
+        assertThat(split.fileStatisticsDomain()).isEqualTo(TupleDomain.all());

         IcebergColumnHandle nationKey = IcebergColumnHandle.optional(new ColumnIdentity(1, "nationkey", ColumnIdentity.TypeCategory.PRIMITIVE, ImmutableList.of()))
                 .columnType(BIGINT)
                 .build();
         tableHandle = createTableHandle(schemaTableName, nationTable, TupleDomain.fromFixedValues(ImmutableMap.of(nationKey, NullableValue.of(BIGINT, 1L))));
         split = generateSplit(nationTable, tableHandle, DynamicFilter.EMPTY);
-        assertThat(split.getFileStatisticsDomain()).isEqualTo(TupleDomain.withColumnDomains(
+        assertThat(split.fileStatisticsDomain()).isEqualTo(TupleDomain.withColumnDomains(
                 ImmutableMap.of(nationKey, Domain.create(ValueSet.ofRanges(Range.range(BIGINT, 0L, true, 24L, true)), false))));

         IcebergColumnHandle regionKey = IcebergColumnHandle.optional(new ColumnIdentity(3, "regionkey", ColumnIdentity.TypeCategory.PRIMITIVE, ImmutableList.of()))
@@ -268,7 +268,7 @@ public TupleDomain<ColumnHandle> getCurrentPredicate()
                 return TupleDomain.all();
             }
         });
-        assertThat(split.getFileStatisticsDomain()).isEqualTo(TupleDomain.withColumnDomains(
+        assertThat(split.fileStatisticsDomain()).isEqualTo(TupleDomain.withColumnDomains(
                 ImmutableMap.of(
                         nationKey, Domain.create(ValueSet.ofRanges(Range.range(BIGINT, 0L, true, 24L, true)), false),
                         regionKey, Domain.create(ValueSet.ofRanges(Range.range(BIGINT, 0L, true, 4L, true)), false))));
@@ -418,7 +418,7 @@ private IcebergSplit generateSplit(Table nationTable, IcebergTableHandle tableHandle, DynamicFilter dynamicFilter)
                 TESTING_TYPE_MANAGER,
                 false,
                 0,
-                new DefaultCachingHostAddressProvider(),
+                new NoopSplitAffinityProvider(),
                 new InMemoryMetricsReporter(),
                 newDirectExecutorService())) {
             ImmutableList.Builder<IcebergSplit> builder = ImmutableList.builder();
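Note: the tests swap `DefaultCachingHostAddressProvider` for `NoopSplitAffinityProvider`, whose body is not shown in this diff; given the `Optional.empty()` values threaded through the split constructors above, it presumably reduces to a sketch like this (inferred, not the actual source):

    import java.util.Optional;

    // No-op provider: never proposes an affinity key, so splits carry
    // Optional.empty() and scheduling behavior is unchanged.
    public class NoopSplitAffinityProvider
            implements SplitAffinityProvider
    {
        @Override
        public Optional<String> getKey(String path, long start, long length)
        {
            return Optional.empty();
        }
    }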