Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import jakarta.annotation.PreDestroy;
import org.weakref.jmx.Managed;

import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -56,7 +57,7 @@ public class ClusterSizeMonitor
private int currentCount;

@GuardedBy("this")
private final PriorityQueue<MinNodesFuture> futuresQueue = new PriorityQueue<>(comparing(MinNodesFuture::getExecutionMinCount));
private final PriorityQueue<MinNodesFuture> futuresQueue = new PriorityQueue<>(comparing(MinNodesFuture::executionMinCount));

@Inject
public ClusterSizeMonitor(InternalNodeManager nodeManager, NodeSchedulerConfig nodeSchedulerConfig)
Expand Down Expand Up @@ -145,45 +146,31 @@ private synchronized void updateAllNodes(AllNodes allNodes)
ImmutableList.Builder<SettableFuture<Void>> listenersBuilder = ImmutableList.builder();
while (!futuresQueue.isEmpty()) {
MinNodesFuture minNodesFuture = futuresQueue.peek();
if (minNodesFuture == null || minNodesFuture.getExecutionMinCount() > currentCount) {
if (minNodesFuture.executionMinCount() > currentCount) {
break;
}
listenersBuilder.add(minNodesFuture.getFuture());
listenersBuilder.add(minNodesFuture.future());
// this should not happen since we have a lock
checkState(futuresQueue.poll() == minNodesFuture, "Unexpected modifications to MinNodesFuture queue");
}
ImmutableList<SettableFuture<Void>> listeners = listenersBuilder.build();
List<SettableFuture<Void>> listeners = listenersBuilder.build();
executor.submit(() -> listeners.forEach(listener -> listener.set(null)));
}

@Managed
public synchronized int getRequiredWorkers()
{
return futuresQueue.stream()
.map(MinNodesFuture::getExecutionMinCount)
.map(MinNodesFuture::executionMinCount)
.max(Integer::compareTo)
.orElse(0);
}

private static class MinNodesFuture
private record MinNodesFuture(int executionMinCount, SettableFuture<Void> future)
{
private final int executionMinCount;
private final SettableFuture<Void> future;

MinNodesFuture(int executionMinCount, SettableFuture<Void> future)
{
this.executionMinCount = executionMinCount;
this.future = future;
}

int getExecutionMinCount()
{
return executionMinCount;
}

SettableFuture<Void> getFuture()
MinNodesFuture
{
return future;
requireNonNull(future, "future is null");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private void pollWorkers()
.addAll(allNodes.getShuttingDownNodes())
.build();

ImmutableSet<String> aliveNodeIds = aliveNodes.stream()
Set<String> aliveNodeIds = aliveNodes.stream()
.map(InternalNode::getNodeIdentifier)
.collect(toImmutableSet());

Expand Down Expand Up @@ -259,9 +259,13 @@ private synchronized void refreshNodesInternal()
}
}

Set<InternalNode> activeNodes = activeNodesBuilder.build();
Set<InternalNode> inactiveNodes = inactiveNodesBuilder.build();
Set<InternalNode> coordinators = coordinatorsBuilder.build();
Set<InternalNode> shuttingDownNodes = shuttingDownNodesBuilder.build();
if (allNodes != null) {
// log node that are no longer active (but not shutting down)
SetView<InternalNode> missingNodes = difference(allNodes.getActiveNodes(), Sets.union(activeNodesBuilder.build(), shuttingDownNodesBuilder.build()));
SetView<InternalNode> missingNodes = difference(allNodes.getActiveNodes(), Sets.union(activeNodes, shuttingDownNodes));
for (InternalNode missingNode : missingNodes) {
log.info("Previously active node is missing: %s (last seen at %s)", missingNode.getNodeIdentifier(), missingNode.getHost());
}
Expand All @@ -272,12 +276,12 @@ private synchronized void refreshNodesInternal()
activeNodesByCatalogHandle = Optional.of(byCatalogHandleBuilder.build());
}

AllNodes allNodes = new AllNodes(activeNodesBuilder.build(), inactiveNodesBuilder.build(), shuttingDownNodesBuilder.build(), coordinatorsBuilder.build());
AllNodes allNodes = new AllNodes(activeNodes, inactiveNodes, shuttingDownNodes, coordinators);
// only update if all nodes actually changed (note: this does not include the connectors registered with the nodes)
if (!allNodes.equals(this.allNodes)) {
// assign allNodes to a local variable for use in the callback below
this.allNodes = allNodes;
coordinators = coordinatorsBuilder.build();
this.coordinators = coordinators;

// notify listeners
List<Consumer<AllNodes>> listeners = ImmutableList.copyOf(this.listeners);
Expand All @@ -298,10 +302,9 @@ private NodeState getNodeState(InternalNode node)

private boolean isNodeShuttingDown(String nodeId)
{
Optional<NodeState> remoteNodeState = nodeStates.containsKey(nodeId)
? nodeStates.get(nodeId).getNodeState()
: Optional.empty();
return remoteNodeState.isPresent() && remoteNodeState.get() == SHUTTING_DOWN;
return Optional.ofNullable(nodeStates.get(nodeId))
.flatMap(RemoteNodeState::getNodeState)
.orElse(NodeState.ACTIVE) == SHUTTING_DOWN;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,15 @@

public final class IcebergQueryRunner
{
private IcebergQueryRunner() {}

public static final String ICEBERG_CATALOG = "iceberg";

static {
Logging logging = Logging.initialize();
logging.setLevel("org.apache.iceberg", Level.OFF);
}

private IcebergQueryRunner() {}

public static QueryRunner createIcebergQueryRunner(TpchTable<?>... tables)
throws Exception
{
return builder()
.setInitialTables(tables)
.build();
}

public static Builder builder()
{
return new Builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class TestIcebergMergeAppend
protected QueryRunner createQueryRunner()
throws Exception
{
QueryRunner queryRunner = IcebergQueryRunner.createIcebergQueryRunner();
QueryRunner queryRunner = IcebergQueryRunner.builder().build();
HiveMetastore metastore = ((IcebergConnector) queryRunner.getCoordinator().getConnector(ICEBERG_CATALOG)).getInjector()
.getInstance(HiveMetastoreFactory.class)
.createMetastore(Optional.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
import static java.lang.String.format;
import static java.time.ZoneOffset.UTC;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
Expand All @@ -42,7 +41,7 @@ public class TestIcebergReadVersionedTable
protected QueryRunner createQueryRunner()
throws Exception
{
return createIcebergQueryRunner();
return IcebergQueryRunner.builder().build();
}

@BeforeAll
Expand Down