Skip to content

Commit d2ddf8c

Browse files
committed
Improve deserialization failure logging (#60577)
Today when a node fails to properly deserialize a transport message with a parent task, we log the following relatively uninformative message:

    java.lang.IllegalStateException: Message not fully read (response) for requestId [9999], handler [org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler/org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler/org.elasticsearch.transport.TransportService$6@abcdefgh], error [false]; resetting

In particular, the wrapping of the listener in the `TransportService` obscures all clues as to the source of the problem, e.g. the action name or the identity of the underlying listener. This commit exposes the inner listener to the logs.

Also, if the listener is wrapped with `ContextPreservingActionListener` then its identity is similarly hidden. This commit also exposes the wrapped listener in this case.

Relates #38939
1 parent a76fc32 commit d2ddf8c

File tree

5 files changed

+220
-10
lines changed

5 files changed

+220
-10
lines changed

server/src/main/java/org/elasticsearch/action/support/ContextPreservingActionListener.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ public void onFailure(Exception e) {
5151
}
5252
}
5353

54+
@Override
55+
public String toString() {
56+
return getClass().getName() + "/" + delegate.toString();
57+
}
58+
5459
/**
5560
* Wraps the provided action listener in a {@link ContextPreservingActionListener} that will
5661
* also copy the response headers when the {@link ThreadContext.StoredContext} is closed

server/src/main/java/org/elasticsearch/transport/InboundHandler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ private <T extends TransportResponse> void handleResponse(InetSocketAddress remo
199199
response.remoteAddress(new TransportAddress(remoteAddress));
200200
} catch (Exception e) {
201201
handleException(handler, new TransportSerializationException(
202-
"Failed to deserialize response from handler [" + handler.getClass().getName() + "]", e));
202+
"Failed to deserialize response from handler [" + handler + "]", e));
203203
return;
204204
}
205205
threadPool.executor(handler.executor()).execute(new AbstractRunnable() {
@@ -220,7 +220,8 @@ private void handlerResponseError(StreamInput stream, final TransportResponseHan
220220
try {
221221
error = stream.readException();
222222
} catch (Exception e) {
223-
error = new TransportSerializationException("Failed to deserialize exception response from stream", e);
223+
error = new TransportSerializationException(
224+
"Failed to deserialize exception response from stream for handler [" + handler + "]", e);
224225
}
225226
handleException(handler, error);
226227
}

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -619,37 +619,44 @@ public final <T extends TransportResponse> void sendRequest(final DiscoveryNode
619619
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
620620
final TransportRequest request,
621621
final TransportRequestOptions options,
622-
TransportResponseHandler<T> handler) {
622+
final TransportResponseHandler<T> handler) {
623623
try {
624+
final TransportResponseHandler<T> delegate;
624625
if (request.getParentTask().isSet()) {
625626
// TODO: capture the connection instead so that we can cancel child tasks on the remote connections.
626627
final Releasable unregisterChildNode = taskManager.registerChildNode(request.getParentTask().getId(), connection.getNode());
627-
final TransportResponseHandler<T> delegate = handler;
628-
handler = new TransportResponseHandler<T>() {
628+
delegate = new TransportResponseHandler<T>() {
629629
@Override
630630
public void handleResponse(T response) {
631631
unregisterChildNode.close();
632-
delegate.handleResponse(response);
632+
handler.handleResponse(response);
633633
}
634634

635635
@Override
636636
public void handleException(TransportException exp) {
637637
unregisterChildNode.close();
638-
delegate.handleException(exp);
638+
handler.handleException(exp);
639639
}
640640

641641
@Override
642642
public String executor() {
643-
return delegate.executor();
643+
return handler.executor();
644644
}
645645

646646
@Override
647647
public T read(StreamInput in) throws IOException {
648-
return delegate.read(in);
648+
return handler.read(in);
649+
}
650+
651+
@Override
652+
public String toString() {
653+
return getClass().getName() + "/[" + action + "]:" + handler.toString();
649654
}
650655
};
656+
} else {
657+
delegate = handler;
651658
}
652-
asyncSender.sendRequest(connection, action, request, options, handler);
659+
asyncSender.sendRequest(connection, action, request, options, delegate);
653660
} catch (final Exception ex) {
654661
// the caller might not handle this so we invoke the handler
655662
final TransportException te;

server/src/test/java/org/elasticsearch/action/support/ContextPreservingActionListenerTests.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525

2626
import java.io.IOException;
2727

28+
import static org.hamcrest.Matchers.allOf;
29+
import static org.hamcrest.Matchers.containsString;
30+
2831
public class ContextPreservingActionListenerTests extends ESTestCase {
2932

3033
public void testOriginalContextIsPreservedAfterOnResponse() throws IOException {
@@ -150,4 +153,30 @@ public void onFailure(Exception e) {
150153
assertNull(threadContext.getHeader("foo"));
151154
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
152155
}
156+
157+
public void testToStringIncludesDelegate() {
158+
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
159+
final ContextPreservingActionListener<Void> actionListener;
160+
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
161+
final ActionListener<Void> delegate = new ActionListener<Void>() {
162+
@Override
163+
public void onResponse(Void aVoid) {
164+
}
165+
166+
@Override
167+
public void onFailure(Exception e) {
168+
}
169+
170+
@Override
171+
public String toString() {
172+
return "test delegate";
173+
}
174+
};
175+
176+
actionListener = ContextPreservingActionListener.wrapPreservingContext(delegate, threadContext);
177+
}
178+
179+
assertThat(actionListener.toString(), allOf(containsString("test delegate"), containsString("ContextPreservingActionListener")));
180+
}
181+
153182
}
server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.transport;
21+
22+
import org.elasticsearch.Version;
23+
import org.elasticsearch.action.support.PlainActionFuture;
24+
import org.elasticsearch.cluster.ClusterName;
25+
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
26+
import org.elasticsearch.cluster.node.DiscoveryNode;
27+
import org.elasticsearch.common.io.stream.StreamInput;
28+
import org.elasticsearch.common.settings.Settings;
29+
import org.elasticsearch.tasks.Task;
30+
import org.elasticsearch.tasks.TaskAwareRequest;
31+
import org.elasticsearch.tasks.TaskId;
32+
import org.elasticsearch.test.ESTestCase;
33+
import org.elasticsearch.test.transport.MockTransport;
34+
import org.elasticsearch.threadpool.ThreadPool;
35+
36+
import java.util.Collections;
37+
import java.util.List;
38+
39+
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
40+
import static org.hamcrest.Matchers.allOf;
41+
import static org.hamcrest.Matchers.containsString;
42+
import static org.hamcrest.Matchers.hasSize;
43+
import static org.hamcrest.Matchers.hasToString;
44+
45+
public class TransportServiceDeserializationFailureTests extends ESTestCase {
46+
47+
public void testDeserializationFailureLogIdentifiesListener() {
48+
final DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT);
49+
final DiscoveryNode otherNode = new DiscoveryNode("other", buildNewFakeTransportAddress(), Version.CURRENT);
50+
51+
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "local").build();
52+
53+
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
54+
55+
final String testActionName = "internal:test-action";
56+
57+
final MockTransport transport = new MockTransport() {
58+
@Override
59+
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
60+
if (action.equals(TransportService.HANDSHAKE_ACTION_NAME)) {
61+
handleResponse(requestId, new TransportService.HandshakeResponse(otherNode, new ClusterName(""), Version.CURRENT));
62+
}
63+
}
64+
};
65+
final TransportService transportService = transport.createTransportService(Settings.EMPTY,
66+
deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR, ignored -> localNode, null,
67+
Collections.emptySet());
68+
69+
transportService.registerRequestHandler(testActionName, ThreadPool.Names.SAME, TransportRequest.Empty::new,
70+
(request, channel, task) -> channel.sendResponse(TransportResponse.Empty.INSTANCE));
71+
72+
transportService.start();
73+
transportService.acceptIncomingRequests();
74+
75+
final PlainActionFuture<Void> connectionFuture = new PlainActionFuture<>();
76+
transportService.connectToNode(otherNode, connectionFuture);
77+
assertTrue(connectionFuture.isDone());
78+
79+
{
80+
// requests without a parent task are recorded directly in the response context
81+
82+
transportService.sendRequest(otherNode, testActionName, TransportRequest.Empty.INSTANCE,
83+
TransportRequestOptions.EMPTY, new TransportResponseHandler<TransportResponse.Empty>() {
84+
@Override
85+
public void handleResponse(TransportResponse.Empty response) {
86+
fail("should not be called");
87+
}
88+
89+
@Override
90+
public void handleException(TransportException exp) {
91+
fail("should not be called");
92+
}
93+
94+
@Override
95+
public String executor() {
96+
return ThreadPool.Names.SAME;
97+
}
98+
99+
@Override
100+
public TransportResponse.Empty read(StreamInput in) {
101+
throw new AssertionError("should not be called");
102+
}
103+
104+
@Override
105+
public String toString() {
106+
return "test handler without parent";
107+
}
108+
});
109+
110+
final List<Transport.ResponseContext<? extends TransportResponse>> responseContexts
111+
= transport.getResponseHandlers().prune(ignored -> true);
112+
assertThat(responseContexts, hasSize(1));
113+
final TransportResponseHandler<? extends TransportResponse> handler = responseContexts.get(0).handler();
114+
assertThat(handler, hasToString(containsString("test handler without parent")));
115+
}
116+
117+
{
118+
// requests with a parent task get wrapped up by the transport service, including the action name
119+
120+
final Task parentTask = transportService.getTaskManager().register("test", "test-action", new TaskAwareRequest() {
121+
@Override
122+
public void setParentTask(TaskId taskId) {
123+
fail("should not be called");
124+
}
125+
126+
@Override
127+
public TaskId getParentTask() {
128+
return TaskId.EMPTY_TASK_ID;
129+
}
130+
});
131+
132+
transportService.sendChildRequest(otherNode, testActionName, TransportRequest.Empty.INSTANCE, parentTask,
133+
TransportRequestOptions.EMPTY, new TransportResponseHandler<TransportResponse.Empty>() {
134+
@Override
135+
public void handleResponse(TransportResponse.Empty response) {
136+
fail("should not be called");
137+
}
138+
139+
@Override
140+
public void handleException(TransportException exp) {
141+
fail("should not be called");
142+
}
143+
144+
@Override
145+
public String executor() {
146+
return ThreadPool.Names.SAME;
147+
}
148+
149+
@Override
150+
public TransportResponse.Empty read(StreamInput in) {
151+
throw new AssertionError("should not be called");
152+
}
153+
154+
@Override
155+
public String toString() {
156+
return "test handler with parent";
157+
}
158+
});
159+
160+
final List<Transport.ResponseContext<? extends TransportResponse>> responseContexts
161+
= transport.getResponseHandlers().prune(ignored -> true);
162+
assertThat(responseContexts, hasSize(1));
163+
final TransportResponseHandler<? extends TransportResponse> handler = responseContexts.get(0).handler();
164+
assertThat(handler, hasToString(allOf(containsString("test handler with parent"), containsString(testActionName))));
165+
}
166+
}
167+
168+
}

0 commit comments

Comments
 (0)