Skip to content

Commit

Permalink
WebSockets Next: get rid of UniHelper#toUni()
Browse files Browse the repository at this point in the history
- and replace it with Uni.createFrom().completionStage(Supplier)
- we need lazy subscription of the produced Uni
  • Loading branch information
mkouba committed Jun 26, 2024
1 parent a5ffb89 commit 450681e
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void testError() throws InterruptedException {
.connectAndAwait();
connection.sendTextAndAwait("foo");
assertFalse(connection.isClosed());
connection.sendText("bar");
connection.sendTextAndAwait("bar");
assertTrue(ClientMessageErrorEndpoint.MESSAGE_LATCH.await(5, TimeUnit.SECONDS));
assertEquals("bar", ClientMessageErrorEndpoint.MESSAGES.get(0));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.quarkus.websockets.next.WebSocketClientException;
import io.quarkus.websockets.next.WebSocketsClientRuntimeConfig;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.UniHelper;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -144,7 +143,7 @@ public Uni<WebSocketClientConnection> connect() {
throw new WebSocketClientException(e);
}

return UniHelper.toUni(client.connect(connectOptions))
return Uni.createFrom().completionStage(() -> client.connect(connectOptions).toCompletionStage())
.map(ws -> {
String clientId = BasicWebSocketConnector.class.getName();
TrafficLogger trafficLogger = TrafficLogger.forClient(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.quarkus.websockets.next.HandshakeRequest;
import io.quarkus.websockets.next.WebSocketConnection.BroadcastSender;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.UniHelper;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.core.http.WebSocketBase;
Expand Down Expand Up @@ -55,14 +54,14 @@ public String pathParam(String name) {
}

public Uni<Void> sendText(String message) {
Uni<Void> uni = UniHelper.toUni(webSocket().writeTextMessage(message));
Uni<Void> uni = Uni.createFrom().completionStage(() -> webSocket().writeTextMessage(message).toCompletionStage());
return trafficLogger == null ? uni : uni.invoke(() -> {
trafficLogger.textMessageSent(this, message);
});
}

public Uni<Void> sendBinary(Buffer message) {
Uni<Void> uni = UniHelper.toUni(webSocket().writeBinaryMessage(message));
Uni<Void> uni = Uni.createFrom().completionStage(() -> webSocket().writeBinaryMessage(message).toCompletionStage());
return trafficLogger == null ? uni : uni.invoke(() -> trafficLogger.binaryMessageSent(this, message));
}

Expand All @@ -81,7 +80,7 @@ public <M> Uni<Void> sendText(M message) {
}

public Uni<Void> sendPing(Buffer data) {
return UniHelper.toUni(webSocket().writePing(data));
return Uni.createFrom().completionStage(() -> webSocket().writePing(data).toCompletionStage());
}

void sendAutoPing() {
Expand All @@ -93,7 +92,7 @@ void sendAutoPing() {
}

public Uni<Void> sendPong(Buffer data) {
return UniHelper.toUni(webSocket().writePong(data));
return Uni.createFrom().completionStage(() -> webSocket().writePong(data).toCompletionStage());
}

public Uni<Void> close() {
Expand All @@ -105,7 +104,8 @@ public Uni<Void> close(CloseReason reason) {
LOG.warnf("Connection already closed: %s", this);
return Uni.createFrom().voidItem();
}
return UniHelper.toUni(webSocket().close((short) reason.getCode(), reason.getMessage()));
return Uni.createFrom()
.completionStage(() -> webSocket().close((short) reason.getCode(), reason.getMessage()).toCompletionStage());
}

public boolean isSecure() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.quarkus.websockets.next.runtime.WebSocketClientRecorder.ClientEndpoint;
import io.quarkus.websockets.next.runtime.WebSocketClientRecorder.ClientEndpointsContext;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.UniHelper;
import io.vertx.core.Vertx;
import io.vertx.core.http.WebSocketClient;
import io.vertx.core.http.WebSocketConnectOptions;
Expand Down Expand Up @@ -92,7 +91,7 @@ public Uni<WebSocketClientConnection> connect() {
}
subprotocols.forEach(connectOptions::addSubProtocol);

return UniHelper.toUni(client.connect(connectOptions))
return Uni.createFrom().completionStage(() -> client.connect(connectOptions).toCompletionStage())
.map(ws -> {
TrafficLogger trafficLogger = TrafficLogger.forClient(config);
WebSocketClientConnectionImpl connection = new WebSocketClientConnectionImpl(clientEndpoint.clientId, ws,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.quarkus.websockets.next.runtime.ConcurrencyLimiter.PromiseComplete;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.UniHelper;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
Expand Down Expand Up @@ -256,7 +255,7 @@ public void handle(Void event) {
}
}
});
return UniHelper.toUni(promise.future());
return Uni.createFrom().completionStage(() -> promise.future().toCompletionStage());
}

public Object beanInstance() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.smallrye.mutiny.vertx.UniHelper;
import io.vertx.core.Vertx;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketClient;
Expand Down Expand Up @@ -110,12 +109,13 @@ public Uni<JsonObject> openDevConnection(String path, String endpointPath) {
}
WebSocketClient client = vertx.createWebSocketClient();
String connectionKey = UUID.randomUUID().toString();
Uni<WebSocket> uni = UniHelper.toUni(client
Uni<WebSocket> uni = Uni.createFrom().completionStage(() -> client
.connect(new WebSocketConnectOptions()
.setPort(httpConfig.port)
.setHost(httpConfig.host)
.setURI(path)
.addHeader(DEVUI_SOCKET_KEY_HEADER, connectionKey)));
.addHeader(DEVUI_SOCKET_KEY_HEADER, connectionKey))
.toCompletionStage());
return uni.onItem().transform(s -> {
LOG.debugf("Opened Dev UI connection with key %s to %s", connectionKey, path);
List<TextMessage> messages = new ArrayList<>();
Expand Down Expand Up @@ -181,7 +181,7 @@ private static String normalize(String path) {
public Uni<JsonObject> closeDevConnection(String connectionKey) {
DevWebSocket socket = sockets.remove(connectionKey);
if (socket != null) {
Uni<Void> uni = UniHelper.toUni(socket.socket.close());
Uni<Void> uni = Uni.createFrom().completionStage(() -> socket.socket.close().toCompletionStage());
return uni.onItem().transform(v -> {
LOG.debugf("Closed Dev UI connection with key %s", connectionKey);
return new JsonObject().put("success", true);
Expand All @@ -196,7 +196,7 @@ public Uni<JsonObject> closeDevConnection(String connectionKey) {
public Uni<JsonObject> sendTextMessage(String connectionKey, String message) {
DevWebSocket socket = sockets.get(connectionKey);
if (socket != null) {
Uni<Void> uni = UniHelper.toUni(socket.socket.writeTextMessage(message));
Uni<Void> uni = Uni.createFrom().completionStage(() -> socket.socket.writeTextMessage(message).toCompletionStage());
return uni.onItem().transform(v -> {
List<TextMessage> messages = socket.messages;
synchronized (messages) {
Expand Down

0 comments on commit 450681e

Please sign in to comment.