Skip to content

Commit

Permalink
WebSockets Next: make it possible to store user data in a connection
Browse files Browse the repository at this point in the history
- resolves quarkusio#43772
  • Loading branch information
mkouba committed Oct 10, 2024
1 parent af494da commit a3d3a32
Show file tree
Hide file tree
Showing 8 changed files with 396 additions and 167 deletions.
69 changes: 69 additions & 0 deletions docs/src/main/asciidoc/websockets-next-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,40 @@ class MyBean {
There are also other convenient methods.
For example, `OpenConnections#findByEndpointId(String)` makes it easy to find connections for a specific endpoint.

==== User data

It is also possible to associate arbitrary user data with a specific connection.
The `io.quarkus.websockets.next.UserData` object obtained by the `WebSocketConnection#userData()` method represents mutable user data associated with a connection.

[source, java]
----
import io.quarkus.websockets.next.WebSocketConnection;
import io.quarkus.websockets.next.UserData.TypedKey;
@WebSocket(path = "/endpoint/{username}")
class MyEndpoint {
@Inject
CoolService service;
@OnOpen
void open(WebSocketConnection connection) {
connection.userData().put(TypedKey.forBoolean("isCool"), service.isCool(connection.pathParam("username"))); <1>
}
@OnTextMessage
String process(String message) {
if (connection.userData().get(TypedKey.forBoolean("isCool"))) { <2>
return "Cool message processed!";
} else {
return "Message processed!";
}
}
}
----
<1> `CoolService#isCool()` returns `Boolean` that is associated with the current connection.
<2> The `TypedKey.forBoolean("isCool")` is the key used to obtain the data stored when the connection was created.

[[server-cdi-events]]
==== CDI events

Expand Down Expand Up @@ -997,6 +1031,41 @@ class MyBean {
There are also other convenient methods.
For example, `OpenClientConnections#findByClientId(String)` makes it easy to find connections for a specific endpoint.

==== User data

It is also possible to associate arbitrary user data with a specific connection.
The `io.quarkus.websockets.next.UserData` object obtained by the `WebSocketClientConnection#userData()` method represents mutable user data associated with a connection.

[source, java]
----
import io.quarkus.websockets.next.WebSocketClientConnection;
import io.quarkus.websockets.next.UserData.TypedKey;
@WebSocketClient(path = "/endpoint/{username}")
class MyEndpoint {
@Inject
CoolService service;
@OnOpen
void open(WebSocketClientConnection connection) {
connection.userData().put(TypedKey.forBoolean("isCool"), service.isCool(connection.pathParam("username"))); <1>
}
@OnTextMessage
String process(String message) {
if (connection.userData().get(TypedKey.forBoolean("isCool"))) { <2>
return "Cool message processed!";
} else {
return "Message processed!";
}
}
}
----
<1> `CoolService#isCool()` returns `Boolean` that is associated with the current connection.
<2> The `TypedKey.forBoolean("isCool")` is the key used to obtain the data stored when the connection was created.


[[client-cdi-events]]
==== CDI events

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package io.quarkus.websockets.next.test.connection;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import java.net.URI;
import java.util.List;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.OpenConnections;
import io.quarkus.websockets.next.UserData.TypedKey;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.WebSocketConnection;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.vertx.core.Vertx;

public class ConnectionUserDataTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(MyEndpoint.class, WSClient.class);
});

@Inject
Vertx vertx;

@TestHTTPResource("/end")
URI baseUri;

@Inject
OpenConnections connections;

@Test
void testConnectionData() {
try (WSClient client = WSClient.create(vertx).connect(baseUri)) {
assertEquals("5", client.sendAndAwaitReply("bar").toString());
assertNotNull(connections.stream().filter(c -> c.userData().get(TypedKey.forString("username")) != null).findFirst()
.orElse(null));
assertEquals("FOOMartin", client.sendAndAwaitReply("foo").toString());
assertEquals("0", client.sendAndAwaitReply("bar").toString());
}
}

