From be3d16bdbbb63cc0df7e4964d8a47e3e376a631d Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 10 Aug 2022 14:31:04 +0200 Subject: [PATCH] Fixes #8007 - Support Loom. (#8360) Implemented support for virtual threads for HTTP/1.1, HTTP/2 and HTTP/3. The virtual thread support is in AdaptiveExecutionStrategy. When virtual threads are supported and enabled, reserved threads are disabled and blocking tasks are run in a virtual thread instead that being executed by the Executor. Signed-off-by: Simone Bordet --- .../jetty/http/spi/DelegatingThreadPool.java | 16 +- .../eclipse/jetty/io/AbstractConnection.java | 13 +- .../src/main/config/etc/jetty-threadpool.xml | 1 + .../src/main/config/modules/threadpool.mod | 3 + .../server/AbstractConnectionFactory.java | 12 +- .../eclipse/jetty/util/VirtualThreads.java | 161 ++++++++++++++++ .../jetty/util/thread/ExecutorThreadPool.java | 23 ++- .../jetty/util/thread/QueuedThreadPool.java | 23 ++- .../util/thread/ReservedThreadExecutor.java | 3 + .../strategy/AdaptiveExecutionStrategy.java | 22 ++- .../util/thread/strategy/ProduceConsume.java | 2 +- tests/test-http-client-transport/pom.xml | 23 ++- .../jetty/http/client/TransportScenario.java | 19 +- .../jetty/http/client/VirtualThreadsTest.java | 174 ++++++++++++++++++ .../test/resources/jetty-logging.properties | 2 + 15 files changed, 461 insertions(+), 36 deletions(-) create mode 100644 jetty-util/src/main/java/org/eclipse/jetty/util/VirtualThreads.java create mode 100644 tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/VirtualThreadsTest.java diff --git a/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/DelegatingThreadPool.java b/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/DelegatingThreadPool.java index 66ee03f4d27f..0735f1051550 100644 --- a/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/DelegatingThreadPool.java +++ b/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/DelegatingThreadPool.java @@ -18,12 +18,13 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.eclipse.jetty.util.VirtualThreads; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.thread.ThreadPool; import org.eclipse.jetty.util.thread.TryExecutor; -public class DelegatingThreadPool extends ContainerLifeCycle implements ThreadPool, TryExecutor +public class DelegatingThreadPool extends ContainerLifeCycle implements ThreadPool, TryExecutor, VirtualThreads.Configurable { private Executor _executor; // memory barrier provided by start/stop semantics private TryExecutor _tryExecutor; @@ -61,6 +62,19 @@ public boolean tryExecute(Runnable task) return _tryExecutor.tryExecute(task); } + @Override + public boolean isUseVirtualThreads() + { + return VirtualThreads.isUseVirtualThreads(_executor); + } + + @Override + public void setUseVirtualThreads(boolean useVirtualThreads) + { + if (_executor instanceof VirtualThreads.Configurable) + ((VirtualThreads.Configurable)_executor).setUseVirtualThreads(useVirtualThreads); + } + @Override public int getIdleThreads() { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java index f7a48e359716..993523253ce9 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java @@ -43,11 +43,11 @@ public abstract class AbstractConnection implements Connection private final Callback _readCallback; private int _inputBufferSize = 2048; - protected AbstractConnection(EndPoint endp, Executor executor) + protected AbstractConnection(EndPoint endPoint, Executor executor) { if (executor == null) throw new IllegalArgumentException("Executor must not be null!"); - _endPoint = endp; + _endPoint = endPoint; _executor = executor; _readCallback = new ReadCallback(); } @@ -135,11 +135,6 @@ public void fillInterested() getEndPoint().fillInterested(_readCallback); } - public void tryFillInterested() - { - tryFillInterested(_readCallback); - } - public void tryFillInterested(Callback callback) { getEndPoint().tryFillInterested(callback); @@ -320,7 +315,7 @@ public void succeeded() } @Override - public void failed(final Throwable x) + public void failed(Throwable x) { onFillInterestedFailed(x); } @@ -328,7 +323,7 @@ public void failed(final Throwable x) @Override public String toString() { - return String.format("AC.ReadCB@%h{%s}", AbstractConnection.this, AbstractConnection.this); + return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), AbstractConnection.this); } } } diff --git a/jetty-server/src/main/config/etc/jetty-threadpool.xml b/jetty-server/src/main/config/etc/jetty-threadpool.xml index 8ae32fd1e200..6cb458872645 100644 --- a/jetty-server/src/main/config/etc/jetty-threadpool.xml +++ b/jetty-server/src/main/config/etc/jetty-threadpool.xml @@ -23,6 +23,7 @@ + diff --git a/jetty-server/src/main/config/modules/threadpool.mod b/jetty-server/src/main/config/modules/threadpool.mod index 3030b0a5d9b0..fbd90cf337a5 100644 --- a/jetty-server/src/main/config/modules/threadpool.mod +++ b/jetty-server/src/main/config/modules/threadpool.mod @@ -17,6 +17,9 @@ etc/jetty-threadpool.xml ## Number of reserved threads (-1 for heuristic). #jetty.threadPool.reservedThreads=-1 +## Whether to use virtual threads, if the runtime supports them. +#jetty.threadPool.useVirtualThreads=false + ## Thread idle timeout (in milliseconds). #jetty.threadPool.idleTimeout=60000 diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnectionFactory.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnectionFactory.java index 5e405f542cda..1586d1e864ee 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnectionFactory.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnectionFactory.java @@ -13,8 +13,6 @@ package org.eclipse.jetty.server; -import java.util.Arrays; -import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -34,18 +32,18 @@ public abstract class AbstractConnectionFactory extends ContainerLifeCycle imple { private final String _protocol; private final List _protocols; - private int _inputbufferSize = 8192; + private int _inputBufferSize = 8192; protected AbstractConnectionFactory(String protocol) { _protocol = protocol; - _protocols = Collections.unmodifiableList(Arrays.asList(new String[]{protocol})); + _protocols = List.of(protocol); } protected AbstractConnectionFactory(String... protocols) { _protocol = protocols[0]; - _protocols = Collections.unmodifiableList(Arrays.asList(protocols)); + _protocols = List.of(protocols); } @Override @@ -64,12 +62,12 @@ public List getProtocols() @ManagedAttribute("The buffer size used to read from the network") public int getInputBufferSize() { - return _inputbufferSize; + return _inputBufferSize; } public void setInputBufferSize(int size) { - _inputbufferSize = size; + _inputBufferSize = size; } protected String findNextProtocol(Connector connector) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/VirtualThreads.java b/jetty-util/src/main/java/org/eclipse/jetty/util/VirtualThreads.java new file mode 100644 index 000000000000..82ae6079150e --- /dev/null +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/VirtualThreads.java @@ -0,0 +1,161 @@ +// +// ======================================================================== +// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.util; + +import java.lang.reflect.Method; +import java.util.concurrent.Executor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + *

