Skip to content

Commit d91400a

Browse files
committed
Add more coverage for streaming and non-streaming netty connections
Signed-off-by: Sergei Ustimenko <[email protected]>
1 parent 84ee332 commit d91400a

File tree

2 files changed

+141
-37
lines changed

2 files changed

+141
-37
lines changed

plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java

Lines changed: 80 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,86 @@ public void testRequestResponseStreaming() throws InterruptedException {
101101
final String url = "/stream/";
102102

103103
final ToXContent[] chunks = newChunks(responseString);
104-
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
104+
final HttpServerTransport.Dispatcher dispatcher = createStreamingDispatcher(url, responseString);
105+
106+
try (
107+
ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport(
108+
Settings.EMPTY,
109+
networkService,
110+
bigArrays,
111+
threadPool,
112+
xContentRegistry(),
113+
dispatcher,
114+
clusterSettings,
115+
new SharedGroupFactory(Settings.EMPTY),
116+
NoopTracer.INSTANCE
117+
)
118+
) {
119+
transport.start();
120+
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
121+
122+
try (ReactorHttpClient client = ReactorHttpClient.create(false)) {
123+
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
124+
final FullHttpResponse response = client.stream(remoteAddress.address(), request, Arrays.stream(chunks));
125+
try {
126+
assertThat(response.status(), equalTo(HttpResponseStatus.OK));
127+
byte[] bytes = new byte[response.content().readableBytes()];
128+
response.content().readBytes(bytes);
129+
assertThat(new String(bytes, StandardCharsets.UTF_8), equalTo(Arrays.stream(newChunks(responseString)).map(s -> {
130+
try (XContentBuilder builder = XContentType.JSON.contentBuilder()) {
131+
return s.toXContent(builder, ToXContent.EMPTY_PARAMS).toString();
132+
} catch (final IOException ex) {
133+
throw new UncheckedIOException(ex);
134+
}
135+
}).collect(Collectors.joining("\r\n", "", "\r\n"))));
136+
} finally {
137+
response.release();
138+
}
139+
}
140+
}
141+
}
142+
143+
public void testConnectionsGettingClosedForStreamingRequests() throws InterruptedException {
144+
final String responseString = randomAlphaOfLength(4 * 1024);
145+
final String url = "/stream/";
146+
147+
final ToXContent[] chunks = newChunks(responseString);
148+
final HttpServerTransport.Dispatcher dispatcher = createStreamingDispatcher(url, responseString);
149+
150+
try (
151+
ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport(
152+
Settings.EMPTY,
153+
networkService,
154+
bigArrays,
155+
threadPool,
156+
xContentRegistry(),
157+
dispatcher,
158+
clusterSettings,
159+
new SharedGroupFactory(Settings.EMPTY),
160+
NoopTracer.INSTANCE
161+
);
162+
ReactorHttpClient client = ReactorHttpClient.create(false)
163+
) {
164+
transport.start();
165+
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
166+
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
167+
long numRequests = randomLongBetween(5L, 15L);
168+
for (int i = 0; i < numRequests; i++) {
169+
logger.info("Sending request {}/{}", i + 1, numRequests);
170+
final FullHttpResponse response = client.stream(remoteAddress.address(), request, Arrays.stream(chunks));
171+
try {
172+
assertThat(response.status(), equalTo(HttpResponseStatus.OK));
173+
} finally {
174+
response.release();
175+
}
176+
}
177+
assertThat(transport.stats().getServerOpen(), equalTo(0L));
178+
assertThat(transport.stats().getTotalOpen(), equalTo(numRequests));
179+
}
180+
}
181+
182+
private HttpServerTransport.Dispatcher createStreamingDispatcher(String url, String responseString) {
183+
return new HttpServerTransport.Dispatcher() {
105184
@Override
106185
public Optional<RestHandler> dispatchHandler(String uri, String rawPath, Method method, Map<String, String> params) {
107186
return Optional.of(new RestHandler() {
@@ -161,42 +240,6 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
161240
}
162241

163242
};
164-
165-
try (
166-
ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport(
167-
Settings.EMPTY,
168-
networkService,
169-
bigArrays,
170-
threadPool,
171-
xContentRegistry(),
172-
dispatcher,
173-
clusterSettings,
174-
new SharedGroupFactory(Settings.EMPTY),
175-
NoopTracer.INSTANCE
176-
)
177-
) {
178-
transport.start();
179-
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
180-
181-
try (ReactorHttpClient client = ReactorHttpClient.create(false)) {
182-
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
183-
final FullHttpResponse response = client.stream(remoteAddress.address(), request, Arrays.stream(chunks));
184-
try {
185-
assertThat(response.status(), equalTo(HttpResponseStatus.OK));
186-
byte[] bytes = new byte[response.content().readableBytes()];
187-
response.content().readBytes(bytes);
188-
assertThat(new String(bytes, StandardCharsets.UTF_8), equalTo(Arrays.stream(newChunks(responseString)).map(s -> {
189-
try (XContentBuilder builder = XContentType.JSON.contentBuilder()) {
190-
return s.toXContent(builder, ToXContent.EMPTY_PARAMS).toString();
191-
} catch (final IOException ex) {
192-
throw new UncheckedIOException(ex);
193-
}
194-
}).collect(Collectors.joining("\r\n", "", "\r\n"))));
195-
} finally {
196-
response.release();
197-
}
198-
}
199-
}
200243
}
201244

202245
private static ToXContent[] newChunks(final String responseString) {

plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportTests.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ public void shutdown() throws Exception {
132132

133133
/**
134134
* Test that {@link ReactorNetty4HttpServerTransport} supports the "Expect: 100-continue" HTTP header
135+
*
135136
* @throws InterruptedException if the client communication with the server is interrupted
136137
*/
137138
public void testExpectContinueHeader() throws InterruptedException {
@@ -144,6 +145,7 @@ public void testExpectContinueHeader() throws InterruptedException {
144145
* Test that {@link ReactorNetty4HttpServerTransport} responds to a
145146
* 100-continue expectation with too large a content-length
146147
* with a 413 status.
148+
*
147149
* @throws InterruptedException if the client communication with the server is interrupted
148150
*/
149151
public void testExpectContinueHeaderContentLengthTooLong() throws InterruptedException {
@@ -156,6 +158,7 @@ public void testExpectContinueHeaderContentLengthTooLong() throws InterruptedExc
156158

157159
/**
158160
* Test that {@link ReactorNetty4HttpServerTransport} responds to an unsupported expectation with a 417 status.
161+
*
159162
* @throws InterruptedException if the client communication with the server is interrupted
160163
*/
161164
public void testExpectUnsupportedExpectation() throws InterruptedException {
@@ -436,6 +439,64 @@ private long getHugeAllocationCount() {
436439
return numOfHugAllocations;
437440
}
438441

442+
public void testConnectionsGettingClosed() throws InterruptedException {
443+
final String responseString = "ok";
444+
final String url = "/thing/";
445+
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
446+
@Override
447+
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
448+
if (url.equals(request.uri())) {
449+
channel.sendResponse(new BytesRestResponse(OK, responseString));
450+
} else {
451+
logger.error("--> Unexpected successful uri [{}]", request.uri());
452+
throw new AssertionError();
453+
}
454+
}
455+
456+
@Override
457+
public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) {
458+
logger.error(
459+
new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())),
460+
cause
461+
);
462+
throw new AssertionError(cause);
463+
}
464+
};
465+
466+
try (
467+
ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport(
468+
Settings.EMPTY,
469+
networkService,
470+
bigArrays,
471+
threadPool,
472+
xContentRegistry(),
473+
dispatcher,
474+
clusterSettings,
475+
new SharedGroupFactory(Settings.EMPTY),
476+
NoopTracer.INSTANCE
477+
);
478+
ReactorHttpClient client = ReactorHttpClient.create()
479+
) {
480+
transport.start();
481+
TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
482+
long numRequests = randomLongBetween(5L, 15L);
483+
484+
for (int i = 0; i < numRequests; i++) {
485+
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
486+
logger.info("Sending request {}/{}", i + 1, numRequests);
487+
final FullHttpResponse response = client.send(remoteAddress.address(), request);
488+
try {
489+
assertThat(response.status(), equalTo(HttpResponseStatus.OK));
490+
} finally {
491+
response.release();
492+
}
493+
}
494+
495+
assertThat(transport.stats().getServerOpen(), equalTo(0L));
496+
assertThat(transport.stats().getTotalOpen(), equalTo(numRequests));
497+
}
498+
}
499+
439500
public void testCorsRequest() throws InterruptedException {
440501
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
441502

0 commit comments

Comments
 (0)