diff --git a/server/src/main/java/org/elasticsearch/action/support/ContextPreservingActionListener.java b/server/src/main/java/org/elasticsearch/action/support/ContextPreservingActionListener.java index 72f1e7c1d6643..b3ee1a8197db9 100644 --- a/server/src/main/java/org/elasticsearch/action/support/ContextPreservingActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/support/ContextPreservingActionListener.java @@ -51,6 +51,11 @@ public void onFailure(Exception e) { } } + @Override + public String toString() { + return getClass().getName() + "/" + delegate.toString(); + } + /** * Wraps the provided action listener in a {@link ContextPreservingActionListener} that will * also copy the response headers when the {@link ThreadContext.StoredContext} is closed diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index fa7533299b687..69baf73416d66 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -199,7 +199,7 @@ private void handleResponse(InetSocketAddress remo response.remoteAddress(new TransportAddress(remoteAddress)); } catch (Exception e) { handleException(handler, new TransportSerializationException( - "Failed to deserialize response from handler [" + handler.getClass().getName() + "]", e)); + "Failed to deserialize response from handler [" + handler + "]", e)); return; } threadPool.executor(handler.executor()).execute(new AbstractRunnable() { @@ -220,7 +220,8 @@ private void handlerResponseError(StreamInput stream, final TransportResponseHan try { error = stream.readException(); } catch (Exception e) { - error = new TransportSerializationException("Failed to deserialize exception response from stream", e); + error = new TransportSerializationException( + "Failed to deserialize exception response from stream for handler [" + handler + "]", e); } handleException(handler, error); } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 797889680ee2b..efc88e6d27e34 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -560,37 +560,44 @@ public final void sendRequest(final DiscoveryNode public final void sendRequest(final Transport.Connection connection, final String action, final TransportRequest request, final TransportRequestOptions options, - TransportResponseHandler handler) { + final TransportResponseHandler handler) { try { + final TransportResponseHandler delegate; if (request.getParentTask().isSet()) { // TODO: capture the connection instead so that we can cancel child tasks on the remote connections. final Releasable unregisterChildNode = taskManager.registerChildNode(request.getParentTask().getId(), connection.getNode()); - final TransportResponseHandler delegate = handler; - handler = new TransportResponseHandler<>() { + delegate = new TransportResponseHandler<>() { @Override public void handleResponse(T response) { unregisterChildNode.close(); - delegate.handleResponse(response); + handler.handleResponse(response); } @Override public void handleException(TransportException exp) { unregisterChildNode.close(); - delegate.handleException(exp); + handler.handleException(exp); } @Override public String executor() { - return delegate.executor(); + return handler.executor(); } @Override public T read(StreamInput in) throws IOException { - return delegate.read(in); + return handler.read(in); + } + + @Override + public String toString() { + return getClass().getName() + "/[" + action + "]:" + handler.toString(); } }; + } else { + delegate = handler; } - asyncSender.sendRequest(connection, action, request, options, handler); + asyncSender.sendRequest(connection, action, request, options, delegate); } catch (final Exception ex) { // the caller might not handle this so we invoke the handler final TransportException te; diff --git a/server/src/test/java/org/elasticsearch/action/support/ContextPreservingActionListenerTests.java b/server/src/test/java/org/elasticsearch/action/support/ContextPreservingActionListenerTests.java index f0277c65c16d6..609d1c0865ff4 100644 --- a/server/src/test/java/org/elasticsearch/action/support/ContextPreservingActionListenerTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/ContextPreservingActionListenerTests.java @@ -25,6 +25,9 @@ import java.io.IOException; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; + public class ContextPreservingActionListenerTests extends ESTestCase { public void testOriginalContextIsPreservedAfterOnResponse() throws IOException { @@ -150,4 +153,30 @@ public void onFailure(Exception e) { assertNull(threadContext.getHeader("foo")); assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty")); } + + public void testToStringIncludesDelegate() { + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + final ContextPreservingActionListener actionListener; + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + final ActionListener delegate = new ActionListener<>() { + @Override + public void onResponse(Void aVoid) { + } + + @Override + public void onFailure(Exception e) { + } + + @Override + public String toString() { + return "test delegate"; + } + }; + + actionListener = ContextPreservingActionListener.wrapPreservingContext(delegate, threadContext); + } + + assertThat(actionListener.toString(), allOf(containsString("test delegate"), containsString("ContextPreservingActionListener"))); + } + } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java b/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java new file mode 100644 index 0000000000000..be862ef45f592 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java @@ -0,0 +1,168 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.Version; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskAwareRequest; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransport; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.Collections; +import java.util.List; + +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.hasToString; + +public class TransportServiceDeserializationFailureTests extends ESTestCase { + + public void testDeserializationFailureLogIdentifiesListener() { + final DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); + final DiscoveryNode otherNode = new DiscoveryNode("other", buildNewFakeTransportAddress(), Version.CURRENT); + + final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "local").build(); + + final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random()); + + final String testActionName = "internal:test-action"; + + final MockTransport transport = new MockTransport() { + @Override + protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { + if (action.equals(TransportService.HANDSHAKE_ACTION_NAME)) { + handleResponse(requestId, new TransportService.HandshakeResponse(otherNode, new ClusterName(""), Version.CURRENT)); + } + } + }; + final TransportService transportService = transport.createTransportService(Settings.EMPTY, + deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR, ignored -> localNode, null, + Collections.emptySet()); + + transportService.registerRequestHandler(testActionName, ThreadPool.Names.SAME, TransportRequest.Empty::new, + (request, channel, task) -> channel.sendResponse(TransportResponse.Empty.INSTANCE)); + + transportService.start(); + transportService.acceptIncomingRequests(); + + final PlainActionFuture connectionFuture = new PlainActionFuture<>(); + transportService.connectToNode(otherNode, connectionFuture); + assertTrue(connectionFuture.isDone()); + + { + // requests without a parent task are recorded directly in the response context + + transportService.sendRequest(otherNode, testActionName, TransportRequest.Empty.INSTANCE, + TransportRequestOptions.EMPTY, new TransportResponseHandler() { + @Override + public void handleResponse(TransportResponse.Empty response) { + fail("should not be called"); + } + + @Override + public void handleException(TransportException exp) { + fail("should not be called"); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public TransportResponse.Empty read(StreamInput in) { + throw new AssertionError("should not be called"); + } + + @Override + public String toString() { + return "test handler without parent"; + } + }); + + final List> responseContexts + = transport.getResponseHandlers().prune(ignored -> true); + assertThat(responseContexts, hasSize(1)); + final TransportResponseHandler handler = responseContexts.get(0).handler(); + assertThat(handler, hasToString(containsString("test handler without parent"))); + } + + { + // requests with a parent task get wrapped up by the transport service, including the action name + + final Task parentTask = transportService.getTaskManager().register("test", "test-action", new TaskAwareRequest() { + @Override + public void setParentTask(TaskId taskId) { + fail("should not be called"); + } + + @Override + public TaskId getParentTask() { + return TaskId.EMPTY_TASK_ID; + } + }); + + transportService.sendChildRequest(otherNode, testActionName, TransportRequest.Empty.INSTANCE, parentTask, + TransportRequestOptions.EMPTY, new TransportResponseHandler() { + @Override + public void handleResponse(TransportResponse.Empty response) { + fail("should not be called"); + } + + @Override + public void handleException(TransportException exp) { + fail("should not be called"); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public TransportResponse.Empty read(StreamInput in) { + throw new AssertionError("should not be called"); + } + + @Override + public String toString() { + return "test handler with parent"; + } + }); + + final List> responseContexts + = transport.getResponseHandlers().prune(ignored -> true); + assertThat(responseContexts, hasSize(1)); + final TransportResponseHandler handler = responseContexts.get(0).handler(); + assertThat(handler, hasToString(allOf(containsString("test handler with parent"), containsString(testActionName)))); + } + } + +}