Utility class to use to query the runtime for virtual thread support, + * and, if virtual threads are supported, to start virtual threads.

+ * + * @see #areSupported() + * @see #startVirtualThread(Runnable) + * @see #isVirtualThread() + */ +public class VirtualThreads +{ + private static final Logger LOG = LoggerFactory.getLogger(VirtualThreads.class); + private static final Method startVirtualThread = probeStartVirtualThread(); + private static final Method isVirtualThread = probeIsVirtualThread(); + + private static Method probeStartVirtualThread() + { + try + { + return Thread.class.getMethod("startVirtualThread", Runnable.class); + } + catch (Throwable x) + { + return null; + } + } + + private static Method probeIsVirtualThread() + { + try + { + return Thread.class.getMethod("isVirtual"); + } + catch (Throwable x) + { + return null; + } + } + + private static void warn() + { + LOG.warn("Virtual thread support is not available (or not enabled via --enable-preview) in the current Java runtime ({})", System.getProperty("java.version")); + } + + /** + * @return whether the runtime supports virtual threads + */ + public static boolean areSupported() + { + return startVirtualThread != null; + } + + /** + *

Starts a virtual thread to execute the given task, or throws + * {@link UnsupportedOperationException} if virtual threads are not + * supported.

+ * + * @param task the task to execute in a virtual thread + * @see #areSupported() + */ + public static void startVirtualThread(Runnable task) + { + try + { + if (LOG.isDebugEnabled()) + LOG.debug("Starting in virtual thread: {}", task); + startVirtualThread.invoke(null, task); + } + catch (Throwable x) + { + warn(); + throw new UnsupportedOperationException(x); + } + } + + /** + * @return whether the current thread is a virtual thread + */ + public static boolean isVirtualThread() + { + try + { + return (Boolean)isVirtualThread.invoke(Thread.currentThread()); + } + catch (Throwable x) + { + warn(); + return false; + } + } + + /** + *

Tests whether the given executor implements {@link Configurable} and + * it has been configured to use virtual threads.

