Skip to content

Disable writeHandlers by default #4626

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, HttpSer
obj.setPerMessageWebSocketCompressionSupported((Boolean)member.getValue());
}
break;
case "registerWebSocketWriteHandlers":
if (member.getValue() instanceof Boolean) {
obj.setRegisterWebSocketWriteHandlers((Boolean)member.getValue());
}
break;
case "tracingPolicy":
if (member.getValue() instanceof String) {
obj.setTracingPolicy(io.vertx.core.tracing.TracingPolicy.valueOf((String)member.getValue()));
Expand Down Expand Up @@ -177,6 +182,7 @@ static void toJson(HttpServerOptions obj, java.util.Map<String, Object> json) {
json.put("maxWebSocketMessageSize", obj.getMaxWebSocketMessageSize());
json.put("perFrameWebSocketCompressionSupported", obj.getPerFrameWebSocketCompressionSupported());
json.put("perMessageWebSocketCompressionSupported", obj.getPerMessageWebSocketCompressionSupported());
json.put("registerWebSocketWriteHandlers", obj.isRegisterWebSocketWriteHandlers());
if (obj.getTracingPolicy() != null) {
json.put("tracingPolicy", obj.getTracingPolicy().name());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
obj.setAllowOriginHeader((Boolean)member.getValue());
}
break;
case "registerWriteHandlers":
if (member.getValue() instanceof Boolean) {
obj.setRegisterWriteHandlers((Boolean)member.getValue());
}
break;
case "subProtocols":
if (member.getValue() instanceof JsonArray) {
java.util.ArrayList<java.lang.String> list = new java.util.ArrayList<>();
Expand All @@ -50,6 +55,7 @@ public static void toJson(WebSocketConnectOptions obj, JsonObject json) {

public static void toJson(WebSocketConnectOptions obj, java.util.Map<String, Object> json) {
json.put("allowOriginHeader", obj.getAllowOriginHeader());
json.put("registerWriteHandlers", obj.isRegisterWriteHandlers());
if (obj.getSubProtocols() != null) {
JsonArray array = new JsonArray();
obj.getSubProtocols().forEach(item -> array.add(item));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, NetClie
obj.setReconnectInterval(((Number)member.getValue()).longValue());
}
break;
case "registerWriteHandler":
if (member.getValue() instanceof Boolean) {
obj.setRegisterWriteHandler((Boolean)member.getValue());
}
break;
}
}
}
Expand All @@ -64,5 +69,6 @@ static void toJson(NetClientOptions obj, java.util.Map<String, Object> json) {
}
json.put("reconnectAttempts", obj.getReconnectAttempts());
json.put("reconnectInterval", obj.getReconnectInterval());
json.put("registerWriteHandler", obj.isRegisterWriteHandler());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, NetServ
obj.setProxyProtocolTimeoutUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue()));
}
break;
case "registerWriteHandler":
if (member.getValue() instanceof Boolean) {
obj.setRegisterWriteHandler((Boolean)member.getValue());
}
break;
case "sni":
if (member.getValue() instanceof Boolean) {
obj.setSni((Boolean)member.getValue());
Expand Down Expand Up @@ -81,6 +86,7 @@ static void toJson(NetServerOptions obj, java.util.Map<String, Object> json) {
if (obj.getProxyProtocolTimeoutUnit() != null) {
json.put("proxyProtocolTimeoutUnit", obj.getProxyProtocolTimeoutUnit().name());
}
json.put("registerWriteHandler", obj.isRegisterWriteHandler());
json.put("sni", obj.isSni());
json.put("useProxyProtocol", obj.isUseProxyProtocol());
}
Expand Down
66 changes: 63 additions & 3 deletions src/main/java/io/vertx/core/http/HttpServerOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,31 @@
package io.vertx.core.http;

import io.netty.handler.codec.compression.CompressionOptions;
import io.netty.handler.logging.ByteBufFormat;
import io.vertx.codegen.annotations.DataObject;
import io.vertx.codegen.annotations.Unstable;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.Arguments;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.*;
import io.vertx.core.net.JdkSSLEngineOptions;
import io.vertx.core.net.JksOptions;
import io.vertx.core.net.KeyCertOptions;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.OpenSSLEngineOptions;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.core.net.PemTrustOptions;
import io.vertx.core.net.PfxOptions;
import io.vertx.core.net.SSLEngineOptions;
import io.vertx.core.net.TrustOptions;
import io.vertx.core.tracing.TracingPolicy;
import io.netty.handler.logging.ByteBufFormat;

import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -149,6 +163,11 @@ public class HttpServerOptions extends NetServerOptions {
*/
public static final TracingPolicy DEFAULT_TRACING_POLICY = TracingPolicy.ALWAYS;

/**
* Whether write-handlers for server websockets should be registered by default = false.
*/
public static final boolean DEFAULT_REGISTER_WEBSOCKET_WRITE_HANDLERS = false;

private boolean compressionSupported;
private int compressionLevel;
private List<CompressionOptions> compressors;
Expand All @@ -173,6 +192,7 @@ public class HttpServerOptions extends NetServerOptions {
private boolean webSocketPreferredClientNoContext;
private int webSocketClosingTimeout;
private TracingPolicy tracingPolicy;
private boolean registerWebSocketWriteHandlers;

/**
* Default constructor
Expand Down Expand Up @@ -214,6 +234,7 @@ public HttpServerOptions(HttpServerOptions other) {
this.webSocketAllowServerNoContext = other.webSocketAllowServerNoContext;
this.webSocketClosingTimeout = other.webSocketClosingTimeout;
this.tracingPolicy = other.tracingPolicy;
this.registerWebSocketWriteHandlers = other.registerWebSocketWriteHandlers;
}

/**
Expand Down Expand Up @@ -262,6 +283,7 @@ private void init() {
webSocketAllowServerNoContext = DEFAULT_WEBSOCKET_ALLOW_SERVER_NO_CONTEXT;
webSocketClosingTimeout = DEFAULT_WEBSOCKET_CLOSING_TIMEOUT;
tracingPolicy = DEFAULT_TRACING_POLICY;
registerWebSocketWriteHandlers = DEFAULT_REGISTER_WEBSOCKET_WRITE_HANDLERS;
}

@Override
Expand Down Expand Up @@ -1030,4 +1052,42 @@ public HttpServerOptions setTracingPolicy(TracingPolicy tracingPolicy) {
this.tracingPolicy = tracingPolicy;
return this;
}

/**
* @return {@code false}, does not apply to HTTP servers
*/
@Override
public boolean isRegisterWriteHandler() {
return false;
}

/**
* Has no effect on HTTP server options.
*/
@Override
public HttpServerOptions setRegisterWriteHandler(boolean registerWriteHandler) {
return this;
}

/**
* @return {@code true} if write-handlers for server websockets should be registered on the {@link io.vertx.core.eventbus.EventBus}, otherwise {@code false}
*/
public boolean isRegisterWebSocketWriteHandlers() {
return registerWebSocketWriteHandlers;
}

/**
* Whether write-handlers for server websockets should be registered on the {@link io.vertx.core.eventbus.EventBus}.
* <p>
* Defaults to {@code false}.
*
* @param registerWebSocketWriteHandlers true to register write-handlers
* @return a reference to this, so the API can be used fluently
* @see WebSocketBase#textHandlerID()
* @see WebSocketBase#binaryHandlerID()
*/
public HttpServerOptions setRegisterWebSocketWriteHandlers(boolean registerWebSocketWriteHandlers) {
this.registerWebSocketWriteHandlers = registerWebSocketWriteHandlers;
return this;
}
}
20 changes: 17 additions & 3 deletions src/main/java/io/vertx/core/http/WebSocketBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@

package io.vertx.core.http;

import io.vertx.codegen.annotations.*;
import io.vertx.codegen.annotations.CacheReturn;
import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
Expand Down Expand Up @@ -63,24 +67,34 @@ public interface WebSocketBase extends ReadStream<Buffer>, WriteStream<Buffer> {
WebSocketBase drainHandler(Handler<Void> handler);

/**
* When a {@code WebSocket} is created it automatically registers an event handler with the event bus - the ID of that
* When a {@code WebSocket} is created, it may register an event handler with the event bus - the ID of that
* handler is given by this method.
* <p>
* By default, no handler is registered, the feature must be enabled via {@link WebSocketConnectOptions#setRegisterWriteHandlers(boolean)} or {@link HttpServerOptions#setRegisterWebSocketWriteHandlers(boolean)}.
* <p>
* Given this ID, a different event loop can send a binary frame to that event handler using the event bus and
* that buffer will be received by this instance in its own event loop and written to the underlying connection. This
* allows you to write data to other WebSockets which are owned by different event loops.
*
* @return the binary handler id
* @see WebSocketConnectOptions#setRegisterWriteHandlers(boolean)
* @see HttpServerOptions#setRegisterWebSocketWriteHandlers(boolean)
*/
String binaryHandlerID();

/**
* When a {@code WebSocket} is created it automatically registers an event handler with the eventbus, the ID of that
* When a {@code WebSocket} is created, it may register an event handler with the eventbus, the ID of that
* handler is given by {@code textHandlerID}.
* <p>
* By default, no handler is registered, the feature must be enabled via {@link WebSocketConnectOptions#setRegisterWriteHandlers(boolean)} or {@link HttpServerOptions#setRegisterWebSocketWriteHandlers(boolean)}.
* <p>
* Given this ID, a different event loop can send a text frame to that event handler using the event bus and
* that buffer will be received by this instance in its own event loop and written to the underlying connection. This
* allows you to write data to other WebSockets which are owned by different event loops.
*
* @return the text handler id
* @see WebSocketConnectOptions#setRegisterWriteHandlers(boolean)
* @see HttpServerOptions#setRegisterWebSocketWriteHandlers(boolean)
*/
String textHandlerID();

Expand Down
30 changes: 30 additions & 0 deletions src/main/java/io/vertx/core/http/WebSocketConnectOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,23 @@ public class WebSocketConnectOptions extends RequestOptions {
*/
public static final boolean DEFAULT_ALLOW_ORIGIN_HEADER = true;

/**
* Whether write-handlers should be registered by default = false.
*/
public static final boolean DEFAULT_REGISTER_WRITE_HANDLERS = false;

private ProxyOptions proxyOptions;
private WebsocketVersion version;
private List<String> subProtocols;
private boolean allowOriginHeader;
private boolean registerWriteHandlers;

public WebSocketConnectOptions() {
proxyOptions = DEFAULT_PROXY_OPTIONS;
version = DEFAULT_VERSION;
subProtocols = DEFAULT_SUB_PROTOCOLS;
allowOriginHeader = DEFAULT_ALLOW_ORIGIN_HEADER;
registerWriteHandlers = DEFAULT_REGISTER_WRITE_HANDLERS;
}

public WebSocketConnectOptions(WebSocketConnectOptions other) {
Expand All @@ -66,6 +73,7 @@ public WebSocketConnectOptions(WebSocketConnectOptions other) {
this.version = other.version;
this.subProtocols = other.subProtocols;
this.allowOriginHeader = other.allowOriginHeader;
this.registerWriteHandlers = other.registerWriteHandlers;
}

public WebSocketConnectOptions(JsonObject json) {
Expand Down Expand Up @@ -235,4 +243,26 @@ public JsonObject toJson() {
WebSocketConnectOptionsConverter.toJson(this, json);
return json;
}

/**
* @return {@code true} if write-handlers should be registered on the {@link io.vertx.core.eventbus.EventBus}, otherwise {@code false}
*/
public boolean isRegisterWriteHandlers() {
return registerWriteHandlers;
}

/**
* Whether write-handlers should be registered on the {@link io.vertx.core.eventbus.EventBus}.
* <p>
* Defaults to {@code false}.
*
* @param registerWriteHandlers true to register write-handlers
* @return a reference to this, so the API can be used fluently
* @see WebSocketBase#textHandlerID()
* @see WebSocketBase#binaryHandlerID()
*/
public WebSocketConnectOptions setRegisterWriteHandlers(boolean registerWriteHandlers) {
this.registerWriteHandlers = registerWriteHandlers;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.websocketx.WebSocket07FrameDecoder;
import io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder;
Expand Down Expand Up @@ -190,7 +190,7 @@ public long concurrency() {
*/
public NetSocketInternal toNetSocket() {
removeChannelHandlers();
NetSocketImpl socket = new NetSocketImpl(context, chctx, null, metrics());
NetSocketImpl socket = new NetSocketImpl(context, chctx, null, metrics(), false);
socket.metric(metric());
evictionHandler.handle(null);
chctx.pipeline().replace("handler", "handler", VertxHandler.create(ctx -> socket));
Expand Down Expand Up @@ -946,6 +946,7 @@ synchronized void toWebSocket(
WebsocketVersion vers,
List<String> subProtocols,
long handshakeTimeout,
boolean registerWriteHandlers,
int maxWebSocketFrameSize,
Promise<WebSocket> promise) {
try {
Expand Down Expand Up @@ -1008,7 +1009,8 @@ synchronized void toWebSocket(
version != WebSocketVersion.V00,
options.getWebSocketClosingTimeout(),
options.getMaxWebSocketFrameSize(),
options.getMaxWebSocketMessageSize());
options.getMaxWebSocketMessageSize(),
registerWriteHandlers);
w.subProtocol(handshaker.actualSubprotocol());
return w;
});
Expand Down
Loading