diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/CoreSession.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/CoreSession.java index 4c662787244b..7d6c83655e11 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/CoreSession.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/CoreSession.java @@ -19,9 +19,11 @@ import java.nio.channels.ReadPendingException; import java.util.List; import java.util.Map; +import java.util.function.Predicate; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.websocket.core.exception.WebSocketTimeoutException; /** * Represents the outgoing Frames. @@ -191,6 +193,19 @@ default void close(CloseStatus closeStatus, Callback callback) */ boolean isRsv3Used(); + /** + *

Adds a listener for websocket idle timeouts.

+ *

The listener is a predicate function that should return {@code true} to indicate + * that the timeout should be handled as a fatal failure or {@code false} to ignore + * that specific timeout and for another timeout to occur after another idle period.

+ *

Listeners are processed in the same order they are added, and the first that + * returns {@code true} stops the processing of subsequent listeners, which are + * therefore not invoked.

+ * + * @param onIdleTimeout the idle timeout listener as a predicate function + */ + void addIdleTimeoutListener(Predicate onIdleTimeout); + class Empty extends ConfigurationCustomizer implements CoreSession { @Override @@ -341,5 +356,10 @@ public boolean isRsv3Used() { return false; } + + @Override + public void addIdleTimeoutListener(Predicate onIdleTimeout) + { + } } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketConnection.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketConnection.java index 095c01071a47..09a8e390170c 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketConnection.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketConnection.java @@ -225,21 +225,11 @@ public boolean onIdleExpired(TimeoutException timeoutException) { if (LOG.isDebugEnabled()) LOG.debug("onIdleExpired()"); - - // treat as a handler error because socket is still open - coreSession.processHandlerError(new WebSocketTimeoutException("Connection Idle Timeout", timeoutException), Callback.NOOP); - return true; - } - - @Override - protected boolean onReadTimeout(TimeoutException timeout) - { - if (LOG.isDebugEnabled()) - LOG.debug("onReadTimeout()"); - - // treat as a handler error because socket is still open - coreSession.processHandlerError(new WebSocketTimeoutException("Timeout on Read", timeout), Callback.NOOP); - return false; + WebSocketTimeoutException exception = new WebSocketTimeoutException("Connection Idle Timeout", timeoutException); + boolean closeConnection = coreSession.onIdleTimeout(exception); + if (closeConnection) + coreSession.processConnectionError(exception, Callback.NOOP); + return closeConnection; } protected void onFrame(Frame.Parsed frame) diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java index e328e018bb0d..57f49cff519c 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java @@ -25,6 +25,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.Callback; @@ -71,6 +72,7 @@ public class WebSocketCoreSession implements CoreSession, Dumpable private Duration idleTimeout = WebSocketConstants.DEFAULT_IDLE_TIMEOUT; private Duration writeTimeout = WebSocketConstants.DEFAULT_WRITE_TIMEOUT; private ClassLoader classLoader; + private Predicate _onIdleTimeout; public WebSocketCoreSession(FrameHandler handler, Behavior behavior, Negotiated negotiated, WebSocketComponents components) { @@ -304,24 +306,7 @@ else if (cause instanceof WebSocketTimeoutException || cause instanceof TimeoutE else code = CloseStatus.NO_CLOSE; - CloseStatus closeStatus = new CloseStatus(code, cause); - if (CloseStatus.isTransmittableStatusCode(code)) - { - close(closeStatus, callback); - } - else - { - if (sessionState.onClosed(closeStatus)) - { - closeConnection(closeStatus, callback); - } - else - { - // We are already closed because of a previous failure. - // Succeed because failing might re-enter this branch if it's the Frame callback. - callback.succeeded(); - } - } + processError(new CloseStatus(code, cause), callback); } /** @@ -348,8 +333,12 @@ else if (behavior == Behavior.CLIENT) else code = CloseStatus.SERVER_ERROR; - CloseStatus closeStatus = new CloseStatus(code, cause); - if (CloseStatus.isTransmittableStatusCode(code)) + processError(new CloseStatus(code, cause), callback); + } + + private void processError(CloseStatus closeStatus, Callback callback) + { + if (CloseStatus.isTransmittableStatusCode(closeStatus.getCode())) { close(closeStatus, callback); } @@ -403,7 +392,7 @@ public void onOpen() { openCallback.failed(t); - /* This is double handling of the exception but we need to do this because we have two separate + /* This is double handling of the exception, but we need to do this because we have two separate mechanisms for returning the CoreSession, onOpen and the CompletableFuture and both the onOpen callback and the CompletableFuture require the exception. */ throw new RuntimeException(t); @@ -434,6 +423,36 @@ public boolean isRsv3Used() return getExtensionStack().isRsv3Used(); } + @Override + public void addIdleTimeoutListener(Predicate onIdleTimeout) + { + if (_onIdleTimeout == null) + { + _onIdleTimeout = onIdleTimeout; + } + else + { + Predicate previous = _onIdleTimeout; + _onIdleTimeout = throwable -> + { + if (!previous.test(throwable)) + return onIdleTimeout.test(throwable); + return true; + }; + } + } + + /** + * @return true to let the EndPoint handle the timeout, false to tell the EndPoint to halt the handling of the timeout. + **/ + boolean onIdleTimeout(WebSocketTimeoutException timeoutException) + { + if (_onIdleTimeout == null) + return true; + else + return _onIdleTimeout.test(timeoutException); + } + public WebSocketConnection getConnection() { return connection; diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java index aba060cb4a3c..23a03578b79b 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java @@ -16,8 +16,10 @@ import java.io.Closeable; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.util.function.Predicate; import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.api.exceptions.WebSocketTimeoutException; /** *

{@link Session} represents an active link of @@ -201,6 +203,19 @@ default void close() */ boolean isSecure(); + /** + *

Adds a listener for websocket idle timeouts.

+ *

The listener is a predicate function that should return {@code true} to indicate + * that the timeout should be handled as a fatal failure or {@code false} to ignore + * that specific timeout and for another timeout to occur after another idle period.

+ *

Listeners are processed in the same order they are added, and the first that + * returns {@code true} stops the processing of subsequent listeners, which are + * therefore not invoked.

+ * + * @param onIdleTimeout the idle timeout listener as a predicate function + */ + void addIdleTimeoutListener(Predicate onIdleTimeout); + /** *

The passive link of communication with a remote WebSocket endpoint.

*

Applications provide WebSocket endpoints that implement this interface diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java index 79ad7521625d..6a8ac90a70df 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java @@ -18,6 +18,7 @@ import java.nio.ByteBuffer; import java.time.Duration; import java.util.Objects; +import java.util.function.Predicate; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.component.Dumpable; @@ -26,6 +27,7 @@ import org.eclipse.jetty.websocket.api.UpgradeRequest; import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.api.WebSocketContainer; +import org.eclipse.jetty.websocket.api.exceptions.WebSocketTimeoutException; import org.eclipse.jetty.websocket.core.CoreSession; import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.OpCode; @@ -274,6 +276,12 @@ public boolean isSecure() return upgradeRequest.isSecure(); } + @Override + public void addIdleTimeoutListener(Predicate onIdleTimeout) + { + coreSession.addIdleTimeoutListener(t -> onIdleTimeout.test(new WebSocketTimeoutException(t))); + } + @Override public void disconnect() { diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketIdleTimeoutTest.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketIdleTimeoutTest.java new file mode 100644 index 000000000000..d170875ccf9a --- /dev/null +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketIdleTimeoutTest.java @@ -0,0 +1,111 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests; + +import java.net.URI; +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.exceptions.WebSocketTimeoutException; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.server.WebSocketUpgradeHandler; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.eclipse.jetty.websocket.api.StatusCode.UNDEFINED; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class WebSocketIdleTimeoutTest +{ + private static final int IDLE_TIMEOUT = 1000; + private final AtomicBoolean _allowTimeout = new AtomicBoolean(); + private Server _server; + private ServerConnector _connector; + private WebSocketClient _client; + private TimeoutEndpoint _serverEndpoint; + + @BeforeEach + public void before() throws Exception + { + _server = new Server(); + _connector = new ServerConnector(_server); + _server.addConnector(_connector); + WebSocketUpgradeHandler upgradeHandler = WebSocketUpgradeHandler.from(_server); + _serverEndpoint = new TimeoutEndpoint(); + upgradeHandler.getServerWebSocketContainer().addMapping("/", (req, resp, cb) -> _serverEndpoint); + upgradeHandler.getServerWebSocketContainer().setIdleTimeout(Duration.ofMillis(IDLE_TIMEOUT)); + _server.setHandler(upgradeHandler); + _server.start(); + + _client = new WebSocketClient(); + _client.start(); + } + + @AfterEach + public void after() throws Exception + { + _client.stop(); + _server.stop(); + } + + public class TimeoutEndpoint extends EventSocket + { + volatile CountDownLatch timeoutLatch; + + public void awaitTimeouts(int num) throws InterruptedException + { + timeoutLatch = new CountDownLatch(num); + timeoutLatch.await(); + } + + @Override + public void onOpen(Session session) + { + session.addIdleTimeoutListener(t -> + { + if (timeoutLatch != null) + timeoutLatch.countDown(); + return _allowTimeout.get(); + }); + super.onOpen(session); + } + } + + @Test + public void testWebSocketIdleTimeout() throws Exception + { + EventSocket clientEndpoint = new EventSocket(); + _client.connect(clientEndpoint, URI.create("ws://localhost:" + _connector.getLocalPort())); + assertTrue(_serverEndpoint.openLatch.await(5, TimeUnit.SECONDS)); + + // The WebSocket connection has not been closed but multiple timeout events have occurred. + _serverEndpoint.awaitTimeouts(3); + assertThat(_serverEndpoint.closeCode, equalTo(UNDEFINED)); + assertThat(_serverEndpoint.closeLatch.getCount(), equalTo(1L)); + + // Allow the timeout listener to close the connection. + _allowTimeout.set(true); + assertTrue(_serverEndpoint.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(_serverEndpoint.error, instanceOf(WebSocketTimeoutException.class)); + } +}