Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that the HTTP server supports WebSocket upgrades for HTTP/1.0 on persistent connections #4894

Merged
merged 1 commit into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 31 additions & 32 deletions src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ private abstract static class Stream {

private Object trace;
private Object metric;
private HttpRequestHead request;
private HttpResponseHead response;
private boolean responseEnded;
private long bytesRead;
Expand Down Expand Up @@ -422,7 +423,6 @@ private static class StreamImpl extends Stream implements HttpClientStream {
private final InboundBuffer<Object> queue;
private boolean reset;
private boolean closed;
private HttpRequestHead request;
private Handler<HttpResponseHead> headHandler;
private Handler<Buffer> chunkHandler;
private Handler<MultiMap> endHandler;
Expand Down Expand Up @@ -554,7 +554,7 @@ public void writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boo
private void writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, boolean connect, Handler<AsyncResult<Void>> handler) {
EventLoop eventLoop = conn.context.nettyEventLoop();
if (eventLoop.inEventLoop()) {
this.request = request;
((Stream)this).request = request;
conn.beginRequest(this, request, chunked, buf, end, connect, handler);
} else {
eventLoop.execute(() -> writeHead(request, chunked, buf, end, connect, handler));
Expand Down Expand Up @@ -840,40 +840,14 @@ private void handleResponseBegin(Stream stream, HttpResponseHead response) {
} else {
HttpRequestHead request;
synchronized (this) {
request = ((StreamImpl)stream).request;
request = stream.request;
stream.response = response;

if (metrics != null) {
metrics.responseBegin(stream.metric, response);
}

//
if (response.statusCode != 100 && request.method != HttpMethod.CONNECT) {
// See https://tools.ietf.org/html/rfc7230#section-6.3
String responseConnectionHeader = response.headers.get(HttpHeaderNames.CONNECTION);
String requestConnectionHeader = request.headers != null ? request.headers.get(HttpHeaderNames.CONNECTION) : null;
// We don't need to protect against concurrent changes on forceClose as it only goes from false -> true
if (HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(responseConnectionHeader) || HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(requestConnectionHeader)) {
// In all cases, if we have a close connection option then we SHOULD NOT treat the connection as persistent
this.close = true;
} else if (response.version == HttpVersion.HTTP_1_0 && !HttpHeaderValues.KEEP_ALIVE.contentEqualsIgnoreCase(responseConnectionHeader)) {
// In the HTTP/1.0 case both request/response need a keep-alive connection header the connection to be persistent
// currently Vertx forces the Connection header if keepalive is enabled for 1.0
this.close = true;
}
String keepAliveHeader = response.headers.get(HttpHeaderNames.KEEP_ALIVE);
if (keepAliveHeader != null) {
int timeout = HttpUtils.parseKeepAliveHeaderTimeout(keepAliveHeader);
if (timeout != -1) {
this.keepAliveTimeout = timeout;
}
}
}
}

//
stream.handleHead(response);

if (isConnect) {
if ((request.method == HttpMethod.CONNECT &&
response.statusCode == 200) || (
Expand Down Expand Up @@ -922,19 +896,44 @@ private void handleResponseChunk(Stream stream, ByteBuf chunk) {

private void handleResponseEnd(Stream stream, LastHttpContent trailer) {
boolean check;
HttpResponseHead response;
synchronized (this) {
if (stream.response == null) {
response = stream.response;
if (response == null) {
// 100-continue
return;
}
responses.pop();
close |= !options.isKeepAlive();
HttpRequestHead request = stream.request;
if ((request.method != HttpMethod.CONNECT && response.statusCode != 101)) {
// See https://tools.ietf.org/html/rfc7230#section-6.3
String responseConnectionHeader = response.headers.get(HttpHeaderNames.CONNECTION);
String requestConnectionHeader = request.headers != null ? request.headers.get(HttpHeaderNames.CONNECTION) : null;
// We don't need to protect against concurrent changes on forceClose as it only goes from false -> true
boolean close = !options.isKeepAlive();
if (HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(responseConnectionHeader) || HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(requestConnectionHeader)) {
// In all cases, if we have a close connection option then we SHOULD NOT treat the connection as persistent
close = true;
} else if (response.version == HttpVersion.HTTP_1_0 && !HttpHeaderValues.KEEP_ALIVE.contentEqualsIgnoreCase(responseConnectionHeader)) {
// In the HTTP/1.0 case both request/response need a keep-alive connection header the connection to be persistent
// currently Vertx forces the Connection header if keepalive is enabled for 1.0
close = true;
}
this.close = close;
String keepAliveHeader = response.headers.get(HttpHeaderNames.KEEP_ALIVE);
if (keepAliveHeader != null) {
int timeout = HttpUtils.parseKeepAliveHeaderTimeout(keepAliveHeader);
if (timeout != -1) {
this.keepAliveTimeout = timeout;
}
}
}
stream.responseEnded = true;
check = requests.peek() != stream;
}
VertxTracer tracer = context.tracer();
if (tracer != null) {
tracer.receiveResponse(stream.context, stream.response, stream.trace, null, HttpUtils.CLIENT_RESPONSE_TAG_EXTRACTOR);
tracer.receiveResponse(stream.context, response, stream.trace, null, HttpUtils.CLIENT_RESPONSE_TAG_EXTRACTOR);
}
if (metrics != null) {
metrics.responseEnd(stream.metric, stream.bytesRead);
Expand Down
17 changes: 13 additions & 4 deletions src/test/java/io/vertx/core/http/WebSocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2952,17 +2952,26 @@ private void testCloseCustomPayloadFromClient(Consumer<WebSocket> closeOp) {
}

@Test
public void testServerWebSocketHandshakeWithNonPersistentConnection() {
public void testServerWebSocketHandshakeWithNonPersistentHTTP1_0Connection() {
testServerWebSocketHandshakeWithNonPersistentConnection(HttpVersion.HTTP_1_0);
}

@Ignore
@Test
public void testServerWebSocketHandshakeWithNonPersistentHTTP1_1Connection() {
// Cannot pass until we merge connection header as it implies a "Connection: upgrade, close" header
testServerWebSocketHandshakeWithNonPersistentConnection(HttpVersion.HTTP_1_1);
}

private void testServerWebSocketHandshakeWithNonPersistentConnection(HttpVersion version) {
server = vertx.createHttpServer();
server.webSocketHandler(ws -> {
ws.frameHandler(frame -> {
ws.close();
});
});
server.listen(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, onSuccess(v1 -> {
handshake(vertx.createHttpClient(), req -> {
MultiMap headers = req.headers();
headers.add("Connection", "close");
handshake(vertx.createHttpClient(new HttpClientOptions().setProtocolVersion(version).setKeepAlive(false)), req -> {
req.send(onSuccess(resp -> {
assertEquals(101, resp.statusCode());
resp.endHandler(v -> {
Expand Down