Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions core/trino-main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.github.ishugaliy</groupId>
<artifactId>allgood-consistent-hash</artifactId>
<version>1.0.0</version>
</dependency>

<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,82 +11,89 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.filesystem.cache;
package io.trino.execution.scheduler;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import io.airlift.log.Logger;
import io.trino.node.InternalNode;
import io.trino.node.InternalNodeManager;
import io.trino.node.NodeState;
import io.trino.spi.HostAddress;
import io.trino.spi.Node;
import io.trino.spi.NodeManager;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.ishugaliy.allgood.consistent.hash.ConsistentHash;
import org.ishugaliy.allgood.consistent.hash.HashRing;
import org.ishugaliy.allgood.consistent.hash.hasher.DefaultHasher;
import org.ishugaliy.allgood.consistent.hash.node.Node;

import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.units.Duration.nanosSince;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;

public class ConsistentHashingHostAddressProvider
implements CachingHostAddressProvider
/**
* Maps a split's cache key to a stable list of worker host addresses using a consistent-hash ring.
* Worker membership is refreshed lazily on access (bounded by a short time window), so no
* background executor is needed.
*/
public class ConsistentHashingAddressProvider
{
private static final Logger log = Logger.get(ConsistentHashingHostAddressProvider.class);
private static final Logger log = Logger.get(ConsistentHashingAddressProvider.class);
private static final long WORKER_NODES_CACHE_TIMEOUT_SECS = 5;

private final NodeManager nodeManager;
private final ScheduledExecutorService hashRingUpdater = newSingleThreadScheduledExecutor(daemonThreadsNamed("hash-ring-refresher-%s"));
private final int replicationFactor;
private final InternalNodeManager nodeManager;
private final int preferredHostsCount;
private final Comparator<HostAddress> hostAddressComparator = Comparator.comparing(HostAddress::getHostText).thenComparing(HostAddress::getPort);

private final ConsistentHash<TrinoNode> consistentHashRing = HashRing.<TrinoNode>newBuilder()
.hasher(DefaultHasher.METRO_HASH)
.build();
private volatile long lastRefreshTime;

@Inject
public ConsistentHashingHostAddressProvider(NodeManager nodeManager, ConsistentHashingHostAddressProviderConfig configuration)
public ConsistentHashingAddressProvider(InternalNodeManager nodeManager, ConsistentHashingAddressProviderConfig config)
{
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.replicationFactor = configuration.getPreferredHostsCount();
this.preferredHostsCount = config.getPreferredHostsCount();
refreshHashRing();
}

@Override
public List<HostAddress> getHosts(String splitKey, List<HostAddress> defaultAddresses)
public List<HostAddress> getHosts(String cacheKey)
{
return consistentHashRing.locate(splitKey, replicationFactor)
refreshHashRingIfNeeded();
return consistentHashRing.locate(cacheKey, preferredHostsCount)
.stream()
.map(TrinoNode::getHostAndPort)
.sorted(hostAddressComparator)
.collect(toImmutableList());
}
Comment thread
raunaqmorarka marked this conversation as resolved.

@PostConstruct
public void startRefreshingHashRing()
{
hashRingUpdater.scheduleWithFixedDelay(this::refreshHashRing, 5, 5, TimeUnit.SECONDS);
refreshHashRing();
}

@PreDestroy
public void destroy()
/**
 * Refreshes the hash ring if the cached worker set is older than
 * {@link #WORKER_NODES_CACHE_TIMEOUT_SECS}. Uses double-checked locking: the
 * unsynchronized guard lets the common (fresh) case return without contending,
 * and the re-check under the lock ensures only one thread performs the refresh.
 */
private void refreshHashRingIfNeeded()
{
    if (nanosSince(lastRefreshTime).getValue(SECONDS) <= WORKER_NODES_CACHE_TIMEOUT_SECS) {
        // Fast path: membership snapshot is still fresh, no locking needed
        return;
    }
    synchronized (this) {
        // Re-check under the lock; another thread may have refreshed already
        if (nanosSince(lastRefreshTime).getValue(SECONDS) > WORKER_NODES_CACHE_TIMEOUT_SECS) {
            refreshHashRing();
        }
    }
}

@VisibleForTesting
synchronized void refreshHashRing()
{
try {
Set<TrinoNode> trinoNodes = nodeManager.getWorkerNodes().stream().map(TrinoNode::of).collect(toImmutableSet());
Set<TrinoNode> trinoNodes = nodeManager.getNodes(NodeState.ACTIVE).stream()
.filter(node -> !node.isCoordinator())
.map(TrinoNode::of)
.collect(toImmutableSet());
lastRefreshTime = System.nanoTime();
Set<TrinoNode> hashRingNodes = consistentHashRing.getNodes();
Set<TrinoNode> removedNodes = Sets.difference(hashRingNodes, trinoNodes);
Set<TrinoNode> newNodes = Sets.difference(trinoNodes, hashRingNodes);
Expand All @@ -104,9 +111,9 @@ synchronized void refreshHashRing()
}

private record TrinoNode(String nodeIdentifier, HostAddress hostAndPort)
implements org.ishugaliy.allgood.consistent.hash.node.Node
implements Node
{
public static TrinoNode of(Node node)
public static TrinoNode of(InternalNode node)
{
return new TrinoNode(node.getNodeIdentifier(), node.getHostAndPort());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.filesystem.cache;
package io.trino.execution.scheduler;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import jakarta.validation.constraints.Min;

public class ConsistentHashingHostAddressProviderConfig
public class ConsistentHashingAddressProviderConfig
{
private int preferredHostsCount = 2;

@Config("fs.cache.preferred-hosts-count")
@ConfigDescription("The number of preferred nodes for caching a file. Defaults to 2.")
public ConsistentHashingHostAddressProviderConfig setPreferredHostsCount(int preferredHostsCount)
@Min(1)
public int getPreferredHostsCount()
{
this.preferredHostsCount = preferredHostsCount;
return this;
return preferredHostsCount;
}

public int getPreferredHostsCount()
@Config("node-scheduler.cache-preferred-hosts-count")
Comment thread
raunaqmorarka marked this conversation as resolved.
@ConfigDescription("Number of preferred worker hosts the scheduler derives from a split's cache key")
public ConsistentHashingAddressProviderConfig setPreferredHostsCount(int preferredHostsCount)
{
return this.preferredHostsCount;
this.preferredHostsCount = preferredHostsCount;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class TopologyAwareNodeSelector
private final int maxUnacknowledgedSplitsPerTask;
private final List<CounterStat> topologicalSplitCounters;
private final NetworkTopology networkTopology;
private final ConsistentHashingAddressProvider consistentHashingAddressProvider;

public TopologyAwareNodeSelector(
InternalNode currentNode,
Expand All @@ -74,7 +75,8 @@ public TopologyAwareNodeSelector(
long maxPendingSplitsWeightPerTask,
int maxUnacknowledgedSplitsPerTask,
List<CounterStat> topologicalSplitCounters,
NetworkTopology networkTopology)
NetworkTopology networkTopology,
ConsistentHashingAddressProvider consistentHashingAddressProvider)
{
this.currentNode = requireNonNull(currentNode, "currentNode is null");
this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
Expand All @@ -87,6 +89,7 @@ public TopologyAwareNodeSelector(
checkArgument(maxUnacknowledgedSplitsPerTask > 0, "maxUnacknowledgedSplitsPerTask must be > 0, found: %s", maxUnacknowledgedSplitsPerTask);
this.topologicalSplitCounters = requireNonNull(topologicalSplitCounters, "topologicalSplitCounters is null");
this.networkTopology = requireNonNull(networkTopology, "networkTopology is null");
this.consistentHashingAddressProvider = requireNonNull(consistentHashingAddressProvider, "consistentHashingAddressProvider is null");
}

@Override
Expand Down Expand Up @@ -149,7 +152,10 @@ else if (!splitWaitingForAnyNode) {
int depth = topologicalSplitCounters.size() - 1;
int chosenDepth = 0;
Set<NetworkLocation> locations = new HashSet<>();
for (HostAddress host : split.getAddresses()) {
List<HostAddress> preferredAddresses = split.getConnectorSplit().getAffinityKey()
.map(consistentHashingAddressProvider::getHosts)
.orElseGet(split::getAddresses);
for (HostAddress host : preferredAddresses) {
locations.add(networkTopology.locate(host));
}
if (locations.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class TopologyAwareNodeSelectorFactory
private final NetworkTopology networkTopology;
private final InternalNode currentNode;
private final InternalNodeManager nodeManager;
private final ConsistentHashingAddressProvider consistentHashingAddressProvider;
private final int minCandidates;
private final boolean includeCoordinator;
private final long maxSplitsWeightPerNode;
Expand All @@ -75,7 +76,8 @@ public TopologyAwareNodeSelectorFactory(
InternalNodeManager nodeManager,
NodeSchedulerConfig schedulerConfig,
NodeTaskMap nodeTaskMap,
TopologyAwareNodeSelectorConfig topologyConfig)
TopologyAwareNodeSelectorConfig topologyConfig,
ConsistentHashingAddressProvider consistentHashingAddressProvider)
{
requireNonNull(networkTopology, "networkTopology is null");
requireNonNull(currentNode, "currentNode is null");
Expand All @@ -85,6 +87,7 @@ public TopologyAwareNodeSelectorFactory(
this.networkTopology = networkTopology;
this.currentNode = currentNode;
this.nodeManager = nodeManager;
this.consistentHashingAddressProvider = requireNonNull(consistentHashingAddressProvider, "consistentHashingAddressProvider is null");
this.minCandidates = schedulerConfig.getMinCandidates();
this.includeCoordinator = schedulerConfig.isIncludeCoordinator();
this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
Expand Down Expand Up @@ -136,7 +139,8 @@ public NodeSelector createNodeSelector(Session session)
minPendingSplitsWeightPerTask,
getMaxUnacknowledgedSplitsPerTask(session),
placementCounters,
networkTopology);
networkTopology,
consistentHashingAddressProvider);
}

private NodeMap createNodeMap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy;
import io.trino.metadata.Split;
import io.trino.node.InternalNode;
import io.trino.spi.HostAddress;
import io.trino.spi.TrinoException;
import jakarta.annotation.Nullable;

Expand Down Expand Up @@ -68,6 +69,7 @@ public class UniformNodeSelector
private final SplitsBalancingPolicy splitsBalancingPolicy;
private final boolean optimizedLocalScheduling;
private final QueueSizeAdjuster queueSizeAdjuster;
private final ConsistentHashingAddressProvider consistentHashingAddressProvider;

public UniformNodeSelector(
InternalNode currentNode,
Expand All @@ -80,7 +82,8 @@ public UniformNodeSelector(
long maxAdjustedPendingSplitsWeightPerTask,
int maxUnacknowledgedSplitsPerTask,
SplitsBalancingPolicy splitsBalancingPolicy,
boolean optimizedLocalScheduling)
boolean optimizedLocalScheduling,
ConsistentHashingAddressProvider consistentHashingAddressProvider)
{
this(currentNode,
nodeTaskMap,
Expand All @@ -92,7 +95,8 @@ public UniformNodeSelector(
maxUnacknowledgedSplitsPerTask,
splitsBalancingPolicy,
optimizedLocalScheduling,
new QueueSizeAdjuster(minPendingSplitsWeightPerTask, maxAdjustedPendingSplitsWeightPerTask));
new QueueSizeAdjuster(minPendingSplitsWeightPerTask, maxAdjustedPendingSplitsWeightPerTask),
consistentHashingAddressProvider);
}

@VisibleForTesting
Expand All @@ -107,7 +111,8 @@ public UniformNodeSelector(
int maxUnacknowledgedSplitsPerTask,
SplitsBalancingPolicy splitsBalancingPolicy,
boolean optimizedLocalScheduling,
QueueSizeAdjuster queueSizeAdjuster)
QueueSizeAdjuster queueSizeAdjuster,
ConsistentHashingAddressProvider consistentHashingAddressProvider)
{
this.currentNode = requireNonNull(currentNode, "currentNode is null");
this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
Expand All @@ -121,6 +126,7 @@ public UniformNodeSelector(
this.splitsBalancingPolicy = requireNonNull(splitsBalancingPolicy, "splitsBalancingPolicy is null");
this.optimizedLocalScheduling = optimizedLocalScheduling;
this.queueSizeAdjuster = queueSizeAdjuster;
this.consistentHashingAddressProvider = requireNonNull(consistentHashingAddressProvider, "consistentHashingAddressProvider is null");
}

@Override
Expand Down Expand Up @@ -173,8 +179,11 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTas
}
else {
// optimizedLocalScheduling enables prioritized assignment of splits to local nodes when splits contain locality information
if (optimizedLocalScheduling && !split.getAddresses().isEmpty()) {
candidateNodes = selectExactNodes(nodeMap, split.getAddresses(), includeCoordinator);
List<HostAddress> preferredAddresses = split.getConnectorSplit().getAffinityKey()
.map(consistentHashingAddressProvider::getHosts)
.orElseGet(split::getAddresses);
Comment thread
raunaqmorarka marked this conversation as resolved.
if (optimizedLocalScheduling && !preferredAddresses.isEmpty()) {
candidateNodes = selectExactNodes(nodeMap, preferredAddresses, includeCoordinator);
if (candidateNodes.isEmpty()) {
// choose any other node if preferred node is not available
candidateNodes = selectNodes(minCandidates, randomCandidates);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class UniformNodeSelectorFactory

private final InternalNode currentNode;
private final InternalNodeManager nodeManager;
private final ConsistentHashingAddressProvider consistentHashingAddressProvider;
private final int minCandidates;
private final boolean includeCoordinator;
private final long maxSplitsWeightPerNode;
Expand All @@ -71,9 +72,10 @@ public UniformNodeSelectorFactory(
InternalNode currentNode,
InternalNodeManager nodeManager,
NodeSchedulerConfig config,
NodeTaskMap nodeTaskMap)
NodeTaskMap nodeTaskMap,
ConsistentHashingAddressProvider consistentHashingAddressProvider)
{
this(currentNode, nodeManager, config, nodeTaskMap, new Duration(5, SECONDS));
this(currentNode, nodeManager, config, nodeTaskMap, consistentHashingAddressProvider, new Duration(5, SECONDS));
}

@VisibleForTesting
Expand All @@ -82,10 +84,12 @@ public UniformNodeSelectorFactory(
InternalNodeManager nodeManager,
NodeSchedulerConfig config,
NodeTaskMap nodeTaskMap,
ConsistentHashingAddressProvider consistentHashingAddressProvider,
Duration nodeMapMemoizationDuration)
{
this.currentNode = requireNonNull(currentNode, "currentNode is null");
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.consistentHashingAddressProvider = requireNonNull(consistentHashingAddressProvider, "consistentHashingAddressProvider is null");
this.minCandidates = config.getMinCandidates();
this.includeCoordinator = config.isIncludeCoordinator();
this.splitsBalancingPolicy = config.getSplitsBalancingPolicy();
Expand Down Expand Up @@ -128,7 +132,8 @@ public NodeSelector createNodeSelector(Session session)
maxAdjustedPendingSplitsWeightPerTask,
getMaxUnacknowledgedSplitsPerTask(session),
splitsBalancingPolicy,
optimizedLocalScheduling);
optimizedLocalScheduling,
consistentHashingAddressProvider);
}

private NodeMap createNodeMap()
Expand Down
Loading
Loading