Skip to content

Commit 7cdbae2

Browse files
authored
Add Writeable.Reader support to TransportResponseHandler (#28010)
Allows TransportResponse objects not to implement Streamable anymore. As an example, I've adapted the response handler for ShardActiveResponse, allowing the fields in that class to become final.
1 parent d36ec18 commit 7cdbae2

File tree

8 files changed

+49
-30
lines changed

8 files changed

+49
-30
lines changed

core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -238,8 +238,8 @@ private class ShardActiveResponseHandler implements TransportResponseHandler<Sha
238238
}
239239

240240
@Override
241-
public ShardActiveResponse newInstance() {
242-
return new ShardActiveResponse();
241+
public ShardActiveResponse read(StreamInput in) throws IOException {
242+
return new ShardActiveResponse(in);
243243
}
244244

245245
@Override
@@ -417,20 +417,15 @@ public void writeTo(StreamOutput out) throws IOException {
417417

418418
private static class ShardActiveResponse extends TransportResponse {
419419

420-
private boolean shardActive;
421-
private DiscoveryNode node;
422-
423-
ShardActiveResponse() {
424-
}
420+
private final boolean shardActive;
421+
private final DiscoveryNode node;
425422

426423
ShardActiveResponse(boolean shardActive, DiscoveryNode node) {
427424
this.shardActive = shardActive;
428425
this.node = node;
429426
}
430427

431-
@Override
432-
public void readFrom(StreamInput in) throws IOException {
433-
super.readFrom(in);
428+
ShardActiveResponse(StreamInput in) throws IOException {
434429
shardActive = in.readBoolean();
435430
node = new DiscoveryNode(in);
436431
}

core/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121

2222
import org.elasticsearch.ElasticsearchException;
2323
import org.elasticsearch.ElasticsearchTimeoutException;
24+
import org.elasticsearch.common.io.stream.StreamInput;
2425
import org.elasticsearch.common.util.concurrent.BaseFuture;
2526

27+
import java.io.IOException;
2628
import java.util.concurrent.ExecutionException;
2729
import java.util.concurrent.TimeUnit;
2830
import java.util.concurrent.TimeoutException;
@@ -70,8 +72,8 @@ public V txGet(long timeout, TimeUnit unit) {
7072
}
7173

7274
@Override
73-
public V newInstance() {
74-
return handler.newInstance();
75+
public V read(StreamInput in) throws IOException {
76+
return handler.read(in);
7577
}
7678

7779
@Override

core/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1432,13 +1432,13 @@ static void ensureVersionCompatibility(Version version, Version currentVersion,
14321432
}
14331433

14341434
private void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream, final TransportResponseHandler handler) {
1435-
final TransportResponse response = handler.newInstance();
1436-
response.remoteAddress(new TransportAddress(remoteAddress));
1435+
final TransportResponse response;
14371436
try {
1438-
response.readFrom(stream);
1437+
response = handler.read(stream);
1438+
response.remoteAddress(new TransportAddress(remoteAddress));
14391439
} catch (Exception e) {
14401440
handleException(handler, new TransportSerializationException(
1441-
"Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
1441+
"Failed to deserialize response from handler [" + handler.getClass().getName() + "]", e));
14421442
return;
14431443
}
14441444
threadPool.executor(handler.executor()).execute(new AbstractRunnable() {

core/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,34 @@
1919

2020
package org.elasticsearch.transport;
2121

22-
public interface TransportResponseHandler<T extends TransportResponse> {
22+
import org.elasticsearch.common.io.stream.StreamInput;
23+
import org.elasticsearch.common.io.stream.Writeable;
24+
25+
import java.io.IOException;
26+
27+
public interface TransportResponseHandler<T extends TransportResponse> extends Writeable.Reader<T> {
28+
29+
/**
30+
* @deprecated Implement {@link #read(StreamInput)} instead.
31+
*/
32+
@Deprecated
33+
default T newInstance() {
34+
throw new UnsupportedOperationException();
35+
}
2336

2437
/**
25-
* creates a new instance of the return type from the remote call.
26-
* called by the infra before de-serializing the response.
38+
* deserializes a new instance of the return type from the stream.
39+
* called by the infra when de-serializing the response.
2740
*
28-
* @return a new response copy.
41+
* @return the deserialized response.
2942
*/
30-
T newInstance();
43+
@SuppressWarnings("deprecation")
44+
@Override
45+
default T read(StreamInput in) throws IOException {
46+
T instance = newInstance();
47+
instance.readFrom(in);
48+
return instance;
49+
}
3150

3251
void handleResponse(T response);
3352

core/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1079,8 +1079,8 @@ public ContextRestoreResponseHandler(Supplier<ThreadContext.StoredContext> conte
10791079
}
10801080

10811081
@Override
1082-
public T newInstance() {
1083-
return delegate.newInstance();
1082+
public T read(StreamInput in) throws IOException {
1083+
return delegate.read(in);
10841084
}
10851085

10861086
@Override

core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import org.elasticsearch.cluster.node.DiscoveryNode;
3232
import org.elasticsearch.cluster.node.DiscoveryNodes;
3333
import org.elasticsearch.common.UUIDs;
34-
import org.elasticsearch.common.settings.Setting;
34+
import org.elasticsearch.common.io.stream.StreamInput;
3535
import org.elasticsearch.common.settings.Settings;
3636
import org.elasticsearch.common.transport.TransportAddress;
3737
import org.elasticsearch.common.unit.TimeValue;
@@ -176,8 +176,8 @@ private <T extends TransportResponse> TransportResponseHandler wrapLivenessRespo
176176
ClusterName clusterName) {
177177
return new TransportResponseHandler<T>() {
178178
@Override
179-
public T newInstance() {
180-
return handler.newInstance();
179+
public T read(StreamInput in) throws IOException {
180+
return handler.read(in);
181181
}
182182

183183
@Override

core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.cluster.node.DiscoveryNodes;
3131
import org.elasticsearch.common.CheckedBiConsumer;
3232
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
33+
import org.elasticsearch.common.io.stream.StreamInput;
3334
import org.elasticsearch.common.network.NetworkAddress;
3435
import org.elasticsearch.common.network.NetworkService;
3536
import org.elasticsearch.common.settings.Settings;
@@ -899,8 +900,8 @@ protected TransportResponseHandler<UnicastPingResponse> getPingResponseHandler(P
899900
TransportResponseHandler<UnicastPingResponse> original = super.getPingResponseHandler(pingingRound, node);
900901
return new TransportResponseHandler<UnicastPingResponse>() {
901902
@Override
902-
public UnicastPingResponse newInstance() {
903-
return original.newInstance();
903+
public UnicastPingResponse read(StreamInput in) throws IOException {
904+
return original.read(in);
904905
}
905906

906907
@Override

test/framework/src/main/java/org/elasticsearch/transport/AssertingTransportInterceptor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.elasticsearch.Version;
2222
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
23+
import org.elasticsearch.common.io.stream.StreamInput;
2324
import org.elasticsearch.common.io.stream.Streamable;
2425
import org.elasticsearch.common.settings.Settings;
2526
import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -30,6 +31,7 @@
3031
import org.elasticsearch.test.VersionUtils;
3132
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
3233

34+
import java.io.IOException;
3335
import java.util.Collections;
3436
import java.util.List;
3537
import java.util.Random;
@@ -100,8 +102,8 @@ public <T extends TransportResponse> void sendRequest(Transport.Connection conne
100102
assertVersionSerializable(request);
101103
sender.sendRequest(connection, action, request, options, new TransportResponseHandler<T>() {
102104
@Override
103-
public T newInstance() {
104-
return handler.newInstance();
105+
public T read(StreamInput in) throws IOException {
106+
return handler.read(in);
105107
}
106108

107109
@Override

0 commit comments

Comments
 (0)