diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/AllNodes.java b/presto-main/src/main/java/com/facebook/presto/metadata/AllNodes.java index 46deda0d43b76..4a449ccf35279 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/AllNodes.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/AllNodes.java @@ -16,6 +16,7 @@ import com.facebook.presto.spi.Node; import com.google.common.collect.ImmutableSet; +import java.util.Objects; import java.util.Set; import static java.util.Objects.requireNonNull; @@ -54,4 +55,26 @@ public Set getActiveCoordinators() { return activeCoordinators; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AllNodes allNodes = (AllNodes) o; + return Objects.equals(activeNodes, allNodes.activeNodes) && + Objects.equals(inactiveNodes, allNodes.inactiveNodes) && + Objects.equals(shuttingDownNodes, allNodes.shuttingDownNodes) && + Objects.equals(activeCoordinators, allNodes.activeCoordinators); + } + + @Override + public int hashCode() + { + return Objects.hash(activeNodes, inactiveNodes, shuttingDownNodes, activeCoordinators); + } } diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java b/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java index 480548628e31e..a2cbe9154c5bc 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java @@ -145,17 +145,14 @@ private static PrestoNode findCurrentNode(List allServices, S @PostConstruct public void startPollingNodeStates() { - // poll worker states only on the coordinators - if (currentNode.isCoordinator()) { - nodeStateUpdateExecutor.scheduleWithFixedDelay(() -> { - try { - pollWorkers(); - } - catch (Exception e) { - log.error(e, "Error polling state of nodes"); - } - }, 5, 5, TimeUnit.SECONDS); - } + nodeStateUpdateExecutor.scheduleWithFixedDelay(() -> { + try { + pollWorkers(); + } + catch (Exception e) { + log.error(e, "Error polling state of nodes"); + } + }, 5, 5, TimeUnit.SECONDS); pollWorkers(); } @@ -262,15 +259,20 @@ private synchronized void refreshNodesInternal() } } - // assign allNodes to a local variable for use in the callback below - AllNodes allNodes = new AllNodes(activeNodesBuilder.build(), inactiveNodesBuilder.build(), shuttingDownNodesBuilder.build(), coordinatorsBuilder.build()); - this.allNodes = allNodes; + // nodes by connector id changes anytime a node adds or removes a connector (note: this is not part of the listener system) activeNodesByConnectorId = byConnectorIdBuilder.build(); - coordinators = coordinatorsBuilder.build(); - // notify listeners - List> listeners = ImmutableList.copyOf(this.listeners); - nodeStateEventExecutor.submit(() -> listeners.forEach(listener -> listener.accept(allNodes))); + AllNodes allNodes = new AllNodes(activeNodesBuilder.build(), inactiveNodesBuilder.build(), shuttingDownNodesBuilder.build(), coordinatorsBuilder.build()); + // only update if all nodes actually changed (note: this does not include the connectors registered with the nodes) + if (!allNodes.equals(this.allNodes)) { + // assign allNodes to a local variable for use in the callback below + this.allNodes = allNodes; + coordinators = coordinatorsBuilder.build(); + + // notify listeners + List> listeners = ImmutableList.copyOf(this.listeners); + nodeStateEventExecutor.submit(() -> listeners.forEach(listener -> listener.accept(allNodes))); + } } private NodeState getNodeState(PrestoNode node) @@ -357,6 +359,8 @@ public synchronized Set getCoordinators() public synchronized void addNodeChangeListener(Consumer listener) { listeners.add(requireNonNull(listener, "listener is null")); + AllNodes allNodes = this.allNodes; + nodeStateEventExecutor.submit(() -> listener.accept(allNodes)); } @Override diff --git a/presto-main/src/test/java/com/facebook/presto/metadata/TestDiscoveryNodeManager.java b/presto-main/src/test/java/com/facebook/presto/metadata/TestDiscoveryNodeManager.java index c71ed533bb1d4..6c023425ef9a9 100644 --- a/presto-main/src/test/java/com/facebook/presto/metadata/TestDiscoveryNodeManager.java +++ b/presto-main/src/test/java/com/facebook/presto/metadata/TestDiscoveryNodeManager.java @@ -21,9 +21,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.ListenableFuture; import io.airlift.discovery.client.ServiceDescriptor; import io.airlift.discovery.client.ServiceSelector; -import io.airlift.discovery.client.testing.StaticServiceSelector; import io.airlift.http.client.HttpClient; import io.airlift.http.client.testing.TestingHttpClient; import io.airlift.http.client.testing.TestingResponse; @@ -32,15 +32,19 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import javax.annotation.concurrent.GuardedBy; + import java.net.URI; -import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import static com.facebook.presto.spi.NodeState.ACTIVE; import static com.facebook.presto.spi.NodeState.INACTIVE; import static io.airlift.discovery.client.ServiceDescriptor.serviceDescriptor; +import static io.airlift.discovery.client.ServiceSelectorConfig.DEFAULT_POOL; import static io.airlift.http.client.HttpStatus.OK; import static io.airlift.testing.Assertions.assertEqualsIgnoreOrder; import static org.testng.Assert.assertEquals; @@ -52,10 +56,11 @@ public class TestDiscoveryNodeManager private final NodeInfo nodeInfo = new NodeInfo("test"); private final InternalCommunicationConfig internalCommunicationConfig = new InternalCommunicationConfig(); private NodeVersion expectedVersion; - private List activeNodes; - private List inactiveNodes; + private Set activeNodes; + private Set inactiveNodes; private PrestoNode coordinator; - private ServiceSelector selector; + private PrestoNode currentNode; + private final PrestoNodeServiceSelector selector = new PrestoNodeServiceSelector(); private HttpClient testHttpClient; @BeforeMethod @@ -64,77 +69,81 @@ public void setup() testHttpClient = new TestingHttpClient(input -> new TestingResponse(OK, ArrayListMultimap.create(), ACTIVE.name().getBytes())); expectedVersion = new NodeVersion("1"); - coordinator = new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.2.8"), expectedVersion, false); - activeNodes = ImmutableList.of( - new PrestoNode(nodeInfo.getNodeId(), URI.create("http://192.0.1.1"), expectedVersion, false), + coordinator = new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.2.8"), expectedVersion, true); + currentNode = new PrestoNode(nodeInfo.getNodeId(), URI.create("http://192.0.1.1"), expectedVersion, false); + + activeNodes = ImmutableSet.of( + currentNode, new PrestoNode(UUID.randomUUID().toString(), URI.create("http://192.0.2.1:8080"), expectedVersion, false), new PrestoNode(UUID.randomUUID().toString(), URI.create("http://192.0.2.3"), expectedVersion, false), coordinator); - inactiveNodes = ImmutableList.of( + inactiveNodes = ImmutableSet.of( new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.3.9"), NodeVersion.UNKNOWN, false), new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.4.9"), new NodeVersion("2"), false)); - List descriptors = new ArrayList<>(); - for (PrestoNode node : Iterables.concat(activeNodes, inactiveNodes)) { - descriptors.add(serviceDescriptor("presto") - .setNodeId(node.getNodeIdentifier()) - .addProperty("http", node.getHttpUri().toString()) - .addProperty("node_version", node.getNodeVersion().toString()) - .addProperty("coordinator", String.valueOf(node.equals(coordinator))) - .build()); - } - - selector = new StaticServiceSelector(descriptors); + selector.announceNodes(activeNodes, inactiveNodes); } @Test public void testGetAllNodes() { DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, nodeInfo, new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig); - AllNodes allNodes = manager.getAllNodes(); + try { + AllNodes allNodes = manager.getAllNodes(); - Set activeNodes = allNodes.getActiveNodes(); - assertEqualsIgnoreOrder(activeNodes, this.activeNodes); + Set activeNodes = allNodes.getActiveNodes(); + assertEqualsIgnoreOrder(activeNodes, this.activeNodes); - for (Node actual : activeNodes) { - for (Node expected : this.activeNodes) { - assertNotSame(actual, expected); + for (Node actual : activeNodes) { + for (Node expected : this.activeNodes) { + assertNotSame(actual, expected); + } } - } - assertEqualsIgnoreOrder(activeNodes, manager.getNodes(ACTIVE)); + assertEqualsIgnoreOrder(activeNodes, manager.getNodes(ACTIVE)); - Set inactiveNodes = allNodes.getInactiveNodes(); - assertEqualsIgnoreOrder(inactiveNodes, this.inactiveNodes); + Set inactiveNodes = allNodes.getInactiveNodes(); + assertEqualsIgnoreOrder(inactiveNodes, this.inactiveNodes); - for (Node actual : inactiveNodes) { - for (Node expected : this.inactiveNodes) { - assertNotSame(actual, expected); + for (Node actual : inactiveNodes) { + for (Node expected : this.inactiveNodes) { + assertNotSame(actual, expected); + } } - } - assertEqualsIgnoreOrder(inactiveNodes, manager.getNodes(INACTIVE)); + assertEqualsIgnoreOrder(inactiveNodes, manager.getNodes(INACTIVE)); + } + finally { + manager.stop(); + } } @Test public void testGetCurrentNode() { - Node expected = activeNodes.get(0); - NodeInfo nodeInfo = new NodeInfo(new NodeConfig() .setEnvironment("test") - .setNodeId(expected.getNodeIdentifier())); + .setNodeId(currentNode.getNodeIdentifier())); DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, nodeInfo, new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig); - - assertEquals(manager.getCurrentNode(), expected); + try { + assertEquals(manager.getCurrentNode(), currentNode); + } + finally { + manager.stop(); + } } @Test public void testGetCoordinators() { - InternalNodeManager manager = new DiscoveryNodeManager(selector, nodeInfo, new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig); - assertEquals(manager.getCoordinators(), ImmutableSet.of(coordinator)); + DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, nodeInfo, new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig); + try { + assertEquals(manager.getCoordinators(), ImmutableSet.of(coordinator)); + } + finally { + manager.stop(); + } } @SuppressWarnings("ResultOfObjectAllocationIgnored") @@ -143,4 +152,79 @@ public void testGetCurrentNodeRequired() { new DiscoveryNodeManager(selector, new NodeInfo("test"), new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig); } + + @Test(timeOut = 60000) + public void testNodeChangeListener() + throws Exception + { + DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, nodeInfo, new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig); + try { + manager.startPollingNodeStates(); + + BlockingQueue notifications = new ArrayBlockingQueue<>(100); + manager.addNodeChangeListener(notifications::add); + AllNodes allNodes = notifications.take(); + assertEquals(allNodes.getActiveNodes(), activeNodes); + assertEquals(allNodes.getInactiveNodes(), inactiveNodes); + + selector.announceNodes(ImmutableSet.of(currentNode), ImmutableSet.of(coordinator)); + allNodes = notifications.take(); + assertEquals(allNodes.getActiveNodes(), ImmutableSet.of(currentNode, coordinator)); + assertEquals(allNodes.getActiveCoordinators(), ImmutableSet.of(coordinator)); + + selector.announceNodes(activeNodes, inactiveNodes); + allNodes = notifications.take(); + assertEquals(allNodes.getActiveNodes(), activeNodes); + assertEquals(allNodes.getInactiveNodes(), inactiveNodes); + } + finally { + manager.stop(); + } + } + + public static class PrestoNodeServiceSelector + implements ServiceSelector + { + @GuardedBy("this") + private List descriptors = ImmutableList.of(); + + private synchronized void announceNodes(Set activeNodes, Set inactiveNodes) + { + ImmutableList.Builder descriptors = ImmutableList.builder(); + for (Node node : Iterables.concat(activeNodes, inactiveNodes)) { + descriptors.add(serviceDescriptor("presto") + .setNodeId(node.getNodeIdentifier()) + .addProperty("http", node.getHttpUri().toString()) + .addProperty("node_version", ((PrestoNode) node).getNodeVersion().toString()) + .addProperty("coordinator", String.valueOf(node.isCoordinator())) + .build()); + } + + this.descriptors = descriptors.build(); + } + + @Override + public String getType() + { + return "presto"; + } + + @Override + public String getPool() + { + return DEFAULT_POOL; + } + + @Override + public synchronized List selectAllServices() + { + return descriptors; + } + + @Override + public ListenableFuture> refresh() + { + throw new UnsupportedOperationException(); + } + } }