Skip to content

Commit

Permalink
Merge pull request #12359 from jetty/jetty-12.1.x-11358-websocketTime…
Browse files Browse the repository at this point in the history
…outs

Issue #11358 - Add API to handle timeouts in WebSocket
  • Loading branch information
lachlan-roberts authored Oct 15, 2024
2 parents 93569ee + a3b6004 commit d43241b
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import java.nio.channels.ReadPendingException;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.exception.WebSocketTimeoutException;

/**
* Represents the outgoing Frames.
Expand Down Expand Up @@ -191,6 +193,19 @@ default void close(CloseStatus closeStatus, Callback callback)
*/
boolean isRsv3Used();

/**
* <p>Adds a listener for websocket idle timeouts.</p>
* <p>The listener is a predicate function that should return {@code true} to indicate
* that the timeout should be handled as a fatal failure or {@code false} to ignore
* that specific timeout and for another timeout to occur after another idle period.</p>
* <p>Listeners are processed in the same order they are added, and the first that
* returns {@code true} stops the processing of subsequent listeners, which are
* therefore not invoked.</p>
*
* @param onIdleTimeout the idle timeout listener as a predicate function
*/
void addIdleTimeoutListener(Predicate<WebSocketTimeoutException> onIdleTimeout);

class Empty extends ConfigurationCustomizer implements CoreSession
{
@Override
Expand Down Expand Up @@ -341,5 +356,10 @@ public boolean isRsv3Used()
{
return false;
}

@Override
public void addIdleTimeoutListener(Predicate<WebSocketTimeoutException> onIdleTimeout)
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,21 +225,11 @@ public boolean onIdleExpired(TimeoutException timeoutException)
{
if (LOG.isDebugEnabled())
LOG.debug("onIdleExpired()");

// treat as a handler error because socket is still open
coreSession.processHandlerError(new WebSocketTimeoutException("Connection Idle Timeout", timeoutException), Callback.NOOP);
return true;
}

@Override
protected boolean onReadTimeout(TimeoutException timeout)
{
if (LOG.isDebugEnabled())
LOG.debug("onReadTimeout()");

// treat as a handler error because socket is still open
coreSession.processHandlerError(new WebSocketTimeoutException("Timeout on Read", timeout), Callback.NOOP);
return false;
WebSocketTimeoutException exception = new WebSocketTimeoutException("Connection Idle Timeout", timeoutException);
boolean closeConnection = coreSession.onIdleTimeout(exception);
if (closeConnection)
coreSession.processConnectionError(exception, Callback.NOOP);
return closeConnection;
}

protected void onFrame(Frame.Parsed frame)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
Expand Down Expand Up @@ -71,6 +72,7 @@ public class WebSocketCoreSession implements CoreSession, Dumpable
private Duration idleTimeout = WebSocketConstants.DEFAULT_IDLE_TIMEOUT;
private Duration writeTimeout = WebSocketConstants.DEFAULT_WRITE_TIMEOUT;
private ClassLoader classLoader;
private Predicate<WebSocketTimeoutException> _onIdleTimeout;

public WebSocketCoreSession(FrameHandler handler, Behavior behavior, Negotiated negotiated, WebSocketComponents components)
{
Expand Down Expand Up @@ -304,24 +306,7 @@ else if (cause instanceof WebSocketTimeoutException || cause instanceof TimeoutE
else
code = CloseStatus.NO_CLOSE;

CloseStatus closeStatus = new CloseStatus(code, cause);
if (CloseStatus.isTransmittableStatusCode(code))
{
close(closeStatus, callback);
}
else
{
if (sessionState.onClosed(closeStatus))
{
closeConnection(closeStatus, callback);
}
else
{
// We are already closed because of a previous failure.
// Succeed because failing might re-enter this branch if it's the Frame callback.
callback.succeeded();
}
}
processError(new CloseStatus(code, cause), callback);
}

/**
Expand All @@ -348,8 +333,12 @@ else if (behavior == Behavior.CLIENT)
else
code = CloseStatus.SERVER_ERROR;

CloseStatus closeStatus = new CloseStatus(code, cause);
if (CloseStatus.isTransmittableStatusCode(code))
processError(new CloseStatus(code, cause), callback);
}

private void processError(CloseStatus closeStatus, Callback callback)
{
if (CloseStatus.isTransmittableStatusCode(closeStatus.getCode()))
{
close(closeStatus, callback);
}
Expand Down Expand Up @@ -403,7 +392,7 @@ public void onOpen()
{
openCallback.failed(t);

/* This is double handling of the exception but we need to do this because we have two separate
/* This is double handling of the exception, but we need to do this because we have two separate
mechanisms for returning the CoreSession, onOpen and the CompletableFuture and both the onOpen callback
and the CompletableFuture require the exception. */
throw new RuntimeException(t);
Expand Down Expand Up @@ -434,6 +423,36 @@ public boolean isRsv3Used()
return getExtensionStack().isRsv3Used();
}

@Override
public void addIdleTimeoutListener(Predicate<WebSocketTimeoutException> onIdleTimeout)
{
if (_onIdleTimeout == null)
{
_onIdleTimeout = onIdleTimeout;
}
else
{
Predicate<WebSocketTimeoutException> previous = _onIdleTimeout;
_onIdleTimeout = throwable ->
{
if (!previous.test(throwable))
return onIdleTimeout.test(throwable);
return true;
};
}
}

/**
* @return true to let the EndPoint handle the timeout, false to tell the EndPoint to halt the handling of the timeout.
**/
boolean onIdleTimeout(WebSocketTimeoutException timeoutException)
{
if (_onIdleTimeout == null)
return true;
else
return _onIdleTimeout.test(timeoutException);
}

public WebSocketConnection getConnection()
{
return connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import java.io.Closeable;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.function.Predicate;

import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.exceptions.WebSocketTimeoutException;

/**
* <p>{@link Session} represents an active link of
Expand Down Expand Up @@ -201,6 +203,19 @@ default void close()
*/
boolean isSecure();

/**
* <p>Adds a listener for websocket idle timeouts.</p>
* <p>The listener is a predicate function that should return {@code true} to indicate
* that the timeout should be handled as a fatal failure or {@code false} to ignore
* that specific timeout and for another timeout to occur after another idle period.</p>
* <p>Listeners are processed in the same order they are added, and the first that
* returns {@code true} stops the processing of subsequent listeners, which are
* therefore not invoked.</p>
*
* @param onIdleTimeout the idle timeout listener as a predicate function
*/
void addIdleTimeoutListener(Predicate<WebSocketTimeoutException> onIdleTimeout);

/**
* <p>The passive link of communication with a remote WebSocket endpoint.</p>
* <p>Applications provide WebSocket endpoints that implement this interface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Predicate;

import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.component.Dumpable;
Expand All @@ -26,6 +27,7 @@
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketContainer;
import org.eclipse.jetty.websocket.api.exceptions.WebSocketTimeoutException;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
Expand Down Expand Up @@ -274,6 +276,12 @@ public boolean isSecure()
return upgradeRequest.isSecure();
}

@Override
public void addIdleTimeoutListener(Predicate<WebSocketTimeoutException> onIdleTimeout)
{
coreSession.addIdleTimeoutListener(t -> onIdleTimeout.test(new WebSocketTimeoutException(t)));
}

@Override
public void disconnect()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.websocket.tests;

import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.exceptions.WebSocketTimeoutException;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.server.WebSocketUpgradeHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.eclipse.jetty.websocket.api.StatusCode.UNDEFINED;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class WebSocketIdleTimeoutTest
{
private static final int IDLE_TIMEOUT = 1000;
private final AtomicBoolean _allowTimeout = new AtomicBoolean();
private Server _server;
private ServerConnector _connector;
private WebSocketClient _client;
private TimeoutEndpoint _serverEndpoint;

@BeforeEach
public void before() throws Exception
{
_server = new Server();
_connector = new ServerConnector(_server);
_server.addConnector(_connector);
WebSocketUpgradeHandler upgradeHandler = WebSocketUpgradeHandler.from(_server);
_serverEndpoint = new TimeoutEndpoint();
upgradeHandler.getServerWebSocketContainer().addMapping("/", (req, resp, cb) -> _serverEndpoint);
upgradeHandler.getServerWebSocketContainer().setIdleTimeout(Duration.ofMillis(IDLE_TIMEOUT));
_server.setHandler(upgradeHandler);
_server.start();

_client = new WebSocketClient();
_client.start();
}

@AfterEach
public void after() throws Exception
{
_client.stop();
_server.stop();
}

public class TimeoutEndpoint extends EventSocket
{
volatile CountDownLatch timeoutLatch;

public void awaitTimeouts(int num) throws InterruptedException
{
timeoutLatch = new CountDownLatch(num);
timeoutLatch.await();
}

@Override
public void onOpen(Session session)
{
session.addIdleTimeoutListener(t ->
{
if (timeoutLatch != null)
timeoutLatch.countDown();
return _allowTimeout.get();
});
super.onOpen(session);
}
}

@Test
public void testWebSocketIdleTimeout() throws Exception
{
EventSocket clientEndpoint = new EventSocket();
_client.connect(clientEndpoint, URI.create("ws://localhost:" + _connector.getLocalPort()));
assertTrue(_serverEndpoint.openLatch.await(5, TimeUnit.SECONDS));

// The WebSocket connection has not been closed but multiple timeout events have occurred.
_serverEndpoint.awaitTimeouts(3);
assertThat(_serverEndpoint.closeCode, equalTo(UNDEFINED));
assertThat(_serverEndpoint.closeLatch.getCount(), equalTo(1L));

// Allow the timeout listener to close the connection.
_allowTimeout.set(true);
assertTrue(_serverEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(_serverEndpoint.error, instanceOf(WebSocketTimeoutException.class));
}
}

0 comments on commit d43241b

Please sign in to comment.