@WebSocket(path = "/end")
public static class MyEndpoint {

@OnOpen
void onOpen(WebSocketConnection connection) {
connection.userData().put(TypedKey.forInt("baz"), 5);
connection.userData().put(TypedKey.forLong("foo"), 42l);
connection.userData().put(TypedKey.forString("username"), "Martin");
connection.userData().put(TypedKey.forBoolean("isActive"), true);
connection.userData().put(new TypedKey<List<String>>("list"), List.of());
}

@OnTextMessage
public String onMessage(String message, WebSocketConnection connection) {
if ("bar".equals(message)) {
return connection.userData().size() + "";
}
try {
connection.userData().get(TypedKey.forString("foo")).toString();
throw new IllegalStateException();
} catch (ClassCastException expected) {
}
if (!connection.userData().get(TypedKey.forBoolean("isActive"))
|| !connection.userData().get(new TypedKey<List<String>>("list")).isEmpty()) {
return "NOK";
}
if (connection.userData().remove(TypedKey.forLong("foo")) != 42l) {
throw new IllegalStateException();
}
if (connection.userData().remove(TypedKey.forInt("baz")) != 5) {
throw new IllegalStateException();
}
String ret = message.toUpperCase() + connection.userData().get(TypedKey.forString("username"));
connection.userData().clear();
return ret;
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package io.quarkus.websockets.next;

import java.time.Instant;

import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.Uni;

/**
* WebSocket connection.
*
* @see WebSocketConnection
* @see WebSocketClientConnection
*/
public interface Connection extends BlockingSender {

/**
*
* @return the unique identifier assigned to this connection
*/
String id();

/**
*
* @param name
* @return the value of the path parameter or {@code null}
* @see WebSocketClient#path()
*/
String pathParam(String name);

/**
* @return {@code true} if the HTTP connection is encrypted via SSL/TLS
*/
boolean isSecure();

/**
* @return {@code true} if the WebSocket is closed
*/
boolean isClosed();

/**
*
* @return the close reason or {@code null} if the connection is not closed
*/
CloseReason closeReason();

/**
*
* @return {@code true} if the WebSocket is open
*/
default boolean isOpen() {
return !isClosed();
}

/**
* Close the connection.
*
* @return a new {@link Uni} with a {@code null} item
*/
@CheckReturnValue
default Uni<Void> close() {
return close(CloseReason.NORMAL);
}

/**
* Close the connection with a specific reason.
*
* @param reason
* @return a new {@link Uni} with a {@code null} item
*/
Uni<Void> close(CloseReason reason);

/**
* Close the connection and wait for the completion.
*/
default void closeAndAwait() {
close().await().indefinitely();
}

/**
* Close the connection with a specific reason and wait for the completion.
*/
default void closeAndAwait(CloseReason reason) {
close(reason).await().indefinitely();
}

/**
*
* @return the handshake request
*/
HandshakeRequest handshakeRequest();

/**
*
* @return the time when this connection was created
*/
Instant creationTime();

/**
*
* @return the user data associated with this connection
*/
UserData userData();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package io.quarkus.websockets.next;

/**
* Mutable user data associated with a connection. Implementations must be thread-safe.
*/
public interface UserData {

/**
*
* @param <VALUE>
* @param key
* @return the value or {@code null} if no mapping is found
*/
<VALUE> VALUE get(TypedKey<VALUE> key);

/**
* Associates the specified value with the specified key. An old value is replaced by the specified value.
*
* @param <ConnectionData.VALUE>
* @param key
* @param value
* @return the previous value associated with {@code key}, or {@code null} if no mapping exists
*/
<VALUE> VALUE put(TypedKey<VALUE> key, VALUE value);

/**
*
* @param <VALUE>
* @param key
*/
<VALUE> VALUE remove(TypedKey<VALUE> key);

int size();

void clear();

/**
* @param <TYPE> The type this key is used for.
*/
record TypedKey<TYPE>(String value) {

public static TypedKey<Integer> forInt(String key) {
return new TypedKey<>(key);
}

public static TypedKey<Long> forLong(String key) {
return new TypedKey<>(key);
}

public static TypedKey<String> forString(String key) {
return new TypedKey<>(key);
}

public static TypedKey<Boolean> forBoolean(String key) {
return new TypedKey<>(key);
}
}

}
Loading

0 comments on commit a3d3a32

Please sign in to comment.