Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.spi.Node;
import com.google.common.collect.ImmutableSet;

import java.util.Objects;
import java.util.Set;

import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -54,4 +55,26 @@ public Set<Node> getActiveCoordinators()
{
return activeCoordinators;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AllNodes allNodes = (AllNodes) o;
return Objects.equals(activeNodes, allNodes.activeNodes) &&
Objects.equals(inactiveNodes, allNodes.inactiveNodes) &&
Objects.equals(shuttingDownNodes, allNodes.shuttingDownNodes) &&
Objects.equals(activeCoordinators, allNodes.activeCoordinators);
}

@Override
public int hashCode()
{
return Objects.hash(activeNodes, inactiveNodes, shuttingDownNodes, activeCoordinators);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,17 +145,14 @@ private static PrestoNode findCurrentNode(List<ServiceDescriptor> allServices, S
@PostConstruct
public void startPollingNodeStates()
{
// poll worker states only on the coordinators
if (currentNode.isCoordinator()) {
nodeStateUpdateExecutor.scheduleWithFixedDelay(() -> {
try {
pollWorkers();
}
catch (Exception e) {
log.error(e, "Error polling state of nodes");
}
}, 5, 5, TimeUnit.SECONDS);
}
nodeStateUpdateExecutor.scheduleWithFixedDelay(() -> {
try {
pollWorkers();
}
catch (Exception e) {
log.error(e, "Error polling state of nodes");
}
}, 5, 5, TimeUnit.SECONDS);
pollWorkers();
}

Expand Down Expand Up @@ -262,15 +259,20 @@ private synchronized void refreshNodesInternal()
}
}

// assign allNodes to a local variable for use in the callback below
AllNodes allNodes = new AllNodes(activeNodesBuilder.build(), inactiveNodesBuilder.build(), shuttingDownNodesBuilder.build(), coordinatorsBuilder.build());
this.allNodes = allNodes;
// nodes by connector id changes anytime a node adds or removes a connector (note: this is not part of the listener system)
activeNodesByConnectorId = byConnectorIdBuilder.build();
coordinators = coordinatorsBuilder.build();

// notify listeners
List<Consumer<AllNodes>> listeners = ImmutableList.copyOf(this.listeners);
nodeStateEventExecutor.submit(() -> listeners.forEach(listener -> listener.accept(allNodes)));
AllNodes allNodes = new AllNodes(activeNodesBuilder.build(), inactiveNodesBuilder.build(), shuttingDownNodesBuilder.build(), coordinatorsBuilder.build());
// only update if all nodes actually changed (note: this does not include the connectors registered with the nodes)
if (!allNodes.equals(this.allNodes)) {
// assign allNodes to a local variable for use in the callback below
this.allNodes = allNodes;
coordinators = coordinatorsBuilder.build();

// notify listeners
List<Consumer<AllNodes>> listeners = ImmutableList.copyOf(this.listeners);
nodeStateEventExecutor.submit(() -> listeners.forEach(listener -> listener.accept(allNodes)));
}
}

