From daaa774a50408b744ed6f39bad4267678f7c73d1 Mon Sep 17 00:00:00 2001 From: Dain Sundstrom Date: Mon, 17 Dec 2018 22:43:27 -0800 Subject: [PATCH 1/4] Update node list on workers Some connectors require an active list of all worers on worker nodes. This commit restores this behavior which was recently removed. --- .../presto/metadata/DiscoveryNodeManager.java | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java b/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java index 480548628e31e..ba830e8a5f200 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java @@ -145,17 +145,14 @@ private static PrestoNode findCurrentNode(List allServices, S @PostConstruct public void startPollingNodeStates() { - // poll worker states only on the coordinators - if (currentNode.isCoordinator()) { - nodeStateUpdateExecutor.scheduleWithFixedDelay(() -> { - try { - pollWorkers(); - } - catch (Exception e) { - log.error(e, "Error polling state of nodes"); - } - }, 5, 5, TimeUnit.SECONDS); - } + nodeStateUpdateExecutor.scheduleWithFixedDelay(() -> { + try { + pollWorkers(); + } + catch (Exception e) { + log.error(e, "Error polling state of nodes"); + } + }, 5, 5, TimeUnit.SECONDS); pollWorkers(); } From cd567437aeb4a0aa8f66443642d9f58ebc521e30 Mon Sep 17 00:00:00 2001 From: Dain Sundstrom Date: Fri, 21 Dec 2018 13:27:48 -0600 Subject: [PATCH 2/4] Stop DiscoveryNodeManager in tests --- .../metadata/TestDiscoveryNodeManager.java | 52 ++++++++++++------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/presto-main/src/test/java/com/facebook/presto/metadata/TestDiscoveryNodeManager.java b/presto-main/src/test/java/com/facebook/presto/metadata/TestDiscoveryNodeManager.java index c71ed533bb1d4..a61c20b0af99f 100644 --- a/presto-main/src/test/java/com/facebook/presto/metadata/TestDiscoveryNodeManager.java +++ b/presto-main/src/test/java/com/facebook/presto/metadata/TestDiscoveryNodeManager.java @@ -91,29 +91,34 @@ public void setup() public void testGetAllNodes() { DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, nodeInfo, new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig); - AllNodes allNodes = manager.getAllNodes(); + try { + AllNodes allNodes = manager.getAllNodes(); - Set activeNodes = allNodes.getActiveNodes(); - assertEqualsIgnoreOrder(activeNodes, this.activeNodes); + Set activeNodes = allNodes.getActiveNodes(); + assertEqualsIgnoreOrder(activeNodes, this.activeNodes); - for (Node actual : activeNodes) { - for (Node expected : this.activeNodes) { - assertNotSame(actual, expected); + for (Node actual : activeNodes) { + for (Node expected : this.activeNodes) { + assertNotSame(actual, expected); + } } - } - assertEqualsIgnoreOrder(activeNodes, manager.getNodes(ACTIVE)); + assertEqualsIgnoreOrder(activeNodes, manager.getNodes(ACTIVE)); - Set inactiveNodes = allNodes.getInactiveNodes(); - assertEqualsIgnoreOrder(inactiveNodes, this.inactiveNodes); + Set inactiveNodes = allNodes.getInactiveNodes(); + assertEqualsIgnoreOrder(inactiveNodes, this.inactiveNodes); - for (Node actual : inactiveNodes) { - for (Node expected : this.inactiveNodes) { - assertNotSame(actual, expected); + for (Node actual : inactiveNodes) { + for (Node expected : this.inactiveNodes) { + assertNotSame(actual, expected); + } } - } - assertEqualsIgnoreOrder(inactiveNodes, manager.getNodes(INACTIVE)); + assertEqualsIgnoreOrder(inactiveNodes, manager.getNodes(INACTIVE)); + } + finally { + manager.stop(); + } } @Test @@ -126,15 +131,24 @@ public void testGetCurrentNode() .setNodeId(expected.getNodeIdentifier())); DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, nodeInfo, new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig); - - assertEquals(manager.getCurrentNode(), expected); + try { + assertEquals(manager.getCurrentNode(), expected); + } + finally { + manager.stop(); + } } @Test public void testGetCoordinators() { - InternalNodeManager manager = new DiscoveryNodeManager(selector, nodeInfo, new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig); - assertEquals(manager.getCoordinators(), ImmutableSet.of(coordinator)); + DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, nodeInfo, new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig); + try { + assertEquals(manager.getCoordinators(), ImmutableSet.of(coordinator)); + } + finally { + manager.stop(); + } } @SuppressWarnings("ResultOfObjectAllocationIgnored") From 5279dc6c686101c7aacdd75ebcca47230c6ab1c0 Mon Sep 17 00:00:00 2001 From: Dain Sundstrom Date: Thu, 3 Jan 2019 14:32:25 -0800 Subject: [PATCH 3/4] Only fire node change event on change Instead of firing node change event each time nodes are polled, fire it only when the node set changes. --- .../facebook/presto/metadata/AllNodes.java | 23 +++++++++++++++++++ .../presto/metadata/DiscoveryNodeManager.java | 21 +++++++++++------ 2 files changed, 37 insertions(+), 7 deletions(-) diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/AllNodes.java b/presto-main/src/main/java/com/facebook/presto/metadata/AllNodes.java index 46deda0d43b76..4a449ccf35279 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/AllNodes.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/AllNodes.java @@ -16,6 +16,7 @@ import com.facebook.presto.spi.Node; import com.google.common.collect.ImmutableSet; +import java.util.Objects; import java.util.Set; import static java.util.Objects.requireNonNull; @@ -54,4 +55,26 @@ public Set getActiveCoordinators() { return activeCoordinators; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AllNodes allNodes = (AllNodes) o; + return Objects.equals(activeNodes, allNodes.activeNodes) && + Objects.equals(inactiveNodes, allNodes.inactiveNodes) && + Objects.equals(shuttingDownNodes, allNodes.shuttingDownNodes) && + Objects.equals(activeCoordinators, allNodes.activeCoordinators); + } + + @Override + public int hashCode() + { + return Objects.hash(activeNodes, inactiveNodes, shuttingDownNodes, activeCoordinators); + } } diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java b/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java index ba830e8a5f200..a2cbe9154c5bc 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java @@ -259,15 +259,20 @@ private synchronized void refreshNodesInternal() } } - // assign allNodes to a local variable for use in the callback below - AllNodes allNodes = new AllNodes(activeNodesBuilder.build(), inactiveNodesBuilder.build(), shuttingDownNodesBuilder.build(), coordinatorsBuilder.build()); - this.allNodes = allNodes; + // nodes by connector id changes anytime a node adds or removes a connector (note: this is not part of the listener system) activeNodesByConnectorId = byConnectorIdBuilder.build(); - coordinators = coordinatorsBuilder.build(); - // notify listeners - List> listeners = ImmutableList.copyOf(this.listeners); - nodeStateEventExecutor.submit(() -> listeners.forEach(listener -> listener.accept(allNodes))); + AllNodes allNodes = new AllNodes(activeNodesBuilder.build(), inactiveNodesBuilder.build(), shuttingDownNodesBuilder.build(), coordinatorsBuilder.build()); + // only update if all nodes actually changed (note: this does not include the connectors registered with the nodes) + if (!allNodes.equals(this.allNodes)) { + // assign allNodes to a local variable for use in the callback below + this.allNodes = allNodes; + coordinators = coordinatorsBuilder.build(); + + // notify listeners + List> listeners = ImmutableList.copyOf(this.listeners); + nodeStateEventExecutor.submit(() -> listeners.forEach(listener -> listener.accept(allNodes))); + } } private NodeState getNodeState(PrestoNode node) @@ -354,6 +359,8 @@ public synchronized Set getCoordinators() public synchronized void addNodeChangeListener(Consumer listener) { listeners.add(requireNonNull(listener, "listener is null")); + AllNodes allNodes = this.allNodes; + nodeStateEventExecutor.submit(() -> listener.accept(allNodes)); } @Override From c240f0d31763f49a929d00639ec57ad14a96d97b Mon Sep 17 00:00:00 2001 From: Dain Sundstrom Date: Fri, 21 Dec 2018 13:47:31 -0600 Subject: [PATCH 4/4] Add test for DiscoveryNodeManager listener --- .../metadata/TestDiscoveryNodeManager.java | 118 ++++++++++++++---- 1 file changed, 94 insertions(+), 24 deletions(-) diff --git a/presto-main/src/test/java/com/facebook/presto/metadata/TestDiscoveryNodeManager.java b/presto-main/src/test/java/com/facebook/presto/metadata/TestDiscoveryNodeManager.java index a61c20b0af99f..6c023425ef9a9 100644 --- a/presto-main/src/test/java/com/facebook/presto/metadata/TestDiscoveryNodeManager.java +++ b/presto-main/src/test/java/com/facebook/presto/metadata/TestDiscoveryNodeManager.java @@ -21,9 +21,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.ListenableFuture; import io.airlift.discovery.client.ServiceDescriptor; import io.airlift.discovery.client.ServiceSelector; -import io.airlift.discovery.client.testing.StaticServiceSelector; import io.airlift.http.client.HttpClient; import io.airlift.http.client.testing.TestingHttpClient; import io.airlift.http.client.testing.TestingResponse; @@ -32,15 +32,19 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import javax.annotation.concurrent.GuardedBy; + import java.net.URI; -import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import static com.facebook.presto.spi.NodeState.ACTIVE; import static com.facebook.presto.spi.NodeState.INACTIVE; import static io.airlift.discovery.client.ServiceDescriptor.serviceDescriptor; +import static io.airlift.discovery.client.ServiceSelectorConfig.DEFAULT_POOL; import static io.airlift.http.client.HttpStatus.OK; import static io.airlift.testing.Assertions.assertEqualsIgnoreOrder; import static org.testng.Assert.assertEquals; @@ -52,10 +56,11 @@ public class TestDiscoveryNodeManager private final NodeInfo nodeInfo = new NodeInfo("test"); private final InternalCommunicationConfig internalCommunicationConfig = new InternalCommunicationConfig(); private NodeVersion expectedVersion; - private List activeNodes; - private List inactiveNodes; + private Set activeNodes; + private Set inactiveNodes; private PrestoNode coordinator; - private ServiceSelector selector; + private PrestoNode currentNode; + private final PrestoNodeServiceSelector selector = new PrestoNodeServiceSelector(); private HttpClient testHttpClient; @BeforeMethod @@ -64,27 +69,19 @@ public void setup() testHttpClient = new TestingHttpClient(input -> new TestingResponse(OK, ArrayListMultimap.create(), ACTIVE.name().getBytes())); expectedVersion = new NodeVersion("1"); - coordinator = new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.2.8"), expectedVersion, false); - activeNodes = ImmutableList.of( - new PrestoNode(nodeInfo.getNodeId(), URI.create("http://192.0.1.1"), expectedVersion, false), + coordinator = new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.2.8"), expectedVersion, true); + currentNode = new PrestoNode(nodeInfo.getNodeId(), URI.create("http://192.0.1.1"), expectedVersion, false); + + activeNodes = ImmutableSet.of( + currentNode, new PrestoNode(UUID.randomUUID().toString(), URI.create("http://192.0.2.1:8080"), expectedVersion, false), new PrestoNode(UUID.randomUUID().toString(), URI.create("http://192.0.2.3"), expectedVersion, false), coordinator); - inactiveNodes = ImmutableList.of( + inactiveNodes = ImmutableSet.of( new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.3.9"), NodeVersion.UNKNOWN, false), new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.4.9"), new NodeVersion("2"), false)); - List descriptors = new ArrayList<>(); - for (PrestoNode node : Iterables.concat(activeNodes, inactiveNodes)) { - descriptors.add(serviceDescriptor("presto") - .setNodeId(node.getNodeIdentifier()) - .addProperty("http", node.getHttpUri().toString()) - .addProperty("node_version", node.getNodeVersion().toString()) - .addProperty("coordinator", String.valueOf(node.equals(coordinator))) - .build()); - } - - selector = new StaticServiceSelector(descriptors); + selector.announceNodes(activeNodes, inactiveNodes); } @Test @@ -124,15 +121,13 @@ public void testGetAllNodes() @Test public void testGetCurrentNode() { - Node expected = activeNodes.get(0); - NodeInfo nodeInfo = new NodeInfo(new NodeConfig() .setEnvironment("test") - .setNodeId(expected.getNodeIdentifier())); + .setNodeId(currentNode.getNodeIdentifier())); DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, nodeInfo, new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig); try { - assertEquals(manager.getCurrentNode(), expected); + assertEquals(manager.getCurrentNode(), currentNode); } finally { manager.stop(); @@ -157,4 +152,79 @@ public void testGetCurrentNodeRequired() { new DiscoveryNodeManager(selector, new NodeInfo("test"), new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig); } + + @Test(timeOut = 60000) + public void testNodeChangeListener() + throws Exception + { + DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, nodeInfo, new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig); + try { + manager.startPollingNodeStates(); + + BlockingQueue notifications = new ArrayBlockingQueue<>(100); + manager.addNodeChangeListener(notifications::add); + AllNodes allNodes = notifications.take(); + assertEquals(allNodes.getActiveNodes(), activeNodes); + assertEquals(allNodes.getInactiveNodes(), inactiveNodes); + + selector.announceNodes(ImmutableSet.of(currentNode), ImmutableSet.of(coordinator)); + allNodes = notifications.take(); + assertEquals(allNodes.getActiveNodes(), ImmutableSet.of(currentNode, coordinator)); + assertEquals(allNodes.getActiveCoordinators(), ImmutableSet.of(coordinator)); + + selector.announceNodes(activeNodes, inactiveNodes); + allNodes = notifications.take(); + assertEquals(allNodes.getActiveNodes(), activeNodes); + assertEquals(allNodes.getInactiveNodes(), inactiveNodes); + } + finally { + manager.stop(); + } + } + + public static class PrestoNodeServiceSelector + implements ServiceSelector + { + @GuardedBy("this") + private List descriptors = ImmutableList.of(); + + private synchronized void announceNodes(Set activeNodes, Set inactiveNodes) + { + ImmutableList.Builder descriptors = ImmutableList.builder(); + for (Node node : Iterables.concat(activeNodes, inactiveNodes)) { + descriptors.add(serviceDescriptor("presto") + .setNodeId(node.getNodeIdentifier()) + .addProperty("http", node.getHttpUri().toString()) + .addProperty("node_version", ((PrestoNode) node).getNodeVersion().toString()) + .addProperty("coordinator", String.valueOf(node.isCoordinator())) + .build()); + } + + this.descriptors = descriptors.build(); + } + + @Override + public String getType() + { + return "presto"; + } + + @Override + public String getPool() + { + return DEFAULT_POOL; + } + + @Override + public synchronized List selectAllServices() + { + return descriptors; + } + + @Override + public ListenableFuture> refresh() + { + throw new UnsupportedOperationException(); + } + } }