Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -643,20 +643,16 @@ private static boolean isCoordinatorSidecar(ServiceDescriptor service)
* Resource Manager -> All Nodes
* Catalog Server -> All Nodes
* Worker -> Resource Managers or Catalog Servers
* Sidecar -> Resource Managers or Catalog Servers
*
* @return Predicate to filter Service Descriptor for Nodes
*/
private Predicate<ServiceDescriptor> filterRelevantNodes()
{
if (currentNode.isCoordinator() || currentNode.isResourceManager() || currentNode.isCatalogServer() || currentNode.isCoordinatorSidecar()) {
// Allowing coordinator node in the list of services, even if it's not allowed by nodeStatusService with currentNode check
return service ->
!nodeStatusService.isPresent()
|| nodeStatusService.get().isAllowed(service.getLocation())
|| isCatalogServer(service)
|| isCoordinatorSidecar(service);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition is redundant. The node would already be included in this condition
nodeStatusService.get().isAllowed(service.getLocation())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either we can keep this or remove it

if (currentNode.isCoordinator() || currentNode.isResourceManager() || currentNode.isCatalogServer()) {
return service -> !nodeStatusService.isPresent() || nodeStatusService.get().isAllowed(service.getLocation());
}

return service -> isResourceManager(service) || isCatalogServer(service) || isCoordinatorSidecar(service);
return service -> isResourceManager(service) || isCatalogServer(service);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void testGetAllNodesForWorkerNode()
AllNodes allNodes = manager.getAllNodes();

Set<InternalNode> activeNodes = allNodes.getActiveNodes();
assertEqualsIgnoreOrder(activeNodes, ImmutableSet.of(resourceManager, catalogServer, coordinatorSidecar));
assertEqualsIgnoreOrder(activeNodes, ImmutableSet.of(resourceManager, catalogServer));

for (InternalNode actual : activeNodes) {
for (InternalNode expected : this.activeNodes) {
Expand All @@ -180,7 +180,7 @@ public void testGetAllNodesForWorkerNode()
assertEqualsIgnoreOrder(activeNodes, manager.getNodes(ACTIVE));

Set<InternalNode> inactiveNodes = allNodes.getInactiveNodes();
assertEqualsIgnoreOrder(inactiveNodes, ImmutableSet.of(inActiveResourceManager, inActiveCatalogServer, inActiveCoordinatorSidecar));
assertEqualsIgnoreOrder(inactiveNodes, ImmutableSet.of(inActiveResourceManager, inActiveCatalogServer));

for (InternalNode actual : inactiveNodes) {
for (InternalNode expected : this.inactiveNodes) {
Expand Down Expand Up @@ -271,7 +271,7 @@ public void testGetAllNodesForCoordinatorSidecar()
AllNodes allNodes = manager.getAllNodes();

Set<InternalNode> activeNodes = allNodes.getActiveNodes();
assertEqualsIgnoreOrder(activeNodes, this.activeNodes);
assertEqualsIgnoreOrder(activeNodes, ImmutableSet.of(resourceManager, catalogServer));

for (InternalNode actual : activeNodes) {
for (InternalNode expected : this.activeNodes) {
Expand All @@ -282,7 +282,7 @@ public void testGetAllNodesForCoordinatorSidecar()
assertEqualsIgnoreOrder(activeNodes, manager.getNodes(ACTIVE));

Set<InternalNode> inactiveNodes = allNodes.getInactiveNodes();
assertEqualsIgnoreOrder(inactiveNodes, this.inactiveNodes);
assertEqualsIgnoreOrder(inactiveNodes, ImmutableSet.of(inActiveResourceManager, inActiveCatalogServer));

for (InternalNode actual : inactiveNodes) {
for (InternalNode expected : this.inactiveNodes) {
Expand All @@ -298,11 +298,15 @@ public void testGetAllNodesForCoordinatorSidecar()
}

@Test
public void testGetCurrentNode()
public void testNodesVisibleToWorkerNode()
{
DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, workerNodeInfo, new NoOpFailureDetector(), Optional.empty(), expectedVersion, testHttpClient, new TestingDriftClient<>(), internalCommunicationConfig);
try {
assertEquals(manager.getCurrentNode(), workerNode1);
assertEquals(manager.getCatalogServers(), ImmutableSet.of(catalogServer));
assertEquals(manager.getResourceManagers(), ImmutableSet.of(resourceManager));
assertEquals(manager.getCoordinatorSidecars(), ImmutableSet.of());
assertEquals(manager.getCoordinators(), ImmutableSet.of());
}
finally {
manager.stop();
Expand Down Expand Up @@ -346,11 +350,14 @@ public void testGetCatalogServers()
}

@Test
public void testGetCoordinatorSidecar()
public void testNodesVisibleToCoordinatorSidecar()
{
DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, coordinatorSidecarNodeInfo, new NoOpFailureDetector(), Optional.of(host -> false), expectedVersion, testHttpClient, new TestingDriftClient<>(), internalCommunicationConfig);
try {
assertEquals(manager.getCoordinatorSidecars(), ImmutableSet.of(coordinatorSidecar));
assertEquals(manager.getCatalogServers(), ImmutableSet.of(catalogServer));
assertEquals(manager.getResourceManagers(), ImmutableSet.of(resourceManager));
assertEquals(manager.getCoordinatorSidecars(), ImmutableSet.of());
assertEquals(manager.getCoordinators(), ImmutableSet.of());
}
finally {
manager.stop();
Expand Down
Loading