Skip to content

Commit

Permalink
Fixes #8007 - Support Loom.
Browse files Browse the repository at this point in the history
Implemented virtual threads support for HTTP/1.1, HTTP/2 and HTTP/3.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed May 22, 2022
1 parent 160d329 commit fc11e9a
Show file tree
Hide file tree
Showing 8 changed files with 316 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.thread.AutoLock;

public class HTTP2ServerConnection extends HTTP2Connection
Expand Down Expand Up @@ -141,7 +142,14 @@ public void onNewStream(Connector connector, IStream stream, HeadersFrame frame)
HttpChannelOverHTTP2 channel = provideHttpChannel(connector, stream);
Runnable task = channel.onRequest(frame);
if (task != null)
{
if (isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(task))
return;
}
offerTask(task, false);
}
}

public void onData(IStream stream, DataFrame frame, Callback callback)
Expand All @@ -153,7 +161,14 @@ public void onData(IStream stream, DataFrame frame, Callback callback)
{
Runnable task = channel.onData(frame, callback);
if (task != null)
{
if (isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(task))
return;
}
offerTask(task, false);
}
}
else
{
Expand All @@ -170,7 +185,14 @@ public void onTrailers(IStream stream, HeadersFrame frame)
{
Runnable task = channel.onTrailer(frame);
if (task != null)
{
if (isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(task))
return;
}
offerTask(task, false);
}
}
}

Expand All @@ -193,6 +215,11 @@ public void onStreamFailure(IStream stream, Throwable failure, Callback callback
Runnable task = channel.onFailure(failure, callback);
if (task != null)
{
if (isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(task))
return;
}
// We must dispatch to another thread because the task
// may call application code that performs blocking I/O.
offerTask(task, true);
Expand Down Expand Up @@ -234,7 +261,14 @@ public void push(Connector connector, IStream stream, MetaData.Request request)
HttpChannelOverHTTP2 channel = provideHttpChannel(connector, stream);
Runnable task = channel.onPushRequest(request);
if (task != null)
{
if (isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(task))
return;
}
offerTask(task, true);
}
}

