Skip to content

Commit

Permalink
Ensure that the HTTP server supports WebSocket upgrades for HTTP/1.0 …
Browse files Browse the repository at this point in the history
…non persistent connections.
  • Loading branch information
vietj committed Oct 10, 2023
1 parent a389f85 commit cbf9615
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 67 deletions.
123 changes: 60 additions & 63 deletions src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,34 @@ private void ackBytes(int len) {
}
}

private void writeHead(Stream stream, HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, boolean connect, PromiseInternal<Void> handler) {
writeToChannel(new MessageWrite() {
@Override
public void write() {
stream.request = request;
beginRequest(stream, request, chunked, buf, end, connect, handler);
}
@Override
public void cancel(Throwable cause) {
handler.fail(cause);
}
});
}

private void writeBuffer(Stream stream, ByteBuf buff, boolean end, PromiseInternal<Void> listener) {
writeToChannel(new MessageWrite() {
@Override
public void write() {
writeBuffer(stream, buff, end, (FutureListener<Void>)listener);
}

@Override
public void cancel(Throwable cause) {
listener.fail(cause);
}
});
}

private abstract static class Stream {

protected final Promise<HttpClientStream> promise;
Expand All @@ -391,6 +419,7 @@ private abstract static class Stream {

private Object trace;
private Object metric;
private HttpRequestHead request;
private HttpResponseHead response;
private boolean responseEnded;
private long bytesRead;
Expand Down Expand Up @@ -433,7 +462,6 @@ private static class StreamImpl extends Stream implements HttpClientStream {
private final InboundBuffer<Object> queue;
private boolean reset;
private boolean closed;
private HttpRequestHead request;
private Handler<HttpResponseHead> headHandler;
private Handler<Buffer> chunkHandler;
private Handler<MultiMap> endHandler;
Expand Down Expand Up @@ -560,50 +588,21 @@ public ContextInternal getContext() {
@Override
public Future<Void> writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, boolean connect) {
PromiseInternal<Void> promise = context.promise();
writeHead(request, chunked, buf, end, connect, promise);
conn.writeHead(this, request, chunked, buf, end, connect, promise);
return promise.future();
}

private void writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, boolean connect, PromiseInternal<Void> handler) {
conn.writeToChannel(new MessageWrite() {
@Override
public void write() {
StreamImpl.this.request = request;
conn.beginRequest(StreamImpl.this, request, chunked, buf, end, connect, handler);
}

@Override
public void cancel(Throwable cause) {
handler.fail(cause);
}
});
}

@Override
public Future<Void> writeBuffer(ByteBuf buff, boolean end) {
if (buff != null || end) {
PromiseInternal<Void> listener = context.promise();
writeBuffer(buff, end, listener);
conn.writeBuffer(this, buff, end, listener);
return listener.future();
} else {
throw new IllegalStateException("???");
}
}

private void writeBuffer(ByteBuf buff, boolean end, PromiseInternal<Void> listener) {
conn.writeToChannel(new MessageWrite() {
@Override
public void write() {
conn.writeBuffer(StreamImpl.this, buff, end, listener);
}

@Override
public void cancel(Throwable cause) {
listener.fail(cause);
}
});
}

@Override
public Future<Void> writeFrame(int type, int flags, ByteBuf payload) {
throw new IllegalStateException("Cannot write an HTTP/2 frame over an HTTP/1.x connection");
Expand Down Expand Up @@ -849,40 +848,13 @@ private void handleResponseBegin(Stream stream, HttpResponseHead response) {
} else {
HttpRequestHead request;
synchronized (this) {
request = ((StreamImpl)stream).request;
request = stream.request;
stream.response = response;

if (metrics != null) {
metrics.responseBegin(stream.metric, response);
}

//
if (response.statusCode != 100 && request.method != HttpMethod.CONNECT) {
// See https://tools.ietf.org/html/rfc7230#section-6.3
String responseConnectionHeader = response.headers.get(HttpHeaderNames.CONNECTION);
String requestConnectionHeader = request.headers != null ? request.headers.get(HttpHeaderNames.CONNECTION) : null;
// We don't need to protect against concurrent changes on forceClose as it only goes from false -> true
if (HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(responseConnectionHeader) || HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(requestConnectionHeader)) {
// In all cases, if we have a close connection option then we SHOULD NOT treat the connection as persistent
this.close = true;
} else if (response.version == HttpVersion.HTTP_1_0 && !HttpHeaderValues.KEEP_ALIVE.contentEqualsIgnoreCase(responseConnectionHeader)) {
// In the HTTP/1.0 case both request/response need a keep-alive connection header the connection to be persistent
// currently Vertx forces the Connection header if keepalive is enabled for 1.0
this.close = true;
}
String keepAliveHeader = response.headers.get(HttpHeaderNames.KEEP_ALIVE);
if (keepAliveHeader != null) {
int timeout = HttpUtils.parseKeepAliveHeaderTimeout(keepAliveHeader);
if (timeout != -1) {
this.keepAliveTimeout = timeout;
}
}
}
}

//
stream.handleHead(response);

if (isConnect) {
if ((request.method == HttpMethod.CONNECT &&
response.statusCode == 200) || (
Expand Down Expand Up @@ -931,19 +903,44 @@ private void handleResponseChunk(Stream stream, ByteBuf chunk) {

private void handleResponseEnd(Stream stream, LastHttpContent trailer) {
boolean check;
HttpResponseHead response ;
synchronized (this) {
if (stream.response == null) {
response = stream.response;
if (response == null) {
// 100-continue
return;
}
responses.pop();
close |= !options.isKeepAlive();
HttpRequestHead request = stream.request;
if ((request.method != HttpMethod.CONNECT && response.statusCode != 101)) {
// See https://tools.ietf.org/html/rfc7230#section-6.3
String responseConnectionHeader = response.headers.get(HttpHeaderNames.CONNECTION);
String requestConnectionHeader = request.headers != null ? request.headers.get(HttpHeaderNames.CONNECTION) : null;
// We don't need to protect against concurrent changes on forceClose as it only goes from false -> true
boolean close = !options.isKeepAlive();
if (HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(responseConnectionHeader) || HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(requestConnectionHeader)) {
// In all cases, if we have a close connection option then we SHOULD NOT treat the connection as persistent
close = true;
} else if (response.version == HttpVersion.HTTP_1_0 && !HttpHeaderValues.KEEP_ALIVE.contentEqualsIgnoreCase(responseConnectionHeader)) {
// In the HTTP/1.0 case both request/response need a keep-alive connection header the connection to be persistent
// currently Vertx forces the Connection header if keepalive is enabled for 1.0
close = true;
}
this.close = close;
String keepAliveHeader = response.headers.get(HttpHeaderNames.KEEP_ALIVE);
if (keepAliveHeader != null) {
int timeout = HttpUtils.parseKeepAliveHeaderTimeout(keepAliveHeader);
if (timeout != -1) {
this.keepAliveTimeout = timeout;
}
}
}
stream.responseEnded = true;
check = requests.peek() != stream;
}
VertxTracer tracer = context.tracer();
if (tracer != null) {
tracer.receiveResponse(stream.context, stream.response, stream.trace, null, HttpUtils.CLIENT_RESPONSE_TAG_EXTRACTOR);
tracer.receiveResponse(stream.context, response, stream.trace, null, HttpUtils.CLIENT_RESPONSE_TAG_EXTRACTOR);
}
if (metrics != null) {
metrics.responseEnd(stream.metric, stream.bytesRead);
Expand Down
18 changes: 14 additions & 4 deletions src/test/java/io/vertx/core/http/WebSocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.vertx.test.proxy.HAProxy;
import io.vertx.test.tls.Cert;
import io.vertx.test.tls.Trust;
import org.junit.Ignore;
import org.junit.Test;

import java.io.IOException;
Expand Down Expand Up @@ -2833,17 +2834,26 @@ private void testCloseCustomPayloadFromClient(Consumer<WebSocket> closeOp) throw
}

@Test
public void testServerWebSocketHandshakeWithNonPersistentConnection() {
public void testServerWebSocketHandshakeWithNonPersistentHTTP1_0Connection() {
testServerWebSocketHandshakeWithNonPersistentConnection(HttpVersion.HTTP_1_0);
}

@Ignore
@Test
public void testServerWebSocketHandshakeWithNonPersistentHTTP1_1Connection() {
// Cannot pass until we merge connection header as it implies a "Connection: upgrade, close" header
testServerWebSocketHandshakeWithNonPersistentConnection(HttpVersion.HTTP_1_1);
}

private void testServerWebSocketHandshakeWithNonPersistentConnection(HttpVersion version) {
server = vertx.createHttpServer();
server.webSocketHandler(ws -> {
ws.frameHandler(frame -> {
ws.close();
});
});
server.listen(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST).onComplete(onSuccess(v1 -> {
handshake(vertx.createHttpClient(), req -> {
MultiMap headers = req.headers();
headers.add("Connection", "close");
handshake(vertx.createHttpClient(new HttpClientOptions().setProtocolVersion(version).setKeepAlive(false)), req -> {
req.send().onComplete(onSuccess(resp -> {
assertEquals(101, resp.statusCode());
resp.endHandler(v -> {
Expand Down

0 comments on commit cbf9615

Please sign in to comment.