private NodeState getNodeState(PrestoNode node)
Expand Down Expand Up @@ -357,6 +359,8 @@ public synchronized Set<Node> getCoordinators()
public synchronized void addNodeChangeListener(Consumer<AllNodes> listener)
{
listeners.add(requireNonNull(listener, "listener is null"));
AllNodes allNodes = this.allNodes;
nodeStateEventExecutor.submit(() -> listener.accept(allNodes));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.discovery.client.ServiceDescriptor;
import io.airlift.discovery.client.ServiceSelector;
import io.airlift.discovery.client.testing.StaticServiceSelector;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.http.client.testing.TestingResponse;
Expand All @@ -32,15 +32,19 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import javax.annotation.concurrent.GuardedBy;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import static com.facebook.presto.spi.NodeState.ACTIVE;
import static com.facebook.presto.spi.NodeState.INACTIVE;
import static io.airlift.discovery.client.ServiceDescriptor.serviceDescriptor;
import static io.airlift.discovery.client.ServiceSelectorConfig.DEFAULT_POOL;
import static io.airlift.http.client.HttpStatus.OK;
import static io.airlift.testing.Assertions.assertEqualsIgnoreOrder;
import static org.testng.Assert.assertEquals;
Expand All @@ -52,10 +56,11 @@ public class TestDiscoveryNodeManager
private final NodeInfo nodeInfo = new NodeInfo("test");
private final InternalCommunicationConfig internalCommunicationConfig = new InternalCommunicationConfig();
private NodeVersion expectedVersion;
private List<PrestoNode> activeNodes;
private List<PrestoNode> inactiveNodes;
private Set<Node> activeNodes;
private Set<Node> inactiveNodes;
private PrestoNode coordinator;
private ServiceSelector selector;
private PrestoNode currentNode;
private final PrestoNodeServiceSelector selector = new PrestoNodeServiceSelector();
private HttpClient testHttpClient;

@BeforeMethod
Expand All @@ -64,77 +69,81 @@ public void setup()
testHttpClient = new TestingHttpClient(input -> new TestingResponse(OK, ArrayListMultimap.create(), ACTIVE.name().getBytes()));

expectedVersion = new NodeVersion("1");
coordinator = new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.2.8"), expectedVersion, false);
activeNodes = ImmutableList.of(
new PrestoNode(nodeInfo.getNodeId(), URI.create("http://192.0.1.1"), expectedVersion, false),
coordinator = new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.2.8"), expectedVersion, true);
currentNode = new PrestoNode(nodeInfo.getNodeId(), URI.create("http://192.0.1.1"), expectedVersion, false);

activeNodes = ImmutableSet.of(
currentNode,
new PrestoNode(UUID.randomUUID().toString(), URI.create("http://192.0.2.1:8080"), expectedVersion, false),
new PrestoNode(UUID.randomUUID().toString(), URI.create("http://192.0.2.3"), expectedVersion, false),
coordinator);
inactiveNodes = ImmutableList.of(
inactiveNodes = ImmutableSet.of(
new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.3.9"), NodeVersion.UNKNOWN, false),
new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.4.9"), new NodeVersion("2"), false));

List<ServiceDescriptor> descriptors = new ArrayList<>();
for (PrestoNode node : Iterables.concat(activeNodes, inactiveNodes)) {
descriptors.add(serviceDescriptor("presto")
.setNodeId(node.getNodeIdentifier())
.addProperty("http", node.getHttpUri().toString())
.addProperty("node_version", node.getNodeVersion().toString())
.addProperty("coordinator", String.valueOf(node.equals(coordinator)))
.build());
}

selector = new StaticServiceSelector(descriptors);
selector.announceNodes(activeNodes, inactiveNodes);
}

@Test
public void testGetAllNodes()
{
DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, nodeInfo, new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you use try-with-resources for manager?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it doesn't implement AutoCloseable

AllNodes allNodes = manager.getAllNodes();
try {
AllNodes allNodes = manager.getAllNodes();

Set<Node> activeNodes = allNodes.getActiveNodes();
assertEqualsIgnoreOrder(activeNodes, this.activeNodes);
Set<Node> activeNodes = allNodes.getActiveNodes();
assertEqualsIgnoreOrder(activeNodes, this.activeNodes);

for (Node actual : activeNodes) {
for (Node expected : this.activeNodes) {
assertNotSame(actual, expected);
for (Node actual : activeNodes) {
for (Node expected : this.activeNodes) {
assertNotSame(actual, expected);
}
}
}

assertEqualsIgnoreOrder(activeNodes, manager.getNodes(ACTIVE));
assertEqualsIgnoreOrder(activeNodes, manager.getNodes(ACTIVE));

Set<Node> inactiveNodes = allNodes.getInactiveNodes();
assertEqualsIgnoreOrder(inactiveNodes, this.inactiveNodes);
Set<Node> inactiveNodes = allNodes.getInactiveNodes();
assertEqualsIgnoreOrder(inactiveNodes, this.inactiveNodes);

for (Node actual : inactiveNodes) {
for (Node expected : this.inactiveNodes) {
assertNotSame(actual, expected);
for (Node actual : inactiveNodes) {
for (Node expected : this.inactiveNodes) {
assertNotSame(actual, expected);
}
}
}

assertEqualsIgnoreOrder(inactiveNodes, manager.getNodes(INACTIVE));
assertEqualsIgnoreOrder(inactiveNodes, manager.getNodes(INACTIVE));
}
finally {
manager.stop();
}
}

@Test
public void testGetCurrentNode()
{
Node expected = activeNodes.get(0);

NodeInfo nodeInfo = new NodeInfo(new NodeConfig()
.setEnvironment("test")
.setNodeId(expected.getNodeIdentifier()));
.setNodeId(currentNode.getNodeIdentifier()));

DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, nodeInfo, new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig);

assertEquals(manager.getCurrentNode(), expected);
try {
assertEquals(manager.getCurrentNode(), currentNode);
}
finally {
manager.stop();
}
}

@Test
public void testGetCoordinators()
{
InternalNodeManager manager = new DiscoveryNodeManager(selector, nodeInfo, new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig);
assertEquals(manager.getCoordinators(), ImmutableSet.of(coordinator));
DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, nodeInfo, new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig);
try {
assertEquals(manager.getCoordinators(), ImmutableSet.of(coordinator));
}
finally {
manager.stop();
}
}

@SuppressWarnings("ResultOfObjectAllocationIgnored")
Expand All @@ -143,4 +152,79 @@ public void testGetCurrentNodeRequired()
{
new DiscoveryNodeManager(selector, new NodeInfo("test"), new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig);
}

@Test(timeOut = 60000)
public void testNodeChangeListener()
throws Exception
{
DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, nodeInfo, new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig);
try {
manager.startPollingNodeStates();

BlockingQueue<AllNodes> notifications = new ArrayBlockingQueue<>(100);
manager.addNodeChangeListener(notifications::add);
AllNodes allNodes = notifications.take();
assertEquals(allNodes.getActiveNodes(), activeNodes);
assertEquals(allNodes.getInactiveNodes(), inactiveNodes);

selector.announceNodes(ImmutableSet.of(currentNode), ImmutableSet.of(coordinator));
allNodes = notifications.take();
assertEquals(allNodes.getActiveNodes(), ImmutableSet.of(currentNode, coordinator));
assertEquals(allNodes.getActiveCoordinators(), ImmutableSet.of(coordinator));

selector.announceNodes(activeNodes, inactiveNodes);
allNodes = notifications.take();
assertEquals(allNodes.getActiveNodes(), activeNodes);
assertEquals(allNodes.getInactiveNodes(), inactiveNodes);
}
finally {
manager.stop();
}
}

public static class PrestoNodeServiceSelector
implements ServiceSelector
{
@GuardedBy("this")
private List<ServiceDescriptor> descriptors = ImmutableList.of();

private synchronized void announceNodes(Set<Node> activeNodes, Set<Node> inactiveNodes)
{
ImmutableList.Builder<ServiceDescriptor> descriptors = ImmutableList.builder();
for (Node node : Iterables.concat(activeNodes, inactiveNodes)) {
descriptors.add(serviceDescriptor("presto")
.setNodeId(node.getNodeIdentifier())
.addProperty("http", node.getHttpUri().toString())
.addProperty("node_version", ((PrestoNode) node).getNodeVersion().toString())
.addProperty("coordinator", String.valueOf(node.isCoordinator()))
.build());
}

this.descriptors = descriptors.build();
}

@Override
public String getType()
{
return "presto";
}

@Override
public String getPool()
{
return DEFAULT_POOL;
}

@Override
public synchronized List<ServiceDescriptor> selectAllServices()
{
return descriptors;
}

@Override
public ListenableFuture<List<ServiceDescriptor>> refresh()
{
throw new UnsupportedOperationException();
}
}
}