+ * + * @param executor the Executor to test + * @return whether the given executor implements {@link Configurable} + * and it has been configured to use virtual threads + */ + public static boolean isUseVirtualThreads(Executor executor) + { + if (executor instanceof Configurable) + return ((Configurable)executor).isUseVirtualThreads(); + return false; + } + + /** + *

Implementations of this interface can be configured to use virtual threads.

+ *

Whether virtual threads are actually used depends on whether the runtime + * supports virtual threads and, if the runtime supports them, whether they are + * configured to be used via {@link #setUseVirtualThreads(boolean)}.

+ */ + public interface Configurable + { + /** + * @return whether to use virtual threads + */ + default boolean isUseVirtualThreads() + { + return false; + } + + /** + * @param useVirtualThreads whether to use virtual threads + * @throws UnsupportedOperationException if the runtime does not support virtual threads + * @see #areSupported() + */ + default void setUseVirtualThreads(boolean useVirtualThreads) + { + if (useVirtualThreads && !VirtualThreads.areSupported()) + { + warn(); + throw new UnsupportedOperationException(); + } + } + } + + private VirtualThreads() + { + } +} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java index 0d8b3c31eea8..0019c7c7359e 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java @@ -24,6 +24,7 @@ import java.util.stream.Collectors; import org.eclipse.jetty.util.ProcessorUtils; +import org.eclipse.jetty.util.VirtualThreads; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.ContainerLifeCycle; @@ -34,7 +35,7 @@ * A {@link org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool} wrapper around {@link ThreadPoolExecutor}. */ @ManagedObject("A thread pool") -public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool.SizedThreadPool, TryExecutor +public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool.SizedThreadPool, TryExecutor, VirtualThreads.Configurable { private final ThreadPoolExecutor _executor; private final ThreadPoolBudget _budget; @@ -46,6 +47,7 @@ public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool private int _priority = Thread.NORM_PRIORITY; private boolean _daemon; private boolean _detailedDump; + private boolean _useVirtualThreads; public ExecutorThreadPool() { @@ -268,6 +270,25 @@ public boolean isLowOnThreads() return getThreads() == getMaxThreads() && _executor.getQueue().size() >= getIdleThreads(); } + @Override + public boolean isUseVirtualThreads() + { + return _useVirtualThreads; + } + + @Override + public void setUseVirtualThreads(boolean useVirtualThreads) + { + try + { + VirtualThreads.Configurable.super.setUseVirtualThreads(useVirtualThreads); + _useVirtualThreads = useVirtualThreads; + } + catch (UnsupportedOperationException ignored) + { + } + } + @Override protected void doStart() throws Exception { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index 8033d6cd0358..a1e9f221f150 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -28,6 +28,7 @@ import org.eclipse.jetty.util.AtomicBiInteger; import org.eclipse.jetty.util.BlockingArrayQueue; import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.VirtualThreads; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedOperation; @@ -74,7 +75,7 @@ * */ @ManagedObject("A thread pool") -public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactory, SizedThreadPool, Dumpable, TryExecutor +public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactory, SizedThreadPool, Dumpable, TryExecutor, VirtualThreads.Configurable { private static final Logger LOG = LoggerFactory.getLogger(QueuedThreadPool.class); private static final Runnable NOOP = () -> @@ -109,6 +110,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor private int _lowThreadsThreshold = 1; private ThreadPoolBudget _budget; private long _stopTimeout; + private boolean _useVirtualThreads; public QueuedThreadPool() { @@ -511,6 +513,25 @@ public void setLowThreadsThreshold(int lowThreadsThreshold) _lowThreadsThreshold = lowThreadsThreshold; } + @Override + public boolean isUseVirtualThreads() + { + return _useVirtualThreads; + } + + @Override + public void setUseVirtualThreads(boolean useVirtualThreads) + { + try + { + VirtualThreads.Configurable.super.setUseVirtualThreads(useVirtualThreads); + _useVirtualThreads = useVirtualThreads; + } + catch (UnsupportedOperationException ignored) + { + } + } + /** * @return the number of jobs in the queue waiting for a thread */ diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java index 1e0e8377c43f..21f21a32e20f 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java @@ -26,6 +26,7 @@ import org.eclipse.jetty.util.AtomicBiInteger; import org.eclipse.jetty.util.ProcessorUtils; +import org.eclipse.jetty.util.VirtualThreads; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.AbstractLifeCycle; @@ -103,6 +104,8 @@ private static int reservedThreads(Executor executor, int capacity) { if (capacity >= 0) return capacity; + if (VirtualThreads.isUseVirtualThreads(executor)) + return 0; int cpus = ProcessorUtils.availableProcessors(); if (executor instanceof ThreadPool.SizedThreadPool) { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java index 02db9f14005e..ecf261f675f7 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -21,6 +21,7 @@ import java.util.concurrent.atomic.LongAdder; import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.VirtualThreads; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedOperation; @@ -136,11 +137,12 @@ private enum SubStrategy private final Executor _executor; private final TryExecutor _tryExecutor; private final Runnable _runPendingProducer = () -> tryProduce(true); + private boolean _useVirtualThreads; private State _state = State.IDLE; private boolean _pending; /** - * @param producer The produce of tasks to be consumed. + * @param producer The producer of tasks to be consumed. * @param executor The executor to be used for executing producers or consumers, depending on the sub-strategy. */ public AdaptiveExecutionStrategy(Producer producer, Executor executor) @@ -154,6 +156,13 @@ public AdaptiveExecutionStrategy(Producer producer, Executor executor) LOG.debug("{} created", this); } + @Override + protected void doStart() throws Exception + { + super.doStart(); + _useVirtualThreads = VirtualThreads.isUseVirtualThreads(_executor); + } + @Override public void dispatch() { @@ -462,7 +471,10 @@ private void execute(Runnable task) { try { - _executor.execute(task); + if (isUseVirtualThreads()) + VirtualThreads.startVirtualThread(task); + else + _executor.execute(task); } catch (RejectedExecutionException e) { @@ -476,6 +488,12 @@ private void execute(Runnable task) } } + @ManagedAttribute(value = "whether this execution strategy uses virtual threads", readonly = true) + public boolean isUseVirtualThreads() + { + return _useVirtualThreads; + } + @ManagedAttribute(value = "number of tasks consumed with PC mode", readonly = true) public long getPCTasksConsumed() { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceConsume.java index 3d5580be4671..649458f7bdc5 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceConsume.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceConsume.java @@ -26,7 +26,7 @@ */ public class ProduceConsume implements ExecutionStrategy, Runnable { - private static final Logger LOG = LoggerFactory.getLogger(ExecuteProduceConsume.class); + private static final Logger LOG = LoggerFactory.getLogger(ProduceConsume.class); private final AutoLock _lock = new AutoLock(); private final Producer _producer; diff --git a/tests/test-http-client-transport/pom.xml b/tests/test-http-client-transport/pom.xml index a95887aaa2f5..660329858934 100644 --- a/tests/test-http-client-transport/pom.xml +++ b/tests/test-http-client-transport/pom.xml @@ -35,9 +35,9 @@ - jdk17 + enable-incubator-foreign - [17,) + [17,19) @@ -55,6 +55,25 @@ + + enable-foreign-and-virtual-threads-preview + + [19,) + + + + + maven-surefire-plugin + + + @{argLine} + --enable-preview + + + + + + diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/TransportScenario.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/TransportScenario.java index d60fb908a2aa..efb576e7a12d 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/TransportScenario.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/TransportScenario.java @@ -120,7 +120,7 @@ public Connector newServerConnector(Server server) case H2C: case H2: case FCGI: - return new ServerConnector(server, provideServerConnectionFactory(transport)); + return new ServerConnector(server, 1, 1, provideServerConnectionFactory(transport)); case H3: return new HTTP3ServerConnector(server, sslContextFactory, provideServerConnectionFactory(transport)); case UNIX_DOMAIN: @@ -343,6 +343,12 @@ public void startServer(HttpServlet servlet) throws Exception } public void startServer(Handler handler) throws Exception + { + prepareServer(handler); + server.start(); + } + + protected void prepareServer(Handler handler) { sslContextFactory = newServerSslContextFactory(); QueuedThreadPool serverThreads = new QueuedThreadPool(); @@ -353,23 +359,12 @@ public void startServer(Handler handler) throws Exception server.addBean(mbeanContainer); connector = newServerConnector(server); server.addConnector(connector); - server.setRequestLog((request, response) -> { int status = response.getCommittedMetaData().getStatus(); requestLog.offer(String.format("%s %s %s %03d", request.getMethod(), request.getRequestURI(), request.getProtocol(), status)); }); - server.setHandler(handler); - - try - { - server.start(); - } - catch (Exception e) - { - e.printStackTrace(); - } } protected SslContextFactory.Server newServerSslContextFactory() diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/VirtualThreadsTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/VirtualThreadsTest.java new file mode 100644 index 000000000000..5649ff1716e3 --- /dev/null +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/VirtualThreadsTest.java @@ -0,0 +1,174 @@ +// +// ======================================================================== +// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http.client; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.servlet.AsyncContext; +import javax.servlet.ReadListener; +import javax.servlet.ServletException; +import javax.servlet.ServletInputStream; +import javax.servlet.ServletOutputStream; +import javax.servlet.WriteListener; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.util.StringRequestContent; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.util.VirtualThreads; +import org.eclipse.jetty.util.thread.ThreadPool; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.condition.DisabledForJreRange; +import org.junit.jupiter.api.condition.JRE; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ArgumentsSource; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@DisabledForJreRange(max = JRE.JAVA_18) +public class VirtualThreadsTest extends AbstractTest +{ + @Override + public void init(Transport transport) throws IOException + { + setScenario(new TransportScenario(transport)); + } + + @ParameterizedTest + @ArgumentsSource(TransportProvider.class) + public void testServletInvokedOnVirtualThread(Transport transport) throws Exception + { + // No virtual thread support in FCGI server-side. + Assumptions.assumeTrue(transport != Transport.FCGI); + + init(transport); + scenario.prepareServer(new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + if (!VirtualThreads.isVirtualThread()) + response.setStatus(HttpStatus.NOT_IMPLEMENTED_501); + } + }); + ThreadPool threadPool = scenario.server.getThreadPool(); + if (threadPool instanceof VirtualThreads.Configurable) + ((VirtualThreads.Configurable)threadPool).setUseVirtualThreads(true); + scenario.server.start(); + scenario.startClient(); + + ContentResponse response = scenario.client.newRequest(scenario.newURI()) + .timeout(5, TimeUnit.SECONDS) + .send(); + + assertEquals(HttpStatus.OK_200, response.getStatus(), " for transport " + transport); + } + + @ParameterizedTest + @ArgumentsSource(TransportProvider.class) + public void testServletCallbacksInvokedOnVirtualThread(Transport transport) throws Exception + { + // No virtual thread support in FCGI server-side. + Assumptions.assumeTrue(transport != Transport.FCGI); + + init(transport); + byte[] data = new byte[128 * 1024 * 1024]; + scenario.prepareServer(new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + if (!VirtualThreads.isVirtualThread()) + response.setStatus(HttpStatus.NOT_IMPLEMENTED_501); + + AsyncContext asyncContext = request.startAsync(); + ServletInputStream input = request.getInputStream(); + ServletOutputStream output = response.getOutputStream(); + + input.setReadListener(new ReadListener() + { + @Override + public void onDataAvailable() throws IOException + { + if (!VirtualThreads.isVirtualThread()) + throw new IOException("not a virtual thread"); + while (input.isReady()) + { + int read = input.read(); + if (read < 0) + break; + } + } + + @Override + public void onAllDataRead() throws IOException + { + if (!VirtualThreads.isVirtualThread()) + throw new IOException("not a virtual thread"); + // Write a large response content to cause onWritePossible() to be called. + output.write(data); + } + + @Override + public void onError(Throwable t) + { + } + }); + + output.setWriteListener(new WriteListener() + { + @Override + public void onWritePossible() throws IOException + { + if (!VirtualThreads.isVirtualThread()) + throw new IOException("not a virtual thread"); + asyncContext.complete(); + } + + @Override + public void onError(Throwable t) + { + } + }); + } + }); + ThreadPool threadPool = scenario.server.getThreadPool(); + if (threadPool instanceof VirtualThreads.Configurable) + ((VirtualThreads.Configurable)threadPool).setUseVirtualThreads(true); + scenario.server.start(); + scenario.startClient(); + + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger length = new AtomicInteger(); + scenario.client.newRequest(scenario.newURI()) + .method(HttpMethod.POST) + .body(new StringRequestContent("hello")) + .onResponseContent((response, content) -> length.addAndGet(content.remaining())) + .timeout(5, TimeUnit.SECONDS) + .send(result -> + { + if (result.isSucceeded() && result.getResponse().getStatus() == HttpStatus.OK_200) + latch.countDown(); + }); + + assertTrue(latch.await(15, TimeUnit.SECONDS)); + assertEquals(length.get(), data.length); + } +} diff --git a/tests/test-http-client-transport/src/test/resources/jetty-logging.properties b/tests/test-http-client-transport/src/test/resources/jetty-logging.properties index 90c97123795f..7ba9dbc3b1fe 100644 --- a/tests/test-http-client-transport/src/test/resources/jetty-logging.properties +++ b/tests/test-http-client-transport/src/test/resources/jetty-logging.properties @@ -10,3 +10,5 @@ org.eclipse.jetty.http2.hpack.LEVEL=INFO org.eclipse.jetty.http3.qpack.LEVEL=INFO #org.eclipse.jetty.quic.LEVEL=DEBUG org.eclipse.jetty.quic.quiche.LEVEL=INFO +#org.eclipse.jetty.util.VirtualThreads.LEVEL=DEBUG +#org.eclipse.jetty.util.thread.strategy.LEVEL=DEBUG