private HttpChannelOverHTTP2 provideHttpChannel(Connector connector, IStream stream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.eclipse.jetty.http3.server.internal.ServerHTTP3StreamConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.VirtualThreads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -105,9 +106,15 @@ private ServerHTTP3StreamConnection getConnection()
public void onRequest(Stream stream, HeadersFrame frame)
{
HTTP3StreamServer http3Stream = (HTTP3StreamServer)stream;
Runnable task = getConnection().onRequest(http3Stream, frame);
ServerHTTP3StreamConnection connection = getConnection();
Runnable task = connection.onRequest(http3Stream, frame);
if (task != null)
{
if (connection.isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(task))
return;
}
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
protocolSession.offer(task, false);
}
Expand All @@ -117,9 +124,15 @@ public void onRequest(Stream stream, HeadersFrame frame)
public void onDataAvailable(Stream.Server stream)
{
HTTP3Stream http3Stream = (HTTP3Stream)stream;
Runnable task = getConnection().onDataAvailable(http3Stream);
ServerHTTP3StreamConnection connection = getConnection();
Runnable task = connection.onDataAvailable(http3Stream);
if (task != null)
{
if (connection.isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(task))
return;
}
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
protocolSession.offer(task, false);
}
Expand All @@ -129,9 +142,15 @@ public void onDataAvailable(Stream.Server stream)
public void onTrailer(Stream.Server stream, HeadersFrame frame)
{
HTTP3Stream http3Stream = (HTTP3Stream)stream;
Runnable task = getConnection().onTrailer(http3Stream, frame);
ServerHTTP3StreamConnection connection = getConnection();
Runnable task = connection.onTrailer(http3Stream, frame);
if (task != null)
{
if (connection.isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(task))
return;
}
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
protocolSession.offer(task, false);
}
Expand All @@ -155,9 +174,15 @@ public boolean onIdleTimeout(Stream.Server stream, Throwable failure)
public void onFailure(Stream.Server stream, long error, Throwable failure)
{
HTTP3Stream http3Stream = (HTTP3Stream)stream;
Runnable task = getConnection().onFailure((HTTP3Stream)stream, failure);
ServerHTTP3StreamConnection connection = getConnection();
Runnable task = connection.onFailure((HTTP3Stream)stream, failure);
if (task != null)
{
if (connection.isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(task))
return;
}
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
protocolSession.offer(task, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.TimeoutException;

import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,6 +43,7 @@ public abstract class AbstractConnection implements Connection
private final Executor _executor;
private final Callback _readCallback;
private int _inputBufferSize = 2048;
private boolean _useVirtualThreadToInvokeRootHandler;

protected AbstractConnection(EndPoint endp, Executor executor)
{
Expand Down Expand Up @@ -75,6 +77,16 @@ public void setInputBufferSize(int inputBufferSize)
_inputBufferSize = inputBufferSize;
}

public boolean isUseVirtualThreadToInvokeRootHandler()
{
return _useVirtualThreadToInvokeRootHandler;
}

public void setUseVirtualThreadToInvokeRootHandler(boolean useVirtualThreadToInvokeRootHandler)
{
_useVirtualThreadToInvokeRootHandler = useVirtualThreadToInvokeRootHandler;
}

protected Executor getExecutor()
{
return _executor;
Expand Down Expand Up @@ -311,24 +323,40 @@ public String toConnectionString()
this);
}

private class ReadCallback implements Callback
private class ReadCallback implements Callback, Invocable
{
@Override
public void succeeded()
{
if (isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(AbstractConnection.this::onFillable))
return;
}
onFillable();
}

@Override
public void failed(final Throwable x)
public void failed(Throwable x)
{
if (isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(() -> onFillInterestedFailed(x)))
return;
}
onFillInterestedFailed(x);
}

@Override
public InvocationType getInvocationType()
{
return isUseVirtualThreadToInvokeRootHandler() ? InvocationType.NON_BLOCKING : InvocationType.BLOCKING;
}

@Override
public String toString()
{
return String.format("AC.ReadCB@%h{%s}", AbstractConnection.this, AbstractConnection.this);
return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), AbstractConnection.this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@

package org.eclipse.jetty.server;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ArrayUtil;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
Expand All @@ -34,18 +33,19 @@ public abstract class AbstractConnectionFactory extends ContainerLifeCycle imple
{
private final String _protocol;
private final List<String> _protocols;
private int _inputbufferSize = 8192;
private int _inputBufferSize = 8192;
private boolean _useVirtualThreadToInvokeRootHandler;

protected AbstractConnectionFactory(String protocol)
{
_protocol = protocol;
_protocols = Collections.unmodifiableList(Arrays.asList(new String[]{protocol}));
_protocols = List.of(protocol);
}

protected AbstractConnectionFactory(String... protocols)
{
_protocol = protocols[0];
_protocols = Collections.unmodifiableList(Arrays.asList(protocols));
_protocols = List.of(protocols);
}

@Override
Expand All @@ -64,12 +64,25 @@ public List<String> getProtocols()
@ManagedAttribute("The buffer size used to read from the network")
public int getInputBufferSize()
{
return _inputbufferSize;
return _inputBufferSize;
}

public void setInputBufferSize(int size)
{
_inputbufferSize = size;
_inputBufferSize = size;
}

public boolean isUseVirtualThreadToInvokeRootHandler()
{
return _useVirtualThreadToInvokeRootHandler;
}

public void setUseVirtualThreadToInvokeRootHandler(boolean useVirtualThreadToInvokeRootHandler)
{
if (useVirtualThreadToInvokeRootHandler && !VirtualThreads.areSupported())
VirtualThreads.warn();
else
_useVirtualThreadToInvokeRootHandler = useVirtualThreadToInvokeRootHandler;
}

protected String findNextProtocol(Connector connector)
Expand Down Expand Up @@ -102,6 +115,8 @@ protected AbstractConnection configure(AbstractConnection connection, Connector
// Add Connection.Listeners from this factory
getEventListeners().forEach(connection::addEventListener);

connection.setUseVirtualThreadToInvokeRootHandler(isUseVirtualThreadToInvokeRootHandler());

return connection;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
Expand All @@ -54,6 +55,7 @@
import org.eclipse.jetty.util.HostPort;
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -1146,6 +1148,16 @@ public HttpOutput.Interceptor getNextInterceptor()

protected void execute(Runnable task)
{
Connection c = getConnection();
if (c instanceof AbstractConnection)
{
AbstractConnection connection = (AbstractConnection)c;
if (connection.isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(task))
return;
}
}
_executor.execute(task);
}

Expand Down
Loading

0 comments on commit fc11e9a

Please sign in to comment.