From ef83acee73b64aab7f8daef6d5764546d2025f66 Mon Sep 17 00:00:00 2001 From: Nikola Grcevski Date: Mon, 18 Apr 2022 15:02:06 -0400 Subject: [PATCH] Add connection accounting tests --- .../org/elasticsearch/tasks/TaskManager.java | 9 ++ .../elasticsearch/tasks/TaskManagerTests.java | 84 +++++++++++++++++++ 2 files changed, 93 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java index e0ca0ed5ab06e..9bfbc7c7dcbee 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -272,6 +272,15 @@ public Releasable registerChildConnection(long taskId, Transport.Connection chil return null; } + // package private for testing + Integer childTasksPerConnection(long taskId, Transport.Connection childConnection) { + final CancellableTaskHolder holder = cancellableTasks.get(taskId); + if (holder != null) { + return holder.childTasksPerConnection.get(childConnection); + } + return null; + } + /** * Stores the task failure */ diff --git a/server/src/test/java/org/elasticsearch/tasks/TaskManagerTests.java b/server/src/test/java/org/elasticsearch/tasks/TaskManagerTests.java index 6e40e9434141e..3f0b2ecc980cf 100644 --- a/server/src/test/java/org/elasticsearch/tasks/TaskManagerTests.java +++ b/server/src/test/java/org/elasticsearch/tasks/TaskManagerTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.tasks.TransportTasksActionTests; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -24,7 +25,10 @@ import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpTransportChannel; import org.elasticsearch.transport.TestTransportChannels; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import org.junit.After; import org.junit.Before; @@ -230,6 +234,32 @@ void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitF assertThat(taskManager.numberOfChannelPendingTaskTrackers(), equalTo(0)); } + public void testTaskAccounting() { + final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Set.of()); + + final Task task1 = taskManager.register("transport", "test", new CancellableRequest("thread 1")); + final Task task2 = taskManager.register("transport", "test", new CancellableRequest("thread 2")); + + final MockConnection connection1 = new MockConnection(); + final MockConnection connection2 = new MockConnection(); + + Releasable releasableConnection1 = taskManager.registerChildConnection(task1.getId(), connection1); + Releasable releasableConnection2 = taskManager.registerChildConnection(task2.getId(), connection2); + Releasable releasableConnection3 = taskManager.registerChildConnection(task1.getId(), connection1); + + assertEquals(2, taskManager.childTasksPerConnection(task1.getId(), connection1).intValue()); + assertEquals(1, taskManager.childTasksPerConnection(task2.getId(), connection2).intValue()); + + releasableConnection1.close(); + assertEquals(1, taskManager.childTasksPerConnection(task1.getId(), connection1).intValue()); + + releasableConnection2.close(); + assertNull(taskManager.childTasksPerConnection(task2.getId(), connection2)); + + releasableConnection3.close(); + assertNull(taskManager.childTasksPerConnection(task1.getId(), connection1)); + } + static class CancellableRequest extends TransportRequest { private final String requestId; @@ -265,4 +295,58 @@ public void addCloseListener(ActionListener listener) { super.addCloseListener(listener); } } + + public static final class MockConnection implements Transport.Connection { + @Override + public DiscoveryNode getNode() { + return null; + } + + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws TransportException { + throw new UnsupportedOperationException(); + } + + @Override + public void addCloseListener(ActionListener listener) {} + + @Override + public void addRemovedListener(ActionListener listener) {} + + @Override + public boolean isClosed() { + return false; + } + + @Override + public void close() { + throw new UnsupportedOperationException(); + } + + @Override + public void onRemoved() { + throw new UnsupportedOperationException(); + } + + @Override + public void incRef() {} + + @Override + public boolean tryIncRef() { + return true; + } + + @Override + public boolean decRef() { + assert false : "shouldn't release a mock connection"; + return false; + } + + @Override + public boolean hasReferences() { + return true; + } + } + }