From fc11e9a45d4e52447ea6c0ef7938809248dbd1cf Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Sun, 22 May 2022 22:27:21 +0200 Subject: [PATCH] Fixes #8007 - Support Loom. Implemented virtual threads support for HTTP/1.1, HTTP/2 and HTTP/3. Signed-off-by: Simone Bordet --- .../http2/server/HTTP2ServerConnection.java | 34 +++++++ .../server/HTTP3ServerConnectionFactory.java | 33 ++++++- .../eclipse/jetty/io/AbstractConnection.java | 34 ++++++- .../server/AbstractConnectionFactory.java | 29 ++++-- .../org/eclipse/jetty/server/HttpChannel.java | 12 +++ .../eclipse/jetty/util/VirtualThreads.java | 95 +++++++++++++++++++ tests/test-http-client-transport/pom.xml | 23 ++++- .../jetty/http/client/VirtualThreadsTest.java | 72 ++++++++++++++ 8 files changed, 316 insertions(+), 16 deletions(-) create mode 100644 jetty-util/src/main/java/org/eclipse/jetty/util/VirtualThreads.java create mode 100644 tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/VirtualThreadsTest.java diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java index d9f35cec7761..e02d5fa61935 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java @@ -51,6 +51,7 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.TypeUtil; +import org.eclipse.jetty.util.VirtualThreads; import org.eclipse.jetty.util.thread.AutoLock; public class HTTP2ServerConnection extends HTTP2Connection @@ -141,7 +142,14 @@ public void onNewStream(Connector connector, IStream stream, HeadersFrame frame) HttpChannelOverHTTP2 channel = provideHttpChannel(connector, stream); Runnable task = channel.onRequest(frame); if (task != null) + { + if (isUseVirtualThreadToInvokeRootHandler()) + { + if (VirtualThreads.startVirtualThread(task)) + return; + } offerTask(task, false); + } } public void onData(IStream stream, DataFrame frame, Callback callback) @@ -153,7 +161,14 @@ public void onData(IStream stream, DataFrame frame, Callback callback) { Runnable task = channel.onData(frame, callback); if (task != null) + { + if (isUseVirtualThreadToInvokeRootHandler()) + { + if (VirtualThreads.startVirtualThread(task)) + return; + } offerTask(task, false); + } } else { @@ -170,7 +185,14 @@ public void onTrailers(IStream stream, HeadersFrame frame) { Runnable task = channel.onTrailer(frame); if (task != null) + { + if (isUseVirtualThreadToInvokeRootHandler()) + { + if (VirtualThreads.startVirtualThread(task)) + return; + } offerTask(task, false); + } } } @@ -193,6 +215,11 @@ public void onStreamFailure(IStream stream, Throwable failure, Callback callback Runnable task = channel.onFailure(failure, callback); if (task != null) { + if (isUseVirtualThreadToInvokeRootHandler()) + { + if (VirtualThreads.startVirtualThread(task)) + return; + } // We must dispatch to another thread because the task // may call application code that performs blocking I/O. offerTask(task, true); @@ -234,7 +261,14 @@ public void push(Connector connector, IStream stream, MetaData.Request request) HttpChannelOverHTTP2 channel = provideHttpChannel(connector, stream); Runnable task = channel.onPushRequest(request); if (task != null) + { + if (isUseVirtualThreadToInvokeRootHandler()) + { + if (VirtualThreads.startVirtualThread(task)) + return; + } offerTask(task, true); + } } private HttpChannelOverHTTP2 provideHttpChannel(Connector connector, IStream stream) diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/HTTP3ServerConnectionFactory.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/HTTP3ServerConnectionFactory.java index 3371f3337dfa..6311ef02fe84 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/HTTP3ServerConnectionFactory.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/HTTP3ServerConnectionFactory.java @@ -27,6 +27,7 @@ import org.eclipse.jetty.http3.server.internal.ServerHTTP3StreamConnection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.util.VirtualThreads; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,9 +106,15 @@ private ServerHTTP3StreamConnection getConnection() public void onRequest(Stream stream, HeadersFrame frame) { HTTP3StreamServer http3Stream = (HTTP3StreamServer)stream; - Runnable task = getConnection().onRequest(http3Stream, frame); + ServerHTTP3StreamConnection connection = getConnection(); + Runnable task = connection.onRequest(http3Stream, frame); if (task != null) { + if (connection.isUseVirtualThreadToInvokeRootHandler()) + { + if (VirtualThreads.startVirtualThread(task)) + return; + } ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession(); protocolSession.offer(task, false); } @@ -117,9 +124,15 @@ public void onRequest(Stream stream, HeadersFrame frame) public void onDataAvailable(Stream.Server stream) { HTTP3Stream http3Stream = (HTTP3Stream)stream; - Runnable task = getConnection().onDataAvailable(http3Stream); + ServerHTTP3StreamConnection connection = getConnection(); + Runnable task = connection.onDataAvailable(http3Stream); if (task != null) { + if (connection.isUseVirtualThreadToInvokeRootHandler()) + { + if (VirtualThreads.startVirtualThread(task)) + return; + } ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession(); protocolSession.offer(task, false); } @@ -129,9 +142,15 @@ public void onDataAvailable(Stream.Server stream) public void onTrailer(Stream.Server stream, HeadersFrame frame) { HTTP3Stream http3Stream = (HTTP3Stream)stream; - Runnable task = getConnection().onTrailer(http3Stream, frame); + ServerHTTP3StreamConnection connection = getConnection(); + Runnable task = connection.onTrailer(http3Stream, frame); if (task != null) { + if (connection.isUseVirtualThreadToInvokeRootHandler()) + { + if (VirtualThreads.startVirtualThread(task)) + return; + } ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession(); protocolSession.offer(task, false); } @@ -155,9 +174,15 @@ public boolean onIdleTimeout(Stream.Server stream, Throwable failure) public void onFailure(Stream.Server stream, long error, Throwable failure) { HTTP3Stream http3Stream = (HTTP3Stream)stream; - Runnable task = getConnection().onFailure((HTTP3Stream)stream, failure); + ServerHTTP3StreamConnection connection = getConnection(); + Runnable task = connection.onFailure((HTTP3Stream)stream, failure); if (task != null) { + if (connection.isUseVirtualThreadToInvokeRootHandler()) + { + if (VirtualThreads.startVirtualThread(task)) + return; + } ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession(); protocolSession.offer(task, true); } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java index f7a48e359716..7cce03127133 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeoutException; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.VirtualThreads; import org.eclipse.jetty.util.thread.Invocable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +43,7 @@ public abstract class AbstractConnection implements Connection private final Executor _executor; private final Callback _readCallback; private int _inputBufferSize = 2048; + private boolean _useVirtualThreadToInvokeRootHandler; protected AbstractConnection(EndPoint endp, Executor executor) { @@ -75,6 +77,16 @@ public void setInputBufferSize(int inputBufferSize) _inputBufferSize = inputBufferSize; } + public boolean isUseVirtualThreadToInvokeRootHandler() + { + return _useVirtualThreadToInvokeRootHandler; + } + + public void setUseVirtualThreadToInvokeRootHandler(boolean useVirtualThreadToInvokeRootHandler) + { + _useVirtualThreadToInvokeRootHandler = useVirtualThreadToInvokeRootHandler; + } + protected Executor getExecutor() { return _executor; @@ -311,24 +323,40 @@ public String toConnectionString() this); } - private class ReadCallback implements Callback + private class ReadCallback implements Callback, Invocable { @Override public void succeeded() { + if (isUseVirtualThreadToInvokeRootHandler()) + { + if (VirtualThreads.startVirtualThread(AbstractConnection.this::onFillable)) + return; + } onFillable(); } @Override - public void failed(final Throwable x) + public void failed(Throwable x) { + if (isUseVirtualThreadToInvokeRootHandler()) + { + if (VirtualThreads.startVirtualThread(() -> onFillInterestedFailed(x))) + return; + } onFillInterestedFailed(x); } + @Override + public InvocationType getInvocationType() + { + return isUseVirtualThreadToInvokeRootHandler() ? InvocationType.NON_BLOCKING : InvocationType.BLOCKING; + } + @Override public String toString() { - return String.format("AC.ReadCB@%h{%s}", AbstractConnection.this, AbstractConnection.this); + return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), AbstractConnection.this); } } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnectionFactory.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnectionFactory.java index 5e405f542cda..b227f36c86ea 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnectionFactory.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnectionFactory.java @@ -13,14 +13,13 @@ package org.eclipse.jetty.server; -import java.util.Arrays; -import java.util.Collections; import java.util.Iterator; import java.util.List; import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.ArrayUtil; +import org.eclipse.jetty.util.VirtualThreads; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.ContainerLifeCycle; @@ -34,18 +33,19 @@ public abstract class AbstractConnectionFactory extends ContainerLifeCycle imple { private final String _protocol; private final List _protocols; - private int _inputbufferSize = 8192; + private int _inputBufferSize = 8192; + private boolean _useVirtualThreadToInvokeRootHandler; protected AbstractConnectionFactory(String protocol) { _protocol = protocol; - _protocols = Collections.unmodifiableList(Arrays.asList(new String[]{protocol})); + _protocols = List.of(protocol); } protected AbstractConnectionFactory(String... protocols) { _protocol = protocols[0]; - _protocols = Collections.unmodifiableList(Arrays.asList(protocols)); + _protocols = List.of(protocols); } @Override @@ -64,12 +64,25 @@ public List getProtocols() @ManagedAttribute("The buffer size used to read from the network") public int getInputBufferSize() { - return _inputbufferSize; + return _inputBufferSize; } public void setInputBufferSize(int size) { - _inputbufferSize = size; + _inputBufferSize = size; + } + + public boolean isUseVirtualThreadToInvokeRootHandler() + { + return _useVirtualThreadToInvokeRootHandler; + } + + public void setUseVirtualThreadToInvokeRootHandler(boolean useVirtualThreadToInvokeRootHandler) + { + if (useVirtualThreadToInvokeRootHandler && !VirtualThreads.areSupported()) + VirtualThreads.warn(); + else + _useVirtualThreadToInvokeRootHandler = useVirtualThreadToInvokeRootHandler; } protected String findNextProtocol(Connector connector) @@ -102,6 +115,8 @@ protected AbstractConnection configure(AbstractConnection connection, Connector // Add Connection.Listeners from this factory getEventListeners().forEach(connection::addEventListener); + connection.setUseVirtualThreadToInvokeRootHandler(isUseVirtualThreadToInvokeRootHandler()); + return connection; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 6e13f885faf1..396bcdd54a99 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -42,6 +42,7 @@ import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; @@ -54,6 +55,7 @@ import org.eclipse.jetty.util.HostPort; import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.VirtualThreads; import org.eclipse.jetty.util.thread.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1146,6 +1148,16 @@ public HttpOutput.Interceptor getNextInterceptor() protected void execute(Runnable task) { + Connection c = getConnection(); + if (c instanceof AbstractConnection) + { + AbstractConnection connection = (AbstractConnection)c; + if (connection.isUseVirtualThreadToInvokeRootHandler()) + { + if (VirtualThreads.startVirtualThread(task)) + return; + } + } _executor.execute(task); } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/VirtualThreads.java b/jetty-util/src/main/java/org/eclipse/jetty/util/VirtualThreads.java new file mode 100644 index 000000000000..ff9ac7c06b1d --- /dev/null +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/VirtualThreads.java @@ -0,0 +1,95 @@ +// +// ======================================================================== +// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.util; + +import java.lang.reflect.Method; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class VirtualThreads +{ + private static final Logger LOG = LoggerFactory.getLogger(VirtualThreads.class); + private static final Method startVirtualThread = probeStartVirtualThread(); + private static final Method isVirtualThread = probeIsVirtualThread(); + + private static Method probeStartVirtualThread() + { + try + { + return Thread.class.getMethod("startVirtualThread", Runnable.class); + } + catch (Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug("Virtual thread support is not available in the current Java runtime ({})", System.getProperty("java.version")); + return null; + } + } + + private static Method probeIsVirtualThread() + { + try + { + return Thread.class.getMethod("isVirtual"); + } + catch (Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug("Virtual thread support is not available in the current Java runtime ({})", System.getProperty("java.version")); + return null; + } + } + + public static boolean areSupported() + { + return startVirtualThread != null; + } + + public static void warn() + { + LOG.warn("Virtual thread support is not available (or not enabled via --enable-preview) in the current Java runtime ({})", System.getProperty("java.version")); + } + + public static boolean startVirtualThread(Runnable task) + { + try + { + startVirtualThread.invoke(null, task); + return true; + } + catch (Throwable x) + { + warn(); + return false; + } + } + + public static boolean isVirtualThread() + { + try + { + return (Boolean)isVirtualThread.invoke(Thread.currentThread()); + } + catch (Throwable x) + { + warn(); + return false; + } + } + + private VirtualThreads() + { + } +} diff --git a/tests/test-http-client-transport/pom.xml b/tests/test-http-client-transport/pom.xml index e58686350731..daa5bbd83b4c 100644 --- a/tests/test-http-client-transport/pom.xml +++ b/tests/test-http-client-transport/pom.xml @@ -30,9 +30,9 @@ - jdk17 + enable-incubator-foreign - [17,) + [17,19) @@ -50,6 +50,25 @@ + + enable-foreign-and-virtual-threads-preview + + [19,) + + + + + maven-surefire-plugin + + + @{argLine} + --enable-preview + + + + + + diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/VirtualThreadsTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/VirtualThreadsTest.java new file mode 100644 index 000000000000..61d423c055b9 --- /dev/null +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/VirtualThreadsTest.java @@ -0,0 +1,72 @@ +// +// ======================================================================== +// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http.client; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.server.AbstractConnectionFactory; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.util.VirtualThreads; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.condition.DisabledForJreRange; +import org.junit.jupiter.api.condition.JRE; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ArgumentsSource; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@DisabledForJreRange(max = JRE.JAVA_18) +public class VirtualThreadsTest extends AbstractTest +{ + @Override + public void init(Transport transport) throws IOException + { + setScenario(new TransportScenario(transport)); + } + + @ParameterizedTest + @ArgumentsSource(TransportProvider.class) + public void testAsyncReadThrowsException(Transport transport) throws Exception + { + // No virtual thread support in FCGI server-side. + Assumptions.assumeTrue(transport != Transport.FCGI); + + init(transport); + scenario.start(new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + if (!VirtualThreads.isVirtualThread()) + response.setStatus(HttpStatus.NOT_IMPLEMENTED_501); + } + }); + scenario.connector.getConnectionFactories().stream() + .filter(f -> f instanceof AbstractConnectionFactory) + .map(AbstractConnectionFactory.class::cast) + .forEach(f -> f.setUseVirtualThreadToInvokeRootHandler(true)); + + ContentResponse response = scenario.client.newRequest(scenario.newURI()) + .timeout(5, TimeUnit.SECONDS) + .send(); + + assertEquals(HttpStatus.OK_200, response.getStatus()); + } +}