Skip to content

Commit

Permalink
Fixes #8007 - Support Loom. (#8360)
Browse files Browse the repository at this point in the history
Implemented support for virtual threads for HTTP/1.1, HTTP/2 and HTTP/3.

The virtual thread support is in AdaptiveExecutionStrategy.
When virtual threads are supported and enabled, reserved threads are disabled and
blocking tasks are run in a virtual thread instead that being executed by the Executor.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet authored Aug 10, 2022
1 parent 50b7dc4 commit be3d16b
Show file tree
Hide file tree
Showing 15 changed files with 461 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.eclipse.jetty.util.thread.TryExecutor;

public class DelegatingThreadPool extends ContainerLifeCycle implements ThreadPool, TryExecutor
public class DelegatingThreadPool extends ContainerLifeCycle implements ThreadPool, TryExecutor, VirtualThreads.Configurable
{
private Executor _executor; // memory barrier provided by start/stop semantics
private TryExecutor _tryExecutor;
Expand Down Expand Up @@ -61,6 +62,19 @@ public boolean tryExecute(Runnable task)
return _tryExecutor.tryExecute(task);
}

@Override
public boolean isUseVirtualThreads()
{
return VirtualThreads.isUseVirtualThreads(_executor);
}

@Override
public void setUseVirtualThreads(boolean useVirtualThreads)
{
if (_executor instanceof VirtualThreads.Configurable)
((VirtualThreads.Configurable)_executor).setUseVirtualThreads(useVirtualThreads);
}

@Override
public int getIdleThreads()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ public abstract class AbstractConnection implements Connection
private final Callback _readCallback;
private int _inputBufferSize = 2048;

protected AbstractConnection(EndPoint endp, Executor executor)
protected AbstractConnection(EndPoint endPoint, Executor executor)
{
if (executor == null)
throw new IllegalArgumentException("Executor must not be null!");
_endPoint = endp;
_endPoint = endPoint;
_executor = executor;
_readCallback = new ReadCallback();
}
Expand Down Expand Up @@ -135,11 +135,6 @@ public void fillInterested()
getEndPoint().fillInterested(_readCallback);
}

public void tryFillInterested()
{
tryFillInterested(_readCallback);
}

public void tryFillInterested(Callback callback)
{
getEndPoint().tryFillInterested(callback);
Expand Down Expand Up @@ -320,15 +315,15 @@ public void succeeded()
}

@Override
public void failed(final Throwable x)
public void failed(Throwable x)
{
onFillInterestedFailed(x);
}

@Override
public String toString()
{
return String.format("AC.ReadCB@%h{%s}", AbstractConnection.this, AbstractConnection.this);
return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), AbstractConnection.this);
}
}
}
1 change: 1 addition & 0 deletions jetty-server/src/main/config/etc/jetty-threadpool.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<Set name="minThreads" type="int"><Property name="jetty.threadPool.minThreads" deprecated="threads.min" default="10"/></Set>
<Set name="maxThreads" type="int"><Property name="jetty.threadPool.maxThreads" deprecated="threads.max" default="200"/></Set>
<Set name="reservedThreads" type="int"><Property name="jetty.threadPool.reservedThreads" default="-1"/></Set>
<Set name="useVirtualThreads" type="int"><Property name="jetty.threadPool.useVirtualThreads" default="false"/></Set>
<Set name="idleTimeout" type="int"><Property name="jetty.threadPool.idleTimeout" deprecated="threads.timeout" default="60000"/></Set>
<Set name="detailedDump" type="boolean"><Property name="jetty.threadPool.detailedDump" default="false"/></Set>
</New>
Expand Down
3 changes: 3 additions & 0 deletions jetty-server/src/main/config/modules/threadpool.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ etc/jetty-threadpool.xml
## Number of reserved threads (-1 for heuristic).
#jetty.threadPool.reservedThreads=-1

## Whether to use virtual threads, if the runtime supports them.
#jetty.threadPool.useVirtualThreads=false

## Thread idle timeout (in milliseconds).
#jetty.threadPool.idleTimeout=60000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@

package org.eclipse.jetty.server;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

