Skip to content

Commit

Permalink
The Vert.x HTTP server does not handle anymore WebSocket upgrades whe…
Browse files Browse the repository at this point in the history
…n the client presents a request with a connection header containing a close header (like nginx can send) and instead close the connection.

The server now will not close the connection when it is not persistent in case of a WebSocket upgrade.
  • Loading branch information
vietj committed Oct 9, 2023
1 parent 95cbc20 commit 6f1e620
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,15 +183,17 @@ public long concurrency() {
}

/**
* @return a raw {@code NetSocket} - for internal use
* @return a raw {@code NetSocket} - for internal use - must be called from event-loop
*/
public NetSocketInternal toNetSocket() {
removeChannelHandlers();
NetSocketImpl socket = new NetSocketImpl(context, chctx, null, metrics(), false);
socket.metric(metric());
evictionHandler.handle(null);
chctx.pipeline().replace("handler", "handler", VertxHandler.create(ctx -> socket));
return socket;
chctx.pipeline().replace("handler", "handler", VertxHandler.create(ctx -> {
NetSocketImpl socket = new NetSocketImpl(context, ctx, null, metrics(), false);
socket.metric(metric());
return socket;
}));
VertxHandler<NetSocketImpl> handler = (VertxHandler<NetSocketImpl>) chctx.pipeline().get(VertxHandler.class);
return handler.getConnection();
}

private HttpRequest createRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public HttpServerMetrics metrics() {

public void handleMessage(Object msg) {
assert msg != null;
if (requestInProgress == null && !keepAlive) {
if (requestInProgress == null && !keepAlive && webSocket == null) {
// Discard message
return;
}
Expand Down Expand Up @@ -247,7 +247,7 @@ void responseComplete() {
handleNext(next);
}
} else {
if (requestInProgress == request) {
if (requestInProgress == request || webSocket != null) {
// Deferred
} else {
flushAndClose();
Expand Down
50 changes: 44 additions & 6 deletions src/test/java/io/vertx/core/http/WebSocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@
package io.vertx.core.http;


import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocket13FrameDecoder;
import io.netty.handler.codec.http.websocketx.WebSocket13FrameEncoder;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
Expand Down Expand Up @@ -2953,6 +2951,44 @@ private void testCloseCustomPayloadFromClient(Consumer<WebSocket> closeOp) {
await();
}

@Test
public void testServerWebSocketHandshakeWithNonPersistentConnection() {
server = vertx.createHttpServer();
server.webSocketHandler(ws -> {
ws.frameHandler(frame -> {
ws.close();
});
});
server.listen(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, onSuccess(v1 -> {
handshake(vertx.createHttpClient(), req -> {
MultiMap headers = req.headers();
headers.add("Connection", "close");
req.send(onSuccess(resp -> {
assertEquals(101, resp.statusCode());
resp.endHandler(v -> {
Http1xClientConnection conn = (Http1xClientConnection) req.connection();
NetSocketInternal soi = conn.toNetSocket();
soi.messageHandler(msg -> {
if (msg instanceof CloseWebSocketFrame) {
soi.close();
}
});
ChannelPipeline pipeline = soi.channelHandlerContext().pipeline();
pipeline.addBefore("handler", "encoder", new WebSocket13FrameEncoder(true));
pipeline.addBefore("handler", "decoder", new WebSocket13FrameDecoder(false, false, 1000));
pipeline.remove("codec");
soi.writeMessage(new PingWebSocketFrame()).onComplete(onSuccess(written -> {
}));
soi.closeHandler(v2 -> {
testComplete();
});
});
}));
});
}));
await();
}

@Test
public void testServerCloseHandshake() {
short status = (short)(4000 + TestUtils.randomPositiveInt() % 100);
Expand All @@ -2970,8 +3006,10 @@ public void testServerCloseHandshake() {
assertEquals(101, resp.statusCode());
Http1xClientConnection conn = (Http1xClientConnection) req.connection();
NetSocketInternal soi = conn.toNetSocket();
soi.channelHandlerContext().pipeline().addBefore("handler", "encoder", new WebSocket13FrameEncoder(true));
soi.channelHandlerContext().pipeline().addBefore("handler", "decoder", new WebSocket13FrameDecoder(false, false, 1000));
ChannelPipeline pipeline = soi.channelHandlerContext().pipeline();
pipeline.addBefore("handler", "encoder", new WebSocket13FrameEncoder(true));
pipeline.addBefore("handler", "decoder", new WebSocket13FrameDecoder(false, false, 1000));
pipeline.remove("codec");
String reason = randomAlphaString(10);
soi.writeMessage(new CloseWebSocketFrame(status, reason));
AtomicBoolean closeFrameReceived = new AtomicBoolean();
Expand Down

0 comments on commit 6f1e620

Please sign in to comment.