Expand All @@ -34,18 +32,18 @@ public abstract class AbstractConnectionFactory extends ContainerLifeCycle imple
{
private final String _protocol;
private final List<String> _protocols;
private int _inputbufferSize = 8192;
private int _inputBufferSize = 8192;

protected AbstractConnectionFactory(String protocol)
{
_protocol = protocol;
_protocols = Collections.unmodifiableList(Arrays.asList(new String[]{protocol}));
_protocols = List.of(protocol);
}

protected AbstractConnectionFactory(String... protocols)
{
_protocol = protocols[0];
_protocols = Collections.unmodifiableList(Arrays.asList(protocols));
_protocols = List.of(protocols);
}

@Override
Expand All @@ -64,12 +62,12 @@ public List<String> getProtocols()
@ManagedAttribute("The buffer size used to read from the network")
public int getInputBufferSize()
{
return _inputbufferSize;
return _inputBufferSize;
}

public void setInputBufferSize(int size)
{
_inputbufferSize = size;
_inputBufferSize = size;
}

protected String findNextProtocol(Connector connector)
Expand Down
161 changes: 161 additions & 0 deletions jetty-util/src/main/java/org/eclipse/jetty/util/VirtualThreads.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.util;

import java.lang.reflect.Method;
import java.util.concurrent.Executor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* <p>Utility class to use to query the runtime for virtual thread support,
* and, if virtual threads are supported, to start virtual threads.</p>
*
* @see #areSupported()
* @see #startVirtualThread(Runnable)
* @see #isVirtualThread()
*/
public class VirtualThreads
{
private static final Logger LOG = LoggerFactory.getLogger(VirtualThreads.class);
private static final Method startVirtualThread = probeStartVirtualThread();
private static final Method isVirtualThread = probeIsVirtualThread();

private static Method probeStartVirtualThread()
{
try
{
return Thread.class.getMethod("startVirtualThread", Runnable.class);
}
catch (Throwable x)
{
return null;
}
}

private static Method probeIsVirtualThread()
{
try
{
return Thread.class.getMethod("isVirtual");
}
catch (Throwable x)
{
return null;
}
}

private static void warn()
{
LOG.warn("Virtual thread support is not available (or not enabled via --enable-preview) in the current Java runtime ({})", System.getProperty("java.version"));
}

/**
* @return whether the runtime supports virtual threads
*/
public static boolean areSupported()
{
return startVirtualThread != null;
}

/**
* <p>Starts a virtual thread to execute the given task, or throws
* {@link UnsupportedOperationException} if virtual threads are not
* supported.</p>
*
* @param task the task to execute in a virtual thread
* @see #areSupported()
*/
public static void startVirtualThread(Runnable task)
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("Starting in virtual thread: {}", task);
startVirtualThread.invoke(null, task);
}
catch (Throwable x)
{
warn();
throw new UnsupportedOperationException(x);
}
}

/**
* @return whether the current thread is a virtual thread
*/
public static boolean isVirtualThread()
{
try
{
return (Boolean)isVirtualThread.invoke(Thread.currentThread());
}
catch (Throwable x)
{
warn();
return false;
}
}

/**
* <p>Tests whether the given executor implements {@link Configurable} and
* it has been configured to use virtual threads.</p>
*
* @param executor the Executor to test
* @return whether the given executor implements {@link Configurable}
* and it has been configured to use virtual threads
*/
public static boolean isUseVirtualThreads(Executor executor)
{
if (executor instanceof Configurable)
return ((Configurable)executor).isUseVirtualThreads();
return false;
}

/**
* <p>Implementations of this interface can be configured to use virtual threads.</p>
* <p>Whether virtual threads are actually used depends on whether the runtime
* supports virtual threads and, if the runtime supports them, whether they are
* configured to be used via {@link #setUseVirtualThreads(boolean)}.</p>
*/
public interface Configurable
{
/**
* @return whether to use virtual threads
*/
default boolean isUseVirtualThreads()
{
return false;
}

/**
* @param useVirtualThreads whether to use virtual threads
* @throws UnsupportedOperationException if the runtime does not support virtual threads
* @see #areSupported()
*/
default void setUseVirtualThreads(boolean useVirtualThreads)
{
if (useVirtualThreads && !VirtualThreads.areSupported())
{
warn();
throw new UnsupportedOperationException();
}
}
}

private VirtualThreads()
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.stream.Collectors;

import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
Expand All @@ -34,7 +35,7 @@
* A {@link org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool} wrapper around {@link ThreadPoolExecutor}.
*/
@ManagedObject("A thread pool")
public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool.SizedThreadPool, TryExecutor
public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool.SizedThreadPool, TryExecutor, VirtualThreads.Configurable
{
private final ThreadPoolExecutor _executor;
private final ThreadPoolBudget _budget;
Expand All @@ -46,6 +47,7 @@ public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool
private int _priority = Thread.NORM_PRIORITY;
private boolean _daemon;
private boolean _detailedDump;
private boolean _useVirtualThreads;

public ExecutorThreadPool()
{
Expand Down Expand Up @@ -268,6 +270,25 @@ public boolean isLowOnThreads()
return getThreads() == getMaxThreads() && _executor.getQueue().size() >= getIdleThreads();
}

@Override
public boolean isUseVirtualThreads()
{
return _useVirtualThreads;
}

@Override
public void setUseVirtualThreads(boolean useVirtualThreads)
{
try
{
VirtualThreads.Configurable.super.setUseVirtualThreads(useVirtualThreads);
_useVirtualThreads = useVirtualThreads;
}
catch (UnsupportedOperationException ignored)
{
}
}

@Override
protected void doStart() throws Exception
{
Expand Down
Loading

0 comments on commit be3d16b

Please sign in to comment.